refactor: clean up and add comments

pull/24376/head
Nga Tran 2021-05-07 09:31:41 -04:00
parent 55bf848bd2
commit ba015ee4df
5 changed files with 48 additions and 96 deletions

View File

@@ -1,7 +1,7 @@
 /// This module responsible to write given data to specify object store and
 /// read them back
 use arrow::{
-    datatypes::{Schema, SchemaRef},
+    datatypes::{Schema as ArrowSchema, SchemaRef},
     error::Result as ArrowResult,
     record_batch::RecordBatch,
 };
@@ -9,7 +9,7 @@ use datafusion::physical_plan::{
     common::SizedRecordBatchStream, parquet::RowGroupPredicateBuilder, RecordBatchStream,
     SendableRecordBatchStream,
 };
-use internal_types::{schema::Schema as IOxSchema, selection::Selection};
+use internal_types::{schema::Schema, selection::Selection};
 use object_store::{
     path::{ObjectStorePath, Path},
     ObjectStore, ObjectStoreApi, ObjectStoreIntegration,
@@ -236,7 +236,7 @@ impl Storage {
     /// Make a datafusion predicate builder for the given predicate and schema
     pub fn predicate_builder(
         predicate: &Predicate,
-        schema: Schema,
+        schema: ArrowSchema,
     ) -> Option<RowGroupPredicateBuilder> {
         if predicate.exprs.is_empty() {
             None
@@ -286,7 +286,7 @@ impl Storage {
         let projection: Vec<usize> = Self::column_indices(selection, Arc::clone(&schema));

         // Filter needed predicates
-        let builder_schema = Schema::new(schema.fields().clone());
+        let builder_schema = ArrowSchema::new(schema.fields().clone());
         let predicate_builder = Self::predicate_builder(predicate, builder_schema);

         // Size of each batch
@@ -330,16 +330,15 @@ impl Storage {
             return Err(e);
         }

-        // TODO: removed when #1082 done
-        println!("Record batches from read_file: {:#?}", batches);
-
-        let sch = if batches.is_empty() {
+        // Schema of all record batches must be the same, Get the first one
+        // to build record batch stream
+        let batch_schema = if batches.is_empty() {
            schema
        } else {
            batches[0].schema()
        };

-        Ok(Box::pin(SizedRecordBatchStream::new(sch, batches)))
+        Ok(Box::pin(SizedRecordBatchStream::new(batch_schema, batches)))
     }

     // TODO notes: implemented this for #1082 but i turns out might not be able to use
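Aside, not part of the commit: the `batch_schema` fallback above is needed because the stream must still advertise a schema when the read produces no batches (e.g. when every row group was pruned). A minimal sketch of the same selection logic using plain arrow types, assuming the arrow 4.x API in use here; `stream_schema` is an illustrative name, not an IOx function:

    use std::sync::Arc;

    use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};

    /// Prefer the schema of the batches that were actually read; fall back to the
    /// file-level schema when no batches came back.
    fn stream_schema(file_schema: SchemaRef, batches: &[Arc<RecordBatch>]) -> SchemaRef {
        match batches.first() {
            Some(batch) => batch.schema(),
            None => file_schema,
        }
    }

In the hunk the same choice is simply written inline as the `batch_schema` expression.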
@@ -379,7 +378,7 @@ impl Storage {
     //     // Get full string path
     //     let full_path = format!("{:?}", file_root.path(&file_path));
     //     let full_path = full_path.trim_matches('"');
-    //     //println!("Full path filename: {}", full_path);
+    //     println!("Full path filename: {}", full_path);

     //     let mut total_rows = 0;
@@ -390,18 +389,12 @@ impl Storage {
     //     let iox_schema = read_schema_from_parquet_metadata(metadata)?;

     //     if let Some(predicate_builder) = predicate_builder {
-    //         println!("___ HAS PREDICATE BUILDER ___");
     //         let row_group_predicate =
     //             predicate_builder.build_row_group_predicate(file_reader.metadata().row_groups());
     //         file_reader.filter_row_groups(&row_group_predicate); //filter out
     //                                                              // row group based
     //                                                              // on the predicate
     //     }
-    //     // let parquet_data = load_parquet_data_from_object_store(path, store).await.unwrap();
-    //     // let cursor = SliceableCursor::new(parquet_data);
-    //     // let reader = SerializedFileReader::new(cursor).unwrap();
-    //     // let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
-
     //     let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
     //     let mut batch_reader = arrow_reader
     //         .get_record_reader_by_columns(projection.to_owned(), batch_size)
@@ -409,19 +402,9 @@ impl Storage {
     //     loop {
     //         match batch_reader.next() {
     //             Some(Ok(batch)) => {
-    //                 println!("--- READ FROM OS:");
-    //                 println!("-------- Record batch: {:#?}", batch);
-
-    //                 // TODO: remove this when arow-rs' ticket https://github.com/apache/arrow-rs/issues/252 is done
-    //                 let columns = batch.columns().to_vec();
-    //                 let fields = batch.schema().fields().clone();
-    //                 let column_schema = Schema::new_with_metadata(fields, iox_schema.as_arrow().metadata().clone());
-    //                 let new_batch = RecordBatch::try_new(Arc::new(column_schema), columns).context(ReadingFile)?;
-    //                 println!("-------- New Record batch: {:#?}", new_batch);
-
+    //                 //println!("ParquetExec got new batch from {}", filename);
     //                 total_rows += batch.num_rows();
-    //                 Self::send_result(&response_tx, Ok(new_batch))?;
+    //                 Self::send_result(&response_tx, Ok(batch))?;
     //                 if limit.map(|l| total_rows >= l).unwrap_or(false) {
     //                     break;
     //                 }
@@ -466,15 +449,18 @@ impl Storage {
         // Get full string path
         let full_path = format!("{:?}", file_root.path(&file_path));
         let full_path = full_path.trim_matches('"');
-        //println!("Full path filename: {}", full_path); // TOTO: to be removed after both #1082 and #1342 done
+        //println!("Full path filename: {}", full_path); // TOTO: to be removed after #1342 done

         let mut total_rows = 0;

         let file = File::open(&full_path).context(OpenFile)?;
         let mut file_reader = SerializedFileReader::new(file).context(SerializedFileReaderError)?;
+
+        // TODO: remove these line after https://github.com/apache/arrow-rs/issues/252 is done
+        // Get file level metadata to set it to the record batch's metadata below
         let metadata = file_reader.metadata();
         // println!("___ META DATA: {:#?}", metadata);
-        let iox_schema = read_schema_from_parquet_metadata(metadata)?;
+        let schema = read_schema_from_parquet_metadata(metadata)?;

         if let Some(predicate_builder) = predicate_builder {
             let row_group_predicate =
@@ -490,18 +476,20 @@ impl Storage {
         loop {
             match batch_reader.next() {
                 Some(Ok(batch)) => {
-                    //println!("ParquetExec got new batch from {}", filename); TODO: remove when #1082 done
-                    //println!("Batch value: {:#?}", batch);
                     total_rows += batch.num_rows();

-                    // TODO: remove this when arow-rs' ticket https://github.com/apache/arrow-rs/issues/252 is done
+                    // TODO: remove these lines when arow-rs' ticket https://github.com/apache/arrow-rs/issues/252 is done
+                    // println!("Batch value: {:#?}", batch);
+                    // Since arrow's parquet reading does not return the row group level's metadata, the
+                    // work around here is to get it from the file level whish is the same
                    let columns = batch.columns().to_vec();
                    let fields = batch.schema().fields().clone();
-                    let column_schema =
-                        Schema::new_with_metadata(fields, iox_schema.as_arrow().metadata().clone());
-                    let new_batch = RecordBatch::try_new(Arc::new(column_schema), columns)
+                    let arrow_column_schema = ArrowSchema::new_with_metadata(
+                        fields,
+                        schema.as_arrow().metadata().clone(),
+                    );
+                    let new_batch = RecordBatch::try_new(Arc::new(arrow_column_schema), columns)
                        .context(ReadingFile)?;
                    // println!("-------- New Record batch: {:#?}", new_batch);
                    batches.push(Arc::new(new_batch));
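Aside, not part of the commit: the hunk above works around apache/arrow-rs#252, where the record batches coming out of the parquet arrow reader lose the schema-level key/value metadata, so each batch is rebuilt against a schema that carries the file-level metadata. A minimal sketch of the same pattern with plain arrow types, assuming the arrow 4.x API in use here; `attach_file_metadata` is an illustrative name, not an IOx function:

    use std::{collections::HashMap, sync::Arc};

    use arrow::{
        datatypes::Schema as ArrowSchema, error::Result as ArrowResult, record_batch::RecordBatch,
    };

    /// Rebuild `batch` so its schema carries `file_metadata` (the key/value pairs read
    /// from the parquet file footer); the columns and fields are reused unchanged.
    fn attach_file_metadata(
        batch: &RecordBatch,
        file_metadata: HashMap<String, String>,
    ) -> ArrowResult<RecordBatch> {
        let fields = batch.schema().fields().clone();
        let schema_with_metadata = ArrowSchema::new_with_metadata(fields, file_metadata);
        RecordBatch::try_new(Arc::new(schema_with_metadata), batch.columns().to_vec())
    }

This is the same rebuild the test below relies on when it compares each batch's schema metadata against the original schema's metadata.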
@@ -518,26 +506,6 @@ impl Storage {
             }
         }

-        // let expected = vec![
-        //     "+--------+--------+--------+--------+-------------------------------+",
-        //     "| field1 | field2 | field3 | field4 | time                          |",
-        //     "+--------+--------+--------+--------+-------------------------------+",
-        //     "| 70.6   |        | 2      |        | 1970-01-01 00:00:00.000000100 |",
-        //     "| 70.4   | ss     |        |        | 1970-01-01 00:00:00.000000100 |",
-        //     "| 70.5   | ss     |        |        | 1970-01-01 00:00:00.000000100 |",
-        //     "+--------+--------+--------+--------+-------------------------------+",
-        // ];
-
-        // let new_batches = batches.clone();
-        // println!("ASSERT ASSERT ");
-        // let b: Vec<RecordBatch> = new_batches
-        //     .iter()
-        //     .map(|a| RecordBatch::clone(&*a))
-        //     .collect();
-        // assert_batches_eq!(expected, &b);
-
         Ok(())
     }
@@ -556,10 +524,6 @@ impl Storage {
     }
 }

-// fn load_parquet_data_from_object_store(path: Path, store: Arc<ObjectStore>) -> () {
-//     todo!()
-// }
-
 #[derive(Debug, Default, Clone)]
 pub struct MemWriter {
     mem: Arc<Mutex<Cursor<Vec<u8>>>>,
@@ -603,7 +567,7 @@ impl TryClone for MemWriter {
 }

 /// Read IOx schema from parquet metadata.
-pub fn read_schema_from_parquet_metadata(parquet_md: &ParquetMetaData) -> Result<IOxSchema> {
+pub fn read_schema_from_parquet_metadata(parquet_md: &ParquetMetaData) -> Result<Schema> {
     let file_metadata = parquet_md.file_metadata();

     let arrow_schema = parquet_to_arrow_schema(
@@ -614,10 +578,10 @@ pub fn read_schema_from_parquet_metadata(parquet_md: &ParquetMetaData) -> Result
     let arrow_schema_ref = Arc::new(arrow_schema);

-    let iox_schema: IOxSchema = arrow_schema_ref
+    let schema: Schema = arrow_schema_ref
         .try_into()
         .context(IoxFromArrowFailure {})?;

-    Ok(iox_schema)
+    Ok(schema)
 }

 #[cfg(test)]

View File

@@ -18,6 +18,7 @@ mod tests {

     #[tokio::test]
     async fn test_write_read() {
+        ////////////////////
         // Create test data which is also the expected data
         let table: &str = "table1";
         let (record_batches, schema, column_summaries, time_range, num_rows) = make_record_batch();
@@ -26,10 +27,12 @@ mod tests {
         let record_batch = record_batches[0].clone(); // Get the first one to compare key-value meta data that would be the same for all batches
         let key_value_metadata = record_batch.schema().metadata().clone();

+        ////////////////////
         // Make an OS in memory
         let store = make_object_store();

-        // Store the data as a Create a chunk and write it to in the object store
+        ////////////////////
+        // Store the data as a chunk and write it to in the object store
         // This test Storage::write_to_object_store
         let chunk = make_chunk_given_record_batch(
             Arc::clone(&store),
@@ -41,11 +44,13 @@ mod tests {
         )
         .await;

+        ////////////////////
         // Now let read it back
+        //
         let (_read_table, parquet_data) = load_parquet_from_store(&chunk, Arc::clone(&store)).await;
         let parquet_metadata = read_parquet_metadata_from_file(parquet_data).unwrap();

         //
-        // 1. Check metadata
+        // 1. Check metadata at file level: Everything is correct
         let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap();
         //let schema_expected = chunk.table_schema(&table, Selection::All).unwrap();
@@ -68,20 +73,25 @@ mod tests {
         // 3. Check data
         // Read the parquet data from object store
         let (_read_table, parquet_data) = load_parquet_from_store(&chunk, Arc::clone(&store)).await;
+        // Note that the read_data_from_parquet_data function fixes the row-group/batches' level metadata bug in arrow
         let actual_record_batches =
             read_data_from_parquet_data(Arc::clone(&schema.as_arrow()), parquet_data);
         let mut actual_num_rows = 0;
         for batch in actual_record_batches.clone() {
             actual_num_rows += batch.num_rows();
+
+            // Check if record batch has meta data
             // println!("Record batch: {:#?}", batch);
             let batch_key_value_metadata = batch.schema().metadata().clone();
-            println!("Batch key value meta data: {:#?}", batch_key_value_metadata); // should have value
+            // println!("Batch key value meta data: {:#?}", batch_key_value_metadata); // should have value
             assert_eq!(
                 schema.as_arrow().metadata().clone(),
                 batch_key_value_metadata
             );
         }

+        // Now verify return results. This assert_batches_eq still works correctly without the metadata
+        // We might modify it to make it include checking metadata
         let expected = vec![
             "+--------------+-----------+-----------------------+--------------------+------------------+----------------------+------------------+------------------+---------------+----------------+------------+----------------------------+",
             "| tag_nonempty | tag_empty | field_string_nonempty | field_string_empty | field_i64_normal | field_i64_range      | field_u64_normal | field_f64_normal | field_f64_inf | field_f64_zero | field_bool | time                       |",

View File

@@ -508,13 +508,13 @@ pub fn read_data_from_parquet_data(schema: SchemaRef, parquet_data: Vec<u8>) ->
     // Indices of columns in the schema needed to read
     let projection: Vec<usize> = Storage::column_indices(Selection::All, Arc::clone(&schema));
     let mut batch_reader = arrow_reader
-        .get_record_reader_by_columns(projection, 1024) //batch_size)
+        .get_record_reader_by_columns(projection, 1024)
         .unwrap();
     loop {
         match batch_reader.next() {
             Some(Ok(batch)) => {
-                //println!("Record batch: {:#?}", batch); // The metadata of batch's schema is missing
                 // TODO: remove this when arow-rs' ticket https://github.com/apache/arrow-rs/issues/252#252 is done
+                //println!("Record batch: {:#?}", batch); // The metadata of batch's schema is missing
                 let columns = batch.columns().to_vec();
                 let new_batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
                 // println!("Record batch: {:#?}", new_batch); // Should have the metadata now

View File

@@ -303,36 +303,16 @@ impl PartitionChunk for DbChunk {
                         chunk_id: chunk.id(),
                     })?;

-                // println!("--- READ FROM RUB:");
-                // let record_batch = copy.next().unwrap();
-                // println!("------- Results: {:#?}", record_batch);
-
                 Ok(Box::pin(ReadFilterResultsStream::new(
                     read_results,
                     schema.into(),
                 )))
             }

-            Self::ParquetFile { chunk, .. } => {
-                chunk
-                    .read_filter(table_name, predicate, selection)
-                    .context(ParquetFileChunkError {
-                        chunk_id: chunk.id(),
-                    })
-
-                // let mut batches = Vec::new();
-                // // process the record batches one by one
-                // while let Some(record_batch) = results.next().await.transpose().expect("reading next batch")
-                // {
-                //     batches.push(record_batch)
-                // }
-
-                // let batches = results
-                //     .unwrap()
-                //     .collect::<Vec<_>>()
-                //     .
-                //     .map(Result::unwrap)
-                //     .collect();
-            }
+            Self::ParquetFile { chunk, .. } => chunk
+                .read_filter(table_name, predicate, selection)
+                .context(ParquetFileChunkError {
+                    chunk_id: chunk.id(),
+                }),
         }
     }

View File

@@ -154,8 +154,6 @@ async fn test_field_name_plan() {
         "+--------+--------+--------+--------+-------------------------------+",
     ];

-    println!("Batches in test_field_name_plan: {:#?}", results);
-
     assert_batches_eq!(expected, &results);
 }
 }