Merge pull request #604 from influxdata/er/refactor/read-chunk

refactor: rename partition to chunk
Edd Robinson 2020-12-28 21:26:56 +00:00 committed by GitHub
commit 02fbce7ef7
5 changed files with 208 additions and 232 deletions

read_buffer/src/chunk.rs Normal file

@ -0,0 +1,157 @@
use std::collections::{BTreeMap, BTreeSet};
use crate::column::AggregateType;
use crate::row_group::{ColumnName, Predicate};
use crate::table::{ReadFilterResults, ReadGroupResults, Table};
type TableName = String;
/// A `Chunk` comprises a collection of `Tables` where every table must have a
/// unique identifier (name).
pub struct Chunk {
// The unique identifier for this chunk.
id: u32,
// Metadata about the tables within this chunk.
meta: MetaData,
// The set of tables within this chunk. Each table is identified by a
// measurement name.
tables: BTreeMap<TableName, Table>,
}
impl Chunk {
pub fn new(id: u32, table: Table) -> Self {
let mut p = Self {
id,
meta: MetaData::new(&table),
tables: BTreeMap::new(),
};
p.tables.insert(table.name().to_owned(), table);
p
}
/// Returns data for the specified column selections on the specified table
/// name.
///
/// Results may be filtered by conjunctive predicates. Time predicates
/// should be expressed as nanoseconds since the epoch.
pub fn select(
&self,
table_name: &str,
predicates: &[Predicate<'_>],
select_columns: &[ColumnName<'_>],
) -> ReadFilterResults<'_, '_> {
// Lookup table by name and dispatch execution.
todo!();
}
/// Returns aggregates segmented by grouping keys for the specified
/// table name.
///
/// The set of data to be aggregated may be filtered by optional conjunctive
/// predicates.
///
/// Group keys are determined according to the provided group column names.
/// Currently only grouping by string (tag key) columns is supported.
///
/// Required aggregates are specified via a tuple comprising a column name
/// and the type of aggregation required. Multiple aggregations can be
/// applied to the same column.
pub fn aggregate(
&self,
table_name: &str,
predicates: &[Predicate<'_>],
group_columns: Vec<ColumnName<'_>>,
aggregates: Vec<(ColumnName<'_>, AggregateType)>,
) -> ReadGroupResults<'_, '_> {
// Lookup table by name and dispatch execution.
todo!()
}
//
// ---- Schema API queries
//
/// Returns the distinct set of table names that contain data that satisfies
/// the time range and predicates.
pub fn table_names(&self, predicates: &[Predicate<'_>]) -> BTreeSet<String> {
//
// TODO(edd): do we want to add the ability to apply a predicate to the
// table names? For example, a regex where you only want table names
// beginning with /cpu.+/ or something?
todo!()
}
/// Returns the distinct set of tag keys (column names) matching the
/// provided optional predicates and time range.
pub fn tag_keys(
&self,
table_name: String,
predicates: &[Predicate<'_>],
found_keys: &BTreeSet<ColumnName<'_>>,
) -> BTreeSet<ColumnName<'_>> {
// Lookup table by name and dispatch execution if the table's time range
// overlaps the requested time range *and* there exist columns in the
// table's schema that are *not* already found.
todo!();
}
/// Returns the distinct set of tag values (column values) for each provided
/// tag key, where each returned value lives in a row matching the provided
/// optional predicates and time range.
///
/// As a special case, if `tag_keys` is empty then all distinct values for
/// all columns (tag keys) are returned for the chunk.
pub fn tag_values(
&self,
table_name: String,
predicates: &[Predicate<'_>],
tag_keys: &[ColumnName<'_>],
found_tag_values: &BTreeMap<ColumnName<'_>, BTreeSet<&String>>,
) -> BTreeMap<ColumnName<'_>, BTreeSet<&String>> {
// Lookup table by name and dispatch execution to that table if the
// chunk's time range overlaps the requested time range.
//
// This method also has the ability to short-circuit execution against
// columns if those columns only contain values that have already been
// found.
todo!();
}
}
// `Chunk` metadata that is used to track statistics about the chunk and
// whether it could contain data necessary to execute a query.
struct MetaData {
size: u64, // size in bytes of the chunk
rows: u64, // Total number of rows across all tables
// The total time range of *all* data (across all tables) within this
// chunk.
//
// This would only be None if the chunk contained only tables that had
// no timestamp column or the values were all NULL.
time_range: Option<(i64, i64)>,
}
impl MetaData {
pub fn new(table: &Table) -> Self {
Self {
size: table.size(),
rows: table.rows(),
time_range: table.time_range(),
}
}
pub fn add_table(&mut self, table: &Table) {
// Update size, rows, time_range
todo!()
}
// `invalidate` should be called when a table is removed. All metadata must
// be determined by asking each table in the chunk for its metadata.
pub fn invalidate(&mut self) {
// Update size, rows, time_range by linearly scanning all tables.
todo!()
}
}
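
The `select` and `aggregate` bodies above are left as `todo!()` stubs in this commit; their comments describe a lookup-and-dispatch pattern, and `MetaData::add_table` is expected to fold a new table's statistics into the chunk-level metadata. Below is a minimal, self-contained sketch of both ideas. `Table` and its statistics fields here are simplified stand-ins for illustration, not the crate's real types.

use std::collections::BTreeMap;

// Simplified stand-in for the crate's `Table`.
struct Table {
    name: String,
    size: u64,
    rows: u64,
    time_range: Option<(i64, i64)>,
}

#[derive(Default)]
struct MetaData {
    size: u64,
    rows: u64,
    time_range: Option<(i64, i64)>,
}

impl MetaData {
    // Fold a new table's statistics into the chunk-level metadata.
    fn add_table(&mut self, table: &Table) {
        self.size += table.size;
        self.rows += table.rows;
        // Widen the chunk's time range to cover the new table's range.
        self.time_range = match (self.time_range, table.time_range) {
            (Some((lo, hi)), Some((t_lo, t_hi))) => Some((lo.min(t_lo), hi.max(t_hi))),
            (a, b) => a.or(b),
        };
    }
}

#[derive(Default)]
struct Chunk {
    meta: MetaData,
    tables: BTreeMap<String, Table>,
}

impl Chunk {
    fn add_table(&mut self, table: Table) {
        self.meta.add_table(&table);
        self.tables.insert(table.name.clone(), table);
    }

    // Lookup-and-dispatch: find the named table and delegate to it.
    fn table_rows(&self, table_name: &str) -> Option<u64> {
        self.tables.get(table_name).map(|t| t.rows)
    }
}

Note that the time-range merge leaves `None` only while no table contributes a timestamp range, matching the `MetaData` documentation above.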

read_buffer/src/lib.rs

@ -2,8 +2,8 @@
#![allow(dead_code)]
#![allow(clippy::too_many_arguments)]
#![allow(unused_variables)]
pub(crate) mod chunk;
pub mod column;
pub(crate) mod partition;
pub mod row_group;
pub(crate) mod table;
@ -11,12 +11,12 @@ use std::collections::BTreeMap;
use arrow_deps::arrow::record_batch::RecordBatch;
use chunk::Chunk;
use column::AggregateType;
use partition::Partition;
use row_group::ColumnName;
use row_group::{ColumnName, Predicate};
/// The Segment Store is responsible for providing read access to partition
/// data.
/// The `Store` is responsible for providing an execution engine for reading
/// `Chunk` data.
#[derive(Default)]
pub struct Store {
// A mapping from database name (tenant id, bucket id etc) to a database.
@ -43,17 +43,17 @@ impl Store {
todo!()
}
/// This method adds a partition to the segment store. It is probably what
/// This method adds a `Chunk` to the Read Buffer. It is probably what
/// the `MutableBuffer` will call.
///
/// The partition should comprise a single table (record batch) for each
/// measurement name in the partition.
pub fn add_partition(&mut self, database_id: String, partition: BTreeMap<String, RecordBatch>) {
/// The chunk should comprise a single record batch for each table it
/// contains.
pub fn add_chunk(&mut self, database_id: String, chunk: BTreeMap<String, RecordBatch>) {
todo!()
}
/// Executes selections against matching partitions, returning a single
/// record batch with all partition results appended.
/// Executes selections against matching chunks, returning a single
/// record batch with all chunk results appended.
///
/// Results may be filtered by (currently only) equality predicates, but can
/// be ranged by time, which should be represented as nanoseconds since the
@ -64,7 +64,7 @@ impl Store {
database_name: &str,
table_name: &str,
time_range: (i64, i64),
predicates: &[(&str, &str)],
predicates: &[Predicate<'_>],
select_columns: Vec<String>,
) -> Option<RecordBatch> {
// Execute against matching database.
@ -79,7 +79,7 @@ impl Store {
/// Returns aggregates segmented by grouping keys for the specified
/// measurement as record batches, with one record batch per matching
/// partition.
/// chunk.
///
/// The set of data to be aggregated may be filtered by (currently only)
/// equality predicates, but can be ranged by time, which should be
@ -97,7 +97,7 @@ impl Store {
database_name: &str,
table_name: &str,
time_range: (i64, i64),
predicates: &[(&str, &str)],
predicates: &[Predicate<'_>],
group_columns: Vec<String>,
aggregates: Vec<(ColumnName<'_>, AggregateType)>,
) -> Option<RecordBatch> {
@ -136,7 +136,7 @@ impl Store {
database_name: &str,
table_name: &str,
time_range: (i64, i64),
predicates: &[(&str, &str)],
predicates: &[Predicate<'_>],
group_columns: Vec<String>,
aggregates: Vec<(ColumnName<'_>, AggregateType)>,
window: i64,
@ -164,7 +164,7 @@ impl Store {
&self,
database_name: &str,
time_range: (i64, i64),
predicates: &[(&str, &str)],
predicates: &[Predicate<'_>],
) -> Option<RecordBatch> {
if let Some(db) = self.databases.get(database_name) {
return db.table_names(database_name, time_range, predicates);
@ -179,7 +179,7 @@ impl Store {
database_name: &str,
table_name: &str,
time_range: (i64, i64),
predicates: &[(&str, &str)],
predicates: &[Predicate<'_>],
) -> Option<RecordBatch> {
if let Some(db) = self.databases.get(database_name) {
return db.tag_keys(table_name, time_range, predicates);
@ -192,13 +192,13 @@ impl Store {
/// optional predicates and time range.
///
/// As a special case, if `tag_keys` is empty then all distinct values for
/// all columns (tag keys) are returned for the partition.
/// all columns (tag keys) are returned for the chunks.
pub fn tag_values(
&self,
database_name: &str,
table_name: &str,
time_range: (i64, i64),
predicates: &[(&str, &str)],
predicates: &[Predicate<'_>],
tag_keys: &[String],
) -> Option<RecordBatch> {
if let Some(db) = self.databases.get(database_name) {
@ -233,9 +233,9 @@ pub fn time_range_predicate<'a>(from: i64, to: i64) -> Vec<row_group::Predicate<
// measurement name.
#[derive(Default)]
pub struct Database {
// The collection of partitions in the database. Each partition is uniquely
// identified by a partition key.
partitions: BTreeMap<String, Partition>,
// The collection of chunks in the database. Each chunk is uniquely
// identified by a chunk key.
chunks: BTreeMap<String, Chunk>,
// The current total size of the database.
size: u64,
@ -246,11 +246,11 @@ impl Database {
Self::default()
}
pub fn add_partition(&mut self, partition: Partition) {
pub fn add_chunk(&mut self, chunk: Chunk) {
todo!()
}
pub fn remove_partition(&mut self, partition: Partition) {
pub fn remove_chunk(&mut self, chunk: Chunk) {
todo!()
}
@ -258,8 +258,8 @@ impl Database {
self.size
}
/// Executes selections against matching partitions, returning a single
/// record batch with all partition results appended.
/// Executes selections against matching chunks, returning a single
/// record batch with all chunk results appended.
///
/// Results may be filtered by (currently only) equality predicates, but can
/// be ranged by time, which should be represented as nanoseconds since the
@ -269,21 +269,21 @@ impl Database {
&self,
table_name: &str,
time_range: (i64, i64),
predicates: &[(&str, &str)],
predicates: &[Predicate<'_>],
select_columns: Vec<String>,
) -> Option<RecordBatch> {
// Find all matching partitions using:
// Find all matching chunks using:
// - time range
// - measurement name.
//
// Execute against each partition and append each result set into a
// Execute against each chunk and append each result set into a
// single record batch.
todo!();
}
/// Returns aggregates segmented by grouping keys for the specified
/// measurement as record batches, with one record batch per matching
/// partition.
/// chunk.
///
/// The set of data to be aggregated may be filtered by (currently only)
/// equality predicates, but can be ranged by time, which should be
@ -300,17 +300,17 @@ impl Database {
&self,
table_name: &str,
time_range: (i64, i64),
predicates: &[(&str, &str)],
predicates: &[Predicate<'_>],
group_columns: Vec<String>,
aggregates: Vec<(ColumnName<'_>, AggregateType)>,
) -> Option<RecordBatch> {
// Find all matching partitions using:
// Find all matching chunks using:
// - time range
// - measurement name.
//
// Execute query against each matching partition and get result set.
// Execute query against each matching chunk and get result set.
// For each result set it may be possible for there to be duplicate
// group keys, e.g., due to back-filling. So partition results may need
// group keys, e.g., due to back-filling. So chunk results may need
// to be merged together with the aggregates from identical group keys
// being resolved.
//
@ -340,18 +340,18 @@ impl Database {
&self,
table_name: &str,
time_range: (i64, i64),
predicates: &[(&str, &str)],
predicates: &[Predicate<'_>],
group_columns: Vec<String>,
aggregates: Vec<(ColumnName<'_>, AggregateType)>,
window: i64,
) -> Option<RecordBatch> {
// Find all matching partitions using:
// Find all matching chunks using:
// - time range
// - measurement name.
//
// Execute query against each matching partition and get result set.
// Execute query against each matching chunk and get result set.
// For each result set it may be possible for there to be duplicate
// group keys, e.g., due to back-filling. So partition results may need
// group keys, e.g., due to back-filling. So chunk results may need
// to be merged together with the aggregates from identical group keys
// being resolved.
//
@ -369,7 +369,7 @@ impl Database {
&self,
database_name: &str,
time_range: (i64, i64),
predicates: &[(&str, &str)],
predicates: &[Predicate<'_>],
) -> Option<RecordBatch> {
//
// TODO(edd): do we want to add the ability to apply a predicate to the
@ -384,15 +384,15 @@ impl Database {
&self,
table_name: &str,
time_range: (i64, i64),
predicates: &[(&str, &str)],
predicates: &[Predicate<'_>],
) -> Option<RecordBatch> {
// Find all matching partitions using:
// Find all matching chunks using:
// - time range
// - measurement name.
//
// Execute query against matching partitions. The `tag_keys` method for
// a partition allows the caller to provide already found tag keys
// (column names). This allows the execution to skip entire partitions,
// Execute query against matching chunks. The `tag_keys` method for
// a chunk allows the caller to provide already found tag keys
// (column names). This allows the execution to skip entire chunks,
// tables or segments if there are no new columns to be found there...
todo!();
}
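
The skip logic described in `tag_keys` reduces to a simple test: a chunk (or table) is only worth visiting if its schema contains at least one column name that has not already been found. A hypothetical helper, using simplified owned-`String` types rather than the crate's `ColumnName<'_>`:

use std::collections::BTreeSet;

// True if `schema` offers at least one column name missing from
// `found_keys`, i.e. visiting this chunk could yield new tag keys.
fn has_new_keys(schema: &BTreeSet<String>, found_keys: &BTreeSet<String>) -> bool {
    schema.iter().any(|key| !found_keys.contains(key))
}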
@ -402,16 +402,16 @@ impl Database {
/// optional predicates and time range.
///
/// As a special case, if `tag_keys` is empty then all distinct values for
/// all columns (tag keys) are returned for the partition.
/// all columns (tag keys) are returned for the chunk.
pub fn tag_values(
&self,
table_name: &str,
time_range: (i64, i64),
predicates: &[(&str, &str)],
predicates: &[Predicate<'_>],
tag_keys: &[String],
) -> Option<RecordBatch> {
// Find the measurement name on the partition and dispatch query to the
// table for that measurement if the partition's time range overlaps the
// Find the measurement name on the chunk and dispatch query to the
// table for that measurement if the chunk's time range overlaps the
// requested time range.
todo!();
}
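
Taken together, `Store`, `Database`, and `Chunk` form a three-level dispatch hierarchy: the store routes by database name, each database fans out across its matching chunks, and each chunk delegates to its tables. A simplified, self-contained sketch of that read path follows; the placeholder `Predicate` alias and `Vec<String>` result type stand in for the crate's real `Predicate<'_>` and `RecordBatch` signatures.

use std::collections::BTreeMap;

// Placeholder predicate: (column, value), echoing the `(&str, &str)`
// pairs that this commit replaces with `Predicate<'_>`.
type Predicate<'a> = (&'a str, &'a str);

struct Chunk;

impl Chunk {
    // Stand-in for per-chunk execution; returns a placeholder result set.
    fn select(&self, _table: &str, _predicates: &[Predicate<'_>]) -> Vec<String> {
        Vec::new()
    }
}

#[derive(Default)]
struct Database {
    chunks: BTreeMap<String, Chunk>,
}

impl Database {
    // Execute against every matching chunk and append the per-chunk
    // result sets into a single collection.
    fn select(&self, table: &str, predicates: &[Predicate<'_>]) -> Vec<String> {
        self.chunks
            .values()
            .flat_map(|chunk| chunk.select(table, predicates))
            .collect()
    }
}

#[derive(Default)]
struct Store {
    databases: BTreeMap<String, Database>,
}

impl Store {
    // Route the query to the named database, if it exists.
    fn select(&self, db: &str, table: &str, predicates: &[Predicate<'_>]) -> Option<Vec<String>> {
        self.databases.get(db).map(|d| d.select(table, predicates))
    }
}

Appending per-chunk result sets at the database level mirrors the documented behaviour of returning "a single record batch with all chunk results appended".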

read_buffer/src/partition.rs (deleted)

@ -1,181 +0,0 @@
use std::collections::{BTreeMap, BTreeSet};
use crate::column::{AggregateResult, AggregateType, Values};
use crate::row_group::{ColumnName, GroupKey};
use crate::table::Table;
// The name of a measurement, i.e., a table name.
type MeasurementName = String;
/// A Partition comprises a collection of tables that have been organised
/// according to a partition rule, e.g., based on time or arrival, some column
/// value etc.
///
/// A partition's key uniquely identifies a property that all tables within
/// the partition share.
///
/// Each table within a partition can be identified by its measurement name.
pub struct Partition {
// The partition key uniquely identifies this partition.
key: String,
// Metadata about this partition.
meta: MetaData,
// The set of tables within this partition. Each table is identified by
// a measurement name.
tables: BTreeMap<MeasurementName, Table>,
}
impl Partition {
pub fn new(key: String, table: Table) -> Self {
let mut p = Self {
key,
meta: MetaData::new(&table),
tables: BTreeMap::new(),
};
p.tables.insert(table.name().to_owned(), table);
p
}
/// Returns vectors of columnar data for the specified column
/// selections on the specified table name (measurement).
///
/// Results may be filtered by (currently only) equality predicates, but can
/// be ranged by time, which should be represented as nanoseconds since the
/// epoch. Results are included if they satisfy the predicate and fall
/// within the [min, max) time range domain.
pub fn select(
&self,
table_name: &str,
time_range: (i64, i64),
predicates: &[(&str, &str)],
select_columns: Vec<ColumnName<'_>>,
) -> BTreeMap<ColumnName<'_>, Values<'_>> {
// Find the measurement name on the partition and dispatch query to the
// table for that measurement if the partition's time range overlaps the
// requested time range.
todo!();
}
/// Returns aggregates segmented by grouping keys for the specified
/// measurement.
///
/// The set of data to be aggregated may be filtered by (currently only)
/// equality predicates, but can be ranged by time, which should be
/// represented as nanoseconds since the epoch. Results are included if they
/// satisfy the predicate and fall within the [min, max) time range domain.
///
/// Group keys are determined according to the provided group column names.
/// Currently only grouping by string (tag key) columns is supported.
///
/// Required aggregates are specified via a tuple comprising a column name
/// and the type of aggregation required. Multiple aggregations can be
/// applied to the same column.
pub fn aggregate(
&self,
table_name: &str,
time_range: (i64, i64),
predicates: &[(&str, &str)],
group_columns: Vec<ColumnName<'_>>,
aggregates: Vec<(ColumnName<'_>, AggregateType)>,
) -> BTreeMap<GroupKey<'_>, Vec<(ColumnName<'_>, AggregateResult<'_>)>> {
// Find the measurement name on the partition and dispatch query to the
// table for that measurement if the partition's time range overlaps the
// requested time range.
todo!()
}
//
// ---- Schema API queries
//
/// Returns the distinct set of table names that contain data that satisfies
/// the time range and predicates.
pub fn table_names(
&self,
time_range: (i64, i64),
predicates: &[(&str, &str)],
) -> BTreeSet<String> {
//
// TODO(edd): do we want to add the ability to apply a predicate to the
// table names? For example, a regex where you only want table names
// beginning with /cpu.+/ or something?
todo!()
}
/// Returns the distinct set of tag keys (column names) matching the
/// provided optional predicates and time range.
pub fn tag_keys(
&self,
table_name: String,
time_range: (i64, i64),
predicates: &[(&str, &str)],
found_keys: &BTreeSet<ColumnName<'_>>,
) -> BTreeSet<ColumnName<'_>> {
// Dispatch query to the table for the provided measurement if the
// partition's time range overlaps the requested time range *and* there
// exist columns in the table's schema that are *not* already found.
todo!();
}
/// Returns the distinct set of tag values (column values) for each provided
/// tag key, where each returned value lives in a row matching the provided
/// optional predicates and time range.
///
/// As a special case, if `tag_keys` is empty then all distinct values for
/// all columns (tag keys) are returned for the partition.
pub fn tag_values(
&self,
table_name: String,
time_range: (i64, i64),
predicates: &[(&str, &str)],
tag_keys: &[ColumnName<'_>],
found_tag_values: &BTreeMap<ColumnName<'_>, BTreeSet<&String>>,
) -> BTreeMap<ColumnName<'_>, BTreeSet<&String>> {
// Find the measurement name on the partition and dispatch query to the
// table for that measurement if the partition's time range overlaps the
// requested time range.
//
// This method also has the ability to short-circuit execution against
// columns if those columns only contain values that have already been
// found.
todo!();
}
}
// Partition metadata that is used to track statistics about the partition and
// whether it may contain data for a query.
struct MetaData {
size: u64, // size in bytes of the partition
rows: u64, // Total number of rows across all tables
// The total time range of *all* data (across all tables) within this
// partition.
//
// This would only be None if the partition contained only tables that had
// no timestamp column or the values were all NULL.
time_range: Option<(i64, i64)>,
}
impl MetaData {
pub fn new(table: &Table) -> Self {
Self {
size: table.size(),
rows: table.rows(),
time_range: table.time_range(),
}
}
pub fn add_table(&mut self, table: &Table) {
// Update size, rows, time_range
todo!()
}
// `invalidate` should be called when a table is removed that impacts the
// metadata.
pub fn invalidate(&mut self) {
// Update size, rows, time_range by linearly scanning all tables.
todo!()
}
}

read_buffer/src/row_group.rs

@ -11,7 +11,7 @@ use crate::column::{
/// The name used for a timestamp column.
pub const TIME_COLUMN_NAME: &str = data_types::TIME_COLUMN_NAME;
/// A `RowGroup` is an immutable horizontal partition of a single `Table`. By
/// A `RowGroup` is an immutable horizontal chunk of a single `Table`. By
/// definition it has the same schema as all the other row groups in the table.
/// All the columns within the `RowGroup` must have the same number of logical
/// rows.
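
The invariant stated above, that every column in a `RowGroup` holds the same number of logical rows, is the kind of property a constructor would assert. A hypothetical sketch with simplified column types, not the crate's actual code:

struct RowGroup {
    // Simplified: one vector of values per column.
    columns: Vec<Vec<i64>>,
    rows: usize,
}

impl RowGroup {
    fn new(columns: Vec<Vec<i64>>) -> Self {
        let rows = columns.first().map_or(0, Vec::len);
        // Invariant: all columns have the same number of logical rows.
        assert!(columns.iter().all(|col| col.len() == rows));
        Self { columns, rows }
    }
}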

read_buffer/src/table.rs

@ -21,7 +21,7 @@ use crate::{
/// possible that time-ranges (for example) can overlap across segments.
///
/// The current write path ensures that a single table emitted for a
/// measurement within any partition will have the same schema, therefore this
/// measurement within any chunk will have the same schema, therefore this
/// table's schema applies to all of the segments held within it.
///
/// The total size of a table is tracked and can be increased or reduced by
@ -419,7 +419,7 @@ impl Table {
/// optional predicates and time range.
///
/// As a special case, if `tag_keys` is empty then all distinct values for
/// all columns (tag keys) are returned for the partition.
/// all columns (tag keys) are returned for the chunk.
pub fn tag_values<'a>(
&self,
time_range: (i64, i64),