Wire up get tag values in storage
This adds basic support for getting tag values in storage. Still needs to add predicate and time range support.pull/24376/head
parent
7effec0f48
commit
54ef130cea
|
@ -221,14 +221,13 @@ impl Database {
|
|||
let mut iter = self.db.iterator_cf(index, mode)
|
||||
.expect("unexpected rocksdb error getting iterator for index");
|
||||
|
||||
for (key, value) in iter {
|
||||
match index_entry_type_from_byte(key[0]) {
|
||||
IndexEntryType::KeyList => {
|
||||
let k = std::str::from_utf8(&key[prefix.len()..]).unwrap(); // TODO: determine what we want to do with errors
|
||||
keys.push(k.to_string());
|
||||
},
|
||||
_ => break
|
||||
for (key, _) in iter {
|
||||
if !key.starts_with(&prefix) {
|
||||
break;
|
||||
}
|
||||
|
||||
let k = std::str::from_utf8(&key[prefix.len()..]).unwrap(); // TODO: determine what we want to do with errors
|
||||
keys.push(k.to_string());
|
||||
}
|
||||
},
|
||||
None => (),
|
||||
|
@ -238,7 +237,32 @@ impl Database {
|
|||
}
|
||||
|
||||
pub fn get_tag_values(&self, bucket: &Bucket, tag: &str, _predicate: Option<&Predicate>, range: &Range) -> Vec<String> {
|
||||
vec![]
|
||||
let index_level = bucket.index_levels.get(0).unwrap(); // TODO: find the right index based on range
|
||||
let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
|
||||
let cf_name = index_cf_name(bucket.id, index_level.duration_seconds, now);
|
||||
|
||||
let mut values = vec![];
|
||||
|
||||
match self.db.cf_handle(&cf_name) {
|
||||
Some(index) => {
|
||||
let prefix = index_tag_key_value_prefix(bucket.id, tag);
|
||||
let mode = IteratorMode::From(&prefix, Direction::Forward);
|
||||
let mut iter = self.db.iterator_cf(index, mode)
|
||||
.expect("unexpected rocksdb error getting iterator for index");
|
||||
|
||||
for (key, _) in iter {
|
||||
if !key.starts_with(&prefix) {
|
||||
break;
|
||||
}
|
||||
|
||||
let v = std::str::from_utf8(&key[prefix.len()..]).unwrap(); // TODO: determine what to do with errors
|
||||
values.push(v.to_string());
|
||||
}
|
||||
},
|
||||
None => (),
|
||||
}
|
||||
|
||||
values
|
||||
}
|
||||
|
||||
// ensure_series_mutex_exists makes sure that the passed in bucket id has a mutex, which is used
|
||||
|
@ -309,6 +333,9 @@ impl Database {
|
|||
for pair in pairs {
|
||||
// insert the tag key index
|
||||
batch.put_cf(index_cf, index_tag_key(bucket.id, &pair.key), vec![0 as u8]);
|
||||
|
||||
// insert the tag value index
|
||||
batch.put_cf(index_cf, index_tag_key_value(bucket.id, &pair.key, &pair.value), vec![0 as u8]);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -433,6 +460,25 @@ fn index_tag_key_prefix(bucket_id: u32) -> Vec<u8> {
|
|||
v
|
||||
}
|
||||
|
||||
fn index_tag_key_value(bucket_id: u32, key: &str, value: &str) -> Vec<u8> {
|
||||
let mut v = Vec::with_capacity(key.len() + value.len() + 6);
|
||||
v.push(IndexEntryType::KeyValueList as u8);
|
||||
v.write_u32::<BigEndian>(bucket_id).unwrap();
|
||||
v.append(&mut key.as_bytes().to_vec());
|
||||
v.push(0 as u8);
|
||||
v.append(&mut value.as_bytes().to_vec());
|
||||
v
|
||||
}
|
||||
|
||||
fn index_tag_key_value_prefix(bucket_id: u32, key: &str) -> Vec<u8> {
|
||||
let mut v = Vec::with_capacity(key.len() + 6);
|
||||
v.push(IndexEntryType::KeyValueList as u8);
|
||||
v.write_u32::<BigEndian>(bucket_id).unwrap();
|
||||
v.append(&mut key.as_bytes().to_vec());
|
||||
v.push(0 as u8);
|
||||
v
|
||||
}
|
||||
|
||||
fn index_entry_type_from_byte(b: u8) -> IndexEntryType {
|
||||
unsafe { ::std::mem::transmute(b) }
|
||||
}
|
||||
|
@ -677,10 +723,10 @@ mod tests {
|
|||
fn series_metadata_indexing() {
|
||||
let mut bucket = Bucket::new(1, "foo".to_string());
|
||||
let mut db = test_database("series_metadata_indexing", true);
|
||||
let p1 = Point{series: "cpu,host=a,region=west\tusage_system".to_string(), value: 1, time: 0};
|
||||
let p2 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 0};
|
||||
let p3 = Point{series: "cpu,host=b,region=west\tusage_user".to_string(), value: 1, time: 0};
|
||||
let p4 = Point{series: "mem,host=a,region=west\tfree".to_string(), value: 1, time: 0};
|
||||
let p1 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 0};
|
||||
let p2 = Point{series: "cpu,host=a,region=west\tusage_system".to_string(), value: 1, time: 0};
|
||||
let p3 = Point{series: "cpu,host=a,region=west\tusage_user".to_string(), value: 1, time: 0};
|
||||
let p4 = Point{series: "mem,host=b,region=west\tfree".to_string(), value: 1, time: 0};
|
||||
|
||||
bucket.id = db.create_bucket_if_not_exists(bucket.org_id, &bucket).unwrap();
|
||||
let mut series = db.get_series_ids(bucket.org_id, &bucket, vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()]);
|
||||
|
@ -688,6 +734,9 @@ mod tests {
|
|||
|
||||
let tag_keys = db.get_tag_keys(&bucket, None, &Range{start:0, stop: std::i64::MAX});
|
||||
assert_eq!(tag_keys, vec!["_f", "_m", "host", "region"]);
|
||||
|
||||
let tag_values = db.get_tag_values(&bucket, "host", None, &Range{start: 0, stop: std::i64::MAX});
|
||||
assert_eq!(tag_values, vec!["a", "b"]);
|
||||
}
|
||||
|
||||
// Test helpers
|
||||
|
|
Loading…
Reference in New Issue