feat: basic read_group plumbing (#365)

* feat: basic read_group plumbing

* fix: Update delorean_storage/src/exec.rs
Andrew Lamb 2020-10-19 11:45:46 -04:00 committed by GitHub
parent ce76513048
commit bfb966b1f1
7 changed files with 508 additions and 15 deletions


@ -18,8 +18,11 @@ use delorean_arrow::{
use planning::DeloreanExecutionContext;
use schema_pivot::SchemaPivotNode;
// Publicly export the different types of plans
pub use seriesset::{
Error as SeriesSetError, GroupedSeriesSetConverter, GroupedSeriesSetItem, SeriesSet,
SeriesSetConverter,
};
pub use stringset::{IntoStringSet, StringSet, StringSetRef};
use tokio::sync::mpsc;
@ -144,20 +147,49 @@ pub struct SeriesSetPlan {
pub field_columns: Vec<Arc<String>>,
}
/// A plan to run that can produce a grouped set of series.
///
/// TODO: this may also need / support computing an aggregation per
/// group, depending on what is required for the gRPC layer.
#[derive(Debug)]
pub struct GroupedSeriesSetPlan {
/// The underlying SeriesSet
pub series_set_plan: SeriesSetPlan,
/// How many of the series_set_plan::tag_columns should be used to
/// compute the group
pub num_prefix_tag_group_columns: usize,
}
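
To make the `num_prefix_tag_group_columns` semantics concrete, here is a small sketch (the column names are hypothetical, not from this commit): the group key is simply the leading slice of the plan's tag columns.

```rust
fn main() {
    // Hypothetical tag columns of a SeriesSetPlan, in their defined order.
    let tag_columns = vec!["region", "host", "cpu"];

    // With a prefix length of 2, rows are grouped by (region, host);
    // "cpu" still distinguishes individual series within each group.
    let num_prefix_tag_group_columns = 2;
    let group_key = &tag_columns[..num_prefix_tag_group_columns];

    assert_eq!(group_key, ["region", "host"]);
}
```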
/// A container for plans, each of which produces a logical stream of
/// timeseries (from across many potential tables). A `SeriesSetPlans`
/// can be executed to produce streams of `SeriesSet`s.
#[derive(Debug, Default)]
pub struct SeriesSetPlans {
pub plans: Vec<SeriesSetPlan>,
}
/// A container for plans, each of which produces a logical stream of
/// timeseries (from across many potential tables) grouped in some
/// way. A `GroupedSeriesSetPlans` can be executed to produce
/// streams of `GroupedSeriesSet`s.
#[derive(Debug, Default)]
pub struct GroupedSeriesSetPlans {
pub grouped_plans: Vec<GroupedSeriesSetPlan>,
}
impl From<Vec<SeriesSetPlan>> for SeriesSetPlans {
fn from(plans: Vec<SeriesSetPlan>) -> Self {
Self { plans }
}
}
impl From<Vec<GroupedSeriesSetPlan>> for GroupedSeriesSetPlans {
fn from(grouped_plans: Vec<GroupedSeriesSetPlan>) -> Self {
Self { grouped_plans }
}
}
/// Handles executing plans, and marshalling the results into Rust
/// native structures.
#[derive(Debug, Default)]
@ -240,6 +272,79 @@ impl Executor {
})
.collect::<Vec<_>>();
// now, wait for all the values to resolve and report any errors
for join_handle in handles.into_iter() {
join_handle.await.context(JoinError)??;
}
Ok(())
}
/// Executes the grouped plans, sending the
/// results one by one to the `tx` channel.
///
/// Note that the returned future resolves (i.e. "returns") once
/// all plans have been sent to `tx`. This means that the future
/// will not resolve if there is nothing hooked up receiving
/// results from the other end of the channel and the channel
/// can't hold all the resulting series.
pub async fn to_grouped_series_set(
&self,
grouped_series_set_plans: GroupedSeriesSetPlans,
tx: mpsc::Sender<Result<GroupedSeriesSetItem, SeriesSetError>>,
) -> Result<()> {
let GroupedSeriesSetPlans { grouped_plans } = grouped_series_set_plans;
// Run the plans in parallel
let handles = grouped_plans
.into_iter()
.map(|plan| {
// Clone Arc's for transmission to threads
let counters = self.counters.clone();
let tx = tx.clone();
tokio::task::spawn(async move {
let GroupedSeriesSetPlan {
series_set_plan,
num_prefix_tag_group_columns,
} = plan;
let SeriesSetPlan {
table_name,
plan,
tag_columns,
field_columns,
} = series_set_plan;
let tag_columns = Arc::new(tag_columns);
let field_columns = Arc::new(field_columns);
// TODO run these on some executor other than the main tokio pool (maybe?)
let ctx = DeloreanExecutionContext::new(counters);
let physical_plan = ctx
.make_plan(&plan)
.await
.context(DataFusionPhysicalPlanning)?;
let it = ctx
.execute(physical_plan)
.await
.context(DataFusionExecution)?;
GroupedSeriesSetConverter::new(tx)
.convert(
table_name,
tag_columns,
num_prefix_tag_group_columns,
field_columns,
it,
)
.await
.context(SeriesSetConversion)?;
Ok(())
})
})
.collect::<Vec<_>>();
// now, wait for all the values to resolve and report any errors
for join_handle in handles.into_iter() {
join_handle.await.context(JoinError)??;
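
Because the returned future only resolves once every result has been accepted by `tx` (see the note above), callers need a task draining the other end of the channel before awaiting it. A minimal wiring sketch; the helper function and its arguments are hypothetical:

```rust
use tokio::sync::mpsc;

// Hypothetical wiring: start the receiver first, then run the plans,
// then wait for the receiver to finish.
async fn run_grouped(executor: &Executor, plans: GroupedSeriesSetPlans) -> Result<()> {
    let (tx, mut rx) = mpsc::channel(4);

    // Drain the channel concurrently so the bounded buffer can never
    // fill up and stall the producer side.
    let receiver = tokio::spawn(async move {
        while let Some(_item) = rx.recv().await {
            // ... convert / forward each GroupedSeriesSetItem here ...
        }
    });

    executor.to_grouped_series_set(plans, tx).await?;
    receiver.await.expect("receiver task panicked");
    Ok(())
}
```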


@ -91,7 +91,25 @@ pub struct SeriesSet {
// The underlying record batch data
pub batch: RecordBatch,
}
pub type SeriesSetRef = Arc<SeriesSet>;
/// Describes a "group of series". Namely, several logical
/// timeseries that share the same timestamps and name=value tag
/// keys, grouped by some subset of the tag keys.
///
/// TODO: this may also support computing an aggregation per group,
/// depending on what is required for the gRPC layer.
#[derive(Debug)]
pub struct GroupDescription {
/// key = value pairs that define the group
pub tags: Vec<(Arc<String>, Arc<String>)>,
// TODO: maybe also include the resulting aggregate value (per group) here
}
#[derive(Debug)]
pub enum GroupedSeriesSetItem {
GroupStart(GroupDescription),
GroupData(SeriesSet),
}
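
The intended stream shape is that each `GroupStart` precedes the `SeriesSet`s belonging to that group, so a consumer can be a simple loop. A hypothetical sketch:

```rust
// Hypothetical consumer: one line per group, then a line per series.
fn describe(items: Vec<GroupedSeriesSetItem>) {
    for item in items {
        match item {
            GroupedSeriesSetItem::GroupStart(group) => {
                println!("group: {:?}", group.tags);
            }
            GroupedSeriesSetItem::GroupData(series_set) => {
                println!("  series with {} rows", series_set.batch.num_rows());
            }
        }
    }
}
```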
// Handles converting record batches into SeriesSets, and sending them
// to tx
@ -319,6 +337,30 @@ impl SeriesSetConverter {
}
}
// Handles converting record batches into GroupedSeriesSets, and
// sending them to tx
#[derive(Debug)]
pub struct GroupedSeriesSetConverter {
tx: mpsc::Sender<Result<GroupedSeriesSetItem>>,
}
impl GroupedSeriesSetConverter {
pub fn new(tx: mpsc::Sender<Result<GroupedSeriesSetItem>>) -> Self {
Self { tx }
}
pub async fn convert(
&mut self,
_table_name: Arc<String>,
_tag_columns: Arc<Vec<Arc<String>>>,
_num_prefix_tag_group_columns: usize,
_field_columns: Arc<Vec<Arc<String>>>,
_it: Box<dyn RecordBatchReader + Send>,
) -> Result<()> {
unimplemented!("GroupedSeriesConverter");
}
}
#[cfg(test)]
mod tests {
use arrow::{


@ -10,7 +10,7 @@ use async_trait::async_trait;
use delorean_arrow::{arrow::record_batch::RecordBatch, datafusion::logical_plan::Expr};
use delorean_data_types::data::ReplicatedWrite;
use delorean_line_parser::ParsedLine;
use exec::{GroupedSeriesSetPlans, SeriesSetPlans, StringSetPlan};
use std::{fmt::Debug, sync::Arc};
@ -134,10 +134,30 @@ pub trait Database: Debug + Send + Sync {
/// are returned
async fn query_series(
&self,
range: Option<TimestampRange>,
predicate: Option<Predicate>,
) -> Result<SeriesSetPlans, Self::Error>;
/// Returns a plan that finds sets of rows which form groups of
/// logical time series. Each series is defined by the unique
/// values in a set of "tag_columns" for each field in the
/// "field_columns". Each group is is defined by unique
/// combinations of the columns described
///
/// If `range` is specified, only rows which have data in the
/// specified timestamp range and match the other predicates are
/// included.
///
/// If `predicate` is specified, then only rows which have at
/// least one non-null value in any row that matches the predicate
/// are returned
async fn query_groups(
&self,
range: Option<TimestampRange>,
predicate: Option<Predicate>,
group_columns: Vec<String>,
) -> Result<GroupedSeriesSetPlans, Self::Error>;
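
For illustration, a call that groups by two tag columns might look like the following sketch. The database handle and column names are hypothetical; `TimestampRange::new` is used the same way as in the tests later in this diff.

```rust
use delorean_storage::{exec::GroupedSeriesSetPlans, Database, TimestampRange};

// Hypothetical call site, generic over any Database implementation.
async fn plan_groups<D: Database>(db: &D) -> Result<GroupedSeriesSetPlans, D::Error> {
    db.query_groups(
        Some(TimestampRange::new(100, 200)), // only rows in this time range
        None,                                // no additional predicate
        vec!["region".to_string(), "host".to_string()],
    )
    .await
}
```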
/// Fetch the specified table names and columns as Arrow
/// RecordBatches. Columns are returned in the order specified.
async fn table_to_arrow(


@ -4,10 +4,12 @@
use delorean_arrow::arrow::record_batch::RecordBatch;
use crate::{
exec::GroupedSeriesSetPlans,
exec::SeriesSetPlans,
exec::{StringSet, StringSetPlan, StringSetRef},
Database, DatabaseStore, Predicate, TimestampRange,
};
use delorean_data_types::data::ReplicatedWrite;
use delorean_line_parser::{parse_lines, ParsedLine};
@ -42,6 +44,12 @@ pub struct TestDatabase {
/// The last request for `query_series`
query_series_request: Arc<Mutex<Option<QuerySeriesRequest>>>,
/// Responses to return on the next request to `query_groups`
query_groups_values: Arc<Mutex<Option<GroupedSeriesSetPlans>>>,
/// The last request for `query_groups`
query_groups_request: Arc<Mutex<Option<QueryGroupsRequest>>>,
}
/// Records the parameters passed to a column name request
@ -70,6 +78,14 @@ pub struct QuerySeriesRequest {
/// Stringified '{:?}' version of the predicate
pub predicate: Option<String>,
}
/// Records the parameters passed to a `query_groups` request
#[derive(Debug, PartialEq, Clone)]
pub struct QueryGroupsRequest {
pub range: Option<TimestampRange>,
/// Stringified '{:?}' version of the predicate
pub predicate: Option<String>,
pub group_columns: Vec<String>,
}
#[derive(Snafu, Debug)]
pub enum TestError {
@ -134,7 +150,6 @@ impl TestDatabase {
}
/// Set the series that will be returned on a call to query_series
/// TODO add in the actual values
pub async fn set_query_series_values(&self, plan: SeriesSetPlans) {
*(self.query_series_values.clone().lock().await) = Some(plan);
}
@ -143,6 +158,16 @@ impl TestDatabase {
pub async fn get_query_series_request(&self) -> Option<QuerySeriesRequest> {
self.query_series_request.clone().lock().await.take()
}
/// Set the series that will be returned on a call to query_groups
pub async fn set_query_groups_values(&self, plan: GroupedSeriesSetPlans) {
*(self.query_groups_values.clone().lock().await) = Some(plan);
}
/// Get the parameters from the last `query_groups` request
pub async fn get_query_groups_request(&self) -> Option<QueryGroupsRequest> {
self.query_groups_request.clone().lock().await.take()
}
}
/// returns true if this line is within the range of the timestamp
@ -291,6 +316,33 @@ impl Database for TestDatabase {
})
}
async fn query_groups(
&self,
range: Option<TimestampRange>,
predicate: Option<Predicate>,
group_columns: Vec<String>,
) -> Result<GroupedSeriesSetPlans, Self::Error> {
let predicate = predicate.map(|p| format!("{:?}", p));
let new_queries_groups_request = Some(QueryGroupsRequest {
range,
predicate,
group_columns,
});
*self.query_groups_request.clone().lock().await = new_queries_groups_request;
self.query_groups_values
.clone()
.lock()
.await
.take()
// Turn None into an error
.context(General {
message: "No saved query_groups in TestDatabase",
})
}
/// Fetch the specified table names and columns as Arrow RecordBatches
async fn table_to_arrow(
&self,


@ -1,8 +1,8 @@
use delorean_generated_types::wal as wb;
use delorean_line_parser::ParsedLine;
use delorean_storage::{
exec::GroupedSeriesSetPlans, exec::SeriesSetPlan, exec::SeriesSetPlans, exec::StringSet,
exec::StringSetPlan, Database, Predicate, TimestampRange,
};
use delorean_wal::WalBuilder;
use delorean_wal_writer::{start_wal_sync_task, Error as WalWriterError, WalDetails};
@ -457,6 +457,15 @@ impl Database for Db {
Ok(visitor.plans.into())
}
async fn query_groups(
&self,
_range: Option<TimestampRange>,
_predicate: Option<Predicate>,
_group_columns: Vec<String>,
) -> Result<GroupedSeriesSetPlans, Self::Error> {
unimplemented!("query_groups unimplemented as part of write buffer database");
}
async fn table_to_arrow(
&self,
table_name: &str,


@ -7,7 +7,7 @@ use delorean_arrow::arrow::{
datatypes::DataType as ArrowDataType,
};
use delorean_storage::exec::{GroupedSeriesSetItem, SeriesSet};
use delorean_generated_types::{
read_response::{
@ -58,6 +58,32 @@ pub fn series_set_to_read_response(series_set: SeriesSet) -> Result<ReadResponse
Ok(ReadResponse { frames })
}
/// Convert `GroupedSeriesSetItem` into a form suitable for gRPC transport
///
/// Each `GroupedSeriesSetItem` gets converted into this pattern:
///
/// ```text
/// (GroupFrame)
///
/// (SeriesFrame for field1)
/// (*Points for field1)
/// (SeriesFrame for field2)
/// (*Points for field2)
/// (....)
/// (SeriesFrame for fieldN)
/// (*Points for fieldN)
/// ```
///
/// The specific type of (*Points) depends on the type of field column.
pub fn grouped_series_set_to_read_response(
_grouped_series_set_item: GroupedSeriesSetItem,
) -> Result<ReadResponse> {
unimplemented!("grouped_series_set_to_read_response");
}
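
A plausible shape for the eventual implementation, shown only as a sketch: a `GroupStart` would become a single `GroupFrame`, while `GroupData` could reuse the existing `series_set_to_read_response`. The `GroupFrame` field names below are assumptions about the generated protobuf types, not something this diff confirms.

```rust
// Sketch only; GroupFrame's field names are assumed, not confirmed.
fn group_description_to_frame(group: &GroupDescription) -> Frame {
    // One key/value pair per grouping column, in group-key order.
    let tag_keys = group
        .tags
        .iter()
        .map(|(k, _)| k.bytes().collect())
        .collect();
    let partition_key_vals = group
        .tags
        .iter()
        .map(|(_, v)| v.bytes().collect())
        .collect();

    Frame {
        data: Some(frame::Data::Group(GroupFrame {
            tag_keys,
            partition_key_vals,
        })),
    }
}
```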
fn data_type(array: &ArrayRef) -> Result<DataType> {
match array.data_type() {
ArrowDataType::Utf8 => Ok(DataType::String),


@ -29,7 +29,7 @@ use crate::server::rpc::expr::convert_predicate;
use crate::server::rpc::input::GrpcInputs;
use delorean_storage::{
exec::{Executor as StorageExecutor, GroupedSeriesSetItem, SeriesSet, SeriesSetError},
org_and_bucket_to_database, Database, DatabaseStore, TimestampRange as StorageTimestampRange,
};
@ -39,7 +39,7 @@ use tokio::sync::mpsc;
use tonic::Status;
use tracing::warn;
use super::rpc::data::{grouped_series_set_to_read_response, series_set_to_read_response};
#[derive(Debug, Snafu)]
pub enum Error {
@ -67,12 +67,24 @@ pub enum Error {
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display("Error creating group plans for database '{}': {}", db_name, source))]
PlanningGroupSeries {
db_name: String,
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display("Error running series plans for database '{}': {}", db_name, source))]
FilteringSeries {
db_name: String,
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display("Error running grouping plans for database '{}': {}", db_name, source))]
GroupingSeries {
db_name: String,
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display(
"Can not retrieve tag values for '{}' in database '{}': {}",
tag_name,
@ -94,6 +106,9 @@ pub enum Error {
#[snafu(display("Computing series: {}", source))]
ComputingSeriesSet { source: SeriesSetError },
#[snafu(display("Computing groups series: {}", source))]
ComputingGroupedSeriesSet { source: SeriesSetError },
#[snafu(display("Converting time series into gRPC response: {}", source))]
ConvertingSeriesSet {
source: crate::server::rpc::data::Error,
@ -117,10 +132,13 @@ impl Error {
Status::invalid_argument(self.to_string())
}
Self::PlanningFilteringSeries { .. } => Status::invalid_argument(self.to_string()),
Self::PlanningGroupSeries { .. } => Status::invalid_argument(self.to_string()),
Self::FilteringSeries { .. } => Status::invalid_argument(self.to_string()),
Self::GroupingSeries { .. } => Status::invalid_argument(self.to_string()),
Self::ListingTagValues { .. } => Status::invalid_argument(self.to_string()),
Self::ConvertingPredicate { .. } => Status::invalid_argument(self.to_string()),
Self::ComputingSeriesSet { .. } => Status::invalid_argument(self.to_string()),
Self::ComputingGroupedSeriesSet { .. } => Status::invalid_argument(self.to_string()),
Self::ConvertingSeriesSet { .. } => Status::invalid_argument(self.to_string()),
Self::NotYetImplemented { .. } => Status::internal(self.to_string()),
}
@ -226,7 +244,37 @@ where
&self,
req: tonic::Request<ReadGroupRequest>,
) -> Result<tonic::Response<Self::ReadGroupStream>, Status> {
Err(Status::unimplemented("read_group"))
let (tx, rx) = mpsc::channel(4);
let read_group_request = req.into_inner();
let db_name = get_database_name(&read_group_request)?;
let ReadGroupRequest {
read_source: _read_source,
range,
predicate,
group_keys,
// TODO: handle Group::None
group: _group,
// TODO: handle aggregate values, especially whether None is the same as
// Some(AggregateType::None) or not
aggregate: _aggregate,
} = read_group_request;
read_group_impl(
tx.clone(),
self.db_store.clone(),
self.executor.clone(),
db_name,
range,
predicate,
group_keys,
)
.await
.map_err(|e| e.to_status())?;
Ok(tonic::Response::new(rx))
}
type TagKeysStream = mpsc::Receiver<Result<StringValuesResponse, Status>>;
@ -683,6 +731,81 @@ async fn convert_series_set(
}
}
/// Launch async tasks that send the result of executing read_group to `tx`
async fn read_group_impl<T>(
tx: mpsc::Sender<Result<ReadResponse, Status>>,
db_store: Arc<T>,
executor: Arc<StorageExecutor>,
db_name: String,
range: Option<TimestampRange>,
predicate: Option<Predicate>,
group_keys: Vec<String>,
) -> Result<()>
where
T: DatabaseStore,
{
let range = convert_range(range);
let db = db_store
.db(&db_name)
.await
.ok_or_else(|| Error::DatabaseNotFound {
db_name: db_name.clone(),
})?;
let predicate_string = format!("{:?}", predicate);
let predicate =
convert_predicate(predicate).context(ConvertingPredicate { predicate_string })?;
let grouped_series_set_plan = db
.query_groups(range, predicate, group_keys)
.await
.map_err(|e| Error::PlanningGroupSeries {
db_name: db_name.clone(),
source: Box::new(e),
})?;
// Spawn task to convert between series sets and the gRPC results
// and to run the actual plans (so we can return a result to the
// client before we start sending results)
let (tx_series, rx_series) = mpsc::channel(4);
tokio::spawn(async move { convert_grouped_series_set(rx_series, tx).await });
// fire up the plans and start the pipeline flowing
tokio::spawn(async move {
executor
.to_grouped_series_set(grouped_series_set_plan, tx_series)
.await
.map_err(|e| Error::GroupingSeries {
db_name: db_name.clone(),
source: Box::new(e),
})
});
Ok(())
}
/// Receives `GroupedSeriesSetItem`s from rx, converts them to
/// `ReadResponse`s and sends them to tx
async fn convert_grouped_series_set(
mut rx: mpsc::Receiver<Result<GroupedSeriesSetItem, SeriesSetError>>,
mut tx: mpsc::Sender<Result<ReadResponse, Status>>,
) {
while let Some(grouped_series_set_item) = rx.recv().await {
let response = grouped_series_set_item
.context(ComputingGroupedSeriesSet)
.and_then(|grouped_series_set_item| {
grouped_series_set_to_read_response(grouped_series_set_item)
.context(ConvertingSeriesSet)
})
.map_err(|e| Status::internal(e.to_string()));
// ignore errors sending results: there is no one to notice them, so return early
if tx.send(response).await.is_err() {
return;
}
}
}
/// Instantiate a server listening on the specified address
/// implementing the Delorean and Storage gRPC interfaces, the
/// underlying hyper server instance. Resolves when the server has
@ -714,9 +837,11 @@ mod tests {
use super::*;
use crate::panic::SendPanicsToTracing;
use delorean_storage::{
exec::GroupedSeriesSetPlans,
exec::SeriesSetPlans,
id::Id,
test::ColumnNamesRequest,
test::QueryGroupsRequest,
test::TestDatabaseStore,
test::{ColumnValuesRequest, QuerySeriesRequest},
};
@ -1695,6 +1820,96 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_read_group() -> Result<(), tonic::Status> {
// Note we use a unique port. TODO: let the OS pick the port
let mut fixture = Fixture::new(11902)
.await
.expect("Connecting to test server");
let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1;
let test_db = fixture
.test_storage
.db_or_create(&db_info.db_name)
.await
.expect("creating test database");
let source = Some(StorageClientWrapper::read_source(
db_info.org_id,
db_info.bucket_id,
partition_id,
));
let group = delorean_generated_types::read_group_request::Group::None as i32;
let request = ReadGroupRequest {
read_source: source.clone(),
range: make_timestamp_range(150, 200),
predicate: make_state_ma_predicate(),
group_keys: vec![String::from("tag1")],
group,
aggregate: None,
};
let expected_request = QueryGroupsRequest {
range: Some(StorageTimestampRange::new(150, 200)),
predicate: Some("Predicate { expr: #state Eq Utf8(\"MA\") }".into()),
group_columns: vec![String::from("tag1")],
};
// TODO setup any expected results
let dummy_groups_set_plan = GroupedSeriesSetPlans::from(vec![]);
test_db.set_query_groups_values(dummy_groups_set_plan).await;
let actual_frames = fixture.storage_client.read_group(request).await?;
let expected_frames: Vec<String> = vec!["0 group frames".into()];
assert_eq!(
actual_frames, expected_frames,
"unexpected frames returned by query_groups"
);
assert_eq!(
test_db.get_query_groups_request().await,
Some(expected_request),
"unexpected request to query_groups"
);
// ---
// test error
// ---
let request = ReadGroupRequest {
read_source: source.clone(),
range: None,
predicate: None,
group_keys: vec![],
group,
aggregate: None,
};
// Note we don't set the response on the test database, so we expect an error
let response = fixture.storage_client.read_group(request).await;
assert!(response.is_err());
let response_string = format!("{:?}", response);
let expected_error = "No saved query_groups in TestDatabase";
assert!(
response_string.contains(expected_error),
"'{}' did not contain expected content '{}'",
response_string,
expected_error
);
let expected_request = Some(QueryGroupsRequest {
range: None,
predicate: None,
group_columns: vec![],
});
assert_eq!(test_db.get_query_groups_request().await, expected_request);
Ok(())
}
fn make_timestamp_range(start: i64, end: i64) -> Option<TimestampRange> {
Some(TimestampRange { start, end })
}
@ -1898,7 +2113,31 @@ mod tests {
let s = format!("{} frames", data_frames.len());
//Ok(self.to_string_vec(responses))
Ok(vec![s])
}
/// Make a request to Storage::read_group and do the
/// required async dance to flatten the resulting stream
async fn read_group(
&mut self,
request: ReadGroupRequest,
) -> Result<Vec<String>, tonic::Status> {
let responses: Vec<_> = self
.inner
.read_group(request)
.await?
.into_inner()
.try_collect()
.await?;
let data_frames: Vec<frame::Data> = responses
.into_iter()
.flat_map(|r| r.frames)
.flat_map(|f| f.data)
.collect();
let s = format!("{} group frames", data_frames.len());
Ok(vec![s])
}