feat: basic compactor2 algo layout (#6616)
parent 413e4e4088
commit 380a855aab
@@ -993,6 +993,7 @@ dependencies = [
 name = "compactor2"
 version = "0.1.0"
 dependencies = [
  "data_types",
+ "futures",
  "iox_catalog",
  "iox_query",
@@ -1000,6 +1001,8 @@ dependencies = [
  "metric",
  "observability_deps",
  "parquet_file",
+ "rand",
  "snafu",
  "tokio",
+ "tokio-util",
  "workspace-hack",
@@ -1,5 +1,16 @@
 //! CLI config for compactor2-related commands

+use std::num::NonZeroUsize;
+
 /// CLI config for compactor2
 #[derive(Debug, Clone, Copy, clap::Parser)]
-pub struct Compactor2Config {}
+pub struct Compactor2Config {
+    /// Number of partitions that should be compacted in parallel.
+    #[clap(
+        long = "compaction-partition-concurrency",
+        env = "INFLUXDB_IOX_COMPACTION_PARTITION_CONCURRENCY",
+        default_value = "10",
+        action
+    )]
+    pub compaction_partition_concurrency: NonZeroUsize,
+}
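The new flag can be exercised on its own. A minimal sketch (not part of the commit; the argument values are made up) of parsing it through the derived clap::Parser, where NonZeroUsize rejects a zero value at parse time and the env var or the "10" default kicks in when the flag is omitted:

use clap::Parser; // the derive above implements Parser for Compactor2Config

fn main() {
    // hypothetical argument list for illustration only
    let cfg = Compactor2Config::parse_from([
        "compactor2",
        "--compaction-partition-concurrency",
        "4",
    ]);
    assert_eq!(cfg.compaction_partition_concurrency.get(), 4);
}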
@@ -6,6 +6,7 @@ edition.workspace = true
 license.workspace = true

 [dependencies]
 data_types = { path = "../data_types" }
+futures = "0.3"
 iox_catalog = { path = "../iox_catalog" }
 iox_query = { path = "../iox_query" }
@@ -13,6 +14,8 @@ iox_time = { path = "../iox_time" }
 metric = { path = "../metric" }
 observability_deps = { path = "../observability_deps" }
 parquet_file = { path = "../parquet_file" }
+rand = "0.8.3"
 snafu = "0.7"
 tokio = { version = "1", features = ["macros", "rt", "sync"] }
+tokio-util = { version = "0.7.4" }
 workspace-hack = { path = "../workspace-hack"}
@@ -0,0 +1,23 @@
+//! Actual compaction routine.
+use std::sync::Arc;
+
+use data_types::ParquetFile;
+use iox_catalog::interface::Catalog;
+use snafu::Snafu;
+
+/// Compaction errors.
+#[derive(Debug, Snafu)]
+#[allow(missing_copy_implementations)]
+pub enum Error {}
+
+/// Perform compaction on given files including catalog transaction.
+///
+/// This MUST use all files. No further filtering is performed here.
+pub async fn compact_files(
+    _files: &[ParquetFile],
+    _catalog: &Arc<dyn Catalog>,
+) -> Result<(), Error> {
+    // TODO: implement this
+    // TODO: split this into the actual DF execution and the catalog bookkeeping
+    Ok(())
+}
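The second TODO already names the intended split. A rough, purely hypothetical sketch of the two halves (function names, signatures, and return types are invented here, not part of the commit), reusing the imports and Error type from the file above:

// Hypothetical helpers illustrating the TODO above; not part of the commit.
async fn run_compaction_plan(_files: &[ParquetFile]) -> Result<Vec<ParquetFile>, Error> {
    // would build and execute the DataFusion plan and write the new parquet files
    Ok(vec![])
}

async fn update_catalog(
    _catalog: &Arc<dyn Catalog>,
    _created: &[ParquetFile],
    _replaced: &[ParquetFile],
) -> Result<(), Error> {
    // would record the new files and mark the replaced ones for deletion in one transaction
    Ok(())
}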
@@ -9,7 +9,7 @@ use observability_deps::tracing::warn;
 use tokio::task::{JoinError, JoinHandle};
 use tokio_util::sync::CancellationToken;

-use crate::config::Config;
+use crate::{config::Config, driver::compact, rules::hardcoded_rules};

 /// A [`JoinHandle`] that can be cloned
 type SharedJoinHandle = Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>;
@@ -28,12 +28,20 @@ pub struct Compactor2 {

 impl Compactor2 {
     /// Start compactor.
-    pub fn start(_config: Config) -> Self {
+    pub fn start(config: Config) -> Self {
         let shutdown = CancellationToken::new();
         let shutdown_captured = shutdown.clone();

+        let rules = hardcoded_rules();
+
         let worker = tokio::spawn(async move {
+            tokio::select! {
+                _ = shutdown_captured.cancelled() => {}
+                _ = async {
+                    loop {
+                        compact(&config, &rules).await;
+                    }
+                } => unreachable!(),
+            }
         });
         let worker = shared_handle(worker);
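The worker couples graceful shutdown to an otherwise endless compaction loop. A self-contained sketch of that select-between-cancellation-and-loop pattern (not taken from the commit; the sleep stands in for one compaction pass, and it assumes tokio with the "time" feature plus tokio-util as dependencies):

use std::time::Duration;

use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let shutdown = CancellationToken::new();
    let shutdown_captured = shutdown.clone();

    let worker = tokio::spawn(async move {
        tokio::select! {
            // exit cleanly once the token is cancelled
            _ = shutdown_captured.cancelled() => {}
            // the loop never returns, so this arm can never complete
            _ = async {
                loop {
                    tokio::time::sleep(Duration::from_millis(10)).await; // one "compaction pass"
                }
            } => unreachable!(),
        }
    });

    shutdown.cancel();
    worker.await.unwrap();
}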
@@ -1,5 +1,5 @@
 //! Config-related stuff.
-use std::sync::Arc;
+use std::{num::NonZeroUsize, sync::Arc};

 use iox_catalog::interface::Catalog;
 use iox_query::exec::Executor;
@@ -7,6 +7,7 @@ use iox_time::TimeProvider;
 use parquet_file::storage::ParquetStorage;

 /// Config to set up a compactor.
 #[derive(Debug, Clone)]
 pub struct Config {
     /// Metric registry.
     pub metric_registry: Arc<metric::Registry>,
@@ -22,4 +23,7 @@ pub struct Config {

     /// Time provider.
     pub time_provider: Arc<dyn TimeProvider>,
+
+    /// Number of partitions that should be compacted in parallel.
+    pub partition_concurrency: NonZeroUsize,
 }
@@ -0,0 +1,88 @@
+use std::sync::Arc;
+
+use data_types::{ParquetFile, PartitionId};
+use futures::StreamExt;
+use iox_catalog::interface::Catalog;
+use observability_deps::tracing::{error, info};
+use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
+
+use crate::{compact::compact_files, config::Config, rules::Rules};
+
+pub async fn compact(config: &Config, rules: &Arc<Rules>) {
+    let partition_ids = get_partition_ids(&config.catalog).await;
+    // TODO: implement ID-based sharding / hash-partitioning so we can run multiple compactors in parallel
+    let partition_ids = randomize_partition_order(partition_ids, 1234);
+
+    futures::stream::iter(partition_ids)
+        .map(|partition_id| {
+            let config = config.clone();
+            let rules = Arc::clone(rules);
+
+            async move {
+                let files = get_parquet_files(&config.catalog, partition_id).await;
+
+                let files = files
+                    .into_iter()
+                    .filter(|file| rules.file_filters.iter().all(|filter| filter.apply(file)))
+                    .collect::<Vec<_>>();
+
+                if !rules
+                    .partition_filters
+                    .iter()
+                    .all(|filter| filter.apply(&files))
+                {
+                    return;
+                }
+
+                if files.is_empty() {
+                    return;
+                }
+
+                if let Err(e) = compact_files(&files, &config.catalog).await {
+                    error!(
+                        %e,
+                        partition_id=partition_id.get(),
+                        "Error while compacting partition",
+                    );
+                    // TODO: mark partition as "cannot compact"
+                    return;
+                }
+                info!(
+                    input_size = files.iter().map(|f| f.file_size_bytes).sum::<i64>(),
+                    input_files = files.len(),
+                    partition_id = partition_id.get(),
+                    "Compacted partition",
+                );
+            }
+        })
+        .buffer_unordered(config.partition_concurrency.get())
+        .collect::<()>()
+        .await;
+}
+
+/// Get partition IDs from catalog.
+///
+/// This method performs retries.
+///
+/// This should only perform basic filtering. It MUST NOT inspect individual parquet files.
+async fn get_partition_ids(_catalog: &Arc<dyn Catalog>) -> Vec<PartitionId> {
+    // TODO: get partition IDs from catalog, wrapped by retry
+    vec![]
+}
+
+/// Get parquet files for a given partition.
+///
+/// This method performs retries.
+async fn get_parquet_files(
+    _catalog: &Arc<dyn Catalog>,
+    _partition: PartitionId,
+) -> Vec<ParquetFile> {
+    // TODO: get files from catalog, wrapped by retry
+    vec![]
+}
+
+fn randomize_partition_order(mut partitions: Vec<PartitionId>, seed: u64) -> Vec<PartitionId> {
+    let mut rng = StdRng::seed_from_u64(seed);
+    partitions.shuffle(&mut rng);
+    partitions
+}
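The driver fans out over partitions with a bounded-concurrency stream rather than spawning one task per partition. A stand-alone sketch of that pattern (not from the commit; the integer IDs and println are made up), keeping at most `concurrency` futures in flight via buffer_unordered:

use futures::StreamExt;

#[tokio::main]
async fn main() {
    let partition_ids = vec![1_i64, 2, 3, 4, 5];
    let concurrency = 2;

    futures::stream::iter(partition_ids)
        .map(|id| async move {
            // a real pass would fetch the partition's files, filter them, and compact
            println!("compacting partition {id}");
        })
        .buffer_unordered(concurrency)
        .collect::<()>()
        .await;
}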
@@ -11,5 +11,8 @@
     clippy::dbg_macro
 )]

+pub mod compact;
 pub mod compactor;
 pub mod config;
+mod driver;
+mod rules;
@@ -0,0 +1,9 @@
+use std::fmt::Debug;
+
+use data_types::ParquetFile;
+
+pub trait FileFilter: Debug + Send + Sync {
+    fn apply(&self, file: &ParquetFile) -> bool;
+
+    fn name(&self) -> &'static str;
+}
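No concrete filters ship with this commit. A hypothetical implementation (name, threshold, and behaviour invented here for illustration; it only relies on the ParquetFile::file_size_bytes field already used by the driver) that admits only files at or below a size limit could look like:

use std::fmt::Debug;

use data_types::ParquetFile;

/// Hypothetical filter: only compact files at or below a size threshold.
#[derive(Debug)]
struct MaxSizeFileFilter {
    max_size_bytes: i64,
}

impl FileFilter for MaxSizeFileFilter {
    fn apply(&self, file: &ParquetFile) -> bool {
        file.file_size_bytes <= self.max_size_bytes
    }

    fn name(&self) -> &'static str {
        "max_size"
    }
}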
@@ -0,0 +1,22 @@
+use std::sync::Arc;
+
+use self::{file_filter::FileFilter, partition_filter::PartitionFilter};
+
+pub mod file_filter;
+pub mod partition_filter;
+
+#[derive(Debug)]
+pub struct Rules {
+    pub file_filters: Vec<Arc<dyn FileFilter>>,
+    pub partition_filters: Vec<Arc<dyn PartitionFilter>>,
+}
+
+/// Get hardcoded rules.
+///
+/// TODO: make this a runtime config
+pub fn hardcoded_rules() -> Arc<Rules> {
+    Arc::new(Rules {
+        file_filters: vec![],
+        partition_filters: vec![],
+    })
+}
@@ -0,0 +1,9 @@
+use std::fmt::Debug;
+
+use data_types::ParquetFile;
+
+pub trait PartitionFilter: Debug + Send + Sync {
+    fn apply(&self, files: &[ParquetFile]) -> bool;
+
+    fn name(&self) -> &'static str;
+}
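As with the file filters, here is a sketch of a possible implementation (entirely hypothetical, not in the commit): skip partitions that currently hold fewer than two files, since there is nothing to merge.

use std::fmt::Debug;

use data_types::ParquetFile;

/// Hypothetical filter: only compact partitions with more than one file.
#[derive(Debug)]
struct HasMultipleFilesPartitionFilter;

impl PartitionFilter for HasMultipleFilesPartitionFilter {
    fn apply(&self, files: &[ParquetFile]) -> bool {
        files.len() > 1
    }

    fn name(&self) -> &'static str {
        "has_multiple_files"
    }
}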
@@ -126,7 +126,7 @@ pub fn create_compactor2_server_type(
     parquet_store: ParquetStorage,
     exec: Arc<Executor>,
     time_provider: Arc<dyn TimeProvider>,
-    _compactor_config: Compactor2Config,
+    compactor_config: Compactor2Config,
 ) -> Arc<dyn ServerType> {
     let compactor = Compactor2::start(compactor2::config::Config {
         metric_registry: Arc::clone(&metric_registry),
@@ -134,6 +134,7 @@ pub fn create_compactor2_server_type(
         parquet_store,
         exec,
         time_provider,
+        partition_concurrency: compactor_config.compaction_partition_concurrency,
     });
     Arc::new(Compactor2ServerType::new(
         compactor,