refactor: remove table name from read buffer (#4910)

The low-level chunk storage shouldn't care about the table name (this is
also true for parquet chunks, by the way). In fact, the table name alone is
only partial information, since it is missing the namespace.

If we need a table name, then the high-level chunk/data management is
responsible for providing it.
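
For callers, the change boils down to dropping the table-name argument when
building a chunk. A minimal sketch of the new builder usage (not part of this
commit; it assumes the `read_buffer` re-exports visible in the diffs below,
and `.expect()` stands in for real error handling):

    use arrow::record_batch::RecordBatch;
    use read_buffer::{ChunkMetrics, RBChunk, RBChunkBuilder};

    // Build a read buffer chunk from a record batch: only the Arrow schema
    // is needed now, no table name.
    fn build_chunk(rb: RecordBatch) -> RBChunk {
        // Before this commit: RBChunkBuilder::new("my_table", rb.schema())
        RBChunkBuilder::new(rb.schema())
            .with_metrics(ChunkMetrics::new_unregistered())
            .with_record_batch(rb)
            .build()
            .expect("non-empty input should produce a chunk")
    }

Callers that still need the name (e.g. the querier's ChunkAdapter) keep it in
the high-level metadata instead of threading it through the builder.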

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2022-06-21 13:57:28 +02:00 committed by GitHub
parent 0f63be26c3
commit db24838221
7 changed files with 59 additions and 137 deletions

View File

@@ -25,7 +25,6 @@ const CACHE_ID: &str = "read_buffer";
 #[derive(Debug)]
 struct ExtraFetchInfo {
     decoded_parquet_file: Arc<DecodedParquetFile>,
-    table_name: Arc<str>,
     store: ParquetStorage,
 }

@@ -59,13 +58,11 @@ impl ReadBufferCache {
                         .retry_all_errors("get read buffer chunk from parquet file", || {
                             let decoded_parquet_file_for_load =
                                 Arc::clone(&extra_fetch_info.decoded_parquet_file);
-                            let table_name_for_load = Arc::clone(&extra_fetch_info.table_name);
                             let store_for_load = extra_fetch_info.store.clone();

                             async {
                                 load_from_parquet_file(
                                     decoded_parquet_file_for_load,
-                                    table_name_for_load,
                                     store_for_load,
                                     &metric_registry,
                                 )
@@ -117,7 +114,6 @@ impl ReadBufferCache {
     pub async fn get(
         &self,
         decoded_parquet_file: Arc<DecodedParquetFile>,
-        table_name: Arc<str>,
         store: ParquetStorage,
     ) -> Arc<RBChunk> {
         self.cache
@@ -125,7 +121,6 @@ impl ReadBufferCache {
                 decoded_parquet_file.parquet_file_id(),
                 ExtraFetchInfo {
                     decoded_parquet_file,
-                    table_name,
                     store,
                 },
             )
@@ -146,13 +141,12 @@ enum LoadError {
 async fn load_from_parquet_file(
     decoded_parquet_file: Arc<DecodedParquetFile>,
-    table_name: Arc<str>,
     store: ParquetStorage,
     metric_registry: &metric::Registry,
 ) -> Result<RBChunk, LoadError> {
     let record_batch_stream =
         record_batches_stream(decoded_parquet_file, store).context(ReadingFromStorageSnafu)?;
-    read_buffer_chunk_from_stream(table_name, record_batch_stream, metric_registry)
+    read_buffer_chunk_from_stream(record_batch_stream, metric_registry)
         .await
         .context(BuildingChunkSnafu)
 }
@@ -188,7 +182,6 @@ enum RBChunkError {
 }

 async fn read_buffer_chunk_from_stream(
-    table_name: Arc<str>,
     mut stream: SendableRecordBatchStream,
     metric_registry: &metric::Registry,
 ) -> Result<RBChunk, RBChunkError> {
@@ -197,8 +190,7 @@ async fn read_buffer_chunk_from_stream(
     // create "global" metric object, so that we don't blow up prometheus w/ too many metrics
     let metrics = ChunkMetrics::new(metric_registry, "iox_shared");

-    let mut builder =
-        read_buffer::RBChunkBuilder::new(table_name.as_ref(), schema).with_metrics(metrics);
+    let mut builder = read_buffer::RBChunkBuilder::new(schema).with_metrics(metrics);

     while let Some(record_batch) = stream.next().await {
         builder
@@ -258,9 +250,7 @@ mod tests {
         let cache = make_cache(&catalog);

-        let rb = cache
-            .get(Arc::clone(&decoded), "table1".into(), storage.clone())
-            .await;
+        let rb = cache.get(Arc::clone(&decoded), storage.clone()).await;

         let rb_batches: Vec<RecordBatch> = rb
             .read_filter(Predicate::default(), Selection::All, vec![])
@@ -278,7 +268,7 @@ mod tests {
         assert_batches_eq!(expected, &rb_batches);

         // This should fetch from the cache
-        let _rb_again = cache.get(decoded, "table1".into(), storage).await;
+        let _rb_again = cache.get(decoded, storage).await;

         let m: Metric<U64Counter> = catalog
             .metric_registry
@@ -338,71 +328,43 @@ mod tests {
         // load 1: Fetch table1 from storage
         cache
-            .get(
-                Arc::clone(&decoded_parquet_files[0]),
-                "cached_table1".into(),
-                storage.clone(),
-            )
+            .get(Arc::clone(&decoded_parquet_files[0]), storage.clone())
             .await;
         catalog.mock_time_provider().inc(Duration::from_millis(1));

         // load 2: Fetch table2 from storage
         cache
-            .get(
-                Arc::clone(&decoded_parquet_files[1]),
-                "cached_table2".into(),
-                storage.clone(),
-            )
+            .get(Arc::clone(&decoded_parquet_files[1]), storage.clone())
             .await;
         catalog.mock_time_provider().inc(Duration::from_millis(1));

         // Fetch table1 from cache, which should update its last used
         cache
-            .get(
-                Arc::clone(&decoded_parquet_files[0]),
-                "cached_table1".into(),
-                storage.clone(),
-            )
+            .get(Arc::clone(&decoded_parquet_files[0]), storage.clone())
             .await;
         catalog.mock_time_provider().inc(Duration::from_millis(1));

         // load 3: Fetch table3 from storage, which should evict table2
         cache
-            .get(
-                Arc::clone(&decoded_parquet_files[2]),
-                "cached_table3".into(),
-                storage.clone(),
-            )
+            .get(Arc::clone(&decoded_parquet_files[2]), storage.clone())
             .await;
         catalog.mock_time_provider().inc(Duration::from_millis(1));

         // load 4: Fetch table2, which will be from storage again, and will evict table1
         cache
-            .get(
-                Arc::clone(&decoded_parquet_files[1]),
-                "cached_table2".into(),
-                storage.clone(),
-            )
+            .get(Arc::clone(&decoded_parquet_files[1]), storage.clone())
             .await;
         catalog.mock_time_provider().inc(Duration::from_millis(1));

         // Fetch table2 from cache
         cache
-            .get(
-                Arc::clone(&decoded_parquet_files[1]),
-                "cached_table2".into(),
-                storage.clone(),
-            )
+            .get(Arc::clone(&decoded_parquet_files[1]), storage.clone())
             .await;
         catalog.mock_time_provider().inc(Duration::from_millis(1));

         // Fetch table3 from cache
         cache
-            .get(
-                Arc::clone(&decoded_parquet_files[2]),
-                "cached_table3".into(),
-                storage.clone(),
-            )
+            .get(Arc::clone(&decoded_parquet_files[2]), storage.clone())
             .await;

         let m: Metric<U64Counter> = catalog
@@ -448,7 +410,7 @@ mod tests {
         let metric_registry = metric::Registry::new();

-        let rb = read_buffer_chunk_from_stream("cpu".into(), stream, &metric_registry)
+        let rb = read_buffer_chunk_from_stream(stream, &metric_registry)
             .await
             .unwrap();
@@ -480,9 +442,7 @@ mod tests {
         let cache = make_cache(&catalog);

-        let _rb = cache
-            .get(Arc::clone(&decoded), "table1".into(), storage.clone())
-            .await;
+        let _rb = cache.get(Arc::clone(&decoded), storage.clone()).await;

         let g: Metric<CumulativeGauge> = catalog
             .metric_registry

View File

@@ -399,11 +399,7 @@ impl ChunkAdapter {
         let rb_chunk = self
             .catalog_cache()
             .read_buffer()
-            .get(
-                Arc::clone(&decoded_parquet_file),
-                Arc::clone(&table_name),
-                self.store.clone(),
-            )
+            .get(Arc::clone(&decoded_parquet_file), self.store.clone())
             .await;

         let order = ChunkOrder::new(decoded_parquet_file.min_sequence_number().get())

View File

@@ -12,7 +12,7 @@ const ONE_MS: i64 = 1_000_000;
 fn satisfies_predicate(c: &mut Criterion) {
     let rb = generate_row_group(500_000);
-    let chunk = RBChunk::new("table_a", rb, ChunkMetrics::new_unregistered());
+    let chunk = RBChunk::new(rb, ChunkMetrics::new_unregistered());

     // no predicate
     benchmark_satisfies_predicate(

View File

@@ -18,7 +18,7 @@ pub fn read_filter(c: &mut Criterion) {
     let mut rng = rand::thread_rng();
     let row_group = generate_row_group(200_000, &mut rng);
-    let chunk = read_buffer::benchmarks::new_from_row_group("table", row_group);
+    let chunk = read_buffer::benchmarks::new_from_row_group(row_group);

     read_filter_no_pred_vary_proj(c, &chunk);
     read_filter_with_pred_vary_proj(c, &chunk);

View File

@@ -29,23 +29,14 @@ pub enum Error {
     #[snafu(display("error generating schema for table: {}", source))]
     TableSchemaError { source: SchemaError },

-    #[snafu(display("table '{}' does not exist", table_name))]
-    TableNotFound { table_name: String },
-
-    #[snafu(display("no data to build chunk for table '{}'", table_name))]
-    ChunkBuilderNoInput { table_name: String },
+    #[snafu(display("no data to build chunk"))]
+    ChunkBuilderNoInput,

-    #[snafu(display("error building chunk for table '{}': {}", table_name, source))]
-    ChunkBuilderError {
-        table_name: String,
-        source: arrow::error::ArrowError,
-    },
+    #[snafu(display("error building chunk: {}", source))]
+    ChunkBuilderError { source: arrow::error::ArrowError },

-    #[snafu(display("column '{}' does not exist in table '{}'", column_name, table_name))]
-    ColumnDoesNotExist {
-        column_name: String,
-        table_name: String,
-    },
+    #[snafu(display("column '{}' does not exist", column_name))]
+    ColumnDoesNotExist { column_name: String },
 }

 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -61,25 +52,20 @@ pub struct Chunk {
 impl Chunk {
     /// Start a new Chunk from the given record batch.
-    pub fn new(table_name: impl Into<String>, table_data: RecordBatch, metrics: Metrics) -> Self {
-        let table_name = table_name.into();
-        let row_group = record_batch_to_row_group(&table_name, table_data);
+    pub fn new(table_data: RecordBatch, metrics: Metrics) -> Self {
+        let row_group = record_batch_to_row_group(table_data);

-        Self::new_from_row_group(table_name, row_group, metrics)
+        Self::new_from_row_group(row_group, metrics)
     }

     // Only used in tests and benchmarks
-    pub(crate) fn new_from_row_group(
-        table_name: impl Into<String>,
-        row_group: RowGroup,
-        mut metrics: Metrics,
-    ) -> Self {
+    pub(crate) fn new_from_row_group(row_group: RowGroup, mut metrics: Metrics) -> Self {
         let storage_statistics = row_group.column_storage_statistics();
         metrics.update_column_storage_statistics(&storage_statistics);

         Self {
             metrics,
-            table: Table::with_row_group(table_name, row_group),
+            table: Table::with_row_group(row_group),
         }
     }
@@ -135,9 +121,7 @@ impl Chunk {
     /// The data is converted to a `RowGroup` outside of any locking so the
     /// caller does not need to be concerned about the size of the update.
     pub fn upsert_table(&mut self, table_data: RecordBatch) {
-        let table_name = self.table.name();
-        let row_group = record_batch_to_row_group(table_name, table_data);
+        let row_group = record_batch_to_row_group(table_data);

         self.upsert_table_with_row_group(row_group)
     }
@@ -263,7 +247,6 @@ impl Chunk {
         if !table_meta.has_column(column_name) {
             return ColumnDoesNotExistSnafu {
                 column_name: column_name.to_string(),
-                table_name: self.table.name().to_owned(),
             }
             .fail();
         }
@@ -343,11 +326,11 @@ impl Chunk {
     }
 }

-fn record_batch_to_row_group(table_name: &str, rb: RecordBatch) -> RowGroup {
+fn record_batch_to_row_group(rb: RecordBatch) -> RowGroup {
     let now = std::time::Instant::now();
     let row_group = RowGroup::from(rb);
     debug!(rows=row_group.rows(), columns=row_group.columns(), size_bytes=row_group.size(),
-        raw_size_null=row_group.size_raw(true), raw_size_no_null=row_group.size_raw(true), table_name=?table_name, compressing_took=?now.elapsed(), "row group added");
+        raw_size_null=row_group.size_raw(true), raw_size_no_null=row_group.size_raw(true), compressing_took=?now.elapsed(), "row group added");
     row_group
 }
@@ -362,7 +345,6 @@ impl std::fmt::Debug for Chunk {
 /// count is met, reducing the overall memory footprint during the building
 /// phase.
 pub struct ChunkBuilder {
-    table_name: String,
     schema: arrow::datatypes::SchemaRef,

     current_rows: usize, // current total rows of below vec
@@ -374,9 +356,8 @@ pub struct ChunkBuilder {
 }

 impl ChunkBuilder {
-    pub fn new(table_name: impl Into<String>, schema: arrow::datatypes::SchemaRef) -> Self {
+    pub fn new(schema: arrow::datatypes::SchemaRef) -> Self {
         Self {
-            table_name: table_name.into(),
             schema,
             current_rows: 0,
             record_batches: vec![],
@@ -430,7 +411,7 @@ impl ChunkBuilder {
             arrow::record_batch::RecordBatch::concat(&self.schema, &self.record_batches)?;

         self.row_groups
-            .push(record_batch_to_row_group(&self.table_name, concat_batch));
+            .push(record_batch_to_row_group(concat_batch));

         // clear pending batches
         self.record_batches.clear();
@@ -451,21 +432,15 @@ impl ChunkBuilder {
         // No batches or row groups is an error because we have nothing to build
         // chunk with.
         if self.is_empty() {
-            return ChunkBuilderNoInputSnafu {
-                table_name: self.table_name.clone(),
-            }
-            .fail();
+            return ChunkBuilderNoInputSnafu.fail();
             // Snapshot remaining batches to a row group (accepting that it may
             // be smaller than desired)
         } else if !self.record_batches.is_empty() {
-            self.snapshot_rowgroup().context(ChunkBuilderSnafu {
-                table_name: self.table_name.clone(),
-            })?;
+            self.snapshot_rowgroup().context(ChunkBuilderSnafu)?;
         }

         // Create new chunk
         let mut chunk = Chunk::new_from_row_group(
-            self.table_name.clone(),
             self.row_groups.remove(0),
             match self.chunk_metrics.take() {
                 // avoid partial move of self
@@ -706,7 +681,7 @@ mod test {
         let registry = metric::Registry::new();
         let rb = gen_recordbatch();
-        let mut chunk = ChunkBuilder::new("mydb", rb.schema())
+        let mut chunk = ChunkBuilder::new(rb.schema())
             .with_metrics(Metrics::new(&registry, "mydb"))
             .with_record_batch(rb)
             .must_build();
@@ -844,7 +819,7 @@ mod test {
     #[test]
     fn read_filter_table_schema() {
         let rb = gen_recordbatch();
-        let chunk = ChunkBuilder::new("mydb", rb.schema())
+        let chunk = ChunkBuilder::new(rb.schema())
             .with_record_batch(rb)
             .must_build();
@@ -939,7 +914,7 @@ mod test {
         // Add a record batch to a single partition
         let rb = RecordBatch::try_new(schema.into(), data).unwrap();
-        let mut chunk_builder = ChunkBuilder::new("a_table", rb.schema());
+        let mut chunk_builder = ChunkBuilder::new(rb.schema());
         chunk_builder.must_push_record_batch(rb);
         let chunk = chunk_builder.must_build();
@@ -1052,7 +1027,7 @@ mod test {
             .build()
             .unwrap();

-        let mut chunk_builder = ChunkBuilder::new("Coolverine", schema.clone().into());
+        let mut chunk_builder = ChunkBuilder::new(schema.clone().into());

         // Add a bunch of row groups to a single table in a single chunk
         for &i in &[100, 200, 300] {
@@ -1242,7 +1217,7 @@ mod test {
     #[test]
     fn could_pass_predicate() {
         let rb = gen_recordbatch();
-        let mut chunk_builder = ChunkBuilder::new("table", rb.schema());
+        let mut chunk_builder = ChunkBuilder::new(rb.schema());
         chunk_builder.must_push_record_batch(rb);
         let chunk = chunk_builder.must_build();
@@ -1267,7 +1242,7 @@ mod test {
         ];

         let rg = RowGroup::new(6, columns);
-        let chunk = Chunk::new_from_row_group("table_1", rg, Metrics::new_unregistered());
+        let chunk = Chunk::new_from_row_group(rg, Metrics::new_unregistered());

         // No predicate so at least one row matches
         assert!(chunk.satisfies_predicate(&Predicate::default()));
@@ -1325,7 +1300,7 @@ mod test {
         // Create the chunk with the above table
         let rb = RecordBatch::try_new(schema, data).unwrap();
-        let mut chunk_builder = ChunkBuilder::new("Utopia", rb.schema());
+        let mut chunk_builder = ChunkBuilder::new(rb.schema());
         chunk_builder.must_push_record_batch(rb);
         let chunk = chunk_builder.must_build();
@@ -1396,7 +1371,7 @@ mod test {
         // Create the chunk with the above table
         let rb = RecordBatch::try_new(schema, data).unwrap();
-        let mut chunk_builder = ChunkBuilder::new("Utopia", rb.schema());
+        let mut chunk_builder = ChunkBuilder::new(rb.schema());
         chunk_builder.must_push_record_batch(rb);
         let chunk = chunk_builder.must_build();
@@ -1483,7 +1458,7 @@ mod test {
         // Create the chunk with the above table
         let rb = RecordBatch::try_new(schema, data).unwrap();
-        let mut chunk_builder = ChunkBuilder::new("my_table", rb.schema());
+        let mut chunk_builder = ChunkBuilder::new(rb.schema());
         chunk_builder.must_push_record_batch(rb);
         let chunk = chunk_builder.must_build();
@@ -1544,7 +1519,7 @@ mod test {
     fn chunk_builder() {
         // test row group configuration
         let rb = gen_recordbatch();
-        let chunk = ChunkBuilder::new("table_a", rb.schema())
+        let chunk = ChunkBuilder::new(rb.schema())
             .with_record_batch(rb)
             .must_build();
@@ -1552,24 +1527,23 @@ mod test {
         assert_eq!(chunk.rows(), 3);

         let rb = gen_recordbatch();
-        let mut builder = ChunkBuilder::new("table_a", rb.schema()).set_row_group_min_size(3);
+        let mut builder = ChunkBuilder::new(rb.schema()).set_row_group_min_size(3);
         builder.must_push_record_batch(rb);
         builder.must_push_record_batch(gen_recordbatch());
         builder.must_push_record_batch(gen_recordbatch());

         let chunk = builder.must_build();
-        assert_eq!(chunk.table.name(), "table_a");
         assert_eq!(chunk.row_groups(), 3);
         assert_eq!(chunk.rows(), 9);

         // when the chunk is empty an error is returned on build
         let rb = gen_recordbatch();
-        let builder = ChunkBuilder::new("table_a", rb.schema());
+        let builder = ChunkBuilder::new(rb.schema());
         assert!(builder.build().is_err());

         // empty record batches are not stored for snapshotting
         let rb = gen_recordbatch();
-        let mut builder = ChunkBuilder::new("table_a", rb.schema());
+        let mut builder = ChunkBuilder::new(rb.schema());
         assert!(builder
             .push_record_batch(RecordBatch::new_empty(rb.schema()))
             .is_ok());

View File

@@ -39,7 +39,7 @@ pub mod benchmarks {
     use crate::{ChunkMetrics, RBChunk};

     // Allow external benchmarks to use this crate-only test method
-    pub fn new_from_row_group(table_name: impl Into<String>, row_group: RowGroup) -> RBChunk {
-        RBChunk::new_from_row_group(table_name, row_group, ChunkMetrics::new_unregistered())
+    pub fn new_from_row_group(row_group: RowGroup) -> RBChunk {
+        RBChunk::new_from_row_group(row_group, ChunkMetrics::new_unregistered())
     }
 }
}

View File

@@ -54,8 +54,6 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 ///
 /// Tables must contain at least one row group with at least one row.
 pub struct Table {
-    name: String,
-
     // A table's data is held in a collection of immutable row groups and
     // mutable meta data (`RowGroupData`).
     //
@@ -84,9 +82,8 @@ struct RowGroupData {
 impl Table {
     /// Create a new table with the provided row_group. Creating an empty table is not possible.
-    pub fn with_row_group(name: impl Into<String>, rg: RowGroup) -> Self {
+    pub fn with_row_group(rg: RowGroup) -> Self {
         Self {
-            name: name.into(),
             table_data: RwLock::new(RowGroupData {
                 meta: Arc::new(MetaData::new(&rg)),
                 data: vec![Arc::new(rg)],
@@ -127,11 +124,6 @@ impl Table {
         Ok(())
     }

-    /// The name of the table (equivalent to measurement or table name).
-    pub fn name(&self) -> &str {
-        &self.name
-    }
-
     /// Determines if this table contains no row groups.
     pub fn is_empty(&self) -> bool {
         self.table_data.read().data.is_empty()
@@ -144,7 +136,7 @@ impl Table {
     /// An estimation of the total size of the table in bytes in memory
     pub fn size(&self) -> usize {
-        let base_size = std::mem::size_of::<Self>() + self.name.len();
+        let base_size = std::mem::size_of::<Self>();
         // meta.size accounts for all the row group data.
         base_size + self.table_data.read().meta.size()
     }
@@ -1179,7 +1171,7 @@ mod test {
         ];
         let row_group = RowGroup::new(1, columns);
-        let table = Table::with_row_group("cpu", row_group);
+        let table = Table::with_row_group(row_group);

         let predicate = Predicate::default();
         assert!(table.meta().validate_exprs(predicate).is_ok());
@@ -1266,7 +1258,7 @@ mod test {
         let columns = vec![("time".to_owned(), tc)];
         let rg = RowGroup::new(3, columns);
-        let mut table = Table::with_row_group("cpu", rg);
+        let mut table = Table::with_row_group(rg);

         assert_eq!(table.rows(), 3);
@@ -1308,7 +1300,7 @@ mod test {
         let fc = ColumnType::Field(Column::from(&[1000_u64, 1002, 1200][..]));
         let columns = vec![("time".to_owned(), tc), ("count".to_owned(), fc)];
         let row_group = RowGroup::new(3, columns);
-        let mut table = Table::with_row_group("cpu", row_group);
+        let mut table = Table::with_row_group(row_group);

         // add another row group
         let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5, 6][..]));
@@ -1342,7 +1334,7 @@ mod test {
             ("count".to_owned(), fc),
         ];
         let row_group = RowGroup::new(3, columns);
-        let mut table = Table::with_row_group("cpu", row_group);
+        let mut table = Table::with_row_group(row_group);

         // add another row group
         let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5, 6][..]));
@@ -1425,7 +1417,7 @@ mod test {
         ];

         let rg = RowGroup::new(6, columns);
-        let mut table = Table::with_row_group("cpu", rg);
+        let mut table = Table::with_row_group(rg);

         let exp_col_types = vec![
             ("region", LogicalDataType::String),
@@ -1557,7 +1549,7 @@ mod test {
             ),
         ];
         let rg = RowGroup::new(3, columns);
-        let mut table = Table::with_row_group("cpu", rg);
+        let mut table = Table::with_row_group(rg);

         // Build another row group.
         let columns = vec![
@@ -1699,7 +1691,7 @@ west,host-b,100
         let columns = vec![("time".to_owned(), tc), ("region".to_owned(), rc)];
         let rg = RowGroup::new(3, columns);
-        let mut table = Table::with_row_group("cpu", rg);
+        let mut table = Table::with_row_group(rg);

         // add another row group
         let tc = ColumnType::Time(Column::from(&[200_i64, 300, 400][..]));
@@ -1820,7 +1812,7 @@ west,host-b,100
         ];

         let rg = RowGroup::new(4, columns);
-        let table = Table::with_row_group("cpu", rg);
+        let table = Table::with_row_group(rg);

         assert_eq!(table.time_range().unwrap(), (-100, 3));
     }