Merge pull request #5523 from influxdata/cn/more-compactor-tests

refactor: Use more iox test utils in compactor tests
pull/24376/head
kodiakhq[bot] 2022-08-31 19:10:24 +00:00 committed by GitHub
commit 26dfccf26e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 255 additions and 298 deletions

View File

@ -305,318 +305,155 @@ mod tests {
handler::CompactorConfig, handler::CompactorConfig,
}; };
use backoff::BackoffConfig; use backoff::BackoffConfig;
use data_types::{ use data_types::{ColumnType, ColumnTypeCount, CompactionLevel};
ColumnId, ColumnSet, ColumnType, ColumnTypeCount, CompactionLevel, ParquetFileParams,
SequenceNumber, ShardIndex, Timestamp,
};
use iox_query::exec::Executor; use iox_query::exec::Executor;
use iox_tests::util::TestCatalog; use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestShard, TestTable};
use iox_time::SystemProvider; use iox_time::SystemProvider;
use parquet_file::storage::ParquetStorage; use parquet_file::storage::ParquetStorage;
use std::{collections::VecDeque, sync::Arc, time::Duration}; use std::{collections::VecDeque, sync::Arc, time::Duration};
use uuid::Uuid;
#[tokio::test] #[tokio::test]
async fn test_compact_hot_partition_candidates() { async fn test_compact_hot_partition_candidates() {
test_helpers::maybe_start_logging(); test_helpers::maybe_start_logging();
let catalog = TestCatalog::new(); let TestSetup {
compactor,
// Test setup shard,
// Create a scenario of a table of 5 columns: tag, time, field int, field string, field bool. table,
// Thus, each file will have estimated memory bytes = 1125 * row_count (for even row_counts) ..
// The table has n partitions. Each partition has one L0 file and one overlapped L1 file. } = test_setup().await;
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let topic = txn.topics().create_or_get("foo").await.unwrap();
let pool = txn.query_pools().create_or_get("foo").await.unwrap();
let namespace = txn
.namespaces()
.create(
"namespace_hot_partitions_to_compact",
"inf",
topic.id,
pool.id,
)
.await
.unwrap();
let table = txn
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let _col1 = txn
.columns()
.create_or_get("tag", table.id, ColumnType::Tag)
.await
.unwrap();
let _col2 = txn
.columns()
.create_or_get("time", table.id, ColumnType::Time)
.await
.unwrap();
let _col3 = txn
.columns()
.create_or_get("field_int", table.id, ColumnType::I64)
.await
.unwrap();
let _col4 = txn
.columns()
.create_or_get("field_string", table.id, ColumnType::String)
.await
.unwrap();
let _col5 = txn
.columns()
.create_or_get("field_bool", table.id, ColumnType::Bool)
.await
.unwrap();
let shard = txn
.shards()
.create_or_get(&topic, ShardIndex::new(1))
.await
.unwrap();
// Create a compactor
// Compactor budget : 13,500
let time_provider = Arc::new(SystemProvider::new());
let config = make_compactor_config();
let compactor = Compactor::new(
vec![shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
time_provider,
BackoffConfig::default(),
config,
Arc::new(metric::Registry::new()),
);
// Some times in the past to set to created_at of the files // Some times in the past to set to created_at of the files
let time_one_hour_ago = Timestamp::new( let hot_time_one_hour_ago =
(compactor.time_provider.now() - Duration::from_secs(60 * 60)).timestamp_nanos(), (compactor.time_provider.now() - Duration::from_secs(60 * 60)).timestamp_nanos();
);
// P1: // P1:
// L0 2 rows. bytes: 1125 * 2 = 2,250 // L0 2 rows. bytes: 1125 * 2 = 2,250
// L1 2 rows. bytes: 1125 * 2 = 2,250 // L1 2 rows. bytes: 1125 * 2 = 2,250
// total = 2,250 + 2,250 = 4,500 // total = 2,250 + 2,250 = 4,500
let partition1 = txn let partition1 = table.with_shard(&shard).create_partition("one").await;
.partitions()
.create_or_get("one".into(), shard.id, table.id) let pf1_1 = TestParquetFileBuilder::default()
.await .with_min_time(1)
.unwrap(); .with_max_time(5)
// Basic parquet info .with_row_count(2)
let param = ParquetFileParams { .with_compaction_level(CompactionLevel::Initial)
shard_id: shard.id, .with_creation_time(hot_time_one_hour_ago);
namespace_id: namespace.id, partition1.create_parquet_file_catalog_record(pf1_1).await;
table_id: table.id,
partition_id: partition1.id, let pf1_2 = TestParquetFileBuilder::default()
object_store_id: Uuid::new_v4(), .with_min_time(4) // overlapped with pf1_1
max_sequence_number: SequenceNumber::new(100), .with_max_time(6)
min_time: Timestamp::new(1), .with_row_count(2)
max_time: Timestamp::new(5), .with_compaction_level(CompactionLevel::FileNonOverlapped)
file_size_bytes: 1000, // not matter .with_creation_time(hot_time_one_hour_ago);
row_count: 2, partition1.create_parquet_file_catalog_record(pf1_2).await;
compaction_level: CompactionLevel::Initial, // L0
created_at: time_one_hour_ago, // Hot partition
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
};
let _pf1_1 = txn.parquet_files().create(param.clone()).await.unwrap();
let paramf = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: partition1.id,
min_time: Timestamp::new(4), // overlapped
max_time: Timestamp::new(6),
row_count: 2,
compaction_level: CompactionLevel::FileNonOverlapped, // L1
..param.clone()
};
let _pf1_2 = txn.parquet_files().create(paramf).await.unwrap();
// update sort key
txn.partitions()
.update_sort_key(partition1.id, &["tag", "time"])
.await
.unwrap();
// P2: // P2:
// L0 2 rows. bytes: 1125 * 2 = 2,250 // L0 2 rows. bytes: 1125 * 2 = 2,250
// L1 2 rows. bytes: 1125 * 2 = 2,250 // L1 2 rows. bytes: 1125 * 2 = 2,250
// total = 2,250 + 2,250 = 4,500 // total = 2,250 + 2,250 = 4,500
let partition2 = txn let partition2 = table.with_shard(&shard).create_partition("two").await;
.partitions()
.create_or_get("two".into(), shard.id, table.id) let pf2_1 = TestParquetFileBuilder::default()
.await .with_min_time(1)
.unwrap(); .with_max_time(5)
let paramf = ParquetFileParams { .with_row_count(2)
object_store_id: Uuid::new_v4(), .with_compaction_level(CompactionLevel::Initial)
partition_id: partition2.id, .with_creation_time(hot_time_one_hour_ago);
min_time: Timestamp::new(1), partition2.create_parquet_file_catalog_record(pf2_1).await;
max_time: Timestamp::new(5),
row_count: 2, let pf2_2 = TestParquetFileBuilder::default()
compaction_level: CompactionLevel::Initial, // L0 .with_min_time(4) // overlapped with pf2_1
..param.clone() .with_max_time(6)
}; .with_row_count(2)
let _pf2_1 = txn.parquet_files().create(paramf).await.unwrap(); .with_compaction_level(CompactionLevel::FileNonOverlapped)
let paramf = ParquetFileParams { .with_creation_time(hot_time_one_hour_ago);
object_store_id: Uuid::new_v4(), partition2.create_parquet_file_catalog_record(pf2_2).await;
partition_id: partition2.id,
min_time: Timestamp::new(4), // overlapped
max_time: Timestamp::new(6),
row_count: 2,
compaction_level: CompactionLevel::FileNonOverlapped, // L1
..param.clone()
};
let _pf2_2 = txn.parquet_files().create(paramf).await.unwrap();
// update sort key
txn.partitions()
.update_sort_key(partition2.id, &["tag", "time"])
.await
.unwrap();
// P3: bytes >= 90% of full budget = 90% * 13,500 = 12,150 // P3: bytes >= 90% of full budget = 90% * 13,500 = 12,150
// L0 6 rows. bytes: 1125 * 6 = 6,750 // L0 6 rows. bytes: 1125 * 6 = 6,750
// L1 4 rows. bytes: 1125 * 4 = 4,500 // L1 4 rows. bytes: 1125 * 4 = 4,500
// total = 6,750 + 4,500 = 11,250 // total = 6,750 + 4,500 = 11,250
let partition3 = txn let partition3 = table.with_shard(&shard).create_partition("three").await;
.partitions() let pf3_1 = TestParquetFileBuilder::default()
.create_or_get("three".into(), shard.id, table.id) .with_min_time(1)
.await .with_max_time(5)
.unwrap(); .with_row_count(6)
let paramf = ParquetFileParams { .with_compaction_level(CompactionLevel::Initial)
object_store_id: Uuid::new_v4(), .with_creation_time(hot_time_one_hour_ago);
partition_id: partition3.id, partition3.create_parquet_file_catalog_record(pf3_1).await;
min_time: Timestamp::new(1),
max_time: Timestamp::new(5), let pf3_2 = TestParquetFileBuilder::default()
row_count: 6, .with_min_time(4) // overlapped with pf3_1
compaction_level: CompactionLevel::Initial, // L0 .with_max_time(6)
..param.clone() .with_row_count(4)
}; .with_compaction_level(CompactionLevel::FileNonOverlapped)
let _pf3_1 = txn.parquet_files().create(paramf).await.unwrap(); .with_creation_time(hot_time_one_hour_ago);
let paramf = ParquetFileParams { partition3.create_parquet_file_catalog_record(pf3_2).await;
object_store_id: Uuid::new_v4(),
partition_id: partition3.id,
min_time: Timestamp::new(4), // overlapped
max_time: Timestamp::new(6),
row_count: 4,
compaction_level: CompactionLevel::FileNonOverlapped, // L1
..param.clone()
};
let _pf3_2 = txn.parquet_files().create(paramf).await.unwrap();
// update sort key
txn.partitions()
.update_sort_key(partition3.id, &["tag", "time"])
.await
.unwrap();
// P4: Over the full budget // P4: Over the full budget
// L0 with 8 rows.bytes = 1125 * 8 = 9,000 // L0 with 8 rows.bytes = 1125 * 8 = 9,000
// L1 with 6 rows.bytes = 1125 * 6 = 6,750 // L1 with 6 rows.bytes = 1125 * 6 = 6,750
// total = 15,750 // total = 15,750
let partition4 = txn let partition4 = table.with_shard(&shard).create_partition("four").await;
.partitions() let pf4_1 = TestParquetFileBuilder::default()
.create_or_get("four".into(), shard.id, table.id) .with_min_time(1)
.await .with_max_time(5)
.unwrap(); .with_row_count(8)
let paramf = ParquetFileParams { .with_compaction_level(CompactionLevel::Initial)
object_store_id: Uuid::new_v4(), .with_creation_time(hot_time_one_hour_ago);
partition_id: partition4.id, partition4.create_parquet_file_catalog_record(pf4_1).await;
min_time: Timestamp::new(1),
max_time: Timestamp::new(5), let pf4_2 = TestParquetFileBuilder::default()
row_count: 8, .with_min_time(4) // overlapped with pf4_1
compaction_level: CompactionLevel::Initial, // L0 .with_max_time(6)
..param.clone() .with_row_count(6)
}; .with_compaction_level(CompactionLevel::FileNonOverlapped)
let _pf4_1 = txn.parquet_files().create(paramf).await.unwrap(); .with_creation_time(hot_time_one_hour_ago);
let paramf = ParquetFileParams { partition4.create_parquet_file_catalog_record(pf4_2).await;
object_store_id: Uuid::new_v4(),
partition_id: partition4.id,
min_time: Timestamp::new(4), // overlapped
max_time: Timestamp::new(6),
row_count: 6,
compaction_level: CompactionLevel::FileNonOverlapped, // L1
..param.clone()
};
let _pf4_2 = txn.parquet_files().create(paramf).await.unwrap();
// update sort key
txn.partitions()
.update_sort_key(partition4.id, &["tag", "time"])
.await
.unwrap();
// P5: // P5:
// L0 with 2 rows.bytes = 1125 * 2 = 2,250 // L0 with 2 rows.bytes = 1125 * 2 = 2,250
// L1 with 2 rows.bytes = 1125 * 2 = 2,250 // L1 with 2 rows.bytes = 1125 * 2 = 2,250
// total = 4,500 // total = 4,500
let partition5 = txn let partition5 = table.with_shard(&shard).create_partition("five").await;
.partitions() let pf5_1 = TestParquetFileBuilder::default()
.create_or_get("five".into(), shard.id, table.id) .with_min_time(1)
.await .with_max_time(5)
.unwrap(); .with_row_count(2)
let paramf = ParquetFileParams { .with_compaction_level(CompactionLevel::Initial)
object_store_id: Uuid::new_v4(), .with_creation_time(hot_time_one_hour_ago);
partition_id: partition5.id, partition5.create_parquet_file_catalog_record(pf5_1).await;
min_time: Timestamp::new(1),
max_time: Timestamp::new(5), let pf5_2 = TestParquetFileBuilder::default()
row_count: 2, .with_min_time(4) // overlapped with pf5_1
compaction_level: CompactionLevel::Initial, // L0 .with_max_time(6)
..param.clone() .with_row_count(2)
}; .with_compaction_level(CompactionLevel::FileNonOverlapped)
let _pf5_1 = txn.parquet_files().create(paramf).await.unwrap(); .with_creation_time(hot_time_one_hour_ago);
let paramf = ParquetFileParams { partition5.create_parquet_file_catalog_record(pf5_2).await;
object_store_id: Uuid::new_v4(),
partition_id: partition5.id,
min_time: Timestamp::new(4), // overlapped
max_time: Timestamp::new(6),
row_count: 2,
compaction_level: CompactionLevel::FileNonOverlapped, // L1
..param.clone()
};
let _pf5_2 = txn.parquet_files().create(paramf).await.unwrap();
// update sort key
txn.partitions()
.update_sort_key(partition5.id, &["tag", "time"])
.await
.unwrap();
// P6: // P6:
// L0 with 2 rows.bytes = 1125 * 2 = 2,250 // L0 with 2 rows.bytes = 1125 * 2 = 2,250
// L1 with 2 rows.bytes = 1125 * 2 = 2,250 // L1 with 2 rows.bytes = 1125 * 2 = 2,250
// total = 4,500 // total = 4,500
let partition6 = txn let partition6 = table.with_shard(&shard).create_partition("six").await;
.partitions() let pf6_1 = TestParquetFileBuilder::default()
.create_or_get("six".into(), shard.id, table.id) .with_min_time(1)
.await .with_max_time(5)
.unwrap(); .with_row_count(2)
let paramf = ParquetFileParams { .with_compaction_level(CompactionLevel::Initial)
object_store_id: Uuid::new_v4(), .with_creation_time(hot_time_one_hour_ago);
partition_id: partition6.id, partition6.create_parquet_file_catalog_record(pf6_1).await;
min_time: Timestamp::new(1),
max_time: Timestamp::new(5),
row_count: 2,
compaction_level: CompactionLevel::Initial, // L0
..param.clone()
};
let _pf6_1 = txn.parquet_files().create(paramf).await.unwrap();
let paramf = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: partition6.id,
min_time: Timestamp::new(4), // overlapped
max_time: Timestamp::new(6),
row_count: 2,
compaction_level: CompactionLevel::FileNonOverlapped, // L1
..param.clone()
};
let _pf6_2 = txn.parquet_files().create(paramf).await.unwrap();
// update sort key
txn.partitions()
.update_sort_key(partition6.id, &["tag", "time"])
.await
.unwrap();
txn.commit().await.unwrap(); let pf6_2 = TestParquetFileBuilder::default()
.with_min_time(4) // overlapped with pf6_1
.with_max_time(6)
.with_row_count(2)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_creation_time(hot_time_one_hour_ago);
partition6.create_parquet_file_catalog_record(pf6_2).await;
// partition candidates: partitions with L0 and overlapped L1 // partition candidates: partitions with L0 and overlapped L1
let candidates = compactor let candidates = compactor
@ -633,7 +470,7 @@ mod tests {
// column types of the partitions // column types of the partitions
let table_columns = compactor.table_columns(&candidates).await.unwrap(); let table_columns = compactor.table_columns(&candidates).await.unwrap();
assert_eq!(table_columns.len(), 1); assert_eq!(table_columns.len(), 1);
let mut cols = table_columns.get(&table.id).unwrap().clone(); let mut cols = table_columns.get(&table.table.id).unwrap().clone();
assert_eq!(cols.len(), 5); assert_eq!(cols.len(), 5);
cols.sort_by_key(|c| c.col_type); cols.sort_by_key(|c| c.col_type);
let mut expected_cols = vec![ let mut expected_cols = vec![
@ -679,7 +516,7 @@ mod tests {
// https://github.com/influxdata/conductor/issues/1130 // https://github.com/influxdata/conductor/issues/1130
// "hot compaction failed: 1, "Could not serialize and persist record batches failed to peek record stream schema" // "hot compaction failed: 1, "Could not serialize and persist record batches failed to peek record stream schema"
compact_hot_partition_candidates( compact_hot_partition_candidates(
Arc::new(compactor), Arc::clone(&compactor),
mock_compaction, mock_compaction,
sorted_candidates, sorted_candidates,
table_columns, table_columns,
@ -697,21 +534,21 @@ mod tests {
let g1_candidate1 = &group1[0]; let g1_candidate1 = &group1[0];
assert_eq!(g1_candidate1.budget_bytes(), 4500); assert_eq!(g1_candidate1.budget_bytes(), 4500);
assert_eq!(g1_candidate1.partition.id(), partition1.id); assert_eq!(g1_candidate1.partition.id(), partition1.partition.id);
let g1_candidate1_pf_ids: Vec<_> = let g1_candidate1_pf_ids: Vec<_> =
g1_candidate1.files.iter().map(|pf| pf.id.get()).collect(); g1_candidate1.files.iter().map(|pf| pf.id.get()).collect();
assert_eq!(g1_candidate1_pf_ids, vec![2, 1]); assert_eq!(g1_candidate1_pf_ids, vec![2, 1]);
let g1_candidate2 = &group1[1]; let g1_candidate2 = &group1[1];
assert_eq!(g1_candidate2.budget_bytes(), 4500); assert_eq!(g1_candidate2.budget_bytes(), 4500);
assert_eq!(g1_candidate2.partition.id(), partition2.id); assert_eq!(g1_candidate2.partition.id(), partition2.partition.id);
let g1_candidate2_pf_ids: Vec<_> = let g1_candidate2_pf_ids: Vec<_> =
g1_candidate2.files.iter().map(|pf| pf.id.get()).collect(); g1_candidate2.files.iter().map(|pf| pf.id.get()).collect();
assert_eq!(g1_candidate2_pf_ids, vec![4, 3]); assert_eq!(g1_candidate2_pf_ids, vec![4, 3]);
let g1_candidate3 = &group1[2]; let g1_candidate3 = &group1[2];
assert_eq!(g1_candidate3.budget_bytes(), 4500); assert_eq!(g1_candidate3.budget_bytes(), 4500);
assert_eq!(g1_candidate3.partition.id(), partition5.id); assert_eq!(g1_candidate3.partition.id(), partition5.partition.id);
let g1_candidate3_pf_ids: Vec<_> = let g1_candidate3_pf_ids: Vec<_> =
g1_candidate3.files.iter().map(|pf| pf.id.get()).collect(); g1_candidate3.files.iter().map(|pf| pf.id.get()).collect();
assert_eq!(g1_candidate3_pf_ids, vec![10, 9]); assert_eq!(g1_candidate3_pf_ids, vec![10, 9]);
@ -722,7 +559,7 @@ mod tests {
let g2_candidate1 = &group2[0]; let g2_candidate1 = &group2[0];
assert_eq!(g2_candidate1.budget_bytes(), 4500); assert_eq!(g2_candidate1.budget_bytes(), 4500);
assert_eq!(g2_candidate1.partition.id(), partition6.id); assert_eq!(g2_candidate1.partition.id(), partition6.partition.id);
let g2_candidate1_pf_ids: Vec<_> = let g2_candidate1_pf_ids: Vec<_> =
g2_candidate1.files.iter().map(|pf| pf.id.get()).collect(); g2_candidate1.files.iter().map(|pf| pf.id.get()).collect();
assert_eq!(g2_candidate1_pf_ids, vec![12, 11]); assert_eq!(g2_candidate1_pf_ids, vec![12, 11]);
@ -733,7 +570,7 @@ mod tests {
let g3_candidate1 = &group3[0]; let g3_candidate1 = &group3[0];
assert_eq!(g3_candidate1.budget_bytes(), 11250); assert_eq!(g3_candidate1.budget_bytes(), 11250);
assert_eq!(g3_candidate1.partition.id(), partition3.id); assert_eq!(g3_candidate1.partition.id(), partition3.partition.id);
let g3_candidate1_pf_ids: Vec<_> = let g3_candidate1_pf_ids: Vec<_> =
g3_candidate1.files.iter().map(|pf| pf.id.get()).collect(); g3_candidate1.files.iter().map(|pf| pf.id.get()).collect();
assert_eq!(g3_candidate1_pf_ids, vec![6, 5]); assert_eq!(g3_candidate1_pf_ids, vec![6, 5]);
@ -763,4 +600,52 @@ mod tests {
memory_budget_bytes, memory_budget_bytes,
) )
} }
/// Handles returned by `test_setup` for the tests in this module.
struct TestSetup {
/// Compactor that `test_setup` constructs for the single shard below.
compactor: Arc<Compactor>,
/// Shard created with index 1 in the test namespace.
shard: Arc<TestShard>,
/// `test_table`, created with five columns: tag, time, field_int, field_string, field_bool.
table: Arc<TestTable>,
}
/// Build the catalog objects and compactor shared by the tests in this module.
///
/// Creates, in order: a fresh test catalog, the namespace
/// `namespace_hot_partitions_to_compact`, a shard with index 1, the table `test_table` with its
/// five columns, and finally a `Compactor` responsible for that shard.
async fn test_setup() -> TestSetup {
    let test_catalog = TestCatalog::new();
    let ns = test_catalog
        .create_namespace("namespace_hot_partitions_to_compact")
        .await;
    let shard = ns.create_shard(1).await;

    // Create a scenario of a table of 5 columns: tag, time, field int, field string, field
    // bool. Thus, each file will have estimated memory bytes = 1125 * row_count (for even
    // row_counts).
    let table = ns.create_table("test_table").await;
    for (name, column_type) in [
        ("tag", ColumnType::Tag),
        ("time", ColumnType::Time),
        ("field_int", ColumnType::I64),
        ("field_string", ColumnType::String),
        ("field_bool", ColumnType::Bool),
    ] {
        table.create_column(name, column_type).await;
    }

    // Create a compactor
    // Compactor budget : 13,500
    let clock = Arc::new(SystemProvider::new());
    let compactor = Arc::new(Compactor::new(
        vec![shard.shard.id],
        Arc::clone(&test_catalog.catalog),
        ParquetStorage::new(Arc::clone(&test_catalog.object_store)),
        Arc::new(Executor::new(1)),
        clock,
        BackoffConfig::default(),
        make_compactor_config(),
        Arc::new(metric::Registry::new()),
    ));

    TestSetup {
        compactor,
        shard,
        table,
    }
}
} }

View File

@ -470,7 +470,8 @@ impl TestPartition {
}) })
} }
/// Create a Parquet file in this partition with attributes specified by the builder /// Create a Parquet file in this partition in object storage and the catalog with attributes
/// specified by the builder
pub async fn create_parquet_file( pub async fn create_parquet_file(
self: &Arc<Self>, self: &Arc<Self>,
builder: TestParquetFileBuilder, builder: TestParquetFileBuilder,
@ -479,31 +480,36 @@ impl TestPartition {
record_batch, record_batch,
table, table,
schema, schema,
max_seq, max_sequence_number,
min_time, min_time,
max_time, max_time,
file_size_bytes, file_size_bytes,
creation_time, creation_time,
compaction_level, compaction_level,
to_delete, to_delete,
object_store_id,
row_count,
} = builder; } = builder;
let record_batch = record_batch.expect("A record batch is required"); let record_batch = record_batch.expect("A record batch is required");
let table = table.expect("A table is required"); let table = table.expect("A table is required");
let schema = schema.expect("A schema is required"); let schema = schema.expect("A schema is required");
assert_eq!( assert_eq!(
table, self.table.table.name, table, self.table.table.name,
"Table name of line protocol and partition should have matched", "Table name of line protocol and partition should have matched",
); );
assert!(
row_count.is_none(),
"Cannot have both a record batch and a manually set row_count!"
);
let row_count = record_batch.num_rows(); let row_count = record_batch.num_rows();
assert!(row_count > 0, "Parquet file must have at least 1 row"); assert!(row_count > 0, "Parquet file must have at least 1 row");
let (record_batch, sort_key) = sort_batch(record_batch, schema); let (record_batch, sort_key) = sort_batch(record_batch, schema.clone());
let record_batch = dedup_batch(record_batch, &sort_key); let record_batch = dedup_batch(record_batch, &sort_key);
let object_store_id = Uuid::new_v4(); let object_store_id = object_store_id.unwrap_or_else(Uuid::new_v4);
let max_sequence_number = SequenceNumber::new(max_seq);
let metadata = IoxMetadata { let metadata = IoxMetadata {
object_store_id, object_store_id,
creation_timestamp: now(), creation_timestamp: now(),
@ -518,31 +524,87 @@ impl TestPartition {
compaction_level: CompactionLevel::Initial, compaction_level: CompactionLevel::Initial,
sort_key: Some(sort_key.clone()), sort_key: Some(sort_key.clone()),
}; };
let table_catalog_schema = self.table.catalog_schema().await;
let column_set = ColumnSet::new(record_batch.schema().fields().iter().map(|f| {
table_catalog_schema
.columns
.get(f.name())
.expect("Column registered")
.id
}));
let real_file_size_bytes = create_parquet_file( let real_file_size_bytes = create_parquet_file(
ParquetStorage::new(Arc::clone(&self.catalog.object_store)), ParquetStorage::new(Arc::clone(&self.catalog.object_store)),
&metadata, &metadata,
record_batch, record_batch.clone(),
) )
.await; .await;
let builder = TestParquetFileBuilder {
record_batch: Some(record_batch),
table: Some(table),
schema: Some(schema),
max_sequence_number,
min_time,
max_time,
file_size_bytes: Some(file_size_bytes.unwrap_or(real_file_size_bytes as u64)),
creation_time,
compaction_level,
to_delete,
object_store_id: Some(object_store_id),
row_count: None, // will be computed from the record batch again
};
let result = self.create_parquet_file_catalog_record(builder).await;
let mut repos = self.catalog.catalog.repositories().await;
update_catalog_sort_key_if_needed(repos.partitions(), self.partition.id, sort_key).await;
result
}
/// Only update the catalog with the builder's info, don't create anything in object storage.
/// Record batch is not required in this case.
pub async fn create_parquet_file_catalog_record(
self: &Arc<Self>,
builder: TestParquetFileBuilder,
) -> TestParquetFile {
let TestParquetFileBuilder {
record_batch,
max_sequence_number,
min_time,
max_time,
file_size_bytes,
creation_time,
compaction_level,
to_delete,
object_store_id,
row_count,
..
} = builder;
let table_catalog_schema = self.table.catalog_schema().await;
let (row_count, column_set) = if let Some(record_batch) = record_batch {
let column_set = ColumnSet::new(record_batch.schema().fields().iter().map(|f| {
table_catalog_schema
.columns
.get(f.name())
.expect("Column registered")
.id
}));
assert!(
row_count.is_none(),
"Cannot have both a record batch and a manually set row_count!"
);
(record_batch.num_rows(), column_set)
} else {
let column_set =
ColumnSet::new(table_catalog_schema.columns.values().map(|col| col.id));
(row_count.unwrap_or(0), column_set)
};
let parquet_file_params = ParquetFileParams { let parquet_file_params = ParquetFileParams {
shard_id: self.shard.shard.id, shard_id: self.shard.shard.id,
namespace_id: self.namespace.namespace.id, namespace_id: self.namespace.namespace.id,
table_id: self.table.table.id, table_id: self.table.table.id,
partition_id: self.partition.id, partition_id: self.partition.id,
object_store_id, object_store_id: object_store_id.unwrap_or_else(Uuid::new_v4),
max_sequence_number, max_sequence_number,
min_time: Timestamp::new(min_time), min_time: Timestamp::new(min_time),
max_time: Timestamp::new(max_time), max_time: Timestamp::new(max_time),
file_size_bytes: file_size_bytes.unwrap_or(real_file_size_bytes as u64) as i64, file_size_bytes: file_size_bytes.unwrap_or(0) as i64,
row_count: row_count as i64, row_count: row_count as i64,
created_at: Timestamp::new(creation_time), created_at: Timestamp::new(creation_time),
compaction_level, compaction_level,
@ -555,7 +617,6 @@ impl TestPartition {
.create(parquet_file_params) .create(parquet_file_params)
.await .await
.unwrap(); .unwrap();
update_catalog_sort_key_if_needed(repos.partitions(), self.partition.id, sort_key).await;
if to_delete { if to_delete {
repos repos
@ -582,13 +643,15 @@ pub struct TestParquetFileBuilder {
record_batch: Option<RecordBatch>, record_batch: Option<RecordBatch>,
table: Option<String>, table: Option<String>,
schema: Option<Schema>, schema: Option<Schema>,
max_seq: i64, max_sequence_number: SequenceNumber,
min_time: i64, min_time: i64,
max_time: i64, max_time: i64,
file_size_bytes: Option<u64>, file_size_bytes: Option<u64>,
creation_time: i64, creation_time: i64,
compaction_level: CompactionLevel, compaction_level: CompactionLevel,
to_delete: bool, to_delete: bool,
object_store_id: Option<Uuid>,
row_count: Option<usize>,
} }
impl Default for TestParquetFileBuilder { impl Default for TestParquetFileBuilder {
@ -597,13 +660,15 @@ impl Default for TestParquetFileBuilder {
record_batch: None, record_batch: None,
table: None, table: None,
schema: None, schema: None,
max_seq: 100, max_sequence_number: SequenceNumber::new(100),
min_time: now().timestamp_nanos(), min_time: now().timestamp_nanos(),
max_time: now().timestamp_nanos(), max_time: now().timestamp_nanos(),
file_size_bytes: None, file_size_bytes: None,
creation_time: 1, creation_time: 1,
compaction_level: CompactionLevel::Initial, compaction_level: CompactionLevel::Initial,
to_delete: false, to_delete: false,
object_store_id: None,
row_count: None,
} }
} }
} }
@ -638,7 +703,7 @@ impl TestParquetFileBuilder {
/// Specify the maximum sequence number for the parquet file metadata. /// Specify the maximum sequence number for the parquet file metadata.
pub fn with_max_seq(mut self, max_seq: i64) -> Self { pub fn with_max_seq(mut self, max_seq: i64) -> Self {
self.max_seq = max_seq; self.max_sequence_number = SequenceNumber::new(max_seq);
self self
} }
@ -677,6 +742,13 @@ impl TestParquetFileBuilder {
self.to_delete = to_delete; self.to_delete = to_delete;
self self
} }
/// Specify the number of rows in this parquet file. If line protocol/record batch are also
/// set, this will panic! Only use this when you're not specifying any rows!
///
/// The value is used only when the builder carries no record batch; if neither a record batch
/// nor a row count is set, `create_parquet_file_catalog_record` records a row count of 0.
///
/// # Panics
///
/// This method itself does not panic. The panic occurs later, when the builder is passed to
/// `create_parquet_file` or `create_parquet_file_catalog_record` with a record batch also
/// set: both assert that no manual row count was supplied in that case.
pub fn with_row_count(mut self, row_count: usize) -> Self {
self.row_count = Some(row_count);
self
}
} }
async fn update_catalog_sort_key_if_needed( async fn update_catalog_sort_key_if_needed(