Merge pull request #20 from influxdata/pd-add-stubs-for-other-datastores

feat: add memory backed inverted index and series store
Paul Dix 2020-01-13 10:35:41 -05:00 committed by GitHub
commit 418b89a87b
11 changed files with 1522 additions and 578 deletions


@@ -6,6 +6,7 @@ use std::{error, fmt};
#[derive(Debug, PartialEq, Clone)]
pub struct Point<T> {
pub series: String,
pub series_id: Option<u64>,
pub time: i64,
pub value: T,
}
@@ -26,6 +27,7 @@ impl PointType {
pub fn new_i64(series: String, value: i64, time: i64) -> PointType {
PointType::I64(Point {
series,
series_id: None,
value,
time,
})
@@ -34,6 +36,7 @@ impl PointType {
pub fn new_f64(series: String, value: f64, time: i64) -> PointType {
PointType::F64(Point {
series,
series_id: None,
value,
time,
})
@@ -60,6 +63,20 @@ impl PointType {
}
}
pub fn series_id(&self) -> Option<u64> {
match self {
PointType::I64(p) => p.series_id,
PointType::F64(p) => p.series_id,
}
}
pub fn set_series_id(&mut self, id: u64) {
match self {
PointType::I64(p) => p.series_id = Some(id),
PointType::F64(p) => p.series_id = Some(id),
}
}
pub fn i64_value(&self) -> Option<i64> {
match self {
PointType::I64(p) => Some(p.value),
@@ -321,6 +338,7 @@ mod test {
fn index_pairs() {
let p = Point {
series: "cpu,host=A,region=west\tusage_system".to_string(),
series_id: None,
value: 0,
time: 0,
};
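For orientation, a minimal sketch (not part of the diff) of how the new series_id plumbing looks from the caller's side, using only the constructors and accessors added above:

use delorean::line_parser::PointType;

fn main() {
    // Freshly parsed points carry no series id; the index assigns one on write.
    let mut p = PointType::new_i64("cpu,host=A,region=west\tusage_system".to_string(), 42, 0);
    assert_eq!(p.series_id(), None);

    // set_series_id stores the id uniformly for both the I64 and F64 variants.
    p.set_series_id(7);
    assert_eq!(p.series_id(), Some(7));
}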


@@ -1,10 +1,9 @@
use delorean::delorean::Bucket;
use delorean::line_parser;
use delorean::line_parser::index_pairs;
use delorean::storage::database::Database;
use delorean::storage::predicate::parse_predicate;
use delorean::storage::rocksdb::Range;
use delorean::storage::rocksdb::{
new_f64_points_iterator, new_i64_points_iterator, Database, SeriesDataType,
};
use delorean::storage::{Range, SeriesDataType};
use delorean::time::{parse_duration, time_as_i64_nanos};
use std::env::VarError;
@@ -37,6 +36,28 @@ async fn write(
write_info: web::Query<WriteInfo>,
s: web::Data<Arc<Server>>,
) -> Result<HttpResponse, AWError> {
let bucket = match s
.db
.get_bucket_by_name(write_info.org_id, &write_info.bucket_name)?
{
Some(b) => b,
None => {
// create this as the default bucket
let b = Bucket {
org_id: write_info.org_id,
id: 0,
name: write_info.bucket_name.clone(),
retention: "0".to_string(),
posting_list_rollover: 10_000,
index_levels: vec![],
};
let _ = s.db.create_bucket_if_not_exists(write_info.org_id, &b)?;
s.db.get_bucket_by_name(write_info.org_id, &write_info.bucket_name)?
.unwrap()
}
};
let mut body = BytesMut::new();
while let Some(chunk) = payload.next().await {
let chunk = chunk?;
@@ -49,11 +70,9 @@ async fn write(
let body = body.freeze();
let body = str::from_utf8(&body).unwrap();
let points = line_parser::parse(body);
let mut points = line_parser::parse(body);
if let Err(err) =
s.db.write_points(write_info.org_id, &write_info.bucket_name, points)
{
if let Err(err) = s.db.write_points(write_info.org_id, &bucket, &mut points) {
return Ok(HttpResponse::InternalServerError()
.json(serde_json::json!({ "error": format!("{}", err) })));
}
@@ -167,15 +186,21 @@ async fn read(
let range = Range { start, stop };
let series = s.db.read_range(
read_info.org_id,
&read_info.bucket_name,
&range,
&predicate,
10,
)?;
let bucket = match s
.db
.get_bucket_by_name(read_info.org_id, &read_info.bucket_name)?
{
Some(b) => b,
None => {
return Ok(HttpResponse::NotFound().json(serde_json::json!({
"error": format!("bucket {} not found", read_info.bucket_name)
})))
}
};
let series =
s.db.read_series_matching_predicate_and_range(&bucket, Some(&predicate), Some(&range))?;
let bucket_id = series.bucket_id;
let db = &s.db;
let mut response_body = vec![];
@@ -205,8 +230,7 @@ async fn read(
match s.series_type {
SeriesDataType::I64 => {
let points =
new_i64_points_iterator(read_info.org_id, bucket_id, &db, &s, &range, 10);
let points = db.read_i64_range(&bucket, &s, &range, 10)?;
for batch in points {
for p in batch {
@@ -220,8 +244,7 @@ async fn read(
}
}
SeriesDataType::F64 => {
let points =
new_f64_points_iterator(read_info.org_id, bucket_id, &db, &s, &range, 10);
let points = db.read_f64_range(&bucket, &s, &range, 10)?;
for batch in points {
for p in batch {


@@ -0,0 +1,17 @@
use crate::delorean::Bucket;
use crate::storage::StorageError;
use std::sync::Arc;
pub trait ConfigStore: Sync + Send {
fn create_bucket_if_not_exists(
&self,
org_id: u32,
bucket: &Bucket,
) -> Result<u32, StorageError>;
fn get_bucket_by_name(
&self,
org_id: u32,
bucket_name: &str,
) -> Result<Option<Arc<Bucket>>, StorageError>;
}
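To make the contract concrete, a hypothetical in-memory implementation of this trait might look as follows. MemConfigStore is illustrative only, not part of this PR; it assumes the prost-generated Bucket derives Clone, and it hands out ids naively:

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use crate::delorean::Bucket;
use crate::storage::config_store::ConfigStore;
use crate::storage::StorageError;

// Hypothetical: buckets keyed by (org_id, bucket name), ids handed out sequentially.
pub struct MemConfigStore {
    buckets: RwLock<HashMap<(u32, String), Arc<Bucket>>>,
}

impl MemConfigStore {
    pub fn new() -> MemConfigStore {
        MemConfigStore { buckets: RwLock::new(HashMap::new()) }
    }
}

impl ConfigStore for MemConfigStore {
    fn create_bucket_if_not_exists(
        &self,
        org_id: u32,
        bucket: &Bucket,
    ) -> Result<u32, StorageError> {
        let mut buckets = self.buckets.write().unwrap();
        let key = (org_id, bucket.name.clone());
        if let Some(existing) = buckets.get(&key) {
            return Ok(existing.id);
        }
        let mut new_bucket = bucket.clone();
        new_bucket.id = buckets.len() as u32 + 1; // naive id assignment for the sketch
        let id = new_bucket.id;
        buckets.insert(key, Arc::new(new_bucket));
        Ok(id)
    }

    fn get_bucket_by_name(
        &self,
        org_id: u32,
        bucket_name: &str,
    ) -> Result<Option<Arc<Bucket>>, StorageError> {
        let buckets = self.buckets.read().unwrap();
        Ok(buckets.get(&(org_id, bucket_name.to_string())).cloned())
    }
}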

src/storage/database.rs (new file, 88 lines)

@@ -0,0 +1,88 @@
use crate::delorean::{Bucket, Predicate};
use crate::line_parser::PointType;
use crate::storage::config_store::ConfigStore;
use crate::storage::inverted_index::{InvertedIndex, SeriesFilter};
use crate::storage::rocksdb::RocksDB;
use crate::storage::series_store::{ReadPoint, SeriesStore};
use crate::storage::{Range, StorageError};
use std::sync::Arc;
pub struct Database {
local_index: Arc<dyn InvertedIndex>,
local_series_store: Arc<dyn SeriesStore>,
local_config_store: Arc<dyn ConfigStore>,
}
impl Database {
pub fn new(dir: &str) -> Database {
let db = Arc::new(RocksDB::new(dir));
Database {
local_index: db.clone(),
local_config_store: db.clone(),
local_series_store: db,
}
}
pub fn write_points(
&self,
_org_id: u32,
bucket: &Bucket,
points: &mut Vec<PointType>,
) -> Result<(), StorageError> {
self.local_index
.get_or_create_series_ids_for_points(bucket.id, points)?;
self.local_series_store
.write_points_with_series_ids(bucket.id, &points)
}
pub fn get_bucket_by_name(
&self,
org_id: u32,
bucket_name: &str,
) -> Result<Option<Arc<Bucket>>, StorageError> {
self.local_config_store
.get_bucket_by_name(org_id, bucket_name)
}
pub fn create_bucket_if_not_exists(
&self,
org_id: u32,
bucket: &Bucket,
) -> Result<u32, StorageError> {
self.local_config_store
.create_bucket_if_not_exists(org_id, bucket)
}
pub fn read_series_matching_predicate_and_range(
&self,
bucket: &Bucket,
predicate: Option<&Predicate>,
_range: Option<&Range>,
) -> Result<Box<dyn Iterator<Item = SeriesFilter>>, StorageError> {
self.local_index.read_series_matching(bucket.id, predicate)
}
pub fn read_i64_range(
&self,
bucket: &Bucket,
series_filter: &SeriesFilter,
range: &Range,
batch_size: usize,
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<i64>>>>, StorageError> {
self.local_series_store
.read_i64_range(bucket.id, series_filter.id, range, batch_size)
}
pub fn read_f64_range(
&self,
bucket: &Bucket,
series_filter: &SeriesFilter,
range: &Range,
batch_size: usize,
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<f64>>>>, StorageError> {
self.local_series_store
.read_f64_range(bucket.id, series_filter.id, range, batch_size)
}
}
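End to end, the façade is used roughly as follows. This is a hedged sketch, not code from the PR: the directory path, bucket values, and line-protocol string are illustrative, and parse_predicate is assumed to succeed.

use delorean::delorean::Bucket;
use delorean::line_parser;
use delorean::storage::database::Database;
use delorean::storage::predicate::parse_predicate;
use delorean::storage::Range;

fn example() -> Result<(), Box<dyn std::error::Error>> {
    let db = Database::new("/tmp/delorean-example"); // illustrative path

    let org_id = 1;
    let bucket = Bucket {
        org_id,
        id: 0, // the config store assigns the real id
        name: "mybucket".to_string(),
        retention: "0".to_string(),
        posting_list_rollover: 10_000,
        index_levels: vec![],
    };
    db.create_bucket_if_not_exists(org_id, &bucket)?;
    let bucket = db.get_bucket_by_name(org_id, "mybucket")?.unwrap();

    // write_points first asks the inverted index for series ids, then hands
    // the id-tagged points to the series store.
    let mut points = line_parser::parse("cpu,host=a usage_system=1i 10");
    db.write_points(org_id, &bucket, &mut points)?;

    // Reads are two-step: resolve the matching series, then scan each series'
    // points. A real caller matches on series.series_type, as the /read handler does.
    let predicate = parse_predicate("host = \"a\"").unwrap();
    let range = Range { start: 0, stop: 20 };
    for series in db.read_series_matching_predicate_and_range(&bucket, Some(&predicate), Some(&range))? {
        for batch in db.read_i64_range(&bucket, &series, &range, 10)? {
            println!("{}: {:?}", series.key, batch);
        }
    }
    Ok(())
}

The point of the indirection is that local_index, local_series_store, and local_config_store can later point at different backends; in Database::new all three happen to be the same RocksDB instance.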


@@ -0,0 +1,193 @@
use crate::delorean::Predicate;
use crate::line_parser::PointType;
use crate::storage::{SeriesDataType, StorageError};
pub trait InvertedIndex: Sync + Send {
fn get_or_create_series_ids_for_points(
&self,
bucket_id: u32,
points: &mut Vec<PointType>,
) -> Result<(), StorageError>;
fn read_series_matching(
&self,
bucket_id: u32,
predicate: Option<&Predicate>,
) -> Result<Box<dyn Iterator<Item = SeriesFilter>>, StorageError>;
fn get_tag_keys(
&self,
bucket_id: u32,
predicate: Option<&Predicate>,
) -> Result<Box<dyn Iterator<Item = String>>, StorageError>;
fn get_tag_values(
&self,
bucket_id: u32,
tag_key: &str,
predicate: Option<&Predicate>,
) -> Result<Box<dyn Iterator<Item = String>>, StorageError>;
}
#[derive(Debug, PartialEq, Clone)]
pub struct SeriesFilter {
pub id: u64,
pub key: String,
pub value_predicate: Option<Predicate>,
pub series_type: SeriesDataType,
}
// Test helpers for other implementations to run
#[cfg(test)]
pub mod tests {
use crate::storage::inverted_index::{InvertedIndex, SeriesFilter};
use crate::line_parser::PointType;
use crate::storage::predicate::parse_predicate;
use crate::storage::SeriesDataType;
pub fn series_id_indexing(index: Box<dyn InvertedIndex>) {
let bucket_id = 1;
let bucket_2 = 2;
let p1 = PointType::new_i64("one".to_string(), 1, 0);
let p2 = PointType::new_i64("two".to_string(), 23, 40);
let p3 = PointType::new_i64("three".to_string(), 33, 86);
let mut points = vec![p1.clone(), p2.clone()];
index.get_or_create_series_ids_for_points(bucket_id, &mut points)
.unwrap();
assert_eq!(points[0].series_id(), Some(1));
assert_eq!(points[1].series_id(), Some(2));
// now put series in a different bucket, but make sure the IDs start from the beginning
let mut points = vec![p1.clone()];
index.get_or_create_series_ids_for_points(bucket_2, &mut points)
.unwrap();
assert_eq!(points[0].series_id(), Some(1));
// now insert a new series in the first bucket and make sure it shows up
let mut points = vec![p1.clone(), p3.clone()];
index.get_or_create_series_ids_for_points(bucket_id, &mut points)
.unwrap();
assert_eq!(points[0].series_id(), Some(1));
assert_eq!(points[1].series_id(), Some(3));
}
pub fn series_metadata_indexing(index: Box<dyn InvertedIndex>) {
let bucket_id = 1;
let p1 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 0);
let p2 = PointType::new_i64("cpu,host=a,region=west\tusage_system".to_string(), 1, 0);
let p3 = PointType::new_i64("cpu,host=a,region=west\tusage_user".to_string(), 1, 0);
let p4 = PointType::new_i64("mem,host=b,region=west\tfree".to_string(), 1, 0);
let mut points = vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()];
index.get_or_create_series_ids_for_points(bucket_id, &mut points).unwrap();
let tag_keys: Vec<String> = index.get_tag_keys(bucket_id, None).unwrap().collect();
assert_eq!(tag_keys, vec!["_f", "_m", "host", "region"]);
let tag_values: Vec<String> = index.get_tag_values(bucket_id, "host", None).unwrap().collect();
assert_eq!(tag_values, vec!["a", "b"]);
// get series with measurement = cpu
let pred = parse_predicate("_m = \"cpu\"").unwrap();
let series: Vec<SeriesFilter> = index
.read_series_matching(bucket_id, Some(&pred))
.unwrap()
.collect();
assert_eq!(
series,
vec![
SeriesFilter {
id: 1,
key: "cpu,host=b,region=west\tusage_system".to_string(),
value_predicate: None,
series_type: SeriesDataType::I64
},
SeriesFilter {
id: 2,
key: "cpu,host=a,region=west\tusage_system".to_string(),
value_predicate: None,
series_type: SeriesDataType::I64
},
SeriesFilter {
id: 3,
key: "cpu,host=a,region=west\tusage_user".to_string(),
value_predicate: None,
series_type: SeriesDataType::I64
},
]
);
// get series with host = a
let pred = parse_predicate("host = \"a\"").unwrap();
let series: Vec<SeriesFilter> = index
.read_series_matching(bucket_id, Some(&pred))
.unwrap()
.collect();
assert_eq!(
series,
vec![
SeriesFilter {
id: 2,
key: "cpu,host=a,region=west\tusage_system".to_string(),
value_predicate: None,
series_type: SeriesDataType::I64
},
SeriesFilter {
id: 3,
key: "cpu,host=a,region=west\tusage_user".to_string(),
value_predicate: None,
series_type: SeriesDataType::I64
},
]
);
// get series with measurement = cpu and host = b
let pred = parse_predicate("_m = \"cpu\" and host = \"b\"").unwrap();
let series: Vec<SeriesFilter> = index
.read_series_matching(bucket_id, Some(&pred))
.unwrap()
.collect();
assert_eq!(
series,
vec![SeriesFilter {
id: 1,
key: "cpu,host=b,region=west\tusage_system".to_string(),
value_predicate: None,
series_type: SeriesDataType::I64
},]
);
let pred = parse_predicate("host = \"a\" OR _m = \"mem\"").unwrap();
let series: Vec<SeriesFilter> = index
.read_series_matching(bucket_id, Some(&pred))
.unwrap()
.collect();
assert_eq!(
series,
vec![
SeriesFilter {
id: 2,
key: "cpu,host=a,region=west\tusage_system".to_string(),
value_predicate: None,
series_type: SeriesDataType::I64
},
SeriesFilter {
id: 3,
key: "cpu,host=a,region=west\tusage_user".to_string(),
value_predicate: None,
series_type: SeriesDataType::I64
},
SeriesFilter {
id: 4,
key: "mem,host=b,region=west\tfree".to_string(),
value_predicate: None,
series_type: SeriesDataType::I64
},
]
);
}
}


@@ -1,44 +0,0 @@
use crate::storage::rocksdb::SeriesFilter;
pub struct SeriesIterator {
pub org_id: u32,
pub bucket_id: u32,
series_filters: Vec<SeriesFilter>,
next_filter: usize,
}
impl SeriesIterator {
pub fn new(org_id: u32, bucket_id: u32, series_filters: Vec<SeriesFilter>) -> SeriesIterator {
SeriesIterator {
org_id,
bucket_id,
next_filter: 0,
series_filters: series_filters,
}
}
}
impl Iterator for SeriesIterator {
type Item = SeriesFilter;
fn next(&mut self) -> Option<Self::Item> {
self.next_filter += 1;
match self.series_filters.get(self.next_filter - 1) {
Some(f) => Some(f.clone()),
None => None,
}
}
}
#[derive(Debug, PartialEq)]
pub struct ReadSeries<T> {
pub id: u64,
pub key: String,
pub points: Vec<ReadPoint<T>>,
}
#[derive(Debug, PartialEq)]
pub struct ReadPoint<T> {
pub time: i64,
pub value: T,
}

src/storage/memdb.rs (new file, 625 lines)

@@ -0,0 +1,625 @@
use crate::delorean::node::{Comparison, Logical, Value};
use crate::delorean::{Predicate, Node};
use crate::storage::inverted_index::{InvertedIndex, SeriesFilter};
use crate::line_parser::{PointType, ParseError, Point};
use crate::storage::{StorageError, Range, SeriesDataType};
use crate::storage::series_store::{SeriesStore, ReadPoint};
use std::sync::{RwLock, Arc, RwLockReadGuard, Mutex};
use std::collections::{HashMap, BTreeMap};
use croaring::Treemap;
/// memdb implements an in-memory database for the InvertedIndex and SeriesStore traits. It
/// works with a ring buffer. Currently, it does not limit the number of series that can be
/// created. It also assumes that data for each series arrives in ascending time order.
// TODO: return errors if trying to insert data out of order in an individual series
pub struct MemDB {
default_ring_buffer_size: usize,
bucket_id_to_series_data: Arc<RwLock<HashMap<u32, Mutex<SeriesData>>>>,
bucket_id_to_series_map: Arc<RwLock<HashMap<u32, RwLock<SeriesMap>>>>,
}
struct SeriesData {
ring_buffer_size: usize,
i64_series: HashMap<u64, SeriesRingBuffer<i64>>,
f64_series: HashMap<u64, SeriesRingBuffer<f64>>,
}
impl SeriesData {
fn write_points(&mut self, points: &Vec<PointType>) {
for p in points {
match p {
PointType::I64(p) => {
match self.i64_series.get_mut(&p.series_id.unwrap()) {
Some(buff) => buff.write(&p),
None => {
let mut buff = new_i64_ring_buffer(self.ring_buffer_size);
buff.write(&p);
self.i64_series.insert(p.series_id.unwrap(), buff);
},
}
},
PointType::F64(p) => {
match self.f64_series.get_mut(&p.series_id.unwrap()) {
Some(buff) => buff.write(&p),
None => {
let mut buff = new_f64_ring_buffer(self.ring_buffer_size);
buff.write(&p);
self.f64_series.insert(p.series_id.unwrap(), buff);
}
}
},
}
}
}
}
struct SeriesRingBuffer<T: Copy> {
next_position: usize,
data: Vec<ReadPoint<T>>,
}
fn new_i64_ring_buffer(size: usize) -> SeriesRingBuffer<i64> {
let mut data = Vec::with_capacity(size);
for _ in 0..size {
data.push(ReadPoint{time: std::i64::MAX, value: 0 as i64})
}
SeriesRingBuffer{
next_position: 0,
data,
}
}
fn new_f64_ring_buffer(size: usize) -> SeriesRingBuffer<f64> {
let mut data = Vec::with_capacity(size);
for _ in 0..size {
data.push(ReadPoint{time: std::i64::MAX, value: 0 as f64})
}
SeriesRingBuffer{
next_position: 0,
data,
}
}
impl<T: Copy> SeriesRingBuffer<T> {
fn write(&mut self, point: &Point<T>) {
if self.next_position == self.data.len() {
self.next_position = 0;
}
self.data[self.next_position].time = point.time;
self.data[self.next_position].value = point.value;
self.next_position += 1;
}
fn get_range(&self, range: &Range) -> Vec<ReadPoint<T>> {
let (_, pos) = self.oldest_time_and_position();
let mut values = Vec::new();
for i in pos..self.data.len() {
if self.data[i].time > range.stop {
return values;
} else if self.data[i].time >= range.start {
values.push(self.data[i].clone());
}
};
for i in 0..self.next_position {
if self.data[i].time > range.stop {
return values;
} else if self.data[i].time >= range.start {
values.push(self.data[i].clone());
}
}
values
}
fn oldest_time_and_position(&self) -> (i64, usize) {
let mut pos = self.next_position;
if self.next_position == self.data.len() {
pos = 0;
} else if self.data[pos].time == std::i64::MAX {
pos = 0;
}
(self.data[pos].time, pos)
}
}
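A sketch of the wrap-around behavior, written as it might appear in a unit test inside this module (it touches private items, so it only makes sense here). With capacity 2, the third write overwrites the oldest slot, and get_range returns the survivors oldest first:

#[test]
fn ring_buffer_wraps() {
    // capacity 2: the third write wraps and overwrites the time=1 slot
    let mut buff = new_i64_ring_buffer(2);
    for (time, value) in vec![(1, 10), (2, 20), (3, 30)] {
        buff.write(&Point {
            series: "cpu".to_string(),
            series_id: Some(1),
            time,
            value,
        });
    }
    let range = Range { start: 0, stop: 10 };
    assert_eq!(
        buff.get_range(&range),
        vec![ReadPoint { time: 2, value: 20 }, ReadPoint { time: 3, value: 30 }]
    );
}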
struct SeriesMap {
last_id: u64,
series_key_to_id: HashMap<String, u64>,
series_id_to_key_and_type: HashMap<u64, (String, SeriesDataType)>,
tag_keys: BTreeMap<String, BTreeMap<String, bool>>,
posting_list: HashMap<Vec<u8>, Treemap>,
}
impl SeriesMap {
fn new() -> SeriesMap {
SeriesMap{
last_id: 0,
series_key_to_id: HashMap::new(),
series_id_to_key_and_type: HashMap::new(),
tag_keys: BTreeMap::new(),
posting_list: HashMap::new(),
}
}
fn insert_series(&mut self, point: &mut PointType) -> Result<(), ParseError> {
if let Some(id) = self.series_key_to_id.get(point.series()) {
point.set_series_id(*id);
return Ok(());
}
// insert the series id
self.last_id += 1;
point.set_series_id(self.last_id);
self.series_key_to_id.insert(point.series().clone(), self.last_id);
let series_type = match point {
PointType::I64(_) => SeriesDataType::I64,
PointType::F64(_) => SeriesDataType::F64,
};
self.series_id_to_key_and_type.insert(self.last_id, (point.series().clone(), series_type));
for pair in point.index_pairs()? {
// insert this id into the posting list
let list_key = list_key(&pair.key, &pair.value);
let posting_list = self.posting_list.entry(list_key).or_insert(Treemap::create());
posting_list.add(self.last_id);
// insert the tag key value mapping
let tag_values = self.tag_keys.entry(pair.key).or_insert(BTreeMap::new());
tag_values.insert(pair.value, true);
}
Ok(())
}
fn posting_list_for_key_value(&self, key: &str, value: &str) -> Treemap {
let list_key = list_key(key, value);
match self.posting_list.get(&list_key) {
Some(m) => m.clone(),
None => Treemap::create(),
}
}
}
fn list_key(key: &str, value: &str) -> Vec<u8> {
let mut list_key = key.as_bytes().to_vec();
list_key.push(0 as u8);
list_key.append(&mut value.as_bytes().to_vec());
list_key
}
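The resulting posting-list key is just the tag key and value joined by a NUL byte, presumably safe because line-protocol keys and values never contain a zero byte:

// e.g. the pair ("host", "b") maps to the bytes "host\0b"
assert_eq!(list_key("host", "b"), b"host\x00b".to_vec());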
impl MemDB {
pub fn new() -> MemDB {
MemDB {
default_ring_buffer_size: 10000,
bucket_id_to_series_data: Arc::new(RwLock::new(HashMap::new())),
bucket_id_to_series_map: Arc::new(RwLock::new(HashMap::new())),
}
}
fn get_or_create_series_ids_for_points(
&self,
bucket_id: u32,
points: &mut Vec<PointType>,
) -> Result<(), StorageError> {
// first try to do everything with just a read lock
if self.get_series_ids_for_points(bucket_id, points) {
return Ok(())
}
// if we got this far, we have new series to insert
let buckets = self.bucket_id_to_series_map.read().unwrap();
let mut series_map = buckets.get(&bucket_id).unwrap().write().unwrap();
for p in points {
match p.series_id() {
Some(_) => (),
None => {
match series_map.insert_series(p) {
Ok(_) => (),
Err(e) => return Err(StorageError { description: format!("error parsing line protocol metadata {}", e) })
}
},
}
}
Ok(())
}
// get_series_ids_for_points attempts to fill the series ids for all points in the passed-in
// collection using only a read lock. If no SeriesMap exists for the bucket, it will be inserted.
// It returns true if all points have their series ids filled in.
fn get_series_ids_for_points(&self, bucket_id: u32, points: &mut Vec<PointType>) -> bool {
let buckets = self.bucket_id_to_series_map.read().unwrap();
match buckets.get(&bucket_id) {
Some(b) => {
let b = b.read().unwrap();
let mut all_have_id = true;
for p in points {
match b.series_key_to_id.get(p.series()) {
Some(id) => p.set_series_id(*id),
None => all_have_id = false,
}
}
if all_have_id {
return true
}
},
None => {
// we've never even seen this bucket. insert it and then we'll get to inserting the series
drop(buckets);
self.bucket_id_to_series_map.write().unwrap().insert(bucket_id, RwLock::new(SeriesMap::new()));
},
}
false
}
fn get_tag_keys(
&self,
bucket_id: u32,
_predicate: Option<&Predicate>,
) -> Result<Box<dyn Iterator<Item=String>>, StorageError> {
match self.bucket_id_to_series_map.read().unwrap().get(&bucket_id) {
Some(map) => {
let keys: Vec<String> = map.read().unwrap().tag_keys.keys().map(|k| k.clone()).collect();
Ok(Box::new(keys.into_iter()))
},
None => {
Err(StorageError{description: format!("bucket {} not found", bucket_id)})
}
}
}
fn get_tag_values(
&self,
bucket_id: u32,
tag_key: &str,
_predicate: Option<&Predicate>,
) -> Result<Box<dyn Iterator<Item=String>>, StorageError> {
match self.bucket_id_to_series_map.read().unwrap().get(&bucket_id) {
Some(map) => {
match map.read().unwrap().tag_keys.get(tag_key) {
Some(values) => {
let values: Vec<String> = values.keys().map(|v| v.clone()).collect();
Ok(Box::new(values.into_iter()))
},
None => Ok(Box::new(vec![].into_iter())),
}
},
None => {
Err(StorageError{description: format!("bucket {} not found", bucket_id)})
}
}
}
fn read_series_matching(
&self,
bucket_id: u32,
predicate: Option<&Predicate>,
) -> Result<Box<dyn Iterator<Item = SeriesFilter>>, StorageError> {
let pred = match predicate {
Some(p) => p,
None => return Err(StorageError{description: "unable to list all series".to_string()}),
};
let root = match &pred.root {
Some(r) => r,
None => return Err(StorageError{description: "expected root node to evaluate".to_string()}),
};
let bucket_map = self.bucket_id_to_series_map.read().unwrap();
let series_map = match bucket_map.get(&bucket_id) {
Some(m) => m.read().unwrap(),
None => return Err(StorageError{description: format!("no series written for bucket {}", bucket_id)}),
};
let map = evaluate_node(&series_map, &root)?;
let mut filters = Vec::with_capacity(map.cardinality() as usize);
for id in map.iter() {
let (key, series_type) = series_map.series_id_to_key_and_type.get(&id).unwrap();
filters.push(SeriesFilter {
id,
key: key.clone(),
value_predicate: None,
series_type: *series_type,
});
}
Ok(Box::new(filters.into_iter()))
}
fn write_points_with_series_ids(
&self,
bucket_id: u32,
points: &Vec<PointType>,
) -> Result<(), StorageError> {
let bucket_data = self.bucket_id_to_series_data.read().unwrap();
// ensure that the bucket has a series data struct to write into
let series_data = match bucket_data.get(&bucket_id) {
Some(d) => d,
None => {
drop(bucket_data);
let mut bucket_data = self.bucket_id_to_series_data.write().unwrap();
let series_data = SeriesData{
ring_buffer_size: self.default_ring_buffer_size,
i64_series: HashMap::new(),
f64_series: HashMap::new(),
};
bucket_data.insert(bucket_id, Mutex::new(series_data));
drop(bucket_data);
return self.write_points_with_series_ids(bucket_id, points);
}
};
let mut series_data = series_data.lock().unwrap();
series_data.write_points(points);
Ok(())
}
fn read_i64_range(
&self,
bucket_id: u32,
series_id: u64,
range: &Range,
batch_size: usize,
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<i64>>>>, StorageError> {
let buckets = self.bucket_id_to_series_data.read().unwrap();
let data = match buckets.get(&bucket_id) {
Some(d) => d,
None => return Err(StorageError{description: format!("bucket {} not found", bucket_id)}),
};
let data = data.lock().unwrap();
let buff = match data.i64_series.get(&series_id) {
Some(b) => b,
None => return Err(StorageError{description: format!("series {} not found", series_id)}),
};
let values = buff.get_range(&range);
Ok(Box::new(PointsIterator{values: Some(values), batch_size}))
}
fn read_f64_range(
&self,
bucket_id: u32,
series_id: u64,
range: &Range,
batch_size: usize,
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<f64>>>>, StorageError> {
let buckets = self.bucket_id_to_series_data.read().unwrap();
let data = match buckets.get(&bucket_id) {
Some(d) => d,
None => return Err(StorageError{description: format!("bucket {} not found", bucket_id)}),
};
let data = data.lock().unwrap();
let buff = match data.f64_series.get(&series_id) {
Some(b) => b,
None => return Err(StorageError{description: format!("series {} not found", series_id)}),
};
let values = buff.get_range(&range);
Ok(Box::new(PointsIterator{values: Some(values), batch_size}))
}
}
struct PointsIterator<T: Copy> {
values: Option<Vec<ReadPoint<T>>>,
batch_size: usize,
}
impl<T: Copy> Iterator for PointsIterator<T> {
type Item = Vec<ReadPoint<T>>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(mut values) = self.values.take() {
if self.batch_size > values.len() {
return Some(values);
}
let remaining = values.split_off(self.batch_size);
if remaining.len() != 0 {
self.values = Some(remaining);
}
return Some(values);
}
None
}
}
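And the batching contract in a similar in-module sketch: five buffered points with batch_size 2 come back as batches of 2, 2, and 1:

#[test]
fn points_iterator_batches() {
    let values: Vec<ReadPoint<i64>> = (0..5).map(|t| ReadPoint { time: t, value: t }).collect();
    let mut iter = PointsIterator { values: Some(values), batch_size: 2 };
    assert_eq!(iter.next().map(|b| b.len()), Some(2));
    assert_eq!(iter.next().map(|b| b.len()), Some(2));
    assert_eq!(iter.next().map(|b| b.len()), Some(1));
    assert_eq!(iter.next(), None);
}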
fn evaluate_node(series_map: &RwLockReadGuard<SeriesMap>, n: &Node) -> Result<Treemap, StorageError> {
if n.children.len() != 2 {
return Err(StorageError {
description: format!(
"expected only two children of node but found {}",
n.children.len()
),
});
}
match &n.value {
Some(node_value) => match node_value {
Value::Logical(l) => {
let l = Logical::from_i32(*l).unwrap();
evaluate_logical(series_map, &n.children[0], &n.children[1], l)
}
Value::Comparison(c) => {
let c = Comparison::from_i32(*c).unwrap();
evaluate_comparison(series_map, &n.children[0], &n.children[1], c)
}
val => Err(StorageError {
description: format!("evaluate_node called on wrong type {:?}", val),
}),
},
None => Err(StorageError {
description: "emtpy node value".to_string(),
}),
}
}
fn evaluate_logical(
series_map: &RwLockReadGuard<SeriesMap>,
left: &Node,
right: &Node,
op: Logical,
) -> Result<Treemap, StorageError> {
let mut left_result = evaluate_node(series_map, left)?;
let right_result = evaluate_node(series_map, right)?;
match op {
Logical::And => left_result.and_inplace(&right_result),
Logical::Or => left_result.or_inplace(&right_result),
};
Ok(left_result)
}
fn evaluate_comparison(
series_map: &RwLockReadGuard<SeriesMap>,
left: &Node,
right: &Node,
op: Comparison,
) -> Result<Treemap, StorageError> {
let left = match &left.value {
Some(Value::TagRefValue(s)) => s,
_ => {
return Err(StorageError {
description: "expected left operand to be a TagRefValue".to_string(),
})
}
};
let right = match &right.value {
Some(Value::StringValue(s)) => s,
_ => {
return Err(StorageError {
description: "unable to run comparison against anything other than a string"
.to_string(),
})
}
};
match op {
Comparison::Equal => {
return Ok(series_map.posting_list_for_key_value(&left, &right))
}
comp => {
return Err(StorageError {
description: format!("unable to handle comparison {:?}", comp),
})
}
}
}
impl InvertedIndex for MemDB {
fn get_or_create_series_ids_for_points(
&self,
bucket_id: u32,
points: &mut Vec<PointType>,
) -> Result<(), StorageError> {
self.get_or_create_series_ids_for_points(bucket_id, points)
}
fn read_series_matching(
&self,
bucket_id: u32,
predicate: Option<&Predicate>,
) -> Result<Box<dyn Iterator<Item = SeriesFilter>>, StorageError> {
self.read_series_matching(bucket_id, predicate)
}
fn get_tag_keys(
&self,
bucket_id: u32,
predicate: Option<&Predicate>,
) -> Result<Box<dyn Iterator<Item = String>>, StorageError> {
self.get_tag_keys(bucket_id, predicate)
}
fn get_tag_values(
&self,
bucket_id: u32,
tag_key: &str,
predicate: Option<&Predicate>,
) -> Result<Box<dyn Iterator<Item = String>>, StorageError> {
self.get_tag_values(bucket_id, tag_key, predicate)
}
}
impl SeriesStore for MemDB {
fn write_points_with_series_ids(
&self,
bucket_id: u32,
points: &Vec<PointType>,
) -> Result<(), StorageError> {
self.write_points_with_series_ids(bucket_id, points)
}
fn read_i64_range(
&self,
bucket_id: u32,
series_id: u64,
range: &Range,
batch_size: usize,
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<i64>>>>, StorageError> {
self.read_i64_range(bucket_id, series_id, range, batch_size)
}
fn read_f64_range(
&self,
bucket_id: u32,
series_id: u64,
range: &Range,
batch_size: usize,
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<f64>>>>, StorageError> {
self.read_f64_range(bucket_id, series_id, range, batch_size)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::inverted_index;
use crate::storage::series_store;
#[test]
fn write_and_read_i64() {
let db = Box::new(MemDB::new());
series_store::tests::write_and_read_i64(db);
}
#[test]
fn write_and_read_f64() {
let db = Box::new(MemDB::new());
series_store::tests::write_and_read_f64(db);
}
#[test]
fn series_id_indexing() {
let db = Box::new(MemDB::new());
inverted_index::tests::series_id_indexing(db)
}
#[test]
fn series_metadata_indexing() {
let db = Box::new(MemDB::new());
inverted_index::tests::series_metadata_indexing(db);
}
}


@@ -1,3 +1,51 @@
pub mod iterators;
use actix_web::http::StatusCode;
use actix_web::ResponseError;
use std::error;
use std::fmt;
pub mod config_store;
pub mod database;
pub mod inverted_index;
pub mod predicate;
pub mod rocksdb;
pub mod series_store;
pub mod memdb;
pub struct Range {
pub start: i64,
pub stop: i64,
}
#[repr(u8)]
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum SeriesDataType {
I64,
F64,
// U64,
// String,
// Bool,
}
#[derive(Debug, Clone)]
pub struct StorageError {
pub description: String,
}
impl fmt::Display for StorageError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.description)
}
}
impl error::Error for StorageError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
// Generic error, underlying cause isn't tracked.
None
}
}
impl ResponseError for StorageError {
fn status_code(&self) -> StatusCode {
StatusCode::BAD_REQUEST
}
}


@@ -1,6 +1,6 @@
use crate::delorean::node::{Comparison, Value};
use crate::delorean::{node, Node, Predicate};
use crate::storage::rocksdb::StorageError;
use crate::storage::StorageError;
use std::iter::Peekable;
use std::str::Chars;

File diff suppressed because it is too large.

src/storage/series_store.rs (new file, 155 lines)

@@ -0,0 +1,155 @@
use crate::line_parser::PointType;
use crate::storage::{Range, StorageError};
pub trait SeriesStore: Sync + Send {
fn write_points_with_series_ids(
&self,
bucket_id: u32,
points: &Vec<PointType>,
) -> Result<(), StorageError>;
fn read_i64_range(
&self,
bucket_id: u32,
series_id: u64,
range: &Range,
batch_size: usize,
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<i64>>>>, StorageError>;
fn read_f64_range(
&self,
bucket_id: u32,
series_id: u64,
range: &Range,
batch_size: usize,
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<f64>>>>, StorageError>;
}
#[derive(Debug, PartialEq, Clone)]
pub struct ReadPoint<T: Copy> {
pub time: i64,
pub value: T,
}
// Test helpers for other implementations to run
#[cfg(test)]
pub mod tests {
use crate::storage::series_store::{SeriesStore, ReadPoint};
use crate::line_parser::PointType;
use crate::storage::Range;
pub fn write_and_read_i64(store: Box<dyn SeriesStore>) {
let b1_id = 1;
let b2_id = 2;
let mut p1 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 1);
p1.set_series_id(1);
let mut p2 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 2);
p2.set_series_id(1);
let mut p3 = PointType::new_i64("mem,host=b,region=west\tfree".to_string(), 1, 2);
p3.set_series_id(2);
let mut p4 = PointType::new_i64("mem,host=b,region=west\tfree".to_string(), 1, 4);
p4.set_series_id(2);
let b1_points = vec![p1.clone(), p2.clone()];
store.write_points_with_series_ids(b1_id, &b1_points).unwrap();
let b2_points = vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()];
store.write_points_with_series_ids(b2_id, &b2_points).unwrap();
// test that we'll only read from the bucket we wrote points into
let range = Range { start: 1, stop: 4 };
let mut points_iter = store
.read_i64_range(b1_id, p1.series_id().unwrap(), &range, 10)
.unwrap();
let points = points_iter.next().unwrap();
assert_eq!(
points,
vec![
ReadPoint { time: 1, value: 1 },
ReadPoint { time: 2, value: 1 },
]
);
assert_eq!(points_iter.next(), None);
// test that we'll read multiple series
let mut points_iter = store
.read_i64_range(b2_id, p1.series_id().unwrap(), &range, 10)
.unwrap();
let points = points_iter.next().unwrap();
assert_eq!(
points,
vec![
ReadPoint { time: 1, value: 1 },
ReadPoint { time: 2, value: 1 },
]
);
let mut points_iter = store
.read_i64_range(b2_id, p3.series_id().unwrap(), &range, 10)
.unwrap();
let points = points_iter.next().unwrap();
assert_eq!(
points,
vec![
ReadPoint { time: 2, value: 1 },
ReadPoint { time: 4, value: 1 },
]
);
// test that the batch size is honored
let mut points_iter = store
.read_i64_range(b1_id, p1.series_id().unwrap(), &range, 1)
.unwrap();
let points = points_iter.next().unwrap();
assert_eq!(points, vec![ReadPoint { time: 1, value: 1 },]);
let points = points_iter.next().unwrap();
assert_eq!(points, vec![ReadPoint { time: 2, value: 1 },]);
assert_eq!(points_iter.next(), None);
// test that the time range is properly limiting
let range = Range { start: 2, stop: 3 };
let mut points_iter = store
.read_i64_range(b2_id, p1.series_id().unwrap(), &range, 10)
.unwrap();
let points = points_iter.next().unwrap();
assert_eq!(points, vec![ReadPoint { time: 2, value: 1 },]);
let mut points_iter = store
.read_i64_range(b2_id, p3.series_id().unwrap(), &range, 10)
.unwrap();
let points = points_iter.next().unwrap();
assert_eq!(points, vec![ReadPoint { time: 2, value: 1 },]);
}
pub fn write_and_read_f64(store: Box<dyn SeriesStore>) {
let bucket_id = 1;
let mut p1 = PointType::new_f64("cpu,host=b,region=west\tusage_system".to_string(), 1.0, 1);
p1.set_series_id(1);
let mut p2 = PointType::new_f64("cpu,host=b,region=west\tusage_system".to_string(), 2.2, 2);
p2.set_series_id(1);
let points = vec![p1.clone(), p2.clone()];
store.write_points_with_series_ids(bucket_id, &points).unwrap();
// read the points back over a range covering both writes
let range = Range { start: 0, stop: 4 };
let mut points_iter = store
.read_f64_range(bucket_id, p1.series_id().unwrap(), &range, 10)
.unwrap();
let points = points_iter.next().unwrap();
assert_eq!(
points,
vec![
ReadPoint {
time: 1,
value: 1.0
},
ReadPoint {
time: 2,
value: 2.2
},
]
);
assert_eq!(points_iter.next(), None);
}
}