diff --git a/delorean_storage/src/exec.rs b/delorean_storage/src/exec.rs
index d92335d375..ba65456887 100644
--- a/delorean_storage/src/exec.rs
+++ b/delorean_storage/src/exec.rs
@@ -18,8 +18,11 @@ use delorean_arrow::{
 use planning::DeloreanExecutionContext;
 use schema_pivot::SchemaPivotNode;
 
-// Publically export StringSets
-pub use seriesset::{Error as SeriesSetError, SeriesSet, SeriesSetConverter, SeriesSetRef};
+// Publicly export the different types of plans
+pub use seriesset::{
+    Error as SeriesSetError, GroupedSeriesSetConverter, GroupedSeriesSetItem, SeriesSet,
+    SeriesSetConverter,
+};
 pub use stringset::{IntoStringSet, StringSet, StringSetRef};
 
 use tokio::sync::mpsc;
@@ -144,20 +147,49 @@ pub struct SeriesSetPlan {
     pub field_columns: Vec<Arc<String>>,
 }
 
+/// A plan to run that can produce a grouped series set.
+///
+/// TODO: this may also need to support computing an aggregation per
+/// group, depending on what is required for the gRPC layer.
+#[derive(Debug)]
+pub struct GroupedSeriesSetPlan {
+    /// The underlying SeriesSetPlan
+    pub series_set_plan: SeriesSetPlan,
+
+    /// How many of the series_set_plan::tag_columns should be used to
+    /// compute the group
+    pub num_prefix_tag_group_columns: usize,
+}
+
 /// A container for plans which each produces a logical stream of
 /// timeseries (from across many potential tables). A `SeriesSetPlans`
-/// and can be executed to produce streams of `SeriesSet`s.
+/// can be executed to produce streams of `SeriesSet`s.
 #[derive(Debug, Default)]
 pub struct SeriesSetPlans {
     pub plans: Vec<SeriesSetPlan>,
 }
 
+/// A container for plans which each produces a logical stream of
+/// timeseries (from across many potential tables) grouped in some
+/// way. A `GroupedSeriesSetPlans` can be executed to produce
+/// streams of `GroupedSeriesSet`s.
+#[derive(Debug, Default)]
+pub struct GroupedSeriesSetPlans {
+    pub grouped_plans: Vec<GroupedSeriesSetPlan>,
+}
+
 impl From<Vec<SeriesSetPlan>> for SeriesSetPlans {
     fn from(plans: Vec<SeriesSetPlan>) -> Self {
         Self { plans }
     }
 }
 
+impl From<Vec<GroupedSeriesSetPlan>> for GroupedSeriesSetPlans {
+    fn from(grouped_plans: Vec<GroupedSeriesSetPlan>) -> Self {
+        Self { grouped_plans }
+    }
+}
+
 /// Handles executing plans, and marshalling the results into rust
 /// native structures.
 #[derive(Debug, Default)]
@@ -240,6 +272,79 @@ impl Executor {
             })
             .collect::<Vec<_>>();
 
+        // now, wait for all the values to resolve and report any errors
+        for join_handle in handles.into_iter() {
+            join_handle.await.context(JoinError)??;
+        }
+        Ok(())
+    }
+
+    /// Executes the grouped plans, sending the
+    /// results one by one to the `tx` channel.
+    ///
+    /// Note that the returned future resolves (e.g. "returns") once
+    /// all plans have been sent to `tx`. This means that the future
+    /// will not resolve if there is nothing hooked up receiving
+    /// results from the other end of the channel and the channel
+    /// can't hold all the resulting series.
+    pub async fn to_grouped_series_set(
+        &self,
+        grouped_series_set_plans: GroupedSeriesSetPlans,
+        tx: mpsc::Sender<Result<GroupedSeriesSetItem>>,
+    ) -> Result<()> {
+        let GroupedSeriesSetPlans { grouped_plans } = grouped_series_set_plans;
+
+        // Run the plans in parallel
+        let handles = grouped_plans
+            .into_iter()
+            .map(|plan| {
+                // Clone Arc's for transmission to threads
+                let counters = self.counters.clone();
+                let tx = tx.clone();
+                tokio::task::spawn(async move {
+                    let GroupedSeriesSetPlan {
+                        series_set_plan,
+                        num_prefix_tag_group_columns,
+                    } = plan;
+
+                    let SeriesSetPlan {
+                        table_name,
+                        plan,
+                        tag_columns,
+                        field_columns,
+                    } = series_set_plan;
+
+                    let tag_columns = Arc::new(tag_columns);
+                    let field_columns = Arc::new(field_columns);
+
+                    // TODO run these on some executor other than the main tokio pool (maybe?)
+                    let ctx = DeloreanExecutionContext::new(counters);
+                    let physical_plan = ctx
+                        .make_plan(&plan)
+                        .await
+                        .context(DataFusionPhysicalPlanning)?;
+
+                    let it = ctx
+                        .execute(physical_plan)
+                        .await
+                        .context(DataFusionExecution)?;
+
+                    GroupedSeriesSetConverter::new(tx)
+                        .convert(
+                            table_name,
+                            tag_columns,
+                            num_prefix_tag_group_columns,
+                            field_columns,
+                            it,
+                        )
+                        .await
+                        .context(SeriesSetConversion)?;
+
+                    Ok(())
+                })
+            })
+            .collect::<Vec<_>>();
+
+        // now, wait for all the values to resolve and report any errors
         for join_handle in handles.into_iter() {
             join_handle.await.context(JoinError)??;
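The doc comment on `to_grouped_series_set` matters in practice: the channel passed in is bounded, so the future only resolves once a receiver drains it. A minimal stand-alone sketch of the calling pattern this implies (tokio only; `String` stands in for `Result<GroupedSeriesSetItem>`, and all names are illustrative):

    use tokio::sync::mpsc;

    type Item = String; // stand-in for Result<GroupedSeriesSetItem>

    #[tokio::main]
    async fn main() {
        // Small bound, like the mpsc::channel(4) used in the diff
        let (tx, mut rx) = mpsc::channel::<Item>(4);

        // Start a consumer BEFORE awaiting the producer: with a bounded
        // channel and no receiver, a send-heavy future never resolves.
        let consumer = tokio::spawn(async move {
            while let Some(item) = rx.recv().await {
                println!("received: {}", item);
            }
        });

        // Producer: sends more items than the channel can buffer; this
        // is the role to_grouped_series_set plays.
        for i in 0..16 {
            tx.send(format!("series {}", i)).await.expect("receiver alive");
        }
        drop(tx); // close the channel so the consumer loop finishes

        consumer.await.expect("consumer task panicked");
    }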
diff --git a/delorean_storage/src/exec/seriesset.rs b/delorean_storage/src/exec/seriesset.rs
index d73db6a897..062c779ca4 100644
--- a/delorean_storage/src/exec/seriesset.rs
+++ b/delorean_storage/src/exec/seriesset.rs
@@ -91,7 +91,25 @@ pub struct SeriesSet {
     // The underlying record batch data
     pub batch: RecordBatch,
 }
-pub type SeriesSetRef = Arc<SeriesSet>;
+
+/// Describes a "group of series": namely, several logical
+/// timeseries that share the same timestamps and name=value tag
+/// keys, grouped by some subset of the tag keys
+///
+/// TODO: this may also support computing an aggregation per group,
+/// depending on what is required for the gRPC layer.
+#[derive(Debug)]
+pub struct GroupDescription {
+    /// key = value pairs that define the group
+    pub tags: Vec<(Arc<String>, Arc<String>)>,
+    // TODO: maybe also include the resulting aggregate value (per group) here
+}
+
+#[derive(Debug)]
+pub enum GroupedSeriesSetItem {
+    GroupStart(GroupDescription),
+    GroupData(SeriesSet),
+}
 
 // Handles converting record batches into SeriesSets, and sending them
 // to tx
@@ -319,6 +337,30 @@ impl SeriesSetConverter {
     }
 }
 
+// Handles converting record batches into GroupedSeriesSets, and
+// sending them to tx
+#[derive(Debug)]
+pub struct GroupedSeriesSetConverter {
+    tx: mpsc::Sender<Result<GroupedSeriesSetItem>>,
+}
+
+impl GroupedSeriesSetConverter {
+    pub fn new(tx: mpsc::Sender<Result<GroupedSeriesSetItem>>) -> Self {
+        Self { tx }
+    }
+
+    pub async fn convert(
+        &mut self,
+        _table_name: Arc<String>,
+        _tag_columns: Arc<Vec<Arc<String>>>,
+        _num_prefix_tag_group_columns: usize,
+        _field_columns: Arc<Vec<Arc<String>>>,
+        _it: Box<dyn RecordBatchReader + Send>,
+    ) -> Result<()> {
+        unimplemented!("GroupedSeriesSetConverter");
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use arrow::{
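For intuition on `num_prefix_tag_group_columns`: groups are formed from the distinct values of the first N tag columns, while the remaining tag columns continue to distinguish individual series within a group. A toy sketch of that prefix grouping, independent of Arrow and of the converter above (data and names are hypothetical):

    use std::collections::BTreeMap;

    fn main() {
        // Rows as (tag values in tag_column order [region, host], field value)
        let rows = vec![
            (vec!["east", "a"], 1.0),
            (vec!["east", "b"], 2.0),
            (vec!["west", "a"], 3.0),
        ];

        // Group on the first tag column only, i.e. by "region"
        let num_prefix_tag_group_columns = 1;

        let mut groups: BTreeMap<Vec<&str>, Vec<f64>> = BTreeMap::new();
        for (tags, value) in rows {
            let key = tags[..num_prefix_tag_group_columns].to_vec();
            groups.entry(key).or_default().push(value);
        }

        // Prints: group ["east"]: [1.0, 2.0], then group ["west"]: [3.0]
        for (key, values) in groups {
            println!("group {:?}: {:?}", key, values);
        }
    }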
diff --git a/delorean_storage/src/lib.rs b/delorean_storage/src/lib.rs
index 5e225d8520..b9be17d827 100644
--- a/delorean_storage/src/lib.rs
+++ b/delorean_storage/src/lib.rs
@@ -10,7 +10,7 @@ use async_trait::async_trait;
 use delorean_arrow::{arrow::record_batch::RecordBatch, datafusion::logical_plan::Expr};
 use delorean_data_types::data::ReplicatedWrite;
 use delorean_line_parser::ParsedLine;
-use exec::{SeriesSetPlans, StringSetPlan};
+use exec::{GroupedSeriesSetPlans, SeriesSetPlans, StringSetPlan};
 
 use std::{fmt::Debug, sync::Arc};
 
@@ -134,10 +134,30 @@ pub trait Database: Debug + Send + Sync {
     /// are returned
     async fn query_series(
         &self,
-        _range: Option<TimestampRange>,
-        _predicate: Option<Predicate>,
+        range: Option<TimestampRange>,
+        predicate: Option<Predicate>,
     ) -> Result<SeriesSetPlans, Self::Error>;
 
+    /// Returns a plan that finds sets of rows which form groups of
+    /// logical time series. Each series is defined by the unique
+    /// values in a set of "tag_columns" for each field in the
+    /// "field_columns". Each group is defined by the unique
+    /// combinations of values in `group_columns`.
+    ///
+    /// If `range` is specified, only rows which have data in the
+    /// specified timestamp range which match other predicates are
+    /// included.
+    ///
+    /// If `predicate` is specified, then only rows which have at
+    /// least one non-null value in any row that matches the predicate
+    /// are returned
+    async fn query_groups(
+        &self,
+        range: Option<TimestampRange>,
+        predicate: Option<Predicate>,
+        group_columns: Vec<String>,
+    ) -> Result<GroupedSeriesSetPlans, Self::Error>;
+
     /// Fetch the specified table names and columns as Arrow
     /// RecordBatches. Columns are returned in the order specified.
     async fn table_to_arrow(
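A toy illustration of how `range` and `predicate` combine as described in the trait docs: a row contributes to a series only if it falls inside the timestamp range and matches the predicate. This sketch assumes the half-open `[start, end)` interpretation of the storage `TimestampRange`; the data is made up:

    fn main() {
        // Rows as (timestamp, "state" tag value, field value)
        let rows = vec![(100, "MA", 1.0), (175, "MA", 2.0), (175, "CA", 3.0), (250, "MA", 4.0)];

        let range = 150..200; // assumed half-open, [start, end)
        let predicate = |state: &str| state == "MA";

        let matching: Vec<_> = rows
            .into_iter()
            .filter(|(ts, state, _)| range.contains(ts) && predicate(state))
            .collect();

        // Only the row at ts=175 with state=MA passes both filters
        assert_eq!(matching, vec![(175, "MA", 2.0)]);
        println!("{:?}", matching);
    }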
diff --git a/delorean_storage/src/test.rs b/delorean_storage/src/test.rs
index 2c88811c0a..eaf2586f44 100644
--- a/delorean_storage/src/test.rs
+++ b/delorean_storage/src/test.rs
@@ -4,10 +4,12 @@
 use delorean_arrow::arrow::record_batch::RecordBatch;
 
 use crate::{
+    exec::GroupedSeriesSetPlans,
     exec::SeriesSetPlans,
     exec::{StringSet, StringSetPlan, StringSetRef},
     Database, DatabaseStore, Predicate, TimestampRange,
 };
+
 use delorean_data_types::data::ReplicatedWrite;
 use delorean_line_parser::{parse_lines, ParsedLine};
 
@@ -42,6 +44,12 @@ pub struct TestDatabase {
     /// The last request for `query_series`
     query_series_request: Arc<Mutex<Option<QuerySeriesRequest>>>,
+
+    /// Responses to return on the next request to `query_groups`
+    query_groups_values: Arc<Mutex<Option<GroupedSeriesSetPlans>>>,
+
+    /// The last request for `query_groups`
+    query_groups_request: Arc<Mutex<Option<QueryGroupsRequest>>>,
 }
 
 /// Records the parameters passed to a column name request
@@ -70,6 +78,14 @@ pub struct QuerySeriesRequest {
     /// Stringified '{:?}' version of the predicate
     pub predicate: Option<String>,
 }
+/// Records the parameters passed to a `query_groups` request
+#[derive(Debug, PartialEq, Clone)]
+pub struct QueryGroupsRequest {
+    pub range: Option<TimestampRange>,
+    /// Stringified '{:?}' version of the predicate
+    pub predicate: Option<String>,
+    pub group_columns: Vec<String>,
+}
 
 #[derive(Snafu, Debug)]
 pub enum TestError {
@@ -134,7 +150,6 @@ impl TestDatabase {
     }
 
     /// Set the series that will be returned on a call to query_series
-    /// TODO add in the actual values
     pub async fn set_query_series_values(&self, plan: SeriesSetPlans) {
         *(self.query_series_values.clone().lock().await) = Some(plan);
    }
@@ -143,6 +158,16 @@ impl TestDatabase {
     pub async fn get_query_series_request(&self) -> Option<QuerySeriesRequest> {
         self.query_series_request.clone().lock().await.take()
     }
+
+    /// Set the series that will be returned on a call to query_groups
+    pub async fn set_query_groups_values(&self, plan: GroupedSeriesSetPlans) {
+        *(self.query_groups_values.clone().lock().await) = Some(plan);
+    }
+
+    /// Get the parameters from the last query_groups request
+    pub async fn get_query_groups_request(&self) -> Option<QueryGroupsRequest> {
+        self.query_groups_request.clone().lock().await.take()
+    }
 }
 
 /// returns true if this line is within the range of the timestamp
@@ -291,6 +316,33 @@ impl Database for TestDatabase {
         })
     }
 
+    async fn query_groups(
+        &self,
+        range: Option<TimestampRange>,
+        predicate: Option<Predicate>,
+        group_columns: Vec<String>,
+    ) -> Result<GroupedSeriesSetPlans, Self::Error> {
+        let predicate = predicate.map(|p| format!("{:?}", p));
+
+        let new_query_groups_request = Some(QueryGroupsRequest {
+            range,
+            predicate,
+            group_columns,
+        });
+
+        *self.query_groups_request.clone().lock().await = new_query_groups_request;
+
+        self.query_groups_values
+            .clone()
+            .lock()
+            .await
+            .take()
+            // Turn None into an error
+            .context(General {
+                message: "No saved query_groups in TestDatabase",
+            })
+    }
+
     /// Fetch the specified table names and columns as Arrow RecordBatches
     async fn table_to_arrow(
         &self,
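The TestDatabase changes follow a record-and-replay pattern: each method stores the request it saw and returns a previously planted response, with `take()` ensuring each test observes only its own interaction. A condensed stand-alone version of the pattern (using `std::sync::Mutex` instead of tokio's, and strings instead of plans; all names are illustrative):

    use std::sync::Mutex;

    #[derive(Debug, PartialEq)]
    struct QueryGroupsRequest {
        group_columns: Vec<String>,
    }

    #[derive(Default)]
    struct TestDatabase {
        saved_response: Mutex<Option<String>>,
        last_request: Mutex<Option<QueryGroupsRequest>>,
    }

    impl TestDatabase {
        fn set_response(&self, r: String) {
            *self.saved_response.lock().unwrap() = Some(r);
        }

        fn query_groups(&self, group_columns: Vec<String>) -> Result<String, String> {
            // Record the request so the test can assert on it later
            *self.last_request.lock().unwrap() = Some(QueryGroupsRequest { group_columns });
            // Hand back the planted response, or the error the tests look for
            self.saved_response
                .lock()
                .unwrap()
                .take()
                .ok_or_else(|| "No saved query_groups in TestDatabase".to_string())
        }

        fn get_last_request(&self) -> Option<QueryGroupsRequest> {
            self.last_request.lock().unwrap().take()
        }
    }

    fn main() {
        let db = TestDatabase::default();
        db.set_response("plan".to_string());
        assert_eq!(db.query_groups(vec!["tag1".into()]), Ok("plan".to_string()));
        assert_eq!(
            db.get_last_request(),
            Some(QueryGroupsRequest { group_columns: vec!["tag1".into()] })
        );
        // A second call without a saved response yields the error path
        assert!(db.query_groups(vec![]).is_err());
    }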
diff --git a/delorean_write_buffer/src/database.rs b/delorean_write_buffer/src/database.rs
index e3a4ed7312..95c5fe99e7 100644
--- a/delorean_write_buffer/src/database.rs
+++ b/delorean_write_buffer/src/database.rs
@@ -1,8 +1,8 @@
 use delorean_generated_types::wal as wb;
 use delorean_line_parser::ParsedLine;
 use delorean_storage::{
-    exec::SeriesSetPlan, exec::SeriesSetPlans, exec::StringSet, exec::StringSetPlan, Database,
-    Predicate, TimestampRange,
+    exec::GroupedSeriesSetPlans, exec::SeriesSetPlan, exec::SeriesSetPlans, exec::StringSet,
+    exec::StringSetPlan, Database, Predicate, TimestampRange,
 };
 use delorean_wal::WalBuilder;
 use delorean_wal_writer::{start_wal_sync_task, Error as WalWriterError, WalDetails};
 
@@ -457,6 +457,15 @@ impl Database for Db {
         Ok(visitor.plans.into())
     }
 
+    async fn query_groups(
+        &self,
+        _range: Option<TimestampRange>,
+        _predicate: Option<Predicate>,
+        _group_columns: Vec<String>,
+    ) -> Result<GroupedSeriesSetPlans, Self::Error> {
+        unimplemented!("query_groups unimplemented as part of write buffer database");
+    }
+
     async fn table_to_arrow(
         &self,
         table_name: &str,
diff --git a/src/server/rpc/data.rs b/src/server/rpc/data.rs
index e680cca0c7..1158dc8e4f 100644
--- a/src/server/rpc/data.rs
+++ b/src/server/rpc/data.rs
@@ -7,7 +7,7 @@ use delorean_arrow::arrow::{
     datatypes::DataType as ArrowDataType,
 };
 
-use delorean_storage::exec::SeriesSet;
+use delorean_storage::exec::{GroupedSeriesSetItem, SeriesSet};
 
 use delorean_generated_types::{
     read_response::{
@@ -58,6 +58,32 @@ pub fn series_set_to_read_response(series_set: SeriesSet) -> Result<ReadResponse> {
+pub fn grouped_series_set_to_read_response(
+    grouped_series_set_item: GroupedSeriesSetItem,
+) -> Result<ReadResponse> {
+    unimplemented!("grouped_series_set_to_read_response");
+}
+
 fn data_type(array: &ArrayRef) -> Result<DataType> {
     match array.data_type() {
         ArrowDataType::Utf8 => Ok(DataType::String),
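When `grouped_series_set_to_read_response` is eventually implemented, it will presumably consume the stream shape defined in seriesset.rs: each `GroupStart` announces a new group, followed by the `GroupData` items that belong to it. A sketch of that consumption with simplified stand-ins for the real types (plain `String`s rather than `Arc<String>`/`SeriesSet`):

    #[derive(Debug)]
    struct GroupDescription {
        tags: Vec<(String, String)>, // the diff uses Arc<String>; plain String here
    }

    #[derive(Debug)]
    enum GroupedSeriesSetItem {
        GroupStart(GroupDescription),
        GroupData(String), // the diff carries a SeriesSet; a name stands in here
    }

    fn main() {
        let items = vec![
            GroupedSeriesSetItem::GroupStart(GroupDescription {
                tags: vec![("region".to_string(), "east".to_string())],
            }),
            GroupedSeriesSetItem::GroupData("cpu,region=east".to_string()),
            GroupedSeriesSetItem::GroupStart(GroupDescription {
                tags: vec![("region".to_string(), "west".to_string())],
            }),
            GroupedSeriesSetItem::GroupData("cpu,region=west".to_string()),
        ];

        // A converter would emit one "group" frame per GroupStart and
        // series frames for each GroupData that follows it.
        for item in items {
            match item {
                GroupedSeriesSetItem::GroupStart(desc) => println!("new group: {:?}", desc.tags),
                GroupedSeriesSetItem::GroupData(series) => println!("  series: {}", series),
            }
        }
    }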
diff --git a/src/server/write_buffer_rpc.rs b/src/server/write_buffer_rpc.rs
index 69e59a9d8d..3fef84e27b 100644
--- a/src/server/write_buffer_rpc.rs
+++ b/src/server/write_buffer_rpc.rs
@@ -29,7 +29,7 @@ use crate::server::rpc::expr::convert_predicate;
 use crate::server::rpc::input::GrpcInputs;
 
 use delorean_storage::{
-    exec::{Executor as StorageExecutor, SeriesSet, SeriesSetError},
+    exec::{Executor as StorageExecutor, GroupedSeriesSetItem, SeriesSet, SeriesSetError},
     org_and_bucket_to_database, Database, DatabaseStore,
     TimestampRange as StorageTimestampRange,
 };
@@ -39,7 +39,7 @@ use tokio::sync::mpsc;
 use tonic::Status;
 use tracing::warn;
 
-use super::rpc::data::series_set_to_read_response;
+use super::rpc::data::{grouped_series_set_to_read_response, series_set_to_read_response};
 
 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -67,12 +67,24 @@ pub enum Error {
         source: Box<dyn std::error::Error + Send + Sync>,
     },
 
+    #[snafu(display("Error creating group plans for database '{}': {}", db_name, source))]
+    PlanningGroupSeries {
+        db_name: String,
+        source: Box<dyn std::error::Error + Send + Sync>,
+    },
+
     #[snafu(display("Error running series plans for database '{}': {}", db_name, source))]
     FilteringSeries {
         db_name: String,
         source: Box<dyn std::error::Error + Send + Sync>,
     },
 
+    #[snafu(display("Error running grouping plans for database '{}': {}", db_name, source))]
+    GroupingSeries {
+        db_name: String,
+        source: Box<dyn std::error::Error + Send + Sync>,
+    },
+
     #[snafu(display(
         "Can not retrieve tag values for '{}' in database '{}': {}",
         tag_name,
@@ -94,6 +106,9 @@ pub enum Error {
     #[snafu(display("Computing series: {}", source))]
     ComputingSeriesSet { source: SeriesSetError },
 
+    #[snafu(display("Computing grouped series: {}", source))]
+    ComputingGroupedSeriesSet { source: SeriesSetError },
+
     #[snafu(display("Converting time series into gRPC response: {}", source))]
     ConvertingSeriesSet {
         source: crate::server::rpc::data::Error,
@@ -117,10 +132,13 @@ impl Error {
                 Status::invalid_argument(self.to_string())
             }
             Self::PlanningFilteringSeries { .. } => Status::invalid_argument(self.to_string()),
+            Self::PlanningGroupSeries { .. } => Status::invalid_argument(self.to_string()),
             Self::FilteringSeries { .. } => Status::invalid_argument(self.to_string()),
+            Self::GroupingSeries { .. } => Status::invalid_argument(self.to_string()),
             Self::ListingTagValues { .. } => Status::invalid_argument(self.to_string()),
             Self::ConvertingPredicate { .. } => Status::invalid_argument(self.to_string()),
             Self::ComputingSeriesSet { .. } => Status::invalid_argument(self.to_string()),
+            Self::ComputingGroupedSeriesSet { .. } => Status::invalid_argument(self.to_string()),
             Self::ConvertingSeriesSet { .. } => Status::invalid_argument(self.to_string()),
             Self::NotYetImplemented { .. } => Status::internal(self.to_string()),
         }
@@ -226,7 +244,37 @@ where
         &self,
         req: tonic::Request<ReadGroupRequest>,
     ) -> Result<tonic::Response<Self::ReadGroupStream>, Status> {
-        Err(Status::unimplemented("read_group"))
+        let (tx, rx) = mpsc::channel(4);
+
+        let read_group_request = req.into_inner();
+
+        let db_name = get_database_name(&read_group_request)?;
+
+        let ReadGroupRequest {
+            read_source: _read_source,
+            range,
+            predicate,
+            group_keys,
+            // TODO: handle Group::None
+            group: _group,
+            // TODO: handle aggregate values, especially whether None is the same as
+            // Some(AggregateType::None) or not
+            aggregate: _aggregate,
+        } = read_group_request;
+
+        read_group_impl(
+            tx.clone(),
+            self.db_store.clone(),
+            self.executor.clone(),
+            db_name,
+            range,
+            predicate,
+            group_keys,
+        )
+        .await
+        .map_err(|e| e.to_status())?;
+
+        Ok(tonic::Response::new(rx))
     }
 
     type TagKeysStream = mpsc::Receiver<Result<StringValuesResponse, Status>>;
@@ -683,6 +731,81 @@ async fn convert_series_set(
     }
 }
 
+/// Launch async tasks that send the result of executing read_group to `tx`
+async fn read_group_impl<T>(
+    tx: mpsc::Sender<Result<ReadResponse, Status>>,
+    db_store: Arc<T>,
+    executor: Arc<StorageExecutor>,
+    db_name: String,
+    range: Option<TimestampRange>,
+    predicate: Option<Predicate>,
+    group_keys: Vec<String>,
+) -> Result<()>
+where
+    T: DatabaseStore,
+{
+    let range = convert_range(range);
+    let db = db_store
+        .db(&db_name)
+        .await
+        .ok_or_else(|| Error::DatabaseNotFound {
+            db_name: db_name.clone(),
+        })?;
+
+    let predicate_string = format!("{:?}", predicate);
+    let predicate =
+        convert_predicate(predicate).context(ConvertingPredicate { predicate_string })?;
+
+    let grouped_series_set_plan = db
+        .query_groups(range, predicate, group_keys)
+        .await
+        .map_err(|e| Error::PlanningGroupSeries {
+            db_name: db_name.clone(),
+            source: Box::new(e),
+        })?;
+
+    // Spawn task to convert between series sets and the gRPC results
+    // and to run the actual plans (so we can return a result to the
+    // client before we start sending results)
+    let (tx_series, rx_series) = mpsc::channel(4);
+    tokio::spawn(async move { convert_grouped_series_set(rx_series, tx).await });
+
+    // fire up the plans and start the pipeline flowing
+    tokio::spawn(async move {
+        executor
+            .to_grouped_series_set(grouped_series_set_plan, tx_series)
+            .await
+            .map_err(|e| Error::GroupingSeries {
+                db_name: db_name.clone(),
+                source: Box::new(e),
+            })
+    });
+
+    Ok(())
+}
+
+/// Receives GroupedSeriesSetItems from rx, converts them to
+/// ReadResponse and sends them to tx
+async fn convert_grouped_series_set(
+    mut rx: mpsc::Receiver<Result<GroupedSeriesSetItem, SeriesSetError>>,
+    mut tx: mpsc::Sender<Result<ReadResponse, Status>>,
+) {
+    while let Some(grouped_series_set_item) = rx.recv().await {
+        let response = grouped_series_set_item
+            .context(ComputingGroupedSeriesSet)
+            .and_then(|grouped_series_set_item| {
+                grouped_series_set_to_read_response(grouped_series_set_item)
+                    .context(ConvertingSeriesSet)
+            })
+            .map_err(|e| Status::internal(e.to_string()));
+
+        // ignore errors sending results; there is no one to notice them, so return early
+        if tx.send(response).await.is_err() {
+            return;
+        }
+    }
+}
+
 /// Instantiate a server listening on the specified address
 /// implementing the Delorean and Storage gRPC interfaces, the
 /// underlying hyper server instance. Resolves when the server has
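`read_group_impl` wires up a two-stage pipeline: one spawned task executes the plans and feeds an intermediate channel, a second converts each item into a response and forwards it to the channel handed back to tonic, so the RPC handler can return before any results are produced. A generic sketch of that shape (plain `String`/`u64` payloads instead of the delorean types; all names are illustrative):

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel::<String>(4); // goes back to the client
        let (tx_series, mut rx_series) = mpsc::channel::<u64>(4); // raw items

        // Stage 2: convert items to responses as they arrive
        tokio::spawn(async move {
            while let Some(item) = rx_series.recv().await {
                let response = format!("frame for series {}", item);
                // ignore send errors: the client hung up, so just stop
                if tx.send(response).await.is_err() {
                    return;
                }
            }
        });

        // Stage 1: "execute the plans", feeding raw items into the pipeline
        tokio::spawn(async move {
            for i in 0..3 {
                if tx_series.send(i).await.is_err() {
                    return;
                }
            }
        });

        // The handler would hand `rx` to tonic; here we just drain it.
        // The loop ends once both stages finish and drop their senders.
        while let Some(response) = rx.recv().await {
            println!("{}", response);
        }
    }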
Resolves when the server has
@@ -714,9 +837,11 @@ mod tests {
     use super::*;
     use crate::panic::SendPanicsToTracing;
     use delorean_storage::{
+        exec::GroupedSeriesSetPlans,
         exec::SeriesSetPlans,
         id::Id,
         test::ColumnNamesRequest,
+        test::QueryGroupsRequest,
         test::TestDatabaseStore,
         test::{ColumnValuesRequest, QuerySeriesRequest},
     };
@@ -1695,6 +1820,96 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_read_group() -> Result<(), tonic::Status> {
+        // Note we use a unique port. TODO: let the OS pick the port
+        let mut fixture = Fixture::new(11902)
+            .await
+            .expect("Connecting to test server");
+
+        let db_info = OrgAndBucket::new(123, 456);
+        let partition_id = 1;
+
+        let test_db = fixture
+            .test_storage
+            .db_or_create(&db_info.db_name)
+            .await
+            .expect("creating test database");
+
+        let source = Some(StorageClientWrapper::read_source(
+            db_info.org_id,
+            db_info.bucket_id,
+            partition_id,
+        ));
+
+        let group = delorean_generated_types::read_group_request::Group::None as i32;
+
+        let request = ReadGroupRequest {
+            read_source: source.clone(),
+            range: make_timestamp_range(150, 200),
+            predicate: make_state_ma_predicate(),
+            group_keys: vec![String::from("tag1")],
+            group,
+            aggregate: None,
+        };
+
+        let expected_request = QueryGroupsRequest {
+            range: Some(StorageTimestampRange::new(150, 200)),
+            predicate: Some("Predicate { expr: #state Eq Utf8(\"MA\") }".into()),
+            group_columns: vec![String::from("tag1")],
+        };
+
+        // TODO: set up any expected results
+        let dummy_groups_set_plan = GroupedSeriesSetPlans::from(vec![]);
+        test_db.set_query_groups_values(dummy_groups_set_plan).await;
+
+        let actual_frames = fixture.storage_client.read_group(request).await?;
+        let expected_frames: Vec<String> = vec!["0 group frames".into()];
+
+        assert_eq!(
+            actual_frames, expected_frames,
+            "unexpected frames returned by query_groups"
+        );
+        assert_eq!(
+            test_db.get_query_groups_request().await,
+            Some(expected_request),
+            "unexpected request to query_groups"
+        );
+
+        // ---
+        // test error
+        // ---
+        let request = ReadGroupRequest {
+            read_source: source.clone(),
+            range: None,
+            predicate: None,
+            group_keys: vec![],
+            group,
+            aggregate: None,
+        };
+
+        // Note we don't set the response on the test database, so we expect an error
+        let response = fixture.storage_client.read_group(request).await;
+        assert!(response.is_err());
+        let response_string = format!("{:?}", response);
+        let expected_error = "No saved query_groups in TestDatabase";
+        assert!(
+            response_string.contains(expected_error),
+            "'{}' did not contain expected content '{}'",
+            response_string,
+            expected_error
+        );
+
+        let expected_request = Some(QueryGroupsRequest {
+            range: None,
+            predicate: None,
+            group_columns: vec![],
+        });
+        assert_eq!(test_db.get_query_groups_request().await, expected_request);
+
+        Ok(())
+    }
+
     fn make_timestamp_range(start: i64, end: i64) -> Option<TimestampRange> {
         Some(TimestampRange { start, end })
     }
@@ -1898,7 +2113,31 @@
         let s = format!("{} frames", data_frames.len());
 
-        //Ok(self.to_string_vec(responses))
         Ok(vec![s])
     }
+
+    /// Make a request to Storage::read_group and do the
+    /// required async dance to flatten the resulting stream
+    async fn read_group(
+        &mut self,
+        request: ReadGroupRequest,
+    ) -> Result<Vec<String>, tonic::Status> {
+        let responses: Vec<_> = self
+            .inner
+            .read_group(request)
+            .await?
+            .into_inner()
+            .try_collect()
+            .await?;
+
+        let data_frames: Vec<frame::Data> = responses
+            .into_iter()
+            .flat_map(|r| r.frames)
+            .flat_map(|f| f.data)
+            .collect();
+
+        let s = format!("{} group frames", data_frames.len());
+
+        Ok(vec![s])
+    }