diff --git a/compactor2/src/components/df_planner/query_chunk.rs b/compactor2/src/components/df_planner/query_chunk.rs
index 4209acce8d..dadeda0b5b 100644
--- a/compactor2/src/components/df_planner/query_chunk.rs
+++ b/compactor2/src/components/df_planner/query_chunk.rs
@@ -25,19 +25,16 @@ pub struct QueryableParquetChunk {
     delete_predicates: Vec<Arc<DeletePredicate>>,
     partition_id: PartitionId,
     sort_key: Option<SortKey>,
-    partition_sort_key: Option<SortKey>,
     order: ChunkOrder,
     summary: Arc<TableSummary>,
 }
 
 impl QueryableParquetChunk {
     /// Initialize a QueryableParquetChunk
-    #[allow(clippy::too_many_arguments)]
     pub fn new(
         partition_id: PartitionId,
         data: Arc<ParquetChunk>,
         sort_key: Option<SortKey>,
-        partition_sort_key: Option<SortKey>,
         order: ChunkOrder,
     ) -> Self {
         let summary = Arc::new(create_basic_summary(
@@ -50,7 +47,6 @@ impl QueryableParquetChunk {
             delete_predicates: vec![],
             partition_id,
             sort_key,
-            partition_sort_key,
             order,
             summary,
         }
@@ -80,10 +76,6 @@ impl QueryChunkMeta for QueryableParquetChunk {
         self.data.schema()
     }
 
-    fn partition_sort_key(&self) -> Option<&SortKey> {
-        self.partition_sort_key.as_ref()
-    }
-
     fn partition_id(&self) -> PartitionId {
         self.partition_id
     }
@@ -218,11 +210,5 @@ fn to_queryable_parquet_chunk(
     );
     let parquet_chunk = ParquetChunk::new(Arc::new(file.file.clone()), schema, store);
 
-    QueryableParquetChunk::new(
-        partition_id,
-        Arc::new(parquet_chunk),
-        sort_key,
-        partition_info.sort_key.clone(),
-        file.order,
-    )
+    QueryableParquetChunk::new(partition_id, Arc::new(parquet_chunk), sort_key, file.order)
 }
diff --git a/ingester2/src/query_adaptor.rs b/ingester2/src/query_adaptor.rs
index 44b32d43ca..8e93855243 100644
--- a/ingester2/src/query_adaptor.rs
+++ b/ingester2/src/query_adaptor.rs
@@ -127,10 +127,6 @@ impl QueryChunkMeta for QueryAdaptor {
         &self.schema
     }
 
-    fn partition_sort_key(&self) -> Option<&SortKey> {
-        None // Ingester data has not persisted yet and should not be attached to any partition
-    }
-
     fn partition_id(&self) -> PartitionId {
         self.partition_id
     }
diff --git a/iox_query/src/lib.rs b/iox_query/src/lib.rs
index 132c148e02..c3a952d2fd 100644
--- a/iox_query/src/lib.rs
+++ b/iox_query/src/lib.rs
@@ -63,10 +63,6 @@ pub trait QueryChunkMeta {
     /// return a reference to the summary of the data held in this chunk
     fn schema(&self) -> &Schema;
 
-    /// Return a reference to the chunk's partition sort key if any.
-    /// Only persisted chunk has its partition sort key
-    fn partition_sort_key(&self) -> Option<&SortKey>;
-
     /// Return partition id for this chunk
     fn partition_id(&self) -> PartitionId;
 
@@ -317,10 +313,6 @@ where
         self.as_ref().sort_key()
     }
 
-    fn partition_sort_key(&self) -> Option<&SortKey> {
-        self.as_ref().partition_sort_key()
-    }
-
    fn delete_predicates(&self) -> &[Arc<DeletePredicate>] {
         let pred = self.as_ref().delete_predicates();
         debug!(?pred, "Delete predicate in QueryChunkMeta");
@@ -346,10 +338,6 @@ impl QueryChunkMeta for Arc<dyn QueryChunk> {
         self.as_ref().sort_key()
     }
 
-    fn partition_sort_key(&self) -> Option<&SortKey> {
-        self.as_ref().partition_sort_key()
-    }
-
     fn delete_predicates(&self) -> &[Arc<DeletePredicate>] {
         let pred = self.as_ref().delete_predicates();
         debug!(?pred, "Delete predicate in QueryChunkMeta");
diff --git a/iox_query/src/test.rs b/iox_query/src/test.rs
index abe8a81bf6..82c7a9aea0 100644
--- a/iox_query/src/test.rs
+++ b/iox_query/src/test.rs
@@ -343,9 +343,6 @@ pub struct TestChunk {
     /// The sort key of this chunk
     sort_key: Option<SortKey>,
 
-    /// The partition sort key of this chunk
-    partition_sort_key: Option<SortKey>,
-
     /// Suppress output
     quiet: bool,
 }
@@ -427,7 +424,6 @@ impl TestChunk {
             delete_predicates: Default::default(),
             order: ChunkOrder::MIN,
             sort_key: None,
-            partition_sort_key: None,
             partition_id: PartitionId::new(0),
             quiet: false,
         }
@@ -1133,14 +1129,6 @@ impl TestChunk {
         }
     }
 
-    /// Set the partition sort key for this chunk
-    pub fn with_partition_sort_key(self, sort_key: SortKey) -> Self {
-        Self {
-            partition_sort_key: Some(sort_key),
-            ..self
-        }
-    }
-
     /// Returns all columns of the table
     pub fn all_column_names(&self) -> StringSet {
         self.schema
@@ -1252,10 +1240,6 @@ impl QueryChunkMeta for TestChunk {
         &self.schema
     }
 
-    fn partition_sort_key(&self) -> Option<&SortKey> {
-        self.partition_sort_key.as_ref()
-    }
-
     fn partition_id(&self) -> PartitionId {
         self.partition_id
     }
diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs
index 50dd83e30d..4344d8d84c 100644
--- a/querier/src/ingester/mod.rs
+++ b/querier/src/ingester/mod.rs
@@ -236,7 +236,6 @@ struct IngesterResponseOk {
 struct ObserveIngesterRequest<'a> {
     res: Option<Result<IngesterResponseOk, ()>>,
     t_start: Time,
-    time_provider: Arc<dyn TimeProvider>,
     metrics: Arc<IngesterConnectionMetrics>,
     request: GetPartitionForIngester<'a>,
     span_recorder: SpanRecorder,
@@ -248,14 +247,12 @@ impl<'a> ObserveIngesterRequest<'a> {
         metrics: Arc<IngesterConnectionMetrics>,
         span_recorder: &SpanRecorder,
     ) -> Self {
-        let time_provider = request.catalog_cache.time_provider();
-        let t_start = time_provider.now();
+        let t_start = request.time_provider.now();
         let span_recorder = span_recorder.child("flight request");
 
         Self {
             res: None,
             t_start,
-            time_provider,
             metrics,
             request,
             span_recorder,
@@ -279,7 +276,7 @@ impl<'a> ObserveIngesterRequest<'a> {
 
 impl<'a> Drop for ObserveIngesterRequest<'a> {
     fn drop(&mut self) {
-        let t_end = self.time_provider.now();
+        let t_end = self.request.time_provider.now();
 
         if let Some(ingester_duration) = t_end.checked_duration_since(self.t_start) {
             let (metric, status, ok_status) = match self.res {
@@ -314,7 +311,7 @@ impl<'a> Drop for ObserveIngesterRequest<'a> {
 pub struct IngesterConnectionImpl {
     unique_ingester_addresses: HashSet<Arc<str>>,
     flight_client: Arc<dyn IngesterFlightClient>,
-    catalog_cache: Arc<CatalogCache>,
+    time_provider: Arc<dyn TimeProvider>,
     metrics: Arc<IngesterConnectionMetrics>,
     backoff_config: BackoffConfig,
 }
@@ -362,7 +359,7 @@ impl IngesterConnectionImpl {
         Self {
             unique_ingester_addresses: ingester_addresses.into_iter().collect(),
             flight_client,
-            catalog_cache,
+            time_provider: catalog_cache.time_provider(),
             metrics,
             backoff_config,
         }
@@ -373,7 +370,7 @@ impl IngesterConnectionImpl {
 #[derive(Debug, Clone)]
 struct GetPartitionForIngester<'a> {
     flight_client: Arc<dyn IngesterFlightClient>,
-    catalog_cache: Arc<CatalogCache>,
+    time_provider: Arc<dyn TimeProvider>,
     ingester_address: Arc<str>,
     namespace_id: NamespaceId,
     columns: Vec<String>,
@@ -388,7 +385,7 @@ async fn execute(
 ) -> Result<Vec<IngesterPartition>> {
     let GetPartitionForIngester {
         flight_client,
-        catalog_cache,
+        time_provider: _,
         ingester_address,
         namespace_id,
         columns,
@@ -477,15 +474,14 @@ async fn execute(
     // reconstruct partitions
     let mut decoder = IngesterStreamDecoder::new(
         ingester_address,
-        catalog_cache,
         cached_table,
         span_recorder.child_span("IngesterStreamDecoder"),
     );
     for (msg, md) in messages {
-        decoder.register(msg, md).await?;
+        decoder.register(msg, md)?;
     }
 
-    decoder.finalize().await
+    decoder.finalize()
 }
 
 /// Helper to disassemble the data from the ingester Apache Flight arrow stream.
@@ -497,25 +493,18 @@ struct IngesterStreamDecoder {
     finished_partitions: HashMap<PartitionId, IngesterPartition>,
     current_partition: Option<IngesterPartition>,
     current_chunk: Option<(Schema, Vec<RecordBatch>)>,
     ingester_address: Arc<str>,
-    catalog_cache: Arc<CatalogCache>,
     cached_table: Arc<CachedTable>,
     span_recorder: SpanRecorder,
 }
 
 impl IngesterStreamDecoder {
     /// Create empty decoder.
-    fn new(
-        ingester_address: Arc<str>,
-        catalog_cache: Arc<CatalogCache>,
-        cached_table: Arc<CachedTable>,
-        span: Option<Span>,
-    ) -> Self {
+    fn new(ingester_address: Arc<str>, cached_table: Arc<CachedTable>, span: Option<Span>) -> Self {
         Self {
             finished_partitions: HashMap::new(),
             current_partition: None,
             current_chunk: None,
             ingester_address,
-            catalog_cache,
             cached_table,
             span_recorder: SpanRecorder::new(span),
         }
     }
@@ -538,40 +527,10 @@ impl IngesterStreamDecoder {
     /// Flush current partition, if any.
     ///
     /// This will also flush the current chunk.
-    async fn flush_partition(&mut self) -> Result<()> {
+    fn flush_partition(&mut self) -> Result<()> {
         self.flush_chunk()?;
 
         if let Some(current_partition) = self.current_partition.take() {
-            let schemas: Vec<_> = current_partition
-                .chunks()
-                .iter()
-                .map(|c| c.schema())
-                .collect();
-            let primary_keys: Vec<_> = schemas.iter().map(|s| s.primary_key()).collect();
-            let primary_key: Vec<_> = primary_keys
-                .iter()
-                .flat_map(|pk| pk.iter())
-                // cache may be older then the ingester response status, so some entries might be missing
-                .filter_map(|name| {
-                    self.cached_table
-                        .column_id_map_rev
-                        .get(&Arc::from(name.to_owned()))
-                })
-                .copied()
-                .collect();
-            let partition_sort_key = self
-                .catalog_cache
-                .partition()
-                .sort_key(
-                    Arc::clone(&self.cached_table),
-                    current_partition.partition_id(),
-                    &primary_key,
-                    self.span_recorder
-                        .child_span("cache GET partition sort key"),
-                )
-                .await
-                .map(|sort_key| Arc::clone(&sort_key.sort_key));
-            let current_partition = current_partition.with_partition_sort_key(partition_sort_key);
             self.finished_partitions
                 .insert(current_partition.partition_id, current_partition);
         }
@@ -580,7 +539,7 @@ impl IngesterStreamDecoder {
     }
 
     /// Register a new message and its metadata from the Flight stream.
-    async fn register(
+    fn register(
         &mut self,
         msg: DecodedPayload,
         md: IngesterQueryResponseMetadata,
@@ -588,7 +547,7 @@ impl IngesterStreamDecoder {
         match msg {
             DecodedPayload::None => {
                 // new partition announced
-                self.flush_partition().await?;
+                self.flush_partition()?;
 
                 let partition_id = PartitionId::new(md.partition_id);
                 let status = md.status.context(PartitionStatusMissingSnafu {
@@ -664,8 +623,8 @@ impl IngesterStreamDecoder {
     }
 
     /// Flush internal state and return sorted set of partitions.
-    async fn finalize(mut self) -> Result<Vec<IngesterPartition>> {
-        self.flush_partition().await?;
+    fn finalize(mut self) -> Result<Vec<IngesterPartition>> {
+        self.flush_partition()?;
 
         let mut ids: Vec<_> = self.finished_partitions.keys().copied().collect();
         ids.sort();
@@ -718,7 +677,7 @@ impl IngesterConnection for IngesterConnectionImpl {
             let metrics = Arc::clone(&metrics);
             let request = GetPartitionForIngester {
                 flight_client: Arc::clone(&self.flight_client),
-                catalog_cache: Arc::clone(&self.catalog_cache),
+                time_provider: Arc::clone(&self.time_provider),
                 ingester_address: Arc::clone(&ingester_address),
                 namespace_id,
                 cached_table: Arc::clone(&cached_table),
@@ -901,19 +860,6 @@ impl IngesterPartition {
         Ok(self)
     }
 
-    /// Update partition sort key
-    pub(crate) fn with_partition_sort_key(self, partition_sort_key: Option<Arc<SortKey>>) -> Self {
-        Self {
-            partition_sort_key: partition_sort_key.clone(),
-            chunks: self
-                .chunks
-                .into_iter()
-                .map(|c| c.with_partition_sort_key(partition_sort_key.clone()))
-                .collect(),
-            ..self
-        }
-    }
-
     pub(crate) fn ingester_uuid(&self) -> Option<Uuid> {
         self.ingester_uuid
     }
@@ -978,21 +924,6 @@ pub struct IngesterChunk {
 }
 
 impl IngesterChunk {
-    /// [`Arc`]ed version of the partition sort key.
-    ///
-    /// Note that this might NOT be the up-to-date sort key of the partition but the one that existed when the chunk was
-    /// created. You must sync the keys to use the chunks.
-    pub(crate) fn partition_sort_key_arc(&self) -> Option<Arc<SortKey>> {
-        self.partition_sort_key.clone()
-    }
-
-    pub(crate) fn with_partition_sort_key(self, partition_sort_key: Option<Arc<SortKey>>) -> Self {
-        Self {
-            partition_sort_key,
-            ..self
-        }
-    }
-
     pub(crate) fn estimate_size(&self) -> usize {
         self.batches
             .iter()
@@ -1024,10 +955,6 @@ impl QueryChunkMeta for IngesterChunk {
         &self.schema
     }
 
-    fn partition_sort_key(&self) -> Option<&SortKey> {
-        self.partition_sort_key.as_ref().map(|sk| sk.as_ref())
-    }
-
     fn partition_id(&self) -> PartitionId {
         self.partition_id
     }
diff --git a/querier/src/parquet/creation.rs b/querier/src/parquet/creation.rs
index 9987a98bdd..a49cb53c67 100644
--- a/querier/src/parquet/creation.rs
+++ b/querier/src/parquet/creation.rs
@@ -236,10 +236,6 @@ impl ChunkAdapter {
             self.catalog_cache.parquet_store(),
         ));
 
-        Some(QuerierParquetChunk::new(
-            parquet_chunk,
-            meta,
-            Some(Arc::clone(&partition_sort_key.sort_key)),
-        ))
+        Some(QuerierParquetChunk::new(parquet_chunk, meta))
     }
 }
diff --git a/querier/src/parquet/mod.rs b/querier/src/parquet/mod.rs
index 37b89eacde..cf0823d669 100644
--- a/querier/src/parquet/mod.rs
+++ b/querier/src/parquet/mod.rs
@@ -71,9 +71,6 @@ pub struct QuerierParquetChunk {
     /// Delete predicates to be combined with the chunk
     delete_predicates: Vec<Arc<DeletePredicate>>,
 
-    /// Partition sort key (how does the read buffer use this?)
-    partition_sort_key: Option<Arc<SortKey>>,
-
     /// Chunk of the Parquet file
     parquet_chunk: Arc<ParquetChunk>,
 
@@ -83,11 +80,7 @@ impl QuerierParquetChunk {
     /// Create new parquet-backed chunk (object store data).
-    pub fn new(
-        parquet_chunk: Arc<ParquetChunk>,
-        meta: Arc<ChunkMeta>,
-        partition_sort_key: Option<Arc<SortKey>>,
-    ) -> Self {
+    pub fn new(parquet_chunk: Arc<ParquetChunk>, meta: Arc<ChunkMeta>) -> Self {
         let table_summary = Arc::new(create_basic_summary(
             parquet_chunk.rows() as u64,
             parquet_chunk.schema(),
@@ -97,7 +90,6 @@ impl QuerierParquetChunk {
         Self {
             meta,
             delete_predicates: Vec::new(),
-            partition_sort_key,
             parquet_chunk,
             table_summary,
         }
@@ -116,22 +108,6 @@ impl QuerierParquetChunk {
         self.meta.as_ref()
     }
 
-    /// [`Arc`]ed version of the partition sort key.
-    ///
-    /// Note that this might NOT be the up-to-date sort key of the partition but the one that existed when the chunk was
-    /// created. You must sync the keys to use the chunks.
-    pub fn partition_sort_key_arc(&self) -> Option<Arc<SortKey>> {
-        self.partition_sort_key.clone()
-    }
-
-    /// Set partition sort key
-    pub fn with_partition_sort_key(self, partition_sort_key: Option<Arc<SortKey>>) -> Self {
-        Self {
-            partition_sort_key,
-            ..self
-        }
-    }
-
     pub fn estimate_size(&self) -> usize {
         self.parquet_chunk.parquet_file().file_size_bytes as usize
     }
diff --git a/querier/src/parquet/query_access.rs b/querier/src/parquet/query_access.rs
index 18923b9ffc..1e12d2b82b 100644
--- a/querier/src/parquet/query_access.rs
+++ b/querier/src/parquet/query_access.rs
@@ -18,10 +18,6 @@ impl QueryChunkMeta for QuerierParquetChunk {
         self.parquet_chunk.schema()
     }
 
-    fn partition_sort_key(&self) -> Option<&SortKey> {
-        self.partition_sort_key.as_ref().map(|sk| sk.as_ref())
-    }
-
     fn partition_id(&self) -> PartitionId {
         self.meta().partition_id()
     }
diff --git a/querier/src/table/state_reconciler.rs b/querier/src/table/state_reconciler.rs
index 3b22495cc7..221b5a9ae2 100644
--- a/querier/src/table/state_reconciler.rs
+++ b/querier/src/table/state_reconciler.rs
@@ -2,18 +2,14 @@
 
 mod interface;
 
-use data_types::{DeletePredicate, PartitionId};
+use data_types::DeletePredicate;
 use iox_query::QueryChunk;
 use observability_deps::tracing::debug;
-use schema::sort::SortKey;
 use snafu::Snafu;
-use std::{
-    collections::{hash_map::Entry, HashMap},
-    sync::Arc,
-};
+use std::sync::Arc;
 use trace::span::{Span, SpanRecorder};
 
-use crate::{ingester::IngesterChunk, parquet::QuerierParquetChunk, IngesterPartition};
+use crate::{parquet::QuerierParquetChunk, IngesterPartition};
 
 #[derive(Snafu, Debug)]
 #[allow(missing_copy_implementations)]
@@ -58,13 +54,6 @@ impl Reconciler {
         chunks.extend(self.build_ingester_chunks(ingester_partitions, retention_delete_pred));
         debug!(num_chunks=%chunks.len(), "Final chunk count after reconcilation");
 
-        let chunks = self.sync_partition_sort_keys(chunks);
-
-        let chunks: Vec<Arc<dyn QueryChunk>> = chunks
-            .into_iter()
-            .map(|c| c.upcast_to_querier_chunk().into())
-            .collect();
-
         Ok(chunks)
     }
 
@@ -74,7 +63,7 @@
         retention_delete_pred: Option<Arc<DeletePredicate>>,
         parquet_files: Vec<QuerierParquetChunk>,
         _span: Option<Span>,
-    ) -> Result<Vec<Box<dyn UpdatableQuerierChunk>>, ReconcileError> {
+    ) -> Result<Vec<Arc<dyn QueryChunk>>, ReconcileError> {
         debug!(
             namespace=%self.namespace_name(),
             table_name=%self.table_name(),
@@ -84,7 +73,7 @@
 
         debug!(num_chunks=%parquet_files.len(), "Created chunks from parquet files");
 
-        let mut chunks: Vec<Box<dyn UpdatableQuerierChunk>> =
+        let mut chunks: Vec<Arc<dyn QueryChunk>> =
             Vec::with_capacity(parquet_files.len() + ingester_partitions.len());
 
         let retention_expr_len = usize::from(retention_delete_pred.is_some());
@@ -97,7 +86,7 @@
 
             let chunk = chunk.with_delete_predicates(delete_predicates);
 
-            chunks.push(Box::new(chunk) as Box<dyn UpdatableQuerierChunk>);
+            chunks.push(Arc::new(chunk));
         }
 
         Ok(chunks)
@@ -107,7 +96,7 @@
         &self,
         ingester_partitions: Vec<IngesterPartition>,
         retention_delete_pred: Option<Arc<DeletePredicate>>,
-    ) -> impl Iterator<Item = Box<dyn UpdatableQuerierChunk>> {
+    ) -> impl Iterator<Item = Arc<dyn QueryChunk>> {
         // Add ingester chunks to the overall chunk list.
         // - filter out chunks that don't have any record batches
         ingester_partitions
             .into_iter()
             .flat_map(move |c| {
                 ...
                 };
                 c.into_chunks().into_iter()
             })
-            .map(|c| Box::new(c) as Box<dyn UpdatableQuerierChunk>)
-    }
-
-    fn sync_partition_sort_keys(
-        &self,
-        chunks: Vec<Box<dyn UpdatableQuerierChunk>>,
-    ) -> Vec<Box<dyn UpdatableQuerierChunk>> {
-        // collect latest (= longest) sort key
-        // Note that the partition sort key may stale (only a subset of the most recent partition
-        // sort key) because newer chunks have new columns.
-        // However, since the querier doesn't (yet) know about these chunks in the `chunks` list above
-        // using the most up to date sort key from the chunks it does know about is sufficient.
-        let mut sort_keys = HashMap::<PartitionId, Arc<SortKey>>::new();
-        for c in &chunks {
-            if let Some(sort_key) = c.partition_sort_key_arc() {
-                match sort_keys.entry(c.partition_id()) {
-                    Entry::Occupied(mut o) => {
-                        if sort_key.len() > o.get().len() {
-                            *o.get_mut() = sort_key;
-                        }
-                    }
-                    Entry::Vacant(v) => {
-                        v.insert(sort_key);
-                    }
-                }
-            }
-        }
-
-        // write partition sort keys to chunks
-        chunks
-            .into_iter()
-            .map(|chunk| {
-                let partition_id = chunk.partition_id();
-                let sort_key = sort_keys.get(&partition_id);
-                chunk.update_partition_sort_key(sort_key.cloned())
-            })
-            .collect()
+            .map(|c| Arc::new(c) as Arc<dyn QueryChunk>)
     }
 
     #[must_use]
     fn table_name(&self) -> &str {
         self.table_name.as_ref()
     }
 
     fn namespace_name(&self) -> &str {
         self.namespace_name.as_ref()
     }
 }
-
-trait UpdatableQuerierChunk: QueryChunk {
-    fn partition_sort_key_arc(&self) -> Option<Arc<SortKey>>;
-
-    fn update_partition_sort_key(
-        self: Box<Self>,
-        sort_key: Option<Arc<SortKey>>,
-    ) -> Box<dyn UpdatableQuerierChunk>;
-
-    fn upcast_to_querier_chunk(self: Box<Self>) -> Box<dyn QueryChunk>;
-}
-
-impl UpdatableQuerierChunk for QuerierParquetChunk {
-    fn partition_sort_key_arc(&self) -> Option<Arc<SortKey>> {
-        self.partition_sort_key_arc()
-    }
-
-    fn update_partition_sort_key(
-        self: Box<Self>,
-        sort_key: Option<Arc<SortKey>>,
-    ) -> Box<dyn UpdatableQuerierChunk> {
-        Box::new(self.with_partition_sort_key(sort_key))
-    }
-
-    fn upcast_to_querier_chunk(self: Box<Self>) -> Box<dyn QueryChunk> {
-        self as _
-    }
-}
-
-impl UpdatableQuerierChunk for IngesterChunk {
-    fn partition_sort_key_arc(&self) -> Option<Arc<SortKey>> {
-        self.partition_sort_key_arc()
-    }
-
-    fn update_partition_sort_key(
-        self: Box<Self>,
-        sort_key: Option<Arc<SortKey>>,
-    ) -> Box<dyn UpdatableQuerierChunk> {
-        Box::new(self.with_partition_sort_key(sort_key))
-    }
-
-    fn upcast_to_querier_chunk(self: Box<Self>) -> Box<dyn QueryChunk> {
-        self as _
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::{
-        interface::{IngesterPartitionInfo, ParquetFileInfo},
-        *,
-    };
-    use data_types::{CompactionLevel, SequenceNumber};
-
-    #[derive(Debug)]
-    struct MockIngesterPartitionInfo {
-        partition_id: PartitionId,
-        parquet_max_sequence_number: Option<SequenceNumber>,
-    }
-
-    impl IngesterPartitionInfo for MockIngesterPartitionInfo {
-        fn partition_id(&self) -> PartitionId {
-            self.partition_id
-        }
-
-        fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
-            self.parquet_max_sequence_number
-        }
-    }
-
-    #[derive(Debug, Clone, PartialEq, Eq)]
-    struct MockParquetFileInfo {
-        partition_id: PartitionId,
-        max_sequence_number: SequenceNumber,
-        compaction_level: CompactionLevel,
-    }
-
-    impl ParquetFileInfo for MockParquetFileInfo {
-        fn partition_id(&self) -> PartitionId {
-            self.partition_id
-        }
-
-        fn max_sequence_number(&self) -> SequenceNumber {
-            self.max_sequence_number
-        }
-
-        fn compaction_level(&self) -> CompactionLevel {
-            self.compaction_level
-        }
-    }
-}