Add basic read endpoint

This commit adds a basic read endpoint to pull data out of the database. In order to provide the basic functionality a few things were added:

* Time package with limited support for parsing Flux style durations
* API endpoint at /api/v2/read with query parameters of org_id, bucket_name, predicate, start, and stop

The start and stop query parameters only support relative durations.

The predicate parameter supports what is possible in the parse_predicate method and in the RocksDB implementation (only == comparisons on tags and AND or OR)
pull/24376/head
Paul Dix 2020-01-04 19:07:54 -05:00
parent fe9cb87c3d
commit 1a851f8d0b
8 changed files with 469 additions and 75 deletions

47
Cargo.lock generated
View File

@ -414,6 +414,17 @@ dependencies = [
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "bstr"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"memchr 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"regex-automata 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "byteorder"
version = "0.3.13"
@ -551,6 +562,26 @@ dependencies = [
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "csv"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bstr 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
"csv-core 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
"ryu 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "csv-core"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"memchr 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "delorean"
version = "0.1.0"
@ -560,6 +591,7 @@ dependencies = [
"byteorder 0.3.13 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"croaring 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"csv 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"dotenv 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
"failure 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1042,6 +1074,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
name = "memchr"
version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "mime"
@ -1388,6 +1423,14 @@ dependencies = [
"thread_local 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "regex-automata"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "regex-syntax"
version = "0.5.6"
@ -1908,6 +1951,7 @@ dependencies = [
"checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
"checksum brotli-sys 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4445dea95f4c2b41cde57cc9fee236ae4dbae88d8fcbdb4750fc1bb5d86aaecd"
"checksum brotli2 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0cb036c3eade309815c15ddbacec5b22c4d1f3983a774ab2eac2e3e9ea85568e"
"checksum bstr 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "8d6c2c5b58ab920a4f5aeaaca34b4488074e8cc7596af94e6f8c6ff247c60245"
"checksum byteorder 0.3.13 (registry+https://github.com/rust-lang/crates.io-index)" = "29b2aa490a8f546381308d68fc79e6bd753cd3ad839f7a7172897f1feedfa175"
"checksum byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a7c3dd8985a7111efc5c80b44e23ecdd8c007de8ade3b96595387e812b957cf5"
"checksum bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c"
@ -1925,6 +1969,8 @@ dependencies = [
"checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1"
"checksum croaring 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "814857132d83007167c34d05a0253020601db444e8a448101b230324c5a4244f"
"checksum croaring-sys 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3c83bbf2b2ab171c45ab6a52284851aedbc568901aa9dd5302fa073541dbdbf4"
"checksum csv 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "37519ccdfd73a75821cac9319d4fce15a81b9fcf75f951df5b9988aa3a0af87d"
"checksum csv-core 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "9b5cadb6b25c77aeff80ba701712494213f4a8418fcda2ee11b6560c3ad0bf4c"
"checksum derive-error-chain 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3c9ca9ade651388daad7c993f005d0d20c4f6fe78c1cdc93e95f161c6f5ede4a"
"checksum derive_more 0.99.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2159be042979966de68315bce7034bb000c775f22e3e834e1c52ff78f041cae8"
"checksum dotenv 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d6f0e2bb24d163428d8031d3ebd2d2bd903ad933205a97d0f18c7c1aade380f3"
@ -2022,6 +2068,7 @@ dependencies = [
"checksum redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)" = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84"
"checksum regex 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "9329abc99e39129fcceabd24cf5d85b4671ef7c29c50e972bc5afe32438ec384"
"checksum regex 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dc220bd33bdce8f093101afe22a037b8eb0e5af33592e6a9caafff0d4cb81cbd"
"checksum regex-automata 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "92b73c2a1770c255c240eaa4ee600df1704a38dc3feaa6e949e7fcd4f8dc09f9"
"checksum regex-syntax 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7d707a4fa2637f2dca2ef9fd02225ec7661fe01a53623c1e6515b6916511f7a7"
"checksum regex-syntax 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)" = "11a7e20d1cce64ef2fed88b66d347f88bd9babb82845b2b858f3edbf59a4f716"
"checksum remove_dir_all 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4a83fa3702a688b9359eccba92d153ac33fd2e8462f9e0e3fdf155239ea7792e"

View File

@ -22,6 +22,7 @@ futures = "0.3.1"
serde_json = "1.0.44"
serde = "1.0"
csv = "1.1"
rocksdb = "0.13"
byteorder = "0.3"

View File

@ -1,9 +1,40 @@
extern crate num_cpus;
use std::{error, fmt};
use actix_web::ResponseError;
use actix_web::http::StatusCode;
pub mod line_parser;
pub mod encoders;
pub mod storage;
pub mod time;
pub mod delorean {
include!(concat!(env!("OUT_DIR"), "/delorean.rs"));
}
}
// TODO: audit all errors and make ones that can differentiate between 400 and 500 and others
/// Generic crate-wide error type carrying only a human-readable description.
#[derive(Debug, Clone, PartialEq)]
pub struct Error {
    // The message reported to callers and clients; rendered verbatim by Display.
    pub description: String,
}
impl fmt::Display for Error {
    /// Renders the error as its plain description text.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(&self.description)
    }
}
impl error::Error for Error {
    /// This error is self-contained; no underlying source error is kept.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        // Generic error, underlying cause isn't tracked.
        None
    }
}
// Lets handlers bubble this error up with `?`; actix-web turns it into a response.
impl ResponseError for Error {
    // All of these errors currently map to 400 Bad Request; the TODO above
    // covers distinguishing client (400) from server (500) failures.
    fn status_code(&self) -> StatusCode {
        StatusCode::BAD_REQUEST
    }
}

View File

@ -1,6 +1,8 @@
use std::str::Chars;
use std::{error, fmt};
use std::fs::read;
use actix_web::ResponseError;
use actix_web::http::StatusCode;
#[derive(Debug, PartialEq, Clone)]
pub struct Point {
@ -10,47 +12,51 @@ pub struct Point {
}
impl Point {
// TODO: handle escapes in the line protocol for , = and \t
/// index_pairs parses the series key into key value pairs for insertion into the index. In
/// cases where this series is already in the database, this parse step can be skipped entirely.
/// The measurement is represented as a _m key and field as _f.
pub fn index_pairs(&self) -> Result<Vec<Pair>, ParseError> {
let mut chars = self.series.chars();
let mut pairs = vec![];
let mut key = "_m".to_string();
let mut value = String::with_capacity(250);
let mut reading_key = false;
index_pairs(&self.series)
}
}
while let Some(ch) = chars.next() {
match ch {
',' => {
reading_key = true;
pairs.push(Pair{key, value});
key = String::with_capacity(250);
value = String::with_capacity(250);
},
'=' => {
reading_key = false;
},
'\t' => {
reading_key = false;
pairs.push(Pair{key, value});
key = "_f".to_string();
value = String::with_capacity(250);
},
_ => {
if reading_key {
key.push(ch);
} else {
value.push(ch);
}
// TODO: handle escapes in the line protocol for , = and \t
/// index_pairs parses the series key into key value pairs for insertion into the index. In
/// cases where this series is already in the database, this parse step can be skipped entirely.
/// The measurement is represented as a _m key and field as _f.
pub fn index_pairs(key: &str) -> Result<Vec<Pair>, ParseError> {
    let mut pairs = vec![];
    // The series key always starts with the measurement, keyed as "_m".
    let mut pair_key = "_m".to_string();
    let mut pair_value = String::with_capacity(250);
    let mut reading_key = false;

    for ch in key.chars() {
        match ch {
            ',' => {
                // A comma closes the current pair and starts a new tag key.
                reading_key = true;
                pairs.push(Pair{key: pair_key, value: pair_value});
                pair_key = String::with_capacity(250);
                pair_value = String::with_capacity(250);
            },
            '=' => {
                // '=' flips from accumulating the key to accumulating the value.
                reading_key = false;
            },
            '\t' => {
                // A tab separates the tag set from the field name, keyed as "_f".
                reading_key = false;
                pairs.push(Pair{key: pair_key, value: pair_value});
                pair_key = "_f".to_string();
                pair_value = String::with_capacity(250);
            },
            _ => {
                if reading_key {
                    pair_key.push(ch);
                } else {
                    pair_value.push(ch);
                }
            }
        }
    }

    // Flush the final (field) pair.
    pairs.push(Pair{key: pair_key, value: pair_value});
    Ok(pairs)
}
#[derive(Debug, PartialEq)]
@ -77,6 +83,12 @@ impl error::Error for ParseError {
}
}
// Lets handlers propagate parse failures with `?`; actix-web renders the response.
impl ResponseError for ParseError {
    // Parse failures are the client's fault, so always report 400 Bad Request.
    fn status_code(&self) -> StatusCode {
        StatusCode::BAD_REQUEST
    }
}
// TODO: have parse return an error for invalid inputs
pub fn parse(input: &str) -> Vec<Point> {
let mut points = Vec::with_capacity(10000);

View File

@ -1,16 +1,26 @@
use delorean::storage::rocksdb::Database;
use delorean::{line_parser, storage};
use delorean::storage::iterators::SeriesIterator;
use delorean::storage::rocksdb::{PointsIterator, SeriesFilter, Range};
use delorean::line_parser::{parse, index_pairs, Pair};
use delorean::storage::predicate::parse_predicate;
use delorean::time::{parse_duration, time_as_i64_nanos};
use std::{env, io, str};
use std::env::VarError;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::SystemTime;
use actix_web::{App, middleware, HttpServer, web, HttpResponse, Error as AWError, guard, error};
use actix_web::{App, middleware, HttpServer, web, HttpResponse, Error as AWError, guard, error, Responder};
use actix_web::web::Bytes;
use serde_json;
use serde::Deserialize;
use serde::ser::{Serialize, Serializer, SerializeStruct};
use actix_web::web::{BytesMut};
use futures::StreamExt;
use delorean::{line_parser, storage};
use std::env::VarError;
use futures::{self, StreamExt, Stream};
use failure::_core::time::Duration;
use csv::Writer;
struct Server {
db: Database,
@ -47,8 +57,160 @@ async fn write(mut payload: web::Payload, write_info: web::Query<WriteInfo>, s:
Ok(HttpResponse::Ok().json({}))
}
async fn series_match() -> Result<HttpResponse, AWError> {
Ok(HttpResponse::InternalServerError().json(serde_json::json!({"error": "not implemented"})))
/// Query parameters accepted by the read endpoint.
#[derive(Deserialize, Debug)]
struct ReadInfo {
    org_id: u32,
    bucket_name: String,
    // Predicate expression, e.g. `_m = "cpu"`; handed to parse_predicate.
    predicate: String,
    // Optional relative durations (e.g. "-10s") applied to the current time;
    // start defaults to 10s ago and stop defaults to now when omitted.
    start: Option<String>,
    stop: Option<String>,
}
//struct ReadResponseBody<'a> {
// series: SeriesIterator<'a>,
// current_points_iterator: PointsIterator<'a>,
//}
//
//impl Iterator for ReadResponseBody<'_> {
// type Item = Vec<u8>;
//
// fn next(&mut self) -> Option<Self::Item> {
// }
//}
//
//impl Stream for ReadResponseBody {
// type Item = Result<Bytes, AWError>;
//
// fn poll_next(
// &mut self,
// cx: &mut Context<'_>,
// ) -> Poll<Option<Self::Item>> {
// if self.iters > 10 {
// Poll::Ready(None)
// } else {
// Poll::Ready(Some(Ok(Bytes::from_static("this is a line in the feed\n"))))
// }
// }
//}
//
//impl Stream for ReadResponseBody<'_> {
// fn poll_next(
// &mut self,
// cx: &mut Context
// ) -> Result<Async<Option<Self::Item>>, Self::Error> {
// if self.iters > 10_000_000 {
// Ok(Async::Ready(None))
// } else {
// Ok(Async::Ready(Some("this is a line in the feed\n".to_string())))
// }
// }
//}
//struct Record<T: Serialize> {
// pairs: Vec<Pair>,
// time: i64,
// value: T,
//}
//
//impl<T: Serialize> Serialize for Record<T> {
// fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
// where
// S: Serializer,
// {
// let mut state = serializer.serialize_struct("Record", self.pairs.len() + 2)?;
// for p in &self.pairs {
// state.serialize_field(&p.key, &p.value)?;
// }
//
// state.serialize_field("_value", &self.value)?;
// state.serialize_field("_time", &self.time)?;
//
// state.end()
// }
//}
// TODO: write end to end test of read
// TODO: figure out how to stream read results out rather than rendering the whole thing in mem
/// HTTP handler for the read endpoint. Parses the predicate and the relative
/// start/stop durations from the query string, runs a range query against the
/// database, and renders each matching series as a CSV table (tag columns plus
/// _time and _value), separated by blank lines. The whole response body is
/// built in memory (see TODO above about streaming).
async fn read(read_info: web::Query<ReadInfo>, s: web::Data<Arc<Server>>) -> Result<HttpResponse, AWError> {
    // A predicate parse failure becomes a 400 via ParseError's ResponseError impl.
    let predicate = parse_predicate(&read_info.predicate)?;

    let now = std::time::SystemTime::now();

    // start/stop are relative durations (e.g. "-10s") applied to `now`.
    let start = match &read_info.start {
        Some(duration) => {
            let d = parse_duration(duration)?;
            d.from_time(now)?
        }
        None => {
            // default to 10s in the past
            now.checked_sub(Duration::from_secs(10)).unwrap()
        }
    };

    let stop = match &read_info.stop {
        Some(duration) => {
            let d = parse_duration(duration)?;
            d.from_time(now)?
        },
        None => now,
    };

    // Convert to the i64 nanosecond timestamps the storage layer works in.
    let start = time_as_i64_nanos(&start);
    let stop = time_as_i64_nanos(&stop);

    let range = Range{start, stop};

    let mut series = s.db.read_range(read_info.org_id, &read_info.bucket_name, &range, &predicate, 10)?;

    let bucket_id = series.bucket_id;
    let db = &s.db;

    let mut response_body = vec![];

    // NOTE: the loop variable shadows the handler's `s` (server data); the
    // database handle was captured as `db` above for use inside the loop.
    for s in series {
        // One CSV writer — and therefore one header row — per series.
        let mut wtr = Writer::from_writer(vec![]);
        let mut points = PointsIterator::new_from_series_filter(read_info.org_id, bucket_id, &db, &s, &range, 10)?;
        let pairs = index_pairs(&s.key)?;
        let mut cols = Vec::with_capacity(pairs.len() + 2);
        let mut vals = Vec::with_capacity(pairs.len() + 2);
        for p in &pairs {
            cols.push(p.key.clone());
            vals.push(p.value.clone());
        }
        let tcol = "_time".to_string();
        let vcol = "_value".to_string();
        cols.push(tcol.clone());
        cols.push(vcol.clone());
        // Placeholder values; the last two slots are overwritten for each point.
        vals.push(tcol);
        vals.push(vcol);
        // Shadow tcol/vcol with the indexes of the _time and _value columns.
        let tcol = cols.len() - 2;
        let vcol = cols.len() - 1;

        wtr.write_record(&cols).unwrap();

        for batch in points {
            for p in batch {
                let t = p.time.to_string();
                let v = p.value.to_string();
                vals[vcol] = v;
                vals[tcol] = t;
                wtr.write_record(&vals).unwrap();
            }
        }

        let mut data = match wtr.into_inner() {
            Ok(d) => d,
            Err(e) => return Ok(HttpResponse::InternalServerError().json(serde_json::json!({"error": format!("{}", e)}))),
        };
        response_body.append(&mut data);
        // Blank line separates the CSV tables of successive series.
        response_body.append(&mut b"\n".to_vec());
    }

    Ok(HttpResponse::Ok().body(response_body))
}
async fn not_found() -> Result<HttpResponse, AWError> {
@ -76,13 +238,13 @@ async fn main() -> io::Result<()> {
.data(state.clone())
// enable logger
.wrap(middleware::Logger::default())
.service(web::resource("/api/v2/write")
.route(web::post().to(write))
)
.service(
web::scope("/api/v3")
.service(web::resource("/series_match")
.route(web::get().to(series_match))
web::scope("/api/v2")
.service(web::resource("/write")
.route(web::post().to(write))
)
.service(web::resource("/read")
.route(web::get().to(read))
)
)
// default

View File

@ -3,35 +3,25 @@ use crate::storage::rocksdb::{Database, SeriesFilter, StorageError, Range, Point
use rocksdb::{DB, IteratorMode, DBIterator};
pub struct SeriesIterator<'a> {
range: &'a Range,
batch_size: usize,
predicate: &'a Predicate,
org_id: u32,
bucket_id: u32,
/// Iterator over the series (as SeriesFilter entries) matched by a read query
/// against one org/bucket.
pub struct SeriesIterator {
    pub org_id: u32,
    pub bucket_id: u32,
    // Filters produced by the index lookup, handed out one at a time.
    series_filters: Vec<SeriesFilter>,
    // Index of the next filter to yield.
    next_filter: usize,
}
impl SeriesIterator<'_> {
pub fn new<'a>(range: &'a Range, batch_size: usize, predicate: &'a Predicate, org_id: u32, bucket_id: u32, series_filters: Vec<SeriesFilter>) -> SeriesIterator<'a> {
impl SeriesIterator {
    /// Builds an iterator over the given series filters for one org/bucket.
    pub fn new(org_id: u32, bucket_id: u32, series_filters: Vec<SeriesFilter>) -> SeriesIterator {
        SeriesIterator{
            org_id,
            bucket_id,
            next_filter: 0,
            series_filters,
        }
    }
}
impl Iterator for SeriesIterator<'_> {
impl Iterator for SeriesIterator {
type Item = SeriesFilter;
fn next(&mut self) -> Option<Self::Item> {

View File

@ -16,6 +16,8 @@ use prost::Message;
use futures::AsyncWriteExt;
use croaring::Treemap;
use croaring::treemap::NativeSerializer;
use actix_web::ResponseError;
use actix_web::http::StatusCode;
/// Database wraps a RocksDB database for storing the raw series data, an inverted index of the
/// metadata and the metadata about what buckets exist in the system.
@ -102,6 +104,7 @@ impl Database {
self.insert_series_without_ids(org_id, &bucket, &mut series);
let mut batch = WriteBatch::default();
for s in series {
let key = key_for_series_and_time(bucket.id, s.id.unwrap(), s.point.time);
let mut value = Vec::with_capacity(8);
@ -123,19 +126,18 @@ impl Database {
self.create_bucket_if_not_exists(org_id, &bucket)
}
pub fn read_range<'a>(&self, org_id: u32, bucket_name: &str, range: &'a Range, predicate: &'a Predicate, batch_size: usize) -> Result<SeriesIterator<'a>, StorageError> {
pub fn read_range<'a>(&self, org_id: u32, bucket_name: &str, range: &'a Range, predicate: &'a Predicate, batch_size: usize) -> Result<SeriesIterator, StorageError> {
let bucket = match self.get_bucket_by_name(org_id, bucket_name).unwrap() {
Some(b) => b,
None => return Err(StorageError{description: format!("bucket {} not found", bucket_name)}),
};
let series_filters = self.get_series_filters(&bucket, Some(&predicate), range)?;
println!("filters: {:?}", series_filters);
Ok(SeriesIterator::new(range, batch_size, predicate, org_id, bucket.id, series_filters))
Ok(SeriesIterator::new(org_id, bucket.id, series_filters))
}
pub fn get_db_points_iter(&self, _org_id: u32, bucket_id: u32, series_id: u64, range: &Range, batch_size: usize) -> Result<PointsIterator, StorageError> {
fn get_db_points_iter(&self, _org_id: u32, bucket_id: u32, series_id: u64, range: &Range, batch_size: usize) -> Result<PointsIterator, StorageError> {
let mut prefix = prefix_for_series(bucket_id, series_id, range.start);
let mode = IteratorMode::From(&prefix, Direction::Forward);
let mut iter = self.db.read().unwrap().iterator(mode);
@ -693,6 +695,10 @@ impl PointsIterator<'_> {
drained: false,
}
}
/// Builds a points iterator for a single series filter by delegating to the
/// database's key-range iterator for that series id.
pub fn new_from_series_filter<'a>(org_id: u32, bucket_id: u32, db: &'a Database, series_filter: &'a SeriesFilter, range: &Range, batch_size: usize) -> Result<PointsIterator<'a>, StorageError> {
    db.get_db_points_iter(org_id, bucket_id, series_filter.id, range, batch_size)
}
}
impl Iterator for PointsIterator<'_> {
@ -706,6 +712,14 @@ impl Iterator for PointsIterator<'_> {
let mut v = Vec::with_capacity(self.batch_size);
let mut n = 0;
// we have to check if the iterator is still valid. There are some edge cases where
// this function could get called with an invalid iterator because it has gone to
// the end of the rocksdb keyspace. Calling next on it segfaults the program, so check it first.
// Here's the issue: https://github.com/rust-rocksdb/rust-rocksdb/issues/361
if !self.iter.valid() {
self.drained = true;
return None;
}
while let Some((key, value)) = self.iter.next() {
if !key.starts_with(&self.series_prefix) {
self.drained = true;
@ -957,6 +971,12 @@ impl error::Error for StorageError {
}
}
// Lets handlers propagate storage failures with `?`; actix-web renders the response.
impl ResponseError for StorageError {
    // All storage errors currently map to 400 Bad Request — even ones that are
    // arguably server-side; see the crate-level TODO about auditing error codes.
    fn status_code(&self) -> StatusCode {
        StatusCode::BAD_REQUEST
    }
}
#[cfg(test)]
mod tests {
use super::*;
@ -1168,6 +1188,32 @@ mod tests {
assert_eq!(db.get_bucket_by_name(b1.org_id, &b1.name).unwrap().unwrap().id, 1);
}
// Regression test: querying a range past the last written point used to drive
// the underlying RocksDB iterator off the end of the keyspace, where calling
// next() segfaulted (https://github.com/rust-rocksdb/rust-rocksdb/issues/361).
#[test]
fn catch_rocksdb_iterator_segfault() {
    let mut b1 = Bucket::new(1, "bucket1".to_string());
    let mut db = test_database("catch_rocksdb_iterator_segfault", true);

    // A single point at time 1; the query below extends to stop=4, past it.
    let p1 = Point{series: "cpu,host=b,region=west\tusage_system".to_string(), value: 1, time: 1};

    b1.id = db.create_bucket_if_not_exists(b1.org_id, &b1).unwrap();
    db.write_points(b1.org_id, &b1.name, vec![p1.clone()]).unwrap();

    let range = Range{start: 1, stop: 4};
    let pred = parse_predicate("_m = \"cpu\"").unwrap();
    let mut iter = db.read_range(b1.org_id, &b1.name, &range, &pred, 10).unwrap();
    let series_filter = iter.next().unwrap();
    assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None});
    assert_eq!(iter.next(), None);
    let mut points_iter = PointsIterator::new_from_series_filter(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10).unwrap();
    let points = points_iter.next().unwrap();
    assert_eq!(points, vec![
        ReadPoint{time: 1, value: 1},
    ]);
    // Must return None (not crash) once the keyspace is exhausted.
    assert_eq!(points_iter.next(), None);
}
#[test]
fn write_and_read_points() {
let mut b1 = Bucket::new(1, "bucket1".to_string());
@ -1192,19 +1238,20 @@ mod tests {
let series_filter = iter.next().unwrap();
assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None});
assert_eq!(iter.next(), None);
let mut points_iter = iter.points_iterator(&db, &series_filter).unwrap();
let mut points_iter = PointsIterator::new_from_series_filter(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10).unwrap();
let points = points_iter.next().unwrap();
assert_eq!(points, vec![
ReadPoint{time: 1, value: 1},
ReadPoint{time: 2, value: 1},
]);
assert_eq!(points_iter.next(), None);
// test that we'll read multiple series
let pred = parse_predicate("_m = \"cpu\" OR _m = \"mem\"").unwrap();
let mut iter = db.read_range(b2.org_id, &b2.name, &range, &pred, 10).unwrap();
let series_filter = iter.next().unwrap();
assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None});
let mut points_iter = iter.points_iterator(&db, &series_filter).unwrap();
let mut points_iter = PointsIterator::new_from_series_filter(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10).unwrap();
let points = points_iter.next().unwrap();
assert_eq!(points, vec![
ReadPoint{time: 1, value: 1},
@ -1213,7 +1260,7 @@ mod tests {
let series_filter = iter.next().unwrap();
assert_eq!(series_filter, SeriesFilter{id: 2, key: "mem,host=b,region=west\tfree".to_string(), value_predicate: None});
let mut points_iter = iter.points_iterator(&db, &series_filter).unwrap();
let mut points_iter = PointsIterator::new_from_series_filter(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10).unwrap();
let points = points_iter.next().unwrap();
assert_eq!(points, vec![
ReadPoint{time: 2, value: 1},
@ -1226,7 +1273,7 @@ mod tests {
let series_filter = iter.next().unwrap();
assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None});
assert_eq!(iter.next(), None);
let mut points_iter = iter.points_iterator(&db, &series_filter).unwrap();
let mut points_iter = PointsIterator::new_from_series_filter(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 1).unwrap();
let points = points_iter.next().unwrap();
assert_eq!(points, vec![
ReadPoint{time: 1, value: 1},
@ -1242,7 +1289,7 @@ mod tests {
let mut iter = db.read_range(b2.org_id, &b2.name, &range, &pred, 10).unwrap();
let series_filter = iter.next().unwrap();
assert_eq!(series_filter, SeriesFilter{id: 1, key: "cpu,host=b,region=west\tusage_system".to_string(), value_predicate: None});
let mut points_iter = iter.points_iterator(&db, &series_filter).unwrap();
let mut points_iter = PointsIterator::new_from_series_filter(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10).unwrap();
let points = points_iter.next().unwrap();
assert_eq!(points, vec![
ReadPoint{time: 2, value: 1},
@ -1250,7 +1297,7 @@ mod tests {
let series_filter = iter.next().unwrap();
assert_eq!(series_filter, SeriesFilter{id: 2, key: "mem,host=b,region=west\tfree".to_string(), value_predicate: None});
let mut points_iter = iter.points_iterator(&db, &series_filter).unwrap();
let mut points_iter = PointsIterator::new_from_series_filter(iter.org_id, iter.bucket_id, &db, &series_filter, &range, 10).unwrap();
let points = points_iter.next().unwrap();
assert_eq!(points, vec![
ReadPoint{time: 2, value: 1},

104
src/time.rs Normal file
View File

@ -0,0 +1,104 @@
use crate::Error;
use std::str::Chars;
use std::iter::Peekable;
use std::time::{Duration, UNIX_EPOCH};
use std::time::SystemTime;
// TODO: because we're using SystemTime as our base time object, we only support times after
// unix epoch. We should fix this so we can represent a wider range of dates & times.
/// A duration relative to some reference time, e.g. parsed from "-10s".
#[derive(Debug, PartialEq)]
pub struct RelativeDuration {
    // True when the duration should be subtracted from the reference time
    // (the parsed string had a leading '-').
    pub subtract: bool,
    pub duration: Duration,
}
impl RelativeDuration {
pub fn from_time(&self, t: std::time::SystemTime) -> Result<std::time::SystemTime, Error> {
if self.subtract {
match t.checked_sub(self.duration) {
Some(t) => Ok(t),
None => Err(Error{description: "unable to subtract duration from time".to_string()}),
}
} else {
match t.checked_add(self.duration) {
Some(t) => Ok(t),
None => Err(Error{description: "unable to add duration from time".to_string()}),
}
}
}
}
pub fn parse_duration(s: &str) -> Result<RelativeDuration, Error> {
if s.len() < 2 {
return Err(Error{description: "duration must have at least two characters".to_string()})
}
let i;
let mut start = 0;
if s.starts_with("-") {
start = 1;
}
match s[start..].chars().position(|c| !c.is_digit(10)) {
Some(p) => i = p + start,
None => return Err(Error{description: "duration must end with a valid unit like s, m, ms, us, ns".to_string()}),
}
if i == 0 {
return Err(Error{description: format!("unable to parse duration {} because of invalid first character", s)})
}
let magnitude = match s[start..i].parse::<u64>() {
Ok(n) => n,
Err(e) => return Err(Error{description: e.to_string()}),
};
let duration = match &s[i..] {
"s" => Duration::from_secs(magnitude),
"m" => Duration::from_secs(magnitude * 60),
unknown => return Err(Error{description: format!("unhandled duration '{}'", unknown)}),
};
Ok(RelativeDuration{subtract: start == 1, duration})
}
/// Converts a `SystemTime` to nanoseconds since the Unix epoch as an i64.
///
/// Panics for times before 1970-01-01T00:00:00 (see the module-level TODO
/// about supporting a wider range of dates).
pub fn time_as_i64_nanos(t: &SystemTime) -> i64 {
    let elapsed = t
        .duration_since(UNIX_EPOCH)
        .expect("unable to support times before 1970-01-01T00:00:00");
    let whole_seconds = elapsed.as_secs() as i64;
    whole_seconds * 1_000_000_000 + i64::from(elapsed.subsec_nanos())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Covers the supported units (s/m), the optional leading '-', and the
    /// documented error cases of parse_duration.
    // fixed: the attribute was written "# [test]" (nonstandard spacing)
    #[test]
    fn parse_durations() {
        assert_eq!(
            parse_duration("1m").unwrap(),
            RelativeDuration{subtract: false, duration: Duration::from_secs(60)}
        );
        assert_eq!(
            parse_duration("-20s").unwrap(),
            RelativeDuration{subtract: true, duration: Duration::from_secs(20)}
        );
        assert_eq!(
            parse_duration("10d"),
            Err(Error{description: "unhandled duration 'd'".to_string()}),
        );
        assert_eq!(
            parse_duration("a23"),
            Err(Error{description: "unable to parse duration a23 because of invalid first character".to_string()})
        );
        assert_eq!(
            parse_duration("3"),
            Err(Error{description: "duration must have at least two characters".to_string()})
        );
    }
}