From 80493ba51722bc5531772c497633ee3bf03076e3 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Thu, 9 Jan 2020 19:13:52 -0500 Subject: [PATCH 1/3] Refactor database for arbitrary backends This commit pulls the database behavior into three traits: ConfigStore, InvertedIndex, and SeriesStore. The ConfigStore holds the Bucket definitions, the InvertedIndex holds the index of measurement names, tags, and fields, and the SeriesStore holds the raw time series data indexed only by bucket id, series id, and time. This is the initital work to enable memory and S3 backed databases. It is also the preliminary work to break data out into shards. --- src/line_parser/mod.rs | 18 + src/main.rs | 63 ++- src/storage/config_store.rs | 17 + src/storage/database.rs | 88 ++++ src/storage/inverted_index.rs | 38 ++ src/storage/iterators.rs | 44 -- src/storage/mod.rs | 49 +- src/storage/points_iterator.rs | 0 src/storage/predicate.rs | 2 +- src/storage/rocksdb.rs | 855 +++++++++++++-------------------- src/storage/series_iterator.rs | 38 ++ src/storage/series_store.rs | 32 ++ 12 files changed, 668 insertions(+), 576 deletions(-) create mode 100644 src/storage/config_store.rs create mode 100644 src/storage/database.rs create mode 100644 src/storage/inverted_index.rs delete mode 100644 src/storage/iterators.rs create mode 100644 src/storage/points_iterator.rs create mode 100644 src/storage/series_iterator.rs create mode 100644 src/storage/series_store.rs diff --git a/src/line_parser/mod.rs b/src/line_parser/mod.rs index 0a2cbff0ed..6331d9cde6 100644 --- a/src/line_parser/mod.rs +++ b/src/line_parser/mod.rs @@ -6,6 +6,7 @@ use std::{error, fmt}; #[derive(Debug, PartialEq, Clone)] pub struct Point { pub series: String, + pub series_id: Option, pub time: i64, pub value: T, } @@ -26,6 +27,7 @@ impl PointType { pub fn new_i64(series: String, value: i64, time: i64) -> PointType { PointType::I64(Point { series, + series_id: None, value, time, }) @@ -34,6 +36,7 @@ impl PointType { pub fn new_f64(series: String, value: f64, time: i64) -> PointType { PointType::F64(Point { series, + series_id: None, value, time, }) @@ -60,6 +63,20 @@ impl PointType { } } + pub fn series_id(&self) -> Option { + match self { + PointType::I64(p) => p.series_id, + PointType::F64(p) => p.series_id, + } + } + + pub fn set_series_id(&mut self, id: u64) { + match self { + PointType::I64(p) => p.series_id = Some(id), + PointType::F64(p) => p.series_id = Some(id), + } + } + pub fn i64_value(&self) -> Option { match self { PointType::I64(p) => Some(p.value), @@ -321,6 +338,7 @@ mod test { fn index_pairs() { let p = Point { series: "cpu,host=A,region=west\tusage_system".to_string(), + series_id: None, value: 0, time: 0, }; diff --git a/src/main.rs b/src/main.rs index 48e3f2bcfa..a85dd7eae7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,9 @@ +use delorean::delorean::Bucket; use delorean::line_parser; use delorean::line_parser::index_pairs; +use delorean::storage::database::Database; use delorean::storage::predicate::parse_predicate; -use delorean::storage::rocksdb::Range; -use delorean::storage::rocksdb::{ - new_f64_points_iterator, new_i64_points_iterator, Database, SeriesDataType, -}; +use delorean::storage::{Range, SeriesDataType}; use delorean::time::{parse_duration, time_as_i64_nanos}; use std::env::VarError; @@ -37,6 +36,28 @@ async fn write( write_info: web::Query, s: web::Data>, ) -> Result { + let bucket = match s + .db + .get_bucket_by_name(write_info.org_id, &write_info.bucket_name)? 
+ { + Some(b) => b, + None => { + // create this as the default bucket + let b = Bucket { + org_id: write_info.org_id, + id: 0, + name: write_info.bucket_name.clone(), + retention: "0".to_string(), + posting_list_rollover: 10_000, + index_levels: vec![], + }; + + let _ = s.db.create_bucket_if_not_exists(write_info.org_id, &b)?; + s.db.get_bucket_by_name(write_info.org_id, &write_info.bucket_name)? + .unwrap() + } + }; + let mut body = BytesMut::new(); while let Some(chunk) = payload.next().await { let chunk = chunk?; @@ -49,11 +70,9 @@ async fn write( let body = body.freeze(); let body = str::from_utf8(&body).unwrap(); - let points = line_parser::parse(body); + let mut points = line_parser::parse(body); - if let Err(err) = - s.db.write_points(write_info.org_id, &write_info.bucket_name, points) - { + if let Err(err) = s.db.write_points(write_info.org_id, &bucket, &mut points) { return Ok(HttpResponse::InternalServerError() .json(serde_json::json!({ "error": format!("{}", err) }))); } @@ -167,15 +186,21 @@ async fn read( let range = Range { start, stop }; - let series = s.db.read_range( - read_info.org_id, - &read_info.bucket_name, - &range, - &predicate, - 10, - )?; + let bucket = match s + .db + .get_bucket_by_name(read_info.org_id, &read_info.bucket_name)? + { + Some(b) => b, + None => { + return Ok(HttpResponse::NotFound().json(serde_json::json!({ + "error": format!("bucket {} not found", read_info.bucket_name) + }))) + } + }; + + let series = + s.db.read_series_matching_predicate_and_range(&bucket, Some(&predicate), Some(&range))?; - let bucket_id = series.bucket_id; let db = &s.db; let mut response_body = vec![]; @@ -205,8 +230,7 @@ async fn read( match s.series_type { SeriesDataType::I64 => { - let points = - new_i64_points_iterator(read_info.org_id, bucket_id, &db, &s, &range, 10); + let points = db.read_i64_range(&bucket, &s, &range, 10)?; for batch in points { for p in batch { @@ -220,8 +244,7 @@ async fn read( } } SeriesDataType::F64 => { - let points = - new_f64_points_iterator(read_info.org_id, bucket_id, &db, &s, &range, 10); + let points = db.read_f64_range(&bucket, &s, &range, 10)?; for batch in points { for p in batch { diff --git a/src/storage/config_store.rs b/src/storage/config_store.rs new file mode 100644 index 0000000000..e3310d34d7 --- /dev/null +++ b/src/storage/config_store.rs @@ -0,0 +1,17 @@ +use crate::delorean::Bucket; +use crate::storage::StorageError; +use std::sync::Arc; + +pub trait ConfigStore: Sync + Send { + fn create_bucket_if_not_exists( + &self, + org_id: u32, + bucket: &Bucket, + ) -> Result; + + fn get_bucket_by_name( + &self, + org_id: u32, + bucket_name: &str, + ) -> Result>, StorageError>; +} diff --git a/src/storage/database.rs b/src/storage/database.rs new file mode 100644 index 0000000000..3804e5a188 --- /dev/null +++ b/src/storage/database.rs @@ -0,0 +1,88 @@ +use crate::delorean::{Bucket, Predicate}; +use crate::line_parser::PointType; +use crate::storage::config_store::ConfigStore; +use crate::storage::inverted_index::{InvertedIndex, SeriesFilter}; +use crate::storage::rocksdb::RocksDB; +use crate::storage::series_store::{ReadPoint, SeriesStore}; +use crate::storage::{Range, StorageError}; + +use std::sync::Arc; + +pub struct Database { + local_index: Arc, + local_series_store: Arc, + local_config_store: Arc, +} + +impl Database { + pub fn new(dir: &str) -> Database { + let db = Arc::new(RocksDB::new(dir)); + + Database { + local_index: db.clone(), + local_config_store: db.clone(), + local_series_store: db, + } + } + + pub fn write_points( + 
&self, + _org_id: u32, + bucket: &Bucket, + points: &mut Vec, + ) -> Result<(), StorageError> { + self.local_index + .get_or_create_series_ids_for_points(bucket.id, points)?; + self.local_series_store + .write_points_with_series_ids(bucket.id, &points) + } + + pub fn get_bucket_by_name( + &self, + org_id: u32, + bucket_name: &str, + ) -> Result>, StorageError> { + self.local_config_store + .get_bucket_by_name(org_id, bucket_name) + } + + pub fn create_bucket_if_not_exists( + &self, + org_id: u32, + bucket: &Bucket, + ) -> Result { + self.local_config_store + .create_bucket_if_not_exists(org_id, bucket) + } + + pub fn read_series_matching_predicate_and_range( + &self, + bucket: &Bucket, + predicate: Option<&Predicate>, + _range: Option<&Range>, + ) -> Result>, StorageError> { + self.local_index.read_series_matching(bucket.id, predicate) + } + + pub fn read_i64_range( + &self, + bucket: &Bucket, + series_filter: &SeriesFilter, + range: &Range, + batch_size: usize, + ) -> Result>>>, StorageError> { + self.local_series_store + .read_i64_range(bucket.id, series_filter.id, range, batch_size) + } + + pub fn read_f64_range( + &self, + bucket: &Bucket, + series_filter: &SeriesFilter, + range: &Range, + batch_size: usize, + ) -> Result>>>, StorageError> { + self.local_series_store + .read_f64_range(bucket.id, series_filter.id, range, batch_size) + } +} diff --git a/src/storage/inverted_index.rs b/src/storage/inverted_index.rs new file mode 100644 index 0000000000..cc5dac64eb --- /dev/null +++ b/src/storage/inverted_index.rs @@ -0,0 +1,38 @@ +use crate::delorean::Predicate; +use crate::line_parser::PointType; +use crate::storage::{SeriesDataType, StorageError}; + +pub trait InvertedIndex: Sync + Send { + fn get_or_create_series_ids_for_points( + &self, + bucket_id: u32, + points: &mut Vec, + ) -> Result<(), StorageError>; + + fn read_series_matching( + &self, + bucket_id: u32, + predicate: Option<&Predicate>, + ) -> Result>, StorageError>; + + fn get_tag_keys( + &self, + bucket_id: u32, + predicate: Option<&Predicate>, + ) -> Result>, StorageError>; + + fn get_tag_values( + &self, + bucket_id: u32, + tag_key: &str, + predicate: Option<&Predicate>, + ) -> Result>, StorageError>; +} + +#[derive(Debug, PartialEq, Clone)] +pub struct SeriesFilter { + pub id: u64, + pub key: String, + pub value_predicate: Option, + pub series_type: SeriesDataType, +} diff --git a/src/storage/iterators.rs b/src/storage/iterators.rs deleted file mode 100644 index a3e606538f..0000000000 --- a/src/storage/iterators.rs +++ /dev/null @@ -1,44 +0,0 @@ -use crate::storage::rocksdb::SeriesFilter; - -pub struct SeriesIterator { - pub org_id: u32, - pub bucket_id: u32, - series_filters: Vec, - next_filter: usize, -} - -impl SeriesIterator { - pub fn new(org_id: u32, bucket_id: u32, series_filters: Vec) -> SeriesIterator { - SeriesIterator { - org_id, - bucket_id, - next_filter: 0, - series_filters: series_filters, - } - } -} - -impl Iterator for SeriesIterator { - type Item = SeriesFilter; - - fn next(&mut self) -> Option { - self.next_filter += 1; - match self.series_filters.get(self.next_filter - 1) { - Some(f) => Some(f.clone()), - None => None, - } - } -} - -#[derive(Debug, PartialEq)] -pub struct ReadSeries { - pub id: u64, - pub key: String, - pub points: Vec>, -} - -#[derive(Debug, PartialEq)] -pub struct ReadPoint { - pub time: i64, - pub value: T, -} diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 0c4f57e8e0..b021af78d4 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1,3 +1,50 @@ -pub mod iterators; 
+use actix_web::http::StatusCode; +use actix_web::ResponseError; +use std::error; +use std::fmt; + +pub mod config_store; +pub mod database; +pub mod inverted_index; pub mod predicate; pub mod rocksdb; +pub mod series_store; + +pub struct Range { + pub start: i64, + pub stop: i64, +} + +#[repr(u8)] +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum SeriesDataType { + I64, + F64, + // U64, + // String, + // Bool, +} + +#[derive(Debug, Clone)] +pub struct StorageError { + pub description: String, +} + +impl fmt::Display for StorageError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.description) + } +} + +impl error::Error for StorageError { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + // Generic error, underlying cause isn't tracked. + None + } +} + +impl ResponseError for StorageError { + fn status_code(&self) -> StatusCode { + StatusCode::BAD_REQUEST + } +} diff --git a/src/storage/points_iterator.rs b/src/storage/points_iterator.rs new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/storage/predicate.rs b/src/storage/predicate.rs index e9a9dc306c..2c3cf31574 100644 --- a/src/storage/predicate.rs +++ b/src/storage/predicate.rs @@ -1,6 +1,6 @@ use crate::delorean::node::{Comparison, Value}; use crate::delorean::{node, Node, Predicate}; -use crate::storage::rocksdb::StorageError; +use crate::storage::StorageError; use std::iter::Peekable; use std::str::Chars; diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs index ac2733739a..7ce39eb229 100644 --- a/src/storage/rocksdb.rs +++ b/src/storage/rocksdb.rs @@ -1,16 +1,15 @@ use crate::delorean::node::{Comparison, Logical, Value}; use crate::delorean::{Bucket, IndexLevel, Node, Predicate}; use crate::line_parser::PointType; -use crate::storage::iterators::{ReadPoint, SeriesIterator}; +use crate::storage::config_store::ConfigStore; +use crate::storage::inverted_index::{InvertedIndex, SeriesFilter}; +use crate::storage::series_store::{ReadPoint, SeriesStore}; +use crate::storage::{Range, SeriesDataType, StorageError}; use std::collections::HashMap; use std::io::Cursor; use std::sync::{Arc, Mutex, RwLock}; -use std::time::SystemTime; -use std::{error, fmt}; -use actix_web::http::StatusCode; -use actix_web::ResponseError; use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt}; use croaring::treemap::NativeSerializer; use croaring::Treemap; @@ -27,26 +26,20 @@ use rocksdb::{ /// /// Series (measurement + tagset + field) are identified by a u64 ID that is unique within a bucket. /// Each bucket keeps an incrementing counter for new series IDs. -pub struct Database { +pub struct RocksDB { db: Arc>, // bucket_map is an in memory map of what buckets exist in the system. the key is the org id and bucket name together as bytes - bucket_map: Arc, Bucket>>>, + bucket_map: Arc, Arc>>>, // series_insert_lock is a map of mutexes for creating new series in each bucket. 
Bucket ids are unique across all orgs series_insert_lock: Arc>>>, } -#[derive(Debug, PartialEq)] -pub struct Series { - id: Option, - point: PointType, -} - const BUCKET_CF: &str = "buckets"; const BUCKET_CF_WRITE_BUFFER_SIZE: usize = 1024 * 1024; // 1MB const INDEX_CF_WRITE_BUFFER_SIZE: usize = 10 * 1024 * 1024; // 10MB -impl Database { - pub fn new(dir: &str) -> Database { +impl RocksDB { + pub fn new(dir: &str) -> RocksDB { let mut opts = Options::default(); // create the database and missing column families @@ -74,7 +67,7 @@ impl Database { let db = DB::open_cf_descriptors(&opts, dir, cf_descriptors).unwrap(); - let mut database = Database { + let mut database = RocksDB { db: Arc::new(RwLock::new(db)), bucket_map: Arc::new(RwLock::new(HashMap::new())), series_insert_lock: Arc::new(RwLock::new(HashMap::new())), @@ -90,31 +83,31 @@ impl Database { /// single index level of all time if it hasn't been created yet. /// /// # Arguments - /// * org_id - the organization this data resides under - /// * bucket_name - the string identifier of the bucket - /// * points - individual values with their timestamps and series keys + /// * bucket_id - the globally unique bucket id + /// * points - individual values with their timestamps, series keys, and series IDs pub fn write_points( &self, - org_id: u32, - bucket_name: &str, - points: Vec, + bucket_id: u32, + points: &Vec, ) -> Result<(), StorageError> { - let key = bucket_key(org_id, bucket_name); - - let _ = self.create_default_bucket_if_not_exists(org_id, bucket_name, &key)?; - let bucket_map = self.bucket_map.read().unwrap(); - let bucket = bucket_map.get(&key).unwrap(); - - let mut series = self.get_series_ids(org_id, &bucket, points); - self.insert_series_without_ids(org_id, &bucket, &mut series); + // TODO: validate bucket exists? 
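+        // Each value is written under a 20-byte key laid out as
+        // bucket_id (u32 BE) | series_id (u64 BE) | timestamp (i64 BE)
+        // (see key_for_series_and_time below), so a forward prefix scan over
+        // (bucket_id, series_id) returns a series' points in time order.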
let mut batch = WriteBatch::default(); - for s in series { - let key = key_for_series_and_time(bucket.id, s.id.unwrap(), s.point.time()); + for p in points { + let id = match p.series_id() { + Some(id) => id, + None => { + return Err(StorageError { + description: format!("point {:?} had no series id", p), + }) + } + }; + + let key = key_for_series_and_time(bucket_id, id, p.time()); let mut value = Vec::with_capacity(8); - match s.point { + match p { PointType::I64(p) => value.write_i64::(p.value).unwrap(), PointType::F64(p) => value.write_f64::(p.value).unwrap(), } @@ -130,29 +123,54 @@ impl Database { Ok(()) } - fn create_default_bucket_if_not_exists( + // TODO: update this so it decompresses at least the first point to verify the data type or return error + fn read_i64_range<'a>( &self, - org_id: u32, - bucket_name: &str, - bucket_key: &[u8], - ) -> Result { - match self.bucket_map.read().unwrap().get(bucket_key) { - Some(b) => return Ok(b.id), - None => (), - } + bucket_id: u32, + series_id: u64, + range: &Range, + batch_size: usize, + ) -> Result>>>, StorageError> { + let (iter, series_prefix) = self.get_db_points_iter(bucket_id, series_id, range.start); - let bucket = Bucket::new(org_id, bucket_name.to_string()); - self.create_bucket_if_not_exists(org_id, &bucket) + Ok(Box::new(PointsIterator { + batch_size, + iter, + stop_time: range.stop, + series_prefix, + drained: false, + read: i64_from_bytes, + })) + } + + // TODO: update this so it decompresses at least the first point to verify the data type or return error + fn read_f64_range<'a>( + &self, + bucket_id: u32, + series_id: u64, + range: &Range, + batch_size: usize, + ) -> Result>>>, StorageError> { + let (iter, series_prefix) = self.get_db_points_iter(bucket_id, series_id, range.start); + + Ok(Box::new(PointsIterator { + batch_size, + iter, + stop_time: range.stop, + series_prefix, + drained: false, + read: f64_from_bytes, + })) } pub fn read_range<'a>( &self, org_id: u32, bucket_name: &str, - range: &'a Range, + _range: &'a Range, predicate: &'a Predicate, _batch_size: usize, - ) -> Result { + ) -> Result>, StorageError> { let bucket = match self.get_bucket_by_name(org_id, bucket_name).unwrap() { Some(b) => b, None => { @@ -162,18 +180,17 @@ impl Database { } }; - let series_filters = self.get_series_filters(&bucket, Some(&predicate), range)?; + let series_filters = self.get_series_filters(bucket.id, Some(&predicate))?; - Ok(SeriesIterator::new(org_id, bucket.id, series_filters)) + Ok(Box::new(series_filters.into_iter())) } - fn get_db_points_iter( + fn get_db_points_iter<'a>( &self, - _org_id: u32, bucket_id: u32, series_id: u64, start: i64, - ) -> (DBIterator, Vec) { + ) -> (DBIterator<'a>, Vec) { let prefix = prefix_for_series(bucket_id, series_id, start); let mode = IteratorMode::From(&prefix, Direction::Forward); @@ -242,7 +259,7 @@ impl Database { .expect("unexpected rocksdb error writing to DB"); let id = store.id; - map.insert(key, store); + map.insert(key, Arc::new(store)); Ok(id) } @@ -257,20 +274,27 @@ impl Database { &self, org_id: u32, name: &str, - ) -> Result, rocksdb::Error> { - let db = self.db.read().unwrap(); - let buckets = db.cf_handle(BUCKET_CF).unwrap(); - - match db.get_cf(buckets, bucket_key(org_id, &name.to_string())) { - Ok(b) => match b { - Some(b) => { - let bucket = Bucket::decode(b).unwrap(); - return Ok(Some(bucket)); - } - None => return Ok(None), - }, - Err(e) => return Err(e), + ) -> Result>, StorageError> { + let buckets = self.bucket_map.read().unwrap(); + let key = bucket_key(org_id, 
&name.to_string()); + match buckets.get(&key) { + Some(b) => Ok(Some(b.clone())), + None => Ok(None), } + + // let db = self.db.read().unwrap(); + // let buckets = db.cf_handle(BUCKET_CF).unwrap(); + // + // match db.get_cf(buckets, key) { + // Ok(b) => match b { + // Some(b) => { + // let bucket = Bucket::decode(b).unwrap(); + // return Ok(Some(bucket)); + // } + // None => return Ok(None), + // }, + // Err(e) => return Err(StorageError{description: e.to_string()}), + // } } // TODO: ensure that points with timestamps older than the first index level get matched against the appropriate index @@ -289,27 +313,18 @@ impl Database { /// A vector of series where each point in the passed in vector is contained in a series pub fn get_series_ids( &self, - _org_id: u32, - bucket: &Bucket, - points: Vec, - ) -> Vec { - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs(); + bucket_id: u32, + points: &mut Vec, + ) -> Result<(), StorageError> { + let cf_name = index_cf_name(bucket_id); - let series = points - .into_iter() - .map(|p| { - let mut series = Series { id: None, point: p }; - let level = &bucket.index_levels[0]; - let cf_name = index_cf_name(bucket.id, level.duration_seconds, now); - series.id = self.get_series_id(&cf_name, &series.point.series()); - series - }) - .collect(); + for point in points { + if let Some(id) = self.get_series_id(&cf_name, &point.series()) { + point.set_series_id(id); + } + } - series + Ok(()) } // TODO: create test with different data and predicates loaded to ensure it hits the index properly @@ -320,18 +335,16 @@ impl Database { /// storage layer. pub fn get_series_filters( &self, - bucket: &Bucket, + bucket_id: u32, predicate: Option<&Predicate>, - range: &Range, ) -> Result, StorageError> { if let Some(pred) = predicate { if let Some(root) = &pred.root { - let map = self.evaluate_node(bucket, &root, range)?; + let map = self.evaluate_node(bucket_id, &root)?; let mut filters = Vec::with_capacity(map.cardinality() as usize); for id in map.iter() { - let (key, series_type) = - self.get_series_key_and_type_by_id(&bucket, id, &range)?; + let (key, series_type) = self.get_series_key_and_type_by_id(bucket_id, id)?; filters.push(SeriesFilter { id, key, @@ -352,16 +365,10 @@ impl Database { fn get_series_key_and_type_by_id( &self, - bucket: &Bucket, + bucket_id: u32, id: u64, - _range: &Range, ) -> Result<(String, SeriesDataType), StorageError> { - let index_level = bucket.index_levels.get(0).unwrap(); // TODO: find the right index based on range - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs(); - let cf_name = index_cf_name(bucket.id, index_level.duration_seconds, now); + let cf_name = index_cf_name(bucket_id); let db = self.db.read().unwrap(); match db.cf_handle(&cf_name) { @@ -381,12 +388,7 @@ impl Database { } } - fn evaluate_node( - &self, - bucket: &Bucket, - n: &Node, - range: &Range, - ) -> Result { + fn evaluate_node(&self, bucket_id: u32, n: &Node) -> Result { if n.children.len() != 2 { return Err(StorageError { description: format!( @@ -400,11 +402,11 @@ impl Database { Some(node_value) => match node_value { Value::Logical(l) => { let l = Logical::from_i32(*l).unwrap(); - self.evaluate_logical(bucket, &n.children[0], &n.children[1], l, range) + self.evaluate_logical(bucket_id, &n.children[0], &n.children[1], l) } Value::Comparison(c) => { let c = Comparison::from_i32(*c).unwrap(); - self.evaluate_comparison(bucket, &n.children[0], &n.children[1], c, range) + 
self.evaluate_comparison(bucket_id, &n.children[0], &n.children[1], c) } val => Err(StorageError { description: format!("evaluate_node called on wrong type {:?}", val), @@ -418,14 +420,13 @@ impl Database { fn evaluate_logical( &self, - bucket: &Bucket, + bucket_id: u32, left: &Node, right: &Node, op: Logical, - range: &Range, ) -> Result { - let mut left_result = self.evaluate_node(bucket, left, range)?; - let right_result = self.evaluate_node(bucket, right, range)?; + let mut left_result = self.evaluate_node(bucket_id, left)?; + let right_result = self.evaluate_node(bucket_id, right)?; match op { Logical::And => left_result.and_inplace(&right_result), @@ -437,11 +438,10 @@ impl Database { fn evaluate_comparison( &self, - bucket: &Bucket, + bucket_id: u32, left: &Node, right: &Node, op: Comparison, - range: &Range, ) -> Result { let left = match &left.value { Some(Value::TagRefValue(s)) => s, @@ -464,7 +464,7 @@ impl Database { match op { Comparison::Equal => { - return self.get_posting_list_for_tag_key_value(bucket, &left, &right, range); + return self.get_posting_list_for_tag_key_value(bucket_id, &left, &right); } comp => { return Err(StorageError { @@ -476,24 +476,18 @@ impl Database { fn get_posting_list_for_tag_key_value( &self, - bucket: &Bucket, + bucket_id: u32, key: &str, value: &str, - _range: &Range, ) -> Result { // first get the cf for this index - let index_level = bucket.index_levels.get(0).unwrap(); // TODO: find the right index based on range - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs(); - let cf_name = index_cf_name(bucket.id, index_level.duration_seconds, now); + let cf_name = index_cf_name(bucket_id); let db = self.db.read().unwrap(); match db.cf_handle(&cf_name) { Some(cf) => { match db - .get_cf(cf, index_key_value_posting_list(bucket.id, key, value)) + .get_cf(cf, index_key_value_posting_list(bucket_id, key, value)) .unwrap() { Some(val) => { @@ -511,26 +505,15 @@ impl Database { } // TODO: handle predicate - pub fn get_tag_keys( - &self, - bucket: &Bucket, - _predicate: Option<&Predicate>, - _range: &Range, - ) -> Vec { - let index_level = bucket.index_levels.get(0).unwrap(); // TODO: find the right index based on range - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs(); - let cf_name = index_cf_name(bucket.id, index_level.duration_seconds, now); - + pub fn get_tag_keys(&self, bucket_id: u32, _predicate: Option<&Predicate>) -> Vec { + let cf_name = index_cf_name(bucket_id); let mut keys = vec![]; let db = self.db.read().unwrap(); match db.cf_handle(&cf_name) { Some(index) => { - let prefix = index_tag_key_prefix(bucket.id); + let prefix = index_tag_key_prefix(bucket_id); let mode = IteratorMode::From(&prefix, Direction::Forward); let iter = db .iterator_cf(index, mode) @@ -553,24 +536,18 @@ impl Database { pub fn get_tag_values( &self, - bucket: &Bucket, + bucket_id: u32, tag: &str, _predicate: Option<&Predicate>, - _range: &Range, ) -> Vec { - let index_level = bucket.index_levels.get(0).unwrap(); // TODO: find the right index based on range - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs(); - let cf_name = index_cf_name(bucket.id, index_level.duration_seconds, now); + let cf_name = index_cf_name(bucket_id); let db = self.db.read().unwrap(); let mut values = vec![]; match db.cf_handle(&cf_name) { Some(index) => { - let prefix = index_tag_key_value_prefix(bucket.id, tag); + let prefix = 
index_tag_key_value_prefix(bucket_id, tag); let mode = IteratorMode::From(&prefix, Direction::Forward); let iter = db .iterator_cf(index, mode) @@ -614,28 +591,19 @@ impl Database { // TODO: build the index for levels other than the first // insert_series_without_ids will insert any series into the index and obtain an identifier for it. // the passed in series vector is modified so that the newly inserted series have their ids - pub fn insert_series_without_ids( - &self, - org_id: u32, - bucket: &Bucket, - series: &mut Vec, - ) { + pub fn insert_series_without_ids(&self, bucket_id: u32, points: &mut Vec) { // We want to get a lock on new series only for this bucket - self.ensure_series_mutex_exists(bucket.id); + self.ensure_series_mutex_exists(bucket_id); let map = self.series_insert_lock.read().expect("mutex poisoned"); let next_id = map - .get(&bucket.id) + .get(&bucket_id) .expect("should exist because of call to ensure_series_mutex_exists"); let mut next_id = next_id.lock().expect("mutex poisoned"); let mut batch = WriteBatch::default(); // create the column family to store the index if it doesn't exist - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs(); - let cf_name = index_cf_name(bucket.id, bucket.index_levels[0].duration_seconds, now); + let cf_name = index_cf_name(bucket_id); let index_exists = match self.db.read().unwrap().cf_handle(&cf_name) { Some(_) => true, None => false, @@ -659,32 +627,32 @@ impl Database { let mut series_id_map: HashMap = HashMap::new(); // now loop through the series and insert the index entries into the map - for series in series { + for point in points { // don't bother with series in the collection that already have IDs - if let Some(_) = series.id { + if let Some(_) = point.series_id() { continue; } // if we've already put this series in the map in this write, skip it - if let Some(id) = series_id_map.get(series.point.series()) { - series.id = Some(*id); + if let Some(id) = series_id_map.get(point.series()) { + point.set_series_id(*id); continue; } // now that we have the mutex on series, make sure these weren't inserted in some other thread - if let Some(id) = self.get_series_id(&cf_name, &series.point.series()) { - series.id = Some(id); + if let Some(id) = self.get_series_id(&cf_name, &point.series()) { + point.set_series_id(id); continue; } - series.id = Some(*next_id); + point.set_series_id(*next_id); let id = *next_id; let mut series_id = Vec::with_capacity(8); series_id.write_u64::(*next_id).unwrap(); batch .put_cf( index_cf, - index_series_key_id(&series.point.series()), + index_series_key_id(&point.series()), series_id.clone(), ) .unwrap(); @@ -692,38 +660,35 @@ impl Database { .put_cf( index_cf, index_series_id(&series_id), - index_series_id_value( - series_type_from_point_type(&series.point), - &series.point.series(), - ), + index_series_id_value(series_type_from_point_type(&point), &point.series()), ) .unwrap(); - series_id_map.insert(series.point.series().clone(), *next_id); + series_id_map.insert(point.series().clone(), *next_id); *next_id += 1; // insert the index entries // TODO: do the error handling bits, but how to handle? Should all series be validated before // and fail the whole write if any one is bad, or insert the ones we can and ignore and log the bad? 
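+            // For each tag pair on the series key, the loop below writes a
+            // tag-key entry and a tag-key/value entry, and updates two roaring
+            // Treemap posting lists (key -> series ids, key/value -> series ids)
+            // that are accumulated in index_map and written out with the batch.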
- let pairs = series.point.index_pairs().unwrap(); + let pairs = point.index_pairs().unwrap(); for pair in pairs { // insert the tag key index batch - .put_cf(index_cf, index_tag_key(bucket.id, &pair.key), vec![0 as u8]) + .put_cf(index_cf, index_tag_key(bucket_id, &pair.key), vec![0 as u8]) .unwrap(); // insert the tag value index batch .put_cf( index_cf, - index_tag_key_value(bucket.id, &pair.key, &pair.value), + index_tag_key_value(bucket_id, &pair.key, &pair.value), vec![0 as u8], ) .unwrap(); // update the key to id bitmap let index_key_posting_list_key = - index_key_posting_list(bucket.id, &pair.key).to_vec(); + index_key_posting_list(bucket_id, &pair.key).to_vec(); // put it in the temporary in memory map for a single write update later match index_map.get_mut(&index_key_posting_list_key) { @@ -749,7 +714,7 @@ impl Database { // update the key/value to id bitmap let index_key_value_posting_list_key = - index_key_value_posting_list(bucket.id, &pair.key, &pair.value).to_vec(); + index_key_value_posting_list(bucket_id, &pair.key, &pair.value).to_vec(); match index_map.get_mut(&index_key_value_posting_list_key) { Some(tree) => { @@ -783,11 +748,7 @@ impl Database { let bucket_cf = db.cf_handle(BUCKET_CF).unwrap(); let mut next_series_id_val = Vec::with_capacity(8); next_series_id_val.write_u64::(*next_id).unwrap(); - let _ = batch.put_cf( - bucket_cf, - next_series_id_key(org_id, bucket.id), - next_series_id_val, - ); + let _ = batch.put_cf(bucket_cf, next_series_id_key(bucket_id), next_series_id_val); db.write(batch).expect("unexpected rocksdb error"); } @@ -828,7 +789,7 @@ impl Database { match bucket_entry_type_from_byte(key[0]) { BucketEntryType::NextSeriesID => { // read the bucket id from the key - let mut c = Cursor::new(key[5..].to_vec()); + let mut c = Cursor::new(key[1..].to_vec()); let bucket_id = c.read_u32::().expect(&format!( "couldn't read the bucket id from the key {:?}", key @@ -846,7 +807,7 @@ impl Database { let bucket = Bucket::decode(value.into_vec()).expect("unexpected error decoding bucket"); let key = bucket_key(bucket.org_id, &bucket.name); - bucket_map.insert(key, bucket); + bucket_map.insert(key, Arc::new(bucket)); } BucketEntryType::NextBucketID => (), } @@ -855,6 +816,94 @@ impl Database { } } +impl InvertedIndex for RocksDB { + fn get_or_create_series_ids_for_points( + &self, + bucket_id: u32, + points: &mut Vec, + ) -> Result<(), StorageError> { + self.get_series_ids(bucket_id, points)?; + self.insert_series_without_ids(bucket_id, points); + Ok(()) + } + + fn read_series_matching( + &self, + bucket_id: u32, + predicate: Option<&Predicate>, + ) -> Result>, StorageError> { + let filters = self.get_series_filters(bucket_id, predicate)?; + Ok(Box::new(filters.into_iter())) + } + + fn get_tag_keys( + &self, + bucket_id: u32, + predicate: Option<&Predicate>, + ) -> Result>, StorageError> { + let keys = self.get_tag_keys(bucket_id, predicate); + Ok(Box::new(keys.into_iter())) + } + + fn get_tag_values( + &self, + bucket_id: u32, + tag_key: &str, + predicate: Option<&Predicate>, + ) -> Result>, StorageError> { + let values = self.get_tag_values(bucket_id, tag_key, predicate); + Ok(Box::new(values.into_iter())) + } +} + +impl SeriesStore for RocksDB { + fn write_points_with_series_ids( + &self, + bucket_id: u32, + points: &Vec, + ) -> Result<(), StorageError> { + self.write_points(bucket_id, &points) + } + + fn read_i64_range( + &self, + bucket_id: u32, + series_id: u64, + range: &Range, + batch_size: usize, + ) -> Result>>>, StorageError> { + 
self.read_i64_range(bucket_id, series_id, range, batch_size) + } + + fn read_f64_range( + &self, + bucket_id: u32, + series_id: u64, + range: &Range, + batch_size: usize, + ) -> Result>>>, StorageError> { + self.read_f64_range(bucket_id, series_id, range, batch_size) + } +} + +impl ConfigStore for RocksDB { + fn create_bucket_if_not_exists( + &self, + org_id: u32, + bucket: &Bucket, + ) -> Result { + self.create_bucket_if_not_exists(org_id, bucket) + } + + fn get_bucket_by_name( + &self, + org_id: u32, + bucket_name: &str, + ) -> Result>, StorageError> { + self.get_bucket_by_name(org_id, bucket_name) + } +} + /* Index entries all have the prefix: @@ -895,15 +944,6 @@ TODO: other pieces */ -#[derive(Debug, PartialEq, Clone)] -pub enum SeriesDataType { - I64, - F64, - // U64, - // String, - // Bool, -} - fn prefix_for_series(bucket_id: u32, series_id: u64, start_time: i64) -> Vec { let mut v = Vec::with_capacity(20); v.write_u32::(bucket_id).unwrap(); @@ -921,48 +961,6 @@ pub struct PointsIterator<'a, T> { read: fn(b: &[u8]) -> T, } -pub fn new_i64_points_iterator<'a>( - org_id: u32, - bucket_id: u32, - db: &'a Database, - series_filter: &'a SeriesFilter, - range: &Range, - batch_size: usize, -) -> PointsIterator<'a, i64> { - let (iter, series_prefix) = - db.get_db_points_iter(org_id, bucket_id, series_filter.id, range.start); - - PointsIterator { - batch_size, - iter, - stop_time: range.stop, - series_prefix, - drained: false, - read: i64_from_bytes, - } -} - -pub fn new_f64_points_iterator<'a>( - org_id: u32, - bucket_id: u32, - db: &'a Database, - series_filter: &'a SeriesFilter, - range: &Range, - batch_size: usize, -) -> PointsIterator<'a, f64> { - let (iter, series_prefix) = - db.get_db_points_iter(org_id, bucket_id, series_filter.id, range.start); - - PointsIterator { - batch_size, - iter, - stop_time: range.stop, - series_prefix, - drained: false, - read: f64_from_bytes, - } -} - impl Iterator for PointsIterator<'_, T> { type Item = Vec>; @@ -1032,20 +1030,8 @@ fn index_cf_options() -> Options { options } -// index_cf_name returns the name of the column family for the given index duration at a given epoch time (in seconds) -fn index_cf_name(bucket_id: u32, duration: u32, epoch: u64) -> String { - if duration == 0 { - return format!("index_{}_{}", bucket_id, "0"); - } - - let duration = duration as u64; - - format!( - "index_{}_{}_{}", - bucket_id, - duration, - epoch / duration * duration - ) +fn index_cf_name(bucket_id: u32) -> String { + format!("index_{}", bucket_id,) } fn index_series_key_id(series_key: &str) -> Vec { @@ -1140,10 +1126,9 @@ fn index_key_value_posting_list(bucket_id: u32, key: &str, value: &str) -> Vec Vec { - let mut v = Vec::with_capacity(9); +fn next_series_id_key(bucket_id: u32) -> Vec { + let mut v = Vec::with_capacity(5); v.push(BucketEntryType::NextSeriesID as u8); - v.write_u32::(org_id).unwrap(); v.write_u32::(bucket_id).unwrap(); v } @@ -1227,44 +1212,6 @@ fn key_for_series_and_time(bucket_id: u32, series_id: u64, timestamp: i64) -> Ve v } -// TODO: add series type to series filter -#[derive(Debug, PartialEq, Clone)] -pub struct SeriesFilter { - pub id: u64, - pub key: String, - pub value_predicate: Option, - pub series_type: SeriesDataType, -} - -pub struct Range { - pub start: i64, - pub stop: i64, -} - -#[derive(Debug, Clone)] -pub struct StorageError { - pub description: String, -} - -impl fmt::Display for StorageError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.description) - } -} - -impl error::Error for 
StorageError { - fn source(&self) -> Option<&(dyn error::Error + 'static)> { - // Generic error, underlying cause isn't tracked. - None - } -} - -impl ResponseError for StorageError { - fn status_code(&self) -> StatusCode { - StatusCode::BAD_REQUEST - } -} - #[cfg(test)] mod tests { use super::*; @@ -1276,7 +1223,7 @@ mod tests { #[test] fn create_and_get_buckets() { - let bucket: Bucket; + let bucket: Arc; let org_id = 1; let mut bucket2 = Bucket::new(2, "Foo".to_string()); { @@ -1286,7 +1233,7 @@ mod tests { b.id = db.create_bucket_if_not_exists(org_id, &b).unwrap(); assert_eq!(b.id, 1); let stored_bucket = db.get_bucket_by_name(org_id, &b.name).unwrap().unwrap(); - assert_eq!(b, stored_bucket); + assert_eq!(Arc::new(b.clone()), stored_bucket); bucket = stored_bucket; // ensure it doesn't insert again @@ -1302,14 +1249,14 @@ mod tests { .get_bucket_by_name(bucket2.org_id, &bucket2.name) .unwrap() .unwrap(); - assert_eq!(bucket2, stored2); + assert_eq!(Arc::new(bucket2), stored2); // ensure second bucket gets new ID let mut b2 = Bucket::new(org_id, "two".to_string()); b2.id = db.create_bucket_if_not_exists(org_id, &b2).unwrap(); assert_eq!(b2.id, 3); let stored_bucket = db.get_bucket_by_name(org_id, &b2.name).unwrap().unwrap(); - assert_eq!(b2, stored_bucket); + assert_eq!(Arc::new(b2), stored_bucket); // TODO: ensure that a bucket orders levels correctly } @@ -1345,85 +1292,31 @@ mod tests { b.id = db.create_bucket_if_not_exists(org_id, &b).unwrap(); b2.id = db.create_bucket_if_not_exists(b2.org_id, &b2).unwrap(); - let mut series = db.get_series_ids(org_id, &b, vec![p1.clone(), p2.clone()]); - assert_eq!( - series, - vec![ - Series { - id: None, - point: p1.clone() - }, - Series { - id: None, - point: p2.clone() - }, - ] - ); - - db.insert_series_without_ids(org_id, &b, &mut series); - assert_eq!( - series, - vec![ - Series { - id: Some(1), - point: p1.clone() - }, - Series { - id: Some(2), - point: p2.clone() - }, - ] - ); + let mut points = vec![p1.clone(), p2.clone()]; + db.get_or_create_series_ids_for_points(b.id, &mut points) + .unwrap(); + assert_eq!(points[0].series_id(), Some(1)); + assert_eq!(points[1].series_id(), Some(2)); // now insert a new series and make sure it shows up - series = db.get_series_ids(org_id, &b, vec![p1.clone(), p3.clone()]); - assert_eq!( - series, - vec![ - Series { - id: Some(1), - point: p1.clone() - }, - Series { - id: None, - point: p3.clone() - }, - ] - ); + let mut points = vec![p1.clone(), p3.clone()]; + db.get_series_ids(b.id, &mut points).unwrap(); + assert_eq!(points[0].series_id(), Some(1)); + assert_eq!(points[1].series_id(), None); - db.insert_series_without_ids(org_id, &b, &mut series); - assert_eq!( - series, - vec![ - Series { - id: Some(1), - point: p1.clone() - }, - Series { - id: Some(3), - point: p3.clone() - }, - ] - ); + db.get_or_create_series_ids_for_points(b.id, &mut points) + .unwrap(); + assert_eq!(points[0].series_id(), Some(1)); + assert_eq!(points[1].series_id(), Some(3)); - series = db.get_series_ids(b2.org_id, &b2, vec![p1.clone()]); - assert_eq!( - series, - vec![Series { - id: None, - point: p1.clone() - }] - ); + let mut points = vec![p1.clone()]; + db.get_series_ids(b2.id, &mut points).unwrap(); + assert_eq!(points[0].series_id(), None); // insert a series into the other org bucket - db.insert_series_without_ids(b2.org_id, &b2, &mut series); - assert_eq!( - series, - vec![Series { - id: Some(1), - point: p1.clone() - }] - ); + db.get_or_create_series_ids_for_points(b2.id, &mut points) + .unwrap(); + 
assert_eq!(points[0].series_id(), Some(1)); } // now make sure that a new series gets inserted properly after restart @@ -1431,74 +1324,27 @@ mod tests { let db = test_database("series_id_indexing", false); // check the first org - let mut series = vec![Series { - id: None, - point: p4.clone(), - }]; - db.insert_series_without_ids(org_id, &b, &mut series); - assert_eq!( - series, - vec![Series { - id: Some(4), - point: p4.clone() - }] - ); - assert_eq!( - db.get_series_ids( - org_id, - &b, - vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()] - ), - vec![ - Series { - id: Some(1), - point: p1.clone() - }, - Series { - id: Some(2), - point: p2.clone() - }, - Series { - id: Some(3), - point: p3.clone() - }, - Series { - id: Some(4), - point: p4.clone() - }, - ], - ); + let mut points = vec![p4.clone()]; + db.insert_series_without_ids(b.id, &mut points); + assert_eq!(points[0].series_id(), Some(4)); + + let mut points = vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()]; + db.get_series_ids(b.id, &mut points).unwrap(); + assert_eq!(points[0].series_id(), Some(1)); + assert_eq!(points[1].series_id(), Some(2)); + assert_eq!(points[2].series_id(), Some(3)); + assert_eq!(points[3].series_id(), Some(4)); // check the second org - series = vec![Series { - id: None, - point: p2.clone(), - }]; - db.insert_series_without_ids(b2.org_id, &b2, &mut series); - assert_eq!( - series, - vec![Series { - id: Some(2), - point: p2.clone() - }] - ); - assert_eq!( - db.get_series_ids(b2.org_id, &b2, vec![p1.clone(), p2.clone(), p3.clone()]), - vec![ - Series { - id: Some(1), - point: p1 - }, - Series { - id: Some(2), - point: p2 - }, - Series { - id: None, - point: p3 - }, - ], - ); + let mut points = vec![p2.clone()]; + db.insert_series_without_ids(b2.id, &mut points); + assert_eq!(points[0].series_id(), Some(2)); + + let mut points = vec![p1.clone(), p2.clone(), p3.clone()]; + db.get_series_ids(b2.id, &mut points).unwrap(); + assert_eq!(points[0].series_id(), Some(1)); + assert_eq!(points[1].series_id(), Some(2)); + assert_eq!(points[2].series_id(), None); } } @@ -1514,28 +1360,24 @@ mod tests { bucket.id = db .create_bucket_if_not_exists(bucket.org_id, &bucket) .unwrap(); - let mut series = db.get_series_ids( - bucket.org_id, - &bucket, - vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()], - ); - db.insert_series_without_ids(bucket.org_id, &bucket, &mut series); + let mut points = vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()]; + db.get_or_create_series_ids_for_points(bucket.id, &mut points) + .unwrap(); - let range = Range { - start: 0, - stop: std::i64::MAX, - }; - let tag_keys = db.get_tag_keys(&bucket, None, &range); + let tag_keys = db.get_tag_keys(bucket.id, None); assert_eq!(tag_keys, vec!["_f", "_m", "host", "region"]); - let tag_values = db.get_tag_values(&bucket, "host", None, &range); + let tag_values = db.get_tag_values(bucket.id, "host", None); assert_eq!(tag_values, vec!["a", "b"]); // get all series // get series with measurement = mem let pred = parse_predicate("_m = \"cpu\"").unwrap(); - let series = db.get_series_filters(&bucket, Some(&pred), &range).unwrap(); + let series: Vec = db + .read_series_matching(bucket.id, Some(&pred)) + .unwrap() + .collect(); assert_eq!( series, vec![ @@ -1562,7 +1404,10 @@ mod tests { // get series with host = a let pred = parse_predicate("host = \"a\"").unwrap(); - let series = db.get_series_filters(&bucket, Some(&pred), &range).unwrap(); + let series: Vec = db + .read_series_matching(bucket.id, Some(&pred)) + .unwrap() + .collect(); assert_eq!( 
series, vec![ @@ -1583,7 +1428,10 @@ mod tests { // get series with measurement = cpu and host = b let pred = parse_predicate("_m = \"cpu\" and host = \"b\"").unwrap(); - let series = db.get_series_filters(&bucket, Some(&pred), &range).unwrap(); + let series: Vec = db + .read_series_matching(bucket.id, Some(&pred)) + .unwrap() + .collect(); assert_eq!( series, vec![SeriesFilter { @@ -1595,7 +1443,10 @@ mod tests { ); let pred = parse_predicate("host = \"a\" OR _m = \"mem\"").unwrap(); - let series = db.get_series_filters(&bucket, Some(&pred), &range).unwrap(); + let series: Vec = db + .read_series_matching(bucket.id, Some(&pred)) + .unwrap() + .collect(); assert_eq!( series, vec![ @@ -1621,24 +1472,6 @@ mod tests { ); } - #[test] - fn write_creates_bucket() { - let b1 = Bucket::new(1, "bucket1".to_string()); - let db = test_database("write_creates_bucket", true); - - let p1 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 1); - let p2 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 2); - - db.write_points(b1.org_id, &b1.name, vec![p1, p2]).unwrap(); - assert_eq!( - db.get_bucket_by_name(b1.org_id, &b1.name) - .unwrap() - .unwrap() - .id, - 1 - ); - } - #[test] fn catch_rocksdb_iterator_segfault() { let mut b1 = Bucket::new(1, "bucket1".to_string()); @@ -1648,15 +1481,16 @@ mod tests { b1.id = db.create_bucket_if_not_exists(b1.org_id, &b1).unwrap(); - db.write_points(b1.org_id, &b1.name, vec![p1.clone()]) + let mut points = vec![p1.clone()]; + db.get_or_create_series_ids_for_points(b1.id, &mut points) .unwrap(); + db.write_points(b1.id, &points).unwrap(); // test that we'll only read from the bucket we wrote points into let range = Range { start: 1, stop: 4 }; let pred = parse_predicate("_m = \"cpu\"").unwrap(); - let mut iter = db - .read_range(b1.org_id, &b1.name, &range, &pred, 10) - .unwrap(); + let mut iter = db.read_series_matching(b1.id, Some(&pred)).unwrap(); + let series_filter = iter.next().unwrap(); assert_eq!( series_filter, @@ -1668,8 +1502,9 @@ mod tests { } ); assert_eq!(iter.next(), None); - let mut points_iter = - new_i64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10); + let mut points_iter = db + .read_i64_range(b1.id, series_filter.id, &range, 10) + .unwrap(); let points = points_iter.next().unwrap(); assert_eq!(points, vec![ReadPoint { time: 1, value: 1 },]); assert_eq!(points_iter.next(), None); @@ -1689,21 +1524,20 @@ mod tests { b1.id = db.create_bucket_if_not_exists(b1.org_id, &b1).unwrap(); b2.id = db.create_bucket_if_not_exists(b2.org_id, &b2).unwrap(); - db.write_points(b1.org_id, &b1.name, vec![p1.clone(), p2.clone()]) + let mut b1_points = vec![p1.clone(), p2.clone()]; + db.get_or_create_series_ids_for_points(b1.id, &mut b1_points) .unwrap(); - db.write_points( - b2.org_id, - &b2.name, - vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()], - ) - .unwrap(); + db.write_points(b1.id, &b1_points).unwrap(); + + let mut b2_points = vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()]; + db.get_or_create_series_ids_for_points(b2.id, &mut b2_points) + .unwrap(); + db.write_points(b2.id, &b2_points).unwrap(); // test that we'll only read from the bucket we wrote points into let range = Range { start: 1, stop: 4 }; let pred = parse_predicate("_m = \"cpu\" OR _m = \"mem\"").unwrap(); - let mut iter = db - .read_range(b1.org_id, &b1.name, &range, &pred, 10) - .unwrap(); + let mut iter = db.read_series_matching(b1.id, Some(&pred)).unwrap(); let series_filter = iter.next().unwrap(); 
assert_eq!( series_filter, @@ -1715,8 +1549,9 @@ mod tests { } ); assert_eq!(iter.next(), None); - let mut points_iter = - new_i64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10); + let mut points_iter = db + .read_i64_range(b1.id, series_filter.id, &range, 10) + .unwrap(); let points = points_iter.next().unwrap(); assert_eq!( points, @@ -1729,9 +1564,7 @@ mod tests { // test that we'll read multiple series let pred = parse_predicate("_m = \"cpu\" OR _m = \"mem\"").unwrap(); - let mut iter = db - .read_range(b2.org_id, &b2.name, &range, &pred, 10) - .unwrap(); + let mut iter = db.read_series_matching(b2.id, Some(&pred)).unwrap(); let series_filter = iter.next().unwrap(); assert_eq!( series_filter, @@ -1742,8 +1575,9 @@ mod tests { series_type: SeriesDataType::I64 } ); - let mut points_iter = - new_i64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10); + let mut points_iter = db + .read_i64_range(b2.id, series_filter.id, &range, 10) + .unwrap(); let points = points_iter.next().unwrap(); assert_eq!( points, @@ -1763,8 +1597,9 @@ mod tests { series_type: SeriesDataType::I64 } ); - let mut points_iter = - new_i64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10); + let mut points_iter = db + .read_i64_range(b2.id, series_filter.id, &range, 10) + .unwrap(); let points = points_iter.next().unwrap(); assert_eq!( points, @@ -1776,9 +1611,7 @@ mod tests { // test that the batch size is honored let pred = parse_predicate("host = \"b\"").unwrap(); - let mut iter = db - .read_range(b1.org_id, &b1.name, &range, &pred, 1) - .unwrap(); + let mut iter = db.read_series_matching(b1.id, Some(&pred)).unwrap(); let series_filter = iter.next().unwrap(); assert_eq!( series_filter, @@ -1790,8 +1623,9 @@ mod tests { } ); assert_eq!(iter.next(), None); - let mut points_iter = - new_i64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 1); + let mut points_iter = db + .read_i64_range(b1.id, series_filter.id, &range, 1) + .unwrap(); let points = points_iter.next().unwrap(); assert_eq!(points, vec![ReadPoint { time: 1, value: 1 },]); let points = points_iter.next().unwrap(); @@ -1800,9 +1634,7 @@ mod tests { // test that the time range is properly limiting let range = Range { start: 2, stop: 3 }; let pred = parse_predicate("_m = \"cpu\" OR _m = \"mem\"").unwrap(); - let mut iter = db - .read_range(b2.org_id, &b2.name, &range, &pred, 10) - .unwrap(); + let mut iter = db.read_series_matching(b2.id, Some(&pred)).unwrap(); let series_filter = iter.next().unwrap(); assert_eq!( series_filter, @@ -1813,8 +1645,9 @@ mod tests { series_type: SeriesDataType::I64 } ); - let mut points_iter = - new_i64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10); + let mut points_iter = db + .read_i64_range(b2.id, series_filter.id, &range, 10) + .unwrap(); let points = points_iter.next().unwrap(); assert_eq!(points, vec![ReadPoint { time: 2, value: 1 },]); @@ -1828,8 +1661,9 @@ mod tests { series_type: SeriesDataType::I64 } ); - let mut points_iter = - new_i64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10); + let mut points_iter = db + .read_i64_range(b2.id, series_filter.id, &range, 10) + .unwrap(); let points = points_iter.next().unwrap(); assert_eq!(points, vec![ReadPoint { time: 2, value: 1 },]); } @@ -1844,15 +1678,15 @@ mod tests { b1.id = db.create_bucket_if_not_exists(b1.org_id, &b1).unwrap(); - db.write_points(b1.org_id, &b1.name, vec![p1.clone(), p2.clone()]) + let 
mut points = vec![p1.clone(), p2.clone()]; + db.get_or_create_series_ids_for_points(b1.id, &mut points) .unwrap(); + db.write_points_with_series_ids(b1.id, &points).unwrap(); // test that we'll only read from the bucket we wrote points into let range = Range { start: 0, stop: 4 }; let pred = parse_predicate("_m = \"cpu\"").unwrap(); - let mut iter = db - .read_range(b1.org_id, &b1.name, &range, &pred, 10) - .unwrap(); + let mut iter = db.read_series_matching(b1.id, Some(&pred)).unwrap(); let series_filter = iter.next().unwrap(); assert_eq!( series_filter, @@ -1864,8 +1698,9 @@ mod tests { } ); assert_eq!(iter.next(), None); - let mut points_iter = - new_f64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10); + let mut points_iter = db + .read_f64_range(b1.id, series_filter.id, &range, 10) + .unwrap(); let points = points_iter.next().unwrap(); assert_eq!( points, @@ -1897,11 +1732,11 @@ mod tests { } } - fn test_database(name: &str, remove_old: bool) -> Database { + fn test_database(name: &str, remove_old: bool) -> RocksDB { let path = std::path::Path::new(&get_test_storage_path()).join(name); if remove_old { let _ = std::fs::remove_dir_all(path.to_str().unwrap()); } - Database::new(path.to_str().unwrap()) + RocksDB::new(path.to_str().unwrap()) } } diff --git a/src/storage/series_iterator.rs b/src/storage/series_iterator.rs new file mode 100644 index 0000000000..7a842e9d6e --- /dev/null +++ b/src/storage/series_iterator.rs @@ -0,0 +1,38 @@ +//use crate::storage::rocksdb::SeriesFilter; +// +//pub struct SeriesIterator { +// pub org_id: u32, +// pub bucket_id: u32, +// series_filters: Vec, +// next_filter: usize, +//} +// +//impl SeriesIterator { +// pub fn new(org_id: u32, bucket_id: u32, series_filters: Vec) -> SeriesIterator { +// SeriesIterator { +// org_id, +// bucket_id, +// next_filter: 0, +// series_filters: series_filters, +// } +// } +//} +// +//impl Iterator for SeriesIterator { +// type Item = SeriesFilter; +// +// fn next(&mut self) -> Option { +// self.next_filter += 1; +// match self.series_filters.get(self.next_filter - 1) { +// Some(f) => Some(f.clone()), +// None => None, +// } +// } +//} +// +//#[derive(Debug, PartialEq)] +//pub struct ReadSeries { +// pub id: u64, +// pub key: String, +// pub points: Vec>, +//} diff --git a/src/storage/series_store.rs b/src/storage/series_store.rs new file mode 100644 index 0000000000..eb2ef9f14f --- /dev/null +++ b/src/storage/series_store.rs @@ -0,0 +1,32 @@ +use crate::line_parser::PointType; +use crate::storage::{Range, StorageError}; + +pub trait SeriesStore: Sync + Send { + fn write_points_with_series_ids( + &self, + bucket_id: u32, + points: &Vec, + ) -> Result<(), StorageError>; + + fn read_i64_range( + &self, + bucket_id: u32, + series_id: u64, + range: &Range, + batch_size: usize, + ) -> Result>>>, StorageError>; + + fn read_f64_range( + &self, + bucket_id: u32, + series_id: u64, + range: &Range, + batch_size: usize, + ) -> Result>>>, StorageError>; +} + +#[derive(Debug, PartialEq)] +pub struct ReadPoint { + pub time: i64, + pub value: T, +} From bec6b3cf9c0df9aa7b25bf2628d529e3301ded8e Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Sat, 11 Jan 2020 18:34:25 -0500 Subject: [PATCH 2/3] Add MemDB and test framework This commit adds a test framework for the InvertedIndex and SeriesStore traits. Structs that implement these traits can call into their tests to ensure they work. 
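A backend could wire itself into those shared helpers with something along
these lines (a sketch only; the MemDB constructor used here is illustrative
and not part of this patch):

    #[cfg(test)]
    mod tests {
        use crate::storage::inverted_index;
        use crate::storage::memdb::MemDB;

        #[test]
        fn series_id_indexing() {
            inverted_index::tests::series_id_indexing(Box::new(MemDB::new()))
        }
    }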
This commit also adds an in memory database that keeps time series data in a ring buffer per series and an inverted index in memory using a combination of HashMap and BTreeMap. --- src/storage/inverted_index.rs | 155 +++++++++ src/storage/memdb.rs | 625 ++++++++++++++++++++++++++++++++++ src/storage/mod.rs | 1 + src/storage/rocksdb.rs | 4 +- src/storage/series_store.rs | 127 ++++++- 5 files changed, 908 insertions(+), 4 deletions(-) create mode 100644 src/storage/memdb.rs diff --git a/src/storage/inverted_index.rs b/src/storage/inverted_index.rs index cc5dac64eb..b85c386d90 100644 --- a/src/storage/inverted_index.rs +++ b/src/storage/inverted_index.rs @@ -36,3 +36,158 @@ pub struct SeriesFilter { pub value_predicate: Option, pub series_type: SeriesDataType, } + +// Test helpers for other implementations to run +#[cfg(test)] +pub mod tests { + use crate::storage::inverted_index::{InvertedIndex, SeriesFilter}; + use crate::line_parser::PointType; + use crate::storage::predicate::parse_predicate; + use crate::storage::SeriesDataType; + + pub fn series_id_indexing(index: Box) { + let bucket_id = 1; + let bucket_2 = 2; + let p1 = PointType::new_i64("one".to_string(), 1, 0); + let p2 = PointType::new_i64("two".to_string(), 23, 40); + let p3 = PointType::new_i64("three".to_string(), 33, 86); + + let mut points = vec![p1.clone(), p2.clone()]; + index.get_or_create_series_ids_for_points(bucket_id, &mut points) + .unwrap(); + assert_eq!(points[0].series_id(), Some(1)); + assert_eq!(points[1].series_id(), Some(2)); + + + // now put series in a different bucket, but make sure the IDs start from the beginning + let mut points = vec![p1.clone()]; + index.get_or_create_series_ids_for_points(bucket_2, &mut points) + .unwrap(); + assert_eq!(points[0].series_id(), Some(1)); + + // now insert a new series in the first bucket and make sure it shows up + let mut points = vec![p1.clone(), p3.clone()]; + index.get_or_create_series_ids_for_points(bucket_id, &mut points) + .unwrap(); + assert_eq!(points[0].series_id(), Some(1)); + assert_eq!(points[1].series_id(), Some(3)); + } + + pub fn series_metadata_indexing(index: Box) { + let bucket_id = 1; + let p1 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 0); + let p2 = PointType::new_i64("cpu,host=a,region=west\tusage_system".to_string(), 1, 0); + let p3 = PointType::new_i64("cpu,host=a,region=west\tusage_user".to_string(), 1, 0); + let p4 = PointType::new_i64("mem,host=b,region=west\tfree".to_string(), 1, 0); + + let mut points = vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()]; + index.get_or_create_series_ids_for_points(bucket_id, &mut points).unwrap(); + + let tag_keys: Vec = index.get_tag_keys(bucket_id, None).unwrap().collect(); + assert_eq!(tag_keys, vec!["_f", "_m", "host", "region"]); + + let tag_values: Vec= index.get_tag_values(bucket_id, "host", None).unwrap().collect(); + assert_eq!(tag_values, vec!["a", "b"]); + + // get all series + + // get series with measurement = mem + let pred = parse_predicate("_m = \"cpu\"").unwrap(); + let series: Vec = index + .read_series_matching(bucket_id, Some(&pred)) + .unwrap() + .collect(); + assert_eq!( + series, + vec![ + SeriesFilter { + id: 1, + key: "cpu,host=b,region=west\tusage_system".to_string(), + value_predicate: None, + series_type: SeriesDataType::I64 + }, + SeriesFilter { + id: 2, + key: "cpu,host=a,region=west\tusage_system".to_string(), + value_predicate: None, + series_type: SeriesDataType::I64 + }, + SeriesFilter { + id: 3, + key: 
"cpu,host=a,region=west\tusage_user".to_string(), + value_predicate: None, + series_type: SeriesDataType::I64 + }, + ] + ); + + // get series with host = a + let pred = parse_predicate("host = \"a\"").unwrap(); + let series: Vec = index + .read_series_matching(bucket_id, Some(&pred)) + .unwrap() + .collect(); + assert_eq!( + series, + vec![ + SeriesFilter { + id: 2, + key: "cpu,host=a,region=west\tusage_system".to_string(), + value_predicate: None, + series_type: SeriesDataType::I64 + }, + SeriesFilter { + id: 3, + key: "cpu,host=a,region=west\tusage_user".to_string(), + value_predicate: None, + series_type: SeriesDataType::I64 + }, + ] + ); + + // get series with measurement = cpu and host = b + let pred = parse_predicate("_m = \"cpu\" and host = \"b\"").unwrap(); + let series: Vec = index + .read_series_matching(bucket_id, Some(&pred)) + .unwrap() + .collect(); + assert_eq!( + series, + vec![SeriesFilter { + id: 1, + key: "cpu,host=b,region=west\tusage_system".to_string(), + value_predicate: None, + series_type: SeriesDataType::I64 + },] + ); + + let pred = parse_predicate("host = \"a\" OR _m = \"mem\"").unwrap(); + let series: Vec = index + .read_series_matching(bucket_id, Some(&pred)) + .unwrap() + .collect(); + assert_eq!( + series, + vec![ + SeriesFilter { + id: 2, + key: "cpu,host=a,region=west\tusage_system".to_string(), + value_predicate: None, + series_type: SeriesDataType::I64 + }, + SeriesFilter { + id: 3, + key: "cpu,host=a,region=west\tusage_user".to_string(), + value_predicate: None, + series_type: SeriesDataType::I64 + }, + SeriesFilter { + id: 4, + key: "mem,host=b,region=west\tfree".to_string(), + value_predicate: None, + series_type: SeriesDataType::I64 + }, + ] + ); + } +} \ No newline at end of file diff --git a/src/storage/memdb.rs b/src/storage/memdb.rs new file mode 100644 index 0000000000..cfd1c34660 --- /dev/null +++ b/src/storage/memdb.rs @@ -0,0 +1,625 @@ +use crate::delorean::node::{Comparison, Logical, Value}; +use crate::delorean::{Predicate, Node}; +use crate::storage::inverted_index::{InvertedIndex, SeriesFilter}; +use crate::line_parser::{PointType, ParseError, Point}; +use crate::storage::{StorageError, Range, SeriesDataType}; +use crate::storage::series_store::{SeriesStore, ReadPoint}; + +use std::sync::{RwLock, Arc, RwLockReadGuard, Mutex}; +use std::collections::{HashMap, BTreeMap}; + +use croaring::Treemap; +/// memdb implements an in memory database for the InvertedIndex and SeriesStore traits. It +/// works with a ring buffer. Currently, it does not limit the number of series that can be +/// created. It also assumes that data per series arrives in time acending order. 
+ +// TODO: return errors if trying to insert data out of order in an individual series + +pub struct MemDB { + default_ring_buffer_size: usize, + bucket_id_to_series_data: Arc>>>, + bucket_id_to_series_map: Arc>>>, +} + +struct SeriesData { + ring_buffer_size: usize, + i64_series: HashMap>, + f64_series: HashMap>, +} + +impl SeriesData { + fn write_points(&mut self, points: &Vec) { + for p in points { + match p { + PointType::I64(p) => { + match self.i64_series.get_mut(&p.series_id.unwrap()) { + Some(buff) => buff.write(&p), + None => { + let mut buff = new_i64_ring_buffer(self.ring_buffer_size); + buff.write(&p); + self.i64_series.insert(p.series_id.unwrap(), buff); + }, + } + }, + PointType::F64(p) => { + match self.f64_series.get_mut(&p.series_id.unwrap()) { + Some(buff) => buff.write(&p), + None => { + let mut buff = new_f64_ring_buffer(self.ring_buffer_size); + buff.write(&p); + self.f64_series.insert(p.series_id.unwrap(), buff); + } + } + }, + } + } + } +} + +struct SeriesRingBuffer { + next_position: usize, + data: Vec>, +} + +fn new_i64_ring_buffer(size: usize) -> SeriesRingBuffer { + let mut data = Vec::with_capacity(size); + for _ in 0..size { + data.push(ReadPoint{time: std::i64::MAX, value: 0 as i64}) + } + + SeriesRingBuffer{ + next_position: 0, + data, + } +} + +fn new_f64_ring_buffer(size: usize) -> SeriesRingBuffer { + let mut data = Vec::with_capacity(size); + for _ in 0..size { + data.push(ReadPoint{time: std::i64::MAX, value: 0 as f64}) + } + + SeriesRingBuffer{ + next_position: 0, + data, + } +} + +impl SeriesRingBuffer { + fn write(&mut self, point: &Point) { + if self.next_position == self.data.len() { + self.next_position = 0; + } + self.data[self.next_position].time = point.time; + self.data[self.next_position].value = point.value; + self.next_position += 1; + } + + fn get_range(&self, range: &Range) -> Vec> { + let (_, pos) = self.oldest_time_and_position(); + + let mut values = Vec::new(); + + for i in pos..self.data.len() { + if self.data[i].time > range.stop { + return values; + } else if self.data[i].time >= range.start { + values.push(self.data[i].clone()); + } + }; + + for i in 0..self.next_position { + if self.data[i].time > range.stop { + return values; + } else if self.data[i].time >= range.start { + values.push(self.data[i].clone()); + } + } + + values + } + + fn oldest_time_and_position(&self) -> (i64, usize) { + let mut pos = self.next_position; + if self.next_position == self.data.len() { + pos = 0; + } else if self.data[pos].time == std::i64::MAX { + pos = 0; + } + + (self.data[pos].time, pos) + } +} + +struct SeriesMap { + last_id: u64, + series_key_to_id: HashMap, + series_id_to_key_and_type: HashMap, + tag_keys: BTreeMap>, + posting_list: HashMap, Treemap>, +} + +impl SeriesMap { + fn new() -> SeriesMap { + SeriesMap{ + last_id: 0, + series_key_to_id: HashMap::new(), + series_id_to_key_and_type: HashMap::new(), + tag_keys: BTreeMap::new(), + posting_list: HashMap::new(), + } + } + + fn insert_series(&mut self, point: &mut PointType) -> Result<(), ParseError> { + if let Some(id) = self.series_key_to_id.get(point.series()) { + point.set_series_id(*id); + return Ok(()); + } + + // insert the series id + self.last_id += 1; + point.set_series_id(self.last_id); + self.series_key_to_id.insert(point.series().clone(), self.last_id); + + let series_type = match point { + PointType::I64(_) => SeriesDataType::I64, + PointType::F64(_) => SeriesDataType::F64, + }; + self.series_id_to_key_and_type.insert(self.last_id, (point.series().clone(), series_type)); + + 
for pair in point.index_pairs()? { + // insert this id into the posting list + let list_key = list_key(&pair.key, &pair.value); + + let posting_list = self.posting_list.entry(list_key).or_insert(Treemap::create()); + posting_list.add(self.last_id); + + // insert the tag key value mapping + let tag_values = self.tag_keys.entry(pair.key).or_insert(BTreeMap::new()); + tag_values.insert(pair.value, true); + } + + Ok(()) + } + + fn posting_list_for_key_value(&self, key: &str, value: &str) -> Treemap { + let list_key = list_key(key, value); + match self.posting_list.get(&list_key) { + Some(m) => m.clone(), + None => Treemap::create(), + } + } +} + +fn list_key(key: &str, value: &str) -> Vec { + let mut list_key = key.as_bytes().to_vec(); + list_key.push(0 as u8); + list_key.append(&mut value.as_bytes().to_vec()); + list_key +} + +impl MemDB { + pub fn new() -> MemDB { + MemDB { + default_ring_buffer_size: 10000, + bucket_id_to_series_data: Arc::new(RwLock::new(HashMap::new())), + bucket_id_to_series_map: Arc::new(RwLock::new(HashMap::new())), + } + } + + fn get_or_create_series_ids_for_points( + &self, + bucket_id: u32, + points: &mut Vec, + ) -> Result<(), StorageError> { + // first try to do everything with just a read lock + if self.get_series_ids_for_points(bucket_id, points) { + return Ok(()) + } + + // if we got this far, we have new series to insert + let buckets = self.bucket_id_to_series_map.read().unwrap(); + let mut series_map = buckets.get(&bucket_id).unwrap().write().unwrap(); + + for p in points { + match p.series_id() { + Some(_) => (), + None => { + match series_map.insert_series(p) { + Ok(_) => (), + Err(e) => return Err(StorageError { description: format!("error parsing line protocol metadata {}", e) }) + } + }, + } + } + + Ok(()) + } + + // get_series_ids_for_points attempts to fill the series ids for all points in the passed in + // collection using only a read lock. If no SeriesMap exists for the bucket, it will be inserted. + // It will return true if all points have series ids filled in. + fn get_series_ids_for_points(&self, bucket_id: u32, points: &mut Vec) -> bool { + let buckets = self.bucket_id_to_series_map.read().unwrap(); + match buckets.get(&bucket_id) { + Some(b) => { + let b = b.read().unwrap(); + let mut all_have_id = true; + for p in points { + match b.series_key_to_id.get(p.series()) { + Some(id) => p.set_series_id(*id), + None => all_have_id = false, + } + } + + if all_have_id { + return true + } + }, + None => { + // we've never even seen this bucket. 
insert it and then we'll get to inserting the series + drop(buckets); + self.bucket_id_to_series_map.write().unwrap().insert(bucket_id, RwLock::new(SeriesMap::new())); + }, + } + + false + } + + fn get_tag_keys( + &self, + bucket_id: u32, + _predicate: Option<&Predicate>, + ) -> Result>, StorageError> { + match self.bucket_id_to_series_map.read().unwrap().get(&bucket_id) { + Some(map) => { + let keys: Vec = map.read().unwrap().tag_keys.keys().map(|k| k.clone()).collect(); + Ok(Box::new(keys.into_iter())) + }, + None => { + Err(StorageError{description: format!("bucket {} not found", bucket_id)}) + } + } + } + + fn get_tag_values( + &self, + bucket_id: u32, + tag_key: &str, + _predicate: Option<&Predicate>, + ) -> Result>, StorageError> { + match self.bucket_id_to_series_map.read().unwrap().get(&bucket_id) { + Some(map) => { + match map.read().unwrap().tag_keys.get(tag_key) { + Some(values) => { + let values: Vec = values.keys().map(|v| v.clone()).collect(); + Ok(Box::new(values.into_iter())) + }, + None => Ok(Box::new(vec![].into_iter())), + } + }, + None => { + Err(StorageError{description: format!("bucket {} not found", bucket_id)}) + } + } + } + + fn read_series_matching( + &self, + bucket_id: u32, + predicate: Option<&Predicate>, + ) -> Result>, StorageError> { + let pred = match predicate { + Some(p) => p, + None => return Err(StorageError{description: "unable to list all series".to_string()}), + }; + + let root = match &pred.root { + Some(r) => r, + None => return Err(StorageError{description: "expected root node to evaluate".to_string()}), + }; + + let bucket_map = self.bucket_id_to_series_map.read().unwrap(); + let series_map = match bucket_map.get(&bucket_id) { + Some(m) => m.read().unwrap(), + None => return Err(StorageError{description: format!("no series written for bucket {}", bucket_id)}), + }; + + let map = evaluate_node(&series_map, &root)?; + let mut filters = Vec::with_capacity(map.cardinality() as usize); + + for id in map.iter() { + let (key, series_type) = series_map.series_id_to_key_and_type.get(&id).unwrap(); + filters.push(SeriesFilter { + id, + key: key.clone(), + value_predicate: None, + series_type: *series_type, + }); + } + + Ok(Box::new(filters.into_iter())) + } + + fn write_points_with_series_ids( + &self, + bucket_id: u32, + points: &Vec, + ) -> Result<(), StorageError> { + let bucket_data = self.bucket_id_to_series_data.read().unwrap(); + + // ensure the the bucket has a series data struct to write into + let series_data = match bucket_data.get(&bucket_id) { + Some(d) => d, + None => { + drop(bucket_data); + let mut bucket_data = self.bucket_id_to_series_data.write().unwrap(); + let series_data = SeriesData{ + ring_buffer_size: self.default_ring_buffer_size, + i64_series: HashMap::new(), + f64_series: HashMap::new(), + }; + bucket_data.insert(bucket_id, Mutex::new(series_data)); + drop(bucket_data); + return self.write_points_with_series_ids(bucket_id, points); + } + }; + + let mut series_data = series_data.lock().unwrap(); + series_data.write_points(points); + Ok(()) + } + + fn read_i64_range( + &self, + bucket_id: u32, + series_id: u64, + range: &Range, + batch_size: usize, + ) -> Result>>>, StorageError> { + let buckets = self.bucket_id_to_series_data.read().unwrap(); + let data = match buckets.get(&bucket_id) { + Some(d) => d, + None => return Err(StorageError{description: format!("bucket {} not found", bucket_id)}), + }; + + let data = data.lock().unwrap(); + let buff = match data.i64_series.get(&series_id) { + Some(b) => b, + None => return 
Err(StorageError{description: format!("series {} not found", series_id)}), + }; + + let values = buff.get_range(&range); + Ok(Box::new(PointsIterator{values: Some(values), batch_size})) + } + + fn read_f64_range( + &self, + bucket_id: u32, + series_id: u64, + range: &Range, + batch_size: usize, + ) -> Result>>>, StorageError> { + let buckets = self.bucket_id_to_series_data.read().unwrap(); + let data = match buckets.get(&bucket_id) { + Some(d) => d, + None => return Err(StorageError{description: format!("bucket {} not found", bucket_id)}), + }; + + let data = data.lock().unwrap(); + let buff = match data.f64_series.get(&series_id) { + Some(b) => b, + None => return Err(StorageError{description: format!("series {} not found", series_id)}), + }; + + let values = buff.get_range(&range); + Ok(Box::new(PointsIterator{values: Some(values), batch_size})) + } +} + +struct PointsIterator { + values: Option>>, + batch_size: usize, +} + +impl Iterator for PointsIterator { + type Item = Vec>; + + fn next(&mut self) -> Option { + if let Some(mut values) = self.values.take() { + if self.batch_size > values.len() { + return Some(values); + } + + let remaining = values.split_off(self.batch_size); + + if remaining.len() != 0 { + self.values = Some(remaining); + } + + return Some(values); + } + + None + } +} + +fn evaluate_node(series_map: &RwLockReadGuard, n: &Node) -> Result { + if n.children.len() != 2 { + return Err(StorageError { + description: format!( + "expected only two children of node but found {}", + n.children.len() + ), + }); + } + + match &n.value { + Some(node_value) => match node_value { + Value::Logical(l) => { + let l = Logical::from_i32(*l).unwrap(); + evaluate_logical(series_map, &n.children[0], &n.children[1], l) + } + Value::Comparison(c) => { + let c = Comparison::from_i32(*c).unwrap(); + evaluate_comparison(series_map, &n.children[0], &n.children[1], c) + } + val => Err(StorageError { + description: format!("evaluate_node called on wrong type {:?}", val), + }), + }, + None => Err(StorageError { + description: "emtpy node value".to_string(), + }), + } +} + +fn evaluate_logical( + series_map: &RwLockReadGuard, + left: &Node, + right: &Node, + op: Logical, +) -> Result { + let mut left_result = evaluate_node(series_map, left)?; + let right_result = evaluate_node(series_map, right)?; + + match op { + Logical::And => left_result.and_inplace(&right_result), + Logical::Or => left_result.or_inplace(&right_result), + }; + + Ok(left_result) +} + +fn evaluate_comparison( + series_map: &RwLockReadGuard, + left: &Node, + right: &Node, + op: Comparison, +) -> Result { + let left = match &left.value { + Some(Value::TagRefValue(s)) => s, + _ => { + return Err(StorageError { + description: "expected left operand to be a TagRefValue".to_string(), + }) + } + }; + + let right = match &right.value { + Some(Value::StringValue(s)) => s, + _ => { + return Err(StorageError { + description: "unable to run comparison against anything other than a string" + .to_string(), + }) + } + }; + + match op { + Comparison::Equal => { + return Ok(series_map.posting_list_for_key_value(&left, &right)) + } + comp => { + return Err(StorageError { + description: format!("unable to handle comparison {:?}", comp), + }) + } + } +} + +impl InvertedIndex for MemDB { + fn get_or_create_series_ids_for_points( + &self, + bucket_id: u32, + points: &mut Vec, + ) -> Result<(), StorageError> { + self.get_or_create_series_ids_for_points(bucket_id, points) + } + + fn read_series_matching( + &self, + bucket_id: u32, + predicate: 
Option<&Predicate>, + ) -> Result>, StorageError> { + self.read_series_matching(bucket_id, predicate) + } + + fn get_tag_keys( + &self, + bucket_id: u32, + predicate: Option<&Predicate>, + ) -> Result>, StorageError> { + self.get_tag_keys(bucket_id, predicate) + } + + fn get_tag_values( + &self, + bucket_id: u32, + tag_key: &str, + predicate: Option<&Predicate>, + ) -> Result>, StorageError> { + self.get_tag_values(bucket_id, tag_key, predicate) + } +} + +impl SeriesStore for MemDB { + fn write_points_with_series_ids( + &self, + bucket_id: u32, + points: &Vec, + ) -> Result<(), StorageError> { + self.write_points_with_series_ids(bucket_id, points) + } + + fn read_i64_range( + &self, + bucket_id: u32, + series_id: u64, + range: &Range, + batch_size: usize, + ) -> Result>>>, StorageError> { + self.read_i64_range(bucket_id, series_id, range, batch_size) + } + + fn read_f64_range( + &self, + bucket_id: u32, + series_id: u64, + range: &Range, + batch_size: usize, + ) -> Result>>>, StorageError> { + self.read_f64_range(bucket_id, series_id, range, batch_size) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::storage::inverted_index; + use crate::storage::series_store; + + #[test] + fn write_and_read_i64() { + let db = Box::new(MemDB::new()); + series_store::tests::write_and_read_i64(db); + } + + #[test] + fn write_and_read_f64() { + let db = Box::new(MemDB::new()); + series_store::tests::write_and_read_f64(db); + } + + #[test] + fn series_id_indexing() { + let db = Box::new(MemDB::new()); + inverted_index::tests::series_id_indexing(db) + } + + #[test] + fn series_metadata_indexing() { + let db = Box::new(MemDB::new()); + inverted_index::tests::series_metadata_indexing(db); + } +} \ No newline at end of file diff --git a/src/storage/mod.rs b/src/storage/mod.rs index b021af78d4..673d8a02a1 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -9,6 +9,7 @@ pub mod inverted_index; pub mod predicate; pub mod rocksdb; pub mod series_store; +pub mod memdb; pub struct Range { pub start: i64, diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs index 7ce39eb229..c847d2e2f1 100644 --- a/src/storage/rocksdb.rs +++ b/src/storage/rocksdb.rs @@ -952,7 +952,7 @@ fn prefix_for_series(bucket_id: u32, series_id: u64, start_time: i64) -> Vec v } -pub struct PointsIterator<'a, T> { +pub struct PointsIterator<'a, T: Copy> { batch_size: usize, iter: DBIterator<'a>, stop_time: i64, @@ -961,7 +961,7 @@ pub struct PointsIterator<'a, T> { read: fn(b: &[u8]) -> T, } -impl Iterator for PointsIterator<'_, T> { +impl Iterator for PointsIterator<'_, T> { type Item = Vec>; fn next(&mut self) -> Option { diff --git a/src/storage/series_store.rs b/src/storage/series_store.rs index eb2ef9f14f..6cfb104605 100644 --- a/src/storage/series_store.rs +++ b/src/storage/series_store.rs @@ -25,8 +25,131 @@ pub trait SeriesStore: Sync + Send { ) -> Result>>>, StorageError>; } -#[derive(Debug, PartialEq)] -pub struct ReadPoint { +#[derive(Debug, PartialEq, Clone)] +pub struct ReadPoint { pub time: i64, pub value: T, } + +// Test helpers for other implementations to run +#[cfg(test)] +pub mod tests { + use crate::storage::series_store::{SeriesStore, ReadPoint}; + use crate::line_parser::PointType; + use crate::storage::Range; + + pub fn write_and_read_i64(store: Box) { + let b1_id = 1; + let b2_id = 2; + let mut p1 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 1); + p1.set_series_id(1); + let mut p2 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 2); + 
p2.set_series_id(1); + let mut p3 = PointType::new_i64("mem,host=b,region=west\tfree".to_string(), 1, 2); + p3.set_series_id(2); + let mut p4 = PointType::new_i64("mem,host=b,region=west\tfree".to_string(), 1, 4); + p4.set_series_id(2); + + let b1_points = vec![p1.clone(), p2.clone()]; + store.write_points_with_series_ids(b1_id, &b1_points).unwrap(); + + let b2_points = vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()]; + store.write_points_with_series_ids(b2_id, &b2_points).unwrap(); + + // test that we'll only read from the bucket we wrote points into + let range = Range { start: 1, stop: 4 }; + let mut points_iter = store + .read_i64_range(b1_id, p1.series_id().unwrap(), &range, 10) + .unwrap(); + let points = points_iter.next().unwrap(); + assert_eq!( + points, + vec![ + ReadPoint { time: 1, value: 1 }, + ReadPoint { time: 2, value: 1 }, + ] + ); + assert_eq!(points_iter.next(), None); + + // test that we'll read multiple series + let mut points_iter = store + .read_i64_range(b2_id, p1.series_id().unwrap(), &range, 10) + .unwrap(); + let points = points_iter.next().unwrap(); + assert_eq!( + points, + vec![ + ReadPoint { time: 1, value: 1 }, + ReadPoint { time: 2, value: 1 }, + ] + ); + + let mut points_iter = store + .read_i64_range(b2_id, p3.series_id().unwrap(), &range, 10) + .unwrap(); + let points = points_iter.next().unwrap(); + assert_eq!( + points, + vec![ + ReadPoint { time: 2, value: 1 }, + ReadPoint { time: 4, value: 1 }, + ] + ); + + // test that the batch size is honored + let mut points_iter = store + .read_i64_range(b1_id, p1.series_id().unwrap(), &range, 1) + .unwrap(); + let points = points_iter.next().unwrap(); + assert_eq!(points, vec![ReadPoint { time: 1, value: 1 },]); + let points = points_iter.next().unwrap(); + assert_eq!(points, vec![ReadPoint { time: 2, value: 1 },]); + assert_eq!(points_iter.next(), None); + + // test that the time range is properly limiting + let range = Range { start: 2, stop: 3 }; + let mut points_iter = store + .read_i64_range(b2_id, p1.series_id().unwrap(), &range, 10) + .unwrap(); + let points = points_iter.next().unwrap(); + assert_eq!(points, vec![ReadPoint { time: 2, value: 1 },]); + + let mut points_iter = store + .read_i64_range(b2_id, p3.series_id().unwrap(), &range, 10) + .unwrap(); + let points = points_iter.next().unwrap(); + assert_eq!(points, vec![ReadPoint { time: 2, value: 1 },]); + } + + pub fn write_and_read_f64(store: Box) { + let bucket_id = 1; + let mut p1 = PointType::new_f64("cpu,host=b,region=west\tusage_system".to_string(), 1.0, 1); + p1.set_series_id(1); + let mut p2 = PointType::new_f64("cpu,host=b,region=west\tusage_system".to_string(), 2.2, 2); + p2.set_series_id(1); + + let points = vec![p1.clone(), p2.clone()]; + store.write_points_with_series_ids(bucket_id, &points).unwrap(); + + // test that we'll only read from the bucket we wrote points into + let range = Range { start: 0, stop: 4 }; + let mut points_iter = store + .read_f64_range(bucket_id, p1.series_id().unwrap(), &range, 10) + .unwrap(); + let points = points_iter.next().unwrap(); + assert_eq!( + points, + vec![ + ReadPoint { + time: 1, + value: 1.0 + }, + ReadPoint { + time: 2, + value: 2.2 + }, + ] + ); + assert_eq!(points_iter.next(), None); + } +} \ No newline at end of file From f1329a8a0837ea4b5ad4b7c572462fbf1b35aee8 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Mon, 13 Jan 2020 10:34:41 -0500 Subject: [PATCH 3/3] Remove unused code --- src/storage/points_iterator.rs | 0 src/storage/rocksdb.rs | 14 ------------- src/storage/series_iterator.rs 
| 38 ---------------------------------- 3 files changed, 52 deletions(-) delete mode 100644 src/storage/points_iterator.rs delete mode 100644 src/storage/series_iterator.rs diff --git a/src/storage/points_iterator.rs b/src/storage/points_iterator.rs deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs index c847d2e2f1..357e41b370 100644 --- a/src/storage/rocksdb.rs +++ b/src/storage/rocksdb.rs @@ -281,20 +281,6 @@ impl RocksDB { Some(b) => Ok(Some(b.clone())), None => Ok(None), } - - // let db = self.db.read().unwrap(); - // let buckets = db.cf_handle(BUCKET_CF).unwrap(); - // - // match db.get_cf(buckets, key) { - // Ok(b) => match b { - // Some(b) => { - // let bucket = Bucket::decode(b).unwrap(); - // return Ok(Some(bucket)); - // } - // None => return Ok(None), - // }, - // Err(e) => return Err(StorageError{description: e.to_string()}), - // } } // TODO: ensure that points with timestamps older than the first index level get matched against the appropriate index diff --git a/src/storage/series_iterator.rs b/src/storage/series_iterator.rs deleted file mode 100644 index 7a842e9d6e..0000000000 --- a/src/storage/series_iterator.rs +++ /dev/null @@ -1,38 +0,0 @@ -//use crate::storage::rocksdb::SeriesFilter; -// -//pub struct SeriesIterator { -// pub org_id: u32, -// pub bucket_id: u32, -// series_filters: Vec, -// next_filter: usize, -//} -// -//impl SeriesIterator { -// pub fn new(org_id: u32, bucket_id: u32, series_filters: Vec) -> SeriesIterator { -// SeriesIterator { -// org_id, -// bucket_id, -// next_filter: 0, -// series_filters: series_filters, -// } -// } -//} -// -//impl Iterator for SeriesIterator { -// type Item = SeriesFilter; -// -// fn next(&mut self) -> Option { -// self.next_filter += 1; -// match self.series_filters.get(self.next_filter - 1) { -// Some(f) => Some(f.clone()), -// None => None, -// } -// } -//} -// -//#[derive(Debug, PartialEq)] -//pub struct ReadSeries { -// pub id: u64, -// pub key: String, -// pub points: Vec>, -//}
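A note on how the MemDB index above resolves predicates: each tag key/value pair maps to a posting list of series ids held in a croaring Treemap, and evaluate_comparison / evaluate_logical reduce a predicate to bitmap AND / OR over those lists. A minimal standalone sketch of that idea, separate from the patch itself (the series ids and tag values below are invented purely for illustration):

    use croaring::Treemap;

    fn main() {
        // posting lists: which series ids carry host=a and which carry _m=cpu
        let mut host_a = Treemap::create();
        host_a.add(2);
        host_a.add(3);

        let mut measurement_cpu = Treemap::create();
        measurement_cpu.add(1);
        measurement_cpu.add(2);
        measurement_cpu.add(3);

        // host = "a" AND _m = "cpu"  ->  intersect the two posting lists in place
        let mut matching = host_a.clone();
        matching.and_inplace(&measurement_cpu);

        // series 2 and 3 match; series 1 only matches _m = "cpu"
        assert_eq!(matching.iter().collect::<Vec<u64>>(), vec![2, 3]);
    }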