From 657bfa1b204d2363e70f36118dc9b3fca6bebf60 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Fri, 16 Apr 2021 17:44:46 -0400 Subject: [PATCH] refactor: address Andrew's comments --- object_store/src/path.rs | 1 + parquet_file/src/storage.rs | 18 ++++++++---------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/object_store/src/path.rs b/object_store/src/path.rs index 6fdae7c4a2..7e7e7fa74c 100644 --- a/object_store/src/path.rs +++ b/object_store/src/path.rs @@ -62,6 +62,7 @@ pub enum Path { impl Path { /// Temp function until we support non-local files + /// Return true if this file i located on the local filesystem pub fn local_file(&self) -> bool { matches!(self, Self::File(_)) } diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index a18c3de70d..df299881c9 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -72,7 +72,7 @@ pub enum Error { #[snafu(display("Non local file not supported"))] NonLocalFile {}, - #[snafu(display("Error open file: {}", source))] + #[snafu(display("Error opening file: {}", source))] OpenFile { source: std::io::Error }, #[snafu(display("Error at serialized file reader: {}", source))] @@ -227,12 +227,8 @@ impl Storage { if predicate.exprs.is_empty() { None } else { - // TODO: not sure why we store a list of exprs. It should be an expressions of - // one or many AND/OR. Make it one Expr for now to be consistent - // with DataFusion's predicate builder. - // Reference: - // datafusion::physical_plan::parquet::RowGroupPredicateBuilder::try_new - let predicate = predicate.exprs[0].clone(); + // Convert to datafusion's predicate + let predicate = predicate.filter_expr()?; Some(RowGroupPredicateBuilder::try_new(&predicate, schema).ok()?) } } @@ -280,7 +276,7 @@ impl Storage { // Might need different function if this display function does not show the full // path let filename = path.display(); - println!("Parquet filename: {}", filename); + // println!("Parquet filename: {}", filename); // TODO: remove after tests // Indices of columns in the schema needed to read let projection: Vec = Self::column_indices(selection, Arc::clone(&schema)); @@ -320,7 +316,7 @@ impl Storage { response_tx: &Sender>, result: ArrowResult, ) -> Result<()> { - // Note this function is running on its own blockng tokio thread so blocking + // Note this function is running on its own blocking tokio thread so blocking // here is ok. response_tx .blocking_send(result) @@ -343,7 +339,9 @@ impl Storage { if let Some(predicate_builder) = predicate_builder { let row_group_predicate = predicate_builder.build_row_group_predicate(file_reader.metadata().row_groups()); - file_reader.filter_row_groups(&row_group_predicate); + file_reader.filter_row_groups(&row_group_predicate); //filter out + // row group based + // on the predicate } let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); let mut batch_reader = arrow_reader