refactor: address Andrew's comments

pull/24376/head
Nga Tran 2021-04-16 17:44:46 -04:00
parent b3e110a241
commit 657bfa1b20
2 changed files with 9 additions and 10 deletions

View File

@ -62,6 +62,7 @@ pub enum Path {
impl Path { impl Path {
/// Temp function until we support non-local files /// Temp function until we support non-local files
/// Return true if this file is located on the local filesystem
pub fn local_file(&self) -> bool { pub fn local_file(&self) -> bool {
matches!(self, Self::File(_)) matches!(self, Self::File(_))
} }

View File

@ -72,7 +72,7 @@ pub enum Error {
#[snafu(display("Non local file not supported"))] #[snafu(display("Non local file not supported"))]
NonLocalFile {}, NonLocalFile {},
#[snafu(display("Error open file: {}", source))] #[snafu(display("Error opening file: {}", source))]
OpenFile { source: std::io::Error }, OpenFile { source: std::io::Error },
#[snafu(display("Error at serialized file reader: {}", source))] #[snafu(display("Error at serialized file reader: {}", source))]
@ -227,12 +227,8 @@ impl Storage {
if predicate.exprs.is_empty() { if predicate.exprs.is_empty() {
None None
} else { } else {
// TODO: not sure why we store a list of exprs. It should be an expressions of // Convert to datafusion's predicate
// one or many AND/OR. Make it one Expr for now to be consistent let predicate = predicate.filter_expr()?;
// with DataFusion's predicate builder.
// Reference:
// datafusion::physical_plan::parquet::RowGroupPredicateBuilder::try_new
let predicate = predicate.exprs[0].clone();
Some(RowGroupPredicateBuilder::try_new(&predicate, schema).ok()?) Some(RowGroupPredicateBuilder::try_new(&predicate, schema).ok()?)
} }
} }
@ -280,7 +276,7 @@ impl Storage {
// Might need different function if this display function does not show the full // Might need different function if this display function does not show the full
// path // path
let filename = path.display(); let filename = path.display();
println!("Parquet filename: {}", filename); // println!("Parquet filename: {}", filename); // TODO: remove after tests
// Indices of columns in the schema needed to read // Indices of columns in the schema needed to read
let projection: Vec<usize> = Self::column_indices(selection, Arc::clone(&schema)); let projection: Vec<usize> = Self::column_indices(selection, Arc::clone(&schema));
@ -320,7 +316,7 @@ impl Storage {
response_tx: &Sender<ArrowResult<RecordBatch>>, response_tx: &Sender<ArrowResult<RecordBatch>>,
result: ArrowResult<RecordBatch>, result: ArrowResult<RecordBatch>,
) -> Result<()> { ) -> Result<()> {
// Note this function is running on its own blockng tokio thread so blocking // Note this function is running on its own blocking tokio thread so blocking
// here is ok. // here is ok.
response_tx response_tx
.blocking_send(result) .blocking_send(result)
@ -343,7 +339,9 @@ impl Storage {
if let Some(predicate_builder) = predicate_builder { if let Some(predicate_builder) = predicate_builder {
let row_group_predicate = let row_group_predicate =
predicate_builder.build_row_group_predicate(file_reader.metadata().row_groups()); predicate_builder.build_row_group_predicate(file_reader.metadata().row_groups());
file_reader.filter_row_groups(&row_group_predicate); file_reader.filter_row_groups(&row_group_predicate); //filter out
// row group based
// on the predicate
} }
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
let mut batch_reader = arrow_reader let mut batch_reader = arrow_reader