refactor: add parquet chunk generator (#2209) (#3163)

* refactor: add parquet chunk generator (#2209)

* fix: tests

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Raphael Taylor-Davies 2021-11-19 12:35:18 +00:00 committed by GitHub
parent 0b3df2ab50
commit ca4e0ad13b
4 changed files with 184 additions and 143 deletions
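
In short: tests that previously built a chunk by hand with make_chunk / make_chunk_no_row_group and re-read its metadata via load_parquet_from_store + IoxParquetMetaData::from_file_bytes now ask a ChunkGenerator for a ready-made (ParquetChunk, IoxMetadata) pair. A minimal sketch of the new pattern, assembled from the hunks below (the test name is illustrative; the other identifiers all appear in this diff):

use crate::test_utils::generator::{ChunkGenerator, GeneratorConfig};

#[tokio::test]
async fn example_chunk_generator_usage() {
    // Default config (GeneratorConfig::Full) writes a full chunk to an
    // in-memory object store and returns it together with its IoxMetadata.
    let mut generator = ChunkGenerator::new().await;
    let (chunk, _metadata) = generator.generate().await;

    // Parquet metadata now comes straight off the generated chunk, with no
    // round trip through load_parquet_from_store / from_file_bytes.
    let parquet_metadata = chunk.parquet_metadata();
    let decoded = parquet_metadata.decode().unwrap();
    assert!(decoded.md.num_row_groups() > 1);

    // GeneratorConfig::NoData yields a parquet file with a schema but no row
    // groups, replacing the old make_chunk_no_row_group helper.
    let mut generator = ChunkGenerator::new().await;
    generator.set_config(GeneratorConfig::NoData);
    let (chunk, _metadata) = generator.generate().await;
    let decoded = chunk.parquet_metadata().decode().unwrap();
    assert_eq!(decoded.md.num_row_groups(), 0);
}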


@@ -870,26 +870,15 @@ mod tests {
use schema::TIME_COLUMN_NAME;
use crate::test_utils::{
chunk_addr, create_partition_and_database_checkpoint, load_parquet_from_store, make_chunk,
make_chunk_no_row_group, make_iox_object_store, TestSize,
};
use crate::test_utils::create_partition_and_database_checkpoint;
use crate::test_utils::generator::{ChunkGenerator, GeneratorConfig};
#[tokio::test]
async fn test_restore_from_file() {
// setup: preserve chunk to object store
let iox_object_store = make_iox_object_store().await;
let chunk = make_chunk(
Arc::clone(&iox_object_store),
"foo",
chunk_addr(1),
TestSize::Full,
)
.await;
let parquet_data = load_parquet_from_store(&chunk, iox_object_store)
.await
.unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap();
let mut generator = ChunkGenerator::new().await;
let (chunk, _) = generator.generate().await;
let parquet_metadata = chunk.parquet_metadata();
let decoded = parquet_metadata.decode().unwrap();
// step 1: read back schema
@@ -911,18 +900,9 @@ mod tests {
#[tokio::test]
async fn test_restore_from_thrift() {
// setup: write chunk to object store and only keep thrift-encoded metadata
let iox_object_store = make_iox_object_store().await;
let chunk = make_chunk(
Arc::clone(&iox_object_store),
"foo",
chunk_addr(1),
TestSize::Full,
)
.await;
let parquet_data = load_parquet_from_store(&chunk, iox_object_store)
.await
.unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap();
let mut generator = ChunkGenerator::new().await;
let (chunk, _) = generator.generate().await;
let parquet_metadata = chunk.parquet_metadata();
let data = parquet_metadata.thrift_bytes().to_vec();
let parquet_metadata = IoxParquetMetaData::from_thrift_bytes(data);
let decoded = parquet_metadata.decode().unwrap();
@@ -941,18 +921,10 @@ mod tests {
#[tokio::test]
async fn test_restore_from_file_no_row_group() {
// setup: preserve chunk to object store
let iox_object_store = make_iox_object_store().await;
let chunk = make_chunk_no_row_group(
Arc::clone(&iox_object_store),
"foo",
chunk_addr(1),
TestSize::Full,
)
.await;
let parquet_data = load_parquet_from_store(&chunk, iox_object_store)
.await
.unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap();
let mut generator = ChunkGenerator::new().await;
generator.set_config(GeneratorConfig::NoData);
let (chunk, _) = generator.generate().await;
let parquet_metadata = chunk.parquet_metadata();
let decoded = parquet_metadata.decode().unwrap();
// step 1: read back schema
@@ -971,18 +943,11 @@ mod tests {
#[tokio::test]
async fn test_restore_from_thrift_no_row_group() {
// setup: write chunk to object store and only keep thrift-encoded metadata
let iox_object_store = make_iox_object_store().await;
let chunk = make_chunk_no_row_group(
Arc::clone(&iox_object_store),
"foo",
chunk_addr(1),
TestSize::Full,
)
.await;
let parquet_data = load_parquet_from_store(&chunk, iox_object_store)
.await
.unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap();
let mut generator = ChunkGenerator::new().await;
generator.set_config(GeneratorConfig::NoData);
let (chunk, _) = generator.generate().await;
let parquet_metadata = chunk.parquet_metadata();
let data = parquet_metadata.thrift_bytes().to_vec();
let parquet_metadata = IoxParquetMetaData::from_thrift_bytes(data);
let decoded = parquet_metadata.decode().unwrap();
@@ -1002,18 +967,9 @@ mod tests {
#[tokio::test]
async fn test_make_chunk() {
let iox_object_store = make_iox_object_store().await;
let chunk = make_chunk(
Arc::clone(&iox_object_store),
"foo",
chunk_addr(1),
TestSize::Full,
)
.await;
let parquet_data = load_parquet_from_store(&chunk, iox_object_store)
.await
.unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap();
let mut generator = ChunkGenerator::new().await;
let (chunk, _) = generator.generate().await;
let parquet_metadata = chunk.parquet_metadata();
let decoded = parquet_metadata.decode().unwrap();
assert!(decoded.md.num_row_groups() > 1);
@@ -1040,18 +996,10 @@ mod tests {
#[tokio::test]
async fn test_make_chunk_no_row_group() {
let iox_object_store = make_iox_object_store().await;
let chunk = make_chunk_no_row_group(
Arc::clone(&iox_object_store),
"foo",
chunk_addr(1),
TestSize::Full,
)
.await;
let parquet_data = load_parquet_from_store(&chunk, iox_object_store)
.await
.unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap();
let mut generator = ChunkGenerator::new().await;
generator.set_config(GeneratorConfig::NoData);
let (chunk, _) = generator.generate().await;
let parquet_metadata = chunk.parquet_metadata();
let decoded = parquet_metadata.decode().unwrap();
assert_eq!(decoded.md.num_row_groups(), 0);
@@ -1113,18 +1061,9 @@ mod tests {
#[tokio::test]
async fn test_parquet_metadata_size() {
// setup: preserve chunk to object store
let iox_object_store = make_iox_object_store().await;
let chunk = make_chunk(
Arc::clone(&iox_object_store),
"foo",
chunk_addr(1),
TestSize::Full,
)
.await;
let parquet_data = load_parquet_from_store(&chunk, iox_object_store)
.await
.unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap();
assert_eq!(parquet_metadata.size(), 3730);
let mut generator = ChunkGenerator::new().await;
let (chunk, _) = generator.generate().await;
let parquet_metadata = chunk.parquet_metadata();
assert_eq!(parquet_metadata.size(), 3729);
}
}


@@ -429,17 +429,14 @@ impl TryClone for MemWriter {
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::generator::ChunkGenerator;
use crate::test_utils::{
chunk_addr, create_partition_and_database_checkpoint, load_parquet_from_store,
make_chunk_given_record_batch, make_iox_object_store, make_record_batch,
read_data_from_parquet_data, TestSize,
create_partition_and_database_checkpoint, load_parquet_from_store, make_iox_object_store,
make_record_batch, read_data_from_parquet_data, TestSize,
};
use arrow::array::{ArrayRef, StringArray};
use arrow_util::assert_batches_eq;
use data_types::{
chunk_metadata::{ChunkId, ChunkOrder},
partition_metadata::TableSummary,
};
use data_types::chunk_metadata::{ChunkId, ChunkOrder};
use datafusion::physical_plan::common::SizedRecordBatchStream;
use datafusion_util::MemoryStream;
use parquet::schema::types::ColumnPath;
@@ -584,37 +581,17 @@ mod tests {
#[tokio::test]
async fn test_write_read() {
////////////////////
// Create test data which is also the expected data
let addr = chunk_addr(1);
let table = Arc::clone(&addr.table_name);
let (record_batches, schema, column_summaries, num_rows) =
make_record_batch("foo", TestSize::Full);
let mut table_summary = TableSummary::new(table.to_string());
table_summary.columns = column_summaries.clone();
let record_batch = record_batches[0].clone(); // Get the first one to compare key-value meta data that would be the same for all batches
let key_value_metadata = record_batch.schema().metadata().clone();
////////////////////
// Make an OS in memory
let store = make_iox_object_store().await;
////////////////////
// Store the data as a chunk and write it to in the object store
// This test Storage::write_to_object_store
let chunk = make_chunk_given_record_batch(
Arc::clone(&store),
record_batches.clone(),
schema.clone(),
addr,
column_summaries.clone(),
)
.await;
// This tests Storage::write_to_object_store
let mut generator = ChunkGenerator::new().await;
let (chunk, _) = generator.generate().await;
let key_value_metadata = chunk.schema().as_arrow().metadata().clone();
////////////////////
// Now let read it back
//
let parquet_data = load_parquet_from_store(&chunk, Arc::clone(&store))
let parquet_data = load_parquet_from_store(&chunk, Arc::clone(generator.store()))
.await
.unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone()).unwrap();
@@ -622,7 +599,7 @@ mod tests {
//
// 1. Check metadata at file level: Everything is correct
let schema_actual = decoded.read_schema().unwrap();
assert_eq!(Arc::new(schema.clone()), schema_actual);
assert_eq!(chunk.schema(), schema_actual);
assert_eq!(
key_value_metadata.clone(),
schema_actual.as_arrow().metadata().clone()
@@ -630,22 +607,19 @@ mod tests {
// 2. Check statistics
let table_summary_actual = decoded.read_statistics(&schema_actual).unwrap();
assert_eq!(table_summary_actual, table_summary.columns);
assert_eq!(table_summary_actual, chunk.table_summary().columns);
// 3. Check data
// Note that the read_data_from_parquet_data function fixes the row-group/batches' level metadata bug in arrow
let actual_record_batches =
read_data_from_parquet_data(Arc::clone(&schema.as_arrow()), parquet_data);
read_data_from_parquet_data(chunk.schema().as_arrow(), parquet_data);
let mut actual_num_rows = 0;
for batch in actual_record_batches.clone() {
actual_num_rows += batch.num_rows();
// Check if record batch has meta data
let batch_key_value_metadata = batch.schema().metadata().clone();
assert_eq!(
schema.as_arrow().metadata().clone(),
batch_key_value_metadata
);
assert_eq!(key_value_metadata, batch_key_value_metadata);
}
// Now verify return results. This assert_batches_eq still works correctly without the metadata
@@ -660,8 +634,7 @@ mod tests {
"| foo | | | | foo | | | | 4 | 9223372036854775807 | | | 4 | 18446744073709551615 | | | 40.1 | 1 | -0 | NaN | NaN | | | false | | | 1970-01-01T00:00:00.000004Z |",
"+----------------+---------------+-------------------+------------------+-------------------------+------------------------+----------------------------+---------------------------+----------------------+----------------------+-------------------------+------------------------+----------------------+----------------------+-------------------------+------------------------+----------------------+-------------------+--------------------+------------------------+-----------------------+-------------------------+------------------------+-----------------------+--------------------------+-------------------------+-----------------------------+",
];
assert_eq!(num_rows, actual_num_rows);
assert_batches_eq!(expected.clone(), &record_batches);
assert_eq!(chunk.rows(), actual_num_rows);
assert_batches_eq!(expected, &actual_record_batches);
}
}


@@ -36,6 +36,8 @@ use std::{collections::BTreeMap, num::NonZeroU32, sync::Arc};
use time::Time;
use uuid::Uuid;
pub mod generator;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error getting data from object store: {}", source))]
@@ -114,6 +116,8 @@ pub fn chunk_addr(id: u128) -> ChunkAddr {
}
/// Same as [`make_chunk`] but parquet file does not contain any row group.
///
/// TODO(raphael): Replace with ChunkGenerator
pub async fn make_chunk(
iox_object_store: Arc<IoxObjectStore>,
column_prefix: &str,
@@ -132,21 +136,12 @@
.await
}
/// Same as [`make_chunk`] but parquet file does not contain any row group.
pub async fn make_chunk_no_row_group(
store: Arc<IoxObjectStore>,
column_prefix: &str,
addr: ChunkAddr,
test_size: TestSize,
) -> ParquetChunk {
let (_, schema, column_summaries, _num_rows) = make_record_batch(column_prefix, test_size);
make_chunk_given_record_batch(store, vec![], schema, addr, column_summaries).await
}
/// Create a test chunk by writing data to object store.
///
/// TODO: This code creates a chunk that isn't hooked up with metrics
pub async fn make_chunk_given_record_batch(
///
/// TODO(raphael): Replace with ChunkGenerator
async fn make_chunk_given_record_batch(
iox_object_store: Arc<IoxObjectStore>,
record_batches: Vec<RecordBatch>,
schema: Schema,
@@ -895,7 +890,7 @@ pub fn read_data_from_parquet_data(schema: SchemaRef, parquet_data: Vec<u8>) ->
/// Create test metadata by creating a parquet file and reading it back into memory.
///
/// See [`make_chunk`] for details.
/// TODO(raphael): Replace with ChunkGenerator
pub async fn make_metadata(
iox_object_store: &Arc<IoxObjectStore>,
column_prefix: &str,


@@ -0,0 +1,134 @@
use crate::chunk::{ChunkMetrics, ParquetChunk};
use crate::metadata::IoxMetadata;
use crate::storage::Storage;
use crate::test_utils::{
create_partition_and_database_checkpoint, make_iox_object_store, make_record_batch, TestSize,
};
use data_types::chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder};
use data_types::partition_metadata::{PartitionAddr, TableSummary};
use datafusion_util::MemoryStream;
use iox_object_store::IoxObjectStore;
use std::sync::Arc;
use time::Time;
/// Controls the number of row groups to generate for chunks
#[derive(Debug, Copy, Clone)]
pub enum GeneratorConfig {
/// Generates schema but skips generating data
NoData,
/// Generates 3 row groups with a limited selection of columns
Simple,
/// Generates 3 row groups with a wide variety of different columns
Full,
}
/// A generator of persisted chunks for use in tests
#[derive(Debug)]
pub struct ChunkGenerator {
iox_object_store: Arc<IoxObjectStore>,
storage: Storage,
column_prefix: String,
config: GeneratorConfig,
partition: PartitionAddr,
next_chunk: u32,
}
impl ChunkGenerator {
pub async fn new() -> Self {
Self::new_with_store(make_iox_object_store().await)
}
pub fn new_with_store(iox_object_store: Arc<IoxObjectStore>) -> Self {
let storage = Storage::new(Arc::clone(&iox_object_store));
Self {
iox_object_store,
storage,
column_prefix: "foo".to_string(),
config: GeneratorConfig::Full,
partition: PartitionAddr {
db_name: Arc::from("db1"),
table_name: Arc::from("table1"),
partition_key: Arc::from("part1"),
},
next_chunk: 1,
}
}
pub fn store(&self) -> &Arc<IoxObjectStore> {
&self.iox_object_store
}
pub fn set_config(&mut self, config: GeneratorConfig) {
self.config = config;
}
fn next_chunk(&mut self) -> (ChunkId, ChunkOrder) {
let t = self.next_chunk;
self.next_chunk += 1;
(ChunkId::new_test(t as _), ChunkOrder::new(t).unwrap())
}
pub async fn generate(&mut self) -> (ParquetChunk, IoxMetadata) {
let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint(
Arc::clone(&self.partition.table_name),
Arc::clone(&self.partition.partition_key),
);
let (chunk_id, chunk_order) = self.next_chunk();
let chunk_addr = ChunkAddr::new(&self.partition, chunk_id);
let metadata = IoxMetadata {
creation_timestamp: Time::from_timestamp(10, 20),
table_name: Arc::clone(&self.partition.table_name),
partition_key: Arc::clone(&self.partition.partition_key),
chunk_id,
chunk_order,
partition_checkpoint,
database_checkpoint,
time_of_first_write: Time::from_timestamp(30, 40),
time_of_last_write: Time::from_timestamp(50, 60),
};
let (record_batches, schema, column_summaries, rows) = match self.config {
GeneratorConfig::NoData => {
// Generating an entire row group just for its metadata seems wasteful
let (_, schema, column_summaries, _) =
make_record_batch(&self.column_prefix, TestSize::Minimal);
// Note: column summaries here are inconsistent with the actual data?
(vec![], schema, column_summaries, 0)
}
GeneratorConfig::Simple => make_record_batch(&self.column_prefix, TestSize::Minimal),
GeneratorConfig::Full => make_record_batch(&self.column_prefix, TestSize::Full),
};
let table_summary = TableSummary {
name: self.partition.table_name.to_string(),
columns: column_summaries,
};
let stream = Box::pin(MemoryStream::new_with_schema(
record_batches,
Arc::clone(schema.inner()),
));
let (path, file_size_bytes, parquet_metadata) = self
.storage
.write_to_object_store(chunk_addr, stream, metadata.clone())
.await
.unwrap();
let chunk = ParquetChunk::new_from_parts(
Arc::clone(&self.partition.partition_key),
Arc::new(table_summary),
Arc::new(schema),
&path,
Arc::clone(&self.iox_object_store),
file_size_bytes,
Arc::new(parquet_metadata),
rows,
ChunkMetrics::new_unregistered(),
);
(chunk, metadata)
}
}
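
Follow-up note on the design: the generator owns the in-memory IoxObjectStore it writes to and exposes it via store(), while new_with_store() lets a test hand in a pre-built store instead; this is how the storage tests above read the written parquet bytes back. A small sketch under those assumptions (the test name is illustrative):

use crate::test_utils::generator::ChunkGenerator;
use crate::test_utils::{load_parquet_from_store, make_iox_object_store};
use std::sync::Arc;

#[tokio::test]
async fn example_read_back_from_generator_store() {
    // Share a pre-built in-memory object store with the generator.
    let iox_object_store = make_iox_object_store().await;
    let mut generator = ChunkGenerator::new_with_store(Arc::clone(&iox_object_store));
    let (chunk, _metadata) = generator.generate().await;

    // The persisted parquet bytes can be read back through the same store,
    // either via the shared handle or via generator.store().
    let parquet_data = load_parquet_from_store(&chunk, Arc::clone(generator.store()))
        .await
        .unwrap();
    assert!(!parquet_data.is_empty());
}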