Merge branch 'main' into crepererum/issue1313

pull/24376/head
kodiakhq[bot] 2021-05-26 14:46:18 +00:00 committed by GitHub
commit efe077da8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 53 additions and 67 deletions

View File

@ -1,11 +1,11 @@
use std::collections::{BTreeSet, HashMap};
use std::collections::BTreeSet;
use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use data_types::timestamp::TimestampRange;
use internal_types::schema::{Schema, TIME_COLUMN_NAME};
use internal_types::selection::Selection;
use snafu::{OptionExt, ResultExt, Snafu};
use snafu::{ensure, ResultExt, Snafu};
use super::Chunk;
use data_types::{error::ErrorLogger, partition_metadata::Statistics};
@ -26,45 +26,27 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// A queryable snapshot of a mutable buffer chunk
#[derive(Debug)]
pub struct ChunkSnapshot {
/// Maps table name to `TableSnapshot`
records: HashMap<String, TableSnapshot>,
// TODO: Memory tracking
}
#[derive(Debug)]
struct TableSnapshot {
schema: Schema,
batch: RecordBatch,
table_name: Arc<str>,
timestamp_range: Option<TimestampRange>,
}
impl TableSnapshot {
fn matches_predicate(&self, timestamp_range: &Option<TimestampRange>) -> bool {
match (self.timestamp_range, timestamp_range) {
(Some(a), Some(b)) => !a.disjoint(b),
(None, Some(_)) => false, /* If this chunk doesn't have a time column it can't match */
// the predicate
(_, None) => true,
}
}
// TODO: Memory tracking
}
impl ChunkSnapshot {
pub fn new(chunk: &Chunk) -> Self {
let mut records: HashMap<String, TableSnapshot> = Default::default();
let table = &chunk.table;
let schema = table
.schema(&chunk.dictionary, Selection::All)
.log_if_error("ChunkSnapshot getting table schema")
.unwrap();
let batch = table
.to_arrow(&chunk.dictionary, Selection::All)
.log_if_error("ChunkSnapshot converting table to arrow")
.unwrap();
let name = chunk.table_name.as_ref();
let timestamp_range =
chunk
.dictionary
@ -82,73 +64,66 @@ impl ChunkSnapshot {
})
});
records.insert(
name.to_string(),
TableSnapshot {
schema,
batch,
timestamp_range,
},
);
Self { records }
Self {
schema,
batch,
table_name: Arc::clone(&chunk.table_name),
timestamp_range,
}
}
/// returns true if there is no data in this snapshot
pub fn is_empty(&self) -> bool {
self.records.is_empty()
self.batch.num_rows() == 0
}
/// Return true if this snapshot has the specified table name
pub fn has_table(&self, table_name: &str) -> bool {
self.records.get(table_name).is_some()
self.table_name.as_ref() == table_name
}
/// Return Schema for the specified table / columns
pub fn table_schema(&self, table_name: &str, selection: Selection<'_>) -> Result<Schema> {
let table = self
.records
.get(table_name)
.context(TableNotFound { table_name })?;
// Temporary #1295
ensure!(
self.table_name.as_ref() == table_name,
TableNotFound { table_name }
);
Ok(match selection {
Selection::All => table.schema.clone(),
Selection::All => self.schema.clone(),
Selection::Some(columns) => {
let columns = table.schema.select(columns).context(SelectColumns)?;
table.schema.project(&columns)
let columns = self.schema.select(columns).context(SelectColumns)?;
self.schema.project(&columns)
}
})
}
/// Returns a list of tables with writes matching the given timestamp_range
pub fn table_names(
&self,
timestamp_range: Option<TimestampRange>,
) -> impl Iterator<Item = &String> + '_ {
self.records
.iter()
.flat_map(move |(table_name, table_snapshot)| {
table_snapshot
.matches_predicate(&timestamp_range)
.then(|| table_name)
})
pub fn table_names(&self, timestamp_range: Option<TimestampRange>) -> BTreeSet<String> {
let mut ret = BTreeSet::new();
if self.matches_predicate(&timestamp_range) {
ret.insert(self.table_name.to_string());
}
ret
}
/// Returns a RecordBatch with the given selection
pub fn read_filter(&self, table_name: &str, selection: Selection<'_>) -> Result<RecordBatch> {
let table = self
.records
.get(table_name)
.context(TableNotFound { table_name })?;
// Temporary #1295
ensure!(
self.table_name.as_ref() == table_name,
TableNotFound { table_name }
);
Ok(match selection {
Selection::All => table.batch.clone(),
Selection::All => self.batch.clone(),
Selection::Some(columns) => {
let projection = table.schema.select(columns).context(SelectColumns)?;
let schema = table.schema.project(&projection).into();
let projection = self.schema.select(columns).context(SelectColumns)?;
let schema = self.schema.project(&projection).into();
let columns = projection
.into_iter()
.map(|x| Arc::clone(table.batch.column(x)))
.map(|x| Arc::clone(self.batch.column(x)))
.collect();
RecordBatch::try_new(schema, columns).expect("failed to project record batch")
@ -162,8 +137,12 @@ impl ChunkSnapshot {
table_name: &str,
selection: Selection<'_>,
) -> Option<BTreeSet<String>> {
let table = self.records.get(table_name)?;
let fields = table.schema.inner().fields().iter();
// Temporary #1295
if self.table_name.as_ref() != table_name {
return None;
}
let fields = self.schema.inner().fields().iter();
Some(match selection {
Selection::Some(cols) => fields
@ -178,4 +157,13 @@ impl ChunkSnapshot {
Selection::All => fields.map(|x| x.name().clone()).collect(),
})
}
fn matches_predicate(&self, timestamp_range: &Option<TimestampRange>) -> bool {
match (self.timestamp_range, timestamp_range) {
(Some(a), Some(b)) => !a.disjoint(b),
(None, Some(_)) => false, /* If this chunk doesn't have a time column it can't match */
// the predicate
(_, None) => true,
}
}
}

View File

@ -164,9 +164,7 @@ impl PartitionChunk for DbChunk {
fn all_table_names(&self, known_tables: &mut StringSet) {
match &self.state {
State::MutableBuffer { chunk, .. } => {
known_tables.extend(chunk.table_names(None).cloned())
}
State::MutableBuffer { chunk, .. } => known_tables.append(&mut chunk.table_names(None)),
State::ReadBuffer { chunk, .. } => {
// TODO - align APIs so they behave in the same way...
let rb_names = chunk.all_table_names(known_tables);
@ -194,7 +192,7 @@ impl PartitionChunk for DbChunk {
// TODO: Support more predicates
return Ok(None);
}
chunk.table_names(predicate.range).cloned().collect()
chunk.table_names(predicate.range)
}
State::ReadBuffer { chunk, .. } => {
// If not supported, ReadBuffer can't answer with