diff --git a/read_buffer/benches/database.rs b/read_buffer/benches/database.rs
index b1f67c5fb2..0b503621fb 100644
--- a/read_buffer/benches/database.rs
+++ b/read_buffer/benches/database.rs
@@ -7,14 +7,14 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use internal_types::schema::builder::SchemaBuilder;
-use read_buffer::{BinaryExpr, Chunk, ChunkMetrics, Predicate};
+use read_buffer::{BinaryExpr, ChunkMetrics, Predicate, RBChunk};
 
 const BASE_TIME: i64 = 1351700038292387000_i64;
 const ONE_MS: i64 = 1_000_000;
 
 fn table_names(c: &mut Criterion) {
     let rb = generate_row_group(500_000);
-    let mut chunk = Chunk::new(ChunkMetrics::new_unregistered());
+    let mut chunk = RBChunk::new(ChunkMetrics::new_unregistered());
     chunk.upsert_table("table_a", rb);
 
     // no predicate - return all the tables
@@ -61,7 +61,7 @@ fn table_names(c: &mut Criterion) {
 fn benchmark_table_names(
     c: &mut Criterion,
     bench_name: &str,
-    chunk: &Chunk,
+    chunk: &RBChunk,
     predicate: Predicate,
     expected_rows: usize,
 ) {
diff --git a/read_buffer/benches/read_filter.rs b/read_buffer/benches/read_filter.rs
index c5ab87c2fb..51068a3704 100644
--- a/read_buffer/benches/read_filter.rs
+++ b/read_buffer/benches/read_filter.rs
@@ -8,7 +8,7 @@ use internal_types::selection::Selection;
 use packers::{sorter, Packers};
 use read_buffer::{
     benchmarks::{Column, ColumnType, RowGroup},
-    Chunk,
+    RBChunk,
 };
 use read_buffer::{BinaryExpr, Predicate};
 
@@ -17,7 +17,7 @@ const ONE_MS: i64 = 1_000_000;
 
 pub fn read_filter(c: &mut Criterion) {
     let mut rng = rand::thread_rng();
-    let mut chunk = Chunk::new(read_buffer::ChunkMetrics::new_unregistered());
+    let mut chunk = RBChunk::new(read_buffer::ChunkMetrics::new_unregistered());
     let row_group = generate_row_group(200_000, &mut rng);
     read_buffer::benchmarks::upsert_table_with_row_group(&mut chunk, "table", row_group);
 
@@ -27,7 +27,7 @@ pub fn read_filter(c: &mut Criterion) {
 
 // These benchmarks track the performance of read_filter without any predicate
 // but varying the size of projection (columns) requested
-fn read_filter_no_pred_vary_proj(c: &mut Criterion, chunk: &Chunk) {
+fn read_filter_no_pred_vary_proj(c: &mut Criterion, chunk: &RBChunk) {
     let mut group = c.benchmark_group("read_filter/no_pred");
 
     // All these projections involve the same number of rows but with varying
@@ -63,7 +63,7 @@ fn read_filter_no_pred_vary_proj(c: &mut Criterion, chunk: &Chunk) {
 }
 
 // These benchmarks track the performance of read_filter with different predicates
-fn read_filter_with_pred_vary_proj(c: &mut Criterion, chunk: &Chunk) {
+fn read_filter_with_pred_vary_proj(c: &mut Criterion, chunk: &RBChunk) {
     let mut group = c.benchmark_group("read_filter/with_pred");
 
     // these predicates vary the number of rows returned
diff --git a/read_buffer/src/lib.rs b/read_buffer/src/lib.rs
index 48f8474c8f..049f8a3299 100644
--- a/read_buffer/src/lib.rs
+++ b/read_buffer/src/lib.rs
@@ -9,7 +9,7 @@ mod table;
 mod value;
 
 // Identifiers that are exported as part of the public API.
-pub use chunk::{Chunk, ChunkMetrics, Error};
+pub use chunk::{Chunk as RBChunk, ChunkMetrics, Error};
 pub use row_group::{BinaryExpr, Predicate};
 pub use schema::*;
 pub use table::ReadFilterResults;
@@ -29,11 +29,11 @@ pub mod benchmarks {
         Column, RowIDs,
     };
     pub use crate::row_group::{ColumnType, RowGroup};
-    use crate::Chunk;
+    use crate::RBChunk;
 
     // Allow external benchmarks to use this crate-only test method
     pub fn upsert_table_with_row_group(
-        chunk: &mut Chunk,
+        chunk: &mut RBChunk,
         table_name: impl Into<String>,
         row_group: RowGroup,
     ) {
diff --git a/server/src/db.rs b/server/src/db.rs
index 8130477f0a..3f1362513c 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -41,7 +41,7 @@ use parquet_file::{
 };
 use query::{exec::Executor, predicate::Predicate, Database};
 use rand_distr::{Distribution, Poisson};
-use read_buffer::{Chunk as ReadBufferChunk, ChunkMetrics as ReadBufferChunkMetrics};
+use read_buffer::{ChunkMetrics as ReadBufferChunkMetrics, RBChunk};
 use snafu::{ResultExt, Snafu};
 use std::{
     any::Any,
@@ -527,7 +527,7 @@ impl Db {
         let metrics = self
             .metrics_registry
             .register_domain_with_labels("read_buffer", self.metric_labels.clone());
-        let mut rb_chunk = ReadBufferChunk::new(ReadBufferChunkMetrics::new(
+        let mut rb_chunk = RBChunk::new(ReadBufferChunkMetrics::new(
             &metrics,
             self.preserved_catalog
                 .state()
diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs
index 3664989bea..52ca50bf10 100644
--- a/server/src/db/catalog/chunk.rs
+++ b/server/src/db/catalog/chunk.rs
@@ -11,7 +11,7 @@ use internal_types::schema::Schema;
 use metrics::{Counter, Histogram, KeyValue};
 use mutable_buffer::chunk::{snapshot::ChunkSnapshot as MBChunkSnapshot, Chunk as MBChunk};
 use parquet_file::chunk::Chunk as ParquetChunk;
-use read_buffer::Chunk as ReadBufferChunk;
+use read_buffer::RBChunk;
 use tracker::{TaskRegistration, TaskTracker};
 
 #[derive(Debug, Snafu)]
@@ -121,7 +121,7 @@ pub enum ChunkStageFrozenRepr {
     MutableBufferSnapshot(Arc<MBChunkSnapshot>),
 
     /// Read Buffer that is optimized for in-memory data processing.
-    ReadBuffer(Arc<ReadBufferChunk>),
+    ReadBuffer(Arc<RBChunk>),
 }
 
 /// Represents the current lifecycle stage a chunk is in.
@@ -187,7 +187,7 @@ pub enum ChunkStage {
         parquet: Arc<ParquetChunk>,
 
         /// In-memory version of the parquet data.
-        read_buffer: Option<Arc<ReadBufferChunk>>,
+        read_buffer: Option<Arc<RBChunk>>,
     },
 }
 
@@ -612,7 +612,7 @@ impl Chunk {
     /// Set the chunk in the Moved state, setting the underlying
     /// storage handle to db, and discarding the underlying mutable buffer
     /// storage.
-    pub fn set_moved(&mut self, chunk: Arc<ReadBufferChunk>) -> Result<()> {
+    pub fn set_moved(&mut self, chunk: Arc<RBChunk>) -> Result<()> {
         match &mut self.stage {
             ChunkStage::Frozen { representation, .. } => match &representation {
                 ChunkStageFrozenRepr::MutableBufferSnapshot(_) => {
@@ -649,7 +649,7 @@
     pub fn set_writing_to_object_store(
         &mut self,
         registration: &TaskRegistration,
-    ) -> Result<Arc<ReadBufferChunk>> {
+    ) -> Result<Arc<RBChunk>> {
         match &self.stage {
             ChunkStage::Frozen { representation, .. } => {
                 match &representation {
@@ -737,7 +737,7 @@ impl Chunk {
         }
     }
 
-    pub fn set_unload_from_read_buffer(&mut self) -> Result<Arc<ReadBufferChunk>> {
+    pub fn set_unload_from_read_buffer(&mut self) -> Result<Arc<RBChunk>> {
         match &mut self.stage {
             ChunkStage::Persisted {
                 parquet,
diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs
index a933e1b6fa..66d375bc37 100644
--- a/server/src/db/chunk.rs
+++ b/server/src/db/chunk.rs
@@ -21,7 +21,7 @@ use query::{
     pruning::Prunable,
     PartitionChunk,
 };
-use read_buffer::Chunk as ReadBufferChunk;
+use read_buffer::RBChunk;
 
 use super::{
     catalog::chunk::ChunkMetadata, pred::to_read_buffer_predicate, streams::ReadFilterResultsStream,
@@ -94,7 +94,7 @@ enum State {
         chunk: Arc<ChunkSnapshot>,
     },
     ReadBuffer {
-        chunk: Arc<ReadBufferChunk>,
+        chunk: Arc<RBChunk>,
         partition_key: Arc<str>,
     },
     ParquetFile {
diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs
index d23f9dda55..c8474cfb3b 100644
--- a/server/src/db/lifecycle.rs
+++ b/server/src/db/lifecycle.rs
@@ -6,8 +6,6 @@ use observability_deps::tracing::{info, warn};
 
 use data_types::{database_rules::LifecycleRules, error::ErrorLogger, job::Job};
 
-use tracker::{RwLock, TaskTracker};
-
 use super::{
     catalog::chunk::{Chunk, ChunkStage, ChunkStageFrozenRepr},
     Db,
@@ -16,6 +14,7 @@ use data_types::database_rules::SortOrder;
 use futures::future::BoxFuture;
 use std::collections::HashSet;
 use std::time::{Duration, Instant};
+use tracker::{RwLock, TaskTracker};
 
 pub const DEFAULT_LIFECYCLE_BACKOFF: Duration = Duration::from_secs(1);
 /// Number of seconds to wait before retying a failed lifecycle action
@@ -392,6 +391,7 @@ mod tests {
     use data_types::partition_metadata::TableSummary;
     use entry::test_helpers::lp_to_entry;
     use object_store::{memory::InMemory, parsed_path, ObjectStore};
+    use read_buffer::RBChunk;
     use std::num::{NonZeroU32, NonZeroUsize};
     use tracker::{TaskRegistration, TaskRegistry};
 
@@ -429,7 +429,7 @@ mod tests {
     }
 
     /// Transitions a new ("open") chunk into the "moved" state.
-    fn transition_to_moved(mut chunk: Chunk, rb: &Arc<read_buffer::Chunk>) -> Chunk {
+    fn transition_to_moved(mut chunk: Chunk, rb: &Arc<RBChunk>) -> Chunk {
         chunk = transition_to_moving(chunk);
         chunk.set_moved(Arc::clone(&rb)).unwrap();
         chunk
@@ -437,10 +437,7 @@ mod tests {
 
     /// Transitions a new ("open") chunk into the "writing to object store"
     /// state.
-    fn transition_to_writing_to_object_store(
-        mut chunk: Chunk,
-        rb: &Arc<read_buffer::Chunk>,
-    ) -> Chunk {
+    fn transition_to_writing_to_object_store(mut chunk: Chunk, rb: &Arc<RBChunk>) -> Chunk {
         chunk = transition_to_moved(chunk, rb);
         chunk
             .set_writing_to_object_store(&Default::default())
@@ -450,10 +447,7 @@ mod tests {
 
     /// Transitions a new ("open") chunk into the "written to object store"
     /// state.
-    fn transition_to_written_to_object_store(
-        mut chunk: Chunk,
-        rb: &Arc<read_buffer::Chunk>,
-    ) -> Chunk {
+    fn transition_to_written_to_object_store(mut chunk: Chunk, rb: &Arc<RBChunk>) -> Chunk {
        chunk = transition_to_writing_to_object_store(chunk, rb);
        let parquet_chunk = new_parquet_chunk(&chunk);
        chunk
@@ -783,9 +777,7 @@ mod tests {
             ..Default::default()
         };
 
-        let rb = Arc::new(read_buffer::Chunk::new(
-            read_buffer::ChunkMetrics::new_unregistered(),
-        ));
+        let rb = Arc::new(RBChunk::new(read_buffer::ChunkMetrics::new_unregistered()));
 
         let chunks = vec![new_chunk(0, Some(0), Some(0))];
 
@@ -826,9 +818,7 @@ mod tests {
             ..Default::default()
         };
 
-        let rb = Arc::new(read_buffer::Chunk::new(
-            read_buffer::ChunkMetrics::new_unregistered(),
-        ));
+        let rb = Arc::new(RBChunk::new(read_buffer::ChunkMetrics::new_unregistered()));
 
         let chunks = vec![new_chunk(0, Some(0), Some(0))];
 
@@ -879,9 +869,7 @@ mod tests {
             ..Default::default()
         };
 
-        let rb = Arc::new(read_buffer::Chunk::new(
-            read_buffer::ChunkMetrics::new_unregistered(),
-        ));
+        let rb = Arc::new(RBChunk::new(read_buffer::ChunkMetrics::new_unregistered()));
 
         let chunks = vec![
             // still moving => cannot write
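For context (outside the patch itself): the substantive change is the one-line re-export in read_buffer/src/lib.rs; every other hunk is a mechanical rename at import sites and in signatures. Below is a minimal standalone sketch of the `pub use ... as ...` aliasing pattern it relies on; the stand-in module and constructor here are invented for illustration, not the crate's real internals.

mod chunk {
    /// Stands in for read_buffer's private `chunk` module (illustrative only).
    pub struct Chunk;

    impl Chunk {
        pub fn new() -> Self {
            Chunk
        }
    }
}

// The re-export alias is the entire public-facing change: the type itself is
// untouched, but downstream crates can now only name it `RBChunk`, so a plain
// `Chunk` import no longer collides with mutable_buffer::chunk::Chunk or
// parquet_file::chunk::Chunk.
pub use chunk::Chunk as RBChunk;

fn main() {
    // Call sites read exactly like the updated benches above.
    let _chunk = RBChunk::new();
}

One visible consequence in server/src/db.rs: the local alias `Chunk as ReadBufferChunk` becomes unnecessary, because the exported name is already unambiguous at the import site.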