From c03b8a396355120e60713d52d2f74b2ea1a88eda Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Wed, 26 May 2021 11:37:40 +0100 Subject: [PATCH] refactor: remove tables from ChunkSnapshot (#1295) (#1558) --- mutable_buffer/src/chunk/snapshot.rs | 114 ++++++++++++--------------- server/src/db/chunk.rs | 6 +- 2 files changed, 53 insertions(+), 67 deletions(-) diff --git a/mutable_buffer/src/chunk/snapshot.rs b/mutable_buffer/src/chunk/snapshot.rs index 9e3da759ce..b8e44b3873 100644 --- a/mutable_buffer/src/chunk/snapshot.rs +++ b/mutable_buffer/src/chunk/snapshot.rs @@ -1,11 +1,11 @@ -use std::collections::{BTreeSet, HashMap}; +use std::collections::BTreeSet; use std::sync::Arc; use arrow::record_batch::RecordBatch; use data_types::timestamp::TimestampRange; use internal_types::schema::{Schema, TIME_COLUMN_NAME}; use internal_types::selection::Selection; -use snafu::{OptionExt, ResultExt, Snafu}; +use snafu::{ensure, ResultExt, Snafu}; use super::Chunk; use data_types::{error::ErrorLogger, partition_metadata::Statistics}; @@ -26,45 +26,27 @@ pub type Result = std::result::Result; /// A queryable snapshot of a mutable buffer chunk #[derive(Debug)] pub struct ChunkSnapshot { - /// Maps table name to `TableSnapshot` - records: HashMap, - // TODO: Memory tracking -} - -#[derive(Debug)] -struct TableSnapshot { schema: Schema, batch: RecordBatch, + table_name: Arc, timestamp_range: Option, -} - -impl TableSnapshot { - fn matches_predicate(&self, timestamp_range: &Option) -> bool { - match (self.timestamp_range, timestamp_range) { - (Some(a), Some(b)) => !a.disjoint(b), - (None, Some(_)) => false, /* If this chunk doesn't have a time column it can't match */ - // the predicate - (_, None) => true, - } - } + // TODO: Memory tracking } impl ChunkSnapshot { pub fn new(chunk: &Chunk) -> Self { - let mut records: HashMap = Default::default(); let table = &chunk.table; let schema = table .schema(&chunk.dictionary, Selection::All) .log_if_error("ChunkSnapshot getting table schema") .unwrap(); + let batch = table .to_arrow(&chunk.dictionary, Selection::All) .log_if_error("ChunkSnapshot converting table to arrow") .unwrap(); - let name = chunk.table_name.as_ref(); - let timestamp_range = chunk .dictionary @@ -82,73 +64,66 @@ impl ChunkSnapshot { }) }); - records.insert( - name.to_string(), - TableSnapshot { - schema, - batch, - timestamp_range, - }, - ); - - Self { records } + Self { + schema, + batch, + table_name: Arc::clone(&chunk.table_name), + timestamp_range, + } } /// returns true if there is no data in this snapshot pub fn is_empty(&self) -> bool { - self.records.is_empty() + self.batch.num_rows() == 0 } /// Return true if this snapshot has the specified table name pub fn has_table(&self, table_name: &str) -> bool { - self.records.get(table_name).is_some() + self.table_name.as_ref() == table_name } /// Return Schema for the specified table / columns pub fn table_schema(&self, table_name: &str, selection: Selection<'_>) -> Result { - let table = self - .records - .get(table_name) - .context(TableNotFound { table_name })?; + // Temporary #1295 + ensure!( + self.table_name.as_ref() == table_name, + TableNotFound { table_name } + ); Ok(match selection { - Selection::All => table.schema.clone(), + Selection::All => self.schema.clone(), Selection::Some(columns) => { - let columns = table.schema.select(columns).context(SelectColumns)?; - table.schema.project(&columns) + let columns = self.schema.select(columns).context(SelectColumns)?; + self.schema.project(&columns) } }) } /// Returns a list of tables with writes matching the given timestamp_range - pub fn table_names( - &self, - timestamp_range: Option, - ) -> impl Iterator + '_ { - self.records - .iter() - .flat_map(move |(table_name, table_snapshot)| { - table_snapshot - .matches_predicate(×tamp_range) - .then(|| table_name) - }) + pub fn table_names(&self, timestamp_range: Option) -> BTreeSet { + let mut ret = BTreeSet::new(); + if self.matches_predicate(×tamp_range) { + ret.insert(self.table_name.to_string()); + } + ret } /// Returns a RecordBatch with the given selection pub fn read_filter(&self, table_name: &str, selection: Selection<'_>) -> Result { - let table = self - .records - .get(table_name) - .context(TableNotFound { table_name })?; + // Temporary #1295 + ensure!( + self.table_name.as_ref() == table_name, + TableNotFound { table_name } + ); Ok(match selection { - Selection::All => table.batch.clone(), + Selection::All => self.batch.clone(), Selection::Some(columns) => { - let projection = table.schema.select(columns).context(SelectColumns)?; - let schema = table.schema.project(&projection).into(); + let projection = self.schema.select(columns).context(SelectColumns)?; + let schema = self.schema.project(&projection).into(); let columns = projection .into_iter() - .map(|x| Arc::clone(table.batch.column(x))) + .map(|x| Arc::clone(self.batch.column(x))) .collect(); RecordBatch::try_new(schema, columns).expect("failed to project record batch") @@ -162,8 +137,12 @@ impl ChunkSnapshot { table_name: &str, selection: Selection<'_>, ) -> Option> { - let table = self.records.get(table_name)?; - let fields = table.schema.inner().fields().iter(); + // Temporary #1295 + if self.table_name.as_ref() != table_name { + return None; + } + + let fields = self.schema.inner().fields().iter(); Some(match selection { Selection::Some(cols) => fields @@ -178,4 +157,13 @@ impl ChunkSnapshot { Selection::All => fields.map(|x| x.name().clone()).collect(), }) } + + fn matches_predicate(&self, timestamp_range: &Option) -> bool { + match (self.timestamp_range, timestamp_range) { + (Some(a), Some(b)) => !a.disjoint(b), + (None, Some(_)) => false, /* If this chunk doesn't have a time column it can't match */ + // the predicate + (_, None) => true, + } + } } diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index bf371abc4f..f6b053de41 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -164,9 +164,7 @@ impl PartitionChunk for DbChunk { fn all_table_names(&self, known_tables: &mut StringSet) { match &self.state { - State::MutableBuffer { chunk, .. } => { - known_tables.extend(chunk.table_names(None).cloned()) - } + State::MutableBuffer { chunk, .. } => known_tables.append(&mut chunk.table_names(None)), State::ReadBuffer { chunk, .. } => { // TODO - align APIs so they behave in the same way... let rb_names = chunk.all_table_names(known_tables); @@ -194,7 +192,7 @@ impl PartitionChunk for DbChunk { // TODO: Support more predicates return Ok(None); } - chunk.table_names(predicate.range).cloned().collect() + chunk.table_names(predicate.range) } State::ReadBuffer { chunk, .. } => { // If not supported, ReadBuffer can't answer with