diff --git a/compactor2/src/components/df_planner/logging.rs b/compactor2/src/components/df_planner/logging.rs new file mode 100644 index 0000000000..05badd2899 --- /dev/null +++ b/compactor2/src/components/df_planner/logging.rs @@ -0,0 +1,79 @@ +use std::{fmt::Display, sync::Arc}; + +use async_trait::async_trait; +use data_types::{CompactionLevel, ParquetFile}; +use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan}; +use observability_deps::tracing::{info, warn}; + +use crate::partition_info::PartitionInfo; + +use super::DataFusionPlanner; + +#[derive(Debug)] +pub struct LoggingDataFusionPlannerWrapper +where + T: DataFusionPlanner, +{ + inner: T, +} + +impl LoggingDataFusionPlannerWrapper +where + T: DataFusionPlanner, +{ + pub fn new(inner: T) -> Self { + Self { inner } + } +} + +impl Display for LoggingDataFusionPlannerWrapper +where + T: DataFusionPlanner, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "logging({})", self.inner) + } +} + +#[async_trait] +impl DataFusionPlanner for LoggingDataFusionPlannerWrapper +where + T: DataFusionPlanner, +{ + async fn plan( + &self, + files: Vec, + partition: Arc, + compaction_level: CompactionLevel, + ) -> Result, DataFusionError> { + let partition_id = partition.partition_id; + let n_input_files = files.len(); + let input_file_size_bytes = files.iter().map(|f| f.file_size_bytes).sum::(); + let res = self.inner.plan(files, partition, compaction_level).await; + + match &res { + Ok(plan) => { + info!( + partition_id = partition_id.get(), + n_input_files, + input_file_size_bytes, + n_output_files = plan.output_partitioning().partition_count(), + compaction_level = compaction_level as i16, + "created DataFusion plan", + ); + } + Err(e) => { + warn!( + partition_id=partition_id.get(), + n_input_files, + input_file_size_bytes, + compaction_level=compaction_level as i16, + %e, + "failed to create DataFusion plan", + ) + } + } + + res + } +} diff --git 
a/compactor2/src/components/df_planner/mod.rs b/compactor2/src/components/df_planner/mod.rs index d62c70e425..1f3937a74d 100644 --- a/compactor2/src/components/df_planner/mod.rs +++ b/compactor2/src/components/df_planner/mod.rs @@ -7,6 +7,7 @@ use async_trait::async_trait; use data_types::{CompactionLevel, ParquetFile}; use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan}; +pub mod logging; pub mod panic; pub mod planner_v1; mod query_chunk; diff --git a/compactor2/src/components/hardcoded.rs b/compactor2/src/components/hardcoded.rs index 9754a02234..7392224365 100644 --- a/compactor2/src/components/hardcoded.rs +++ b/compactor2/src/components/hardcoded.rs @@ -22,7 +22,7 @@ use super::{ mock::MockCommit, Commit, }, df_plan_exec::dedicated::DedicatedDataFusionPlanExec, - df_planner::planner_v1::V1DataFusionPlanner, + df_planner::{logging::LoggingDataFusionPlannerWrapper, planner_v1::V1DataFusionPlanner}, divide_initial::single_branch::SingleBranchDivideInitial, file_filter::{and::AndFileFilter, level_range::LevelRangeFileFilter}, files_filter::{chain::FilesFilterChain, per_file::PerFileFilesFilter}, @@ -42,10 +42,14 @@ use super::{ metrics::MetricsPartitionFilterWrapper, never_skipped::NeverSkippedPartitionFilter, PartitionFilter, }, + partition_source::{ + catalog::CatalogPartitionSource, logging::LoggingPartitionSourceWrapper, + metrics::MetricsPartitionSourceWrapper, + }, partitions_source::{ catalog::CatalogPartitionsSource, logging::LoggingPartitionsSourceWrapper, - metrics::MetricsPartitionsSourceWrapper, - randomize_order::RandomizeOrderPartitionsSourcesWrapper, + metrics::MetricsPartitionsSourceWrapper, mock::MockPartitionsSource, + randomize_order::RandomizeOrderPartitionsSourcesWrapper, PartitionsSource, }, round_split::all_now::AllNowRoundSplit, scratchpad::{ignore_writes_object_store::IgnoreWrites, prod::ProdScratchpadGen}, @@ -58,6 +62,17 @@ pub fn hardcoded_components(config: &Config) -> Arc { // TODO: partitions source: Implementing 
ID-based sharding / hash-partitioning so we can run multiple compactors in // parallel. This should be a wrapper around the existing partions source. + let partitions_source: Arc = if let Some(ids) = &config.partition_filter { + Arc::new(MockPartitionsSource::new(ids.iter().cloned().collect())) + } else { + Arc::new(CatalogPartitionsSource::new( + config.backoff_config.clone(), + Arc::clone(&config.catalog), + config.partition_threshold, + Arc::clone(&config.time_provider), + )) + }; + let mut partition_filters: Vec> = vec![]; if let Some(ids) = &config.partition_filter { // filter as early as possible, so we don't need any catalog lookups for the filtered partitions @@ -103,14 +118,15 @@ pub fn hardcoded_components(config: &Config) -> Arc { Arc::new(Components { partitions_source: Arc::new(LoggingPartitionsSourceWrapper::new( MetricsPartitionsSourceWrapper::new( - RandomizeOrderPartitionsSourcesWrapper::new( - CatalogPartitionsSource::new( - config.backoff_config.clone(), - Arc::clone(&config.catalog), - config.partition_threshold, - Arc::clone(&config.time_provider), - ), - 1234, + RandomizeOrderPartitionsSourcesWrapper::new(partitions_source, 1234), + &config.metric_registry, + ), + )), + partition_source: Arc::new(LoggingPartitionSourceWrapper::new( + MetricsPartitionSourceWrapper::new( + CatalogPartitionSource::new( + config.backoff_config.clone(), + Arc::clone(&config.catalog), ), &config.metric_registry, ), @@ -165,12 +181,14 @@ pub fn hardcoded_components(config: &Config) -> Arc { config.backoff_config.clone(), Arc::clone(&config.catalog), )), - df_planner: Arc::new(V1DataFusionPlanner::new( - config.parquet_store_scratchpad.clone(), - Arc::clone(&config.exec), - config.max_desired_file_size_bytes, - config.percentage_max_file_size, - config.split_percentage, + df_planner: Arc::new(LoggingDataFusionPlannerWrapper::new( + V1DataFusionPlanner::new( + config.parquet_store_scratchpad.clone(), + Arc::clone(&config.exec), + config.max_desired_file_size_bytes, + 
config.percentage_max_file_size, + config.split_percentage, + ), )), df_plan_exec: Arc::new(DedicatedDataFusionPlanExec::new(Arc::clone(&config.exec))), parquet_file_sink: Arc::new(LoggingParquetFileSinkWrapper::new( diff --git a/compactor2/src/components/mod.rs b/compactor2/src/components/mod.rs index a8a751a52a..fcfc60f5f7 100644 --- a/compactor2/src/components/mod.rs +++ b/compactor2/src/components/mod.rs @@ -5,8 +5,8 @@ use self::{ divide_initial::DivideInitial, files_filter::FilesFilter, namespaces_source::NamespacesSource, parquet_file_sink::ParquetFileSink, partition_done_sink::PartitionDoneSink, partition_files_source::PartitionFilesSource, partition_filter::PartitionFilter, - partitions_source::PartitionsSource, round_split::RoundSplit, scratchpad::ScratchpadGen, - tables_source::TablesSource, + partition_source::PartitionSource, partitions_source::PartitionsSource, + round_split::RoundSplit, scratchpad::ScratchpadGen, tables_source::TablesSource, }; pub mod commit; @@ -21,6 +21,7 @@ pub mod parquet_file_sink; pub mod partition_done_sink; pub mod partition_files_source; pub mod partition_filter; +pub mod partition_source; pub mod partitions_source; pub mod report; pub mod round_split; @@ -31,6 +32,7 @@ pub mod tables_source; #[derive(Debug, Clone)] pub struct Components { pub partitions_source: Arc, + pub partition_source: Arc, pub partition_files_source: Arc, pub files_filter: Arc, pub partition_filter: Arc, diff --git a/compactor2/src/components/partition_source/catalog.rs b/compactor2/src/components/partition_source/catalog.rs new file mode 100644 index 0000000000..bde8351eef --- /dev/null +++ b/compactor2/src/components/partition_source/catalog.rs @@ -0,0 +1,46 @@ +use std::{fmt::Display, sync::Arc}; + +use async_trait::async_trait; +use backoff::{Backoff, BackoffConfig}; +use data_types::{Partition, PartitionId}; +use iox_catalog::interface::Catalog; + +use super::PartitionSource; + +#[derive(Debug)] +pub struct CatalogPartitionSource { + 
backoff_config: BackoffConfig, + catalog: Arc, +} + +impl CatalogPartitionSource { + pub fn new(backoff_config: BackoffConfig, catalog: Arc) -> Self { + Self { + backoff_config, + catalog, + } + } +} + +impl Display for CatalogPartitionSource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "catalog") + } +} + +#[async_trait] +impl PartitionSource for CatalogPartitionSource { + async fn fetch_by_id(&self, partition_id: PartitionId) -> Option { + Backoff::new(&self.backoff_config) + .retry_all_errors("partition_by_id", || async { + self.catalog + .repositories() + .await + .partitions() + .get_by_id(partition_id) + .await + }) + .await + .expect("retry forever") + } +} diff --git a/compactor2/src/components/partition_source/logging.rs b/compactor2/src/components/partition_source/logging.rs new file mode 100644 index 0000000000..5f679f0780 --- /dev/null +++ b/compactor2/src/components/partition_source/logging.rs @@ -0,0 +1,90 @@ +use std::fmt::Display; + +use async_trait::async_trait; +use data_types::{Partition, PartitionId}; +use observability_deps::tracing::{info, warn}; + +use super::PartitionSource; + +#[derive(Debug)] +pub struct LoggingPartitionSourceWrapper +where + T: PartitionSource, +{ + inner: T, +} + +impl LoggingPartitionSourceWrapper +where + T: PartitionSource, +{ + pub fn new(inner: T) -> Self { + Self { inner } + } +} + +impl Display for LoggingPartitionSourceWrapper +where + T: PartitionSource, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "logging({})", self.inner) + } +} + +#[async_trait] +impl PartitionSource for LoggingPartitionSourceWrapper +where + T: PartitionSource, +{ + async fn fetch_by_id(&self, partition_id: PartitionId) -> Option { + let partition = self.inner.fetch_by_id(partition_id).await; + match &partition { + Some(_) => { + info!(partition_id = partition_id.get(), "Fetch a partition",); + } + None => { + warn!(partition_id = partition_id.get(), "Partition 
not found",); + } + } + partition + } +} + +#[cfg(test)] +mod tests { + use test_helpers::tracing::TracingCapture; + + use crate::{ + components::partition_source::mock::MockPartitionSource, test_util::PartitionBuilder, + }; + + use super::*; + + #[test] + fn test_display() { + let source = LoggingPartitionSourceWrapper::new(MockPartitionSource::new(vec![])); + assert_eq!(source.to_string(), "logging(mock)",); + } + + #[tokio::test] + async fn test_fetch_by_id() { + let p = PartitionBuilder::new(5).build(); + let source = LoggingPartitionSourceWrapper::new(MockPartitionSource::new(vec![p.clone()])); + let capture = TracingCapture::new(); + + assert_eq!( + source.fetch_by_id(PartitionId::new(5)).await, + Some(p.clone()) + ); + assert_eq!(source.fetch_by_id(PartitionId::new(5)).await, Some(p)); + assert_eq!(source.fetch_by_id(PartitionId::new(1)).await, None); + + assert_eq!( + capture.to_string(), + "level = INFO; message = Fetch a partition; partition_id = 5; \n\ +level = INFO; message = Fetch a partition; partition_id = 5; \n\ +level = WARN; message = Partition not found; partition_id = 1; ", + ); + } +} diff --git a/compactor2/src/components/partition_source/metrics.rs b/compactor2/src/components/partition_source/metrics.rs new file mode 100644 index 0000000000..c9ce473013 --- /dev/null +++ b/compactor2/src/components/partition_source/metrics.rs @@ -0,0 +1,121 @@ +use std::fmt::Display; + +use async_trait::async_trait; +use data_types::{Partition, PartitionId}; +use metric::{Registry, U64Counter}; + +use super::PartitionSource; + +#[derive(Debug)] +pub struct MetricsPartitionSourceWrapper +where + T: PartitionSource, +{ + fetch_found_counter: U64Counter, + fetch_notfound_counter: U64Counter, + inner: T, +} + +impl MetricsPartitionSourceWrapper +where + T: PartitionSource, +{ + pub fn new(inner: T, registry: &Registry) -> Self { + let fetch_metric = registry.register_metric::( + "iox_compactor_partition_fetch_count", + "Number of times the compactor fetched 
information for a dedicated partition", + ); + let fetch_found_counter = fetch_metric.recorder(&[("result", "found")]); + let fetch_notfound_counter = fetch_metric.recorder(&[("result", "not_found")]); + + Self { + fetch_found_counter, + fetch_notfound_counter, + inner, + } + } +} + +impl Display for MetricsPartitionSourceWrapper +where + T: PartitionSource, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "metrics({})", self.inner) + } +} + +#[async_trait] +impl PartitionSource for MetricsPartitionSourceWrapper +where + T: PartitionSource, +{ + async fn fetch_by_id(&self, partition_id: PartitionId) -> Option { + let res = self.inner.fetch_by_id(partition_id).await; + match res { + Some(_) => self.fetch_found_counter.inc(1), + None => self.fetch_notfound_counter.inc(1), + } + res + } +} + +#[cfg(test)] +mod tests { + use metric::{Attributes, Metric}; + + use crate::{ + components::partition_source::mock::MockPartitionSource, test_util::PartitionBuilder, + }; + + use super::*; + + #[test] + fn test_display() { + let registry = Registry::new(); + let source = + MetricsPartitionSourceWrapper::new(MockPartitionSource::new(vec![]), ®istry); + assert_eq!(source.to_string(), "metrics(mock)",); + } + + #[tokio::test] + async fn test_fetch_by_id() { + let registry = Registry::new(); + let p = PartitionBuilder::new(5).build(); + let source = MetricsPartitionSourceWrapper::new( + MockPartitionSource::new(vec![p.clone()]), + ®istry, + ); + + assert_eq!(fetch_found_counter(®istry), 0,); + assert_eq!(fetch_notfound_counter(®istry), 0,); + + assert_eq!( + source.fetch_by_id(PartitionId::new(5)).await, + Some(p.clone()) + ); + assert_eq!(source.fetch_by_id(PartitionId::new(5)).await, Some(p)); + assert_eq!(source.fetch_by_id(PartitionId::new(1)).await, None); + + assert_eq!(fetch_found_counter(®istry), 2,); + assert_eq!(fetch_notfound_counter(®istry), 1,); + } + + fn fetch_found_counter(registry: &Registry) -> u64 { + registry + 
.get_instrument::>("iox_compactor_partition_fetch_count") + .expect("instrument not found") + .get_observer(&Attributes::from(&[("result", "found")])) + .expect("observer not found") + .fetch() + } + + fn fetch_notfound_counter(registry: &Registry) -> u64 { + registry + .get_instrument::>("iox_compactor_partition_fetch_count") + .expect("instrument not found") + .get_observer(&Attributes::from(&[("result", "not_found")])) + .expect("observer not found") + .fetch() + } +} diff --git a/compactor2/src/components/partition_source/mock.rs b/compactor2/src/components/partition_source/mock.rs new file mode 100644 index 0000000000..40d7c35bf2 --- /dev/null +++ b/compactor2/src/components/partition_source/mock.rs @@ -0,0 +1,73 @@ +use std::fmt::Display; + +use async_trait::async_trait; +use data_types::{Partition, PartitionId}; + +use super::PartitionSource; + +#[derive(Debug)] +pub struct MockPartitionSource { + partitions: Vec, +} + +impl MockPartitionSource { + #[allow(dead_code)] // not used anywhere + pub fn new(partitions: Vec) -> Self { + Self { partitions } + } +} + +impl Display for MockPartitionSource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "mock") + } +} + +#[async_trait] +impl PartitionSource for MockPartitionSource { + async fn fetch_by_id(&self, partition_id: PartitionId) -> Option { + self.partitions + .iter() + .find(|p| p.id == partition_id) + .cloned() + } +} + +#[cfg(test)] +mod tests { + use crate::test_util::PartitionBuilder; + + use super::*; + + #[test] + fn test_display() { + assert_eq!(MockPartitionSource::new(vec![]).to_string(), "mock",); + } + + #[tokio::test] + async fn test_fetch_by_id() { + let p_1 = PartitionBuilder::new(5).build(); + let p_2 = PartitionBuilder::new(1).build(); + let p_3 = PartitionBuilder::new(12).build(); + let partitions = vec![p_1.clone(), p_2.clone(), p_3.clone()]; + let source = MockPartitionSource::new(partitions); + + assert_eq!( + 
source.fetch_by_id(PartitionId::new(5)).await, + Some(p_1.clone()) + ); + assert_eq!( + source.fetch_by_id(PartitionId::new(1)).await, + Some(p_2.clone()) + ); + + // fetching does not drain + assert_eq!( + source.fetch_by_id(PartitionId::new(5)).await, + Some(p_1.clone()) + ); + + // unknown partition => None result + assert_eq!(source.fetch_by_id(PartitionId::new(3)).await, None,); + } +} diff --git a/compactor2/src/components/partition_source/mod.rs b/compactor2/src/components/partition_source/mod.rs new file mode 100644 index 0000000000..c06c683e67 --- /dev/null +++ b/compactor2/src/components/partition_source/mod.rs @@ -0,0 +1,18 @@ +use std::fmt::{Debug, Display}; + +use async_trait::async_trait; +use data_types::{Partition, PartitionId}; + +pub mod catalog; +pub mod logging; +pub mod metrics; +pub mod mock; + +/// A source of [partition](Partition) that may potentially need compacting. +#[async_trait] +pub trait PartitionSource: Debug + Display + Send + Sync { + /// Get partition for a given partition ID. + /// + /// This method performs retries. 
+ async fn fetch_by_id(&self, partition_id: PartitionId) -> Option; +} diff --git a/compactor2/src/components/partitions_source/catalog.rs b/compactor2/src/components/partitions_source/catalog.rs index 15eb09b1df..d22be27481 100644 --- a/compactor2/src/components/partitions_source/catalog.rs +++ b/compactor2/src/components/partitions_source/catalog.rs @@ -2,7 +2,7 @@ use std::{fmt::Display, sync::Arc, time::Duration}; use async_trait::async_trait; use backoff::{Backoff, BackoffConfig}; -use data_types::{Partition, PartitionId}; +use data_types::PartitionId; use iox_catalog::interface::Catalog; use iox_time::TimeProvider; @@ -55,18 +55,4 @@ impl PartitionsSource for CatalogPartitionsSource { .await .expect("retry forever") } - - async fn fetch_by_id(&self, partition_id: PartitionId) -> Option { - Backoff::new(&self.backoff_config) - .retry_all_errors("partition_by_id", || async { - self.catalog - .repositories() - .await - .partitions() - .get_by_id(partition_id) - .await - }) - .await - .expect("retry forever") - } } diff --git a/compactor2/src/components/partitions_source/logging.rs b/compactor2/src/components/partitions_source/logging.rs index 53f2658a1b..9ca0572a87 100644 --- a/compactor2/src/components/partitions_source/logging.rs +++ b/compactor2/src/components/partitions_source/logging.rs @@ -1,7 +1,7 @@ use std::fmt::Display; use async_trait::async_trait; -use data_types::{Partition, PartitionId}; +use data_types::PartitionId; use observability_deps::tracing::{info, warn}; use super::PartitionsSource; @@ -45,24 +45,13 @@ where } partitions } - - async fn fetch_by_id(&self, partition_id: PartitionId) -> Option { - let partition = self.inner.fetch_by_id(partition_id).await; - info!(%partition_id, "Fetch a partition",); - if partition.is_none() { - warn!(%partition_id, "Partition not found",); - } - partition - } } #[cfg(test)] mod tests { use test_helpers::tracing::TracingCapture; - use crate::{ - components::partitions_source::mock::MockPartitionsSource, 
test_util::PartitionBuilder, - }; + use crate::components::partitions_source::mock::MockPartitionsSource; use super::*; @@ -86,22 +75,15 @@ mod tests { #[tokio::test] async fn test_fetch_some() { - let p_1 = PartitionBuilder::new(5).build(); - let p_2 = PartitionBuilder::new(1).build(); - let p_3 = PartitionBuilder::new(12).build(); - let partitions = vec![p_1.clone(), p_2.clone(), p_3.clone()]; + let p_1 = PartitionId::new(5); + let p_2 = PartitionId::new(1); + let p_3 = PartitionId::new(12); + let partitions = vec![p_1, p_2, p_3]; let source = LoggingPartitionsSourceWrapper::new(MockPartitionsSource::new(partitions.clone())); let capture = TracingCapture::new(); - assert_eq!( - source.fetch().await, - vec![ - PartitionId::new(5), - PartitionId::new(1), - PartitionId::new(12), - ] - ); + assert_eq!(source.fetch().await, partitions,); // just the ordinary log message, no warning assert_eq!( capture.to_string(), diff --git a/compactor2/src/components/partitions_source/metrics.rs b/compactor2/src/components/partitions_source/metrics.rs index 4c7e30d790..9a607bf762 100644 --- a/compactor2/src/components/partitions_source/metrics.rs +++ b/compactor2/src/components/partitions_source/metrics.rs @@ -1,7 +1,7 @@ use std::fmt::Display; use async_trait::async_trait; -use data_types::{Partition, PartitionId}; +use data_types::PartitionId; use metric::{Registry, U64Counter}; use super::PartitionsSource; @@ -13,8 +13,6 @@ where { partitions_fetch_counter: U64Counter, partitions_counter: U64Counter, - fetch_found_counter: U64Counter, - fetch_notfound_counter: U64Counter, inner: T, } @@ -36,18 +34,9 @@ where ) .recorder(&[]); - let fetch_metric = registry.register_metric::( - "iox_compactor_partition_fetch_count", - "Number of times the compactor fetched information for a dedicated partition", - ); - let fetch_found_counter = fetch_metric.recorder(&[("result", "found")]); - let fetch_notfound_counter = fetch_metric.recorder(&[("result", "not_found")]); - Self { 
partitions_fetch_counter, partitions_counter, - fetch_found_counter, - fetch_notfound_counter, inner, } } @@ -73,24 +62,13 @@ where self.partitions_counter.inc(partitions.len() as u64); partitions } - - async fn fetch_by_id(&self, partition_id: PartitionId) -> Option { - let res = self.inner.fetch_by_id(partition_id).await; - match res { - Some(_) => self.fetch_found_counter.inc(1), - None => self.fetch_notfound_counter.inc(1), - } - res - } } #[cfg(test)] mod tests { use metric::{Attributes, Metric}; - use crate::{ - components::partitions_source::mock::MockPartitionsSource, test_util::PartitionBuilder, - }; + use crate::components::partitions_source::mock::MockPartitionsSource; use super::*; @@ -106,9 +84,9 @@ mod tests { async fn test_fetch() { let registry = Registry::new(); let partitions = vec![ - PartitionBuilder::new(5).build(), - PartitionBuilder::new(1).build(), - PartitionBuilder::new(12).build(), + PartitionId::new(5), + PartitionId::new(1), + PartitionId::new(12), ]; let source = MetricsPartitionsSourceWrapper::new( MockPartitionsSource::new(partitions.clone()), @@ -118,36 +96,12 @@ mod tests { assert_eq!(fetch_counter(®istry), 0,); assert_eq!(partition_counter(®istry), 0,); - let ids = partitions.iter().map(|p| p.id).collect::>(); - assert_eq!(source.fetch().await, ids,); + assert_eq!(source.fetch().await, partitions); assert_eq!(fetch_counter(®istry), 1,); assert_eq!(partition_counter(®istry), 3,); } - #[tokio::test] - async fn test_fetch_by_id() { - let registry = Registry::new(); - let p = PartitionBuilder::new(5).build(); - let source = MetricsPartitionsSourceWrapper::new( - MockPartitionsSource::new(vec![p.clone()]), - ®istry, - ); - - assert_eq!(fetch_found_counter(®istry), 0,); - assert_eq!(fetch_notfound_counter(®istry), 0,); - - assert_eq!( - source.fetch_by_id(PartitionId::new(5)).await, - Some(p.clone()) - ); - assert_eq!(source.fetch_by_id(PartitionId::new(5)).await, Some(p)); - assert_eq!(source.fetch_by_id(PartitionId::new(1)).await, 
None); - - assert_eq!(fetch_found_counter(®istry), 2,); - assert_eq!(fetch_notfound_counter(®istry), 1,); - } - fn fetch_counter(registry: &Registry) -> u64 { registry .get_instrument::>("iox_compactor_partitions_fetch_count") @@ -165,22 +119,4 @@ mod tests { .expect("observer not found") .fetch() } - - fn fetch_found_counter(registry: &Registry) -> u64 { - registry - .get_instrument::>("iox_compactor_partition_fetch_count") - .expect("instrument not found") - .get_observer(&Attributes::from(&[("result", "found")])) - .expect("observer not found") - .fetch() - } - - fn fetch_notfound_counter(registry: &Registry) -> u64 { - registry - .get_instrument::>("iox_compactor_partition_fetch_count") - .expect("instrument not found") - .get_observer(&Attributes::from(&[("result", "not_found")])) - .expect("observer not found") - .fetch() - } } diff --git a/compactor2/src/components/partitions_source/mock.rs b/compactor2/src/components/partitions_source/mock.rs index b8cb9d0a83..e58ccd69d1 100644 --- a/compactor2/src/components/partitions_source/mock.rs +++ b/compactor2/src/components/partitions_source/mock.rs @@ -1,18 +1,18 @@ use std::fmt::Display; use async_trait::async_trait; -use data_types::{Partition, PartitionId}; +use data_types::PartitionId; use super::PartitionsSource; #[derive(Debug)] pub struct MockPartitionsSource { - partitions: Vec, + partitions: Vec, } impl MockPartitionsSource { #[allow(dead_code)] // not used anywhere - pub fn new(partitions: Vec) -> Self { + pub fn new(partitions: Vec) -> Self { Self { partitions } } } @@ -26,21 +26,12 @@ impl Display for MockPartitionsSource { #[async_trait] impl PartitionsSource for MockPartitionsSource { async fn fetch(&self) -> Vec { - self.partitions.iter().map(|p| p.id).collect() - } - - async fn fetch_by_id(&self, partition_id: PartitionId) -> Option { - self.partitions - .iter() - .find(|p| p.id == partition_id) - .cloned() + self.partitions.clone() } } #[cfg(test)] mod tests { - use 
crate::test_util::PartitionBuilder; - use super::*; #[test] @@ -55,40 +46,13 @@ mod tests { #[tokio::test] async fn test_fetch_some() { - let p_1 = PartitionBuilder::new(5).build(); - let p_2 = PartitionBuilder::new(1).build(); - let p_3 = PartitionBuilder::new(12).build(); - let parts = vec![p_1.clone(), p_2.clone(), p_3.clone()]; + let p_1 = PartitionId::new(5); + let p_2 = PartitionId::new(1); + let p_3 = PartitionId::new(12); + let parts = vec![p_1, p_2, p_3]; assert_eq!( MockPartitionsSource::new(parts.clone()).fetch().await, - [p_1.id, p_2.id, p_3.id] + parts, ); } - - #[tokio::test] - async fn test_fetch_by_id() { - let p_1 = PartitionBuilder::new(5).build(); - let p_2 = PartitionBuilder::new(1).build(); - let p_3 = PartitionBuilder::new(12).build(); - let partitions = vec![p_1.clone(), p_2.clone(), p_3.clone()]; - let source = MockPartitionsSource::new(partitions); - - assert_eq!( - source.fetch_by_id(PartitionId::new(5)).await, - Some(p_1.clone()) - ); - assert_eq!( - source.fetch_by_id(PartitionId::new(1)).await, - Some(p_2.clone()) - ); - - // fetching does not drain - assert_eq!( - source.fetch_by_id(PartitionId::new(5)).await, - Some(p_1.clone()) - ); - - // unknown table => None result - assert_eq!(source.fetch_by_id(PartitionId::new(3)).await, None,); - } } diff --git a/compactor2/src/components/partitions_source/mod.rs b/compactor2/src/components/partitions_source/mod.rs index c60003a612..414bf5842b 100644 --- a/compactor2/src/components/partitions_source/mod.rs +++ b/compactor2/src/components/partitions_source/mod.rs @@ -1,7 +1,10 @@ -use std::fmt::{Debug, Display}; +use std::{ + fmt::{Debug, Display}, + sync::Arc, +}; use async_trait::async_trait; -use data_types::{Partition, PartitionId}; +use data_types::PartitionId; pub mod catalog; pub mod logging; @@ -18,9 +21,14 @@ pub trait PartitionsSource: Debug + Display + Send + Sync { /// /// This should only perform basic, efficient filtering. It MUST NOT inspect individual parquet files. 
async fn fetch(&self) -> Vec; - - /// Get partition for a given partition ID. - /// - /// This method performs retries. - async fn fetch_by_id(&self, partition_id: PartitionId) -> Option; +} + +#[async_trait] +impl PartitionsSource for Arc +where + T: PartitionsSource + ?Sized, +{ + async fn fetch(&self) -> Vec { + self.as_ref().fetch().await + } } diff --git a/compactor2/src/components/partitions_source/randomize_order.rs b/compactor2/src/components/partitions_source/randomize_order.rs index 6fc04e6753..75db600297 100644 --- a/compactor2/src/components/partitions_source/randomize_order.rs +++ b/compactor2/src/components/partitions_source/randomize_order.rs @@ -1,7 +1,7 @@ use std::fmt::Display; use async_trait::async_trait; -use data_types::{Partition, PartitionId}; +use data_types::PartitionId; use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng}; use super::PartitionsSource; @@ -44,19 +44,11 @@ where partitions.shuffle(&mut rng); partitions } - - // TODO: nothing randomized here and maybe should have different trait for this? 
- async fn fetch_by_id(&self, partition_id: PartitionId) -> Option { - let partition = self.inner.fetch_by_id(partition_id).await; - partition - } } #[cfg(test)] mod tests { - use crate::{ - components::partitions_source::mock::MockPartitionsSource, test_util::PartitionBuilder, - }; + use crate::components::partitions_source::mock::MockPartitionsSource; use super::*; @@ -76,35 +68,21 @@ mod tests { #[tokio::test] async fn test_fetch_some() { - let p_1 = PartitionBuilder::new(5).build(); - let p_2 = PartitionBuilder::new(1).build(); - let p_3 = PartitionBuilder::new(12).build(); - let partitions = vec![p_1.clone(), p_2.clone(), p_3.clone()]; + let p_1 = PartitionId::new(5); + let p_2 = PartitionId::new(1); + let p_3 = PartitionId::new(12); + let partitions = vec![p_1, p_2, p_3]; // shuffles let source = RandomizeOrderPartitionsSourcesWrapper::new( MockPartitionsSource::new(partitions.clone()), 123, ); - assert_eq!( - source.fetch().await, - vec![ - PartitionId::new(12), - PartitionId::new(1), - PartitionId::new(5), - ], - ); + assert_eq!(source.fetch().await, vec![p_3, p_2, p_1,],); // is deterministic in same source for _ in 0..100 { - assert_eq!( - source.fetch().await, - vec![ - PartitionId::new(12), - PartitionId::new(1), - PartitionId::new(5), - ], - ); + assert_eq!(source.fetch().await, vec![p_3, p_2, p_1,],); } // is deterministic with new source @@ -113,14 +91,7 @@ mod tests { MockPartitionsSource::new(partitions.clone()), 123, ); - assert_eq!( - source.fetch().await, - vec![ - PartitionId::new(12), - PartitionId::new(1), - PartitionId::new(5), - ], - ); + assert_eq!(source.fetch().await, vec![p_3, p_2, p_1,],); } // different seed => different output @@ -128,13 +99,6 @@ mod tests { MockPartitionsSource::new(partitions.clone()), 1234, ); - assert_eq!( - source.fetch().await, - vec![ - PartitionId::new(1), - PartitionId::new(12), - PartitionId::new(5), - ], - ); + assert_eq!(source.fetch().await, vec![p_2, p_3, p_1,],); } } diff --git 
a/compactor2/src/components/report.rs b/compactor2/src/components/report.rs index ad0e9285ae..114a73cd3c 100644 --- a/compactor2/src/components/report.rs +++ b/compactor2/src/components/report.rs @@ -9,6 +9,7 @@ pub fn log_components(components: &Components) { // use struct unpack so we don't forget any members let Components { partitions_source, + partition_source, partition_files_source, files_filter, partition_filter, @@ -26,6 +27,7 @@ pub fn log_components(components: &Components) { info!( %partitions_source, + %partition_source, %partition_files_source, %files_filter, %partition_filter, diff --git a/compactor2/src/driver.rs b/compactor2/src/driver.rs index c18291d863..4984105e75 100644 --- a/compactor2/src/driver.rs +++ b/compactor2/src/driver.rs @@ -206,7 +206,7 @@ async fn fetch_partition_info( // TODO: only read partition, table and its schema info the first time and cache them // Get info for the partition let partition = components - .partitions_source + .partition_source .fetch_by_id(partition_id) .await .ok_or_else::(|| String::from("Cannot find partition info").into())?;