diff --git a/Cargo.lock b/Cargo.lock
index 33708b49c0..10a5b97485 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3948,6 +3948,7 @@ dependencies = [
  "pin-project",
  "predicate",
  "rand",
+ "read_buffer",
  "schema",
  "service_common",
  "service_grpc_schema",
diff --git a/cache_system/src/driver.rs b/cache_system/src/driver.rs
index 2ee34880d5..740af806a4 100644
--- a/cache_system/src/driver.rs
+++ b/cache_system/src/driver.rs
@@ -1,30 +1,35 @@
 //! Main data structure, see [`Cache`].
 
-use std::{collections::HashMap, hash::Hash, sync::Arc};
+use super::{backend::CacheBackend, loader::Loader};
 use futures::{
     future::{BoxFuture, Shared},
     FutureExt, TryFutureExt,
 };
 use observability_deps::tracing::debug;
 use parking_lot::Mutex;
+use std::{collections::HashMap, hash::Hash, sync::Arc};
 use tokio::{
     sync::oneshot::{error::RecvError, Sender},
     task::JoinHandle,
 };
 
-use super::{backend::CacheBackend, loader::Loader};
-
 /// High-level cache implementation.
 ///
 /// # Concurrency
-/// Multiple cache requests for different keys can run at the same time. When data is requested for the same key the
-/// underlying loader will only be polled once, even when the requests are made while the loader is still running.
+///
+/// Multiple cache requests for different keys can run at the same time. When data is requested for
+/// the same key the underlying loader will only be polled once, even when the requests are made
+/// while the loader is still running.
 ///
 /// # Cancellation
-/// Canceling a [`get`](Self::get) request will NOT cancel the underlying loader. The data will still be cached.
+///
+/// Canceling a [`get`](Self::get) request will NOT cancel the underlying loader. The data will
+/// still be cached.
 ///
 /// # Panic
-/// If the underlying loader panics, all currently running [`get`](Self::get) requests will panic. The data will NOT be cached.
+///
+/// If the underlying loader panics, all currently running [`get`](Self::get) requests will panic.
+/// The data will NOT be cached.
 #[derive(Debug)]
 pub struct Cache<K, V>
 where
@@ -57,7 +62,8 @@ where
 
     /// Get value from cache.
     pub async fn get(&self, k: K) -> V {
-        // place state locking into its own scope so it doesn't leak into the generator (async function)
+        // place state locking into its own scope so it doesn't leak into the generator (async
+        // function)
         let receiver = {
             let mut state = self.state.lock();
 
@@ -83,7 +89,8 @@ where
                 let tag = state.tag_counter;
                 state.tag_counter += 1;
 
-                // need to wrap the query into a tokio task so that it doesn't get cancelled when this very request is canceled
+                // need to wrap the query into a tokio task so that it doesn't get cancelled when
+                // this very request is cancelled
                 let state_captured = Arc::clone(&self.state);
                 let loader = Arc::clone(&self.loader);
                 let k_captured = k.clone();
@@ -93,7 +100,8 @@ where
                     let k_for_loader = k_captured.clone();
 
                     // execute the loader
-                    // If we panic here then `tx` will be dropped and the receivers will be notified.
+                    // If we panic here then `tx` will be dropped and the receivers will be
+                    // notified.
                     let v = loader.load(k_for_loader).await;
 
                     // remove "running" state and store result
@@ -104,14 +112,16 @@ where
                         Some(running_query) if running_query.tag == tag => {
                             state.running_queries.remove(&k_captured);
 
-                            // this very query is in charge of the key, so store in in the underlying cache
+                            // this very query is in charge of the key, so store it in the
+                            // underlying cache
                            state.cached_entries.set(k_captured, v.clone());
 
                            true
                        }
                        _ => {
-                            // This query is actually not really running any longer but got shut down, e.g. due
-                            // to side loading. Do NOT store the generated value in the underlying cache.
+                            // This query is actually not really running any longer but got
+                            // shut down, e.g. due to side loading. Do NOT store the
+                            // generated value in the underlying cache.
                            false
                        }
@@ -119,8 +129,9 @@ where
                    };
 
                    if !was_running {
-                        // value was side-loaded, so we cannot populate `v`. Instead block this execution branch and
-                        // wait for `rx_set` to deliver the side-loaded result.
+                        // value was side-loaded, so we cannot populate `v`. Instead block this
+                        // execution branch and wait for `rx_set` to deliver the side-loaded
+                        // result.
                        loop {
                            tokio::task::yield_now().await;
                        }
@@ -134,8 +145,9 @@ where
                    maybe_v = rx_set.fuse() => {
                        match maybe_v {
                            Ok(v) => {
-                                // data get side-loaded via `Cache::set`. In this case, we do NOT modify the state
-                                // because there would be a lock-gap. The `set` function will do that for us instead.
+                                // data got side-loaded via `Cache::set`. In this case, we do
+                                // NOT modify the state because there would be a lock-gap. The
+                                // `set` function will do that for us instead.
                                v
                            }
                            Err(_) => {
@@ -187,9 +199,10 @@ where
         // it's OK when the receiver side is gone (likely panicked)
         running_query.set.send(v.clone()).ok();
 
-        // When we side-load data into the running task, the task does NOT modify the backend, so we have to do
-        // that. The reason for not letting the task feed the side-loaded data back into `cached_entries` is that we
-        // would need to drop the state lock here before the task could acquire it, leading to a lock gap.
+        // When we side-load data into the running task, the task does NOT modify the
+        // backend, so we have to do that. The reason for not letting the task feed the
+        // side-loaded data back into `cached_entries` is that we would need to drop the
+        // state lock here before the task could acquire it, leading to a lock gap.
         Some(running_query.join_handle)
     } else {
         None
@@ -215,9 +228,10 @@ where
 {
     fn drop(&mut self) {
         for (_k, running_query) in self.state.lock().running_queries.drain() {
-            // It's unlikely that anyone is still using the shared receiver at this point, because Cache::get borrow
-            // the self. If it is still in use, aborting the task will cancel the contained future which in turn will
-            // drop the sender of the oneshot channel. The receivers will be notified.
+            // It's unlikely that anyone is still using the shared receiver at this point, because
+            // `Cache::get` borrows `self`. If it is still in use, aborting the task will
+            // cancel the contained future which in turn will drop the sender of the oneshot
+            // channel. The receivers will be notified.
             running_query.join_handle.abort();
         }
     }
@@ -226,11 +240,12 @@ where
 /// A [`tokio::sync::oneshot::Receiver`] that can be cloned.
 ///
 /// The types are:
-/// - `Arc<Mutex<V>>`: Ensures that we can clone `V` without requiring `V: Sync`. At the same time the reference to `V`
-///   (i.e. the `Arc`) must be cloneable for `Shared`
+///
+/// - `Arc<Mutex<V>>`: Ensures that we can clone `V` without requiring `V: Sync`. At the same time
+///   the reference to `V` (i.e. the `Arc`) must be cloneable for `Shared`
 /// - `Arc<RecvError>`: Is required because `RecvError` is not `Clone` but `Shared` requires that.
-/// - `BoxFuture`: The transformation from `Result<V, RecvError>` to `Result<Arc<Mutex<V>>, Arc<RecvError>>` results in
-///   a kinda messy type and we wanna erase that.
+/// - `BoxFuture`: The transformation from `Result<V, RecvError>` to `Result<Arc<Mutex<V>>,
+///   Arc<RecvError>>` results in a kinda messy type and we wanna erase that.
 /// - `Shared`: Allow the receiver to be cloned and be awaited from multiple places.
 type SharedReceiver<V> = Shared<BoxFuture<'static, Result<Arc<Mutex<V>>, Arc<RecvError>>>>;
@@ -483,14 +498,16 @@ mod tests {
     impl TestLoader {
         /// Panic when loading value for `k`.
         ///
-        /// If this is used together with [`block`](Self::block), the panic will occur AFTER blocking.
+        /// If this is used together with [`block`](Self::block), the panic will occur AFTER
+        /// blocking.
         fn panic_once(&self, k: u8) {
             self.panic.lock().insert(k);
         }
 
         /// Block all [`load`](Self::load) requests until [`unblock`](Self::unblock) is called.
         ///
-        /// If this is used together with [`panic_once`](Self::panic_once), the panic will occur AFTER blocking.
+        /// If this is used together with [`panic_once`](Self::panic_once), the panic will occur
+        /// AFTER blocking.
         fn block(&self) {
             let mut blocked = self.blocked.lock();
             assert!(blocked.is_none());
@@ -523,7 +540,8 @@ mod tests {
         async fn load(&self, k: u8) -> String {
             self.loaded.lock().push(k);
 
-            // need to capture the cloned notify handle, otherwise the lock guard leaks into the generator
+            // need to capture the cloned notify handle, otherwise the lock guard leaks into the
+            // generator
             let maybe_block = self.blocked.lock().clone();
             if let Some(block) = maybe_block {
                 block.notified().await;
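To make the coalescing and side-loading semantics documented above concrete, here is a minimal sketch of two concurrent `get` calls sharing a single loader run. It assumes `Cache::new` accepts an `Arc`'d loader plus a boxed backend and that a plain `HashMap` implements `CacheBackend` (mirroring how `ReadBufferCache::new` wires things up later in this diff); the key/value types and the closure body are illustrative only, not part of this change.

```rust
use std::{collections::HashMap, sync::Arc};

use cache_system::{driver::Cache, loader::FunctionLoader};

async fn cache_coalescing_sketch() {
    // Loader body is made up; in the querier it would decode parquet data.
    let loader = Arc::new(FunctionLoader::new(|k: u8| async move {
        format!("value for {k}")
    }));
    let cache = Cache::new(loader, Box::new(HashMap::new()));

    // Both requests use the same key, so the loader is only polled once (the
    // "Concurrency" guarantee above); cancelling either future would not
    // cancel the loader ("Cancellation" above).
    let (a, b) = futures::join!(cache.get(1), cache.get(1));
    assert_eq!(a, b);
}
```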
diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs
index d84e9fa145..aa17c66567 100644
--- a/compactor/src/compact.rs
+++ b/compactor/src/compact.rs
@@ -1218,11 +1218,11 @@ mod tests {
         let files1 = files.pop().unwrap();
         let files0 = files.pop().unwrap();
         let chunk_0 = adapter
-            .new_querier_chunk_from_file_with_metadata(files0)
+            .new_querier_parquet_chunk_from_file_with_metadata(files0)
             .await
             .unwrap();
         let chunk_1 = adapter
-            .new_querier_chunk_from_file_with_metadata(files1)
+            .new_querier_parquet_chunk_from_file_with_metadata(files1)
             .await
             .unwrap();
         // query the chunks
@@ -1439,11 +1439,11 @@ mod tests {
         let files2 = files.pop().unwrap();
         let files1 = files.pop().unwrap();
         let chunk_0 = adapter
-            .new_querier_chunk_from_file_with_metadata(files1)
+            .new_querier_parquet_chunk_from_file_with_metadata(files1)
             .await
             .unwrap();
         let chunk_1 = adapter
-            .new_querier_chunk_from_file_with_metadata(files2)
+            .new_querier_parquet_chunk_from_file_with_metadata(files2)
             .await
             .unwrap();
         // query the chunks
diff --git a/querier/Cargo.toml b/querier/Cargo.toml
index ddbb340774..fcfc27102a 100644
--- a/querier/Cargo.toml
+++ b/querier/Cargo.toml
@@ -25,6 +25,7 @@ pin-project = "1.0"
 predicate = { path = "../predicate" }
 iox_query = { path = "../iox_query" }
 rand = "0.8.3"
+read_buffer = { path = "../read_buffer" }
 service_common = { path = "../service_common" }
 service_grpc_schema = { path = "../service_grpc_schema" }
 schema = { path = "../schema" }
diff --git a/querier/src/cache/mod.rs b/querier/src/cache/mod.rs
index e9e7674056..23d3c88d20 100644
--- a/querier/src/cache/mod.rs
+++ b/querier/src/cache/mod.rs
@@ -15,6 +15,7 @@
 pub mod parquet_file;
 pub mod partition;
 pub mod processed_tombstones;
 mod ram;
+pub mod read_buffer;
 pub mod table;
 pub mod tombstones;
diff --git a/querier/src/cache/read_buffer.rs b/querier/src/cache/read_buffer.rs
new file mode 100644
index 0000000000..8d14e419ea
--- /dev/null
+++ b/querier/src/cache/read_buffer.rs
@@ -0,0 +1,201 @@
+//! Cache Parquet file data in Read Buffer chunks.
+
+use super::ram::RamSize;
+use backoff::{Backoff, BackoffConfig};
+use cache_system::{
+    backend::{
+        lru::{LruBackend, ResourcePool},
+        resource_consumption::FunctionEstimator,
+        shared::SharedBackend,
+    },
+    driver::Cache,
+    loader::{metrics::MetricsLoader, FunctionLoader},
+};
+use data_types::{ParquetFile, ParquetFileId};
+use datafusion::physical_plan::SendableRecordBatchStream;
+use futures::StreamExt;
+use iox_time::TimeProvider;
+use read_buffer::RBChunk;
+use snafu::{ResultExt, Snafu};
+use std::{collections::HashMap, mem, sync::Arc};
+
+const CACHE_ID: &str = "read_buffer";
+
+/// Cache for parquet file data decoded into read buffer chunks
+#[derive(Debug)]
+pub struct ReadBufferCache {
+    cache: Cache<ParquetFileId, Arc<RBChunk>>,
+
+    /// Handle that allows clearing entries for existing cache entries
+    _backend: SharedBackend<ParquetFileId, Arc<RBChunk>>,
+}
+
+impl ReadBufferCache {
+    /// Create a new empty cache.
+    pub fn new(
+        time_provider: Arc<dyn TimeProvider>,
+        metric_registry: &metric::Registry,
+        ram_pool: Arc<ResourcePool<RamSize>>,
+    ) -> Self {
+        let loader = Box::new(FunctionLoader::new(
+            move |parquet_file_id: ParquetFileId| {
+                let backoff_config = BackoffConfig::default();
+
+                async move {
+                    let rb_chunk = Backoff::new(&backoff_config)
+                        .retry_all_errors("get read buffer chunk by parquet file ID", || async {
+                            let parquet_file = parquet_file_by_id(parquet_file_id);
+                            let table_name = parquet_file_table_name(&parquet_file).to_string();
+                            let record_batch_stream = record_batches_stream(&parquet_file);
+                            read_buffer_chunk_from_stream(table_name, record_batch_stream).await
+                        })
+                        .await
+                        .expect("retry forever");
+
+                    Arc::new(rb_chunk)
+                }
+            },
+        ));
+
+        let loader = Arc::new(MetricsLoader::new(
+            loader,
+            CACHE_ID,
+            Arc::clone(&time_provider),
+            metric_registry,
+        ));
+
+        // add to memory pool
+        let backend = Box::new(LruBackend::new(
+            Box::new(HashMap::new()),
+            Arc::clone(&ram_pool),
+            CACHE_ID,
+            Arc::new(FunctionEstimator::new(
+                |k: &ParquetFileId, v: &Arc<RBChunk>| {
+                    RamSize(mem::size_of_val(k) + mem::size_of_val(v) + v.size())
+                },
+            )),
+        ));
+
+        // get a direct handle so we can clear out entries as needed
+        let _backend = SharedBackend::new(backend);
+
+        let cache = Cache::new(loader, Box::new(_backend.clone()));
+
+        Self { cache, _backend }
+    }
+
+    /// Get read buffer chunk by Parquet file id
+    pub async fn get(&self, parquet_file_id: ParquetFileId) -> Arc<RBChunk> {
+        self.cache.get(parquet_file_id).await
+    }
+}
+
+fn parquet_file_by_id(_parquet_file_id: ParquetFileId) -> ParquetFile {
+    unimplemented!()
+}
+
+fn parquet_file_table_name(_parquet_file: &ParquetFile) -> &str {
+    unimplemented!()
+}
+
+fn record_batches_stream(_parquet_file: &ParquetFile) -> SendableRecordBatchStream {
+    unimplemented!()
+}
+
+#[derive(Debug, Snafu)]
+enum RBChunkError {
+    #[snafu(display("Error streaming record batches: {}", source))]
+    Streaming { source: arrow::error::ArrowError },
+
+    #[snafu(display("Error pushing record batch into chunk: {}", source))]
+    Pushing { source: arrow::error::ArrowError },
+
+    #[snafu(display("Read buffer error: {}", source))]
+    ReadBuffer { source: read_buffer::Error },
+}
+
+async fn read_buffer_chunk_from_stream(
+    table_name: String,
+    mut stream: SendableRecordBatchStream,
+) -> Result<RBChunk, RBChunkError> {
+    let schema = stream.schema();
+
+    let mut builder = read_buffer::RBChunkBuilder::new(table_name, schema);
+
+    while let Some(record_batch) = stream.next().await {
+        builder
+            .push_record_batch(record_batch.context(StreamingSnafu)?)
+            .context(PushingSnafu)?;
+    }
+
+    builder.build().context(ReadBufferSnafu)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::cache::ram::test_util::test_ram_pool;
+    use arrow::record_batch::RecordBatch;
+    use arrow_util::assert_batches_eq;
+    use datafusion_util::stream_from_batches;
+    use iox_tests::util::TestCatalog;
+    use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
+    use read_buffer::Predicate;
+    use schema::selection::Selection;
+
+    fn make_cache(catalog: &TestCatalog) -> ReadBufferCache {
+        ReadBufferCache::new(
+            catalog.time_provider(),
+            &catalog.metric_registry(),
+            test_ram_pool(),
+        )
+    }
+
+    async fn make_catalog() -> Arc<TestCatalog> {
+        TestCatalog::new()
+    }
+
+    #[tokio::test]
+    async fn test_rb_chunks() {
+        let catalog = make_catalog().await;
+        let _cache = make_cache(&catalog);
+    }
+
+    fn lp_to_record_batch(lp: &str) -> RecordBatch {
+        let (_table, batch) = lp_to_mutable_batch(lp);
+
+        batch.to_arrow(Selection::All).unwrap()
+    }
+
+    #[tokio::test]
+    async fn build_read_buffer_chunk_from_stream_of_record_batches() {
+        let lines = ["cpu,host=a load=1 11", "cpu,host=a load=2 22"];
+        let batches = lines
+            .into_iter()
+            .map(lp_to_record_batch)
+            .map(Arc::new)
+            .collect();
+
+        let stream = stream_from_batches(batches);
+
+        let rb = read_buffer_chunk_from_stream("cpu".to_string(), stream)
+            .await
+            .unwrap();
+
+        let rb_batches: Vec<RecordBatch> = rb
+            .read_filter(Predicate::default(), Selection::All, vec![])
+            .unwrap()
+            .collect();
+
+        let expected = [
+            "+------+------+--------------------------------+",
+            "| host | load | time                           |",
+            "+------+------+--------------------------------+",
+            "| a    | 1    | 1970-01-01T00:00:00.000000011Z |",
+            "| a    | 2    | 1970-01-01T00:00:00.000000022Z |",
+            "+------+------+--------------------------------+",
+        ];
+
+        assert_batches_eq!(expected, &rb_batches);
+    }
+}
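A short sketch of the intended call pattern for the new cache, once the `unimplemented!()` helpers above are backed by the catalog and object store. Only `ReadBufferCache::get` comes from this diff; the helper function and its name are illustrative.

```rust
use std::sync::Arc;

use data_types::ParquetFileId;
use read_buffer::RBChunk;

use crate::cache::read_buffer::ReadBufferCache;

/// Illustrative helper: the first call per file id runs the loader (decoding
/// parquet data into a read buffer chunk, retrying errors forever); later
/// calls are served from the LRU backend, which charges roughly
/// `size_of(key) + size_of(Arc) + chunk.size()` bytes against the
/// "read_buffer" RAM pool via the `FunctionEstimator` above.
async fn cached_rb_chunk(cache: &ReadBufferCache, id: ParquetFileId) -> Arc<RBChunk> {
    cache.get(id).await
}
```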
diff --git a/querier/src/chunk/mod.rs b/querier/src/chunk/mod.rs
index b6f4ded819..7c06514af2 100644
--- a/querier/src/chunk/mod.rs
+++ b/querier/src/chunk/mod.rs
@@ -1,4 +1,4 @@
-//! Querier Chunk
+//! Querier Chunks
 
 use crate::cache::CatalogCache;
 use arrow::record_batch::RecordBatch;
@@ -20,7 +20,7 @@ use uuid::Uuid;
 
 mod query_access;
 
-/// Immutable metadata attached to a [`QuerierChunk`].
+/// Immutable metadata attached to a [`QuerierParquetChunk`].
 #[derive(Debug)]
 pub struct ChunkMeta {
     /// The ID of the chunk
@@ -80,27 +80,19 @@ impl ChunkMeta {
     }
 }
 
-/// Determines how the chunk data is currently accessible.
-#[derive(Debug)]
-pub enum ChunkStorage {
-    /// Data is currently available via parquet file within the object store.
-    Parquet {
-        /// ID of the parquet file if the chunk
-        parquet_file_id: ParquetFileId,
-        /// Chunk of the parquet file
-        chunk: Arc<ParquetChunk>,
-    },
-}
-
-/// Chunk representation for the querier.
+/// Chunk representation of Parquet file chunks for the querier.
 ///
-/// These chunks are usually created on-demand. The querier cache system does not really have a notion of chunks (rather
-/// it knows about parquet files, local FS caches, ingester data, cached read buffers) but we need to combine all that
-/// knowledge into chunk objects because this is what the query engine (DataFusion and InfluxRPC) expect.
+/// These chunks are usually created on-demand. The querier cache system does not really have a
+/// notion of chunks (rather it knows about parquet files, local FS caches, ingester data, cached
+/// read buffers) but we need to combine all that knowledge into chunk objects because this is what
+/// the query engine (DataFusion and InfluxRPC) expect.
 #[derive(Debug)]
-pub struct QuerierChunk {
-    /// How the data is currently structured / available for query.
-    storage: ChunkStorage,
+pub struct QuerierParquetChunk {
+    /// ID of the Parquet file of the chunk
+    parquet_file_id: ParquetFileId,
+
+    /// Chunk of the Parquet file
+    parquet_chunk: Arc<ParquetChunk>,
 
     /// Immutable metadata.
     meta: Arc<ChunkMeta>,
@@ -112,19 +104,17 @@ pub struct QuerierChunk {
     partition_sort_key: Arc<Option<SortKey>>,
 }
 
-impl QuerierChunk {
+impl QuerierParquetChunk {
     /// Create new parquet-backed chunk (object store data).
-    pub fn new_parquet(
+    pub fn new(
         parquet_file_id: ParquetFileId,
-        chunk: Arc<ParquetChunk>,
+        parquet_chunk: Arc<ParquetChunk>,
         meta: Arc<ChunkMeta>,
         partition_sort_key: Arc<Option<SortKey>>,
     ) -> Self {
         Self {
-            storage: ChunkStorage::Parquet {
-                parquet_file_id,
-                chunk,
-            },
+            parquet_file_id,
+            parquet_chunk,
             meta,
             delete_predicates: Vec::new(),
             partition_sort_key,
@@ -152,20 +142,14 @@ impl QuerierChunk {
         self.meta.as_ref()
     }
 
-    /// Parquet file ID if this chunk is backed by a parquet file.
-    pub fn parquet_file_id(&self) -> Option<ParquetFileId> {
-        match &self.storage {
-            ChunkStorage::Parquet {
-                parquet_file_id, ..
-            } => Some(*parquet_file_id),
-        }
+    /// Parquet file ID
+    pub fn parquet_file_id(&self) -> ParquetFileId {
+        self.parquet_file_id
     }
 
     /// Return time range
     pub fn timestamp_min_max(&self) -> Option<TimestampMinMax> {
-        match &self.storage {
-            ChunkStorage::Parquet { chunk, .. } => chunk.timestamp_min_max(),
-        }
+        self.parquet_chunk.timestamp_min_max()
     }
 
     /// Partition sort key
@@ -233,24 +217,24 @@ impl ParquetChunkAdapter {
         ))
     }
 
-    /// Create new querier chunk from a catalog record
+    /// Create new querier Parquet chunk from a catalog record
     ///
     /// Returns `None` if some data required to create this chunk is already gone from the catalog.
-    pub async fn new_querier_chunk_from_file_with_metadata(
+    pub async fn new_querier_parquet_chunk_from_file_with_metadata(
         &self,
         parquet_file_with_metadata: ParquetFileWithMetadata,
-    ) -> Option<QuerierChunk> {
+    ) -> Option<QuerierParquetChunk> {
         let decoded_parquet_file = DecodedParquetFile::new(parquet_file_with_metadata);
-        self.new_querier_chunk(&decoded_parquet_file).await
+        self.new_querier_parquet_chunk(&decoded_parquet_file).await
     }
 
-    /// Create new querier chunk.
+    /// Create new querier Parquet chunk.
     ///
     /// Returns `None` if some data required to create this chunk is already gone from the catalog.
-    pub async fn new_querier_chunk(
+    pub async fn new_querier_parquet_chunk(
         &self,
         decoded_parquet_file: &DecodedParquetFile,
-    ) -> Option<QuerierChunk> {
+    ) -> Option<QuerierParquetChunk> {
         let parquet_file = &decoded_parquet_file.parquet_file;
         let chunk = Arc::new(self.new_parquet_chunk(decoded_parquet_file).await?);
         let chunk_id = ChunkId::from(Uuid::from_u128(parquet_file.id.get() as _));
@@ -285,7 +269,7 @@ impl ParquetChunkAdapter {
             max_sequence_number: parquet_file.max_sequence_number,
         });
 
-        Some(QuerierChunk::new_parquet(
+        Some(QuerierParquetChunk::new(
             parquet_file.id,
             chunk,
             meta,
@@ -295,7 +279,7 @@ impl ParquetChunkAdapter {
     }
 }
 
 /// collect data for the given chunk
-pub async fn collect_read_filter(chunk: &QuerierChunk) -> Vec<RecordBatch> {
+pub async fn collect_read_filter(chunk: &QuerierParquetChunk) -> Vec<RecordBatch> {
     chunk
         .read_filter(
             IOxSessionContext::default(),
@@ -355,7 +339,7 @@ pub mod tests {
 
         // create chunk
         let chunk = adapter
-            .new_querier_chunk(&DecodedParquetFile::new(parquet_file))
+            .new_querier_parquet_chunk(&DecodedParquetFile::new(parquet_file))
             .await
             .unwrap();
diff --git a/querier/src/chunk/query_access.rs b/querier/src/chunk/query_access.rs
index b36f45fbc6..edda004956 100644
--- a/querier/src/chunk/query_access.rs
+++ b/querier/src/chunk/query_access.rs
@@ -1,4 +1,4 @@
-use crate::chunk::{ChunkStorage, QuerierChunk};
+use crate::chunk::QuerierParquetChunk;
 use data_types::{
     ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary, TimestampMinMax,
 };
@@ -18,17 +18,13 @@ pub enum Error {
     },
 }
 
-impl QueryChunkMeta for QuerierChunk {
+impl QueryChunkMeta for QuerierParquetChunk {
     fn summary(&self) -> Option<&TableSummary> {
-        match &self.storage {
-            ChunkStorage::Parquet { chunk, .. } => Some(chunk.table_summary().as_ref()),
-        }
+        Some(self.parquet_chunk.table_summary().as_ref())
     }
 
     fn schema(&self) -> Arc<Schema> {
-        match &self.storage {
-            ChunkStorage::Parquet { chunk, .. } => chunk.schema(),
-        }
+        self.parquet_chunk.schema()
     }
 
     fn partition_sort_key(&self) -> Option<&SortKey> {
@@ -52,7 +48,7 @@ impl QueryChunkMeta for QuerierChunk {
     }
 }
 
-impl QueryChunk for QuerierChunk {
+impl QueryChunk for QuerierParquetChunk {
     fn id(&self) -> ChunkId {
         self.meta().chunk_id
     }
@@ -62,23 +58,19 @@ impl QueryChunk for QuerierChunk {
     }
 
     fn may_contain_pk_duplicates(&self) -> bool {
-        match &self.storage {
-            ChunkStorage::Parquet { .. } => false,
-        }
+        false
     }
 
     fn apply_predicate_to_metadata(
         &self,
         predicate: &predicate::Predicate,
     ) -> Result<PredicateMatch> {
-        let pred_result = match &self.storage {
-            ChunkStorage::Parquet { chunk, .. } => {
-                if predicate.has_exprs() || chunk.has_timerange(predicate.range.as_ref()) {
-                    PredicateMatch::Unknown
-                } else {
-                    PredicateMatch::Zero
-                }
-            }
+        let pred_result = if predicate.has_exprs()
+            || self.parquet_chunk.has_timerange(predicate.range.as_ref())
+        {
+            PredicateMatch::Unknown
+        } else {
+            PredicateMatch::Zero
         };
 
         Ok(pred_result)
@@ -90,15 +82,11 @@ impl QueryChunk for QuerierChunk {
         predicate: &predicate::Predicate,
         columns: schema::selection::Selection<'_>,
     ) -> Result<Option<StringSet>, QueryChunkError> {
-        match &self.storage {
-            ChunkStorage::Parquet { chunk, .. } => {
-                if !predicate.is_empty() {
-                    // if there is anything in the predicate, bail for now and force a full plan
-                    return Ok(None);
-                }
-                Ok(chunk.column_names(columns))
-            }
+        if !predicate.is_empty() {
+            // if there is anything in the predicate, bail for now and force a full plan
+            return Ok(None);
         }
+        Ok(self.parquet_chunk.column_names(columns))
     }
 
     fn column_values(
         &self,
@@ -107,13 +95,9 @@ impl QueryChunk for QuerierChunk {
         _column_name: &str,
         _predicate: &predicate::Predicate,
     ) -> Result<Option<StringSet>, QueryChunkError> {
-        match &self.storage {
-            ChunkStorage::Parquet { .. } => {
-                // Since DataFusion can read Parquet, there is no advantage to
-                // manually implementing this vs just letting DataFusion do its thing
-                Ok(None)
-            }
-        }
+        // Since DataFusion can read Parquet, there is no advantage to
+        // manually implementing this vs just letting DataFusion do its thing
+        Ok(None)
     }
 
     fn read_filter(
@@ -134,23 +118,17 @@ impl QueryChunk for QuerierChunk {
         pred_with_deleted_exprs.merge_delete_predicates(&delete_predicates);
         debug!(?pred_with_deleted_exprs, "Merged negated predicate");
 
-        match &self.storage {
-            ChunkStorage::Parquet { chunk, .. } => {
-                ctx.set_metadata("predicate", format!("{}", &pred_with_deleted_exprs));
-                chunk
-                    .read_filter(&pred_with_deleted_exprs, selection)
-                    .context(ParquetFileChunkSnafu {
-                        chunk_id: self.id(),
-                    })
-                    .map_err(|e| Box::new(e) as _)
-            }
-        }
+        ctx.set_metadata("predicate", format!("{}", &pred_with_deleted_exprs));
+        self.parquet_chunk
+            .read_filter(&pred_with_deleted_exprs, selection)
+            .context(ParquetFileChunkSnafu {
+                chunk_id: self.id(),
+            })
+            .map_err(|e| Box::new(e) as _)
     }
 
     fn chunk_type(&self) -> &str {
-        match &self.storage {
-            ChunkStorage::Parquet { .. } => "parquet",
-        }
+        "parquet"
     }
 
     fn order(&self) -> ChunkOrder {
diff --git a/querier/src/table/state_reconciler.rs b/querier/src/table/state_reconciler.rs
index 55def48b63..e7efe984d1 100644
--- a/querier/src/table/state_reconciler.rs
+++ b/querier/src/table/state_reconciler.rs
@@ -24,7 +24,7 @@ use std::{
 };
 
 use crate::{
-    chunk::{ParquetChunkAdapter, QuerierChunk},
+    chunk::{ParquetChunkAdapter, QuerierParquetChunk},
     tombstone::QuerierTombstone,
     IngesterPartition,
 };
@@ -115,12 +115,12 @@ impl Reconciler {
             "Parquet files after filtering"
         );
 
-        // convert parquet files and tombstones into QuerierChunks
+        // convert parquet files and tombstones into QuerierParquetChunks
         let mut parquet_chunks = Vec::with_capacity(parquet_files.len());
         for parquet_file_with_metadata in parquet_files {
             if let Some(chunk) = self
                 .chunk_adapter
-                .new_querier_chunk_from_file_with_metadata(parquet_file_with_metadata)
+                .new_querier_parquet_chunk_from_file_with_metadata(parquet_file_with_metadata)
                 .await
             {
                 parquet_chunks.push(chunk);
@@ -174,12 +174,7 @@ impl Reconciler {
                 .chunk_adapter
                 .catalog_cache()
                 .processed_tombstones()
-                .exists(
-                    chunk
-                        .parquet_file_id()
-                        .expect("just created from a parquet file"),
-                    tombstone.tombstone_id(),
-                )
+                .exists(chunk.parquet_file_id(), tombstone.tombstone_id())
                 .await
             {
                 continue;
@@ -282,7 +277,7 @@ trait UpdatableQuerierChunk: QueryChunk {
     fn upcast_to_querier_chunk(self: Box<Self>) -> Box<dyn QueryChunk>;
 }
 
-impl UpdatableQuerierChunk for QuerierChunk {
+impl UpdatableQuerierChunk for QuerierParquetChunk {
     fn update_partition_sort_key(
         self: Box<Self>,
         sort_key: Arc<Option<SortKey>>,
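For completeness, a sketch of how the renamed adapter entry point is typically driven; the compactor tests above make the same call. The helper name is illustrative, and the `ParquetFileWithMetadata` import path is assumed rather than taken from this diff.

```rust
use data_types::ParquetFileWithMetadata;

use crate::chunk::{ParquetChunkAdapter, QuerierParquetChunk};

/// Illustrative helper: build a querier parquet chunk from a catalog record.
/// The adapter returns `None` (mapped to `expect` here) when the catalog data
/// needed for the chunk is already gone.
async fn chunk_for_file(
    adapter: &ParquetChunkAdapter,
    file: ParquetFileWithMetadata,
) -> QuerierParquetChunk {
    adapter
        .new_querier_parquet_chunk_from_file_with_metadata(file)
        .await
        .expect("catalog data should still exist")
}
```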