refactor: export rb chunk as RBChunk

pull/24376/head
Edd Robinson 2021-06-11 17:25:33 +01:00
parent d7428f568f
commit ff19beb0ad
7 changed files with 28 additions and 40 deletions
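
The change is purely an API-surface rename: `read_buffer` keeps its internal `chunk::Chunk` type and re-exports it from the crate root as `RBChunk` (see the `lib.rs` hunk below), so downstream crates no longer need ad-hoc local renames such as `Chunk as ReadBufferChunk` to avoid colliding with the codebase's other chunk types (`MBChunk`, `ParquetChunk`, the catalog's own `Chunk`). A minimal sketch of the `pub use ... as ...` pattern, using simplified stand-in names rather than the real `read_buffer` internals:

```rust
// Sketch of crate-root re-export aliasing (hypothetical stand-in names,
// not the actual read_buffer implementation).
mod chunk {
    /// The type keeps its short name inside its own module.
    #[derive(Default)]
    pub struct Chunk {
        rows: usize,
    }

    impl Chunk {
        pub fn upsert_table(&mut self, _name: &str, rows: usize) {
            self.rows += rows;
        }

        pub fn rows(&self) -> usize {
            self.rows
        }
    }
}

// `RBChunk` and `chunk::Chunk` are the same type; the alias only changes
// the name exposed at the crate root, so no wrapper type is involved.
pub use chunk::Chunk as RBChunk;

fn main() {
    let mut rb = RBChunk::default();
    rb.upsert_table("table_a", 500_000);
    println!("rows: {}", rb.rows());
}
```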

View File

@@ -7,14 +7,14 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use internal_types::schema::builder::SchemaBuilder;
-use read_buffer::{BinaryExpr, Chunk, ChunkMetrics, Predicate};
+use read_buffer::{BinaryExpr, ChunkMetrics, Predicate, RBChunk};
 const BASE_TIME: i64 = 1351700038292387000_i64;
 const ONE_MS: i64 = 1_000_000;
 fn table_names(c: &mut Criterion) {
     let rb = generate_row_group(500_000);
-    let mut chunk = Chunk::new(ChunkMetrics::new_unregistered());
+    let mut chunk = RBChunk::new(ChunkMetrics::new_unregistered());
     chunk.upsert_table("table_a", rb);
     // no predicate - return all the tables
@@ -61,7 +61,7 @@ fn table_names(c: &mut Criterion) {
 fn benchmark_table_names(
     c: &mut Criterion,
     bench_name: &str,
-    chunk: &Chunk,
+    chunk: &RBChunk,
     predicate: Predicate,
     expected_rows: usize,
 ) {

View File

@@ -8,7 +8,7 @@ use internal_types::selection::Selection;
 use packers::{sorter, Packers};
 use read_buffer::{
     benchmarks::{Column, ColumnType, RowGroup},
-    Chunk,
+    RBChunk,
 };
 use read_buffer::{BinaryExpr, Predicate};
@@ -17,7 +17,7 @@ const ONE_MS: i64 = 1_000_000;
 pub fn read_filter(c: &mut Criterion) {
     let mut rng = rand::thread_rng();
-    let mut chunk = Chunk::new(read_buffer::ChunkMetrics::new_unregistered());
+    let mut chunk = RBChunk::new(read_buffer::ChunkMetrics::new_unregistered());
     let row_group = generate_row_group(200_000, &mut rng);
     read_buffer::benchmarks::upsert_table_with_row_group(&mut chunk, "table", row_group);
@@ -27,7 +27,7 @@ pub fn read_filter(c: &mut Criterion) {
 // These benchmarks track the performance of read_filter without any predicate
 // but varying the size of projection (columns) requested
-fn read_filter_no_pred_vary_proj(c: &mut Criterion, chunk: &Chunk) {
+fn read_filter_no_pred_vary_proj(c: &mut Criterion, chunk: &RBChunk) {
     let mut group = c.benchmark_group("read_filter/no_pred");
     // All these projections involve the same number of rows but with varying
@@ -63,7 +63,7 @@ fn read_filter_no_pred_vary_proj(c: &mut Criterion, chunk: &Chunk) {
 }
 // These benchmarks track the performance of read_filter with different predicates
-fn read_filter_with_pred_vary_proj(c: &mut Criterion, chunk: &Chunk) {
+fn read_filter_with_pred_vary_proj(c: &mut Criterion, chunk: &RBChunk) {
     let mut group = c.benchmark_group("read_filter/with_pred");
     // these predicates vary the number of rows returned

View File

@@ -9,7 +9,7 @@ mod table;
 mod value;
 // Identifiers that are exported as part of the public API.
-pub use chunk::{Chunk, ChunkMetrics, Error};
+pub use chunk::{Chunk as RBChunk, ChunkMetrics, Error};
 pub use row_group::{BinaryExpr, Predicate};
 pub use schema::*;
 pub use table::ReadFilterResults;
@@ -29,11 +29,11 @@ pub mod benchmarks {
         Column, RowIDs,
     };
     pub use crate::row_group::{ColumnType, RowGroup};
-    use crate::Chunk;
+    use crate::RBChunk;
     // Allow external benchmarks to use this crate-only test method
     pub fn upsert_table_with_row_group(
-        chunk: &mut Chunk,
+        chunk: &mut RBChunk,
         table_name: impl Into<String>,
         row_group: RowGroup,
     ) {
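
Note in the hunk above that the alias is also usable from inside the crate: the `benchmarks` module switches from `use crate::Chunk` to `use crate::RBChunk`, because a `pub use` re-export is resolvable at the crate root for internal and external paths alike. It also makes the migration mechanical, since any pre-existing caller-side alias names the same type. A self-contained sketch of that property (hypothetical module layout mirroring the hunk above):

```rust
// Two aliases of one re-exported type are fully interchangeable, which is
// why this refactor only touches import sites and type annotations.
mod read_buffer {
    pub mod chunk {
        pub struct Chunk;
    }
    pub use self::chunk::Chunk as RBChunk;
}

// Old downstream style: a local rename at the import site.
use crate::read_buffer::RBChunk as ReadBufferChunk;
// New downstream style: the canonical exported name.
use crate::read_buffer::RBChunk;

fn takes_canonical(_c: &RBChunk) {}

fn main() {
    let c = ReadBufferChunk; // construct via the old alias
    takes_canonical(&c); // accepted as RBChunk: same type, no conversion
}
```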

View File

@@ -41,7 +41,7 @@ use parquet_file::{
 };
 use query::{exec::Executor, predicate::Predicate, Database};
 use rand_distr::{Distribution, Poisson};
-use read_buffer::{Chunk as ReadBufferChunk, ChunkMetrics as ReadBufferChunkMetrics};
+use read_buffer::{ChunkMetrics as ReadBufferChunkMetrics, RBChunk};
 use snafu::{ResultExt, Snafu};
 use std::{
     any::Any,
@@ -527,7 +527,7 @@ impl Db {
         let metrics = self
             .metrics_registry
             .register_domain_with_labels("read_buffer", self.metric_labels.clone());
-        let mut rb_chunk = ReadBufferChunk::new(ReadBufferChunkMetrics::new(
+        let mut rb_chunk = RBChunk::new(ReadBufferChunkMetrics::new(
             &metrics,
             self.preserved_catalog
                 .state()

View File

@@ -11,7 +11,7 @@ use internal_types::schema::Schema;
 use metrics::{Counter, Histogram, KeyValue};
 use mutable_buffer::chunk::{snapshot::ChunkSnapshot as MBChunkSnapshot, Chunk as MBChunk};
 use parquet_file::chunk::Chunk as ParquetChunk;
-use read_buffer::Chunk as ReadBufferChunk;
+use read_buffer::RBChunk;
 use tracker::{TaskRegistration, TaskTracker};
 #[derive(Debug, Snafu)]
@@ -121,7 +121,7 @@ pub enum ChunkStageFrozenRepr {
     MutableBufferSnapshot(Arc<MBChunkSnapshot>),
     /// Read Buffer that is optimized for in-memory data processing.
-    ReadBuffer(Arc<ReadBufferChunk>),
+    ReadBuffer(Arc<RBChunk>),
 }
 /// Represents the current lifecycle stage a chunk is in.
@@ -187,7 +187,7 @@ pub enum ChunkStage {
         parquet: Arc<ParquetChunk>,
         /// In-memory version of the parquet data.
-        read_buffer: Option<Arc<ReadBufferChunk>>,
+        read_buffer: Option<Arc<RBChunk>>,
     },
 }
@@ -612,7 +612,7 @@ impl Chunk {
     /// Set the chunk in the Moved state, setting the underlying
     /// storage handle to db, and discarding the underlying mutable buffer
     /// storage.
-    pub fn set_moved(&mut self, chunk: Arc<ReadBufferChunk>) -> Result<()> {
+    pub fn set_moved(&mut self, chunk: Arc<RBChunk>) -> Result<()> {
         match &mut self.stage {
             ChunkStage::Frozen { representation, .. } => match &representation {
                 ChunkStageFrozenRepr::MutableBufferSnapshot(_) => {
@@ -649,7 +649,7 @@ impl Chunk {
     pub fn set_writing_to_object_store(
         &mut self,
         registration: &TaskRegistration,
-    ) -> Result<Arc<ReadBufferChunk>> {
+    ) -> Result<Arc<RBChunk>> {
         match &self.stage {
             ChunkStage::Frozen { representation, .. } => {
                 match &representation {
@@ -737,7 +737,7 @@ impl Chunk {
         }
     }
-    pub fn set_unload_from_read_buffer(&mut self) -> Result<Arc<ReadBufferChunk>> {
+    pub fn set_unload_from_read_buffer(&mut self) -> Result<Arc<RBChunk>> {
         match &mut self.stage {
             ChunkStage::Persisted {
                 parquet,

View File

@@ -21,7 +21,7 @@ use query::{
     pruning::Prunable,
     PartitionChunk,
 };
-use read_buffer::Chunk as ReadBufferChunk;
+use read_buffer::RBChunk;
 use super::{
     catalog::chunk::ChunkMetadata, pred::to_read_buffer_predicate, streams::ReadFilterResultsStream,
@@ -94,7 +94,7 @@ enum State {
         chunk: Arc<ChunkSnapshot>,
     },
     ReadBuffer {
-        chunk: Arc<ReadBufferChunk>,
+        chunk: Arc<RBChunk>,
         partition_key: Arc<str>,
     },
     ParquetFile {

View File

@@ -6,8 +6,6 @@ use observability_deps::tracing::{info, warn};
 use data_types::{database_rules::LifecycleRules, error::ErrorLogger, job::Job};
-use tracker::{RwLock, TaskTracker};
 use super::{
     catalog::chunk::{Chunk, ChunkStage, ChunkStageFrozenRepr},
     Db,
@@ -16,6 +14,7 @@ use data_types::database_rules::SortOrder;
 use futures::future::BoxFuture;
 use std::collections::HashSet;
 use std::time::{Duration, Instant};
+use tracker::{RwLock, TaskTracker};
 pub const DEFAULT_LIFECYCLE_BACKOFF: Duration = Duration::from_secs(1);
 /// Number of seconds to wait before retying a failed lifecycle action
@@ -392,6 +391,7 @@ mod tests {
     use data_types::partition_metadata::TableSummary;
     use entry::test_helpers::lp_to_entry;
     use object_store::{memory::InMemory, parsed_path, ObjectStore};
+    use read_buffer::RBChunk;
     use std::num::{NonZeroU32, NonZeroUsize};
     use tracker::{TaskRegistration, TaskRegistry};
@@ -429,7 +429,7 @@
     }
     /// Transitions a new ("open") chunk into the "moved" state.
-    fn transition_to_moved(mut chunk: Chunk, rb: &Arc<read_buffer::Chunk>) -> Chunk {
+    fn transition_to_moved(mut chunk: Chunk, rb: &Arc<RBChunk>) -> Chunk {
         chunk = transition_to_moving(chunk);
         chunk.set_moved(Arc::clone(&rb)).unwrap();
         chunk
@@ -437,10 +437,7 @@
     /// Transitions a new ("open") chunk into the "writing to object store"
     /// state.
-    fn transition_to_writing_to_object_store(
-        mut chunk: Chunk,
-        rb: &Arc<read_buffer::Chunk>,
-    ) -> Chunk {
+    fn transition_to_writing_to_object_store(mut chunk: Chunk, rb: &Arc<RBChunk>) -> Chunk {
         chunk = transition_to_moved(chunk, rb);
         chunk
             .set_writing_to_object_store(&Default::default())
@@ -450,10 +450,7 @@
     /// Transitions a new ("open") chunk into the "written to object store"
     /// state.
-    fn transition_to_written_to_object_store(
-        mut chunk: Chunk,
-        rb: &Arc<read_buffer::Chunk>,
-    ) -> Chunk {
+    fn transition_to_written_to_object_store(mut chunk: Chunk, rb: &Arc<RBChunk>) -> Chunk {
         chunk = transition_to_writing_to_object_store(chunk, rb);
         let parquet_chunk = new_parquet_chunk(&chunk);
         chunk
@@ -783,9 +777,7 @@
             ..Default::default()
         };
-        let rb = Arc::new(read_buffer::Chunk::new(
-            read_buffer::ChunkMetrics::new_unregistered(),
-        ));
+        let rb = Arc::new(RBChunk::new(read_buffer::ChunkMetrics::new_unregistered()));
         let chunks = vec![new_chunk(0, Some(0), Some(0))];
@@ -826,9 +818,7 @@
             ..Default::default()
         };
-        let rb = Arc::new(read_buffer::Chunk::new(
-            read_buffer::ChunkMetrics::new_unregistered(),
-        ));
+        let rb = Arc::new(RBChunk::new(read_buffer::ChunkMetrics::new_unregistered()));
         let chunks = vec![new_chunk(0, Some(0), Some(0))];
@@ -879,9 +869,7 @@
             ..Default::default()
         };
-        let rb = Arc::new(read_buffer::Chunk::new(
-            read_buffer::ChunkMetrics::new_unregistered(),
-        ));
+        let rb = Arc::new(RBChunk::new(read_buffer::ChunkMetrics::new_unregistered()));
         let chunks = vec![
             // still moving => cannot write