From a18a49736ddce1852aa3345009023c3dfd6f7a05 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 19 May 2022 15:25:36 -0400
Subject: [PATCH] refactor: Encapsulate reconciliation logic more (#4644)

* refactor: extract code from state_reconciler

* refactor: Encapsulate reconciliation logic more

* fix: docs
---
 querier/src/table/mod.rs                      | 149 ++--------
 querier/src/table/query_access.rs             |   2 +-
 querier/src/table/state_reconciler.rs         | 261 ++++++++++++------
 .../src/table/state_reconciler/interface.rs   |  81 ++++++
 4 files changed, 283 insertions(+), 210 deletions(-)
 create mode 100644 querier/src/table/state_reconciler/interface.rs

diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs
index 7933292577..3a7e358a46 100644
--- a/querier/src/table/mod.rs
+++ b/querier/src/table/mod.rs
@@ -1,9 +1,8 @@
 use self::query_access::QuerierTableChunkPruner;
+use self::state_reconciler::Reconciler;
 use crate::{
     chunk::ParquetChunkAdapter,
     ingester::{self, IngesterPartition},
-    table::state_reconciler::{filter_parquet_files, tombstone_exclude_list},
-    tombstone::QuerierTombstone,
     IngesterConnection,
 };
 use backoff::{Backoff, BackoffConfig};
@@ -44,7 +43,7 @@ pub enum Error {
 
     #[snafu(display("Cannot combine ingester data with catalog/cache: {}", source))]
     StateFusion {
-        source: state_reconciler::FilterParquetError,
+        source: state_reconciler::ReconcileError,
     },
 }
@@ -60,7 +59,7 @@ pub struct QuerierTable {
     backoff_config: BackoffConfig,
 
     /// Table name.
-    name: Arc<str>,
+    table_name: Arc<str>,
 
     /// Table ID.
     id: TableId,
@@ -73,6 +72,9 @@ pub struct QuerierTable {
 
     /// Interface to create chunks for this table.
     chunk_adapter: Arc<ParquetChunkAdapter>,
+
+    /// Handle reconciling ingester and catalog data
+    reconciler: Reconciler,
 }
 
 impl QuerierTable {
@@ -81,25 +83,32 @@ impl QuerierTable {
         namespace_name: Arc<str>,
         backoff_config: BackoffConfig,
         id: TableId,
-        name: Arc<str>,
+        table_name: Arc<str>,
         schema: Arc<Schema>,
         ingester_connection: Arc<dyn IngesterConnection>,
         chunk_adapter: Arc<ParquetChunkAdapter>,
     ) -> Self {
+        let reconciler = Reconciler::new(
+            Arc::clone(&table_name),
+            Arc::clone(&namespace_name),
+            Arc::clone(&chunk_adapter),
+        );
+
         Self {
             namespace_name,
             backoff_config,
-            name,
+            table_name,
             id,
             schema,
             ingester_connection,
             chunk_adapter,
+            reconciler,
         }
     }
 
     /// Table name.
-    pub fn name(&self) -> &Arc<str> {
-        &self.name
+    pub fn table_name(&self) -> &Arc<str> {
+        &self.table_name
     }
 
     /// Table ID.
@@ -117,14 +126,14 @@ impl QuerierTable {
     ///
     /// This currently contains all parquet files linked to their unprocessed tombstones.
     pub async fn chunks(&self, predicate: &Predicate) -> Result<Vec<Arc<dyn QueryChunk>>> {
-        debug!(?predicate, namespace=%self.namespace_name, table_name=%self.name(), "Fetching all chunks");
+        debug!(?predicate, namespace=%self.namespace_name, table_name=%self.table_name(), "Fetching all chunks");
 
         // ask ingesters for data
         let ingester_partitions = self.ingester_partitions(predicate).await?;
 
         debug!(
             namespace=%self.namespace_name,
-            table_name=%self.name(),
+            table_name=%self.table_name(),
             num_ingester_partitions=%ingester_partitions.len(),
             "Ingester partitions fetched"
         );
@@ -146,7 +155,7 @@ impl QuerierTable {
         debug!(
             ?parquet_files,
             namespace=%self.namespace_name,
-            table_name=%self.name(),
+            table_name=%self.table_name(),
             "Parquet files from catalog"
         );
@@ -160,118 +169,10 @@ impl QuerierTable {
             .await
             .expect("retry forever");
 
-        // fuse ingester and catalog state
-        let parquet_files =
-            filter_parquet_files(&ingester_partitions, parquet_files).context(StateFusionSnafu)?;
-        debug!(
-            ?parquet_files,
-            namespace=%self.namespace_name,
-            table_name=%self.name(),
-            "Parquet files after filtering"
-        );
-        let tombstone_exclusion = tombstone_exclude_list(&ingester_partitions, &tombstones);
-
-        // convert parquet files and tombstones to nicer objects
-        let mut chunks = Vec::with_capacity(parquet_files.len());
-        for parquet_file_with_metadata in parquet_files {
-            if let Some(chunk) = self
-                .chunk_adapter
-                .new_querier_chunk(parquet_file_with_metadata)
-                .await
-            {
-                chunks.push(chunk);
-            }
-        }
-        debug!(num_chunks=%chunks.len(), "Querier chunks");
-        let querier_tombstones: Vec<_> =
-            tombstones.into_iter().map(QuerierTombstone::from).collect();
-
-        // match chunks and tombstones
-        let mut tombstones_by_sequencer: HashMap<_, Vec<_>> = HashMap::new();
-        for tombstone in querier_tombstones {
-            tombstones_by_sequencer
-                .entry(tombstone.sequencer_id())
-                .or_default()
-                .push(tombstone);
-        }
-        let mut chunks2 = Vec::with_capacity(chunks.len());
-        for chunk in chunks.into_iter() {
-            let chunk = if let Some(tombstones) =
-                tombstones_by_sequencer.get(&chunk.meta().sequencer_id())
-            {
-                let mut delete_predicates = Vec::with_capacity(tombstones.len());
-                for tombstone in tombstones {
-                    // check conditions that don't need catalog access first to avoid unnecessary catalog load
-
-                    // Check if tombstone should be excluded based on the ingester response
-                    if tombstone_exclusion
-                        .contains(&(chunk.meta().partition_id(), tombstone.tombstone_id()))
-                    {
-                        continue;
-                    }
-
-                    // Check if tombstone even applies to the sequence number range within the parquet file. There
-                    // are the following cases here:
-                    //
-                    // 1. Tombstone comes before chunk min sequencer number:
-                    //    There is no way the tombstone can affect the chunk.
-                    // 2. Tombstone comes after chunk max sequencer number:
-                    //    Tombstone affects whole chunk (it might be marked as processed though, we'll check that
-                    //    further down).
-                    // 3. Tombstone is in the min-max sequencer number range of the chunk:
-                    //    Technically the querier has NO way to determine the rows that are affected by the tombstone
-                    //    since we have no row-level sequence numbers. Such a file can be created by two sources -- the
-                    //    ingester and the compactor. The ingester must have materialized the tombstone while creating
-                    //    the parquet file, so the querier can skip it. The compactor also materialized the tombstones,
-                    //    so we can skip it as well. In the compactor case the tombstone will even be marked as
-                    //    processed.
-                    //
-                    //    So the querier only needs to consider the tombstone in case 2.
-                    if tombstone.sequence_number() <= chunk.meta().max_sequence_number() {
-                        continue;
-                    }
-
-                    // TODO: also consider time ranges (https://github.com/influxdata/influxdb_iox/issues/4086)
-
-                    // check if tombstone is marked as processed
-                    if self
-                        .chunk_adapter
-                        .catalog_cache()
-                        .processed_tombstones()
-                        .exists(
-                            chunk
-                                .parquet_file_id()
-                                .expect("just created from a parquet file"),
-                            tombstone.tombstone_id(),
-                        )
-                        .await
-                    {
-                        continue;
-                    }
-
-                    delete_predicates.push(Arc::clone(tombstone.delete_predicate()));
-                }
-                chunk.with_delete_predicates(delete_predicates)
-            } else {
-                chunk
-            };
-
-            chunks2.push(Arc::new(chunk) as Arc<dyn QueryChunk>);
-        }
-
-        // Add ingester chunks to the overall chunk list.
-        // - filter out chunks that don't have any record batches
-        // - tombstones don't need to be applied since they were already materialized by the ingester
-        chunks2.extend(
-            ingester_partitions
-                .into_iter()
-                .filter(|c| c.has_batches())
-                .map(|c| c as _),
-        );
-
-        debug!(num_chunks2=%chunks2.len(), "Chunks 2");
-
-        Ok(chunks2)
+        self.reconciler
+            .reconcile(ingester_partitions, tombstones, parquet_files)
+            .await
+            .context(StateFusionSnafu)
     }
 
     /// Get a chunk pruner that can be used to prune chunks retrieved via [`chunks`](Self::chunks)
@@ -301,7 +202,7 @@ impl QuerierTable {
             .ingester_connection
             .partitions(
                 Arc::clone(&self.namespace_name),
-                Arc::clone(&self.name),
+                Arc::clone(&self.table_name),
                 columns,
                 predicate,
                 Arc::clone(&self.schema),
diff --git a/querier/src/table/query_access.rs b/querier/src/table/query_access.rs
index ed68fa8e52..087068be8a 100644
--- a/querier/src/table/query_access.rs
+++ b/querier/src/table/query_access.rs
@@ -41,7 +41,7 @@ impl TableProvider for QuerierTable {
     ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
         // build provider out of all chunks
         // TODO: push down some predicates to catalog
-        let mut builder = ProviderBuilder::new(self.name(), Arc::clone(self.schema()));
+        let mut builder = ProviderBuilder::new(self.table_name(), Arc::clone(self.schema()));
         builder = builder.add_pruner(self.chunk_pruner());
 
         let predicate = filters
diff --git a/querier/src/table/state_reconciler.rs b/querier/src/table/state_reconciler.rs
index c9bbe2e152..b83f3fbae0 100644
--- a/querier/src/table/state_reconciler.rs
+++ b/querier/src/table/state_reconciler.rs
@@ -1,4 +1,4 @@
-//! Logic to reconcile the state that the querier got the from catalog and from the ingester.
+//! Logic to reconcile the catalog and ingester state
 //!
 //! # Usage
 //!
@@ -11,98 +11,188 @@
 //! 3. **Pruning:** Call [`filter_parquet_files`] and [`tombstone_exclude_list`] to filter out
 //!    files and tombstones that are too new (i.e. were created between step 1 and 2).
 
-use crate::ingester::IngesterPartition;
-use data_types::{
-    ParquetFileWithMetadata, PartitionId, SequenceNumber, SequencerId, Tombstone, TombstoneId,
-};
+mod interface;
+
+use data_types::{ParquetFileWithMetadata, PartitionId, SequencerId, Tombstone, TombstoneId};
+use iox_query::QueryChunk;
 use observability_deps::tracing::debug;
 use snafu::Snafu;
 use std::{
     collections::{HashMap, HashSet},
-    ops::Deref,
     sync::Arc,
 };
 
-/// Information about an ingester partition.
-///
-/// This is mostly the same as [`IngesterPartition`] but allows easier mocking.
-pub trait IngesterPartitionInfo {
-    fn partition_id(&self) -> PartitionId;
-    fn sequencer_id(&self) -> SequencerId;
-    fn parquet_max_sequence_number(&self) -> Option<SequenceNumber>;
-    fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber>;
-}
+use crate::{chunk::ParquetChunkAdapter, tombstone::QuerierTombstone, IngesterPartition};
 
-impl IngesterPartitionInfo for Arc<IngesterPartition> {
-    fn partition_id(&self) -> PartitionId {
-        self.deref().partition_id()
-    }
-
-    fn sequencer_id(&self) -> SequencerId {
-        self.deref().sequencer_id()
-    }
-
-    fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
-        self.deref().parquet_max_sequence_number()
-    }
-
-    fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber> {
-        self.deref().tombstone_max_sequence_number()
-    }
-}
-
-/// Information about a parquet file.
-///
-/// This is mostly the same as [`ParquetFileWithMetadata`] but allows easier mocking.
-pub trait ParquetFileInfo {
-    fn partition_id(&self) -> PartitionId;
-    fn min_sequence_number(&self) -> SequenceNumber;
-    fn max_sequence_number(&self) -> SequenceNumber;
-}
-
-impl ParquetFileInfo for ParquetFileWithMetadata {
-    fn partition_id(&self) -> PartitionId {
-        self.partition_id
-    }
-
-    fn min_sequence_number(&self) -> SequenceNumber {
-        self.min_sequence_number
-    }
-
-    fn max_sequence_number(&self) -> SequenceNumber {
-        self.max_sequence_number
-    }
-}
-
-/// Information about a tombstone.
-///
-/// This is mostly the same as [`Tombstone`] but allows easier mocking.
-pub trait TombstoneInfo {
-    fn id(&self) -> TombstoneId;
-    fn sequencer_id(&self) -> SequencerId;
-    fn sequence_number(&self) -> SequenceNumber;
-}
-
-impl TombstoneInfo for Tombstone {
-    fn id(&self) -> TombstoneId {
-        self.id
-    }
-
-    fn sequencer_id(&self) -> SequencerId {
-        self.sequencer_id
-    }
-
-    fn sequence_number(&self) -> SequenceNumber {
-        self.sequence_number
-    }
-}
+use self::interface::{IngesterPartitionInfo, ParquetFileInfo, TombstoneInfo};
 
 #[derive(Snafu, Debug)]
-pub enum FilterParquetError {
+pub enum ReconcileError {
     #[snafu(display("Compactor processed file that the querier would need to split apart which is not yet implemented"))]
     CompactorConflict,
 }
 
+/// Handles reconciling catalog and ingester state.
+#[derive(Debug)]
+pub struct Reconciler {
+    table_name: Arc<str>,
+    namespace_name: Arc<str>,
+    chunk_adapter: Arc<ParquetChunkAdapter>,
+}
+
+impl Reconciler {
+    pub(crate) fn new(
+        table_name: Arc<str>,
+        namespace_name: Arc<str>,
+        chunk_adapter: Arc<ParquetChunkAdapter>,
+    ) -> Self {
+        Self {
+            table_name,
+            namespace_name,
+            chunk_adapter,
+        }
+    }
+
+    /// Reconciles ingester state (ingester_partitions) and catalog
+    /// state (parquet_files and tombstones), producing a list of
+    /// chunks to query
+    pub(crate) async fn reconcile(
+        &self,
+        ingester_partitions: Vec<Arc<IngesterPartition>>,
+        tombstones: Vec<Tombstone>,
+        parquet_files: Vec<ParquetFileWithMetadata>,
+    ) -> Result<Vec<Arc<dyn QueryChunk>>, ReconcileError> {
+        let tombstone_exclusion = tombstone_exclude_list(&ingester_partitions, &tombstones);
+
+        let querier_tombstones: Vec<_> =
+            tombstones.into_iter().map(QuerierTombstone::from).collect();
+
+        // match chunks and tombstones
+        let mut tombstones_by_sequencer: HashMap<SequencerId, Vec<QuerierTombstone>> =
+            HashMap::new();
+
+        for tombstone in querier_tombstones {
+            tombstones_by_sequencer
+                .entry(tombstone.sequencer_id())
+                .or_default()
+                .push(tombstone);
+        }
+
+        // filter out parquet files that contain "too new" data
+        let parquet_files = filter_parquet_files(&ingester_partitions, parquet_files)?;
+
+        debug!(
+            ?parquet_files,
+            namespace=%self.namespace_name(),
+            table_name=%self.table_name(),
+            "Parquet files after filtering"
+        );
+
+        // convert parquet files and tombstones into QuerierChunks
+        let mut parquet_chunks = Vec::with_capacity(parquet_files.len());
+        for parquet_file_with_metadata in parquet_files {
+            if let Some(chunk) = self
+                .chunk_adapter
+                .new_querier_chunk(parquet_file_with_metadata)
+                .await
+            {
+                parquet_chunks.push(chunk);
+            }
+        }
+        debug!(num_chunks=%parquet_chunks.len(), "Created parquet chunks");
+
+        let mut chunks: Vec<Arc<dyn QueryChunk>> =
+            Vec::with_capacity(parquet_chunks.len() + ingester_partitions.len());
+
+        for chunk in parquet_chunks.into_iter() {
+            let chunk = if let Some(tombstones) =
+                tombstones_by_sequencer.get(&chunk.meta().sequencer_id())
+            {
+                let mut delete_predicates = Vec::with_capacity(tombstones.len());
+                for tombstone in tombstones {
+                    // check conditions that don't need catalog access first to avoid unnecessary catalog load
+
+                    // Check if tombstone should be excluded based on the ingester response
+                    if tombstone_exclusion
+                        .contains(&(chunk.meta().partition_id(), tombstone.tombstone_id()))
+                    {
+                        continue;
+                    }
+
+                    // Check if tombstone even applies to the sequence number range within the parquet file. There
+                    // are the following cases here:
+                    //
+                    // 1. Tombstone comes before chunk min sequencer number:
+                    //    There is no way the tombstone can affect the chunk.
+                    // 2. Tombstone comes after chunk max sequencer number:
+                    //    Tombstone affects whole chunk (it might be marked as processed though, we'll check that
+                    //    further down).
+                    // 3. Tombstone is in the min-max sequencer number range of the chunk:
+                    //    Technically the querier has NO way to determine the rows that are affected by the tombstone
+                    //    since we have no row-level sequence numbers. Such a file can be created by two sources -- the
+                    //    ingester and the compactor. The ingester must have materialized the tombstone while creating
+                    //    the parquet file, so the querier can skip it. The compactor also materialized the tombstones,
+                    //    so we can skip it as well. In the compactor case the tombstone will even be marked as
+                    //    processed.
+                    //
+                    //    So the querier only needs to consider the tombstone in case 2.
+                    if tombstone.sequence_number() <= chunk.meta().max_sequence_number() {
+                        continue;
+                    }
+
+                    // TODO: also consider time ranges (https://github.com/influxdata/influxdb_iox/issues/4086)
+
+                    // check if tombstone is marked as processed
+                    if self
+                        .chunk_adapter
+                        .catalog_cache()
+                        .processed_tombstones()
+                        .exists(
+                            chunk
+                                .parquet_file_id()
+                                .expect("just created from a parquet file"),
+                            tombstone.tombstone_id(),
+                        )
+                        .await
+                    {
+                        continue;
+                    }
+
+                    delete_predicates.push(Arc::clone(tombstone.delete_predicate()));
+                }
+                chunk.with_delete_predicates(delete_predicates)
+            } else {
+                chunk
+            };
+
+            chunks.push(Arc::new(chunk) as Arc<dyn QueryChunk>);
+        }
+
+        // Add ingester chunks to the overall chunk list.
+        // - filter out chunks that don't have any record batches
+        // - tombstones don't need to be applied since they were already materialized by the ingester
+        let ingester_chunks = ingester_partitions
+            .into_iter()
+            .filter(|c| c.has_batches())
+            .map(|c| c as Arc<dyn QueryChunk>);
+
+        chunks.extend(ingester_chunks);
+        debug!(num_chunks=%chunks.len(), "Final chunk count after reconciliation");
+
+        Ok(chunks)
+    }
+
+    #[must_use]
+    pub fn table_name(&self) -> &str {
+        self.table_name.as_ref()
+    }
+
+    #[must_use]
+    pub fn namespace_name(&self) -> &str {
+        self.namespace_name.as_ref()
+    }
+}
+
 /// Filter out parquet files that contain "too new" data.
 ///
 /// The caller may only use the returned parquet files.
@@ -114,11 +204,11 @@ pub enum FilterParquetError {
 /// Note that the querier (and this method) do NOT care about the actual age of the parquet files, since the compactor
 /// is free to process files at any given moment (e.g. to combine them or to materialize tombstones). However if the
 /// compactor combines files in a way that the querier would need to split it into "desired" data and "too new" data
-/// then we will currently bail out with [`FilterParquetError::CompactorConflict`].
-pub fn filter_parquet_files<I, P>(
+/// then we will currently bail out with [`ReconcileError`].
+fn filter_parquet_files<I, P>(
     ingester_partitions: &[I],
     parquet_files: Vec<P>,
-) -> Result<Vec<P>, FilterParquetError>
+) -> Result<Vec<P>, ReconcileError>
 where
     I: IngesterPartitionInfo,
     P: ParquetFileInfo,
@@ -148,7 +238,7 @@ where
         if (file.max_sequence_number() > persisted_max)
             && (file.min_sequence_number() <= persisted_max)
         {
-            return Err(FilterParquetError::CompactorConflict);
+            return Err(ReconcileError::CompactorConflict);
         }
         if file.max_sequence_number() > persisted_max {
             // filter out, file is newer
@@ -185,7 +275,7 @@ where
 /// Since tombstones are sequencer-wide but data persistence is partition-based (which are sub-units of sequencers), we
 /// cannot just remove tombstones entirely but need to decide on a per-partition basis. This function generates a lookup
 /// table of partition-tombstone tuples that later need to be EXCLUDED/IGNORED when pairing tombstones with chunks.
-pub fn tombstone_exclude_list<I, T>(
+fn tombstone_exclude_list<I, T>(
     ingester_partitions: &[I],
     tombstones: &[T],
 ) -> HashSet<(PartitionId, TombstoneId)>
 where
@@ -227,6 +317,7 @@ where
 #[cfg(test)]
 mod tests {
     use assert_matches::assert_matches;
+    use data_types::SequenceNumber;
 
     use super::*;
 
@@ -252,7 +343,7 @@ mod tests {
             max_sequence_number: SequenceNumber::new(11),
         }];
         let err = filter_parquet_files(ingester_partitions, parquet_files).unwrap_err();
-        assert_matches!(err, FilterParquetError::CompactorConflict);
+        assert_matches!(err, ReconcileError::CompactorConflict);
     }
 
     #[test]
diff --git a/querier/src/table/state_reconciler/interface.rs b/querier/src/table/state_reconciler/interface.rs
new file mode 100644
index 0000000000..a1b596a96c
--- /dev/null
+++ b/querier/src/table/state_reconciler/interface.rs
@@ -0,0 +1,81 @@
+//! Interface for reconciling Ingester and catalog state
+
+use crate::ingester::IngesterPartition;
+use data_types::{
+    ParquetFileWithMetadata, PartitionId, SequenceNumber, SequencerId, Tombstone, TombstoneId,
+};
+use std::{ops::Deref, sync::Arc};
+
+/// Information about an ingester partition.
+///
+/// This is mostly the same as [`IngesterPartition`] but allows easier mocking.
+pub trait IngesterPartitionInfo {
+    fn partition_id(&self) -> PartitionId;
+    fn sequencer_id(&self) -> SequencerId;
+    fn parquet_max_sequence_number(&self) -> Option<SequenceNumber>;
+    fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber>;
+}
+
+impl IngesterPartitionInfo for Arc<IngesterPartition> {
+    fn partition_id(&self) -> PartitionId {
+        self.deref().partition_id()
+    }
+
+    fn sequencer_id(&self) -> SequencerId {
+        self.deref().sequencer_id()
+    }
+
+    fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
+        self.deref().parquet_max_sequence_number()
+    }
+
+    fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber> {
+        self.deref().tombstone_max_sequence_number()
+    }
+}
+
+/// Information about a parquet file.
+///
+/// This is mostly the same as [`ParquetFileWithMetadata`] but allows easier mocking.
+pub trait ParquetFileInfo {
+    fn partition_id(&self) -> PartitionId;
+    fn min_sequence_number(&self) -> SequenceNumber;
+    fn max_sequence_number(&self) -> SequenceNumber;
+}
+
+impl ParquetFileInfo for ParquetFileWithMetadata {
+    fn partition_id(&self) -> PartitionId {
+        self.partition_id
+    }
+
+    fn min_sequence_number(&self) -> SequenceNumber {
+        self.min_sequence_number
+    }
+
+    fn max_sequence_number(&self) -> SequenceNumber {
+        self.max_sequence_number
+    }
+}
+
+/// Information about a tombstone.
+///
+/// This is mostly the same as [`Tombstone`] but allows easier mocking.
+pub trait TombstoneInfo {
+    fn id(&self) -> TombstoneId;
+    fn sequencer_id(&self) -> SequencerId;
+    fn sequence_number(&self) -> SequenceNumber;
+}
+
+impl TombstoneInfo for Tombstone {
+    fn id(&self) -> TombstoneId {
+        self.id
+    }
+
+    fn sequencer_id(&self) -> SequencerId {
+        self.sequencer_id
+    }
+
+    fn sequence_number(&self) -> SequenceNumber {
+        self.sequence_number
+    }
+}
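
To see the rule that filter_parquet_files enforces in isolation, here is a minimal, self-contained Rust sketch. It is not part of the patch: MockPartition, MockFile, and keep_file are hypothetical stand-ins, the i64 newtypes stand in for the ID types from data_types, and the real function is generic over all ingester partitions, performs the per-partition lookup, and handles tombstones separately. The interface.rs traits exist precisely so that tests can drive the logic with plain structs like these instead of real catalog objects.

// Standalone sketch; i64 newtypes stand in for the `data_types` ID types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PartitionId(i64);
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct SequenceNumber(i64);

// Narrow accessor traits, mirroring `IngesterPartitionInfo` and
// `ParquetFileInfo` from `interface.rs` (sequencer/tombstone accessors
// omitted for brevity).
trait IngesterPartitionInfo {
    fn partition_id(&self) -> PartitionId;
    fn parquet_max_sequence_number(&self) -> Option<SequenceNumber>;
}
trait ParquetFileInfo {
    fn partition_id(&self) -> PartitionId;
    fn min_sequence_number(&self) -> SequenceNumber;
    fn max_sequence_number(&self) -> SequenceNumber;
}

// Hypothetical mocks, in the spirit of the "allows easier mocking" docs.
struct MockPartition {
    id: PartitionId,
    persisted_max: Option<SequenceNumber>,
}
struct MockFile {
    partition: PartitionId,
    min: SequenceNumber,
    max: SequenceNumber,
}

impl IngesterPartitionInfo for MockPartition {
    fn partition_id(&self) -> PartitionId {
        self.id
    }
    fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
        self.persisted_max
    }
}

impl ParquetFileInfo for MockFile {
    fn partition_id(&self) -> PartitionId {
        self.partition
    }
    fn min_sequence_number(&self) -> SequenceNumber {
        self.min
    }
    fn max_sequence_number(&self) -> SequenceNumber {
        self.max
    }
}

/// The per-file decision visible in the diff: keep a file that lies entirely
/// at or below the ingester's persisted maximum sequence number, drop a file
/// entirely above it (the ingester still serves that data), and report a
/// compactor conflict for a file that straddles the boundary.
fn keep_file<P: ParquetFileInfo>(
    persisted_max: SequenceNumber,
    file: &P,
) -> Result<bool, &'static str> {
    if file.max_sequence_number() > persisted_max
        && file.min_sequence_number() <= persisted_max
    {
        return Err("CompactorConflict");
    }
    Ok(file.max_sequence_number() <= persisted_max)
}

fn main() {
    let partition = MockPartition {
        id: PartitionId(1),
        persisted_max: Some(SequenceNumber(10)),
    };
    // A file the ingester has fully persisted vs. one that is "too new".
    let old_file = MockFile { partition: PartitionId(1), min: SequenceNumber(2), max: SequenceNumber(9) };
    let new_file = MockFile { partition: PartitionId(1), min: SequenceNumber(11), max: SequenceNumber(12) };

    assert_eq!(old_file.partition_id(), partition.partition_id());
    let persisted_max = partition.parquet_max_sequence_number().unwrap();
    assert_eq!(keep_file(persisted_max, &old_file), Ok(true));
    assert_eq!(keep_file(persisted_max, &new_file), Ok(false));

    // A file spanning 9..=11 straddles persisted_max: the compactor merged
    // persisted and still-served data, and the querier cannot split it apart.
    let straddling = MockFile { partition: PartitionId(1), min: SequenceNumber(9), max: SequenceNumber(11) };
    assert_eq!(keep_file(persisted_max, &straddling), Err("CompactorConflict"));
}

The straddling case corresponds to the bail-out the diff renames from FilterParquetError::CompactorConflict to ReconcileError::CompactorConflict, and is exactly the scenario exercised by the unwrap_err() test shown in the state_reconciler.rs hunk above.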