From ca4e0ad13b7681b6da6a16410cee6c1681e836bd Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Fri, 19 Nov 2021 12:35:18 +0000 Subject: [PATCH] refactor: add parquet chunk generator (#2209) (#3163) * refactor: add parquet chunk generator (#2209) * fix: tests Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- parquet_file/src/metadata.rs | 117 +++++--------------- parquet_file/src/storage.rs | 55 +++------- parquet_file/src/test_utils.rs | 21 ++-- parquet_file/src/test_utils/generator.rs | 134 +++++++++++++++++++++++ 4 files changed, 184 insertions(+), 143 deletions(-) create mode 100644 parquet_file/src/test_utils/generator.rs diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index 0cdafcb202..805f015f92 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -870,26 +870,15 @@ mod tests { use schema::TIME_COLUMN_NAME; - use crate::test_utils::{ - chunk_addr, create_partition_and_database_checkpoint, load_parquet_from_store, make_chunk, - make_chunk_no_row_group, make_iox_object_store, TestSize, - }; + use crate::test_utils::create_partition_and_database_checkpoint; + use crate::test_utils::generator::{ChunkGenerator, GeneratorConfig}; #[tokio::test] async fn test_restore_from_file() { // setup: preserve chunk to object store - let iox_object_store = make_iox_object_store().await; - let chunk = make_chunk( - Arc::clone(&iox_object_store), - "foo", - chunk_addr(1), - TestSize::Full, - ) - .await; - let parquet_data = load_parquet_from_store(&chunk, iox_object_store) - .await - .unwrap(); - let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap(); + let mut generator = ChunkGenerator::new().await; + let (chunk, _) = generator.generate().await; + let parquet_metadata = chunk.parquet_metadata(); let decoded = parquet_metadata.decode().unwrap(); // step 1: read back schema @@ -911,18 +900,9 @@ mod tests { 
#[tokio::test] async fn test_restore_from_thrift() { // setup: write chunk to object store and only keep thrift-encoded metadata - let iox_object_store = make_iox_object_store().await; - let chunk = make_chunk( - Arc::clone(&iox_object_store), - "foo", - chunk_addr(1), - TestSize::Full, - ) - .await; - let parquet_data = load_parquet_from_store(&chunk, iox_object_store) - .await - .unwrap(); - let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap(); + let mut generator = ChunkGenerator::new().await; + let (chunk, _) = generator.generate().await; + let parquet_metadata = chunk.parquet_metadata(); let data = parquet_metadata.thrift_bytes().to_vec(); let parquet_metadata = IoxParquetMetaData::from_thrift_bytes(data); let decoded = parquet_metadata.decode().unwrap(); @@ -941,18 +921,10 @@ mod tests { #[tokio::test] async fn test_restore_from_file_no_row_group() { // setup: preserve chunk to object store - let iox_object_store = make_iox_object_store().await; - let chunk = make_chunk_no_row_group( - Arc::clone(&iox_object_store), - "foo", - chunk_addr(1), - TestSize::Full, - ) - .await; - let parquet_data = load_parquet_from_store(&chunk, iox_object_store) - .await - .unwrap(); - let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap(); + let mut generator = ChunkGenerator::new().await; + generator.set_config(GeneratorConfig::NoData); + let (chunk, _) = generator.generate().await; + let parquet_metadata = chunk.parquet_metadata(); let decoded = parquet_metadata.decode().unwrap(); // step 1: read back schema @@ -971,18 +943,11 @@ mod tests { #[tokio::test] async fn test_restore_from_thrift_no_row_group() { // setup: write chunk to object store and only keep thrift-encoded metadata - let iox_object_store = make_iox_object_store().await; - let chunk = make_chunk_no_row_group( - Arc::clone(&iox_object_store), - "foo", - chunk_addr(1), - TestSize::Full, - ) - .await; - let parquet_data = load_parquet_from_store(&chunk, 
iox_object_store) - .await - .unwrap(); - let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap(); + let mut generator = ChunkGenerator::new().await; + generator.set_config(GeneratorConfig::NoData); + let (chunk, _) = generator.generate().await; + let parquet_metadata = chunk.parquet_metadata(); + let data = parquet_metadata.thrift_bytes().to_vec(); let parquet_metadata = IoxParquetMetaData::from_thrift_bytes(data); let decoded = parquet_metadata.decode().unwrap(); @@ -1002,18 +967,9 @@ mod tests { #[tokio::test] async fn test_make_chunk() { - let iox_object_store = make_iox_object_store().await; - let chunk = make_chunk( - Arc::clone(&iox_object_store), - "foo", - chunk_addr(1), - TestSize::Full, - ) - .await; - let parquet_data = load_parquet_from_store(&chunk, iox_object_store) - .await - .unwrap(); - let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap(); + let mut generator = ChunkGenerator::new().await; + let (chunk, _) = generator.generate().await; + let parquet_metadata = chunk.parquet_metadata(); let decoded = parquet_metadata.decode().unwrap(); assert!(decoded.md.num_row_groups() > 1); @@ -1040,18 +996,10 @@ mod tests { #[tokio::test] async fn test_make_chunk_no_row_group() { - let iox_object_store = make_iox_object_store().await; - let chunk = make_chunk_no_row_group( - Arc::clone(&iox_object_store), - "foo", - chunk_addr(1), - TestSize::Full, - ) - .await; - let parquet_data = load_parquet_from_store(&chunk, iox_object_store) - .await - .unwrap(); - let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap(); + let mut generator = ChunkGenerator::new().await; + generator.set_config(GeneratorConfig::NoData); + let (chunk, _) = generator.generate().await; + let parquet_metadata = chunk.parquet_metadata(); let decoded = parquet_metadata.decode().unwrap(); assert_eq!(decoded.md.num_row_groups(), 0); @@ -1113,18 +1061,9 @@ mod tests { #[tokio::test] async fn 
test_parquet_metadata_size() { // setup: preserve chunk to object store - let iox_object_store = make_iox_object_store().await; - let chunk = make_chunk( - Arc::clone(&iox_object_store), - "foo", - chunk_addr(1), - TestSize::Full, - ) - .await; - let parquet_data = load_parquet_from_store(&chunk, iox_object_store) - .await - .unwrap(); - let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap(); - assert_eq!(parquet_metadata.size(), 3730); + let mut generator = ChunkGenerator::new().await; + let (chunk, _) = generator.generate().await; + let parquet_metadata = chunk.parquet_metadata(); + assert_eq!(parquet_metadata.size(), 3729); } } diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index 73c2e02cbd..f68d63a72c 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -429,17 +429,14 @@ impl TryClone for MemWriter { #[cfg(test)] mod tests { use super::*; + use crate::test_utils::generator::ChunkGenerator; use crate::test_utils::{ - chunk_addr, create_partition_and_database_checkpoint, load_parquet_from_store, - make_chunk_given_record_batch, make_iox_object_store, make_record_batch, - read_data_from_parquet_data, TestSize, + create_partition_and_database_checkpoint, load_parquet_from_store, make_iox_object_store, + make_record_batch, read_data_from_parquet_data, TestSize, }; use arrow::array::{ArrayRef, StringArray}; use arrow_util::assert_batches_eq; - use data_types::{ - chunk_metadata::{ChunkId, ChunkOrder}, - partition_metadata::TableSummary, - }; + use data_types::chunk_metadata::{ChunkId, ChunkOrder}; use datafusion::physical_plan::common::SizedRecordBatchStream; use datafusion_util::MemoryStream; use parquet::schema::types::ColumnPath; @@ -584,37 +581,17 @@ mod tests { #[tokio::test] async fn test_write_read() { - //////////////////// - // Create test data which is also the expected data - let addr = chunk_addr(1); - let table = Arc::clone(&addr.table_name); - let (record_batches, schema, 
column_summaries, num_rows) = - make_record_batch("foo", TestSize::Full); - let mut table_summary = TableSummary::new(table.to_string()); - table_summary.columns = column_summaries.clone(); - let record_batch = record_batches[0].clone(); // Get the first one to compare key-value meta data that would be the same for all batches - let key_value_metadata = record_batch.schema().metadata().clone(); - - //////////////////// - // Make an OS in memory - let store = make_iox_object_store().await; - //////////////////// // Store the data as a chunk and write it to in the object store - // This test Storage::write_to_object_store - let chunk = make_chunk_given_record_batch( - Arc::clone(&store), - record_batches.clone(), - schema.clone(), - addr, - column_summaries.clone(), - ) - .await; + // This tests Storage::write_to_object_store + let mut generator = ChunkGenerator::new().await; + let (chunk, _) = generator.generate().await; + let key_value_metadata = chunk.schema().as_arrow().metadata().clone(); //////////////////// // Now let read it back // - let parquet_data = load_parquet_from_store(&chunk, Arc::clone(&store)) + let parquet_data = load_parquet_from_store(&chunk, Arc::clone(generator.store())) .await .unwrap(); let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone()).unwrap(); @@ -622,7 +599,7 @@ mod tests { // // 1. Check metadata at file level: Everything is correct let schema_actual = decoded.read_schema().unwrap(); - assert_eq!(Arc::new(schema.clone()), schema_actual); + assert_eq!(chunk.schema(), schema_actual); assert_eq!( key_value_metadata.clone(), schema_actual.as_arrow().metadata().clone() @@ -630,22 +607,19 @@ mod tests { // 2. Check statistics let table_summary_actual = decoded.read_statistics(&schema_actual).unwrap(); - assert_eq!(table_summary_actual, table_summary.columns); + assert_eq!(table_summary_actual, chunk.table_summary().columns); // 3. 
Check data // Note that the read_data_from_parquet_data function fixes the row-group/batches' level metadata bug in arrow let actual_record_batches = - read_data_from_parquet_data(Arc::clone(&schema.as_arrow()), parquet_data); + read_data_from_parquet_data(chunk.schema().as_arrow(), parquet_data); let mut actual_num_rows = 0; for batch in actual_record_batches.clone() { actual_num_rows += batch.num_rows(); // Check if record batch has meta data let batch_key_value_metadata = batch.schema().metadata().clone(); - assert_eq!( - schema.as_arrow().metadata().clone(), - batch_key_value_metadata - ); + assert_eq!(key_value_metadata, batch_key_value_metadata); } // Now verify return results. This assert_batches_eq still works correctly without the metadata @@ -660,8 +634,7 @@ mod tests { "| foo | | | | foo | | | | 4 | 9223372036854775807 | | | 4 | 18446744073709551615 | | | 40.1 | 1 | -0 | NaN | NaN | | | false | | | 1970-01-01T00:00:00.000004Z |", "+----------------+---------------+-------------------+------------------+-------------------------+------------------------+----------------------------+---------------------------+----------------------+----------------------+-------------------------+------------------------+----------------------+----------------------+-------------------------+------------------------+----------------------+-------------------+--------------------+------------------------+-----------------------+-------------------------+------------------------+-----------------------+--------------------------+-------------------------+-----------------------------+", ]; - assert_eq!(num_rows, actual_num_rows); - assert_batches_eq!(expected.clone(), &record_batches); + assert_eq!(chunk.rows(), actual_num_rows); assert_batches_eq!(expected, &actual_record_batches); } } diff --git a/parquet_file/src/test_utils.rs b/parquet_file/src/test_utils.rs index ceccf79735..cbf0cb79b6 100644 --- a/parquet_file/src/test_utils.rs +++ b/parquet_file/src/test_utils.rs @@ 
-36,6 +36,8 @@ use std::{collections::BTreeMap, num::NonZeroU32, sync::Arc}; use time::Time; use uuid::Uuid; +pub mod generator; + #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Error getting data from object store: {}", source))] @@ -114,6 +116,8 @@ pub fn chunk_addr(id: u128) -> ChunkAddr { } /// Same as [`make_chunk`] but parquet file does not contain any row group. +/// +/// TODO(raphael): Replace with ChunkGenerator pub async fn make_chunk( iox_object_store: Arc<IoxObjectStore>, column_prefix: &str, @@ -132,21 +136,12 @@ pub async fn make_chunk( .await } -/// Same as [`make_chunk`] but parquet file does not contain any row group. -pub async fn make_chunk_no_row_group( - store: Arc<IoxObjectStore>, - column_prefix: &str, - addr: ChunkAddr, - test_size: TestSize, -) -> ParquetChunk { - let (_, schema, column_summaries, _num_rows) = make_record_batch(column_prefix, test_size); - make_chunk_given_record_batch(store, vec![], schema, addr, column_summaries).await -} - /// Create a test chunk by writing data to object store. /// /// TODO: This code creates a chunk that isn't hooked up with metrics -pub async fn make_chunk_given_record_batch( +/// +/// TODO(raphael): Replace with ChunkGenerator +async fn make_chunk_given_record_batch( iox_object_store: Arc<IoxObjectStore>, record_batches: Vec<RecordBatch>, schema: Schema, @@ -895,7 +890,7 @@ pub fn read_data_from_parquet_data(schema: SchemaRef, parquet_data: Vec<u8>) -> /// Create test metadata by creating a parquet file and reading it back into memory. /// -/// See [`make_chunk`] for details. 
+/// TODO(raphael): Replace with ChunkGenerator pub async fn make_metadata( iox_object_store: &Arc<IoxObjectStore>, column_prefix: &str, diff --git a/parquet_file/src/test_utils/generator.rs b/parquet_file/src/test_utils/generator.rs new file mode 100644 index 0000000000..a06ea3b85c --- /dev/null +++ b/parquet_file/src/test_utils/generator.rs @@ -0,0 +1,134 @@ +use crate::chunk::{ChunkMetrics, ParquetChunk}; +use crate::metadata::IoxMetadata; +use crate::storage::Storage; +use crate::test_utils::{ + create_partition_and_database_checkpoint, make_iox_object_store, make_record_batch, TestSize, +}; +use data_types::chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder}; +use data_types::partition_metadata::{PartitionAddr, TableSummary}; +use datafusion_util::MemoryStream; +use iox_object_store::IoxObjectStore; +use std::sync::Arc; +use time::Time; + +/// Controls the number of row groups to generate for chunks +#[derive(Debug, Copy, Clone)] +pub enum GeneratorConfig { + /// Generates schema but skips generating data + NoData, + /// Generates 3 row groups with a limited selection of columns + Simple, + /// Generates 3 row groups with a wide variety of different columns + Full, +} + +/// A generator of persisted chunks for use in tests +#[derive(Debug)] +pub struct ChunkGenerator { + iox_object_store: Arc<IoxObjectStore>, + storage: Storage, + column_prefix: String, + config: GeneratorConfig, + partition: PartitionAddr, + next_chunk: u32, +} + +impl ChunkGenerator { + pub async fn new() -> Self { + Self::new_with_store(make_iox_object_store().await) + } + + pub fn new_with_store(iox_object_store: Arc<IoxObjectStore>) -> Self { + let storage = Storage::new(Arc::clone(&iox_object_store)); + Self { + iox_object_store, + storage, + column_prefix: "foo".to_string(), + config: GeneratorConfig::Full, + partition: PartitionAddr { + db_name: Arc::from("db1"), + table_name: Arc::from("table1"), + partition_key: Arc::from("part1"), + }, + next_chunk: 1, + } + } + + pub fn store(&self) -> &Arc<IoxObjectStore> { + &self.iox_object_store + } + + pub fn 
set_config(&mut self, config: GeneratorConfig) { + self.config = config; + } + + fn next_chunk(&mut self) -> (ChunkId, ChunkOrder) { + let t = self.next_chunk; + self.next_chunk += 1; + (ChunkId::new_test(t as _), ChunkOrder::new(t).unwrap()) + } + + pub async fn generate(&mut self) -> (ParquetChunk, IoxMetadata) { + let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint( + Arc::clone(&self.partition.table_name), + Arc::clone(&self.partition.partition_key), + ); + + let (chunk_id, chunk_order) = self.next_chunk(); + let chunk_addr = ChunkAddr::new(&self.partition, chunk_id); + + let metadata = IoxMetadata { + creation_timestamp: Time::from_timestamp(10, 20), + table_name: Arc::clone(&self.partition.table_name), + partition_key: Arc::clone(&self.partition.partition_key), + chunk_id, + chunk_order, + partition_checkpoint, + database_checkpoint, + time_of_first_write: Time::from_timestamp(30, 40), + time_of_last_write: Time::from_timestamp(50, 60), + }; + + let (record_batches, schema, column_summaries, rows) = match self.config { + GeneratorConfig::NoData => { + // Generating an entire row group just for its metadata seems wasteful + let (_, schema, column_summaries, _) = + make_record_batch(&self.column_prefix, TestSize::Minimal); + // Note: column summaries here are inconsistent with the actual data? 
+ (vec![], schema, column_summaries, 0) + } + GeneratorConfig::Simple => make_record_batch(&self.column_prefix, TestSize::Minimal), + GeneratorConfig::Full => make_record_batch(&self.column_prefix, TestSize::Full), + }; + + let table_summary = TableSummary { + name: self.partition.table_name.to_string(), + columns: column_summaries, + }; + + let stream = Box::pin(MemoryStream::new_with_schema( + record_batches, + Arc::clone(schema.inner()), + )); + + let (path, file_size_bytes, parquet_metadata) = self + .storage + .write_to_object_store(chunk_addr, stream, metadata.clone()) + .await + .unwrap(); + + let chunk = ParquetChunk::new_from_parts( + Arc::clone(&self.partition.partition_key), + Arc::new(table_summary), + Arc::new(schema), + &path, + Arc::clone(&self.iox_object_store), + file_size_bytes, + Arc::new(parquet_metadata), + rows, + ChunkMetrics::new_unregistered(), + ); + + (chunk, metadata) + } +}