Merge pull request #14 from influxdata/pd-fix-warnings

Fix compile warnings from the Rust linter
pull/24376/head
Paul Dix 2020-01-06 08:31:40 -05:00 committed by GitHub
commit ca9667909f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 416 additions and 200 deletions

----------------------------------------

@@ -3,6 +3,7 @@ use integer_encoding::*;
 use std::error::Error;

 /// Encoding describes the type of encoding used by an encoded integer block.
+#[allow(dead_code)]
 enum Encoding {
     Uncompressed = 0,
     Simple8b = 1,
@@ -15,8 +16,8 @@ enum Encoding {
 /// deltas are then zig-zag encoded. The resulting zig-zag encoded deltas are
 /// further compressed if possible, either via bit-packing using simple8b or by
 /// run-length encoding the deltas if they're all the same.
-///
-pub fn encode_all<'a>(src: &mut Vec<i64>, dst: &'a mut Vec<u8>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+pub fn encode_all<'a>(src: &mut Vec<i64>, dst: &'a mut Vec<u8>) -> Result<(), Box<dyn Error>> {
     dst.truncate(0); // reset buffer.
     if src.len() == 0 {
         return Ok(());
@@ -74,12 +75,14 @@ pub fn encode_all<'a>(src: &mut Vec<i64>, dst: &'a mut Vec<u8>) -> Result<(), Bo
 // negative and positive values across even and odd numbers.
 //
 // Eg. [0,-1,1,-2] becomes [0, 1, 2, 3].
+#[allow(dead_code)]
 fn zig_zag_encode(v: i64) -> u64 {
     ((v << 1) ^ (v >> 63)) as u64
 }

 // zig_zag_decode converts a zig zag encoded unsigned integer into an signed
 // integer.
+#[allow(dead_code)]
 fn zig_zag_decode(v: u64) -> i64 {
     ((v >> 1) ^ ((((v & 1) as i64) << 63) >> 63) as u64) as i64
 }
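
The zig-zag transform above maps small negative and positive deltas to small unsigned values. As a quick sanity check, here is a standalone round-trip sketch using the two functions exactly as they appear in this diff:

fn zig_zag_encode(v: i64) -> u64 {
    ((v << 1) ^ (v >> 63)) as u64
}

fn zig_zag_decode(v: u64) -> i64 {
    ((v >> 1) ^ ((((v & 1) as i64) << 63) >> 63) as u64) as i64
}

fn main() {
    // The example from the comment: [0, -1, 1, -2] maps to [0, 1, 2, 3].
    assert_eq!(zig_zag_encode(0), 0);
    assert_eq!(zig_zag_encode(-1), 1);
    assert_eq!(zig_zag_encode(1), 2);
    assert_eq!(zig_zag_encode(-2), 3);

    // The round trip holds across the whole i64 range.
    for v in [i64::min_value(), -1, 0, 1, i64::max_value()].iter() {
        assert_eq!(zig_zag_decode(zig_zag_encode(*v)), *v);
    }
}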
@@ -87,6 +90,7 @@ fn zig_zag_decode(v: u64) -> i64 {
 // i64_to_u64_vector converts a Vec<i64> to Vec<u64>.
 // TODO(edd): this is expensive as it copies. There are cheap
 // but unsafe alternatives to look into such as std::mem::transmute
+#[allow(dead_code)]
 fn i64_to_u64_vector(src: &[i64]) -> Vec<u64> {
     src.into_iter().map(|x| *x as u64).collect::<Vec<u64>>()
 }
@@ -94,6 +98,7 @@ fn i64_to_u64_vector(src: &[i64]) -> Vec<u64> {
 // u64_to_i64_vector converts a Vec<u64> to Vec<i64>.
 // TODO(edd): this is expensive as it copies. There are cheap
 // but unsafe alternatives to look into such as std::mem::transmute
+#[allow(dead_code)]
 fn u64_to_i64_vector(src: &[u64]) -> Vec<i64> {
     src.into_iter().map(|x| *x as i64).collect::<Vec<i64>>()
 }
@@ -103,6 +108,7 @@ fn u64_to_i64_vector(src: &[u64]) -> Vec<i64> {
 // v should be the first element of a sequence, delta the difference that each
 // value in the sequence differs by, and count the total number of values in the
 // sequence.
+#[allow(dead_code)]
 fn encode_rle(v: u64, delta: u64, count: u64, dst: &mut Vec<u8>) {
     let max_var_int_size = 10; // max number of bytes needed to store var int
     dst.push(0); // save a byte for encoding type
@@ -122,7 +128,8 @@ fn encode_rle(v: u64, delta: u64, count: u64, dst: &mut Vec<u8>) {
 }

 /// decode_all decodes a slice of bytes into a vector of signed integers.
-pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() == 0 {
         return Ok(());
     }
@@ -137,7 +144,8 @@ pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<Error
     }
 }

-fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() == 0 || src.len() & 0x7 != 0 {
         return Err(From::from("invalid uncompressed block length"));
     }
@@ -160,7 +168,8 @@ fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>>
 // decode_rle decodes an RLE encoded slice containing only unsigned into the
 // destination vector.
-fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() < 8 {
         return Err(From::from("not enough data to decode using RLE"));
     }
@@ -193,7 +202,8 @@ fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
     Ok(())
 }

-fn decode_simple8b(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+fn decode_simple8b(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() < 9 {
         return Err(From::from("not enough data to decode packed timestamp"));
     }
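
Putting the pieces of encode_all together: values are delta-encoded, the deltas are zig-zag encoded, and RLE is chosen only when every delta is identical. A worked sketch of just that arithmetic (block headers and the simple8b/RLE byte framing are handled by the functions above):

fn zig_zag_encode(v: i64) -> u64 {
    ((v << 1) ^ (v >> 63)) as u64
}

fn main() {
    let src: Vec<i64> = vec![10, 12, 11, 14];

    // Step 1: delta encoding. The first value is kept as-is.
    let mut deltas = vec![src[0]];
    deltas.extend(src.windows(2).map(|w| w[1] - w[0]));
    assert_eq!(deltas, vec![10, 2, -1, 3]);

    // Step 2: zig-zag encode the deltas so negatives become small unsigned values.
    let zz: Vec<u64> = deltas.iter().map(|d| zig_zag_encode(*d)).collect();
    assert_eq!(zz, vec![20, 4, 1, 6]);

    // Step 3: RLE only applies when all deltas after the first are equal;
    // here they differ, so the encoder would try simple8b bit-packing instead.
    let rle_ok = deltas[1..].windows(2).all(|w| w[0] == w[1]);
    assert!(!rle_ok);
}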

----------------------------------------

@@ -27,7 +27,7 @@ const NUM_BITS: [[u8; 2]; 14] = [
 /// encode_all packs and binary encodes the provides slice of u64 values using
 /// simple8b into the provided vector.
-pub fn encode_all<'a>(src: &[u64], dst: &'a mut Vec<u8>) -> Result<(), Box<Error>> {
+pub fn encode_all<'a>(src: &[u64], dst: &'a mut Vec<u8>) -> Result<(), Box<dyn Error>> {
     let mut i = 0;

     'next_value: while i < src.len() {
         // try to pack a run of 240 or 120 1s
@@ -96,6 +96,7 @@ pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<u64>) {
     dst.truncate(j);
 }

+#[allow(dead_code)]
 pub fn decode(v: u64, dst: &mut [u64]) -> usize {
     let sel = v >> S8B_BIT_SIZE as u64;
     let mut v = v;
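
For orientation, a hedged round-trip sketch of this module's public API as its signatures appear in this diff; the `simple8b::` module path and the pre-sizing of the output vector are assumptions, not shown in the commit:

use std::error::Error;

fn roundtrip() -> Result<(), Box<dyn Error>> {
    let src: Vec<u64> = vec![1, 2, 3, 60, 1000];
    let mut packed: Vec<u8> = vec![];

    // Pack the values into simple8b words appended to `packed`.
    simple8b::encode_all(&src, &mut packed)?;

    // decode_all ends with dst.truncate(j), which suggests it writes into an
    // already-sized buffer; this sketch assumes a generously pre-sized dst.
    let mut dst: Vec<u64> = vec![0; 256];
    simple8b::decode_all(&packed, &mut dst);

    assert_eq!(dst, src);
    Ok(())
}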

----------------------------------------

@@ -3,6 +3,7 @@ use integer_encoding::*;
 use std::error::Error;

 // Encoding describes the type of encoding used by an encoded timestamp block.
+#[allow(dead_code)]
 enum Encoding {
     Uncompressed = 0,
     Simple8b = 1,
@@ -16,7 +17,8 @@ enum Encoding {
 /// is potentially carried out. If all the deltas are the same the block can be
 /// encoded using RLE. If not, as long as the deltas are not bigger than simple8b::MAX_VALUE
 /// they can be encoded using simple8b.
-pub fn encode_all<'a>(src: &mut Vec<i64>, dst: &'a mut Vec<u8>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+pub fn encode_all<'a>(src: &mut Vec<i64>, dst: &'a mut Vec<u8>) -> Result<(), Box<dyn Error>> {
     dst.truncate(0); // reset buffer.
     if src.len() == 0 {
         return Ok(());
@@ -91,6 +93,7 @@ pub fn encode_all<'a>(src: &mut Vec<i64>, dst: &'a mut Vec<u8>) -> Result<(), Bo
 // i64_to_u64_vector converts a Vec<i64> to Vec<u64>.
 // TODO(edd): this is expensive as it copies. There are cheap
 // but unsafe alternatives to look into such as std::mem::transmute
+#[allow(dead_code)]
 fn i64_to_u64_vector(src: &[i64]) -> Vec<u64> {
     src.into_iter().map(|x| *x as u64).collect::<Vec<u64>>()
 }
@@ -98,6 +101,7 @@ fn i64_to_u64_vector(src: &[i64]) -> Vec<u64> {
 // u64_to_i64_vector converts a Vec<u64> to Vec<i64>.
 // TODO(edd): this is expensive as it copies. There are cheap
 // but unsafe alternatives to look into such as std::mem::transmute
+#[allow(dead_code)]
 fn u64_to_i64_vector(src: &[u64]) -> Vec<i64> {
     src.into_iter().map(|x| *x as i64).collect::<Vec<i64>>()
 }
@@ -107,6 +111,7 @@ fn u64_to_i64_vector(src: &[u64]) -> Vec<i64> {
 // v should be the first element of a sequence, delta the difference that each
 // value in the sequence differs by, and count the total number of values in the
 // sequence.
+#[allow(dead_code)]
 fn encode_rle(v: u64, delta: u64, count: u64, dst: &mut Vec<u8>) {
     let max_var_int_size = 10; // max number of bytes needed to store var int
@@ -150,7 +155,8 @@ fn encode_rle(v: u64, delta: u64, count: u64, dst: &mut Vec<u8>) {
 /// decode_all decodes a slice of bytes encoded using encode_all back into a
 /// vector of signed integers.
-pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() == 0 {
         return Ok(());
     }
@@ -166,7 +172,8 @@ pub fn decode_all<'a>(src: &[u8], dst: &'a mut Vec<i64>) -> Result<(), Box<Error
 }

 // decode_uncompressed writes the binary encoded values in src into dst.
-fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() == 0 || src.len() & 0x7 != 0 {
         return Err(From::from("invalid uncompressed block length"));
     }
@@ -189,7 +196,8 @@ fn decode_uncompressed(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>>
 // decode_rle decodes an RLE encoded slice containing only unsigned into the
 // destination vector.
-fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() < 9 {
         return Err(From::from("not enough data to decode using RLE"));
     }
@@ -226,7 +234,8 @@ fn decode_rle(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
     Ok(())
 }

-fn decode_simple8b(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<Error>> {
+#[allow(dead_code)]
+fn decode_simple8b(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>> {
     if src.len() < 9 {
         return Err(From::from("not enough data to decode packed timestamp"));
     }
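
The RLE path pays off when timestamps arrive at a fixed interval, which encode_all detects by comparing deltas. A standalone sketch of that decision:

fn main() {
    // Timestamps at a fixed 60-second interval (in nanoseconds this would be
    // 60_000_000_000, but small numbers keep the example readable).
    let ts: Vec<i64> = vec![0, 60, 120, 180, 240];

    let deltas: Vec<i64> = ts.windows(2).map(|w| w[1] - w[0]).collect();
    assert_eq!(deltas, vec![60, 60, 60, 60]);

    // All deltas equal: the whole block collapses to
    // (first value, delta, count) = (0, 60, 5), which encode_rle
    // writes as a type byte followed by three varints.
    let all_same = deltas.windows(2).all(|w| w[0] == w[1]);
    assert!(all_same);
}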

----------------------------------------

@@ -1,22 +1,79 @@
 use std::str::Chars;
 use std::{error, fmt};
-use std::fs::read;

 use actix_web::ResponseError;
 use actix_web::http::StatusCode;

 #[derive(Debug, PartialEq, Clone)]
-pub struct Point {
+pub struct Point<T> {
     pub series: String,
     pub time: i64,
-    pub value: i64,
+    pub value: T,
 }

-impl Point {
+impl<T> Point<T> {
     pub fn index_pairs(&self) -> Result<Vec<Pair>, ParseError> {
         index_pairs(&self.series)
     }
 }

+#[derive(Debug, PartialEq, Clone)]
+pub enum PointType {
+    I64(Point<i64>),
+    F64(Point<f64>),
+}
+
+impl PointType {
+    pub fn new_i64(series: String, value: i64, time: i64) -> PointType {
+        PointType::I64(Point{series, value, time})
+    }
+
+    pub fn new_f64(series: String, value: f64, time: i64) -> PointType {
+        PointType::F64(Point{series, value, time})
+    }
+
+    pub fn series(&self) -> &String {
+        match self {
+            PointType::I64(p) => &p.series,
+            PointType::F64(p) => &p.series,
+        }
+    }
+
+    pub fn time(&self) -> i64 {
+        match self {
+            PointType::I64(p) => p.time,
+            PointType::F64(p) => p.time,
+        }
+    }
+
+    pub fn set_time(&mut self, t: i64) {
+        match self {
+            PointType::I64(p) => p.time = t,
+            PointType::F64(p) => p.time = t,
+        }
+    }
+
+    pub fn i64_value(&self) -> Option<i64> {
+        match self {
+            PointType::I64(p) => Some(p.value),
+            _ => None,
+        }
+    }
+
+    pub fn f64_value(&self) -> Option<f64> {
+        match self {
+            PointType::F64(p) => Some(p.value),
+            _ => None,
+        }
+    }
+
+    pub fn index_pairs(&self) -> Result<Vec<Pair>, ParseError> {
+        match self {
+            PointType::I64(p) => p.index_pairs(),
+            PointType::F64(p) => p.index_pairs(),
+        }
+    }
+}
+
 // TODO: handle escapes in the line protocol for , = and \t

 /// index_pairs parses the series key into key value pairs for insertion into the index. In
 /// cases where this series is already in the database, this parse step can be skipped entirely.
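
The new PointType enum carries either an i64 or an f64 field through one parsing path. A short usage sketch of the API added above:

fn main() {
    let mut p = PointType::new_i64("cpu,host=a\tusage".to_string(), 23, 0);

    // Accessors work across both variants without the caller matching.
    assert_eq!(p.series(), "cpu,host=a\tusage");
    p.set_time(1234);
    assert_eq!(p.time(), 1234);

    // Typed getters return None for the other variant.
    assert_eq!(p.i64_value(), Some(23));
    assert_eq!(p.f64_value(), None);
}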
@@ -90,9 +147,9 @@ impl ResponseError for ParseError {
 }

 // TODO: have parse return an error for invalid inputs
-pub fn parse(input: &str) -> Vec<Point> {
-    let mut points = Vec::with_capacity(10000);
-    let mut lines= input.lines();
+pub fn parse(input: &str) -> Vec<PointType> {
+    let mut points: Vec<PointType> = Vec::with_capacity(10000);
+    let lines= input.lines();

     for line in lines {
         read_line(line, &mut points)
@@ -101,7 +158,7 @@ pub fn parse(input: &str) -> Vec<Point> {
     return points;
 }

-fn read_line(line: &str, points: &mut Vec<Point>) {
+fn read_line(line: &str, points: &mut Vec<PointType>) {
     let mut points = points;
     let mut chars = line.chars();
     let mut series = String::with_capacity(1000);
@@ -113,7 +170,7 @@ fn read_line(line: &str, points: &mut Vec<Point>) {
     }
 }

-fn read_fields(measurement_tags: &str, chars: &mut Chars, points: &mut Vec<Point>) {
+fn read_fields(measurement_tags: &str, chars: &mut Chars, points: &mut Vec<PointType>) {
     let mut chars = chars;
     let mut points = points;
     let mut field_name = String::with_capacity(100);
@@ -123,34 +180,62 @@ fn read_fields(measurement_tags: &str, chars: &mut Chars, points: &mut Vec<Point
     while let Some(ch) = chars.next() {
         match ch {
             '=' => {
-                read_value(&measurement_tags, field_name, &mut chars, &mut points);
+                let should_break = !read_value(&measurement_tags, field_name, &mut chars, &mut points);
                 field_name = String::with_capacity(100);
+
+                if should_break {
+                    break
+                }
             },
             _ => field_name.push(ch),
         }
     }

+    // read the time
+    for ch in chars {
+        field_name.push(ch);
+    }
     let time = field_name.parse::<i64>().unwrap();

     while point_offset < points.len() {
-        points[point_offset].time = time;
+        points[point_offset].set_time(time);
        point_offset += 1;
     }
 }

-fn read_value(measurement_tags: &str, field_name: String, chars: &mut Chars, points: &mut Vec<Point>) {
+// read_value reads the value from the chars and returns true if there are more fields and values to be read
+fn read_value(measurement_tags: &str, field_name: String, chars: &mut Chars, points: &mut Vec<PointType>) -> bool {
     let mut value = String::new();

     while let Some(ch) = chars.next() {
         match ch {
-            ' ' => {
-                let val = value.parse::<i64>().unwrap();
-                points.push(Point{series: measurement_tags.to_string() + "\t" + &field_name, value: val, time: 0});
-                return;
+            ' ' | ',' => {
+                let series = measurement_tags.to_string() + "\t" + &field_name;
+
+                // if the last character of the value is an i then it's an integer, otherwise it's
+                // a float (at least until we support the other data types
+                let point = match value.ends_with("i") {
+                    true => {
+                        let val = value[..value.len()-1].parse::<i64>().unwrap();
+                        PointType::new_i64(series, val, 0)
+                    },
+                    false => {
+                        let val = value.parse::<f64>().unwrap();
+                        PointType::new_f64(series, val, 0)
+                    }
+                };
+                points.push(point);
+
+                if ch == ' ' {
+                    return false;
+                }
+                return true;
             },
             _ => value.push(ch),
         }
     }
+
+    false
 }

 #[cfg(test)]
@@ -159,26 +244,63 @@ mod test {
     #[test]
     fn parse_single_field() {
-        let input = "foo asdf=23 1234";
+        let input = "foo asdf=23i 1234";
         let vals = parse(input);
-        assert_eq!(vals[0].series, "foo\tasdf");
-        assert_eq!(vals[0].time, 1234);
-        assert_eq!(vals[0].value, 23);
+        assert_eq!(vals[0].series(), "foo\tasdf");
+        assert_eq!(vals[0].time(), 1234);
+        assert_eq!(vals[0].i64_value().unwrap(), 23);
+
+        let input = "foo asdf=44 546";
+        let vals = parse(input);
+        assert_eq!(vals[0].series(), "foo\tasdf");
+        assert_eq!(vals[0].time(), 546);
+        assert_eq!(vals[0].f64_value().unwrap(), 44.0);
+
+        let input = "foo asdf=3.14 123";
+        let vals = parse(input);
+        assert_eq!(vals[0].series(), "foo\tasdf");
+        assert_eq!(vals[0].time(), 123);
+        assert_eq!(vals[0].f64_value().unwrap(), 3.14);
     }

     #[test]
     fn parse_two_fields() {
-        let input = "foo asdf=23 bar=5 1234";
+        let input = "foo asdf=23i,bar=5i 1234";
         let vals = parse(input);
-        assert_eq!(vals[0].series, "foo\tasdf");
-        assert_eq!(vals[0].time, 1234);
-        assert_eq!(vals[0].value, 23);
-        assert_eq!(vals[1].series, "foo\tbar");
-        assert_eq!(vals[1].time, 1234);
-        assert_eq!(vals[1].value, 5);
+        assert_eq!(vals[0].series(), "foo\tasdf");
+        assert_eq!(vals[0].time(), 1234);
+        assert_eq!(vals[0].i64_value().unwrap(), 23);
+        assert_eq!(vals[1].series(), "foo\tbar");
+        assert_eq!(vals[1].time(), 1234);
+        assert_eq!(vals[1].i64_value().unwrap(), 5);
+
+        let input = "foo asdf=23.1,bar=5 1234";
+        let vals = parse(input);
+        assert_eq!(vals[0].series(), "foo\tasdf");
+        assert_eq!(vals[0].time(), 1234);
+        assert_eq!(vals[0].f64_value().unwrap(), 23.1);
+        assert_eq!(vals[1].series(), "foo\tbar");
+        assert_eq!(vals[1].time(), 1234);
+        assert_eq!(vals[1].f64_value().unwrap(), 5.0);
+    }
+
+    #[test]
+    fn parse_mixed() {
+        let input = "foo asdf=23.1,bar=5i 1234";
+        let vals = parse(input);
+        assert_eq!(vals[0].series(), "foo\tasdf");
+        assert_eq!(vals[0].time(), 1234);
+        assert_eq!(vals[0].f64_value().unwrap(), 23.1);
+        assert_eq!(vals[1].series(), "foo\tbar");
+        assert_eq!(vals[1].time(), 1234);
+        assert_eq!(vals[1].i64_value().unwrap(), 5);
     }

     #[test]

----------------------------------------

@@ -1,24 +1,19 @@
-use delorean::storage::rocksdb::Database;
-use delorean::{line_parser, storage};
-use delorean::storage::iterators::SeriesIterator;
-use delorean::storage::rocksdb::{PointsIterator, SeriesFilter, Range};
-use delorean::line_parser::{parse, index_pairs, Pair};
+use delorean::storage::rocksdb::{Database, new_i64_points_iterator, SeriesDataType, new_f64_points_iterator};
+use delorean::line_parser;
+use delorean::storage::rocksdb::Range;
+use delorean::line_parser::index_pairs;
 use delorean::storage::predicate::parse_predicate;
 use delorean::time::{parse_duration, time_as_i64_nanos};

 use std::{env, io, str};
 use std::env::VarError;
 use std::sync::Arc;
-use std::task::{Context, Poll};
-use std::time::SystemTime;

-use actix_web::{App, middleware, HttpServer, web, HttpResponse, Error as AWError, guard, error, Responder};
-use actix_web::web::Bytes;
+use actix_web::{App, middleware, HttpServer, web, HttpResponse, Error as AWError, guard, error};
 use serde_json;
 use serde::Deserialize;
-use serde::ser::{Serialize, Serializer, SerializeStruct};
 use actix_web::web::{BytesMut};
-use futures::{self, StreamExt, Stream};
+use futures::{self, StreamExt};
 use failure::_core::time::Duration;
 use csv::Writer;
@@ -160,7 +155,7 @@ async fn read(read_info: web::Query<ReadInfo>, s: web::Data<Arc<Server>>) -> Res
     let range = Range{start, stop};

-    let mut series = s.db.read_range(read_info.org_id, &read_info.bucket_name, &range, &predicate, 10)?;
+    let series = s.db.read_range(read_info.org_id, &read_info.bucket_name, &range, &predicate, 10)?;

     let bucket_id = series.bucket_id;
     let db = &s.db;
@@ -170,7 +165,6 @@ async fn read(read_info: web::Query<ReadInfo>, s: web::Data<Arc<Server>>) -> Res
     for s in series {
         let mut wtr = Writer::from_writer(vec![]);
-        let mut points = PointsIterator::new_from_series_filter(read_info.org_id, bucket_id, &db, &s, &range, 10)?;

         let pairs = index_pairs(&s.key)?;
         let mut cols = Vec::with_capacity(pairs.len() + 2);
         let mut vals = Vec::with_capacity(pairs.len() + 2);
@@ -191,16 +185,36 @@ async fn read(read_info: web::Query<ReadInfo>, s: web::Data<Arc<Server>>) -> Res
         wtr.write_record(&cols).unwrap();

-        for batch in points {
-            for p in batch {
-                let t = p.time.to_string();
-                let v = p.value.to_string();
-                vals[vcol] = v;
-                vals[tcol] = t;
-                wtr.write_record(&vals).unwrap();
-            }
-        }
+        match s.series_type {
+            SeriesDataType::I64 => {
+                let points = new_i64_points_iterator(read_info.org_id, bucket_id, &db, &s, &range, 10);
+
+                for batch in points {
+                    for p in batch {
+                        let t = p.time.to_string();
+                        let v = p.value.to_string();
+                        vals[vcol] = v;
+                        vals[tcol] = t;
+                        wtr.write_record(&vals).unwrap();
+                    }
+                }
+            },
+            SeriesDataType::F64 => {
+                let points = new_f64_points_iterator(read_info.org_id, bucket_id, &db, &s, &range, 10);
+
+                for batch in points {
+                    for p in batch {
+                        let t = p.time.to_string();
+                        let v = p.value.to_string();
+                        vals[vcol] = v;
+                        vals[tcol] = t;
+                        wtr.write_record(&vals).unwrap();
+                    }
+                }
+            },
+        };

         let mut data = match wtr.into_inner() {
             Ok(d) => d,

----------------------------------------

@@ -1,7 +1,4 @@
-use crate::delorean::Predicate;
-use crate::storage::rocksdb::{Database, SeriesFilter, StorageError, Range, PointsIterator};
-use rocksdb::{DB, IteratorMode, DBIterator};
+use crate::storage::rocksdb::SeriesFilter;

 pub struct SeriesIterator {
     pub org_id: u32,

----------------------------------------

@@ -118,7 +118,7 @@ fn parse_value(chars: &mut Peekable<Chars>) -> Result<Value, StorageError> {
                 val.push(ch);
             }
         },
-        Some(ch) => return Err(StorageError{description: "unable to parse non-string values".to_string()}),
+        Some(ch) => return Err(StorageError{description: format!("unable to parse non-string values like '{}'", ch)}),
         None => (),
     }

----------------------------------------

@@ -1,19 +1,17 @@
-use crate::line_parser::{Point, Pair};
+use crate::line_parser::PointType;
 use crate::storage::iterators::{ReadPoint, SeriesIterator};
-use crate::delorean::{Bucket, IndexLevel, Predicate, Node, node};
+use crate::delorean::{Bucket, IndexLevel, Predicate, Node};
 use crate::delorean::node::{Value, Comparison, Logical};

-use bytes::BufMut;
 use std::{error, fmt};
-use std::sync::{Arc, RwLock, Mutex, MutexGuard};
+use std::sync::{Arc, RwLock, Mutex};
 use std::collections::HashMap;
 use std::time::SystemTime;
 use std::io::Cursor;
-use rocksdb::{DB, IteratorMode, WriteBatch, Options, ColumnFamilyDescriptor, Direction, ColumnFamily, DBIterator};
+use rocksdb::{DB, IteratorMode, WriteBatch, Options, ColumnFamilyDescriptor, Direction, DBIterator};
 use byteorder::{ByteOrder, BigEndian, WriteBytesExt, ReadBytesExt};
 use prost::Message;
-use futures::AsyncWriteExt;
 use croaring::Treemap;
 use croaring::treemap::NativeSerializer;
 use actix_web::ResponseError;
@@ -38,12 +36,11 @@ pub struct Database {
 #[derive(Debug, PartialEq)]
 pub struct Series {
     id: Option<u64>,
-    point: Point,
+    point: PointType,
 }

 const BUCKET_CF: &str = "buckets";
 const BUCKET_CF_WRITE_BUFFER_SIZE: usize = 1024 * 1024; // 1MB
-const INDEX_CF: &str = "indexes";
 const INDEX_CF_WRITE_BUFFER_SIZE: usize = 10 * 1024 * 1024; // 10MB

 impl Database {
@@ -93,7 +90,7 @@ impl Database {
     /// * org_id - the organization this data resides under
     /// * bucket_name - the string identifier of the bucket
     /// * points - individual values with their timestamps and series keys
-    pub fn write_points(&self, org_id: u32, bucket_name: &str, points: Vec<Point>) -> Result<(), StorageError> {
+    pub fn write_points(&self, org_id: u32, bucket_name: &str, points: Vec<PointType>) -> Result<(), StorageError> {
         let key = bucket_key(org_id, bucket_name);

         let _ = self.create_default_bucket_if_not_exists(org_id, bucket_name, &key)?;
@@ -106,10 +103,15 @@ impl Database {
         let mut batch = WriteBatch::default();

         for s in series {
-            let key = key_for_series_and_time(bucket.id, s.id.unwrap(), s.point.time);
+            let key = key_for_series_and_time(bucket.id, s.id.unwrap(), s.point.time());
             let mut value = Vec::with_capacity(8);
-            value.write_i64::<BigEndian>(s.point.value).unwrap();
-            batch.put(key, value);
+
+            match s.point {
+                PointType::I64(p) => value.write_i64::<BigEndian>(p.value).unwrap(),
+                PointType::F64(p) => value.write_f64::<BigEndian>(p.value).unwrap(),
+            }
+
+            batch.put(key, value).unwrap();
         }

         self.db.read().unwrap().write(batch).expect("unexpected RocksDB error");
@@ -126,7 +128,7 @@ impl Database {
         self.create_bucket_if_not_exists(org_id, &bucket)
     }

-    pub fn read_range<'a>(&self, org_id: u32, bucket_name: &str, range: &'a Range, predicate: &'a Predicate, batch_size: usize) -> Result<SeriesIterator, StorageError> {
+    pub fn read_range<'a>(&self, org_id: u32, bucket_name: &str, range: &'a Range, predicate: &'a Predicate, _batch_size: usize) -> Result<SeriesIterator, StorageError> {
         let bucket = match self.get_bucket_by_name(org_id, bucket_name).unwrap() {
             Some(b) => b,
             None => return Err(StorageError{description: format!("bucket {} not found", bucket_name)}),
@@ -137,12 +139,14 @@ impl Database {
         Ok(SeriesIterator::new(org_id, bucket.id, series_filters))
     }

-    fn get_db_points_iter(&self, _org_id: u32, bucket_id: u32, series_id: u64, range: &Range, batch_size: usize) -> Result<PointsIterator, StorageError> {
-        let mut prefix = prefix_for_series(bucket_id, series_id, range.start);
+    fn get_db_points_iter(&self, _org_id: u32, bucket_id: u32, series_id: u64, start: i64) -> (DBIterator, Vec<u8>) {
+        let prefix = prefix_for_series(bucket_id, series_id, start);
         let mode = IteratorMode::From(&prefix, Direction::Forward);
-        let mut iter = self.db.read().unwrap().iterator(mode);

-        Ok(PointsIterator::new(batch_size, iter, range.stop, prefix[0..12].to_vec()))
+        let iter = self.db.read().unwrap().iterator(mode);
+        let prefix = prefix[0..12].to_vec();
+
+        (iter, prefix)
     }

 /// If the bucket name exists within an org, this function returns the ID (ignoring whether the
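
get_db_points_iter seeks to a series prefix and keeps only prefix[0..12], so iteration stays within one (bucket, series) pair while the timestamp suffix varies. A hedged sketch of the key layout this implies (big-endian u32 bucket id + u64 series id + i64 timestamp is an inference from the 12-byte prefix, not spelled out in the commit):

use byteorder::{BigEndian, WriteBytesExt};

// Hypothetical reconstruction of the layout implied by
// key_for_series_and_time and prefix_for_series; not copied from the diff.
fn key(bucket_id: u32, series_id: u64, timestamp: i64) -> Vec<u8> {
    let mut v = Vec::with_capacity(4 + 8 + 8);
    v.write_u32::<BigEndian>(bucket_id).unwrap(); // bytes 0..4
    v.write_u64::<BigEndian>(series_id).unwrap(); // bytes 4..12
    v.write_i64::<BigEndian>(timestamp).unwrap(); // bytes 12..20
    v
}

fn main() {
    let k = key(1, 42, 1234);
    // The first 12 bytes identify (bucket, series); big-endian ordering makes
    // RocksDB's lexicographic iteration equal to time-ascending iteration.
    assert_eq!(k.len(), 20);
    assert_eq!(&k[0..12], &key(1, 42, 9999)[0..12]);
}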
@@ -176,7 +180,7 @@ impl Database {
         let mut store = bucket.clone();

         // get the next bucket ID
-        let mut next_id = match db.get_cf(buckets, next_bucket_id_key())
+        let next_id = match db.get_cf(buckets, next_bucket_id_key())
             .expect("unexpected rocksdb error while trying to get the next bucket id") {
             Some(val) => u32_from_bytes(&val),
@@ -189,7 +193,7 @@ impl Database {
         // write the bucket and the next ID counter atomically
         let mut batch = WriteBatch::default();
         batch.put_cf(&buckets, &key, buf).unwrap();
-        batch.put_cf(&buckets, next_bucket_id_key(), u32_to_bytes(store.id + 1));
+        batch.put_cf(&buckets, next_bucket_id_key(), u32_to_bytes(store.id + 1)).unwrap();
         db.write(batch).expect("unexpected rocksdb error writing to DB");

         let id = store.id;
@@ -236,14 +240,14 @@ impl Database {
     ///
     /// # Returns
     /// A vector of series where each point in the passed in vector is contained in a series
-    pub fn get_series_ids(&self, org_id: u32, bucket: &Bucket, mut points: Vec<Point>) -> Vec<Series> {
+    pub fn get_series_ids(&self, _org_id: u32, bucket: &Bucket, points: Vec<PointType>) -> Vec<Series> {
         let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();

-        let mut series = points.into_iter().map(|p| {
+        let series = points.into_iter().map(|p| {
             let mut series = Series{id: None, point: p};
             let level = &bucket.index_levels[0];
             let cf_name = index_cf_name(bucket.id, level.duration_seconds, now);
-            series.id = self.get_series_id(&cf_name, &series.point.series);
+            series.id = self.get_series_id(&cf_name, &series.point.series());
             series
         }).collect();
@@ -259,12 +263,12 @@ impl Database {
     pub fn get_series_filters(&self, bucket: &Bucket, predicate: Option<&Predicate>, range: &Range) -> Result<Vec<SeriesFilter>, StorageError> {
         if let Some(pred) = predicate {
             if let Some(root) = &pred.root {
-                let mut map = self.evaluate_node(bucket, &root, range)?;
+                let map = self.evaluate_node(bucket, &root, range)?;
                 let mut filters = Vec::with_capacity(map.cardinality() as usize);

                 for id in map.iter() {
-                    let key = self.get_series_key_by_id(&bucket, id, &range)?;
-                    filters.push(SeriesFilter{id, key, value_predicate: None});
+                    let (key, series_type) = self.get_series_key_and_type_by_id(&bucket, id, &range)?;
+                    filters.push(SeriesFilter{id, key, value_predicate: None, series_type});
                 }

                 return Ok(filters);
@@ -275,7 +279,7 @@ impl Database {
         Err(StorageError{description: "get for all series ids not wired up yet".to_string()})
     }

-    fn get_series_key_by_id(&self, bucket: &Bucket, id: u64, _range: &Range) -> Result<String, StorageError> {
+    fn get_series_key_and_type_by_id(&self, bucket: &Bucket, id: u64, _range: &Range) -> Result<(String, SeriesDataType), StorageError> {
         let index_level = bucket.index_levels.get(0).unwrap(); // TODO: find the right index based on range
         let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
         let cf_name = index_cf_name(bucket.id, index_level.duration_seconds, now);
@@ -285,7 +289,9 @@ impl Database {
             Some(cf) => {
                 match db.get_cf(cf, index_series_id_from_id(id)).unwrap() {
                     Some(val) => {
-                        Ok(std::str::from_utf8(&val).unwrap().to_owned())
+                        let t = series_type_from_byte(val[0]);
+                        let key = std::str::from_utf8(&val[1..]).unwrap().to_owned();
+                        Ok((key, t))
                     },
                     None => Err(StorageError{description: "series id not found".to_string()})
                 }
@@ -370,7 +376,7 @@ impl Database {
     }

     // TODO: handle predicate
-    pub fn get_tag_keys(&self, bucket: &Bucket, _predicate: Option<&Predicate>, range: &Range) -> Vec<String> {
+    pub fn get_tag_keys(&self, bucket: &Bucket, _predicate: Option<&Predicate>, _range: &Range) -> Vec<String> {
         let index_level = bucket.index_levels.get(0).unwrap(); // TODO: find the right index based on range
         let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
         let cf_name = index_cf_name(bucket.id, index_level.duration_seconds, now);
@@ -383,7 +389,7 @@ impl Database {
             Some(index) => {
                 let prefix = index_tag_key_prefix(bucket.id);
                 let mode = IteratorMode::From(&prefix, Direction::Forward);
-                let mut iter = db.iterator_cf(index, mode)
+                let iter = db.iterator_cf(index, mode)
                     .expect("unexpected rocksdb error getting iterator for index");

                 for (key, _) in iter {
@@ -401,7 +407,7 @@ impl Database {
         keys
     }

-    pub fn get_tag_values(&self, bucket: &Bucket, tag: &str, _predicate: Option<&Predicate>, range: &Range) -> Vec<String> {
+    pub fn get_tag_values(&self, bucket: &Bucket, tag: &str, _predicate: Option<&Predicate>, _range: &Range) -> Vec<String> {
         let index_level = bucket.index_levels.get(0).unwrap(); // TODO: find the right index based on range
         let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
         let cf_name = index_cf_name(bucket.id, index_level.duration_seconds, now);
@@ -413,7 +419,7 @@ impl Database {
             Some(index) => {
                 let prefix = index_tag_key_value_prefix(bucket.id, tag);
                 let mode = IteratorMode::From(&prefix, Direction::Forward);
-                let mut iter = db.iterator_cf(index, mode)
+                let iter = db.iterator_cf(index, mode)
                     .expect("unexpected rocksdb error getting iterator for index");

                 for (key, _) in iter {
@@ -436,7 +442,7 @@ impl Database {
     fn ensure_series_mutex_exists(&self, bucket_id: u32) {
         let map = self.series_insert_lock.read().expect("mutex poisoned");

-        if let Some(next_id_mutex) = map.get(&bucket_id) {
+        if let Some(_next_id_mutex) = map.get(&bucket_id) {
             return;
         }
@@ -458,7 +464,7 @@ impl Database {
         // We want to get a lock on new series only for this bucket
         self.ensure_series_mutex_exists(bucket.id);
         let map = self.series_insert_lock.read().expect("mutex poisoned");
-        let mut next_id = map.get(&bucket.id).expect("should exist because of call to ensure_series_mutex_exists");
+        let next_id = map.get(&bucket.id).expect("should exist because of call to ensure_series_mutex_exists");
         let mut next_id = next_id.lock().expect("mutex poisoned");

         let mut batch = WriteBatch::default();
@@ -490,13 +496,14 @@ impl Database {
             }

             // if we've already put this series in the map in this write, skip it
-            if let Some(id) = series_id_map.get(&series.point.series) {
+            if let Some(id) = series_id_map.get(series.point.series()) {
                 series.id = Some(*id);
                 continue;
             }

             // now that we have the mutex on series, make sure these weren't inserted in some other thread
-            if let Some(_) = self.get_series_id(&cf_name, &series.point.series) {
+            if let Some(id) = self.get_series_id(&cf_name, &series.point.series()) {
+                series.id = Some(id);
                 continue;
             }
@@ -504,9 +511,9 @@ impl Database {
             let id = *next_id;
             let mut series_id = Vec::with_capacity(8);
             series_id.write_u64::<BigEndian>(*next_id).unwrap();
-            batch.put_cf(index_cf, index_series_key_id(&series.point.series), series_id.clone());
-            batch.put_cf(index_cf, index_series_id(&series_id), &series.point.series.as_bytes());
-            series_id_map.insert(series.point.series.clone(), *next_id);
+            batch.put_cf(index_cf, index_series_key_id(&series.point.series()), series_id.clone()).unwrap();
+            batch.put_cf(index_cf, index_series_id(&series_id), index_series_id_value(series_type_from_point_type(&series.point), &series.point.series())).unwrap();
+            series_id_map.insert(series.point.series().clone(), *next_id);
             *next_id += 1;

             // insert the index entries
@@ -516,10 +523,10 @@ impl Database {
             let pairs = series.point.index_pairs().unwrap();
             for pair in pairs {
                 // insert the tag key index
-                batch.put_cf(index_cf, index_tag_key(bucket.id, &pair.key), vec![0 as u8]);
+                batch.put_cf(index_cf, index_tag_key(bucket.id, &pair.key), vec![0 as u8]).unwrap();

                 // insert the tag value index
-                batch.put_cf(index_cf, index_tag_key_value(bucket.id, &pair.key, &pair.value), vec![0 as u8]);
+                batch.put_cf(index_cf, index_tag_key_value(bucket.id, &pair.key, &pair.value), vec![0 as u8]).unwrap();

                 // update the key to id bitmap
                 let index_key_posting_list_key = index_key_posting_list(bucket.id, &pair.key).to_vec();
@@ -564,14 +571,14 @@ impl Database {
         // do the index writes from the in temporary in memory map
         for (k, v) in index_map.iter() {
-            batch.put_cf(index_cf, k, v.serialize().unwrap());
+            let _ = batch.put_cf(index_cf, k, v.serialize().unwrap());
         }

         // save the next series id
         let bucket_cf = db.cf_handle(BUCKET_CF).unwrap();
         let mut next_series_id_val = Vec::with_capacity(8);
         next_series_id_val.write_u64::<BigEndian>(*next_id).unwrap();
-        batch.put_cf(bucket_cf, next_series_id_key(org_id, bucket.id), next_series_id_val);
+        let _ = batch.put_cf(bucket_cf, next_series_id_key(org_id, bucket.id), next_series_id_val);

         db.write(batch).expect("unexpected rocksdb error");
     }
@@ -595,7 +602,7 @@ impl Database {
         let buckets = db.cf_handle(BUCKET_CF).unwrap();
         let prefix = &[BucketEntryType::Bucket as u8];
-        let mut iter = db.iterator_cf(&buckets, IteratorMode::From(prefix, Direction::Forward)).unwrap();
+        let iter = db.iterator_cf(&buckets, IteratorMode::From(prefix, Direction::Forward)).unwrap();

         let mut id_mutex_map = HashMap::new();
         let mut bucket_map = self.bucket_map.write().unwrap();
@@ -653,20 +660,24 @@ TODO: The index todo list
 8. index levels

 TODO: other pieces
-2. HTTP GET endpoint with predicate and time ranges
-3. API endpoint to delete old series data
-4. API endpoint to delete old indexes
-5. API endpoint to run tsm compaction
-6. Write/read other data types
+- API endpoint to delete old series data
+- API endpoint to delete old indexes
+- API endpoint to run tsm compaction
+- Write/read other data types
+- Buckets backed by alternate storage
+- Meta store abstracted from RocksDB
+- Index abstracted to Trait
+- Raw data iterator abstracted to Trait
 */

-enum SeriesDataType {
-    Int64,
-    Float64,
-    UInt64,
-    String,
-    Bool,
+#[derive(Debug, PartialEq, Clone)]
+pub enum SeriesDataType {
+    I64,
+    F64,
+    // U64,
+    // String,
+    // Bool,
 }

 fn prefix_for_series(bucket_id: u32, series_id: u64, start_time: i64) -> Vec<u8> {
@@ -677,32 +688,43 @@ fn prefix_for_series(bucket_id: u32, series_id: u64, start_time: i64) -> Vec<u8>
     v
 }

-pub struct PointsIterator<'a> {
+pub struct PointsIterator<'a, T> {
     batch_size: usize,
     iter: DBIterator<'a>,
     stop_time: i64,
     series_prefix: Vec<u8>,
     drained: bool,
+    read: fn(b: &[u8]) -> T,
 }

-impl PointsIterator<'_> {
-    pub fn new(batch_size: usize, iter: DBIterator, stop_time: i64, series_prefix: Vec<u8>) -> PointsIterator {
-        PointsIterator{
-            batch_size,
-            iter,
-            stop_time,
-            series_prefix,
-            drained: false,
-        }
-    }
-
-    pub fn new_from_series_filter<'a>(org_id: u32, bucket_id: u32, db: &'a Database, series_filter: &'a SeriesFilter, range: &Range, batch_size: usize) -> Result<PointsIterator<'a>, StorageError> {
-        db.get_db_points_iter(org_id, bucket_id, series_filter.id, range, batch_size)
-    }
-}
-
-impl Iterator for PointsIterator<'_> {
-    type Item = Vec<ReadPoint<i64>>;
+pub fn new_i64_points_iterator<'a>(org_id: u32, bucket_id: u32, db: &'a Database, series_filter: &'a SeriesFilter, range: &Range, batch_size: usize) -> PointsIterator<'a, i64> {
+    let (iter, series_prefix) = db.get_db_points_iter(org_id, bucket_id, series_filter.id, range.start);
+
+    PointsIterator{
+        batch_size,
+        iter,
+        stop_time: range.stop,
+        series_prefix,
+        drained: false,
+        read: i64_from_bytes,
+    }
+}
+
+pub fn new_f64_points_iterator<'a>(org_id: u32, bucket_id: u32, db: &'a Database, series_filter: &'a SeriesFilter, range: &Range, batch_size: usize) -> PointsIterator<'a, f64> {
+    let (iter, series_prefix) = db.get_db_points_iter(org_id, bucket_id, series_filter.id, range.start);
+
+    PointsIterator{
+        batch_size,
+        iter,
+        stop_time: range.stop,
+        series_prefix,
+        drained: false,
+        read: f64_from_bytes,
+    }
+}
+
+impl<T> Iterator for PointsIterator<'_, T> {
+    type Item = Vec<ReadPoint<T>>;

     fn next(&mut self) -> Option<Self::Item> {
         if self.drained {
@@ -733,7 +755,7 @@ impl Iterator for PointsIterator<'_> {
             }

             let point = ReadPoint{
-                value: BigEndian::read_i64(&value),
+                value: (self.read)(&value),
                 time,
             };
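
Storing a plain fn(&[u8]) -> T on PointsIterator lets a single generic next body decode both value types; each constructor fixes the decoder once. A minimal standalone sketch of that pattern:

struct Decoder<T> {
    read: fn(&[u8]) -> T,
}

impl<T> Decoder<T> {
    fn decode(&self, raw: &[u8]) -> T {
        (self.read)(raw)
    }
}

fn first_byte_as_i64(b: &[u8]) -> i64 { b[0] as i64 }
fn first_byte_as_f64(b: &[u8]) -> f64 { b[0] as f64 }

fn main() {
    // Each constructor fixes the function pointer, as the
    // new_i64/new_f64 points-iterator constructors above do.
    let ints = Decoder::<i64> { read: first_byte_as_i64 };
    let floats = Decoder::<f64> { read: first_byte_as_f64 };

    assert_eq!(ints.decode(&[7]), 7i64);
    assert_eq!(floats.decode(&[7]), 7.0f64);
}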
@@ -795,6 +817,24 @@ fn index_series_id(id: &Vec<u8>) -> Vec<u8> {
     v
 }

+fn index_series_id_value(t: SeriesDataType, key: &str) -> Vec<u8> {
+    let mut v = Vec::with_capacity(1 + key.len());
+    v.push(t as u8);
+    v.append(&mut key.as_bytes().to_vec());
+    v
+}
+
+fn series_type_from_point_type(p: &PointType) -> SeriesDataType {
+    match p {
+        PointType::I64(_) => SeriesDataType::I64,
+        PointType::F64(_) => SeriesDataType::F64,
+    }
+}
+
+fn series_type_from_byte(b: u8) -> SeriesDataType {
+    unsafe { ::std::mem::transmute(b) }
+}
+
 fn index_series_id_from_id(id: u64) -> Vec<u8> {
     let mut v = Vec::with_capacity(8 + 1);
     v.push(IndexEntryType::IDToSeriesKey as u8);
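
series_type_from_byte transmutes a raw byte into the enum, which is undefined behaviour if a corrupt byte reaches it. A hedged alternative, not what this commit does, is a checked match over the discriminants (0 and 1 here, matching the declaration order of SeriesDataType above):

#[derive(Debug, PartialEq, Clone)]
enum SeriesDataType { I64, F64 }

// Explicit, total conversion: unknown bytes become an Err instead of UB.
fn series_type_from_byte_checked(b: u8) -> Result<SeriesDataType, String> {
    match b {
        0 => Ok(SeriesDataType::I64),
        1 => Ok(SeriesDataType::F64),
        other => Err(format!("unknown series type byte: {}", other)),
    }
}

fn main() {
    assert_eq!(series_type_from_byte_checked(0), Ok(SeriesDataType::I64));
    assert!(series_type_from_byte_checked(9).is_err());
}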
@@ -854,10 +894,6 @@ fn index_key_value_posting_list(bucket_id: u32, key: &str, value: &str) -> Vec<u
     v
 }

-fn index_entry_type_from_byte(b: u8) -> IndexEntryType {
-    unsafe { ::std::mem::transmute(b) }
-}
-
 // next_series_id_key gives the key in the buckets CF in rocks that holds the value for the next series ID
 fn next_series_id_key(org_id: u32, bucket_id: u32) -> Vec<u8> {
     let mut v = Vec::with_capacity(9);
@@ -912,10 +948,14 @@ fn u32_to_bytes(val: u32) -> Vec<u8> {
     v
 }

-fn u64_to_bytes(val: u64) -> Vec<u8> {
-    let mut v = Vec::with_capacity(8);
-    v.write_u64::<BigEndian>(val).unwrap();
-    v
+fn i64_from_bytes(b: &[u8]) -> i64 {
+    let mut c = Cursor::new(b);
+    c.read_i64::<BigEndian>().unwrap()
+}
+
+fn f64_from_bytes(b: &[u8]) -> f64 {
+    let mut c = Cursor::new(b);
+    c.read_f64::<BigEndian>().unwrap()
 }

 impl Bucket {
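
write_points serializes values with write_i64/write_f64::<BigEndian> and these helpers read them back, so the storage contract is a plain 8-byte big-endian encoding. A small round-trip sketch with the byteorder crate the file already imports:

use std::io::Cursor;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};

fn main() {
    // i64 round trip, mirroring the PointType::I64 write path.
    let mut buf = Vec::with_capacity(8);
    buf.write_i64::<BigEndian>(-42).unwrap();
    assert_eq!(Cursor::new(&buf).read_i64::<BigEndian>().unwrap(), -42);

    // f64 round trip, mirroring the PointType::F64 write path.
    let mut buf = Vec::with_capacity(8);
    buf.write_f64::<BigEndian>(3.14).unwrap();
    assert_eq!(Cursor::new(&buf).read_f64::<BigEndian>().unwrap(), 3.14);
}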
@ -941,11 +981,13 @@ fn key_for_series_and_time(bucket_id: u32, series_id: u64, timestamp: i64) -> Ve
v v
} }
// TODO: add series type to series filter
#[derive(Debug, PartialEq, Clone)] #[derive(Debug, PartialEq, Clone)]
pub struct SeriesFilter { pub struct SeriesFilter {
pub id: u64, pub id: u64,
pub key: String, pub key: String,
pub value_predicate: Option<Predicate>, pub value_predicate: Option<Predicate>,
pub series_type: SeriesDataType,
} }
pub struct Range { pub struct Range {
@ -983,22 +1025,16 @@ mod tests {
use dotenv::dotenv; use dotenv::dotenv;
use std::env; use std::env;
use serde_json::error::Category::Data;
use rocksdb;
use crate::storage::predicate::parse_predicate; use crate::storage::predicate::parse_predicate;
use crate::storage::rocksdb::IndexEntryType::SeriesKeyToID;
use crate::line_parser::parse;
use crate::storage::iterators::ReadSeries;
use std::net::Shutdown::Read;
#[test] #[test]
fn create_and_get_buckets() { fn create_and_get_buckets() {
let mut bucket: Bucket; let bucket: Bucket;
let org_id = 1; let org_id = 1;
let mut bucket2 = Bucket::new(2, "Foo".to_string()); let mut bucket2 = Bucket::new(2, "Foo".to_string());
{ {
let mut db = test_database("create_and_get_buckets", true); let db = test_database("create_and_get_buckets", true);
let mut b = Bucket::new(org_id, "Foo".to_string()); let mut b = Bucket::new(org_id, "Foo".to_string());
b.id = db.create_bucket_if_not_exists(org_id, &b).unwrap(); b.id = db.create_bucket_if_not_exists(org_id, &b).unwrap();
@ -1029,7 +1065,7 @@ mod tests {
// ensure it persists across database reload // ensure it persists across database reload
{ {
let mut db = test_database("create_and_get_buckets", false); let db = test_database("create_and_get_buckets", false);
let stored_bucket = db.get_bucket_by_name(org_id, &bucket.name).unwrap().unwrap(); let stored_bucket = db.get_bucket_by_name(org_id, &bucket.name).unwrap().unwrap();
assert_eq!(bucket, stored_bucket); assert_eq!(bucket, stored_bucket);
@ -1045,13 +1081,13 @@ mod tests {
let org_id = 23; let org_id = 23;
let mut b = Bucket::new(org_id, "series".to_string()); let mut b = Bucket::new(org_id, "series".to_string());
let mut b2 = Bucket::new(1, "series".to_string()); let mut b2 = Bucket::new(1, "series".to_string());
let p1 = Point{series: "one".to_string(), value: 1, time: 0}; let p1 = PointType::new_i64("one".to_string(), 1, 0);
let p2 = Point{series: "two".to_string(), value: 23, time: 40}; let p2 = PointType::new_i64("two".to_string(), 23, 40);
let p3 = Point{series: "three".to_string(), value: 33, time: 86}; let p3 = PointType::new_i64("three".to_string(), 33, 86);
let p4 = Point{series: "four".to_string(), value: 234, time: 100}; let p4 = PointType::new_i64("four".to_string(), 234, 100);
{ {
let mut db = test_database("series_id_indexing", true); let db = test_database("series_id_indexing", true);
b.id = db.create_bucket_if_not_exists(org_id, &b).unwrap(); b.id = db.create_bucket_if_not_exists(org_id, &b).unwrap();
b2.id = db.create_bucket_if_not_exists(b2.org_id, &b2).unwrap(); b2.id = db.create_bucket_if_not_exists(b2.org_id, &b2).unwrap();
@ -1090,7 +1126,7 @@ mod tests {
// now make sure that a new series gets inserted properly after restart // now make sure that a new series gets inserted properly after restart
{ {
let mut db = test_database("series_id_indexing", false); let db = test_database("series_id_indexing", false);
// check the first org // check the first org
let mut series = vec![Series{id: None, point: p4.clone()}]; let mut series = vec![Series{id: None, point: p4.clone()}];
@ -1124,11 +1160,11 @@ mod tests {
#[test] #[test]
fn series_metadata_indexing() { fn series_metadata_indexing() {
let mut bucket = Bucket::new(1, "foo".to_string()); let mut bucket = Bucket::new(1, "foo".to_string());
let mut db = test_database("series_metadata_indexing", true); let db = test_database("series_metadata_indexing", true);
let p1 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 0}; let p1 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 0);
let p2 = Point{series: "cpu,host=a,region=west\tusage_system".to_string(), value: 1, time: 0}; let p2 = PointType::new_i64("cpu,host=a,region=west\tusage_system".to_string(), 1, 0);
let p3 = Point{series: "cpu,host=a,region=west\tusage_user".to_string(), value: 1, time: 0}; let p3 = PointType::new_i64("cpu,host=a,region=west\tusage_user".to_string(), 1, 0);
let p4 = Point{series: "mem,host=b,region=west\tfree".to_string(), value: 1, time: 0}; let p4 = PointType::new_i64("mem,host=b,region=west\tfree".to_string(), 1, 0);
bucket.id = db.create_bucket_if_not_exists(bucket.org_id, &bucket).unwrap(); bucket.id = db.create_bucket_if_not_exists(bucket.org_id, &bucket).unwrap();
let mut series = db.get_series_ids(bucket.org_id, &bucket, vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()]); let mut series = db.get_series_ids(bucket.org_id, &bucket, vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()]);
@@ -1147,42 +1183,42 @@ mod tests {
         let pred = parse_predicate("_m = \"cpu\"").unwrap();
         let series = db.get_series_filters(&bucket, Some(&pred), &range).unwrap();
         assert_eq!(series, vec![
-            SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None},
-            SeriesFilter{id: 2, key: "cpu,host=a,region=west\tusage_system".to_string(), value_predicate: None},
-            SeriesFilter{id: 3, key: "cpu,host=a,region=west\tusage_user".to_string(), value_predicate: None},
+            SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::I64},
+            SeriesFilter{id: 2, key: "cpu,host=a,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::I64},
+            SeriesFilter{id: 3, key: "cpu,host=a,region=west\tusage_user".to_string(), value_predicate: None, series_type: SeriesDataType::I64},
         ]);
         // get series with host = a
         let pred = parse_predicate("host = \"a\"").unwrap();
         let series = db.get_series_filters(&bucket, Some(&pred), &range).unwrap();
         assert_eq!(series, vec![
-            SeriesFilter{id: 2, key: "cpu,host=a,region=west\tusage_system".to_string(), value_predicate: None},
-            SeriesFilter{id: 3, key: "cpu,host=a,region=west\tusage_user".to_string(), value_predicate: None},
+            SeriesFilter{id: 2, key: "cpu,host=a,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::I64},
+            SeriesFilter{id: 3, key: "cpu,host=a,region=west\tusage_user".to_string(), value_predicate: None, series_type: SeriesDataType::I64},
         ]);
         // get series with measurement = cpu and host = b
         let pred = parse_predicate("_m = \"cpu\" and host = \"b\"").unwrap();
         let series = db.get_series_filters(&bucket, Some(&pred), &range).unwrap();
         assert_eq!(series, vec![
-            SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None},
+            SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::I64},
         ]);
         let pred = parse_predicate("host = \"a\" OR _m = \"mem\"").unwrap();
         let series = db.get_series_filters(&bucket, Some(&pred), &range).unwrap();
         assert_eq!(series, vec![
-            SeriesFilter{id: 2, key: "cpu,host=a,region=west\tusage_system".to_string(), value_predicate: None},
-            SeriesFilter{id: 3, key: "cpu,host=a,region=west\tusage_user".to_string(), value_predicate: None},
-            SeriesFilter{id: 4, key: "mem,host=b,region=west\tfree".to_string(), value_predicate: None},
+            SeriesFilter{id: 2, key: "cpu,host=a,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::I64},
+            SeriesFilter{id: 3, key: "cpu,host=a,region=west\tusage_user".to_string(), value_predicate: None, series_type: SeriesDataType::I64},
+            SeriesFilter{id: 4, key: "mem,host=b,region=west\tfree".to_string(), value_predicate: None, series_type: SeriesDataType::I64},
         ]);
     }
     #[test]
     fn write_creates_bucket() {
-        let mut b1 = Bucket::new(1, "bucket1".to_string());
-        let mut db = test_database("write_creates_bucket", true);
-        let p1 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 1};
-        let p2 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 2};
+        let b1 = Bucket::new(1, "bucket1".to_string());
+        let db = test_database("write_creates_bucket", true);
+        let p1 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 1);
+        let p2 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 2);
         db.write_points(b1.org_id, &b1.name, vec![p1, p2]).unwrap();
         assert_eq!(db.get_bucket_by_name(b1.org_id, &b1.name).unwrap().unwrap().id, 1);
@@ -1191,9 +1227,9 @@ mod tests {
     #[test]
     fn catch_rocksdb_iterator_segfault() {
         let mut b1 = Bucket::new(1, "bucket1".to_string());
-        let mut db = test_database("catch_rocksdb_iterator_segfault", true);
-        let p1 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 1};
+        let db = test_database("catch_rocksdb_iterator_segfault", true);
+        let p1 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 1);
         b1.id = db.create_bucket_if_not_exists(b1.org_id, &b1).unwrap();
@@ -1204,9 +1240,9 @@ mod tests {
         let pred = parse_predicate("_m = \"cpu\"").unwrap();
         let mut iter = db.read_range(b1.org_id, &b1.name, &range, &pred, 10).unwrap();
         let series_filter = iter.next().unwrap();
-        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None});
+        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::I64});
         assert_eq!(iter.next(), None);
-        let mut points_iter = PointsIterator::new_from_series_filter(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10).unwrap();
+        let mut points_iter = new_i64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10);
         let points = points_iter.next().unwrap();
         assert_eq!(points, vec![
             ReadPoint{time: 1, value: 1},
@@ -1218,12 +1254,12 @@ mod tests {
     fn write_and_read_points() {
         let mut b1 = Bucket::new(1, "bucket1".to_string());
         let mut b2 = Bucket::new(2, "bucket2".to_string());
-        let mut db = test_database("write_and_read_points", true);
-        let p1 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 1};
-        let p2 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 2};
-        let p3 = Point{series: "mem,host=b,region=west\tfree".to_string(), value: 1, time: 2};
-        let p4 = Point{series: "mem,host=b,region=west\tfree".to_string(), value: 1, time: 4};
+        let db = test_database("write_and_read_points", true);
+        let p1 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 1);
+        let p2 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 2);
+        let p3 = PointType::new_i64("mem,host=b,region=west\tfree".to_string(), 1, 2);
+        let p4 = PointType::new_i64("mem,host=b,region=west\tfree".to_string(), 1, 4);
         b1.id = db.create_bucket_if_not_exists(b1.org_id, &b1).unwrap();
         b2.id = db.create_bucket_if_not_exists(b2.org_id, &b2).unwrap();
@@ -1236,9 +1272,9 @@ mod tests {
         let pred = parse_predicate("_m = \"cpu\" OR _m = \"mem\"").unwrap();
         let mut iter = db.read_range(b1.org_id, &b1.name, &range, &pred, 10).unwrap();
         let series_filter = iter.next().unwrap();
-        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None});
+        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::I64});
         assert_eq!(iter.next(), None);
-        let mut points_iter = PointsIterator::new_from_series_filter(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10).unwrap();
+        let mut points_iter = new_i64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10);
         let points = points_iter.next().unwrap();
         assert_eq!(points, vec![
             ReadPoint{time: 1, value: 1},
@@ -1250,8 +1286,8 @@ mod tests {
         let pred = parse_predicate("_m = \"cpu\" OR _m = \"mem\"").unwrap();
         let mut iter = db.read_range(b2.org_id, &b2.name, &range, &pred, 10).unwrap();
         let series_filter = iter.next().unwrap();
-        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None});
-        let mut points_iter = PointsIterator::new_from_series_filter(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10).unwrap();
+        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::I64});
+        let mut points_iter = new_i64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10);
         let points = points_iter.next().unwrap();
         assert_eq!(points, vec![
             ReadPoint{time: 1, value: 1},
@@ -1259,8 +1295,8 @@ mod tests {
         ]);
         let series_filter = iter.next().unwrap();
-        assert_eq!(series_filter, SeriesFilter{id: 2, key: "mem,host=b,region=west\tfree".to_string(), value_predicate: None});
-        let mut points_iter = PointsIterator::new_from_series_filter(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10).unwrap();
+        assert_eq!(series_filter, SeriesFilter{id: 2, key: "mem,host=b,region=west\tfree".to_string(), value_predicate: None, series_type: SeriesDataType::I64});
+        let mut points_iter = new_i64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10);
         let points = points_iter.next().unwrap();
         assert_eq!(points, vec![
             ReadPoint{time: 2, value: 1},
@@ -1271,9 +1307,9 @@ mod tests {
         let pred = parse_predicate("host = \"b\"").unwrap();
         let mut iter = db.read_range(b1.org_id, &b1.name, &range, &pred, 1).unwrap();
         let series_filter = iter.next().unwrap();
-        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None});
+        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::I64});
         assert_eq!(iter.next(), None);
-        let mut points_iter = PointsIterator::new_from_series_filter(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 1).unwrap();
+        let mut points_iter = new_i64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 1);
         let points = points_iter.next().unwrap();
         assert_eq!(points, vec![
             ReadPoint{time: 1, value: 1},
@@ -1288,22 +1324,50 @@ mod tests {
         let pred = parse_predicate("_m = \"cpu\" OR _m = \"mem\"").unwrap();
         let mut iter = db.read_range(b2.org_id, &b2.name, &range, &pred, 10).unwrap();
         let series_filter = iter.next().unwrap();
-        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None});
-        let mut points_iter = PointsIterator::new_from_series_filter(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10).unwrap();
+        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::I64});
+        let mut points_iter = new_i64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10);
         let points = points_iter.next().unwrap();
         assert_eq!(points, vec![
             ReadPoint{time: 2, value: 1},
         ]);
         let series_filter = iter.next().unwrap();
-        assert_eq!(series_filter, SeriesFilter{id: 2, key: "mem,host=b,region=west\tfree".to_string(), value_predicate: None});
-        let mut points_iter = PointsIterator::new_from_series_filter(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10).unwrap();
+        assert_eq!(series_filter, SeriesFilter{id: 2, key: "mem,host=b,region=west\tfree".to_string(), value_predicate: None, series_type: SeriesDataType::I64});
+        let mut points_iter = new_i64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10);
         let points = points_iter.next().unwrap();
         assert_eq!(points, vec![
             ReadPoint{time: 2, value: 1},
         ]);
     }
+    #[test]
+    fn write_and_read_float_values() {
+        let mut b1 = Bucket::new(1, "bucket1".to_string());
+        let db = test_database("write_and_read_float_values", true);
+        let p1 = PointType::new_f64("cpu,host=b,region=west\tusage_system".to_string(), 1.0, 1);
+        let p2 = PointType::new_f64("cpu,host=b,region=west\tusage_system".to_string(), 2.2, 2);
+        b1.id = db.create_bucket_if_not_exists(b1.org_id, &b1).unwrap();
+        db.write_points(b1.org_id, &b1.name, vec![p1.clone(), p2.clone()]).unwrap();
+        // test that we'll only read from the bucket we wrote points into
+        let range = Range{start: 0, stop: 4};
+        let pred = parse_predicate("_m = \"cpu\"").unwrap();
+        let mut iter = db.read_range(b1.org_id, &b1.name, &range, &pred, 10).unwrap();
+        let series_filter = iter.next().unwrap();
+        assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None, series_type: SeriesDataType::F64});
+        assert_eq!(iter.next(), None);
+        let mut points_iter = new_f64_points_iterator(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10);
+        let points = points_iter.next().unwrap();
+        assert_eq!(points, vec![
+            ReadPoint{time: 1, value: 1.0},
+            ReadPoint{time: 2, value: 2.2},
+        ]);
+        assert_eq!(points_iter.next(), None);
+    }
     // Test helpers
     fn get_test_storage_path() -> String {
         dotenv().ok();
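
Side note on the refactor visible above: the tests switch from a single `Point` struct literal to `PointType::new_i64` / `PointType::new_f64` constructors, and `SeriesFilter` gains a `series_type: SeriesDataType` field that the typed iterators (`new_i64_points_iterator`, `new_f64_points_iterator`) rely on. A minimal sketch of what such a `PointType` could look like, inferred only from the call sites in this diff; the actual definition lives elsewhere in the repository and may differ:

    // Hypothetical reconstruction from the call sites above; names and layout
    // are assumptions, not the repository's actual definition.
    #[derive(Debug, Clone, PartialEq)]
    pub enum PointType {
        I64 { series: String, value: i64, time: i64 },
        F64 { series: String, value: f64, time: i64 },
    }

    impl PointType {
        // Mirrors calls like PointType::new_i64("cpu...".to_string(), 1, 1).
        pub fn new_i64(series: String, value: i64, time: i64) -> PointType {
            PointType::I64 { series, value, time }
        }

        // Mirrors calls like PointType::new_f64("cpu...".to_string(), 2.2, 2).
        pub fn new_f64(series: String, value: f64, time: i64) -> PointType {
            PointType::F64 { series, value, time }
        }
    }

Carrying the value type in the point, and echoing it in `SeriesFilter.series_type`, is what lets the read path offer separate monomorphic `i64`/`f64` iterators in place of the old one-size-fits-all `PointsIterator`.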
View File
@@ -1,7 +1,5 @@
 use crate::Error;
-use std::str::Chars;
-use std::iter::Peekable;
 use std::time::{Duration, UNIX_EPOCH};
 use std::time::SystemTime;
@@ -30,6 +28,7 @@ impl RelativeDuration {
     }
 }
 
+// TODO: update this so that we're not indexing into the string. Should be iterating with chars
 pub fn parse_duration(s: &str) -> Result<RelativeDuration, Error> {
     if s.len() < 2 {
         return Err(Error{description: "duration must have at least two characters".to_string()})
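
The TODO added above flags that `parse_duration` still indexes into the string by byte position. A sketch of the char-iterator approach it suggests, assuming only the `Error{description}` shape shown in this file; the helper name and split logic are illustrative, not the crate's implementation:

    // Illustrative only: separate a duration like "10m" into magnitude and
    // unit by walking chars, never indexing into the string by byte offset.
    fn split_duration(s: &str) -> Result<(i64, String), Error> {
        let mut digits = String::new();
        let mut unit = String::new();
        for c in s.chars() {
            if unit.is_empty() && c.is_ascii_digit() {
                digits.push(c); // still reading the numeric prefix
            } else {
                unit.push(c); // first non-digit starts the unit suffix
            }
        }
        if digits.is_empty() || unit.is_empty() {
            return Err(Error{description: "duration must have a magnitude and a unit".to_string()});
        }
        let magnitude: i64 = digits
            .parse()
            .map_err(|_| Error{description: "invalid duration magnitude".to_string()})?;
        Ok((magnitude, unit))
    }

The unit string can then be matched against the suffixes `parse_duration` already understands, with no byte-offset arithmetic and no risk of slicing inside a multi-byte character.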