Merge pull request #1700 from influxdata/er/refactor/rb_chunk

refactor: export Read Buffer Chunk as RBChunk
kodiakhq[bot] 2021-06-11 17:57:50 +00:00 committed by GitHub
commit cf523e23b9
7 changed files with 28 additions and 40 deletions
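
The whole refactor hinges on one line in the read_buffer crate's lib.rs (file 3 below): the chunk type keeps its internal name, Chunk, and only the public export is renamed to RBChunk. A minimal, self-contained sketch of that rename-on-re-export pattern, using a hypothetical stand-in struct rather than the real Read Buffer implementation:

// Sketch of the rename-on-re-export pattern applied by this PR.
// `chunk::Chunk` below is an illustrative stand-in, not the real type.
mod chunk {
    /// Hypothetical stand-in for the Read Buffer's chunk type.
    pub struct Chunk;

    impl Chunk {
        pub fn new() -> Self {
            Chunk
        }
    }
}

// Inside the crate, code can keep the short name `Chunk`; the public
// API exposes it as `RBChunk`, so downstream crates no longer need a
// `Chunk as ReadBufferChunk` alias at every import site.
pub use chunk::Chunk as RBChunk;

fn main() {
    let _chunk = RBChunk::new();
}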

==== File 1 of 7 ====

@@ -7,14 +7,14 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use internal_types::schema::builder::SchemaBuilder;
-use read_buffer::{BinaryExpr, Chunk, ChunkMetrics, Predicate};
+use read_buffer::{BinaryExpr, ChunkMetrics, Predicate, RBChunk};
 
 const BASE_TIME: i64 = 1351700038292387000_i64;
 const ONE_MS: i64 = 1_000_000;
 
 fn table_names(c: &mut Criterion) {
     let rb = generate_row_group(500_000);
-    let mut chunk = Chunk::new(ChunkMetrics::new_unregistered());
+    let mut chunk = RBChunk::new(ChunkMetrics::new_unregistered());
     chunk.upsert_table("table_a", rb);
 
     // no predicate - return all the tables
@@ -61,7 +61,7 @@ fn table_names(c: &mut Criterion) {
 fn benchmark_table_names(
     c: &mut Criterion,
     bench_name: &str,
-    chunk: &Chunk,
+    chunk: &RBChunk,
     predicate: Predicate,
     expected_rows: usize,
 ) {

==== File 2 of 7 ====

@@ -8,7 +8,7 @@ use internal_types::selection::Selection;
 use packers::{sorter, Packers};
 use read_buffer::{
     benchmarks::{Column, ColumnType, RowGroup},
-    Chunk,
+    RBChunk,
 };
 use read_buffer::{BinaryExpr, Predicate};
@@ -17,7 +17,7 @@ const ONE_MS: i64 = 1_000_000;
 pub fn read_filter(c: &mut Criterion) {
     let mut rng = rand::thread_rng();
 
-    let mut chunk = Chunk::new(read_buffer::ChunkMetrics::new_unregistered());
+    let mut chunk = RBChunk::new(read_buffer::ChunkMetrics::new_unregistered());
     let row_group = generate_row_group(200_000, &mut rng);
     read_buffer::benchmarks::upsert_table_with_row_group(&mut chunk, "table", row_group);
@@ -27,7 +27,7 @@ pub fn read_filter(c: &mut Criterion) {
 // These benchmarks track the performance of read_filter without any predicate
 // but varying the size of projection (columns) requested
-fn read_filter_no_pred_vary_proj(c: &mut Criterion, chunk: &Chunk) {
+fn read_filter_no_pred_vary_proj(c: &mut Criterion, chunk: &RBChunk) {
     let mut group = c.benchmark_group("read_filter/no_pred");
 
     // All these projections involve the same number of rows but with varying
@@ -63,7 +63,7 @@ fn read_filter_no_pred_vary_proj(c: &mut Criterion, chunk: &RBChunk) {
 }
 
 // These benchmarks track the performance of read_filter with different predicates
-fn read_filter_with_pred_vary_proj(c: &mut Criterion, chunk: &Chunk) {
+fn read_filter_with_pred_vary_proj(c: &mut Criterion, chunk: &RBChunk) {
     let mut group = c.benchmark_group("read_filter/with_pred");
 
     // these predicates vary the number of rows returned

==== File 3 of 7 ====

@@ -9,7 +9,7 @@ mod table;
 mod value;
 
 // Identifiers that are exported as part of the public API.
-pub use chunk::{Chunk, ChunkMetrics, Error};
+pub use chunk::{Chunk as RBChunk, ChunkMetrics, Error};
 pub use row_group::{BinaryExpr, Predicate};
 pub use schema::*;
 pub use table::ReadFilterResults;
@@ -29,11 +29,11 @@ pub mod benchmarks {
         Column, RowIDs,
     };
     pub use crate::row_group::{ColumnType, RowGroup};
-    use crate::Chunk;
+    use crate::RBChunk;
 
     // Allow external benchmarks to use this crate-only test method
     pub fn upsert_table_with_row_group(
-        chunk: &mut Chunk,
+        chunk: &mut RBChunk,
         table_name: impl Into<String>,
         row_group: RowGroup,
     ) {
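
Note that the benchmarks module above consumes the crate's own re-export (use crate::RBChunk), so internal helpers and external callers now share one spelling of the name. This is also why the PR deletes more lines than it adds: downstream crates drop their ad-hoc Chunk as ReadBufferChunk aliases, and signatures that had been wrapped across several lines collapse back to one (see the lifecycle tests in file 7).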

==== File 4 of 7 ====

@@ -41,7 +41,7 @@ use parquet_file::{
 };
 use query::{exec::Executor, predicate::Predicate, Database};
 use rand_distr::{Distribution, Poisson};
-use read_buffer::{Chunk as ReadBufferChunk, ChunkMetrics as ReadBufferChunkMetrics};
+use read_buffer::{ChunkMetrics as ReadBufferChunkMetrics, RBChunk};
 use snafu::{ResultExt, Snafu};
 use std::{
     any::Any,
@@ -527,7 +527,7 @@ impl Db {
         let metrics = self
             .metrics_registry
             .register_domain_with_labels("read_buffer", self.metric_labels.clone());
-        let mut rb_chunk = ReadBufferChunk::new(ReadBufferChunkMetrics::new(
+        let mut rb_chunk = RBChunk::new(ReadBufferChunkMetrics::new(
             &metrics,
             self.preserved_catalog
                 .state()

==== File 5 of 7 ====

@@ -11,7 +11,7 @@ use internal_types::schema::Schema;
 use metrics::{Counter, Histogram, KeyValue};
 use mutable_buffer::chunk::{snapshot::ChunkSnapshot as MBChunkSnapshot, Chunk as MBChunk};
 use parquet_file::chunk::Chunk as ParquetChunk;
-use read_buffer::Chunk as ReadBufferChunk;
+use read_buffer::RBChunk;
 use tracker::{TaskRegistration, TaskTracker};
 
 #[derive(Debug, Snafu)]
@@ -121,7 +121,7 @@ pub enum ChunkStageFrozenRepr {
     MutableBufferSnapshot(Arc<MBChunkSnapshot>),
 
     /// Read Buffer that is optimized for in-memory data processing.
-    ReadBuffer(Arc<ReadBufferChunk>),
+    ReadBuffer(Arc<RBChunk>),
 }
 
 /// Represents the current lifecycle stage a chunk is in.
@@ -187,7 +187,7 @@ pub enum ChunkStage {
         parquet: Arc<ParquetChunk>,
 
         /// In-memory version of the parquet data.
-        read_buffer: Option<Arc<ReadBufferChunk>>,
+        read_buffer: Option<Arc<RBChunk>>,
     },
 }
@@ -612,7 +612,7 @@ impl Chunk {
     /// Set the chunk in the Moved state, setting the underlying
     /// storage handle to db, and discarding the underlying mutable buffer
     /// storage.
-    pub fn set_moved(&mut self, chunk: Arc<ReadBufferChunk>) -> Result<()> {
+    pub fn set_moved(&mut self, chunk: Arc<RBChunk>) -> Result<()> {
         match &mut self.stage {
             ChunkStage::Frozen { representation, .. } => match &representation {
                 ChunkStageFrozenRepr::MutableBufferSnapshot(_) => {
@@ -649,7 +649,7 @@ impl Chunk {
     pub fn set_writing_to_object_store(
         &mut self,
         registration: &TaskRegistration,
-    ) -> Result<Arc<ReadBufferChunk>> {
+    ) -> Result<Arc<RBChunk>> {
         match &self.stage {
             ChunkStage::Frozen { representation, .. } => {
                 match &representation {
@@ -737,7 +737,7 @@ impl Chunk {
         }
     }
 
-    pub fn set_unload_from_read_buffer(&mut self) -> Result<Arc<ReadBufferChunk>> {
+    pub fn set_unload_from_read_buffer(&mut self) -> Result<Arc<RBChunk>> {
         match &mut self.stage {
             ChunkStage::Persisted {
                 parquet,

==== File 6 of 7 ====

@@ -21,7 +21,7 @@ use query::{
     pruning::Prunable,
     PartitionChunk,
 };
-use read_buffer::Chunk as ReadBufferChunk;
+use read_buffer::RBChunk;
 
 use super::{
     catalog::chunk::ChunkMetadata, pred::to_read_buffer_predicate, streams::ReadFilterResultsStream,
@@ -94,7 +94,7 @@ enum State {
         chunk: Arc<ChunkSnapshot>,
     },
     ReadBuffer {
-        chunk: Arc<ReadBufferChunk>,
+        chunk: Arc<RBChunk>,
         partition_key: Arc<str>,
     },
     ParquetFile {

==== File 7 of 7 ====

@@ -6,8 +6,6 @@ use observability_deps::tracing::{info, warn};
 
 use data_types::{database_rules::LifecycleRules, error::ErrorLogger, job::Job};
-use tracker::{RwLock, TaskTracker};
-
 use super::{
     catalog::chunk::{Chunk, ChunkStage, ChunkStageFrozenRepr},
     Db,
@@ -16,6 +14,7 @@ use data_types::database_rules::SortOrder;
 use futures::future::BoxFuture;
 use std::collections::HashSet;
 use std::time::{Duration, Instant};
+use tracker::{RwLock, TaskTracker};
 
 pub const DEFAULT_LIFECYCLE_BACKOFF: Duration = Duration::from_secs(1);
 /// Number of seconds to wait before retying a failed lifecycle action
@@ -392,6 +391,7 @@ mod tests {
     use data_types::partition_metadata::TableSummary;
     use entry::test_helpers::lp_to_entry;
     use object_store::{memory::InMemory, parsed_path, ObjectStore};
+    use read_buffer::RBChunk;
     use std::num::{NonZeroU32, NonZeroUsize};
     use tracker::{TaskRegistration, TaskRegistry};
@@ -429,7 +429,7 @@ mod tests {
     }
 
     /// Transitions a new ("open") chunk into the "moved" state.
-    fn transition_to_moved(mut chunk: Chunk, rb: &Arc<read_buffer::Chunk>) -> Chunk {
+    fn transition_to_moved(mut chunk: Chunk, rb: &Arc<RBChunk>) -> Chunk {
         chunk = transition_to_moving(chunk);
         chunk.set_moved(Arc::clone(&rb)).unwrap();
         chunk
@@ -437,10 +437,7 @@ mod tests {
 
     /// Transitions a new ("open") chunk into the "writing to object store"
     /// state.
-    fn transition_to_writing_to_object_store(
-        mut chunk: Chunk,
-        rb: &Arc<read_buffer::Chunk>,
-    ) -> Chunk {
+    fn transition_to_writing_to_object_store(mut chunk: Chunk, rb: &Arc<RBChunk>) -> Chunk {
         chunk = transition_to_moved(chunk, rb);
         chunk
             .set_writing_to_object_store(&Default::default())
@@ -450,10 +447,7 @@ mod tests {
 
     /// Transitions a new ("open") chunk into the "written to object store"
     /// state.
-    fn transition_to_written_to_object_store(
-        mut chunk: Chunk,
-        rb: &Arc<read_buffer::Chunk>,
-    ) -> Chunk {
+    fn transition_to_written_to_object_store(mut chunk: Chunk, rb: &Arc<RBChunk>) -> Chunk {
         chunk = transition_to_writing_to_object_store(chunk, rb);
         let parquet_chunk = new_parquet_chunk(&chunk);
         chunk
@@ -783,9 +777,7 @@ mod tests {
             ..Default::default()
         };
 
-        let rb = Arc::new(read_buffer::Chunk::new(
-            read_buffer::ChunkMetrics::new_unregistered(),
-        ));
+        let rb = Arc::new(RBChunk::new(read_buffer::ChunkMetrics::new_unregistered()));
 
         let chunks = vec![new_chunk(0, Some(0), Some(0))];
@@ -826,9 +818,7 @@ mod tests {
             ..Default::default()
        };
 
-        let rb = Arc::new(read_buffer::Chunk::new(
-            read_buffer::ChunkMetrics::new_unregistered(),
-        ));
+        let rb = Arc::new(RBChunk::new(read_buffer::ChunkMetrics::new_unregistered()));
 
         let chunks = vec![new_chunk(0, Some(0), Some(0))];
@@ -879,9 +869,7 @@ mod tests {
             ..Default::default()
         };
 
-        let rb = Arc::new(read_buffer::Chunk::new(
-            read_buffer::ChunkMetrics::new_unregistered(),
-        ));
+        let rb = Arc::new(RBChunk::new(read_buffer::ChunkMetrics::new_unregistered()));
 
         let chunks = vec![
            // still moving => cannot write