refactor: remove querier state reconciler (#8046)

The reconciler is a leftover from the Kafka-based write path and no longer does
anything useful: the querier can attach the optional retention delete predicate
and assemble the final chunk list directly from the ingester partitions and the
catalog parquet files. This change inlines that logic and deletes the reconciler.
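
For illustration, here is a minimal, self-contained sketch of the inlined flow.
The type definitions below are simplified stand-ins for the real IOx types
(DeletePredicate, QueryChunk, IngesterPartition, QuerierParquetChunk), not their
actual definitions; only build_chunks mirrors the logic added by this commit:

    use std::sync::Arc;

    // Simplified stand-ins for the real IOx types; illustrative only.
    #[derive(Debug)]
    struct DeletePredicate;

    trait QueryChunk: std::fmt::Debug {}

    #[derive(Debug)]
    struct IngesterChunk {
        delete_predicates: Vec<Arc<DeletePredicate>>,
    }
    impl QueryChunk for IngesterChunk {}

    #[derive(Debug)]
    struct IngesterPartition {
        chunks: Vec<IngesterChunk>,
    }

    impl IngesterPartition {
        // Attach the given predicates to every chunk in this partition.
        fn with_delete_predicates(mut self, preds: Vec<Arc<DeletePredicate>>) -> Self {
            for c in &mut self.chunks {
                c.delete_predicates = preds.clone();
            }
            self
        }

        fn into_chunks(self) -> Vec<IngesterChunk> {
            self.chunks
        }
    }

    #[derive(Debug)]
    struct QuerierParquetChunk {
        delete_predicates: Vec<Arc<DeletePredicate>>,
    }
    impl QueryChunk for QuerierParquetChunk {}

    impl QuerierParquetChunk {
        fn with_delete_predicates(mut self, preds: Vec<Arc<DeletePredicate>>) -> Self {
            self.delete_predicates = preds;
            self
        }
    }

    // The inlined replacement for Reconciler::reconcile: apply the optional
    // retention delete predicate to every chunk, then chain ingester chunks
    // and parquet chunks into one flat list. Unlike the reconciler, this
    // cannot fail.
    fn build_chunks(
        partitions: Vec<IngesterPartition>,
        retention_delete_pred: Option<Arc<DeletePredicate>>,
        parquet_files: Vec<QuerierParquetChunk>,
    ) -> Vec<Arc<dyn QueryChunk>> {
        partitions
            .into_iter()
            .flat_map(|p| {
                let p = match &retention_delete_pred {
                    Some(pred) => p.with_delete_predicates(vec![Arc::clone(pred)]),
                    None => p,
                };
                p.into_chunks().into_iter()
            })
            .map(|c| Arc::new(c) as Arc<dyn QueryChunk>)
            .chain(parquet_files.into_iter().map(|c| {
                let c = match &retention_delete_pred {
                    Some(pred) => c.with_delete_predicates(vec![Arc::clone(pred)]),
                    None => c,
                };
                Arc::new(c) as Arc<dyn QueryChunk>
            }))
            .collect()
    }

    fn main() {
        let partitions = vec![IngesterPartition {
            chunks: vec![IngesterChunk { delete_predicates: vec![] }],
        }];
        let parquet_files = vec![QuerierParquetChunk { delete_predicates: vec![] }];

        let chunks = build_chunks(partitions, Some(Arc::new(DeletePredicate)), parquet_files);
        assert_eq!(chunks.len(), 2); // one ingester chunk + one parquet chunk
        println!("{chunks:#?}");
    }

Because this assembly is infallible, the StateFusion error variant and
ReconcileError can be removed along with the reconciler.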

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Author: Marco Neumann (committed by GitHub)
Date:   2023-06-22 11:03:46 +02:00
Parent: e72566e0e5
Commit: 4e18a5f9e8
6 changed files with 21 additions and 213 deletions


@@ -884,10 +884,6 @@ impl IngesterPartition
         self.ingester_uuid
     }
 
-    pub(crate) fn partition_id(&self) -> PartitionId {
-        self.partition_id
-    }
-
     pub(crate) fn completed_persistence_count(&self) -> u64 {
         self.completed_persistence_count
     }


@@ -278,7 +278,6 @@ impl ChunkAdapter
             order,
             sort_key: Some(sort_key),
             partition_id: parquet_file.partition_id,
-            compaction_level: parquet_file.compaction_level,
         });
 
         let parquet_chunk = Arc::new(ParquetChunk::new(


@@ -1,6 +1,6 @@
 //! Querier Chunks
 
-use data_types::{ChunkId, ChunkOrder, CompactionLevel, DeletePredicate, PartitionId};
+use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId};
 use datafusion::physical_plan::Statistics;
 use parquet_file::chunk::ParquetChunk;
 use schema::sort::SortKey;
@@ -27,9 +27,6 @@ pub struct QuerierParquetChunkMeta
 
     /// Partition ID.
     partition_id: PartitionId,
-
-    /// Compaction level of the parquet file of the chunk
-    compaction_level: CompactionLevel,
 }
 
 impl QuerierParquetChunkMeta {
@@ -47,11 +44,6 @@ impl QuerierParquetChunkMeta
     pub fn partition_id(&self) -> PartitionId {
         self.partition_id
     }
-
-    /// Compaction level of the parquet file of the chunk
-    pub fn compaction_level(&self) -> CompactionLevel {
-        self.compaction_level
-    }
 }
 
 #[derive(Debug)]


@@ -1,4 +1,4 @@
-use self::{query_access::QuerierTableChunkPruner, state_reconciler::Reconciler};
+use self::query_access::QuerierTableChunkPruner;
 use crate::{
     ingester::{self, IngesterPartition},
     parquet::ChunkAdapter,
@@ -26,7 +26,6 @@ pub use self::query_access::metrics::PruneMetrics;
 pub(crate) use self::query_access::MetricPruningObserver;
 
 mod query_access;
-mod state_reconciler;
 #[cfg(test)]
 mod test_util;
 
@@ -37,11 +36,6 @@ pub enum Error
     #[snafu(display("Error getting partitions from ingester: {}", source))]
     GettingIngesterPartitions { source: ingester::Error },
 
-    #[snafu(display("Cannot combine ingester data with catalog/cache: {}", source))]
-    StateFusion {
-        source: state_reconciler::ReconcileError,
-    },
-
     #[snafu(display("Chunk pruning failed: {}", source))]
     ChunkPruning { source: provider::Error },
 }
@@ -291,11 +285,6 @@ impl QuerierTable
             return Ok(vec![]);
         };
 
-        let reconciler = Reconciler::new(
-            Arc::clone(&self.table_name),
-            Arc::clone(&self.namespace_name),
-        );
-
         // create parquet files
         let parquet_files = self
             .chunk_adapter
@@ -308,15 +297,25 @@
             )
             .await;
 
-        let chunks = reconciler
-            .reconcile(
-                partitions,
-                retention_delete_pred,
-                parquet_files,
-                span_recorder.child_span("reconcile"),
-            )
-            .await
-            .context(StateFusionSnafu)?;
+        // build final chunk list
+        let chunks = partitions
+            .into_iter()
+            .flat_map(|c| {
+                let c = match &retention_delete_pred {
+                    Some(pred) => c.with_delete_predicates(vec![Arc::clone(pred)]),
+                    None => c,
+                };
+                c.into_chunks().into_iter()
+            })
+            .map(|c| Arc::new(c) as Arc<dyn QueryChunk>)
+            .chain(parquet_files.into_iter().map(|c| {
+                let c = match &retention_delete_pred {
+                    Some(pred) => c.with_delete_predicates(vec![Arc::clone(pred)]),
+                    None => c,
+                };
+                Arc::new(c) as Arc<dyn QueryChunk>
+            }))
+            .collect::<Vec<_>>();
 
         trace!("Fetched chunks");
         let num_initial_chunks = chunks.len();


@@ -1,123 +0,0 @@
-//! Logic to reconcile the catalog and ingester state
-
-mod interface;
-
-use data_types::DeletePredicate;
-use iox_query::QueryChunk;
-use observability_deps::tracing::debug;
-use snafu::Snafu;
-use std::sync::Arc;
-use trace::span::{Span, SpanRecorder};
-
-use crate::{parquet::QuerierParquetChunk, IngesterPartition};
-
-#[derive(Snafu, Debug)]
-#[allow(missing_copy_implementations)]
-pub enum ReconcileError {
-    #[snafu(display("Compactor processed file that the querier would need to split apart which is not yet implemented"))]
-    CompactorConflict,
-}
-
-/// Handles reconciling catalog and ingester state.
-#[derive(Debug)]
-pub struct Reconciler {
-    table_name: Arc<str>,
-    namespace_name: Arc<str>,
-}
-
-impl Reconciler {
-    pub(crate) fn new(table_name: Arc<str>, namespace_name: Arc<str>) -> Self {
-        Self {
-            table_name,
-            namespace_name,
-        }
-    }
-
-    /// Reconciles ingester state (ingester_partitions) and catalog state (parquet_files),
-    /// producing a list of chunks to query
-    pub(crate) async fn reconcile(
-        &self,
-        ingester_partitions: Vec<IngesterPartition>,
-        retention_delete_pred: Option<Arc<DeletePredicate>>,
-        parquet_files: Vec<QuerierParquetChunk>,
-        span: Option<Span>,
-    ) -> Result<Vec<Arc<dyn QueryChunk>>, ReconcileError> {
-        let span_recorder = SpanRecorder::new(span);
-
-        let mut chunks = self
-            .build_chunks_from_parquet(
-                &ingester_partitions,
-                retention_delete_pred.clone(),
-                parquet_files,
-                span_recorder.child_span("build_chunks_from_parquet"),
-            )
-            .await?;
-        chunks.extend(self.build_ingester_chunks(ingester_partitions, retention_delete_pred));
-
-        debug!(num_chunks=%chunks.len(), "Final chunk count after reconcilation");
-
-        Ok(chunks)
-    }
-
-    async fn build_chunks_from_parquet(
-        &self,
-        ingester_partitions: &[IngesterPartition],
-        retention_delete_pred: Option<Arc<DeletePredicate>>,
-        parquet_files: Vec<QuerierParquetChunk>,
-        _span: Option<Span>,
-    ) -> Result<Vec<Arc<dyn QueryChunk>>, ReconcileError> {
-        debug!(
-            namespace=%self.namespace_name(),
-            table_name=%self.table_name(),
-            num_parquet_files=parquet_files.len(),
-            "Reconciling "
-        );
-
-        debug!(num_chunks=%parquet_files.len(), "Created chunks from parquet files");
-
-        let mut chunks: Vec<Arc<dyn QueryChunk>> =
-            Vec::with_capacity(parquet_files.len() + ingester_partitions.len());
-
-        let retention_expr_len = usize::from(retention_delete_pred.is_some());
-        for chunk in parquet_files.into_iter() {
-            let mut delete_predicates = Vec::with_capacity(retention_expr_len);
-            if let Some(retention_delete_pred) = retention_delete_pred.clone() {
-                delete_predicates.push(retention_delete_pred);
-            }
-            let chunk = chunk.with_delete_predicates(delete_predicates);
-            chunks.push(Arc::new(chunk));
-        }
-
-        Ok(chunks)
-    }
-
-    fn build_ingester_chunks(
-        &self,
-        ingester_partitions: Vec<IngesterPartition>,
-        retention_delete_pred: Option<Arc<DeletePredicate>>,
-    ) -> impl Iterator<Item = Arc<dyn QueryChunk>> {
-        // Add ingester chunks to the overall chunk list.
-        // - filter out chunks that don't have any record batches
-        ingester_partitions
-            .into_iter()
-            .flat_map(move |c| {
-                let c = match &retention_delete_pred {
-                    Some(pred) => c.with_delete_predicates(vec![Arc::clone(pred)]),
-                    None => c,
-                };
-                c.into_chunks().into_iter()
-            })
-            .map(|c| Arc::new(c) as Arc<dyn QueryChunk>)
-    }
-
-    #[must_use]
-    pub fn table_name(&self) -> &str {
-        self.table_name.as_ref()
-    }
-
-    #[must_use]
-    pub fn namespace_name(&self) -> &str {
-        self.namespace_name.as_ref()
-    }
-}


@@ -1,55 +0,0 @@
-//! Interface for reconciling Ingester and catalog state
-
-use crate::{ingester::IngesterPartition, parquet::QuerierParquetChunk};
-use data_types::{CompactionLevel, ParquetFile, PartitionId};
-use std::{ops::Deref, sync::Arc};
-
-/// Information about an ingester partition.
-///
-/// This is mostly the same as [`IngesterPartition`] but allows easier mocking.
-pub trait IngesterPartitionInfo {
-    fn partition_id(&self) -> PartitionId;
-}
-
-impl IngesterPartitionInfo for IngesterPartition {
-    fn partition_id(&self) -> PartitionId {
-        self.deref().partition_id()
-    }
-}
-
-impl<T> IngesterPartitionInfo for Arc<T>
-where
-    T: IngesterPartitionInfo,
-{
-    fn partition_id(&self) -> PartitionId {
-        self.deref().partition_id()
-    }
-}
-
-/// Information about a parquet file.
-///
-/// This is mostly the same as [`ParquetFile`] but allows easier mocking.
-pub trait ParquetFileInfo {
-    fn partition_id(&self) -> PartitionId;
-    fn compaction_level(&self) -> CompactionLevel;
-}
-
-impl ParquetFileInfo for Arc<ParquetFile> {
-    fn partition_id(&self) -> PartitionId {
-        self.partition_id
-    }
-
-    fn compaction_level(&self) -> CompactionLevel {
-        self.compaction_level
-    }
-}
-
-impl ParquetFileInfo for QuerierParquetChunk {
-    fn partition_id(&self) -> PartitionId {
-        self.meta().partition_id()
-    }
-
-    fn compaction_level(&self) -> CompactionLevel {
-        self.meta().compaction_level()
-    }
-}