refactor: merge master in
commit 38f23ac07a
@@ -58,7 +58,7 @@ jobs:
- install_clang
- run:
name: Clippy
command: cargo clippy --all -j9
command: cargo clippy --all-targets -j9 -- -D warnings
test:
docker:
- image: circleci/rust:latest
File diff suppressed because it is too large

Cargo.toml
@@ -7,17 +7,16 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
bytes = "0.4.12"
rand = "0.7.2"
bytes = "0.5.4"
integer-encoding = "1.0.7"

# actix 2.0 should release around 12/20
actix-rt = "1.0"
actix-web = "2.0"
hyper = "0.13"
tokio = { version = "0.2", features = ["full"] }

dotenv = "0.10"
dotenv = "0.15.0"
dirs = "2.0.2"
env_logger = "0.6"
env_logger = "0.7.1"
log = "0.4.8"
failure = "0.1.1"
futures = "0.3.1"

@@ -25,28 +24,33 @@ serde_json = "1.0.44"
serde = "1.0"
csv = "1.1"
rocksdb = "0.13"
byteorder = "0.3"
byteorder = "1.3.4"

num_cpus = "1.11.1"

# waiting for tonic to bump up to use newest tokio
#tonic = "0.1.0-alpha.6"
prost = "0.5.0"
tonic = "0.1.1"
prost = "0.6.1"
prost-types = "0.6.1"

# using croaring here because we needed Treemap support for u64
# for series ids and we needed serialization, which the pure Rust
# crate didn't offer.
croaring = "0.4.2"
http = "0.2.0"
serde_urlencoded = "0.6.1"

[dev-dependencies]
criterion = "0.3"
reqwest = { version = "0.10.1", features = ["blocking"] }
assert_cmd = "0.12.0"
rand = "0.7.2"

[[bench]]
name = "encoders"
harness = false

[build-dependencies]
#tonic-build = "0.1.0-alpha.6"
tonic-build = "0.1.1"
prost-build = { version = "0.5.0" }
rand = "0.7.2"
build.rs

@@ -1,7 +1,7 @@
//use tonic_build;
use prost_build;
type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;

fn main() {
// tonic_build::compile_protos("proto/delorean/delorean.proto")?;
prost_build::compile_protos(&["proto/delorean/delorean.proto"], &["proto/delorean/"]).unwrap();
fn main() -> Result<()> {
tonic_build::compile_protos("proto/delorean/delorean.proto")?;
Ok(())
}
@@ -1,6 +1,9 @@
syntax = "proto3";
package delorean;

import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";

// TODO: how should requests handle authentication & authorization?

message CreateBucketRequest {
@ -145,4 +148,181 @@ Considerations:
|
|||
configure when to roll to deeper prefix (size/file count based)
|
||||
|
||||
on boot it should load up its S3 file index and keep in memory
|
||||
*/
|
||||
*/
|
||||
|
||||
// From https://github.com/influxdata/influxdb/blob/master/storage/reads/datatypes/storage_common.proto
|
||||
|
||||
service Storage {
|
||||
// ReadFilter performs a filter operation at storage
|
||||
rpc ReadFilter (ReadFilterRequest) returns (stream ReadResponse);
|
||||
|
||||
// ReadGroup performs a group operation at storage
|
||||
rpc ReadGroup (ReadGroupRequest) returns (stream ReadResponse);
|
||||
|
||||
// TagKeys performs a read operation for tag keys
|
||||
rpc TagKeys (TagKeysRequest) returns (stream StringValuesResponse);
|
||||
|
||||
// TagValues performs a read operation for tag values
|
||||
rpc TagValues (TagValuesRequest) returns (stream StringValuesResponse);
|
||||
|
||||
// Capabilities returns a map of keys and values identifying the capabilities supported by the storage engine
|
||||
rpc Capabilities (google.protobuf.Empty) returns (CapabilitiesResponse);
|
||||
}
|
||||
|
||||
message ReadFilterRequest {
|
||||
google.protobuf.Any read_source = 1;
|
||||
TimestampRange range = 2;
|
||||
Predicate predicate = 3;
|
||||
}
|
||||
|
||||
message ReadGroupRequest {
|
||||
google.protobuf.Any read_source = 1;
|
||||
TimestampRange range = 2;
|
||||
Predicate predicate = 3;
|
||||
|
||||
enum Group {
|
||||
// GroupNone returns all series as a single group.
|
||||
// The single GroupFrame.TagKeys will be the union of all tag keys.
|
||||
GROUP_NONE = 0;
|
||||
|
||||
// GroupBy returns a group for each unique value of the specified GroupKeys.
|
||||
GROUP_BY = 2;
|
||||
}
|
||||
|
||||
// GroupKeys specifies a list of tag keys used to order the data.
|
||||
// It is dependent on the Group property to determine its behavior.
|
||||
repeated string group_keys = 4;
|
||||
|
||||
Group group = 5;
|
||||
Aggregate aggregate = 6;
|
||||
}
|
||||
|
||||
message Aggregate {
|
||||
enum AggregateType {
|
||||
NONE = 0;
|
||||
SUM = 1;
|
||||
COUNT = 2;
|
||||
}
|
||||
|
||||
AggregateType type = 1;
|
||||
|
||||
// additional arguments?
|
||||
}
|
||||
|
||||
message Tag {
|
||||
bytes key = 1;
|
||||
bytes value = 2;
|
||||
}
|
||||
|
||||
// Response message for ReadFilter and ReadGroup
|
||||
message ReadResponse {
|
||||
enum FrameType {
|
||||
SERIES = 0;
|
||||
POINTS = 1;
|
||||
}
|
||||
|
||||
enum DataType {
|
||||
FLOAT = 0;
|
||||
INTEGER = 1;
|
||||
UNSIGNED = 2;
|
||||
BOOLEAN = 3;
|
||||
STRING = 4;
|
||||
}
|
||||
|
||||
message Frame {
|
||||
oneof data {
|
||||
GroupFrame group = 7;
|
||||
SeriesFrame series = 1;
|
||||
FloatPointsFrame float_points = 2;
|
||||
IntegerPointsFrame integer_points = 3;
|
||||
UnsignedPointsFrame unsigned_points = 4;
|
||||
BooleanPointsFrame boolean_points = 5;
|
||||
StringPointsFrame string_points = 6;
|
||||
}
|
||||
}
|
||||
|
||||
message GroupFrame {
|
||||
// TagKeys
|
||||
repeated bytes tag_keys = 1;
|
||||
// PartitionKeyVals is the values of the partition key for this group, order matching ReadGroupRequest.GroupKeys
|
||||
repeated bytes partition_key_vals = 2;
|
||||
}
|
||||
|
||||
message SeriesFrame {
|
||||
repeated Tag tags = 1;
|
||||
DataType data_type = 2;
|
||||
}
|
||||
|
||||
message FloatPointsFrame {
|
||||
repeated sfixed64 timestamps = 1;
|
||||
repeated double values = 2;
|
||||
}
|
||||
|
||||
message IntegerPointsFrame {
|
||||
repeated sfixed64 timestamps = 1;
|
||||
repeated int64 values = 2;
|
||||
}
|
||||
|
||||
message UnsignedPointsFrame {
|
||||
repeated sfixed64 timestamps = 1;
|
||||
repeated uint64 values = 2;
|
||||
}
|
||||
|
||||
message BooleanPointsFrame {
|
||||
repeated sfixed64 timestamps = 1;
|
||||
repeated bool values = 2;
|
||||
}
|
||||
|
||||
message StringPointsFrame {
|
||||
repeated sfixed64 timestamps = 1;
|
||||
repeated string values = 2;
|
||||
}
|
||||
|
||||
repeated Frame frames = 1;
|
||||
}
|
||||
|
||||
message CapabilitiesResponse {
|
||||
map<string, string> caps = 1;
|
||||
}
|
||||
|
||||
// Specifies a continuous range of nanosecond timestamps.
|
||||
message TimestampRange {
|
||||
// Start defines the inclusive lower bound.
|
||||
int64 start = 1;
|
||||
|
||||
// End defines the exclusive upper bound.
|
||||
int64 end = 2;
|
||||
}
|
||||
|
||||
// TagKeysRequest is the request message for Storage.TagKeys.
|
||||
message TagKeysRequest {
|
||||
google.protobuf.Any tags_source = 1;
|
||||
TimestampRange range = 2;
|
||||
Predicate predicate = 3;
|
||||
}
|
||||
|
||||
// TagValuesRequest is the request message for Storage.TagValues.
|
||||
message TagValuesRequest {
|
||||
google.protobuf.Any tags_source = 1;
|
||||
TimestampRange range = 2;
|
||||
Predicate predicate = 3;
|
||||
string tag_key = 4;
|
||||
}
|
||||
|
||||
// Response message for Storage.TagKeys and Storage.TagValues.
|
||||
message StringValuesResponse {
|
||||
repeated bytes values = 1;
|
||||
}
|
||||
|
||||
// From https://github.com/influxdata/idpe/blob/master/storage/read/source.proto
|
||||
|
||||
message ReadSource {
|
||||
// OrgID specifies the organization identifier for this request.
|
||||
uint64 org_id = 1;
|
||||
|
||||
// BucketID specifies the bucket in the organization.
|
||||
uint64 bucket_id = 2;
|
||||
|
||||
// PartitionID specifies the partition to be queried.
|
||||
uint64 partition_id = 3;
|
||||
}
|
||||
|
|
|
@@ -488,7 +488,10 @@ pub fn decode(src: &[u8], dst: &mut Vec<f64>) -> Result<(), Box<dyn Error>> {
}

#[cfg(test)]
#[allow(clippy::unreadable_literal)]
#[allow(clippy::excessive_precision)] // TODO: Audit test values for truncation
mod tests {
use crate::tests::approximately_equal;

#[test]
fn encode_no_values() {

@@ -532,7 +535,11 @@ mod tests {
assert_eq!(got.len(), src.len());

for (i, v) in got.iter().enumerate() {
assert_eq!(src[i].to_bits(), v.to_bits());
if v.is_nan() || v.is_infinite() {
assert_eq!(src[i].to_bits(), v.to_bits());
} else {
assert!(approximately_equal(src[i], *v));
}
}
}
@@ -57,7 +57,7 @@ pub fn encode<'a>(src: &[i64], dst: &'a mut Vec<u8>) -> Result<(), Box<dyn Error
dst.reserve_exact(cap - dst.capacity());
}
dst.push((Encoding::Uncompressed as u8) << 4);
for delta in deltas.iter() {
for delta in &deltas {
dst.extend_from_slice(&delta.to_be_bytes());
}
return Ok(());

@@ -201,7 +201,7 @@ fn decode_simple8b(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>>
simple8b::decode(&src[8..], &mut res);
// TODO(edd): fix this. It's copying, which is slowwwwwwwww.
let mut next = dst[0];
for v in res.iter() {
for v in &res {
next += zig_zag_decode(*v);
dst.push(next);
}

@@ -209,6 +209,7 @@ fn decode_simple8b(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>>
}

#[cfg(test)]
#[allow(clippy::unreadable_literal)]
mod tests {
use super::*;
@@ -212,6 +212,7 @@ fn decode_value(v: u64, dst: &mut [u64]) -> usize {
}

#[cfg(test)]
#[allow(clippy::unreadable_literal)]
mod tests {
use super::*;
use rand::rngs::StdRng;

@@ -255,13 +256,11 @@ mod tests {

#[test]
fn test_encode_too_big() {
let src = vec![7, 6, 2 << 61 - 1, 4, 3, 2, 1];
let src = vec![7, 6, 2 << (61 - 1), 4, 3, 2, 1];

let mut encoded = vec![];
match encode(&src, &mut encoded) {
Ok(_) => assert!(false), // TODO(edd): fix this silly assertion
Err(_) => (),
}
let result = encode(&src, &mut encoded);
assert_eq!(result.unwrap_err().to_string(), "value out of bounds");
}

#[test]
@@ -57,7 +57,7 @@ pub fn encode<'a>(src: &[i64], dst: &'a mut Vec<u8>) -> Result<(), Box<dyn Error
}

dst.push((Encoding::Uncompressed as u8) << 4);
for delta in deltas.iter() {
for delta in &deltas {
dst.extend_from_slice(&delta.to_be_bytes());
}
return Ok(());

@@ -238,7 +238,7 @@ fn decode_simple8b(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>>
let mut next = dst[dst.len() - 1];
if scaler > 1 {
// TODO(edd): fix this. It's copying, which is slowwwwwwwww.
for v in res.iter() {
for v in &res {
next += (v * scaler) as i64;
dst.push(next);
}

@@ -246,7 +246,7 @@ fn decode_simple8b(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>>
}

// TODO(edd): fix this. It's copying, which is slowwwwwwwww.
for v in res.iter() {
for v in &res {
next += *v as i64;
dst.push(next);
}

@@ -254,6 +254,7 @@ fn decode_simple8b(src: &[u8], dst: &mut Vec<i64>) -> Result<(), Box<dyn Error>>
}

#[cfg(test)]
#[allow(clippy::unreadable_literal)]
mod tests {
use super::*;
src/lib.rs
@@ -1,7 +1,6 @@
#![deny(rust_2018_idioms)]
#![warn(clippy::explicit_iter_loop)]

use actix_web::http::StatusCode;
use actix_web::ResponseError;
use std::{error, fmt};

pub mod encoders;

@@ -13,7 +12,7 @@ pub mod delorean {
include!(concat!(env!("OUT_DIR"), "/delorean.rs"));
}

// TODO: audit all errors and make ones that can differentiate between 400 and 500 and otehrs
// TODO: audit all errors and their handling in main

#[derive(Debug, Clone, PartialEq)]
pub struct Error {

@@ -33,8 +32,13 @@ impl error::Error for Error {
}
}

impl ResponseError for Error {
fn status_code(&self) -> StatusCode {
StatusCode::BAD_REQUEST
#[cfg(test)]
pub mod tests {
use std::f64;

/// A test helper function for asserting floating point numbers are within the machine epsilon
/// because strict comparison of floating point numbers is incorrect
pub fn approximately_equal(f1: f64, f2: f64) -> bool {
(f1 - f2).abs() < f64::EPSILON
}
}
@@ -1,5 +1,3 @@
use actix_web::http::StatusCode;
use actix_web::ResponseError;
use std::str::Chars;
use std::{error, fmt};

@@ -165,12 +163,6 @@ impl error::Error for ParseError {
}
}

impl ResponseError for ParseError {
fn status_code(&self) -> StatusCode {
StatusCode::BAD_REQUEST
}
}

// TODO: have parse return an error for invalid inputs
pub fn parse(input: &str) -> Vec<PointType> {
let mut points: Vec<PointType> = Vec::with_capacity(10000);

@@ -267,6 +259,7 @@ fn read_value(
#[cfg(test)]
mod test {
use super::*;
use crate::tests::approximately_equal;

#[test]
fn parse_single_field() {

@@ -281,13 +274,13 @@ mod test {
let vals = parse(input);
assert_eq!(vals[0].series(), "foo\tasdf");
assert_eq!(vals[0].time(), 546);
assert_eq!(vals[0].f64_value().unwrap(), 44.0);
assert!(approximately_equal(vals[0].f64_value().unwrap(), 44.0));

let input = "foo asdf=3.14 123";
let input = "foo asdf=3.74 123";
let vals = parse(input);
assert_eq!(vals[0].series(), "foo\tasdf");
assert_eq!(vals[0].time(), 123);
assert_eq!(vals[0].f64_value().unwrap(), 3.14);
assert!(approximately_equal(vals[0].f64_value().unwrap(), 3.74));
}

#[test]

@@ -308,11 +301,11 @@ mod test {
let vals = parse(input);
assert_eq!(vals[0].series(), "foo\tasdf");
assert_eq!(vals[0].time(), 1234);
assert_eq!(vals[0].f64_value().unwrap(), 23.1);
assert!(approximately_equal(vals[0].f64_value().unwrap(), 23.1));

assert_eq!(vals[1].series(), "foo\tbar");
assert_eq!(vals[1].time(), 1234);
assert_eq!(vals[1].f64_value().unwrap(), 5.0);
assert!(approximately_equal(vals[1].f64_value().unwrap(), 5.0));
}

#[test]

@@ -322,7 +315,7 @@ mod test {
let vals = parse(input);
assert_eq!(vals[0].series(), "foo\tasdf");
assert_eq!(vals[0].time(), 1234);
assert_eq!(vals[0].f64_value().unwrap(), 23.1);
assert!(approximately_equal(vals[0].f64_value().unwrap(), 23.1));

assert_eq!(vals[1].series(), "foo\tbar");
assert_eq!(vals[1].time(), 1234);
src/main.rs
@ -1,26 +1,38 @@
|
|||
#![deny(rust_2018_idioms)]
|
||||
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
|
||||
use delorean::delorean::Bucket;
|
||||
use delorean::delorean::{
|
||||
delorean_server::DeloreanServer, storage_server::StorageServer, TimestampRange,
|
||||
};
|
||||
use delorean::line_parser;
|
||||
use delorean::line_parser::index_pairs;
|
||||
use delorean::storage::database::Database;
|
||||
use delorean::storage::predicate::parse_predicate;
|
||||
use delorean::storage::{Range, SeriesDataType};
|
||||
use delorean::storage::SeriesDataType;
|
||||
use delorean::time::{parse_duration, time_as_i64_nanos};
|
||||
|
||||
use std::env::VarError;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::{env, io, str};
|
||||
use std::{env, fmt, str};
|
||||
|
||||
use actix_web::web::BytesMut;
|
||||
use actix_web::{error, guard, middleware, web, App, Error as AWError, HttpResponse, HttpServer};
|
||||
use bytes::BytesMut;
|
||||
use csv::Writer;
|
||||
use failure::_core::time::Duration;
|
||||
use futures::{self, StreamExt};
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::{Body, Method, Server, StatusCode};
|
||||
use serde::Deserialize;
|
||||
use serde_json;
|
||||
|
||||
struct Server {
|
||||
mod rpc;
|
||||
|
||||
use crate::rpc::GrpcServer;
|
||||
|
||||
pub struct App {
|
||||
db: Database,
|
||||
}
|
||||
|
||||
|
@ -32,15 +44,17 @@ struct WriteInfo {
|
|||
bucket_name: String,
|
||||
}
|
||||
|
||||
async fn write(
|
||||
mut payload: web::Payload,
|
||||
write_info: web::Query<WriteInfo>,
|
||||
s: web::Data<Arc<Server>>,
|
||||
) -> Result<HttpResponse, AWError> {
|
||||
let bucket = match s
|
||||
async fn write(req: hyper::Request<Body>, app: Arc<App>) -> Result<Body, ApplicationError> {
|
||||
let query = req.uri().query().ok_or(StatusCode::BAD_REQUEST)?;
|
||||
let write_info: WriteInfo =
|
||||
serde_urlencoded::from_str(query).map_err(|_| StatusCode::BAD_REQUEST)?;
|
||||
|
||||
let maybe_bucket = app
|
||||
.db
|
||||
.get_bucket_by_name(write_info.org_id, &write_info.bucket_name)?
|
||||
{
|
||||
.get_bucket_by_name(write_info.org_id, &write_info.bucket_name)
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
|
||||
let bucket = match maybe_bucket {
|
||||
Some(b) => b,
|
||||
None => {
|
||||
// create this as the default bucket
|
||||
|
@ -53,18 +67,27 @@ async fn write(
|
|||
index_levels: vec![],
|
||||
};
|
||||
|
||||
let _ = s.db.create_bucket_if_not_exists(write_info.org_id, &b)?;
|
||||
s.db.get_bucket_by_name(write_info.org_id, &write_info.bucket_name)?
|
||||
.unwrap()
|
||||
app.db
|
||||
.create_bucket_if_not_exists(write_info.org_id, &b)
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
app.db
|
||||
.get_bucket_by_name(write_info.org_id, &write_info.bucket_name)
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
|
||||
.expect("Bucket should have just been created")
|
||||
}
|
||||
};
|
||||
|
||||
let mut payload = req.into_body();
|
||||
|
||||
let mut body = BytesMut::new();
|
||||
while let Some(chunk) = payload.next().await {
|
||||
let chunk = chunk?;
|
||||
let chunk = chunk.expect("Should have been able to read the next chunk");
|
||||
// limit max size of in-memory payload
|
||||
if (body.len() + chunk.len()) > MAX_SIZE {
|
||||
return Err(error::ErrorBadRequest("overflow"));
|
||||
return Err(ApplicationError::new(
|
||||
StatusCode::BAD_REQUEST,
|
||||
"Body exceeds limit of 1MB",
|
||||
));
|
||||
}
|
||||
body.extend_from_slice(&chunk);
|
||||
}
|
||||
|
@ -73,12 +96,11 @@ async fn write(
|
|||
|
||||
let mut points = line_parser::parse(body);
|
||||
|
||||
if let Err(err) = s.db.write_points(write_info.org_id, &bucket, &mut points) {
|
||||
return Ok(HttpResponse::InternalServerError()
|
||||
.json(serde_json::json!({ "error": format!("{}", err) })));
|
||||
}
|
||||
app.db
|
||||
.write_points(write_info.org_id, &bucket, &mut points)
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
|
||||
Ok(HttpResponse::Ok().json(()))
|
||||
Ok(serde_json::json!(()).to_string().into())
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
|
@ -90,125 +112,76 @@ struct ReadInfo {
|
|||
stop: Option<String>,
|
||||
}
|
||||
|
||||
//struct ReadResponseBody<'a> {
|
||||
// series: SeriesIterator<'a>,
|
||||
// current_points_iterator: PointsIterator<'a>,
|
||||
//}
|
||||
//
|
||||
//impl Iterator for ReadResponseBody<'_> {
|
||||
// type Item = Vec<u8>;
|
||||
//
|
||||
// fn next(&mut self) -> Option<Self::Item> {
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//impl Stream for ReadResponseBody {
|
||||
// type Item = Result<Bytes, AWError>;
|
||||
//
|
||||
// fn poll_next(
|
||||
// &mut self,
|
||||
// cx: &mut Context<'_>,
|
||||
// ) -> Poll<Option<Self::Item>> {
|
||||
// if self.iters > 10 {
|
||||
// Poll::Ready(None)
|
||||
// } else {
|
||||
// Poll::Ready(Some(Ok(Bytes::from_static("this is a line in the feed\n"))))
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//impl Stream for ReadResponseBody<'_> {
|
||||
// fn poll_next(
|
||||
// &mut self,
|
||||
// cx: &mut Context
|
||||
// ) -> Result<Async<Option<Self::Item>>, Self::Error> {
|
||||
// if self.iters > 10_000_000 {
|
||||
// Ok(Async::Ready(None))
|
||||
// } else {
|
||||
// Ok(Async::Ready(Some("this is a line in the feed\n".to_string())))
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
|
||||
//struct Record<T: Serialize> {
|
||||
// pairs: Vec<Pair>,
|
||||
// time: i64,
|
||||
// value: T,
|
||||
//}
|
||||
//
|
||||
//impl<T: Serialize> Serialize for Record<T> {
|
||||
// fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
// where
|
||||
// S: Serializer,
|
||||
// {
|
||||
// let mut state = serializer.serialize_struct("Record", self.pairs.len() + 2)?;
|
||||
// for p in &self.pairs {
|
||||
// state.serialize_field(&p.key, &p.value)?;
|
||||
// }
|
||||
//
|
||||
// state.serialize_field("_value", &self.value)?;
|
||||
// state.serialize_field("_time", &self.time)?;
|
||||
//
|
||||
// state.end()
|
||||
// }
|
||||
//}
|
||||
// TODO: Move this to src/time.rs, better error surfacing
|
||||
fn duration_to_nanos_or_default(
|
||||
duration_param: Option<&str>,
|
||||
now: std::time::SystemTime,
|
||||
default: std::time::SystemTime,
|
||||
) -> Result<i64, delorean::Error> {
|
||||
let time = match duration_param {
|
||||
Some(duration) => {
|
||||
let d = parse_duration(duration)?;
|
||||
d.from_time(now)?
|
||||
}
|
||||
None => default,
|
||||
};
|
||||
Ok(time_as_i64_nanos(&time))
|
||||
}
|
||||
|
||||
// TODO: figure out how to stream read results out rather than rendering the whole thing in mem
|
||||
async fn read(
|
||||
read_info: web::Query<ReadInfo>,
|
||||
s: web::Data<Arc<Server>>,
|
||||
) -> Result<HttpResponse, AWError> {
|
||||
let predicate = parse_predicate(&read_info.predicate)?;
|
||||
async fn read(req: hyper::Request<Body>, app: Arc<App>) -> Result<Body, ApplicationError> {
|
||||
let query = req
|
||||
.uri()
|
||||
.query()
|
||||
.expect("Should have been query parameters");
|
||||
let read_info: ReadInfo =
|
||||
serde_urlencoded::from_str(query).map_err(|_| StatusCode::BAD_REQUEST)?;
|
||||
|
||||
let predicate = parse_predicate(&read_info.predicate).map_err(|_| StatusCode::BAD_REQUEST)?;
|
||||
|
||||
let now = std::time::SystemTime::now();
|
||||
|
||||
let start = match &read_info.start {
|
||||
Some(duration) => {
|
||||
let d = parse_duration(duration)?;
|
||||
d.from_time(now)?
|
||||
}
|
||||
None => {
|
||||
// default to 10s in the past
|
||||
now.checked_sub(Duration::from_secs(10)).unwrap()
|
||||
}
|
||||
};
|
||||
let start = duration_to_nanos_or_default(
|
||||
read_info.start.as_deref(),
|
||||
now,
|
||||
now.checked_sub(Duration::from_secs(10)).unwrap(),
|
||||
)
|
||||
.map_err(|_| StatusCode::BAD_REQUEST)?;
|
||||
|
||||
let stop = match &read_info.stop {
|
||||
Some(duration) => {
|
||||
let d = parse_duration(duration)?;
|
||||
d.from_time(now)?
|
||||
}
|
||||
None => now,
|
||||
};
|
||||
let end = duration_to_nanos_or_default(read_info.stop.as_deref(), now, now)
|
||||
.map_err(|_| StatusCode::BAD_REQUEST)?;
|
||||
|
||||
let start = time_as_i64_nanos(&start);
|
||||
let stop = time_as_i64_nanos(&stop);
|
||||
let range = TimestampRange { start, end };
|
||||
|
||||
let range = Range { start, stop };
|
||||
|
||||
let bucket = match s
|
||||
let maybe_bucket = app
|
||||
.db
|
||||
.get_bucket_by_name(read_info.org_id, &read_info.bucket_name)?
|
||||
{
|
||||
.get_bucket_by_name(read_info.org_id, &read_info.bucket_name)
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
|
||||
let bucket = match maybe_bucket {
|
||||
Some(b) => b,
|
||||
None => {
|
||||
return Ok(HttpResponse::NotFound().json(serde_json::json!({
|
||||
"error": format!("bucket {} not found", read_info.bucket_name)
|
||||
})))
|
||||
return Err(ApplicationError::new(
|
||||
StatusCode::NOT_FOUND,
|
||||
&format!("bucket {} not found", read_info.bucket_name),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let series =
|
||||
s.db.read_series_matching_predicate_and_range(&bucket, Some(&predicate), Some(&range))?;
|
||||
let series = app
|
||||
.db
|
||||
.read_series_matching_predicate_and_range(&bucket, Some(&predicate), Some(&range))
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
|
||||
let db = &s.db;
|
||||
let db = &app.db;
|
||||
|
||||
let mut response_body = vec![];
|
||||
|
||||
for s in series {
|
||||
let mut wtr = Writer::from_writer(vec![]);
|
||||
|
||||
let pairs = index_pairs(&s.key)?;
|
||||
let pairs = index_pairs(&s.key).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
|
||||
let mut cols = Vec::with_capacity(pairs.len() + 2);
|
||||
let mut vals = Vec::with_capacity(pairs.len() + 2);
|
||||
|
||||
|
@ -230,7 +203,9 @@ async fn read(
|
|||
|
||||
match s.series_type {
|
||||
SeriesDataType::I64 => {
|
||||
let points = db.read_i64_range(&bucket, &s, &range, 10)?;
|
||||
let points = db
|
||||
.read_i64_range(&bucket, &s, &range, 10)
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
|
||||
for batch in points {
|
||||
for p in batch {
|
||||
|
@ -244,7 +219,9 @@ async fn read(
|
|||
}
|
||||
}
|
||||
SeriesDataType::F64 => {
|
||||
let points = db.read_f64_range(&bucket, &s, &range, 10)?;
|
||||
let points = db
|
||||
.read_f64_range(&bucket, &s, &range, 10)
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
|
||||
for batch in points {
|
||||
for p in batch {
|
||||
|
@ -259,29 +236,87 @@ async fn read(
|
|||
}
|
||||
};
|
||||
|
||||
let mut data = match wtr.into_inner() {
|
||||
Ok(d) => d,
|
||||
Err(e) => {
|
||||
return Ok(HttpResponse::InternalServerError()
|
||||
.json(serde_json::json!({ "error": format!("{}", e) })))
|
||||
}
|
||||
};
|
||||
let mut data = wtr
|
||||
.into_inner()
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
|
||||
response_body.append(&mut data);
|
||||
response_body.append(&mut b"\n".to_vec());
|
||||
}
|
||||
|
||||
Ok(HttpResponse::Ok().body(response_body))
|
||||
Ok(response_body.into())
|
||||
}
|
||||
|
||||
async fn not_found() -> Result<HttpResponse, AWError> {
|
||||
Ok(HttpResponse::NotFound().json(serde_json::json!({"error": "not found"})))
|
||||
async fn service(req: hyper::Request<Body>, app: Arc<App>) -> http::Result<hyper::Response<Body>> {
|
||||
let response = match (req.method(), req.uri().path()) {
|
||||
(&Method::POST, "/api/v2/write") => write(req, app).await,
|
||||
(&Method::GET, "/api/v2/read") => read(req, app).await,
|
||||
_ => Err(ApplicationError::new(
|
||||
StatusCode::NOT_FOUND,
|
||||
"route not found",
|
||||
)),
|
||||
};
|
||||
|
||||
match response {
|
||||
Ok(body) => Ok(hyper::Response::builder()
|
||||
.body(body)
|
||||
.expect("Should have been able to construct a response")),
|
||||
Err(e) => {
|
||||
let json = serde_json::json!({"error": e.to_string()}).to_string();
|
||||
Ok(hyper::Response::builder()
|
||||
.status(e.status_code())
|
||||
.body(json.into())
|
||||
.expect("Should have been able to construct a response"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[actix_rt::main]
|
||||
async fn main() -> io::Result<()> {
|
||||
#[derive(Debug)]
|
||||
struct ApplicationError {
|
||||
status_code: StatusCode,
|
||||
message: String,
|
||||
}
|
||||
|
||||
impl ApplicationError {
|
||||
fn new(status_code: StatusCode, message: impl Into<String>) -> ApplicationError {
|
||||
ApplicationError {
|
||||
status_code,
|
||||
message: message.into(),
|
||||
}
|
||||
}
|
||||
|
||||
fn status_code(&self) -> StatusCode {
|
||||
self.status_code
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for ApplicationError {}
|
||||
|
||||
impl fmt::Display for ApplicationError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}", self.message)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<StatusCode> for ApplicationError {
|
||||
fn from(other: StatusCode) -> ApplicationError {
|
||||
match other {
|
||||
StatusCode::BAD_REQUEST => {
|
||||
ApplicationError::new(StatusCode::BAD_REQUEST, "Bad request")
|
||||
}
|
||||
StatusCode::INTERNAL_SERVER_ERROR => {
|
||||
ApplicationError::new(StatusCode::INTERNAL_SERVER_ERROR, "Internal server error")
|
||||
}
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
dotenv::dotenv().ok();
|
||||
|
||||
env::set_var("RUST_LOG", "delorean=debug,actix_server=info");
|
||||
env::set_var("RUST_LOG", "delorean=debug,hyper=info");
|
||||
env_logger::init();
|
||||
|
||||
let db_dir = match std::env::var("DELOREAN_DB_DIR") {
|
||||
|
@ -295,39 +330,49 @@ async fn main() -> io::Result<()> {
|
|||
};
|
||||
|
||||
let db = Database::new(&db_dir);
|
||||
let state = Arc::new(Server { db });
|
||||
let bind_addr = match std::env::var("DELOREAN_BIND_ADDR") {
|
||||
Ok(addr) => addr,
|
||||
Err(VarError::NotPresent) => "127.0.0.1:8080".to_string(),
|
||||
let state = Arc::new(App { db });
|
||||
let bind_addr: SocketAddr = match std::env::var("DELOREAN_BIND_ADDR") {
|
||||
Ok(addr) => addr
|
||||
.parse()
|
||||
.expect("DELOREAN_BIND_ADDR environment variable not a valid SocketAddr"),
|
||||
Err(VarError::NotPresent) => "127.0.0.1:8080".parse().unwrap(),
|
||||
Err(VarError::NotUnicode(_)) => {
|
||||
panic!("DELOREAN_BIND_ADDR environment variable not a valid unicode string")
|
||||
}
|
||||
};
|
||||
let grpc_bind_addr: SocketAddr = match std::env::var("DELOREAN_GRPC_BIND_ADDR") {
|
||||
Ok(addr) => addr
|
||||
.parse()
|
||||
.expect("DELOREAN_GRPC_BIND_ADDR environment variable not a valid SocketAddr"),
|
||||
Err(VarError::NotPresent) => "127.0.0.1:8081".parse().unwrap(),
|
||||
Err(VarError::NotUnicode(_)) => {
|
||||
panic!("DELOREAN_GRPC_BIND_ADDR environment variable not a valid unicode string")
|
||||
}
|
||||
};
|
||||
|
||||
HttpServer::new(move || {
|
||||
App::new()
|
||||
.data(state.clone())
|
||||
// enable logger
|
||||
.wrap(middleware::Logger::default())
|
||||
.service(
|
||||
web::scope("/api/v2")
|
||||
.service(web::resource("/write").route(web::post().to(write)))
|
||||
.service(web::resource("/read").route(web::get().to(read))),
|
||||
)
|
||||
// default
|
||||
.default_service(
|
||||
// 404 for GET request
|
||||
web::resource("")
|
||||
.route(web::get().to(not_found))
|
||||
// all requests that are not `GET`
|
||||
.route(
|
||||
web::route()
|
||||
.guard(guard::Not(guard::Get()))
|
||||
.to(HttpResponse::MethodNotAllowed),
|
||||
),
|
||||
)
|
||||
})
|
||||
.bind(bind_addr)?
|
||||
.run()
|
||||
.await
|
||||
let grpc_server = tonic::transport::Server::builder()
|
||||
.add_service(DeloreanServer::new(GrpcServer { app: state.clone() }))
|
||||
.add_service(StorageServer::new(GrpcServer { app: state.clone() }))
|
||||
.serve(grpc_bind_addr);
|
||||
|
||||
let make_svc = make_service_fn(move |_conn| {
|
||||
let state = state.clone();
|
||||
async move {
|
||||
Ok::<_, http::Error>(service_fn(move |req| {
|
||||
let state = state.clone();
|
||||
service(req, state)
|
||||
}))
|
||||
}
|
||||
});
|
||||
|
||||
let server = Server::bind(&bind_addr).serve(make_svc);
|
||||
|
||||
info!("gRPC server listening on http://{}", grpc_bind_addr);
|
||||
info!("Listening on http://{}", bind_addr);
|
||||
|
||||
let (grpc_server, server) = futures::future::join(grpc_server, server).await;
|
||||
grpc_server?;
|
||||
server?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -0,0 +1,327 @@
|
|||
use delorean::delorean::Bucket;
|
||||
use delorean::delorean::{
|
||||
delorean_server::Delorean,
|
||||
read_response::{
|
||||
frame::Data, DataType, FloatPointsFrame, Frame, IntegerPointsFrame, SeriesFrame,
|
||||
},
|
||||
storage_server::Storage,
|
||||
CapabilitiesResponse, CreateBucketRequest, CreateBucketResponse, DeleteBucketRequest,
|
||||
DeleteBucketResponse, GetBucketsResponse, Organization, Predicate, ReadFilterRequest,
|
||||
ReadGroupRequest, ReadResponse, ReadSource, StringValuesResponse, TagKeysRequest,
|
||||
TagValuesRequest, TimestampRange,
|
||||
};
|
||||
use delorean::storage::database::Database;
|
||||
use delorean::storage::SeriesDataType;
|
||||
|
||||
use std::convert::TryInto;
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
use tonic::Status;
|
||||
|
||||
use crate::App;
|
||||
|
||||
pub struct GrpcServer {
|
||||
pub app: Arc<App>,
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl Delorean for GrpcServer {
|
||||
async fn create_bucket(
|
||||
&self,
|
||||
_req: tonic::Request<CreateBucketRequest>,
|
||||
) -> Result<tonic::Response<CreateBucketResponse>, Status> {
|
||||
Ok(tonic::Response::new(CreateBucketResponse {}))
|
||||
}
|
||||
|
||||
async fn delete_bucket(
|
||||
&self,
|
||||
_req: tonic::Request<DeleteBucketRequest>,
|
||||
) -> Result<tonic::Response<DeleteBucketResponse>, Status> {
|
||||
Ok(tonic::Response::new(DeleteBucketResponse {}))
|
||||
}
|
||||
|
||||
async fn get_buckets(
|
||||
&self,
|
||||
_req: tonic::Request<Organization>,
|
||||
) -> Result<tonic::Response<GetBucketsResponse>, Status> {
|
||||
Ok(tonic::Response::new(GetBucketsResponse { buckets: vec![] }))
|
||||
}
|
||||
}
|
||||
|
||||
/// This trait implements extraction of information from all storage gRPC requests. The only method
|
||||
/// required to implement is `read_source_field` because for some requests the field is named
|
||||
/// `read_source` and for others it is `tags_source`.
|
||||
trait GrpcInputs {
|
||||
fn read_source_field(&self) -> Option<&prost_types::Any>;
|
||||
|
||||
fn read_source_raw(&self) -> Result<&prost_types::Any, Status> {
|
||||
Ok(self
|
||||
.read_source_field()
|
||||
.ok_or_else(|| Status::invalid_argument("missing read_source"))?)
|
||||
}
|
||||
|
||||
fn read_source(&self) -> Result<ReadSource, Status> {
|
||||
let raw = self.read_source_raw()?;
|
||||
let val = &raw.value[..];
|
||||
Ok(prost::Message::decode(val).map_err(|_| {
|
||||
Status::invalid_argument("value could not be parsed as a ReadSource message")
|
||||
})?)
|
||||
}
|
||||
|
||||
fn org_id(&self) -> Result<u32, Status> {
|
||||
Ok(self
|
||||
.read_source()?
|
||||
.org_id
|
||||
.try_into()
|
||||
.map_err(|_| Status::invalid_argument("org_id did not fit in a u32"))?)
|
||||
}
|
||||
|
||||
fn bucket(&self, db: &Database) -> Result<Arc<Bucket>, Status> {
|
||||
let bucket_id = self
|
||||
.read_source()?
|
||||
.bucket_id
|
||||
.try_into()
|
||||
.map_err(|_| Status::invalid_argument("bucket_id did not fit in a u32"))?;
|
||||
|
||||
let maybe_bucket = db
|
||||
.get_bucket_by_id(bucket_id)
|
||||
.map_err(|_| Status::internal("could not query for bucket"))?;
|
||||
|
||||
Ok(maybe_bucket
|
||||
.ok_or_else(|| Status::not_found(&format!("bucket {} not found", bucket_id)))?)
|
||||
}
|
||||
}
|
||||
|
||||
impl GrpcInputs for ReadFilterRequest {
|
||||
fn read_source_field(&self) -> Option<&prost_types::Any> {
|
||||
self.read_source.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl GrpcInputs for ReadGroupRequest {
|
||||
fn read_source_field(&self) -> Option<&prost_types::Any> {
|
||||
self.read_source.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl GrpcInputs for TagKeysRequest {
|
||||
fn read_source_field(&self) -> Option<&prost_types::Any> {
|
||||
self.tags_source.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl GrpcInputs for TagValuesRequest {
|
||||
fn read_source_field(&self) -> Option<&prost_types::Any> {
|
||||
self.tags_source.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl Storage for GrpcServer {
|
||||
type ReadFilterStream = mpsc::Receiver<Result<ReadResponse, Status>>;
|
||||
|
||||
async fn read_filter(
|
||||
&self,
|
||||
req: tonic::Request<ReadFilterRequest>,
|
||||
) -> Result<tonic::Response<Self::ReadFilterStream>, Status> {
|
||||
let (mut tx, rx) = mpsc::channel(4);
|
||||
|
||||
let read_filter_request = req.into_inner();
|
||||
|
||||
let _org_id = read_filter_request.org_id()?;
|
||||
let bucket = read_filter_request.bucket(&self.app.db)?;
|
||||
let predicate = read_filter_request.predicate;
|
||||
let range = read_filter_request.range;
|
||||
|
||||
let app = Arc::clone(&self.app);
|
||||
|
||||
// TODO: is this blocking because of the blocking calls to the database...?
|
||||
tokio::spawn(async move {
|
||||
let predicate = predicate.as_ref();
|
||||
// TODO: The call to read_series_matching_predicate_and_range takes an optional range,
|
||||
// but read_f64_range requires a range-- should this route require a range or use a
|
||||
// default or something else?
|
||||
let range = range.as_ref().expect("TODO: Must have a range?");
|
||||
|
||||
if let Err(e) = send_series_filters(tx.clone(), app, &bucket, predicate, &range).await {
|
||||
tx.send(Err(e)).await.unwrap();
|
||||
}
|
||||
});
|
||||
|
||||
Ok(tonic::Response::new(rx))
|
||||
}
|
||||
|
||||
type ReadGroupStream = mpsc::Receiver<Result<ReadResponse, Status>>;
|
||||
|
||||
async fn read_group(
|
||||
&self,
|
||||
_req: tonic::Request<ReadGroupRequest>,
|
||||
) -> Result<tonic::Response<Self::ReadGroupStream>, Status> {
|
||||
Err(Status::unimplemented("Not yet implemented"))
|
||||
}
|
||||
|
||||
type TagKeysStream = mpsc::Receiver<Result<StringValuesResponse, Status>>;
|
||||
|
||||
async fn tag_keys(
|
||||
&self,
|
||||
req: tonic::Request<TagKeysRequest>,
|
||||
) -> Result<tonic::Response<Self::TagKeysStream>, Status> {
|
||||
let (mut tx, rx) = mpsc::channel(4);
|
||||
|
||||
let tag_keys_request = req.get_ref();
|
||||
|
||||
let _org_id = tag_keys_request.org_id()?;
|
||||
let bucket = tag_keys_request.bucket(&self.app.db)?;
|
||||
let predicate = tag_keys_request.predicate.clone();
|
||||
let _range = tag_keys_request.range.as_ref();
|
||||
|
||||
let app = self.app.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
match app.db.get_tag_keys(&bucket, predicate.as_ref()) {
|
||||
Err(_) => tx
|
||||
.send(Err(Status::internal("could not query for tag keys")))
|
||||
.await
|
||||
.unwrap(),
|
||||
Ok(tag_keys_iter) => {
|
||||
// TODO: Should these be batched? If so, how?
|
||||
let tag_keys: Vec<_> = tag_keys_iter.map(|s| s.into_bytes()).collect();
|
||||
tx.send(Ok(StringValuesResponse { values: tag_keys }))
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(tonic::Response::new(rx))
|
||||
}
|
||||
|
||||
type TagValuesStream = mpsc::Receiver<Result<StringValuesResponse, Status>>;
|
||||
|
||||
async fn tag_values(
|
||||
&self,
|
||||
req: tonic::Request<TagValuesRequest>,
|
||||
) -> Result<tonic::Response<Self::TagValuesStream>, Status> {
|
||||
let (mut tx, rx) = mpsc::channel(4);
|
||||
|
||||
let tag_values_request = req.get_ref();
|
||||
|
||||
let _org_id = tag_values_request.org_id()?;
|
||||
let bucket = tag_values_request.bucket(&self.app.db)?;
|
||||
let predicate = tag_values_request.predicate.clone();
|
||||
let _range = tag_values_request.range.as_ref();
|
||||
|
||||
let tag_key = tag_values_request.tag_key.clone();
|
||||
|
||||
let app = self.app.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
match app.db.get_tag_values(&bucket, &tag_key, predicate.as_ref()) {
|
||||
Err(_) => tx
|
||||
.send(Err(Status::internal("could not query for tag values")))
|
||||
.await
|
||||
.unwrap(),
|
||||
Ok(tag_values_iter) => {
|
||||
// TODO: Should these be batched? If so, how?
|
||||
let tag_values: Vec<_> = tag_values_iter.map(|s| s.into_bytes()).collect();
|
||||
tx.send(Ok(StringValuesResponse { values: tag_values }))
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(tonic::Response::new(rx))
|
||||
}
|
||||
|
||||
async fn capabilities(
|
||||
&self,
|
||||
_: tonic::Request<()>,
|
||||
) -> Result<tonic::Response<CapabilitiesResponse>, Status> {
|
||||
Err(Status::unimplemented("Not yet implemented"))
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_series_filters(
|
||||
mut tx: mpsc::Sender<Result<ReadResponse, Status>>,
|
||||
app: Arc<App>,
|
||||
bucket: &Bucket,
|
||||
predicate: Option<&Predicate>,
|
||||
range: &TimestampRange,
|
||||
) -> Result<(), Status> {
|
||||
let filter_iter = app
|
||||
.db
|
||||
.read_series_matching_predicate_and_range(&bucket, predicate, Some(range))
|
||||
.map_err(|e| Status::internal(format!("could not query for filters: {}", e)))?;
|
||||
|
||||
for series_filter in filter_iter {
|
||||
let tags = series_filter.tags();
|
||||
let data_type = match series_filter.series_type {
|
||||
SeriesDataType::F64 => DataType::Float,
|
||||
SeriesDataType::I64 => DataType::Integer,
|
||||
} as _;
|
||||
let series = SeriesFrame { data_type, tags };
|
||||
let data = Data::Series(series);
|
||||
let data = Some(data);
|
||||
let frame = Frame { data };
|
||||
let frames = vec![frame];
|
||||
let series_frame_response_header = Ok(ReadResponse { frames });
|
||||
|
||||
tx.send(series_frame_response_header).await.unwrap();
|
||||
|
||||
// TODO: Should this match https://github.com/influxdata/influxdb/blob/d96f3dc5abb6bb187374caa9e7c7a876b4799bd2/storage/reads/response_writer.go#L21 ?
|
||||
const BATCH_SIZE: usize = 1;
|
||||
|
||||
match series_filter.series_type {
|
||||
SeriesDataType::F64 => {
|
||||
let iter = app
|
||||
.db
|
||||
.read_f64_range(&bucket, &series_filter, &range, BATCH_SIZE)
|
||||
.map_err(|e| {
|
||||
Status::internal(format!("could not query for SeriesFilter data: {}", e))
|
||||
})?;
|
||||
|
||||
let frames = iter
|
||||
.map(|batch| {
|
||||
// TODO: Performance hazard; splitting this vector is non-ideal
|
||||
let (timestamps, values) =
|
||||
batch.into_iter().map(|p| (p.time, p.value)).unzip();
|
||||
let frame = FloatPointsFrame { timestamps, values };
|
||||
let data = Data::FloatPoints(frame);
|
||||
let data = Some(data);
|
||||
Frame { data }
|
||||
})
|
||||
.collect();
|
||||
let data_frame_response = Ok(ReadResponse { frames });
|
||||
|
||||
tx.send(data_frame_response).await.unwrap();
|
||||
}
|
||||
SeriesDataType::I64 => {
|
||||
let iter = app
|
||||
.db
|
||||
.read_i64_range(&bucket, &series_filter, &range, BATCH_SIZE)
|
||||
.map_err(|e| {
|
||||
Status::internal(format!("could not query for SeriesFilter data: {}", e))
|
||||
})?;
|
||||
|
||||
let frames = iter
|
||||
.map(|batch| {
|
||||
// TODO: Performance hazard; splitting this vector is non-ideal
|
||||
let (timestamps, values) =
|
||||
batch.into_iter().map(|p| (p.time, p.value)).unzip();
|
||||
let frame = IntegerPointsFrame { timestamps, values };
|
||||
let data = Data::IntegerPoints(frame);
|
||||
let data = Some(data);
|
||||
Frame { data }
|
||||
})
|
||||
.collect();
|
||||
let data_frame_response = Ok(ReadResponse { frames });
|
||||
|
||||
tx.send(data_frame_response).await.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
|
@@ -1,5 +1,3 @@
use actix_web::http::StatusCode;
use actix_web::ResponseError;
use std::convert::TryFrom;
use std::error;
use std::fmt;

@@ -12,11 +10,6 @@ pub mod predicate;
pub mod rocksdb;
pub mod series_store;

pub struct Range {
pub start: i64,
pub stop: i64,
}

// The values for these enum variants have no real meaning, but they
// are serialized to disk. Revisit these whenever it's time to decide
// on an on-disk format.

@@ -67,9 +60,3 @@ impl error::Error for StorageError {
None
}
}

impl ResponseError for StorageError {
fn status_code(&self) -> StatusCode {
StatusCode::BAD_REQUEST
}
}

@@ -14,4 +14,6 @@ pub trait ConfigStore: Sync + Send {
org_id: u32,
bucket_name: &str,
) -> Result<Option<Arc<Bucket>>, StorageError>;

fn get_bucket_by_id(&self, bucket_id: u32) -> Result<Option<Arc<Bucket>>, StorageError>;
}
@ -1,10 +1,10 @@
|
|||
use crate::delorean::{Bucket, Predicate};
|
||||
use crate::delorean::{Bucket, Predicate, TimestampRange};
|
||||
use crate::line_parser::PointType;
|
||||
use crate::storage::config_store::ConfigStore;
|
||||
use crate::storage::inverted_index::{InvertedIndex, SeriesFilter};
|
||||
use crate::storage::rocksdb::RocksDB;
|
||||
use crate::storage::series_store::{ReadPoint, SeriesStore};
|
||||
use crate::storage::{Range, StorageError};
|
||||
use crate::storage::StorageError;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
|
@ -29,12 +29,12 @@ impl Database {
|
|||
&self,
|
||||
_org_id: u32,
|
||||
bucket: &Bucket,
|
||||
points: &mut Vec<PointType>,
|
||||
points: &mut [PointType],
|
||||
) -> Result<(), StorageError> {
|
||||
self.local_index
|
||||
.get_or_create_series_ids_for_points(bucket.id, points)?;
|
||||
self.local_series_store
|
||||
.write_points_with_series_ids(bucket.id, &points)
|
||||
.write_points_with_series_ids(bucket.id, points)
|
||||
}
|
||||
|
||||
pub fn get_bucket_by_name(
|
||||
|
@ -46,6 +46,10 @@ impl Database {
|
|||
.get_bucket_by_name(org_id, bucket_name)
|
||||
}
|
||||
|
||||
pub fn get_bucket_by_id(&self, bucket_id: u32) -> Result<Option<Arc<Bucket>>, StorageError> {
|
||||
self.local_config_store.get_bucket_by_id(bucket_id)
|
||||
}
|
||||
|
||||
pub fn create_bucket_if_not_exists(
|
||||
&self,
|
||||
org_id: u32,
|
||||
|
@ -59,8 +63,8 @@ impl Database {
|
|||
&self,
|
||||
bucket: &Bucket,
|
||||
predicate: Option<&Predicate>,
|
||||
_range: Option<&Range>,
|
||||
) -> Result<Box<dyn Iterator<Item = SeriesFilter>>, StorageError> {
|
||||
_range: Option<&TimestampRange>,
|
||||
) -> Result<Box<dyn Iterator<Item = SeriesFilter> + Send>, StorageError> {
|
||||
self.local_index.read_series_matching(bucket.id, predicate)
|
||||
}
|
||||
|
||||
|
@ -68,9 +72,9 @@ impl Database {
|
|||
&self,
|
||||
bucket: &Bucket,
|
||||
series_filter: &SeriesFilter,
|
||||
range: &Range,
|
||||
range: &TimestampRange,
|
||||
batch_size: usize,
|
||||
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<i64>>>>, StorageError> {
|
||||
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<i64>>> + Send>, StorageError> {
|
||||
self.local_series_store
|
||||
.read_i64_range(bucket.id, series_filter.id, range, batch_size)
|
||||
}
|
||||
|
@ -79,10 +83,28 @@ impl Database {
|
|||
&self,
|
||||
bucket: &Bucket,
|
||||
series_filter: &SeriesFilter,
|
||||
range: &Range,
|
||||
range: &TimestampRange,
|
||||
batch_size: usize,
|
||||
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<f64>>>>, StorageError> {
|
||||
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<f64>>> + Send>, StorageError> {
|
||||
self.local_series_store
|
||||
.read_f64_range(bucket.id, series_filter.id, range, batch_size)
|
||||
}
|
||||
|
||||
pub fn get_tag_keys(
|
||||
&self,
|
||||
bucket: &Bucket,
|
||||
predicate: Option<&Predicate>,
|
||||
) -> Result<Box<dyn Iterator<Item = String> + Send>, StorageError> {
|
||||
self.local_index.get_tag_keys(bucket.id, predicate)
|
||||
}
|
||||
|
||||
pub fn get_tag_values(
|
||||
&self,
|
||||
bucket: &Bucket,
|
||||
tag_key: &str,
|
||||
predicate: Option<&Predicate>,
|
||||
) -> Result<Box<dyn Iterator<Item = String> + Send>, StorageError> {
|
||||
self.local_index
|
||||
.get_tag_values(bucket.id, tag_key, predicate)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use crate::delorean::Predicate;
|
||||
use crate::delorean::{Predicate, Tag};
|
||||
use crate::line_parser::PointType;
|
||||
use crate::storage::{SeriesDataType, StorageError};
|
||||
|
||||
|
@ -6,27 +6,27 @@ pub trait InvertedIndex: Sync + Send {
|
|||
fn get_or_create_series_ids_for_points(
|
||||
&self,
|
||||
bucket_id: u32,
|
||||
points: &mut Vec<PointType>,
|
||||
points: &mut [PointType],
|
||||
) -> Result<(), StorageError>;
|
||||
|
||||
fn read_series_matching(
|
||||
&self,
|
||||
bucket_id: u32,
|
||||
predicate: Option<&Predicate>,
|
||||
) -> Result<Box<dyn Iterator<Item = SeriesFilter>>, StorageError>;
|
||||
) -> Result<Box<dyn Iterator<Item = SeriesFilter> + Send>, StorageError>;
|
||||
|
||||
fn get_tag_keys(
|
||||
&self,
|
||||
bucket_id: u32,
|
||||
predicate: Option<&Predicate>,
|
||||
) -> Result<Box<dyn Iterator<Item = String>>, StorageError>;
|
||||
) -> Result<Box<dyn Iterator<Item = String> + Send>, StorageError>;
|
||||
|
||||
fn get_tag_values(
|
||||
&self,
|
||||
bucket_id: u32,
|
||||
tag_key: &str,
|
||||
predicate: Option<&Predicate>,
|
||||
) -> Result<Box<dyn Iterator<Item = String>>, StorageError>;
|
||||
) -> Result<Box<dyn Iterator<Item = String> + Send>, StorageError>;
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
|
@ -37,14 +37,50 @@ pub struct SeriesFilter {
|
|||
pub series_type: SeriesDataType,
|
||||
}
|
||||
|
||||
// Test helpers for other implementations to run
|
||||
impl SeriesFilter {
|
||||
// TODO: Handle escaping of ',', '=', and '\t'
|
||||
// TODO: Better error handling
|
||||
pub fn tags(&self) -> Vec<Tag> {
|
||||
let before_tab = self
|
||||
.key
|
||||
.splitn(2, '\t')
|
||||
.next()
|
||||
.expect("SeriesFilter key did not contain a tab");
|
||||
|
||||
before_tab
|
||||
.split(',')
|
||||
.skip(1)
|
||||
.map(|kv| {
|
||||
let mut parts = kv.splitn(2, '=');
|
||||
Tag {
|
||||
key: parts
|
||||
.next()
|
||||
.expect("SeriesFilter did not contain expected parts")
|
||||
.bytes()
|
||||
.collect(),
|
||||
value: parts
|
||||
.next()
|
||||
.expect("SeriesFilter did not contain expected parts")
|
||||
.bytes()
|
||||
.collect(),
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod tests {
|
||||
use crate::delorean::Tag;
|
||||
use crate::line_parser::PointType;
|
||||
use crate::storage::inverted_index::{InvertedIndex, SeriesFilter};
|
||||
use crate::storage::predicate::parse_predicate;
|
||||
use crate::storage::SeriesDataType;
|
||||
|
||||
use std::str;
|
||||
|
||||
// Test helpers for other implementations to run
|
||||
|
||||
pub fn series_id_indexing(index: Box<dyn InvertedIndex>) {
|
||||
let bucket_id = 1;
|
||||
let bucket_2 = 2;
|
||||
|
@ -52,7 +88,7 @@ pub mod tests {
|
|||
let p2 = PointType::new_i64("two".to_string(), 23, 40);
|
||||
let p3 = PointType::new_i64("three".to_string(), 33, 86);
|
||||
|
||||
let mut points = vec![p1.clone(), p2.clone()];
|
||||
let mut points = vec![p1.clone(), p2];
|
||||
index
|
||||
.get_or_create_series_ids_for_points(bucket_id, &mut points)
|
||||
.unwrap();
|
||||
|
@ -67,7 +103,7 @@ pub mod tests {
|
|||
assert_eq!(points[0].series_id(), Some(1));
|
||||
|
||||
// now insert a new series in the first bucket and make sure it shows up
|
||||
let mut points = vec![p1.clone(), p3.clone()];
|
||||
let mut points = vec![p1, p3];
|
||||
index
|
||||
.get_or_create_series_ids_for_points(bucket_id, &mut points)
|
||||
.unwrap();
|
||||
|
@ -82,7 +118,7 @@ pub mod tests {
|
|||
let p3 = PointType::new_i64("cpu,host=a,region=west\tusage_user".to_string(), 1, 0);
|
||||
let p4 = PointType::new_i64("mem,host=b,region=west\tfree".to_string(), 1, 0);
|
||||
|
||||
let mut points = vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()];
|
||||
let mut points = vec![p1, p2, p3, p4];
|
||||
index
|
||||
.get_or_create_series_ids_for_points(bucket_id, &mut points)
|
||||
.unwrap();
|
||||
|
@ -99,7 +135,7 @@ pub mod tests {
|
|||
// get all series
|
||||
|
||||
// get series with measurement = mem
|
||||
let pred = parse_predicate("_m = \"cpu\"").unwrap();
|
||||
let pred = parse_predicate(r#"_m = "cpu""#).unwrap();
|
||||
let series: Vec<SeriesFilter> = index
|
||||
.read_series_matching(bucket_id, Some(&pred))
|
||||
.unwrap()
|
||||
|
@ -129,7 +165,7 @@ pub mod tests {
|
|||
);
|
||||
|
||||
// get series with host = a
|
||||
let pred = parse_predicate("host = \"a\"").unwrap();
|
||||
let pred = parse_predicate(r#"host = "a""#).unwrap();
|
||||
let series: Vec<SeriesFilter> = index
|
||||
.read_series_matching(bucket_id, Some(&pred))
|
||||
.unwrap()
|
||||
|
@ -153,7 +189,7 @@ pub mod tests {
|
|||
);
|
||||
|
||||
// get series with measurement = cpu and host = b
|
||||
let pred = parse_predicate("_m = \"cpu\" and host = \"b\"").unwrap();
|
||||
let pred = parse_predicate(r#"_m = "cpu" and host = "b""#).unwrap();
|
||||
let series: Vec<SeriesFilter> = index
|
||||
.read_series_matching(bucket_id, Some(&pred))
|
||||
.unwrap()
|
||||
|
@ -168,7 +204,7 @@ pub mod tests {
|
|||
},]
|
||||
);
|
||||
|
||||
let pred = parse_predicate("host = \"a\" OR _m = \"mem\"").unwrap();
|
||||
let pred = parse_predicate(r#"host = "a" OR _m = "mem""#).unwrap();
|
||||
let series: Vec<SeriesFilter> = index
|
||||
.read_series_matching(bucket_id, Some(&pred))
|
||||
.unwrap()
|
||||
|
@ -197,4 +233,32 @@ pub mod tests {
|
|||
]
|
||||
);
|
||||
}
|
||||
|
||||
pub fn tags_as_strings(tags: &[Tag]) -> Vec<(&str, &str)> {
|
||||
tags.iter()
|
||||
.map(|t| {
|
||||
(
|
||||
str::from_utf8(&t.key).unwrap(),
|
||||
str::from_utf8(&t.value).unwrap(),
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
// Unit tests for SeriesFilter
|
||||
|
||||
#[test]
|
||||
fn series_filter_tag_parsing() {
|
||||
let sf = SeriesFilter {
|
||||
id: 1,
|
||||
key: "cpu,host=b,region=west\tusage_system".to_string(),
|
||||
value_predicate: None,
|
||||
series_type: SeriesDataType::I64,
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
tags_as_strings(&sf.tags()),
|
||||
vec![("host", "b"), ("region", "west")]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
use crate::delorean::node::{Comparison, Logical, Value};
|
||||
use crate::delorean::{Node, Predicate};
|
||||
use crate::delorean::{Node, Predicate, TimestampRange};
|
||||
use crate::line_parser::{ParseError, Point, PointType};
|
||||
use crate::storage::inverted_index::{InvertedIndex, SeriesFilter};
|
||||
use crate::storage::predicate::{Evaluate, EvaluateVisitor};
|
||||
use crate::storage::series_store::{ReadPoint, SeriesStore};
|
||||
use crate::storage::{Range, SeriesDataType, StorageError};
|
||||
use crate::storage::{SeriesDataType, StorageError};
|
||||
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
|
||||
use croaring::Treemap;
|
||||
/// memdb implements an in memory database for the InvertedIndex and SeriesStore traits. It
|
||||
|
@ -120,13 +120,13 @@ impl<T: Clone> SeriesRingBuffer<T> {
|
|||
self.next_position += 1;
|
||||
}
|
||||
|
||||
fn get_range(&self, range: &Range) -> Vec<ReadPoint<T>> {
|
||||
fn get_range(&self, range: &TimestampRange) -> Vec<ReadPoint<T>> {
|
||||
let (_, pos) = self.oldest_time_and_position();
|
||||
|
||||
let mut values = Vec::new();
|
||||
|
||||
for i in pos..self.data.len() {
|
||||
if self.data[i].time > range.stop {
|
||||
if self.data[i].time > range.end {
|
||||
return values;
|
||||
} else if self.data[i].time >= range.start {
|
||||
values.push(self.data[i].clone());
|
||||
|
@ -134,7 +134,7 @@ impl<T: Clone> SeriesRingBuffer<T> {
|
|||
}
|
||||
|
||||
for i in 0..self.next_position {
|
||||
if self.data[i].time > range.stop {
|
||||
if self.data[i].time > range.end {
|
||||
return values;
|
||||
} else if self.data[i].time >= range.start {
|
||||
values.push(self.data[i].clone());
|
||||
|
@ -238,7 +238,7 @@ impl MemDB {
|
|||
fn get_or_create_series_ids_for_points(
|
||||
&self,
|
||||
bucket_id: u32,
|
||||
points: &mut Vec<PointType>,
|
||||
points: &mut [PointType],
|
||||
) -> Result<(), StorageError> {
|
||||
// first try to do everything with just a read lock
|
||||
if self.get_series_ids_for_points(bucket_id, points) {
|
||||
|
@ -269,7 +269,7 @@ impl MemDB {
|
|||
// get_series_ids_for_points attempts to fill the series ids for all points in the passed in
|
||||
// collection using only a read lock. If no SeriesMap exists for the bucket, it will be inserted.
|
||||
// It will return true if all points have series ids filled in.
|
||||
fn get_series_ids_for_points(&self, bucket_id: u32, points: &mut Vec<PointType>) -> bool {
|
||||
fn get_series_ids_for_points(&self, bucket_id: u32, points: &mut [PointType]) -> bool {
|
||||
let buckets = self.bucket_id_to_series_map.read().unwrap();
|
||||
match buckets.get(&bucket_id) {
|
||||
Some(b) => {
|
||||
|
@ -303,7 +303,7 @@ impl MemDB {
|
|||
&self,
|
||||
bucket_id: u32,
|
||||
_predicate: Option<&Predicate>,
|
||||
) -> Result<Box<dyn Iterator<Item = String>>, StorageError> {
|
||||
) -> Result<Box<dyn Iterator<Item = String> + Send>, StorageError> {
|
||||
match self.bucket_id_to_series_map.read().unwrap().get(&bucket_id) {
|
||||
Some(map) => {
|
||||
let keys: Vec<String> = map.read().unwrap().tag_keys.keys().cloned().collect();
|
||||
|
@ -320,7 +320,7 @@ impl MemDB {
|
|||
bucket_id: u32,
|
||||
tag_key: &str,
|
||||
_predicate: Option<&Predicate>,
|
||||
) -> Result<Box<dyn Iterator<Item = String>>, StorageError> {
|
||||
) -> Result<Box<dyn Iterator<Item = String> + Send>, StorageError> {
|
||||
match self.bucket_id_to_series_map.read().unwrap().get(&bucket_id) {
|
||||
Some(map) => match map.read().unwrap().tag_keys.get(tag_key) {
|
||||
Some(values) => {
|
||||
|
@ -339,7 +339,7 @@ impl MemDB {
|
|||
&self,
|
||||
bucket_id: u32,
|
||||
predicate: Option<&Predicate>,
|
||||
) -> Result<Box<dyn Iterator<Item = SeriesFilter>>, StorageError> {
|
||||
) -> Result<Box<dyn Iterator<Item = SeriesFilter> + Send>, StorageError> {
|
||||
let pred = match predicate {
|
||||
Some(p) => p,
|
||||
None => {
|
||||
|
@ -413,13 +413,13 @@ impl MemDB {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn read_range<T: 'static + Clone + FromSeries>(
|
||||
fn read_range<T: 'static + Clone + FromSeries + Send>(
|
||||
&self,
|
||||
bucket_id: u32,
|
||||
series_id: u64,
|
||||
range: &Range,
|
||||
range: &TimestampRange,
|
||||
batch_size: usize,
|
||||
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<T>>>>, StorageError> {
|
||||
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<T>>> + Send>, StorageError> {
|
||||
let buckets = self.bucket_id_to_series_data.read().unwrap();
|
||||
let data = match buckets.get(&bucket_id) {
|
||||
Some(d) => d,
|
||||
|
@ -491,94 +491,23 @@ impl<T: Clone> Iterator for PointsIterator<T> {
|
|||
}
|
||||
}
|
||||
|
||||
fn evaluate_node(
|
||||
series_map: &RwLockReadGuard<'_, SeriesMap>,
|
||||
n: &Node,
|
||||
) -> Result<Treemap, StorageError> {
|
||||
if n.children.len() != 2 {
|
||||
return Err(StorageError {
|
||||
description: format!(
|
||||
"expected only two children of node but found {}",
|
||||
n.children.len()
|
||||
),
|
||||
});
|
||||
}
|
||||
fn evaluate_node(series_map: &SeriesMap, n: &Node) -> Result<Treemap, StorageError> {
|
||||
struct Visitor<'a>(&'a SeriesMap);
|
||||
|
||||
match &n.value {
|
||||
Some(node_value) => match node_value {
|
||||
Value::Logical(l) => {
|
||||
let l = Logical::from_i32(*l).unwrap();
|
||||
evaluate_logical(series_map, &n.children[0], &n.children[1], l)
|
||||
}
|
||||
Value::Comparison(c) => {
|
||||
let c = Comparison::from_i32(*c).unwrap();
|
||||
evaluate_comparison(series_map, &n.children[0], &n.children[1], c)
|
||||
}
|
||||
val => Err(StorageError {
|
||||
description: format!("evaluate_node called on wrong type {:?}", val),
|
||||
}),
|
||||
},
|
||||
None => Err(StorageError {
|
||||
description: "emtpy node value".to_string(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn evaluate_logical(
|
||||
series_map: &RwLockReadGuard<'_, SeriesMap>,
|
||||
left: &Node,
|
||||
right: &Node,
|
||||
op: Logical,
|
||||
) -> Result<Treemap, StorageError> {
|
||||
let mut left_result = evaluate_node(series_map, left)?;
|
||||
let right_result = evaluate_node(series_map, right)?;
|
||||
|
||||
match op {
|
||||
Logical::And => left_result.and_inplace(&right_result),
|
||||
Logical::Or => left_result.or_inplace(&right_result),
|
||||
};
|
||||
|
||||
Ok(left_result)
|
||||
}
|
||||
|
||||
fn evaluate_comparison(
|
||||
series_map: &RwLockReadGuard<'_, SeriesMap>,
|
||||
left: &Node,
|
||||
right: &Node,
|
||||
op: Comparison,
|
||||
) -> Result<Treemap, StorageError> {
|
||||
let left = match &left.value {
|
||||
Some(Value::TagRefValue(s)) => s,
|
||||
_ => {
|
||||
return Err(StorageError {
|
||||
description: "expected left operand to be a TagRefValue".to_string(),
|
||||
})
|
||||
impl EvaluateVisitor for Visitor<'_> {
|
||||
fn equal(&mut self, left: &str, right: &str) -> Result<Treemap, StorageError> {
|
||||
Ok(self.0.posting_list_for_key_value(left, right))
|
||||
}
|
||||
};
|
||||
|
||||
let right = match &right.value {
|
||||
Some(Value::StringValue(s)) => s,
|
||||
_ => {
|
||||
return Err(StorageError {
|
||||
description: "unable to run comparison against anything other than a string"
|
||||
.to_string(),
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
match op {
|
||||
Comparison::Equal => Ok(series_map.posting_list_for_key_value(&left, &right)),
|
||||
comp => Err(StorageError {
|
||||
description: format!("unable to handle comparison {:?}", comp),
|
||||
}),
|
||||
}
|
||||
|
||||
Evaluate::evaluate(Visitor(series_map), n)
|
||||
}
|
||||
|
||||
impl InvertedIndex for MemDB {
|
||||
fn get_or_create_series_ids_for_points(
|
||||
&self,
|
||||
bucket_id: u32,
|
||||
points: &mut Vec<PointType>,
|
||||
points: &mut [PointType],
|
||||
) -> Result<(), StorageError> {
|
||||
self.get_or_create_series_ids_for_points(bucket_id, points)
|
||||
}
|
||||
|
@ -587,7 +516,7 @@ impl InvertedIndex for MemDB {
|
|||
&self,
|
||||
bucket_id: u32,
|
||||
predicate: Option<&Predicate>,
|
||||
) -> Result<Box<dyn Iterator<Item = SeriesFilter>>, StorageError> {
|
||||
) -> Result<Box<dyn Iterator<Item = SeriesFilter> + Send>, StorageError> {
|
||||
self.read_series_matching(bucket_id, predicate)
|
||||
}
|
||||
|
||||
|
@ -595,7 +524,7 @@ impl InvertedIndex for MemDB {
|
|||
&self,
|
||||
bucket_id: u32,
|
||||
predicate: Option<&Predicate>,
|
||||
) -> Result<Box<dyn Iterator<Item = String>>, StorageError> {
|
||||
) -> Result<Box<dyn Iterator<Item = String> + Send>, StorageError> {
|
||||
self.get_tag_keys(bucket_id, predicate)
|
||||
}
|
||||
|
||||
|
@ -604,7 +533,7 @@ impl InvertedIndex for MemDB {
|
|||
bucket_id: u32,
|
||||
tag_key: &str,
|
||||
predicate: Option<&Predicate>,
|
||||
) -> Result<Box<dyn Iterator<Item = String>>, StorageError> {
|
||||
) -> Result<Box<dyn Iterator<Item = String> + Send>, StorageError> {
|
||||
self.get_tag_values(bucket_id, tag_key, predicate)
|
||||
}
|
||||
}
|
||||
|
@ -622,9 +551,9 @@ impl SeriesStore for MemDB {
|
|||
&self,
|
||||
bucket_id: u32,
|
||||
series_id: u64,
|
||||
range: &Range,
|
||||
range: &TimestampRange,
|
||||
batch_size: usize,
|
||||
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<i64>>>>, StorageError> {
|
||||
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<i64>>> + Send>, StorageError> {
|
||||
self.read_range(bucket_id, series_id, range, batch_size)
|
||||
}
|
||||
|
||||
|
@ -632,9 +561,9 @@ impl SeriesStore for MemDB {
|
|||
&self,
|
||||
bucket_id: u32,
|
||||
series_id: u64,
|
||||
range: &Range,
|
||||
range: &TimestampRange,
|
||||
batch_size: usize,
|
||||
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<f64>>>>, StorageError> {
|
||||
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<f64>>> + Send>, StorageError> {
|
||||
self.read_range(bucket_id, series_id, range, batch_size)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
use crate::delorean::node::Logical;
|
||||
use crate::delorean::node::{Comparison, Value};
|
||||
use crate::delorean::{node, Node, Predicate};
|
||||
use crate::storage::StorageError;
|
||||
|
||||
use croaring::Treemap;
|
||||
use std::iter::Peekable;
|
||||
use std::str::Chars;
|
||||
|
||||
|
@ -156,7 +157,7 @@ fn parse_logical(chars: &mut Peekable<Chars<'_>>) -> Result<Option<node::Logical
|
|||
Some('n') | Some('N') => (),
|
||||
Some(ch) => {
|
||||
return Err(StorageError {
|
||||
description: format!("expected \"and\" but found a{}", ch),
|
||||
description: format!(r#"expected "and" but found a{}"#, ch),
|
||||
})
|
||||
}
|
||||
_ => {
|
||||
|
@ -169,7 +170,7 @@ fn parse_logical(chars: &mut Peekable<Chars<'_>>) -> Result<Option<node::Logical
|
|||
Some('d') | Some('D') => (),
|
||||
Some(ch) => {
|
||||
return Err(StorageError {
|
||||
description: format!("expected \"and\" but found an{}", ch),
|
||||
description: format!(r#"expected "and" but found an{}"#, ch),
|
||||
})
|
||||
}
|
||||
_ => {
|
||||
|
@ -184,7 +185,7 @@ fn parse_logical(chars: &mut Peekable<Chars<'_>>) -> Result<Option<node::Logical
|
|||
Some('r') | Some('R') => return Ok(Some(node::Logical::Or)),
|
||||
Some(ch) => {
|
||||
return Err(StorageError {
|
||||
description: format!("expected \"or\" but found o{}", ch),
|
||||
description: format!(r#"expected "or" but found o{}"#, ch),
|
||||
})
|
||||
}
|
||||
_ => {
|
||||
|
@ -217,13 +218,100 @@ fn eat_whitespace(chars: &mut Peekable<Chars<'_>>) {
    }
}

pub trait EvaluateVisitor {
    fn equal(&mut self, left: &str, right: &str) -> Result<Treemap, StorageError>;
}

pub struct Evaluate<V: EvaluateVisitor>(V);

impl<V: EvaluateVisitor> Evaluate<V> {
    pub fn evaluate(visitor: V, node: &Node) -> Result<Treemap, StorageError> {
        Self(visitor).node(node)
    }

    fn node(&mut self, n: &Node) -> Result<Treemap, StorageError> {
        if n.children.len() != 2 {
            return Err(StorageError {
                description: format!(
                    "expected only two children of node but found {}",
                    n.children.len()
                ),
            });
        }

        match &n.value {
            Some(node_value) => match node_value {
                Value::Logical(l) => {
                    let l = Logical::from_i32(*l).unwrap();
                    self.logical(&n.children[0], &n.children[1], l)
                }
                Value::Comparison(c) => {
                    let c = Comparison::from_i32(*c).unwrap();
                    self.comparison(&n.children[0], &n.children[1], c)
                }
                val => Err(StorageError {
                    description: format!("Evaluate::node called on wrong type {:?}", val),
                }),
            },
            None => Err(StorageError {
                description: "emtpy node value".to_string(),
            }),
        }
    }

    fn logical(&mut self, left: &Node, right: &Node, op: Logical) -> Result<Treemap, StorageError> {
        let mut left_result = self.node(left)?;
        let right_result = self.node(right)?;

        match op {
            Logical::And => left_result.and_inplace(&right_result),
            Logical::Or => left_result.or_inplace(&right_result),
        };

        Ok(left_result)
    }

    fn comparison(
        &mut self,
        left: &Node,
        right: &Node,
        op: Comparison,
    ) -> Result<Treemap, StorageError> {
        let left = match &left.value {
            Some(Value::TagRefValue(s)) => s,
            _ => {
                return Err(StorageError {
                    description: "expected left operand to be a TagRefValue".to_string(),
                })
            }
        };

        let right = match &right.value {
            Some(Value::StringValue(s)) => s,
            _ => {
                return Err(StorageError {
                    description: "unable to run comparison against anything other than a string"
                        .to_string(),
                })
            }
        };

        match op {
            Comparison::Equal => self.0.equal(left, right),
            comp => Err(StorageError {
                description: format!("unable to handle comparison {:?}", comp),
            }),
        }
    }
}

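The new Evaluate/EvaluateVisitor split keeps the predicate tree walk and the AND/OR Treemap algebra in one place; an index only has to say what a single tag = value comparison means. As a minimal sketch (not part of the diff), here is a visitor over a plain in-memory map, assuming the items above are in scope and croaring's Treemap::create constructor:

use croaring::Treemap;
use std::collections::HashMap;

// Hypothetical index for illustration: (tag key, tag value) -> set of series ids.
struct MapVisitor<'a>(&'a HashMap<(String, String), Treemap>);

impl EvaluateVisitor for MapVisitor<'_> {
    fn equal(&mut self, left: &str, right: &str) -> Result<Treemap, StorageError> {
        // A key/value pair that is not indexed evaluates to the empty set.
        let mut ids = Treemap::create();
        if let Some(found) = self.0.get(&(left.to_string(), right.to_string())) {
            ids.or_inplace(found);
        }
        Ok(ids)
    }
}

// Usage sketch, assuming a predicate built by parse_predicate:
// let predicate = parse_predicate(r#"host = "a" OR _m = "mem""#).unwrap();
// let series_ids = Evaluate::evaluate(MapVisitor(&index), predicate.root.as_ref().unwrap())?;
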
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn parse_predicate() {
|
||||
let pred = super::parse_predicate("host = \"foo\"").unwrap();
|
||||
let pred = super::parse_predicate(r#"host = "foo""#).unwrap();
|
||||
assert_eq!(
|
||||
pred,
|
||||
Predicate {
|
||||
|
@ -243,7 +331,7 @@ mod tests {
|
|||
}
|
||||
);
|
||||
|
||||
let pred = super::parse_predicate("host != \"serverA\" AND region=\"west\"").unwrap();
|
||||
let pred = super::parse_predicate(r#"host != "serverA" AND region="west""#).unwrap();
|
||||
assert_eq!(
|
||||
pred,
|
||||
Predicate {
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
use crate::delorean::node::{Comparison, Logical, Value};
|
||||
use crate::delorean::{Bucket, IndexLevel, Node, Predicate};
|
||||
use crate::delorean::{Bucket, IndexLevel, Node, Predicate, TimestampRange};
|
||||
use crate::line_parser::PointType;
|
||||
use crate::storage::config_store::ConfigStore;
|
||||
use crate::storage::inverted_index::{InvertedIndex, SeriesFilter};
|
||||
use crate::storage::predicate::{Evaluate, EvaluateVisitor};
|
||||
use crate::storage::series_store::{ReadPoint, SeriesStore};
|
||||
use crate::storage::{Range, SeriesDataType, StorageError};
|
||||
use crate::storage::{SeriesDataType, StorageError};
|
||||
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::HashMap;
|
||||
|
@ -32,6 +32,8 @@ pub struct RocksDB {
    db: Arc<RwLock<DB>>,
    // bucket_map is an in memory map of what buckets exist in the system. the key is the org id and bucket name together as bytes
    bucket_map: Arc<RwLock<HashMap<Vec<u8>, Arc<Bucket>>>>,
    // `bucket_id_map` is an in-memory map of bucket IDs to buckets that exist in the system.
    bucket_id_map: Arc<RwLock<HashMap<u32, Arc<Bucket>>>>,
    // series_insert_lock is a map of mutexes for creating new series in each bucket. Bucket ids are unique across all orgs
    series_insert_lock: Arc<RwLock<HashMap<u32, Mutex<u64>>>>,
}

@ -72,6 +74,7 @@ impl RocksDB {
        let mut database = RocksDB {
            db: Arc::new(RwLock::new(db)),
            bucket_map: Arc::new(RwLock::new(HashMap::new())),
            bucket_id_map: Arc::new(RwLock::new(HashMap::new())),
            series_insert_lock: Arc::new(RwLock::new(HashMap::new())),
        };
        database.load_bucket_map();

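Keeping the name-keyed and id-keyed maps consistent relies on both holding clones of the same Arc<Bucket>, which is what the load_bucket_map and create_bucket_if_not_exists hunks below do. A standalone sketch of that bookkeeping, using a simplified key (the real bucket_key layout is not shown in this diff):

use std::collections::HashMap;
use std::sync::Arc;

struct Bucket {
    org_id: u32,
    id: u32,
    name: String,
}

fn main() {
    let mut bucket_map: HashMap<Vec<u8>, Arc<Bucket>> = HashMap::new();
    let mut bucket_id_map: HashMap<u32, Arc<Bucket>> = HashMap::new();

    let bucket = Arc::new(Bucket { org_id: 7878, id: 1, name: "all".to_string() });

    // Simplified stand-in for bucket_key: org id bytes followed by the name.
    let mut key = bucket.org_id.to_be_bytes().to_vec();
    key.extend_from_slice(bucket.name.as_bytes());

    // Both maps hold the same allocation, so lookups by name and by id
    // observe the same bucket object.
    bucket_map.insert(key, Arc::clone(&bucket));
    bucket_id_map.insert(bucket.id, Arc::clone(&bucket));

    let by_name = bucket_map.values().next().unwrap();
    let by_id = bucket_id_map.get(&1).unwrap();
    assert!(Arc::ptr_eq(by_name, by_id));
}
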
@ -112,19 +115,19 @@ impl RocksDB {
|
|||
}
|
||||
|
||||
// TODO: update this so it decompresses at least the first point to verify the data type or return error
|
||||
fn read_range<T: 'static + FromBytes + Clone>(
|
||||
fn read_range<T: 'static + FromBytes + Clone + Send>(
|
||||
&self,
|
||||
bucket_id: u32,
|
||||
series_id: u64,
|
||||
range: &Range,
|
||||
range: &TimestampRange,
|
||||
batch_size: usize,
|
||||
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<T>>>>, StorageError> {
|
||||
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<T>>> + Send>, StorageError> {
|
||||
let (iter, series_prefix) = self.get_db_points_iter(bucket_id, series_id, range.start);
|
||||
|
||||
Ok(Box::new(PointsIterator {
|
||||
batch_size,
|
||||
iter,
|
||||
stop_time: range.stop,
|
||||
stop_time: range.end,
|
||||
series_prefix,
|
||||
drained: false,
|
||||
read: FromBytes::from,
|
||||
|
@ -167,6 +170,7 @@ impl RocksDB {
|
|||
}
|
||||
|
||||
let mut map = self.bucket_map.write().unwrap();
|
||||
let mut id_map = self.bucket_id_map.write().unwrap();
|
||||
if let Some(b) = map.get(&key) {
|
||||
return Ok(b.id);
|
||||
}
|
||||
|
@ -205,7 +209,9 @@ impl RocksDB {
|
|||
.expect("unexpected rocksdb error writing to DB");
|
||||
|
||||
let id = store.id;
|
||||
map.insert(key, Arc::new(store));
|
||||
let arc_bucket = Arc::new(store);
|
||||
map.insert(key, arc_bucket.clone());
|
||||
id_map.insert(id, arc_bucket);
|
||||
|
||||
Ok(id)
|
||||
}
|
||||
|
@ -222,11 +228,18 @@ impl RocksDB {
        name: &str,
    ) -> Result<Option<Arc<Bucket>>, StorageError> {
        let buckets = self.bucket_map.read().unwrap();
        let key = bucket_key(org_id, &name.to_string());
        match buckets.get(&key) {
            Some(b) => Ok(Some(b.clone())),
            None => Ok(None),
        }
        let key = bucket_key(org_id, name);
        Ok(buckets.get(&key).map(Arc::clone))
    }

    /// Looks up the bucket object by bucket id and returns it.
    ///
    /// # Arguments
    ///
    /// * `bucket_id` - The ID of the bucket (which is globally unique)
    pub fn get_bucket_by_id(&self, bucket_id: u32) -> Result<Option<Arc<Bucket>>, StorageError> {
        let buckets = self.bucket_id_map.read().unwrap();
        Ok(buckets.get(&bucket_id).map(Arc::clone))
    }

    // TODO: ensure that points with timestamps older than the first index level get matched against the appropriate index

@ -246,7 +259,7 @@ impl RocksDB {
|
|||
pub fn get_series_ids(
|
||||
&self,
|
||||
bucket_id: u32,
|
||||
points: &mut Vec<PointType>,
|
||||
points: &mut [PointType],
|
||||
) -> Result<(), StorageError> {
|
||||
let cf_name = index_cf_name(bucket_id);
|
||||
|
||||
|
@ -321,85 +334,25 @@ impl RocksDB {
|
|||
}
|
||||
|
||||
fn evaluate_node(&self, bucket_id: u32, n: &Node) -> Result<Treemap, StorageError> {
|
||||
if n.children.len() != 2 {
|
||||
return Err(StorageError {
|
||||
description: format!(
|
||||
"expected only two children of node but found {}",
|
||||
n.children.len()
|
||||
),
|
||||
});
|
||||
struct Visitor<'a> {
|
||||
db: &'a RocksDB,
|
||||
bucket_id: u32,
|
||||
};
|
||||
|
||||
impl EvaluateVisitor for Visitor<'_> {
|
||||
fn equal(&mut self, left: &str, right: &str) -> Result<Treemap, StorageError> {
|
||||
self.db
|
||||
.get_posting_list_for_tag_key_value(self.bucket_id, left, right)
|
||||
}
|
||||
}
|
||||
|
||||
match &n.value {
|
||||
Some(node_value) => match node_value {
|
||||
Value::Logical(l) => {
|
||||
let l = Logical::from_i32(*l).unwrap();
|
||||
self.evaluate_logical(bucket_id, &n.children[0], &n.children[1], l)
|
||||
}
|
||||
Value::Comparison(c) => {
|
||||
let c = Comparison::from_i32(*c).unwrap();
|
||||
self.evaluate_comparison(bucket_id, &n.children[0], &n.children[1], c)
|
||||
}
|
||||
val => Err(StorageError {
|
||||
description: format!("evaluate_node called on wrong type {:?}", val),
|
||||
}),
|
||||
Evaluate::evaluate(
|
||||
Visitor {
|
||||
db: self,
|
||||
bucket_id,
|
||||
},
|
||||
None => Err(StorageError {
|
||||
description: "emtpy node value".to_string(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn evaluate_logical(
|
||||
&self,
|
||||
bucket_id: u32,
|
||||
left: &Node,
|
||||
right: &Node,
|
||||
op: Logical,
|
||||
) -> Result<Treemap, StorageError> {
|
||||
let mut left_result = self.evaluate_node(bucket_id, left)?;
|
||||
let right_result = self.evaluate_node(bucket_id, right)?;
|
||||
|
||||
match op {
|
||||
Logical::And => left_result.and_inplace(&right_result),
|
||||
Logical::Or => left_result.or_inplace(&right_result),
|
||||
};
|
||||
|
||||
Ok(left_result)
|
||||
}
|
||||
|
||||
fn evaluate_comparison(
|
||||
&self,
|
||||
bucket_id: u32,
|
||||
left: &Node,
|
||||
right: &Node,
|
||||
op: Comparison,
|
||||
) -> Result<Treemap, StorageError> {
|
||||
let left = match &left.value {
|
||||
Some(Value::TagRefValue(s)) => s,
|
||||
_ => {
|
||||
return Err(StorageError {
|
||||
description: "expected left operand to be a TagRefValue".to_string(),
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
let right = match &right.value {
|
||||
Some(Value::StringValue(s)) => s,
|
||||
_ => {
|
||||
return Err(StorageError {
|
||||
description: "unable to run comparison against anything other than a string"
|
||||
.to_string(),
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
match op {
|
||||
Comparison::Equal => self.get_posting_list_for_tag_key_value(bucket_id, &left, &right),
|
||||
comp => Err(StorageError {
|
||||
description: format!("unable to handle comparison {:?}", comp),
|
||||
}),
|
||||
}
|
||||
n,
|
||||
)
|
||||
}
|
||||
|
||||
fn get_posting_list_for_tag_key_value(
|
||||
|
@ -513,7 +466,7 @@ impl RocksDB {
|
|||
// TODO: build the index for levels other than the first
|
||||
// insert_series_without_ids will insert any series into the index and obtain an identifier for it.
|
||||
// the passed in series vector is modified so that the newly inserted series have their ids
|
||||
pub fn insert_series_without_ids(&self, bucket_id: u32, points: &mut Vec<PointType>) {
|
||||
pub fn insert_series_without_ids(&self, bucket_id: u32, points: &mut [PointType]) {
|
||||
// We want to get a lock on new series only for this bucket
|
||||
self.ensure_series_mutex_exists(bucket_id);
|
||||
let map = self.series_insert_lock.read().expect("mutex poisoned");
|
||||
|
@ -638,7 +591,7 @@ impl RocksDB {
|
|||
}
|
||||
|
||||
// do the index writes from the in temporary in memory map
|
||||
for (k, v) in index_map.iter() {
|
||||
for (k, v) in &index_map {
|
||||
let _ = batch.put_cf(index_cf, k, v.serialize().unwrap());
|
||||
}
|
||||
|
||||
|
@ -682,6 +635,7 @@ impl RocksDB {
|
|||
|
||||
let mut id_mutex_map = HashMap::new();
|
||||
let mut bucket_map = self.bucket_map.write().unwrap();
|
||||
let mut bucket_id_map = self.bucket_id_map.write().unwrap();
|
||||
|
||||
for (key, value) in iter {
|
||||
match key[0].try_into().unwrap() {
|
||||
|
@ -700,10 +654,11 @@ impl RocksDB {
|
|||
id_mutex_map.insert(bucket_id, Mutex::new(next_id));
|
||||
}
|
||||
BucketEntryType::Bucket => {
|
||||
let bucket =
|
||||
Bucket::decode(value.into_vec()).expect("unexpected error decoding bucket");
|
||||
let bucket = Bucket::decode(&*value).expect("unexpected error decoding bucket");
|
||||
let key = bucket_key(bucket.org_id, &bucket.name);
|
||||
bucket_map.insert(key, Arc::new(bucket));
|
||||
let arc_bucket = Arc::new(bucket);
|
||||
bucket_map.insert(key, arc_bucket.clone());
|
||||
bucket_id_map.insert(arc_bucket.id, arc_bucket);
|
||||
}
|
||||
BucketEntryType::NextBucketID => (),
|
||||
}
|
||||
|
@ -745,7 +700,7 @@ impl InvertedIndex for RocksDB {
|
|||
fn get_or_create_series_ids_for_points(
|
||||
&self,
|
||||
bucket_id: u32,
|
||||
points: &mut Vec<PointType>,
|
||||
points: &mut [PointType],
|
||||
) -> Result<(), StorageError> {
|
||||
self.get_series_ids(bucket_id, points)?;
|
||||
self.insert_series_without_ids(bucket_id, points);
|
||||
|
@ -756,7 +711,7 @@ impl InvertedIndex for RocksDB {
|
|||
&self,
|
||||
bucket_id: u32,
|
||||
predicate: Option<&Predicate>,
|
||||
) -> Result<Box<dyn Iterator<Item = SeriesFilter>>, StorageError> {
|
||||
) -> Result<Box<dyn Iterator<Item = SeriesFilter> + Send>, StorageError> {
|
||||
let filters = self.get_series_filters(bucket_id, predicate)?;
|
||||
Ok(Box::new(filters.into_iter()))
|
||||
}
|
||||
|
@ -765,7 +720,7 @@ impl InvertedIndex for RocksDB {
|
|||
&self,
|
||||
bucket_id: u32,
|
||||
predicate: Option<&Predicate>,
|
||||
) -> Result<Box<dyn Iterator<Item = String>>, StorageError> {
|
||||
) -> Result<Box<dyn Iterator<Item = String> + Send>, StorageError> {
|
||||
let keys = self.get_tag_keys(bucket_id, predicate);
|
||||
Ok(Box::new(keys.into_iter()))
|
||||
}
|
||||
|
@ -775,7 +730,7 @@ impl InvertedIndex for RocksDB {
|
|||
bucket_id: u32,
|
||||
tag_key: &str,
|
||||
predicate: Option<&Predicate>,
|
||||
) -> Result<Box<dyn Iterator<Item = String>>, StorageError> {
|
||||
) -> Result<Box<dyn Iterator<Item = String> + Send>, StorageError> {
|
||||
let values = self.get_tag_values(bucket_id, tag_key, predicate);
|
||||
Ok(Box::new(values.into_iter()))
|
||||
}
|
||||
|
@ -794,9 +749,9 @@ impl SeriesStore for RocksDB {
|
|||
&self,
|
||||
bucket_id: u32,
|
||||
series_id: u64,
|
||||
range: &Range,
|
||||
range: &TimestampRange,
|
||||
batch_size: usize,
|
||||
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<i64>>>>, StorageError> {
|
||||
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<i64>>> + Send>, StorageError> {
|
||||
self.read_range(bucket_id, series_id, range, batch_size)
|
||||
}
|
||||
|
||||
|
@ -804,9 +759,9 @@ impl SeriesStore for RocksDB {
|
|||
&self,
|
||||
bucket_id: u32,
|
||||
series_id: u64,
|
||||
range: &Range,
|
||||
range: &TimestampRange,
|
||||
batch_size: usize,
|
||||
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<f64>>>>, StorageError> {
|
||||
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<f64>>> + Send>, StorageError> {
|
||||
self.read_range(bucket_id, series_id, range, batch_size)
|
||||
}
|
||||
}
|
||||
|
@ -827,6 +782,10 @@ impl ConfigStore for RocksDB {
|
|||
) -> Result<Option<Arc<Bucket>>, StorageError> {
|
||||
self.get_bucket_by_name(org_id, bucket_name)
|
||||
}
|
||||
|
||||
fn get_bucket_by_id(&self, bucket_id: u32) -> Result<Option<Arc<Bucket>>, StorageError> {
|
||||
self.get_bucket_by_id(bucket_id)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -1175,8 +1134,13 @@ mod tests {
|
|||
|
||||
b.id = db.create_bucket_if_not_exists(org_id, &b).unwrap();
|
||||
assert_eq!(b.id, 1);
|
||||
|
||||
let stored_bucket = db.get_bucket_by_name(org_id, &b.name).unwrap().unwrap();
|
||||
assert_eq!(Arc::new(b.clone()), stored_bucket);
|
||||
|
||||
let bucket_by_id = db.get_bucket_by_id(b.id).unwrap().unwrap();
|
||||
assert_eq!(Arc::new(b.clone()), bucket_by_id);
|
||||
|
||||
bucket = stored_bucket;
|
||||
|
||||
// ensure it doesn't insert again
|
||||
|
@ -1271,7 +1235,7 @@ mod tests {
|
|||
db.insert_series_without_ids(b.id, &mut points);
|
||||
assert_eq!(points[0].series_id(), Some(4));
|
||||
|
||||
let mut points = vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()];
|
||||
let mut points = vec![p1.clone(), p2.clone(), p3.clone(), p4];
|
||||
db.get_series_ids(b.id, &mut points).unwrap();
|
||||
assert_eq!(points[0].series_id(), Some(1));
|
||||
assert_eq!(points[1].series_id(), Some(2));
|
||||
|
@ -1283,7 +1247,7 @@ mod tests {
|
|||
db.insert_series_without_ids(b2.id, &mut points);
|
||||
assert_eq!(points[0].series_id(), Some(2));
|
||||
|
||||
let mut points = vec![p1.clone(), p2.clone(), p3.clone()];
|
||||
let mut points = vec![p1, p2, p3];
|
||||
db.get_series_ids(b2.id, &mut points).unwrap();
|
||||
assert_eq!(points[0].series_id(), Some(1));
|
||||
assert_eq!(points[1].series_id(), Some(2));
|
||||
|
@ -1303,7 +1267,7 @@ mod tests {
|
|||
bucket.id = db
|
||||
.create_bucket_if_not_exists(bucket.org_id, &bucket)
|
||||
.unwrap();
|
||||
let mut points = vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()];
|
||||
let mut points = vec![p1, p2, p3, p4];
|
||||
db.get_or_create_series_ids_for_points(bucket.id, &mut points)
|
||||
.unwrap();
|
||||
|
||||
|
@ -1316,7 +1280,7 @@ mod tests {
|
|||
// get all series
|
||||
|
||||
// get series with measurement = mem
|
||||
let pred = parse_predicate("_m = \"cpu\"").unwrap();
|
||||
let pred = parse_predicate(r#"_m = "cpu""#).unwrap();
|
||||
let series: Vec<SeriesFilter> = db
|
||||
.read_series_matching(bucket.id, Some(&pred))
|
||||
.unwrap()
|
||||
|
@ -1346,7 +1310,7 @@ mod tests {
|
|||
);
|
||||
|
||||
// get series with host = a
|
||||
let pred = parse_predicate("host = \"a\"").unwrap();
|
||||
let pred = parse_predicate(r#"host = "a""#).unwrap();
|
||||
let series: Vec<SeriesFilter> = db
|
||||
.read_series_matching(bucket.id, Some(&pred))
|
||||
.unwrap()
|
||||
|
@ -1370,7 +1334,7 @@ mod tests {
|
|||
);
|
||||
|
||||
// get series with measurement = cpu and host = b
|
||||
let pred = parse_predicate("_m = \"cpu\" and host = \"b\"").unwrap();
|
||||
let pred = parse_predicate(r#"_m = "cpu" and host = "b""#).unwrap();
|
||||
let series: Vec<SeriesFilter> = db
|
||||
.read_series_matching(bucket.id, Some(&pred))
|
||||
.unwrap()
|
||||
|
@ -1385,7 +1349,7 @@ mod tests {
|
|||
},]
|
||||
);
|
||||
|
||||
let pred = parse_predicate("host = \"a\" OR _m = \"mem\"").unwrap();
|
||||
let pred = parse_predicate(r#"host = "a" OR _m = "mem""#).unwrap();
|
||||
let series: Vec<SeriesFilter> = db
|
||||
.read_series_matching(bucket.id, Some(&pred))
|
||||
.unwrap()
|
||||
|
@ -1424,14 +1388,14 @@ mod tests {
|
|||
|
||||
b1.id = db.create_bucket_if_not_exists(b1.org_id, &b1).unwrap();
|
||||
|
||||
let mut points = vec![p1.clone()];
|
||||
let mut points = vec![p1];
|
||||
db.get_or_create_series_ids_for_points(b1.id, &mut points)
|
||||
.unwrap();
|
||||
db.write_points(b1.id, &points).unwrap();
|
||||
|
||||
// test that we'll only read from the bucket we wrote points into
|
||||
let range = Range { start: 1, stop: 4 };
|
||||
let pred = parse_predicate("_m = \"cpu\"").unwrap();
|
||||
let range = TimestampRange { start: 1, end: 4 };
|
||||
let pred = parse_predicate(r#"_m = "cpu""#).unwrap();
|
||||
let mut iter = db.read_series_matching(b1.id, Some(&pred)).unwrap();
|
||||
|
||||
let series_filter = iter.next().unwrap();
|
||||
|
@ -1472,14 +1436,14 @@ mod tests {
|
|||
.unwrap();
|
||||
db.write_points(b1.id, &b1_points).unwrap();
|
||||
|
||||
let mut b2_points = vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()];
|
||||
let mut b2_points = vec![p1, p2, p3, p4];
|
||||
db.get_or_create_series_ids_for_points(b2.id, &mut b2_points)
|
||||
.unwrap();
|
||||
db.write_points(b2.id, &b2_points).unwrap();
|
||||
|
||||
// test that we'll only read from the bucket we wrote points into
|
||||
let range = Range { start: 1, stop: 4 };
|
||||
let pred = parse_predicate("_m = \"cpu\" OR _m = \"mem\"").unwrap();
|
||||
let range = TimestampRange { start: 1, end: 4 };
|
||||
let pred = parse_predicate(r#"_m = "cpu" OR _m = "mem""#).unwrap();
|
||||
let mut iter = db.read_series_matching(b1.id, Some(&pred)).unwrap();
|
||||
let series_filter = iter.next().unwrap();
|
||||
assert_eq!(
|
||||
|
@ -1506,7 +1470,7 @@ mod tests {
|
|||
assert_eq!(points_iter.next(), None);
|
||||
|
||||
// test that we'll read multiple series
|
||||
let pred = parse_predicate("_m = \"cpu\" OR _m = \"mem\"").unwrap();
|
||||
let pred = parse_predicate(r#"_m = "cpu" OR _m = "mem""#).unwrap();
|
||||
let mut iter = db.read_series_matching(b2.id, Some(&pred)).unwrap();
|
||||
let series_filter = iter.next().unwrap();
|
||||
assert_eq!(
|
||||
|
@ -1553,7 +1517,7 @@ mod tests {
|
|||
);
|
||||
|
||||
// test that the batch size is honored
|
||||
let pred = parse_predicate("host = \"b\"").unwrap();
|
||||
let pred = parse_predicate(r#"host = "b""#).unwrap();
|
||||
let mut iter = db.read_series_matching(b1.id, Some(&pred)).unwrap();
|
||||
let series_filter = iter.next().unwrap();
|
||||
assert_eq!(
|
||||
|
@ -1575,8 +1539,8 @@ mod tests {
|
|||
assert_eq!(points, vec![ReadPoint { time: 2, value: 1 },]);
|
||||
|
||||
// test that the time range is properly limiting
|
||||
let range = Range { start: 2, stop: 3 };
|
||||
let pred = parse_predicate("_m = \"cpu\" OR _m = \"mem\"").unwrap();
|
||||
let range = TimestampRange { start: 2, end: 3 };
|
||||
let pred = parse_predicate(r#"_m = "cpu" OR _m = "mem""#).unwrap();
|
||||
let mut iter = db.read_series_matching(b2.id, Some(&pred)).unwrap();
|
||||
let series_filter = iter.next().unwrap();
|
||||
assert_eq!(
|
||||
|
@ -1621,14 +1585,14 @@ mod tests {
|
|||
|
||||
b1.id = db.create_bucket_if_not_exists(b1.org_id, &b1).unwrap();
|
||||
|
||||
let mut points = vec![p1.clone(), p2.clone()];
|
||||
let mut points = vec![p1, p2];
|
||||
db.get_or_create_series_ids_for_points(b1.id, &mut points)
|
||||
.unwrap();
|
||||
db.write_points_with_series_ids(b1.id, &points).unwrap();
|
||||
|
||||
// test that we'll only read from the bucket we wrote points into
|
||||
let range = Range { start: 0, stop: 4 };
|
||||
let pred = parse_predicate("_m = \"cpu\"").unwrap();
|
||||
let range = TimestampRange { start: 0, end: 4 };
|
||||
let pred = parse_predicate(r#"_m = "cpu""#).unwrap();
|
||||
let mut iter = db.read_series_matching(b1.id, Some(&pred)).unwrap();
|
||||
let series_filter = iter.next().unwrap();
|
||||
assert_eq!(
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
use crate::delorean::TimestampRange;
|
||||
use crate::line_parser::PointType;
|
||||
use crate::storage::{Range, StorageError};
|
||||
use crate::storage::StorageError;
|
||||
|
||||
pub trait SeriesStore: Sync + Send {
|
||||
fn write_points_with_series_ids(
|
||||
|
@ -12,17 +13,17 @@ pub trait SeriesStore: Sync + Send {
|
|||
&self,
|
||||
bucket_id: u32,
|
||||
series_id: u64,
|
||||
range: &Range,
|
||||
range: &TimestampRange,
|
||||
batch_size: usize,
|
||||
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<i64>>>>, StorageError>;
|
||||
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<i64>>> + Send>, StorageError>;
|
||||
|
||||
fn read_f64_range(
|
||||
&self,
|
||||
bucket_id: u32,
|
||||
series_id: u64,
|
||||
range: &Range,
|
||||
range: &TimestampRange,
|
||||
batch_size: usize,
|
||||
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<f64>>>>, StorageError>;
|
||||
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<f64>>> + Send>, StorageError>;
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
|
@ -34,9 +35,9 @@ pub struct ReadPoint<T: Clone> {
|
|||
// Test helpers for other implementations to run
|
||||
#[cfg(test)]
|
||||
pub mod tests {
|
||||
use crate::delorean::TimestampRange;
|
||||
use crate::line_parser::PointType;
|
||||
use crate::storage::series_store::{ReadPoint, SeriesStore};
|
||||
use crate::storage::Range;
|
||||
|
||||
pub fn write_and_read_i64(store: Box<dyn SeriesStore>) {
|
||||
let b1_id = 1;
|
||||
|
@ -55,13 +56,13 @@ pub mod tests {
|
|||
.write_points_with_series_ids(b1_id, &b1_points)
|
||||
.unwrap();
|
||||
|
||||
let b2_points = vec![p1.clone(), p2.clone(), p3.clone(), p4.clone()];
|
||||
let b2_points = vec![p1.clone(), p2, p3.clone(), p4];
|
||||
store
|
||||
.write_points_with_series_ids(b2_id, &b2_points)
|
||||
.unwrap();
|
||||
|
||||
// test that we'll only read from the bucket we wrote points into
|
||||
let range = Range { start: 1, stop: 4 };
|
||||
let range = TimestampRange { start: 1, end: 4 };
|
||||
let mut points_iter = store
|
||||
.read_i64_range(b1_id, p1.series_id().unwrap(), &range, 10)
|
||||
.unwrap();
|
||||
|
@ -111,7 +112,7 @@ pub mod tests {
|
|||
assert_eq!(points_iter.next(), None);
|
||||
|
||||
// test that the time range is properly limiting
|
||||
let range = Range { start: 2, stop: 3 };
|
||||
let range = TimestampRange { start: 2, end: 3 };
|
||||
let mut points_iter = store
|
||||
.read_i64_range(b2_id, p1.series_id().unwrap(), &range, 10)
|
||||
.unwrap();
|
||||
|
@ -132,13 +133,13 @@ pub mod tests {
|
|||
let mut p2 = PointType::new_f64("cpu,host=b,region=west\tusage_system".to_string(), 2.2, 2);
|
||||
p2.set_series_id(1);
|
||||
|
||||
let points = vec![p1.clone(), p2.clone()];
|
||||
let points = vec![p1.clone(), p2];
|
||||
store
|
||||
.write_points_with_series_ids(bucket_id, &points)
|
||||
.unwrap();
|
||||
|
||||
// test that we'll only read from the bucket we wrote points into
|
||||
let range = Range { start: 0, stop: 4 };
|
||||
let range = TimestampRange { start: 0, end: 4 };
|
||||
let mut points_iter = store
|
||||
.read_f64_range(bucket_id, p1.series_id().unwrap(), &range, 10)
|
||||
.unwrap();
|
||||
|
|
|
@ -16,18 +16,55 @@
// - Stopping the server after all relevant tests are run

use assert_cmd::prelude::*;

use futures::prelude::*;
use prost::Message;
use std::convert::TryInto;
use std::env;
use std::process::{Command, Stdio};
use std::str;
use std::thread::sleep;
use std::time::{Duration, SystemTime};
use std::u32;

const URL_BASE: &str = "http://localhost:8080/api/v2";
const GRPC_URL_BASE: &str = "http://localhost:8081/";

fn read_data(
    client: &reqwest::blocking::Client,
mod grpc {
    tonic::include_proto!("delorean");
}

use grpc::delorean_client::DeloreanClient;
use grpc::storage_client::StorageClient;
use grpc::Organization;
use grpc::ReadSource;
use grpc::{
    node::{Comparison, Value},
    read_response::{frame::Data, DataType},
    Node, Predicate, ReadFilterRequest, Tag, TagKeysRequest, TagValuesRequest, TimestampRange,
};

macro_rules! assert_unwrap {
    ($e:expr, $p:path) => {
        match $e {
            $p(v) => v,
            _ => panic!("{} was not a {}", stringify!($e), stringify!($p)),
        }
    };
    ($e:expr, $p:path, $extra:tt) => {
        match $e {
            $p(v) => v,
            _ => {
                let extra = format_args!($extra);
                panic!("{} was not a {}: {}", stringify!($e), stringify!($p), extra);
            }
        }
    };
}

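Not part of the diff: a small usage sketch of the assert_unwrap! macro defined above, which extracts an enum variant's payload or panics with context (the read test below applies it to read_response frames the same way). It assumes the macro is in scope:

enum Data {
    Series(String),
    FloatPoints(Vec<f64>),
}

fn main() {
    let frame = Data::Series("cpu".to_string());

    // Panics with "frame was not a Data::Series: in frame 0" if the variant is wrong.
    let name = assert_unwrap!(frame, Data::Series, "in frame 0");
    assert_eq!(name, "cpu");
}
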
async fn read_data(
|
||||
client: &reqwest::Client,
|
||||
path: &str,
|
||||
org_id: &str,
|
||||
org_id: u32,
|
||||
bucket_name: &str,
|
||||
predicate: &str,
|
||||
seconds_ago: u64,
|
||||
|
@ -36,29 +73,35 @@ fn read_data(
|
|||
Ok(client
|
||||
.get(&url)
|
||||
.query(&[
|
||||
("org_id", org_id),
|
||||
("bucket_name", bucket_name),
|
||||
("org_id", &org_id.to_string()),
|
||||
("predicate", predicate),
|
||||
("start", &format!("-{}s", seconds_ago)),
|
||||
])
|
||||
.send()?
|
||||
.send()
|
||||
.await?
|
||||
.error_for_status()?
|
||||
.text()?)
|
||||
.text()
|
||||
.await?)
|
||||
}
|
||||
|
||||
fn write_data(
|
||||
client: &reqwest::blocking::Client,
|
||||
async fn write_data(
|
||||
client: &reqwest::Client,
|
||||
path: &str,
|
||||
org_id: &str,
|
||||
org_id: u32,
|
||||
bucket_name: &str,
|
||||
body: String,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let url = format!("{}{}", URL_BASE, path);
|
||||
client
|
||||
.post(&url)
|
||||
.query(&[("org_id", org_id), ("bucket_name", bucket_name)])
|
||||
.query(&[
|
||||
("bucket_name", bucket_name),
|
||||
("org_id", &org_id.to_string()),
|
||||
])
|
||||
.body(body)
|
||||
.send()?
|
||||
.send()
|
||||
.await?
|
||||
.error_for_status()?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -75,8 +118,8 @@ fn get_test_storage_path() -> String {
|
|||
.expect("Should have been able to turn temp dir into String")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn read_and_write_data() -> Result<(), Box<dyn std::error::Error>> {
|
||||
#[tokio::test]
|
||||
async fn read_and_write_data() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let mut server_thread = Command::cargo_bin("delorean")?
|
||||
.stdout(Stdio::null())
|
||||
.env("DELOREAN_DB_DIR", get_test_storage_path())
|
||||
|
@ -85,15 +128,31 @@ fn read_and_write_data() -> Result<(), Box<dyn std::error::Error>> {
|
|||
// TODO: poll the server to see if it's ready instead of sleeping
|
||||
sleep(Duration::from_secs(3));
|
||||
|
||||
let org_id = "7878";
|
||||
let org_id = 7878;
|
||||
let bucket_name = "all";
|
||||
let client = reqwest::blocking::Client::new();
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let mut grpc_client = DeloreanClient::connect(GRPC_URL_BASE).await?;
|
||||
|
||||
let get_buckets_request = tonic::Request::new(Organization {
|
||||
id: org_id,
|
||||
name: "test".into(),
|
||||
buckets: vec![],
|
||||
});
|
||||
let get_buckets_response = grpc_client.get_buckets(get_buckets_request).await?;
|
||||
let get_buckets_response = get_buckets_response.into_inner();
|
||||
let org_buckets = get_buckets_response.buckets;
|
||||
|
||||
// This checks that gRPC is functioning and that we're starting from an org without buckets.
|
||||
assert!(org_buckets.is_empty());
|
||||
|
||||
let start_time = SystemTime::now();
|
||||
let ns_since_epoch = start_time
|
||||
let ns_since_epoch: i64 = start_time
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.expect("System time should have been after the epoch")
|
||||
.as_nanos();
|
||||
.as_nanos()
|
||||
.try_into()
|
||||
.expect("Unable to represent system time");
|
||||
|
||||
// TODO: make a more extensible way to manage data for tests, such as in external fixture
|
||||
// files or with factories.
|
||||
|
@ -113,7 +172,8 @@ cpu_load_short,host=server01,region=us-west value=0.000003 {}",
|
|||
ns_since_epoch + 2,
|
||||
ns_since_epoch + 3
|
||||
),
|
||||
)?;
|
||||
)
|
||||
.await?;
|
||||
|
||||
let end_time = SystemTime::now();
|
||||
let duration = end_time
|
||||
|
@ -126,9 +186,10 @@ cpu_load_short,host=server01,region=us-west value=0.000003 {}",
|
|||
"/read",
|
||||
org_id,
|
||||
bucket_name,
|
||||
"host=\"server01\"",
|
||||
r#"host="server01""#,
|
||||
seconds_ago,
|
||||
)?;
|
||||
)
|
||||
.await?;
|
||||
|
||||
// TODO: make a more sustainable way to manage expected data for tests, such as using the
|
||||
// insta crate to manage snapshots.
|
||||
|
@ -150,9 +211,143 @@ cpu_load_short,server01,us-east,value,{},1234567.891011
|
|||
)
|
||||
);
|
||||
|
||||
let mut storage_client = StorageClient::connect(GRPC_URL_BASE).await?;
|
||||
|
||||
let org_id = u64::from(u32::MAX);
|
||||
let bucket_id = 1; // TODO: how do we know this?
|
||||
let partition_id = u64::from(u32::MAX);
|
||||
let read_source = ReadSource {
|
||||
org_id,
|
||||
bucket_id,
|
||||
partition_id,
|
||||
};
|
||||
let mut d = Vec::new();
|
||||
read_source.encode(&mut d)?;
|
||||
let read_source = prost_types::Any {
|
||||
type_url: "/TODO".to_string(),
|
||||
value: d,
|
||||
};
|
||||
let read_source = Some(read_source);
|
||||
|
||||
let range = TimestampRange {
|
||||
start: ns_since_epoch,
|
||||
end: ns_since_epoch + 3,
|
||||
};
|
||||
let range = Some(range);
|
||||
|
||||
let l = Value::TagRefValue("host".into());
|
||||
let l = Node {
|
||||
children: vec![],
|
||||
value: Some(l),
|
||||
};
|
||||
|
||||
let r = Value::StringValue("server01".into());
|
||||
let r = Node {
|
||||
children: vec![],
|
||||
value: Some(r),
|
||||
};
|
||||
|
||||
let comp = Value::Comparison(Comparison::Equal as _);
|
||||
let comp = Some(comp);
|
||||
let root = Node {
|
||||
children: vec![l, r],
|
||||
value: comp,
|
||||
};
|
||||
let root = Some(root);
|
||||
let predicate = Predicate { root };
|
||||
let predicate = Some(predicate);
|
||||
|
||||
let read_filter_request = tonic::Request::new(ReadFilterRequest {
|
||||
read_source: read_source.clone(),
|
||||
range: range.clone(),
|
||||
predicate: predicate.clone(),
|
||||
});
|
||||
let read_response = storage_client.read_filter(read_filter_request).await?;
|
||||
|
||||
let responses: Vec<_> = read_response.into_inner().try_collect().await?;
|
||||
let frames: Vec<_> = responses
|
||||
.into_iter()
|
||||
.flat_map(|r| r.frames)
|
||||
.flat_map(|f| f.data)
|
||||
.collect();
|
||||
|
||||
assert_eq!(
|
||||
frames.len(),
|
||||
5,
|
||||
"expected exactly 5 frames, but there were {}",
|
||||
frames.len()
|
||||
);
|
||||
|
||||
let f = assert_unwrap!(&frames[0], Data::Series, "in frame 0");
|
||||
assert_eq!(f.data_type, DataType::Float as i32, "in frame 0");
|
||||
assert_eq!(
|
||||
tags_as_strings(&f.tags),
|
||||
vec![("host", "server01"), ("region", "us-west")]
|
||||
);
|
||||
|
||||
let f = assert_unwrap!(&frames[1], Data::FloatPoints, "in frame 1");
|
||||
assert_eq!(f.timestamps, [ns_since_epoch], "in frame 1");
|
||||
assert_eq!(f.values, [0.64], "in frame 1");
|
||||
|
||||
let f = assert_unwrap!(&frames[2], Data::FloatPoints, "in frame 2");
|
||||
assert_eq!(f.timestamps, [ns_since_epoch + 3], "in frame 2");
|
||||
assert_eq!(f.values, [0.000_003], "in frame 2");
|
||||
|
||||
let f = assert_unwrap!(&frames[3], Data::Series, "in frame 3");
|
||||
assert_eq!(f.data_type, DataType::Float as i32, "in frame 3");
|
||||
|
||||
assert_eq!(
|
||||
tags_as_strings(&f.tags),
|
||||
vec![("host", "server01"), ("region", "us-east")]
|
||||
);
|
||||
|
||||
let f = assert_unwrap!(&frames[4], Data::FloatPoints, "in frame 4");
|
||||
assert_eq!(f.timestamps, [ns_since_epoch + 2], "in frame 4");
|
||||
assert_eq!(f.values, [1_234_567.891_011], "in frame 4");
|
||||
|
||||
let tag_keys_request = tonic::Request::new(TagKeysRequest {
|
||||
tags_source: read_source.clone(),
|
||||
range: range.clone(),
|
||||
predicate: predicate.clone(),
|
||||
});
|
||||
|
||||
let tag_keys_response = storage_client.tag_keys(tag_keys_request).await?;
|
||||
let responses: Vec<_> = tag_keys_response.into_inner().try_collect().await?;
|
||||
|
||||
let keys = &responses[0].values;
|
||||
let keys: Vec<_> = keys.iter().map(|s| str::from_utf8(s).unwrap()).collect();
|
||||
|
||||
assert_eq!(keys, vec!["_f", "_m", "host", "region"]);
|
||||
|
||||
let tag_values_request = tonic::Request::new(TagValuesRequest {
|
||||
tags_source: read_source,
|
||||
range,
|
||||
predicate,
|
||||
tag_key: String::from("host"),
|
||||
});
|
||||
|
||||
let tag_values_response = storage_client.tag_values(tag_values_request).await?;
|
||||
let responses: Vec<_> = tag_values_response.into_inner().try_collect().await?;
|
||||
|
||||
let values = &responses[0].values;
|
||||
let values: Vec<_> = values.iter().map(|s| str::from_utf8(s).unwrap()).collect();
|
||||
|
||||
assert_eq!(values, vec!["server01", "server02"]);
|
||||
|
||||
server_thread
|
||||
.kill()
|
||||
.expect("Should have been able to kill the test server");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn tags_as_strings(tags: &[Tag]) -> Vec<(&str, &str)> {
    tags.iter()
        .map(|t| {
            (
                str::from_utf8(&t.key).unwrap(),
                str::from_utf8(&t.value).unwrap(),
            )
        })
        .collect()
}