Merge pull request #673 from influxdata/er/refactor/read_buffer/read_filter_schema

refactor: use schema/metadata type in `read_filter`
Edd Robinson 2021-01-19 15:51:53 +00:00 committed by GitHub
commit 5f30d4b2fc
6 changed files with 576 additions and 301 deletions


@@ -1,10 +1,11 @@
use std::collections::{btree_map::Entry, BTreeMap, BTreeSet};
use crate::row_group::RowGroup;
use crate::row_group::{ColumnName, Predicate};
use crate::schema::AggregateType;
use crate::table;
use crate::table::{ColumnSelection, Table};
use crate::Error;
use crate::{column::AggregateType, row_group::RowGroup};
type TableName = String;
@@ -58,6 +59,11 @@ impl Chunk {
self.tables.len()
}
/// Returns true if the chunk contains data for this table.
pub fn has_table(&self, table_name: &str) -> bool {
self.tables.contains_key(table_name)
}
/// Returns true if there are no tables under this chunk.
pub fn is_empty(&self) -> bool {
self.tables() == 0
@@ -96,10 +102,14 @@ impl Chunk {
table_name: &str,
predicate: &Predicate,
select_columns: &ColumnSelection<'_>,
) -> Option<table::ReadFilterResults<'_>> {
) -> Result<table::ReadFilterResults<'_>, Error> {
// Lookup table by name and dispatch execution.
match self.tables.get(table_name) {
Some(table) => Some(table.read_filter(select_columns, predicate)),
None => None,
Some(table) => Ok(table.read_filter(select_columns, predicate)),
None => crate::TableNotFound {
table_name: table_name.to_owned(),
}
.fail(),
}
}
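Since `read_filter` on `Chunk` now returns `Result` rather than `Option`, callers get a typed `TableNotFound` error instead of a bare `None`. A minimal sketch of the new calling convention, using stand-in types rather than the crate's actual `Table` and snafu-generated error:

```rust
use std::collections::BTreeMap;

#[derive(Debug)]
enum Error {
    TableNotFound { table_name: String },
}

struct Chunk {
    // stand-in value type; the real map holds `table::Table`
    tables: BTreeMap<String, Vec<u64>>,
}

impl Chunk {
    fn read_filter(&self, table_name: &str) -> Result<&Vec<u64>, Error> {
        self.tables
            .get(table_name)
            .ok_or_else(|| Error::TableNotFound {
                table_name: table_name.to_owned(),
            })
    }
}

fn main() {
    let chunk = Chunk {
        tables: BTreeMap::new(),
    };
    match chunk.read_filter("cpu") {
        Ok(rows) => println!("found {} rows", rows.len()),
        Err(e) => eprintln!("query failed: {:?}", e), // TableNotFound { table_name: "cpu" }
    }
}
```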


@@ -9,9 +9,11 @@ use std::sync::Arc;
use arrow::array;
use croaring::Bitmap;
use either::Either;
use arrow_deps::{arrow, arrow::array::Array};
use either::Either;
use crate::schema::{AggregateType, LogicalDataType};
// Edd's totally made up magic constant. This determines whether we would use
// a run-length encoded dictionary encoding or just a plain dictionary encoding.
@@ -609,30 +611,6 @@ pub struct ColumnProperties {
pub has_pre_computed_row_ids: bool,
}
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
/// The logical data-type for a column.
pub enum LogicalDataType {
Integer, // Signed integer
Unsigned, // Unsigned integer
Float, //
String, // UTF-8 valid string
Binary, // Arbitrary collection of bytes
Boolean, //
}
impl LogicalDataType {
pub fn to_arrow_datatype(&self) -> arrow::datatypes::DataType {
match &self {
LogicalDataType::Integer => arrow::datatypes::DataType::Int64,
LogicalDataType::Unsigned => arrow::datatypes::DataType::UInt64,
LogicalDataType::Float => arrow::datatypes::DataType::Float64,
LogicalDataType::String => arrow::datatypes::DataType::Utf8,
LogicalDataType::Binary => arrow::datatypes::DataType::Binary,
LogicalDataType::Boolean => arrow::datatypes::DataType::Boolean,
}
}
}
#[derive(Default, Debug, PartialEq)]
// The meta-data for a column
pub struct MetaData<T>
@@ -2202,39 +2180,6 @@ impl From<&arrow::array::Float64Array> for Column {
}
}
/// These variants describe supported aggregates that can be applied to columnar
/// data.
#[derive(Copy, Clone, PartialEq, Debug)]
pub enum AggregateType {
Count,
First,
Last,
Min,
Max,
Sum,
/* TODO - support:
 * Distinct - (edd): not sure this counts as an aggregation. Seems more like
 * a special filter.
 * CountDistinct
 * Percentile */
}
impl std::fmt::Display for AggregateType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
AggregateType::Count => "count",
AggregateType::First => "first",
AggregateType::Last => "last",
AggregateType::Min => "min",
AggregateType::Max => "max",
AggregateType::Sum => "sum",
}
)
}
}
/// These variants hold aggregates, which are the results of applying aggregates
/// to column data.
#[derive(Debug, Copy, Clone)]


@@ -10,6 +10,7 @@ pub(crate) mod table;
use std::{
collections::{btree_map::Entry, BTreeMap, BTreeSet},
convert::TryInto,
fmt,
sync::Arc,
};
@@ -19,11 +20,11 @@ use arrow_deps::arrow::{
datatypes::{DataType::Utf8, Field, Schema},
record_batch::RecordBatch,
};
use snafu::{ResultExt, Snafu};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
// Identifiers that are exported as part of the public API.
pub use column::AggregateType;
pub use row_group::{BinaryExpr, Predicate};
pub use schema::*;
pub use table::ColumnSelection;
use chunk::Chunk;
@@ -199,6 +200,13 @@ impl Database {
Some(partition) => {
let mut chunks = vec![];
for chunk_id in chunk_ids {
let chunk = partition
.chunks
.get(chunk_id)
.context(ChunkNotFound { id: *chunk_id })?;
ensure!(chunk.has_table(table_name), TableNotFound { table_name });
chunks.push(
partition
.chunks
@@ -512,10 +520,10 @@ impl<'input, 'chunk> Iterator for ReadFilterResults<'input, 'chunk> {
// Try next chunk's table.
if self.curr_table_results.is_none() {
self.curr_table_results = self.chunks[self.next_i].read_filter(
self.table_name,
&self.predicate,
&self.select_columns,
self.curr_table_results = Some(
self.chunks[self.next_i]
.read_filter(self.table_name, &self.predicate, &self.select_columns)
.unwrap(),
);
}
@@ -524,7 +532,10 @@
Some(table_results) => {
// Table has found results in a row group.
if let Some(row_group_result) = table_results.next() {
return Some(row_group_result.record_batch());
// it should not be possible for the conversion to record
// batch to fail here
let rb = row_group_result.try_into();
return Some(rb.unwrap());
}
// no more results for row groups in the table. Try next chunk.


@@ -2,17 +2,18 @@ use std::{
borrow::Cow,
collections::BTreeMap,
convert::{TryFrom, TryInto},
sync::Arc,
};
use hashbrown::{hash_map, HashMap};
use itertools::Itertools;
use snafu::Snafu;
use crate::column::{
self, cmp::Operator, AggregateResult, AggregateType, Column, EncodedValues, LogicalDataType,
OwnedValue, RowIDs, RowIDsOption, Scalar, Value, Values, ValuesIterator,
cmp::Operator, AggregateResult, Column, EncodedValues, OwnedValue, RowIDs, RowIDsOption,
Scalar, Value, Values, ValuesIterator,
};
use crate::schema::ResultSchema;
use crate::schema;
use crate::schema::{AggregateType, LogicalDataType, ResultSchema};
use arrow_deps::arrow::record_batch::RecordBatch;
use arrow_deps::{
arrow, datafusion::logical_plan::Expr as DfExpr,
@@ -23,6 +24,21 @@ use data_types::schema::{InfluxColumnType, Schema};
/// The name used for a timestamp column.
pub const TIME_COLUMN_NAME: &str = data_types::TIME_COLUMN_NAME;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("arrow conversion error: {}", source))]
ArrowError {
source: arrow_deps::arrow::error::ArrowError,
},
#[snafu(display("schema conversion error: {}", source))]
SchemaError {
source: data_types::schema::builder::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
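The `Result<T, E = Error>` alias with a defaulted error parameter is a common crate-local pattern: most signatures can write plain `Result<T>` while still allowing an explicit error type where needed. A self-contained sketch, with `String` standing in for the real arrow and schema-builder error sources:

```rust
#[derive(Debug)]
pub enum Error {
    // `String` stands in for arrow_deps::arrow::error::ArrowError
    ArrowError { source: String },
    // ... and for data_types::schema::builder::Error
    SchemaError { source: String },
}

// The default type parameter lets signatures below say just `Result<T>`.
pub type Result<T, E = Error> = std::result::Result<T, E>;

fn convert() -> Result<u32> {
    Err(Error::SchemaError {
        source: "duplicate column name".to_owned(),
    })
}

fn main() {
    if let Err(e) = convert() {
        println!("conversion failed: {:?}", e);
    }
}
```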
/// A `RowGroup` is an immutable horizontal chunk of a single `Table`. By
/// definition it has the same schema as all the other row groups in the table.
/// All the columns within the `RowGroup` must have the same number of logical
@@ -32,8 +48,6 @@ pub struct RowGroup {
columns: Vec<Column>,
all_columns_by_name: BTreeMap<String, usize>,
tag_columns_by_name: BTreeMap<String, usize>,
field_columns_by_name: BTreeMap<String, usize>,
time_column: usize,
}
@@ -46,8 +60,6 @@ impl RowGroup {
let mut all_columns = vec![];
let mut all_columns_by_name = BTreeMap::new();
let mut tag_columns_by_name = BTreeMap::new();
let mut field_columns_by_name = BTreeMap::new();
let mut time_column = None;
for (name, ct) in columns {
@@ -56,21 +68,26 @@
ColumnType::Tag(c) => {
assert_eq!(c.num_rows(), rows);
meta.column_types.insert(name.clone(), c.logical_datatype());
meta.column_ranges
.insert(name.clone(), c.column_range().unwrap());
meta.add_column(
&name,
schema::ColumnType::Tag(name.clone()),
c.logical_datatype(),
c.column_range().unwrap(),
);
all_columns_by_name.insert(name.clone(), all_columns.len());
tag_columns_by_name.insert(name, all_columns.len());
all_columns.push(c);
}
ColumnType::Field(c) => {
assert_eq!(c.num_rows(), rows);
meta.column_types.insert(name.clone(), c.logical_datatype());
meta.column_ranges
.insert(name.clone(), c.column_range().unwrap());
meta.add_column(
&name,
schema::ColumnType::Field(name.clone()),
c.logical_datatype(),
c.column_range().unwrap(),
);
all_columns_by_name.insert(name.clone(), all_columns.len());
field_columns_by_name.insert(name, all_columns.len());
all_columns.push(c);
}
ColumnType::Time(c) => {
@@ -85,9 +102,12 @@
Some((_, _)) => unreachable!("unexpected types for time range"),
};
meta.column_types.insert(name.clone(), c.logical_datatype());
meta.column_ranges
.insert(name.clone(), c.column_range().unwrap());
meta.add_column(
&name,
schema::ColumnType::Timestamp(name.clone()),
c.logical_datatype(),
c.column_range().unwrap(),
);
all_columns_by_name.insert(name.clone(), all_columns.len());
time_column = Some(all_columns.len());
@@ -97,17 +117,12 @@
}
// Meta data should have same columns for types and ranges.
assert_eq!(
meta.column_types.keys().collect::<Vec<_>>(),
meta.column_ranges.keys().collect::<Vec<_>>(),
);
assert_eq!(meta.columns.keys().len(), all_columns.len());
Self {
meta,
columns: all_columns,
all_columns_by_name,
tag_columns_by_name,
field_columns_by_name,
time_column: time_column.unwrap(),
}
}
@@ -123,14 +138,9 @@ impl RowGroup {
self.meta.rows
}
/// The ranges on each column in the `RowGroup`.
pub fn column_ranges(&self) -> &BTreeMap<String, (OwnedValue, OwnedValue)> {
&self.meta.column_ranges
}
/// The logical data-type of each column.
pub fn column_logical_types(&self) -> &BTreeMap<String, column::LogicalDataType> {
&self.meta.column_types
// The row group's meta data.
pub fn metadata(&self) -> &MetaData {
&self.meta
}
// Returns a reference to a column from the column name.
@@ -186,37 +196,37 @@
columns: &[ColumnName<'_>],
predicates: &Predicate,
) -> ReadFilterResult<'_> {
let row_ids = self.row_ids_from_predicates(predicates);
let select_columns = self.meta.schema_for_column_names(&columns);
assert_eq!(select_columns.len(), columns.len());
// ensure meta/data have same lifetime by using column names from row
// group rather than from input.
let (col_names, col_data) = self.materialise_rows(columns, row_ids);
let schema = self.meta.schema_for_column_names(&col_names);
let schema = ResultSchema {
select_columns,
group_columns: vec![],
aggregate_columns: vec![],
};
// apply predicates to determine candidate rows.
let row_ids = self.row_ids_from_predicates(predicates);
let col_data = self.materialise_rows(columns, row_ids);
ReadFilterResult {
schema,
data: col_data,
}
}
fn materialise_rows(
&self,
names: &[ColumnName<'_>],
row_ids: RowIDsOption,
) -> (Vec<&str>, Vec<Values<'_>>) {
let mut col_names = Vec::with_capacity(names.len());
fn materialise_rows(&self, names: &[ColumnName<'_>], row_ids: RowIDsOption) -> Vec<Values<'_>> {
let mut col_data = Vec::with_capacity(names.len());
match row_ids {
RowIDsOption::None(_) => (col_names, col_data), // nothing to materialise
RowIDsOption::None(_) => col_data, // nothing to materialise
RowIDsOption::Some(row_ids) => {
// TODO(edd): causes an allocation. Implement a way to pass a
// pooled buffer to the croaring Bitmap API.
let row_ids = row_ids.to_vec();
for &name in names {
let (col_name, col) = self.column_name_and_column(name);
col_names.push(col_name);
let (_, col) = self.column_name_and_column(name);
col_data.push(col.values(row_ids.as_slice()));
}
(col_names, col_data)
col_data
}
RowIDsOption::All(_) => {
@@ -226,11 +236,10 @@ impl RowGroup {
let row_ids = (0..self.rows()).collect::<Vec<_>>();
for &name in names {
let (col_name, col) = self.column_name_and_column(name);
col_names.push(col_name);
let (_, col) = self.column_name_and_column(name);
col_data.push(col.values(row_ids.as_slice()));
}
(col_names, col_data)
col_data
}
}
}
@@ -415,8 +424,8 @@
// Materialise values in aggregate columns.
let mut aggregate_columns_data = Vec::with_capacity(agg_cols_num);
for (name, agg_type, _) in &result.schema.aggregate_columns {
let col = self.column_by_name(name);
for (col_type, agg_type, _) in &result.schema.aggregate_columns {
let col = self.column_by_name(col_type.as_str());
// TODO(edd): this materialises a column per aggregate. If there are
// multiple aggregates for the same column then this will
@@ -644,13 +653,17 @@
.schema
.aggregate_columns
.iter()
.map(|(name, typ, _)| (self.column_by_name(name), *typ))
.map(|(col_type, agg_type, _)| (self.column_by_name(col_type.as_str()), *agg_type))
.collect::<Vec<_>>();
let encoded_groups = dst
.schema
.group_column_names_iter()
.map(|name| self.column_by_name(name).grouped_row_ids().unwrap_left())
.map(|col_type| {
self.column_by_name(col_type.as_str())
.grouped_row_ids()
.unwrap_left()
})
.collect::<Vec<_>>();
// multi_cartesian_product will create the cartesian product of all
@@ -1194,13 +1207,28 @@ impl ColumnType {
}
}
#[derive(Debug, Clone)]
pub struct ColumnMeta {
pub typ: crate::schema::ColumnType,
pub logical_data_type: LogicalDataType,
pub range: (OwnedValue, OwnedValue),
}
// column metadata is equivalent for two columns if their logical type and
// semantic type are equivalent.
impl PartialEq for ColumnMeta {
fn eq(&self, other: &Self) -> bool {
self.typ == other.typ && self.logical_data_type == other.logical_data_type
}
}
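`PartialEq` is written by hand here (rather than derived) precisely so the `range` field is ignored: two row groups can hold different value ranges for a column and still agree on its schema. A stand-in illustration of the idea:

```rust
#[derive(Debug, Clone)]
struct ColumnMeta {
    name: String,      // stands in for the semantic + logical type pair
    range: (i64, i64), // per-row-group value range
}

// Equality deliberately ignores `range`.
impl PartialEq for ColumnMeta {
    fn eq(&self, other: &Self) -> bool {
        self.name == other.name
    }
}

fn main() {
    let a = ColumnMeta { name: "region".to_owned(), range: (0, 10) };
    let b = ColumnMeta { name: "region".to_owned(), range: (5, 99) };
    assert_eq!(a, b); // equal despite different ranges
}
```

The `column_meta_equal` test added at the end of this file exercises the real type the same way.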
#[derive(Default, Debug)]
struct MetaData {
pub struct MetaData {
// The total size of the table in bytes.
size: u64,
pub size: u64,
// The total number of rows in the table.
rows: u32,
pub rows: u32,
// The distinct set of columns for this `RowGroup` (all of these columns
// will appear in all of the `Table`'s `RowGroup`s) and the range of values
@@ -1208,17 +1236,14 @@ struct MetaData {
//
// This can be used to skip the table entirely if a logical predicate can't
// possibly match based on the range of values a column has.
column_ranges: BTreeMap<String, (OwnedValue, OwnedValue)>,
// The logical column types for each column in the `RowGroup`.
column_types: BTreeMap<String, LogicalDataType>,
pub columns: BTreeMap<String, ColumnMeta>,
// The total time range of this table spanning all of the `RowGroup`s within
// the table.
//
// This can be used to skip the table entirely if the time range for a query
// falls outside of this range.
time_range: (i64, i64),
pub time_range: (i64, i64),
}
impl MetaData {
@@ -1227,8 +1252,8 @@ impl MetaData {
// no rows in the `RowGroup` would ever match the expression.
//
pub fn column_could_satisfy_binary_expr(&self, expr: &BinaryExpr) -> bool {
let (column_min, column_max) = match self.column_ranges.get(expr.column()) {
Some(range) => range,
let (column_min, column_max) = match self.columns.get(expr.column()) {
Some(schema) => &schema.range,
None => return false, // column doesn't exist.
};
@@ -1260,11 +1285,34 @@
}
}
pub fn add_column(
&mut self,
name: &str,
col_type: schema::ColumnType,
logical_data_type: LogicalDataType,
range: (OwnedValue, OwnedValue),
) {
self.columns.insert(
name.to_owned(),
ColumnMeta {
typ: col_type,
logical_data_type,
range,
},
);
}
// Extract schema information for a set of columns.
fn schema_for_column_names(&self, names: &[ColumnName<'_>]) -> Vec<(String, LogicalDataType)> {
fn schema_for_column_names(
&self,
names: &[ColumnName<'_>],
) -> Vec<(crate::schema::ColumnType, LogicalDataType)> {
names
.iter()
.map(|&name| (name.to_owned(), *self.column_types.get(name).unwrap()))
.map(|&name| {
let schema = self.columns.get(name).unwrap();
(schema.typ.clone(), schema.logical_data_type)
})
.collect::<Vec<_>>()
}
@@ -1272,15 +1320,12 @@ impl MetaData {
fn schema_for_aggregate_column_names(
&self,
columns: &[(ColumnName<'_>, AggregateType)],
) -> Vec<(String, AggregateType, LogicalDataType)> {
) -> Vec<(crate::schema::ColumnType, AggregateType, LogicalDataType)> {
columns
.iter()
.map(|(name, agg_type)| {
(
name.to_string(),
*agg_type,
*self.column_types.get(*name).unwrap(),
)
let schema = self.columns.get(*name).unwrap();
(schema.typ.clone(), *agg_type, schema.logical_data_type)
})
.collect::<Vec<_>>()
}
@@ -1289,8 +1334,7 @@
/// Encapsulates results from `RowGroup`s with a structure that makes them
/// easier to work with and display.
pub struct ReadFilterResult<'row_group> {
/// tuples of the form (column_name, data_type)
schema: Vec<(String, LogicalDataType)>,
schema: ResultSchema,
data: Vec<Values<'row_group>>,
}
@@ -1299,23 +1343,20 @@ impl ReadFilterResult<'_> {
self.data.is_empty()
}
pub fn schema(&self) -> &Vec<(String, LogicalDataType)> {
pub fn schema(&self) -> &ResultSchema {
&self.schema
}
}
/// Produces a `RecordBatch` from the results, giving up ownership to the
/// returned record batch.
pub fn record_batch(self) -> arrow::record_batch::RecordBatch {
let schema = arrow::datatypes::Schema::new(
self.schema()
.iter()
.map(|(col_name, col_typ)| {
arrow::datatypes::Field::new(col_name, col_typ.to_arrow_datatype(), true)
})
.collect::<Vec<_>>(),
);
impl TryFrom<ReadFilterResult<'_>> for RecordBatch {
type Error = Error;
let columns = self
fn try_from(result: ReadFilterResult<'_>) -> Result<Self, Self::Error> {
let schema = data_types::schema::Schema::try_from(result.schema())
.map_err(|source| Error::SchemaError { source })?;
let arrow_schema: arrow_deps::arrow::datatypes::SchemaRef = schema.into();
let columns = result
.data
.into_iter()
.map(arrow::array::ArrayRef::from)
@@ -1324,20 +1365,15 @@ impl ReadFilterResult<'_> {
// try_new only returns an error if the schema is invalid or the number
// of rows on columns differ. We have full control over both so there
// should never be an error to return...
arrow::record_batch::RecordBatch::try_new(Arc::new(schema), columns).unwrap()
arrow::record_batch::RecordBatch::try_new(arrow_schema, columns)
.map_err(|source| Error::ArrowError { source })
}
}
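Implementing `TryFrom<ReadFilterResult<'_>>` for `RecordBatch` also gives callers the reciprocal `try_into()` through the blanket impl in `std`, which is what the iterator change in lib.rs relies on (`row_group_result.try_into()`). A self-contained sketch of that relationship:

```rust
use std::convert::{TryFrom, TryInto};

struct Wrapped(u8);

impl TryFrom<i64> for Wrapped {
    type Error = String;

    fn try_from(v: i64) -> Result<Self, Self::Error> {
        u8::try_from(v).map(Wrapped).map_err(|e| e.to_string())
    }
}

fn main() {
    // `TryInto<Wrapped> for i64` comes for free from the blanket impl.
    let ok: Result<Wrapped, String> = 42_i64.try_into();
    let err: Result<Wrapped, String> = 1000_i64.try_into();
    assert!(ok.is_ok());
    assert!(err.is_err());
}
```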
impl std::fmt::Debug for &ReadFilterResult<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// header line.
for (i, (k, _)) in self.schema.iter().enumerate() {
write!(f, "{}", k)?;
if i < self.schema.len() - 1 {
write!(f, ",")?;
}
}
// Display the header
std::fmt::Display::fmt(self.schema(), f)?;
writeln!(f)?;
// Display the rest of the values.
@@ -1360,14 +1396,15 @@ impl std::fmt::Display for &ReadFilterResult<'_> {
.map(|v| ValuesIterator::new(v))
.collect::<Vec<_>>();
let columns = iter_map.len();
while rows < expected_rows {
if rows > 0 {
writeln!(f)?;
}
for (i, (k, _)) in self.schema.iter().enumerate() {
write!(f, "{}", iter_map[i].next().unwrap())?;
if i < self.schema.len() - 1 {
for (i, data) in iter_map.iter_mut().enumerate() {
write!(f, "{}", data.next().unwrap())?;
if i < columns - 1 {
write!(f, ",")?;
}
}
@@ -1466,6 +1503,7 @@ impl std::fmt::Display for &ReadAggregateResult<'_> {
#[cfg(test)]
mod test {
use super::*;
use crate::schema;
// Helper function that creates a predicate from a single expression
fn col_pred(expr: BinaryExpr) -> Predicate {
@@ -1951,17 +1989,23 @@ west,POST,304,101,203
schema: ResultSchema {
select_columns: vec![],
group_columns: vec![
("region".to_owned(), LogicalDataType::String),
("host".to_owned(), LogicalDataType::String),
(
schema::ColumnType::Tag("region".to_owned()),
LogicalDataType::String,
),
(
schema::ColumnType::Tag("host".to_owned()),
LogicalDataType::String,
),
],
aggregate_columns: vec![
(
"temp".to_owned(),
schema::ColumnType::Field("temp".to_owned()),
AggregateType::Sum,
LogicalDataType::Integer,
),
(
"voltage".to_owned(),
schema::ColumnType::Field("voltage".to_owned()),
AggregateType::Count,
LogicalDataType::Unsigned,
),
@@ -2021,4 +2065,38 @@ west,host-d,11,9
"
);
}
#[test]
fn column_meta_equal() {
let col1 = ColumnMeta {
typ: schema::ColumnType::Tag("region".to_owned()),
logical_data_type: schema::LogicalDataType::String,
range: (
OwnedValue::String("east".to_owned()),
OwnedValue::String("west".to_owned()),
),
};
let col2 = ColumnMeta {
typ: schema::ColumnType::Tag("region".to_owned()),
logical_data_type: schema::LogicalDataType::String,
range: (
OwnedValue::String("north".to_owned()),
OwnedValue::String("west".to_owned()),
),
};
let col3 = ColumnMeta {
typ: schema::ColumnType::Tag("host".to_owned()),
logical_data_type: schema::LogicalDataType::String,
range: (
OwnedValue::String("east".to_owned()),
OwnedValue::String("west".to_owned()),
),
};
assert_eq!(col1, col2);
assert_ne!(col1, col3);
assert_ne!(col2, col3);
}
}


@@ -1,6 +1,7 @@
use std::fmt::Display;
use std::{convert::TryFrom, fmt::Display};
use crate::{column::LogicalDataType, AggregateType};
use arrow_deps::arrow;
use data_types::schema::InfluxFieldType;
/// A schema that is used to track the names and semantics of columns returned
/// in results out of various operations on a row group.
@@ -10,24 +11,39 @@ use crate::{column::LogicalDataType, AggregateType};
/// the read buffer.
#[derive(Default, PartialEq, Debug)]
pub struct ResultSchema {
pub select_columns: Vec<(String, LogicalDataType)>,
pub group_columns: Vec<(String, LogicalDataType)>,
pub aggregate_columns: Vec<(String, AggregateType, LogicalDataType)>,
pub select_columns: Vec<(ColumnType, LogicalDataType)>,
pub group_columns: Vec<(ColumnType, LogicalDataType)>,
pub aggregate_columns: Vec<(ColumnType, AggregateType, LogicalDataType)>,
}
impl ResultSchema {
pub fn select_column_names_iter(&self) -> impl Iterator<Item = &str> {
self.select_columns.iter().map(|(name, _)| name.as_str())
pub fn select_column_names_iter(&self) -> impl Iterator<Item = &String> {
self.select_columns.iter().map(|(name, _)| match name {
ColumnType::Tag(name) => name,
ColumnType::Field(name) => name,
ColumnType::Timestamp(name) => name,
ColumnType::Other(name) => name,
})
}
pub fn group_column_names_iter(&self) -> impl Iterator<Item = &str> {
self.group_columns.iter().map(|(name, _)| name.as_str())
pub fn group_column_names_iter(&self) -> impl Iterator<Item = &String> {
self.group_columns.iter().map(|(name, _)| match name {
ColumnType::Tag(name) => name,
ColumnType::Field(name) => name,
ColumnType::Timestamp(name) => name,
ColumnType::Other(name) => name,
})
}
pub fn aggregate_column_names_iter(&self) -> impl Iterator<Item = &str> {
pub fn aggregate_column_names_iter(&self) -> impl Iterator<Item = &String> {
self.aggregate_columns
.iter()
.map(|(name, _, _)| name.as_str())
.map(|(name, _, _)| match name {
ColumnType::Tag(name) => name,
ColumnType::Field(name) => name,
ColumnType::Timestamp(name) => name,
ColumnType::Other(name) => name,
})
}
}
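These iterators now yield `&String` rather than `&str`; call sites that need slices (such as `read_aggregate` in table.rs below) map through `as_str()` first, e.g.:

```rust
fn main() {
    // stand-in for schema.group_column_names_iter()
    let names: Vec<String> = vec!["region".to_owned(), "host".to_owned()];

    // the pattern used at the call sites: &String -> &str
    let slices: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
    assert_eq!(slices, vec!["region", "host"]);
}
```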
@@ -61,3 +77,131 @@ impl Display for ResultSchema {
writeln!(f)
}
}
impl TryFrom<&ResultSchema> for data_types::schema::Schema {
type Error = data_types::schema::builder::Error;
fn try_from(rs: &ResultSchema) -> Result<Self, Self::Error> {
let mut builder = data_types::schema::builder::SchemaBuilder::new();
for (col_type, data_type) in &rs.select_columns {
match col_type {
ColumnType::Tag(name) => builder = builder.tag(name.as_str()),
ColumnType::Field(name) => {
builder = builder.influx_field(name.as_str(), data_type.into())
}
ColumnType::Timestamp(_) => builder = builder.timestamp(),
ColumnType::Other(name) => builder = builder.field(name.as_str(), data_type.into()),
}
}
builder.build()
}
}
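The conversion is `TryFrom` rather than `From` because `SchemaBuilder::build()` is fallible (an invalid column set is rejected). A generic sketch of that fallible-builder shape, with illustrative names rather than the `data_types` crate's actual API:

```rust
use std::collections::HashSet;

#[derive(Default)]
struct SchemaBuilder {
    columns: Vec<(String, &'static str)>,
}

impl SchemaBuilder {
    fn tag(mut self, name: &str) -> Self {
        self.columns.push((name.to_owned(), "tag"));
        self
    }

    fn timestamp(mut self) -> Self {
        self.columns.push(("time".to_owned(), "timestamp"));
        self
    }

    // build() validates the accumulated columns, hence the Result.
    fn build(self) -> Result<Vec<(String, &'static str)>, String> {
        let mut seen = HashSet::new();
        for (name, _) in &self.columns {
            if !seen.insert(name.clone()) {
                return Err(format!("duplicate column name: {}", name));
            }
        }
        Ok(self.columns)
    }
}

fn main() {
    let schema = SchemaBuilder::default().tag("region").timestamp().build();
    assert!(schema.is_ok());
}
```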
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
/// The logical data-type for a column.
pub enum LogicalDataType {
Integer, // Signed integer
Unsigned, // Unsigned integer
Float, //
String, // UTF-8 valid string
Binary, // Arbitrary collection of bytes
Boolean, //
}
impl From<&LogicalDataType> for arrow::datatypes::DataType {
fn from(logical_type: &LogicalDataType) -> Self {
match logical_type {
LogicalDataType::Integer => arrow::datatypes::DataType::Int64,
LogicalDataType::Unsigned => arrow::datatypes::DataType::UInt64,
LogicalDataType::Float => arrow::datatypes::DataType::Float64,
LogicalDataType::String => arrow::datatypes::DataType::Utf8,
LogicalDataType::Binary => arrow::datatypes::DataType::Binary,
LogicalDataType::Boolean => arrow::datatypes::DataType::Boolean,
}
}
}
impl From<&LogicalDataType> for InfluxFieldType {
fn from(logical_type: &LogicalDataType) -> Self {
match logical_type {
LogicalDataType::Integer => InfluxFieldType::Integer,
LogicalDataType::Unsigned => InfluxFieldType::UInteger,
LogicalDataType::Float => InfluxFieldType::Float,
LogicalDataType::String => InfluxFieldType::String,
LogicalDataType::Binary => {
unimplemented!("binary data type cannot be represented as InfluxFieldType")
}
LogicalDataType::Boolean => InfluxFieldType::Boolean,
}
}
}
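Both `From` impls are written on `&LogicalDataType` because the call sites iterate borrowed schema tuples; implementing the conversion on the reference lets them write `data_type.into()` without copying first. A stand-in sketch:

```rust
#[derive(Debug)]
enum LogicalDataType {
    Integer,
    Float,
}

// stand-in for arrow::datatypes::DataType
#[derive(Debug)]
enum ArrowType {
    Int64,
    Float64,
}

impl From<&LogicalDataType> for ArrowType {
    fn from(t: &LogicalDataType) -> Self {
        match t {
            LogicalDataType::Integer => ArrowType::Int64,
            LogicalDataType::Float => ArrowType::Float64,
        }
    }
}

fn main() {
    let schema = vec![LogicalDataType::Integer, LogicalDataType::Float];
    for t in &schema {
        let arrow: ArrowType = t.into(); // converts straight from the borrow
        println!("{:?} -> {:?}", t, arrow);
    }
}
```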
/// These variants describe supported aggregates that can be applied to columnar
/// data in the Read Buffer.
#[derive(Copy, Clone, PartialEq, Debug)]
pub enum AggregateType {
Count,
First,
Last,
Min,
Max,
Sum,
/* TODO - support:
 * Distinct - (edd): not sure this counts as an aggregation. Seems more like
 * a special filter.
 * CountDistinct
 * Percentile */
}
impl std::fmt::Display for AggregateType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
AggregateType::Count => "count",
AggregateType::First => "first",
AggregateType::Last => "last",
AggregateType::Min => "min",
AggregateType::Max => "max",
AggregateType::Sum => "sum",
}
)
}
}
/// Describes the semantic meaning of the column in a set of results. That is,
/// whether the column is a "tag", "field", "timestamp", or "other".
#[derive(PartialEq, Debug, PartialOrd, Clone)]
pub enum ColumnType {
Tag(String),
Field(String),
Timestamp(String),
Other(String),
}
impl ColumnType {
pub fn as_str(&self) -> &str {
match self {
ColumnType::Tag(name) => name.as_str(),
ColumnType::Field(name) => name.as_str(),
ColumnType::Timestamp(name) => name.as_str(),
ColumnType::Other(name) => name.as_str(),
}
}
}
impl Display for ColumnType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
ColumnType::Tag(name) => name,
ColumnType::Field(name) => name,
ColumnType::Timestamp(name) => name,
ColumnType::Other(name) => name,
}
)
}
}
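`as_str()` borrows the wrapped name for lookups while `Display` renders it for result headers; both unwrap the same inner `String`, so they stay consistent. A minimal stand-in:

```rust
use std::fmt;

enum ColumnType {
    Tag(String),
    Field(String),
}

impl ColumnType {
    // borrow the column name, e.g. for column lookups
    fn as_str(&self) -> &str {
        match self {
            ColumnType::Tag(name) | ColumnType::Field(name) => name.as_str(),
        }
    }
}

// render the column name, e.g. for CSV-style result headers
impl fmt::Display for ColumnType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.as_str())
    }
}

fn main() {
    let col = ColumnType::Tag("region".to_owned());
    assert_eq!(col.as_str(), "region");
    assert_eq!(col.to_string(), "region");
}
```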


@@ -2,13 +2,9 @@ use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Display;
use std::slice::Iter;
use crate::column::{AggregateResult, AggregateType, OwnedValue, Scalar, Value};
use crate::schema::ResultSchema;
use crate::{
column,
column::LogicalDataType,
row_group::{self, ColumnName, GroupKey, Predicate, RowGroup},
};
use crate::column::{AggregateResult, Scalar, Value};
use crate::row_group::{self, ColumnName, GroupKey, Predicate, RowGroup};
use crate::schema::{AggregateType, ColumnType, LogicalDataType, ResultSchema};
/// A Table represents data for a single measurement.
///
@@ -40,14 +36,14 @@ impl Table {
pub fn new(name: String, rg: RowGroup) -> Self {
Self {
name,
meta: MetaData::new(&rg),
meta: MetaData::new(rg.metadata()),
row_groups: vec![rg],
}
}
/// Add a new row group to this table.
pub fn add_row_group(&mut self, rg: RowGroup) {
self.meta.update(&rg);
self.meta.update(rg.metadata());
self.row_groups.push(rg);
}
@@ -91,16 +87,6 @@ impl Table {
self.meta.time_range
}
/// The ranges on each column in the table (across all row groups).
pub fn column_ranges(&self) -> BTreeMap<String, (OwnedValue, OwnedValue)> {
todo!()
}
/// The logical data-type of each column in the `Table`'s schema.
pub fn column_logical_types(&self) -> &BTreeMap<String, column::LogicalDataType> {
&self.meta.column_types
}
// Identify set of row groups that might satisfy the predicate.
fn filter_row_groups(&self, predicate: &Predicate) -> Vec<&RowGroup> {
let mut rgs = Vec::with_capacity(self.row_groups.len());
@@ -137,18 +123,18 @@ impl Table {
// and merge results.
let rgs = self.filter_row_groups(predicate);
let schema = match columns {
ColumnSelection::All => self.meta.schema(),
ColumnSelection::Some(column_names) => self.meta.schema_for_column_names(column_names),
let schema = ResultSchema {
select_columns: match columns {
ColumnSelection::All => self.meta.schema_for_all_columns(),
ColumnSelection::Some(column_names) => {
self.meta.schema_for_column_names(column_names)
}
},
..ResultSchema::default()
};
// temp I think I can remove `columns` and `predicates` from this..
let columns = schema
.iter()
.map(|(name, _)| name.to_string())
.collect::<Vec<_>>();
// temp I think I can remove `predicates` from the results
ReadFilterResults {
columns,
predicate: predicate.clone(),
schema,
row_groups: rgs,
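The `ResultSchema` construction above uses struct-update syntax (`..ResultSchema::default()`) to populate only `select_columns` and default the group and aggregate fields. A compact sketch of the pattern:

```rust
#[derive(Default, Debug)]
struct ResultSchema {
    select_columns: Vec<String>,
    group_columns: Vec<String>,
    aggregate_columns: Vec<String>,
}

fn main() {
    // fill one field; `..Default::default()` fills the rest
    let schema = ResultSchema {
        select_columns: vec!["region".to_owned(), "time".to_owned()],
        ..ResultSchema::default()
    };
    assert!(schema.group_columns.is_empty());
    assert!(schema.aggregate_columns.is_empty());
    println!("{:?}", schema);
}
```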
@@ -419,15 +405,9 @@ struct MetaData {
rows: u64,
// The distinct set of columns for this table (all of these columns will
// appear in all of the table's row groups) and the range of values for
// each of those columns.
//
// This can be used to skip the table entirely if a logical predicate can't
// possibly match based on the range of values a column has.
column_ranges: BTreeMap<String, (OwnedValue, OwnedValue)>,
// The `ReadBuffer` logical types associated with the columns in the table
column_types: BTreeMap<String, column::LogicalDataType>,
// appear in all of the table's row groups) and meta data about those
// columns including their schema and range.
columns: BTreeMap<String, row_group::ColumnMeta>,
column_names: Vec<String>,
@@ -440,59 +420,52 @@
}
impl MetaData {
pub fn new(rg: &RowGroup) -> Self {
pub fn new(meta: &row_group::MetaData) -> Self {
Self {
size: rg.size(),
rows: u64::from(rg.rows()),
column_ranges: rg
.column_ranges()
.iter()
.map(|(k, v)| (k.to_string(), (v.0.clone(), v.1.clone())))
.collect(),
column_types: rg
.column_logical_types()
.iter()
.map(|(k, v)| (k.clone(), *v))
.collect(),
column_names: rg.column_ranges().keys().cloned().collect(),
time_range: Some(rg.time_range()),
size: meta.size,
rows: meta.rows as u64,
columns: meta.columns.clone(),
column_names: meta.columns.keys().cloned().collect(),
time_range: Some(meta.time_range),
}
}
// Extract schema information for a set of columns. If a column name does
// not exist within the `Table` schema it is ignored and not present within
// the resulting schema information.
fn schema_for_column_names(&self, names: &[ColumnName<'_>]) -> Vec<(String, LogicalDataType)> {
fn schema_for_column_names(
&self,
names: &[ColumnName<'_>],
) -> Vec<(ColumnType, LogicalDataType)> {
names
.iter()
.filter_map(|&name| match self.column_types.get_key_value(name) {
Some((name, data_type)) => Some((name.clone(), *data_type)),
.filter_map(|&name| match self.columns.get(name) {
Some(schema) => Some((schema.typ.clone(), schema.logical_data_type)),
None => None,
})
.collect::<Vec<_>>()
}
// As `schema_for_column_names` but for all columns in the table.
fn schema_for_all_columns(&self) -> Vec<(ColumnType, LogicalDataType)> {
self.columns
.iter()
.map(|(name, schema)| (schema.typ.clone(), schema.logical_data_type))
.collect::<Vec<_>>()
}
// As `schema_for_column_names` but also embeds the provided aggregate type.
fn schema_for_aggregate_column_names(
&self,
names: &[(ColumnName<'_>, AggregateType)],
) -> Vec<(String, AggregateType, LogicalDataType)> {
) -> Vec<(ColumnType, AggregateType, LogicalDataType)> {
names
.iter()
.filter_map(
|(name, agg_type)| match self.column_types.get_key_value(*name) {
Some((name, data_type)) => Some((name.clone(), *agg_type, *data_type)),
None => None,
},
)
.collect::<Vec<_>>()
}
// Extract all schema information for the `Table`.
fn schema(&self) -> Vec<(String, LogicalDataType)> {
self.column_types
.iter()
.map(|(k, v)| (k.clone(), *v))
.filter_map(|(name, agg_type)| {
self.columns
.get(*name)
.map(|schema| (schema.typ.clone(), *agg_type, schema.logical_data_type))
})
.collect::<Vec<_>>()
}
@@ -500,21 +473,23 @@ impl MetaData {
self.column_names.iter().map(|name| name.as_str()).collect()
}
pub fn update(&mut self, rg: &RowGroup) {
pub fn update(&mut self, meta: &row_group::MetaData) {
// update size, rows, column ranges, time range
self.size += rg.size();
self.rows += u64::from(rg.rows());
self.size += meta.size;
self.rows += meta.rows as u64;
// The incoming row group must have the same schema as the existing row
// groups in the table.
assert_eq!(&self.column_types, rg.column_logical_types());
// The incoming row group must have exactly the same schema as the
// existing row groups in the table.
assert_eq!(&self.columns, &meta.columns);
assert_eq!(self.column_ranges.len(), rg.column_ranges().len());
for (column_name, (column_range_min, column_range_max)) in rg.column_ranges() {
let mut curr_range = self
.column_ranges
// Update the table schema using the incoming row group schema
for (column_name, column_meta) in &meta.columns {
let (column_range_min, column_range_max) = &column_meta.range;
let mut curr_range = &mut self
.columns
.get_mut(&column_name.to_string())
.unwrap();
.unwrap()
.range;
if column_range_min < &curr_range.0 {
curr_range.0 = column_range_min.clone();
}
@@ -522,6 +497,16 @@
if column_range_max > &curr_range.1 {
curr_range.1 = column_range_max.clone();
}
match self.time_range {
Some(time_range) => {
self.time_range = Some((
time_range.0.min(meta.time_range.0),
time_range.1.max(meta.time_range.1),
));
}
None => panic!("cannot call `update` on empty Metadata"),
}
}
}
@@ -546,14 +531,13 @@ pub enum ColumnSelection<'a> {
/// row groups are only queried when `ReadFilterResults` is iterated.
pub struct ReadFilterResults<'table> {
// schema of all columns in the query results
schema: Vec<(String, LogicalDataType)>,
schema: ResultSchema,
// These row groups passed the predicates and need to be queried.
row_groups: Vec<&'table RowGroup>,
// TODO(edd): encapsulate these into a single executor function that just
// TODO(edd): encapsulate this into a single executor function that just
// executes on the next row group.
columns: Vec<String>,
predicate: Predicate,
}
@@ -564,7 +548,7 @@ impl<'table> ReadFilterResults<'table> {
/// Returns the schema associated with table result and therefore all of the
/// results for all of row groups in the table results.
pub fn schema(&self) -> &Vec<(String, LogicalDataType)> {
pub fn schema(&self) -> &ResultSchema {
&self.schema
}
}
@@ -580,8 +564,8 @@ impl<'a> Iterator for ReadFilterResults<'a> {
let row_group = self.row_groups.remove(0);
let result = row_group.read_filter(
&self
.columns
.iter()
.schema()
.select_column_names_iter()
.map(|name| name.as_str())
.collect::<Vec<_>>(),
&self.predicate,
@@ -604,21 +588,15 @@ impl<'a> Display for DisplayReadFilterResults<'a> {
return Ok(());
}
let schema = self.0[0].schema();
// header line.
for (i, (k, _)) in schema.iter().enumerate() {
write!(f, "{}", k)?;
if i < schema.len() - 1 {
write!(f, ",")?;
}
}
// write out the schema of the first result as the table header
std::fmt::Display::fmt(&self.0[0].schema(), f)?;
writeln!(f)?;
// Display all the results of each row group
for row_group in &self.0 {
row_group.fmt(f)?;
// write out each row group result
for row_group in self.0.iter() {
std::fmt::Display::fmt(&row_group, f)?;
}
Ok(())
}
}
@@ -663,7 +641,11 @@
let merged_results = self.row_groups.remove(0).read_aggregate(
&self.predicate,
&self.schema.group_column_names_iter().collect::<Vec<_>>(),
&self
.schema
.group_column_names_iter()
.map(|s| s.as_str())
.collect::<Vec<_>>(),
&self
.schema
.aggregate_columns
@@ -678,7 +660,11 @@
for row_group in &self.row_groups {
let result = row_group.read_aggregate(
&self.predicate,
&self.schema.group_column_names_iter().collect::<Vec<_>>(),
&self
.schema
.group_column_names_iter()
.map(|s| s.as_str())
.collect::<Vec<_>>(),
&self
.schema
.aggregate_columns
@@ -723,9 +709,77 @@ impl std::fmt::Display for DisplayReadAggregateResults<'_> {
#[cfg(test)]
mod test {
use row_group::ColumnMeta;
use super::*;
use crate::column::{Column, LogicalDataType};
use crate::column::{self, Column};
use crate::row_group::{BinaryExpr, ColumnType, ReadAggregateResult};
use crate::schema;
use crate::schema::LogicalDataType;
#[test]
fn meta_data_update() {
let rg_meta = row_group::MetaData {
size: 100,
rows: 2000,
columns: vec![(
"region".to_owned(),
ColumnMeta {
typ: schema::ColumnType::Tag("region".to_owned()),
logical_data_type: schema::LogicalDataType::String,
range: (
column::OwnedValue::String("north".to_owned()),
column::OwnedValue::String("south".to_owned()),
),
},
)]
.into_iter()
.collect::<BTreeMap<_, _>>(),
time_range: (10, 3000),
};
let mut meta = MetaData::new(&rg_meta);
assert_eq!(meta.rows, 2000);
assert_eq!(meta.size, 100);
assert_eq!(meta.time_range, Some((10, 3000)));
assert_eq!(
meta.columns.get("region").unwrap().range,
(
column::OwnedValue::String("north".to_owned()),
column::OwnedValue::String("south".to_owned())
)
);
meta.update(&row_group::MetaData {
size: 300,
rows: 1500,
columns: vec![(
"region".to_owned(),
ColumnMeta {
typ: schema::ColumnType::Tag("region".to_owned()),
logical_data_type: schema::LogicalDataType::String,
range: (
column::OwnedValue::String("east".to_owned()),
column::OwnedValue::String("north".to_owned()),
),
},
)]
.into_iter()
.collect::<BTreeMap<_, _>>(),
time_range: (10, 3500),
});
assert_eq!(meta.rows, 3500);
assert_eq!(meta.size, 400);
assert_eq!(meta.time_range, Some((10, 3500)));
assert_eq!(
meta.columns.get("region").unwrap().range,
(
column::OwnedValue::String("east".to_owned()),
column::OwnedValue::String("south".to_owned())
)
);
}
#[test]
fn select() {
@@ -746,13 +800,21 @@ mod test {
let mut table = Table::new("cpu".to_owned(), rg);
let exp_col_types = vec![
("region".to_owned(), LogicalDataType::String),
("count".to_owned(), LogicalDataType::Unsigned),
("time".to_owned(), LogicalDataType::Integer),
("region", LogicalDataType::String),
("count", LogicalDataType::Unsigned),
("time", LogicalDataType::Integer),
]
.into_iter()
.collect::<BTreeMap<_, _>>();
assert_eq!(table.column_logical_types(), &exp_col_types);
assert_eq!(
table
.meta
.columns
.iter()
.map(|(k, v)| (k.as_str(), v.logical_data_type))
.collect::<BTreeMap<_, _>>(),
exp_col_types
);
// Build another segment.
let mut columns = BTreeMap::new();
@@ -773,11 +835,24 @@
);
// check the column types
let exp_schema = vec![
("time".to_owned(), LogicalDataType::Integer),
("count".to_owned(), LogicalDataType::Unsigned),
("region".to_owned(), LogicalDataType::String),
];
let exp_schema = ResultSchema {
select_columns: vec![
(
schema::ColumnType::Timestamp("time".to_owned()),
LogicalDataType::Integer,
),
(
schema::ColumnType::Field("count".to_owned()),
LogicalDataType::Unsigned,
),
(
schema::ColumnType::Tag("region".to_owned()),
LogicalDataType::String,
),
],
..ResultSchema::default()
};
assert_eq!(results.schema(), &exp_schema);
let mut all = vec![];
@@ -832,11 +907,17 @@
schema: ResultSchema {
select_columns: vec![],
group_columns: vec![
("region".to_owned(), LogicalDataType::String),
("host".to_owned(), LogicalDataType::String),
(
schema::ColumnType::Tag("region".to_owned()),
LogicalDataType::String,
),
(
schema::ColumnType::Tag("host".to_owned()),
LogicalDataType::String,
),
],
aggregate_columns: vec![(
"temp".to_owned(),
schema::ColumnType::Tag("temp".to_owned()),
AggregateType::Sum,
LogicalDataType::Integer,
)],
@@ -848,11 +929,17 @@
schema: ResultSchema {
select_columns: vec![],
group_columns: vec![
("region".to_owned(), LogicalDataType::String),
("host".to_owned(), LogicalDataType::String),
(
schema::ColumnType::Tag("region".to_owned()),
LogicalDataType::String,
),
(
schema::ColumnType::Tag("host".to_owned()),
LogicalDataType::String,
),
],
aggregate_columns: vec![(
"temp".to_owned(),
schema::ColumnType::Tag("temp".to_owned()),
AggregateType::Sum,
LogicalDataType::Integer,
)],