feat: unbounded channel support for AdaptorStream

Allows the AdaptorStream to work with either a bounded, or unbounded
channel internally.
pull/24376/head
Dom Dwyer 2022-04-14 14:41:00 +01:00
parent 351b0d0c15
commit 5ac4785e19
1 changed files with 36 additions and 13 deletions

View File

@ -16,8 +16,8 @@ use datafusion::{
scalar::ScalarValue,
};
use futures::{Stream, StreamExt};
use tokio::sync::mpsc::Receiver;
use tokio_stream::wrappers::ReceiverStream;
use tokio::sync::mpsc::{Receiver, UnboundedReceiver};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
/// Traits to help creating DataFusion [`Expr`]s
pub trait AsExpr {
@ -162,22 +162,24 @@ impl futures::Stream for MemoryStream {
}
#[derive(Debug)]
/// Implements a [`SendableRecordBatchStream`] to help create DataFusion outputs from tokio channels.
/// Implements a [`SendableRecordBatchStream`] to help create DataFusion outputs
/// from tokio channels.
///
/// It sends streams of RecordBatches from a tokio channel *and*
/// crucially knows up front the schema each batch will have be used.
pub struct AdapterStream {
/// It sends streams of RecordBatches from a tokio channel *and* crucially knows
/// up front the schema each batch will have be used.
pub struct AdapterStream<T> {
/// Schema
schema: SchemaRef,
/// channel for getting deduplicated batches
inner: ReceiverStream<ArrowResult<RecordBatch>>,
inner: T,
}
impl AdapterStream {
/// Create a new stream which wraps the `inner` channel which
/// produces [`RecordBatch`]es that each have the specified schema
impl AdapterStream<ReceiverStream<ArrowResult<RecordBatch>>> {
/// Create a new stream which wraps the `inner` channel which produces
/// [`RecordBatch`]es that each have the specified schema
///
/// Not called `new` because it returns a pinned reference rather than the object itself
/// Not called `new` because it returns a pinned reference rather than the
/// object itself.
pub fn adapt(
schema: SchemaRef,
rx: Receiver<ArrowResult<RecordBatch>>,
@ -187,7 +189,25 @@ impl AdapterStream {
}
}
impl Stream for AdapterStream {
impl AdapterStream<UnboundedReceiverStream<ArrowResult<RecordBatch>>> {
/// Create a new stream which wraps the `inner` unbounded channel which
/// produces [`RecordBatch`]es that each have the specified schema
///
/// Not called `new` because it returns a pinned reference rather than the
/// object itself.
pub fn adapt_unbounded(
schema: SchemaRef,
rx: UnboundedReceiver<ArrowResult<RecordBatch>>,
) -> SendableRecordBatchStream {
let inner = UnboundedReceiverStream::new(rx);
Box::pin(Self { schema, inner })
}
}
impl<T> Stream for AdapterStream<T>
where
T: Stream<Item = ArrowResult<RecordBatch>> + Unpin,
{
type Item = ArrowResult<RecordBatch>;
fn poll_next(
@ -198,7 +218,10 @@ impl Stream for AdapterStream {
}
}
impl RecordBatchStream for AdapterStream {
impl<T> RecordBatchStream for AdapterStream<T>
where
T: Stream<Item = ArrowResult<RecordBatch>> + Unpin,
{
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}