feat: gRPC plumbing + interface structures for read_filter (#351)

* feat: gRPC plumbing + support structures for read_filter

* fix: cleanup comments
Andrew Lamb 2020-10-12 14:12:53 -04:00 committed by GitHub
parent befd386088
commit 80088ffe37
6 changed files with 467 additions and 13 deletions


@@ -3,6 +3,7 @@
//! interface abstracts away many of the details
mod planning;
mod schema_pivot;
mod seriesset;
mod stringset;
use std::{sync::atomic::AtomicU64, sync::atomic::Ordering, sync::Arc};
@@ -16,7 +17,9 @@ use planning::make_exec_context;
use schema_pivot::SchemaPivotNode;
// Publicly export StringSets and SeriesSets
pub use seriesset::{SeriesSet, SeriesSetRef};
pub use stringset::{IntoStringSet, StringSet, StringSetRef};
use tokio::sync::mpsc;
use tracing::debug;
@@ -109,6 +112,22 @@ impl From<Vec<LogicalPlan>> for StringSetPlan {
}
}
/// A plan which produces a logical set of Series
#[derive(Debug)]
pub struct SeriesSetPlan {
/// Datafusion plan(s) to execute. Each plan must produce
/// RecordBatches with the following shape:
///
/// TODO DOCUMENT
pub plans: Vec<LogicalPlan>,
// The names of the columns that define tags
pub tag_columns: Vec<String>,
// The names of the columns which are "fields"
pub field_columns: Vec<String>,
}
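// Illustrative sketch, not part of this commit: going by the `seriesset`
// module docs, each plan presumably produces RecordBatches shaped like
//
//   (tag col0) ... (tag colN) (field val0) ... (field valN) (timestamps)
//
// sorted by the tag columns, so a plan over cpu data might be described as:
//
// let plan = SeriesSetPlan {
//     plans: vec![df_plan],                              // hypothetical DataFusion plan
//     tag_columns: vec!["region".into(), "host".into()], // series key columns
//     field_columns: vec!["usage".into()],               // value columns
// };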
/// Handles executing plans, and marshalling the results into rust
/// native structures.
#[derive(Debug, Default)]
@@ -130,6 +149,19 @@ impl Executor {
.context(StringSetConversion),
}
}
/// Executes this plan, sending the resulting `SeriesSet`s one by one
/// via `tx`
pub async fn to_series_set(
&self,
plan: SeriesSetPlan,
_tx: mpsc::Sender<Result<SeriesSet>>,
) -> Result<()> {
if plan.plans.is_empty() {
return Ok(());
}
unimplemented!("Executing SeriesSet plans");
}
}
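// Illustrative sketch, not part of this commit: assumed caller-side usage
// of `to_series_set`, with some `plan: SeriesSetPlan` in scope and running
// inside a tokio runtime. Results stream back over the channel one by one:
//
// let executor = Arc::new(Executor::default());
// let (tx, mut rx) = mpsc::channel(4);
// tokio::spawn(async move { executor.to_series_set(plan, tx).await });
// while let Some(series_set) = rx.recv().await {
//     // each item is a Result<SeriesSet>
// }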
// Various statistics for execution


@@ -0,0 +1,52 @@
//! This module contains the definition of a "SeriesSet": a plan that,
//! when run, produces rows that can be logically divided into "Series"
//!
//! Specifically, the plan this produces represents a set of "tables",
//! and each table is sorted on a set of "tag" columns, meaning the
//! groups / series will be contiguous.
//!
//! For example, the output columns of such a plan would be:
//! (tag col0) (tag col1) ... (tag colN) (field val1) (field val2) ... (field valN) .. (timestamps)
//!
//! Note that the data will come out ordered by the tag keys (ORDER BY
//! (tag col0) (tag col1) ... (tag colN))
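//!
//! As an illustrative (not normative) example, with tag columns
//! (region, host) and a single field, sorted rows might arrive as:
//!
//!   region | host | usage | time
//!   east   | a    | 21.0  | 100   <- series (east, a)...
//!   east   | a    | 22.0  | 200   <- ...is contiguous
//!   east   | b    | 11.0  | 100   <- next series starts here
//!   west   | c    | 31.0  | 100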
use std::sync::Arc;
use delorean_arrow::arrow::{self, array::ArrayRef};
//use snafu::{ensure, OptionExt, Snafu};
use snafu::Snafu;
#[derive(Debug, Snafu)]
/// Opaque error type
pub enum Error {
#[snafu(display("Plan Execution Error: {}", source))]
Execution {
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
#[snafu(display(
"Error reading record batch while converting from SeriesSet: {:?}",
source
))]
ReadingRecordBatch { source: arrow::error::ArrowError },
}
#[allow(dead_code)]
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct SeriesSet {
/// key = value pairs
keys: Vec<(String, String)>,
/// Arrow array for this series.
values: ArrayRef,
}
pub type SeriesSetRef = Arc<SeriesSet>;
/// Trait to convert RecordBatch'y things into `SeriesSet`s. Can
/// return errors, so don't use `std::convert::From`
pub trait IntoSeriesSet {
/// Convert this thing into a `SeriesSet`
fn into_seriesset(self) -> Result<SeriesSet>;
}
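// Illustrative sketch, not part of this commit: a minimal implementer of
// `IntoSeriesSet`, assuming a hypothetical type that already carries its
// tag key/value pairs and a single value array:
//
// struct PreGroupedBatch {
//     keys: Vec<(String, String)>,
//     values: ArrayRef,
// }
//
// impl IntoSeriesSet for PreGroupedBatch {
//     fn into_seriesset(self) -> Result<SeriesSet> {
//         Ok(SeriesSet { keys: self.keys, values: self.values })
//     }
// }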


@@ -10,7 +10,7 @@ use async_trait::async_trait;
use delorean_arrow::{arrow::record_batch::RecordBatch, datafusion::logical_plan::Expr};
use delorean_data_types::data::ReplicatedWrite;
use delorean_line_parser::ParsedLine;
use exec::StringSetPlan;
use exec::{SeriesSetPlan, StringSetPlan};
use std::{fmt::Debug, sync::Arc};
@@ -99,8 +99,9 @@ pub trait Database: Debug + Send + Sync {
predicate: Option<Predicate>,
) -> Result<StringSetPlan, Self::Error>;
/// Returns the distinct values in the `column_name` column of
/// this database for rows that match optional predicates.
/// Returns a plan which can find the distinct values in the
/// `column_name` column of this database for rows that match
/// optional predicates.
///
/// If `table` is specified, then only values from the
/// specified table which match other predicates are included.
@@ -120,6 +121,23 @@ pub trait Database: Debug + Send + Sync {
predicate: Option<Predicate>,
) -> Result<StringSetPlan, Self::Error>;
/// Returns a plan that finds sets of rows which form logical time
/// series. Each series is defined by the unique values in a set
/// of "tag_columns" for each field in the "field_columns".
///
/// If `range` is specified, only rows which have data in the
/// specified timestamp range and which match other predicates are
/// included.
///
/// If `predicate` is specified, then only rows which match the
/// predicate and have at least one non-null value are returned
async fn query_series(
&self,
_range: Option<TimestampRange>,
_predicate: Option<Predicate>,
) -> Result<SeriesSetPlan, Self::Error>;
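// Illustrative sketch, not part of this commit: assumed usage against
// some `db: impl Database`, an `executor`, and a `tx` channel, reusing
// `TimestampRange::new` from this crate:
//
// let plan = db.query_series(Some(TimestampRange::new(150, 200)), None).await?;
// executor.to_series_set(plan, tx).await?;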
/// Fetch the specified table names and columns as Arrow
/// RecordBatches. Columns are returned in the order specified.
async fn table_to_arrow(


@@ -5,8 +5,8 @@
use delorean_arrow::arrow::record_batch::RecordBatch;
use crate::{
exec::StringSet, exec::StringSetPlan, exec::StringSetRef, Database, DatabaseStore, Predicate,
TimestampRange,
exec::{SeriesSetPlan, StringSet, StringSetPlan, StringSetRef},
Database, DatabaseStore, Predicate, TimestampRange,
};
use delorean_data_types::data::ReplicatedWrite;
use delorean_line_parser::{parse_lines, ParsedLine};
@@ -36,6 +36,12 @@ pub struct TestDatabase {
/// the last request for column_values.
column_values_request: Arc<Mutex<Option<ColumnValuesRequest>>>,
/// responses to return on the next request to query_series
query_series_values: Arc<Mutex<Option<SeriesSetPlan>>>,
/// the last request for query_series
query_series_request: Arc<Mutex<Option<QuerySeriesRequest>>>,
}
/// Records the parameters passed to a column name request
@@ -57,6 +63,14 @@ pub struct ColumnValuesRequest {
pub predicate: Option<String>,
}
/// Records the parameters passed to a query_series request
#[derive(Debug, PartialEq, Clone)]
pub struct QuerySeriesRequest {
pub range: Option<TimestampRange>,
/// Stringified '{:?}' version of the predicate
pub predicate: Option<String>,
}
#[derive(Snafu, Debug)]
pub enum TestError {
#[snafu(display("Test database error: {}", message))]
@@ -75,6 +89,8 @@ impl Default for TestDatabase {
column_names_request: Arc::new(Mutex::new(None)),
column_values: Arc::new(Mutex::new(None)),
column_values_request: Arc::new(Mutex::new(None)),
query_series_values: Arc::new(Mutex::new(None)),
query_series_request: Arc::new(Mutex::new(None)),
}
}
}
@@ -131,6 +147,17 @@ impl TestDatabase {
pub async fn get_column_values_request(&self) -> Option<ColumnValuesRequest> {
self.column_values_request.clone().lock().await.take()
}
/// Set the series that will be returned on a call to query_series
/// TODO add in the actual values
pub async fn set_query_series_values(&self, plan: SeriesSetPlan) {
*(self.query_series_values.clone().lock().await) = Some(plan);
}
/// Get the parameters from the last query_series request
pub async fn get_query_series_request(&self) -> Option<QuerySeriesRequest> {
self.query_series_request.clone().lock().await.take()
}
}
/// returns true if this line is within the range of the timestamp
@@ -257,6 +284,28 @@ impl Database for TestDatabase {
Ok(column_values.into())
}
async fn query_series(
&self,
range: Option<TimestampRange>,
predicate: Option<Predicate>,
) -> Result<SeriesSetPlan, Self::Error> {
let predicate = predicate.map(|p| format!("{:?}", p));
let new_query_series_request = Some(QuerySeriesRequest { range, predicate });
*self.query_series_request.clone().lock().await = new_query_series_request;
self.query_series_values
.clone()
.lock()
.await
.take()
// Turn None into an error
.context(General {
message: "No saved query_series in TestDatabase",
})
}
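// Illustrative sketch, not part of this commit: the record/replay pattern
// this mock follows in tests (assumed usage; names are from this file):
//
// let db = TestDatabase::default();
// db.set_query_series_values(plan).await;         // arrange the canned plan
// let _plan = db.query_series(None, None).await;  // act: consumes the canned plan
// let _req = db.get_query_series_request().await; // assert: captured parameters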
/// Fetch the specified table names and columns as Arrow RecordBatches
async fn table_to_arrow(
&self,


@@ -1,6 +1,8 @@
use delorean_generated_types::wal as wb;
use delorean_line_parser::ParsedLine;
use delorean_storage::{exec::StringSet, exec::StringSetPlan, Database, Predicate, TimestampRange};
use delorean_storage::{
exec::SeriesSetPlan, exec::StringSet, exec::StringSetPlan, Database, Predicate, TimestampRange,
};
use delorean_wal::WalBuilder;
use delorean_wal_writer::{start_wal_sync_task, Error as WalWriterError, WalDetails};
@@ -439,6 +441,14 @@ impl Database for Db {
}
}
async fn query_series(
&self,
_range: Option<TimestampRange>,
_predicate: Option<Predicate>,
) -> Result<SeriesSetPlan, Self::Error> {
unimplemented!("query series for write buffer database.");
}
async fn table_to_arrow(
&self,
table_name: &str,


@@ -12,6 +12,7 @@ use std::{collections::HashMap, net::SocketAddr, sync::Arc};
use delorean_generated_types::{
delorean_server::{Delorean, DeloreanServer},
read_response::Frame,
storage_server::{Storage, StorageServer},
CapabilitiesResponse, CreateBucketRequest, CreateBucketResponse, DeleteBucketRequest,
DeleteBucketResponse, GetBucketsResponse, MeasurementFieldsRequest, MeasurementFieldsResponse,
@@ -29,8 +30,8 @@ use crate::server::rpc::expr::convert_predicate;
use crate::server::rpc::input::GrpcInputs;
use delorean_storage::{
exec::Executor as StorageExecutor, org_and_bucket_to_database, Database, DatabaseStore,
TimestampRange as StorageTimestampRange,
exec::Executor as StorageExecutor, exec::SeriesSet, org_and_bucket_to_database, Database,
DatabaseStore, TimestampRange as StorageTimestampRange,
};
use snafu::{ResultExt, Snafu};
@@ -59,6 +60,18 @@ pub enum Error {
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display("Error creating series plans for database '{}': {}", db_name, source))]
PlanningFilteringSeries {
db_name: String,
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display("Error running series plans for database '{}': {}", db_name, source))]
FilteringSeries {
db_name: String,
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display(
"Can not retrieve tag values for '{}' in database '{}': {}",
tag_name,
@@ -94,6 +107,8 @@ impl Error {
// TODO: distinguish between input errors and internal errors
Status::invalid_argument(self.to_string())
}
Self::PlanningFilteringSeries { .. } => Status::invalid_argument(self.to_string()),
Self::FilteringSeries { .. } => Status::invalid_argument(self.to_string()),
Self::ListingTagValues { .. } => Status::invalid_argument(self.to_string()),
Self::ConvertingPredicate { .. } => Status::invalid_argument(self.to_string()),
Self::NotYetImplemented { .. } => Status::internal(self.to_string()),
@@ -164,9 +179,32 @@ where
async fn read_filter(
&self,
_req: tonic::Request<ReadFilterRequest>,
req: tonic::Request<ReadFilterRequest>,
) -> Result<tonic::Response<Self::ReadFilterStream>, Status> {
Err(Status::unimplemented("read_filter"))
let (tx, rx) = mpsc::channel(4);
let read_filter_request = req.into_inner();
let db_name = get_database_name(&read_filter_request)?;
let ReadFilterRequest {
read_source: _read_source,
range,
predicate,
} = read_filter_request;
read_filter_impl(
tx.clone(),
self.db_store.clone(),
self.executor.clone(),
db_name,
range,
predicate,
)
.await
.map_err(|e| e.to_status())?;
Ok(tonic::Response::new(rx))
}
type ReadGroupStream = mpsc::Receiver<Result<ReadResponse, Status>>;
@@ -307,7 +345,7 @@ where
type MeasurementTagKeysStream = mpsc::Receiver<Result<StringValuesResponse, Status>>;
//#[tracing::instrument(level = "debug")]
#[tracing::instrument(level = "debug")]
async fn measurement_tag_keys(
&self,
req: tonic::Request<MeasurementTagKeysRequest>,
@@ -560,6 +598,85 @@ where
Ok(StringValuesResponse { values })
}
/// Launch async tasks that send the result of executing read_filter to `tx`
async fn read_filter_impl<T>(
tx: mpsc::Sender<Result<ReadResponse, Status>>,
db_store: Arc<T>,
executor: Arc<StorageExecutor>,
db_name: String,
range: Option<TimestampRange>,
predicate: Option<Predicate>,
) -> Result<()>
where
T: DatabaseStore,
{
let range = convert_range(range);
let db = db_store
.db(&db_name)
.await
.ok_or_else(|| Error::DatabaseNotFound {
db_name: db_name.clone(),
})?;
let predicate_string = format!("{:?}", predicate);
let predicate =
convert_predicate(predicate).context(ConvertingPredicate { predicate_string })?;
let series_plan =
db.query_series(range, predicate)
.await
.map_err(|e| Error::PlanningFilteringSeries {
db_name: db_name.clone(),
source: Box::new(e),
})?;
// Spawn task to convert between series sets and the gRPC results
// and to run the actual plans (so we can return a result to the
// client before we start sending results)
let (tx_series, rx_series) = mpsc::channel(4);
tokio::spawn(async move { convert_series_set(rx_series, tx).await });
// fire up the plans and start the pipeline flowing
tokio::spawn(async move {
executor
.to_series_set(series_plan, tx_series)
.await
.map_err(|e| Error::FilteringSeries {
db_name: db_name.clone(),
source: Box::new(e),
})
});
Ok(())
}
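// Pipeline sketch (informal, not part of this commit):
//
//   executor.to_series_set(plan) --> tx_series ~channel~> rx_series
//     --> convert_series_set --> tx ~channel~> gRPC response stream
//
// Spawning both halves lets a response stream be handed back to the
// client before any plan has produced data.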
/// Receives SeriesSets from rx, converts them to ReadResponses,
/// and sends them to tx
async fn convert_series_set(
mut rx: mpsc::Receiver<delorean_storage::exec::Result<SeriesSet>>,
mut tx: mpsc::Sender<Result<ReadResponse, Status>>,
) {
while let Some(series_set) = rx.recv().await {
let response = series_set
.map(|_series_set| {
// TODO: implement the conversion here
warn!("Conversion NOT YET IMPLEMENTED");
ReadResponse {
frames: vec![Frame {
//data: Some(Data::Series(SeriesFrame { data_type, tags })),
data: None,
}],
}
})
.map_err(|e| Status::internal(e.to_string()));
// ignore errors sending results; there is no one to notice them, so return early
if tx.send(response).await.is_err() {
return;
}
}
}
/// Instantiate a server listening on the specified address,
/// implementing the Delorean and Storage gRPC interfaces via the
/// underlying hyper server instance. Resolves when the server has
@@ -591,7 +708,8 @@ mod tests {
use super::*;
use crate::panic::SendPanicsToTracing;
use delorean_storage::{
id::Id, test::ColumnNamesRequest, test::ColumnValuesRequest, test::TestDatabaseStore,
exec::SeriesSetPlan, id::Id, test::ColumnNamesRequest, test::ColumnValuesRequest,
test::QuerySeriesRequest, test::TestDatabaseStore,
};
use delorean_test_helpers::tracing::TracingCapture;
use std::{
@@ -603,7 +721,9 @@ mod tests {
use futures::prelude::*;
use delorean_generated_types::{delorean_client, storage_client, ReadSource};
use delorean_generated_types::{
delorean_client, read_response::frame, storage_client, ReadSource,
};
use prost::Message;
type DeloreanClient = delorean_client::DeloreanClient<tonic::transport::Channel>;
@@ -1423,6 +1543,153 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_read_filter() -> Result<(), tonic::Status> {
// Note we use a unique port. TODO: let the OS pick the port
let mut fixture = Fixture::new(11901)
.await
.expect("Connecting to test server");
let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1;
let test_db = fixture
.test_storage
.db_or_create(&db_info.db_name)
.await
.expect("creating test database");
let source = Some(StorageClientWrapper::read_source(
db_info.org_id,
db_info.bucket_id,
partition_id,
));
#[derive(Debug)]
struct TestCase<'a> {
description: &'a str,
// TODO (when conversion between SeriesSets and Frames is implemented)
// series_set:
// expected output:
request: ReadFilterRequest,
expected_request: QuerySeriesRequest,
}
let test_cases = vec![
TestCase {
description: "No predicates / timestamps",
request: ReadFilterRequest {
read_source: source.clone(),
range: None,
predicate: None,
},
expected_request: QuerySeriesRequest {
range: None,
predicate: None,
},
},
TestCase {
description: "Only timestamp",
request: ReadFilterRequest {
read_source: source.clone(),
range: make_timestamp_range(150, 200),
predicate: None,
},
expected_request: QuerySeriesRequest {
range: Some(StorageTimestampRange::new(150, 200)),
predicate: None,
},
},
TestCase {
description: "Only predicate",
request: ReadFilterRequest {
read_source: source.clone(),
range: None,
predicate: make_state_ma_predicate(),
},
expected_request: QuerySeriesRequest {
range: None,
predicate: Some("Predicate { expr: #state Eq Utf8(\"MA\") }".into()),
},
},
TestCase {
description: "Both timestamp + predicate",
request: ReadFilterRequest {
read_source: source.clone(),
range: make_timestamp_range(150, 200),
predicate: make_state_ma_predicate(),
},
expected_request: QuerySeriesRequest {
range: Some(StorageTimestampRange::new(150, 200)),
predicate: Some("Predicate { expr: #state Eq Utf8(\"MA\") }".into()),
},
},
];
for test_case in test_cases.into_iter() {
let test_case_str = format!("{:?}", test_case);
let TestCase {
request,
expected_request,
..
} = test_case;
// TODO setup any expected results
let dummy_series_set_plan = SeriesSetPlan {
plans: vec![],
tag_columns: vec![],
field_columns: vec![],
};
test_db.set_query_series_values(dummy_series_set_plan).await;
let actual_frames = fixture.storage_client.read_filter(request).await?;
// TODO: encode this in the test case or something
let expected_frames: Vec<String> = vec!["0 frames".into()];
assert_eq!(
actual_frames, expected_frames,
"unexpected frames returned by query_series: {}",
test_case_str
);
assert_eq!(
test_db.get_query_series_request().await,
Some(expected_request),
"unexpected request to query_series: {}",
test_case_str
);
}
// ---
// test error
// ---
let request = ReadFilterRequest {
read_source: source.clone(),
range: None,
predicate: None,
};
// Note we don't set the response on the test database, so we expect an error
let response = fixture.storage_client.read_filter(request).await;
assert!(response.is_err());
let response_string = format!("{:?}", response);
let expected_error = "No saved query_series in TestDatabase";
assert!(
response_string.contains(expected_error),
"'{}' did not contain expected content '{}'",
response_string,
expected_error
);
let expected_request = Some(QuerySeriesRequest {
range: None,
predicate: None,
});
assert_eq!(test_db.get_query_series_request().await, expected_request);
Ok(())
}
fn make_timestamp_range(start: i64, end: i64) -> Option<TimestampRange> {
Some(TimestampRange { start, end })
}
@@ -1604,6 +1871,32 @@ mod tests {
Ok(self.to_string_vec(responses))
}
/// Make a request to Storage::read_filter and do the
/// required async dance to flatten the resulting stream
async fn read_filter(
&mut self,
request: ReadFilterRequest,
) -> Result<Vec<String>, tonic::Status> {
let responses: Vec<_> = self
.inner
.read_filter(request)
.await?
.into_inner()
.try_collect()
.await?;
let data_frames: Vec<frame::Data> = responses
.into_iter()
.flat_map(|r| r.frames)
.flat_map(|f| f.data)
.collect();
let s = format!("{} frames", data_frames.len());
//Ok(self.to_string_vec(responses))
Ok(vec![s])
}
/// Convert the `StringValuesResponse`s into Rust Strings, sorting the values
/// to ensure consistency.
fn to_string_vec(&self, responses: Vec<StringValuesResponse>) -> Vec<String> {