Merge pull request #56 from influxdata/pd-partiton-store

feat: Add Partition Store
Paul Dix 2020-04-03 13:27:17 -04:00 committed by GitHub
commit 779633fd77
13 changed files with 1386 additions and 2433 deletions

Cargo.lock generated

@@ -136,29 +136,6 @@ dependencies = [
"which 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "bindgen"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"cexpr 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
"clang-sys 0.28.1 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"lazycell 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"peeking_take_while 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"regex 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-hash 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"shlex 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"which 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "bitflags"
version = "1.2.1"
@@ -220,9 +197,6 @@ dependencies = [
name = "cc"
version = "1.0.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"jobserver 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "cexpr"
@@ -421,7 +395,6 @@ dependencies = [
"prost-types 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
"reqwest 0.10.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rocksdb 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.48 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_urlencoded 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -790,14 +763,6 @@ name = "itoa"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "jobserver"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"libc 0.2.67 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "js-sys"
version = "0.3.35"
@@ -852,17 +817,6 @@ dependencies = [
"winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "librocksdb-sys"
version = "6.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bindgen 0.53.1 (registry+https://github.com/rust-lang/crates.io-index)",
"cc 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)",
"glob 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.67 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "log"
version = "0.4.8"
@@ -1403,15 +1357,6 @@ dependencies = [
"winreg 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rocksdb"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"libc 0.2.67 (registry+https://github.com/rust-lang/crates.io-index)",
"librocksdb-sys 6.6.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rust-argon2"
version = "0.7.0"
@@ -2249,7 +2194,6 @@ dependencies = [
"checksum base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0b25d992356d2eb0ed82172f5248873db5560c4721f564b13cb5193bda5e668e"
"checksum base64 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7"
"checksum bindgen 0.52.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f1c85344eb535a31b62f0af37be84441ba9e7f0f4111eb0530f43d15e513fe57"
"checksum bindgen 0.53.1 (registry+https://github.com/rust-lang/crates.io-index)" = "99de13bb6361e01e493b3db7928085dcc474b7ba4f5481818e53a89d76b8393f"
"checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
"checksum blake2b_simd 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)" = "d8fb2d74254a3a0b5cac33ac9f8ed0e44aa50378d9dbb2e5d83bd21ed1dc2c8a"
"checksum bstr 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "502ae1441a0a5adb8fbd38a5955a6416b9493e92b465de5e4a9bde6a539c2c48"
@@ -2318,7 +2262,6 @@ dependencies = [
"checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
"checksum itertools 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f56a2d0bc861f9165be4eb3442afd3c236d8a98afd426f65d92324ae1091a484"
"checksum itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e"
"checksum jobserver 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)" = "5c71313ebb9439f74b00d9d2dcec36440beaf57a6aa0623068441dd7cd81a7f2"
"checksum js-sys 0.3.35 (registry+https://github.com/rust-lang/crates.io-index)" = "7889c7c36282151f6bf465be4700359318aef36baa951462382eae49e9577cf9"
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
"checksum lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
@@ -2326,7 +2269,6 @@ dependencies = [
"checksum lexical-core 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)" = "f86d66d380c9c5a685aaac7a11818bdfa1f733198dfd9ec09c70b762cd12ad6f"
"checksum libc 0.2.67 (registry+https://github.com/rust-lang/crates.io-index)" = "eb147597cdf94ed43ab7a9038716637d2d1bf2bc571da995d0028dec06bd3018"
"checksum libloading 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f2b111a074963af1d37a139918ac6d49ad1d0d5e47f72fd55388619691a7d753"
"checksum librocksdb-sys 6.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "4e3b727e2dd20ec2fb7ed93f23d9fd5328a0871185485ebdaff007b47d3e27e4"
"checksum log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7"
"checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
"checksum maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
@@ -2388,7 +2330,6 @@ dependencies = [
"checksum regex-syntax 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)" = "b28dfe3fe9badec5dbf0a79a9cccad2cfc2ab5484bdb3e44cbd1ae8b3ba2be06"
"checksum remove_dir_all 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4a83fa3702a688b9359eccba92d153ac33fd2e8462f9e0e3fdf155239ea7792e"
"checksum reqwest 0.10.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a9f62f24514117d09a8fc74b803d3d65faa27cea1c7378fb12b0d002913f3831"
"checksum rocksdb 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "12069b106981c6103d3eab7dd1c86751482d0779a520b7c14954c8b586c1e643"
"checksum rust-argon2 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2bc8af4bda8e1ff4932523b94d3dd20ee30a87232323eda55903ffd71d2fb017"
"checksum rustc-hash 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
"checksum rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a"


@@ -22,7 +22,6 @@ futures = "0.3.1"
serde_json = "1.0.44"
serde = "1.0"
csv = "1.1"
rocksdb = "0.13"
byteorder = "1.3.4"
num_cpus = "1.11.1"


@@ -3,15 +3,14 @@
#[macro_use]
extern crate log;
use delorean::delorean::Bucket;
use delorean::delorean::{
delorean_server::DeloreanServer, storage_server::StorageServer, TimestampRange,
};
use delorean::line_parser;
use delorean::line_parser::index_pairs;
use delorean::storage::database::Database;
use delorean::storage::partitioned_store::ReadValues;
use delorean::storage::predicate::parse_predicate;
use delorean::storage::SeriesDataType;
use delorean::time::{parse_duration, time_as_i64_nanos};
use std::env::VarError;
@@ -48,33 +47,20 @@ async fn write(req: hyper::Request<Body>, app: Arc<App>) -> Result<Body, Applica
let write_info: WriteInfo =
serde_urlencoded::from_str(query).map_err(|_| StatusCode::BAD_REQUEST)?;
let maybe_bucket = app
let bucket_id = app
.db
.get_bucket_by_name(write_info.org_id, &write_info.bucket_name)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let bucket = match maybe_bucket {
Some(b) => b,
None => {
// create this as the default bucket
let b = Bucket {
org_id: write_info.org_id,
id: 0,
name: write_info.bucket_name.clone(),
retention: "0".to_string(),
posting_list_rollover: 10_000,
index_levels: vec![],
};
app.db
.create_bucket_if_not_exists(write_info.org_id, &b)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
app.db
.get_bucket_by_name(write_info.org_id, &write_info.bucket_name)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.expect("Bucket should have just been created")
}
};
.get_bucket_id_by_name(write_info.org_id, &write_info.bucket_name)
.await
.map_err(|e| {
debug!("Error getting bucket id: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or_else(|| {
ApplicationError::new(
StatusCode::NOT_FOUND,
&format!("bucket {} not found", write_info.bucket_name),
)
})?;
let mut payload = req.into_body();
@@ -96,8 +82,12 @@ async fn write(req: hyper::Request<Body>, app: Arc<App>) -> Result<Body, Applica
let mut points = line_parser::parse(body).expect("TODO: Unable to parse lines");
app.db
.write_points(write_info.org_id, &bucket, &mut points)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
.write_points(write_info.org_id, bucket_id, &mut points)
.await
.map_err(|e| {
debug!("Error writing points: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(serde_json::json!(()).to_string().into())
}
@@ -152,34 +142,30 @@ async fn read(req: hyper::Request<Body>, app: Arc<App>) -> Result<Body, Applicat
let range = TimestampRange { start, end };
let maybe_bucket = app
let bucket_id = app
.db
.get_bucket_by_name(read_info.org_id, &read_info.bucket_name)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let bucket = match maybe_bucket {
Some(b) => b,
None => {
return Err(ApplicationError::new(
.get_bucket_id_by_name(read_info.org_id, &read_info.bucket_name)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.ok_or_else(|| {
ApplicationError::new(
StatusCode::NOT_FOUND,
&format!("bucket {} not found", read_info.bucket_name),
));
}
};
)
})?;
let series = app
let batches = app
.db
.read_series_matching_predicate_and_range(&bucket, Some(&predicate), Some(&range))
.read_points(read_info.org_id, bucket_id, &predicate, &range)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let db = &app.db;
let mut response_body = vec![];
for s in series {
for batch in batches {
let mut wtr = Writer::from_writer(vec![]);
let pairs = index_pairs(&s.key).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let pairs = index_pairs(&batch.key).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let mut cols = Vec::with_capacity(pairs.len() + 2);
let mut vals = Vec::with_capacity(pairs.len() + 2);
@@ -200,40 +186,26 @@ async fn read(req: hyper::Request<Body>, app: Arc<App>) -> Result<Body, Applicat
wtr.write_record(&cols).unwrap();
match s.series_type {
SeriesDataType::I64 => {
let points = db
.read_i64_range(&bucket, &s, &range, 10)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
for batch in points {
for p in batch {
let t = p.time.to_string();
let v = p.value.to_string();
match batch.values {
ReadValues::I64(values) => {
for val in values {
let t = val.time.to_string();
let v = val.value.to_string();
vals[vcol] = v;
vals[tcol] = t;
wtr.write_record(&vals).unwrap();
}
}
ReadValues::F64(values) => {
for val in values {
let t = val.time.to_string();
let v = val.value.to_string();
vals[vcol] = v;
vals[tcol] = t;
wtr.write_record(&vals).unwrap();
}
}
}
SeriesDataType::F64 => {
let points = db
.read_f64_range(&bucket, &s, &range, 10)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
for batch in points {
for p in batch {
let t = p.time.to_string();
let v = p.value.to_string();
vals[vcol] = v;
vals[tcol] = t;
wtr.write_record(&vals).unwrap();
}
}
}
};
let mut data = wtr
.into_inner()


@@ -1,4 +1,3 @@
use delorean::delorean::Bucket;
use delorean::delorean::{
delorean_server::Delorean,
read_response::{
@@ -7,11 +6,11 @@ use delorean::delorean::{
storage_server::Storage,
CapabilitiesResponse, CreateBucketRequest, CreateBucketResponse, DeleteBucketRequest,
DeleteBucketResponse, GetBucketsResponse, Organization, Predicate, ReadFilterRequest,
ReadGroupRequest, ReadResponse, ReadSource, StringValuesResponse, TagKeysRequest,
ReadGroupRequest, ReadResponse, ReadSource, StringValuesResponse, Tag, TagKeysRequest,
TagValuesRequest, TimestampRange,
};
use delorean::storage::database::Database;
use delorean::storage::SeriesDataType;
use delorean::line_parser::index_pairs;
use delorean::storage::partitioned_store::ReadValues;
use std::convert::TryInto;
use std::sync::Arc;
@@ -29,8 +28,21 @@ pub struct GrpcServer {
impl Delorean for GrpcServer {
async fn create_bucket(
&self,
_req: tonic::Request<CreateBucketRequest>,
req: tonic::Request<CreateBucketRequest>,
) -> Result<tonic::Response<CreateBucketResponse>, Status> {
let create_bucket_request = req.into_inner();
let org_id = create_bucket_request.org_id;
let bucket = create_bucket_request
.bucket
.ok_or_else(|| Status::invalid_argument("missing bucket argument"))?;
self.app
.db
.create_bucket_if_not_exists(org_id, bucket)
.await
.map_err(|err| Status::internal(format!("error creating bucket: {}", err)))?;
Ok(tonic::Response::new(CreateBucketResponse {}))
}
@@ -43,9 +55,20 @@ impl Delorean for GrpcServer {
async fn get_buckets(
&self,
_req: tonic::Request<Organization>,
req: tonic::Request<Organization>,
) -> Result<tonic::Response<GetBucketsResponse>, Status> {
Ok(tonic::Response::new(GetBucketsResponse { buckets: vec![] }))
let org = req.into_inner();
let org_id = org.id;
let buckets = self
.app
.db
.buckets(org_id)
.await
.map_err(|err| Status::internal(format!("error reading db: {}", err)))?;
Ok(tonic::Response::new(GetBucketsResponse { buckets }))
}
}
@@ -77,19 +100,12 @@ trait GrpcInputs {
.map_err(|_| Status::invalid_argument("org_id did not fit in a u32"))?)
}
fn bucket(&self, db: &Database) -> Result<Arc<Bucket>, Status> {
let bucket_id = self
fn bucket_id(&self) -> Result<u32, Status> {
Ok(self
.read_source()?
.bucket_id
.try_into()
.map_err(|_| Status::invalid_argument("bucket_id did not fit in a u32"))?;
let maybe_bucket = db
.get_bucket_by_id(bucket_id)
.map_err(|_| Status::internal("could not query for bucket"))?;
Ok(maybe_bucket
.ok_or_else(|| Status::not_found(&format!("bucket {} not found", bucket_id)))?)
.map_err(|_| Status::invalid_argument("bucket_id did not fit in a u32"))?)
}
}
@@ -129,8 +145,8 @@ impl Storage for GrpcServer {
let read_filter_request = req.into_inner();
let _org_id = read_filter_request.org_id()?;
let bucket = read_filter_request.bucket(&self.app.db)?;
let org_id = read_filter_request.org_id()?;
let bucket_id = read_filter_request.bucket_id()?;
let predicate = read_filter_request.predicate;
let range = read_filter_request.range;
@@ -138,13 +154,15 @@ impl Storage for GrpcServer {
// TODO: is this blocking because of the blocking calls to the database...?
tokio::spawn(async move {
let predicate = predicate.as_ref();
let predicate = predicate.as_ref().expect("TODO: must have a predicate");
// TODO: The call to read_series_matching_predicate_and_range takes an optional range,
// but read_f64_range requires a range-- should this route require a range or use a
// default or something else?
let range = range.as_ref().expect("TODO: Must have a range?");
if let Err(e) = send_series_filters(tx.clone(), app, &bucket, predicate, &range).await {
if let Err(e) =
send_series_filters(tx.clone(), app, org_id, bucket_id, predicate, &range).await
{
tx.send(Err(e)).await.unwrap();
}
});
@@ -169,24 +187,28 @@ impl Storage for GrpcServer {
) -> Result<tonic::Response<Self::TagKeysStream>, Status> {
let (mut tx, rx) = mpsc::channel(4);
let tag_keys_request = req.get_ref();
let tag_keys_request = req.into_inner();
let _org_id = tag_keys_request.org_id()?;
let bucket = tag_keys_request.bucket(&self.app.db)?;
let predicate = tag_keys_request.predicate.clone();
let _range = tag_keys_request.range.as_ref();
let org_id = tag_keys_request.org_id()?;
let bucket_id = tag_keys_request.bucket_id()?;
let predicate = tag_keys_request.predicate;
let range = tag_keys_request.range;
let app = self.app.clone();
tokio::spawn(async move {
match app.db.get_tag_keys(&bucket, predicate.as_ref()) {
match app
.db
.get_tag_keys(org_id, bucket_id, predicate.as_ref(), range.as_ref())
.await
{
Err(_) => tx
.send(Err(Status::internal("could not query for tag keys")))
.await
.unwrap(),
Ok(tag_keys_iter) => {
Ok(tag_keys) => {
// TODO: Should these be batched? If so, how?
let tag_keys: Vec<_> = tag_keys_iter.map(|s| s.into_bytes()).collect();
let tag_keys: Vec<_> = tag_keys.into_iter().map(|s| s.into_bytes()).collect();
tx.send(Ok(StringValuesResponse { values: tag_keys }))
.await
.unwrap();
@@ -205,26 +227,37 @@ impl Storage for GrpcServer {
) -> Result<tonic::Response<Self::TagValuesStream>, Status> {
let (mut tx, rx) = mpsc::channel(4);
let tag_values_request = req.get_ref();
let tag_values_request = req.into_inner();
let _org_id = tag_values_request.org_id()?;
let bucket = tag_values_request.bucket(&self.app.db)?;
let predicate = tag_values_request.predicate.clone();
let _range = tag_values_request.range.as_ref();
let org_id = tag_values_request.org_id()?;
let bucket_id = tag_values_request.bucket_id()?;
let predicate = tag_values_request.predicate;
let range = tag_values_request.range;
let tag_key = tag_values_request.tag_key.clone();
let app = self.app.clone();
tokio::spawn(async move {
match app.db.get_tag_values(&bucket, &tag_key, predicate.as_ref()) {
match app
.db
.get_tag_values(
org_id,
bucket_id,
&tag_key,
predicate.as_ref(),
range.as_ref(),
)
.await
{
Err(_) => tx
.send(Err(Status::internal("could not query for tag values")))
.await
.unwrap(),
Ok(tag_values_iter) => {
Ok(tag_values) => {
// TODO: Should these be batched? If so, how?
let tag_values: Vec<_> = tag_values_iter.map(|s| s.into_bytes()).collect();
let tag_values: Vec<_> =
tag_values.into_iter().map(|s| s.into_bytes()).collect();
tx.send(Ok(StringValuesResponse { values: tag_values }))
.await
.unwrap();
@@ -246,20 +279,37 @@ impl Storage for GrpcServer {
async fn send_series_filters(
mut tx: mpsc::Sender<Result<ReadResponse, Status>>,
app: Arc<App>,
bucket: &Bucket,
predicate: Option<&Predicate>,
org_id: u32,
bucket_id: u32,
predicate: &Predicate,
range: &TimestampRange,
) -> Result<(), Status> {
let filter_iter = app
let batches = app
.db
.read_series_matching_predicate_and_range(&bucket, predicate, Some(range))
.map_err(|e| Status::internal(format!("could not query for filters: {}", e)))?;
.read_points(org_id, bucket_id, predicate, range)
.await
.map_err(|err| Status::internal(format!("error reading db: {}", err)))?;
for series_filter in filter_iter {
let tags = series_filter.tags();
let data_type = match series_filter.series_type {
SeriesDataType::F64 => DataType::Float,
SeriesDataType::I64 => DataType::Integer,
let mut last_frame_key = String::new();
for batch in batches {
// only send the series frame header if we haven't sent it for this key. We have to do
// this because a single series key can be spread out over multiple ReadBatches, which
// should each be sent as their own data frames
if last_frame_key != batch.key {
last_frame_key = batch.key.clone();
let tags = index_pairs(&batch.key)
.map_err(|err| Status::invalid_argument(err.to_string()))?
.into_iter()
.map(|p| Tag {
key: p.key.bytes().collect(),
value: p.value.bytes().collect(),
})
.collect();
let data_type = match batch.values {
ReadValues::F64(_) => DataType::Float,
ReadValues::I64(_) => DataType::Integer,
} as _;
let series = SeriesFrame { data_type, tags };
let data = Data::Series(series);
@@ -269,53 +319,25 @@ async fn send_series_filters(
let series_frame_response_header = Ok(ReadResponse { frames });
tx.send(series_frame_response_header).await.unwrap();
}
// TODO: Should this match https://github.com/influxdata/influxdb/blob/d96f3dc5abb6bb187374caa9e7c7a876b4799bd2/storage/reads/response_writer.go#L21 ?
const BATCH_SIZE: usize = 1;
match series_filter.series_type {
SeriesDataType::F64 => {
let iter = app
.db
.read_f64_range(&bucket, &series_filter, &range, BATCH_SIZE)
.map_err(|e| {
Status::internal(format!("could not query for SeriesFilter data: {}", e))
})?;
let frames = iter
.map(|batch| {
// TODO: Performance hazard; splitting this vector is non-ideal
let (timestamps, values) =
batch.into_iter().map(|p| (p.time, p.value)).unzip();
match batch.values {
ReadValues::F64(values) => {
let (timestamps, values) = values.into_iter().map(|p| (p.time, p.value)).unzip();
let frame = FloatPointsFrame { timestamps, values };
let data = Data::FloatPoints(frame);
let data = Some(data);
Frame { data }
})
.collect();
let frames = vec![Frame { data }];
let data_frame_response = Ok(ReadResponse { frames });
tx.send(data_frame_response).await.unwrap();
}
SeriesDataType::I64 => {
let iter = app
.db
.read_i64_range(&bucket, &series_filter, &range, BATCH_SIZE)
.map_err(|e| {
Status::internal(format!("could not query for SeriesFilter data: {}", e))
})?;
let frames = iter
.map(|batch| {
// TODO: Performance hazard; splitting this vector is non-ideal
let (timestamps, values) =
batch.into_iter().map(|p| (p.time, p.value)).unzip();
ReadValues::I64(values) => {
let (timestamps, values) = values.into_iter().map(|p| (p.time, p.value)).unzip();
let frame = IntegerPointsFrame { timestamps, values };
let data = Data::IntegerPoints(frame);
let data = Some(data);
Frame { data }
})
.collect();
let frames = vec![Frame { data }];
let data_frame_response = Ok(ReadResponse { frames });
tx.send(data_frame_response).await.unwrap();


@@ -6,8 +6,10 @@ pub mod config_store;
pub mod database;
pub mod inverted_index;
pub mod memdb;
pub mod partitioned_store;
pub mod predicate;
pub mod rocksdb;
pub mod remote_partition;
pub mod s3_partition;
pub mod series_store;
// The values for these enum variants have no real meaning, but they


@@ -1,110 +1,374 @@
use crate::delorean::{Bucket, Predicate, TimestampRange};
use crate::line_parser::PointType;
use crate::storage::config_store::ConfigStore;
use crate::storage::inverted_index::{InvertedIndex, SeriesFilter};
use crate::storage::rocksdb::RocksDB;
use crate::storage::series_store::{ReadPoint, SeriesStore};
use crate::storage::memdb::MemDB;
use crate::storage::partitioned_store::{Partition, ReadBatch};
use crate::storage::StorageError;
use futures::StreamExt;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
pub struct Database {
local_index: Arc<dyn InvertedIndex>,
local_series_store: Arc<dyn SeriesStore>,
local_config_store: Arc<dyn ConfigStore>,
organizations: RwLock<HashMap<u32, RwLock<Organization>>>,
}
#[derive(Default)]
struct Organization {
bucket_data: HashMap<u32, Arc<BucketData>>,
bucket_name_to_id: HashMap<String, u32>,
}
impl Organization {
// create_bucket_if_not_exists inserts the bucket into the map and returns its id
fn create_bucket_if_not_exists(&mut self, mut bucket: Bucket) -> u32 {
match self.bucket_name_to_id.get(&bucket.name) {
Some(id) => *id,
None => {
let id = (self.bucket_data.len() + 1) as u32;
bucket.id = id;
self.bucket_name_to_id.insert(bucket.name.clone(), id);
self.bucket_data
.insert(id, Arc::new(BucketData::new(bucket)));
id
}
}
}
}
struct BucketData {
config: Bucket,
// TODO: wire up rules for partitioning data and storing and reading from multiple partitions
partition: RwLock<Partition>,
}
impl BucketData {
const BATCH_SIZE: usize = 100_000;
fn new(bucket: Bucket) -> BucketData {
let partition_id = bucket.name.clone();
let partition = Partition::MemDB(Box::new(MemDB::new(partition_id)));
BucketData {
config: bucket,
partition: RwLock::new(partition),
}
}
async fn write_points(&self, points: &mut [PointType]) -> Result<(), StorageError> {
self.partition.write().await.write_points(points).await
}
async fn read_points(
&self,
predicate: &Predicate,
range: &TimestampRange,
) -> Result<Vec<ReadBatch>, StorageError> {
let p = self.partition.read().await;
let stream = p
.read_points(BucketData::BATCH_SIZE, predicate, range)
.await?;
Ok(stream.collect().await)
}
async fn get_tag_keys(
&self,
predicate: Option<&Predicate>,
range: Option<&TimestampRange>,
) -> Result<Vec<String>, StorageError> {
let p = self.partition.read().await;
let stream = p.get_tag_keys(predicate, range).await?;
Ok(stream.collect().await)
}
async fn get_tag_values(
&self,
tag_key: &str,
predicate: Option<&Predicate>,
range: Option<&TimestampRange>,
) -> Result<Vec<String>, StorageError> {
let p = self.partition.read().await;
let stream = p.get_tag_values(tag_key, predicate, range).await?;
Ok(stream.collect().await)
}
}
impl Database {
pub fn new(dir: &str) -> Database {
let db = Arc::new(RocksDB::new(dir));
pub fn new(_dir: &str) -> Database {
Database {
local_index: db.clone(),
local_config_store: db.clone(),
local_series_store: db,
organizations: RwLock::new(HashMap::new()),
}
}
pub fn write_points(
pub async fn write_points(
&self,
_org_id: u32,
bucket: &Bucket,
org_id: u32,
bucket_id: u32,
points: &mut [PointType],
) -> Result<(), StorageError> {
self.local_index
.get_or_create_series_ids_for_points(bucket.id, points)?;
self.local_series_store
.write_points_with_series_ids(bucket.id, points)
let bucket_data = self.bucket_data(org_id, bucket_id).await?;
bucket_data.write_points(points).await
}
pub fn get_bucket_by_name(
pub async fn get_bucket_id_by_name(
&self,
org_id: u32,
bucket_name: &str,
) -> Result<Option<Arc<Bucket>>, StorageError> {
self.local_config_store
.get_bucket_by_name(org_id, bucket_name)
) -> Result<Option<u32>, StorageError> {
let orgs = self.organizations.read().await;
let org = match orgs.get(&org_id) {
Some(org) => org,
None => return Ok(None),
};
let id = match org.read().await.bucket_name_to_id.get(bucket_name) {
Some(id) => Some(*id),
None => None,
};
Ok(id)
}
pub fn get_bucket_by_id(&self, bucket_id: u32) -> Result<Option<Arc<Bucket>>, StorageError> {
self.local_config_store.get_bucket_by_id(bucket_id)
}
pub fn create_bucket_if_not_exists(
pub async fn create_bucket_if_not_exists(
&self,
org_id: u32,
bucket: &Bucket,
bucket: Bucket,
) -> Result<u32, StorageError> {
self.local_config_store
.create_bucket_if_not_exists(org_id, bucket)
let mut orgs = self.organizations.write().await;
let org = orgs
.entry(org_id)
.or_insert_with(|| RwLock::new(Organization::default()));
let mut org = org.write().await;
Ok(org.create_bucket_if_not_exists(bucket))
}
pub fn read_series_matching_predicate_and_range(
pub async fn read_points(
&self,
bucket: &Bucket,
predicate: Option<&Predicate>,
_range: Option<&TimestampRange>,
) -> Result<Box<dyn Iterator<Item = SeriesFilter> + Send>, StorageError> {
self.local_index.read_series_matching(bucket.id, predicate)
}
pub fn read_i64_range(
&self,
bucket: &Bucket,
series_filter: &SeriesFilter,
org_id: u32,
bucket_id: u32,
predicate: &Predicate,
range: &TimestampRange,
batch_size: usize,
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<i64>>> + Send>, StorageError> {
self.local_series_store
.read_i64_range(bucket.id, series_filter.id, range, batch_size)
) -> Result<Vec<ReadBatch>, StorageError> {
let bucket_data = self.bucket_data(org_id, bucket_id).await?;
bucket_data.read_points(predicate, range).await
}
pub fn read_f64_range(
pub async fn get_tag_keys(
&self,
bucket: &Bucket,
series_filter: &SeriesFilter,
range: &TimestampRange,
batch_size: usize,
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<f64>>> + Send>, StorageError> {
self.local_series_store
.read_f64_range(bucket.id, series_filter.id, range, batch_size)
}
pub fn get_tag_keys(
&self,
bucket: &Bucket,
org_id: u32,
bucket_id: u32,
predicate: Option<&Predicate>,
) -> Result<Box<dyn Iterator<Item = String> + Send>, StorageError> {
self.local_index.get_tag_keys(bucket.id, predicate)
range: Option<&TimestampRange>,
) -> Result<Vec<String>, StorageError> {
let bucket_data = self.bucket_data(org_id, bucket_id).await?;
bucket_data.get_tag_keys(predicate, range).await
}
pub fn get_tag_values(
pub async fn get_tag_values(
&self,
bucket: &Bucket,
org_id: u32,
bucket_id: u32,
tag_key: &str,
predicate: Option<&Predicate>,
) -> Result<Box<dyn Iterator<Item = String> + Send>, StorageError> {
self.local_index
.get_tag_values(bucket.id, tag_key, predicate)
range: Option<&TimestampRange>,
) -> Result<Vec<String>, StorageError> {
let bucket_data = self.bucket_data(org_id, bucket_id).await?;
bucket_data.get_tag_values(tag_key, predicate, range).await
}
pub async fn buckets(&self, org_id: u32) -> Result<Vec<Bucket>, StorageError> {
Ok(match self.organizations.read().await.get(&org_id) {
None => vec![],
Some(org) => org
.read()
.await
.bucket_data
.values()
.map(|bd| bd.config.clone())
.collect(),
})
}
async fn bucket_data(
&self,
org_id: u32,
bucket_id: u32,
) -> Result<Arc<BucketData>, StorageError> {
let orgs = self.organizations.read().await;
let org = orgs.get(&org_id).ok_or_else(|| StorageError {
description: format!("org {} not found", org_id),
})?;
let org = org.read().await;
match org.bucket_data.get(&bucket_id) {
Some(b) => Ok(Arc::clone(b)),
None => Err(StorageError {
description: format!("bucket {} not found", bucket_id),
}),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::line_parser::PointType;
use crate::storage::database::Database;
use crate::storage::partitioned_store::ReadValues;
use crate::storage::predicate::parse_predicate;
use crate::storage::series_store::ReadPoint;
#[tokio::test]
async fn create_bucket() {
let database = Database::new("");
let org_id = 2;
let bucket = Bucket {
org_id,
id: 0,
name: "first".to_string(),
retention: "0".to_string(),
posting_list_rollover: 10_000,
index_levels: vec![],
};
let bucket_id = database
.create_bucket_if_not_exists(org_id, bucket.clone())
.await
.unwrap();
assert_eq!(bucket_id, 1);
let bucket_two = Bucket {
org_id,
id: 0,
name: "second".to_string(),
retention: "0".to_string(),
posting_list_rollover: 10_000,
index_levels: vec![],
};
let bucket_id = database
.create_bucket_if_not_exists(org_id, bucket_two)
.await
.unwrap();
assert_eq!(bucket_id, 2);
let bucket_id = database
.create_bucket_if_not_exists(org_id, bucket)
.await
.unwrap();
assert_eq!(bucket_id, 1);
}
#[tokio::test]
async fn get_tag_keys() {
let (db, org, bucket) = setup_db_and_bucket().await;
db.write_points(
org,
bucket,
&mut [
PointType::new_i64("cpu,host=a,region=west\tfoo".to_string(), 1, 0),
PointType::new_i64("mem,foo=bar\tasdf".to_string(), 1, 0),
],
)
.await
.unwrap();
let keys = db.get_tag_keys(org, bucket, None, None).await.unwrap();
assert_eq!(keys, vec!["_f", "_m", "foo", "host", "region"]);
}
#[tokio::test]
async fn get_tag_values() {
let (db, org, bucket) = setup_db_and_bucket().await;
db.write_points(
org,
bucket,
&mut [
PointType::new_i64("cpu,host=a,region=west\tfoo".to_string(), 1, 0),
PointType::new_i64("mem,host=b\tasdf".to_string(), 1, 0),
],
)
.await
.unwrap();
let values = db
.get_tag_values(org, bucket, "host", None, None)
.await
.unwrap();
assert_eq!(values, vec!["a", "b"]);
let values = db
.get_tag_values(org, bucket, "region", None, None)
.await
.unwrap();
assert_eq!(values, vec!["west"]);
let values = db
.get_tag_values(org, bucket, "_m", None, None)
.await
.unwrap();
assert_eq!(values, vec!["cpu", "mem"]);
}
#[tokio::test]
async fn read_points() {
let (db, org, bucket) = setup_db_and_bucket().await;
db.write_points(
org,
bucket,
&mut [
PointType::new_i64("cpu,host=a,region=west\tval".to_string(), 3, 1),
PointType::new_i64("cpu,host=a,region=west\tval".to_string(), 2, 5),
PointType::new_i64("cpu,host=a,region=west\tval".to_string(), 1, 10),
PointType::new_i64("cpu,host=b,region=west\tval".to_string(), 5, 9),
],
)
.await
.unwrap();
let pred = parse_predicate(r#"host = "a""#).unwrap();
let range = TimestampRange { start: 0, end: 11 };
let batches = db.read_points(org, bucket, &pred, &range).await.unwrap();
assert_eq!(
batches,
vec![ReadBatch {
key: "cpu,host=a,region=west\tval".to_string(),
values: ReadValues::I64(vec![
ReadPoint { value: 3, time: 1 },
ReadPoint { value: 2, time: 5 },
ReadPoint { value: 1, time: 10 },
])
}]
);
}
async fn setup_db_and_bucket() -> (Database, u32, u32) {
let database = Database::new("");
let org_id = 1;
let bucket = Bucket {
org_id,
id: 0,
name: "foo".to_string(),
retention: "0".to_string(),
posting_list_rollover: 10_000,
index_levels: vec![],
};
let bucket_id = database
.create_bucket_if_not_exists(org_id, bucket)
.await
.unwrap();
(database, org_id, bucket_id)
}
}
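
The reworked Database API above is fully async and identifies buckets by numeric id rather than passing Bucket configs around. A rough usage sketch of the name-to-id lookup the HTTP handlers now rely on (not part of the diff; the module paths, bucket name, and tokio runtime here are assumptions):

    use delorean::delorean::Bucket;
    use delorean::storage::database::Database;

    #[tokio::main]
    async fn main() {
        let db = Database::new("");
        let bucket = Bucket {
            org_id: 1,
            id: 0,
            name: "telemetry".to_string(),
            retention: "0".to_string(),
            posting_list_rollover: 10_000,
            index_levels: vec![],
        };

        // Creating the bucket returns its id; looking it up by name returns Option<u32>.
        let id = db.create_bucket_if_not_exists(1, bucket).await.unwrap();
        let found = db.get_bucket_id_by_name(1, "telemetry").await.unwrap();
        assert_eq!(found, Some(id));
    }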


@@ -1,31 +1,54 @@
#![allow(dead_code)]
use crate::delorean::{Node, Predicate, TimestampRange};
use crate::line_parser::{self, Point, PointType};
use crate::storage::inverted_index::{InvertedIndex, SeriesFilter};
use crate::storage::partitioned_store::{ReadBatch, ReadValues};
use crate::storage::predicate::{Evaluate, EvaluateVisitor};
use crate::storage::series_store::{ReadPoint, SeriesStore};
use crate::storage::series_store::ReadPoint;
use crate::storage::{SeriesDataType, StorageError};
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex, RwLock};
use croaring::Treemap;
/// memdb implements an in memory database for the InvertedIndex and SeriesStore traits. It
/// works with a ring buffer. Currently, it does not limit the number of series that can be
/// created. It also assumes that data per series arrives in time ascending order.
use futures::stream::{self, BoxStream};
use futures::StreamExt;
use std::collections::{BTreeMap, HashMap};
/// memdb implements an in memory database for the Partition trait. It currently assumes that
/// data arrives in time ascending order per series. It has no limits on the number of series
/// or the amount of data per series. It is up to the higher level database to decide when to
/// stop writing into a given MemDB.
// TODO: return errors if trying to insert data out of order in an individual series
#[derive(Default)]
pub struct MemDB {
default_ring_buffer_size: usize,
bucket_id_to_series_data: Arc<RwLock<HashMap<u32, Mutex<SeriesData>>>>,
bucket_id_to_series_map: Arc<RwLock<HashMap<u32, RwLock<SeriesMap>>>>,
pub id: String,
series_data: SeriesData,
series_map: SeriesMap,
}
#[derive(Default)]
struct SeriesData {
ring_buffer_size: usize,
i64_series: HashMap<u64, SeriesRingBuffer<i64>>,
f64_series: HashMap<u64, SeriesRingBuffer<f64>>,
current_size: usize,
i64_series: HashMap<u64, SeriesBuffer<i64>>,
f64_series: HashMap<u64, SeriesBuffer<f64>>,
}
struct SeriesBuffer<T: Clone> {
values: Vec<ReadPoint<T>>,
}
impl<T: Clone> SeriesBuffer<T> {
fn read(&self, range: &TimestampRange) -> Vec<ReadPoint<T>> {
let start = match self.values.iter().position(|val| val.time >= range.start) {
Some(pos) => pos,
None => return vec![],
};
let stop = self.values.iter().position(|val| val.time >= range.end);
let stop = stop.unwrap_or_else(|| self.values.len());
self.values[start..stop].to_vec()
}
}
trait StoreInSeriesData {
@@ -43,11 +66,15 @@ impl StoreInSeriesData for PointType {
impl StoreInSeriesData for Point<i64> {
fn write(&self, series_data: &mut SeriesData) {
let point: ReadPoint<_> = self.into();
series_data.current_size += std::mem::size_of::<ReadPoint<i64>>();
match series_data.i64_series.get_mut(&self.series_id.unwrap()) {
Some(buff) => buff.write(&self),
Some(buff) => buff.values.push(point),
None => {
let mut buff = new_i64_ring_buffer(series_data.ring_buffer_size);
buff.write(&self);
let buff = SeriesBuffer {
values: vec![point],
};
series_data.i64_series.insert(self.series_id.unwrap(), buff);
}
}
@@ -56,105 +83,24 @@ impl StoreInSeriesData for Point<i64> {
impl StoreInSeriesData for Point<f64> {
fn write(&self, series_data: &mut SeriesData) {
let point: ReadPoint<_> = self.into();
series_data.current_size += std::mem::size_of::<Point<f64>>();
match series_data.f64_series.get_mut(&self.series_id.unwrap()) {
Some(buff) => buff.write(&self),
Some(buff) => buff.values.push(point),
None => {
let mut buff = new_f64_ring_buffer(series_data.ring_buffer_size);
buff.write(&self);
let buff = SeriesBuffer {
values: vec![point],
};
series_data.f64_series.insert(self.series_id.unwrap(), buff);
}
}
}
}
impl SeriesData {
fn write_points(&mut self, points: &[PointType]) {
for p in points {
p.write(self);
}
}
}
struct SeriesRingBuffer<T: Clone> {
next_position: usize,
data: Vec<ReadPoint<T>>,
}
fn new_i64_ring_buffer(size: usize) -> SeriesRingBuffer<i64> {
let mut data = Vec::with_capacity(size);
for _ in 0..size {
data.push(ReadPoint {
time: std::i64::MAX,
value: 0 as i64,
})
}
SeriesRingBuffer {
next_position: 0,
data,
}
}
fn new_f64_ring_buffer(size: usize) -> SeriesRingBuffer<f64> {
let mut data = Vec::with_capacity(size);
for _ in 0..size {
data.push(ReadPoint {
time: std::i64::MAX,
value: 0 as f64,
})
}
SeriesRingBuffer {
next_position: 0,
data,
}
}
impl<T: Clone> SeriesRingBuffer<T> {
fn write(&mut self, point: &Point<T>) {
if self.next_position == self.data.len() {
self.next_position = 0;
}
self.data[self.next_position].time = point.time;
self.data[self.next_position].value = point.value.clone();
self.next_position += 1;
}
fn get_range(&self, range: &TimestampRange) -> Vec<ReadPoint<T>> {
let (_, pos) = self.oldest_time_and_position();
let mut values = Vec::new();
for i in pos..self.data.len() {
if self.data[i].time > range.end {
return values;
} else if self.data[i].time >= range.start {
values.push(self.data[i].clone());
}
}
for i in 0..self.next_position {
if self.data[i].time > range.end {
return values;
} else if self.data[i].time >= range.start {
values.push(self.data[i].clone());
}
}
values
}
fn oldest_time_and_position(&self) -> (i64, usize) {
let mut pos = self.next_position;
if self.next_position == self.data.len() || self.data[pos].time == std::i64::MAX {
pos = 0;
}
(self.data[pos].time, pos)
}
}
#[derive(Default)]
struct SeriesMap {
current_size: usize,
last_id: u64,
series_key_to_id: HashMap<String, u64>,
series_id_to_key_and_type: HashMap<u64, (String, SeriesDataType)>,
@@ -163,15 +109,18 @@ struct SeriesMap {
}
impl SeriesMap {
fn new() -> SeriesMap {
SeriesMap {
last_id: 0,
series_key_to_id: HashMap::new(),
series_id_to_key_and_type: HashMap::new(),
tag_keys: BTreeMap::new(),
posting_list: HashMap::new(),
}
}
/// The number of copies of the key this map contains. This is
/// used to provide a rough estimate of the memory size.
///
/// It occurs:
///
/// 1. in the series_key_to_id map
/// 2. in the series_id_to_key_and_type map
const SERIES_KEY_COPIES: usize = 2;
/// The number of bytes the different copies of the series ID in
/// this map represent. This is used to provide a rough estimate
/// of the memory size.
const SERIES_ID_BYTES: usize = 24;
fn insert_series(&mut self, point: &mut PointType) -> line_parser::Result<()> {
if let Some(id) = self.series_key_to_id.get(point.series()) {
@@ -192,10 +141,17 @@ impl SeriesMap {
self.series_id_to_key_and_type
.insert(self.last_id, (point.series().clone(), series_type));
// update the estimated size of the map.
self.current_size +=
point.series().len() * SeriesMap::SERIES_KEY_COPIES + SeriesMap::SERIES_ID_BYTES;
for pair in point.index_pairs()? {
// insert this id into the posting list
let list_key = list_key(&pair.key, &pair.value);
// update estimated size for the index pairs
self.current_size += list_key.len() + pair.key.len() + pair.value.len();
let posting_list = self
.posting_list
.entry(list_key)
@@ -227,129 +183,59 @@ fn list_key(key: &str, value: &str) -> Vec<u8> {
}
impl MemDB {
pub fn new() -> MemDB {
pub fn new(id: String) -> Self {
MemDB {
default_ring_buffer_size: 10000,
bucket_id_to_series_data: Arc::new(RwLock::new(HashMap::new())),
bucket_id_to_series_map: Arc::new(RwLock::new(HashMap::new())),
id,
..Default::default()
}
}
fn get_or_create_series_ids_for_points(
&self,
bucket_id: u32,
points: &mut [PointType],
) -> Result<(), StorageError> {
// first try to do everything with just a read lock
if self.get_series_ids_for_points(bucket_id, points) {
return Ok(());
pub fn size(&self) -> usize {
self.series_data.current_size + self.series_map.current_size
}
// if we got this far, we have new series to insert
let buckets = self.bucket_id_to_series_map.read().unwrap();
let mut series_map = buckets.get(&bucket_id).unwrap().write().unwrap();
pub fn write_points(&mut self, points: &mut [PointType]) -> Result<(), StorageError> {
for p in points {
match p.series_id() {
Some(_) => (),
None => match series_map.insert_series(p) {
Ok(_) => (),
Err(e) => {
return Err(StorageError {
self.series_map.insert_series(p).map_err(|e| StorageError {
description: format!("error parsing line protocol metadata {}", e),
})
}
},
}
})?;
p.write(&mut self.series_data);
}
Ok(())
}
// get_series_ids_for_points attempts to fill the series ids for all points in the passed in
// collection using only a read lock. If no SeriesMap exists for the bucket, it will be inserted.
// It will return true if all points have series ids filled in.
fn get_series_ids_for_points(&self, bucket_id: u32, points: &mut [PointType]) -> bool {
let buckets = self.bucket_id_to_series_map.read().unwrap();
match buckets.get(&bucket_id) {
Some(b) => {
let b = b.read().unwrap();
let mut all_have_id = true;
for p in points {
match b.series_key_to_id.get(p.series()) {
Some(id) => p.set_series_id(*id),
None => all_have_id = false,
}
}
if all_have_id {
return true;
}
}
None => {
// we've never even seen this bucket. insert it and then we'll get to inserting the series
drop(buckets);
self.bucket_id_to_series_map
.write()
.unwrap()
.insert(bucket_id, RwLock::new(SeriesMap::new()));
}
}
false
}
fn get_tag_keys(
pub fn get_tag_keys(
&self,
bucket_id: u32,
_predicate: Option<&Predicate>,
) -> Result<Box<dyn Iterator<Item = String> + Send>, StorageError> {
match self.bucket_id_to_series_map.read().unwrap().get(&bucket_id) {
Some(map) => {
let keys: Vec<String> = map.read().unwrap().tag_keys.keys().cloned().collect();
Ok(Box::new(keys.into_iter()))
}
None => Err(StorageError {
description: format!("bucket {} not found", bucket_id),
}),
}
_range: Option<&TimestampRange>,
) -> Result<BoxStream<'_, String>, StorageError> {
let keys = self.series_map.tag_keys.keys().cloned();
Ok(stream::iter(keys).boxed())
}
fn get_tag_values(
pub fn get_tag_values(
&self,
bucket_id: u32,
tag_key: &str,
_predicate: Option<&Predicate>,
) -> Result<Box<dyn Iterator<Item = String> + Send>, StorageError> {
match self.bucket_id_to_series_map.read().unwrap().get(&bucket_id) {
Some(map) => match map.read().unwrap().tag_keys.get(tag_key) {
_range: Option<&TimestampRange>,
) -> Result<BoxStream<'_, String>, StorageError> {
match self.series_map.tag_keys.get(tag_key) {
Some(values) => {
let values: Vec<String> = values.keys().cloned().collect();
Ok(Box::new(values.into_iter()))
let values = values.keys().cloned();
Ok(stream::iter(values).boxed())
}
None => Ok(Box::new(vec![].into_iter())),
},
None => Err(StorageError {
description: format!("bucket {} not found", bucket_id),
}),
None => Ok(stream::empty().boxed()),
}
}
fn read_series_matching(
pub fn read_points(
&self,
bucket_id: u32,
predicate: Option<&Predicate>,
) -> Result<Box<dyn Iterator<Item = SeriesFilter> + Send>, StorageError> {
let pred = match predicate {
Some(p) => p,
None => {
return Err(StorageError {
description: "unable to list all series".to_string(),
})
}
};
let root = match &pred.root {
_batch_size: usize,
predicate: &Predicate,
range: &TimestampRange,
) -> Result<BoxStream<'_, ReadBatch>, StorageError> {
let root = match &predicate.root {
Some(r) => r,
None => {
return Err(StorageError {
@@ -358,136 +244,38 @@ impl MemDB {
}
};
let bucket_map = self.bucket_id_to_series_map.read().unwrap();
let series_map = match bucket_map.get(&bucket_id) {
Some(m) => m.read().unwrap(),
None => {
return Err(StorageError {
description: format!("no series written for bucket {}", bucket_id),
})
}
};
let map = evaluate_node(&series_map, &root)?;
let mut filters = Vec::with_capacity(map.cardinality() as usize);
let map = evaluate_node(&self.series_map, &root)?;
let mut read_batches = Vec::with_capacity(map.cardinality() as usize);
for id in map.iter() {
let (key, series_type) = series_map.series_id_to_key_and_type.get(&id).unwrap();
filters.push(SeriesFilter {
id,
key: key.clone(),
value_predicate: None,
series_type: *series_type,
});
let (key, series_type) = self.series_map.series_id_to_key_and_type.get(&id).unwrap();
let values = match series_type {
SeriesDataType::I64 => {
let buff = self.series_data.i64_series.get(&id).unwrap();
ReadValues::I64(buff.read(range))
}
Ok(Box::new(filters.into_iter()))
}
fn write_points_with_series_ids(
&self,
bucket_id: u32,
points: &[PointType],
) -> Result<(), StorageError> {
let bucket_data = self.bucket_id_to_series_data.read().unwrap();
// ensure the the bucket has a series data struct to write into
let series_data = match bucket_data.get(&bucket_id) {
Some(d) => d,
None => {
drop(bucket_data);
let mut bucket_data = self.bucket_id_to_series_data.write().unwrap();
let series_data = SeriesData {
ring_buffer_size: self.default_ring_buffer_size,
i64_series: HashMap::new(),
f64_series: HashMap::new(),
};
bucket_data.insert(bucket_id, Mutex::new(series_data));
drop(bucket_data);
return self.write_points_with_series_ids(bucket_id, points);
SeriesDataType::F64 => {
let buff = self.series_data.f64_series.get(&id).unwrap();
ReadValues::F64(buff.read(range))
}
};
let mut series_data = series_data.lock().unwrap();
series_data.write_points(points);
Ok(())
// TODO: Encode in the type system that `ReadBatch`es will never be created with an
// empty vector, as we're doing here.
if values.is_empty() {
continue;
}
fn read_range<T: 'static + Clone + FromSeries + Send>(
&self,
bucket_id: u32,
series_id: u64,
range: &TimestampRange,
batch_size: usize,
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<T>>> + Send>, StorageError> {
let buckets = self.bucket_id_to_series_data.read().unwrap();
let data = match buckets.get(&bucket_id) {
Some(d) => d,
None => {
return Err(StorageError {
description: format!("bucket {} not found", bucket_id),
})
}
let batch = ReadBatch {
key: key.to_string(),
values,
};
let data = data.lock().unwrap();
let buff = match FromSeries::from_series(&data, series_id) {
Some(b) => b,
None => {
return Err(StorageError {
description: format!("series {} not found", series_id),
})
}
};
let values = buff.get_range(&range);
Ok(Box::new(PointsIterator {
values: Some(values),
batch_size,
}))
}
}
trait FromSeries: Clone {
fn from_series<'a>(data: &'a SeriesData, series_id: u64) -> Option<&'a SeriesRingBuffer<Self>>;
}
impl FromSeries for i64 {
fn from_series<'a>(data: &'a SeriesData, series_id: u64) -> Option<&'a SeriesRingBuffer<i64>> {
data.i64_series.get(&series_id)
}
}
impl FromSeries for f64 {
fn from_series<'a>(data: &'a SeriesData, series_id: u64) -> Option<&'a SeriesRingBuffer<f64>> {
data.f64_series.get(&series_id)
}
}
struct PointsIterator<T: Clone> {
values: Option<Vec<ReadPoint<T>>>,
batch_size: usize,
}
impl<T: Clone> Iterator for PointsIterator<T> {
type Item = Vec<ReadPoint<T>>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(mut values) = self.values.take() {
if self.batch_size > values.len() {
return Some(values);
read_batches.push(batch);
}
let remaining = values.split_off(self.batch_size);
if !remaining.is_empty() {
self.values = Some(remaining);
}
return Some(values);
}
None
Ok(stream::iter(read_batches.into_iter()).boxed())
}
}
@@ -503,98 +291,146 @@ fn evaluate_node(series_map: &SeriesMap, n: &Node) -> Result<Treemap, StorageErr
Evaluate::evaluate(Visitor(series_map), n)
}
impl InvertedIndex for MemDB {
fn get_or_create_series_ids_for_points(
&self,
bucket_id: u32,
points: &mut [PointType],
) -> Result<(), StorageError> {
self.get_or_create_series_ids_for_points(bucket_id, points)
}
fn read_series_matching(
&self,
bucket_id: u32,
predicate: Option<&Predicate>,
) -> Result<Box<dyn Iterator<Item = SeriesFilter> + Send>, StorageError> {
self.read_series_matching(bucket_id, predicate)
}
fn get_tag_keys(
&self,
bucket_id: u32,
predicate: Option<&Predicate>,
) -> Result<Box<dyn Iterator<Item = String> + Send>, StorageError> {
self.get_tag_keys(bucket_id, predicate)
}
fn get_tag_values(
&self,
bucket_id: u32,
tag_key: &str,
predicate: Option<&Predicate>,
) -> Result<Box<dyn Iterator<Item = String> + Send>, StorageError> {
self.get_tag_values(bucket_id, tag_key, predicate)
}
}
impl SeriesStore for MemDB {
fn write_points_with_series_ids(
&self,
bucket_id: u32,
points: &[PointType],
) -> Result<(), StorageError> {
self.write_points_with_series_ids(bucket_id, points)
}
fn read_i64_range(
&self,
bucket_id: u32,
series_id: u64,
range: &TimestampRange,
batch_size: usize,
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<i64>>> + Send>, StorageError> {
self.read_range(bucket_id, series_id, range, batch_size)
}
fn read_f64_range(
&self,
bucket_id: u32,
series_id: u64,
range: &TimestampRange,
batch_size: usize,
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<f64>>> + Send>, StorageError> {
self.read_range(bucket_id, series_id, range, batch_size)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::inverted_index;
use crate::storage::series_store;
use crate::storage::predicate::parse_predicate;
#[test]
fn write_and_read_i64() {
let db = Box::new(MemDB::new());
series_store::tests::write_and_read_i64(db);
fn get_tag_keys() {
let memdb = setup_db();
let tag_keys = memdb.get_tag_keys(None, None).unwrap();
let tag_keys: Vec<_> = futures::executor::block_on_stream(tag_keys).collect();
assert_eq!(tag_keys, vec!["_f", "_m", "host", "region"]);
}
#[test]
fn write_and_read_f64() {
let db = Box::new(MemDB::new());
series_store::tests::write_and_read_f64(db);
fn get_tag_values() {
let memdb = setup_db();
let tag_values = memdb.get_tag_values("host", None, None).unwrap();
let tag_values: Vec<_> = futures::executor::block_on_stream(tag_values).collect();
assert_eq!(tag_values, vec!["a", "b"]);
}
#[test]
fn series_id_indexing() {
let db = Box::new(MemDB::new());
inverted_index::tests::series_id_indexing(db)
fn check_size() {
let memdb = setup_db();
assert_eq!(memdb.size(), 704);
}
#[test]
fn series_metadata_indexing() {
let db = Box::new(MemDB::new());
inverted_index::tests::series_metadata_indexing(db);
fn get_measurement_series() {
let memdb = setup_db();
let pred = parse_predicate(r#"_m = "cpu""#).unwrap();
let batches = memdb
.read_points(10, &pred, &TimestampRange { start: 0, end: 5 })
.unwrap();
let batches: Vec<_> = futures::executor::block_on_stream(batches).collect();
assert_eq!(
batches,
vec![
ReadBatch {
key: "cpu,host=b,region=west\tusage_system".to_string(),
values: ReadValues::I64(vec![
ReadPoint { time: 0, value: 1 },
ReadPoint { time: 1, value: 2 },
]),
},
ReadBatch {
key: "cpu,host=a,region=west\tusage_system".to_string(),
values: ReadValues::I64(vec![ReadPoint { time: 0, value: 1 }]),
},
ReadBatch {
key: "cpu,host=a,region=west\tusage_user".to_string(),
values: ReadValues::I64(vec![ReadPoint { time: 0, value: 1 }]),
},
],
);
}
#[test]
fn get_tag_match_series() {
let memdb = setup_db();
let pred = parse_predicate(r#"host = "a""#).unwrap();
let batches = memdb
.read_points(10, &pred, &TimestampRange { start: 0, end: 5 })
.unwrap();
let batches: Vec<_> = futures::executor::block_on_stream(batches).collect();
assert_eq!(
batches,
vec![
ReadBatch {
key: "cpu,host=a,region=west\tusage_system".to_string(),
values: ReadValues::I64(vec![ReadPoint { time: 0, value: 1 }]),
},
ReadBatch {
key: "cpu,host=a,region=west\tusage_user".to_string(),
values: ReadValues::I64(vec![ReadPoint { time: 0, value: 1 }]),
},
]
);
}
#[test]
fn measurement_and_tag_match_series() {
let memdb = setup_db();
let pred = parse_predicate(r#"_m = "cpu" and host = "b""#).unwrap();
let batches = memdb
.read_points(10, &pred, &TimestampRange { start: 0, end: 5 })
.unwrap();
let batches: Vec<_> = futures::executor::block_on_stream(batches).collect();
assert_eq!(
batches,
vec![ReadBatch {
key: "cpu,host=b,region=west\tusage_system".to_string(),
values: ReadValues::I64(vec![
ReadPoint { time: 0, value: 1 },
ReadPoint { time: 1, value: 2 },
]),
},]
);
}
#[test]
fn measurement_or_tag_match() {
let memdb = setup_db();
let pred = parse_predicate(r#"host = "a" OR _m = "mem""#).unwrap();
let batches = memdb
.read_points(10, &pred, &TimestampRange { start: 0, end: 5 })
.unwrap();
let batches: Vec<_> = futures::executor::block_on_stream(batches).collect();
assert_eq!(
batches,
vec![
ReadBatch {
key: "cpu,host=a,region=west\tusage_system".to_string(),
values: ReadValues::I64(vec![ReadPoint { time: 0, value: 1 },]),
},
ReadBatch {
key: "cpu,host=a,region=west\tusage_user".to_string(),
values: ReadValues::I64(vec![ReadPoint { time: 0, value: 1 },]),
},
ReadBatch {
key: "mem,host=b,region=west\tfree".to_string(),
values: ReadValues::I64(vec![ReadPoint { time: 0, value: 1 },]),
},
]
);
}
fn setup_db() -> MemDB {
let p1 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 1, 0);
let p2 = PointType::new_i64("cpu,host=a,region=west\tusage_system".to_string(), 1, 0);
let p3 = PointType::new_i64("cpu,host=a,region=west\tusage_user".to_string(), 1, 0);
let p4 = PointType::new_i64("mem,host=b,region=west\tfree".to_string(), 1, 0);
let p5 = PointType::new_i64("cpu,host=b,region=west\tusage_system".to_string(), 2, 1);
let mut points = vec![p1, p2, p3, p4, p5];
let mut memdb = MemDB::new("foo".to_string());
memdb.write_points(&mut points).unwrap();
memdb
}
}
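
MemDB now exposes a synchronous write path and stream-returning readers in place of the old InvertedIndex/SeriesStore trait impls. A minimal sketch of consuming one of those streams from async code, which is how BucketData in database.rs uses it (not part of the diff; the paths, runtime, and expected keys are assumptions based on the tests above):

    use delorean::line_parser::PointType;
    use delorean::storage::memdb::MemDB;
    use futures::StreamExt;

    #[tokio::main]
    async fn main() {
        let mut memdb = MemDB::new("example-partition".to_string());
        let mut points = vec![PointType::new_i64("cpu,host=a\tusage".to_string(), 1, 0)];
        memdb.write_points(&mut points).unwrap();

        // get_tag_keys now returns a BoxStream of Strings instead of a boxed Iterator.
        let keys: Vec<String> = memdb.get_tag_keys(None, None).unwrap().collect().await;
        assert_eq!(keys, vec!["_f", "_m", "host"]);
    }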


@@ -0,0 +1,552 @@
//! partitioned_store is an enum and set of helper functions and structs to define Partitions
//! that store data. The helper funcs and structs merge results from multiple partitions together.
use crate::delorean::{Predicate, TimestampRange};
use crate::line_parser::PointType;
use crate::storage::memdb::MemDB;
use crate::storage::remote_partition::RemotePartition;
use crate::storage::s3_partition::S3Partition;
use crate::storage::series_store::ReadPoint;
use crate::storage::StorageError;
use futures::stream::{BoxStream, Stream};
use std::cmp::Ordering;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
/// A Partition is a block of data. It has methods for reading metadata such as which measurements,
/// tags, tag values, and fields exist, along with the raw time series data. It is designed to work
/// as a stream so that it can be used safely in an asynchronous context. A partition is the
/// lowest-level organization scheme. Above it sits a database, which keeps track of
/// what organizations and buckets exist. A bucket will have 1 to many partitions, and a partition
/// will only ever contain data for a single bucket.
pub enum Partition {
MemDB(Box<MemDB>),
S3(Box<S3Partition>),
Remote(Box<RemotePartition>),
}
impl Partition {
pub fn id(&self) -> &str {
match self {
Partition::MemDB(db) => &db.id,
Partition::S3(_) => panic!("s3 partition not implemented!"),
Partition::Remote(_) => panic!("remote partition not implemented!"),
}
}
pub fn size(&self) -> usize {
match self {
Partition::MemDB(db) => db.size(),
Partition::S3(_) => panic!("s3 partition not implemented!"),
Partition::Remote(_) => panic!("remote partition not implemented!"),
}
}
pub async fn write_points(&mut self, points: &mut [PointType]) -> Result<(), StorageError> {
match self {
Partition::MemDB(db) => db.write_points(points),
Partition::S3(_) => panic!("s3 partition not implemented!"),
Partition::Remote(_) => panic!("remote partition not implemented!"),
}
}
pub async fn get_tag_keys(
&self,
predicate: Option<&Predicate>,
range: Option<&TimestampRange>,
) -> Result<BoxStream<'_, String>, StorageError> {
match self {
Partition::MemDB(db) => db.get_tag_keys(predicate, range),
Partition::S3(_) => panic!("s3 partition not implemented!"),
Partition::Remote(_) => panic!("remote partition not implemented!"),
}
}
pub async fn get_tag_values(
&self,
tag_key: &str,
predicate: Option<&Predicate>,
range: Option<&TimestampRange>,
) -> Result<BoxStream<'_, String>, StorageError> {
match self {
Partition::MemDB(db) => db.get_tag_values(tag_key, predicate, range),
Partition::S3(_) => panic!("s3 partition not implemented!"),
Partition::Remote(_) => panic!("remote partition not implemented!"),
}
}
pub async fn read_points(
&self,
batch_size: usize,
predicate: &Predicate,
range: &TimestampRange,
) -> Result<BoxStream<'_, ReadBatch>, StorageError> {
match self {
Partition::MemDB(db) => db.read_points(batch_size, predicate, range),
Partition::S3(_) => panic!("s3 partition not implemented!"),
Partition::Remote(_) => panic!("remote partition not implemented!"),
}
}
}
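// A minimal usage sketch (not part of this change): wrap a MemDB in the Partition enum and write a
// couple of points through the async API. The partition id and series keys below are made up for
// illustration; only the MemDB variant is implemented at this point.
#[allow(dead_code)]
async fn example_partition_write() -> Result<(), StorageError> {
    let mut partition = Partition::MemDB(Box::new(MemDB::new("example-partition".to_string())));

    let mut points = vec![
        PointType::new_i64("cpu,host=a,region=west\tusage_user".to_string(), 10, 0),
        PointType::new_i64("cpu,host=a,region=west\tusage_user".to_string(), 20, 1),
    ];
    partition.write_points(&mut points).await?;

    assert_eq!(partition.id(), "example-partition");
    Ok(())
}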
/// StringMergeStream will do a merge sort with deduplication of multiple streams of Strings. This
/// is used for combining results from multiple partitions for calls to get measurements, tag keys,
/// tag values, or field keys. It assumes the incoming streams are in sorted order with no duplicates.
pub struct StringMergeStream<'a> {
states: Vec<StreamState<'a, String>>,
drained: bool,
}
struct StreamState<'a, T> {
stream: BoxStream<'a, T>,
next: Poll<Option<T>>,
}
impl StringMergeStream<'_> {
#[allow(dead_code)]
fn new(streams: Vec<BoxStream<'_, String>>) -> StringMergeStream<'_> {
let states = streams
.into_iter()
.map(|s| StreamState {
stream: s,
next: Poll::Pending,
})
.collect();
StringMergeStream {
states,
drained: false,
}
}
}
impl Stream for StringMergeStream<'_> {
type Item = String;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.drained {
return Poll::Ready(None);
}
let mut one_pending = false;
for state in &mut self.states {
if state.next.is_pending() {
state.next = state.stream.as_mut().poll_next(cx);
one_pending = one_pending || state.next.is_pending();
}
}
if one_pending {
return Poll::Pending;
}
let mut next_val: Option<String> = None;
let mut next_pos = 0;
for (pos, state) in self.states.iter_mut().enumerate() {
match (&next_val, &state.next) {
(None, Poll::Ready(Some(ref val))) => {
next_val = Some(val.clone());
next_pos = pos;
}
(Some(next), Poll::Ready(Some(ref val))) => match next.cmp(val) {
Ordering::Greater => {
next_val = Some(val.clone());
next_pos = pos;
}
Ordering::Equal => {
state.next = state.stream.as_mut().poll_next(cx);
}
_ => (),
},
(Some(_), Poll::Ready(None)) => (),
(None, Poll::Ready(None)) => (),
_ => unreachable!(),
}
}
if next_val.is_none() {
self.drained = true;
return Poll::Ready(None);
}
let next_state: &mut StreamState<'_, String> = &mut self.states[next_pos];
mem::replace(
&mut next_state.next,
next_state.stream.as_mut().poll_next(cx),
)
}
}
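// Sketch of the merge behavior with made-up strings: each input stream must already be sorted, and
// a value that appears in more than one stream is emitted only once.
#[allow(dead_code)]
fn example_string_merge() {
    use futures::{stream, StreamExt};

    let one = stream::iter(vec!["cpu".to_string(), "mem".to_string()].into_iter());
    let two = stream::iter(vec!["cpu".to_string(), "disk".to_string()].into_iter());

    let merged = StringMergeStream::new(vec![one.boxed(), two.boxed()]);
    let vals: Vec<_> = futures::executor::block_on_stream(merged).collect();

    assert_eq!(
        vals,
        vec!["cpu".to_string(), "disk".to_string(), "mem".to_string()]
    );
}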
/// ReadMergeStream will do a merge sort of the ReadBatches from multiple partitions. When merging
/// it will ensure that batches are sent through in lexicographical order by key. In situations
/// where multiple partitions have batches with the same key, they are merged together in time
/// ascending order. For any given key, multiple read batches can come through.
///
/// It assumes that the input streams send batches in key lexicographical order, that values are
/// always of the same type for a given key, and that those values are in time sorted order. A
/// stream can have multiple batches with the same key, as long as the values across those batches
/// are in time sorted order (ascending).
pub struct ReadMergeStream<'a> {
states: Vec<StreamState<'a, ReadBatch>>,
drained: bool,
}
impl ReadMergeStream<'_> {
#[allow(dead_code)]
fn new(streams: Vec<BoxStream<'_, ReadBatch>>) -> ReadMergeStream<'_> {
let states = streams
.into_iter()
.map(|s| StreamState {
stream: s,
next: Poll::Pending,
})
.collect();
ReadMergeStream {
states,
drained: false,
}
}
}
impl Stream for ReadMergeStream<'_> {
type Item = ReadBatch;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.drained {
return Poll::Ready(None);
}
// poll every stream whose cached next value is still pending, and return if any remain pending
let mut one_pending = false;
for state in &mut self.states {
if state.next.is_pending() {
state.next = state.stream.as_mut().poll_next(cx);
one_pending = one_pending || state.next.is_pending();
}
}
if one_pending {
return Poll::Pending;
}
// find the minimum key for the next batch and keep track of the other batches that have
// the same key
let mut next_min_key: Option<String> = None;
let mut min_time = std::i64::MAX;
let mut min_pos = 0;
let mut positions = Vec::with_capacity(self.states.len());
for (pos, state) in self.states.iter().enumerate() {
match (&next_min_key, &state.next) {
(None, Poll::Ready(Some(batch))) => {
next_min_key = Some(batch.key.clone());
min_pos = pos;
let (_, t) = batch.start_stop_times();
min_time = t;
}
(Some(min_key), Poll::Ready(Some(batch))) => {
match min_key.cmp(&batch.key) {
Ordering::Greater => {
next_min_key = Some(batch.key.clone());
min_pos = pos;
positions = Vec::with_capacity(self.states.len());
let (_, t) = batch.start_stop_times();
min_time = t;
}
Ordering::Equal => {
// if this batch has an end time less than the existing min time, make this
// the batch that we want to pull out first
let (_, t) = batch.start_stop_times();
if t < min_time {
min_time = t;
positions.push(min_pos);
min_pos = pos;
} else {
positions.push(pos);
}
}
_ => (),
}
}
(Some(_), Poll::Ready(None)) => (),
(None, Poll::Ready(None)) => (),
_ => unreachable!(),
}
}
if next_min_key.is_none() {
self.drained = true;
return Poll::Ready(None);
}
let mut val = mem::replace(&mut self.states[min_pos].next, Poll::Pending);
if positions.is_empty() {
return val;
}
// pull into the val batch all values from the other same-key batches with times at or below the min end time
match &mut val {
Poll::Ready(Some(batch)) => {
for pos in positions {
if let Poll::Ready(Some(b)) = &mut self.states[pos].next {
if batch.append_below_time(b, min_time) {
self.states[pos].next = Poll::Pending;
}
}
}
batch.sort_by_time();
}
_ => unreachable!(),
}
val
}
}
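// Sketch of the same-key merge with illustrative values: two streams each carry one batch for the
// same key. The batch with the earlier end time is emitted first, pulling in any values from the
// other batch that fall at or below that end time; the remainder comes through in a later batch.
#[allow(dead_code)]
fn example_read_merge() {
    use futures::{stream, StreamExt};

    let one = stream::iter(
        vec![ReadBatch {
            key: "cpu\tusage_user".to_string(),
            values: ReadValues::I64(vec![
                ReadPoint { time: 1, value: 1 },
                ReadPoint { time: 4, value: 4 },
            ]),
        }]
        .into_iter(),
    );
    let two = stream::iter(
        vec![ReadBatch {
            key: "cpu\tusage_user".to_string(),
            values: ReadValues::I64(vec![
                ReadPoint { time: 2, value: 2 },
                ReadPoint { time: 3, value: 3 },
            ]),
        }]
        .into_iter(),
    );

    let merged = ReadMergeStream::new(vec![one.boxed(), two.boxed()]);
    let batches: Vec<_> = futures::executor::block_on_stream(merged).collect();

    // The first batch holds times 1..=3 (the point at time 1 was pulled over from `one`); the
    // point at time 4 arrives in a second batch for the same key.
    assert_eq!(
        batches,
        vec![
            ReadBatch {
                key: "cpu\tusage_user".to_string(),
                values: ReadValues::I64(vec![
                    ReadPoint { time: 1, value: 1 },
                    ReadPoint { time: 2, value: 2 },
                    ReadPoint { time: 3, value: 3 },
                ]),
            },
            ReadBatch {
                key: "cpu\tusage_user".to_string(),
                values: ReadValues::I64(vec![ReadPoint { time: 4, value: 4 }]),
            },
        ]
    );
}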
// TODO: Make a constructor function that fails if given an empty `Vec` of `ReadPoint`s.
#[derive(Debug, PartialEq, Clone)]
pub enum ReadValues {
I64(Vec<ReadPoint<i64>>),
F64(Vec<ReadPoint<f64>>),
}
impl ReadValues {
pub fn is_empty(&self) -> bool {
match self {
ReadValues::I64(vals) => vals.is_empty(),
ReadValues::F64(vals) => vals.is_empty(),
}
}
}
#[derive(Debug, PartialEq, Clone)]
pub struct ReadBatch {
pub key: String,
pub values: ReadValues,
}
impl ReadBatch {
/// Returns the first time and the last time in the batch.
///
/// # Panics
///
/// Will panic if there are no values in the `ReadValues`.
fn start_stop_times(&self) -> (i64, i64) {
match &self.values {
ReadValues::I64(vals) => (vals.first().unwrap().time, vals.last().unwrap().time),
ReadValues::F64(vals) => (vals.first().unwrap().time, vals.last().unwrap().time),
}
}
fn sort_by_time(&mut self) {
match &mut self.values {
ReadValues::I64(vals) => vals.sort_by_key(|v| v.time),
ReadValues::F64(vals) => vals.sort_by_key(|v| v.time),
}
}
// append_below_time appends to self all values from `other` whose time is <= the time passed in.
// It returns true if `other` has been cleared of all values.
fn append_below_time(&mut self, other: &mut ReadBatch, t: i64) -> bool {
match (&mut self.values, &mut other.values) {
(ReadValues::I64(vals), ReadValues::I64(other_vals)) => {
let pos = other_vals.iter().position(|val| val.time > t);
match pos {
None => vals.append(other_vals),
Some(pos) => vals.extend(other_vals.drain(..pos)),
}
other_vals.is_empty()
}
(ReadValues::F64(vals), ReadValues::F64(other_vals)) => {
let pos = other_vals.iter().position(|val| val.time > t);
match pos {
None => vals.append(other_vals),
Some(pos) => vals.extend(other_vals.drain(..pos)),
}
other_vals.is_empty()
}
(_, _) => true, // mismatched value types; nothing to merge
}
}
}
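// Sketch of append_below_time with made-up values: only points at or below the cutoff time move
// from `src` into `dst`, and the return value reports whether `src` has been emptied.
#[allow(dead_code)]
fn example_append_below_time() {
    let mut dst = ReadBatch {
        key: "mem\tfree".to_string(),
        values: ReadValues::I64(vec![ReadPoint { time: 1, value: 10 }]),
    };
    let mut src = ReadBatch {
        key: "mem\tfree".to_string(),
        values: ReadValues::I64(vec![
            ReadPoint { time: 2, value: 20 },
            ReadPoint { time: 5, value: 50 },
        ]),
    };

    // Cutoff at time 3: the point at time 2 is appended to `dst`, the point at time 5 stays
    // behind, so `src` is not yet empty and the function returns false.
    let src_emptied = dst.append_below_time(&mut src, 3);
    assert!(!src_emptied);
    dst.sort_by_time();
    assert_eq!(
        dst.values,
        ReadValues::I64(vec![
            ReadPoint { time: 1, value: 10 },
            ReadPoint { time: 2, value: 20 },
        ])
    );
}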
#[cfg(test)]
mod tests {
use super::*;
use futures::{stream, StreamExt};
#[test]
fn string_merge_stream() {
let one = stream::iter(vec!["a".to_string(), "c".to_string()].into_iter());
let two = stream::iter(vec!["b".to_string(), "c".to_string(), "d".to_string()].into_iter());
let three =
stream::iter(vec!["c".to_string(), "e".to_string(), "f".to_string()].into_iter());
let four = stream::iter(vec![].into_iter());
let merger =
StringMergeStream::new(vec![one.boxed(), two.boxed(), three.boxed(), four.boxed()]);
let stream = futures::executor::block_on_stream(merger);
let vals: Vec<_> = stream.collect();
assert_eq!(
vals,
vec![
"a".to_string(),
"b".to_string(),
"c".to_string(),
"d".to_string(),
"e".to_string(),
"f".to_string()
],
);
}
#[test]
fn read_merge_stream() {
let one = stream::iter(
vec![
ReadBatch {
key: "foo".to_string(),
values: ReadValues::I64(vec![
ReadPoint { time: 3, value: 30 },
ReadPoint { time: 4, value: 40 },
]),
},
ReadBatch {
key: "test".to_string(),
values: ReadValues::F64(vec![
ReadPoint {
time: 1,
value: 1.1,
},
ReadPoint {
time: 2,
value: 2.2,
},
]),
},
]
.into_iter(),
);
let two = stream::iter(
vec![
ReadBatch {
key: "bar".to_string(),
values: ReadValues::F64(vec![
ReadPoint {
time: 5,
value: 5.5,
},
ReadPoint {
time: 6,
value: 6.6,
},
]),
},
ReadBatch {
key: "foo".to_string(),
values: ReadValues::I64(vec![
ReadPoint { time: 1, value: 10 },
ReadPoint { time: 2, value: 20 },
ReadPoint { time: 6, value: 60 },
ReadPoint {
time: 11,
value: 110,
},
]),
},
]
.into_iter(),
);
let three = stream::iter(
vec![ReadBatch {
key: "foo".to_string(),
values: ReadValues::I64(vec![
ReadPoint { time: 5, value: 50 },
ReadPoint {
time: 10,
value: 100,
},
]),
}]
.into_iter(),
);
let four = stream::iter(vec![].into_iter());
let merger =
ReadMergeStream::new(vec![one.boxed(), two.boxed(), three.boxed(), four.boxed()]);
let stream = futures::executor::block_on_stream(merger);
let vals: Vec<_> = stream.collect();
assert_eq!(
vals,
vec![
ReadBatch {
key: "bar".to_string(),
values: ReadValues::F64(vec![
ReadPoint {
time: 5,
value: 5.5
},
ReadPoint {
time: 6,
value: 6.6
},
]),
},
ReadBatch {
key: "foo".to_string(),
values: ReadValues::I64(vec![
ReadPoint { time: 1, value: 10 },
ReadPoint { time: 2, value: 20 },
ReadPoint { time: 3, value: 30 },
ReadPoint { time: 4, value: 40 },
]),
},
ReadBatch {
key: "foo".to_string(),
values: ReadValues::I64(vec![
ReadPoint { time: 5, value: 50 },
ReadPoint { time: 6, value: 60 },
ReadPoint {
time: 10,
value: 100
},
]),
},
ReadBatch {
key: "foo".to_string(),
values: ReadValues::I64(vec![ReadPoint {
time: 11,
value: 110
},]),
},
ReadBatch {
key: "test".to_string(),
values: ReadValues::F64(vec![
ReadPoint {
time: 1,
value: 1.1
},
ReadPoint {
time: 2,
value: 2.2
}
]),
},
],
)
}
}

View File

@ -0,0 +1,4 @@
/// RemotePartition represents partitions that are on remote delorean servers. It implements the
/// methods that the Partition enum requires to answer queries.
/// TODO: implement me
pub struct RemotePartition {}

File diff suppressed because it is too large

View File

@ -0,0 +1,4 @@
/// S3Partition represents a partition of data stored on S3. It implements the methods that the
/// Partition enum requires to answer queries.
/// TODO: implement me
pub struct S3Partition {}

View File

@ -26,12 +26,22 @@ pub trait SeriesStore: Sync + Send {
) -> Result<Box<dyn Iterator<Item = Vec<ReadPoint<f64>>> + Send>, StorageError>;
}
#[derive(Debug, PartialEq, Clone)]
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct ReadPoint<T: Clone> {
pub time: i64,
pub value: T,
}
impl<T: Copy + Clone> From<&'_ crate::line_parser::Point<T>> for ReadPoint<T> {
fn from(other: &'_ crate::line_parser::Point<T>) -> Self {
let crate::line_parser::Point { time, value, .. } = other;
Self {
time: *time,
value: *value,
}
}
}
// Test helpers for other implementations to run
#[cfg(test)]
pub mod tests {

View File

@ -33,14 +33,13 @@ mod grpc {
tonic::include_proto!("delorean");
}
use grpc::delorean_client::DeloreanClient;
use grpc::storage_client::StorageClient;
use grpc::Organization;
use grpc::ReadSource;
use grpc::{
delorean_client::DeloreanClient,
node::{Comparison, Value},
read_response::{frame::Data, DataType},
Node, Predicate, ReadFilterRequest, Tag, TagKeysRequest, TagValuesRequest, TimestampRange,
storage_client::StorageClient,
Bucket, CreateBucketRequest, Node, Organization, Predicate, ReadFilterRequest, ReadSource, Tag,
TagKeysRequest, TagValuesRequest, TimestampRange,
};
type Error = Box<dyn std::error::Error>;
@ -132,6 +131,19 @@ async fn read_and_write_data() -> Result<()> {
// This checks that gRPC is functioning and that we're starting from an org without buckets.
assert!(org_buckets.is_empty());
let create_bucket_request = tonic::Request::new(CreateBucketRequest {
org_id,
bucket: Some(Bucket {
org_id,
id: 0,
name: bucket_name.to_string(),
retention: "0".to_string(),
posting_list_rollover: 10_000,
index_levels: vec![],
}),
});
grpc_client.create_bucket(create_bucket_request).await?;
let start_time = SystemTime::now();
let ns_since_epoch: i64 = start_time
.duration_since(SystemTime::UNIX_EPOCH)
@ -199,12 +211,21 @@ cpu_load_short,server01,us-east,value,{},1234567.891011
let mut storage_client = StorageClient::connect(GRPC_URL_BASE).await?;
let org_id = u64::from(u32::MAX);
let bucket_id = 1; // TODO: how do we know this?
// Get the ID of the bucket that was created with the auto-incrementing in MemDB
let get_buckets_request = tonic::Request::new(Organization {
id: org_id,
name: "test".into(),
buckets: vec![],
});
let get_buckets_response = grpc_client.get_buckets(get_buckets_request).await?;
let get_buckets_response = get_buckets_response.into_inner();
let org_buckets = get_buckets_response.buckets;
let bucket_id = org_buckets.first().unwrap().id;
let partition_id = u64::from(u32::MAX);
let read_source = ReadSource {
org_id,
bucket_id,
org_id: org_id.into(),
bucket_id: bucket_id.into(),
partition_id,
};
let mut d = Vec::new();
@ -217,7 +238,7 @@ cpu_load_short,server01,us-east,value,{},1234567.891011
let range = TimestampRange {
start: ns_since_epoch,
end: ns_since_epoch + 3,
end: ns_since_epoch + 4,
};
let range = Some(range);
@ -259,7 +280,7 @@ cpu_load_short,server01,us-east,value,{},1234567.891011
assert_eq!(
frames.len(),
5,
4,
"expected exactly 5 frames, but there were {}",
frames.len()
);
@ -268,26 +289,36 @@ cpu_load_short,server01,us-east,value,{},1234567.891011
assert_eq!(f.data_type, DataType::Float as i32, "in frame 0");
assert_eq!(
tags_as_strings(&f.tags),
vec![("host", "server01"), ("region", "us-west")]
vec![
("_m", "cpu_load_short"),
("host", "server01"),
("region", "us-west"),
("_f", "value")
]
);
let f = assert_unwrap!(&frames[1], Data::FloatPoints, "in frame 1");
assert_eq!(f.timestamps, [ns_since_epoch], "in frame 1");
assert_eq!(f.values, [0.64], "in frame 1");
assert_eq!(
f.timestamps,
[ns_since_epoch, ns_since_epoch + 3],
"in frame 1"
);
assert_eq!(f.values, [0.64, 0.000_003], "in frame 1");
let f = assert_unwrap!(&frames[2], Data::FloatPoints, "in frame 2");
assert_eq!(f.timestamps, [ns_since_epoch + 3], "in frame 2");
assert_eq!(f.values, [0.000_003], "in frame 2");
let f = assert_unwrap!(&frames[3], Data::Series, "in frame 3");
let f = assert_unwrap!(&frames[2], Data::Series, "in frame 3");
assert_eq!(f.data_type, DataType::Float as i32, "in frame 3");
assert_eq!(
tags_as_strings(&f.tags),
vec![("host", "server01"), ("region", "us-east")]
vec![
("_m", "cpu_load_short"),
("host", "server01"),
("region", "us-east"),
("_f", "value")
]
);
let f = assert_unwrap!(&frames[4], Data::FloatPoints, "in frame 4");
let f = assert_unwrap!(&frames[3], Data::FloatPoints, "in frame 4");
assert_eq!(f.timestamps, [ns_since_epoch + 2], "in frame 4");
assert_eq!(f.values, [1_234_567.891_011], "in frame 4");