From a2b81cdb20a4f112f2d267997361a1ddd3782f5d Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 22 Oct 2021 13:16:34 -0400 Subject: [PATCH 1/3] fix: Put Debug format of rusoto errors in error messages Fixes #2942 --- object_store/src/aws.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/object_store/src/aws.rs b/object_store/src/aws.rs index 1264e9f4ae..783e8fbcaf 100644 --- a/object_store/src/aws.rs +++ b/object_store/src/aws.rs @@ -35,7 +35,7 @@ pub enum Error { NoData { bucket: String, location: String }, #[snafu(display( - "Unable to DELETE data. Bucket: {}, Location: {}, Error: {}", + "Unable to DELETE data. Bucket: {}, Location: {}, Error: {:?}", bucket, location, source, @@ -47,7 +47,7 @@ pub enum Error { }, #[snafu(display( - "Unable to GET data. Bucket: {}, Location: {}, Error: {}", + "Unable to GET data. Bucket: {}, Location: {}, Error: {:?}", bucket, location, source, @@ -59,7 +59,7 @@ pub enum Error { }, #[snafu(display( - "Unable to GET part of the data. Bucket: {}, Location: {}, Error: {}", + "Unable to GET part of the data. Bucket: {}, Location: {}, Error: {:?}", bucket, location, source, @@ -71,7 +71,7 @@ pub enum Error { }, #[snafu(display( - "Unable to PUT data. Bucket: {}, Location: {}, Error: {}", + "Unable to PUT data. Bucket: {}, Location: {}, Error: {:?}", bucket, location, source, @@ -82,14 +82,14 @@ pub enum Error { location: String, }, - #[snafu(display("Unable to list data. Bucket: {}, Error: {}", bucket, source))] + #[snafu(display("Unable to list data. Bucket: {}, Error: {:?}", bucket, source))] UnableToListData { source: rusoto_core::RusotoError, bucket: String, }, #[snafu(display( - "Unable to parse last modified date. Bucket: {}, Error: {}", + "Unable to parse last modified date. Bucket: {}, Error: {:?}", bucket, source ))] @@ -98,7 +98,7 @@ pub enum Error { bucket: String, }, - #[snafu(display("Unable to buffer data into temporary file, Error: {}", source))] + #[snafu(display("Unable to buffer data into temporary file, Error: {:?}", source))] UnableToBufferStream { source: std::io::Error }, #[snafu(display( From 6ae42bb774299218a3a2cf393352d9057620cef8 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 22 Oct 2021 13:59:48 -0400 Subject: [PATCH 2/3] fix: Put both Debug and Display of AWS errors in error messages --- object_store/src/aws.rs | 35 +++++++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/object_store/src/aws.rs b/object_store/src/aws.rs index 783e8fbcaf..5d81a6ed5d 100644 --- a/object_store/src/aws.rs +++ b/object_store/src/aws.rs @@ -35,10 +35,11 @@ pub enum Error { NoData { bucket: String, location: String }, #[snafu(display( - "Unable to DELETE data. Bucket: {}, Location: {}, Error: {:?}", + "Unable to DELETE data. Bucket: {}, Location: {}, Error: {} ({:?})", bucket, location, source, + source, ))] UnableToDeleteData { source: rusoto_core::RusotoError, @@ -47,10 +48,11 @@ pub enum Error { }, #[snafu(display( - "Unable to GET data. Bucket: {}, Location: {}, Error: {:?}", + "Unable to GET data. Bucket: {}, Location: {}, Error: {} ({:?})", bucket, location, source, + source, ))] UnableToGetData { source: rusoto_core::RusotoError, @@ -59,10 +61,11 @@ pub enum Error { }, #[snafu(display( - "Unable to GET part of the data. Bucket: {}, Location: {}, Error: {:?}", + "Unable to GET part of the data. 
Bucket: {}, Location: {}, Error: {} ({:?})", bucket, location, source, + source, ))] UnableToGetPieceOfData { source: std::io::Error, @@ -71,10 +74,11 @@ pub enum Error { }, #[snafu(display( - "Unable to PUT data. Bucket: {}, Location: {}, Error: {:?}", + "Unable to PUT data. Bucket: {}, Location: {}, Error: {} ({:?})", bucket, location, source, + source, ))] UnableToPutData { source: rusoto_core::RusotoError, @@ -82,29 +86,40 @@ pub enum Error { location: String, }, - #[snafu(display("Unable to list data. Bucket: {}, Error: {:?}", bucket, source))] + #[snafu(display( + "Unable to list data. Bucket: {}, Error: {} ({:?})", + bucket, + source, + source, + ))] UnableToListData { source: rusoto_core::RusotoError, bucket: String, }, #[snafu(display( - "Unable to parse last modified date. Bucket: {}, Error: {:?}", + "Unable to parse last modified date. Bucket: {}, Error: {} ({:?})", bucket, - source + source, + source, ))] UnableToParseLastModified { source: chrono::ParseError, bucket: String, }, - #[snafu(display("Unable to buffer data into temporary file, Error: {:?}", source))] + #[snafu(display( + "Unable to buffer data into temporary file, Error: {} ({:?})", + source, + source, + ))] UnableToBufferStream { source: std::io::Error }, #[snafu(display( - "Could not parse `{}` as an AWS region. Regions should look like `us-east-2`. {:?}", + "Could not parse `{}` as an AWS region. Regions should look like `us-east-2`. {} ({:?})", region, - source + source, + source, ))] InvalidRegion { region: String, From e8ddc4e0db4c59f039a34f0e3297c9eb495c1d5a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 22 Oct 2021 14:53:20 -0400 Subject: [PATCH 3/3] feat: Add custom `NonNullChecker` operator that checks for non-null values in columns (#2944) * feat: Add custom `NonNullChecker` operator that checks for non-null values in columns * fix: remove println * fix: typo Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- query/src/exec.rs | 41 ++- query/src/exec/context.rs | 13 +- query/src/exec/non_null_checker.rs | 488 +++++++++++++++++++++++++++++ query/src/exec/schema_pivot.rs | 2 +- 4 files changed, 541 insertions(+), 3 deletions(-) create mode 100644 query/src/exec/non_null_checker.rs diff --git a/query/src/exec.rs b/query/src/exec.rs index 5078f68980..24aa17ca4d 100644 --- a/query/src/exec.rs +++ b/query/src/exec.rs @@ -4,6 +4,7 @@ pub(crate) mod context; pub mod field; pub mod fieldlist; +mod non_null_checker; mod query_tracing; mod schema_pivot; pub mod seriesset; @@ -22,7 +23,7 @@ use datafusion::{ pub use context::{IOxExecutionConfig, IOxExecutionContext}; use schema_pivot::SchemaPivotNode; -use self::{split::StreamSplitNode, task::DedicatedExecutor}; +use self::{non_null_checker::NonNullCheckerNode, split::StreamSplitNode, task::DedicatedExecutor}; /// Configuration for an Executor #[derive(Debug, Clone)] @@ -142,6 +143,44 @@ pub fn make_schema_pivot(input: LogicalPlan) -> LogicalPlan { LogicalPlan::Extension { node } } +/// Make a NonNullChecker node takes an arbitrary input array and +/// produces a single string output column that contains +/// +/// 1. the single `table_name` string if any of the input columns are non-null +/// 2. 
zero rows if all of the input columns are null +/// +/// For this input: +/// +/// ColA | ColB | ColC +/// ------+------+------ +/// 1 | NULL | NULL +/// 2 | 2 | NULL +/// 3 | 2 | NULL +/// +/// The output would be (given 'the_table_name' was the table name) +/// +/// non_null_column +/// ----------------- +/// the_table_name +/// +/// However, for this input (All NULL) +/// +/// ColA | ColB | ColC +/// ------+------+------ +/// NULL | NULL | NULL +/// NULL | NULL | NULL +/// NULL | NULL | NULL +/// +/// There would be no output rows +/// +/// non_null_column +/// ----------------- +pub fn make_non_null_checker(table_name: &str, input: LogicalPlan) -> LogicalPlan { + let node = Arc::new(NonNullCheckerNode::new(table_name, input)); + + LogicalPlan::Extension { node } +} + /// Create a StreamSplit node which takes an input stream of record /// batches and produces two output streams based on a predicate /// diff --git a/query/src/exec/context.rs b/query/src/exec/context.rs index 99af100458..44bf13455f 100644 --- a/query/src/exec/context.rs +++ b/query/src/exec/context.rs @@ -25,6 +25,7 @@ use trace::{ctx::SpanContext, span::SpanRecorder}; use crate::{ exec::{ fieldlist::{FieldList, IntoFieldList}, + non_null_checker::NonNullCheckerExec, query_tracing::TracedStream, schema_pivot::{SchemaPivotExec, SchemaPivotNode}, seriesset::{ @@ -46,7 +47,10 @@ use crate::plan::{ // Reuse DataFusion error and Result types for this module pub use datafusion::error::{DataFusionError as Error, Result}; -use super::{seriesset::series::Either, split::StreamSplitNode, task::DedicatedExecutor}; +use super::{ + non_null_checker::NonNullCheckerNode, seriesset::series::Either, split::StreamSplitNode, + task::DedicatedExecutor, +}; // The default catalog name - this impacts what SQL queries use if not specified pub const DEFAULT_CATALOG: &str = "public"; @@ -97,6 +101,13 @@ impl ExtensionPlanner for IOxExtensionPlanner { Arc::clone(&physical_inputs[0]), schema_pivot.schema().as_ref().clone().into(), )) as Arc) + } else if let Some(non_null_checker) = any.downcast_ref::() { + assert_eq!(physical_inputs.len(), 1, "Inconsistent number of inputs"); + Some(Arc::new(NonNullCheckerExec::new( + Arc::clone(&physical_inputs[0]), + non_null_checker.schema().as_ref().clone().into(), + non_null_checker.value(), + )) as Arc) } else if let Some(stream_split) = any.downcast_ref::() { assert_eq!( logical_inputs.len(), diff --git a/query/src/exec/non_null_checker.rs b/query/src/exec/non_null_checker.rs new file mode 100644 index 0000000000..25edcc3e1e --- /dev/null +++ b/query/src/exec/non_null_checker.rs @@ -0,0 +1,488 @@ +//! This module contains code for the "NonNullChecker" DataFusion +//! extension plan node +//! +//! A NonNullChecker node takes an arbitrary input array and produces +//! a single string output column that contains +//! +//! 1. A single string if any of the input columns are non-null +//! 2. zero rows if all of the input columns are null +//! +//! For this input: +//! +//! ColA | ColB | ColC +//! ------+------+------ +//! 1 | NULL | NULL +//! 2 | 2 | NULL +//! 3 | 2 | NULL +//! +//! The output would be (given 'the_value' was provided to `NonNullChecker` node) +//! +//! non_null_column +//! ----------------- +//! the_value +//! +//! However, for this input (All NULL) +//! +//! ColA | ColB | ColC +//! ------+------+------ +//! NULL | NULL | NULL +//! NULL | NULL | NULL +//! NULL | NULL | NULL +//! +//! There would be no output rows +//! +//! non_null_column +//! ----------------- +//! +//! 
+//! This operation can be used to implement the table_name metadata query
+
+use std::{
+    any::Any,
+    fmt::{self, Debug},
+    sync::Arc,
+};
+
+use async_trait::async_trait;
+
+use arrow::{
+    array::{new_empty_array, StringArray},
+    datatypes::{DataType, Field, Schema, SchemaRef},
+    error::{ArrowError, Result as ArrowResult},
+    record_batch::RecordBatch,
+};
+use datafusion::{
+    error::{DataFusionError as Error, Result},
+    logical_plan::{DFSchemaRef, Expr, LogicalPlan, ToDFSchema, UserDefinedLogicalNode},
+    physical_plan::{
+        metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
+        DisplayFormatType, Distribution, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+        Statistics,
+    },
+};
+
+use datafusion_util::AdapterStream;
+use observability_deps::tracing::debug;
+use tokio::sync::mpsc;
+use tokio_stream::StreamExt;
+
+/// Implements the NonNullChecker operation as described in this module's documentation
+pub struct NonNullCheckerNode {
+    input: LogicalPlan,
+    schema: DFSchemaRef,
+    /// these expressions represent what columns are "used" by this
+    /// node (in this case all of them) -- columns that are not used
+    /// are optimized away by datafusion.
+    exprs: Vec<Expr>,
+
+    /// The value to produce if there are any non null Inputs
+    value: Arc<str>,
+}
+
+impl NonNullCheckerNode {
+    pub fn new(value: &str, input: LogicalPlan) -> Self {
+        let schema = make_non_null_checker_output_schema();
+
+        // Form exprs that refer to all of our input columns (so that
+        // datafusion knows not to optimize them away)
+        let exprs = input
+            .schema()
+            .fields()
+            .iter()
+            .map(|field| Expr::Column(field.qualified_column()))
+            .collect::<Vec<_>>();
+
+        Self {
+            input,
+            schema,
+            exprs,
+            value: value.into(),
+        }
+    }
+
+    /// Return the value associated with this checker
+    pub fn value(&self) -> Arc<str> {
+        Arc::clone(&self.value)
+    }
+}
+
+impl Debug for NonNullCheckerNode {
+    /// Use explain format for the Debug format.
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.fmt_for_explain(f)
+    }
+}
+
+impl UserDefinedLogicalNode for NonNullCheckerNode {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn inputs(&self) -> Vec<&LogicalPlan> {
+        vec![&self.input]
+    }
+
+    /// Schema for NonNullChecker is a single string
+    fn schema(&self) -> &DFSchemaRef {
+        &self.schema
+    }
+
+    fn expressions(&self) -> Vec<Expr> {
+        self.exprs.clone()
+    }
+
+    /// For example: `NonNullChecker('the_value')`
+    fn fmt_for_explain(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "NonNullChecker('{}')", self.value)
+    }
+
+    fn from_template(
+        &self,
+        exprs: &[Expr],
+        inputs: &[LogicalPlan],
+    ) -> Arc<dyn UserDefinedLogicalNode + Send + Sync> {
+        assert_eq!(inputs.len(), 1, "NonNullChecker: input sizes inconsistent");
+        assert_eq!(
+            exprs.len(),
+            self.exprs.len(),
+            "NonNullChecker: expression sizes inconsistent"
+        );
+        Arc::new(Self::new(self.value.as_ref(), inputs[0].clone()))
+    }
+}
+
+// ------ The implementation of NonNullChecker code follows -----
+
+/// Create the schema describing the output
+pub fn make_non_null_checker_output_schema() -> DFSchemaRef {
+    let nullable = false;
+    Schema::new(vec![Field::new(
+        "non_null_column",
+        DataType::Utf8,
+        nullable,
+    )])
+    .to_dfschema_ref()
+    .unwrap()
+}
+
+/// Physical operator that implements the NonNullChecker operation against
+/// data types
+pub struct NonNullCheckerExec {
+    input: Arc<dyn ExecutionPlan>,
+    /// Output schema
+    schema: SchemaRef,
+    /// The value to produce if there are any non null Inputs
+    value: Arc<str>,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
+}
+
+impl NonNullCheckerExec {
+    pub fn new(input: Arc<dyn ExecutionPlan>, schema: SchemaRef, value: Arc<str>) -> Self {
+        Self {
+            input,
+            schema,
+            value,
+            metrics: ExecutionPlanMetricsSet::new(),
+        }
+    }
+}
+
+impl Debug for NonNullCheckerExec {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "NonNullCheckerExec")
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for NonNullCheckerExec {
+    fn as_any(&self) -> &(dyn std::any::Any + 'static) {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        use Partitioning::*;
+        match self.input.output_partitioning() {
+            RoundRobinBatch(num_partitions) => RoundRobinBatch(num_partitions),
+            // as this node transforms the output schema, whatever partitioning
+            // was present on the input is lost on the output
+            Hash(_, num_partitions) => UnknownPartitioning(num_partitions),
+            UnknownPartitioning(num_partitions) => UnknownPartitioning(num_partitions),
+        }
+    }
+
+    fn required_child_distribution(&self) -> Distribution {
+        Distribution::UnspecifiedDistribution
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![Arc::clone(&self.input)]
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(Self {
+                input: Arc::clone(&children[0]),
+                schema: Arc::clone(&self.schema),
+                metrics: ExecutionPlanMetricsSet::new(),
+                value: Arc::clone(&self.value),
+            })),
+            _ => Err(Error::Internal(
+                "NonNullCheckerExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    /// Execute one partition and return an iterator over RecordBatch
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        if self.output_partitioning().partition_count() <= partition {
+            return Err(Error::Internal(format!(
+                "NonNullCheckerExec invalid partition {}",
+                partition
+            )));
+        }
+
+        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+        let input_stream = self.input.execute(partition).await?;
+
+        let (tx, rx) = mpsc::channel(1);
+
+        let task = tokio::task::spawn(check_for_nulls(
+            input_stream,
+            Arc::clone(&self.schema),
+            baseline_metrics,
+            Arc::clone(&self.value),
+            tx.clone(),
+        ));
+
+        // A second task watches the output of the worker task (TODO refactor into datafusion_util)
+        tokio::task::spawn(async move {
+            let task_result = task.await;
+
+            let msg = match task_result {
+                Err(join_err) => {
+                    debug!(e=%join_err, "Error joining null_check task");
+                    Some(ArrowError::ExternalError(Box::new(join_err)))
+                }
+                Ok(Err(e)) => {
+                    debug!(%e, "Error in null_check task itself");
+                    Some(e)
+                }
+                Ok(Ok(())) => {
+                    // successful
+                    None
+                }
+            };
+
+            if let Some(e) = msg {
+                // try and tell the receiver something went
+                // wrong. Note we ignore errors sending this message
+                // as that means the receiver has already been
+                // shutdown and no one cares anymore lol
+                if tx.send(Err(e)).await.is_err() {
+                    debug!("null_check receiver hung up");
+                }
+            }
+        });
+
+        Ok(AdapterStream::adapt(self.schema(), rx))
+    }
+
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "NonNullCheckerExec")
+            }
+        }
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    fn statistics(&self) -> Statistics {
+        // don't know anything about the statistics
+        Statistics::default()
+    }
+}
+
+async fn check_for_nulls(
+    mut input_stream: SendableRecordBatchStream,
+    schema: SchemaRef,
+    baseline_metrics: BaselineMetrics,
+    value: Arc<str>,
+    tx: mpsc::Sender<ArrowResult<RecordBatch>>,
+) -> ArrowResult<()> {
+    while let Some(input_batch) = input_stream.next().await.transpose()? {
+        let timer = baseline_metrics.elapsed_compute().timer();
+
+        if input_batch
+            .columns()
+            .iter()
+            .any(|arr| arr.null_count() != arr.len())
+        {
+            // found a non null in input, return value
+            let arr: StringArray = vec![Some(value.as_ref())].into();
+
+            let output_batch = RecordBatch::try_new(schema, vec![Arc::new(arr)])?;
+            // ignore errors on sending (means receiver hung up)
+            std::mem::drop(timer);
+            tx.send(Ok(output_batch)).await.ok();
+            return Ok(());
+        }
+        // else keep looking
+    }
+    // if we got here, did not see any non null values. So
+    // send back an empty record batch
+    let output_batch = RecordBatch::try_new(schema, vec![new_empty_array(&DataType::Utf8)])?;
+
+    // ignore errors on sending (means receiver hung up)
+    tx.send(Ok(output_batch)).await.ok();
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{ArrayRef, StringArray};
+    use arrow_util::assert_batches_eq;
+    use datafusion::physical_plan::{collect, memory::MemoryExec};
+
+    #[tokio::test]
+    async fn test_single_column_non_null() {
+        let t1 = StringArray::from(vec![Some("a"), Some("c"), Some("c")]);
+        let batch = RecordBatch::try_from_iter(vec![("t1", Arc::new(t1) as ArrayRef)]).unwrap();
+
+        let results = check("the_value", vec![batch]).await;
+
+        let expected = vec![
+            "+-----------------+",
+            "| non_null_column |",
+            "+-----------------+",
+            "| the_value       |",
+            "+-----------------+",
+        ];
+        assert_batches_eq!(&expected, &results);
+    }
+
+    #[tokio::test]
+    async fn test_single_column_null() {
+        let t1 = StringArray::from(vec![None::<&str>, None, None]);
+        let batch = RecordBatch::try_from_iter(vec![("t1", Arc::new(t1) as ArrayRef)]).unwrap();
+
+        let results = check("the_value", vec![batch]).await;
+
+        let expected = vec![
+            "+-----------------+",
+            "| non_null_column |",
+            "+-----------------+",
+            "+-----------------+",
+        ];
+        assert_batches_eq!(&expected, &results);
+    }
+
+    #[tokio::test]
+    async fn test_multi_column_non_null() {
+        let t1 = StringArray::from(vec![None::<&str>, None, None]);
+        let t2 = StringArray::from(vec![None, None, Some("c")]);
+        let batch = RecordBatch::try_from_iter(vec![
+            ("t1", Arc::new(t1) as ArrayRef),
+            ("t2", Arc::new(t2) as ArrayRef),
+        ])
+        .unwrap();
+
+        let results = check("the_value", vec![batch]).await;
+
+        let expected = vec![
+            "+-----------------+",
+            "| non_null_column |",
+            "+-----------------+",
+            "| the_value       |",
+            "+-----------------+",
+        ];
+        assert_batches_eq!(&expected, &results);
+    }
+
+    #[tokio::test]
+    async fn test_multi_column_null() {
+        let t1 = StringArray::from(vec![None::<&str>, None, None]);
+        let t2 = StringArray::from(vec![None::<&str>, None, None]);
+        let batch = RecordBatch::try_from_iter(vec![
+            ("t1", Arc::new(t1) as ArrayRef),
+            ("t2", Arc::new(t2) as ArrayRef),
+        ])
+        .unwrap();
+
+        let results = check("the_value", vec![batch]).await;
+
+        let expected = vec![
+            "+-----------------+",
+            "| non_null_column |",
+            "+-----------------+",
+            "+-----------------+",
+        ];
+        assert_batches_eq!(&expected, &results);
+    }
+
+    #[tokio::test]
+    async fn test_multi_column_second_batch_non_null() {
+        // this time only the second batch has a non null value
+        let t1 = StringArray::from(vec![None::<&str>, None, None]);
+        let t2 = StringArray::from(vec![None::<&str>, None, None]);
+
+        let batch1 = RecordBatch::try_from_iter(vec![
+            ("t1", Arc::new(t1) as ArrayRef),
+            ("t2", Arc::new(t2) as ArrayRef),
+        ])
+        .unwrap();
+
+        let t1 = StringArray::from(vec![None::<&str>]);
+        let t2 = StringArray::from(vec![Some("f")]);
+
+        let batch2 = RecordBatch::try_from_iter(vec![
+            ("t1", Arc::new(t1) as ArrayRef),
+            ("t2", Arc::new(t2) as ArrayRef),
+        ])
+        .unwrap();
+
+        let results = check("another_value", vec![batch1, batch2]).await;
+
+        let expected = vec![
+            "+-----------------+",
+            "| non_null_column |",
+            "+-----------------+",
+            "| another_value   |",
+            "+-----------------+",
+        ];
+        assert_batches_eq!(&expected, &results);
+    }
+
+    /// Run the input through the checker and return results
+    async fn check(value: &str, input: Vec<RecordBatch>) -> Vec<RecordBatch> {
+        test_helpers::maybe_start_logging();
+
+        // Setup in memory stream
+        let schema = input[0].schema();
+        let projection = None;
+        let input = Arc::new(MemoryExec::try_new(&[input], schema, projection).unwrap());
+
+        // Create and run the checker
+        let schema: Schema = make_non_null_checker_output_schema().as_ref().into();
+        let exec = Arc::new(NonNullCheckerExec::new(
+            input,
+            Arc::new(schema),
+            value.into(),
+        ));
+        let output = collect(Arc::clone(&exec) as Arc<dyn ExecutionPlan>)
+            .await
+            .unwrap();
+
+        output
+    }
+}
diff --git a/query/src/exec/schema_pivot.rs b/query/src/exec/schema_pivot.rs
index 8ee352eb78..b8329ccb93 100644
--- a/query/src/exec/schema_pivot.rs
+++ b/query/src/exec/schema_pivot.rs
@@ -124,7 +124,7 @@ impl UserDefinedLogicalNode for SchemaPivotNode {
 // ------ The implementation of SchemaPivot code follows -----
 
 /// Create the schema describing the output
-pub fn make_schema_pivot_output_schema() -> DFSchemaRef {
+fn make_schema_pivot_output_schema() -> DFSchemaRef {
     let nullable = false;
     Schema::new(vec![Field::new(
         "non_null_column",
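
Note, separate from the patches themselves: a minimal, standalone sketch of why patches 1 and 2 print the source error as `{} ({:?})`. The `FakeServiceError` type below is invented purely for illustration (it is not rusoto's actual error type); the point is only that an error's `Display` form can be terse while its `Debug` form keeps the structured detail wanted in the message.

    use std::fmt;

    // Stand-in error type; the real service errors are richer, but the idea is
    // the same: Display can be terse while Debug preserves structured detail.
    #[derive(Debug)]
    enum FakeServiceError {
        Unknown { status: u16, body: String },
    }

    impl fmt::Display for FakeServiceError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self {
                Self::Unknown { .. } => write!(f, "unknown service error"),
            }
        }
    }

    fn main() {
        let source = FakeServiceError::Unknown {
            status: 403,
            body: "AccessDenied".to_string(),
        };

        // Same "{} ({:?})" pattern as the snafu display strings in the patches:
        // Display form first, then the Debug form in parentheses.
        println!(
            "Unable to GET data. Bucket: {}, Location: {}, Error: {} ({:?})",
            "my-bucket", "data/file.parquet", source, source
        );
        // Prints something like:
        // Unable to GET data. Bucket: my-bucket, Location: data/file.parquet,
        // Error: unknown service error (Unknown { status: 403, body: "AccessDenied" })
    }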