diff --git a/src/encoders/integer.rs b/src/encoders/integer.rs
index 3c7bf9612b..308179be55 100644
--- a/src/encoders/integer.rs
+++ b/src/encoders/integer.rs
@@ -3,6 +3,7 @@ use integer_encoding::*;
 use std::error::Error;
 
 /// Encoding describes the type of encoding used by an encoded integer block.
+#[allow(dead_code)]
 enum Encoding {
     Uncompressed = 0,
     Simple8b = 1,
@@ -15,8 +16,8 @@ enum Encoding {
 /// deltas are then zig-zag encoded. The resulting zig-zag encoded deltas are
 /// further compressed if possible, either via bit-packing using simple8b or by
 /// run-length encoding the deltas if they're all the same.
-///
-pub fn encode_all<'a>(src: &mut Vec<i64>, dst: &'a mut Vec<u8>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+pub fn encode_all<'a>(src: &mut Vec<i64>, dst: &'a mut Vec<u8>) -> Result<(), Box<dyn Error>> {
     dst.truncate(0); // reset buffer.
     if src.len() == 0 {
         return Ok(());
@@ -74,12 +75,14 @@ pub fn encode_all<'a>(src: &mut Vec<i64>, dst: &'a mut Vec<u8>) -> Result<(), Bo
 // negative and positive values across even and odd numbers.
 //
 // Eg. [0,-1,1,-2] becomes [0, 1, 2, 3].
+#[allow(dead_code)]
 fn zig_zag_encode(v: i64) -> u64 {
     ((v << 1) ^ (v >> 63)) as u64
 }
 
 // zig_zag_decode converts a zig zag encoded unsigned integer into an signed
 // integer.
+#[allow(dead_code)]
 fn zig_zag_decode(v: u64) -> i64 {
     ((v >> 1) ^ ((((v & 1) as i64) << 63) >> 63) as u64) as i64
 }
@@ -87,6 +90,7 @@ fn zig_zag_decode(v: u64) -> i64 {
 // i64_to_u64_vector converts a Vec<i64> to Vec<u64>.
 // TODO(edd): this is expensive as it copies. There are cheap
 // but unsafe alternatives to look into such as std::mem::transmute
+#[allow(dead_code)]
 fn i64_to_u64_vector(src: &[i64]) -> Vec<u64> {
     src.into_iter().map(|x| *x as u64).collect::<Vec<u64>>()
 }
@@ -94,6 +98,7 @@ fn i64_to_u64_vector(src: &[i64]) -> Vec<u64> {
 // u64_to_i64_vector converts a Vec<u64> to Vec<i64>.
 // TODO(edd): this is expensive as it copies. There are cheap
 // but unsafe alternatives to look into such as std::mem::transmute
+#[allow(dead_code)]
 fn u64_to_i64_vector(src: &[u64]) -> Vec<i64> {
     src.into_iter().map(|x| *x as i64).collect::<Vec<i64>>()
 }
@@ -103,6 +108,7 @@ fn u64_to_i64_vector(src: &[u64]) -> Vec<i64> {
 // v should be the first element of a sequence, delta the difference that each
 // value in the sequence differs by, and count the total number of values in the
 // sequence.
+#[allow(dead_code)]
 fn encode_rle(v: u64, delta: u64, count: u64, dst: &mut Vec<u8>) {
     let max_var_int_size = 10; // max number of bytes needed to store var int
     dst.push(0); // save a byte for encoding type
@@ -122,7 +128,8 @@ fn encode_rle(v: u64, delta: u64, count: u64, dst: &mut Vec<u8>) {
 }
 
 /// decode_all decodes a slice of bytes into a vector of signed integers.
-pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() == 0 {
         return Ok(());
     }
@@ -137,7 +144,8 @@ pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box
 }
 
-fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() == 0 || src.len() & 0x7 != 0 {
         return Err(From::from("invalid uncompressed block length"));
     }
@@ -160,7 +168,8 @@ fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Er
 
 // decode_rle decodes an RLE encoded slice containing only unsigned into the
 // destination vector.
-fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() < 8 {
         return Err(From::from("not enough data to decode using RLE"));
     }
@@ -193,7 +202,8 @@ fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     Ok(())
 }
 
-fn decode_simple8b(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+fn decode_simple8b(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() < 9 {
         return Err(From::from("not enough data to decode packed timestamp"));
     }
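A quick standalone check of the zig-zag transform both encoders rely on. This is an editor's sketch, not part of the diff; the decode is written in its more common equivalent form, which matches the bit-twiddled version above:

```rust
// Zig-zag maps small magnitudes of either sign to small unsigned values:
// 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3. Small results pack well with simple8b.
fn zig_zag_encode(v: i64) -> u64 {
    ((v << 1) ^ (v >> 63)) as u64
}

fn zig_zag_decode(v: u64) -> i64 {
    ((v >> 1) as i64) ^ -((v & 1) as i64)
}

fn main() {
    // Matches the comment in the diff: [0, -1, 1, -2] becomes [0, 1, 2, 3].
    for (v, enc) in [(0i64, 0u64), (-1, 1), (1, 2), (-2, 3)] {
        assert_eq!(zig_zag_encode(v), enc);
        assert_eq!(zig_zag_decode(enc), v);
    }
    // The extremes survive the round trip as well.
    assert_eq!(zig_zag_decode(zig_zag_encode(i64::MIN)), i64::MIN);
    assert_eq!(zig_zag_decode(zig_zag_encode(i64::MAX)), i64::MAX);
}
```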
diff --git a/src/encoders/simple8b.rs b/src/encoders/simple8b.rs
index f942f74b99..7ae4a0d893 100644
--- a/src/encoders/simple8b.rs
+++ b/src/encoders/simple8b.rs
@@ -27,7 +27,7 @@ const NUM_BITS: [[u8; 2]; 14] = [
 
 /// encode_all packs and binary encodes the provides slice of u64 values using
 /// simple8b into the provided vector.
-pub fn encode_all<'a>(src: &[u64], dst: &'a mut Vec<u8>) -> Result<(), Box<Error>> {
+pub fn encode_all<'a>(src: &[u64], dst: &'a mut Vec<u8>) -> Result<(), Box<dyn Error>> {
     let mut i = 0;
     'next_value: while i < src.len() {
         // try to pack a run of 240 or 120 1s
@@ -96,6 +96,7 @@ pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<u64>) {
     dst.truncate(j);
 }
 
+#[allow(dead_code)]
 pub fn decode(v: u64, dst: &mut [u64]) -> usize {
     let sel = v >> S8B_BIT_SIZE as u64;
     let mut v = v;
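For orientation, this is how the encoder pair above reads from a caller's side. The module path is an assumption based on the file location (`src/encoders/simple8b.rs` in the `delorean` crate); treat it as a usage sketch rather than a confirmed public API:

```rust
use delorean::encoders::simple8b;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Values must each fit in 60 bits for simple8b to pack them.
    let src: Vec<u64> = (1..=120).collect();

    let mut encoded: Vec<u8> = vec![];
    simple8b::encode_all(&src, &mut encoded)?;

    // decode_all writes into a pre-sized buffer and truncates it down to
    // the number of values actually decoded.
    let mut decoded = vec![0u64; src.len()];
    simple8b::decode_all(&encoded, &mut decoded);
    assert_eq!(decoded, src);

    Ok(())
}
```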
diff --git a/src/encoders/timestamp.rs b/src/encoders/timestamp.rs
index 203b4a8e0b..73424d12d3 100644
--- a/src/encoders/timestamp.rs
+++ b/src/encoders/timestamp.rs
@@ -3,6 +3,7 @@ use integer_encoding::*;
 use std::error::Error;
 
 // Encoding describes the type of encoding used by an encoded timestamp block.
+#[allow(dead_code)]
 enum Encoding {
     Uncompressed = 0,
     Simple8b = 1,
@@ -16,7 +17,8 @@ enum Encoding {
 /// is potentially carried out. If all the deltas are the same the block can be
 /// encoded using RLE. If not, as long as the deltas are not bigger than simple8b::MAX_VALUE
 /// they can be encoded using simple8b.
-pub fn encode_all<'a>(src: &mut Vec<i64>, dst: &'a mut Vec<u8>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+pub fn encode_all<'a>(src: &mut Vec<i64>, dst: &'a mut Vec<u8>) -> Result<(), Box<dyn Error>> {
     dst.truncate(0); // reset buffer.
     if src.len() == 0 {
         return Ok(());
     }
@@ -91,6 +93,7 @@ pub fn encode_all<'a>(src: &mut Vec<i64>, dst: &'a mut Vec<u8>) -> Result<(), Bo
 // i64_to_u64_vector converts a Vec<i64> to Vec<u64>.
 // TODO(edd): this is expensive as it copies. There are cheap
 // but unsafe alternatives to look into such as std::mem::transmute
+#[allow(dead_code)]
 fn i64_to_u64_vector(src: &[i64]) -> Vec<u64> {
     src.into_iter().map(|x| *x as u64).collect::<Vec<u64>>()
 }
@@ -98,6 +101,7 @@ fn i64_to_u64_vector(src: &[i64]) -> Vec<u64> {
 // u64_to_i64_vector converts a Vec<u64> to Vec<i64>.
 // TODO(edd): this is expensive as it copies. There are cheap
 // but unsafe alternatives to look into such as std::mem::transmute
+#[allow(dead_code)]
 fn u64_to_i64_vector(src: &[u64]) -> Vec<i64> {
     src.into_iter().map(|x| *x as i64).collect::<Vec<i64>>()
 }
@@ -107,6 +111,7 @@ fn u64_to_i64_vector(src: &[u64]) -> Vec<i64> {
 // v should be the first element of a sequence, delta the difference that each
 // value in the sequence differs by, and count the total number of values in the
 // sequence.
+#[allow(dead_code)]
 fn encode_rle(v: u64, delta: u64, count: u64, dst: &mut Vec<u8>) {
     let max_var_int_size = 10; // max number of bytes needed to store var int
 
@@ -150,7 +155,8 @@ fn encode_rle(v: u64, delta: u64, count: u64, dst: &mut Vec<u8>) {
 
 /// decode_all decodes a slice of bytes encoded using encode_all back into a
 /// vector of signed integers.
-pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() == 0 {
         return Ok(());
     }
@@ -166,7 +172,8 @@ pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box
 }
 
-fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() == 0 || src.len() & 0x7 != 0 {
         return Err(From::from("invalid uncompressed block length"));
     }
@@ -189,7 +196,8 @@ fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Er
 
 // decode_rle decodes an RLE encoded slice containing only unsigned into the
 // destination vector.
-fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() < 9 {
         return Err(From::from("not enough data to decode using RLE"));
     }
@@ -226,7 +234,8 @@ fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     Ok(())
 }
 
-fn decode_simple8b(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+fn decode_simple8b(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
    if src.len() < 9 {
         return Err(From::from("not enough data to decode packed timestamp"));
     }
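The RLE block encode_rle produces is small enough to sanity-check by hand. The sketch below assumes the layout implied by the fragments here (one type byte, then varints for first value, delta, and count, written with the same integer-encoding crate); the header byte value is a guess, since the `Rle` discriminant of the Encoding enum is not visible in these hunks:

```rust
use integer_encoding::VarInt;

// Assumed layout: [type byte][varint first value][varint delta][varint count].
fn encode_rle(v: u64, delta: u64, count: u64, dst: &mut Vec<u8>) {
    dst.push(2 << 4); // assuming Encoding::Rle == 2, stored in the high nibble
    let mut buf = [0u8; 10]; // max length of a varint-encoded u64
    for &val in &[v, delta, count] {
        let n = val.encode_var(&mut buf);
        dst.extend_from_slice(&buf[..n]);
    }
}

fn main() {
    // Ten timestamps starting at 1000, spaced 10 apart.
    let mut block = vec![];
    encode_rle(1000, 10, 10, &mut block);
    // 1 type byte + 2 bytes for 1000 + 1 byte each for delta and count.
    assert_eq!(block.len(), 5);
}
```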
diff --git a/src/line_parser/mod.rs b/src/line_parser/mod.rs
index 33613fb1a0..57e6b9776d 100644
--- a/src/line_parser/mod.rs
+++ b/src/line_parser/mod.rs
@@ -1,22 +1,79 @@
 use std::str::Chars;
 use std::{error, fmt};
-use std::fs::read;
 use actix_web::ResponseError;
 use actix_web::http::StatusCode;
 
 #[derive(Debug, PartialEq, Clone)]
-pub struct Point {
+pub struct Point<T> {
     pub series: String,
     pub time: i64,
-    pub value: i64,
+    pub value: T,
 }
 
-impl Point {
+impl<T> Point<T> {
     pub fn index_pairs(&self) -> Result<Vec<Pair>, ParseError> {
         index_pairs(&self.series)
     }
 }
 
+#[derive(Debug, PartialEq, Clone)]
+pub enum PointType {
+    I64(Point<i64>),
+    F64(Point<f64>),
+}
+
+impl PointType {
+    pub fn new_i64(series: String, value: i64, time: i64) -> PointType {
+        PointType::I64(Point{series, value, time})
+    }
+
+    pub fn new_f64(series: String, value: f64, time: i64) -> PointType {
+        PointType::F64(Point{series, value, time})
+    }
+
+    pub fn series(&self) -> &String {
+        match self {
+            PointType::I64(p) => &p.series,
+            PointType::F64(p) => &p.series,
+        }
+    }
+
+    pub fn time(&self) -> i64 {
+        match self {
+            PointType::I64(p) => p.time,
+            PointType::F64(p) => p.time,
+        }
+    }
+
+    pub fn set_time(&mut self, t: i64) {
+        match self {
+            PointType::I64(p) => p.time = t,
+            PointType::F64(p) => p.time = t,
+        }
+    }
+
+    pub fn i64_value(&self) -> Option<i64> {
+        match self {
+            PointType::I64(p) => Some(p.value),
+            _ => None,
+        }
+    }
+
+    pub fn f64_value(&self) -> Option<f64> {
+        match self {
+            PointType::F64(p) => Some(p.value),
+            _ => None,
+        }
+    }
+
+    pub fn index_pairs(&self) -> Result<Vec<Pair>, ParseError> {
+        match self {
+            PointType::I64(p) => p.index_pairs(),
+            PointType::F64(p) => p.index_pairs(),
+        }
+    }
+}
+
 // TODO: handle escapes in the line protocol for , = and \t
+
 /// index_pairs parses the series key into key value pairs for insertion into the index. In
 /// cases where this series is already in the database, this parse step can be skipped entirely.
@@ -90,9 +147,9 @@ impl ResponseError for ParseError {
 }
 
 // TODO: have parse return an error for invalid inputs
-pub fn parse(input: &str) -> Vec<Point> {
-    let mut points = Vec::with_capacity(10000);
-    let mut lines= input.lines();
+pub fn parse(input: &str) -> Vec<PointType> {
+    let mut points: Vec<PointType> = Vec::with_capacity(10000);
+    let lines= input.lines();
 
     for line in lines {
         read_line(line, &mut points)
@@ -101,7 +158,7 @@ pub fn parse(input: &str) -> Vec<Point> {
     return points;
 }
 
-fn read_line(line: &str, points: &mut Vec<Point>) {
+fn read_line(line: &str, points: &mut Vec<PointType>) {
     let mut points = points;
     let mut chars = line.chars();
     let mut series = String::with_capacity(1000);
@@ -113,7 +170,7 @@ fn read_line(line: &str, points: &mut Vec<Point>) {
     }
 }
 
-fn read_fields(measurement_tags: &str, chars: &mut Chars, points: &mut Vec<Point>) {
+fn read_fields(measurement_tags: &str, chars: &mut Chars, points: &mut Vec<PointType>) {
     let mut chars = chars;
     let mut points = points;
     let mut field_name = String::with_capacity(100);
@@ -123,34 +180,62 @@ fn read_fields(measurement_tags: &str, chars: &mut Chars, points: &mut Vec<Point
-            ' ' => {
-                let val = value.parse::<i64>().unwrap();
-                points.push(Point{series: measurement_tags.to_string() + "\t" + &field_name, value: val, time: 0});
-                return;
+            ' ' | ',' => {
+                let series = measurement_tags.to_string() + "\t" + &field_name;
+
+                // if the last character of the value is an i then it's an integer, otherwise it's
+                // a float (at least until we support the other data types
+                let point = match value.ends_with("i") {
+                    true => {
+                        let val = value[..value.len()-1].parse::<i64>().unwrap();
+                        PointType::new_i64(series, val, 0)
+                    },
+                    false => {
+                        let val = value.parse::<f64>().unwrap();
+                        PointType::new_f64(series, val, 0)
+                    }
+                };
+                points.push(point);
+
+                if ch == ' ' {
+                    return false;
+                }
+
+                return true;
             },
             _ => value.push(ch),
         }
     }
+
+    false
 }
 
 #[cfg(test)]
@@ -159,26 +244,63 @@ mod test {
 
     #[test]
     fn parse_single_field() {
-        let input = "foo asdf=23 1234";
+        let input = "foo asdf=23i 1234";
         let vals = parse(input);
 
-        assert_eq!(vals[0].series, "foo\tasdf");
-        assert_eq!(vals[0].time, 1234);
-        assert_eq!(vals[0].value, 23);
+        assert_eq!(vals[0].series(), "foo\tasdf");
+        assert_eq!(vals[0].time(), 1234);
+        assert_eq!(vals[0].i64_value().unwrap(), 23);
+
+        let input = "foo asdf=44 546";
+        let vals = parse(input);
+        assert_eq!(vals[0].series(), "foo\tasdf");
+        assert_eq!(vals[0].time(), 546);
+        assert_eq!(vals[0].f64_value().unwrap(), 44.0);
+
+        let input = "foo asdf=3.14 123";
+        let vals = parse(input);
+        assert_eq!(vals[0].series(), "foo\tasdf");
+        assert_eq!(vals[0].time(), 123);
+        assert_eq!(vals[0].f64_value().unwrap(), 3.14);
     }
 
     #[test]
     fn parse_two_fields() {
-        let input = "foo asdf=23 bar=5 1234";
+        let input = "foo asdf=23i,bar=5i 1234";
         let vals = parse(input);
 
-        assert_eq!(vals[0].series, "foo\tasdf");
-        assert_eq!(vals[0].time, 1234);
-        assert_eq!(vals[0].value, 23);
+        assert_eq!(vals[0].series(), "foo\tasdf");
+        assert_eq!(vals[0].time(), 1234);
+        assert_eq!(vals[0].i64_value().unwrap(), 23);
 
-        assert_eq!(vals[1].series, "foo\tbar");
-        assert_eq!(vals[1].time, 1234);
-        assert_eq!(vals[1].value, 5);
+        assert_eq!(vals[1].series(), "foo\tbar");
+        assert_eq!(vals[1].time(), 1234);
+        assert_eq!(vals[1].i64_value().unwrap(), 5);
+
+        let input = "foo asdf=23.1,bar=5 1234";
+
+        let vals = parse(input);
+        assert_eq!(vals[0].series(), "foo\tasdf");
+        assert_eq!(vals[0].time(), 1234);
+        assert_eq!(vals[0].f64_value().unwrap(), 23.1);
+
+        assert_eq!(vals[1].series(), "foo\tbar");
+        assert_eq!(vals[1].time(), 1234);
+        assert_eq!(vals[1].f64_value().unwrap(), 5.0);
+    }
+
+    #[test]
+    fn parse_mixed() {
+        let input = "foo asdf=23.1,bar=5i 1234";
+
+        let vals = parse(input);
+        assert_eq!(vals[0].series(), "foo\tasdf");
+        assert_eq!(vals[0].time(), 1234);
+        assert_eq!(vals[0].f64_value().unwrap(), 23.1);
+
+        assert_eq!(vals[1].series(), "foo\tbar");
+        assert_eq!(vals[1].time(), 1234);
+        assert_eq!(vals[1].i64_value().unwrap(), 5);
     }
 
     #[test]
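For illustration, this is how the new PointType API reads from a caller's side, mirroring the tests above. The module path is an assumption (the crate is delorean, the file is src/line_parser/mod.rs); a trailing `i` on a field value selects the I64 variant, anything else falls through to F64:

```rust
use delorean::line_parser::{parse, PointType};

fn main() {
    let points = parse("cpu,host=a usage_system=23i,load=0.5 1234");

    for point in points {
        match point {
            PointType::I64(p) => println!("{} = {} @ {}", p.series, p.value, p.time),
            PointType::F64(p) => println!("{} = {} @ {}", p.series, p.value, p.time),
        }
    }
}
```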
diff --git a/src/main.rs b/src/main.rs
index 9af5feb2c8..dd5c9b62a5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,24 +1,19 @@
-use delorean::storage::rocksdb::Database;
-use delorean::{line_parser, storage};
-use delorean::storage::iterators::SeriesIterator;
-use delorean::storage::rocksdb::{PointsIterator, SeriesFilter, Range};
-use delorean::line_parser::{parse, index_pairs, Pair};
+use delorean::storage::rocksdb::{Database, new_i64_points_iterator, SeriesDataType, new_f64_points_iterator};
+use delorean::line_parser;
+use delorean::storage::rocksdb::Range;
+use delorean::line_parser::index_pairs;
 use delorean::storage::predicate::parse_predicate;
 use delorean::time::{parse_duration, time_as_i64_nanos};
 
 use std::{env, io, str};
 use std::env::VarError;
 use std::sync::Arc;
-use std::task::{Context, Poll};
-use std::time::SystemTime;
 
-use actix_web::{App, middleware, HttpServer, web, HttpResponse, Error as AWError, guard, error, Responder};
-use actix_web::web::Bytes;
+use actix_web::{App, middleware, HttpServer, web, HttpResponse, Error as AWError, guard, error};
 use serde_json;
 use serde::Deserialize;
-use serde::ser::{Serialize, Serializer, SerializeStruct};
 use actix_web::web::{BytesMut};
-use futures::{self, StreamExt, Stream};
+use futures::{self, StreamExt};
 use failure::_core::time::Duration;
 use csv::Writer;
@@ -160,7 +155,7 @@ async fn read(read_info: web::Query<ReadInfo>, s: web::Data<Arc<Server>>) -> Res
 
     let range = Range{start, stop};
 
-    let mut series = s.db.read_range(read_info.org_id, &read_info.bucket_name, &range, &predicate, 10)?;
+    let series = s.db.read_range(read_info.org_id, &read_info.bucket_name, &range, &predicate, 10)?;
 
     let bucket_id = series.bucket_id;
     let db = &s.db;
@@ -170,7 +165,6 @@ async fn read(read_info: web::Query<ReadInfo>, s: web::Data<Arc<Server>>) -> Res
 
     for s in series {
         let mut wtr = Writer::from_writer(vec![]);
-        let mut points = PointsIterator::new_from_series_filter(read_info.org_id, bucket_id, &db, &s, &range, 10)?;
         let pairs = index_pairs(&s.key)?;
         let mut cols = Vec::with_capacity(pairs.len() + 2);
         let mut vals = Vec::with_capacity(pairs.len() + 2);
@@ -191,16 +185,36 @@ async fn read(read_info: web::Query<ReadInfo>, s: web::Data<Arc<Server>>) -> Res
 
         wtr.write_record(&cols).unwrap();
 
-        for batch in points {
-            for p in batch {
-                let t = p.time.to_string();
-                let v = p.value.to_string();
-                vals[vcol] = v;
-                vals[tcol] = t;
+        match s.series_type {
+            SeriesDataType::I64 => {
+                let points = new_i64_points_iterator(read_info.org_id, bucket_id, &db, &s, &range, 10);
 
-                wtr.write_record(&vals).unwrap();
-            }
-        }
+                for batch in points {
+                    for p in batch {
+                        let t = p.time.to_string();
+                        let v = p.value.to_string();
+                        vals[vcol] = v;
+                        vals[tcol] = t;
+
+                        wtr.write_record(&vals).unwrap();
+                    }
+                }
+            },
+            SeriesDataType::F64 => {
+                let points = new_f64_points_iterator(read_info.org_id, bucket_id, &db, &s, &range, 10);
+
+                for batch in points {
+                    for p in batch {
+                        let t = p.time.to_string();
+                        let v = p.value.to_string();
+                        vals[vcol] = v;
+                        vals[tcol] = t;
+
+                        wtr.write_record(&vals).unwrap();
+                    }
+                }
+            },
+        };
 
         let mut data = match wtr.into_inner() {
             Ok(d) => d,
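The I64 and F64 arms of the new match in `read` are identical apart from which iterator constructor they call. Since PointsIterator is now generic over the value type, the CSV loop could be factored into one helper along these lines; this is a suggestion, not part of the change, and ReadPoint is redeclared here only to keep the sketch self-contained:

```rust
use std::fmt::Display;

pub struct ReadPoint<T> {
    pub time: i64,
    pub value: T,
}

fn write_batches<T: Display>(
    batches: impl Iterator<Item = Vec<ReadPoint<T>>>,
    vals: &mut Vec<String>,
    vcol: usize,
    tcol: usize,
    wtr: &mut csv::Writer<Vec<u8>>,
) {
    for batch in batches {
        for p in batch {
            vals[vcol] = p.value.to_string();
            vals[tcol] = p.time.to_string();
            wtr.write_record(vals.iter()).unwrap();
        }
    }
}

fn main() {
    let mut wtr = csv::Writer::from_writer(vec![]);
    let mut vals = vec![String::new(), String::new()];
    let batches = vec![vec![ReadPoint { time: 1, value: 2.2f64 }]].into_iter();
    write_batches(batches, &mut vals, 0, 1, &mut wtr);
}
```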
diff --git a/src/storage/iterators.rs b/src/storage/iterators.rs
index a9c5bb43c7..50f7d563da 100644
--- a/src/storage/iterators.rs
+++ b/src/storage/iterators.rs
@@ -1,7 +1,4 @@
-use crate::delorean::Predicate;
-use crate::storage::rocksdb::{Database, SeriesFilter, StorageError, Range, PointsIterator};
-
-use rocksdb::{DB, IteratorMode, DBIterator};
+use crate::storage::rocksdb::SeriesFilter;
 
 pub struct SeriesIterator {
     pub org_id: u32,
diff --git a/src/storage/predicate.rs b/src/storage/predicate.rs
index c5364822bd..52878640d5 100644
--- a/src/storage/predicate.rs
+++ b/src/storage/predicate.rs
@@ -118,7 +118,7 @@ fn parse_value(chars: &mut Peekable<Chars>) -> Result<Value, StorageError> {
                 val.push(ch);
             }
         },
-        Some(ch) => return Err(StorageError{description: "unable to parse non-string values".to_string()}),
+        Some(ch) => return Err(StorageError{description: format!("unable to parse non-string values like '{}'", ch)}),
         None => (),
     }
diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs
index 5c661a6472..9c59463295 100644
--- a/src/storage/rocksdb.rs
+++ b/src/storage/rocksdb.rs
@@ -1,19 +1,17 @@
-use crate::line_parser::{Point, Pair};
+use crate::line_parser::PointType;
 use crate::storage::iterators::{ReadPoint, SeriesIterator};
-use crate::delorean::{Bucket, IndexLevel, Predicate, Node, node};
+use crate::delorean::{Bucket, IndexLevel, Predicate, Node};
 use crate::delorean::node::{Value, Comparison, Logical};
 
-use bytes::BufMut;
 use std::{error, fmt};
-use std::sync::{Arc, RwLock, Mutex, MutexGuard};
+use std::sync::{Arc, RwLock, Mutex};
 use std::collections::HashMap;
 use std::time::SystemTime;
 use std::io::Cursor;
 
-use rocksdb::{DB, IteratorMode, WriteBatch, Options, ColumnFamilyDescriptor, Direction, ColumnFamily, DBIterator};
+use rocksdb::{DB, IteratorMode, WriteBatch, Options, ColumnFamilyDescriptor, Direction, DBIterator};
 use byteorder::{ByteOrder, BigEndian, WriteBytesExt, ReadBytesExt};
 use prost::Message;
-use futures::AsyncWriteExt;
 use croaring::Treemap;
 use croaring::treemap::NativeSerializer;
 use actix_web::ResponseError;
@@ -38,12 +36,11 @@ pub struct Database {
 #[derive(Debug, PartialEq)]
 pub struct Series {
     id: Option<u64>,
-    point: Point,
+    point: PointType,
 }
 
 const BUCKET_CF: &str = "buckets";
 const BUCKET_CF_WRITE_BUFFER_SIZE: usize = 1024 * 1024; // 1MB
-const INDEX_CF: &str = "indexes";
 const INDEX_CF_WRITE_BUFFER_SIZE: usize = 10 * 1024 * 1024; // 10MB
 
 impl Database {
@@ -93,7 +90,7 @@ impl Database {
     /// * org_id - the organization this data resides under
     /// * bucket_name - the string identifier of the bucket
     /// * points - individual values with their timestamps and series keys
-    pub fn write_points(&self, org_id: u32, bucket_name: &str, points: Vec<Point>) -> Result<(), StorageError> {
+    pub fn write_points(&self, org_id: u32, bucket_name: &str, points: Vec<PointType>) -> Result<(), StorageError> {
         let key = bucket_key(org_id, bucket_name);
         let _ = self.create_default_bucket_if_not_exists(org_id, bucket_name, &key)?;
 
@@ -106,10 +103,15 @@ impl Database {
         let mut batch = WriteBatch::default();
 
         for s in series {
-            let key = key_for_series_and_time(bucket.id, s.id.unwrap(), s.point.time);
+            let key = key_for_series_and_time(bucket.id, s.id.unwrap(), s.point.time());
             let mut value = Vec::with_capacity(8);
-            value.write_i64::<BigEndian>(s.point.value).unwrap();
-            batch.put(key, value);
+
+            match s.point {
+                PointType::I64(p) => value.write_i64::<BigEndian>(p.value).unwrap(),
+                PointType::F64(p) => value.write_f64::<BigEndian>(p.value).unwrap(),
+            }
+
+            batch.put(key, value).unwrap();
         }
 
         self.db.read().unwrap().write(batch).expect("unexpected RocksDB error");
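Both value types are stored as exactly eight big-endian bytes, so the stored length alone can no longer tell i64 and f64 apart; that is why the rest of the diff threads a series type through the index. A standalone round trip of the encoding write_points now uses, with the same byteorder calls:

```rust
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::io::Cursor;

fn main() {
    let mut value = Vec::with_capacity(8);
    value.write_f64::<BigEndian>(2.2).unwrap();
    assert_eq!(value.len(), 8); // same width as an i64 value

    let mut c = Cursor::new(value.as_slice());
    assert_eq!(c.read_f64::<BigEndian>().unwrap(), 2.2);
}
```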
@@ -126,7 +128,7 @@ impl Database {
         self.create_bucket_if_not_exists(org_id, &bucket)
     }
 
-    pub fn read_range<'a>(&self, org_id: u32, bucket_name: &str, range: &'a Range, predicate: &'a Predicate, batch_size: usize) -> Result<SeriesIterator, StorageError> {
+    pub fn read_range<'a>(&self, org_id: u32, bucket_name: &str, range: &'a Range, predicate: &'a Predicate, _batch_size: usize) -> Result<SeriesIterator, StorageError> {
         let bucket = match self.get_bucket_by_name(org_id, bucket_name).unwrap() {
             Some(b) => b,
             None => return Err(StorageError{description: format!("bucket {} not found", bucket_name)}),
@@ -137,12 +139,14 @@ impl Database {
         Ok(SeriesIterator::new(org_id, bucket.id, series_filters))
     }
 
-    fn get_db_points_iter(&self, _org_id: u32, bucket_id: u32, series_id: u64, range: &Range, batch_size: usize) -> Result<PointsIterator, StorageError> {
-        let mut prefix = prefix_for_series(bucket_id, series_id, range.start);
+    fn get_db_points_iter(&self, _org_id: u32, bucket_id: u32, series_id: u64, start: i64) -> (DBIterator, Vec<u8>) {
+        let prefix = prefix_for_series(bucket_id, series_id, start);
         let mode = IteratorMode::From(&prefix, Direction::Forward);
-        let mut iter = self.db.read().unwrap().iterator(mode);
 
-        Ok(PointsIterator::new(batch_size, iter, range.stop, prefix[0..12].to_vec()))
+        let iter = self.db.read().unwrap().iterator(mode);
+        let prefix = prefix[0..12].to_vec();
+
+        (iter, prefix)
     }
 
     /// If the bucket name exists within an org, this function returns the ID (ignoring whether the
@@ -176,7 +180,7 @@ impl Database {
         let mut store = bucket.clone();
 
         // get the next bucket ID
-        let mut next_id = match db.get_cf(buckets, next_bucket_id_key())
+        let next_id = match db.get_cf(buckets, next_bucket_id_key())
             .expect("unexpected rocksdb error while trying to get the next bucket id") {
             Some(val) => u32_from_bytes(&val),
@@ -189,7 +193,7 @@ impl Database {
         // write the bucket and the next ID counter atomically
         let mut batch = WriteBatch::default();
         batch.put_cf(&buckets, &key, buf).unwrap();
-        batch.put_cf(&buckets, next_bucket_id_key(), u32_to_bytes(store.id + 1));
+        batch.put_cf(&buckets, next_bucket_id_key(), u32_to_bytes(store.id + 1)).unwrap();
         db.write(batch).expect("unexpected rocksdb error writing to DB");
 
         let id = store.id;
@@ -236,14 +240,14 @@ impl Database {
     ///
     /// # Returns
     /// A vector of series where each point in the passed in vector is contained in a series
-    pub fn get_series_ids(&self, org_id: u32, bucket: &Bucket, mut points: Vec<Point>) -> Vec<Series> {
+    pub fn get_series_ids(&self, _org_id: u32, bucket: &Bucket, points: Vec<PointType>) -> Vec<Series> {
         let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
 
-        let mut series = points.into_iter().map(|p| {
+        let series = points.into_iter().map(|p| {
             let mut series = Series{id: None, point: p};
             let level = &bucket.index_levels[0];
             let cf_name = index_cf_name(bucket.id,level.duration_seconds, now);
-            series.id = self.get_series_id(&cf_name, &series.point.series);
+            series.id = self.get_series_id(&cf_name, &series.point.series());
             series
         }).collect();
 
@@ -259,12 +263,12 @@ impl Database {
     pub fn get_series_filters(&self, bucket: &Bucket, predicate: Option<&Predicate>, range: &Range) -> Result<Vec<SeriesFilter>, StorageError> {
         if let Some(pred) = predicate {
             if let Some(root) = &pred.root {
-                let mut map = self.evaluate_node(bucket, &root, range)?;
+                let map = self.evaluate_node(bucket, &root, range)?;
                 let mut filters = Vec::with_capacity(map.cardinality() as usize);
 
                 for id in map.iter() {
-                    let key = self.get_series_key_by_id(&bucket, id, &range)?;
-                    filters.push(SeriesFilter{id, key, value_predicate: None});
+                    let (key, series_type) = self.get_series_key_and_type_by_id(&bucket, id, &range)?;
+                    filters.push(SeriesFilter{id, key, value_predicate: None, series_type});
                }
 
                 return Ok(filters);
@@ -275,7 +279,7 @@ impl Database {
         Err(StorageError{description: "get for all series ids not wired up yet".to_string()})
     }
 
-    fn get_series_key_by_id(&self, bucket: &Bucket, id: u64, _range: &Range) -> Result<String, StorageError> {
+    fn get_series_key_and_type_by_id(&self, bucket: &Bucket, id: u64, _range: &Range) -> Result<(String, SeriesDataType), StorageError> {
         let index_level = bucket.index_levels.get(0).unwrap(); // TODO: find the right index based on range
         let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
         let cf_name = index_cf_name(bucket.id, index_level.duration_seconds, now);
@@ -285,7 +289,9 @@ impl Database {
             Some(cf) => {
                 match db.get_cf(cf, index_series_id_from_id(id)).unwrap() {
                     Some(val) => {
-                        Ok(std::str::from_utf8(&val).unwrap().to_owned())
+                        let t = series_type_from_byte(val[0]);
+                        let key = std::str::from_utf8(&val[1..]).unwrap().to_owned();
+                        Ok((key, t))
                     },
                     None => Err(StorageError{description: "series id not found".to_string()})
                 }
@@ -370,7 +376,7 @@ impl Database {
     }
 
     // TODO: handle predicate
-    pub fn get_tag_keys(&self, bucket: &Bucket, _predicate: Option<&Predicate>, range: &Range) -> Vec<String> {
+    pub fn get_tag_keys(&self, bucket: &Bucket, _predicate: Option<&Predicate>, _range: &Range) -> Vec<String> {
         let index_level = bucket.index_levels.get(0).unwrap(); // TODO: find the right index based on range
         let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
         let cf_name = index_cf_name(bucket.id, index_level.duration_seconds, now);
@@ -383,7 +389,7 @@ impl Database {
             Some(index) => {
                 let prefix = index_tag_key_prefix(bucket.id);
                 let mode = IteratorMode::From(&prefix, Direction::Forward);
-                let mut iter = db.iterator_cf(index, mode)
+                let iter = db.iterator_cf(index, mode)
                     .expect("unexpected rocksdb error getting iterator for index");
 
                 for (key, _) in iter {
@@ -401,7 +407,7 @@ impl Database {
         keys
     }
 
-    pub fn get_tag_values(&self, bucket: &Bucket, tag: &str, _predicate: Option<&Predicate>, range: &Range) -> Vec<String> {
+    pub fn get_tag_values(&self, bucket: &Bucket, tag: &str, _predicate: Option<&Predicate>, _range: &Range) -> Vec<String> {
         let index_level = bucket.index_levels.get(0).unwrap(); // TODO: find the right index based on range
         let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
         let cf_name = index_cf_name(bucket.id, index_level.duration_seconds, now);
@@ -413,7 +419,7 @@ impl Database {
             Some(index) => {
                 let prefix = index_tag_key_value_prefix(bucket.id, tag);
                 let mode = IteratorMode::From(&prefix, Direction::Forward);
-                let mut iter = db.iterator_cf(index, mode)
+                let iter = db.iterator_cf(index, mode)
                     .expect("unexpected rocksdb error getting iterator for index");
 
                 for (key, _) in iter {
@@ -436,7 +442,7 @@ impl Database {
     fn ensure_series_mutex_exists(&self, bucket_id: u32) {
         let map = self.series_insert_lock.read().expect("mutex poisoned");
 
-        if let Some(next_id_mutex) = map.get(&bucket_id) {
+        if let Some(_next_id_mutex) = map.get(&bucket_id) {
             return;
         }
@@ -458,7 +464,7 @@ impl Database {
         // We want to get a lock on new series only for this bucket
         self.ensure_series_mutex_exists(bucket.id);
         let map = self.series_insert_lock.read().expect("mutex poisoned");
-        let mut next_id = map.get(&bucket.id).expect("should exist because of call to ensure_series_mutex_exists");
+        let next_id = map.get(&bucket.id).expect("should exist because of call to ensure_series_mutex_exists");
         let mut next_id = next_id.lock().expect("mutex poisoned");
 
         let mut batch = WriteBatch::default();
@@ -490,13 +496,14 @@ impl Database {
             }
 
             // if we've already put this series in the map in this write, skip it
-            if let Some(id) = series_id_map.get(&series.point.series) {
+            if let Some(id) = series_id_map.get(series.point.series()) {
                 series.id = Some(*id);
                 continue;
             }
 
             // now that we have the mutex on series, make sure these weren't inserted in some other thread
-            if let Some(_) = self.get_series_id(&cf_name, &series.point.series) {
+            if let Some(id) = self.get_series_id(&cf_name, &series.point.series()) {
+                series.id = Some(id);
                 continue;
             }
@@ -504,9 +511,9 @@ impl Database {
             let id = *next_id;
             let mut series_id = Vec::with_capacity(8);
             series_id.write_u64::<BigEndian>(*next_id).unwrap();
-            batch.put_cf(index_cf, index_series_key_id(&series.point.series), series_id.clone());
-            batch.put_cf(index_cf, index_series_id(&series_id), &series.point.series.as_bytes());
-            series_id_map.insert(series.point.series.clone(), *next_id);
+            batch.put_cf(index_cf, index_series_key_id(&series.point.series()), series_id.clone()).unwrap();
+            batch.put_cf(index_cf, index_series_id(&series_id), index_series_id_value(series_type_from_point_type(&series.point), &series.point.series())).unwrap();
+            series_id_map.insert(series.point.series().clone(), *next_id);
             *next_id += 1;
 
             // insert the index entries
@@ -516,10 +523,10 @@ impl Database {
             let pairs = series.point.index_pairs().unwrap();
             for pair in pairs {
                 // insert the tag key index
-                batch.put_cf(index_cf, index_tag_key(bucket.id, &pair.key), vec![0 as u8]);
+                batch.put_cf(index_cf, index_tag_key(bucket.id, &pair.key), vec![0 as u8]).unwrap();
 
                 // insert the tag value index
-                batch.put_cf(index_cf, index_tag_key_value(bucket.id, &pair.key, &pair.value), vec![0 as u8]);
+                batch.put_cf(index_cf, index_tag_key_value(bucket.id, &pair.key, &pair.value), vec![0 as u8]).unwrap();
 
                 // update the key to id bitmap
                 let index_key_posting_list_key = index_key_posting_list(bucket.id, &pair.key).to_vec();
@@ -564,14 +571,14 @@ impl Database {
 
         // do the index writes from the in temporary in memory map
         for (k, v) in index_map.iter() {
-            batch.put_cf(index_cf, k, v.serialize().unwrap());
+            let _ = batch.put_cf(index_cf, k, v.serialize().unwrap());
         }
 
         // save the next series id
         let bucket_cf = db.cf_handle(BUCKET_CF).unwrap();
         let mut next_series_id_val = Vec::with_capacity(8);
         next_series_id_val.write_u64::<BigEndian>(*next_id).unwrap();
-        batch.put_cf(bucket_cf, next_series_id_key(org_id, bucket.id), next_series_id_val);
+        let _ = batch.put_cf(bucket_cf, next_series_id_key(org_id, bucket.id), next_series_id_val);
 
         db.write(batch).expect("unexpected rocksdb error");
     }
@@ -595,7 +602,7 @@ impl Database {
         let buckets = db.cf_handle(BUCKET_CF).unwrap();
         let prefix = &[BucketEntryType::Bucket as u8];
-        let mut iter = db.iterator_cf(&buckets, IteratorMode::From(prefix, Direction::Forward)).unwrap();
+        let iter = db.iterator_cf(&buckets, IteratorMode::From(prefix, Direction::Forward)).unwrap();
 
         let mut id_mutex_map = HashMap::new();
         let mut bucket_map = self.bucket_map.write().unwrap();
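The posting lists being updated here are croaring Treemaps keyed by tag key and tag key/value; evaluate_node later unions or intersects them to answer OR/AND predicates. A minimal sketch of the bitmap side, using the same Treemap plus NativeSerializer pair this file imports (API assumed from the calls visible in the diff):

```rust
use croaring::Treemap;
use croaring::treemap::NativeSerializer;

fn main() {
    // Series ids 2 and 3 match e.g. host=a in the tests further down.
    let mut posting_list = Treemap::create();
    posting_list.add(2);
    posting_list.add(3);

    assert!(posting_list.contains(3));
    assert_eq!(posting_list.cardinality(), 2);

    // The serialized bytes are what batch.put_cf stores under the index key.
    let bytes = posting_list.serialize().unwrap();
    assert!(!bytes.is_empty());
}
```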
@@ -653,20 +660,24 @@ TODO: The index todo list
 8. index levels
 
 TODO: other pieces
-2. HTTP GET endpoint with predicate and time ranges
-3. API endpoint to delete old series data
-4. API endpoint to delete old indexes
-5. API endpoint to run tsm compaction
-6. Write/read other data types
+  - API endpoint to delete old series data
+  - API endpoint to delete old indexes
+  - API endpoint to run tsm compaction
+  - Write/read other data types
+  - Buckets backed by alternate storage
+  - Meta store abstracted from RocksDB
+  - Index abstracted to Trait
+  - Raw data iterator abstracted to Trait
 
 */
 
-enum SeriesDataType {
-    Int64,
-    Float64,
-    UInt64,
-    String,
-    Bool,
+#[derive(Debug, PartialEq, Clone)]
+pub enum SeriesDataType {
+    I64,
+    F64,
+//    U64,
+//    String,
+//    Bool,
 }
 
 fn prefix_for_series(bucket_id: u32, series_id: u64, start_time: i64) -> Vec<u8> {
@@ -677,32 +688,43 @@ fn prefix_for_series(bucket_id: u32, series_id: u64, start_time: i64) -> Vec<u8
     v
 }
 
-pub struct PointsIterator<'a> {
+pub struct PointsIterator<'a, T> {
     batch_size: usize,
     iter: DBIterator<'a>,
     stop_time: i64,
     series_prefix: Vec<u8>,
     drained: bool,
+    read: fn(b: &[u8]) -> T,
 }
 
-impl PointsIterator<'_> {
-    pub fn new(batch_size: usize, iter: DBIterator, stop_time: i64, series_prefix: Vec<u8>) -> PointsIterator {
-        PointsIterator{
-            batch_size,
-            iter,
-            stop_time,
-            series_prefix,
-            drained: false,
-        }
-    }
+pub fn new_i64_points_iterator<'a>(org_id: u32, bucket_id: u32, db: &'a Database, series_filter: &'a SeriesFilter, range: &Range, batch_size: usize) -> PointsIterator<'a, i64> {
+    let (iter, series_prefix) = db.get_db_points_iter(org_id, bucket_id, series_filter.id, range.start);
 
-    pub fn new_from_series_filter<'a>(org_id: u32, bucket_id: u32, db: &'a Database, series_filter: &'a SeriesFilter, range: &Range, batch_size: usize) -> Result<PointsIterator<'a>, StorageError> {
-        db.get_db_points_iter(org_id, bucket_id, series_filter.id, range, batch_size)
+    PointsIterator{
+        batch_size,
+        iter,
+        stop_time: range.stop,
+        series_prefix,
+        drained: false,
+        read: i64_from_bytes,
     }
 }
 
-impl Iterator for PointsIterator<'_> {
-    type Item = Vec<ReadPoint<i64>>;
+pub fn new_f64_points_iterator<'a>(org_id: u32, bucket_id: u32, db: &'a Database, series_filter: &'a SeriesFilter, range: &Range, batch_size: usize) -> PointsIterator<'a, f64> {
+    let (iter, series_prefix) = db.get_db_points_iter(org_id, bucket_id, series_filter.id, range.start);
+
+    PointsIterator{
+        batch_size,
+        iter,
+        stop_time: range.stop,
+        series_prefix,
+        drained: false,
+        read: f64_from_bytes,
+    }
+}
+
+impl<T> Iterator for PointsIterator<'_, T> {
+    type Item = Vec<ReadPoint<T>>;
 
     fn next(&mut self) -> Option<Self::Item> {
         if self.drained {
@@ -733,7 +755,7 @@ impl<T> Iterator for PointsIterator<'_, T> {
             }
 
             let point = ReadPoint{
-                value: BigEndian::read_i64(&value),
+                value: (self.read)(&value),
                 time,
             };
@@ -795,6 +817,24 @@ fn index_series_id(id: &Vec<u8>) -> Vec<u8> {
     v
 }
 
+fn index_series_id_value(t: SeriesDataType, key: &str) -> Vec<u8> {
+    let mut v = Vec::with_capacity(1 + key.len());
+    v.push(t as u8);
+    v.append(&mut key.as_bytes().to_vec());
+    v
+}
+
+fn series_type_from_point_type(p: &PointType) -> SeriesDataType {
+    match p {
+        PointType::I64(_) => SeriesDataType::I64,
+        PointType::F64(_) => SeriesDataType::F64,
+    }
+}
+
+fn series_type_from_byte(b: u8) -> SeriesDataType {
+    unsafe { ::std::mem::transmute(b) }
+}
+
 fn index_series_id_from_id(id: u64) -> Vec<u8> {
     let mut v = Vec::with_capacity(8 + 1);
     v.push(IndexEntryType::IDToSeriesKey as u8);
@@ -854,10 +894,6 @@ fn index_key_value_posting_list(bucket_id: u32, key: &str, value: &str) -> Vec<u
     v
 }
 
-fn entry_type_from_byte(b: u8) -> IndexEntryType {
-    unsafe { ::std::mem::transmute(b) }
-}
-
 // next_series_id_key gives the key in the buckets CF in rocks that holds the value for the next series ID
 fn next_series_id_key(org_id: u32, bucket_id: u32) -> Vec<u8> {
     let mut v = Vec::with_capacity(9);
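series_type_from_byte transmutes a stored byte back into SeriesDataType, which is undefined behavior if a byte outside the enum's range ever comes back from disk (say, after a format change). A match keeps the mapping explicit and total. This is a suggestion, not part of the diff; the discriminants follow declaration order, so I64 is 0 and F64 is 1:

```rust
#[derive(Debug, PartialEq, Clone)]
pub enum SeriesDataType {
    I64,
    F64,
}

fn series_type_from_byte(b: u8) -> Result<SeriesDataType, String> {
    match b {
        0 => Ok(SeriesDataType::I64),
        1 => Ok(SeriesDataType::F64),
        _ => Err(format!("unknown series type byte: {}", b)),
    }
}

fn main() {
    assert_eq!(series_type_from_byte(SeriesDataType::F64 as u8), Ok(SeriesDataType::F64));
    assert!(series_type_from_byte(9).is_err());
}
```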
@@ -912,10 +948,14 @@ fn u32_to_bytes(val: u32) -> Vec<u8> {
     v
 }
 
-fn u64_to_bytes(val: u64) -> Vec<u8> {
-    let mut v = Vec::with_capacity(8);
-    v.write_u64::<BigEndian>(val).unwrap();
-    v
+fn i64_from_bytes(b: &[u8]) -> i64 {
+    let mut c = Cursor::new(b);
+    c.read_i64::<BigEndian>().unwrap()
+}
+
+fn f64_from_bytes(b: &[u8]) -> f64 {
+    let mut c = Cursor::new(b);
+    c.read_f64::<BigEndian>().unwrap()
 }
 
 impl Bucket {
@@ -941,11 +981,13 @@ fn key_for_series_and_time(bucket_id: u32, series_id: u64, timestamp: i64) -> Ve
     v
 }
 
+// TODO: add series type to series filter
 #[derive(Debug, PartialEq, Clone)]
 pub struct SeriesFilter {
     pub id: u64,
     pub key: String,
     pub value_predicate: Option<Predicate>,
+    pub series_type: SeriesDataType,
 }
 
 pub struct Range {
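Everything in this file writes integers big-endian, and that is what makes the prefix iteration in get_db_points_iter work: RocksDB compares keys as raw bytes, and big-endian encoding makes byte order agree with numeric order. A standalone check against the 4 + 8 + 8 byte key layout used above (the layout is inferred from prefix_for_series handing back a 12-byte bucket-plus-series prefix):

```rust
use byteorder::{BigEndian, WriteBytesExt};

fn key(bucket_id: u32, series_id: u64, timestamp: i64) -> Vec<u8> {
    let mut v = Vec::with_capacity(20);
    v.write_u32::<BigEndian>(bucket_id).unwrap();
    v.write_u64::<BigEndian>(series_id).unwrap();
    v.write_i64::<BigEndian>(timestamp).unwrap();
    v
}

fn main() {
    let earlier = key(1, 1, 100);
    let later = key(1, 1, 200);
    assert!(earlier < later); // byte order matches time order
    assert_eq!(&earlier[0..12], &later[0..12]); // shared bucket + series prefix
}
```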
@@ -983,22 +1025,16 @@ mod tests {
 
     use dotenv::dotenv;
     use std::env;
-    use serde_json::error::Category::Data;
-    use rocksdb;
     use crate::storage::predicate::parse_predicate;
-    use crate::storage::rocksdb::IndexEntryType::SeriesKeyToID;
-    use crate::line_parser::parse;
-    use crate::storage::iterators::ReadSeries;
-    use std::net::Shutdown::Read;
 
     #[test]
     fn create_and_get_buckets() {
-        let mut bucket: Bucket;
+        let bucket: Bucket;
         let org_id = 1;
         let mut bucket2 = Bucket::new(2, "Foo".to_string());
         {
-            let mut db = test_database("create_and_get_buckets", true);
+            let db = test_database("create_and_get_buckets", true);
             let mut b = Bucket::new(org_id, "Foo".to_string());
 
             b.id = db.create_bucket_if_not_exists(org_id, &b).unwrap();
@@ -1029,7 +1065,7 @@ mod tests {
 
         // ensure it persists across database reload
         {
-            let mut db = test_database("create_and_get_buckets", false);
+            let db = test_database("create_and_get_buckets", false);
 
             let stored_bucket = db.get_bucket_by_name(org_id, &bucket.name).unwrap().unwrap();
             assert_eq!(bucket, stored_bucket);
@@ -1045,13 +1081,13 @@ mod tests {
         let org_id = 23;
         let mut b = Bucket::new(org_id, "series".to_string());
         let mut b2 = Bucket::new(1, "series".to_string());
-        let p1 = Point{series: "one".to_string(), value: 1, time: 0};
-        let p2 = Point{series: "two".to_string(), value: 23, time: 40};
-        let p3 = Point{series: "three".to_string(), value: 33, time: 86};
-        let p4 = Point{series: "four".to_string(), value: 234, time: 100};
+        let p1 = PointType::new_i64("one".to_string(), 1, 0);
+        let p2 = PointType::new_i64("two".to_string(), 23, 40);
+        let p3 = PointType::new_i64("three".to_string(), 33, 86);
+        let p4 = PointType::new_i64("four".to_string(), 234, 100);
 
         {
-            let mut db = test_database("series_id_indexing", true);
+            let db = test_database("series_id_indexing", true);
             b.id = db.create_bucket_if_not_exists(org_id, &b).unwrap();
             b2.id = db.create_bucket_if_not_exists(b2.org_id, &b2).unwrap();
@@ -1090,7 +1126,7 @@ mod tests {
 
         // now make sure that a new series gets inserted properly after restart
         {
-            let mut db = test_database("series_id_indexing", false);
+            let db = test_database("series_id_indexing", false);
 
             // check the first org
             let mut series = vec![Series{id: None, point: p4.clone()}];
@@ -1124,11 +1160,11 @@ mod tests {
     #[test]
     fn series_metadata_indexing() {
         let mut bucket = Bucket::new(1, "foo".to_string());
-        let mut db = test_database("series_metadata_indexing", true);
-        let p1 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 0};
-        let p2 = Point{series: "cpu,host=a,region=west\tusage_system".to_string(), value: 1, time: 0};
-        let p3 = Point{series: "cpu,host=a,region=west\tusage_user".to_string(), value: 1, time: 0};
-        let p4 = Point{series: "mem,host=b,region=west\tfree".to_string(), value: 1, time: 0};
+        let db = test_database("series_metadata_indexing", true);
+        let p1 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 0);
+        let p2 = PointType::new_i64("cpu,host=a,region=west\tusage_system".to_string(), 1, 0);
+        let p3 = PointType::new_i64("cpu,host=a,region=west\tusage_user".to_string(), 1, 0);
+        let p4 = PointType::new_i64("mem,host=b,region=west\tfree".to_string(), 1, 0);
 
         bucket.id = db.create_bucket_if_not_exists(bucket.org_id, &bucket).unwrap();
         let mut series = db.get_series_ids(bucket.org_id, &bucket, vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()]);
@@ -1147,42 +1183,42 @@ mod tests {
         let pred = parse_predicate("_m = \"cpu\"").unwrap();
         let series = db.get_series_filters(&bucket, Some(&pred), &range).unwrap();
         assert_eq!(series, vec![
-            SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None},
-            SeriesFilter{id: 2, key: "cpu,host=a,region=west\tusage_system".to_string(), value_predicate: None},
-            SeriesFilter{id: 3, key: "cpu,host=a,region=west\tusage_user".to_string(), value_predicate: None},
+            SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::I64},
+            SeriesFilter{id: 2, key: "cpu,host=a,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::I64},
+            SeriesFilter{id: 3, key: "cpu,host=a,region=west\tusage_user".to_string(), value_predicate: None, series_type: SeriesDataType::I64},
         ]);
 
         // get series with host = a
         let pred = parse_predicate("host = \"a\"").unwrap();
         let series = db.get_series_filters(&bucket, Some(&pred), &range).unwrap();
         assert_eq!(series, vec![
-            SeriesFilter{id: 2, key: "cpu,host=a,region=west\tusage_system".to_string(), value_predicate: None},
-            SeriesFilter{id: 3, key: "cpu,host=a,region=west\tusage_user".to_string(), value_predicate: None},
+            SeriesFilter{id: 2, key: "cpu,host=a,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::I64},
+            SeriesFilter{id: 3, key: "cpu,host=a,region=west\tusage_user".to_string(), value_predicate: None, series_type: SeriesDataType::I64},
         ]);
 
         // get series with measurement = cpu and host = b
         let pred = parse_predicate("_m = \"cpu\" and host = \"b\"").unwrap();
         let series = db.get_series_filters(&bucket, Some(&pred), &range).unwrap();
         assert_eq!(series, vec![
-            SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None},
+            SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::I64},
         ]);
 
         let pred = parse_predicate("host = \"a\" OR _m = \"mem\"").unwrap();
         let series = db.get_series_filters(&bucket, Some(&pred), &range).unwrap();
         assert_eq!(series, vec![
-            SeriesFilter{id: 2, key: "cpu,host=a,region=west\tusage_system".to_string(), value_predicate: None},
-            SeriesFilter{id: 3, key: "cpu,host=a,region=west\tusage_user".to_string(), value_predicate: None},
-            SeriesFilter{id: 4, key: "mem,host=b,region=west\tfree".to_string(), value_predicate: None},
+            SeriesFilter{id: 2, key: "cpu,host=a,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::I64},
+            SeriesFilter{id: 3, key: "cpu,host=a,region=west\tusage_user".to_string(), value_predicate: None, series_type: SeriesDataType::I64},
+            SeriesFilter{id: 4, key: "mem,host=b,region=west\tfree".to_string(), value_predicate: None, series_type: SeriesDataType::I64},
         ]);
     }
 
     #[test]
     fn write_creates_bucket() {
-        let mut b1 = Bucket::new(1, "bucket1".to_string());
-        let mut db = test_database("write_creates_bucket", true);
+        let b1 = Bucket::new(1, "bucket1".to_string());
+        let db = test_database("write_creates_bucket", true);
 
-        let p1 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 1};
-        let p2 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 2};
+        let p1 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 1);
+        let p2 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 2);
 
         db.write_points(b1.org_id, &b1.name, vec![p1, p2]).unwrap();
         assert_eq!(db.get_bucket_by_name(b1.org_id, &b1.name).unwrap().unwrap().id, 1);
@@ -1191,9 +1227,9 @@ mod tests {
     #[test]
     fn catch_rocksdb_iterator_segfault() {
         let mut b1 = Bucket::new(1, "bucket1".to_string());
-        let mut db = test_database("catch_rocksdb_iterator_segfault", true);
+        let db = test_database("catch_rocksdb_iterator_segfault", true);
 
-        let p1 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 1};
+        let p1 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 1);
 
         b1.id = db.create_bucket_if_not_exists(b1.org_id, &b1).unwrap();
@@ -1204,9 +1240,9 @@ mod tests {
         let pred = parse_predicate("_m = \"cpu\"").unwrap();
         let mut iter = db.read_range(b1.org_id, &b1.name, &range, &pred, 10).unwrap();
         let series_filter = iter.next().unwrap();
-        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None});
+        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::I64});
         assert_eq!(iter.next(), None);
-        let mut points_iter = PointsIterator::new_from_series_filter(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10).unwrap();
+        let mut points_iter = new_i64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10);
         let points = points_iter.next().unwrap();
         assert_eq!(points, vec![
             ReadPoint{time: 1, value: 1},
@@ -1218,12 +1254,12 @@ mod tests {
     fn write_and_read_points() {
         let mut b1 = Bucket::new(1, "bucket1".to_string());
         let mut b2 = Bucket::new(2, "bucket2".to_string());
-        let mut db = test_database("write_and_read_points", true);
+        let db = test_database("write_and_read_points", true);
 
-        let p1 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 1};
-        let p2 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 2};
-        let p3 = Point{series: "mem,host=b,region=west\tfree".to_string(), value: 1, time: 2};
-        let p4 = Point{series: "mem,host=b,region=west\tfree".to_string(), value: 1, time: 4};
+        let p1 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 1);
+        let p2 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 2);
+        let p3 = PointType::new_i64("mem,host=b,region=west\tfree".to_string(), 1, 2);
+        let p4 = PointType::new_i64("mem,host=b,region=west\tfree".to_string(), 1, 4);
 
         b1.id = db.create_bucket_if_not_exists(b1.org_id, &b1).unwrap();
         b2.id = db.create_bucket_if_not_exists(b2.org_id, &b2).unwrap();
@@ -1236,9 +1272,9 @@ mod tests {
         let pred = parse_predicate("_m = \"cpu\" OR _m = \"mem\"").unwrap();
         let mut iter = db.read_range(b1.org_id, &b1.name, &range, &pred, 10).unwrap();
         let series_filter = iter.next().unwrap();
-        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None});
+        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::I64});
         assert_eq!(iter.next(), None);
-        let mut points_iter = PointsIterator::new_from_series_filter(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10).unwrap();
+        let mut points_iter = new_i64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10);
         let points = points_iter.next().unwrap();
         assert_eq!(points, vec![
             ReadPoint{time: 1, value: 1},
@@ -1250,8 +1286,8 @@ mod tests {
         let pred = parse_predicate("_m = \"cpu\" OR _m = \"mem\"").unwrap();
         let mut iter = db.read_range(b2.org_id, &b2.name, &range, &pred, 10).unwrap();
         let series_filter = iter.next().unwrap();
-        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None});
-        let mut points_iter = PointsIterator::new_from_series_filter(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10).unwrap();
+        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::I64});
+        let mut points_iter = new_i64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10);
         let points = points_iter.next().unwrap();
         assert_eq!(points, vec![
             ReadPoint{time: 1, value: 1},
@@ -1259,8 +1295,8 @@ mod tests {
         ]);
 
         let series_filter = iter.next().unwrap();
-        assert_eq!(series_filter, SeriesFilter{id: 2, key: "mem,host=b,region=west\tfree".to_string(), value_predicate: None});
-        let mut points_iter = PointsIterator::new_from_series_filter(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10).unwrap();
+        assert_eq!(series_filter, SeriesFilter{id: 2, key: "mem,host=b,region=west\tfree".to_string(), value_predicate: None, series_type: SeriesDataType::I64});
+        let mut points_iter = new_i64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10);
         let points = points_iter.next().unwrap();
         assert_eq!(points, vec![
             ReadPoint{time: 2, value: 1},
@@ -1271,9 +1307,9 @@ mod tests {
         let pred = parse_predicate("host = \"b\"").unwrap();
         let mut iter = db.read_range(b1.org_id, &b1.name, &range, &pred, 1).unwrap();
         let series_filter = iter.next().unwrap();
-        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None});
+        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::I64});
         assert_eq!(iter.next(), None);
-        let mut points_iter = PointsIterator::new_from_series_filter(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 1).unwrap();
+        let mut points_iter = new_i64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 1);
         let points = points_iter.next().unwrap();
         assert_eq!(points, vec![
             ReadPoint{time: 1, value: 1},
@@ -1288,22 +1324,50 @@ mod tests {
         let pred = parse_predicate("_m = \"cpu\" OR _m = \"mem\"").unwrap();
         let mut iter = db.read_range(b2.org_id, &b2.name, &range, &pred, 10).unwrap();
         let series_filter = iter.next().unwrap();
-        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None});
-        let mut points_iter = PointsIterator::new_from_series_filter(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10).unwrap();
+        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::I64});
+        let mut points_iter = new_i64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10);
         let points = points_iter.next().unwrap();
         assert_eq!(points, vec![
             ReadPoint{time: 2, value: 1},
         ]);
 
         let series_filter = iter.next().unwrap();
-        assert_eq!(series_filter, SeriesFilter{id: 2, key: "mem,host=b,region=west\tfree".to_string(), value_predicate: None});
-        let mut points_iter = PointsIterator::new_from_series_filter(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10).unwrap();
+        assert_eq!(series_filter, SeriesFilter{id: 2, key: "mem,host=b,region=west\tfree".to_string(), value_predicate: None, series_type: SeriesDataType::I64});
+        let mut points_iter = new_i64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10);
         let points = points_iter.next().unwrap();
         assert_eq!(points, vec![
             ReadPoint{time: 2, value: 1},
         ]);
     }
 
+    #[test]
+    fn write_and_read_float_values() {
+        let mut b1 = Bucket::new(1, "bucket1".to_string());
+        let db = test_database("write_and_read_float_values", true);
+
+        let p1 = PointType::new_f64("cpu,host=b,region=west\tusage_system".to_string(), 1.0, 1);
+        let p2 = PointType::new_f64("cpu,host=b,region=west\tusage_system".to_string(), 2.2, 2);
+
+        b1.id = db.create_bucket_if_not_exists(b1.org_id, &b1).unwrap();
+
+        db.write_points(b1.org_id, &b1.name, vec![p1.clone(), p2.clone()]).unwrap();
+
+        // test that we'll only read from the bucket we wrote points into
+        let range = Range{start: 0, stop: 4};
+        let pred = parse_predicate("_m = \"cpu\"").unwrap();
+        let mut iter = db.read_range(b1.org_id, &b1.name, &range, &pred, 10).unwrap();
+        let series_filter = iter.next().unwrap();
+        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::F64});
+        assert_eq!(iter.next(), None);
+        let mut points_iter = new_f64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10);
+        let points = points_iter.next().unwrap();
+        assert_eq!(points, vec![
+            ReadPoint{time: 1, value: 1.0},
+            ReadPoint{time: 2, value: 2.2},
+        ]);
+        assert_eq!(points_iter.next(), None);
+    }
+
     // Test helpers
     fn get_test_storage_path() -> String {
         dotenv().ok();
diff --git a/src/time.rs b/src/time.rs
index 699c419619..ea2e842f11 100644
--- a/src/time.rs
+++ b/src/time.rs
@@ -1,7 +1,5 @@
 use crate::Error;
 
-use std::str::Chars;
-use std::iter::Peekable;
 use std::time::{Duration, UNIX_EPOCH};
 use std::time::SystemTime;
@@ -30,6 +28,7 @@ impl RelativeDuration {
     }
 }
 
+// TODO: update this so that we're not indexing into the string. Should be iterating with chars
 pub fn parse_duration(s: &str) -> Result<RelativeDuration, Error> {
     if s.len() < 2 {
         return Err(Error{description: "duration must have at least two characters".to_string()})
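On the time.rs TODO above: a char-iterator shape for parse_duration could look like the sketch below. This is an editor's suggestion with simplified units and error type; the real function returns a RelativeDuration:

```rust
fn parse_duration(s: &str) -> Result<(u64, char), String> {
    let mut chars = s.chars().peekable();
    let mut value = 0u64;
    let mut saw_digit = false;

    // consume the leading digits without ever indexing into the string
    while let Some(d) = chars.peek().and_then(|c| c.to_digit(10)) {
        value = value * 10 + d as u64;
        saw_digit = true;
        chars.next();
    }

    match chars.next() {
        Some(unit) if saw_digit && "smhdw".contains(unit) => Ok((value, unit)),
        _ => Err("duration must be digits followed by a unit".to_string()),
    }
}

fn main() {
    assert_eq!(parse_duration("10m"), Ok((10, 'm')));
    assert!(parse_duration("m").is_err());
    assert!(parse_duration("10").is_err());
}
```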