Merge pull request #5149 from influxdata/ach-parse-tsm-key
chore: Add org and bucket ID parsing to tsm_readerpull/24376/head
commit
b1756528a9
|
@ -1,7 +1,10 @@
|
|||
use super::*;
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ParsedTsmKey {
|
||||
pub org_id: InfluxId,
|
||||
pub bucket_id: InfluxId,
|
||||
pub measurement: String,
|
||||
pub tagset: Vec<(String, String)>,
|
||||
pub field_key: String,
|
||||
|
@ -17,6 +20,9 @@ pub enum Error {
|
|||
|
||||
#[derive(Debug, Snafu, PartialEq)]
|
||||
pub enum DataError {
|
||||
#[snafu(display(r#"Key length too short"#))]
|
||||
KeyTooShort {},
|
||||
|
||||
#[snafu(display(r#"No measurement found (expected to find in tag field \x00)"#))]
|
||||
NoMeasurement {},
|
||||
|
||||
|
@ -93,9 +99,13 @@ pub fn parse_tsm_key(key: &[u8]) -> Result<ParsedTsmKey, Error> {
|
|||
}
|
||||
|
||||
fn parse_tsm_key_internal(key: &[u8]) -> Result<ParsedTsmKey, DataError> {
|
||||
// skip over org id, bucket id, comma
|
||||
// The next n-1 bytes are the measurement name, where the nᵗʰ byte is a `,`.
|
||||
let mut rem_key = key.iter().copied().skip(8 + 8 + 1);
|
||||
// Get the org and bucket id from the first section of the key.
|
||||
let mut rem_key = key.iter().copied();
|
||||
let org_id = parse_id(&mut rem_key)?;
|
||||
let bucket_id = parse_id(&mut rem_key)?;
|
||||
|
||||
// Now fetch the measurement and tags, starting after the org, bucket and a comma.
|
||||
rem_key.next(); // Skip the comma
|
||||
|
||||
let mut tagset = Vec::with_capacity(10);
|
||||
let mut measurement = None;
|
||||
|
@ -149,12 +159,39 @@ fn parse_tsm_key_internal(key: &[u8]) -> Result<ParsedTsmKey, DataError> {
|
|||
}
|
||||
|
||||
Ok(ParsedTsmKey {
|
||||
org_id,
|
||||
bucket_id,
|
||||
measurement: measurement.context(NoMeasurementSnafu)?,
|
||||
tagset,
|
||||
field_key: field_key.context(NoFieldKeySnafu)?,
|
||||
})
|
||||
}
|
||||
|
||||
// Parses an influx id from the byte sequence. IDs are generally just 8 bytes, but we escape
|
||||
// certain characters ('\', ' ' and '='), so we unescape them as part of this process.
|
||||
// The iterator will consume all bytes that are part of the id.
|
||||
fn parse_id(key: impl Iterator<Item = u8>) -> Result<InfluxId, DataError> {
|
||||
let mut id: [u8; 8] = [0; 8];
|
||||
|
||||
let mut i = 0;
|
||||
let mut escaped = false;
|
||||
for x in key {
|
||||
if x == b'\\' && !escaped {
|
||||
escaped = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
id[i] = x;
|
||||
if i >= 7 {
|
||||
return Ok(InfluxId::from_be_bytes(id));
|
||||
}
|
||||
i += 1;
|
||||
escaped = false;
|
||||
}
|
||||
|
||||
Err(DataError::KeyTooShort {})
|
||||
}
|
||||
|
||||
/// Parses the field value stored in a TSM field key into a field name.
|
||||
/// fields are stored on the series keys in TSM indexes as follows:
|
||||
///
|
||||
|
@ -499,6 +536,39 @@ fn parse_tsm_tag_value(
|
|||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_parse_id_good() {
|
||||
// Simple with no escaping
|
||||
let mut key = b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02,\x00=cpu"
|
||||
.iter()
|
||||
.copied();
|
||||
let org_id = parse_id(&mut key).expect("unable to parse id");
|
||||
assert_eq!(org_id, InfluxId(1));
|
||||
|
||||
let bucket_id = parse_id(&mut key).expect("unable to parse id");
|
||||
assert_eq!(bucket_id, InfluxId(2));
|
||||
|
||||
// Check that the iterator has been left at the comma
|
||||
let rem: Vec<u8> = key.collect();
|
||||
assert_eq!(rem, b",\x00=cpu");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_id_escaped() {
|
||||
// ID with escaped characters: we escape space (\x20), comma (\x2c) and backslash (\x5c)
|
||||
let mut key = b"\x00\x5c\x20\x5c\x5c\x5c\x2c\x01\x5c\x2c\x03\x04,\x00=cpu"
|
||||
.iter()
|
||||
.copied();
|
||||
let unescaped: [u8; 8] = hex::decode("00205c2c012c0304").unwrap().try_into().unwrap();
|
||||
|
||||
let id = parse_id(&mut key).expect("unable to parse id");
|
||||
assert_eq!(id, InfluxId::from_be_bytes(unescaped));
|
||||
|
||||
// Check that the iterator has been left at the next byte
|
||||
let rem: Vec<u8> = key.collect();
|
||||
assert_eq!(rem, b",\x00=cpu");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_tsm_field_key_value() {
|
||||
// test the operation of parse_tsm_field_key_value
|
||||
|
@ -647,7 +717,12 @@ mod tests {
|
|||
let mut key = make_tsm_key_prefix("m", "tag1=val1,tag2=val2");
|
||||
key = add_field_key(key, "f");
|
||||
|
||||
let org_id = InfluxId::from_be_bytes(*b"12345678");
|
||||
let bucket_id = InfluxId::from_be_bytes(*b"87654321");
|
||||
|
||||
let parsed_key = super::parse_tsm_key(&key).unwrap();
|
||||
assert_eq!(parsed_key.org_id, org_id);
|
||||
assert_eq!(parsed_key.bucket_id, bucket_id);
|
||||
assert_eq!(parsed_key.measurement, String::from("m"));
|
||||
let exp_tagset = vec![
|
||||
(String::from("tag1"), String::from("val1")),
|
||||
|
@ -657,6 +732,21 @@ mod tests {
|
|||
assert_eq!(parsed_key.field_key, String::from("f"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_tsm_key_too_short() {
|
||||
let key = b"1234567887654";
|
||||
let err_str = parse_tsm_key(&key[..])
|
||||
.expect_err("expect parsing error")
|
||||
.to_string();
|
||||
|
||||
assert!(
|
||||
err_str
|
||||
.contains("Error while parsing tsm tag key '1234567887654': Key length too short"),
|
||||
"{}",
|
||||
err_str
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_tsm_error_has_key() {
|
||||
//<org_id bucket_id>,\x00=<measurement>,<tag_keys_str>
|
||||
|
|
Loading…
Reference in New Issue