From bec6b3cf9c0df9aa7b25bf2628d529e3301ded8e Mon Sep 17 00:00:00 2001
From: Paul Dix
Date: Sat, 11 Jan 2020 18:34:25 -0500
Subject: [PATCH] Add MemDB and test framework

This commit adds a test framework for the InvertedIndex and SeriesStore
traits. Structs that implement these traits can call into the shared tests
to verify that they behave correctly. This commit also adds an in-memory
database that keeps time series data in a ring buffer per series and an
inverted index in memory using a combination of HashMap and BTreeMap.
---
 src/storage/inverted_index.rs | 155 +++++++++
 src/storage/memdb.rs          | 625 ++++++++++++++++++++++++++++++++++
 src/storage/mod.rs            |   1 +
 src/storage/rocksdb.rs        |   4 +-
 src/storage/series_store.rs   | 127 ++++++-
 5 files changed, 908 insertions(+), 4 deletions(-)
 create mode 100644 src/storage/memdb.rs
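The test-framework pattern the commit message describes, sketched for a
hypothetical backend (`MyIndex` is a stand-in for any type implementing
InvertedIndex; the helpers are the ones added to inverted_index.rs below):

    #[cfg(test)]
    mod tests {
        use crate::storage::inverted_index;

        #[test]
        fn series_id_indexing() {
            // hand the shared conformance test a boxed trait object
            let index = Box::new(MyIndex::new());
            inverted_index::tests::series_id_indexing(index);
        }
    }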
"cpu,host=b,region=west\tusage_system".to_string(), + value_predicate: None, + series_type: SeriesDataType::I64 + }, + SeriesFilter { + id: 2, + key: "cpu,host=a,region=west\tusage_system".to_string(), + value_predicate: None, + series_type: SeriesDataType::I64 + }, + SeriesFilter { + id: 3, + key: "cpu,host=a,region=west\tusage_user".to_string(), + value_predicate: None, + series_type: SeriesDataType::I64 + }, + ] + ); + + // get series with host = a + let pred = parse_predicate("host = \"a\"").unwrap(); + let series: Vec = index + .read_series_matching(bucket_id, Some(&pred)) + .unwrap() + .collect(); + assert_eq!( + series, + vec![ + SeriesFilter { + id: 2, + key: "cpu,host=a,region=west\tusage_system".to_string(), + value_predicate: None, + series_type: SeriesDataType::I64 + }, + SeriesFilter { + id: 3, + key: "cpu,host=a,region=west\tusage_user".to_string(), + value_predicate: None, + series_type: SeriesDataType::I64 + }, + ] + ); + + // get series with measurement = cpu and host = b + let pred = parse_predicate("_m = \"cpu\" and host = \"b\"").unwrap(); + let series: Vec = index + .read_series_matching(bucket_id, Some(&pred)) + .unwrap() + .collect(); + assert_eq!( + series, + vec![SeriesFilter { + id: 1, + key: "cpu,host=b,region=west\tusage_system".to_string(), + value_predicate: None, + series_type: SeriesDataType::I64 + },] + ); + + let pred = parse_predicate("host = \"a\" OR _m = \"mem\"").unwrap(); + let series: Vec = index + .read_series_matching(bucket_id, Some(&pred)) + .unwrap() + .collect(); + assert_eq!( + series, + vec![ + SeriesFilter { + id: 2, + key: "cpu,host=a,region=west\tusage_system".to_string(), + value_predicate: None, + series_type: SeriesDataType::I64 + }, + SeriesFilter { + id: 3, + key: "cpu,host=a,region=west\tusage_user".to_string(), + value_predicate: None, + series_type: SeriesDataType::I64 + }, + SeriesFilter { + id: 4, + key: "mem,host=b,region=west\tfree".to_string(), + value_predicate: None, + series_type: SeriesDataType::I64 + }, + ] + ); + } +} \ No newline at end of file diff --git a/src/storage/memdb.rs b/src/storage/memdb.rs new file mode 100644 index 0000000000..cfd1c34660 --- /dev/null +++ b/src/storage/memdb.rs @@ -0,0 +1,625 @@ +use crate::delorean::node::{Comparison, Logical, Value}; +use crate::delorean::{Predicate, Node}; +use crate::storage::inverted_index::{InvertedIndex, SeriesFilter}; +use crate::line_parser::{PointType, ParseError, Point}; +use crate::storage::{StorageError, Range, SeriesDataType}; +use crate::storage::series_store::{SeriesStore, ReadPoint}; + +use std::sync::{RwLock, Arc, RwLockReadGuard, Mutex}; +use std::collections::{HashMap, BTreeMap}; + +use croaring::Treemap; +/// memdb implements an in memory database for the InvertedIndex and SeriesStore traits. It +/// works with a ring buffer. Currently, it does not limit the number of series that can be +/// created. It also assumes that data per series arrives in time acending order. 
diff --git a/src/storage/memdb.rs b/src/storage/memdb.rs
new file mode 100644
index 0000000000..cfd1c34660
--- /dev/null
+++ b/src/storage/memdb.rs
@@ -0,0 +1,625 @@
+use crate::delorean::node::{Comparison, Logical, Value};
+use crate::delorean::{Predicate, Node};
+use crate::storage::inverted_index::{InvertedIndex, SeriesFilter};
+use crate::line_parser::{PointType, ParseError, Point};
+use crate::storage::{StorageError, Range, SeriesDataType};
+use crate::storage::series_store::{SeriesStore, ReadPoint};
+
+use std::sync::{RwLock, Arc, RwLockReadGuard, Mutex};
+use std::collections::{HashMap, BTreeMap};
+
+use croaring::Treemap;
+
+/// memdb implements an in memory database for the InvertedIndex and SeriesStore traits.
+/// It keeps the data for each series in a ring buffer. Currently, it does not limit the
+/// number of series that can be created. It also assumes that data per series arrives in
+/// time ascending order.
+
+// TODO: return errors if trying to insert data out of order in an individual series
+
+pub struct MemDB {
+    default_ring_buffer_size: usize,
+    bucket_id_to_series_data: Arc<RwLock<HashMap<u32, Mutex<SeriesData>>>>,
+    bucket_id_to_series_map: Arc<RwLock<HashMap<u32, RwLock<SeriesMap>>>>,
+}
+
+struct SeriesData {
+    ring_buffer_size: usize,
+    i64_series: HashMap<u64, SeriesRingBuffer<i64>>,
+    f64_series: HashMap<u64, SeriesRingBuffer<f64>>,
+}
+
+impl SeriesData {
+    fn write_points(&mut self, points: &Vec<PointType>) {
+        for p in points {
+            match p {
+                PointType::I64(p) => {
+                    match self.i64_series.get_mut(&p.series_id.unwrap()) {
+                        Some(buff) => buff.write(&p),
+                        None => {
+                            let mut buff = new_i64_ring_buffer(self.ring_buffer_size);
+                            buff.write(&p);
+                            self.i64_series.insert(p.series_id.unwrap(), buff);
+                        },
+                    }
+                },
+                PointType::F64(p) => {
+                    match self.f64_series.get_mut(&p.series_id.unwrap()) {
+                        Some(buff) => buff.write(&p),
+                        None => {
+                            let mut buff = new_f64_ring_buffer(self.ring_buffer_size);
+                            buff.write(&p);
+                            self.f64_series.insert(p.series_id.unwrap(), buff);
+                        }
+                    }
+                },
+            }
+        }
+    }
+}
+
+struct SeriesRingBuffer<T: Copy> {
+    next_position: usize,
+    data: Vec<ReadPoint<T>>,
+}
+
+fn new_i64_ring_buffer(size: usize) -> SeriesRingBuffer<i64> {
+    let mut data = Vec::with_capacity(size);
+    for _ in 0..size {
+        data.push(ReadPoint{time: std::i64::MAX, value: 0 as i64})
+    }
+
+    SeriesRingBuffer{
+        next_position: 0,
+        data,
+    }
+}
+
+fn new_f64_ring_buffer(size: usize) -> SeriesRingBuffer<f64> {
+    let mut data = Vec::with_capacity(size);
+    for _ in 0..size {
+        data.push(ReadPoint{time: std::i64::MAX, value: 0 as f64})
+    }
+
+    SeriesRingBuffer{
+        next_position: 0,
+        data,
+    }
+}
+
+impl<T: Copy> SeriesRingBuffer<T> {
+    fn write(&mut self, point: &Point<T>) {
+        if self.next_position == self.data.len() {
+            self.next_position = 0;
+        }
+        self.data[self.next_position].time = point.time;
+        self.data[self.next_position].value = point.value;
+        self.next_position += 1;
+    }
+
+    fn get_range(&self, range: &Range) -> Vec<ReadPoint<T>> {
+        let (_, pos) = self.oldest_time_and_position();
+
+        let mut values = Vec::new();
+
+        for i in pos..self.data.len() {
+            if self.data[i].time > range.stop {
+                return values;
+            } else if self.data[i].time >= range.start {
+                values.push(self.data[i].clone());
+            }
+        }
+
+        for i in 0..self.next_position {
+            if self.data[i].time > range.stop {
+                return values;
+            } else if self.data[i].time >= range.start {
+                values.push(self.data[i].clone());
+            }
+        }
+
+        values
+    }
+
+    fn oldest_time_and_position(&self) -> (i64, usize) {
+        let mut pos = self.next_position;
+        if self.next_position == self.data.len() {
+            pos = 0;
+        } else if self.data[pos].time == std::i64::MAX {
+            pos = 0;
+        }
+
+        (self.data[pos].time, pos)
+    }
+}
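How the ring buffer wraps, concretely. A sketch assuming a buffer small
enough to overflow, and assuming Point's field layout as used by
write_points above (series, series_id, time, value):

    let mut buff = new_i64_ring_buffer(3);
    for (time, value) in vec![(1, 10), (2, 20), (3, 30), (4, 40)] {
        buff.write(&Point {
            series: "cpu,host=a\tusage".to_string(),
            series_id: Some(1),
            time,
            value,
        });
    }
    // the write at time 4 overwrote the slot holding time 1, so the
    // oldest remaining point is time 2, sitting at position 1
    assert_eq!(buff.oldest_time_and_position(), (2, 1));

Unwritten slots carry time == i64::MAX, which is how oldest_time_and_position
tells a partially filled buffer from one that has wrapped.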
+
+struct SeriesMap {
+    last_id: u64,
+    series_key_to_id: HashMap<String, u64>,
+    series_id_to_key_and_type: HashMap<u64, (String, SeriesDataType)>,
+    tag_keys: BTreeMap<String, BTreeMap<String, bool>>,
+    posting_list: HashMap<Vec<u8>, Treemap>,
+}
+
+impl SeriesMap {
+    fn new() -> SeriesMap {
+        SeriesMap{
+            last_id: 0,
+            series_key_to_id: HashMap::new(),
+            series_id_to_key_and_type: HashMap::new(),
+            tag_keys: BTreeMap::new(),
+            posting_list: HashMap::new(),
+        }
+    }
+
+    fn insert_series(&mut self, point: &mut PointType) -> Result<(), ParseError> {
+        if let Some(id) = self.series_key_to_id.get(point.series()) {
+            point.set_series_id(*id);
+            return Ok(());
+        }
+
+        // insert the series id
+        self.last_id += 1;
+        point.set_series_id(self.last_id);
+        self.series_key_to_id.insert(point.series().clone(), self.last_id);
+
+        let series_type = match point {
+            PointType::I64(_) => SeriesDataType::I64,
+            PointType::F64(_) => SeriesDataType::F64,
+        };
+        self.series_id_to_key_and_type.insert(self.last_id, (point.series().clone(), series_type));
+
+        for pair in point.index_pairs()? {
+            // insert this id into the posting list
+            let list_key = list_key(&pair.key, &pair.value);
+
+            let posting_list = self.posting_list.entry(list_key).or_insert(Treemap::create());
+            posting_list.add(self.last_id);
+
+            // insert the tag key value mapping
+            let tag_values = self.tag_keys.entry(pair.key).or_insert(BTreeMap::new());
+            tag_values.insert(pair.value, true);
+        }
+
+        Ok(())
+    }
+
+    fn posting_list_for_key_value(&self, key: &str, value: &str) -> Treemap {
+        let list_key = list_key(key, value);
+        match self.posting_list.get(&list_key) {
+            Some(m) => m.clone(),
+            None => Treemap::create(),
+        }
+    }
+}
+
+fn list_key(key: &str, value: &str) -> Vec<u8> {
+    let mut list_key = key.as_bytes().to_vec();
+    list_key.push(0 as u8);
+    list_key.append(&mut value.as_bytes().to_vec());
+    list_key
+}
+
+impl MemDB {
+    pub fn new() -> MemDB {
+        MemDB {
+            default_ring_buffer_size: 10000,
+            bucket_id_to_series_data: Arc::new(RwLock::new(HashMap::new())),
+            bucket_id_to_series_map: Arc::new(RwLock::new(HashMap::new())),
+        }
+    }
+
+    fn get_or_create_series_ids_for_points(
+        &self,
+        bucket_id: u32,
+        points: &mut Vec<PointType>,
+    ) -> Result<(), StorageError> {
+        // first try to do everything with just a read lock
+        if self.get_series_ids_for_points(bucket_id, points) {
+            return Ok(());
+        }
+
+        // if we got this far, we have new series to insert
+        let buckets = self.bucket_id_to_series_map.read().unwrap();
+        let mut series_map = buckets.get(&bucket_id).unwrap().write().unwrap();
+
+        for p in points {
+            match p.series_id() {
+                Some(_) => (),
+                None => match series_map.insert_series(p) {
+                    Ok(_) => (),
+                    Err(e) => {
+                        return Err(StorageError {
+                            description: format!("error parsing line protocol metadata {}", e),
+                        })
+                    }
+                },
+            }
+        }
+
+        Ok(())
+    }
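The index that insert_series builds, in miniature: every key\0value pair owns
a roaring Treemap of series ids, so the logical operators in a predicate
reduce to bitmap set operations. A sketch using croaring directly, with
made-up series ids:

    use croaring::Treemap;

    let mut measurement_cpu = Treemap::create(); // posting list for "_m\0cpu"
    measurement_cpu.add(1);
    measurement_cpu.add(2);
    let mut host_a = Treemap::create(); // posting list for "host\0a"
    host_a.add(2);
    host_a.add(3);

    // _m = "cpu" and host = "a"  =>  intersection of the two posting lists
    let mut result = measurement_cpu.clone();
    result.and_inplace(&host_a);
    let ids: Vec<u64> = result.iter().collect();
    assert_eq!(ids, vec![2]);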
+
+    // get_series_ids_for_points attempts to fill the series ids for all points in the passed in
+    // collection using only a read lock. If no SeriesMap exists for the bucket, it will be inserted.
+    // It will return true if all points have series ids filled in.
+    fn get_series_ids_for_points(&self, bucket_id: u32, points: &mut Vec<PointType>) -> bool {
+        let buckets = self.bucket_id_to_series_map.read().unwrap();
+        match buckets.get(&bucket_id) {
+            Some(b) => {
+                let b = b.read().unwrap();
+                let mut all_have_id = true;
+                for p in points {
+                    match b.series_key_to_id.get(p.series()) {
+                        Some(id) => p.set_series_id(*id),
+                        None => all_have_id = false,
+                    }
+                }
+
+                if all_have_id {
+                    return true;
+                }
+            },
+            None => {
+                // we've never even seen this bucket. insert it and then we'll get to
+                // inserting the series
+                drop(buckets);
+                self.bucket_id_to_series_map
+                    .write()
+                    .unwrap()
+                    .insert(bucket_id, RwLock::new(SeriesMap::new()));
+            },
+        }
+
+        false
+    }
+
+    fn get_tag_keys(
+        &self,
+        bucket_id: u32,
+        _predicate: Option<&Predicate>,
+    ) -> Result<Box<dyn Iterator<Item = String>>, StorageError> {
+        match self.bucket_id_to_series_map.read().unwrap().get(&bucket_id) {
+            Some(map) => {
+                let keys: Vec<String> = map.read().unwrap().tag_keys.keys().map(|k| k.clone()).collect();
+                Ok(Box::new(keys.into_iter()))
+            },
+            None => {
+                Err(StorageError{description: format!("bucket {} not found", bucket_id)})
+            }
+        }
+    }
+
+    fn get_tag_values(
+        &self,
+        bucket_id: u32,
+        tag_key: &str,
+        _predicate: Option<&Predicate>,
+    ) -> Result<Box<dyn Iterator<Item = String>>, StorageError> {
+        match self.bucket_id_to_series_map.read().unwrap().get(&bucket_id) {
+            Some(map) => {
+                match map.read().unwrap().tag_keys.get(tag_key) {
+                    Some(values) => {
+                        let values: Vec<String> = values.keys().map(|v| v.clone()).collect();
+                        Ok(Box::new(values.into_iter()))
+                    },
+                    None => Ok(Box::new(vec![].into_iter())),
+                }
+            },
+            None => {
+                Err(StorageError{description: format!("bucket {} not found", bucket_id)})
+            }
+        }
+    }
+
+    fn read_series_matching(
+        &self,
+        bucket_id: u32,
+        predicate: Option<&Predicate>,
+    ) -> Result<Box<dyn Iterator<Item = SeriesFilter>>, StorageError> {
+        let pred = match predicate {
+            Some(p) => p,
+            None => return Err(StorageError{description: "unable to list all series".to_string()}),
+        };
+
+        let root = match &pred.root {
+            Some(r) => r,
+            None => return Err(StorageError{description: "expected root node to evaluate".to_string()}),
+        };
+
+        let bucket_map = self.bucket_id_to_series_map.read().unwrap();
+        let series_map = match bucket_map.get(&bucket_id) {
+            Some(m) => m.read().unwrap(),
+            None => return Err(StorageError{description: format!("no series written for bucket {}", bucket_id)}),
+        };
+
+        let map = evaluate_node(&series_map, &root)?;
+        let mut filters = Vec::with_capacity(map.cardinality() as usize);
+
+        for id in map.iter() {
+            let (key, series_type) = series_map.series_id_to_key_and_type.get(&id).unwrap();
+            filters.push(SeriesFilter {
+                id,
+                key: key.clone(),
+                value_predicate: None,
+                series_type: *series_type,
+            });
+        }
+
+        Ok(Box::new(filters.into_iter()))
+    }
+
+    fn write_points_with_series_ids(
+        &self,
+        bucket_id: u32,
+        points: &Vec<PointType>,
+    ) -> Result<(), StorageError> {
+        let bucket_data = self.bucket_id_to_series_data.read().unwrap();
+
+        // ensure that the bucket has a series data struct to write into
+        let series_data = match bucket_data.get(&bucket_id) {
+            Some(d) => d,
+            None => {
+                drop(bucket_data);
+                let mut bucket_data = self.bucket_id_to_series_data.write().unwrap();
+                let series_data = SeriesData{
+                    ring_buffer_size: self.default_ring_buffer_size,
+                    i64_series: HashMap::new(),
+                    f64_series: HashMap::new(),
+                };
+                bucket_data.insert(bucket_id, Mutex::new(series_data));
+                drop(bucket_data);
+                return self.write_points_with_series_ids(bucket_id, points);
+            }
+        };
+
+        let mut series_data = series_data.lock().unwrap();
+        series_data.write_points(points);
+        Ok(())
+    }
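The lock discipline in write_points_with_series_ids (and in
get_series_ids_for_points above) is worth spelling out: take the read lock
optimistically, and only when the bucket is missing drop it, take the write
lock, insert, and retry. The same shape reduced to a bare
RwLock<HashMap<...>>, as a sketch:

    use std::collections::HashMap;
    use std::sync::RwLock;

    fn get_or_default(map: &RwLock<HashMap<u32, String>>, id: u32) -> String {
        {
            let m = map.read().unwrap();
            if let Some(v) = m.get(&id) {
                return v.clone();
            }
        } // the read guard must drop before we can take the write lock
        map.write().unwrap().entry(id).or_default().clone()
    }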
+
+    fn read_i64_range(
+        &self,
+        bucket_id: u32,
+        series_id: u64,
+        range: &Range,
+        batch_size: usize,
+    ) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<i64>>>>, StorageError> {
+        let buckets = self.bucket_id_to_series_data.read().unwrap();
+        let data = match buckets.get(&bucket_id) {
+            Some(d) => d,
+            None => return Err(StorageError{description: format!("bucket {} not found", bucket_id)}),
+        };
+
+        let data = data.lock().unwrap();
+        let buff = match data.i64_series.get(&series_id) {
+            Some(b) => b,
+            None => return Err(StorageError{description: format!("series {} not found", series_id)}),
+        };
+
+        let values = buff.get_range(&range);
+        Ok(Box::new(PointsIterator{values: Some(values), batch_size}))
+    }
+
+    fn read_f64_range(
+        &self,
+        bucket_id: u32,
+        series_id: u64,
+        range: &Range,
+        batch_size: usize,
+    ) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<f64>>>>, StorageError> {
+        let buckets = self.bucket_id_to_series_data.read().unwrap();
+        let data = match buckets.get(&bucket_id) {
+            Some(d) => d,
+            None => return Err(StorageError{description: format!("bucket {} not found", bucket_id)}),
+        };
+
+        let data = data.lock().unwrap();
+        let buff = match data.f64_series.get(&series_id) {
+            Some(b) => b,
+            None => return Err(StorageError{description: format!("series {} not found", series_id)}),
+        };
+
+        let values = buff.get_range(&range);
+        Ok(Box::new(PointsIterator{values: Some(values), batch_size}))
+    }
+}
+
+struct PointsIterator<T: Copy> {
+    values: Option<Vec<ReadPoint<T>>>,
+    batch_size: usize,
+}
+
+impl<T: Copy> Iterator for PointsIterator<T> {
+    type Item = Vec<ReadPoint<T>>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if let Some(mut values) = self.values.take() {
+            if self.batch_size > values.len() {
+                return Some(values);
+            }
+
+            let remaining = values.split_off(self.batch_size);
+
+            if remaining.len() != 0 {
+                self.values = Some(remaining);
+            }
+
+            return Some(values);
+        }
+
+        None
+    }
+}
+
+fn evaluate_node(
+    series_map: &RwLockReadGuard<'_, SeriesMap>,
+    n: &Node,
+) -> Result<Treemap, StorageError> {
+    if n.children.len() != 2 {
+        return Err(StorageError {
+            description: format!(
+                "expected only two children of node but found {}",
+                n.children.len()
+            ),
+        });
+    }
+
+    match &n.value {
+        Some(node_value) => match node_value {
+            Value::Logical(l) => {
+                let l = Logical::from_i32(*l).unwrap();
+                evaluate_logical(series_map, &n.children[0], &n.children[1], l)
+            }
+            Value::Comparison(c) => {
+                let c = Comparison::from_i32(*c).unwrap();
+                evaluate_comparison(series_map, &n.children[0], &n.children[1], c)
+            }
+            val => Err(StorageError {
+                description: format!("evaluate_node called on wrong type {:?}", val),
+            }),
+        },
+        None => Err(StorageError {
+            description: "empty node value".to_string(),
+        }),
+    }
+}
+
+fn evaluate_logical(
+    series_map: &RwLockReadGuard<'_, SeriesMap>,
+    left: &Node,
+    right: &Node,
+    op: Logical,
+) -> Result<Treemap, StorageError> {
+    let mut left_result = evaluate_node(series_map, left)?;
+    let right_result = evaluate_node(series_map, right)?;
+
+    match op {
+        Logical::And => left_result.and_inplace(&right_result),
+        Logical::Or => left_result.or_inplace(&right_result),
+    };
+
+    Ok(left_result)
+}
+
+fn evaluate_comparison(
+    series_map: &RwLockReadGuard<'_, SeriesMap>,
+    left: &Node,
+    right: &Node,
+    op: Comparison,
+) -> Result<Treemap, StorageError> {
+    let left = match &left.value {
+        Some(Value::TagRefValue(s)) => s,
+        _ => {
+            return Err(StorageError {
+                description: "expected left operand to be a TagRefValue".to_string(),
+            })
+        }
+    };
+
+    let right = match &right.value {
+        Some(Value::StringValue(s)) => s,
+        _ => {
+            return Err(StorageError {
+                description: "unable to run comparison against anything other than a string"
+                    .to_string(),
+            })
+        }
+    };
+
+    match op {
+        Comparison::Equal => {
+            return Ok(series_map.posting_list_for_key_value(&left, &right))
+        }
+        comp => {
+            return Err(StorageError {
+                description: format!("unable to handle comparison {:?}", comp),
+            })
+        }
+    }
+}
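PointsIterator's batching, illustrated: split_off(batch_size) keeps the first
batch and carries the tail forward, so five points with a batch size of two
come out as batches of 2, 2, and 1, then None. A sketch against the types
above:

    let mut iter = PointsIterator {
        values: Some((1..=5i64).map(|i| ReadPoint { time: i, value: i }).collect()),
        batch_size: 2,
    };
    assert_eq!(iter.next().unwrap().len(), 2);
    assert_eq!(iter.next().unwrap().len(), 2);
    assert_eq!(iter.next().unwrap().len(), 1);
    assert_eq!(iter.next(), None);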
+
+impl InvertedIndex for MemDB {
+    fn get_or_create_series_ids_for_points(
+        &self,
+        bucket_id: u32,
+        points: &mut Vec<PointType>,
+    ) -> Result<(), StorageError> {
+        self.get_or_create_series_ids_for_points(bucket_id, points)
+    }
+
+    fn read_series_matching(
+        &self,
+        bucket_id: u32,
+        predicate: Option<&Predicate>,
+    ) -> Result<Box<dyn Iterator<Item = SeriesFilter>>, StorageError> {
+        self.read_series_matching(bucket_id, predicate)
+    }
+
+    fn get_tag_keys(
+        &self,
+        bucket_id: u32,
+        predicate: Option<&Predicate>,
+    ) -> Result<Box<dyn Iterator<Item = String>>, StorageError> {
+        self.get_tag_keys(bucket_id, predicate)
+    }
+
+    fn get_tag_values(
+        &self,
+        bucket_id: u32,
+        tag_key: &str,
+        predicate: Option<&Predicate>,
+    ) -> Result<Box<dyn Iterator<Item = String>>, StorageError> {
+        self.get_tag_values(bucket_id, tag_key, predicate)
+    }
+}
+
+impl SeriesStore for MemDB {
+    fn write_points_with_series_ids(
+        &self,
+        bucket_id: u32,
+        points: &Vec<PointType>,
+    ) -> Result<(), StorageError> {
+        self.write_points_with_series_ids(bucket_id, points)
+    }
+
+    fn read_i64_range(
+        &self,
+        bucket_id: u32,
+        series_id: u64,
+        range: &Range,
+        batch_size: usize,
+    ) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<i64>>>>, StorageError> {
+        self.read_i64_range(bucket_id, series_id, range, batch_size)
+    }
+
+    fn read_f64_range(
+        &self,
+        bucket_id: u32,
+        series_id: u64,
+        range: &Range,
+        batch_size: usize,
+    ) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<f64>>>>, StorageError> {
+        self.read_f64_range(bucket_id, series_id, range, batch_size)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::storage::inverted_index;
+    use crate::storage::series_store;
+
+    #[test]
+    fn write_and_read_i64() {
+        let db = Box::new(MemDB::new());
+        series_store::tests::write_and_read_i64(db);
+    }
+
+    #[test]
+    fn write_and_read_f64() {
+        let db = Box::new(MemDB::new());
+        series_store::tests::write_and_read_f64(db);
+    }
+
+    #[test]
+    fn series_id_indexing() {
+        let db = Box::new(MemDB::new());
+        inverted_index::tests::series_id_indexing(db)
+    }
+
+    #[test]
+    fn series_metadata_indexing() {
+        let db = Box::new(MemDB::new());
+        inverted_index::tests::series_metadata_indexing(db);
+    }
+}
\ No newline at end of file
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index b021af78d4..673d8a02a1 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -9,6 +9,7 @@ pub mod inverted_index;
 pub mod predicate;
 pub mod rocksdb;
 pub mod series_store;
+pub mod memdb;
 
 pub struct Range {
     pub start: i64,
diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs
index 7ce39eb229..c847d2e2f1 100644
--- a/src/storage/rocksdb.rs
+++ b/src/storage/rocksdb.rs
@@ -952,7 +952,7 @@ fn prefix_for_series(bucket_id: u32, series_id: u64, start_time: i64) -> Vec<u8
     v
 }
 
-pub struct PointsIterator<'a, T> {
+pub struct PointsIterator<'a, T: Copy> {
     batch_size: usize,
     iter: DBIterator<'a>,
     stop_time: i64,
@@ -961,7 +961,7 @@ pub struct PointsIterator<'a, T> {
     read: fn(b: &[u8]) -> T,
 }
 
-impl<T> Iterator for PointsIterator<'_, T> {
+impl<T: Copy> Iterator for PointsIterator<'_, T> {
     type Item = Vec<ReadPoint<T>>;
 
     fn next(&mut self) -> Option<Self::Item> {
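The series_store.rs hunk below is what motivates the bound changes above:
MemDB's ring buffer hands back owned copies of its slots rather than
references into the buffer, so ReadPoint picks up a Clone derive and the
iterators a T: Copy bound. The read path amounts to this sketch:

    fn in_range<T: Copy>(data: &[ReadPoint<T>], start: i64, stop: i64) -> Vec<ReadPoint<T>> {
        data.iter()
            .filter(|p| p.time >= start && p.time <= stop)
            .cloned() // relies on the new Clone derive
            .collect()
    }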
diff --git a/src/storage/series_store.rs b/src/storage/series_store.rs
index eb2ef9f14f..6cfb104605 100644
--- a/src/storage/series_store.rs
+++ b/src/storage/series_store.rs
@@ -25,8 +25,131 @@ pub trait SeriesStore: Sync + Send {
     ) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<f64>>>>, StorageError>;
 }
 
-#[derive(Debug, PartialEq)]
-pub struct ReadPoint {
+#[derive(Debug, PartialEq, Clone)]
+pub struct ReadPoint<T: Copy> {
     pub time: i64,
     pub value: T,
 }
+
+// Test helpers for other implementations to run
+#[cfg(test)]
+pub mod tests {
+    use crate::storage::series_store::{SeriesStore, ReadPoint};
+    use crate::line_parser::PointType;
+    use crate::storage::Range;
+
+    pub fn write_and_read_i64(store: Box<dyn SeriesStore>) {
+        let b1_id = 1;
+        let b2_id = 2;
+        let mut p1 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 1);
+        p1.set_series_id(1);
+        let mut p2 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 2);
+        p2.set_series_id(1);
+        let mut p3 = PointType::new_i64("mem,host=b,region=west\tfree".to_string(), 1, 2);
+        p3.set_series_id(2);
+        let mut p4 = PointType::new_i64("mem,host=b,region=west\tfree".to_string(), 1, 4);
+        p4.set_series_id(2);
+
+        let b1_points = vec![p1.clone(), p2.clone()];
+        store.write_points_with_series_ids(b1_id, &b1_points).unwrap();
+
+        let b2_points = vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()];
+        store.write_points_with_series_ids(b2_id, &b2_points).unwrap();
+
+        // test that we'll only read from the bucket we wrote points into
+        let range = Range { start: 1, stop: 4 };
+        let mut points_iter = store
+            .read_i64_range(b1_id, p1.series_id().unwrap(), &range, 10)
+            .unwrap();
+        let points = points_iter.next().unwrap();
+        assert_eq!(
+            points,
+            vec![
+                ReadPoint { time: 1, value: 1 },
+                ReadPoint { time: 2, value: 1 },
+            ]
+        );
+        assert_eq!(points_iter.next(), None);
+
+        // test that we'll read multiple series
+        let mut points_iter = store
+            .read_i64_range(b2_id, p1.series_id().unwrap(), &range, 10)
+            .unwrap();
+        let points = points_iter.next().unwrap();
+        assert_eq!(
+            points,
+            vec![
+                ReadPoint { time: 1, value: 1 },
+                ReadPoint { time: 2, value: 1 },
+            ]
+        );
+
+        let mut points_iter = store
+            .read_i64_range(b2_id, p3.series_id().unwrap(), &range, 10)
+            .unwrap();
+        let points = points_iter.next().unwrap();
+        assert_eq!(
+            points,
+            vec![
+                ReadPoint { time: 2, value: 1 },
+                ReadPoint { time: 4, value: 1 },
+            ]
+        );
+
+        // test that the batch size is honored
+        let mut points_iter = store
+            .read_i64_range(b1_id, p1.series_id().unwrap(), &range, 1)
+            .unwrap();
+        let points = points_iter.next().unwrap();
+        assert_eq!(points, vec![ReadPoint { time: 1, value: 1 },]);
+        let points = points_iter.next().unwrap();
+        assert_eq!(points, vec![ReadPoint { time: 2, value: 1 },]);
+        assert_eq!(points_iter.next(), None);
+
+        // test that the time range is properly limiting
+        let range = Range { start: 2, stop: 3 };
+        let mut points_iter = store
+            .read_i64_range(b2_id, p1.series_id().unwrap(), &range, 10)
+            .unwrap();
+        let points = points_iter.next().unwrap();
+        assert_eq!(points, vec![ReadPoint { time: 2, value: 1 },]);
+
+        let mut points_iter = store
+            .read_i64_range(b2_id, p3.series_id().unwrap(), &range, 10)
+            .unwrap();
+        let points = points_iter.next().unwrap();
+        assert_eq!(points, vec![ReadPoint { time: 2, value: 1 },]);
+    }
+
+    pub fn write_and_read_f64(store: Box<dyn SeriesStore>) {
+        let bucket_id = 1;
+        let mut p1 = PointType::new_f64("cpu,host=b,region=west\tusage_system".to_string(), 1.0, 1);
+        p1.set_series_id(1);
+        let mut p2 = PointType::new_f64("cpu,host=b,region=west\tusage_system".to_string(), 2.2, 2);
+        p2.set_series_id(1);
+
+        let points = vec![p1.clone(), p2.clone()];
+        store.write_points_with_series_ids(bucket_id, &points).unwrap();
+
+        // read the points back out of the bucket
+        let range = Range { start: 0, stop: 4 };
+        let mut points_iter = store
+            .read_f64_range(bucket_id, p1.series_id().unwrap(), &range, 10)
+            .unwrap();
+        let points = points_iter.next().unwrap();
+        assert_eq!(
+            points,
+            vec![
+                ReadPoint { time: 1, value: 1.0 },
+                ReadPoint { time: 2, value: 2.2 },
+            ]
+        );
+        assert_eq!(points_iter.next(), None);
+    }
+}
\ No newline at end of file
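Taken together, the two trait impls give MemDB a complete write and query
path. End-to-end usage looks roughly like this (a sketch with error handling
elided; the series key follows the measurement,tags\tfield convention used
throughout):

    let db = MemDB::new();
    let mut points = vec![PointType::new_i64("cpu,host=a\tusage".to_string(), 23, 1)];
    db.get_or_create_series_ids_for_points(1, &mut points).unwrap();
    db.write_points_with_series_ids(1, &points).unwrap();

    let range = Range { start: 0, stop: 10 };
    let mut batches = db
        .read_i64_range(1, points[0].series_id().unwrap(), &range, 10)
        .unwrap();
    assert_eq!(batches.next().unwrap(), vec![ReadPoint { time: 1, value: 23 }]);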