refactor: Extract read_parquet_file test helper function to iox_tests::util

pull/24376/head
Carol (Nichols || Goulding) 2022-09-01 13:24:02 -04:00
parent eee7007c71
commit 85fb0acea6
2 changed files with 34 additions and 34 deletions
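
The change itself is small: the read_parquet_file helper that the compactor tests defined locally becomes a method on TestTable in iox_tests::util, so tests in other crates can reuse it. A minimal before/after sketch of one call site, using only the table and file1 bindings that appear in the hunks below:

    // Before: free function private to the compactor test module.
    let batches = read_parquet_file(&table, file1).await;

    // After: shared method on iox_tests::util::TestTable.
    let batches = table.read_parquet_file(file1).await;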

@@ -169,13 +169,12 @@ pub(crate) async fn compact_cold_partition(
 mod tests {
     use super::*;
     use crate::handler::CompactorConfig;
-    use ::parquet_file::{storage::ParquetStorage, ParquetFilePath};
-    use arrow::record_batch::RecordBatch;
+    use ::parquet_file::storage::ParquetStorage;
     use arrow_util::assert_batches_sorted_eq;
     use backoff::BackoffConfig;
-    use data_types::{ColumnType, ColumnTypeCount, CompactionLevel, ParquetFile};
+    use data_types::{ColumnType, ColumnTypeCount, CompactionLevel};
     use iox_query::exec::Executor;
-    use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestTable};
+    use iox_tests::util::{TestCatalog, TestParquetFileBuilder};
     use iox_time::{SystemProvider, TimeProvider};
     use std::time::Duration;
@@ -400,7 +399,7 @@ mod tests {
         // Later compacted file
         let file1 = files.pop().unwrap();
-        let batches = read_parquet_file(&table, file1).await;
+        let batches = table.read_parquet_file(file1).await;
         assert_batches_sorted_eq!(
             &[
                 "+-----------+------+------+------+-----------------------------+",
@@ -416,7 +415,7 @@ mod tests {
         // Earlier compacted file
         let file0 = files.pop().unwrap();
-        let batches = read_parquet_file(&table, file0).await;
+        let batches = table.read_parquet_file(file0).await;
         assert_batches_sorted_eq!(
             &[
                 "+-----------+------+------+------+--------------------------------+",
@@ -620,7 +619,7 @@ mod tests {
         // Later compacted file
         let file1 = files.pop().unwrap();
-        let batches = read_parquet_file(&table, file1).await;
+        let batches = table.read_parquet_file(file1).await;
         assert_batches_sorted_eq!(
             &[
                 "+-----------+------+------+------+-----------------------------+",
@@ -636,7 +635,7 @@ mod tests {
         // Earlier compacted file
         let file0 = files.pop().unwrap();
-        let batches = read_parquet_file(&table, file0).await;
+        let batches = table.read_parquet_file(file0).await;
         assert_batches_sorted_eq!(
             &[
                 "+-----------+------+------+------+--------------------------------+",
@@ -768,7 +767,7 @@ mod tests {
         // Later compacted file
         let file1 = files.pop().unwrap();
-        let batches = read_parquet_file(&table, file1).await;
+        let batches = table.read_parquet_file(file1).await;
         assert_batches_sorted_eq!(
             &[
                 "+-----------+------+------+-----------------------------+",
@@ -783,7 +782,7 @@ mod tests {
         // Earlier compacted file
         let file0 = files.pop().unwrap();
-        let batches = read_parquet_file(&table, file0).await;
+        let batches = table.read_parquet_file(file0).await;
         assert_batches_sorted_eq!(
             &[
                 "+-----------+------+--------------------------------+",
@@ -797,29 +796,6 @@ mod tests {
         );
     }

-    async fn read_parquet_file(table: &Arc<TestTable>, file: ParquetFile) -> Vec<RecordBatch> {
-        let storage = ParquetStorage::new(table.catalog.object_store());
-
-        // get schema
-        let table_catalog_schema = table.catalog_schema().await;
-        let column_id_lookup = table_catalog_schema.column_id_map();
-        let table_schema = table.schema().await;
-        let selection: Vec<_> = file
-            .column_set
-            .iter()
-            .map(|id| *column_id_lookup.get(id).unwrap())
-            .collect();
-        let schema = table_schema.select_by_names(&selection).unwrap();
-
-        let path: ParquetFilePath = (&file).into();
-        let rx = storage
-            .read_all(schema.as_arrow(), &path, file.file_size_bytes as usize)
-            .unwrap();
-        datafusion::physical_plan::common::collect(rx)
-            .await
-            .unwrap()
-    }
-
     fn make_compactor_config() -> CompactorConfig {
         let max_desired_file_size_bytes = 10_000;
         let percentage_max_file_size = 30;

@@ -20,7 +20,7 @@ use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
 use object_store::{memory::InMemory, DynObjectStore};
 use observability_deps::tracing::debug;
 use once_cell::sync::Lazy;
-use parquet_file::{metadata::IoxMetadata, storage::ParquetStorage};
+use parquet_file::{metadata::IoxMetadata, storage::ParquetStorage, ParquetFilePath};
 use schema::{
     selection::Selection,
     sort::{adjust_sort_key_columns, compute_sort_key, SortKey},
@@ -334,6 +334,30 @@ impl TestTable {
     pub async fn schema(&self) -> Schema {
         self.catalog_schema().await.try_into().unwrap()
     }
+
+    /// Read the record batches from the specified Parquet file associated with this table.
+    pub async fn read_parquet_file(&self, file: ParquetFile) -> Vec<RecordBatch> {
+        let storage = ParquetStorage::new(self.catalog.object_store());
+
+        // Get the schema, selecting only the columns present in this file's column set.
+        let table_catalog_schema = self.catalog_schema().await;
+        let column_id_lookup = table_catalog_schema.column_id_map();
+        let table_schema = self.schema().await;
+        let selection: Vec<_> = file
+            .column_set
+            .iter()
+            .map(|id| *column_id_lookup.get(id).unwrap())
+            .collect();
+        let schema = table_schema.select_by_names(&selection).unwrap();
+
+        let path: ParquetFilePath = (&file).into();
+        let rx = storage
+            .read_all(schema.as_arrow(), &path, file.file_size_bytes as usize)
+            .unwrap();
+        datafusion::physical_plan::common::collect(rx)
+            .await
+            .unwrap()
+    }
 }

 /// A test column.
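
With the helper on TestTable, any test holding a data_types::ParquetFile for one of the table's files can read its contents back without touching ParquetStorage directly. A usage sketch under the same setup the hunks above assume (the files vector comes from the test's compaction step; the expected rows here are placeholders, not real output):

    // Pop a compacted file and read its record batches back through the helper.
    let file = files.pop().unwrap();
    let batches = table.read_parquet_file(file).await;
    assert_batches_sorted_eq!(
        &[
            "+-----------+------+-----------------------------+",
            "| field_int | tag1 | time                        |",
            "+-----------+------+-----------------------------+",
        ],
        &batches
    );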