refactor: Encapsulate reconciliation logic more (#4644)

* refactor: extract code from state_reconciler

* refactor: Encapsulate reconcilation logic more

* fix: docs
pull/24376/head
Andrew Lamb 2022-05-19 15:25:36 -04:00 committed by GitHub
parent 0c21693826
commit a18a49736d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 283 additions and 210 deletions

View File

@ -1,9 +1,8 @@
use self::query_access::QuerierTableChunkPruner;
use self::state_reconciler::Reconciler;
use crate::{
chunk::ParquetChunkAdapter,
ingester::{self, IngesterPartition},
table::state_reconciler::{filter_parquet_files, tombstone_exclude_list},
tombstone::QuerierTombstone,
IngesterConnection,
};
use backoff::{Backoff, BackoffConfig};
@ -44,7 +43,7 @@ pub enum Error {
#[snafu(display("Cannot combine ingester data with catalog/cache: {}", source))]
StateFusion {
source: state_reconciler::FilterParquetError,
source: state_reconciler::ReconcileError,
},
}
@ -60,7 +59,7 @@ pub struct QuerierTable {
backoff_config: BackoffConfig,
/// Table name.
name: Arc<str>,
table_name: Arc<str>,
/// Table ID.
id: TableId,
@ -73,6 +72,9 @@ pub struct QuerierTable {
/// Interface to create chunks for this table.
chunk_adapter: Arc<ParquetChunkAdapter>,
/// Handle reconciling ingester and catalog data
reconciler: Reconciler,
}
impl QuerierTable {
@ -81,25 +83,32 @@ impl QuerierTable {
namespace_name: Arc<str>,
backoff_config: BackoffConfig,
id: TableId,
name: Arc<str>,
table_name: Arc<str>,
schema: Arc<Schema>,
ingester_connection: Arc<dyn IngesterConnection>,
chunk_adapter: Arc<ParquetChunkAdapter>,
) -> Self {
let reconciler = Reconciler::new(
Arc::clone(&table_name),
Arc::clone(&namespace_name),
Arc::clone(&chunk_adapter),
);
Self {
namespace_name,
backoff_config,
name,
table_name,
id,
schema,
ingester_connection,
chunk_adapter,
reconciler,
}
}
/// Table name.
pub fn name(&self) -> &Arc<str> {
&self.name
pub fn table_name(&self) -> &Arc<str> {
&self.table_name
}
/// Table ID.
@ -117,14 +126,14 @@ impl QuerierTable {
///
/// This currently contains all parquet files linked to their unprocessed tombstones.
pub async fn chunks(&self, predicate: &Predicate) -> Result<Vec<Arc<dyn QueryChunk>>> {
debug!(?predicate, namespace=%self.namespace_name, table_name=%self.name(), "Fetching all chunks");
debug!(?predicate, namespace=%self.namespace_name, table_name=%self.table_name(), "Fetching all chunks");
// ask ingesters for data
let ingester_partitions = self.ingester_partitions(predicate).await?;
debug!(
namespace=%self.namespace_name,
table_name=%self.name(),
table_name=%self.table_name(),
num_ingester_partitions=%ingester_partitions.len(),
"Ingester partitions fetched"
);
@ -146,7 +155,7 @@ impl QuerierTable {
debug!(
?parquet_files,
namespace=%self.namespace_name,
table_name=%self.name(),
table_name=%self.table_name(),
"Parquet files from catalog"
);
@ -160,118 +169,10 @@ impl QuerierTable {
.await
.expect("retry forever");
// fuse ingester and catalog state
let parquet_files =
filter_parquet_files(&ingester_partitions, parquet_files).context(StateFusionSnafu)?;
debug!(
?parquet_files,
namespace=%self.namespace_name,
table_name=%self.name(),
"Parquet files after filtering"
);
let tombstone_exclusion = tombstone_exclude_list(&ingester_partitions, &tombstones);
// convert parquet files and tombstones to nicer objects
let mut chunks = Vec::with_capacity(parquet_files.len());
for parquet_file_with_metadata in parquet_files {
if let Some(chunk) = self
.chunk_adapter
.new_querier_chunk(parquet_file_with_metadata)
.await
{
chunks.push(chunk);
}
}
debug!(num_chunks=%chunks.len(), "Querier chunks");
let querier_tombstones: Vec<_> =
tombstones.into_iter().map(QuerierTombstone::from).collect();
// match chunks and tombstones
let mut tombstones_by_sequencer: HashMap<_, Vec<_>> = HashMap::new();
for tombstone in querier_tombstones {
tombstones_by_sequencer
.entry(tombstone.sequencer_id())
.or_default()
.push(tombstone);
}
let mut chunks2 = Vec::with_capacity(chunks.len());
for chunk in chunks.into_iter() {
let chunk = if let Some(tombstones) =
tombstones_by_sequencer.get(&chunk.meta().sequencer_id())
{
let mut delete_predicates = Vec::with_capacity(tombstones.len());
for tombstone in tombstones {
// check conditions that don't need catalog access first to avoid unnecessary catalog load
// Check if tombstone should be excluded based on the ingester response
if tombstone_exclusion
.contains(&(chunk.meta().partition_id(), tombstone.tombstone_id()))
{
continue;
}
// Check if tombstone even applies to the sequence number range within the parquet file. There
// are the following cases here:
//
// 1. Tombstone comes before chunk min sequencer number:
// There is no way the tombstone can affect the chunk.
// 2. Tombstone comes after chunk max sequencer number:
// Tombstone affects whole chunk (it might be marked as processed though, we'll check that
// further down).
// 3. Tombstone is in the min-max sequencer number range of the chunk:
// Technically the querier has NO way to determine the rows that are affected by the tombstone
// since we have no row-level sequence numbers. Such a file can be created by two sources -- the
// ingester and the compactor. The ingester must have materialized the tombstone while creating
// the parquet file, so the querier can skip it. The compactor also materialized the tombstones,
// so we can skip it as well. In the compactor case the tombstone will even be marked as
// processed.
//
// So the querier only needs to consider the tombstone in case 2.
if tombstone.sequence_number() <= chunk.meta().max_sequence_number() {
continue;
}
// TODO: also consider time ranges (https://github.com/influxdata/influxdb_iox/issues/4086)
// check if tombstone is marked as processed
if self
.chunk_adapter
.catalog_cache()
.processed_tombstones()
.exists(
chunk
.parquet_file_id()
.expect("just created from a parquet file"),
tombstone.tombstone_id(),
)
.await
{
continue;
}
delete_predicates.push(Arc::clone(tombstone.delete_predicate()));
}
chunk.with_delete_predicates(delete_predicates)
} else {
chunk
};
chunks2.push(Arc::new(chunk) as Arc<dyn QueryChunk>);
}
// Add ingester chunks to the overall chunk list.
// - filter out chunks that don't have any record batches
// - tombstones don't need to be applied since they were already materialized by the ingester
chunks2.extend(
ingester_partitions
.into_iter()
.filter(|c| c.has_batches())
.map(|c| c as _),
);
debug!(num_chunks2=%chunks2.len(), "Chunks 2");
Ok(chunks2)
self.reconciler
.reconcile(ingester_partitions, tombstones, parquet_files)
.await
.context(StateFusionSnafu)
}
/// Get a chunk pruner that can be used to prune chunks retrieved via [`chunks`](Self::chunks)
@ -301,7 +202,7 @@ impl QuerierTable {
.ingester_connection
.partitions(
Arc::clone(&self.namespace_name),
Arc::clone(&self.name),
Arc::clone(&self.table_name),
columns,
predicate,
Arc::clone(&self.schema),

View File

@ -41,7 +41,7 @@ impl TableProvider for QuerierTable {
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
// build provider out of all chunks
// TODO: push down some predicates to catalog
let mut builder = ProviderBuilder::new(self.name(), Arc::clone(self.schema()));
let mut builder = ProviderBuilder::new(self.table_name(), Arc::clone(self.schema()));
builder = builder.add_pruner(self.chunk_pruner());
let predicate = filters

View File

@ -1,4 +1,4 @@
//! Logic to reconcile the state that the querier got the from catalog and from the ingester.
//! Logic to reconcile the catalog and ingester state
//!
//! # Usage
//!
@ -11,98 +11,188 @@
//! 3. **Pruning:** Call [`filter_parquet_files`] and [`tombstone_exclude_list`] to filter out
//! files and tombstones that are too new (i.e. were created between step 1 and 2).
use crate::ingester::IngesterPartition;
use data_types::{
ParquetFileWithMetadata, PartitionId, SequenceNumber, SequencerId, Tombstone, TombstoneId,
};
mod interface;
use data_types::{ParquetFileWithMetadata, PartitionId, SequencerId, Tombstone, TombstoneId};
use iox_query::QueryChunk;
use observability_deps::tracing::debug;
use snafu::Snafu;
use std::{
collections::{HashMap, HashSet},
ops::Deref,
sync::Arc,
};
/// Information about an ingester partition.
///
/// This is mostly the same as [`IngesterPartition`] but allows easier mocking.
pub trait IngesterPartitionInfo {
fn partition_id(&self) -> PartitionId;
fn sequencer_id(&self) -> SequencerId;
fn parquet_max_sequence_number(&self) -> Option<SequenceNumber>;
fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber>;
}
use crate::{chunk::ParquetChunkAdapter, tombstone::QuerierTombstone, IngesterPartition};
impl IngesterPartitionInfo for Arc<IngesterPartition> {
fn partition_id(&self) -> PartitionId {
self.deref().partition_id()
}
fn sequencer_id(&self) -> SequencerId {
self.deref().sequencer_id()
}
fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
self.deref().parquet_max_sequence_number()
}
fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber> {
self.deref().tombstone_max_sequence_number()
}
}
/// Information about a parquet file.
///
/// This is mostly the same as [`ParquetFileWithMetadata`] but allows easier mocking.
pub trait ParquetFileInfo {
fn partition_id(&self) -> PartitionId;
fn min_sequence_number(&self) -> SequenceNumber;
fn max_sequence_number(&self) -> SequenceNumber;
}
impl ParquetFileInfo for ParquetFileWithMetadata {
fn partition_id(&self) -> PartitionId {
self.partition_id
}
fn min_sequence_number(&self) -> SequenceNumber {
self.min_sequence_number
}
fn max_sequence_number(&self) -> SequenceNumber {
self.max_sequence_number
}
}
/// Information about a tombstone.
///
/// This is mostly the same as [`Tombstone`] but allows easier mocking.
pub trait TombstoneInfo {
fn id(&self) -> TombstoneId;
fn sequencer_id(&self) -> SequencerId;
fn sequence_number(&self) -> SequenceNumber;
}
impl TombstoneInfo for Tombstone {
fn id(&self) -> TombstoneId {
self.id
}
fn sequencer_id(&self) -> SequencerId {
self.sequencer_id
}
fn sequence_number(&self) -> SequenceNumber {
self.sequence_number
}
}
use self::interface::{IngesterPartitionInfo, ParquetFileInfo, TombstoneInfo};
#[derive(Snafu, Debug)]
pub enum FilterParquetError {
pub enum ReconcileError {
#[snafu(display("Compactor processed file that the querier would need to split apart which is not yet implemented"))]
CompactorConflict,
}
/// Handles reconciling catalog and ingester state.
#[derive(Debug)]
pub struct Reconciler {
table_name: Arc<str>,
namespace_name: Arc<str>,
chunk_adapter: Arc<ParquetChunkAdapter>,
}
impl Reconciler {
pub(crate) fn new(
table_name: Arc<str>,
namespace_name: Arc<str>,
chunk_adapter: Arc<ParquetChunkAdapter>,
) -> Self {
Self {
table_name,
namespace_name,
chunk_adapter,
}
}
/// Reconciles ingester state (ingester_partitions) and catalog
/// state (parquet_files and tombstones), producing a list of
/// chunks to query
pub(crate) async fn reconcile(
&self,
ingester_partitions: Vec<Arc<IngesterPartition>>,
tombstones: Vec<Tombstone>,
parquet_files: Vec<ParquetFileWithMetadata>,
) -> Result<Vec<Arc<dyn QueryChunk>>, ReconcileError> {
let tombstone_exclusion = tombstone_exclude_list(&ingester_partitions, &tombstones);
let querier_tombstones: Vec<_> =
tombstones.into_iter().map(QuerierTombstone::from).collect();
// match chunks and tombstones
let mut tombstones_by_sequencer: HashMap<SequencerId, Vec<QuerierTombstone>> =
HashMap::new();
for tombstone in querier_tombstones {
tombstones_by_sequencer
.entry(tombstone.sequencer_id())
.or_default()
.push(tombstone);
}
//
let parquet_files = filter_parquet_files(&ingester_partitions, parquet_files)?;
debug!(
?parquet_files,
namespace=%self.namespace_name(),
table_name=%self.table_name(),
"Parquet files after filtering"
);
// convert parquet files and tombstones into QuerierChunks
let mut parquet_chunks = Vec::with_capacity(parquet_files.len());
for parquet_file_with_metadata in parquet_files {
if let Some(chunk) = self
.chunk_adapter
.new_querier_chunk(parquet_file_with_metadata)
.await
{
parquet_chunks.push(chunk);
}
}
debug!(num_chunks=%parquet_chunks.len(), "Created parquet chunks");
let mut chunks: Vec<Arc<dyn QueryChunk>> =
Vec::with_capacity(parquet_chunks.len() + ingester_partitions.len());
for chunk in parquet_chunks.into_iter() {
let chunk = if let Some(tombstones) =
tombstones_by_sequencer.get(&chunk.meta().sequencer_id())
{
let mut delete_predicates = Vec::with_capacity(tombstones.len());
for tombstone in tombstones {
// check conditions that don't need catalog access first to avoid unnecessary catalog load
// Check if tombstone should be excluded based on the ingester response
if tombstone_exclusion
.contains(&(chunk.meta().partition_id(), tombstone.tombstone_id()))
{
continue;
}
// Check if tombstone even applies to the sequence number range within the parquet file. There
// are the following cases here:
//
// 1. Tombstone comes before chunk min sequencer number:
// There is no way the tombstone can affect the chunk.
// 2. Tombstone comes after chunk max sequencer number:
// Tombstone affects whole chunk (it might be marked as processed though, we'll check that
// further down).
// 3. Tombstone is in the min-max sequencer number range of the chunk:
// Technically the querier has NO way to determine the rows that are affected by the tombstone
// since we have no row-level sequence numbers. Such a file can be created by two sources -- the
// ingester and the compactor. The ingester must have materialized the tombstone while creating
// the parquet file, so the querier can skip it. The compactor also materialized the tombstones,
// so we can skip it as well. In the compactor case the tombstone will even be marked as
// processed.
//
// So the querier only needs to consider the tombstone in case 2.
if tombstone.sequence_number() <= chunk.meta().max_sequence_number() {
continue;
}
// TODO: also consider time ranges (https://github.com/influxdata/influxdb_iox/issues/4086)
// check if tombstone is marked as processed
if self
.chunk_adapter
.catalog_cache()
.processed_tombstones()
.exists(
chunk
.parquet_file_id()
.expect("just created from a parquet file"),
tombstone.tombstone_id(),
)
.await
{
continue;
}
delete_predicates.push(Arc::clone(tombstone.delete_predicate()));
}
chunk.with_delete_predicates(delete_predicates)
} else {
chunk
};
chunks.push(Arc::new(chunk) as Arc<dyn QueryChunk>);
}
// Add ingester chunks to the overall chunk list.
// - filter out chunks that don't have any record batches
// - tombstones don't need to be applied since they were already materialized by the ingester
let ingester_chunks = ingester_partitions
.into_iter()
.filter(|c| c.has_batches())
.map(|c| c as Arc<dyn QueryChunk>);
chunks.extend(ingester_chunks);
debug!(num_chunks=%chunks.len(), "Final chunk count after reconcilation");
Ok(chunks)
}
#[must_use]
pub fn table_name(&self) -> &str {
self.table_name.as_ref()
}
#[must_use]
pub fn namespace_name(&self) -> &str {
self.namespace_name.as_ref()
}
}
/// Filter out parquet files that contain "too new" data.
///
/// The caller may only use the returned parquet files.
@ -114,11 +204,11 @@ pub enum FilterParquetError {
/// Note that the querier (and this method) do NOT care about the actual age of the parquet files, since the compactor
/// is free to to process files at any given moment (e.g. to combine them or to materialize tombstones). However if the
/// compactor combines files in a way that the querier would need to split it into "desired" data and "too new" data
/// then we will currently bail out with [`FilterParquetError::CompactorConflict`].
pub fn filter_parquet_files<I, P>(
/// then we will currently bail out with [`ReconcileError`].
fn filter_parquet_files<I, P>(
ingester_partitions: &[I],
parquet_files: Vec<P>,
) -> Result<Vec<P>, FilterParquetError>
) -> Result<Vec<P>, ReconcileError>
where
I: IngesterPartitionInfo,
P: ParquetFileInfo,
@ -148,7 +238,7 @@ where
if (file.max_sequence_number() > persisted_max)
&& (file.min_sequence_number() <= persisted_max)
{
return Err(FilterParquetError::CompactorConflict);
return Err(ReconcileError::CompactorConflict);
}
if file.max_sequence_number() > persisted_max {
// filter out, file is newer
@ -185,7 +275,7 @@ where
/// Since tombstones are sequencer-wide but data persistence is partition-based (which are sub-units of sequencers), we
/// cannot just remove tombstones entirely but need to decide on a per-partition basis. This function generates a lookup
/// table of partition-tombstone tuples that later need to be EXCLUDED/IGNORED when pairing tombstones with chunks.
pub fn tombstone_exclude_list<I, T>(
fn tombstone_exclude_list<I, T>(
ingester_partitions: &[I],
tombstones: &[T],
) -> HashSet<(PartitionId, TombstoneId)>
@ -227,6 +317,7 @@ where
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use data_types::SequenceNumber;
use super::*;
@ -252,7 +343,7 @@ mod tests {
max_sequence_number: SequenceNumber::new(11),
}];
let err = filter_parquet_files(ingester_partitions, parquet_files).unwrap_err();
assert_matches!(err, FilterParquetError::CompactorConflict);
assert_matches!(err, ReconcileError::CompactorConflict);
}
#[test]

View File

@ -0,0 +1,81 @@
//! Interface for reconciling Ingester and catalog state
use crate::ingester::IngesterPartition;
use data_types::{
ParquetFileWithMetadata, PartitionId, SequenceNumber, SequencerId, Tombstone, TombstoneId,
};
use std::{ops::Deref, sync::Arc};
/// Information about an ingester partition.
///
/// This is mostly the same as [`IngesterPartition`] but allows easier mocking.
pub trait IngesterPartitionInfo {
fn partition_id(&self) -> PartitionId;
fn sequencer_id(&self) -> SequencerId;
fn parquet_max_sequence_number(&self) -> Option<SequenceNumber>;
fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber>;
}
impl IngesterPartitionInfo for Arc<IngesterPartition> {
fn partition_id(&self) -> PartitionId {
self.deref().partition_id()
}
fn sequencer_id(&self) -> SequencerId {
self.deref().sequencer_id()
}
fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
self.deref().parquet_max_sequence_number()
}
fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber> {
self.deref().tombstone_max_sequence_number()
}
}
/// Information about a parquet file.
///
/// This is mostly the same as [`ParquetFileWithMetadata`] but allows easier mocking.
pub trait ParquetFileInfo {
fn partition_id(&self) -> PartitionId;
fn min_sequence_number(&self) -> SequenceNumber;
fn max_sequence_number(&self) -> SequenceNumber;
}
impl ParquetFileInfo for ParquetFileWithMetadata {
fn partition_id(&self) -> PartitionId {
self.partition_id
}
fn min_sequence_number(&self) -> SequenceNumber {
self.min_sequence_number
}
fn max_sequence_number(&self) -> SequenceNumber {
self.max_sequence_number
}
}
/// Information about a tombstone.
///
/// This is mostly the same as [`Tombstone`] but allows easier mocking.
pub trait TombstoneInfo {
fn id(&self) -> TombstoneId;
fn sequencer_id(&self) -> SequencerId;
fn sequence_number(&self) -> SequenceNumber;
}
impl TombstoneInfo for Tombstone {
fn id(&self) -> TombstoneId {
self.id
}
fn sequencer_id(&self) -> SequencerId {
self.sequencer_id
}
fn sequence_number(&self) -> SequenceNumber {
self.sequence_number
}
}