From 509c07330d33513ad0ba64db682e669dec952c96 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Tue, 14 Sep 2021 18:26:01 +0200
Subject: [PATCH] refactor: decouple `parquet_file` from `query`

Move the `AdapterStream` helper from `query::exec::stream` into
`datafusion_util` so that `parquet_file` no longer needs to depend on
the `query` crate.

---
 Cargo.lock                        |  3 +-
 datafusion_util/Cargo.toml        |  3 +-
 datafusion_util/src/lib.rs        | 48 +++++++++++++++++++++++++++-
 parquet_file/Cargo.toml           |  1 -
 parquet_file/src/storage.rs       |  2 +-
 query/src/exec.rs                 |  1 -
 query/src/exec/split.rs           |  3 +-
 query/src/exec/stream.rs          | 53 -------------------------------
 query/src/provider/deduplicate.rs |  3 +-
 9 files changed, 54 insertions(+), 63 deletions(-)
 delete mode 100644 query/src/exec/stream.rs

diff --git a/Cargo.lock b/Cargo.lock
index 6a84272995..c45ed75024 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -900,6 +900,8 @@ version = "0.1.0"
 dependencies = [
  "datafusion 0.1.0",
  "futures",
+ "tokio",
+ "tokio-stream",
 ]
 
 [[package]]
@@ -2904,7 +2906,6 @@ dependencies = [
  "persistence_windows",
  "predicate",
  "prost",
- "query",
  "snafu",
  "tempfile",
  "test_helpers",
diff --git a/datafusion_util/Cargo.toml b/datafusion_util/Cargo.toml
index 9a284847d1..2a924f472a 100644
--- a/datafusion_util/Cargo.toml
+++ b/datafusion_util/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2018"
 description = "Datafusion utilities"
 
 [dependencies]
-
 datafusion = { path = "../datafusion" }
 futures = "0.3"
+tokio = { version = "1.11", features = ["macros"] }
+tokio-stream = "0.1.2"
diff --git a/datafusion_util/src/lib.rs b/datafusion_util/src/lib.rs
index 3fd1d6d767..482ab6d81c 100644
--- a/datafusion_util/src/lib.rs
+++ b/datafusion_util/src/lib.rs
@@ -7,9 +7,12 @@ use std::task::{Context, Poll};
 use datafusion::{
     arrow::{datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch},
     logical_plan::{binary_expr, col, lit, Expr, Operator},
-    physical_plan::RecordBatchStream,
+    physical_plan::{RecordBatchStream, SendableRecordBatchStream},
     scalar::ScalarValue,
 };
+use futures::{Stream, StreamExt};
+use tokio::sync::mpsc::Receiver;
+use tokio_stream::wrappers::ReceiverStream;
 
 /// Traits to help creating DataFusion [`Expr`]s
 pub trait AsExpr {
@@ -153,6 +156,49 @@ impl futures::Stream for MemoryStream {
     }
 }
 
+#[derive(Debug)]
+/// Implements a [`SendableRecordBatchStream`] to help create DataFusion outputs from tokio channels.
+///
+/// It sends streams of RecordBatches from a tokio channel *and*
+/// crucially knows up front the schema that each batch will have.
+pub struct AdapterStream {
+    /// Schema
+    schema: SchemaRef,
+    /// channel for getting deduplicated batches
+    inner: ReceiverStream<ArrowResult<RecordBatch>>,
+}
+
+impl AdapterStream {
+    /// Create a new stream which wraps the `inner` channel which
+    /// produces [`RecordBatch`]es that each have the specified schema
+    ///
+    /// Not called `new` because it returns a pinned reference rather than the object itself
+    pub fn adapt(
+        schema: SchemaRef,
+        rx: Receiver<ArrowResult<RecordBatch>>,
+    ) -> SendableRecordBatchStream {
+        let inner = ReceiverStream::new(rx);
+        Box::pin(Self { schema, inner })
+    }
+}
+
+impl Stream for AdapterStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        self.inner.poll_next_unpin(cx)
+    }
+}
+
+impl RecordBatchStream for AdapterStream {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/parquet_file/Cargo.toml b/parquet_file/Cargo.toml
index 742f86fd12..67585ba5d6 100644
--- a/parquet_file/Cargo.toml
+++ b/parquet_file/Cargo.toml
@@ -25,7 +25,6 @@ parking_lot = "0.11.1"
 persistence_windows = { path = "../persistence_windows" }
 predicate = { path = "../predicate" }
 prost = "0.8"
-query = { path = "../query" }
 snafu = "0.6"
 tempfile = "3.1.0"
 thrift = "0.13"
diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs
index ff150431a1..3b40bdd855 100644
--- a/parquet_file/src/storage.rs
+++ b/parquet_file/src/storage.rs
@@ -11,6 +11,7 @@ use datafusion::{
     logical_plan::Expr,
     physical_plan::{parquet::ParquetExec, ExecutionPlan, Partitioning, SendableRecordBatchStream},
 };
+use datafusion_util::AdapterStream;
 use futures::StreamExt;
 use internal_types::selection::Selection;
 use iox_object_store::{IoxObjectStore, ParquetFilePath};
@@ -23,7 +24,6 @@ use parquet::{
     file::{metadata::KeyValue, properties::WriterProperties, writer::TryClone},
 };
 use predicate::predicate::Predicate;
-use query::exec::stream::AdapterStream;
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
 use std::{
     io::{Cursor, Seek, SeekFrom, Write},
diff --git a/query/src/exec.rs b/query/src/exec.rs
index 3b81f64f79..096d5ca3fd 100644
--- a/query/src/exec.rs
+++ b/query/src/exec.rs
@@ -7,7 +7,6 @@ pub mod fieldlist;
 mod schema_pivot;
 pub mod seriesset;
 pub(crate) mod split;
-pub mod stream;
 pub mod stringset;
 mod task;
 pub use context::{DEFAULT_CATALOG, DEFAULT_SCHEMA};
diff --git a/query/src/exec/split.rs b/query/src/exec/split.rs
index c1ff168311..a7c5ca6721 100644
--- a/query/src/exec/split.rs
+++ b/query/src/exec/split.rs
@@ -26,12 +26,11 @@ use datafusion::{
     scalar::ScalarValue,
 };
 
+use datafusion_util::AdapterStream;
 use futures::StreamExt;
 use observability_deps::tracing::{debug, trace};
 use tokio::sync::{mpsc::Sender, Mutex};
 
-use crate::exec::stream::AdapterStream;
-
 /// Implements stream splitting described in `make_stream_split`
 ///
 /// The resulting execution plan always produces exactly 2 partitions:
diff --git a/query/src/exec/stream.rs b/query/src/exec/stream.rs
deleted file mode 100644
index fc5ccfdd70..0000000000
--- a/query/src/exec/stream.rs
+++ /dev/null
@@ -1,53 +0,0 @@
-//! Common utilities for implement SendableRecordBatchStreams
-
-use std::sync::Arc;
-
-use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
-use futures::{Stream, StreamExt};
-use tokio::sync::mpsc::Receiver;
-use tokio_stream::wrappers::ReceiverStream;
-
-use arrow::{datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch};
-
-#[derive(Debug)]
-/// Implements a [`SendableRecordBatchStream`] to help create DataFusion outputs from tokio channels.
-///
-/// It sends streams of RecordBatches from a tokio channel *and*
-/// crucially knows up front the schema each batch will have be used.
-pub struct AdapterStream {
-    /// Schema
-    schema: SchemaRef,
-    /// channel for getting deduplicated batches
-    inner: ReceiverStream<ArrowResult<RecordBatch>>,
-}
-
-impl AdapterStream {
-    /// Create a new stream which wraps the `inner` channel which
-    /// produces [`RecordBatch`]es that each have the specified schema
-    ///
-    /// Not called `new` because it returns a pinned reference rather than the object itself
-    pub fn adapt(
-        schema: SchemaRef,
-        rx: Receiver<ArrowResult<RecordBatch>>,
-    ) -> SendableRecordBatchStream {
-        let inner = ReceiverStream::new(rx);
-        Box::pin(Self { schema, inner })
-    }
-}
-
-impl Stream for AdapterStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(
-        mut self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Option<Self::Item>> {
-        self.inner.poll_next_unpin(cx)
-    }
-}
-
-impl RecordBatchStream for AdapterStream {
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema)
-    }
-}
diff --git a/query/src/provider/deduplicate.rs b/query/src/provider/deduplicate.rs
index 363bfbd874..be6129bdf5 100644
--- a/query/src/provider/deduplicate.rs
+++ b/query/src/provider/deduplicate.rs
@@ -9,8 +9,7 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use async_trait::async_trait;
-
-use crate::exec::stream::AdapterStream;
+use datafusion_util::AdapterStream;
 
 use self::algo::RecordBatchDeduplicator;
 use datafusion::{
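
Usage sketch (reviewer note, not part of the patch): the relocated
`AdapterStream::adapt` takes a `SchemaRef` plus the receiving end of a
bounded tokio channel of `ArrowResult<RecordBatch>` and returns a
`SendableRecordBatchStream` whose schema is reportable before any batch
arrives. Only `AdapterStream::adapt` and its signature come from this
patch; the schema, column, channel capacity, and the `#[tokio::main]`
entry point (which needs tokio runtime features beyond the "macros"
feature this patch enables) are assumptions made up for illustration.

use std::sync::Arc;

use datafusion::arrow::{
    array::{ArrayRef, Int64Array},
    datatypes::{DataType, Field, Schema},
    error::Result as ArrowResult,
    record_batch::RecordBatch,
};
use datafusion_util::AdapterStream;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    // The schema is known up front, before any batch is produced; this is
    // what lets AdapterStream answer RecordBatchStream::schema() immediately.
    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));

    // Bounded channel: the consumer applies backpressure to the producer.
    let (tx, rx) = tokio::sync::mpsc::channel::<ArrowResult<RecordBatch>>(2);

    let producer_schema = Arc::clone(&schema);
    tokio::spawn(async move {
        let column: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
        // Errors are forwarded through the same channel as Err values.
        let batch = RecordBatch::try_new(producer_schema, vec![column]);
        tx.send(batch).await.ok();
        // Dropping tx closes the channel, which ends the stream.
    });

    // adapt() wraps the receiver and pins it into a SendableRecordBatchStream.
    let mut stream = AdapterStream::adapt(schema, rx);
    while let Some(batch) = stream.next().await {
        println!("got {} rows", batch.expect("no arrow error").num_rows());
    }
}

Knowing the schema without polling matters because DataFusion asks an
operator's output stream for its schema before the first batch is
produced; this is the "crucially knows up front" property the doc
comment describes.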