diff --git a/src/storage.rs b/src/storage.rs index c5a31ea437..587ae1be90 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -221,14 +221,13 @@ impl Database { let mut iter = self.db.iterator_cf(index, mode) .expect("unexpected rocksdb error getting iterator for index"); - for (key, value) in iter { - match index_entry_type_from_byte(key[0]) { - IndexEntryType::KeyList => { - let k = std::str::from_utf8(&key[prefix.len()..]).unwrap(); // TODO: determine what we want to do with errors - keys.push(k.to_string()); - }, - _ => break + for (key, _) in iter { + if !key.starts_with(&prefix) { + break; } + + let k = std::str::from_utf8(&key[prefix.len()..]).unwrap(); // TODO: determine what we want to do with errors + keys.push(k.to_string()); } }, None => (), @@ -238,7 +237,32 @@ impl Database { } pub fn get_tag_values(&self, bucket: &Bucket, tag: &str, _predicate: Option<&Predicate>, range: &Range) -> Vec { - vec![] + let index_level = bucket.index_levels.get(0).unwrap(); // TODO: find the right index based on range + let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); + let cf_name = index_cf_name(bucket.id, index_level.duration_seconds, now); + + let mut values = vec![]; + + match self.db.cf_handle(&cf_name) { + Some(index) => { + let prefix = index_tag_key_value_prefix(bucket.id, tag); + let mode = IteratorMode::From(&prefix, Direction::Forward); + let mut iter = self.db.iterator_cf(index, mode) + .expect("unexpected rocksdb error getting iterator for index"); + + for (key, _) in iter { + if !key.starts_with(&prefix) { + break; + } + + let v = std::str::from_utf8(&key[prefix.len()..]).unwrap(); // TODO: determine what to do with errors + values.push(v.to_string()); + } + }, + None => (), + } + + values } // ensure_series_mutex_exists makes sure that the passed in bucket id has a mutex, which is used @@ -309,6 +333,9 @@ impl Database { for pair in pairs { // insert the tag key index batch.put_cf(index_cf, index_tag_key(bucket.id, &pair.key), vec![0 as u8]); + + // insert the tag value index + batch.put_cf(index_cf, index_tag_key_value(bucket.id, &pair.key, &pair.value), vec![0 as u8]); } } @@ -433,6 +460,25 @@ fn index_tag_key_prefix(bucket_id: u32) -> Vec { v } +fn index_tag_key_value(bucket_id: u32, key: &str, value: &str) -> Vec { + let mut v = Vec::with_capacity(key.len() + value.len() + 6); + v.push(IndexEntryType::KeyValueList as u8); + v.write_u32::(bucket_id).unwrap(); + v.append(&mut key.as_bytes().to_vec()); + v.push(0 as u8); + v.append(&mut value.as_bytes().to_vec()); + v +} + +fn index_tag_key_value_prefix(bucket_id: u32, key: &str) -> Vec { + let mut v = Vec::with_capacity(key.len() + 6); + v.push(IndexEntryType::KeyValueList as u8); + v.write_u32::(bucket_id).unwrap(); + v.append(&mut key.as_bytes().to_vec()); + v.push(0 as u8); + v +} + fn index_entry_type_from_byte(b: u8) -> IndexEntryType { unsafe { ::std::mem::transmute(b) } } @@ -677,10 +723,10 @@ mod tests { fn series_metadata_indexing() { let mut bucket = Bucket::new(1, "foo".to_string()); let mut db = test_database("series_metadata_indexing", true); - let p1 = Point{series: "cpu,host=a,region=west\tusage_system".to_string(), value: 1, time: 0}; - let p2 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 0}; - let p3 = Point{series: "cpu,host=b,region=west\tusage_user".to_string(), value: 1, time: 0}; - let p4 = Point{series: "mem,host=a,region=west\tfree".to_string(), value: 1, time: 0}; + let p1 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 0}; + let p2 = Point{series: "cpu,host=a,region=west\tusage_system".to_string(), value: 1, time: 0}; + let p3 = Point{series: "cpu,host=a,region=west\tusage_user".to_string(), value: 1, time: 0}; + let p4 = Point{series: "mem,host=b,region=west\tfree".to_string(), value: 1, time: 0}; bucket.id = db.create_bucket_if_not_exists(bucket.org_id, &bucket).unwrap(); let mut series = db.get_series_ids(bucket.org_id, &bucket, vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()]); @@ -688,6 +734,9 @@ mod tests { let tag_keys = db.get_tag_keys(&bucket, None, &Range{start:0, stop: std::i64::MAX}); assert_eq!(tag_keys, vec!["_f", "_m", "host", "region"]); + + let tag_values = db.get_tag_values(&bucket, "host", None, &Range{start: 0, stop: std::i64::MAX}); + assert_eq!(tag_values, vec!["a", "b"]); } // Test helpers