refactor: decouple `parquet_file` from `query`

pull/24376/head
Marco Neumann 2021-09-14 18:26:01 +02:00
parent c33e5c22e6
commit 509c07330d
9 changed files with 54 additions and 63 deletions
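
The crux of the refactor: the `AdapterStream` helper moves out of `query::exec::stream` and into the `datafusion_util` crate, so `parquet_file` can drop its dependency on the `query` crate entirely. For consumers, the change is the one-line import swap visible in the hunks below:

-use query::exec::stream::AdapterStream;
+use datafusion_util::AdapterStream;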

Cargo.lock (generated)

@@ -900,6 +900,8 @@ version = "0.1.0"
dependencies = [
"datafusion 0.1.0",
"futures",
"tokio",
"tokio-stream",
]
[[package]]
@@ -2904,7 +2906,6 @@ dependencies = [
"persistence_windows",
"predicate",
"prost",
"query",
"snafu",
"tempfile",
"test_helpers",

datafusion_util/Cargo.toml

@@ -6,6 +6,8 @@ edition = "2018"
description = "Datafusion utilities"
[dependencies]
datafusion = { path = "../datafusion" }
futures = "0.3"
+tokio = { version = "1.11", features = ["macros"] }
+tokio-stream = "0.1.2"

datafusion_util/src/lib.rs

@@ -7,9 +7,12 @@ use std::task::{Context, Poll};
use datafusion::{
    arrow::{datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch},
    logical_plan::{binary_expr, col, lit, Expr, Operator},
-    physical_plan::RecordBatchStream,
+    physical_plan::{RecordBatchStream, SendableRecordBatchStream},
    scalar::ScalarValue,
};
+use futures::{Stream, StreamExt};
+use tokio::sync::mpsc::Receiver;
+use tokio_stream::wrappers::ReceiverStream;
/// Traits to help creating DataFusion [`Expr`]s
pub trait AsExpr {
@@ -153,6 +156,49 @@ impl futures::Stream for MemoryStream {
}
}
+#[derive(Debug)]
+/// Implements a [`SendableRecordBatchStream`] to help create DataFusion outputs from tokio channels.
+///
+/// It streams RecordBatches from a tokio channel *and* crucially
+/// knows up front the schema that each batch will have.
+pub struct AdapterStream {
+    /// Schema
+    schema: SchemaRef,
+    /// channel for getting deduplicated batches
+    inner: ReceiverStream<ArrowResult<RecordBatch>>,
+}
+
+impl AdapterStream {
+    /// Create a new stream which wraps the `inner` channel that
+    /// produces [`RecordBatch`]es, each having the specified schema.
+    ///
+    /// Not called `new` because it returns a pinned reference rather than the object itself.
+    pub fn adapt(
+        schema: SchemaRef,
+        rx: Receiver<ArrowResult<RecordBatch>>,
+    ) -> SendableRecordBatchStream {
+        let inner = ReceiverStream::new(rx);
+        Box::pin(Self { schema, inner })
+    }
+}
+
+impl Stream for AdapterStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        self.inner.poll_next_unpin(cx)
+    }
+}
+
+impl RecordBatchStream for AdapterStream {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+}
#[cfg(test)]
mod tests {
use super::*;
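
For context, a minimal usage sketch of the moved helper (not part of this commit): a producer task feeds a tokio mpsc channel and `AdapterStream::adapt` turns the receiver into a `SendableRecordBatchStream` whose schema is known up front, even while the channel is still empty. The column name and batch contents are invented for illustration, and a full-featured tokio runtime is assumed; only `AdapterStream::adapt` comes from `datafusion_util`.

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_util::AdapterStream;
use futures::StreamExt;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // The schema is known before any batch has been produced.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));

    // Producer side: any task can feed ArrowResult<RecordBatch> into the channel.
    let (tx, rx) = mpsc::channel(2);
    let batch_schema = Arc::clone(&schema);
    tokio::spawn(async move {
        let col: ArrayRef = Arc::new(Int64Array::from(vec![1_i64, 2, 3]));
        let batch = RecordBatch::try_new(batch_schema, vec![col]);
        // Errors (ArrowError) travel through the same channel as batches.
        tx.send(batch).await.ok();
    });

    // Consumer side: wrap the receiver into a SendableRecordBatchStream.
    let mut stream = AdapterStream::adapt(schema, rx);
    while let Some(batch) = stream.next().await {
        println!("rows: {}", batch.unwrap().num_rows());
    }
}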

parquet_file/Cargo.toml

@@ -25,7 +25,6 @@ parking_lot = "0.11.1"
persistence_windows = { path = "../persistence_windows" }
predicate = { path = "../predicate" }
prost = "0.8"
-query = { path = "../query" }
snafu = "0.6"
tempfile = "3.1.0"
thrift = "0.13"

parquet_file/src/storage.rs

@@ -11,6 +11,7 @@ use datafusion::{
    logical_plan::Expr,
    physical_plan::{parquet::ParquetExec, ExecutionPlan, Partitioning, SendableRecordBatchStream},
};
+use datafusion_util::AdapterStream;
use futures::StreamExt;
use internal_types::selection::Selection;
use iox_object_store::{IoxObjectStore, ParquetFilePath};
@@ -23,7 +24,6 @@ use parquet::{
    file::{metadata::KeyValue, properties::WriterProperties, writer::TryClone},
};
use predicate::predicate::Predicate;
-use query::exec::stream::AdapterStream;
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::{
    io::{Cursor, Seek, SeekFrom, Write},

query/src/exec.rs

@@ -7,7 +7,6 @@ pub mod fieldlist;
mod schema_pivot;
pub mod seriesset;
pub(crate) mod split;
-pub mod stream;
pub mod stringset;
mod task;
pub use context::{DEFAULT_CATALOG, DEFAULT_SCHEMA};

query/src/exec/split.rs

@@ -26,12 +26,11 @@ use datafusion::{
    scalar::ScalarValue,
};
+use datafusion_util::AdapterStream;
use futures::StreamExt;
use observability_deps::tracing::{debug, trace};
use tokio::sync::{mpsc::Sender, Mutex};
-use crate::exec::stream::AdapterStream;
/// Implements stream splitting described in `make_stream_split`
///
/// The resulting execution plan always produces exactly 2 partitions:

query/src/exec/stream.rs (deleted)

@@ -1,53 +0,0 @@
-//! Common utilities for implementing SendableRecordBatchStreams
-use std::sync::Arc;
-
-use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
-use futures::{Stream, StreamExt};
-use tokio::sync::mpsc::Receiver;
-use tokio_stream::wrappers::ReceiverStream;
-
-use arrow::{datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch};
-
-#[derive(Debug)]
-/// Implements a [`SendableRecordBatchStream`] to help create DataFusion outputs from tokio channels.
-///
-/// It streams RecordBatches from a tokio channel *and* crucially
-/// knows up front the schema that each batch will have.
-pub struct AdapterStream {
-    /// Schema
-    schema: SchemaRef,
-    /// channel for getting deduplicated batches
-    inner: ReceiverStream<ArrowResult<RecordBatch>>,
-}
-
-impl AdapterStream {
-    /// Create a new stream which wraps the `inner` channel that
-    /// produces [`RecordBatch`]es, each having the specified schema.
-    ///
-    /// Not called `new` because it returns a pinned reference rather than the object itself.
-    pub fn adapt(
-        schema: SchemaRef,
-        rx: Receiver<ArrowResult<RecordBatch>>,
-    ) -> SendableRecordBatchStream {
-        let inner = ReceiverStream::new(rx);
-        Box::pin(Self { schema, inner })
-    }
-}
-
-impl Stream for AdapterStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(
-        mut self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Option<Self::Item>> {
-        self.inner.poll_next_unpin(cx)
-    }
-}
-
-impl RecordBatchStream for AdapterStream {
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema)
-    }
-}

query/src/provider/deduplicate.rs

@@ -9,8 +9,7 @@ use arrow::{
    record_batch::RecordBatch,
};
use async_trait::async_trait;
-use crate::exec::stream::AdapterStream;
+use datafusion_util::AdapterStream;
use self::algo::RecordBatchDeduplicator;
use datafusion::{