Merge pull request #662 from influxdata/er/refactor/read_buffer/api

refactor: clean up public Read Buffer API
pull/24376/head
Edd Robinson 2021-01-15 13:13:59 +00:00 committed by GitHub
commit 0fb271015d
8 changed files with 174 additions and 128 deletions

View File

@@ -3,7 +3,7 @@ use rand::distributions::Alphanumeric;
use rand::prelude::*;
use rand::Rng;
use read_buffer::{column::cmp::Operator, column::dictionary, column::RowIDs};
use read_buffer::benchmarks::{dictionary, Operator, RowIDs};
const ROWS: [usize; 3] = [100_000, 1_000_000, 10_000_000];
const LOCATIONS: [Location; 3] = [Location::Start, Location::Middle, Location::End];

View File

@@ -4,8 +4,8 @@ use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Through
use rand::prelude::*;
use arrow_deps::arrow::datatypes::*;
use read_buffer::column::fixed::Fixed;
use read_buffer::column::fixed_null::FixedNull;
use read_buffer::benchmarks::Fixed;
use read_buffer::benchmarks::FixedNull;
const ROWS: [usize; 5] = [10, 100, 1_000, 10_000, 60_000];
const CHUNKS: [Chunks; 4] = [

View File

@@ -4,8 +4,8 @@ use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Through
use rand::prelude::*;
use arrow_deps::arrow::datatypes::*;
use read_buffer::column::fixed::Fixed;
use read_buffer::column::fixed_null::FixedNull;
use read_buffer::benchmarks::Fixed;
use read_buffer::benchmarks::FixedNull;
const ROWS: [usize; 5] = [10, 100, 1_000, 10_000, 60_000];
const CHUNKS: [Chunks; 4] = [

View File

@@ -7,9 +7,8 @@ use rand::Rng;
use rand_distr::{Distribution, Normal};
use packers::{sorter, Packers};
use read_buffer::column::{AggregateType, Column};
use read_buffer::row_group::{ColumnType, Predicate, RowGroup};
use read_buffer::benchmarks::{Column, ColumnType, RowGroup};
use read_buffer::{AggregateType, Predicate};
const ONE_MS: i64 = 1_000_000;

View File

@@ -131,10 +131,6 @@ impl Chunk {
/// Returns the distinct set of table names that contain data that satisfies
/// the time range and predicates.
pub fn table_names(&self, predicate: &Predicate) -> BTreeSet<&String> {
if !predicate.is_empty() {
unimplemented!("Predicate support on `table_names` is not yet implemented");
}
self.tables.keys().collect::<BTreeSet<&String>>()
}

View File

@@ -3,8 +3,8 @@
#![allow(clippy::too_many_arguments)]
#![allow(unused_variables)]
pub(crate) mod chunk;
pub mod column;
pub mod row_group;
pub(crate) mod column;
pub(crate) mod row_group;
pub(crate) mod table;
use std::{
@@ -20,13 +20,19 @@ use arrow_deps::arrow::{
};
use snafu::{ResultExt, Snafu};
// Identifiers that are exported as part of the public API.
pub use column::{AggregateType, FIELD_COLUMN_TYPE, TAG_COLUMN_TYPE, TIME_COLUMN_TYPE};
pub use row_group::{BinaryExpr, Predicate};
pub use table::ColumnSelection;
use chunk::Chunk;
use column::AggregateType;
pub use column::{FIELD_COLUMN_TYPE, TAG_COLUMN_TYPE, TIME_COLUMN_TYPE};
pub use row_group::Predicate;
use row_group::{ColumnName, RowGroup};
use table::Table;
/// The name of the column containing table names returned by a call to
/// `table_names`.
pub const TABLE_NAMES_COLUMN_NAME: &str = "table";
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("arrow conversion error: {}", source))]
@@ -39,6 +45,9 @@ pub enum Error {
#[snafu(display("chunk id does not exist: {}", id))]
ChunkNotFound { id: u32 },
#[snafu(display("unsupported operation: {}", msg))]
UnsupportedOperation { msg: String },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -189,7 +198,7 @@ impl Database {
table_name: &'a str,
chunk_ids: &[u32],
predicate: Predicate,
select_columns: table::ColumnSelection<'a>,
select_columns: ColumnSelection<'a>,
) -> Result<ReadFilterResults<'a, '_>> {
match self.partitions.get(partition_key) {
Some(partition) => {
@@ -234,89 +243,80 @@ impl Database {
/// and the type of aggregation required. Multiple aggregations can be
/// applied to the same column.
///
/// Because identical groups may exist across chunks, `read_aggregate` will
/// first merge across the provided chunks before returning an iterator of
/// record batches, which will be lazily constructed.
/// This method might be deprecated in the future, replaced by a call to
/// `read_window_aggregate` with a `window` of `0`.
pub fn read_aggregate(
&self,
partition_key: &str,
table_name: &str,
chunk_ids: &[u32],
predicate: Predicate,
group_columns: Vec<String>,
group_columns: ColumnSelection<'_>,
aggregates: Vec<(ColumnName<'_>, AggregateType)>,
) -> Result<()> {
// TODO - return type for iterator
match self.partitions.get(partition_key) {
Some(partition) => {
let mut chunks = vec![];
for chunk_id in chunk_ids {
chunks.push(
partition
.chunks
.get(chunk_id)
.ok_or_else(|| Error::ChunkNotFound { id: *chunk_id })?,
);
}
// Execute query against each chunk and get results.
// For each result set it may be possible for there to be
// duplicate group keys, e.g., due to
// back-filling. So chunk results may need to be
// merged together with the aggregates from identical group keys
// being resolved.
// Merge these results and then return an iterator that lets the
// caller stream through record batches. The number of record
// batches is an implementation detail of the `ReadBuffer`.
Ok(())
}
None => Err(Error::PartitionNotFound {
key: partition_key.to_owned(),
}),
}
) -> Result<ReadAggregateResults> {
Err(Error::UnsupportedOperation {
msg: "`read_aggregate` not yet implemented".to_owned(),
})
}
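
An illustrative sketch (not part of the diff) of how a caller might drive the new `read_aggregate` signature once it is implemented. The partition key, chunk id and table name mirror the tests further down; `db`, the "region"/"temp" columns and the `Sum` variant of `AggregateType` are assumed for illustration, and `ColumnName` is treated as the crate's `&str` alias:

let groups = db
    .read_aggregate(
        "hour_1",                           // partition key
        "Coolverine",                       // table name
        &[22],                              // chunk ids
        Predicate::default(),               // no row filtering
        ColumnSelection::Some(&["region"]), // group by the "region" tag column
        vec![("temp", AggregateType::Sum)], // SUM("temp") within each group
    )
    .unwrap();
for batch in groups {
    // each `batch` is a RecordBatch of merged, de-duplicated group aggregates
}
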
/// Returns aggregates segmented by grouping keys and windowed by time.
/// Returns windowed aggregates for each group specified by the values of
/// the grouping keys and window, limited to the specified partition key,
/// table name and chunk ids.
///
/// The set of data to be aggregated may be filtered by (currently only)
/// equality predicates, but can be ranged by time, which should be
/// represented as nanoseconds since the epoch. Results are included if they
/// satisfy the predicate and fall within the [min, max) time range domain.
///
/// Group keys are determined according to the provided group column names
/// (`group_columns`). Currently only grouping by string (tag key) columns
/// is supported.
/// Results may be filtered by conjunctive predicates.
/// Whilst the `ReadBuffer` will carry out the most efficient execution
/// possible by pruning columns, row groups and tables, it is assumed
/// that the caller has already provided an appropriately pruned
/// collection of chunks.
///
/// Currently, only grouping by string (tag key) columns is supported.
/// Required aggregates are specified via a tuple comprising a column name
/// and the type of aggregation required. Multiple aggregations can be
/// applied to the same column.
///
/// Results are grouped and windowed according to the `window` parameter,
/// which represents an interval in nanoseconds. For example, to window
/// results by one minute, window should be set to 60_000_000_000.
pub fn aggregate_window(
/// `window` should be a positive value indicating a duration in
/// nanoseconds.
pub fn read_window_aggregate(
&self,
partition_key: &str,
table_name: &str,
time_range: (i64, i64),
chunk_ids: &[u32],
predicate: Predicate,
group_columns: Vec<String>,
group_columns: ColumnSelection<'_>,
aggregates: Vec<(ColumnName<'_>, AggregateType)>,
window: i64,
) -> Option<RecordBatch> {
// Find all matching chunks using:
// - time range
// - measurement name.
//
// Execute query against each matching chunk and get result set.
// For each result set it may be possible for there to be duplicate
// group keys, e.g., due to back-filling. So chunk results may need
// to be merged together with the aggregates from identical group keys
// being resolved.
//
// Finally a record batch is returned.
todo!()
window: u64,
) -> Result<ReadWindowAggregateResults> {
Err(Error::UnsupportedOperation {
msg: "`read_window_aggregate` not yet implemented".to_owned(),
})
}
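
Since `window` is now a `u64` duration in nanoseconds, a one-minute window is 60 * 1_000_000_000 ns. A sketch of the intended call under the same assumptions as the `read_aggregate` example above (a `Count` variant of `AggregateType` is assumed):

const ONE_MINUTE_NS: u64 = 60 * 1_000_000_000;
let windowed = db
    .read_window_aggregate(
        "hour_1",
        "Coolverine",
        &[22],
        Predicate::default(),
        ColumnSelection::Some(&["region"]),
        vec![("temp", AggregateType::Count)],
        ONE_MINUTE_NS, // aggregate within one-minute windows
    )
    .unwrap();
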
///
/// NOTE: this is going to contain a specialised execution path for
/// essentially doing:
///
/// SELECT DISTINCT(column_name) WHERE XYZ
///
/// In the future the `ReadBuffer` should probably just add this
/// special execution to read_filter queries with `DISTINCT` expressions
/// on the selector columns.
///
/// Returns the distinct set of tag values (column values) for each provided
/// column, which *must* be considered a tag key.
///
/// As a special case, if `tag_keys` is empty then all distinct values for
/// all columns (tag keys) are returned for the provided chunks.
pub fn tag_values(
&self,
partition_key: &str,
table_name: &str,
chunk_ids: &[u32],
predicate: Predicate,
select_columns: ColumnSelection<'_>,
) -> Result<TagValuesResults> {
Err(Error::UnsupportedOperation {
msg: "`tag_values` call not yet hooked up".to_owned(),
})
}
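
A sketch of the intended `tag_values` call under the same hypothetical names ("region" and "env" stand in for real tag keys); today the method returns `Error::UnsupportedOperation`:

let values = db
    .tag_values(
        "hour_1",
        "Coolverine",
        &[22],
        Predicate::default(),
        // or ColumnSelection::All to get values for every tag column
        ColumnSelection::Some(&["region", "env"]),
    )
    .unwrap();
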
//
@@ -324,7 +324,7 @@ impl Database {
//
/// Returns the distinct set of table names that contain data that satisfies
/// the time range and predicates.
/// the provided predicate.
///
/// TODO(edd): Implement predicate support.
pub fn table_names(
@@ -332,7 +332,13 @@
partition_key: &str,
chunk_ids: &[u32],
predicate: Predicate,
) -> Result<Option<RecordBatch>> {
) -> Result<RecordBatch> {
if !predicate.is_empty() {
return Err(Error::UnsupportedOperation {
msg: "predicate support on `table_names` not implemented".to_owned(),
});
}
let partition = self
.partitions
.get(partition_key)
@@ -340,18 +346,14 @@
key: partition_key.to_owned(),
})?;
let chunks = partition.chunks_by_ids(chunk_ids)?;
let mut intersection = BTreeSet::new();
let chunk_table_names = partition
.chunks
.values()
let chunk_table_names = chunks
.iter()
.map(|chunk| chunk.table_names(&predicate))
.for_each(|mut names| intersection.append(&mut names));
if intersection.is_empty() {
return Ok(None);
}
let schema = Schema::new(vec![Field::new("table", Utf8, false)]);
let schema = Schema::new(vec![Field::new(TABLE_NAMES_COLUMN_NAME, Utf8, false)]);
let columns: Vec<ArrayRef> = vec![Arc::new(StringArray::from(
intersection
.iter()
@@ -360,19 +362,19 @@
))];
match RecordBatch::try_new(Arc::new(schema), columns).context(ArrowError {}) {
Ok(rb) => Ok(Some(rb)),
Ok(rb) => Ok(rb),
Err(e) => Err(e),
}
}
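
With the return type tightened from `Result<Option<RecordBatch>>` to `Result<RecordBatch>`, callers drop one layer of unwrapping (visible in the test updates below); the batch carries a single column named by `TABLE_NAMES_COLUMN_NAME`. A sketch, assuming a populated `db`:

let names = db
    .table_names("hour_1", &[22], Predicate::default())
    .unwrap();
assert_eq!(names.schema().field(0).name(), TABLE_NAMES_COLUMN_NAME);
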
/// Returns the distinct set of tag keys (column names) matching the
/// provided optional predicates and time range.
pub fn tag_keys(
/// Returns the distinct set of column names (tag keys) that satisfy the
/// provided predicate.
pub fn column_names(
&self,
table_name: &str,
time_range: (i64, i64),
partition_key: &str,
chunk_ids: &[u32],
predicate: Predicate,
) -> Option<RecordBatch> {
) -> Result<RecordBatch> {
// Find all matching chunks using:
// - time range
// - measurement name.
@@ -381,26 +383,9 @@
// a chunk allows the caller to provide already found tag keys
// (column names). This allows the execution to skip entire chunks,
// tables or segments if there are no new columns to be found there...
todo!();
}
/// Returns the distinct set of tag values (column values) for each provided
/// tag key, where each returned value lives in a row matching the provided
/// optional predicates and time range.
///
/// As a special case, if `tag_keys` is empty then all distinct values for
/// all columns (tag keys) are returned for the chunk.
pub fn tag_values(
&self,
table_name: &str,
time_range: (i64, i64),
predicate: Predicate,
tag_keys: &[String],
) -> Option<RecordBatch> {
// Find the measurement name on the chunk and dispatch query to the
// table for that measurement if the chunk's time range overlaps the
// requested time range.
todo!();
Err(Error::UnsupportedOperation {
msg: "`column_names` call not yet hooked up".to_owned(),
})
}
}
@@ -464,6 +449,18 @@ impl Partition {
};
}
fn chunks_by_ids(&self, ids: &[u32]) -> Result<Vec<&Chunk>> {
let mut chunks = vec![];
for chunk_id in ids {
chunks.push(
self.chunks
.get(chunk_id)
.ok_or_else(|| Error::ChunkNotFound { id: *chunk_id })?,
);
}
Ok(chunks)
}
/// Determines the total number of tables under all chunks within the
/// partition.
pub fn tables(&self) -> usize {
@@ -553,6 +550,48 @@ impl<'input, 'chunk> Iterator for ReadFilterResults<'input, 'chunk> {
}
}
/// An iterable set of results for calls to `read_aggregate`.
///
/// There may be some internal buffering and merging of results before a record
/// batch can be emitted from the iterator.
pub struct ReadAggregateResults {}
impl Iterator for ReadAggregateResults {
type Item = RecordBatch;
fn next(&mut self) -> Option<Self::Item> {
None
}
}
/// An iterable set of results for calls to `read_window_aggregate`.
///
/// There may be some internal buffering and merging of results before a record
/// batch is emitted from the iterator.
pub struct ReadWindowAggregateResults {}
impl Iterator for ReadWindowAggregateResults {
type Item = RecordBatch;
fn next(&mut self) -> Option<Self::Item> {
None
}
}
/// An iterable set of results for calls to `tag_values`.
///
/// There may be some internal buffering and merging of results before a record
/// batch is emitted from the iterator.
pub struct TagValuesResults {}
impl Iterator for TagValuesResults {
type Item = RecordBatch;
fn next(&mut self) -> Option<Self::Item> {
None
}
}
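
All three result types are plain `Iterator<Item = RecordBatch>` stubs for now, so the streaming shape of the API can already be exercised. A minimal sketch:

let mut results = ReadAggregateResults {};
assert!(results.next().is_none()); // the stub yields no batches yet
// once implemented, callers will simply stream the batches:
// for batch in results { /* process each RecordBatch */ }
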
#[cfg(test)]
mod test {
use std::collections::HashMap;
@@ -778,28 +817,24 @@ mod test {
db.upsert_partition("hour_1", 22, "Coolverine", gen_recordbatch());
let data = db
.table_names("hour_1", &[22], Predicate::default())
.unwrap()
.unwrap();
assert_rb_column_equals(&data, "table", &Values::String(vec![Some("Coolverine")]));
db.upsert_partition("hour_1", 22, "Coolverine", gen_recordbatch());
let data = db
.table_names("hour_1", &[22], Predicate::default())
.unwrap()
.unwrap();
assert_rb_column_equals(&data, "table", &Values::String(vec![Some("Coolverine")]));
db.upsert_partition("hour_1", 2, "Coolverine", gen_recordbatch());
let data = db
.table_names("hour_1", &[22], Predicate::default())
.unwrap()
.unwrap();
assert_rb_column_equals(&data, "table", &Values::String(vec![Some("Coolverine")]));
db.upsert_partition("hour_1", 2, "20 Size", gen_recordbatch());
let data = db
.table_names("hour_1", &[22], Predicate::default())
.unwrap()
.table_names("hour_1", &[2, 22], Predicate::default())
.unwrap();
assert_rb_column_equals(
&data,
@@ -982,3 +1017,17 @@ mod test {
assert!(itr.next().is_none());
}
}
/// THIS MODULE SHOULD ONLY BE IMPORTED FOR BENCHMARKS.
///
/// This module lets us expose internal parts of the crate so that we can use
/// libraries like criterion for benchmarking.
///
/// It should not be imported into any non-testing or benchmarking crates.
pub mod benchmarks {
pub use crate::column::{
cmp::Operator, dictionary, fixed::Fixed, fixed_null::FixedNull, Column, RowIDs,
};
pub use crate::row_group::{ColumnType, RowGroup};
}
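
The benchmark files at the top of this diff now pull encoder internals through this module. A minimal criterion skeleton using these re-exports might look as follows (the bench body is left as a placeholder because the encoding APIs themselves are internal details):

use criterion::{criterion_group, criterion_main, Criterion};
use read_buffer::benchmarks::{dictionary, Fixed, FixedNull, Operator, RowIDs};

fn encoding_benchmarks(c: &mut Criterion) {
    c.bench_function("example", |b| {
        b.iter(|| {
            // ... exercise the Fixed / FixedNull / dictionary encodings here ...
        })
    });
}

criterion_group!(benches, encoding_benchmarks);
criterion_main!(benches);
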

View File

@@ -552,8 +552,10 @@ impl MetaData {
}
}
/// A collection of columns, with a variant that implies all columns for the
/// table should be included.
/// A collection of columns to include in query results.
///
/// The `All` variant denotes that the caller wishes to include all table
/// columns in the results.
pub enum ColumnSelection<'a> {
All,
Some(&'a [&'a str]),
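
A quick illustration of the two variants (the column names are made up):

// Project an explicit subset of columns...
let subset = ColumnSelection::Some(&["env", "time"]);
// ...or ask for every column in the table.
let everything = ColumnSelection::All;
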

View File

@@ -279,7 +279,7 @@ pub fn to_read_buffer_predicate(predicate: &Predicate) -> Result<read_buffer::Pr
match predicate
.exprs
.iter()
.map(read_buffer::row_group::BinaryExpr::try_from)
.map(read_buffer::BinaryExpr::try_from)
.collect::<Result<Vec<_>, _>>()
{
Ok(exprs) => {
@@ -306,7 +306,7 @@ pub mod test {
use arrow_deps::datafusion::scalar::ScalarValue;
use query::predicate::PredicateBuilder;
use read_buffer::row_group::BinaryExpr as RBBinaryExpr;
use read_buffer::BinaryExpr as RBBinaryExpr;
use read_buffer::Predicate as RBPredicate;
#[test]