From 4892a8789871d2a52aa204054bacfaefc86bc8db Mon Sep 17 00:00:00 2001
From: Paul Dix
Date: Thu, 2 Jan 2020 17:32:04 -0500
Subject: [PATCH] Implement read range on the database

This commit adds iterators for iterating over series and batches of
points for a read range request. The exact signature/structure of the
iterators is likely to change when this is generalized for other data
types and other storage backends (S3 & memory).
---
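Rough sketch of how the two iterators are meant to compose from the
caller's side. This is not code from the diff; the org_id, bucket name,
and predicate are placeholders, and db is assumed to be an open Database:

    let org_id: u32 = 1;
    let range = Range{start: 0, stop: 100};
    let pred = parse_predicate("_m = \"cpu\"").unwrap();

    // Outer iterator: yields one SeriesFilter per series matching the predicate.
    let mut series = db.read_range(org_id, "bucket1", &range, &pred, 10).unwrap();
    while let Some(filter) = series.next() {
        // Inner iterator: yields batches of up to batch_size points for that
        // series, stopping at the first key outside the 12-byte
        // bucket_id + series_id prefix or past range.stop.
        let points_iter = series.points_iterator(&db, &filter).unwrap();
        for batch in points_iter {
            for p in batch {
                println!("series {}: value {} at {}", filter.id, p.value, p.time);
            }
        }
    }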
 src/storage/iterators.rs |  57 ++++++++++
 src/storage/mod.rs       |   3 +-
 src/storage/rocksdb.rs   | 226 +++++++++++++++++++++++++++++++++++++--
 3 files changed, 274 insertions(+), 12 deletions(-)
 create mode 100644 src/storage/iterators.rs

diff --git a/src/storage/iterators.rs b/src/storage/iterators.rs
new file mode 100644
index 0000000000..9e72171fb6
--- /dev/null
+++ b/src/storage/iterators.rs
@@ -0,0 +1,57 @@
+use crate::delorean::Predicate;
+use crate::storage::rocksdb::{Database, SeriesFilter, StorageError, Range, PointsIterator};
+
+use rocksdb::{DB, IteratorMode, DBIterator};
+
+pub struct SeriesIterator<'a> {
+    range: &'a Range,
+    batch_size: usize,
+    predicate: &'a Predicate,
+    org_id: u32,
+    bucket_id: u32,
+    series_filters: Vec<SeriesFilter>,
+    next_filter: usize,
+}
+
+impl SeriesIterator<'_> {
+    pub fn new<'a>(range: &'a Range, batch_size: usize, predicate: &'a Predicate, org_id: u32, bucket_id: u32, series_filters: Vec<SeriesFilter>) -> SeriesIterator<'a> {
+        SeriesIterator{
+            range,
+            batch_size,
+            predicate,
+            org_id,
+            bucket_id,
+            next_filter: 0,
+            series_filters,
+        }
+    }
+
+    pub fn points_iterator<'a>(&self, db: &'a Database, series_filter: &'a SeriesFilter) -> Result<PointsIterator<'a>, StorageError> {
+        db.get_db_points_iter(self.org_id, self.bucket_id, series_filter.id, self.range, self.batch_size)
+    }
+}
+
+impl Iterator for SeriesIterator<'_> {
+    type Item = SeriesFilter;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.next_filter += 1;
+        match self.series_filters.get(self.next_filter - 1) {
+            Some(f) => Some(f.clone()),
+            None => None,
+        }
+    }
+}
+
+#[derive(Debug, PartialEq)]
+pub struct ReadSeries<T> {
+    pub id: u64,
+    pub key: String,
+    pub points: Vec<ReadPoint<T>>,
+}
+
+#[derive(Debug, PartialEq)]
+pub struct ReadPoint<T> {
+    pub time: i64,
+    pub value: T,
+}
\ No newline at end of file
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index e99b254aa0..b4086b0cd3 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -1,2 +1,3 @@
 pub mod predicate;
-pub mod rocksdb;
\ No newline at end of file
+pub mod rocksdb;
+pub mod iterators;
diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs
index ba8956d25e..9f6f438c77 100644
--- a/src/storage/rocksdb.rs
+++ b/src/storage/rocksdb.rs
@@ -1,4 +1,5 @@
 use crate::line_parser::{Point, Pair};
+use crate::storage::iterators::{ReadPoint, SeriesIterator};
 use crate::delorean::{Bucket, IndexLevel, Predicate, Node, node};
 use crate::delorean::node::{Value, Comparison, Logical};
 
@@ -9,9 +10,8 @@ use std::collections::HashMap;
 use std::time::SystemTime;
 use std::io::Cursor;
 
-use rocksdb::{DB, IteratorMode, WriteBatch, Options, ColumnFamilyDescriptor, Direction, ColumnFamily};
-use rocksdb::MemtableFactory::{Vector, HashLinkList};
-use byteorder::{BigEndian, WriteBytesExt, ReadBytesExt};
+use rocksdb::{DB, IteratorMode, WriteBatch, Options, ColumnFamilyDescriptor, Direction, ColumnFamily, DBIterator};
+use byteorder::{ByteOrder, BigEndian, WriteBytesExt, ReadBytesExt};
 use prost::Message;
 use futures::AsyncWriteExt;
 use croaring::Treemap;
@@ -93,13 +93,16 @@ impl Database {
     /// * points - individual values with their timestamps and series keys
     pub fn write_points(&self, org_id: u32, bucket_name: &str, points: Vec<Point>) -> Result<(), StorageError> {
         let key = bucket_key(org_id, bucket_name);
-        let bucket = match self.bucket_map.read().unwrap().get(&key) {
-            Some(b) => b.clone(),
+        let bucket_map = self.bucket_map.read().unwrap();
+        let bucket = match bucket_map.get(&key) {
+            Some(b) => b,
             None => return Err(StorageError{description: "bucket not found".to_string()}),
         };
 
         let mut series = self.get_series_ids(org_id, &bucket, points);
+        println!("get_series_ids: {:?}", series);
         self.insert_series_without_ids(org_id, &bucket, &mut series);
+        println!("get_series_ids after insert: {:?}", series);
 
         let mut batch = WriteBatch::default();
         for s in series {
@@ -113,6 +116,26 @@ impl Database {
         Ok(())
     }
 
+    pub fn read_range<'a>(&self, org_id: u32, bucket_name: &str, range: &'a Range, predicate: &'a Predicate, batch_size: usize) -> Result<SeriesIterator<'a>, StorageError> {
+        let bucket = match self.get_bucket_by_name(org_id, bucket_name).unwrap() {
+            Some(b) => b,
+            None => return Err(StorageError{description: format!("bucket {} not found", bucket_name)}),
+        };
+
+        let series_filters = self.get_series_filters(&bucket, Some(&predicate), range)?;
+        println!("filters: {:?}", series_filters);
+
+        Ok(SeriesIterator::new(range, batch_size, predicate, org_id, bucket.id, series_filters))
+    }
+
+    pub fn get_db_points_iter(&self, _org_id: u32, bucket_id: u32, series_id: u64, range: &Range, batch_size: usize) -> Result<PointsIterator, StorageError> {
+        let prefix = prefix_for_series(bucket_id, series_id, range.start);
+        let mode = IteratorMode::From(&prefix, Direction::Forward);
+        let iter = self.db.read().unwrap().iterator(mode);
+
+        Ok(PointsIterator::new(batch_size, iter, range.stop, prefix[0..12].to_vec()))
+    }
+
     /// If the bucket name exists within an org, this function returns the ID (ignoring whether the
     /// bucket options are different than the one that exists). If it doesn't exist, this function
     /// creates the bucket and returns its unique identifier.
@@ -428,6 +451,7 @@ impl Database {
 
         // Keep an in memory map for updating multiple index entries at a time
         let mut index_map: HashMap<Vec<u8>, Treemap> = HashMap::new();
+        let mut series_id_map: HashMap<String, u64> = HashMap::new();
 
         // now loop through the series and insert the index entries into the map
         for series in series {
@@ -436,6 +460,12 @@
                 continue;
             }
 
+            // if we've already put this series in the map in this write, skip it
+            if let Some(id) = series_id_map.get(&series.point.series) {
+                series.id = Some(*id);
+                continue;
+            }
+
             // now that we have the mutex on series, make sure these weren't inserted in some other thread
             if let Some(_) = self.get_series_id(&cf_name, &series.point.series) {
                 continue;
@@ -445,7 +475,8 @@
             let id = *next_id;
             let mut series_id = Vec::with_capacity(8);
             series_id.write_u64::<BigEndian>(*next_id).unwrap();
-            batch.put_cf(index_cf, index_series_key_id(&series.point.series), series_id);
+            batch.put_cf(index_cf, index_series_key_id(&series.point.series), series_id.clone());
+            series_id_map.insert(series.point.series.clone(), *next_id);
             *next_id += 1;
 
             // insert the index entries
@@ -596,9 +627,92 @@
 TODO: other pieces
   2. HTTP GET endpoint with predicate and time ranges
   3. API endpoint to delete old series data
   4. API endpoint to delete old indexes
+  5. API endpoint to run tsm compaction
+  6. Write/read other data types
 */
 
+enum SeriesDataType {
+    Int64,
+    Float64,
+    UInt64,
+    String,
+    Bool,
+}
+
+fn prefix_for_series(bucket_id: u32, series_id: u64, start_time: i64) -> Vec<u8> {
+    let mut v = Vec::with_capacity(20);
+    v.write_u32::<BigEndian>(bucket_id).unwrap();
+    v.write_u64::<BigEndian>(series_id).unwrap();
+    v.write_i64::<BigEndian>(start_time).unwrap();
+    v
+}
+
+pub struct PointsIterator<'a> {
+    batch_size: usize,
+    iter: DBIterator<'a>,
+    stop_time: i64,
+    series_prefix: Vec<u8>,
+    drained: bool,
+}
+
+impl PointsIterator<'_> {
+    pub fn new(batch_size: usize, iter: DBIterator, stop_time: i64, series_prefix: Vec<u8>) -> PointsIterator {
+        PointsIterator{
+            batch_size,
+            iter,
+            stop_time,
+            series_prefix,
+            drained: false,
+        }
+    }
+}
+
+impl Iterator for PointsIterator<'_> {
+    type Item = Vec<ReadPoint<i64>>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.drained {
+            return None;
+        }
+
+        let mut v = Vec::with_capacity(self.batch_size);
+        let mut n = 0;
+
+        while let Some((key, value)) = self.iter.next() {
+            if !key.starts_with(&self.series_prefix) {
+                self.drained = true;
+                break;
+            }
+
+            let time = BigEndian::read_i64(&key[12..]);
+            if time > self.stop_time {
+                self.drained = true;
+                break;
+            }
+
+            let point = ReadPoint{
+                value: BigEndian::read_i64(&value),
+                time,
+            };
+
+            v.push(point);
+
+            n += 1;
+            if n >= self.batch_size {
+                break;
+            }
+        }
+
+        if v.is_empty() {
+            self.drained = true;
+            None
+        } else {
+            Some(v)
+        }
+    }
+}
+
 // IndexEntryType is used as a u8 prefix for any key in rocks for these different index entries
 enum IndexEntryType {
     SeriesKeyToID,
@@ -766,15 +880,15 @@ fn key_for_series_and_time(bucket_id: u32, series_id: u64, timestamp: i64) -> Vec<u8> {
     v
 }
 
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, Clone)]
 pub struct SeriesFilter {
-    id: u64,
-    value_predicate: Option<Predicate>,
+    pub id: u64,
+    pub value_predicate: Option<Predicate>,
 }
 
 pub struct Range {
-    start: i64,
-    stop: i64,
+    pub start: i64,
+    pub stop: i64,
 }
 
 #[derive(Debug, Clone)]
@@ -807,6 +921,7 @@ mod tests {
     use crate::storage::predicate::parse_predicate;
     use crate::storage::rocksdb::IndexEntryType::SeriesKeyToID;
     use crate::line_parser::parse;
+    use crate::storage::iterators::ReadSeries;
 
     #[test]
     fn create_and_get_buckets() {
@@ -992,6 +1107,95 @@ mod tests {
         ]);
     }
 
+    #[test]
+    fn write_and_read_points() {
+        let mut b1 = Bucket::new(1, "bucket1".to_string());
+        let mut b2 = Bucket::new(2, "bucket2".to_string());
+        let mut db = test_database("write_and_read_points", true);
+
+        let p1 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 1};
+        let p2 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 2};
+        let p3 = Point{series: "mem,host=b,region=west\tfree".to_string(), value: 1, time: 2};
+        let p4 = Point{series: "mem,host=b,region=west\tfree".to_string(), value: 1, time: 4};
+
+        b1.id = db.create_bucket_if_not_exists(b1.org_id, &b1).unwrap();
+        b2.id = db.create_bucket_if_not_exists(b2.org_id, &b2).unwrap();
+
+        db.write_points(b1.org_id, &b1.name, vec![p1.clone(), p2.clone()]).unwrap();
+        db.write_points(b2.org_id, &b2.name, vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()]).unwrap();
+
+        // test that we'll only read from the bucket we wrote points into
+        let range = Range{start: 1, stop: 4};
+        let pred = parse_predicate("_m = \"cpu\" OR _m = \"mem\"").unwrap();
+        let mut iter = db.read_range(b1.org_id, &b1.name, &range, &pred, 10).unwrap();
+        let series_filter = iter.next().unwrap();
+        assert_eq!(series_filter.id, 1);
+        assert_eq!(iter.next(), None);
+        let mut points_iter = iter.points_iterator(&db, &series_filter).unwrap();
+        let points = points_iter.next().unwrap();
+        assert_eq!(points, vec![
+            ReadPoint{time: 1, value: 1},
+            ReadPoint{time: 2, value: 1},
+        ]);
+
+        // test that we'll read multiple series
+        let pred = parse_predicate("_m = \"cpu\" OR _m = \"mem\"").unwrap();
+        let mut iter = db.read_range(b2.org_id, &b2.name, &range, &pred, 10).unwrap();
+        let series_filter = iter.next().unwrap();
+        assert_eq!(series_filter.id, 1);
+        let mut points_iter = iter.points_iterator(&db, &series_filter).unwrap();
+        let points = points_iter.next().unwrap();
+        assert_eq!(points, vec![
+            ReadPoint{time: 1, value: 1},
+            ReadPoint{time: 2, value: 1},
+        ]);
+
+        let series_filter = iter.next().unwrap();
+        assert_eq!(series_filter.id, 2);
+        let mut points_iter = iter.points_iterator(&db, &series_filter).unwrap();
+        let points = points_iter.next().unwrap();
+        assert_eq!(points, vec![
+            ReadPoint{time: 2, value: 1},
+            ReadPoint{time: 4, value: 1},
+        ]);
+
+        // test that the batch size is honored
+        let pred = parse_predicate("host = \"b\"").unwrap();
+        let mut iter = db.read_range(b1.org_id, &b1.name, &range, &pred, 1).unwrap();
+        let series_filter = iter.next().unwrap();
+        assert_eq!(series_filter.id, 1);
+        assert_eq!(iter.next(), None);
+        let mut points_iter = iter.points_iterator(&db, &series_filter).unwrap();
+        let points = points_iter.next().unwrap();
+        assert_eq!(points, vec![
+            ReadPoint{time: 1, value: 1},
+        ]);
+        let points = points_iter.next().unwrap();
+        assert_eq!(points, vec![
+            ReadPoint{time: 2, value: 1},
+        ]);
+
+        // test that the time range is properly limiting
+        let range = Range{start: 2, stop: 3};
+        let pred = parse_predicate("_m = \"cpu\" OR _m = \"mem\"").unwrap();
+        let mut iter = db.read_range(b2.org_id, &b2.name, &range, &pred, 10).unwrap();
+        let series_filter = iter.next().unwrap();
+        assert_eq!(series_filter.id, 1);
+        let mut points_iter = iter.points_iterator(&db, &series_filter).unwrap();
+        let points = points_iter.next().unwrap();
+        assert_eq!(points, vec![
+            ReadPoint{time: 2, value: 1},
+        ]);
+
+        let series_filter = iter.next().unwrap();
+        assert_eq!(series_filter.id, 2);
+        let mut points_iter = iter.points_iterator(&db, &series_filter).unwrap();
+        let points = points_iter.next().unwrap();
+        assert_eq!(points, vec![
+            ReadPoint{time: 2, value: 1},
+        ]);
+    }
+
     // Test helpers
     fn get_test_storage_path() -> String {
         dotenv().ok();