feat: Implement Cross Chunk Schema / RecordBatch merging at query time (#783)

* feat: Implement Cross Chunk Schema / RecordBatch merging at query time

* docs: update comments about NullArray::new_with_type

* docs: Update comments based on code review

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Andrew Lamb 2021-02-11 13:26:38 -05:00 committed by GitHub
parent c61923f05e
commit a03598dfe2
15 changed files with 788 additions and 135 deletions

Cargo.lock (generated)

@ -2583,6 +2583,7 @@ dependencies = [
"chrono",
"croaring",
"data_types",
"futures",
"influxdb_line_protocol",
"snafu",
"sqlparser 0.6.1",


@ -93,6 +93,12 @@ pub enum Error {
MergingSchemas {
source: arrow_deps::arrow::error::ArrowError,
},
#[snafu(display("Schema Selection error while selecting '{}': {}", column_name, source))]
SelectingColumns {
column_name: String,
source: arrow_deps::arrow::error::ArrowError,
},
}
fn nullable_to_str(nullability: bool) -> &'static str {
@ -134,7 +140,7 @@ impl From<Schema> for ArrowSchemaRef {
impl From<&Schema> for ArrowSchemaRef {
fn from(s: &Schema) -> Self {
s.inner.clone()
s.as_arrow()
}
}
@ -186,6 +192,11 @@ impl Schema {
Ok(schema)
}
/// Return a valid Arrow `SchemaRef` representing this `Schema`
pub fn as_arrow(&self) -> ArrowSchemaRef {
self.inner.clone()
}
/// Create and validate a new Schema, creating metadata to
/// represent the various parts. This method is intended to be
/// used only by the SchemaBuilder.


@ -5,7 +5,7 @@
/// columns in the results.
pub enum Selection<'a> {
/// Return all columns (e.g. SELECT *)
/// The columns are returned in lexicographical order by name
/// The columns are returned in an arbitrary order
All,
/// Return only the named columns

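For illustration, a minimal sketch of constructing both variants (hedged: the import path matches the data_types crate used elsewhere in this diff, while the column names are made up):

use data_types::selection::Selection;

fn selection_examples() {
    // SELECT * -- let the chunk return every column it has
    let select_all: Selection<'_> = Selection::All;

    // SELECT host, region, time -- ask only for these columns
    let cols = ["host", "region", "time"];
    let select_some: Selection<'_> = Selection::Some(&cols);

    // Both values are typically handed to read_filter / table_schema calls
    let _ = (select_all, select_some);
}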

@ -19,6 +19,7 @@ async-trait = "0.1"
chrono = "0.4"
croaring = "0.4.5"
data_types = { path = "../data_types" }
futures = "0.3.7"
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
snafu = "0.6.2"
sqlparser = "0.6.1"


@ -115,12 +115,12 @@ impl SQLQueryPlanner {
chunk_id,
})?;
builder = builder
.add_chunk(chunk, chunk_table_schema.into())
.context(AddingChunkToProvider {
builder = builder.add_chunk(chunk, chunk_table_schema).context(
AddingChunkToProvider {
table_name,
chunk_id,
})?
},
)?
}
}
}


@ -141,7 +141,8 @@ pub trait PartitionChunk: Debug + Send + Sync {
) -> Result<Option<StringSet>, Self::Error>;
/// Returns the Schema for a table in this chunk, with the
/// specified column selection
/// specified column selection. An error is returned if the
/// selection refers to columns that do not exist.
async fn table_schema(
&self,
table_name: &str,


@ -3,7 +3,7 @@
use std::sync::Arc;
use arrow_deps::{
arrow::datatypes::SchemaRef,
arrow::datatypes::SchemaRef as ArrowSchemaRef,
datafusion::{
datasource::{
datasource::{Statistics, TableProviderFilterPushDown},
@ -14,33 +14,34 @@ use arrow_deps::{
physical_plan::ExecutionPlan,
},
};
use data_types::schema::{builder::SchemaMerger, Schema};
use crate::{predicate::Predicate, PartitionChunk};
use crate::{predicate::Predicate, util::project_schema, PartitionChunk};
use snafu::{OptionExt, Snafu};
use snafu::{ResultExt, Snafu};
mod adapter;
mod make_null;
mod physical;
use self::physical::IOxReadFilterNode;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(
"Chunk schema not compatible for table '{}'. They must be identical. Existing: {:?}, New: {:?}",
table_name,
existing_schema,
chunk_schema
))]
#[snafu(display("Chunk schema not compatible for table '{}': {}", table_name, source))]
ChunkSchemaNotCompatible {
table_name: String,
existing_schema: SchemaRef,
chunk_schema: SchemaRef,
source: data_types::schema::builder::Error,
},
#[snafu(display(
"Internal error: no chunks found in builder for table '{}'",
"Internal error: no chunks found in builder for table '{}': {}",
table_name,
source,
))]
InternalNoChunks { table_name: String },
InternalNoChunks {
table_name: String,
source: data_types::schema::builder::Error,
},
#[snafu(display("Internal error: No rows found in table '{}'", table_name))]
InternalNoRowsInTable { table_name: String },
@ -53,63 +54,92 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct ProviderBuilder<C: PartitionChunk + 'static> {
table_name: Arc<String>,
schema: Option<SchemaRef>,
chunks: Vec<Arc<C>>,
schema_merger: SchemaMerger,
chunk_and_infos: Vec<ChunkInfo<C>>,
}
/// Holds the information needed to generate data for a specific chunk
#[derive(Debug)]
pub(crate) struct ChunkInfo<C>
where
C: PartitionChunk + 'static,
{
/// The schema of the table in just this chunk (the overall table
/// schema may have more columns if this chunk doesn't have
/// columns that are in other chunks)
chunk_table_schema: Schema,
chunk: Arc<C>,
}
// The #[derive(Clone)] implementation was complaining about C not implementing
// Clone, which didn't make sense (only the Arc<C> needs to be cloned)
// Tracked by https://github.com/rust-lang/rust/issues/26925
impl<C> Clone for ChunkInfo<C>
where
C: PartitionChunk + 'static,
{
fn clone(&self) -> Self {
Self {
chunk_table_schema: self.chunk_table_schema.clone(),
chunk: self.chunk.clone(),
}
}
}
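A standalone sketch of the derive issue the comment above refers to (the types here are hypothetical, not from this codebase): #[derive(Clone)] on a generic struct adds a C: Clone bound even though only the Arc<C> field ever needs to be cloned, which is why the manual impl is the workaround.

use std::sync::Arc;

// A type that deliberately does not implement Clone
struct NotClone;

// The derive expands to `impl<T: Clone> Clone for Derived<T>`, so
// Derived<NotClone> cannot be cloned even though Arc<T> always can be.
#[derive(Clone)]
struct Derived<T> {
    inner: Arc<T>,
}

// The manual impl only requires that the fields themselves be cloneable,
// which Arc<T> always is, regardless of T.
struct Manual<T> {
    inner: Arc<T>,
}

impl<T> Clone for Manual<T> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
        }
    }
}

fn main() {
    let m = Manual { inner: Arc::new(NotClone) };
    let _m2 = m.clone(); // compiles

    // let d = Derived { inner: Arc::new(NotClone) };
    // let _d2 = d.clone(); // error: NotClone does not implement Clone
}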
impl<C: PartitionChunk> ProviderBuilder<C> {
pub fn new(table_name: impl Into<String>) -> Self {
Self {
table_name: Arc::new(table_name.into()),
schema: None,
chunks: Vec::new(),
schema_merger: SchemaMerger::new(),
chunk_and_infos: Vec::new(),
}
}
/// Add a new chunk to this provider
pub fn add_chunk(mut self, chunk: Arc<C>, chunk_table_schema: SchemaRef) -> Result<Self> {
self.schema = Some(if let Some(existing_schema) = self.schema.take() {
self.check_schema(existing_schema, chunk_table_schema)?
} else {
chunk_table_schema
});
self.chunks.push(chunk);
Ok(self)
}
pub fn add_chunk(self, chunk: Arc<C>, chunk_table_schema: Schema) -> Result<Self> {
let Self {
table_name,
schema_merger,
mut chunk_and_infos,
} = self;
/// Returns Ok(combined_schema) if the schema of the chunk is compatible with
/// `existing_schema`, or an Err explaining why otherwise
fn check_schema(
&self,
existing_schema: SchemaRef,
chunk_schema: SchemaRef,
) -> Result<SchemaRef> {
// For now, use strict equality. Eventually should union the schema
if existing_schema != chunk_schema {
ChunkSchemaNotCompatible {
table_name: self.table_name.as_ref(),
existing_schema,
chunk_schema,
}
.fail()
} else {
Ok(chunk_schema)
}
let schema_merger =
schema_merger
.merge(chunk_table_schema.clone())
.context(ChunkSchemaNotCompatible {
table_name: table_name.as_ref(),
})?;
let chunk_info = ChunkInfo {
chunk_table_schema,
chunk,
};
chunk_and_infos.push(chunk_info);
Ok(Self {
table_name,
schema_merger,
chunk_and_infos,
})
}
pub fn build(self) -> Result<ChunkTableProvider<C>> {
let Self {
table_name,
schema,
chunks,
schema_merger,
chunk_and_infos,
} = self;
let schema = schema.context(InternalNoChunks {
table_name: table_name.as_ref(),
})?;
let iox_schema = schema_merger
.build()
.context(InternalNoChunks {
table_name: table_name.as_ref(),
})?
// sort so the columns are always in a consistent order
.sort_fields_by_name();
// if the table was reported to exist, it should not be empty
if chunks.is_empty() {
if chunk_and_infos.is_empty() {
return InternalNoRowsInTable {
table_name: table_name.as_ref(),
}
@ -118,8 +148,8 @@ impl<C: PartitionChunk> ProviderBuilder<C> {
Ok(ChunkTableProvider {
table_name,
schema,
chunks,
iox_schema,
chunk_and_infos,
})
}
}
@ -129,10 +159,24 @@ impl<C: PartitionChunk> ProviderBuilder<C> {
/// This allows DataFusion to see data from Chunks as a single table, as well as
/// push predicates and selections down to chunks
#[derive(Debug)]
pub struct ChunkTableProvider<C: PartitionChunk> {
pub struct ChunkTableProvider<C: PartitionChunk + 'static> {
table_name: Arc<String>,
schema: SchemaRef,
chunks: Vec<Arc<C>>,
/// The IOx schema (wrapper around an Arrow SchemaRef) for this table
iox_schema: Schema,
// The chunks and their corresponding schema
chunk_and_infos: Vec<ChunkInfo<C>>,
}
impl<C: PartitionChunk + 'static> ChunkTableProvider<C> {
/// Return the IOx schema view for the data provided by this provider
pub fn iox_schema(&self) -> Schema {
self.iox_schema.clone()
}
/// Return the Arrow schema view for the data provided by this provider
pub fn arrow_schema(&self) -> ArrowSchemaRef {
self.iox_schema.as_arrow()
}
}
impl<C: PartitionChunk + 'static> TableProvider for ChunkTableProvider<C> {
@ -140,8 +184,9 @@ impl<C: PartitionChunk + 'static> TableProvider for ChunkTableProvider<C> {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
/// Schema with all available columns across all chunks
fn schema(&self) -> ArrowSchemaRef {
self.arrow_schema()
}
fn scan(
@ -159,12 +204,14 @@ impl<C: PartitionChunk + 'static> TableProvider for ChunkTableProvider<C> {
// optimization for providers which can offer them
let predicate = Predicate::default();
// Figure out the schema of the requested output
let scan_schema = project_schema(self.arrow_schema(), projection);
let plan = IOxReadFilterNode::new(
self.table_name.clone(),
self.schema.clone(),
self.chunks.clone(),
scan_schema,
self.chunk_and_infos.clone(),
predicate,
projection.clone(),
);
Ok(Arc::new(plan))


@ -0,0 +1,373 @@
//! Holds a stream that ensures chunks have the same (uniform) schema
use snafu::Snafu;
use std::task::{Context, Poll};
use arrow_deps::{
arrow::{
datatypes::{DataType, SchemaRef},
error::Result as ArrowResult,
record_batch::RecordBatch,
},
datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream},
};
use futures::Stream;
use super::make_null::make_null_column;
/// Database schema creation / validation errors.
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Internal error creating SchemaAdapterStream: field '{}' does not appear in the output schema",
field_name,))]
InternalLostInputField { field_name: String },
#[snafu(display("Internal error creating SchemaAdapterStream: input field '{}' had type '{:?}' which is different than output field '{}' which had type '{:?}'",
input_field_name, input_field_type, output_field_name, output_field_type,))]
InternalDataTypeMismatch {
input_field_name: String,
input_field_type: DataType,
output_field_name: String,
output_field_type: DataType,
},
#[snafu(display("Internal error creating SchemaAdapterStream: creating null of type '{:?}' which is different than output field '{}' which had type '{:?}'",
field_type, output_field_name, output_field_type,))]
InternalDataTypeMismatchForNull {
field_type: DataType,
output_field_name: String,
output_field_type: DataType,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// This stream wraps another underlying stream to ensure it produces
/// the specified schema. If the underlying stream produces a subset
/// of the columns specified in the desired schema, this stream creates
/// arrays with NULLs to pad out the missing columns
///
/// For example:
///
/// If a table had schema with Cols A, B, and C, but the chunk (input)
/// stream only produced record batches with columns A and C, this
/// stream would append a column of B / nulls to each record batch
/// that flowed through it
///
/// ```text
///
/// ┌────────────────┐ ┌─────────────────────────┐
/// │ ┌─────┐┌─────┐ │ │ ┌─────┐┌──────┐┌─────┐ │
/// │ │ A ││ C │ │ │ │ A ││ B ││ C │ │
/// │ │ - ││ - │ │ │ │ - ││ - ││ - │ │
/// ┌──────────────┐ │ │ 1 ││ 10 │ │ ┌──────────────┐ │ │ 1 ││ NULL ││ 10 │ │
/// │ Input │ │ │ 2 ││ 20 │ │ │ Adapter │ │ │ 2 ││ NULL ││ 20 │ │
/// │ Stream ├────▶ │ │ 3 ││ 30 │ │────▶│ Stream ├───▶│ │ 3 ││ NULL ││ 30 │ │
/// └──────────────┘ │ │ 4 ││ 40 │ │ └──────────────┘ │ │ 4 ││ NULL ││ 40 │ │
/// │ └─────┘└─────┘ │ │ └─────┘└──────┘└─────┘ │
/// │ │ │ │
/// │ Record Batch │ │ Record Batch │
/// └────────────────┘ └─────────────────────────┘
/// ```
pub(crate) struct SchemaAdapterStream {
input: SendableRecordBatchStream,
/// Output schema of this stream
/// The schema of `input` is always a subset of output_schema
output_schema: SchemaRef,
mappings: Vec<ColumnMapping>,
}
impl std::fmt::Debug for SchemaAdapterStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SchemaAdapterStream")
.field("input", &"(OPAQUE STREAM)")
.field("output_schema", &self.output_schema)
.field("mappings", &self.mappings)
.finish()
}
}
impl SchemaAdapterStream {
/// Try to create a new adapter stream that produces batches with
/// the specified output_schema
///
/// If the underlying stream produces columns that DO NOT appear
/// in the output schema, or are different types than the output
/// schema, an error will be produced.
pub(crate) fn try_new(
input: SendableRecordBatchStream,
output_schema: SchemaRef,
) -> Result<Self> {
let input_schema = input.schema();
// Figure out how to compute each column in the output
let mappings = output_schema
.fields()
.iter()
.map(|output_field| {
let input_field_index = input_schema
.fields()
.iter()
.enumerate()
.find(|(_, input_field)| output_field.name() == input_field.name())
.map(|(idx, _)| idx);
if let Some(input_field_index) = input_field_index {
ColumnMapping::FromInput(input_field_index)
} else {
ColumnMapping::MakeNull(output_field.data_type().clone())
}
})
.collect::<Vec<_>>();
// sanity logic checks
for input_field in input_schema.fields().iter() {
// that there are no fields in the input schema that are
// not present in the desired output schema (otherwise we
// are dropping fields -- they should have been selected
// out with projection push down)
if output_schema
.fields()
.iter()
.find(|output_field| input_field.name() == output_field.name())
.is_none()
{
return InternalLostInputField {
field_name: input_field.name(),
}
.fail();
}
}
// Verify the mappings match the output type
for (output_index, mapping) in mappings.iter().enumerate() {
match mapping {
ColumnMapping::FromInput(input_index) => {
let input_field = input_schema.field(*input_index);
let output_field = output_schema.field(output_index);
if input_field.data_type() != output_field.data_type() {
return InternalDataTypeMismatch {
input_field_name: input_field.name(),
input_field_type: input_field.data_type().clone(),
output_field_name: output_field.name(),
output_field_type: output_field.data_type().clone(),
}
.fail();
}
}
ColumnMapping::MakeNull(data_type) => {
let output_field = output_schema.field(output_index);
if data_type != output_field.data_type() {
return InternalDataTypeMismatchForNull {
field_type: data_type.clone(),
output_field_name: output_field.name(),
output_field_type: output_field.data_type().clone(),
}
.fail();
}
}
}
}
Ok(Self {
input,
output_schema,
mappings,
})
}
/// Extends the record batch, if needed, so that it matches the schema
fn extend_batch(&self, batch: RecordBatch) -> ArrowResult<RecordBatch> {
let output_columns = self
.mappings
.iter()
.map(|mapping| match mapping {
ColumnMapping::FromInput(input_index) => Ok(batch.column(*input_index).clone()),
ColumnMapping::MakeNull(data_type) => make_null_column(data_type, batch.num_rows()),
})
.collect::<ArrowResult<Vec<_>>>()?;
RecordBatch::try_new(self.output_schema.clone(), output_columns)
}
}
impl RecordBatchStream for SchemaAdapterStream {
fn schema(&self) -> SchemaRef {
self.output_schema.clone()
}
}
impl Stream for SchemaAdapterStream {
type Item = ArrowResult<RecordBatch>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
// the Poll result is an Option<Result<Batch>> so we need a few
// layers of maps to get at the actual batch, if any
self.input.as_mut().poll_next(ctx).map(|maybe_result| {
maybe_result.map(|batch| batch.and_then(|batch| self.extend_batch(batch)))
})
}
// TODO is there a useful size_hint to pass?
}
/// Describes how to create each column in the output.
#[derive(Debug)]
enum ColumnMapping {
/// Output column is found at <index> column of the input schema
FromInput(usize),
/// Output column should be synthesized with nulls of the specified type
MakeNull(DataType),
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use arrow_deps::{
arrow::{
array::{Array, Int32Array, StringArray},
datatypes::{Field, Schema},
record_batch::RecordBatch,
},
assert_table_eq,
datafusion::physical_plan::common::{collect, SizedRecordBatchStream},
};
use test_helpers::assert_contains;
#[tokio::test]
async fn same_input_and_output() {
let batch = make_batch();
let output_schema = batch.schema();
let input_stream = SizedRecordBatchStream::new(batch.schema(), vec![Arc::new(batch)]);
let adapter_stream =
SchemaAdapterStream::try_new(Box::pin(input_stream), output_schema).unwrap();
let output = collect(Box::pin(adapter_stream))
.await
.expect("Running plan");
let expected = vec![
"+---+---+-----+",
"| a | b | c |",
"+---+---+-----+",
"| 1 | 4 | foo |",
"| 2 | 5 | bar |",
"| 3 | 6 | baz |",
"+---+---+-----+",
];
assert_table_eq!(&expected, &output);
}
#[tokio::test]
async fn input_different_order_than_output() {
let batch = make_batch();
// input has columns in different order than desired output
let output_schema = Arc::new(Schema::new(vec![
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Utf8, false),
Field::new("a", DataType::Int32, false),
]));
let input_stream = SizedRecordBatchStream::new(batch.schema(), vec![Arc::new(batch)]);
let adapter_stream =
SchemaAdapterStream::try_new(Box::pin(input_stream), output_schema).unwrap();
let output = collect(Box::pin(adapter_stream))
.await
.expect("Running plan");
let expected = vec![
"+---+-----+---+",
"| b | c | a |",
"+---+-----+---+",
"| 4 | foo | 1 |",
"| 5 | bar | 2 |",
"| 6 | baz | 3 |",
"+---+-----+---+",
];
assert_table_eq!(&expected, &output);
}
#[tokio::test]
async fn input_subset_of_output() {
let batch = make_batch();
// input has a subset of the columns of the desired output; d and e are not present
let output_schema = Arc::new(Schema::new(vec![
Field::new("c", DataType::Utf8, false),
Field::new("e", DataType::Float64, false),
Field::new("b", DataType::Int32, false),
Field::new("d", DataType::Float32, false),
Field::new("a", DataType::Int32, false),
]));
let input_stream = SizedRecordBatchStream::new(batch.schema(), vec![Arc::new(batch)]);
let adapter_stream =
SchemaAdapterStream::try_new(Box::pin(input_stream), output_schema).unwrap();
let output = collect(Box::pin(adapter_stream))
.await
.expect("Running plan");
let expected = vec![
"+-----+---+---+---+---+",
"| c | e | b | d | a |",
"+-----+---+---+---+---+",
"| foo | | 4 | | 1 |",
"| bar | | 5 | | 2 |",
"| baz | | 6 | | 3 |",
"+-----+---+---+---+---+",
];
assert_table_eq!(&expected, &output);
}
#[tokio::test]
async fn input_superset_of_columns() {
let batch = make_batch();
// No such column "b" in output -- column would be lost
let output_schema = Arc::new(Schema::new(vec![
Field::new("c", DataType::Utf8, false),
Field::new("a", DataType::Int32, false),
]));
let input_stream = SizedRecordBatchStream::new(batch.schema(), vec![Arc::new(batch)]);
let res = SchemaAdapterStream::try_new(Box::pin(input_stream), output_schema);
assert_contains!(
res.unwrap_err().to_string(),
"field 'b' does not appear in the output schema"
);
}
#[tokio::test]
async fn input_has_different_type() {
let batch = make_batch();
// column c has string type in the input, but the output asks for Float32
let output_schema = Arc::new(Schema::new(vec![
Field::new("c", DataType::Float32, false),
Field::new("b", DataType::Int32, false),
Field::new("a", DataType::Int32, false),
]));
let input_stream = SizedRecordBatchStream::new(batch.schema(), vec![Arc::new(batch)]);
let res = SchemaAdapterStream::try_new(Box::pin(input_stream), output_schema);
assert_contains!(res.unwrap_err().to_string(), "input field 'c' had type 'Utf8' which is different than output field 'c' which had type 'Float32'");
}
// input has different column types than desired output
fn make_batch() -> RecordBatch {
let col_a = Arc::new(Int32Array::from(vec![1, 2, 3]));
let col_b = Arc::new(Int32Array::from(vec![4, 5, 6]));
let col_c = Arc::new(StringArray::from(vec!["foo", "bar", "baz"]));
let schema = Schema::new(vec![
Field::new("a", col_a.data_type().clone(), false),
Field::new("b", col_b.data_type().clone(), false),
Field::new("c", col_c.data_type().clone(), false),
]);
RecordBatch::try_new(Arc::new(schema), vec![col_a, col_b, col_c]).unwrap()
}
}


@ -0,0 +1,82 @@
use std::{iter::repeat, sync::Arc};
use arrow_deps::arrow::{
array::{
ArrayRef, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
Int8Array, LargeStringArray, NullArray, StringArray, UInt16Array, UInt32Array, UInt64Array,
UInt8Array,
},
datatypes::DataType,
error::{ArrowError, Result as ArrowResult},
};
use std::iter::FromIterator;
/// Creates a column of all NULL values of the specified type and size
///
/// This should be replaced by a call to NullArray::new_with_type
///
/// We can't currently use that call due to a bug in pretty printing:
/// https://github.com/apache/arrow/pull/9468
///
/// Once that is merged and we have updated Arrow, this function should be
/// replaced
pub fn make_null_column(data_type: &DataType, num_rows: usize) -> ArrowResult<ArrayRef> {
println!("Making null column: {:?}", data_type);
match data_type {
DataType::Null => Ok(Arc::new(NullArray::new(num_rows))),
DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(
repeat(None).take(num_rows),
))),
DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(repeat(None).take(num_rows)))),
DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(repeat(None).take(num_rows)))),
DataType::Int32 => Ok(Arc::new(Int32Array::from_iter(repeat(None).take(num_rows)))),
DataType::Int64 => Ok(Arc::new(Int64Array::from_iter(repeat(None).take(num_rows)))),
DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(repeat(None).take(num_rows)))),
DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(
repeat(None).take(num_rows),
))),
DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(
repeat(None).take(num_rows),
))),
DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter(
repeat(None).take(num_rows),
))),
DataType::Float16 => make_error(data_type),
DataType::Float32 => Ok(Arc::new(Float32Array::from_iter(
repeat(None).take(num_rows),
))),
DataType::Float64 => Ok(Arc::new(Float64Array::from_iter(
repeat(None).take(num_rows),
))),
DataType::Timestamp(_, _) => make_error(data_type),
DataType::Date32 => make_error(data_type),
DataType::Date64 => make_error(data_type),
DataType::Time32(_) => make_error(data_type),
DataType::Time64(_) => make_error(data_type),
DataType::Duration(_) => make_error(data_type),
DataType::Interval(_) => make_error(data_type),
DataType::Binary => make_error(data_type),
DataType::FixedSizeBinary(_) => make_error(data_type),
DataType::LargeBinary => make_error(data_type),
DataType::Utf8 => Ok(Arc::new(StringArray::from_iter(
repeat(None as Option<&str>).take(num_rows),
))),
DataType::LargeUtf8 => Ok(Arc::new(LargeStringArray::from_iter(
repeat(None as Option<&str>).take(num_rows),
))),
DataType::List(_) => make_error(data_type),
DataType::FixedSizeList(_, _) => make_error(data_type),
DataType::LargeList(_) => make_error(data_type),
DataType::Struct(_) => make_error(data_type),
DataType::Union(_) => make_error(data_type),
DataType::Dictionary(_, _) => make_error(data_type),
DataType::Decimal(_, _) => make_error(data_type),
}
}
fn make_error(data_type: &DataType) -> ArrowResult<ArrayRef> {
Err(ArrowError::NotYetImplemented(format!(
"make_null_column: Unsupported type {:?}",
data_type
)))
}
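A brief, hedged usage sketch of make_null_column (assuming the function above is in scope and using the arrow types re-exported through arrow_deps): a supported type yields an all-NULL array of the requested length, while an unsupported one surfaces the NotYetImplemented error.

use arrow_deps::arrow::{
    array::Array,
    datatypes::{DataType, TimeUnit},
    error::Result as ArrowResult,
};

fn make_null_column_example() -> ArrowResult<()> {
    // Supported type: three NULL Int32 values
    let col = make_null_column(&DataType::Int32, 3)?;
    assert_eq!(col.len(), 3);
    assert_eq!(col.null_count(), 3);

    // Unsupported type (as of this commit): returns ArrowError::NotYetImplemented
    let res = make_null_column(&DataType::Timestamp(TimeUnit::Nanosecond, None), 3);
    assert!(res.is_err());

    Ok(())
}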


@ -9,36 +9,37 @@ use arrow_deps::{
physical_plan::{ExecutionPlan, Partitioning, SendableRecordBatchStream},
},
};
use data_types::selection::Selection;
use data_types::{schema::Schema, selection::Selection};
use crate::{predicate::Predicate, PartitionChunk};
use async_trait::async_trait;
use super::{adapter::SchemaAdapterStream, ChunkInfo};
/// Implements the DataFusion physical plan interface
#[derive(Debug)]
pub struct IOxReadFilterNode<C: PartitionChunk + 'static> {
pub(crate) struct IOxReadFilterNode<C: PartitionChunk + 'static> {
table_name: Arc<String>,
/// The desired output schema (includes the selection);
/// note that the chunk may not have all these columns.
schema: SchemaRef,
chunks: Vec<Arc<C>>,
chunk_and_infos: Vec<ChunkInfo<C>>,
predicate: Predicate,
projection: Option<Vec<usize>>,
}
impl<C: PartitionChunk + 'static> IOxReadFilterNode<C> {
pub fn new(
table_name: Arc<String>,
schema: SchemaRef,
chunks: Vec<Arc<C>>,
chunk_and_infos: Vec<ChunkInfo<C>>,
predicate: Predicate,
projection: Option<Vec<usize>>,
) -> Self {
Self {
table_name,
schema,
chunks,
chunk_and_infos,
predicate,
projection,
}
}
}
@ -54,7 +55,7 @@ impl<C: PartitionChunk + 'static> ExecutionPlan for IOxReadFilterNode<C> {
}
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(self.chunks.len())
Partitioning::UnknownPartitioning(self.chunk_and_infos.len())
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@ -73,9 +74,8 @@ impl<C: PartitionChunk + 'static> ExecutionPlan for IOxReadFilterNode<C> {
let new_self = Self {
table_name: self.table_name.clone(),
schema: self.schema.clone(),
chunks: self.chunks.clone(),
chunk_and_infos: self.chunk_and_infos.clone(),
predicate: self.predicate.clone(),
projection: self.projection.clone(),
};
Ok(Arc::new(new_self))
@ -86,21 +86,23 @@ impl<C: PartitionChunk + 'static> ExecutionPlan for IOxReadFilterNode<C> {
partition: usize,
) -> arrow_deps::datafusion::error::Result<SendableRecordBatchStream> {
let fields = self.schema.fields();
let selection_cols = self.projection.as_ref().map(|projection| {
projection
.iter()
.map(|&index| fields[index].name() as &str)
.collect::<Vec<_>>()
});
let selection_cols = fields.iter().map(|f| f.name() as &str).collect::<Vec<_>>();
let selection = if let Some(selection_cols) = selection_cols.as_ref() {
Selection::Some(&selection_cols)
} else {
Selection::All
};
let ChunkInfo {
chunk,
chunk_table_schema,
} = &self.chunk_and_infos[partition];
let chunk = &self.chunks[partition];
chunk
// The output selection is all the columns in the schema.
//
// However, this chunk may not have all those columns. Thus we
// restrict the requested selection to the actual columns
// available, and use SchemaAdapterStream to pad the rest of
// the columns with NULLs if necessary
let selection_cols = restrict_selection(selection_cols, &chunk_table_schema);
let selection = Selection::Some(&selection_cols);
let stream = chunk
.read_filter(&self.table_name, &self.predicate, selection)
.await
.map_err(|e| {
@ -110,6 +112,25 @@ impl<C: PartitionChunk + 'static> ExecutionPlan for IOxReadFilterNode<C> {
chunk.id(),
e
))
})
})?;
let adapter = SchemaAdapterStream::try_new(stream, self.schema.clone())
.map_err(|e| DataFusionError::Internal(e.to_string()))?;
Ok(Box::pin(adapter))
}
}
/// Removes any columns that are not present in the schema, returning a possibly
/// restricted set of columns
fn restrict_selection<'a>(
selection_cols: Vec<&'a str>,
chunk_table_schema: &'a Schema,
) -> Vec<&'a str> {
let arrow_schema = chunk_table_schema.as_arrow();
selection_cols
.into_iter()
.filter(|col| arrow_schema.fields().iter().any(|f| f.name() == col))
.collect()
}
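For intuition, a self-contained analogue of restrict_selection over a plain Arrow schema (hedged: the column names are illustrative, and the real function takes the IOx Schema wrapper rather than an Arrow schema directly):

use std::sync::Arc;

use arrow_deps::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};

fn restrict_selection_example() {
    // A chunk that only has "region" and "user"
    let chunk_schema = Arc::new(ArrowSchema::new(vec![
        Field::new("region", DataType::Utf8, false),
        Field::new("user", DataType::Float64, true),
    ]));

    // The query asks for four columns; only those present in the chunk survive
    let requested = vec!["host", "region", "system", "user"];
    let restricted: Vec<&str> = requested
        .into_iter()
        .filter(|col| chunk_schema.fields().iter().any(|f| f.name() == col))
        .collect();

    assert_eq!(restricted, vec!["region", "user"]);
}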


@ -1,6 +1,9 @@
//! This module contains DataFusion utility functions and helpers
use std::sync::Arc;
use arrow_deps::{
arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef},
arrow::record_batch::RecordBatch,
datafusion::{
error::DataFusionError,
@ -60,3 +63,25 @@ pub fn make_scan_plan(batch: RecordBatch) -> std::result::Result<LogicalPlan, Da
let projection = None; // scan all columns
LogicalPlanBuilder::scan_memory(partitions, schema, projection)?.build()
}
/// Given the requested projection (set of requested columns),
/// returns the schema of selecting just those columns
///
/// TODO contribute this back upstream in arrow's Schema so we can
/// avoid the copy of fields
pub fn project_schema(
arrow_schema: ArrowSchemaRef,
projection: &Option<Vec<usize>>,
) -> ArrowSchemaRef {
match projection {
None => arrow_schema,
Some(projection) => {
let new_fields = projection
.iter()
.map(|&i| arrow_schema.field(i))
.cloned()
.collect();
Arc::new(ArrowSchema::new(new_fields))
}
}
}
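A hedged usage sketch of project_schema (field names are illustrative): with no projection the schema passes through untouched; with a projection, only the selected fields remain, in projection order.

use std::sync::Arc;

use arrow_deps::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};

fn project_schema_example() {
    let schema = Arc::new(ArrowSchema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
        Field::new("c", DataType::Float64, false),
    ]));

    // No projection: the original schema is passed through
    let same = project_schema(Arc::clone(&schema), &None);
    assert_eq!(same.fields().len(), 3);

    // Projection [2, 0]: only "c" and "a" remain, in that order
    let projected = project_schema(schema, &Some(vec![2, 0]));
    assert_eq!(projected.fields().len(), 2);
    assert_eq!(projected.field(0).name(), "c");
    assert_eq!(projected.field(1).name(), "a");
}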


@ -27,6 +27,9 @@ pub enum Error {
chunk_id: u32,
},
#[snafu(display("Internal error restricting schema: {}", source))]
InternalSelectingSchema { source: data_types::schema::Error },
#[snafu(display("Internal Predicate Conversion Error: {}", source))]
InternalPredicateConversion { source: super::pred::Error },
@ -228,15 +231,12 @@ impl PartitionChunk for DBChunk {
// Note Mutable buffer doesn't support predicate
// pushdown (other than pruning out the entire chunk
// via `might_pass_predicate`)
let schema = self
.table_schema(table_name, selection.clone())
.await?
.into();
let schema: Schema = self.table_schema(table_name, selection.clone()).await?;
Ok(Box::pin(MutableBufferChunkStream::new(
chunk.clone(),
schema,
schema.as_arrow(),
table_name,
selection,
)))
}
DBChunk::ReadBuffer {


@ -37,11 +37,10 @@ pub enum Error {
/// Adapter which will produce record batches from a mutable buffer
/// chunk on demand
pub(crate) struct MutableBufferChunkStream {
/// Schema
/// Requested output schema (includes selection)
schema: SchemaRef,
chunk: Arc<MBChunk>,
table_name: Arc<String>,
selection: OwnedSelection,
/// Vector of record batches to send in reverse order (send data[len-1]
/// next). Is None until the first call to poll_next
@ -49,17 +48,11 @@ pub(crate) struct MutableBufferChunkStream {
}
impl MutableBufferChunkStream {
pub fn new(
chunk: Arc<MBChunk>,
schema: SchemaRef,
table_name: impl Into<String>,
selection: Selection<'_>,
) -> Self {
pub fn new(chunk: Arc<MBChunk>, schema: SchemaRef, table_name: impl Into<String>) -> Self {
Self {
chunk,
schema,
table_name: Arc::new(table_name.into()),
selection: selection.into(),
data: None,
}
}
@ -67,14 +60,17 @@ impl MutableBufferChunkStream {
// gets the next batch, as needed
fn next_batch(&mut self) -> ArrowResult<Option<RecordBatch>> {
if self.data.is_none() {
let selected_cols = match &self.selection {
OwnedSelection::Some(cols) => cols.iter().map(|s| s as &str).collect(),
OwnedSelection::All => vec![],
};
let selection = match &self.selection {
OwnedSelection::Some(_) => Selection::Some(&selected_cols),
OwnedSelection::All => Selection::All,
};
// Want all the columns in the schema. Note we don't
// use `Selection::All` here because the mutable buffer chunk would interpret it
// as "all columns in the table in that chunk" rather than
// all columns this query needs
let selected_cols = self
.schema
.fields()
.iter()
.map(|f| f.name() as &str)
.collect::<Vec<_>>();
let selection = Selection::Some(&selected_cols);
let mut data = Vec::new();
self.chunk
@ -147,23 +143,3 @@ impl futures::Stream for ReadFilterResultsStream {
// TODO is there a useful size_hint to pass?
}
// Something which owns the column names for the selection
enum OwnedSelection {
Some(Vec<String>),
All,
}
impl OwnedSelection {}
impl From<Selection<'_>> for OwnedSelection {
fn from(s: Selection<'_>) -> Self {
match s {
Selection::All => Self::All,
Selection::Some(col_refs) => {
let cols = col_refs.iter().map(|s| s.to_string()).collect();
Self::Some(cols)
}
}
}
}


@ -125,3 +125,83 @@ impl DBSetup for TwoMeasurements {
vec![scenario1, scenario2, scenario3, scenario4]
}
}
/// Single measurement that has several different chunks with
/// different (but compatible) schema
pub struct MultiChunkSchemaMerge {}
#[async_trait]
impl DBSetup for MultiChunkSchemaMerge {
async fn make(&self) -> Vec<DBScenario> {
let partition_key = "1970-01-01T00";
let data1 = "cpu,region=west user=23.2,system=5.0 100\n\
cpu,region=west user=21.0,system=6.0 150";
let data2 = "cpu,region=east,host=foo user=23.2 100\n\
cpu,region=west,host=bar user=21.0 250";
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data1).await.unwrap();
writer.write_lp_string(&db, data2).await.unwrap();
let scenario1 = DBScenario {
scenario_name: "Data in single open chunk of mutable buffer".into(),
db,
};
// spread across 2 mutable buffer chunks
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data1).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
writer.write_lp_string(&db, data2).await.unwrap();
let scenario2 = DBScenario {
scenario_name: "Data in open chunk and closed chunk of mutable buffer".into(),
db,
};
// spread across 1 mutable buffer chunk and 1 read buffer chunk
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data1).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
db.load_chunk_to_read_buffer(partition_key, 0)
.await
.unwrap();
db.drop_mutable_buffer_chunk(partition_key, 0)
.await
.unwrap();
writer.write_lp_string(&db, data2).await.unwrap();
let scenario3 = DBScenario {
scenario_name: "Data in open chunk of mutable buffer, and one chunk of read buffer"
.into(),
db,
};
// in 2 read buffer chunks
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data1).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
writer.write_lp_string(&db, data2).await.unwrap();
db.rollover_partition(partition_key).await.unwrap();
db.load_chunk_to_read_buffer(partition_key, 0)
.await
.unwrap();
db.drop_mutable_buffer_chunk(partition_key, 0)
.await
.unwrap();
db.load_chunk_to_read_buffer(partition_key, 1)
.await
.unwrap();
db.drop_mutable_buffer_chunk(partition_key, 1)
.await
.unwrap();
let scenario4 = DBScenario {
scenario_name: "Data in two read buffer chunks".into(),
db,
};
vec![scenario1, scenario2, scenario3, scenario4]
}
}


@ -13,6 +13,7 @@ use query::{exec::Executor, frontend::sql::SQLQueryPlanner};
/// output
macro_rules! run_sql_test_case {
($DB_SETUP:expr, $SQL:expr, $EXPECTED_LINES:expr) => {
//test_helpers::enable_logging();
let sql = $SQL.to_string();
for scenario in $DB_SETUP.make().await {
let DBScenario { scenario_name, db } = scenario;
@ -124,3 +125,37 @@ async fn sql_select_from_disk() {
];
run_sql_test_case!(TwoMeasurements {}, "SELECT * from disk", &expected);
}
#[tokio::test]
async fn sql_select_with_schema_merge() {
let expected = vec![
"+------+--------+--------+------+------+",
"| host | region | system | time | user |",
"+------+--------+--------+------+------+",
"| | west | 5 | 100 | 23.2 |",
"| | west | 6 | 150 | 21 |",
"| foo | east | | 100 | 23.2 |",
"| bar | west | | 250 | 21 |",
"+------+--------+--------+------+------+",
];
run_sql_test_case!(MultiChunkSchemaMerge {}, "SELECT * from cpu", &expected);
}
#[tokio::test]
async fn sql_select_with_schema_merge_subset() {
let expected = vec![
"+------+--------+--------+",
"| host | region | system |",
"+------+--------+--------+",
"| | west | 5 |",
"| | west | 6 |",
"| foo | east | |",
"| bar | west | |",
"+------+--------+--------+",
];
run_sql_test_case!(
MultiChunkSchemaMerge {},
"SELECT host, region, system from cpu",
&expected
);
}