From da5c74d3c6af61f9ea2b9fe8621ca3c62d8d599d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 28 Sep 2020 11:41:10 -0400 Subject: [PATCH] feat: storage interface plans + executor (#318) * feat: storage interface plans + executor * refactor: less `expect` * fix: use more idomatic rust From --- Cargo.lock | 1 + delorean_storage/Cargo.toml | 1 + delorean_storage/src/exec.rs | 473 +++++++++++++++++++++++++++++++++++ delorean_storage/src/lib.rs | 1 + 4 files changed, 476 insertions(+) create mode 100644 delorean_storage/src/exec.rs diff --git a/Cargo.lock b/Cargo.lock index 28869c0bf9..ca2aad3da3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -841,6 +841,7 @@ dependencies = [ "serde_urlencoded 0.6.1", "snafu", "tokio", + "tracing", ] [[package]] diff --git a/delorean_storage/Cargo.toml b/delorean_storage/Cargo.toml index be6eb8d8cf..2305a434cf 100644 --- a/delorean_storage/Cargo.toml +++ b/delorean_storage/Cargo.toml @@ -12,5 +12,6 @@ async-trait = "0.1.40" tokio = { version = "0.2", features = ["full"] } serde = { version = "1.0", features = ["derive"] } serde_urlencoded = "0.6.1" +tracing = "0.1" delorean_arrow = { path = "../delorean_arrow" } delorean_line_parser = { path = "../delorean_line_parser" } diff --git a/delorean_storage/src/exec.rs b/delorean_storage/src/exec.rs new file mode 100644 index 0000000000..e510051cc6 --- /dev/null +++ b/delorean_storage/src/exec.rs @@ -0,0 +1,473 @@ +//! This module handles the manipulation / execution of storage +//! plans. This is currently implemented using DataFusion, and this +//! interface abstracts away many of the details + +use std::{collections::BTreeSet, sync::atomic::AtomicU64, sync::atomic::Ordering, sync::Arc}; + +use datafusion::prelude::{ExecutionConfig, ExecutionContext}; +use delorean_arrow::{ + arrow::{ + array::{Array, StringArray, StringArrayOps}, + datatypes::DataType, + record_batch::RecordBatch, + }, + datafusion::{ + self, + logical_plan::{Expr, LogicalPlan}, + }, +}; + +use tracing::debug; + +use snafu::{ResultExt, Snafu}; + +#[derive(Debug, Snafu)] +/// Opaque error type +pub enum Error { + #[snafu(display("Plan Execution Error: {}", source))] + Execution { + source: Box, + }, + + #[snafu(display("Internal error optimizing plan: {}", source))] + DataFusionOptimizationError { + source: datafusion::error::ExecutionError, + }, + + #[snafu(display("Internal error during physical planning: {}", source))] + DataFusionPhysicalPlanningError { + source: datafusion::error::ExecutionError, + }, + + #[snafu(display("Internal error executing plan: {}", source))] + DataFusionExecutionError { + source: datafusion::error::ExecutionError, + }, + + #[snafu(display("Internal error extracting results from Record Batches: {}", message))] + InternalResultsExtraction { message: String }, + + #[snafu(display("Joining execution task: {}", source))] + JoinError { source: tokio::task::JoinError }, +} + +pub type Result = std::result::Result; + +pub type StringSet = BTreeSet; +pub type StringSetRef = Arc; + +/// Represents a general purpose predicate for evaluation. +/// +/// TBD can this predicate represent predicates for multiple tables? +#[derive(Clone, Debug)] +pub struct Predicate { + /// An expresson using the DataFusion expression operations. + pub expr: Expr, +} + +/// A plan which produces a logical set of Strings (e.g. tag +/// values). This includes variants with pre-calculated results as +/// well a variant that runs a full on DataFusion plan. +#[derive(Debug)] +pub enum StringSetPlan { + // If the results are known without having to run an actual datafusion plan + Known(Result), + // A datafusion plan(s) to execute. Each plan must produce + // RecordBatches with exactly one String column + Plan(Vec), +} + +impl From> for StringSetPlan +where + E: std::error::Error + Send + Sync + 'static, +{ + /// Create a plan from a known result, wrapping the error type + /// appropriately + fn from(result: Result) -> Self { + match result { + Ok(set) => Self::Known(Ok(set)), + Err(e) => Self::Known(Err(Error::Execution { + source: Box::new(e), + })), + } + } +} + +impl From> for StringSetPlan { + /// Create a DataFusion LogicalPlan node, each if which must + /// produce a single output Utf8 column. The output of each plan + /// will be included into the final set. + fn from(plans: Vec) -> Self { + Self::Plan(plans) + } +} + +/// Handles executing plans, and marshalling the results into rust +/// native structures. +#[derive(Debug, Default)] +pub struct Executor { + counters: Arc, +} + +impl Executor { + pub fn new() -> Self { + Self::default() + } + /// Executes this plan and returns the resulting set of strings + pub async fn to_string_set(&self, plan: StringSetPlan) -> Result { + match plan { + StringSetPlan::Known(res) => res, + StringSetPlan::Plan(plans) => run_logical_plans(self.counters.clone(), plans) + .await? + .into_stringset(), + } + } +} + +// Various statistics for execution +#[derive(Debug, Default)] +pub struct ExecutionCounters { + pub plans_run: AtomicU64, +} + +impl ExecutionCounters { + fn inc_plans_run(&self) { + self.plans_run.fetch_add(1, Ordering::Relaxed); + } +} + +/// plans and runs the plans in parallel and collects the results +/// run each plan in parallel and collect the results +async fn run_logical_plans( + counters: Arc, + plans: Vec, +) -> Result> { + let value_futures = plans + .into_iter() + .map(|plan| { + let counters = counters.clone(); + // TODO run these on some executor other than the main tokio pool + tokio::task::spawn(async move { run_logical_plan(counters, plan) }) + }) + .collect::>(); + + // now, wait for all the values to resolve and collect them together + let mut results = Vec::new(); + for join_handle in value_futures.into_iter() { + let mut plan_result = join_handle.await.context(JoinError)??; + + results.append(&mut plan_result); + } + Ok(results) +} + +/// Executes the logical plan using DataFusion and produces RecordBatches +fn run_logical_plan( + counters: Arc, + plan: LogicalPlan, +) -> Result> { + counters.inc_plans_run(); + + const BATCH_SIZE: usize = 1000; + + // TBD: Should we be reusing an execution context across all executions? + let config = ExecutionConfig::new().with_batch_size(BATCH_SIZE); + //let ctx = make_exec_context(config); // TODO (With the next chunk) + let ctx = ExecutionContext::with_config(config); + + debug!("Running plan, input:\n{:?}", plan); + // TODO the datafusion optimizer was removing filters.. + //let logical_plan = ctx.optimize(&plan).context(DataFusionOptimizationError)?; + let logical_plan = plan; + debug!("Running plan, optimized:\n{:?}", logical_plan); + + let physical_plan = ctx + .create_physical_plan(&logical_plan) + .context(DataFusionPhysicalPlanningError)?; + + debug!("Running plan, physical:\n{:?}", physical_plan); + + // This executes the query, using its own threads + // internally. TODO figure out a better way to control + // concurrency / plan admission + ctx.collect(physical_plan).context(DataFusionExecutionError) +} + +trait IntoStringSet { + fn into_stringset(self) -> Result; +} + +/// Converts record batches into StringSets. Assumes that the record +/// batches each have a single string column +impl IntoStringSet for Vec { + fn into_stringset(self) -> Result { + let mut strings = StringSet::new(); + + // process the record batches one by one + for record_batch in self.into_iter() { + let num_rows = record_batch.num_rows(); + let schema = record_batch.schema(); + let fields = schema.fields(); + if fields.len() != 1 { + return InternalResultsExtraction { + message: format!( + "Expected exactly 1 field in StringSet schema, found {} field in {:?}", + fields.len(), + schema + ), + } + .fail(); + } + let field = &fields[0]; + + if *field.data_type() != DataType::Utf8 { + return InternalResultsExtraction { + message: format!( + "Expected StringSet schema field to be Utf8, instead it was {:?}", + field.data_type() + ), + } + .fail(); + } + + let array = record_batch + .column(0) + .as_any() + .downcast_ref::(); + + match array { + Some(array) => add_utf8_array_to_stringset(&mut strings, array, num_rows)?, + None => { + return InternalResultsExtraction { + message: format!("Failed to downcast field {:?} to StringArray", field), + } + .fail() + } + } + } + Ok(StringSetRef::new(strings)) + } +} + +fn add_utf8_array_to_stringset( + dest: &mut StringSet, + src: &StringArray, + num_rows: usize, +) -> Result<()> { + for i in 0..num_rows { + // Not sure how to handle a NULL -- StringSet contains + // Strings, not Option + if src.is_null(i) { + return InternalResultsExtraction { + message: "Unexpected null value", + } + .fail(); + } else { + let src_value = src.value(i); + if !dest.contains(src_value) { + dest.insert(src_value.into()); + } + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use delorean_arrow::arrow::{ + array::Int64Array, + datatypes::{Field, Schema, SchemaRef}, + }; + + use super::*; + + #[tokio::test] + async fn executor_known_string_set_plan_ok() -> Result<()> { + let expected_strings = to_set(&["Foo", "Bar"]); + let result: Result<_> = Ok(expected_strings.clone()); + let plan = result.into(); + + let executor = Executor::default(); + let result_strings = executor.to_string_set(plan).await?; + assert_eq!(result_strings, expected_strings); + Ok(()) + } + + #[tokio::test] + async fn executor_known_string_set_plan_err() -> Result<()> { + let result = InternalResultsExtraction { + message: "this is a test", + } + .fail(); + + let plan = result.into(); + + let executor = Executor::default(); + let actual_result = executor.to_string_set(plan).await; + assert!(actual_result.is_err()); + assert!( + format!("{:?}", actual_result).contains("this is a test"), + "Actual result: '{:?}'", + actual_result + ); + Ok(()) + } + + #[tokio::test] + async fn executor_datafusion_string_set_single_plan_no_batches() -> Result<()> { + // Test with a single plan that produces no batches + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)])); + let scan = make_plan(schema, vec![]); + let plan: StringSetPlan = vec![scan].into(); + + let executor = Executor::new(); + let results = executor.to_string_set(plan).await?; + + assert_eq!(results, StringSetRef::new(StringSet::new())); + + Ok(()) + } + + #[tokio::test] + async fn executor_datafusion_string_set_single_plan_one_batch() -> Result<()> { + // Test with a single plan that produces one record batch + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)])); + let data = to_string_array(&["foo", "bar", "baz", "foo"]); + let batch = + RecordBatch::try_new(schema.clone(), vec![data]).expect("created new record batch"); + let scan = make_plan(schema, vec![batch]); + let plan: StringSetPlan = vec![scan].into(); + + let executor = Executor::new(); + let results = executor.to_string_set(plan).await?; + + assert_eq!(results, to_set(&["foo", "bar", "baz"])); + + Ok(()) + } + + #[tokio::test] + async fn executor_datafusion_string_set_single_plan_two_batch() -> Result<()> { + // Test with a single plan that produces multiple record batches + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)])); + let data1 = to_string_array(&["foo", "bar"]); + let batch1 = + RecordBatch::try_new(schema.clone(), vec![data1]).expect("created new record batch"); + let data2 = to_string_array(&["baz", "foo"]); + let batch2 = + RecordBatch::try_new(schema.clone(), vec![data2]).expect("created new record batch"); + let scan = make_plan(schema, vec![batch1, batch2]); + let plan: StringSetPlan = vec![scan].into(); + + let executor = Executor::new(); + let results = executor.to_string_set(plan).await?; + + assert_eq!(results, to_set(&["foo", "bar", "baz"])); + + Ok(()) + } + + #[tokio::test] + async fn executor_datafusion_string_set_multi_plan() -> Result<()> { + // Test with multiple datafusion logical plans + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)])); + + let data1 = to_string_array(&["foo", "bar"]); + let batch1 = + RecordBatch::try_new(schema.clone(), vec![data1]).expect("created new record batch"); + let scan1 = make_plan(schema.clone(), vec![batch1]); + + let data2 = to_string_array(&["baz", "foo"]); + let batch2 = + RecordBatch::try_new(schema.clone(), vec![data2]).expect("created new record batch"); + let scan2 = make_plan(schema, vec![batch2]); + + let plan: StringSetPlan = vec![scan1, scan2].into(); + + let executor = Executor::new(); + let results = executor.to_string_set(plan).await?; + + assert_eq!(results, to_set(&["foo", "bar", "baz"])); + + Ok(()) + } + + #[tokio::test] + async fn executor_datafusion_string_set_nulls() -> Result<()> { + // Ensure that nulls in the output set are handled reasonably + // (error, rather than silently ignored) + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)])); + let mut builder = StringArray::builder(2); + builder.append_value("foo").unwrap(); + builder.append_null().unwrap(); + let data = Arc::new(builder.finish()); + let batch = + RecordBatch::try_new(schema.clone(), vec![data]).expect("created new record batch"); + let scan = make_plan(schema, vec![batch]); + let plan: StringSetPlan = vec![scan].into(); + + let executor = Executor::new(); + let results = executor.to_string_set(plan).await; + + assert!(results.is_err(), "result is {:?}", results); + let expected_error = "Unexpected null value"; + assert!( + format!("{:?}", results).contains(expected_error), + "expected error '{}' not found in '{:?}'", + expected_error, + results + ); + + Ok(()) + } + + #[tokio::test] + async fn executor_datafusion_string_set_bad_schema() -> Result<()> { + // Ensure that an incorect schema (an int) gives a reasonable error + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)])); + let data = Arc::new(Int64Array::from(vec![1])); + let batch = + RecordBatch::try_new(schema.clone(), vec![data]).expect("created new record batch"); + let scan = make_plan(schema, vec![batch]); + let plan: StringSetPlan = vec![scan].into(); + + let executor = Executor::new(); + let results = executor.to_string_set(plan).await; + + assert!(results.is_err(), "result is {:?}", results); + let expected_error = "Expected StringSet schema field to be Utf8, instead it was Int64"; + assert!( + format!("{:?}", results).contains(expected_error), + "expected error '{}' not found in '{:?}'", + expected_error, + results + ); + + Ok(()) + } + + /// return a set for testing + fn to_set(strs: &[&str]) -> StringSetRef { + StringSetRef::new(strs.iter().map(|s| s.to_string()).collect::()) + } + + fn to_string_array(strs: &[&str]) -> Arc { + let mut builder = StringArray::builder(strs.len()); + for s in strs { + builder.append_value(s).expect("appending string"); + } + Arc::new(builder.finish()) + } + + // creates a DataFusion plan that reads the RecordBatches into memory + fn make_plan(schema: SchemaRef, data: Vec) -> LogicalPlan { + let projected_schema = schema.clone(); + + LogicalPlan::InMemoryScan { + data: vec![data], // model one partition + schema, + projection: None, + projected_schema, + } + } +} diff --git a/delorean_storage/src/lib.rs b/delorean_storage/src/lib.rs index 3574adadda..4e7d9ae767 100644 --- a/delorean_storage/src/lib.rs +++ b/delorean_storage/src/lib.rs @@ -13,6 +13,7 @@ use std::collections::BTreeSet; use std::{fmt::Debug, sync::Arc}; +pub mod exec; pub mod id; /// Specifies a continuous range of nanosecond timestamps. Timestamp