Implement read range on the database

This commit adds iterators for iterating over series and batches of points for a read range request. The exact signature/structure of the iterators are likely to change when this is generalized for other data types and other storage backends (S3 & memory).
pull/24376/head
Paul Dix 2020-01-02 17:32:04 -05:00
parent c76ce39da5
commit 4892a87898
3 changed files with 275 additions and 12 deletions

57
src/storage/iterators.rs Normal file
View File

@ -0,0 +1,57 @@
use crate::delorean::Predicate;
use crate::storage::rocksdb::{Database, SeriesFilter, StorageError, Range, PointsIterator};
use rocksdb::{DB, IteratorMode, DBIterator};
pub struct SeriesIterator<'a> {
range: &'a Range,
batch_size: usize,
predicate: &'a Predicate,
org_id: u32,
bucket_id: u32,
series_filters: Vec<SeriesFilter>,
next_filter: usize,
}
impl SeriesIterator<'_> {
pub fn new<'a>(range: &'a Range, batch_size: usize, predicate: &'a Predicate, org_id: u32, bucket_id: u32, series_filters: Vec<SeriesFilter>) -> SeriesIterator<'a> {
SeriesIterator{
range,
batch_size,
predicate,
org_id,
bucket_id,
next_filter: 0,
series_filters: series_filters,
}
}
pub fn points_iterator<'a>(&self, db: &'a Database, series_filter: &'a SeriesFilter) -> Result<PointsIterator<'a>, StorageError> {
db.get_db_points_iter(self.org_id, self.bucket_id, series_filter.id, self.range, self.batch_size)
}
}
impl Iterator for SeriesIterator<'_> {
type Item = SeriesFilter;
fn next(&mut self) -> Option<Self::Item> {
self.next_filter += 1;
match self.series_filters.get(self.next_filter - 1) {
Some(f) => Some(f.clone()),
None => None,
}
}
}
#[derive(Debug, PartialEq)]
pub struct ReadSeries<T> {
pub id: u64,
pub key: String,
pub points: Vec<ReadPoint<T>>,
}
#[derive(Debug, PartialEq)]
pub struct ReadPoint<T> {
pub time: i64,
pub value: T,
}

View File

@ -1,2 +1,3 @@
pub mod predicate;
pub mod rocksdb;
pub mod rocksdb;
pub mod iterators;

View File

@ -1,4 +1,5 @@
use crate::line_parser::{Point, Pair};
use crate::storage::iterators::{ReadPoint, SeriesIterator};
use crate::delorean::{Bucket, IndexLevel, Predicate, Node, node};
use crate::delorean::node::{Value, Comparison, Logical};
@ -9,9 +10,8 @@ use std::collections::HashMap;
use std::time::SystemTime;
use std::io::Cursor;
use rocksdb::{DB, IteratorMode, WriteBatch, Options, ColumnFamilyDescriptor, Direction, ColumnFamily};
use rocksdb::MemtableFactory::{Vector, HashLinkList};
use byteorder::{BigEndian, WriteBytesExt, ReadBytesExt};
use rocksdb::{DB, IteratorMode, WriteBatch, Options, ColumnFamilyDescriptor, Direction, ColumnFamily, DBIterator};
use byteorder::{ByteOrder, BigEndian, WriteBytesExt, ReadBytesExt};
use prost::Message;
use futures::AsyncWriteExt;
use croaring::Treemap;
@ -93,13 +93,16 @@ impl Database {
/// * points - individual values with their timestamps and series keys
pub fn write_points(&self, org_id: u32, bucket_name: &str, points: Vec<Point>) -> Result<(), StorageError> {
let key = bucket_key(org_id, bucket_name);
let bucket = match self.bucket_map.read().unwrap().get(&key) {
Some(b) => b.clone(),
let bucket_map = self.bucket_map.read().unwrap();
let bucket = match bucket_map.get(&key) {
Some(b) => b,
None => return Err(StorageError{description: "bucket not found".to_string()}),
};
let mut series = self.get_series_ids(org_id, &bucket, points);
println!("get_series_ids: {:?}", series);
self.insert_series_without_ids(org_id, &bucket, &mut series);
println!("get_series_ids after insert: {:?}", series);
let mut batch = WriteBatch::default();
for s in series {
@ -113,6 +116,26 @@ impl Database {
Ok(())
}
pub fn read_range<'a>(&self, org_id: u32, bucket_name: &str, range: &'a Range, predicate: &'a Predicate, batch_size: usize) -> Result<SeriesIterator<'a>, StorageError> {
let bucket = match self.get_bucket_by_name(org_id, bucket_name).unwrap() {
Some(b) => b,
None => return Err(StorageError{description: format!("bucket {} not found", bucket_name)}),
};
let series_filters = self.get_series_filters(&bucket, Some(&predicate), range)?;
println!("filters: {:?}", series_filters);
Ok(SeriesIterator::new(range, batch_size, predicate, org_id, bucket.id, series_filters))
}
pub fn get_db_points_iter(&self, _org_id: u32, bucket_id: u32, series_id: u64, range: &Range, batch_size: usize) -> Result<PointsIterator, StorageError> {
let mut prefix = prefix_for_series(bucket_id, series_id, range.start);
let mode = IteratorMode::From(&prefix, Direction::Forward);
let mut iter = self.db.read().unwrap().iterator(mode);
Ok(PointsIterator::new(batch_size, iter, range.stop, prefix[0..12].to_vec()))
}
/// If the bucket name exists within an org, this function returns the ID (ignoring whether the
/// bucket options are different than the one that exists). If it doesn't exist, this function
/// creates the bucket and returns its unique identifier.
@ -428,6 +451,7 @@ impl Database {
// Keep an in memory map for updating multiple index entries at a time
let mut index_map: HashMap<Vec<u8>, Treemap> = HashMap::new();
let mut series_id_map: HashMap<String, u64> = HashMap::new();
// now loop through the series and insert the index entries into the map
for series in series {
@ -436,6 +460,12 @@ impl Database {
continue;
}
// if we've already put this series in the map in this write, skip it
if let Some(id) = series_id_map.get(&series.point.series) {
series.id = Some(*id);
continue;
}
// now that we have the mutex on series, make sure these weren't inserted in some other thread
if let Some(_) = self.get_series_id(&cf_name, &series.point.series) {
continue;
@ -445,7 +475,8 @@ impl Database {
let id = *next_id;
let mut series_id = Vec::with_capacity(8);
series_id.write_u64::<BigEndian>(*next_id).unwrap();
batch.put_cf(index_cf, index_series_key_id(&series.point.series), series_id);
batch.put_cf(index_cf, index_series_key_id(&series.point.series), series_id.clone());
series_id_map.insert(series.point.series.clone(), *next_id);
*next_id += 1;
// insert the index entries
@ -596,9 +627,92 @@ TODO: other pieces
2. HTTP GET endpoint with predicate and time ranges
3. API endpoint to delete old series data
4. API endpoint to delete old indexes
5. API endpoint to run tsm compaction
6. Write/read other data types
*/
enum SeriesDataType {
Int64,
Float64,
UInt64,
String,
Bool,
}
fn prefix_for_series(bucket_id: u32, series_id: u64, start_time: i64) -> Vec<u8> {
let mut v = Vec::with_capacity(20);
v.write_u32::<BigEndian>(bucket_id).unwrap();
v.write_u64::<BigEndian>(series_id).unwrap();
v.write_i64::<BigEndian>(start_time).unwrap();
v
}
pub struct PointsIterator<'a> {
batch_size: usize,
iter: DBIterator<'a>,
stop_time: i64,
series_prefix: Vec<u8>,
drained: bool,
}
impl PointsIterator<'_> {
pub fn new(batch_size: usize, iter: DBIterator, stop_time: i64, series_prefix: Vec<u8>) -> PointsIterator {
PointsIterator{
batch_size,
iter,
stop_time,
series_prefix,
drained: false,
}
}
}
impl Iterator for PointsIterator<'_> {
type Item = Vec<ReadPoint<i64>>;
fn next(&mut self) -> Option<Self::Item> {
if self.drained {
return None;
}
let mut v = Vec::with_capacity(self.batch_size);
let mut n = 0;
while let Some((key, value)) = self.iter.next() {
if !key.starts_with(&self.series_prefix) {
self.drained = true;
break;
}
let time = BigEndian::read_i64(&key[12..]);
if time > self.stop_time {
self.drained = true;
break;
}
let point = ReadPoint{
value: BigEndian::read_i64(&value),
time,
};
v.push(point);
n += 1;
if n >= self.batch_size {
break;
}
}
if v.is_empty() {
self.drained = true;
None
} else {
Some(v)
}
}
}
// IndexEntryType is used as a u8 prefix for any key in rocks for these different index entries
enum IndexEntryType {
SeriesKeyToID,
@ -766,15 +880,15 @@ fn key_for_series_and_time(bucket_id: u32, series_id: u64, timestamp: i64) -> Ve
v
}
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct SeriesFilter {
id: u64,
value_predicate: Option<Predicate>,
pub id: u64,
pub value_predicate: Option<Predicate>,
}
pub struct Range {
start: i64,
stop: i64,
pub start: i64,
pub stop: i64,
}
#[derive(Debug, Clone)]
@ -807,6 +921,8 @@ mod tests {
use crate::storage::predicate::parse_predicate;
use crate::storage::rocksdb::IndexEntryType::SeriesKeyToID;
use crate::line_parser::parse;
use crate::storage::iterators::ReadSeries;
use std::net::Shutdown::Read;
#[test]
fn create_and_get_buckets() {
@ -992,6 +1108,95 @@ mod tests {
]);
}
#[test]
fn write_and_read_points() {
let mut b1 = Bucket::new(1, "bucket1".to_string());
let mut b2 = Bucket::new(2, "bucket2".to_string());
let mut db = test_database("write_and_read_points", true);
let p1 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 1};
let p2 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 2};
let p3 = Point{series: "mem,host=b,region=west\tfree".to_string(), value: 1, time: 2};
let p4 = Point{series: "mem,host=b,region=west\tfree".to_string(), value: 1, time: 4};
b1.id = db.create_bucket_if_not_exists(b1.org_id, &b1).unwrap();
b2.id = db.create_bucket_if_not_exists(b2.org_id, &b2).unwrap();
db.write_points(b1.org_id, &b1.name, vec![p1.clone(), p2.clone()]).unwrap();
db.write_points(b2.org_id, &b2.name, vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()]).unwrap();
// test that we'll only read from the bucket we wrote points into
let range = Range{start: 1, stop: 4};
let pred = parse_predicate("_m = \"cpu\" OR _m = \"mem\"").unwrap();
let mut iter = db.read_range(b1.org_id, &b1.name, &range, &pred, 10).unwrap();
let series_filter = iter.next().unwrap();
assert_eq!(series_filter.id, 1);
assert_eq!(iter.next(), None);
let mut points_iter = iter.points_iterator(&db, &series_filter).unwrap();
let points = points_iter.next().unwrap();
assert_eq!(points, vec![
ReadPoint{time: 1, value: 1},
ReadPoint{time: 2, value: 1},
]);
// test that we'll read multiple series
let pred = parse_predicate("_m = \"cpu\" OR _m = \"mem\"").unwrap();
let mut iter = db.read_range(b2.org_id, &b2.name, &range, &pred, 10).unwrap();
let series_filter = iter.next().unwrap();
assert_eq!(series_filter.id, 1);
let mut points_iter = iter.points_iterator(&db, &series_filter).unwrap();
let points = points_iter.next().unwrap();
assert_eq!(points, vec![
ReadPoint{time: 1, value: 1},
ReadPoint{time: 2, value: 1},
]);
let series_filter = iter.next().unwrap();
assert_eq!(series_filter.id, 2);
let mut points_iter = iter.points_iterator(&db, &series_filter).unwrap();
let points = points_iter.next().unwrap();
assert_eq!(points, vec![
ReadPoint{time: 2, value: 1},
ReadPoint{time: 4, value: 1},
]);
// test that the batch size is honored
let pred = parse_predicate("host = \"b\"").unwrap();
let mut iter = db.read_range(b1.org_id, &b1.name, &range, &pred, 1).unwrap();
let series_filter = iter.next().unwrap();
assert_eq!(series_filter.id, 1);
assert_eq!(iter.next(), None);
let mut points_iter = iter.points_iterator(&db, &series_filter).unwrap();
let points = points_iter.next().unwrap();
assert_eq!(points, vec![
ReadPoint{time: 1, value: 1},
]);
let points = points_iter.next().unwrap();
assert_eq!(points, vec![
ReadPoint{time: 2, value: 1},
]);
// test that the time range is properly limiting
let range = Range{start: 2, stop: 3};
let pred = parse_predicate("_m = \"cpu\" OR _m = \"mem\"").unwrap();
let mut iter = db.read_range(b2.org_id, &b2.name, &range, &pred, 10).unwrap();
let series_filter = iter.next().unwrap();
assert_eq!(series_filter.id, 1);
let mut points_iter = iter.points_iterator(&db, &series_filter).unwrap();
let points = points_iter.next().unwrap();
assert_eq!(points, vec![
ReadPoint{time: 2, value: 1},
]);
let series_filter = iter.next().unwrap();
assert_eq!(series_filter.id, 2);
let mut points_iter = iter.points_iterator(&db, &series_filter).unwrap();
let points = points_iter.next().unwrap();
assert_eq!(points, vec![
ReadPoint{time: 2, value: 1},
]);
}
// Test helpers
fn get_test_storage_path() -> String {
dotenv().ok();