diff --git a/delorean_storage/src/exec.rs b/delorean_storage/src/exec.rs
index d95efd66d4..457ae03fbd 100644
--- a/delorean_storage/src/exec.rs
+++ b/delorean_storage/src/exec.rs
@@ -3,6 +3,7 @@
 //! interface abstracts away many of the details
 mod planning;
 mod schema_pivot;
+mod seriesset;
 mod stringset;
 
 use std::{sync::atomic::AtomicU64, sync::atomic::Ordering, sync::Arc};
@@ -16,7 +17,9 @@ use planning::make_exec_context;
 use schema_pivot::SchemaPivotNode;
 
 // Publically export StringSets
+pub use seriesset::{SeriesSet, SeriesSetRef};
 pub use stringset::{IntoStringSet, StringSet, StringSetRef};
+use tokio::sync::mpsc;
 use tracing::debug;
 
@@ -109,6 +112,22 @@ impl From<Vec<LogicalPlan>> for StringSetPlan {
 }
 
+/// A plan which produces a logical set of Series
+#[derive(Debug)]
+pub struct SeriesSetPlan {
+    /// DataFusion plan(s) to execute. Each plan must produce
+    /// RecordBatches with the following shape:
+    ///
+    /// TODO DOCUMENT
+    pub plans: Vec<LogicalPlan>,
+
+    /// The names of the columns that define tags
+    pub tag_columns: Vec<String>,
+
+    /// The names of the columns which are "fields"
+    pub field_columns: Vec<String>,
+}
+
 /// Handles executing plans, and marshalling the results into rust
 /// native structures.
 #[derive(Debug, Default)]
@@ -130,6 +149,19 @@ impl Executor {
             .context(StringSetConversion),
         }
     }
+
+    /// Executes this plan, sending the resulting `SeriesSet`s one by one
+    /// via `tx`
+    pub async fn to_series_set(
+        &self,
+        plan: SeriesSetPlan,
+        _tx: mpsc::Sender<Result<SeriesSet>>,
+    ) -> Result<()> {
+        if plan.plans.is_empty() {
+            return Ok(());
+        }
+        unimplemented!("Executing SeriesSet plans");
+    }
 }
 
 // Various statistics for execution
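Not part of the patch: a minimal sketch of how a caller might drive the new `to_series_set` API, assuming the types this diff adds to `delorean_storage::exec` and a tokio runtime. With an empty plan the method returns `Ok(())` before sending anything, so the receive loop below simply observes the channel closing.

    use delorean_storage::exec::{Executor, SeriesSetPlan};
    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let executor = Executor::default();

        // An empty plan: `to_series_set` returns Ok(()) without sending anything
        let plan = SeriesSetPlan {
            plans: vec![],
            tag_columns: vec![],
            field_columns: vec![],
        };

        let (tx, mut rx) = mpsc::channel(4);
        executor
            .to_series_set(plan, tx)
            .await
            .expect("executing series set plan");

        // Each produced `SeriesSet` (or execution error) arrives here
        while let Some(series_set) = rx.recv().await {
            println!("received: {:?}", series_set);
        }
    }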
diff --git a/delorean_storage/src/exec/seriesset.rs b/delorean_storage/src/exec/seriesset.rs
new file mode 100644
index 0000000000..f53239d178
--- /dev/null
+++ b/delorean_storage/src/exec/seriesset.rs
@@ -0,0 +1,52 @@
+//! This module contains the definition of a "SeriesSet": a plan that,
+//! when run, produces rows that can be logically divided into "Series"
+//!
+//! Specifically, the plan represents a set of "tables", and each table
+//! is sorted on a set of "tag" columns, meaning the groups / series
+//! will be contiguous.
+//!
+//! For example, the output columns of such a plan would be:
+//! (tag col0) (tag col1) ... (tag colN) (field val1) (field val2) ... (field valN) (timestamps)
+//!
+//! Note that the data will come out ordered by the tag keys (ORDER BY
+//! (tag col0) (tag col1) ... (tag colN))
+
+use std::sync::Arc;
+
+use delorean_arrow::arrow::{self, array::ArrayRef};
+//use snafu::{ensure, OptionExt, Snafu};
+use snafu::Snafu;
+
+#[derive(Debug, Snafu)]
+/// Opaque error type
+pub enum Error {
+    #[snafu(display("Plan Execution Error: {}", source))]
+    Execution {
+        source: Box<dyn std::error::Error + Send + Sync>,
+    },
+
+    #[snafu(display(
+        "Error reading record batch while converting from SeriesSet: {:?}",
+        source
+    ))]
+    ReadingRecordBatch { source: arrow::error::ArrowError },
+}
+
+#[allow(dead_code)]
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+
+#[derive(Debug)]
+pub struct SeriesSet {
+    /// key = value pairs
+    keys: Vec<(String, String)>,
+    /// Arrow array for this series.
+    values: ArrayRef,
+}
+
+pub type SeriesSetRef = Arc<SeriesSet>;
+
+/// Trait to convert RecordBatch'y things into `SeriesSet`s. Can
+/// return errors, so don't use `std::convert::From`
+pub trait IntoSeriesSet {
+    /// Convert this thing into a `SeriesSet`
+    fn into_seriesset(self) -> Result<SeriesSet>;
+}
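The contiguity property called out in the module docs above is what makes series extraction cheap: because the output is sorted on the tag columns, a new series starts exactly where the tuple of tag values changes. A toy illustration of finding those boundaries (standalone types, not this crate's):

    /// Returns the row index at which each contiguous series starts,
    /// given rows already sorted on their tag columns.
    fn series_boundaries(tag_rows: &[Vec<&str>]) -> Vec<usize> {
        let mut starts = Vec::new();
        for i in 0..tag_rows.len() {
            if i == 0 || tag_rows[i] != tag_rows[i - 1] {
                starts.push(i);
            }
        }
        starts
    }

    fn main() {
        // Sorted on (host, region): the two "a"/"east" rows form one series
        let rows = vec![
            vec!["a", "east"],
            vec!["a", "east"],
            vec!["b", "east"],
            vec!["b", "west"],
        ];
        assert_eq!(series_boundaries(&rows), vec![0, 2, 3]);
    }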
diff --git a/delorean_storage/src/lib.rs b/delorean_storage/src/lib.rs
index 95048a397f..19ad841af2 100644
--- a/delorean_storage/src/lib.rs
+++ b/delorean_storage/src/lib.rs
@@ -10,7 +10,7 @@ use async_trait::async_trait;
 use delorean_arrow::{arrow::record_batch::RecordBatch, datafusion::logical_plan::Expr};
 use delorean_data_types::data::ReplicatedWrite;
 use delorean_line_parser::ParsedLine;
-use exec::StringSetPlan;
+use exec::{SeriesSetPlan, StringSetPlan};
 
 use std::{fmt::Debug, sync::Arc};
 
@@ -99,8 +99,9 @@ pub trait Database: Debug + Send + Sync {
         predicate: Option<Predicate>,
     ) -> Result<StringSetPlan, Self::Error>;
 
-    /// Returns the distinct values in the `column_name` column of
-    /// this database for rows that match optional predicates.
+    /// Returns a plan which can find the distinct values in the
+    /// `column_name` column of this database for rows that match
+    /// optional predicates.
     ///
     /// If `table` is specified, then only values from the
     /// specified database which match other predictes are included.
@@ -120,6 +121,23 @@ pub trait Database: Debug + Send + Sync {
         predicate: Option<Predicate>,
     ) -> Result<StringSetPlan, Self::Error>;
 
+    /// Returns a plan that finds sets of rows which form logical time
+    /// series. Each series is defined by the unique values in a set
+    /// of "tag_columns" for each field in the "field_columns"
+    ///
+    /// If `range` is specified, only rows which have data in the
+    /// specified timestamp range which match other predicates are
+    /// included.
+    ///
+    /// If `predicate` is specified, then only rows which have at
+    /// least one non-null value in any row that matches the predicate
+    /// are returned
+    async fn query_series(
+        &self,
+        _range: Option<TimestampRange>,
+        _predicate: Option<Predicate>,
+    ) -> Result<SeriesSetPlan, Self::Error>;
+
     /// Fetch the specified table names and columns as Arrow
     /// RecordBatches. Columns are returned in the order specified.
     async fn table_to_arrow(
diff --git a/delorean_storage/src/test.rs b/delorean_storage/src/test.rs
index 9750806c96..a5001124c3 100644
--- a/delorean_storage/src/test.rs
+++ b/delorean_storage/src/test.rs
@@ -5,8 +5,8 @@
 use delorean_arrow::arrow::record_batch::RecordBatch;
 
 use crate::{
-    exec::StringSet, exec::StringSetPlan, exec::StringSetRef, Database, DatabaseStore, Predicate,
-    TimestampRange,
+    exec::{SeriesSetPlan, StringSet, StringSetPlan, StringSetRef},
+    Database, DatabaseStore, Predicate, TimestampRange,
 };
 use delorean_data_types::data::ReplicatedWrite;
 use delorean_line_parser::{parse_lines, ParsedLine};
@@ -36,6 +36,12 @@ pub struct TestDatabase {
     /// the last request for column_values
     column_values_request: Arc<Mutex<Option<ColumnValuesRequest>>>,
+
+    /// responses to return on the next request to query_series
+    query_series_values: Arc<Mutex<Option<SeriesSetPlan>>>,
+
+    /// the last request for query_series
+    query_series_request: Arc<Mutex<Option<QuerySeriesRequest>>>,
 }
 
 /// Records the parameters passed to a column name request
@@ -57,6 +63,14 @@ pub struct ColumnValuesRequest {
     pub predicate: Option<String>,
 }
 
+/// Records the parameters passed to a query_series request
+#[derive(Debug, PartialEq, Clone)]
+pub struct QuerySeriesRequest {
+    pub range: Option<TimestampRange>,
+    /// Stringified '{:?}' version of the predicate
+    pub predicate: Option<String>,
+}
+
 #[derive(Snafu, Debug)]
 pub enum TestError {
     #[snafu(display("Test database error: {}", message))]
@@ -75,6 +89,8 @@ impl Default for TestDatabase {
             column_names_request: Arc::new(Mutex::new(None)),
             column_values: Arc::new(Mutex::new(None)),
             column_values_request: Arc::new(Mutex::new(None)),
+            query_series_values: Arc::new(Mutex::new(None)),
+            query_series_request: Arc::new(Mutex::new(None)),
         }
     }
 }
@@ -131,6 +147,17 @@ impl TestDatabase {
     pub async fn get_column_values_request(&self) -> Option<ColumnValuesRequest> {
         self.column_values_request.clone().lock().await.take()
     }
+
+    /// Set the series that will be returned on a call to query_series
+    /// TODO add in the actual values
+    pub async fn set_query_series_values(&self, plan: SeriesSetPlan) {
+        *(self.query_series_values.clone().lock().await) = Some(plan);
+    }
+
+    /// Get the parameters from the last query_series request
+    pub async fn get_query_series_request(&self) -> Option<QuerySeriesRequest> {
+        self.query_series_request.clone().lock().await.take()
+    }
 }
 
 /// returns true if this line is within the range of the timestamp
@@ -257,6 +284,28 @@ impl Database for TestDatabase {
         Ok(column_values.into())
     }
 
+    async fn query_series(
+        &self,
+        range: Option<TimestampRange>,
+        predicate: Option<Predicate>,
+    ) -> Result<SeriesSetPlan, Self::Error> {
+        let predicate = predicate.map(|p| format!("{:?}", p));
+
+        let new_query_series_request = Some(QuerySeriesRequest { range, predicate });
+
+        *self.query_series_request.clone().lock().await = new_query_series_request;
+
+        self.query_series_values
+            .clone()
+            .lock()
+            .await
+            .take()
+            // Turn None into an error
+            .context(General {
+                message: "No saved query_series in TestDatabase",
+            })
+    }
+
     /// Fetch the specified table names and columns as Arrow RecordBatches
     async fn table_to_arrow(
         &self,
diff --git a/delorean_write_buffer/src/database.rs b/delorean_write_buffer/src/database.rs
index ff348527a6..e896a0a0ba 100644
--- a/delorean_write_buffer/src/database.rs
+++ b/delorean_write_buffer/src/database.rs
@@ -1,6 +1,8 @@
 use delorean_generated_types::wal as wb;
 use delorean_line_parser::ParsedLine;
-use delorean_storage::{exec::StringSet, exec::StringSetPlan, Database, Predicate, TimestampRange};
+use delorean_storage::{
+    exec::SeriesSetPlan, exec::StringSet, exec::StringSetPlan, Database, Predicate, TimestampRange,
+};
 use delorean_wal::WalBuilder;
 use delorean_wal_writer::{start_wal_sync_task, Error as WalWriterError, WalDetails};
 
@@ -439,6 +441,14 @@ impl Database for Db {
         }
     }
 
+    async fn query_series(
+        &self,
+        _range: Option<TimestampRange>,
+        _predicate: Option<Predicate>,
+    ) -> Result<SeriesSetPlan, Self::Error> {
+        unimplemented!("query series for write buffer database.");
+    }
+
     async fn table_to_arrow(
         &self,
         table_name: &str,
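A sketch of the round trip these test helpers are designed for, assuming the `TestDatabase` additions above: seed a plan, call `query_series` through the `Database` trait, then assert on the recorded request. The gRPC test further down does the same dance end to end.

    use delorean_storage::{exec::SeriesSetPlan, test::TestDatabase, Database};

    #[tokio::test]
    async fn query_series_round_trip() {
        let db = TestDatabase::default();

        // Seed the response that the next query_series call will return
        db.set_query_series_values(SeriesSetPlan {
            plans: vec![],
            tag_columns: vec![],
            field_columns: vec![],
        })
        .await;

        let plan = db.query_series(None, None).await.expect("plan returned");
        assert!(plan.plans.is_empty());

        // The call's parameters were recorded for later inspection
        let request = db
            .get_query_series_request()
            .await
            .expect("request recorded");
        assert_eq!(request.range, None);
        assert_eq!(request.predicate, None);
    }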
diff --git a/src/server/write_buffer_rpc.rs b/src/server/write_buffer_rpc.rs
index 9b9a616249..0f5486cff5 100644
--- a/src/server/write_buffer_rpc.rs
+++ b/src/server/write_buffer_rpc.rs
@@ -12,6 +12,7 @@ use std::{collections::HashMap, net::SocketAddr, sync::Arc};
 
 use delorean_generated_types::{
     delorean_server::{Delorean, DeloreanServer},
+    read_response::Frame,
     storage_server::{Storage, StorageServer},
     CapabilitiesResponse, CreateBucketRequest, CreateBucketResponse, DeleteBucketRequest,
     DeleteBucketResponse, GetBucketsResponse, MeasurementFieldsRequest, MeasurementFieldsResponse,
@@ -29,8 +30,8 @@ use crate::server::rpc::expr::convert_predicate;
 use crate::server::rpc::input::GrpcInputs;
 
 use delorean_storage::{
-    exec::Executor as StorageExecutor, org_and_bucket_to_database, Database, DatabaseStore,
-    TimestampRange as StorageTimestampRange,
+    exec::Executor as StorageExecutor, exec::SeriesSet, org_and_bucket_to_database, Database,
+    DatabaseStore, TimestampRange as StorageTimestampRange,
 };
 
 use snafu::{ResultExt, Snafu};
@@ -59,6 +60,18 @@ pub enum Error {
         source: Box<dyn std::error::Error + Send + Sync>,
     },
 
+    #[snafu(display("Error creating series plans for database '{}': {}", db_name, source))]
+    PlanningFilteringSeries {
+        db_name: String,
+        source: Box<dyn std::error::Error + Send + Sync>,
+    },
+
+    #[snafu(display("Error running series plans for database '{}': {}", db_name, source))]
+    FilteringSeries {
+        db_name: String,
+        source: Box<dyn std::error::Error + Send + Sync>,
+    },
+
     #[snafu(display(
         "Can not retrieve tag values for '{}' in database '{}': {}",
         tag_name,
@@ -94,6 +107,8 @@ impl Error {
                 // TODO: distinguish between input errors and internal errors
                 Status::invalid_argument(self.to_string())
             }
+            Self::PlanningFilteringSeries { .. } => Status::invalid_argument(self.to_string()),
+            Self::FilteringSeries { .. } => Status::invalid_argument(self.to_string()),
             Self::ListingTagValues { .. } => Status::invalid_argument(self.to_string()),
             Self::ConvertingPredicate { .. } => Status::invalid_argument(self.to_string()),
             Self::NotYetImplemented { .. } => Status::internal(self.to_string()),
@@ -164,9 +179,32 @@ where
     async fn read_filter(
         &self,
-        _req: tonic::Request<ReadFilterRequest>,
+        req: tonic::Request<ReadFilterRequest>,
     ) -> Result<tonic::Response<Self::ReadFilterStream>, Status> {
-        Err(Status::unimplemented("read_filter"))
+        let (tx, rx) = mpsc::channel(4);
+
+        let read_filter_request = req.into_inner();
+
+        let db_name = get_database_name(&read_filter_request)?;
+
+        let ReadFilterRequest {
+            read_source: _read_source,
+            range,
+            predicate,
+        } = read_filter_request;
+
+        read_filter_impl(
+            tx.clone(),
+            self.db_store.clone(),
+            self.executor.clone(),
+            db_name,
+            range,
+            predicate,
+        )
+        .await
+        .map_err(|e| e.to_status())?;
+
+        Ok(tonic::Response::new(rx))
     }
 
     type ReadGroupStream = mpsc::Receiver<Result<ReadResponse, Status>>;
 
@@ -307,7 +345,7 @@ where
     type MeasurementTagKeysStream = mpsc::Receiver<Result<StringValuesResponse, Status>>;
 
-    //#[tracing::instrument(level = "debug")]
+    #[tracing::instrument(level = "debug")]
     async fn measurement_tag_keys(
         &self,
         req: tonic::Request<MeasurementTagKeysRequest>,
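The `read_filter_impl` function added below is a two-stage pipeline: the receiver half of a channel is handed back to the gRPC layer immediately, while two spawned tasks feed it (one runs the plans, one converts their output into wire frames). A stripped-down sketch of that pattern with toy payload types, assuming only tokio:

    use tokio::sync::mpsc;

    /// Returns a receiver immediately; spawned tasks fill it in the background.
    async fn pipeline() -> mpsc::Receiver<String> {
        let (mut tx, rx) = mpsc::channel(4);
        let (mut tx_mid, mut rx_mid) = mpsc::channel::<u64>(4);

        // Stage 2: convert intermediate results into the output format
        tokio::spawn(async move {
            while let Some(n) = rx_mid.recv().await {
                if tx.send(format!("frame {}", n)).await.is_err() {
                    return; // receiver hung up; stop early
                }
            }
        });

        // Stage 1: "run the plan", producing intermediate results
        tokio::spawn(async move {
            for n in 0..3 {
                if tx_mid.send(n).await.is_err() {
                    return;
                }
            }
        });

        rx
    }

    #[tokio::main]
    async fn main() {
        let mut rx = pipeline().await;
        while let Some(frame) = rx.recv().await {
            println!("{}", frame);
        }
    }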
@@ -560,6 +598,85 @@ where
     Ok(StringValuesResponse { values })
 }
 
+/// Launch async tasks that send the result of executing read_filter to `tx`
+async fn read_filter_impl<T>(
+    tx: mpsc::Sender<Result<ReadResponse, Status>>,
+    db_store: Arc<T>,
+    executor: Arc<StorageExecutor>,
+    db_name: String,
+    range: Option<TimestampRange>,
+    predicate: Option<Predicate>,
+) -> Result<()>
+where
+    T: DatabaseStore,
+{
+    let range = convert_range(range);
+    let db = db_store
+        .db(&db_name)
+        .await
+        .ok_or_else(|| Error::DatabaseNotFound {
+            db_name: db_name.clone(),
+        })?;
+
+    let predicate_string = format!("{:?}", predicate);
+    let predicate =
+        convert_predicate(predicate).context(ConvertingPredicate { predicate_string })?;
+
+    let series_plan =
+        db.query_series(range, predicate)
+            .await
+            .map_err(|e| Error::PlanningFilteringSeries {
+                db_name: db_name.clone(),
+                source: Box::new(e),
+            })?;
+
+    // Spawn tasks to convert between series sets and the gRPC results
+    // and to run the actual plans (so we can return a result to the
+    // client before we start sending results)
+    let (tx_series, rx_series) = mpsc::channel(4);
+    tokio::spawn(async move { convert_series_set(rx_series, tx).await });
+
+    // fire up the plans and start the pipeline flowing
+    tokio::spawn(async move {
+        executor
+            .to_series_set(series_plan, tx_series)
+            .await
+            .map_err(|e| Error::FilteringSeries {
+                db_name: db_name.clone(),
+                source: Box::new(e),
+            })
+    });
+
+    Ok(())
+}
+
+/// Receives SeriesSets from `rx`, converts them to `ReadResponse`s,
+/// and sends them to `tx`
+async fn convert_series_set(
+    mut rx: mpsc::Receiver<Result<SeriesSet, delorean_storage::exec::Error>>,
+    mut tx: mpsc::Sender<Result<ReadResponse, Status>>,
+) {
+    while let Some(series_set) = rx.recv().await {
+        let response = series_set
+            .map(|_series_set| {
+                // TODO: the actual conversion
+                warn!("Conversion NOT YET IMPLEMENTED");
+                ReadResponse {
+                    frames: vec![Frame {
+                        //data: Some(Data::Series(SeriesFrame { data_type, tags })),
+                        data: None,
+                    }],
+                }
+            })
+            .map_err(|e| Status::internal(e.to_string()));
+
+        // ignore errors sending results; there is no one to notice them, so return early
+        if tx.send(response).await.is_err() {
+            return;
+        }
+    }
+}
+
 /// Instantiate a server listening on the specified address
 /// implementing the Delorean and Storage gRPC interfaces, the
 /// underlying hyper server instance. Resolves when the server has
@@ -591,7 +708,8 @@ mod tests {
     use super::*;
     use crate::panic::SendPanicsToTracing;
     use delorean_storage::{
-        id::Id, test::ColumnNamesRequest, test::ColumnValuesRequest, test::TestDatabaseStore,
+        exec::SeriesSetPlan, id::Id, test::ColumnNamesRequest, test::ColumnValuesRequest,
+        test::QuerySeriesRequest, test::TestDatabaseStore,
     };
     use delorean_test_helpers::tracing::TracingCapture;
     use std::{
@@ -603,7 +721,9 @@ mod tests {
     use futures::prelude::*;
 
-    use delorean_generated_types::{delorean_client, storage_client, ReadSource};
+    use delorean_generated_types::{
+        delorean_client, read_response::frame, storage_client, ReadSource,
+    };
     use prost::Message;
 
     type DeloreanClient = delorean_client::DeloreanClient<tonic::transport::Channel>;
@@ -1423,6 +1543,153 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_read_filter() -> Result<(), tonic::Status> {
+        // Note we use a unique port. TODO: let the OS pick the port
+        let mut fixture = Fixture::new(11901)
+            .await
+            .expect("Connecting to test server");
+
+        let db_info = OrgAndBucket::new(123, 456);
+        let partition_id = 1;
+
+        let test_db = fixture
+            .test_storage
+            .db_or_create(&db_info.db_name)
+            .await
+            .expect("creating test database");
+
+        let source = Some(StorageClientWrapper::read_source(
+            db_info.org_id,
+            db_info.bucket_id,
+            partition_id,
+        ));
+
+        #[derive(Debug)]
+        struct TestCase<'a> {
+            description: &'a str,
+            // TODO (when conversion between SeriesSets and Frames is implemented)
+            // series_set:
+            // expected output:
+            request: ReadFilterRequest,
+            expected_request: QuerySeriesRequest,
+        }
+
+        let test_cases = vec![
+            TestCase {
+                description: "No predicates / timestamps",
+                request: ReadFilterRequest {
+                    read_source: source.clone(),
+                    range: None,
+                    predicate: None,
+                },
+                expected_request: QuerySeriesRequest {
+                    range: None,
+                    predicate: None,
+                },
+            },
+            TestCase {
+                description: "Only timestamp",
+                request: ReadFilterRequest {
+                    read_source: source.clone(),
+                    range: make_timestamp_range(150, 200),
+                    predicate: None,
+                },
+                expected_request: QuerySeriesRequest {
+                    range: Some(StorageTimestampRange::new(150, 200)),
+                    predicate: None,
+                },
+            },
+            TestCase {
+                description: "Only predicate",
+                request: ReadFilterRequest {
+                    read_source: source.clone(),
+                    range: None,
+                    predicate: make_state_ma_predicate(),
+                },
+                expected_request: QuerySeriesRequest {
+                    range: None,
+                    predicate: Some("Predicate { expr: #state Eq Utf8(\"MA\") }".into()),
+                },
+            },
+            TestCase {
+                description: "Both timestamp + predicate",
+                request: ReadFilterRequest {
+                    read_source: source.clone(),
+                    range: make_timestamp_range(150, 200),
+                    predicate: make_state_ma_predicate(),
+                },
+                expected_request: QuerySeriesRequest {
+                    range: Some(StorageTimestampRange::new(150, 200)),
+                    predicate: Some("Predicate { expr: #state Eq Utf8(\"MA\") }".into()),
+                },
+            },
+        ];
+        for test_case in test_cases.into_iter() {
+            let test_case_str = format!("{:?}", test_case);
+            let TestCase {
+                request,
+                expected_request,
+                ..
+            } = test_case;
+
+            // TODO setup any expected results
+            let dummy_series_set_plan = SeriesSetPlan {
+                plans: vec![],
+                tag_columns: vec![],
+                field_columns: vec![],
+            };
+            test_db.set_query_series_values(dummy_series_set_plan).await;
+
+            let actual_frames = fixture.storage_client.read_filter(request).await?;
+
+            // TODO: encode this in the test case or something
+            let expected_frames: Vec<String> = vec!["0 frames".into()];
+
+            assert_eq!(
+                actual_frames, expected_frames,
+                "unexpected frames returned by query_series: {}",
+                test_case_str
+            );
+            assert_eq!(
+                test_db.get_query_series_request().await,
+                Some(expected_request),
+                "unexpected request to query_series: {}",
+                test_case_str
+            );
+        }
+
+        // ---
+        // test error
+        // ---
+        let request = ReadFilterRequest {
+            read_source: source.clone(),
+            range: None,
+            predicate: None,
+        };
+
+        // Note we don't set the response on the test database, so we expect an error
+        let response = fixture.storage_client.read_filter(request).await;
+        assert!(response.is_err());
+        let response_string = format!("{:?}", response);
+        let expected_error = "No saved query_series in TestDatabase";
+        assert!(
+            response_string.contains(expected_error),
+            "'{}' did not contain expected content '{}'",
+            response_string,
+            expected_error
+        );
+
+        let expected_request = Some(QuerySeriesRequest {
+            range: None,
+            predicate: None,
+        });
+        assert_eq!(test_db.get_query_series_request().await, expected_request);
+
+        Ok(())
+    }
+
     fn make_timestamp_range(start: i64, end: i64) -> Option<TimestampRange> {
         Some(TimestampRange { start, end })
     }
@@ -1604,6 +1871,32 @@ mod tests {
             Ok(self.to_string_vec(responses))
         }
 
+        /// Make a request to Storage::read_filter and do the
+        /// required async dance to flatten the resulting stream
+        async fn read_filter(
+            &mut self,
+            request: ReadFilterRequest,
+        ) -> Result<Vec<String>, tonic::Status> {
+            let responses: Vec<_> = self
+                .inner
+                .read_filter(request)
+                .await?
+                .into_inner()
+                .try_collect()
+                .await?;
+
+            let data_frames: Vec<frame::Data> = responses
+                .into_iter()
+                .flat_map(|r| r.frames)
+                .flat_map(|f| f.data)
+                .collect();
+
+            let s = format!("{} frames", data_frames.len());
+
+            //Ok(self.to_string_vec(responses))
+            Ok(vec![s])
+        }
+
         /// Convert the StringValueResponses into rust Strings, sorting the values
         /// to ensure consistency.
         fn to_string_vec(&self, responses: Vec<StringValuesResponse>) -> Vec<String> {