feat: Compact cold partitions completely

Fixes #5330.
pull/24376/head
Carol (Nichols || Goulding) 2022-08-12 15:58:54 -04:00
parent 327446f0cd
commit 10ba3fef47
12 changed files with 601 additions and 35 deletions

View File

@ -135,9 +135,20 @@ macro_rules! gen_compactor_config {
)]
pub cold_input_size_threshold_bytes: u64,
/// A compaction operation for cold partitions will gather as many L0 files with their
/// overlapping L1 files to compact together until the total number of L0 + L1 files
/// crosses this threshold.
/// Desired max size of cold compacted parquet files.
/// It is a target value, rather than a guarantee.
///
/// Default is 1024 * 1024 * 100 = 104,857,600 bytes (100MB)
#[clap(
long = "--compaction-cold-max-desired-size-bytes",
env = "INFLUXDB_IOX_COMPACTION_COLD_MAX_DESIRED_FILE_SIZE_BYTES",
default_value = "104857600",
action
)]
pub cold_max_desired_file_size_bytes: u64,
/// A compaction operation will gather as many L0 files with their overlapping L1 files
/// to compact together until the total number of L0 + L1 files crosses this threshold.
/// Later compactions will pick up the remaining L0 files.
///
/// A compaction operation will be limited by this or by the cold input size threshold,
@ -167,8 +178,8 @@ macro_rules! gen_compactor_config {
/// The memory budget assigned to this compactor.
/// For each partition candidate, we will estimate the memory needed to compact each file
/// and only add more files if their needed estimated memory is below this memory budget.
/// Since we must compact L1 files that overlap with L0 files, if their total estimated
/// memory does not allow us to compact a part of a partition at all, we will not compact
/// it and will log the partition and its related information in a table in our catalog for
/// further diagnosis of the issue.
/// How many candidates compacted concurrently are also decided using this estimation and
@ -205,6 +216,7 @@ impl CompactorOnceConfig {
min_number_recent_ingested_files_per_partition: self
.min_number_recent_ingested_files_per_partition,
cold_input_size_threshold_bytes: self.cold_input_size_threshold_bytes,
cold_max_desired_file_size_bytes: self.cold_max_desired_file_size_bytes,
cold_input_file_count_threshold: self.cold_input_file_count_threshold,
hot_multiple: self.hot_multiple,
memory_budget_bytes: self.memory_budget_bytes,
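
For reference, the new knob is wired through `gen_compactor_config!` the same way as the existing cold thresholds: it can be set with the `--compaction-cold-max-desired-size-bytes` flag or the `INFLUXDB_IOX_COMPACTION_COLD_MAX_DESIRED_FILE_SIZE_BYTES` environment variable, and defaults to 100 MB. Below is a minimal sketch of that behaviour using a stand-in struct (`ColdCompactionArgs` is hypothetical; the attribute is copied from the diff and assumes the clap derive/env features this config macro already relies on):

use clap::Parser;

/// Hypothetical stand-in for the field generated by `gen_compactor_config!`.
#[derive(Debug, Parser)]
struct ColdCompactionArgs {
    /// Desired max size of cold compacted parquet files; a target, not a guarantee.
    #[clap(
        long = "--compaction-cold-max-desired-size-bytes",
        env = "INFLUXDB_IOX_COMPACTION_COLD_MAX_DESIRED_FILE_SIZE_BYTES",
        default_value = "104857600",
        action
    )]
    cold_max_desired_file_size_bytes: u64,
}

fn main() {
    // With neither the flag nor the env var set, the 100 MB default applies.
    let args = ColdCompactionArgs::parse_from(["compactor"]);
    assert_eq!(args.cold_max_desired_file_size_bytes, 104_857_600);

    // An explicit flag overrides the default (value here is illustrative: 50 MB).
    let args = ColdCompactionArgs::parse_from([
        "compactor",
        "--compaction-cold-max-desired-size-bytes",
        "52428800",
    ]);
    assert_eq!(args.cold_max_desired_file_size_bytes, 52_428_800);
}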

View File

@ -803,6 +803,7 @@ mod tests {
max_number_partitions_per_shard: 1,
min_number_recent_ingested_files_per_partition: 1,
cold_input_size_threshold_bytes: 600 * 1024 * 1024,
cold_max_desired_file_size_bytes: 104_857_600,
cold_input_file_count_threshold: 100,
hot_multiple: 4,
memory_budget_bytes: 10 * 1024 * 1024,

View File

@ -391,7 +391,7 @@ pub(crate) async fn compact_one_hot_partition(
let compact_result = parquet_file_combining::compact_parquet_files(
to_compact.files,
partition,
Arc::new(partition),
Arc::clone(&compactor.catalog),
compactor.store.clone(),
Arc::clone(&compactor.exec),
@ -920,6 +920,7 @@ mod tests {
max_number_partitions_per_shard: 1,
min_number_recent_ingested_files_per_partition: 1,
cold_input_size_threshold_bytes: 600 * 1024 * 1024,
cold_max_desired_file_size_bytes: 104_857_600,
cold_input_file_count_threshold: 100,
hot_multiple: 4,
memory_budget_bytes: 100_000_000,
@ -1157,6 +1158,7 @@ mod tests {
max_number_partitions_per_shard: 100,
min_number_recent_ingested_files_per_partition: 1,
cold_input_size_threshold_bytes: 600 * 1024 * 1024,
cold_max_desired_file_size_bytes: 104_857_600,
cold_input_file_count_threshold: 100,
hot_multiple: 4,
memory_budget_bytes: 12 * 1125, // 13,500 bytes

View File

@ -124,9 +124,13 @@ pub struct CompactorConfig {
/// threshold. Later compactions will pick up the remaining L0 files.
pub cold_input_size_threshold_bytes: u64,
/// A compaction operation for cold partitions will gather as many L0 files with their
/// overlapping L1 files to compact together until the total number of L0 + L1 files crosses this
/// threshold. Later compactions will pick up the remaining L0 files.
/// Desired max size of cold compacted parquet files.
/// It is a target value, rather than a guarantee.
pub cold_max_desired_file_size_bytes: u64,
/// A compaction operation will gather as many L0 files with their overlapping L1 files to
/// compact together until the total number of L0 + L1 files crosses this threshold. Later
/// compactions will pick up the remaining L0 files.
///
/// A compaction operation will be limited by this or by the input size threshold, whichever is
/// hit first.
@ -267,6 +271,7 @@ async fn compact_cold_partitions(compactor: Arc<Compactor>) -> usize {
let comp = Arc::clone(&compactor);
tokio::task::spawn(async move {
let partition_id = p.candidate.partition_id;
let p = Arc::new(p);
let compaction_result =
crate::compact_cold_partition(&comp, p, &Default::default()).await;

View File

@ -22,7 +22,10 @@ pub mod query;
pub mod server;
pub mod utils;
use crate::compact::{Compactor, PartitionCompactionCandidateWithInfo};
use crate::{
compact::{Compactor, PartitionCompactionCandidateWithInfo},
parquet_file_lookup::ParquetFilesForCompaction,
};
use data_types::{CompactionLevel, ParquetFileId};
use metric::Attributes;
use snafu::{ResultExt, Snafu};
@ -49,21 +52,22 @@ pub(crate) enum Error {
/// One compaction operation of one cold partition
///
/// Takes a hash-map `size_overrides` that mocks the size of the detected [`CompactorParquetFile`]s. This will
/// influence the size calculation of the compactor (i.e. when/how to compact files), but leave the actual physical
/// size of the file as it is (i.e. file deserialization can still rely on the original value).
/// Takes a hash-map `size_overrides` that mocks the size of the detected
/// [`CompactorParquetFile`]s. This will influence the size calculation of the compactor (i.e.
/// when/how to compact files), but leave the actual physical size of the file as it is (i.e. file
/// deserialization can still rely on the original value).
///
/// [`CompactorParquetFile`]: crate::parquet_file::CompactorParquetFile
pub(crate) async fn compact_cold_partition(
compactor: &Compactor,
partition: PartitionCompactionCandidateWithInfo,
partition: Arc<PartitionCompactionCandidateWithInfo>,
size_overrides: &HashMap<ParquetFileId, i64>,
) -> Result<(), Error> {
let start_time = compactor.time_provider.now();
let shard_id = partition.shard_id();
compact_remaining_level_0_files(compactor, partition, size_overrides).await?;
full_compaction().await?;
compact_remaining_level_0_files(compactor, Arc::clone(&partition), size_overrides).await?;
full_compaction(compactor, partition, size_overrides).await?;
let attributes = Attributes::from([
("shard_id", format!("{}", shard_id).into()),
@ -83,7 +87,7 @@ pub(crate) async fn compact_cold_partition(
async fn compact_remaining_level_0_files(
compactor: &Compactor,
partition: PartitionCompactionCandidateWithInfo,
partition: Arc<PartitionCompactionCandidateWithInfo>,
size_overrides: &HashMap<ParquetFileId, i64>,
) -> Result<(), Error> {
let parquet_files_for_compaction =
@ -132,8 +136,68 @@ async fn compact_remaining_level_0_files(
Ok(())
}
async fn full_compaction() -> Result<(), Error> {
todo!()
/// Given a partition that needs to have full compaction run,
///
/// - Select all files in the partition, which this method assumes will only be level 1
/// without overlaps (level 0 and level 2 files will be ignored)
/// - Split the files into groups based on size: take files in the list until the current group size
/// is greater than cold_max_desired_file_size_bytes
/// - Compact each group into a new level 2 file, no splitting
async fn full_compaction(
compactor: &Compactor,
partition: Arc<PartitionCompactionCandidateWithInfo>,
size_overrides: &HashMap<ParquetFileId, i64>,
) -> Result<(), Error> {
// select all files in this partition
let parquet_files_for_compaction =
parquet_file_lookup::ParquetFilesForCompaction::for_partition(
Arc::clone(&compactor.catalog),
partition.id(),
size_overrides,
)
.await
.context(LookupSnafu)?;
let ParquetFilesForCompaction {
level_1,
.. // Ignore other levels
} = parquet_files_for_compaction;
let num_files = level_1.len();
let mut group_file_size_bytes = 0;
let max_file_size_bytes = compactor.config.cold_max_desired_file_size_bytes;
let mut group = Vec::with_capacity(num_files);
let mut groups = Vec::with_capacity(num_files);
for file in level_1 {
group_file_size_bytes += file.file_size_bytes() as u64;
group.push(file);
if group_file_size_bytes >= max_file_size_bytes {
groups.push(group);
group = Vec::with_capacity(num_files);
group_file_size_bytes = 0;
}
}
if !group.is_empty() {
groups.push(group);
}
for group in groups {
// TODO: if there's only 1 level 1 file in the group, upgrade to level 2 w/o compaction?
parquet_file_combining::compact_final_no_splits(
group,
Arc::clone(&partition),
Arc::clone(&compactor.catalog),
compactor.store.clone(),
Arc::clone(&compactor.exec),
Arc::clone(&compactor.time_provider),
)
.await
.context(CombiningSnafu)?;
}
Ok(())
}
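
The grouping loop above is easier to see in isolation. Here is a minimal sketch of the same greedy strategy over plain byte sizes (the `group_by_size` helper and the sample values are illustrative, not part of the compactor API). Because each file is pushed before the threshold check, a group can exceed `cold_max_desired_file_size_bytes`; the limit is a target, not a hard cap:

/// Greedily split `sizes` into consecutive groups whose total reaches `max_bytes`
/// (the trailing group may be smaller). Mirrors the loop in `full_compaction` above.
fn group_by_size(sizes: &[u64], max_bytes: u64) -> Vec<Vec<u64>> {
    let mut groups = Vec::new();
    let mut group = Vec::new();
    let mut group_total = 0;
    for &size in sizes {
        group_total += size;
        group.push(size);
        if group_total >= max_bytes {
            groups.push(std::mem::take(&mut group));
            group_total = 0;
        }
    }
    if !group.is_empty() {
        groups.push(group);
    }
    groups
}

fn main() {
    const MB: u64 = 1024 * 1024;
    // With the 100 MB default, 60 + 60 crosses the threshold, so the first group
    // ends up at 120 MB; the remaining 30 + 10 form a smaller trailing group.
    let groups = group_by_size(&[60 * MB, 60 * MB, 30 * MB, 10 * MB], 100 * MB);
    assert_eq!(groups, vec![vec![60 * MB, 60 * MB], vec![30 * MB, 10 * MB]]);
}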
#[cfg(test)]
@ -150,7 +214,7 @@ mod tests {
use std::time::Duration;
#[tokio::test]
async fn test_compact_cold_partition_many_files() {
async fn test_compact_remaining_level_0_files_many_files() {
test_helpers::maybe_start_logging();
let catalog = TestCatalog::new();
@ -303,9 +367,9 @@ mod tests {
let mut candidates = compactor.add_info_to_partitions(&candidates).await.unwrap();
assert_eq!(candidates.len(), 1);
let c = candidates.pop_front().unwrap();
let c = Arc::new(candidates.pop_front().unwrap());
compact_cold_partition(&compactor, c, &size_overrides)
compact_remaining_level_0_files(&compactor, c, &size_overrides)
.await
.unwrap();
@ -370,7 +434,7 @@ mod tests {
}
#[tokio::test]
async fn test_compact_cold_partition_one_level_0_without_overlap() {
async fn test_compact_remaining_level_0_files_one_level_0_without_overlap() {
test_helpers::maybe_start_logging();
let catalog = TestCatalog::new();
@ -452,9 +516,9 @@ mod tests {
let mut candidates = compactor.add_info_to_partitions(&candidates).await.unwrap();
assert_eq!(candidates.len(), 1);
let c = candidates.pop_front().unwrap();
let c = Arc::new(candidates.pop_front().unwrap());
compact_cold_partition(&compactor, c, &size_overrides)
compact_remaining_level_0_files(&compactor, c, &size_overrides)
.await
.unwrap();
@ -519,9 +583,225 @@ mod tests {
max_number_partitions_per_shard: 1,
min_number_recent_ingested_files_per_partition: 1,
cold_input_size_threshold_bytes: 600 * 1024 * 1024,
cold_max_desired_file_size_bytes: 104_857_600,
cold_input_file_count_threshold: 100,
hot_multiple: 4,
memory_budget_bytes: 100_000_000,
}
}
#[tokio::test]
async fn full_cold_compaction_many_files() {
test_helpers::maybe_start_logging();
let catalog = TestCatalog::new();
// lp1 does not overlap with any other level 0
let lp1 = vec![
"table,tag1=WA field_int=1000i 10",
"table,tag1=VT field_int=10i 20",
]
.join("\n");
// lp2 overlaps with lp3
let lp2 = vec![
"table,tag1=WA field_int=1000i 8000", // will be eliminated due to duplicate
"table,tag1=VT field_int=10i 10000",
"table,tag1=UT field_int=70i 20000",
]
.join("\n");
// lp3 overlaps with lp2
let lp3 = vec![
"table,tag1=WA field_int=1500i 8000", // latest duplicate and kept
"table,tag1=VT field_int=10i 6000",
"table,tag1=UT field_int=270i 25000",
]
.join("\n");
// lp4 does not overlap with any
let lp4 = vec![
"table,tag2=WA,tag3=10 field_int=1600i 28000",
"table,tag2=VT,tag3=20 field_int=20i 26000",
]
.join("\n");
// lp5 overlaps with lp1
let lp5 = vec![
"table,tag2=PA,tag3=15 field_int=1601i 9",
"table,tag2=OH,tag3=21 field_int=21i 25",
]
.join("\n");
// lp6 does not overlap with any
let lp6 = vec![
"table,tag2=PA,tag3=15 field_int=81601i 90000",
"table,tag2=OH,tag3=21 field_int=421i 91000",
]
.join("\n");
let ns = catalog.create_namespace("ns").await;
let shard = ns.create_shard(1).await;
let table = ns.create_table("table").await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("tag2", ColumnType::Tag).await;
table.create_column("tag3", ColumnType::Tag).await;
table.create_column("time", ColumnType::Time).await;
let partition = table.with_shard(&shard).create_partition("part").await;
let time = Arc::new(SystemProvider::new());
let time_38_hour_ago = (time.now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos();
let config = make_compactor_config();
let metrics = Arc::new(metric::Registry::new());
let compactor = Compactor::new(
vec![shard.shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
config,
Arc::clone(&metrics),
);
// parquet files that are all in the same partition
let mut size_overrides = HashMap::<ParquetFileId, i64>::default();
// pf1 does not overlap with any other level 0
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp1)
.with_max_seq(3)
.with_min_time(10)
.with_max_time(20)
.with_creation_time(time_38_hour_ago);
let f = partition.create_parquet_file(builder).await;
size_overrides.insert(
f.parquet_file.id,
compactor.config.max_desired_file_size_bytes as i64 + 10,
);
// pf2 overlaps with pf3
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp2)
.with_max_seq(5)
.with_min_time(8_000)
.with_max_time(20_000)
.with_creation_time(time_38_hour_ago);
let f = partition.create_parquet_file(builder).await;
size_overrides.insert(
f.parquet_file.id,
100, // small file
);
// pf3 overlaps with pf2
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp3)
.with_max_seq(10)
.with_min_time(6_000)
.with_max_time(25_000)
.with_creation_time(time_38_hour_ago);
let f = partition.create_parquet_file(builder).await;
size_overrides.insert(
f.parquet_file.id,
100, // small file
);
// pf4 does not overlap with any but is small
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp4)
.with_max_seq(18)
.with_min_time(26_000)
.with_max_time(28_000)
.with_creation_time(time_38_hour_ago);
let f = partition.create_parquet_file(builder).await;
size_overrides.insert(
f.parquet_file.id,
100, // small file
);
// pf5 was created in a previous compaction cycle; overlaps with pf1
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp5)
.with_max_seq(1)
.with_min_time(9)
.with_max_time(25)
.with_creation_time(time_38_hour_ago)
.with_compaction_level(CompactionLevel::FileNonOverlapped);
let f = partition.create_parquet_file(builder).await;
size_overrides.insert(
f.parquet_file.id,
100, // small file
);
// pf6 was created in a previous compaction cycle; does not overlap with any
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp6)
.with_max_seq(20)
.with_min_time(90000)
.with_max_time(91000)
.with_creation_time(time_38_hour_ago)
.with_compaction_level(CompactionLevel::FileNonOverlapped);
let f = partition.create_parquet_file(builder).await;
size_overrides.insert(
f.parquet_file.id,
100, // small file
);
// should have 4 level-0 files before compacting
let count = catalog.count_level_0_files(shard.shard.id).await;
assert_eq!(count, 4);
// ------------------------------------------------
// Compact
let candidates = compactor
.cold_partitions_to_compact(compactor.config.max_number_partitions_per_shard)
.await
.unwrap();
let mut candidates = compactor.add_info_to_partitions(&candidates).await.unwrap();
assert_eq!(candidates.len(), 1);
let c = Arc::new(candidates.pop_front().unwrap());
compact_cold_partition(&compactor, c, &size_overrides)
.await
.unwrap();
// Should have 1 non-soft-deleted file:
//
// - the level 2 file created after combining all 3 level 1 files created by the first step
// of compaction to compact remaining level 0 files
let mut files = catalog.list_by_table_not_to_delete(table.table.id).await;
assert_eq!(files.len(), 1);
let files_and_levels: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.compaction_level))
.collect();
assert_eq!(files_and_levels, vec![(9, CompactionLevel::Final),]);
// ------------------------------------------------
// Verify the parquet file content
let file = files.pop().unwrap();
let batches = table.read_parquet_file(file).await;
assert_batches_sorted_eq!(
&[
"+-----------+------+------+------+--------------------------------+",
"| field_int | tag1 | tag2 | tag3 | time |",
"+-----------+------+------+------+--------------------------------+",
"| 10 | VT | | | 1970-01-01T00:00:00.000000020Z |",
"| 10 | VT | | | 1970-01-01T00:00:00.000006Z |",
"| 10 | VT | | | 1970-01-01T00:00:00.000010Z |",
"| 1000 | WA | | | 1970-01-01T00:00:00.000000010Z |",
"| 1500 | WA | | | 1970-01-01T00:00:00.000008Z |",
"| 1600 | | WA | 10 | 1970-01-01T00:00:00.000028Z |",
"| 1601 | | PA | 15 | 1970-01-01T00:00:00.000000009Z |",
"| 20 | | VT | 20 | 1970-01-01T00:00:00.000026Z |",
"| 21 | | OH | 21 | 1970-01-01T00:00:00.000000025Z |",
"| 270 | UT | | | 1970-01-01T00:00:00.000025Z |",
"| 421 | | OH | 21 | 1970-01-01T00:00:00.000091Z |",
"| 70 | UT | | | 1970-01-01T00:00:00.000020Z |",
"| 81601 | | PA | 15 | 1970-01-01T00:00:00.000090Z |",
"+-----------+------+------+------+--------------------------------+",
],
&batches
);
}
}

View File

@ -74,7 +74,7 @@ pub(crate) enum Error {
#[allow(clippy::too_many_arguments)]
pub(crate) async fn compact_parquet_files(
files: Vec<CompactorParquetFile>,
partition: PartitionCompactionCandidateWithInfo,
partition: Arc<PartitionCompactionCandidateWithInfo>,
// The global catalog for schema, parquet files and tombstones
catalog: Arc<dyn Catalog>,
// Object store for reading input parquet files and writing compacted parquet files
@ -95,7 +95,7 @@ pub(crate) async fn compact_parquet_files(
// 1 - `split_percentage` in the later compacted file.
percentage_max_file_size: u16,
// When data is between a "small" and "large" amount, split the compacted files at roughly this
// percentage in the earlier compacted file, and the remainder .in the later compacted file.
// percentage in the earlier compacted file, and the remainder in the later compacted file.
split_percentage: u16,
) -> Result<(), Error> {
let partition_id = partition.id();
@ -241,8 +241,6 @@ pub(crate) async fn compact_parquet_files(
.await
.context(CompactPhysicalPlanSnafu)?;
let partition = Arc::new(partition);
// Run to collect each stream of the plan
let stream_count = physical_plan.output_partitioning().partition_count();
@ -364,6 +362,195 @@ pub(crate) async fn compact_parquet_files(
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn compact_final_no_splits(
files: Vec<CompactorParquetFile>,
partition: Arc<PartitionCompactionCandidateWithInfo>,
// The global catalog for schema, parquet files and tombstones
catalog: Arc<dyn Catalog>,
// Object store for reading input parquet files and writing compacted parquet files
store: ParquetStorage,
// Executor for running queries, compacting, and persisting
exec: Arc<Executor>,
time_provider: Arc<dyn TimeProvider>,
) -> Result<(), Error> {
let partition_id = partition.id();
let num_files = files.len();
ensure!(
num_files > 0,
NotEnoughParquetFilesSnafu {
num_files,
partition_id
}
);
debug!(
?partition_id,
num_files, "final compaction of files to level 2"
);
// Collect all the parquet file IDs, to be able to set their catalog records to be
// deleted. These should already be unique, no need to dedupe.
let original_parquet_file_ids: Vec<_> = files.iter().map(|f| f.id()).collect();
// Convert the input files into QueryableParquetChunk for making query plan
let query_chunks: Vec<_> = files
.into_iter()
.map(|file| {
to_queryable_parquet_chunk(
file,
store.clone(),
partition.table.name.clone(),
&partition.table_schema,
partition.sort_key.clone(),
)
})
.collect();
trace!(
n_query_chunks = query_chunks.len(),
"gathered parquet data to compact"
);
// Compute max sequence numbers and min/max time
// unwrap here will work because the len of query_chunks is already >= 1
let (head, tail) = query_chunks.split_first().unwrap();
let mut max_sequence_number = head.max_sequence_number();
let mut min_time = head.min_time();
let mut max_time = head.max_time();
for c in tail {
max_sequence_number = max(max_sequence_number, c.max_sequence_number());
min_time = min(min_time, c.min_time());
max_time = max(max_time, c.max_time());
}
// Merge schema of the compacting chunks
let query_chunks: Vec<_> = query_chunks
.into_iter()
.map(|c| Arc::new(c) as Arc<dyn QueryChunk>)
.collect();
let merged_schema = QueryableParquetChunk::merge_schemas(&query_chunks);
debug!(
num_cols = merged_schema.as_arrow().fields().len(),
"Number of columns in the merged schema to build query plan"
);
// All partitions in the catalog MUST contain a sort key.
let sort_key = partition
.sort_key
.as_ref()
.expect("no partition sort key in catalog")
.filter_to(&merged_schema.primary_key());
let ctx = exec.new_context(ExecutorType::Reorg);
// Compact everything into one file
let plan = ReorgPlanner::new(ctx.child_ctx("ReorgPlanner"))
.compact_plan(Arc::clone(&merged_schema), query_chunks, sort_key.clone())
.context(CompactLogicalPlanSnafu)?;
let ctx = exec.new_context(ExecutorType::Reorg);
let physical_plan = ctx
.create_physical_plan(&plan)
.await
.context(CompactPhysicalPlanSnafu)?;
// Run to collect each stream of the plan
let stream_count = physical_plan.output_partitioning().partition_count();
debug!("running plan with {} streams", stream_count);
// These streams *must* run in parallel, otherwise a deadlock
// can occur. Since there is a merge in the plan, in order to make
// progress on one stream there must be (potential space) on the
// other streams.
//
// https://github.com/influxdata/influxdb_iox/issues/4306
// https://github.com/influxdata/influxdb_iox/issues/4324
let compacted_parquet_files = (0..stream_count)
.map(|i| {
// Prepare variables to pass to the closure
let ctx = exec.new_context(ExecutorType::Reorg);
let physical_plan = Arc::clone(&physical_plan);
let store = store.clone();
let time_provider = Arc::clone(&time_provider);
let sort_key = sort_key.clone();
let partition = Arc::clone(&partition);
// run as a separate tokio task so files can be written
// concurrently.
tokio::task::spawn(async move {
trace!(partition = i, "executing datafusion partition");
let data = ctx
.execute_stream_partitioned(physical_plan, i)
.await
.context(ExecuteCompactPlanSnafu)?;
trace!(partition = i, "built result stream for partition");
let meta = IoxMetadata {
object_store_id: Uuid::new_v4(),
creation_timestamp: time_provider.now(),
shard_id: partition.shard_id(),
namespace_id: partition.namespace_id(),
namespace_name: partition.namespace.name.clone().into(),
table_id: partition.table.id,
table_name: partition.table.name.clone().into(),
partition_id,
partition_key: partition.partition_key.clone(),
max_sequence_number,
compaction_level: CompactionLevel::Final,
sort_key: Some(sort_key.clone()),
};
debug!(
?partition_id,
"executing and uploading compaction StreamSplitExec"
);
let object_store_id = meta.object_store_id;
info!(?partition_id, %object_store_id, "streaming exec to object store");
// Stream the record batches from the compaction exec, serialize
// them, and directly upload the resulting Parquet files to
// object storage.
let (parquet_meta, file_size) =
store.upload(data, &meta).await.context(PersistSnafu)?;
debug!(?partition_id, %object_store_id, "file uploaded to object store");
let parquet_file =
meta.to_parquet_file(partition_id, file_size, &parquet_meta, |name| {
partition
.table_schema
.columns
.get(name)
.expect("unknown column")
.id
});
Ok(parquet_file)
})
})
// NB: FuturesOrdered allows the futures to run in parallel
.collect::<FuturesOrdered<_>>()
// Check for errors in the task
.map(|t| t.context(ExecuteParquetTaskSnafu)?)
.try_collect::<Vec<_>>()
.await?;
update_catalog(
catalog,
partition_id,
compacted_parquet_files,
&original_parquet_file_ids,
)
.await
.context(CatalogSnafu { partition_id })?;
info!(?partition_id, "compaction complete");
Ok(())
}
/// Convert ParquetFile to a QueryableParquetChunk
fn to_queryable_parquet_chunk(
file: CompactorParquetFile,
@ -530,7 +717,7 @@ mod tests {
struct TestSetup {
catalog: Arc<TestCatalog>,
table: Arc<TestTable>,
candidate_partition: PartitionCompactionCandidateWithInfo,
candidate_partition: Arc<PartitionCompactionCandidateWithInfo>,
parquet_files: Vec<CompactorParquetFile>,
}
@ -556,7 +743,7 @@ mod tests {
let sort_key = SortKey::from_columns(["tag1", "tag2", "tag3", "time"]);
let partition = partition.update_sort_key(sort_key.clone()).await;
let candidate_partition = PartitionCompactionCandidateWithInfo {
let candidate_partition = Arc::new(PartitionCompactionCandidateWithInfo {
table: Arc::new(table.table.clone()),
table_schema: Arc::new(table_schema),
namespace: Arc::new(ns.namespace.clone()),
@ -568,7 +755,7 @@ mod tests {
},
sort_key: partition.partition.sort_key(),
partition_key: partition.partition.partition_key.clone(),
};
});
let lp = vec![
"table,tag2=PA,tag3=15 field_int=1601i 30000",

View File

@ -133,6 +133,7 @@ pub(crate) fn filter_hot_parquet_files(
let ParquetFilesForCompaction {
level_0,
level_1: mut remaining_level_1,
.. // Ignore other levels
} = parquet_files_for_compaction;
if level_0.is_empty() {
@ -300,6 +301,7 @@ pub(crate) fn filter_cold_parquet_files(
let ParquetFilesForCompaction {
level_0,
level_1: mut remaining_level_1,
.. // Ignore other levels
} = parquet_files_for_compaction;
if level_0.is_empty() {
@ -631,6 +633,7 @@ mod tests {
let parquet_files_for_compaction = ParquetFilesForCompaction {
level_0: vec![],
level_1: vec![],
level_2: vec![],
};
let (files_metric, bytes_metric) = metrics();
@ -658,6 +661,7 @@ mod tests {
let parquet_files_for_compaction = ParquetFilesForCompaction {
level_0: vec![ParquetFileBuilder::level_0().id(1).build()],
level_1: vec![],
level_2: vec![],
};
let (files_metric, bytes_metric) = metrics();
@ -684,6 +688,7 @@ mod tests {
let parquet_files_for_compaction = ParquetFilesForCompaction {
level_0: vec![ParquetFileBuilder::level_0().id(1).build()],
level_1: vec![],
level_2: vec![],
};
let (files_metric, bytes_metric) = metrics();
@ -741,6 +746,7 @@ mod tests {
.max_time(500)
.build(),
],
level_2: vec![],
};
let (files_metric, bytes_metric) = metrics();
@ -845,6 +851,7 @@ mod tests {
.file_size_bytes(10)
.build(),
],
level_2: vec![],
};
// total needed budget for one file with a tag, a time and 11 rows = 1176
@ -926,6 +933,7 @@ mod tests {
let parquet_files_for_compaction = ParquetFilesForCompaction {
level_0: vec![],
level_1: vec![],
level_2: vec![],
};
let (files_metric, bytes_metric) = metrics();
@ -945,6 +953,7 @@ mod tests {
let parquet_files_for_compaction = ParquetFilesForCompaction {
level_0: vec![ParquetFileBuilder::level_0().id(1).build()],
level_1: vec![],
level_2: vec![],
};
let (files_metric, bytes_metric) = metrics();
@ -965,6 +974,7 @@ mod tests {
let parquet_files_for_compaction = ParquetFilesForCompaction {
level_0: vec![ParquetFileBuilder::level_0().id(1).build()],
level_1: vec![],
level_2: vec![],
};
let (files_metric, bytes_metric) = metrics();
@ -1001,6 +1011,7 @@ mod tests {
.max_time(500)
.build(),
],
level_2: vec![],
};
let (files_metric, bytes_metric) = metrics();
@ -1044,6 +1055,7 @@ mod tests {
.max_time(500)
.build(),
],
level_2: vec![],
};
let (files_metric, bytes_metric) = metrics();
@ -1108,6 +1120,7 @@ mod tests {
.file_size_bytes(10)
.build(),
],
level_2: vec![],
};
// all level 0 files & no level 1 files get returned
@ -1217,6 +1230,7 @@ mod tests {
.file_size_bytes(10)
.build(),
],
level_2: vec![],
};
// Max size 0; only the first level 0 file and its overlapping level 1 files get

View File

@ -32,6 +32,9 @@ pub(crate) struct ParquetFilesForCompaction {
/// Parquet files for a partition with `CompactionLevel::FileNonOverlapped`. Arbitrary order.
pub(crate) level_1: Vec<CompactorParquetFile>,
/// Parquet files for a partition with `CompactionLevel::Final`. Arbitrary order.
pub(crate) level_2: Vec<CompactorParquetFile>,
}
impl ParquetFilesForCompaction {
@ -62,6 +65,7 @@ impl ParquetFilesForCompaction {
let mut level_0 = Vec::with_capacity(parquet_files.len());
let mut level_1 = Vec::with_capacity(parquet_files.len());
let mut level_2 = Vec::with_capacity(parquet_files.len());
for parquet_file in parquet_files {
let parquet_file = match size_overrides.get(&parquet_file.id) {
@ -71,12 +75,17 @@ impl ParquetFilesForCompaction {
match parquet_file.compaction_level() {
CompactionLevel::Initial => level_0.push(parquet_file),
CompactionLevel::FileNonOverlapped => level_1.push(parquet_file),
CompactionLevel::Final => level_2.push(parquet_file),
}
}
level_0.sort_by_key(|pf| pf.max_sequence_number());
Ok(Self { level_0, level_1 })
Ok(Self {
level_0,
level_1,
level_2,
})
}
}
@ -256,7 +265,46 @@ mod tests {
}
#[tokio::test]
async fn one_level_0_file_one_level_1_file_gets_returned() {
async fn one_level_2_file_gets_returned() {
test_helpers::maybe_start_logging();
let TestSetup {
catalog, partition, ..
} = test_setup().await;
// Create a level 2 file
let builder = TestParquetFileBuilder::default()
.with_line_protocol(ARBITRARY_LINE_PROTOCOL)
.with_compaction_level(CompactionLevel::Final);
let parquet_file = partition.create_parquet_file(builder).await;
let parquet_files_for_compaction = ParquetFilesForCompaction::for_partition(
Arc::clone(&catalog.catalog),
partition.partition.id,
&HashMap::default(),
)
.await
.unwrap();
assert!(
parquet_files_for_compaction.level_0.is_empty(),
"Expected empty, got: {:#?}",
parquet_files_for_compaction.level_0
);
assert!(
parquet_files_for_compaction.level_1.is_empty(),
"Expected empty, got: {:#?}",
parquet_files_for_compaction.level_1
);
assert_eq!(
parquet_files_for_compaction.level_2,
vec![parquet_file.parquet_file.into()]
);
}
#[tokio::test]
async fn one_file_of_each_level_gets_returned() {
test_helpers::maybe_start_logging();
let TestSetup {
catalog, partition, ..
@ -274,6 +322,12 @@ mod tests {
.with_compaction_level(CompactionLevel::FileNonOverlapped);
let l1 = partition.create_parquet_file(builder).await;
// Create a level 2 file
let builder = TestParquetFileBuilder::default()
.with_line_protocol(ARBITRARY_LINE_PROTOCOL)
.with_compaction_level(CompactionLevel::Final);
let l2 = partition.create_parquet_file(builder).await;
let parquet_files_for_compaction = ParquetFilesForCompaction::for_partition(
Arc::clone(&catalog.catalog),
partition.partition.id,
@ -291,6 +345,11 @@ mod tests {
parquet_files_for_compaction.level_1,
vec![l1.parquet_file.into()]
);
assert_eq!(
parquet_files_for_compaction.level_2,
vec![l2.parquet_file.into()]
);
}
#[tokio::test]

View File

@ -237,7 +237,7 @@ impl QueryChunk for QueryableParquetChunk {
fn order(&self) -> ChunkOrder {
match self.compaction_level {
CompactionLevel::Initial => ChunkOrder::new(self.max_sequence_number.get()),
CompactionLevel::FileNonOverlapped => ChunkOrder::new(0),
CompactionLevel::FileNonOverlapped | CompactionLevel::Final => ChunkOrder::new(0),
}
}

View File

@ -41,6 +41,9 @@ pub enum CompactionLevel {
Initial = 0,
/// Level of files persisted by a Compactor that do not overlap with non-level-0 files.
FileNonOverlapped = 1,
/// Level of files persisted by a Compactor that are fully compacted and should not be
/// recompacted unless a new overlapping Initial level file arrives
Final = 2,
}
impl TryFrom<i32> for CompactionLevel {
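
The `TryFrom<i32>` impl that begins above is outside this hunk, but it presumably gains an arm for the new variant. A self-contained sketch of the expected numeric mapping, with a placeholder error type (the real impl and its error type live in `data_types` and are not shown here):

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum CompactionLevel {
    Initial = 0,
    FileNonOverlapped = 1,
    Final = 2,
}

impl TryFrom<i32> for CompactionLevel {
    // Placeholder error type; the real impl defines its own.
    type Error = String;

    fn try_from(value: i32) -> Result<Self, Self::Error> {
        match value {
            0 => Ok(Self::Initial),
            1 => Ok(Self::FileNonOverlapped),
            2 => Ok(Self::Final),
            other => Err(format!("invalid compaction level value: {other}")),
        }
    }
}

fn main() {
    assert_eq!(CompactionLevel::try_from(2), Ok(CompactionLevel::Final));
    assert_eq!(CompactionLevel::Final as i32, 2);
}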

View File

@ -431,6 +431,7 @@ impl Config {
max_number_partitions_per_shard: 1,
min_number_recent_ingested_files_per_partition: 1,
cold_input_size_threshold_bytes: 629_145_600,
cold_max_desired_file_size_bytes: 104_857_600,
cold_input_file_count_threshold: 100,
hot_multiple: 4,
memory_budget_bytes: 300_000,

View File

@ -203,6 +203,7 @@ pub async fn build_compactor_from_config(
max_number_partitions_per_shard,
min_number_recent_ingested_files_per_partition,
cold_input_size_threshold_bytes,
cold_max_desired_file_size_bytes,
cold_input_file_count_threshold,
hot_multiple,
memory_budget_bytes,
@ -217,6 +218,7 @@ pub async fn build_compactor_from_config(
max_number_partitions_per_shard,
min_number_recent_ingested_files_per_partition,
cold_input_size_threshold_bytes,
cold_max_desired_file_size_bytes,
cold_input_file_count_threshold,
hot_multiple,
memory_budget_bytes,