Merge branch 'main' into dom/rpc-namespace
commit a489d03e1b

@@ -997,6 +997,7 @@ dependencies = [
 "arrow_util",
 "async-trait",
 "backoff",
 "bytes",
 "data_types",
 "datafusion",
 "futures",

@@ -3005,6 +3006,7 @@ dependencies = [
 "backoff",
 "clap_blocks",
 "compactor2",
 "data_types",
 "hyper",
 "iox_catalog",
 "iox_query",

@@ -3,7 +3,7 @@
use std::num::NonZeroUsize;

/// CLI config for compactor2
#[derive(Debug, Clone, Copy, clap::Parser)]
#[derive(Debug, Clone, clap::Parser)]
pub struct Compactor2Config {
    /// Number of partitions that should be compacted in parallel.
    ///

@@ -116,4 +116,36 @@ pub struct Compactor2Config {
        action
    )]
    pub partition_timeout_secs: u64,

    /// Filter partitions to the given set of IDs.
    ///
    /// This is mostly useful for debugging.
    #[clap(
        long = "compaction-partition-filter",
        env = "INFLUXDB_IOX_COMPACTION_PARTITION_FILTER",
        action
    )]
    pub partition_filter: Option<Vec<i64>>,

    /// Shadow mode.
    ///
    /// This will NOT write / commit any output to the object store or catalog.
    ///
    /// This is mostly useful for debugging.
    #[clap(
        long = "compaction-shadow-mode",
        env = "INFLUXDB_IOX_COMPACTION_SHADOW_MODE",
        action
    )]
    pub shadow_mode: bool,

    /// Ignores "partition marked w/ error and shall be skipped" entries in the catalog.
    ///
    /// This is mostly useful for debugging.
    #[clap(
        long = "compaction-ignore-partition-skip-marker",
        env = "INFLUXDB_IOX_COMPACTION_IGNORE_PARTITION_SKIP_MARKER",
        action
    )]
    pub ignore_partition_skip_marker: bool,
}

@@ -8,6 +8,7 @@ license.workspace = true
[dependencies]
async-trait = "0.1.63"
backoff = { path = "../backoff" }
bytes = "1.3"
datafusion = { workspace = true }
data_types = { path = "../data_types" }
futures = "0.3"

@@ -1,11 +1,11 @@
//! Main compactor entry point.
use std::{sync::Arc, time::Duration};
use std::sync::Arc;

use futures::{
    future::{BoxFuture, Shared},
    FutureExt, TryFutureExt,
};
use observability_deps::tracing::warn;
use observability_deps::tracing::{info, warn};
use tokio::task::{JoinError, JoinHandle};
use tokio_util::sync::CancellationToken;
use tracker::AsyncSemaphoreMetrics;

@@ -34,6 +34,10 @@ pub struct Compactor2 {
impl Compactor2 {
    /// Start compactor.
    pub fn start(config: Config) -> Self {
        if config.shadow_mode {
            info!("Starting in shadow mode");
        }

        let shutdown = CancellationToken::new();
        let shutdown_captured = shutdown.clone();

@@ -51,7 +55,7 @@ impl Compactor2 {
                _ = shutdown_captured.cancelled() => {}
                _ = async {
                    loop {
                        compact(config.partition_concurrency, Duration::from_secs(config.partition_timeout_secs), Arc::clone(&job_semaphore), &components).await;
                        compact(config.partition_concurrency, config.partition_timeout, Arc::clone(&job_semaphore), &components).await;
                        // TODO: implement throttling if there was no work to do
                    }
                } => unreachable!(),

@@ -8,7 +8,11 @@ mod tests {
    use tracker::AsyncSemaphoreMetrics;

    use crate::{
        components::hardcoded::hardcoded_components, driver::compact, test_util::TestSetup,
        components::{
            df_planner::panic::PanicDataFusionPlanner, hardcoded::hardcoded_components, Components,
        },
        driver::compact,
        test_util::{list_object_store, TestSetup},
    };

    #[tokio::test]

@@ -16,7 +20,7 @@ mod tests {
        test_helpers::maybe_start_logging();

        // no files
        let setup = TestSetup::new(false).await;
        let setup = TestSetup::builder().build().await;

        let files = setup.list_by_table_not_to_delete().await;
        assert!(files.is_empty());

@@ -34,7 +38,7 @@ mod tests {
        test_helpers::maybe_start_logging();

        // Create a test setup with 6 files
        let setup = TestSetup::new(true).await;
        let setup = TestSetup::builder().with_files().build().await;

        // verify 6 files
        let files = setup.list_by_table_not_to_delete().await;

@@ -155,7 +159,7 @@ mod tests {
        test_helpers::maybe_start_logging();

        // Create a test setup with 6 files
        let setup = TestSetup::new(true).await;
        let setup = TestSetup::builder().with_files().build().await;

        // verify 6 files
        let files = setup.list_by_table_not_to_delete().await;

@@ -209,9 +213,118 @@ mod tests {
        );
    }

    #[tokio::test]
    async fn test_partition_fail() {
        test_helpers::maybe_start_logging();

        // Create a test setup with 6 files
        let setup = TestSetup::builder().with_files().build().await;

        let catalog_files_pre = setup.list_by_table_not_to_delete().await;
        assert!(!catalog_files_pre.is_empty());

        let object_store_files_pre = list_object_store(&setup.catalog.object_store).await;
        assert!(!object_store_files_pre.is_empty());

        run_compact_failing(&setup).await;

        let catalog_files_post = setup.list_by_table_not_to_delete().await;
        assert_eq!(catalog_files_pre, catalog_files_post);

        let object_store_files_post = list_object_store(&setup.catalog.object_store).await;
        assert_eq!(object_store_files_pre, object_store_files_post);

        let skipped = setup
            .catalog
            .catalog
            .repositories()
            .await
            .partitions()
            .list_skipped_compactions()
            .await
            .unwrap();
        assert_eq!(skipped.len(), 1);
    }

    #[tokio::test]
    async fn test_shadow_mode() {
        test_helpers::maybe_start_logging();

        // Create a test setup with 6 files
        let setup = TestSetup::builder()
            .with_files()
            .with_shadow_mode()
            .build()
            .await;

        let catalog_files_pre = setup.list_by_table_not_to_delete().await;
        assert!(!catalog_files_pre.is_empty());

        let object_store_files_pre = list_object_store(&setup.catalog.object_store).await;
        assert!(!object_store_files_pre.is_empty());

        run_compact(&setup).await;

        let catalog_files_post = setup.list_by_table_not_to_delete().await;
        assert_eq!(catalog_files_pre, catalog_files_post);

        let object_store_files_post = list_object_store(&setup.catalog.object_store).await;
        assert_eq!(object_store_files_pre, object_store_files_post);
    }

    #[tokio::test]
    async fn test_shadow_mode_partition_fail() {
        test_helpers::maybe_start_logging();

        // Create a test setup with 6 files
        let setup = TestSetup::builder()
            .with_files()
            .with_shadow_mode()
            .build()
            .await;

        let catalog_files_pre = setup.list_by_table_not_to_delete().await;
        assert!(!catalog_files_pre.is_empty());

        let object_store_files_pre = list_object_store(&setup.catalog.object_store).await;
        assert!(!object_store_files_pre.is_empty());

        run_compact_failing(&setup).await;

        let catalog_files_post = setup.list_by_table_not_to_delete().await;
        assert_eq!(catalog_files_pre, catalog_files_post);

        let object_store_files_post = list_object_store(&setup.catalog.object_store).await;
        assert_eq!(object_store_files_pre, object_store_files_post);

        let skipped = setup
            .catalog
            .catalog
            .repositories()
            .await
            .partitions()
            .list_skipped_compactions()
            .await
            .unwrap();
        assert_eq!(skipped, vec![]);
    }

    async fn run_compact(setup: &TestSetup) {
        let components = hardcoded_components(&setup.config);
        run_compact_impl(setup, components).await;
    }

    async fn run_compact_failing(setup: &TestSetup) {
        let components = hardcoded_components(&setup.config);
        let components = Arc::new(Components {
            df_planner: Arc::new(PanicDataFusionPlanner::new()),
            ..components.as_ref().clone()
        });
        run_compact_impl(setup, components).await;
    }

    async fn run_compact_impl(setup: &TestSetup, components: Arc<Components>) {
        let config = Arc::clone(&setup.config);
        let components = hardcoded_components(&config);
        let job_semaphore = Arc::new(
            Arc::new(AsyncSemaphoreMetrics::new(&config.metric_registry, [])).new_semaphore(10),
        );

@@ -30,7 +30,7 @@ pub trait Commit: Debug + Display + Send + Sync {
#[async_trait]
impl<T> Commit for Arc<T>
where
    T: Commit,
    T: Commit + ?Sized,
{
    async fn commit(
        &self,

@@ -50,15 +50,7 @@ impl DataFusionPlanExec for DedicatedDataFusionPlanExec {

#[cfg(test)]
mod tests {
    use std::any::Any;

    use datafusion::{
        arrow::datatypes::SchemaRef,
        execution::context::TaskContext,
        physical_expr::PhysicalSortExpr,
        physical_plan::{Partitioning, Statistics},
    };
    use schema::SchemaBuilder;
    use crate::components::df_planner::panic::PanicPlan;

    use super::*;

@@ -77,52 +69,4 @@ mod tests {
        let err = stream.try_collect::<Vec<_>>().await.unwrap_err();
        assert_eq!(err.to_string(), "External error: foo");
    }

    #[derive(Debug)]
    struct PanicPlan;

    impl ExecutionPlan for PanicPlan {
        fn as_any(&self) -> &dyn Any {
            self as _
        }

        fn schema(&self) -> SchemaRef {
            SchemaBuilder::new().build().unwrap().as_arrow()
        }

        fn output_partitioning(&self) -> Partitioning {
            Partitioning::UnknownPartitioning(1)
        }

        fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
            None
        }

        fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
            vec![]
        }

        fn with_new_children(
            self: Arc<Self>,
            children: Vec<Arc<dyn ExecutionPlan>>,
        ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
            assert!(children.is_empty());
            Ok(self)
        }

        fn execute(
            &self,
            partition: usize,
            _context: Arc<TaskContext>,
        ) -> datafusion::error::Result<SendableRecordBatchStream> {
            assert_eq!(partition, 0);
            let stream = futures::stream::once(async move { panic!("foo") });
            let stream = RecordBatchStreamAdapter::new(self.schema(), stream);
            Ok(Box::pin(stream))
        }

        fn statistics(&self) -> Statistics {
            unimplemented!()
        }
    }
}

@@ -7,6 +7,7 @@ use async_trait::async_trait;
use data_types::{CompactionLevel, ParquetFile};
use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan};

pub mod panic;
pub mod planner_v1;
mod query_chunk;

@@ -0,0 +1,124 @@
use std::{any::Any, fmt::Display, sync::Arc};

use async_trait::async_trait;
use data_types::{CompactionLevel, ParquetFile};
use datafusion::{
    arrow::datatypes::SchemaRef,
    error::DataFusionError,
    execution::context::TaskContext,
    physical_expr::PhysicalSortExpr,
    physical_plan::{
        stream::RecordBatchStreamAdapter, ExecutionPlan, Partitioning, SendableRecordBatchStream,
        Statistics,
    },
};
use schema::SchemaBuilder;

use crate::partition_info::PartitionInfo;

use super::DataFusionPlanner;

#[derive(Debug, Default)]
pub struct PanicDataFusionPlanner;

impl PanicDataFusionPlanner {
    #[allow(dead_code)] // not used anywhere
    pub fn new() -> Self {
        Self::default()
    }
}

impl Display for PanicDataFusionPlanner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "panic")
    }
}

#[async_trait]
impl DataFusionPlanner for PanicDataFusionPlanner {
    async fn plan(
        &self,
        _files: Vec<ParquetFile>,
        _partition: Arc<PartitionInfo>,
        _compaction_level: CompactionLevel,
    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
        Ok(Arc::new(PanicPlan))
    }
}

#[derive(Debug)]
pub struct PanicPlan;

impl ExecutionPlan for PanicPlan {
    fn as_any(&self) -> &dyn Any {
        self as _
    }

    fn schema(&self) -> SchemaRef {
        SchemaBuilder::new().build().unwrap().as_arrow()
    }

    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(1)
    }

    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        None
    }

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
        assert!(children.is_empty());
        Ok(self)
    }

    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> datafusion::error::Result<SendableRecordBatchStream> {
        assert_eq!(partition, 0);
        let stream = futures::stream::once(async move { panic!("foo") });
        let stream = RecordBatchStreamAdapter::new(self.schema(), stream);
        Ok(Box::pin(stream))
    }

    fn statistics(&self) -> Statistics {
        unimplemented!()
    }
}

#[cfg(test)]
mod tests {
    use datafusion::{physical_plan::collect, prelude::SessionContext};

    use crate::test_util::partition_info;

    use super::*;

    #[test]
    fn test_display() {
        assert_eq!(PanicDataFusionPlanner::new().to_string(), "panic");
    }

    #[tokio::test]
    #[should_panic(expected = "foo")]
    async fn test_panic() {
        let planner = PanicDataFusionPlanner::new();
        let partition = partition_info();
        let plan = planner
            .plan(vec![], partition, CompactionLevel::FileNonOverlapped)
            .await
            .unwrap();

        let session_ctx = SessionContext::new();
        let task_ctx = Arc::new(TaskContext::from(&session_ctx));
        collect(plan, task_ctx).await.ok();
    }
}

@@ -5,6 +5,7 @@
use std::{collections::HashSet, sync::Arc};

use data_types::CompactionLevel;
use object_store::memory::InMemory;

use crate::{
    components::{

@@ -18,6 +19,7 @@ use crate::{
use super::{
    commit::{
        catalog::CatalogCommit, logging::LoggingCommitWrapper, metrics::MetricsCommitWrapper,
        mock::MockCommit, Commit,
    },
    df_plan_exec::dedicated::DedicatedDataFusionPlanExec,
    df_planner::planner_v1::V1DataFusionPlanner,

@@ -31,12 +33,14 @@ use super::{
    partition_done_sink::{
        catalog::CatalogPartitionDoneSink, error_kind::ErrorKindPartitionDoneSinkWrapper,
        logging::LoggingPartitionDoneSinkWrapper, metrics::MetricsPartitionDoneSinkWrapper,
        mock::MockPartitionDoneSink, PartitionDoneSink,
    },
    partition_files_source::catalog::CatalogPartitionFilesSource,
    partition_filter::{
        and::AndPartitionFilter, has_files::HasFilesPartitionFilter,
        and::AndPartitionFilter, by_id::ByIdPartitionFilter, has_files::HasFilesPartitionFilter,
        has_matching_file::HasMatchingFilePartitionFilter, logging::LoggingPartitionFilterWrapper,
        metrics::MetricsPartitionFilterWrapper, never_skipped::NeverSkippedPartitionFilter,
        PartitionFilter,
    },
    partitions_source::{
        catalog::CatalogPartitionsSource, logging::LoggingPartitionsSourceWrapper,

@@ -44,7 +48,7 @@ use super::{
        randomize_order::RandomizeOrderPartitionsSourcesWrapper,
    },
    round_split::all_now::AllNowRoundSplit,
    scratchpad::prod::ProdScratchpadGen,
    scratchpad::{ignore_writes_object_store::IgnoreWrites, prod::ProdScratchpadGen},
    skipped_compactions_source::catalog::CatalogSkippedCompactionsSource,
    Components,
};

@@ -54,6 +58,48 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
    // TODO: partitions source: Implementing ID-based sharding / hash-partitioning so we can run multiple compactors in
    // parallel. This should be a wrapper around the existing partions source.

    let mut partition_filters: Vec<Arc<dyn PartitionFilter>> = vec![];
    if let Some(ids) = &config.partition_filter {
        // filter as early as possible, so we don't need any catalog lookups for the filtered partitions
        partition_filters.push(Arc::new(ByIdPartitionFilter::new(ids.clone())));
    }
    if !config.ignore_partition_skip_marker {
        partition_filters.push(Arc::new(NeverSkippedPartitionFilter::new(
            CatalogSkippedCompactionsSource::new(
                config.backoff_config.clone(),
                Arc::clone(&config.catalog),
            ),
        )));
    }
    partition_filters.push(Arc::new(HasMatchingFilePartitionFilter::new(
        LevelRangeFileFilter::new(CompactionLevel::Initial..=CompactionLevel::Initial),
    )));
    partition_filters.push(Arc::new(HasFilesPartitionFilter::new()));

    let partition_done_sink: Arc<dyn PartitionDoneSink> = if config.shadow_mode {
        Arc::new(MockPartitionDoneSink::new())
    } else {
        Arc::new(CatalogPartitionDoneSink::new(
            config.backoff_config.clone(),
            Arc::clone(&config.catalog),
        ))
    };

    let commit: Arc<dyn Commit> = if config.shadow_mode {
        Arc::new(MockCommit::new())
    } else {
        Arc::new(CatalogCommit::new(
            config.backoff_config.clone(),
            Arc::clone(&config.catalog),
        ))
    };

    let scratchpad_store_output = if config.shadow_mode {
        Arc::new(IgnoreWrites::new(Arc::new(InMemory::new())))
    } else {
        Arc::clone(config.parquet_store_real.object_store())
    };

    Arc::new(Components {
        partitions_source: Arc::new(LoggingPartitionsSourceWrapper::new(
            MetricsPartitionsSourceWrapper::new(

@@ -61,7 +107,7 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
                CatalogPartitionsSource::new(
                    config.backoff_config.clone(),
                    Arc::clone(&config.catalog),
                    config.partition_minute_threshold,
                    config.partition_threshold,
                    Arc::clone(&config.time_provider),
                ),
                1234,

@@ -82,37 +128,21 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
        )])),
        partition_filter: Arc::new(LoggingPartitionFilterWrapper::new(
            MetricsPartitionFilterWrapper::new(
                AndPartitionFilter::new(vec![
                    Arc::new(NeverSkippedPartitionFilter::new(
                        CatalogSkippedCompactionsSource::new(
                            config.backoff_config.clone(),
                            Arc::clone(&config.catalog),
                        ),
                    )),
                    Arc::new(HasMatchingFilePartitionFilter::new(
                        LevelRangeFileFilter::new(
                            CompactionLevel::Initial..=CompactionLevel::Initial,
                        ),
                    )),
                    Arc::new(HasFilesPartitionFilter::new()),
                ]),
                AndPartitionFilter::new(partition_filters),
                &config.metric_registry,
            ),
        )),
        partition_done_sink: Arc::new(LoggingPartitionDoneSinkWrapper::new(
            MetricsPartitionDoneSinkWrapper::new(
                ErrorKindPartitionDoneSinkWrapper::new(
                    CatalogPartitionDoneSink::new(
                        config.backoff_config.clone(),
                        Arc::clone(&config.catalog),
                    ),
                    partition_done_sink,
                    HashSet::from([ErrorKind::OutOfMemory, ErrorKind::Unknown]),
                ),
                &config.metric_registry,
            ),
        )),
        commit: Arc::new(LoggingCommitWrapper::new(MetricsCommitWrapper::new(
            CatalogCommit::new(config.backoff_config.clone(), Arc::clone(&config.catalog)),
            commit,
            &config.metric_registry,
        ))),
        namespaces_source: Arc::new(CatalogNamespacesSource::new(

@@ -148,6 +178,7 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
            config.backoff_config.clone(),
            Arc::clone(config.parquet_store_real.object_store()),
            Arc::clone(config.parquet_store_scratchpad.object_store()),
            scratchpad_store_output,
        )),
    })
}

@@ -28,7 +28,7 @@ pub mod scratchpad;
pub mod skipped_compactions_source;
pub mod tables_source;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Components {
    pub partitions_source: Arc<dyn PartitionsSource>,
    pub partition_files_source: Arc<dyn PartitionFilesSource>,

@@ -70,8 +70,8 @@ mod tests {
    use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
    use schema::SchemaBuilder;

    use crate::components::parquet_file_sink::{
        mock::MockParquetFileSink, test_util::partition_info,
    use crate::{
        components::parquet_file_sink::mock::MockParquetFileSink, test_util::partition_info,
    };

    use super::*;

@@ -100,7 +100,7 @@ mod tests {
    };
    use schema::SchemaBuilder;

    use crate::components::parquet_file_sink::test_util::partition_info;
    use crate::test_util::partition_info;

    use super::*;

@@ -15,9 +15,6 @@ pub mod logging;
pub mod mock;
pub mod object_store;

#[cfg(test)]
mod test_util;

#[async_trait]
pub trait ParquetFileSink: Debug + Display + Send + Sync {
    async fn store(

@@ -1,27 +0,0 @@
use std::{collections::BTreeMap, sync::Arc};

use data_types::{NamespaceId, PartitionId, PartitionKey, Table, TableId, TableSchema};

use crate::partition_info::PartitionInfo;

pub fn partition_info() -> Arc<PartitionInfo> {
    let namespace_id = NamespaceId::new(2);
    let table_id = TableId::new(3);

    Arc::new(PartitionInfo {
        partition_id: PartitionId::new(1),
        namespace_id,
        namespace_name: String::from("ns"),
        table: Arc::new(Table {
            id: table_id,
            namespace_id,
            name: String::from("table"),
        }),
        table_schema: Arc::new(TableSchema {
            id: table_id,
            columns: BTreeMap::from([]),
        }),
        sort_key: None,
        partition_key: PartitionKey::from("pk"),
    })
}

@@ -11,7 +11,6 @@ pub struct MockPartitionDoneSink {
}

impl MockPartitionDoneSink {
    #[allow(dead_code)] // not used anywhere
    pub fn new() -> Self {
        Self::default()
    }

@@ -27,7 +27,7 @@ pub trait PartitionDoneSink: Debug + Display + Send + Sync {
#[async_trait]
impl<T> PartitionDoneSink for Arc<T>
where
    T: PartitionDoneSink,
    T: PartitionDoneSink + ?Sized,
{
    async fn record(
        &self,

@@ -0,0 +1,30 @@
use std::{collections::HashSet, fmt::Display};

use async_trait::async_trait;
use data_types::{ParquetFile, PartitionId};

use super::PartitionFilter;

#[derive(Debug)]
pub struct ByIdPartitionFilter {
    ids: HashSet<PartitionId>,
}

impl ByIdPartitionFilter {
    pub fn new(ids: HashSet<PartitionId>) -> Self {
        Self { ids }
    }
}

impl Display for ByIdPartitionFilter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "by_id")
    }
}

#[async_trait]
impl PartitionFilter for ByIdPartitionFilter {
    async fn apply(&self, partition_id: PartitionId, _files: &[ParquetFile]) -> bool {
        self.ids.contains(&partition_id)
    }
}

@@ -4,6 +4,7 @@ use async_trait::async_trait;
use data_types::{ParquetFile, PartitionId};

pub mod and;
pub mod by_id;
pub mod has_files;
pub mod has_matching_file;
pub mod logging;

@@ -1,4 +1,4 @@
use std::{fmt::Display, sync::Arc};
use std::{fmt::Display, sync::Arc, time::Duration};

use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};

@@ -12,7 +12,7 @@ use super::PartitionsSource;
pub struct CatalogPartitionsSource {
    backoff_config: BackoffConfig,
    catalog: Arc<dyn Catalog>,
    minute_threshold: u64,
    threshold: Duration,
    time_provider: Arc<dyn TimeProvider>,
}

@@ -20,13 +20,13 @@ impl CatalogPartitionsSource {
    pub fn new(
        backoff_config: BackoffConfig,
        catalog: Arc<dyn Catalog>,
        minute_threshold: u64,
        threshold: Duration,
        time_provider: Arc<dyn TimeProvider>,
    ) -> Self {
        Self {
            backoff_config,
            catalog,
            minute_threshold,
            threshold,
            time_provider,
        }
    }

@@ -41,7 +41,7 @@ impl Display for CatalogPartitionsSource {
#[async_trait]
impl PartitionsSource for CatalogPartitionsSource {
    async fn fetch(&self) -> Vec<PartitionId> {
        let time_minutes_ago = self.time_provider.minutes_ago(self.minute_threshold);
        let cutoff = self.time_provider.now() - self.threshold;

        Backoff::new(&self.backoff_config)
            .retry_all_errors("partitions_to_compact", || async {

@@ -49,7 +49,7 @@ impl PartitionsSource for CatalogPartitionsSource {
                    .repositories()
                    .await
                    .partitions()
                    .partitions_to_compact(time_minutes_ago.into())
                    .partitions_to_compact(cutoff.into())
                    .await
            })
            .await

@@ -0,0 +1,89 @@
use std::{fmt::Display, ops::Range, sync::Arc};

use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use object_store::{
    path::Path, DynObjectStore, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
};
use tokio::io::{sink, AsyncWrite};
use uuid::Uuid;

#[derive(Debug)]
pub struct IgnoreWrites {
    inner: Arc<DynObjectStore>,
}

impl IgnoreWrites {
    pub fn new(inner: Arc<DynObjectStore>) -> Self {
        Self { inner }
    }
}

impl Display for IgnoreWrites {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "ignore_writes({})", self.inner)
    }
}

#[async_trait]
impl ObjectStore for IgnoreWrites {
    async fn put(&self, _location: &Path, _bytes: Bytes) -> Result<()> {
        Ok(())
    }

    async fn put_multipart(
        &self,
        _location: &Path,
    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
        Ok((Uuid::new_v4().to_string(), Box::new(sink())))
    }

    async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> {
        Ok(())
    }

    async fn get(&self, location: &Path) -> Result<GetResult> {
        self.inner.get(location).await
    }

    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
        self.inner.get_range(location, range).await
    }

    async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
        self.inner.get_ranges(location, ranges).await
    }

    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
        self.inner.head(location).await
    }

    async fn delete(&self, _location: &Path) -> Result<()> {
        Ok(())
    }

    async fn list(&self, prefix: Option<&Path>) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
        self.inner.list(prefix).await
    }

    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
        self.inner.list_with_delimiter(prefix).await
    }

    async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
        Ok(())
    }

    async fn rename(&self, _from: &Path, _to: &Path) -> Result<()> {
        Ok(())
    }

    async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
        Ok(())
    }

    async fn rename_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
        Ok(())
    }
}

@@ -4,7 +4,7 @@ use async_trait::async_trait;
use parquet_file::ParquetFilePath;
use uuid::Uuid;

// pub mod context;
pub mod ignore_writes_object_store;
pub mod prod;
mod util;

@@ -1,4 +1,9 @@
use std::{collections::HashSet, fmt::Display, num::NonZeroUsize, sync::Arc};
use std::{
    collections::{hash_map::Entry, HashMap},
    fmt::Display,
    num::NonZeroUsize,
    sync::Arc,
};

use async_trait::async_trait;
use backoff::BackoffConfig;

@@ -16,22 +21,25 @@ use super::{
pub struct ProdScratchpadGen {
    concurrency: NonZeroUsize,
    backoff_config: BackoffConfig,
    store_real: Arc<DynObjectStore>,
    store_input: Arc<DynObjectStore>,
    store_scratchpad: Arc<DynObjectStore>,
    store_output: Arc<DynObjectStore>,
}

impl ProdScratchpadGen {
    pub fn new(
        concurrency: NonZeroUsize,
        backoff_config: BackoffConfig,
        store_real: Arc<DynObjectStore>,
        store_input: Arc<DynObjectStore>,
        store_scratchpad: Arc<DynObjectStore>,
        store_output: Arc<DynObjectStore>,
    ) -> Self {
        Self {
            concurrency,
            backoff_config,
            store_real,
            store_input,
            store_scratchpad,
            store_output,
        }
    }
}

@@ -47,10 +55,11 @@ impl ScratchpadGen for ProdScratchpadGen {
        Box::new(ProdScratchpad {
            concurrency: self.concurrency,
            backoff_config: self.backoff_config.clone(),
            store_real: Arc::clone(&self.store_real),
            store_input: Arc::clone(&self.store_input),
            store_scratchpad: Arc::clone(&self.store_scratchpad),
            store_output: Arc::clone(&self.store_output),
            mask: Uuid::new_v4(),
            files_unmasked: HashSet::default(),
            files_unmasked: HashMap::default(),
        })
    }
}

@@ -59,10 +68,16 @@ impl ScratchpadGen for ProdScratchpadGen {
struct ProdScratchpad {
    concurrency: NonZeroUsize,
    backoff_config: BackoffConfig,
    store_real: Arc<DynObjectStore>,
    store_input: Arc<DynObjectStore>,
    store_scratchpad: Arc<DynObjectStore>,
    store_output: Arc<DynObjectStore>,
    mask: Uuid,
    files_unmasked: HashSet<ParquetFilePath>,

    /// Set of known, unmasked file.
    ///
    /// If the file is part of this map, it is in the scratchpad. If the boolean key is set, it was already copied to
    /// the output store
    files_unmasked: HashMap<ParquetFilePath, bool>,
}

impl ProdScratchpad {

@@ -85,11 +100,24 @@ impl ProdScratchpad {
        &mut self,
        files_unmasked: &[ParquetFilePath],
        files_masked: &[ParquetFilePath],
        output: bool,
    ) -> (Vec<ParquetFilePath>, Vec<ParquetFilePath>) {
        files_unmasked
            .iter()
            .zip(files_masked)
            .filter(|(f_unmasked, _f_masked)| self.files_unmasked.insert(**f_unmasked))
            .filter(
                |(f_unmasked, _f_masked)| match self.files_unmasked.entry(**f_unmasked) {
                    Entry::Occupied(mut o) => {
                        let old_var = *o.get();
                        *o.get_mut() |= output;
                        output && !old_var
                    }
                    Entry::Vacant(v) => {
                        v.insert(output);
                        true
                    }
                },
            )
            .unzip()
    }
}

@@ -102,7 +130,11 @@ impl Drop for ProdScratchpad {
        // clean up eventually
        // Note: Use manual clean up code and do not create yet-another ProdScratchpad to avoid infinite recursions
        // during drop.
        let files = self.files_unmasked.drain().collect::<Vec<_>>();
        let files = self
            .files_unmasked
            .drain()
            .map(|(k, _in_out)| k)
            .collect::<Vec<_>>();
        let (files_masked, _uuids) = self.apply_mask(&files);
        let store_scratchpad = Arc::clone(&self.store_scratchpad);
        let concurrency = self.concurrency;

@@ -124,11 +156,11 @@
impl Scratchpad for ProdScratchpad {
    async fn load_to_scratchpad(&mut self, files: &[ParquetFilePath]) -> Vec<Uuid> {
        let (files_to, uuids) = self.apply_mask(files);
        let (files_from, files_to) = self.check_known(files, &files_to);
        let (files_from, files_to) = self.check_known(files, &files_to, false);
        copy_files(
            &files_from,
            &files_to,
            Arc::clone(&self.store_real),
            Arc::clone(&self.store_input),
            Arc::clone(&self.store_scratchpad),
            &self.backoff_config,
            self.concurrency,

@@ -141,13 +173,13 @@ impl Scratchpad for ProdScratchpad {
        let (files_to, uuids) = self.apply_mask(files);

        // only keep files that we did not know about, all others we've already synced it between the two stores
        let (files_to, files_from) = self.check_known(&files_to, files);
        let (files_to, files_from) = self.check_known(&files_to, files, true);

        copy_files(
            &files_from,
            &files_to,
            Arc::clone(&self.store_scratchpad),
            Arc::clone(&self.store_real),
            Arc::clone(&self.store_output),
            &self.backoff_config,
            self.concurrency,
        )

@@ -156,7 +188,12 @@ impl Scratchpad for ProdScratchpad {
    }

    async fn clean_from_scratchpad(&mut self, files: &[ParquetFilePath]) {
        let (files_masked, _uuids) = self.apply_mask(files);
        let files = files
            .iter()
            .filter(|f| self.files_unmasked.remove(f).is_some())
            .cloned()
            .collect::<Vec<_>>();
        let (files_masked, _uuids) = self.apply_mask(&files);
        delete_files(
            &files_masked,
            Arc::clone(&self.store_scratchpad),

@@ -167,8 +204,9 @@ impl Scratchpad for ProdScratchpad {
    }

    async fn clean(&mut self) {
        let files: Vec<_> = self.files_unmasked.drain().collect();
        let files: Vec<_> = self.files_unmasked.keys().cloned().collect();
        self.clean_from_scratchpad(&files).await;
        self.files_unmasked.clear();
    }
}

@@ -176,34 +214,39 @@ impl Scratchpad for ProdScratchpad {
mod tests {
    use std::time::Duration;

    use test_helpers::tracing::TracingCapture;
    use test_helpers::{maybe_start_logging, tracing::TracingCapture};

    use crate::components::scratchpad::test_util::{
        assert_content, file_path, get_content, stores,
    use crate::{
        components::scratchpad::test_util::{assert_content, file_path, stores},
        test_util::list_object_store,
    };

    use super::*;

    #[test]
    fn test_display() {
        let (store_real, store_scratchpad) = stores();
        let (store_input, store_scratchpad, store_output) = stores();
        let gen = ProdScratchpadGen::new(
            NonZeroUsize::new(1).unwrap(),
            BackoffConfig::default(),
            store_real,
            store_input,
            store_scratchpad,
            store_output,
        );
        assert_eq!(gen.to_string(), "prod");
    }

    #[tokio::test]
    async fn test_staging() {
        let (store_real, store_scratchpad) = stores();
        maybe_start_logging();

        let (store_input, store_scratchpad, store_output) = stores();
        let gen = ProdScratchpadGen::new(
            NonZeroUsize::new(1).unwrap(),
            BackoffConfig::default(),
            Arc::clone(&store_real),
            Arc::clone(&store_input),
            Arc::clone(&store_scratchpad),
            Arc::clone(&store_output),
        );
        let mut pad = gen.pad();

@@ -216,30 +259,33 @@ mod tests {
        let f7_masked = file_path(7);

        for f in [&f1, &f2, &f3, &f4] {
            store_real
            store_input
                .put(&f.object_store_path(), vec![].into())
                .await
                .unwrap();
        }

        assert_content(&store_real, [&f1, &f2, &f3, &f4]).await;
        assert_content(&store_input, [&f1, &f2, &f3, &f4]).await;
        assert_content(&store_scratchpad, []).await;
        assert_content(&store_output, []).await;

        let uuids = pad.load_to_scratchpad(&[f1, f2]).await;
        assert_eq!(uuids.len(), 2);
        let f1_masked = f1.with_object_store_id(uuids[0]);
        let f2_masked = f2.with_object_store_id(uuids[1]);

        assert_content(&store_real, [&f1, &f2, &f3, &f4]).await;
        assert_content(&store_input, [&f1, &f2, &f3, &f4]).await;
        assert_content(&store_scratchpad, [&f1_masked, &f2_masked]).await;
        assert_content(&store_output, []).await;

        let uuids = pad.load_to_scratchpad(&[f2, f3]).await;
        assert_eq!(uuids.len(), 2);
        assert_eq!(f2_masked.objest_store_id(), uuids[0]);
        let f3_masked = f3.with_object_store_id(uuids[1]);

        assert_content(&store_real, [&f1, &f2, &f3, &f4]).await;
        assert_content(&store_input, [&f1, &f2, &f3, &f4]).await;
        assert_content(&store_scratchpad, [&f1_masked, &f2_masked, &f3_masked]).await;
        assert_content(&store_output, []).await;

        for f in [&f5_masked, &f6_masked, &f7_masked] {
            store_scratchpad

@@ -248,7 +294,7 @@ mod tests {
                .unwrap();
        }

        assert_content(&store_real, [&f1, &f2, &f3, &f4]).await;
        assert_content(&store_input, [&f1, &f2, &f3, &f4]).await;
        assert_content(
            &store_scratchpad,
            [

@@ -256,13 +302,14 @@ mod tests {
            ],
        )
        .await;
        assert_content(&store_output, []).await;

        let uuids = pad.make_public(&[f5_masked, f5_masked]).await;
        let uuids = pad.make_public(&[f5_masked, f6_masked]).await;
        assert_eq!(uuids.len(), 2);
        let f5 = f5_masked.with_object_store_id(uuids[0]);
        let f6 = f6_masked.with_object_store_id(uuids[1]);

        assert_content(&store_real, [&f1, &f2, &f3, &f4, &f5, &f6]).await;
        assert_content(&store_input, [&f1, &f2, &f3, &f4]).await;
        assert_content(
            &store_scratchpad,
            [

@@ -270,30 +317,48 @@ mod tests {
            ],
        )
        .await;
        assert_content(&store_output, [&f5, &f6]).await;

        let uuids = pad.make_public(&[f1_masked]).await;
        assert_eq!(uuids.len(), 1);
        assert_eq!(f1.objest_store_id(), uuids[0]);

        assert_content(&store_input, [&f1, &f2, &f3, &f4]).await;
        assert_content(
            &store_scratchpad,
            [
                &f1_masked, &f2_masked, &f3_masked, &f5_masked, &f6_masked, &f7_masked,
            ],
        )
        .await;
        assert_content(&store_output, [&f1, &f5, &f6]).await;

        pad.clean_from_scratchpad(&[f1, f5]).await;

        assert_content(&store_real, [&f1, &f2, &f3, &f4, &f5, &f6]).await;
        assert_content(&store_input, [&f1, &f2, &f3, &f4]).await;
        assert_content(
            &store_scratchpad,
            [&f2_masked, &f3_masked, &f6_masked, &f7_masked],
        )
        .await;
        assert_content(&store_output, [&f1, &f5, &f6]).await;

        pad.clean().await;

        assert_content(&store_real, [&f1, &f2, &f3, &f4, &f5, &f6]).await;
        assert_content(&store_scratchpad, [&f6_masked, &f7_masked]).await; // pad didn't know about these files
        assert_content(&store_input, [&f1, &f2, &f3, &f4]).await;
        assert_content(&store_scratchpad, [&f7_masked]).await; // pad didn't know about these files
        assert_content(&store_output, [&f1, &f5, &f6]).await;
    }

    #[tokio::test]
    async fn test_collision() {
        let (store_real, store_scratchpad) = stores();
        let (store_input, store_scratchpad, store_output) = stores();
        let gen = ProdScratchpadGen::new(
            NonZeroUsize::new(1).unwrap(),
            BackoffConfig::default(),
            Arc::clone(&store_real),
            Arc::clone(&store_input),
            Arc::clone(&store_scratchpad),
            Arc::clone(&store_output),
        );

        let mut pad1 = gen.pad();

@@ -301,7 +366,7 @@ mod tests {

        let f = file_path(1);

        store_real
        store_input
            .put(&f.object_store_path(), Default::default())
            .await
            .unwrap();

@@ -323,18 +388,19 @@ mod tests {

    #[tokio::test]
    async fn test_clean_on_drop() {
        let (store_real, store_scratchpad) = stores();
        let (store_input, store_scratchpad, store_output) = stores();
        let gen = ProdScratchpadGen::new(
            NonZeroUsize::new(1).unwrap(),
            BackoffConfig::default(),
            Arc::clone(&store_real),
            Arc::clone(&store_input),
            Arc::clone(&store_scratchpad),
            Arc::clone(&store_output),
        );
        let mut pad = gen.pad();

        let f = file_path(1);

        store_real
        store_input
            .put(&f.object_store_path(), Default::default())
            .await
            .unwrap();

@@ -354,7 +420,7 @@ mod tests {
        // eventually cleaned up
        tokio::time::timeout(Duration::from_secs(5), async {
            loop {
                if get_content(&store_scratchpad).await.is_empty() {
                if list_object_store(&store_scratchpad).await.is_empty() {
                    return;
                }

@@ -368,18 +434,19 @@ mod tests {
    #[tokio::test]
    #[should_panic(expected = "foo")]
    async fn test_clean_does_not_crash_on_panic() {
        let (store_real, store_scratchpad) = stores();
        let (store_input, store_scratchpad, store_output) = stores();
        let gen = ProdScratchpadGen::new(
            NonZeroUsize::new(1).unwrap(),
            BackoffConfig::default(),
            Arc::clone(&store_real),
            Arc::clone(&store_input),
            Arc::clone(&store_scratchpad),
            Arc::clone(&store_output),
        );
        let mut pad = gen.pad();

        let f = file_path(1);

        store_real
        store_input
            .put(&f.object_store_path(), Default::default())
            .await
            .unwrap();

@@ -1,13 +1,22 @@
use std::{collections::HashSet, sync::Arc};

use data_types::{NamespaceId, PartitionId, ShardId, TableId};
use futures::TryStreamExt;
use object_store::{memory::InMemory, path::Path, DynObjectStore};
use object_store::{memory::InMemory, DynObjectStore};
use parquet_file::ParquetFilePath;
use uuid::Uuid;

pub fn stores() -> (Arc<DynObjectStore>, Arc<DynObjectStore>) {
    (Arc::new(InMemory::new()), Arc::new(InMemory::new()))
use crate::test_util::list_object_store;

pub fn stores() -> (
    Arc<DynObjectStore>,
    Arc<DynObjectStore>,
    Arc<DynObjectStore>,
) {
    (
        Arc::new(InMemory::new()),
        Arc::new(InMemory::new()),
        Arc::new(InMemory::new()),
    )
}

pub fn file_path(i: u128) -> ParquetFilePath {

@@ -20,17 +29,7 @@ pub fn file_path(i: u128) -> ParquetFilePath {
    )
}

pub async fn get_content(store: &Arc<DynObjectStore>) -> HashSet<Path> {
    store
        .list(None)
        .await
        .unwrap()
        .map_ok(|f| f.location)
        .try_collect::<HashSet<_>>()
        .await
        .unwrap()
}

#[track_caller]
pub async fn assert_content<const N: usize>(
    store: &Arc<DynObjectStore>,
    files: [&ParquetFilePath; N],

@@ -39,6 +38,8 @@ pub async fn assert_content<const N: usize>(
        .iter()
        .map(|f| f.object_store_path())
        .collect::<HashSet<_>>();
    let actual = get_content(store).await;
    assert_eq!(expected.len(), N, "duplicate files in expected clause");

    let actual = list_object_store(store).await;
    assert_eq!(actual, expected);
}

@@ -1,8 +1,8 @@
//! Config-related stuff.
use std::{num::NonZeroUsize, sync::Arc};
use std::{collections::HashSet, num::NonZeroUsize, sync::Arc, time::Duration};

use backoff::{Backoff, BackoffConfig};
use data_types::{ShardId, ShardIndex};
use data_types::{PartitionId, ShardId, ShardIndex};
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use iox_time::TimeProvider;

@@ -48,8 +48,8 @@ pub struct Config {
    /// Number of jobs PER PARTITION that move files in and out of the scratchpad.
    pub partition_scratchpad_concurrency: NonZeroUsize,

    /// Partitions with recent created files these last minutes are selected for compaction.
    pub partition_minute_threshold: u64,
    /// Partitions with recent created files this recent duration are selected for compaction.
    pub partition_threshold: Duration,

    /// Desired max size of compacted parquet files
    /// It is a target desired value than a guarantee

@@ -71,8 +71,25 @@ pub struct Config {
    /// This value must be between (0, 100)
    pub split_percentage: u16,

    /// Maximum duration of the per-partition compaction task in seconds.
    pub partition_timeout_secs: u64,
    /// Maximum duration of the per-partition compaction task.
    pub partition_timeout: Duration,

    /// Filter partitions to the given set of IDs.
    ///
    /// This is mostly useful for debugging.
    pub partition_filter: Option<HashSet<PartitionId>>,

    /// Shadow mode.
    ///
    /// This will NOT write / commit any output to the object store or catalog.
    ///
    /// This is mostly useful for debugging.
    pub shadow_mode: bool,

    /// Ignores "partition marked w/ error and shall be skipped" entries in the catalog.
    ///
    /// This is mostly useful for debugging.
    pub ignore_partition_skip_marker: bool,
}

impl Config {

@@ -18,6 +18,8 @@ mod driver;
mod error;
mod partition_info;

#[cfg(test)]
mod compactor_tests;

#[cfg(test)]
mod test_util;

@@ -1,4 +1,9 @@
use std::{collections::BTreeMap, num::NonZeroUsize, sync::Arc};
use std::{
    collections::{BTreeMap, HashSet},
    num::NonZeroUsize,
    sync::Arc,
    time::Duration,
};

use backoff::BackoffConfig;
use data_types::{

@@ -7,8 +12,10 @@ use data_types::{
    SequenceNumber, ShardId, SkippedCompaction, Table, TableId, TableSchema, Timestamp, TopicId,
};
use datafusion::arrow::record_batch::RecordBatch;
use futures::TryStreamExt;
use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestTable};
use iox_time::TimeProvider;
use object_store::{path::Path, DynObjectStore};
use parquet_file::storage::{ParquetStorage, StorageId};
use schema::sort::SortKey;
use uuid::Uuid;

@@ -253,21 +260,33 @@ impl SkippedCompactionBuilder {
}

const SHARD_INDEX: i32 = 1;
const PARTITION_MINUTE_THRESHOLD: u64 = 10;
const PARTITION_THRESHOLD: Duration = Duration::from_secs(10 * 60); // 10min
const MAX_DESIRE_FILE_SIZE: u64 = 100 * 1024;
const PERCENTAGE_MAX_FILE_SIZE: u16 = 5;
const SPLIT_PERCENTAGE: u16 = 80;

pub struct TestSetup {
    pub files: Arc<Vec<ParquetFile>>,
    pub partition_info: Arc<PartitionInfo>,
    pub catalog: Arc<TestCatalog>,
    pub table: Arc<TestTable>,
    pub config: Arc<Config>,
#[derive(Debug, Default)]
pub struct TestSetupBuilder {
    with_files: bool,
    shadow_mode: bool,
}

impl TestSetup {
    pub async fn new(with_files: bool) -> Self {
impl TestSetupBuilder {
    pub fn with_files(self) -> Self {
        Self {
            with_files: true,
            ..self
        }
    }

    pub fn with_shadow_mode(self) -> Self {
        Self {
            shadow_mode: true,
            ..self
        }
    }

    pub async fn build(self) -> TestSetup {
        let catalog = TestCatalog::new();
        let ns = catalog.create_namespace_1hr_retention("ns").await;
        let shard = ns.create_shard(SHARD_INDEX).await;

@@ -301,7 +320,7 @@ impl TestSetup {

        let time_provider = Arc::<iox_time::MockProvider>::clone(&catalog.time_provider);
        let mut parquet_files = vec![];
        if with_files {
        if self.with_files {
            let time_1_minute_future = time_provider.minutes_into_future(1);
            let time_2_minutes_future = time_provider.minutes_into_future(2);
            let time_3_minutes_future = time_provider.minutes_into_future(3);

@@ -423,14 +442,17 @@ impl TestSetup {
            partition_concurrency: NonZeroUsize::new(1).unwrap(),
            job_concurrency: NonZeroUsize::new(1).unwrap(),
            partition_scratchpad_concurrency: NonZeroUsize::new(1).unwrap(),
            partition_minute_threshold: PARTITION_MINUTE_THRESHOLD,
            partition_threshold: PARTITION_THRESHOLD,
            max_desired_file_size_bytes: MAX_DESIRE_FILE_SIZE,
            percentage_max_file_size: PERCENTAGE_MAX_FILE_SIZE,
            split_percentage: SPLIT_PERCENTAGE,
            partition_timeout_secs: 3_600,
            partition_timeout: Duration::from_secs(3_600),
            partition_filter: None,
            shadow_mode: self.shadow_mode,
            ignore_partition_skip_marker: false,
        });

        Self {
        TestSetup {
            files: Arc::new(parquet_files),
            partition_info: candidate_partition,
            catalog,

@@ -438,6 +460,20 @@ impl TestSetup {
            config,
        }
    }
}

pub struct TestSetup {
    pub files: Arc<Vec<ParquetFile>>,
    pub partition_info: Arc<PartitionInfo>,
    pub catalog: Arc<TestCatalog>,
    pub table: Arc<TestTable>,
    pub config: Arc<Config>,
}

impl TestSetup {
    pub fn builder() -> TestSetupBuilder {
        TestSetupBuilder::default()
    }

    /// Get the catalog files stored in the catalog
    pub async fn list_by_table_not_to_delete(&self) -> Vec<ParquetFile> {

@@ -452,3 +488,36 @@ impl TestSetup {
        self.table.read_parquet_file(file).await
    }
}

pub async fn list_object_store(store: &Arc<DynObjectStore>) -> HashSet<Path> {
    store
        .list(None)
        .await
        .unwrap()
        .map_ok(|f| f.location)
        .try_collect::<HashSet<_>>()
        .await
        .unwrap()
}

pub fn partition_info() -> Arc<PartitionInfo> {
    let namespace_id = NamespaceId::new(2);
    let table_id = TableId::new(3);

    Arc::new(PartitionInfo {
        partition_id: PartitionId::new(1),
        namespace_id,
        namespace_name: String::from("ns"),
        table: Arc::new(Table {
            id: table_id,
            namespace_id,
            name: String::from("table"),
        }),
        table_schema: Arc::new(TableSchema {
            id: table_id,
            columns: BTreeMap::from([]),
        }),
        sort_key: None,
        partition_key: PartitionKey::from("pk"),
    })
}

@@ -10,6 +10,7 @@ async-trait = "0.1"
backoff = { path = "../backoff" }
clap_blocks = { path = "../clap_blocks" }
compactor2 = { path = "../compactor2" }
data_types = { path = "../data_types" }
hyper = "0.14"
iox_catalog = { path = "../iox_catalog" }
iox_time = { path = "../iox_time" }

@@ -2,6 +2,7 @@ use async_trait::async_trait;
use backoff::BackoffConfig;
use clap_blocks::compactor2::Compactor2Config;
use compactor2::{compactor::Compactor2, config::Config};
use data_types::PartitionId;
use hyper::{Body, Request, Response};
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;

@@ -19,6 +20,7 @@ use parquet_file::storage::ParquetStorage;
use std::{
    fmt::{Debug, Display},
    sync::Arc,
    time::Duration,
};
use tokio_util::sync::CancellationToken;
use trace::TraceCollector;

@@ -156,11 +158,18 @@ pub async fn create_compactor2_server_type(
        job_concurrency: compactor_config.compaction_job_concurrency,
        partition_scratchpad_concurrency: compactor_config
            .compaction_partition_scratchpad_concurrency,
        partition_minute_threshold: compactor_config.compaction_partition_minute_threshold,
        partition_threshold: Duration::from_secs(
            compactor_config.compaction_partition_minute_threshold * 60,
        ),
        max_desired_file_size_bytes: compactor_config.max_desired_file_size_bytes,
        percentage_max_file_size: compactor_config.percentage_max_file_size,
        split_percentage: compactor_config.split_percentage,
        partition_timeout_secs: compactor_config.partition_timeout_secs,
        partition_timeout: Duration::from_secs(compactor_config.partition_timeout_secs),
        partition_filter: compactor_config
            .partition_filter
            .map(|parts| parts.into_iter().map(PartitionId::new).collect()),
        shadow_mode: compactor_config.shadow_mode,
        ignore_partition_skip_marker: compactor_config.ignore_partition_skip_marker,
    });

    Arc::new(Compactor2ServerType::new(