refactor: use DataFusion to read parquet files (#5531)

Remove our own hand-rolled logic and let DataFusion read the parquet
files.

As a bonus, this now supports predicate pushdown to the deserialization
step, so we can use parquet files as an in-mem buffer.

Note that this currently uses a somewhat "nested" DataFusion hack due to
the way the `QueryChunk` interface works. Mid-term I'll change the
interface so that the `ParquetExec` nodes are directly visible to
DataFusion instead of hidden behind an opaque `SendableRecordBatchStream`.
Marco Neumann 2022-09-05 09:25:04 +00:00 committed by GitHub
parent f45cbfb88d
commit 064f0e9b29
5 changed files with 95 additions and 337 deletions
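The core of the change is the new `read_filter` body in the storage module: instead of hand-rolling the parquet download and decode, it wraps the file in a `ParquetExec` and executes that plan inside a throwaway DataFusion session. Below is a condensed sketch of that flow; the `read_parquet` helper name and its simplified signature are illustrative only, while the types, fields, and calls mirror the storage.rs hunk further down (error mapping and the `RecordBatchStreamAdapter` wrapping are omitted).

```rust
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion::{
    datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl},
    execution::context::TaskContext,
    physical_plan::{
        execute_stream,
        file_format::{FileScanConfig, ParquetExec},
        SendableRecordBatchStream, Statistics,
    },
    prelude::{Expr, SessionContext},
};
use object_store::{path::Path, DynObjectStore, ObjectMeta};

/// Hypothetical helper condensing the new `read_filter` flow.
async fn read_parquet(
    object_store: Arc<DynObjectStore>,
    location: Path,
    schema: SchemaRef,
    file_size: usize,
    predicate_expr: Option<Expr>,
) -> datafusion::error::Result<SendableRecordBatchStream> {
    // Describe the single file to scan; only location and size matter here.
    let object_meta = ObjectMeta {
        location,
        // we don't care about the "last modified" field
        last_modified: Default::default(),
        size: file_size,
    };

    // One file group with exactly one file, read against the expected schema.
    let base_config = FileScanConfig {
        object_store_url: ObjectStoreUrl::parse("iox://iox/").expect("valid object store URL"),
        file_schema: schema,
        file_groups: vec![vec![PartitionedFile {
            object_meta,
            partition_values: vec![],
            range: None,
            extensions: None,
        }]],
        statistics: Statistics::default(),
        projection: None,
        limit: None,
        table_partition_cols: vec![],
    };

    // The predicate is handed to ParquetExec, which pushes it down into the
    // parquet deserialization step.
    let exec = ParquetExec::new(base_config, predicate_expr, None);

    // "Fake" DataFusion session: ParquetExec only needs a TaskContext whose
    // runtime knows about the object store registered under the iox:// URL.
    let session_ctx = SessionContext::new();
    let task_ctx = Arc::new(TaskContext::from(&session_ctx));
    task_ctx
        .runtime_env()
        .register_object_store("iox", "iox", object_store);

    execute_stream(Arc::new(exec), task_ctx).await
}
```

The `QueryChunk` interface still only sees the resulting `SendableRecordBatchStream`, which is why the commit message calls this a "nested" DataFusion hack.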


@@ -812,7 +812,9 @@ mod tests {
let schema = table_schema.select_by_names(&selection).unwrap();
let path: ParquetFilePath = (&file).into();
let rx = storage.read_all(schema.as_arrow(), &path).unwrap();
let rx = storage
.read_all(schema.as_arrow(), &path, file.file_size_bytes as usize)
.unwrap();
datafusion::physical_plan::common::collect(rx)
.await
.unwrap()


@@ -1180,7 +1180,9 @@ mod tests {
let schema = table_schema.select_by_names(&selection).unwrap();
let path: ParquetFilePath = (&file).into();
let rx = storage.read_all(schema.as_arrow(), &path).unwrap();
let rx = storage
.read_all(schema.as_arrow(), &path, file.file_size_bytes as usize)
.unwrap();
datafusion::physical_plan::common::collect(rx)
.await
.unwrap()


@@ -84,6 +84,7 @@ impl ParquetChunk {
selection,
Arc::clone(&self.schema.as_arrow()),
&path,
self.file_size_bytes(),
)
}


@@ -7,24 +7,29 @@ use crate::{
ParquetFilePath,
};
use arrow::{
datatypes::{Field, Schema, SchemaRef},
error::{ArrowError, Result as ArrowResult},
datatypes::{Field, SchemaRef},
error::ArrowError,
record_batch::RecordBatch,
};
use bytes::Bytes;
use datafusion::{
parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ProjectionMask},
physical_plan::SendableRecordBatchStream,
datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl},
execution::context::TaskContext,
physical_plan::{
execute_stream,
file_format::{FileScanConfig, ParquetExec},
stream::RecordBatchStreamAdapter,
SendableRecordBatchStream, Statistics,
},
prelude::SessionContext,
};
use datafusion_util::{watch::WatchedTask, AdapterStream};
use futures::{Stream, TryStreamExt};
use object_store::{DynObjectStore, GetResult};
use object_store::{DynObjectStore, ObjectMeta};
use observability_deps::tracing::*;
use predicate::Predicate;
use schema::selection::{select_schema, Selection};
use std::{collections::HashMap, num::TryFromIntError, sync::Arc, time::Duration};
use std::{num::TryFromIntError, sync::Arc, time::Duration};
use thiserror::Error;
use tokio::io::AsyncReadExt;
/// Parquet row group read size
pub const ROW_GROUP_READ_SIZE: usize = 1024 * 1024;
@@ -174,46 +179,59 @@ impl ParquetStorage {
/// object store impl caches the fetched bytes.
pub fn read_filter(
&self,
_predicate: &Predicate,
predicate: &Predicate,
selection: Selection<'_>,
schema: SchemaRef,
path: &ParquetFilePath,
file_size: usize,
) -> Result<SendableRecordBatchStream, ReadError> {
let path = path.object_store_path();
trace!(path=?path, "fetching parquet data for filtered read");
// Compute final (output) schema after selection
let schema = select_schema(selection, &schema);
let schema = Arc::new(
select_schema(selection, &schema)
.as_ref()
.clone()
.with_metadata(Default::default()),
);
let (tx, rx) = tokio::sync::mpsc::channel(2);
// Run async dance here to make sure any error returned from
// `download_and_scan_parquet` is sent back to the reader and
// not silently ignored
let object_store = Arc::clone(&self.object_store);
let schema_captured = Arc::clone(&schema);
let tx_captured = tx.clone();
let fut = async move {
let download_result =
download_and_scan_parquet(schema_captured, path, object_store, tx_captured.clone())
.await;
// If there was an error returned from download_and_scan_parquet send it back to the receiver.
if let Err(e) = download_result {
warn!(error=%e, "Parquet download & scan failed");
let e = ArrowError::ExternalError(Box::new(e));
if let Err(e) = tx_captured.send(ArrowResult::Err(e)).await {
// if no one is listening, there is no one else to hear our screams
debug!(%e, "Error sending result of download function. Receiver is closed.");
}
}
Ok(())
// create ParquetExec node
let object_meta = ObjectMeta {
location: path,
// we don't care about the "last modified" field
last_modified: Default::default(),
size: file_size,
};
let handle = WatchedTask::new(fut, vec![tx], "download and scan parquet");
let expr = predicate.filter_expr();
let base_config = FileScanConfig {
object_store_url: ObjectStoreUrl::parse("iox://iox/").expect("valid object store URL"),
file_schema: Arc::clone(&schema),
file_groups: vec![vec![PartitionedFile {
object_meta,
partition_values: vec![],
range: None,
extensions: None,
}]],
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
};
let exec = ParquetExec::new(base_config, expr, None);
// returned stream simply reads off the rx channel
Ok(AdapterStream::adapt(schema, rx, handle))
// set up "fake" DataFusion session
let object_store = Arc::clone(&self.object_store);
let session_ctx = SessionContext::new();
let task_ctx = Arc::new(TaskContext::from(&session_ctx));
task_ctx
.runtime_env()
.register_object_store("iox", "iox", object_store);
Ok(Box::pin(RecordBatchStreamAdapter::new(
Arc::clone(&schema),
futures::stream::once(execute_stream(Arc::new(exec), task_ctx)).try_flatten(),
)))
}
/// Read all data from the parquet file.
@@ -221,96 +239,18 @@ impl ParquetStorage {
&self,
schema: SchemaRef,
path: &ParquetFilePath,
file_size: usize,
) -> Result<SendableRecordBatchStream, ReadError> {
self.read_filter(&Predicate::default(), Selection::All, schema, path)
self.read_filter(
&Predicate::default(),
Selection::All,
schema,
path,
file_size,
)
}
}
/// Downloads the specified parquet file to a local temporary file
/// and push the [`RecordBatch`] contents over `tx`, projecting the specified
/// column indexes.
///
/// This call MAY download a parquet file from object storage, temporarily
/// spilling it to disk while it is processed.
async fn download_and_scan_parquet(
expected_schema: SchemaRef,
path: object_store::path::Path,
object_store: Arc<DynObjectStore>,
tx: tokio::sync::mpsc::Sender<ArrowResult<RecordBatch>>,
) -> Result<(), ReadError> {
trace!(?path, "Start parquet download & scan");
let read_stream = object_store.get(&path).await?;
let data = match read_stream {
GetResult::File(f, _) => {
trace!(?path, "Using file directly");
let mut f = tokio::fs::File::from_std(f);
let l = f.metadata().await?.len();
let mut buf = Vec::with_capacity(l as usize);
f.read_to_end(&mut buf).await?;
buf
}
GetResult::Stream(read_stream) => {
let chunks: Vec<_> = read_stream.try_collect().await?;
let mut buf = Vec::with_capacity(chunks.iter().map(|c| c.len()).sum::<usize>());
for c in chunks {
buf.extend(c);
}
buf
}
};
let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(data))?;
// Check schema and calculate `file->expected` projections
let file_schema = builder.schema();
let (mask, reorder_projection) = match project_for_parquet_reader(file_schema, &expected_schema)
{
Ok((mask, reorder_projection)) => (mask, reorder_projection),
Err(e) => {
return Err(ReadError::SchemaMismatch { path, source: e });
}
};
let mask = ProjectionMask::roots(builder.parquet_schema(), mask);
// limit record batch size to number of rows
// See:
// - https://github.com/apache/arrow-rs/issues/2321
// - https://github.com/influxdata/conductor/issues/1103
let n_rows: usize = builder.metadata().file_metadata().num_rows().try_into()?;
let batch_size = n_rows.min(ROW_GROUP_READ_SIZE);
let record_batch_reader = builder
.with_projection(mask)
.with_batch_size(batch_size)
.build()?;
for batch in record_batch_reader {
let batch = batch.map(|batch| {
// project to fix column order
let batch = batch
.project(&reorder_projection)
.expect("bug in projection calculation");
// attach potential metadata
RecordBatch::try_new(Arc::clone(&expected_schema), batch.columns().to_vec())
.expect("bug in schema handling")
});
if tx.send(batch).await.is_err() {
debug!("Receiver hung up - exiting");
break;
}
}
debug!(?path, "Completed parquet download & scan");
Ok(())
}
/// Error during projecting parquet file data to an expected schema.
#[derive(Debug, Error)]
#[allow(clippy::large_enum_variant)]
@@ -330,66 +270,6 @@ pub enum ProjectionError {
},
}
/// Calculate project for the parquet-rs reader.
///
/// Expects the schema that was extracted from the actual parquet file and the desired output schema.
///
/// Returns two masks:
///
/// 1. A mask that can be passed to the parquet reader. Since the parquet reader however ignores the mask order, the
/// resulting record batches will have the same order as in the file (i.e. NOT the order in the desired schema).
/// 2. A re-order mask that can be used to reorder the output batches to actually match the desired schema.
///
/// Will fail if the desired schema contains a column that is unknown or the field types in the two schemas do not match.
fn project_for_parquet_reader(
file_schema: &Schema,
expected_schema: &Schema,
) -> Result<(Vec<usize>, Vec<usize>), ProjectionError> {
let file_column_indices: HashMap<_, _> = file_schema
.fields()
.iter()
.map(|f| f.name().as_str())
.enumerate()
.map(|(v, k)| (k, v))
.collect();
let mut mask = Vec::with_capacity(expected_schema.fields().len());
for field in expected_schema.fields() {
let file_idx = if let Some(idx) = file_column_indices.get(field.name().as_str()) {
*idx
} else {
return Err(ProjectionError::UnknownField(field.name().clone()));
};
let file_field = file_schema.field(file_idx);
if field != file_field {
return Err(ProjectionError::FieldTypeMismatch {
expected: field.clone(),
actual: file_field.clone(),
});
}
mask.push(file_idx);
}
// for some weird reason, the parquet-rs projection system only filters columns but ignores the mask order, so we
// need to calculate a reorder projection
// 1. remember for each mask element where it should go in the expected schema
let mut mask_with_index: Vec<(usize, usize)> = mask.iter().copied().enumerate().collect();
// 2. perform re-order as parquet-rs would do that (it just uses the mask and sorts it)
mask_with_index.sort_by_key(|(_a, b)| *b);
// 3. since we need to transform the re-ordered (i.e. messed up) view back into the mask, throw away the original
// mask, keep the expected schema position (added in step 1) and add the index within the parquet-rs output
let mut mask_with_index: Vec<(usize, usize)> = mask_with_index
.into_iter()
.map(|(a, _b)| a)
.enumerate()
.collect();
// 4. sort by the index within the expected schema (added in step 1, forwarded in step 3)
mask_with_index.sort_by_key(|(_a, b)| *b);
// 5. just keep the index within the parquet-rs output (added in step 3)
let reorder_projection: Vec<usize> = mask_with_index.into_iter().map(|(a, _b)| a).collect();
Ok((mask, reorder_projection))
}
#[cfg(test)]
mod tests {
use super::*;
@@ -512,7 +392,7 @@ mod tests {
assert_schema_check_fail(
other_batch,
schema,
"Schema mismatch (expected VS actual parquet file) for file '1/3/2/4/00000000-0000-0000-0000-000000000000.parquet': Type mismatch, expected Field { name: \"a\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None } but got Field { name: \"a\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }",
"Arrow error: External error: Execution error: Failed to map column projection for field a. Incompatible data types Int64 and Utf8",
).await;
}
@@ -524,7 +404,7 @@ mod tests {
assert_schema_check_fail(
other_batch,
schema,
"Schema mismatch (expected VS actual parquet file) for file '1/3/2/4/00000000-0000-0000-0000-000000000000.parquet': Unknown field: a",
"Arrow error: Invalid argument error: Column 'a' is declared as non-nullable but contains null values",
).await;
}
@@ -540,7 +420,7 @@ mod tests {
assert_schema_check_fail(
other_batch,
schema,
"Schema mismatch (expected VS actual parquet file) for file '1/3/2/4/00000000-0000-0000-0000-000000000000.parquet': Unknown field: b",
"Arrow error: Invalid argument error: Column 'b' is declared as non-nullable but contains null values",
).await;
}
@@ -555,7 +435,7 @@ mod tests {
let schema = batch.schema();
// Serialize & upload the record batches.
upload(&store, &meta, batch).await;
let (_iox_md, file_size) = upload(&store, &meta, batch).await;
// add metadata to reference schema
let schema = Arc::new(
@@ -564,7 +444,7 @@ mod tests {
.clone()
.with_metadata(HashMap::from([(String::from("foo"), String::from("bar"))])),
);
download(&store, &meta, Selection::All, schema)
download(&store, &meta, Selection::All, schema, file_size)
.await
.unwrap();
}
@@ -591,9 +471,9 @@ mod tests {
.unwrap();
// Serialize & upload the record batches.
upload(&store, &meta, batch).await;
let (_iox_md, file_size) = upload(&store, &meta, batch).await;
download(&store, &meta, Selection::All, schema)
download(&store, &meta, Selection::All, schema, file_size)
.await
.unwrap();
}
@@ -629,108 +509,6 @@ mod tests {
assert_roundtrip(file_batch, Selection::Some(&["a"]), schema, expected_batch).await;
}
#[test]
fn test_project_for_parquet_reader() {
assert_eq!(
run_project_for_parquet_reader(&[], &[]).unwrap(),
(vec![], vec![]),
);
assert_eq!(
run_project_for_parquet_reader(&[("a", ColType::Int), ("b", ColType::String)], &[])
.unwrap(),
(vec![], vec![]),
);
assert_eq!(
run_project_for_parquet_reader(
&[("a", ColType::Int), ("b", ColType::String)],
&[("a", ColType::Int), ("b", ColType::String)]
)
.unwrap(),
(vec![0, 1], vec![0, 1]),
);
assert_eq!(
run_project_for_parquet_reader(
&[("a", ColType::Int), ("b", ColType::String)],
&[("b", ColType::String), ("a", ColType::Int)]
)
.unwrap(),
(vec![1, 0], vec![1, 0]),
);
assert_eq!(
run_project_for_parquet_reader(
&[
("a", ColType::Int),
("b", ColType::String),
("c", ColType::String)
],
&[("c", ColType::String), ("a", ColType::Int)]
)
.unwrap(),
(vec![2, 0], vec![1, 0]),
);
assert_eq!(
run_project_for_parquet_reader(
&[
("a", ColType::Int),
("b", ColType::String),
("c", ColType::String),
("d", ColType::Int)
],
&[
("c", ColType::String),
("b", ColType::String),
("d", ColType::Int)
]
)
.unwrap(),
(vec![2, 1, 3], vec![1, 0, 2]),
);
assert_eq!(
run_project_for_parquet_reader(
&[
("field_int", ColType::Int),
("tag1", ColType::String),
("tag2", ColType::String),
("tag3", ColType::String),
("time", ColType::Int),
],
&[
("tag1", ColType::String),
("tag2", ColType::String),
("tag3", ColType::String),
("field_int", ColType::Int),
("time", ColType::Int),
]
)
.unwrap(),
(vec![1, 2, 3, 0, 4], vec![1, 2, 3, 0, 4]),
);
assert!(matches!(
run_project_for_parquet_reader(
&[("a", ColType::Int), ("b", ColType::String)],
&[("a", ColType::Int), ("c", ColType::String)]
)
.unwrap_err(),
ProjectionError::UnknownField(_),
));
assert!(matches!(
run_project_for_parquet_reader(
&[("a", ColType::Int), ("b", ColType::String)],
&[("a", ColType::Int), ("b", ColType::Int)]
)
.unwrap_err(),
ProjectionError::FieldTypeMismatch { .. },
));
}
fn to_string_array(strs: &[&str]) -> ArrayRef {
let array: StringArray = strs.iter().map(|s| Some(*s)).collect();
Arc::new(array)
@@ -775,10 +553,17 @@ mod tests {
meta: &IoxMetadata,
selection: Selection<'_>,
expected_schema: SchemaRef,
file_size: usize,
) -> Result<RecordBatch, DataFusionError> {
let path: ParquetFilePath = meta.into();
let rx = store
.read_filter(&Predicate::default(), selection, expected_schema, &path)
.read_filter(
&Predicate::default(),
selection,
expected_schema,
&path,
file_size,
)
.expect("should read record batches from object store");
let schema = rx.schema();
datafusion::physical_plan::common::collect(rx)
@@ -803,10 +588,10 @@ mod tests {
// Serialize & upload the record batches.
let meta = meta();
upload(&store, &meta, upload_batch).await;
let (_iox_md, file_size) = upload(&store, &meta, upload_batch).await;
// And compare to the original input
let actual_batch = download(&store, &meta, selection, expected_schema)
let actual_batch = download(&store, &meta, selection, expected_schema, file_size)
.await
.unwrap();
assert_eq!(actual_batch, expected_batch);
@@ -822,49 +607,13 @@ mod tests {
let store = ParquetStorage::new(object_store);
let meta = meta();
upload(&store, &meta, persisted_batch).await;
let (_iox_md, file_size) = upload(&store, &meta, persisted_batch).await;
let err = download(&store, &meta, Selection::All, expected_schema)
let err = download(&store, &meta, Selection::All, expected_schema, file_size)
.await
.unwrap_err();
// And compare to the original input
if let DataFusionError::ArrowError(ArrowError::ExternalError(err)) = err {
assert_eq!(&err.to_string(), msg,);
} else {
panic!("Wrong error type: {err}");
}
}
enum ColType {
Int,
String,
}
fn build_schema(cols: &[(&str, ColType)]) -> Schema {
let batch = RecordBatch::try_from_iter(
cols.iter()
.map(|(c, t)| {
let array = match t {
ColType::Int => to_int_array(&[1]),
ColType::String => to_string_array(&["foo"]),
};
(*c, array)
})
.chain(std::iter::once(("_not_empty", to_int_array(&[1])))),
)
.unwrap();
let indices: Vec<_> = (0..cols.len()).collect();
batch.schema().project(&indices).unwrap()
}
fn run_project_for_parquet_reader(
cols_file: &[(&str, ColType)],
cols_expected: &[(&str, ColType)],
) -> Result<(Vec<usize>, Vec<usize>), ProjectionError> {
let file_schema = build_schema(cols_file);
let expected_schema = build_schema(cols_expected);
project_for_parquet_reader(&file_schema, &expected_schema)
assert_eq!(err.to_string(), msg);
}
}


@@ -179,7 +179,11 @@ fn record_batches_stream(
store: ParquetStorage,
) -> Result<SendableRecordBatchStream, parquet_file::storage::ReadError> {
let path: ParquetFilePath = parquet_file.into();
store.read_all(schema.as_arrow(), &path)
store.read_all(
schema.as_arrow(),
&path,
parquet_file.file_size_bytes as usize,
)
}
#[derive(Debug, Snafu)]