From 840923abab56f5f5f7fedf2a622e5f976a3d9eac Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Fri, 20 Jan 2023 17:34:50 -0500 Subject: [PATCH] refactor: execute compaction plan (#6654) * chore: address review comment of previous PR * refactor: execute compact plan * refactor: we will now compact all L0 and L1 files of a partition and split them as needed * chore: comnents Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 2 + clap_blocks/src/compactor2.rs | 42 +++ compactor2/Cargo.toml | 2 + .../src/components/compact/compact_builder.rs | 209 ++++++++++++++- .../components/compact/compact_executor.rs | 193 ++++++++++++++ .../src/components/compact/compact_files.rs | 116 ++++++++- compactor2/src/components/compact/mod.rs | 1 + .../src/components/compact/partition.rs | 7 +- .../src/components/compact/query_chunk.rs | 17 +- .../components/namespaces_source/catalog.rs | 30 ++- .../src/components/namespaces_source/mock.rs | 66 ++++- .../src/components/namespaces_source/mod.rs | 9 +- compactor2/src/config.rs | 20 ++ compactor2/src/driver.rs | 218 ++++++++++++++-- compactor2/src/test_util.rs | 240 +++++++++++++++++- iox_tests/src/util.rs | 8 + ioxd_compactor2/src/lib.rs | 3 + 17 files changed, 1096 insertions(+), 87 deletions(-) create mode 100644 compactor2/src/components/compact/compact_executor.rs diff --git a/Cargo.lock b/Cargo.lock index c54693454f..1fd75c12ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -994,6 +994,7 @@ dependencies = [ name = "compactor2" version = "0.1.0" dependencies = [ + "arrow_util", "async-trait", "backoff", "data_types", @@ -1001,6 +1002,7 @@ dependencies = [ "futures", "iox_catalog", "iox_query", + "iox_tests", "iox_time", "metric", "observability_deps", diff --git a/clap_blocks/src/compactor2.rs b/clap_blocks/src/compactor2.rs index b6b8596fb6..cb2fc27baf 100644 --- a/clap_blocks/src/compactor2.rs +++ b/clap_blocks/src/compactor2.rs @@ -43,4 +43,46 @@ pub struct Compactor2Config { action )] pub exec_mem_pool_bytes: usize, + + /// Desired max size of compacted parquet files. + /// It is a target desired value, rather than a guarantee. + /// 1024 * 1024 * 25 = 26,214,400 (25MB) + #[clap( + long = "compaction-max-desired-size-bytes", + env = "INFLUXDB_IOX_COMPACTION_MAX_DESIRED_FILE_SIZE_BYTES", + default_value = "26214400", + action + )] + pub max_desired_file_size_bytes: u64, + + /// Percentage of desired max file size. + /// If the estimated compacted result is too small, no need to split it. + /// This percentage is to determine how small it is: + /// < percentage_max_file_size * max_desired_file_size_bytes: + /// This value must be between (0, 100) + /// Default is 20 + #[clap( + long = "compaction-percentage-max-file_size", + env = "INFLUXDB_IOX_COMPACTION_PERCENTAGE_MAX_FILE_SIZE", + default_value = "20", + action + )] + pub percentage_max_file_size: u16, + + /// Split file percentage + /// If the estimated compacted result is neither too small nor too large, it will be + /// split into 2 files determined by this percentage. + /// . Too small means: < percentage_max_file_size * max_desired_file_size_bytes + /// . Too large means: > max_desired_file_size_bytes + /// . 
Any size in the middle will be considered neither too small nor too large + /// + /// This value must be between (0, 100) + /// Default is 80 + #[clap( + long = "compaction-split-percentage", + env = "INFLUXDB_IOX_COMPACTION_SPLIT_PERCENTAGE", + default_value = "80", + action + )] + pub split_percentage: u16, } diff --git a/compactor2/Cargo.toml b/compactor2/Cargo.toml index 828f2e9f17..65a293d059 100644 --- a/compactor2/Cargo.toml +++ b/compactor2/Cargo.toml @@ -27,4 +27,6 @@ uuid = { version = "1", features = ["v4"] } workspace-hack = { path = "../workspace-hack"} [dev-dependencies] +arrow_util = { path = "../arrow_util" } +iox_tests = { path = "../iox_tests" } test_helpers = { path = "../test_helpers"} diff --git a/compactor2/src/components/compact/compact_builder.rs b/compactor2/src/components/compact/compact_builder.rs index 6c8e6d88bc..b345383abc 100644 --- a/compactor2/src/components/compact/compact_builder.rs +++ b/compactor2/src/components/compact/compact_builder.rs @@ -4,7 +4,7 @@ use std::{ }; use data_types::{CompactionLevel, ParquetFile, TimestampMinMax}; -use datafusion::logical_expr::LogicalPlan; +use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan}; use iox_query::{ exec::{Executor, ExecutorType}, frontend::reorg::ReorgPlanner, @@ -24,13 +24,19 @@ use super::partition::PartitionInfo; #[derive(Debug, Snafu)] #[allow(missing_copy_implementations, missing_docs)] -pub(crate) enum Error { +pub enum Error { #[snafu(display("Error building compact logical plan {}", source))] CompactLogicalPlan { source: iox_query::frontend::reorg::Error, }, + + #[snafu(display("Error building compact physical plan {}", source))] + CompactPhysicalPlan { source: DataFusionError }, } +/// Observer function (for testing) that is invoked on the physical plan to be run +type PlanObserver = Box; + /// Builder for compaction plans pub(crate) struct CompactPlanBuilder { // Partition of files to compact @@ -43,6 +49,8 @@ pub(crate) struct CompactPlanBuilder { percentage_max_file_size: u16, split_percentage: u16, target_level: CompactionLevel, + // This is for plan observation for testing + plan_observer: Option, } impl CompactPlanBuilder { @@ -59,17 +67,26 @@ impl CompactPlanBuilder { store: config.parquet_store.clone(), exec: Arc::clone(&config.exec), _time_provider: Arc::clone(&config.time_provider), - // TODO: make these configurable - max_desired_file_size_bytes: 100 * 1024 * 1024, - percentage_max_file_size: 30, - split_percentage: 90, + max_desired_file_size_bytes: config.max_desired_file_size_bytes, + percentage_max_file_size: config.percentage_max_file_size, + split_percentage: config.split_percentage, target_level: compaction_level, + plan_observer: None, } } - /// Builds a logical compact plan respecting the specified file boundaries + /// specify a function to call on the created physical plan, prior to its execution (used for testing) + #[cfg(test)] + #[allow(dead_code)] + pub fn with_plan_observer(mut self, plan_observer: PlanObserver) -> Self { + self.plan_observer = Some(plan_observer); + self + } + + /// Builds a compact plan respecting the specified file boundaries /// This functon assumes that the compaction-levels of the files are either target_level or target_level-1 - pub fn build_logical_compact_plan(self) -> Result { + pub async fn build_compact_plan(self) -> Result, Error> { + //Result { let Self { partition, files, @@ -80,6 +97,7 @@ impl CompactPlanBuilder { percentage_max_file_size, split_percentage, target_level, + plan_observer, } = self; // total file size is 
the sum of the file sizes of the files to compact @@ -143,6 +161,7 @@ impl CompactPlanBuilder { let (small_cutoff_bytes, large_cutoff_bytes) = Self::cutoff_bytes(max_desired_file_size_bytes, percentage_max_file_size); + // Build logical compact plan let ctx = exec.new_context(ExecutorType::Reorg); let plan = if total_size <= small_cutoff_bytes { // Compact everything into one file @@ -196,7 +215,17 @@ impl CompactPlanBuilder { } }; - Ok(plan) + // Build physical compact plan + let physical_plan = ctx + .create_physical_plan(&plan) + .await + .context(CompactPhysicalPlanSnafu)?; + + if let Some(plan_observer) = plan_observer { + plan_observer(physical_plan.as_ref()); + } + + Ok(physical_plan) } // compute cut off bytes for files @@ -270,3 +299,165 @@ impl CompactPlanBuilder { .any(|&chunk| chunk.max >= min_time && chunk.min <= max_time) } } + +#[cfg(test)] +mod tests { + use data_types::TimestampMinMax; + + use crate::components::compact::compact_builder::CompactPlanBuilder; + + #[test] + fn test_cutoff_bytes() { + let (small, large) = CompactPlanBuilder::cutoff_bytes(100, 30); + assert_eq!(small, 30); + assert_eq!(large, 130); + + let (small, large) = CompactPlanBuilder::cutoff_bytes(100 * 1024 * 1024, 30); + assert_eq!(small, 30 * 1024 * 1024); + assert_eq!(large, 130 * 1024 * 1024); + + let (small, large) = CompactPlanBuilder::cutoff_bytes(100, 60); + assert_eq!(small, 60); + assert_eq!(large, 160); + } + + #[test] + fn test_compute_split_time() { + let min_time = 1; + let max_time = 11; + let total_size = 100; + let max_desired_file_size = 100; + let chunk_times = vec![TimestampMinMax { + min: min_time, + max: max_time, + }]; + + // no split + let result = CompactPlanBuilder::compute_split_time( + chunk_times.clone(), + min_time, + max_time, + total_size, + max_desired_file_size, + ); + assert_eq!(result.len(), 1); + assert_eq!(result[0], max_time); + + // split 70% and 30% + let max_desired_file_size = 70; + let result = CompactPlanBuilder::compute_split_time( + chunk_times.clone(), + min_time, + max_time, + total_size, + max_desired_file_size, + ); + // only need to store the last split time + assert_eq!(result.len(), 1); + assert_eq!(result[0], 8); // = 1 (min_time) + 7 + + // split 40%, 40%, 20% + let max_desired_file_size = 40; + let result = CompactPlanBuilder::compute_split_time( + chunk_times, + min_time, + max_time, + total_size, + max_desired_file_size, + ); + // store first and second split time + assert_eq!(result.len(), 2); + assert_eq!(result[0], 5); // = 1 (min_time) + 4 + assert_eq!(result[1], 9); // = 5 (previous split_time) + 4 + } + + #[test] + fn compute_split_time_when_min_time_equals_max() { + // Imagine a customer is backfilling a large amount of data and for some reason, all the + // times on the data are exactly the same. That means the min_time and max_time will be the + // same, but the total_size will be greater than the desired size. 
+ // We will not split it becasue the split has to stick to non-overlapped time range + + let min_time = 1; + let max_time = 1; + + let total_size = 200; + let max_desired_file_size = 100; + let chunk_times = vec![TimestampMinMax { + min: min_time, + max: max_time, + }]; + + let result = CompactPlanBuilder::compute_split_time( + chunk_times, + min_time, + max_time, + total_size, + max_desired_file_size, + ); + + // must return vector of one containing max_time + assert_eq!(result.len(), 1); + assert_eq!(result[0], 1); + } + + #[test] + fn compute_split_time_please_dont_explode() { + // degenerated case where the step size is so small that it is < 1 (but > 0). In this case we shall still + // not loop forever. + let min_time = 10; + let max_time = 20; + + let total_size = 600000; + let max_desired_file_size = 10000; + let chunk_times = vec![TimestampMinMax { + min: min_time, + max: max_time, + }]; + + let result = CompactPlanBuilder::compute_split_time( + chunk_times, + min_time, + max_time, + total_size, + max_desired_file_size, + ); + assert_eq!(result.len(), 9); + } + + #[test] + fn compute_split_time_chunk_gaps() { + // When the chunks have large gaps, we should not introduce a splits that cause time ranges + // known to be empty. Split T2 below should not exist. + // │ │ + //┌────────────────┐ ┌──────────────┐ + //│ Chunk 1 │ │ │ │ Chunk 2 │ + //└────────────────┘ └──────────────┘ + // │ │ + // Split T1 Split T2 + + // Create a scenario where naive splitting would produce 2 splits (3 chunks) as shown above, but + // the only chunk data present is in the highest and lowest quarters, similar to what's shown above. + let min_time = 1; + let max_time = 100; + + let total_size = 200; + let max_desired_file_size = total_size / 3; + let chunk_times = vec![ + TimestampMinMax { min: 1, max: 24 }, + TimestampMinMax { min: 75, max: 100 }, + ]; + + let result = CompactPlanBuilder::compute_split_time( + chunk_times, + min_time, + max_time, + total_size, + max_desired_file_size, + ); + + // must return vector of one, containing a Split T1 shown above. + assert_eq!(result.len(), 1); + assert_eq!(result[0], 34); + } +} diff --git a/compactor2/src/components/compact/compact_executor.rs b/compactor2/src/components/compact/compact_executor.rs new file mode 100644 index 0000000000..aa71f8075b --- /dev/null +++ b/compactor2/src/components/compact/compact_executor.rs @@ -0,0 +1,193 @@ +use std::{future, sync::Arc}; + +use data_types::{CompactionLevel, ParquetFileParams, SequenceNumber, ShardId}; +use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan}; +use futures::{stream::FuturesOrdered, StreamExt, TryStreamExt}; +use iox_query::exec::{Executor, ExecutorType}; +use iox_time::TimeProvider; +use observability_deps::tracing::{debug, info, trace, warn}; +use parquet_file::{ + metadata::IoxMetadata, + serialize::CodecError, + storage::{ParquetStorage, UploadError}, +}; +use snafu::{ResultExt, Snafu}; +use uuid::Uuid; + +use crate::config::Config; + +use super::partition::PartitionInfo; + +// fields no longer used but still exists in the catalog +const SHARD_ID: ShardId = ShardId::new(0); +const MAX_SEQUENCE_NUMBER: i64 = 0; + +/// Compaction errors. 
+#[derive(Debug, Snafu)] +#[allow(missing_copy_implementations, missing_docs)] +pub enum Error { + #[snafu(display("Error executing compact plan {}", source))] + ExecuteCompactPlan { source: DataFusionError }, + + #[snafu(display("Could not serialize and persist record batches {}", source))] + Persist { + source: parquet_file::storage::UploadError, + }, + + #[snafu(display("Error executing parquet write task {}", source))] + ExecuteParquetTask { source: tokio::task::JoinError }, +} + +/// Executor of a plan +pub(crate) struct CompactExecutor { + // Partition of the plan to compact + partition: Arc, + plan: Arc, + store: ParquetStorage, + exec: Arc, + time_provider: Arc, + target_level: CompactionLevel, +} + +impl CompactExecutor { + /// Create a new executor + pub fn new( + plan: Arc, + partition: Arc, + config: Arc, + target_level: CompactionLevel, + ) -> Self { + Self { + partition, + plan, + store: config.parquet_store.clone(), + exec: Arc::clone(&config.exec), + time_provider: Arc::clone(&config.time_provider), + + target_level, + } + } + + pub async fn execute(self) -> Result, Error> { + let Self { + partition, + plan, + store, + exec, + time_provider, + target_level, + } = self; + + let partition_id = partition.partition_id; + + // Run to collect each stream of the plan + let stream_count = plan.output_partitioning().partition_count(); + debug!(stream_count, "running plan with streams"); + + // These streams *must* to run in parallel otherwise a deadlock + // can occur. Since there is a merge in the plan, in order to make + // progress on one stream there must be (potential space) on the + // other streams. + // + // https://github.com/influxdata/influxdb_iox/issues/4306 + // https://github.com/influxdata/influxdb_iox/issues/4324 + let compacted_parquet_files: Vec = (0..stream_count) + .map(|i| { + // Prepare variables to pass to the closure + let ctx = exec.new_context(ExecutorType::Reorg); + let physical_plan = Arc::clone(&plan); + let store = store.clone(); + let time_provider = Arc::clone(&time_provider); + let partition = Arc::clone(&partition); + let sort_key = partition.sort_key.clone(); + // run as a separate tokio task so files can be written + // concurrently. + tokio::task::spawn(async move { + trace!(partition = i, "executing datafusion partition"); + let data = ctx + .execute_stream_partitioned(physical_plan, i) + .await + .context(ExecuteCompactPlanSnafu)?; + trace!(partition = i, "built result stream for partition"); + + let meta = IoxMetadata { + object_store_id: Uuid::new_v4(), + creation_timestamp: time_provider.now(), + shard_id: SHARD_ID, + namespace_id: partition.namespace_id, + namespace_name: partition.namespace_name.clone().into(), + table_id: partition.table.id, + table_name: partition.table.name.clone().into(), + partition_id, + partition_key: partition.partition_key.clone(), + max_sequence_number: SequenceNumber::new(MAX_SEQUENCE_NUMBER), + compaction_level: target_level, + sort_key: sort_key.clone(), + }; + + debug!( + partition_id = partition_id.get(), + "executing and uploading compaction StreamSplitExec" + ); + + let object_store_id = meta.object_store_id; + info!( + partition_id = partition_id.get(), + object_store_id = object_store_id.to_string(), + "streaming exec to object store" + ); + + // Stream the record batches from the compaction exec, serialize + // them, and directly upload the resulting Parquet files to + // object storage. 
+ let (parquet_meta, file_size) = match store.upload(data, &meta).await { + Ok(v) => v, + Err(UploadError::Serialise(CodecError::NoRows)) => { + // This MAY be a bug. + // + // This also may happen legitimately, though very, very + // rarely. See test_empty_parquet_file_panic for an + // explanation. + warn!( + partition_id = partition_id.get(), + object_store_id = object_store_id.to_string(), + "SplitExec produced an empty result stream" + ); + return Ok(None); + } + Err(e) => return Err(Error::Persist { source: e }), + }; + + debug!( + partition_id = partition_id.get(), + object_store_id = object_store_id.to_string(), + "file uploaded to object store" + ); + + let parquet_file = + meta.to_parquet_file(partition_id, file_size, &parquet_meta, |name| { + partition + .table_schema + .columns + .get(name) + .expect("unknown column") + .id + }); + + Ok(Some(parquet_file)) + }) + }) + // NB: FuturesOrdered allows the futures to run in parallel + .collect::>() + // Check for errors in the task + .map(|t| t.context(ExecuteParquetTaskSnafu)?) + // Discard the streams that resulted in empty output / no file uploaded + // to the object store. + .try_filter_map(|v| future::ready(Ok(v))) + // Collect all the persisted parquet files together. + .try_collect::>() + .await?; + + Ok(compacted_parquet_files) + } +} diff --git a/compactor2/src/components/compact/compact_files.rs b/compactor2/src/components/compact/compact_files.rs index bb53fdf60c..6a37a588e9 100644 --- a/compactor2/src/components/compact/compact_files.rs +++ b/compactor2/src/components/compact/compact_files.rs @@ -2,11 +2,14 @@ use std::sync::Arc; use data_types::{CompactionLevel, ParquetFile, ParquetFileParams}; -use snafu::Snafu; +use snafu::{ResultExt, Snafu}; use crate::config::Config; -use super::{compact_builder::CompactPlanBuilder, partition::PartitionInfo}; +use super::{ + compact_builder::CompactPlanBuilder, compact_executor::CompactExecutor, + partition::PartitionInfo, +}; /// Compaction errors. #[derive(Debug, Snafu)] @@ -14,6 +17,16 @@ use super::{compact_builder::CompactPlanBuilder, partition::PartitionInfo}; pub enum Error { #[snafu(display("Not implemented"))] NotImplemented, + + #[snafu(display("Error building compact plan: {}", source))] + BuildCompactPlan { + source: super::compact_builder::Error, + }, + + #[snafu(display("Error building compact plan: {}", source))] + ExecuteCompactPlan { + source: super::compact_executor::Error, + }, } /// Perform compaction on given files including catalog transaction. 
@@ -26,13 +39,100 @@ pub async fn compact_files( config: Arc, compaction_level: CompactionLevel, ) -> Result, Error> { - let builder = CompactPlanBuilder::new(files, partition_info, config, compaction_level); + if files.is_empty() { + return Ok(vec![]); + } - let _logical_plan = builder.build_logical_compact_plan(); + // build compact plan + let builder = CompactPlanBuilder::new( + files, + Arc::clone(&partition_info), + Arc::clone(&config), + compaction_level, + ); + let plan = builder + .build_compact_plan() + .await + .context(BuildCompactPlanSnafu)?; - // TODO: build and run physical plans + // execute the plan + let executor = CompactExecutor::new(plan, partition_info, config, compaction_level); + let compacted_files = executor.execute().await.context(ExecuteCompactPlanSnafu)?; - // TODO: create parquet files for output plans - - Ok(vec![]) + Ok(compacted_files) +} + +#[cfg(test)] +mod tests { + use data_types::CompactionLevel; + use std::sync::Arc; + + use crate::{components::compact::compact_files::compact_files, test_util::TestSetup}; + + #[tokio::test] + async fn test_compact_no_file() { + test_helpers::maybe_start_logging(); + + // no files + let setup = TestSetup::new(false).await; + let TestSetup { + files, + partition_info, + config, + .. + } = setup; + + let compacted_files = compact_files( + Arc::clone(&files), + Arc::clone(&partition_info), + Arc::clone(&config), + CompactionLevel::FileNonOverlapped, + ) + .await + .unwrap(); + + assert!(compacted_files.is_empty()); + } + + #[tokio::test] + async fn test_compact() { + test_helpers::maybe_start_logging(); + + // Create a test setup with 6 files + let setup = TestSetup::new(true).await; + let TestSetup { + files, + partition_info, + config, + .. + } = setup; + + // By default, the config value is small, so the output file will be split + let compacted_files = compact_files( + Arc::clone(&files), + Arc::clone(&partition_info), + Arc::clone(&config), + CompactionLevel::FileNonOverlapped, + ) + .await + .unwrap(); + assert_eq!(compacted_files.len(), 2); + + let mut config = (*config).clone(); + + // let not split the output file by setting the config to a large value + config.max_desired_file_size_bytes = 100 * 1024 * 1024; + config.percentage_max_file_size = 100; + config.split_percentage = 100; + + let compacted_files = compact_files( + Arc::clone(&files), + Arc::clone(&partition_info), + Arc::new(config), + CompactionLevel::FileNonOverlapped, + ) + .await + .unwrap(); + assert_eq!(compacted_files.len(), 1); + } } diff --git a/compactor2/src/components/compact/mod.rs b/compactor2/src/components/compact/mod.rs index 7229ee6740..0fe229e10a 100644 --- a/compactor2/src/components/compact/mod.rs +++ b/compactor2/src/components/compact/mod.rs @@ -1,4 +1,5 @@ pub mod compact_builder; +pub mod compact_executor; pub mod compact_files; pub mod partition; pub mod query_chunk; diff --git a/compactor2/src/components/compact/partition.rs b/compactor2/src/components/compact/partition.rs index c66e508985..f623235dfb 100644 --- a/compactor2/src/components/compact/partition.rs +++ b/compactor2/src/components/compact/partition.rs @@ -22,14 +22,15 @@ pub struct PartitionInfo { /// Namespace ID pub namespace_id: NamespaceId, + /// Namespace name + pub namespace_name: String, + /// Table. 
     pub table: Arc<Table>,
 
     // Table schema
     pub table_schema: Arc<TableSchema>,
 
-    // /// Counts of the number of columns of each type, used for estimating arrow size
-    // pub column_type_counts: Vec<ColumnTypeCount>,
     /// Sort key of the partition
     pub sort_key: Option<SortKey>,
 
@@ -42,6 +43,7 @@ impl PartitionInfo {
     pub fn new(
         partition_id: PartitionId,
         namespace_id: NamespaceId,
+        namespace_name: String,
         table: Arc<Table>
, table_schema: Arc, sort_key: Option, @@ -50,6 +52,7 @@ impl PartitionInfo { Self { partition_id, namespace_id, + namespace_name, table, table_schema, sort_key, diff --git a/compactor2/src/components/compact/query_chunk.rs b/compactor2/src/components/compact/query_chunk.rs index 01669afce5..10f578305b 100644 --- a/compactor2/src/components/compact/query_chunk.rs +++ b/compactor2/src/components/compact/query_chunk.rs @@ -11,7 +11,7 @@ use iox_query::{ util::create_basic_summary, QueryChunk, QueryChunkData, QueryChunkMeta, }; -use observability_deps::tracing::trace; +use observability_deps::tracing::debug; use parquet_file::{chunk::ParquetChunk, storage::ParquetStorage}; use predicate::{delete_predicate::tombstones_to_delete_predicates, Predicate}; use schema::{merge::SchemaMerger, sort::SortKey, Projection, Schema}; @@ -185,6 +185,7 @@ impl QueryChunk for QueryableParquetChunk { // Order of the chunk so they can be deduplicated correctly fn order(&self) -> ChunkOrder { + // TODO: If we chnage this design specified in driver.rs's compact functions, we will need to refine this // Currently, we only compact files of level_n with level_n+1 and produce level_n+1 files, // and with the strictly design that: // . Level-0 files can overlap with any files. @@ -254,12 +255,14 @@ pub fn to_queryable_parquet_chunk( let max_time = file.max_time; let compaction_level = file.compaction_level; - trace!( - parquet_file_id=?file.id, - parquet_file_namespace_id=?file.namespace_id, - parquet_file_table_id=?file.table_id, - parquet_file_partition_id=?file.partition_id, - parquet_file_object_store_id=?file.object_store_id, + // Make it debug for it to show up in prod's initial setup + let uuid = file.object_store_id; + debug!( + parquet_file_id = file.id.get(), + parquet_file_namespace_id = file.namespace_id.get(), + parquet_file_table_id = file.table_id.get(), + parquet_file_partition_id = file.partition_id.get(), + parquet_file_object_store_id = uuid.to_string().as_str(), "built parquet chunk from metadata" ); diff --git a/compactor2/src/components/namespaces_source/catalog.rs b/compactor2/src/components/namespaces_source/catalog.rs index 35fb921652..3282ff1b47 100644 --- a/compactor2/src/components/namespaces_source/catalog.rs +++ b/compactor2/src/components/namespaces_source/catalog.rs @@ -1,22 +1,22 @@ use std::{fmt::Display, sync::Arc}; use async_trait::async_trait; -use backoff::BackoffConfig; -use data_types::{NamespaceId, NamespaceSchema}; +use backoff::{Backoff, BackoffConfig}; +use data_types::{Namespace, NamespaceId, NamespaceSchema}; use iox_catalog::interface::{get_schema_by_id, Catalog}; use super::NamespacesSource; #[derive(Debug)] pub struct CatalogNamespacesSource { - _backoff_config: BackoffConfig, + backoff_config: BackoffConfig, catalog: Arc, } impl CatalogNamespacesSource { pub fn new(backoff_config: BackoffConfig, catalog: Arc) -> Self { Self { - _backoff_config: backoff_config, + backoff_config, catalog, } } @@ -30,7 +30,21 @@ impl Display for CatalogNamespacesSource { #[async_trait] impl NamespacesSource for CatalogNamespacesSource { - async fn fetch(&self, ns: NamespaceId) -> Option { + async fn fetch_by_id(&self, ns: NamespaceId) -> Option { + Backoff::new(&self.backoff_config) + .retry_all_errors("namespace_of_given_namespace_id", || async { + self.catalog + .repositories() + .await + .namespaces() + .get_by_id(ns) + .await + }) + .await + .expect("retry forever") + } + + async fn fetch_schema_by_id(&self, ns: NamespaceId) -> Option { let mut repos = 
self.catalog.repositories().await; // todos: @@ -39,10 +53,6 @@ impl NamespacesSource for CatalogNamespacesSource { // and instead read and build TableSchema for a given TableId. let ns = get_schema_by_id(ns, repos.as_mut()).await; - if let Ok(ns) = ns { - return Some(ns); - } else { - return None; - } + ns.ok() } } diff --git a/compactor2/src/components/namespaces_source/mock.rs b/compactor2/src/components/namespaces_source/mock.rs index ff2a21e700..bbbbb28f5d 100644 --- a/compactor2/src/components/namespaces_source/mock.rs +++ b/compactor2/src/components/namespaces_source/mock.rs @@ -1,18 +1,24 @@ use std::{collections::HashMap, fmt::Display}; use async_trait::async_trait; -use data_types::{NamespaceId, NamespaceSchema}; +use data_types::{Namespace, NamespaceId, NamespaceSchema}; use super::NamespacesSource; +#[derive(Debug, Clone)] +pub struct NamespaceWrapper { + pub ns: Namespace, + pub schema: NamespaceSchema, +} + #[derive(Debug)] pub struct MockNamespacesSource { - namespaces: HashMap, + namespaces: HashMap, } impl MockNamespacesSource { #[allow(dead_code)] // not used anywhere - pub fn new(namespaces: HashMap) -> Self { + pub fn new(namespaces: HashMap) -> Self { Self { namespaces } } } @@ -25,8 +31,14 @@ impl Display for MockNamespacesSource { #[async_trait] impl NamespacesSource for MockNamespacesSource { - async fn fetch(&self, ns: NamespaceId) -> Option { - self.namespaces.get(&ns).cloned() + async fn fetch_by_id(&self, ns: NamespaceId) -> Option { + let wrapper = self.namespaces.get(&ns); + wrapper.map(|wrapper| wrapper.ns.clone()) + } + + async fn fetch_schema_by_id(&self, ns: NamespaceId) -> Option { + let wrapper = self.namespaces.get(&ns); + wrapper.map(|wrapper| wrapper.schema.clone()) } } @@ -45,7 +57,7 @@ mod tests { } #[tokio::test] - async fn test_fetch() { + async fn test_fetch_namespace() { let ns_1 = NamespaceBuilder::new(1).build(); let ns_2 = NamespaceBuilder::new(2).build(); @@ -56,13 +68,47 @@ mod tests { let source = MockNamespacesSource::new(namespaces); // different tables - assert_eq!(source.fetch(NamespaceId::new(1)).await, Some(ns_1.clone()),); - assert_eq!(source.fetch(NamespaceId::new(2)).await, Some(ns_2),); + assert_eq!( + source.fetch_by_id(NamespaceId::new(1)).await, + Some(ns_1.clone().ns), + ); + assert_eq!(source.fetch_by_id(NamespaceId::new(2)).await, Some(ns_2.ns),); // fetching does not drain - assert_eq!(source.fetch(NamespaceId::new(1)).await, Some(ns_1),); + assert_eq!(source.fetch_by_id(NamespaceId::new(1)).await, Some(ns_1.ns),); // unknown namespace => None result - assert_eq!(source.fetch(NamespaceId::new(3)).await, None,); + assert_eq!(source.fetch_by_id(NamespaceId::new(3)).await, None,); + } + + #[tokio::test] + async fn test_fetch_namespace_schema() { + let ns_1 = NamespaceBuilder::new(1).build(); + let ns_2 = NamespaceBuilder::new(2).build(); + + let namespaces = HashMap::from([ + (NamespaceId::new(1), ns_1.clone()), + (NamespaceId::new(2), ns_2.clone()), + ]); + let source = MockNamespacesSource::new(namespaces); + + // different tables + assert_eq!( + source.fetch_schema_by_id(NamespaceId::new(1)).await, + Some(ns_1.clone().schema), + ); + assert_eq!( + source.fetch_schema_by_id(NamespaceId::new(2)).await, + Some(ns_2.schema), + ); + + // fetching does not drain + assert_eq!( + source.fetch_schema_by_id(NamespaceId::new(1)).await, + Some(ns_1.schema), + ); + + // unknown namespace => None result + assert_eq!(source.fetch_schema_by_id(NamespaceId::new(3)).await, None,); } } diff --git 
a/compactor2/src/components/namespaces_source/mod.rs b/compactor2/src/components/namespaces_source/mod.rs index 0bdd3898a7..1286185d34 100644 --- a/compactor2/src/components/namespaces_source/mod.rs +++ b/compactor2/src/components/namespaces_source/mod.rs @@ -1,15 +1,20 @@ use std::fmt::{Debug, Display}; use async_trait::async_trait; -use data_types::{NamespaceId, NamespaceSchema}; +use data_types::{Namespace, NamespaceId, NamespaceSchema}; pub mod catalog; pub mod mock; #[async_trait] pub trait NamespacesSource: Debug + Display + Send + Sync { + /// Get Namespace for a given namespace + /// + /// This method performs retries. + async fn fetch_by_id(&self, ns: NamespaceId) -> Option; + /// Get NamespaceSchema for a given namespace /// /// todo: make this method perform retries. - async fn fetch(&self, ns: NamespaceId) -> Option; + async fn fetch_schema_by_id(&self, ns: NamespaceId) -> Option; } diff --git a/compactor2/src/config.rs b/compactor2/src/config.rs index 8d9cc58fe4..716ef69717 100644 --- a/compactor2/src/config.rs +++ b/compactor2/src/config.rs @@ -33,4 +33,24 @@ pub struct Config { /// Partitions with recent created files these last minutes are selected for compaction. pub partition_minute_threshold: u64, + + /// Desired max size of compacted parquet files + /// It is a target desired value than a guarantee + pub max_desired_file_size_bytes: u64, + + /// Percentage of desired max file size. + /// If the estimated compacted result is too small, no need to split it. + /// This percentage is to determine how small it is: + /// < percentage_max_file_size * max_desired_file_size_bytes: + /// This value must be between (0, 100) + pub percentage_max_file_size: u16, + + /// Split file percentage + /// If the estimated compacted result is neither too small nor too large, it will be split + /// into 2 files determined by this percentage. + /// . Too small means: < percentage_max_file_size * max_desired_file_size_bytes + /// . Too large means: > max_desired_file_size_bytes + /// . Any size in the middle will be considered neither too small nor too large + /// This value must be between (0, 100) + pub split_percentage: u16, } diff --git a/compactor2/src/driver.rs b/compactor2/src/driver.rs index 6134803276..7b0cfa7d52 100644 --- a/compactor2/src/driver.rs +++ b/compactor2/src/driver.rs @@ -12,9 +12,24 @@ use crate::{ config::Config, }; +// TODO: modify this comments accordingly as we go +// Currently, we only compact files of level_n with level_n+1 and produce level_n+1 files, +// and with the strictly design that: +// . Level-0 files can overlap with any files. +// . Level-N files (N > 0) cannot overlap with any files in the same level. +// . For Level-0 files, we always pick the smaller `created_at` files to compact (with +// each other and overlapped L1 files) first. +// . Level-N+1 files are results of compacting Level-N and/or Level-N+1 files, their `created_at` +// can be after the `created_at` of other Level-N files but they may include data loaded before +// the other Level-N files. Hence we should never use `created_at` of Level-N+1 files to order +// them with Level-N files. +// . We can only compact different sets of files of the same partition concurrently into the same target_level. 
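+//
+// Worked example of the size thresholds used when building a plan (illustrative
+// only; the concrete numbers below are assumed, not taken from this patch). With
+// max_desired_file_size_bytes = 100 and percentage_max_file_size = 30,
+// `CompactPlanBuilder::cutoff_bytes` yields (small, large) = (30, 130), and:
+// . an estimated output of <= 30 bytes ("too small") is compacted into a single file,
+// . an output that is neither too small nor too large is split into 2 files, with the
+//   split point determined by split_percentage (e.g. roughly 80% of the time range),
+// . an output beyond the large cutoff is split by `compute_split_time` into roughly
+//   max_desired_file_size_bytes-sized pieces over non-overlapping time ranges.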
pub async fn compact(config: &Config, components: &Arc) { let partition_ids = components.partitions_source.fetch().await; + // TODO: https://github.com/influxdata/influxdb_iox/issues/6657 + // either here or before invoking this function to ignore this partition if it is in skipped_compactions + futures::stream::iter(partition_ids) .map(|partition_id| { let config = config.clone(); @@ -28,43 +43,74 @@ pub async fn compact(config: &Config, components: &Arc) { return; } - // TODO: only read table and namespace info the first time and cache them + // TODO: only read partition, table and its schema info the first time and cache them // Get info for the partition - let table = components.tables_source.fetch(files[0].table_id).await; - let namespace_schema = components - .namespaces_source - .fetch(files[0].namespace_id) - .await; let partition = components.partitions_source.fetch_by_id(partition_id).await; - let table_schema = namespace_schema - .as_ref() - .unwrap() - .tables - .get(&table.as_ref().unwrap().name); - - if table.is_none() - || namespace_schema.is_none() - || partition.is_none() - || table_schema.is_none() - { + if partition.is_none() { // Since we retry reading the catalog and cannot find enough needed info, // this partition won't be able to get compacted components .partition_error_sink - .record( - partition_id, - "Cannot find table or table schema or partition info", - ) + .record(partition_id, "Cannot find partition info") .await; return; } - let partition = partition.unwrap(); + + let table = components.tables_source.fetch(files[0].table_id).await; + if table.is_none() { + components + .partition_error_sink + .record(partition_id, "Cannot find table ") + .await; + return; + } + let table = table.unwrap(); + + // TOD: after we have catalog funciton to read table schema, we should use it + // and avoid reading namespace schema + let namespace = components + .namespaces_source + .fetch_by_id(table.namespace_id) + .await; + if namespace.is_none() { + components + .partition_error_sink + .record(partition_id, "Cannot find namespace") + .await; + return; + } + let namespace = namespace.unwrap(); + + let namespace_schema = components + .namespaces_source + .fetch_schema_by_id(table.namespace_id) + .await; + if namespace_schema.is_none() { + components + .partition_error_sink + .record(partition_id, "Cannot find namespace schema") + .await; + return; + } + let namespace_schema = namespace_schema.unwrap(); + + let table_schema = namespace_schema.tables.get(&table.name); + if table_schema.is_none() { + components + .partition_error_sink + .record(partition_id, "Cannot find table schema") + .await; + return; + } + let table_schema = table_schema.unwrap(); + let partition_info = PartitionInfo::new( partition_id, - files[0].namespace_id, - Arc::new(table.unwrap()), - Arc::new(table_schema.unwrap().clone()), + table.namespace_id, + namespace.name, + Arc::new(table), + Arc::new(table_schema.clone()), partition.sort_key(), partition.partition_key, ); @@ -106,3 +152,125 @@ pub async fn compact(config: &Config, components: &Arc) { .collect::<()>() .await; } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow_util::assert_batches_sorted_eq; + use data_types::CompactionLevel; + + use crate::{ + components::hardcoded::hardcoded_components, driver::compact, test_util::TestSetup, + }; + + #[tokio::test] + async fn test_compact_no_file() { + test_helpers::maybe_start_logging(); + + // no files + let setup = TestSetup::new(false).await; + + let files = 
setup.list_by_table_not_to_delete().await; + assert!(files.is_empty()); + + // compact + let config = Arc::clone(&setup.config); + let components = hardcoded_components(&config); + compact(&config, &components).await; + + // verify catalog is still empty + let files = setup.list_by_table_not_to_delete().await; + assert!(files.is_empty()); + } + + #[tokio::test] + async fn test_compact() { + test_helpers::maybe_start_logging(); + + // Create a test setup with 6 files + let setup = TestSetup::new(true).await; + + // verify 6 files + let files = setup.list_by_table_not_to_delete().await; + assert_eq!(files.len(), 6); + // + // verify ID and compaction level of the files + let files_and_levels: Vec<_> = files + .iter() + .map(|f| (f.id.get(), f.compaction_level)) + .collect(); + assert_eq!( + files_and_levels, + vec![ + (1, CompactionLevel::FileNonOverlapped), + (2, CompactionLevel::Initial), + (3, CompactionLevel::Initial), + (4, CompactionLevel::FileNonOverlapped), + (5, CompactionLevel::Initial), + (6, CompactionLevel::Initial), + ] + ); + + // compact + let config = Arc::clone(&setup.config); + let components = hardcoded_components(&config); + compact(&config, &components).await; + + // verify number of files: 6 files are compacted into 2 files + let files = setup.list_by_table_not_to_delete().await; + assert_eq!(files.len(), 2); + // + // verify ID and compaction level of the files + let files_and_levels: Vec<_> = files + .iter() + .map(|f| (f.id.get(), f.compaction_level)) + .collect(); + println!("{:?}", files_and_levels); + assert_eq!( + files_and_levels, + vec![ + (7, CompactionLevel::FileNonOverlapped), + (8, CompactionLevel::FileNonOverlapped), + ] + ); + + // verify the content of files + // Compacted smaller file with the later data + let mut files = setup.list_by_table_not_to_delete().await; + let file1 = files.pop().unwrap(); + let batches = setup.read_parquet_file(file1).await; + assert_batches_sorted_eq!( + &[ + "+-----------+------+------+------+-----------------------------+", + "| field_int | tag1 | tag2 | tag3 | time |", + "+-----------+------+------+------+-----------------------------+", + "| 210 | | OH | 21 | 1970-01-01T00:00:00.000136Z |", + "+-----------+------+------+------+-----------------------------+", + ], + &batches + ); + + // Compacted larger file with the earlier data + let file0 = files.pop().unwrap(); + let batches = setup.read_parquet_file(file0).await; + assert_batches_sorted_eq!( + [ + "+-----------+------+------+------+-----------------------------+", + "| field_int | tag1 | tag2 | tag3 | time |", + "+-----------+------+------+------+-----------------------------+", + "| 10 | VT | | | 1970-01-01T00:00:00.000006Z |", + "| 10 | VT | | | 1970-01-01T00:00:00.000010Z |", + "| 10 | VT | | | 1970-01-01T00:00:00.000068Z |", + "| 1500 | WA | | | 1970-01-01T00:00:00.000008Z |", + "| 1601 | | PA | 15 | 1970-01-01T00:00:00.000030Z |", + "| 22 | | OH | 21 | 1970-01-01T00:00:00.000036Z |", + "| 270 | UT | | | 1970-01-01T00:00:00.000025Z |", + "| 70 | UT | | | 1970-01-01T00:00:00.000020Z |", + "| 99 | OR | | | 1970-01-01T00:00:00.000012Z |", + "+-----------+------+------+------+-----------------------------+", + ], + &batches + ); + } +} diff --git a/compactor2/src/test_util.rs b/compactor2/src/test_util.rs index 7ba1b238da..9beb8dd093 100644 --- a/compactor2/src/test_util.rs +++ b/compactor2/src/test_util.rs @@ -1,12 +1,22 @@ -use std::collections::BTreeMap; +use std::{collections::BTreeMap, num::NonZeroUsize, sync::Arc}; +use backoff::BackoffConfig; use 
data_types::{ - ColumnId, ColumnSchema, ColumnSet, ColumnType, CompactionLevel, NamespaceId, NamespaceSchema, - ParquetFile, ParquetFileId, Partition, PartitionId, PartitionKey, QueryPoolId, SequenceNumber, - ShardId, Table, TableId, TableSchema, Timestamp, TopicId, + ColumnId, ColumnSchema, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId, + NamespaceSchema, ParquetFile, ParquetFileId, Partition, PartitionId, PartitionKey, QueryPoolId, + SequenceNumber, ShardId, Table, TableId, TableSchema, Timestamp, TopicId, }; +use datafusion::arrow::record_batch::RecordBatch; +use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestTable}; +use iox_time::{SystemProvider, TimeProvider}; +use schema::sort::SortKey; use uuid::Uuid; +use crate::{ + components::{compact::partition::PartitionInfo, namespaces_source::mock::NamespaceWrapper}, + config::Config, +}; + #[derive(Debug)] pub struct ParquetFileBuilder { file: ParquetFile, @@ -90,7 +100,7 @@ impl TableBuilder { #[derive(Debug)] pub struct NamespaceBuilder { - ns: NamespaceSchema, + namespace: NamespaceWrapper, } impl NamespaceBuilder { @@ -149,20 +159,34 @@ impl NamespaceBuilder { ), ]); + let id = NamespaceId::new(id); + let topic_id = TopicId::new(0); + let query_pool_id = QueryPoolId::new(0); Self { - ns: NamespaceSchema { - id: NamespaceId::new(id), - topic_id: TopicId::new(0), - query_pool_id: QueryPoolId::new(0), - tables, - max_columns_per_table: 10, - retention_period_ns: None, + namespace: NamespaceWrapper { + ns: Namespace { + id, + name: "ns".to_string(), + topic_id, + query_pool_id, + max_tables: 10, + max_columns_per_table: 10, + retention_period_ns: None, + }, + schema: NamespaceSchema { + id, + topic_id, + query_pool_id, + tables, + max_columns_per_table: 10, + retention_period_ns: None, + }, }, } } - pub fn build(self) -> NamespaceSchema { - self.ns + pub fn build(self) -> NamespaceWrapper { + self.namespace } } @@ -190,3 +214,191 @@ impl PartitionBuilder { self.partition } } + +const SHARD_INDEX: i32 = 1; +const PARTITION_MINUTE_THRESHOLD: u64 = 10; +const MAX_DESIRE_FILE_SIZE: u64 = 100 * 1024; +const PERCENTAGE_MAX_FILE_SIZE: u16 = 5; +const SPLIT_PERCENTAGE: u16 = 80; + +pub struct TestSetup { + pub files: Arc>, + pub partition_info: Arc, + pub catalog: Arc, + pub table: Arc, + pub config: Arc, +} + +impl TestSetup { + pub async fn new(with_files: bool) -> Self { + let catalog = TestCatalog::new(); + let ns = catalog.create_namespace_1hr_retention("ns").await; + let shard = ns.create_shard(SHARD_INDEX).await; + let table = ns.create_table("table").await; + table.create_column("field_int", ColumnType::I64).await; + table.create_column("tag1", ColumnType::Tag).await; + table.create_column("tag2", ColumnType::Tag).await; + table.create_column("tag3", ColumnType::Tag).await; + table.create_column("time", ColumnType::Time).await; + let table_schema = table.catalog_schema().await; + + let partition = table + .with_shard(&shard) + .create_partition("2022-07-13") + .await; + + // The sort key comes from the catalog and should be the union of all tags the + // ingester has seen + let sort_key = SortKey::from_columns(["tag1", "tag2", "tag3", "time"]); + let partition = partition.update_sort_key(sort_key.clone()).await; + + let candidate_partition = Arc::new(PartitionInfo::new( + partition.partition.id, + ns.namespace.id, + ns.namespace.name.clone(), + Arc::new(table.table.clone()), + Arc::new(table_schema), + partition.partition.sort_key(), + partition.partition.partition_key.clone(), + )); + + let mut parquet_files = 
vec![]; + if with_files { + let time = SystemProvider::new(); + let time_16_minutes_ago = time.minutes_ago(16); + let time_5_minutes_ago = time.minutes_ago(5); + let time_2_minutes_ago = time.minutes_ago(2); + let time_1_minute_ago = time.minutes_ago(1); + let time_now = time.now(); + + // L1 file + let lp = vec![ + "table,tag2=PA,tag3=15 field_int=1601i 30000", + "table,tag2=OH,tag3=21 field_int=21i 36000", // will be eliminated due to duplicate + ] + .join("\n"); + let builder = TestParquetFileBuilder::default() + .with_line_protocol(&lp) + .with_creation_time(time_1_minute_ago) + .with_compaction_level(CompactionLevel::FileNonOverlapped); // Prev compaction + let level_1_file_1_minute_ago = partition.create_parquet_file(builder).await.into(); + + // L0 file + let lp = vec![ + "table,tag1=WA field_int=1000i 8000", // will be eliminated due to duplicate + "table,tag1=VT field_int=10i 10000", // latest L0 compared with duplicate in level_1_file_1_minute_ago_with_duplicates + // keep it + "table,tag1=UT field_int=70i 20000", + ] + .join("\n"); + let builder = TestParquetFileBuilder::default() + .with_line_protocol(&lp) + .with_creation_time(time_16_minutes_ago) + .with_compaction_level(CompactionLevel::Initial); + let level_0_file_16_minutes_ago = partition.create_parquet_file(builder).await.into(); + + // L0 file + let lp = vec![ + "table,tag1=WA field_int=1500i 8000", // latest duplicate and kept + "table,tag1=VT field_int=10i 6000", + "table,tag1=UT field_int=270i 25000", + ] + .join("\n"); + let builder = TestParquetFileBuilder::default() + .with_line_protocol(&lp) + .with_creation_time(time_5_minutes_ago) + .with_compaction_level(CompactionLevel::Initial); + let level_0_file_5_minutes_ago = partition.create_parquet_file(builder).await.into(); + + // L1 file + let lp = vec![ + "table,tag1=VT field_int=88i 10000", // will be eliminated due to duplicate. + // Note: created time more recent than level_0_file_16_minutes_ago + // but always considered older ingested data + "table,tag1=OR field_int=99i 12000", + ] + .join("\n"); + let builder = TestParquetFileBuilder::default() + .with_line_protocol(&lp) + .with_creation_time(time_1_minute_ago) + .with_compaction_level(CompactionLevel::FileNonOverlapped); // Prev compaction + let level_1_file_1_minute_ago_with_duplicates = + partition.create_parquet_file(builder).await.into(); + + // L0 file + let lp = vec!["table,tag2=OH,tag3=21 field_int=22i 36000"].join("\n"); + let builder = TestParquetFileBuilder::default() + .with_line_protocol(&lp) + .with_min_time(0) + .with_max_time(36000) + .with_creation_time(time_now) + // Will put the group size between "small" and "large" + .with_size_override(50 * 1024 * 1024) + .with_compaction_level(CompactionLevel::Initial); + let medium_level_0_file_time_now = partition.create_parquet_file(builder).await.into(); + + // L0 file + let lp = vec![ + "table,tag1=VT field_int=10i 68000", + "table,tag2=OH,tag3=21 field_int=210i 136000", + ] + .join("\n"); + let builder = TestParquetFileBuilder::default() + .with_line_protocol(&lp) + .with_min_time(36001) + .with_max_time(136000) + .with_creation_time(time_2_minutes_ago) + // Will put the group size two multiples over "large" + .with_size_override(180 * 1024 * 1024) + .with_compaction_level(CompactionLevel::Initial); + let large_level_0_file_2_2_minutes_ago = + partition.create_parquet_file(builder).await.into(); + + // Order here isn't relevant; the chunk order should ensure the level 1 files are ordered + // first, then the other files by max seq num. 
+ parquet_files = vec![ + level_1_file_1_minute_ago, + level_0_file_16_minutes_ago, + level_0_file_5_minutes_ago, + level_1_file_1_minute_ago_with_duplicates, + medium_level_0_file_time_now, + large_level_0_file_2_2_minutes_ago, + ]; + } + + let config = Arc::new(Config { + metric_registry: catalog.metric_registry(), + catalog: catalog.catalog(), + parquet_store: catalog.parquet_store.clone(), + time_provider: Arc::::clone(&catalog.time_provider), + exec: Arc::clone(&catalog.exec), + backoff_config: BackoffConfig::default(), + partition_concurrency: NonZeroUsize::new(1).unwrap(), + partition_minute_threshold: PARTITION_MINUTE_THRESHOLD, + max_desired_file_size_bytes: MAX_DESIRE_FILE_SIZE, + percentage_max_file_size: PERCENTAGE_MAX_FILE_SIZE, + split_percentage: SPLIT_PERCENTAGE, + }); + + Self { + files: Arc::new(parquet_files), + partition_info: candidate_partition, + catalog, + table, + config, + } + } + + /// Get the catalog files stored in the catalog + pub async fn list_by_table_not_to_delete(&self) -> Vec { + self.catalog + .list_by_table_not_to_delete(self.table.table.id) + .await + } + + /// Reads the specified parquet file out of object store + pub async fn read_parquet_file(&self, file: ParquetFile) -> Vec { + assert_eq!(file.table_id, self.table.table.id); + self.table.read_parquet_file(file).await + } +} diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs index 7983f037df..a7da31128b 100644 --- a/iox_tests/src/util.rs +++ b/iox_tests/src/util.rs @@ -957,6 +957,14 @@ pub struct TestParquetFile { pub size_override: Option, } +impl From for ParquetFile { + fn from(tpf: TestParquetFile) -> Self { + let TestParquetFile { parquet_file, .. } = tpf; + + parquet_file + } +} + impl TestParquetFile { /// Make the parquet file deletable pub async fn flag_for_delete(&self) { diff --git a/ioxd_compactor2/src/lib.rs b/ioxd_compactor2/src/lib.rs index a15af2b18a..68d4cdbb83 100644 --- a/ioxd_compactor2/src/lib.rs +++ b/ioxd_compactor2/src/lib.rs @@ -137,6 +137,9 @@ pub fn create_compactor2_server_type( backoff_config: backoff::BackoffConfig::default(), partition_concurrency: compactor_config.compaction_partition_concurrency, partition_minute_threshold: compactor_config.compaction_partition_minute_threshold, + max_desired_file_size_bytes: compactor_config.max_desired_file_size_bytes, + percentage_max_file_size: compactor_config.percentage_max_file_size, + split_percentage: compactor_config.split_percentage, }); Arc::new(Compactor2ServerType::new( compactor,
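
// Illustrative sketch (not part of this patch): `classify` and `main` are
// hypothetical names, and the threshold logic is inferred from
// `CompactPlanBuilder::cutoff_bytes` and the split_percentage documentation
// above; the exact branch boundaries in the real planner may differ.
fn classify(
    total_size: u64,
    max_desired_file_size_bytes: u64,
    percentage_max_file_size: u16,
) -> &'static str {
    // Same arithmetic as cutoff_bytes(): small = pct% of the target file size,
    // large = the target file size plus that same margin.
    let small_cutoff = max_desired_file_size_bytes * u64::from(percentage_max_file_size) / 100;
    let large_cutoff = max_desired_file_size_bytes + small_cutoff;

    if total_size <= small_cutoff {
        "compact into a single output file"
    } else if total_size <= large_cutoff {
        "split into 2 files at split_percentage of the time range"
    } else {
        "split into ~max_desired_file_size_bytes pieces via compute_split_time"
    }
}

fn main() {
    // Using the clap defaults added in this patch: 25 MB target, 20% (split 80%).
    let max = 26_214_400_u64;
    assert_eq!(classify(1_000_000, max, 20), "compact into a single output file");
    assert_eq!(
        classify(30_000_000, max, 20),
        "split into 2 files at split_percentage of the time range"
    );
    assert_eq!(
        classify(100_000_000, max, 20),
        "split into ~max_desired_file_size_bytes pieces via compute_split_time"
    );
}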