From 5ac4785e197f0f35676a0723d5e9e5c2757e2756 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Thu, 14 Apr 2022 14:41:00 +0100 Subject: [PATCH] feat: unbounded channel support for AdaptorStream Allows the AdaptorStream to work with either a bounded, or unbounded channel internally. --- datafusion_util/src/lib.rs | 49 ++++++++++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 13 deletions(-) diff --git a/datafusion_util/src/lib.rs b/datafusion_util/src/lib.rs index 5b57918834..4db7bbfa08 100644 --- a/datafusion_util/src/lib.rs +++ b/datafusion_util/src/lib.rs @@ -16,8 +16,8 @@ use datafusion::{ scalar::ScalarValue, }; use futures::{Stream, StreamExt}; -use tokio::sync::mpsc::Receiver; -use tokio_stream::wrappers::ReceiverStream; +use tokio::sync::mpsc::{Receiver, UnboundedReceiver}; +use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; /// Traits to help creating DataFusion [`Expr`]s pub trait AsExpr { @@ -162,22 +162,24 @@ impl futures::Stream for MemoryStream { } #[derive(Debug)] -/// Implements a [`SendableRecordBatchStream`] to help create DataFusion outputs from tokio channels. +/// Implements a [`SendableRecordBatchStream`] to help create DataFusion outputs +/// from tokio channels. /// -/// It sends streams of RecordBatches from a tokio channel *and* -/// crucially knows up front the schema each batch will have be used. -pub struct AdapterStream { +/// It sends streams of RecordBatches from a tokio channel *and* crucially knows +/// up front the schema each batch will have be used. +pub struct AdapterStream { /// Schema schema: SchemaRef, /// channel for getting deduplicated batches - inner: ReceiverStream>, + inner: T, } -impl AdapterStream { - /// Create a new stream which wraps the `inner` channel which - /// produces [`RecordBatch`]es that each have the specified schema +impl AdapterStream>> { + /// Create a new stream which wraps the `inner` channel which produces + /// [`RecordBatch`]es that each have the specified schema /// - /// Not called `new` because it returns a pinned reference rather than the object itself + /// Not called `new` because it returns a pinned reference rather than the + /// object itself. pub fn adapt( schema: SchemaRef, rx: Receiver>, @@ -187,7 +189,25 @@ impl AdapterStream { } } -impl Stream for AdapterStream { +impl AdapterStream>> { + /// Create a new stream which wraps the `inner` unbounded channel which + /// produces [`RecordBatch`]es that each have the specified schema + /// + /// Not called `new` because it returns a pinned reference rather than the + /// object itself. + pub fn adapt_unbounded( + schema: SchemaRef, + rx: UnboundedReceiver>, + ) -> SendableRecordBatchStream { + let inner = UnboundedReceiverStream::new(rx); + Box::pin(Self { schema, inner }) + } +} + +impl Stream for AdapterStream +where + T: Stream> + Unpin, +{ type Item = ArrowResult; fn poll_next( @@ -198,7 +218,10 @@ impl Stream for AdapterStream { } } -impl RecordBatchStream for AdapterStream { +impl RecordBatchStream for AdapterStream +where + T: Stream> + Unpin, +{ fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) }