diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs index 3e634bfc6f..a2514d7965 100644 --- a/querier/src/ingester/mod.rs +++ b/querier/src/ingester/mod.rs @@ -884,10 +884,6 @@ impl IngesterPartition { self.ingester_uuid } - pub(crate) fn partition_id(&self) -> PartitionId { - self.partition_id - } - pub(crate) fn completed_persistence_count(&self) -> u64 { self.completed_persistence_count } diff --git a/querier/src/parquet/creation.rs b/querier/src/parquet/creation.rs index e101312f69..1c8891d4ef 100644 --- a/querier/src/parquet/creation.rs +++ b/querier/src/parquet/creation.rs @@ -278,7 +278,6 @@ impl ChunkAdapter { order, sort_key: Some(sort_key), partition_id: parquet_file.partition_id, - compaction_level: parquet_file.compaction_level, }); let parquet_chunk = Arc::new(ParquetChunk::new( diff --git a/querier/src/parquet/mod.rs b/querier/src/parquet/mod.rs index 6338323eff..2e3dbeebec 100644 --- a/querier/src/parquet/mod.rs +++ b/querier/src/parquet/mod.rs @@ -1,6 +1,6 @@ //! Querier Chunks -use data_types::{ChunkId, ChunkOrder, CompactionLevel, DeletePredicate, PartitionId}; +use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId}; use datafusion::physical_plan::Statistics; use parquet_file::chunk::ParquetChunk; use schema::sort::SortKey; @@ -27,9 +27,6 @@ pub struct QuerierParquetChunkMeta { /// Partition ID. partition_id: PartitionId, - - /// Compaction level of the parquet file of the chunk - compaction_level: CompactionLevel, } impl QuerierParquetChunkMeta { @@ -47,11 +44,6 @@ impl QuerierParquetChunkMeta { pub fn partition_id(&self) -> PartitionId { self.partition_id } - - /// Compaction level of the parquet file of the chunk - pub fn compaction_level(&self) -> CompactionLevel { - self.compaction_level - } } #[derive(Debug)] diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs index d8c455f51b..35243c34e1 100644 --- a/querier/src/table/mod.rs +++ b/querier/src/table/mod.rs @@ -1,4 +1,4 @@ -use self::{query_access::QuerierTableChunkPruner, state_reconciler::Reconciler}; +use self::query_access::QuerierTableChunkPruner; use crate::{ ingester::{self, IngesterPartition}, parquet::ChunkAdapter, @@ -26,7 +26,6 @@ pub use self::query_access::metrics::PruneMetrics; pub(crate) use self::query_access::MetricPruningObserver; mod query_access; -mod state_reconciler; #[cfg(test)] mod test_util; @@ -37,11 +36,6 @@ pub enum Error { #[snafu(display("Error getting partitions from ingester: {}", source))] GettingIngesterPartitions { source: ingester::Error }, - #[snafu(display("Cannot combine ingester data with catalog/cache: {}", source))] - StateFusion { - source: state_reconciler::ReconcileError, - }, - #[snafu(display("Chunk pruning failed: {}", source))] ChunkPruning { source: provider::Error }, } @@ -291,11 +285,6 @@ impl QuerierTable { return Ok(vec![]); }; - let reconciler = Reconciler::new( - Arc::clone(&self.table_name), - Arc::clone(&self.namespace_name), - ); - // create parquet files let parquet_files = self .chunk_adapter @@ -308,15 +297,25 @@ impl QuerierTable { ) .await; - let chunks = reconciler - .reconcile( - partitions, - retention_delete_pred, - parquet_files, - span_recorder.child_span("reconcile"), - ) - .await - .context(StateFusionSnafu)?; + // build final chunk list + let chunks = partitions + .into_iter() + .flat_map(|c| { + let c = match &retention_delete_pred { + Some(pred) => c.with_delete_predicates(vec![Arc::clone(pred)]), + None => c, + }; + c.into_chunks().into_iter() + }) + .map(|c| Arc::new(c) as Arc) + .chain(parquet_files.into_iter().map(|c| { + let c = match &retention_delete_pred { + Some(pred) => c.with_delete_predicates(vec![Arc::clone(pred)]), + None => c, + }; + Arc::new(c) as Arc + })) + .collect::>(); trace!("Fetched chunks"); let num_initial_chunks = chunks.len(); diff --git a/querier/src/table/state_reconciler.rs b/querier/src/table/state_reconciler.rs deleted file mode 100644 index 221b5a9ae2..0000000000 --- a/querier/src/table/state_reconciler.rs +++ /dev/null @@ -1,123 +0,0 @@ -//! Logic to reconcile the catalog and ingester state - -mod interface; - -use data_types::DeletePredicate; -use iox_query::QueryChunk; -use observability_deps::tracing::debug; -use snafu::Snafu; -use std::sync::Arc; -use trace::span::{Span, SpanRecorder}; - -use crate::{parquet::QuerierParquetChunk, IngesterPartition}; - -#[derive(Snafu, Debug)] -#[allow(missing_copy_implementations)] -pub enum ReconcileError { - #[snafu(display("Compactor processed file that the querier would need to split apart which is not yet implemented"))] - CompactorConflict, -} - -/// Handles reconciling catalog and ingester state. -#[derive(Debug)] -pub struct Reconciler { - table_name: Arc, - namespace_name: Arc, -} - -impl Reconciler { - pub(crate) fn new(table_name: Arc, namespace_name: Arc) -> Self { - Self { - table_name, - namespace_name, - } - } - - /// Reconciles ingester state (ingester_partitions) and catalog state (parquet_files), - /// producing a list of chunks to query - pub(crate) async fn reconcile( - &self, - ingester_partitions: Vec, - retention_delete_pred: Option>, - parquet_files: Vec, - span: Option, - ) -> Result>, ReconcileError> { - let span_recorder = SpanRecorder::new(span); - let mut chunks = self - .build_chunks_from_parquet( - &ingester_partitions, - retention_delete_pred.clone(), - parquet_files, - span_recorder.child_span("build_chunks_from_parquet"), - ) - .await?; - chunks.extend(self.build_ingester_chunks(ingester_partitions, retention_delete_pred)); - debug!(num_chunks=%chunks.len(), "Final chunk count after reconcilation"); - - Ok(chunks) - } - - async fn build_chunks_from_parquet( - &self, - ingester_partitions: &[IngesterPartition], - retention_delete_pred: Option>, - parquet_files: Vec, - _span: Option, - ) -> Result>, ReconcileError> { - debug!( - namespace=%self.namespace_name(), - table_name=%self.table_name(), - num_parquet_files=parquet_files.len(), - "Reconciling " - ); - - debug!(num_chunks=%parquet_files.len(), "Created chunks from parquet files"); - - let mut chunks: Vec> = - Vec::with_capacity(parquet_files.len() + ingester_partitions.len()); - - let retention_expr_len = usize::from(retention_delete_pred.is_some()); - for chunk in parquet_files.into_iter() { - let mut delete_predicates = Vec::with_capacity(retention_expr_len); - - if let Some(retention_delete_pred) = retention_delete_pred.clone() { - delete_predicates.push(retention_delete_pred); - } - - let chunk = chunk.with_delete_predicates(delete_predicates); - - chunks.push(Arc::new(chunk)); - } - - Ok(chunks) - } - - fn build_ingester_chunks( - &self, - ingester_partitions: Vec, - retention_delete_pred: Option>, - ) -> impl Iterator> { - // Add ingester chunks to the overall chunk list. - // - filter out chunks that don't have any record batches - ingester_partitions - .into_iter() - .flat_map(move |c| { - let c = match &retention_delete_pred { - Some(pred) => c.with_delete_predicates(vec![Arc::clone(pred)]), - None => c, - }; - c.into_chunks().into_iter() - }) - .map(|c| Arc::new(c) as Arc) - } - - #[must_use] - pub fn table_name(&self) -> &str { - self.table_name.as_ref() - } - - #[must_use] - pub fn namespace_name(&self) -> &str { - self.namespace_name.as_ref() - } -} diff --git a/querier/src/table/state_reconciler/interface.rs b/querier/src/table/state_reconciler/interface.rs deleted file mode 100644 index 5c7b4ed8e4..0000000000 --- a/querier/src/table/state_reconciler/interface.rs +++ /dev/null @@ -1,55 +0,0 @@ -//! Interface for reconciling Ingester and catalog state - -use crate::{ingester::IngesterPartition, parquet::QuerierParquetChunk}; -use data_types::{CompactionLevel, ParquetFile, PartitionId}; -use std::{ops::Deref, sync::Arc}; - -/// Information about an ingester partition. -/// -/// This is mostly the same as [`IngesterPartition`] but allows easier mocking. -pub trait IngesterPartitionInfo { - fn partition_id(&self) -> PartitionId; -} - -impl IngesterPartitionInfo for IngesterPartition { - fn partition_id(&self) -> PartitionId { - self.deref().partition_id() - } -} - -impl IngesterPartitionInfo for Arc -where - T: IngesterPartitionInfo, -{ - fn partition_id(&self) -> PartitionId { - self.deref().partition_id() - } -} - -/// Information about a parquet file. -/// -/// This is mostly the same as [`ParquetFile`] but allows easier mocking. -pub trait ParquetFileInfo { - fn partition_id(&self) -> PartitionId; - fn compaction_level(&self) -> CompactionLevel; -} - -impl ParquetFileInfo for Arc { - fn partition_id(&self) -> PartitionId { - self.partition_id - } - - fn compaction_level(&self) -> CompactionLevel { - self.compaction_level - } -} - -impl ParquetFileInfo for QuerierParquetChunk { - fn partition_id(&self) -> PartitionId { - self.meta().partition_id() - } - - fn compaction_level(&self) -> CompactionLevel { - self.meta().compaction_level() - } -}