refactor: execute compaction plan (#6654)

* chore: address review comment of previous PR

* refactor: execute compact plan

* refactor: we will now compact all L0 and L1 files of a partition and split them as needed

* chore: comments

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Nga Tran 2023-01-20 17:34:50 -05:00 committed by GitHub
parent 6f39ae342e
commit 840923abab
17 changed files with 1096 additions and 87 deletions

Cargo.lock generated
View File

@ -994,6 +994,7 @@ dependencies = [
name = "compactor2"
version = "0.1.0"
dependencies = [
"arrow_util",
"async-trait",
"backoff",
"data_types",
@ -1001,6 +1002,7 @@ dependencies = [
"futures",
"iox_catalog",
"iox_query",
"iox_tests",
"iox_time",
"metric",
"observability_deps",

View File

@ -43,4 +43,46 @@ pub struct Compactor2Config {
action
)]
pub exec_mem_pool_bytes: usize,
/// Desired max size of compacted parquet files.
/// It is a target desired value, rather than a guarantee.
/// 1024 * 1024 * 25 = 26,214,400 (25MB)
#[clap(
long = "compaction-max-desired-size-bytes",
env = "INFLUXDB_IOX_COMPACTION_MAX_DESIRED_FILE_SIZE_BYTES",
default_value = "26214400",
action
)]
pub max_desired_file_size_bytes: u64,
/// Percentage of the desired max file size used to decide whether a compacted
/// result is too small to be worth splitting. A result is considered too small when it is:
/// < (percentage_max_file_size / 100) * max_desired_file_size_bytes
/// This value must be between (0, 100)
/// Default is 20
#[clap(
long = "compaction-percentage-max-file_size",
env = "INFLUXDB_IOX_COMPACTION_PERCENTAGE_MAX_FILE_SIZE",
default_value = "20",
action
)]
pub percentage_max_file_size: u16,
/// Split file percentage.
/// If the estimated compacted result is neither too small nor too large, it is
/// split into 2 files whose relative sizes are determined by this percentage.
/// . Too small means: < (percentage_max_file_size / 100) * max_desired_file_size_bytes
/// . Too large means: > max_desired_file_size_bytes
/// . Anything in between is considered neither too small nor too large
///
/// This value must be between (0, 100)
/// Default is 80
#[clap(
long = "compaction-split-percentage",
env = "INFLUXDB_IOX_COMPACTION_SPLIT_PERCENTAGE",
default_value = "80",
action
)]
pub split_percentage: u16,
}
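
For orientation, here is a minimal sketch (not part of this diff; function names are hypothetical) of how the three knobs above combine when the plan builder decides the shape of the compacted output, using the cutoff semantics exercised by the compactor2 tests (cutoff_bytes(100, 30) == (30, 130)):

fn cutoff_bytes(max_desired_file_size_bytes: u64, percentage_max_file_size: u16) -> (u64, u64) {
    // "too small" threshold, plus an upper cutoff slightly above the desired size
    let small = max_desired_file_size_bytes * percentage_max_file_size as u64 / 100;
    (small, max_desired_file_size_bytes + small)
}

fn describe_output(total_size: u64, max_desired: u64, pct_max: u16, split_pct: u16) -> String {
    let (small_cutoff, large_cutoff) = cutoff_bytes(max_desired, pct_max);
    if total_size <= small_cutoff {
        // with the defaults (25MB, 20): anything under ~5MB stays a single file
        "compact everything into one output file".to_string()
    } else if total_size <= large_cutoff {
        format!("split into two files of roughly {}% and {}%", split_pct, 100 - split_pct)
    } else {
        "split into several files of about max_desired_file_size_bytes each".to_string()
    }
}

With the defaults above (26,214,400 bytes, 20, 80) that means: results under roughly 5 MB are written as one file, mid-sized results are split 80%/20%, and anything larger is split into multiple files of about the desired size.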

View File

@ -27,4 +27,6 @@ uuid = { version = "1", features = ["v4"] }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
arrow_util = { path = "../arrow_util" }
iox_tests = { path = "../iox_tests" }
test_helpers = { path = "../test_helpers"}

View File

@ -4,7 +4,7 @@ use std::{
};
use data_types::{CompactionLevel, ParquetFile, TimestampMinMax};
use datafusion::logical_expr::LogicalPlan;
use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan};
use iox_query::{
exec::{Executor, ExecutorType},
frontend::reorg::ReorgPlanner,
@ -24,13 +24,19 @@ use super::partition::PartitionInfo;
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
pub(crate) enum Error {
pub enum Error {
#[snafu(display("Error building compact logical plan {}", source))]
CompactLogicalPlan {
source: iox_query::frontend::reorg::Error,
},
#[snafu(display("Error building compact physical plan {}", source))]
CompactPhysicalPlan { source: DataFusionError },
}
/// Observer function (for testing) that is invoked on the physical plan to be run
type PlanObserver = Box<dyn Fn(&dyn ExecutionPlan) + Send>;
/// Builder for compaction plans
pub(crate) struct CompactPlanBuilder {
// Partition of files to compact
@ -43,6 +49,8 @@ pub(crate) struct CompactPlanBuilder {
percentage_max_file_size: u16,
split_percentage: u16,
target_level: CompactionLevel,
// This is for plan observation for testing
plan_observer: Option<PlanObserver>,
}
impl CompactPlanBuilder {
@ -59,17 +67,26 @@ impl CompactPlanBuilder {
store: config.parquet_store.clone(),
exec: Arc::clone(&config.exec),
_time_provider: Arc::clone(&config.time_provider),
// TODO: make these configurable
max_desired_file_size_bytes: 100 * 1024 * 1024,
percentage_max_file_size: 30,
split_percentage: 90,
max_desired_file_size_bytes: config.max_desired_file_size_bytes,
percentage_max_file_size: config.percentage_max_file_size,
split_percentage: config.split_percentage,
target_level: compaction_level,
plan_observer: None,
}
}
/// Builds a logical compact plan respecting the specified file boundaries
/// Specify a function to call on the created physical plan, prior to its execution (used for testing)
#[cfg(test)]
#[allow(dead_code)]
pub fn with_plan_observer(mut self, plan_observer: PlanObserver) -> Self {
self.plan_observer = Some(plan_observer);
self
}
/// Builds a compact plan respecting the specified file boundaries
/// This function assumes that the compaction levels of the files are either target_level or target_level-1
pub fn build_logical_compact_plan(self) -> Result<LogicalPlan, Error> {
pub async fn build_compact_plan(self) -> Result<Arc<dyn ExecutionPlan>, Error> {
let Self {
partition,
files,
@ -80,6 +97,7 @@ impl CompactPlanBuilder {
percentage_max_file_size,
split_percentage,
target_level,
plan_observer,
} = self;
// total file size is the sum of the file sizes of the files to compact
@ -143,6 +161,7 @@ impl CompactPlanBuilder {
let (small_cutoff_bytes, large_cutoff_bytes) =
Self::cutoff_bytes(max_desired_file_size_bytes, percentage_max_file_size);
// Build logical compact plan
let ctx = exec.new_context(ExecutorType::Reorg);
let plan = if total_size <= small_cutoff_bytes {
// Compact everything into one file
@ -196,7 +215,17 @@ impl CompactPlanBuilder {
}
};
Ok(plan)
// Build physical compact plan
let physical_plan = ctx
.create_physical_plan(&plan)
.await
.context(CompactPhysicalPlanSnafu)?;
if let Some(plan_observer) = plan_observer {
plan_observer(physical_plan.as_ref());
}
Ok(physical_plan)
}
// compute cut off bytes for files
@ -270,3 +299,165 @@ impl CompactPlanBuilder {
.any(|&chunk| chunk.max >= min_time && chunk.min <= max_time)
}
}
#[cfg(test)]
mod tests {
use data_types::TimestampMinMax;
use crate::components::compact::compact_builder::CompactPlanBuilder;
#[test]
fn test_cutoff_bytes() {
let (small, large) = CompactPlanBuilder::cutoff_bytes(100, 30);
assert_eq!(small, 30);
assert_eq!(large, 130);
let (small, large) = CompactPlanBuilder::cutoff_bytes(100 * 1024 * 1024, 30);
assert_eq!(small, 30 * 1024 * 1024);
assert_eq!(large, 130 * 1024 * 1024);
let (small, large) = CompactPlanBuilder::cutoff_bytes(100, 60);
assert_eq!(small, 60);
assert_eq!(large, 160);
}
#[test]
fn test_compute_split_time() {
let min_time = 1;
let max_time = 11;
let total_size = 100;
let max_desired_file_size = 100;
let chunk_times = vec![TimestampMinMax {
min: min_time,
max: max_time,
}];
// no split
let result = CompactPlanBuilder::compute_split_time(
chunk_times.clone(),
min_time,
max_time,
total_size,
max_desired_file_size,
);
assert_eq!(result.len(), 1);
assert_eq!(result[0], max_time);
// split 70% and 30%
let max_desired_file_size = 70;
let result = CompactPlanBuilder::compute_split_time(
chunk_times.clone(),
min_time,
max_time,
total_size,
max_desired_file_size,
);
// only need to store the last split time
assert_eq!(result.len(), 1);
assert_eq!(result[0], 8); // = 1 (min_time) + 7
// split 40%, 40%, 20%
let max_desired_file_size = 40;
let result = CompactPlanBuilder::compute_split_time(
chunk_times,
min_time,
max_time,
total_size,
max_desired_file_size,
);
// store first and second split time
assert_eq!(result.len(), 2);
assert_eq!(result[0], 5); // = 1 (min_time) + 4
assert_eq!(result[1], 9); // = 5 (previous split_time) + 4
}
#[test]
fn compute_split_time_when_min_time_equals_max() {
// Imagine a customer is backfilling a large amount of data and for some reason, all the
// times on the data are exactly the same. That means the min_time and max_time will be the
// same, but the total_size will be greater than the desired size.
// We will not split it because the split has to stick to non-overlapping time ranges
let min_time = 1;
let max_time = 1;
let total_size = 200;
let max_desired_file_size = 100;
let chunk_times = vec![TimestampMinMax {
min: min_time,
max: max_time,
}];
let result = CompactPlanBuilder::compute_split_time(
chunk_times,
min_time,
max_time,
total_size,
max_desired_file_size,
);
// must return vector of one containing max_time
assert_eq!(result.len(), 1);
assert_eq!(result[0], 1);
}
#[test]
fn compute_split_time_please_dont_explode() {
// degenerate case where the step size is so small that it is < 1 (but > 0). In this case we must
// still not loop forever.
let min_time = 10;
let max_time = 20;
let total_size = 600000;
let max_desired_file_size = 10000;
let chunk_times = vec![TimestampMinMax {
min: min_time,
max: max_time,
}];
let result = CompactPlanBuilder::compute_split_time(
chunk_times,
min_time,
max_time,
total_size,
max_desired_file_size,
);
assert_eq!(result.len(), 9);
}
#[test]
fn compute_split_time_chunk_gaps() {
// When the chunks have large gaps, we should not introduce splits that create time ranges
// known to be empty. Split T2 below should not exist.
// │ │
//┌────────────────┐ ┌──────────────┐
//│ Chunk 1 │ │ │ │ Chunk 2 │
//└────────────────┘ └──────────────┘
// │ │
// Split T1 Split T2
// Create a scenario where naive splitting would produce 2 splits (3 chunks) as shown above,
// but the only chunk data present is in the lowest and highest quarters.
let min_time = 1;
let max_time = 100;
let total_size = 200;
let max_desired_file_size = total_size / 3;
let chunk_times = vec![
TimestampMinMax { min: 1, max: 24 },
TimestampMinMax { min: 75, max: 100 },
];
let result = CompactPlanBuilder::compute_split_time(
chunk_times,
min_time,
max_time,
total_size,
max_desired_file_size,
);
// must return vector of one, containing a Split T1 shown above.
assert_eq!(result.len(), 1);
assert_eq!(result[0], 34);
}
}
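
For orientation, here is a small sketch that reproduces the split times asserted in the tests above: a proportional step clamped to at least 1 so the loop always terminates, with candidate splits that fall into known-empty time ranges dropped. It is an illustration of the expected behaviour, not the implementation added in this diff:

use data_types::TimestampMinMax;

fn compute_split_time_sketch(
    chunk_times: Vec<TimestampMinMax>,
    min_time: i64,
    max_time: i64,
    total_size: u64,
    max_desired_file_size: u64,
) -> Vec<i64> {
    // Nothing to split: the result is small enough, or the time range cannot be divided.
    if total_size <= max_desired_file_size || min_time == max_time {
        return vec![max_time];
    }

    // Each output file should cover roughly this share of the overall time range.
    let fraction = max_desired_file_size as f64 / total_size as f64;
    // Clamp to 1 so a tiny fraction cannot produce a zero step
    // (see compute_split_time_please_dont_explode above).
    let step = (((max_time - min_time) as f64 * fraction).ceil() as i64).max(1);

    let mut split_times = Vec::new();
    let mut prev = min_time;
    let mut candidate = min_time + step;
    while candidate < max_time {
        // Keep a split only if some chunk has data between the previous boundary and
        // this candidate; otherwise the split would describe a known-empty file.
        if chunk_times
            .iter()
            .any(|t| t.max >= prev && t.min <= candidate)
        {
            split_times.push(candidate);
            prev = candidate;
        }
        candidate += step;
    }
    split_times
}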

View File

@ -0,0 +1,193 @@
use std::{future, sync::Arc};
use data_types::{CompactionLevel, ParquetFileParams, SequenceNumber, ShardId};
use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan};
use futures::{stream::FuturesOrdered, StreamExt, TryStreamExt};
use iox_query::exec::{Executor, ExecutorType};
use iox_time::TimeProvider;
use observability_deps::tracing::{debug, info, trace, warn};
use parquet_file::{
metadata::IoxMetadata,
serialize::CodecError,
storage::{ParquetStorage, UploadError},
};
use snafu::{ResultExt, Snafu};
use uuid::Uuid;
use crate::config::Config;
use super::partition::PartitionInfo;
// fields no longer used but still exist in the catalog
const SHARD_ID: ShardId = ShardId::new(0);
const MAX_SEQUENCE_NUMBER: i64 = 0;
/// Compaction errors.
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
pub enum Error {
#[snafu(display("Error executing compact plan {}", source))]
ExecuteCompactPlan { source: DataFusionError },
#[snafu(display("Could not serialize and persist record batches {}", source))]
Persist {
source: parquet_file::storage::UploadError,
},
#[snafu(display("Error executing parquet write task {}", source))]
ExecuteParquetTask { source: tokio::task::JoinError },
}
/// Executor of a plan
pub(crate) struct CompactExecutor {
// Partition of the plan to compact
partition: Arc<PartitionInfo>,
plan: Arc<dyn ExecutionPlan>,
store: ParquetStorage,
exec: Arc<Executor>,
time_provider: Arc<dyn TimeProvider>,
target_level: CompactionLevel,
}
impl CompactExecutor {
/// Create a new executor
pub fn new(
plan: Arc<dyn ExecutionPlan>,
partition: Arc<PartitionInfo>,
config: Arc<Config>,
target_level: CompactionLevel,
) -> Self {
Self {
partition,
plan,
store: config.parquet_store.clone(),
exec: Arc::clone(&config.exec),
time_provider: Arc::clone(&config.time_provider),
target_level,
}
}
pub async fn execute(self) -> Result<Vec<ParquetFileParams>, Error> {
let Self {
partition,
plan,
store,
exec,
time_provider,
target_level,
} = self;
let partition_id = partition.partition_id;
// Run to collect each stream of the plan
let stream_count = plan.output_partitioning().partition_count();
debug!(stream_count, "running plan with streams");
// These streams *must* run in parallel, otherwise a deadlock
// can occur. Since there is a merge in the plan, in order to make
// progress on one stream there must be (potential) space on the
// other streams.
//
// https://github.com/influxdata/influxdb_iox/issues/4306
// https://github.com/influxdata/influxdb_iox/issues/4324
let compacted_parquet_files: Vec<ParquetFileParams> = (0..stream_count)
.map(|i| {
// Prepare variables to pass to the closure
let ctx = exec.new_context(ExecutorType::Reorg);
let physical_plan = Arc::clone(&plan);
let store = store.clone();
let time_provider = Arc::clone(&time_provider);
let partition = Arc::clone(&partition);
let sort_key = partition.sort_key.clone();
// run as a separate tokio task so files can be written
// concurrently.
tokio::task::spawn(async move {
trace!(partition = i, "executing datafusion partition");
let data = ctx
.execute_stream_partitioned(physical_plan, i)
.await
.context(ExecuteCompactPlanSnafu)?;
trace!(partition = i, "built result stream for partition");
let meta = IoxMetadata {
object_store_id: Uuid::new_v4(),
creation_timestamp: time_provider.now(),
shard_id: SHARD_ID,
namespace_id: partition.namespace_id,
namespace_name: partition.namespace_name.clone().into(),
table_id: partition.table.id,
table_name: partition.table.name.clone().into(),
partition_id,
partition_key: partition.partition_key.clone(),
max_sequence_number: SequenceNumber::new(MAX_SEQUENCE_NUMBER),
compaction_level: target_level,
sort_key: sort_key.clone(),
};
debug!(
partition_id = partition_id.get(),
"executing and uploading compaction StreamSplitExec"
);
let object_store_id = meta.object_store_id;
info!(
partition_id = partition_id.get(),
object_store_id = object_store_id.to_string(),
"streaming exec to object store"
);
// Stream the record batches from the compaction exec, serialize
// them, and directly upload the resulting Parquet files to
// object storage.
let (parquet_meta, file_size) = match store.upload(data, &meta).await {
Ok(v) => v,
Err(UploadError::Serialise(CodecError::NoRows)) => {
// This MAY be a bug.
//
// This also may happen legitimately, though very, very
// rarely. See test_empty_parquet_file_panic for an
// explanation.
warn!(
partition_id = partition_id.get(),
object_store_id = object_store_id.to_string(),
"SplitExec produced an empty result stream"
);
return Ok(None);
}
Err(e) => return Err(Error::Persist { source: e }),
};
debug!(
partition_id = partition_id.get(),
object_store_id = object_store_id.to_string(),
"file uploaded to object store"
);
let parquet_file =
meta.to_parquet_file(partition_id, file_size, &parquet_meta, |name| {
partition
.table_schema
.columns
.get(name)
.expect("unknown column")
.id
});
Ok(Some(parquet_file))
})
})
// NB: FuturesOrdered allows the futures to run in parallel
.collect::<FuturesOrdered<_>>()
// Check for errors in the task
.map(|t| t.context(ExecuteParquetTaskSnafu)?)
// Discard the streams that resulted in empty output / no file uploaded
// to the object store.
.try_filter_map(|v| future::ready(Ok(v)))
// Collect all the persisted parquet files together.
.try_collect::<Vec<_>>()
.await?;
Ok(compacted_parquet_files)
}
}
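
The comments above explain why the per-partition streams have to make progress concurrently and why FuturesOrdered is used to collect them. A stripped-down sketch of that fan-out-and-collect pattern (dummy payloads and hypothetical names, not code from this diff):

use std::future;

use futures::{stream::FuturesOrdered, StreamExt, TryStreamExt};

// One tokio task per output partition; FuturesOrdered keeps results in partition
// order while the tasks run concurrently, and partitions that produced no output
// (Ok(None)) are filtered away at the end.
async fn fan_out(stream_count: usize) -> Result<Vec<usize>, String> {
    (0..stream_count)
        .map(|i| {
            tokio::task::spawn(async move {
                // Stand-in for "execute stream partition i and upload the result":
                // even partitions produce a file, odd partitions are empty.
                if i % 2 == 0 {
                    Ok::<_, String>(Some(i))
                } else {
                    Ok(None)
                }
            })
        })
        .collect::<FuturesOrdered<_>>()
        // Surface panics/cancellations of the spawned tasks as errors.
        .map(|join_result| join_result.map_err(|e| e.to_string())?)
        // Drop the empty outputs.
        .try_filter_map(|v| future::ready(Ok(v)))
        .try_collect::<Vec<_>>()
        .await
}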

View File

@ -2,11 +2,14 @@
use std::sync::Arc;
use data_types::{CompactionLevel, ParquetFile, ParquetFileParams};
use snafu::Snafu;
use snafu::{ResultExt, Snafu};
use crate::config::Config;
use super::{compact_builder::CompactPlanBuilder, partition::PartitionInfo};
use super::{
compact_builder::CompactPlanBuilder, compact_executor::CompactExecutor,
partition::PartitionInfo,
};
/// Compaction errors.
#[derive(Debug, Snafu)]
@ -14,6 +17,16 @@ use super::{compact_builder::CompactPlanBuilder, partition::PartitionInfo};
pub enum Error {
#[snafu(display("Not implemented"))]
NotImplemented,
#[snafu(display("Error building compact plan: {}", source))]
BuildCompactPlan {
source: super::compact_builder::Error,
},
#[snafu(display("Error building compact plan: {}", source))]
ExecuteCompactPlan {
source: super::compact_executor::Error,
},
}
/// Perform compaction on given files including catalog transaction.
@ -26,13 +39,100 @@ pub async fn compact_files(
config: Arc<Config>,
compaction_level: CompactionLevel,
) -> Result<Vec<ParquetFileParams>, Error> {
let builder = CompactPlanBuilder::new(files, partition_info, config, compaction_level);
if files.is_empty() {
return Ok(vec![]);
}
let _logical_plan = builder.build_logical_compact_plan();
// build compact plan
let builder = CompactPlanBuilder::new(
files,
Arc::clone(&partition_info),
Arc::clone(&config),
compaction_level,
);
let plan = builder
.build_compact_plan()
.await
.context(BuildCompactPlanSnafu)?;
// TODO: build and run physical plans
// execute the plan
let executor = CompactExecutor::new(plan, partition_info, config, compaction_level);
let compacted_files = executor.execute().await.context(ExecuteCompactPlanSnafu)?;
// TODO: create parquet files for output plans
Ok(vec![])
Ok(compacted_files)
}
#[cfg(test)]
mod tests {
use data_types::CompactionLevel;
use std::sync::Arc;
use crate::{components::compact::compact_files::compact_files, test_util::TestSetup};
#[tokio::test]
async fn test_compact_no_file() {
test_helpers::maybe_start_logging();
// no files
let setup = TestSetup::new(false).await;
let TestSetup {
files,
partition_info,
config,
..
} = setup;
let compacted_files = compact_files(
Arc::clone(&files),
Arc::clone(&partition_info),
Arc::clone(&config),
CompactionLevel::FileNonOverlapped,
)
.await
.unwrap();
assert!(compacted_files.is_empty());
}
#[tokio::test]
async fn test_compact() {
test_helpers::maybe_start_logging();
// Create a test setup with 6 files
let setup = TestSetup::new(true).await;
let TestSetup {
files,
partition_info,
config,
..
} = setup;
// By default, the config value is small, so the output file will be split
let compacted_files = compact_files(
Arc::clone(&files),
Arc::clone(&partition_info),
Arc::clone(&config),
CompactionLevel::FileNonOverlapped,
)
.await
.unwrap();
assert_eq!(compacted_files.len(), 2);
let mut config = (*config).clone();
// Prevent the output file from being split by setting the config values high enough
config.max_desired_file_size_bytes = 100 * 1024 * 1024;
config.percentage_max_file_size = 100;
config.split_percentage = 100;
let compacted_files = compact_files(
Arc::clone(&files),
Arc::clone(&partition_info),
Arc::new(config),
CompactionLevel::FileNonOverlapped,
)
.await
.unwrap();
assert_eq!(compacted_files.len(), 1);
}
}

View File

@ -1,4 +1,5 @@
pub mod compact_builder;
pub mod compact_executor;
pub mod compact_files;
pub mod partition;
pub mod query_chunk;

View File

@ -22,14 +22,15 @@ pub struct PartitionInfo {
/// Namespace ID
pub namespace_id: NamespaceId,
/// Namespace name
pub namespace_name: String,
/// Table.
pub table: Arc<Table>,
// Table schema
pub table_schema: Arc<TableSchema>,
// /// Counts of the number of columns of each type, used for estimating arrow size
// pub column_type_counts: Vec<ColumnTypeCount>,
/// Sort key of the partition
pub sort_key: Option<SortKey>,
@ -42,6 +43,7 @@ impl PartitionInfo {
pub fn new(
partition_id: PartitionId,
namespace_id: NamespaceId,
namespace_name: String,
table: Arc<Table>,
table_schema: Arc<TableSchema>,
sort_key: Option<SortKey>,
@ -50,6 +52,7 @@ impl PartitionInfo {
Self {
partition_id,
namespace_id,
namespace_name,
table,
table_schema,
sort_key,

View File

@ -11,7 +11,7 @@ use iox_query::{
util::create_basic_summary,
QueryChunk, QueryChunkData, QueryChunkMeta,
};
use observability_deps::tracing::trace;
use observability_deps::tracing::debug;
use parquet_file::{chunk::ParquetChunk, storage::ParquetStorage};
use predicate::{delete_predicate::tombstones_to_delete_predicates, Predicate};
use schema::{merge::SchemaMerger, sort::SortKey, Projection, Schema};
@ -185,6 +185,7 @@ impl QueryChunk for QueryableParquetChunk {
// Order of the chunk so they can be deduplicated correctly
fn order(&self) -> ChunkOrder {
// TODO: If we change the design specified in driver.rs's compact functions, we will need to refine this
// Currently, we only compact files of level_n with level_n+1 and produce level_n+1 files,
// and with the strict requirement that:
// . Level-0 files can overlap with any files.
@ -254,12 +255,14 @@ pub fn to_queryable_parquet_chunk(
let max_time = file.max_time;
let compaction_level = file.compaction_level;
trace!(
parquet_file_id=?file.id,
parquet_file_namespace_id=?file.namespace_id,
parquet_file_table_id=?file.table_id,
parquet_file_partition_id=?file.partition_id,
parquet_file_object_store_id=?file.object_store_id,
// Use debug level so this shows up during prod's initial setup
let uuid = file.object_store_id;
debug!(
parquet_file_id = file.id.get(),
parquet_file_namespace_id = file.namespace_id.get(),
parquet_file_table_id = file.table_id.get(),
parquet_file_partition_id = file.partition_id.get(),
parquet_file_object_store_id = uuid.to_string().as_str(),
"built parquet chunk from metadata"
);

View File

@ -1,22 +1,22 @@
use std::{fmt::Display, sync::Arc};
use async_trait::async_trait;
use backoff::BackoffConfig;
use data_types::{NamespaceId, NamespaceSchema};
use backoff::{Backoff, BackoffConfig};
use data_types::{Namespace, NamespaceId, NamespaceSchema};
use iox_catalog::interface::{get_schema_by_id, Catalog};
use super::NamespacesSource;
#[derive(Debug)]
pub struct CatalogNamespacesSource {
_backoff_config: BackoffConfig,
backoff_config: BackoffConfig,
catalog: Arc<dyn Catalog>,
}
impl CatalogNamespacesSource {
pub fn new(backoff_config: BackoffConfig, catalog: Arc<dyn Catalog>) -> Self {
Self {
_backoff_config: backoff_config,
backoff_config,
catalog,
}
}
@ -30,7 +30,21 @@ impl Display for CatalogNamespacesSource {
#[async_trait]
impl NamespacesSource for CatalogNamespacesSource {
async fn fetch(&self, ns: NamespaceId) -> Option<NamespaceSchema> {
async fn fetch_by_id(&self, ns: NamespaceId) -> Option<Namespace> {
Backoff::new(&self.backoff_config)
.retry_all_errors("namespace_of_given_namespace_id", || async {
self.catalog
.repositories()
.await
.namespaces()
.get_by_id(ns)
.await
})
.await
.expect("retry forever")
}
async fn fetch_schema_by_id(&self, ns: NamespaceId) -> Option<NamespaceSchema> {
let mut repos = self.catalog.repositories().await;
// todos:
@ -39,10 +53,6 @@ impl NamespacesSource for CatalogNamespacesSource {
// and instead read and build TableSchema for a given TableId.
let ns = get_schema_by_id(ns, repos.as_mut()).await;
if let Ok(ns) = ns {
return Some(ns);
} else {
return None;
}
ns.ok()
}
}

View File

@ -1,18 +1,24 @@
use std::{collections::HashMap, fmt::Display};
use async_trait::async_trait;
use data_types::{NamespaceId, NamespaceSchema};
use data_types::{Namespace, NamespaceId, NamespaceSchema};
use super::NamespacesSource;
#[derive(Debug, Clone)]
pub struct NamespaceWrapper {
pub ns: Namespace,
pub schema: NamespaceSchema,
}
#[derive(Debug)]
pub struct MockNamespacesSource {
namespaces: HashMap<NamespaceId, NamespaceSchema>,
namespaces: HashMap<NamespaceId, NamespaceWrapper>,
}
impl MockNamespacesSource {
#[allow(dead_code)] // not used anywhere
pub fn new(namespaces: HashMap<NamespaceId, NamespaceSchema>) -> Self {
pub fn new(namespaces: HashMap<NamespaceId, NamespaceWrapper>) -> Self {
Self { namespaces }
}
}
@ -25,8 +31,14 @@ impl Display for MockNamespacesSource {
#[async_trait]
impl NamespacesSource for MockNamespacesSource {
async fn fetch(&self, ns: NamespaceId) -> Option<NamespaceSchema> {
self.namespaces.get(&ns).cloned()
async fn fetch_by_id(&self, ns: NamespaceId) -> Option<Namespace> {
let wrapper = self.namespaces.get(&ns);
wrapper.map(|wrapper| wrapper.ns.clone())
}
async fn fetch_schema_by_id(&self, ns: NamespaceId) -> Option<NamespaceSchema> {
let wrapper = self.namespaces.get(&ns);
wrapper.map(|wrapper| wrapper.schema.clone())
}
}
@ -45,7 +57,7 @@ mod tests {
}
#[tokio::test]
async fn test_fetch() {
async fn test_fetch_namespace() {
let ns_1 = NamespaceBuilder::new(1).build();
let ns_2 = NamespaceBuilder::new(2).build();
@ -56,13 +68,47 @@ mod tests {
let source = MockNamespacesSource::new(namespaces);
// different namespaces
assert_eq!(source.fetch(NamespaceId::new(1)).await, Some(ns_1.clone()),);
assert_eq!(source.fetch(NamespaceId::new(2)).await, Some(ns_2),);
assert_eq!(
source.fetch_by_id(NamespaceId::new(1)).await,
Some(ns_1.clone().ns),
);
assert_eq!(source.fetch_by_id(NamespaceId::new(2)).await, Some(ns_2.ns),);
// fetching does not drain
assert_eq!(source.fetch(NamespaceId::new(1)).await, Some(ns_1),);
assert_eq!(source.fetch_by_id(NamespaceId::new(1)).await, Some(ns_1.ns),);
// unknown namespace => None result
assert_eq!(source.fetch(NamespaceId::new(3)).await, None,);
assert_eq!(source.fetch_by_id(NamespaceId::new(3)).await, None,);
}
#[tokio::test]
async fn test_fetch_namespace_schema() {
let ns_1 = NamespaceBuilder::new(1).build();
let ns_2 = NamespaceBuilder::new(2).build();
let namespaces = HashMap::from([
(NamespaceId::new(1), ns_1.clone()),
(NamespaceId::new(2), ns_2.clone()),
]);
let source = MockNamespacesSource::new(namespaces);
// different namespaces
assert_eq!(
source.fetch_schema_by_id(NamespaceId::new(1)).await,
Some(ns_1.clone().schema),
);
assert_eq!(
source.fetch_schema_by_id(NamespaceId::new(2)).await,
Some(ns_2.schema),
);
// fetching does not drain
assert_eq!(
source.fetch_schema_by_id(NamespaceId::new(1)).await,
Some(ns_1.schema),
);
// unknown namespace => None result
assert_eq!(source.fetch_schema_by_id(NamespaceId::new(3)).await, None,);
}
}

View File

@ -1,15 +1,20 @@
use std::fmt::{Debug, Display};
use async_trait::async_trait;
use data_types::{NamespaceId, NamespaceSchema};
use data_types::{Namespace, NamespaceId, NamespaceSchema};
pub mod catalog;
pub mod mock;
#[async_trait]
pub trait NamespacesSource: Debug + Display + Send + Sync {
/// Get Namespace for a given namespace
///
/// This method performs retries.
async fn fetch_by_id(&self, ns: NamespaceId) -> Option<Namespace>;
/// Get NamespaceSchema for a given namespace
///
/// todo: make this method perform retries.
async fn fetch(&self, ns: NamespaceId) -> Option<NamespaceSchema>;
async fn fetch_schema_by_id(&self, ns: NamespaceId) -> Option<NamespaceSchema>;
}

View File

@ -33,4 +33,24 @@ pub struct Config {
/// Partitions with files created within these last minutes are selected for compaction.
pub partition_minute_threshold: u64,
/// Desired max size of compacted parquet files.
/// It is a target desired value, rather than a guarantee.
pub max_desired_file_size_bytes: u64,
/// Percentage of the desired max file size used to decide whether a compacted
/// result is too small to be worth splitting. A result is considered too small when it is:
/// < (percentage_max_file_size / 100) * max_desired_file_size_bytes
/// This value must be between (0, 100)
pub percentage_max_file_size: u16,
/// Split file percentage.
/// If the estimated compacted result is neither too small nor too large, it is split
/// into 2 files whose relative sizes are determined by this percentage.
/// . Too small means: < (percentage_max_file_size / 100) * max_desired_file_size_bytes
/// . Too large means: > max_desired_file_size_bytes
/// . Anything in between is considered neither too small nor too large
/// This value must be between (0, 100)
pub split_percentage: u16,
}

View File

@ -12,9 +12,24 @@ use crate::{
config::Config,
};
// TODO: modify these comments accordingly as we go
// Currently, we only compact files of level_n with level_n+1 and produce level_n+1 files,
// and with the strict requirement that:
// . Level-0 files can overlap with any files.
// . Level-N files (N > 0) cannot overlap with any files in the same level.
// . For Level-0 files, we always pick the smaller `created_at` files to compact (with
// each other and overlapped L1 files) first.
// . Level-N+1 files are results of compacting Level-N and/or Level-N+1 files, their `created_at`
// can be after the `created_at` of other Level-N files but they may include data loaded before
// the other Level-N files. Hence we should never use `created_at` of Level-N+1 files to order
// them with Level-N files.
// . We can only compact different sets of files of the same partition concurrently into the same target_level.
pub async fn compact(config: &Config, components: &Arc<Components>) {
let partition_ids = components.partitions_source.fetch().await;
// TODO: https://github.com/influxdata/influxdb_iox/issues/6657
// either here or before invoking this function, skip this partition if it is in skipped_compactions
futures::stream::iter(partition_ids)
.map(|partition_id| {
let config = config.clone();
@ -28,43 +43,74 @@ pub async fn compact(config: &Config, components: &Arc<Components>) {
return;
}
// TODO: only read table and namespace info the first time and cache them
// TODO: only read partition, table and its schema info the first time and cache them
// Get info for the partition
let table = components.tables_source.fetch(files[0].table_id).await;
let namespace_schema = components
.namespaces_source
.fetch(files[0].namespace_id)
.await;
let partition = components.partitions_source.fetch_by_id(partition_id).await;
let table_schema = namespace_schema
.as_ref()
.unwrap()
.tables
.get(&table.as_ref().unwrap().name);
if table.is_none()
|| namespace_schema.is_none()
|| partition.is_none()
|| table_schema.is_none()
{
if partition.is_none() {
// Since we retry reading the catalog and still cannot find the needed info,
// this partition cannot be compacted
components
.partition_error_sink
.record(
partition_id,
"Cannot find table or table schema or partition info",
)
.record(partition_id, "Cannot find partition info")
.await;
return;
}
let partition = partition.unwrap();
let table = components.tables_source.fetch(files[0].table_id).await;
if table.is_none() {
components
.partition_error_sink
.record(partition_id, "Cannot find table ")
.await;
return;
}
let table = table.unwrap();
// TODO: after we have a catalog function to read the table schema, we should use it
// and avoid reading the namespace schema
let namespace = components
.namespaces_source
.fetch_by_id(table.namespace_id)
.await;
if namespace.is_none() {
components
.partition_error_sink
.record(partition_id, "Cannot find namespace")
.await;
return;
}
let namespace = namespace.unwrap();
let namespace_schema = components
.namespaces_source
.fetch_schema_by_id(table.namespace_id)
.await;
if namespace_schema.is_none() {
components
.partition_error_sink
.record(partition_id, "Cannot find namespace schema")
.await;
return;
}
let namespace_schema = namespace_schema.unwrap();
let table_schema = namespace_schema.tables.get(&table.name);
if table_schema.is_none() {
components
.partition_error_sink
.record(partition_id, "Cannot find table schema")
.await;
return;
}
let table_schema = table_schema.unwrap();
let partition_info = PartitionInfo::new(
partition_id,
files[0].namespace_id,
Arc::new(table.unwrap()),
Arc::new(table_schema.unwrap().clone()),
table.namespace_id,
namespace.name,
Arc::new(table),
Arc::new(table_schema.clone()),
partition.sort_key(),
partition.partition_key,
);
@ -106,3 +152,125 @@ pub async fn compact(config: &Config, components: &Arc<Components>) {
.collect::<()>()
.await;
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow_util::assert_batches_sorted_eq;
use data_types::CompactionLevel;
use crate::{
components::hardcoded::hardcoded_components, driver::compact, test_util::TestSetup,
};
#[tokio::test]
async fn test_compact_no_file() {
test_helpers::maybe_start_logging();
// no files
let setup = TestSetup::new(false).await;
let files = setup.list_by_table_not_to_delete().await;
assert!(files.is_empty());
// compact
let config = Arc::clone(&setup.config);
let components = hardcoded_components(&config);
compact(&config, &components).await;
// verify catalog is still empty
let files = setup.list_by_table_not_to_delete().await;
assert!(files.is_empty());
}
#[tokio::test]
async fn test_compact() {
test_helpers::maybe_start_logging();
// Create a test setup with 6 files
let setup = TestSetup::new(true).await;
// verify 6 files
let files = setup.list_by_table_not_to_delete().await;
assert_eq!(files.len(), 6);
//
// verify ID and compaction level of the files
let files_and_levels: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.compaction_level))
.collect();
assert_eq!(
files_and_levels,
vec![
(1, CompactionLevel::FileNonOverlapped),
(2, CompactionLevel::Initial),
(3, CompactionLevel::Initial),
(4, CompactionLevel::FileNonOverlapped),
(5, CompactionLevel::Initial),
(6, CompactionLevel::Initial),
]
);
// compact
let config = Arc::clone(&setup.config);
let components = hardcoded_components(&config);
compact(&config, &components).await;
// verify number of files: 6 files are compacted into 2 files
let files = setup.list_by_table_not_to_delete().await;
assert_eq!(files.len(), 2);
//
// verify ID and compaction level of the files
let files_and_levels: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.compaction_level))
.collect();
println!("{:?}", files_and_levels);
assert_eq!(
files_and_levels,
vec![
(7, CompactionLevel::FileNonOverlapped),
(8, CompactionLevel::FileNonOverlapped),
]
);
// verify the content of files
// Compacted smaller file with the later data
let mut files = setup.list_by_table_not_to_delete().await;
let file1 = files.pop().unwrap();
let batches = setup.read_parquet_file(file1).await;
assert_batches_sorted_eq!(
&[
"+-----------+------+------+------+-----------------------------+",
"| field_int | tag1 | tag2 | tag3 | time |",
"+-----------+------+------+------+-----------------------------+",
"| 210 | | OH | 21 | 1970-01-01T00:00:00.000136Z |",
"+-----------+------+------+------+-----------------------------+",
],
&batches
);
// Compacted larger file with the earlier data
let file0 = files.pop().unwrap();
let batches = setup.read_parquet_file(file0).await;
assert_batches_sorted_eq!(
[
"+-----------+------+------+------+-----------------------------+",
"| field_int | tag1 | tag2 | tag3 | time |",
"+-----------+------+------+------+-----------------------------+",
"| 10 | VT | | | 1970-01-01T00:00:00.000006Z |",
"| 10 | VT | | | 1970-01-01T00:00:00.000010Z |",
"| 10 | VT | | | 1970-01-01T00:00:00.000068Z |",
"| 1500 | WA | | | 1970-01-01T00:00:00.000008Z |",
"| 1601 | | PA | 15 | 1970-01-01T00:00:00.000030Z |",
"| 22 | | OH | 21 | 1970-01-01T00:00:00.000036Z |",
"| 270 | UT | | | 1970-01-01T00:00:00.000025Z |",
"| 70 | UT | | | 1970-01-01T00:00:00.000020Z |",
"| 99 | OR | | | 1970-01-01T00:00:00.000012Z |",
"+-----------+------+------+------+-----------------------------+",
],
&batches
);
}
}

View File

@ -1,12 +1,22 @@
use std::collections::BTreeMap;
use std::{collections::BTreeMap, num::NonZeroUsize, sync::Arc};
use backoff::BackoffConfig;
use data_types::{
ColumnId, ColumnSchema, ColumnSet, ColumnType, CompactionLevel, NamespaceId, NamespaceSchema,
ParquetFile, ParquetFileId, Partition, PartitionId, PartitionKey, QueryPoolId, SequenceNumber,
ShardId, Table, TableId, TableSchema, Timestamp, TopicId,
ColumnId, ColumnSchema, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId,
NamespaceSchema, ParquetFile, ParquetFileId, Partition, PartitionId, PartitionKey, QueryPoolId,
SequenceNumber, ShardId, Table, TableId, TableSchema, Timestamp, TopicId,
};
use datafusion::arrow::record_batch::RecordBatch;
use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestTable};
use iox_time::{SystemProvider, TimeProvider};
use schema::sort::SortKey;
use uuid::Uuid;
use crate::{
components::{compact::partition::PartitionInfo, namespaces_source::mock::NamespaceWrapper},
config::Config,
};
#[derive(Debug)]
pub struct ParquetFileBuilder {
file: ParquetFile,
@ -90,7 +100,7 @@ impl TableBuilder {
#[derive(Debug)]
pub struct NamespaceBuilder {
ns: NamespaceSchema,
namespace: NamespaceWrapper,
}
impl NamespaceBuilder {
@ -149,20 +159,34 @@ impl NamespaceBuilder {
),
]);
let id = NamespaceId::new(id);
let topic_id = TopicId::new(0);
let query_pool_id = QueryPoolId::new(0);
Self {
ns: NamespaceSchema {
id: NamespaceId::new(id),
topic_id: TopicId::new(0),
query_pool_id: QueryPoolId::new(0),
tables,
max_columns_per_table: 10,
retention_period_ns: None,
namespace: NamespaceWrapper {
ns: Namespace {
id,
name: "ns".to_string(),
topic_id,
query_pool_id,
max_tables: 10,
max_columns_per_table: 10,
retention_period_ns: None,
},
schema: NamespaceSchema {
id,
topic_id,
query_pool_id,
tables,
max_columns_per_table: 10,
retention_period_ns: None,
},
},
}
}
pub fn build(self) -> NamespaceSchema {
self.ns
pub fn build(self) -> NamespaceWrapper {
self.namespace
}
}
@ -190,3 +214,191 @@ impl PartitionBuilder {
self.partition
}
}
const SHARD_INDEX: i32 = 1;
const PARTITION_MINUTE_THRESHOLD: u64 = 10;
const MAX_DESIRE_FILE_SIZE: u64 = 100 * 1024;
const PERCENTAGE_MAX_FILE_SIZE: u16 = 5;
const SPLIT_PERCENTAGE: u16 = 80;
pub struct TestSetup {
pub files: Arc<Vec<ParquetFile>>,
pub partition_info: Arc<crate::components::compact::partition::PartitionInfo>,
pub catalog: Arc<TestCatalog>,
pub table: Arc<TestTable>,
pub config: Arc<Config>,
}
impl TestSetup {
pub async fn new(with_files: bool) -> Self {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace_1hr_retention("ns").await;
let shard = ns.create_shard(SHARD_INDEX).await;
let table = ns.create_table("table").await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("tag2", ColumnType::Tag).await;
table.create_column("tag3", ColumnType::Tag).await;
table.create_column("time", ColumnType::Time).await;
let table_schema = table.catalog_schema().await;
let partition = table
.with_shard(&shard)
.create_partition("2022-07-13")
.await;
// The sort key comes from the catalog and should be the union of all tags the
// ingester has seen
let sort_key = SortKey::from_columns(["tag1", "tag2", "tag3", "time"]);
let partition = partition.update_sort_key(sort_key.clone()).await;
let candidate_partition = Arc::new(PartitionInfo::new(
partition.partition.id,
ns.namespace.id,
ns.namespace.name.clone(),
Arc::new(table.table.clone()),
Arc::new(table_schema),
partition.partition.sort_key(),
partition.partition.partition_key.clone(),
));
let mut parquet_files = vec![];
if with_files {
let time = SystemProvider::new();
let time_16_minutes_ago = time.minutes_ago(16);
let time_5_minutes_ago = time.minutes_ago(5);
let time_2_minutes_ago = time.minutes_ago(2);
let time_1_minute_ago = time.minutes_ago(1);
let time_now = time.now();
// L1 file
let lp = vec![
"table,tag2=PA,tag3=15 field_int=1601i 30000",
"table,tag2=OH,tag3=21 field_int=21i 36000", // will be eliminated due to duplicate
]
.join("\n");
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp)
.with_creation_time(time_1_minute_ago)
.with_compaction_level(CompactionLevel::FileNonOverlapped); // Prev compaction
let level_1_file_1_minute_ago = partition.create_parquet_file(builder).await.into();
// L0 file
let lp = vec![
"table,tag1=WA field_int=1000i 8000", // will be eliminated due to duplicate
"table,tag1=VT field_int=10i 10000", // latest L0 compared with duplicate in level_1_file_1_minute_ago_with_duplicates
// keep it
"table,tag1=UT field_int=70i 20000",
]
.join("\n");
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp)
.with_creation_time(time_16_minutes_ago)
.with_compaction_level(CompactionLevel::Initial);
let level_0_file_16_minutes_ago = partition.create_parquet_file(builder).await.into();
// L0 file
let lp = vec![
"table,tag1=WA field_int=1500i 8000", // latest duplicate and kept
"table,tag1=VT field_int=10i 6000",
"table,tag1=UT field_int=270i 25000",
]
.join("\n");
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp)
.with_creation_time(time_5_minutes_ago)
.with_compaction_level(CompactionLevel::Initial);
let level_0_file_5_minutes_ago = partition.create_parquet_file(builder).await.into();
// L1 file
let lp = vec![
"table,tag1=VT field_int=88i 10000", // will be eliminated due to duplicate.
// Note: created time more recent than level_0_file_16_minutes_ago
// but always considered older ingested data
"table,tag1=OR field_int=99i 12000",
]
.join("\n");
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp)
.with_creation_time(time_1_minute_ago)
.with_compaction_level(CompactionLevel::FileNonOverlapped); // Prev compaction
let level_1_file_1_minute_ago_with_duplicates =
partition.create_parquet_file(builder).await.into();
// L0 file
let lp = vec!["table,tag2=OH,tag3=21 field_int=22i 36000"].join("\n");
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp)
.with_min_time(0)
.with_max_time(36000)
.with_creation_time(time_now)
// Will put the group size between "small" and "large"
.with_size_override(50 * 1024 * 1024)
.with_compaction_level(CompactionLevel::Initial);
let medium_level_0_file_time_now = partition.create_parquet_file(builder).await.into();
// L0 file
let lp = vec![
"table,tag1=VT field_int=10i 68000",
"table,tag2=OH,tag3=21 field_int=210i 136000",
]
.join("\n");
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp)
.with_min_time(36001)
.with_max_time(136000)
.with_creation_time(time_2_minutes_ago)
// Will put the group size two multiples over "large"
.with_size_override(180 * 1024 * 1024)
.with_compaction_level(CompactionLevel::Initial);
let large_level_0_file_2_2_minutes_ago =
partition.create_parquet_file(builder).await.into();
// Order here isn't relevant; the chunk order should ensure the level 1 files are ordered
// first, then the other files by max seq num.
parquet_files = vec![
level_1_file_1_minute_ago,
level_0_file_16_minutes_ago,
level_0_file_5_minutes_ago,
level_1_file_1_minute_ago_with_duplicates,
medium_level_0_file_time_now,
large_level_0_file_2_2_minutes_ago,
];
}
let config = Arc::new(Config {
metric_registry: catalog.metric_registry(),
catalog: catalog.catalog(),
parquet_store: catalog.parquet_store.clone(),
time_provider: Arc::<iox_time::MockProvider>::clone(&catalog.time_provider),
exec: Arc::clone(&catalog.exec),
backoff_config: BackoffConfig::default(),
partition_concurrency: NonZeroUsize::new(1).unwrap(),
partition_minute_threshold: PARTITION_MINUTE_THRESHOLD,
max_desired_file_size_bytes: MAX_DESIRE_FILE_SIZE,
percentage_max_file_size: PERCENTAGE_MAX_FILE_SIZE,
split_percentage: SPLIT_PERCENTAGE,
});
Self {
files: Arc::new(parquet_files),
partition_info: candidate_partition,
catalog,
table,
config,
}
}
/// Get the catalog files stored in the catalog
pub async fn list_by_table_not_to_delete(&self) -> Vec<ParquetFile> {
self.catalog
.list_by_table_not_to_delete(self.table.table.id)
.await
}
/// Reads the specified parquet file out of object store
pub async fn read_parquet_file(&self, file: ParquetFile) -> Vec<RecordBatch> {
assert_eq!(file.table_id, self.table.table.id);
self.table.read_parquet_file(file).await
}
}

View File

@ -957,6 +957,14 @@ pub struct TestParquetFile {
pub size_override: Option<i64>,
}
impl From<TestParquetFile> for ParquetFile {
fn from(tpf: TestParquetFile) -> Self {
let TestParquetFile { parquet_file, .. } = tpf;
parquet_file
}
}
impl TestParquetFile {
/// Make the parquet file deletable
pub async fn flag_for_delete(&self) {

View File

@ -137,6 +137,9 @@ pub fn create_compactor2_server_type(
backoff_config: backoff::BackoffConfig::default(),
partition_concurrency: compactor_config.compaction_partition_concurrency,
partition_minute_threshold: compactor_config.compaction_partition_minute_threshold,
max_desired_file_size_bytes: compactor_config.max_desired_file_size_bytes,
percentage_max_file_size: compactor_config.percentage_max_file_size,
split_percentage: compactor_config.split_percentage,
});
Arc::new(Compactor2ServerType::new(
compactor,