feat: Plan for computing groups (#366)

pull/24376/head
Andrew Lamb 2020-10-19 14:14:43 -04:00 committed by GitHub
parent bfb966b1f1
commit ee344c3d51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 362 additions and 34 deletions

View File

@ -1,8 +1,9 @@
use delorean_generated_types::wal as wb;
use delorean_line_parser::ParsedLine;
use delorean_storage::{
exec::GroupedSeriesSetPlans, exec::SeriesSetPlan, exec::SeriesSetPlans, exec::StringSet,
exec::StringSetPlan, Database, Predicate, TimestampRange,
exec::GroupedSeriesSetPlan, exec::GroupedSeriesSetPlans, exec::SeriesSetPlan,
exec::SeriesSetPlans, exec::StringSet, exec::StringSetPlan, Database, Predicate,
TimestampRange,
};
use delorean_wal::WalBuilder;
use delorean_wal_writer::{start_wal_sync_task, Error as WalWriterError, WalDetails};
@ -459,11 +460,13 @@ impl Database for Db {
async fn query_groups(
&self,
_range: Option<TimestampRange>,
_predicate: Option<Predicate>,
_group_columns: Vec<String>,
range: Option<TimestampRange>,
predicate: Option<Predicate>,
group_columns: Vec<String>,
) -> Result<GroupedSeriesSetPlans, Self::Error> {
unimplemented!("query_groups unimplemented as part of write buffer database");
let mut visitor = GroupsVisitor::new(predicate, group_columns);
self.visit_tables(None, range, &mut visitor).await?;
Ok(visitor.plans.into())
}
async fn table_to_arrow(
@ -941,6 +944,45 @@ impl Visitor for SeriesVisitor {
}
}
/// Return DataFusion plans to calculate series that pass the
/// specified predicate, grouped according to grouped_columns
///
/// TODO: Handle _f=<fieldname> and _m=<measurement> predicates
/// specially (by filtering entire tables and selecting fields)
struct GroupsVisitor {
predicate: Option<Predicate>,
group_columns: Vec<String>,
plans: Vec<GroupedSeriesSetPlan>,
}
impl GroupsVisitor {
fn new(predicate: Option<Predicate>, group_columns: Vec<String>) -> Self {
Self {
predicate,
group_columns,
plans: Vec::new(),
}
}
}
impl Visitor for GroupsVisitor {
fn pre_visit_table(
&mut self,
table: &Table,
partition: &Partition,
ts_pred: Option<&TimestampPredicate>,
) -> Result<()> {
self.plans.push(table.grouped_series_set_plan(
self.predicate.as_ref(),
ts_pred,
&self.group_columns,
partition,
)?);
Ok(())
}
}
// partition_key returns the partition key for the given line. The key will be the prefix of a
// partition name (multiple partitions can exist for each key). It uses the user defined
// partitioning rules to construct this key

View File

@ -1,5 +1,8 @@
use delorean_generated_types::wal as wb;
use delorean_storage::{exec::make_schema_pivot, exec::SeriesSetPlan, Predicate, TimestampRange};
use delorean_storage::{
exec::make_schema_pivot, exec::GroupedSeriesSetPlan, exec::SeriesSetPlan, Predicate,
TimestampRange,
};
use std::{collections::HashMap, sync::Arc};
@ -142,6 +145,19 @@ pub enum Error {
#[snafu(display("Row insert to table {} missing column name", table))]
ColumnNameNotInRow { table: u32 },
#[snafu(display(
"Group column '{}' not found in tag columns: {}",
column_name,
all_tag_column_names
))]
GroupColumnNotFound {
column_name: String,
all_tag_column_names: String,
},
#[snafu(display("Duplicate group column '{}'", column_name))]
DuplicateGroupColumn { column_name: String },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -160,6 +176,8 @@ pub struct Table {
pub columns: Vec<Column>,
}
type ArcStringVec = Vec<Arc<String>>;
impl Table {
pub fn new(id: u32) -> Self {
Self {
@ -427,17 +445,27 @@ impl Table {
/// The data is sorted on tag_col1, tag_col2, ...) so that all
/// rows for a particular series (groups where all tags are the
/// same) occur together in the plan
///
pub fn series_set_plan(
&self,
predicate: Option<&Predicate>,
timestamp_predicate: Option<&TimestampPredicate>,
partition: &Partition,
) -> Result<SeriesSetPlan> {
self.series_set_plan_impl(predicate, timestamp_predicate, None, partition)
}
/// Creates the plans for computing series set, pulling prefix_columns, if any, as a prefix of the ordering
/// The created plan looks like:
///
/// Projection (select the columns columns needed)
/// Order by (tag_columns, timestamp_column)
/// Filter(predicate)
/// InMemoryScan
pub fn series_set_plan(
pub fn series_set_plan_impl(
&self,
predicate: Option<&Predicate>,
timestamp_predicate: Option<&TimestampPredicate>,
prefix_columns: Option<&[String]>,
partition: &Partition,
) -> Result<SeriesSetPlan> {
// Note we also need to add a timestamp predicate to this
@ -461,34 +489,13 @@ impl Table {
.to_string();
let table_name = Arc::new(table_name);
let (mut tag_columns, field_columns) = self.tag_and_field_column_names(partition)?;
let mut field_columns = Vec::with_capacity(self.column_id_to_index.len());
let mut tag_columns = Vec::with_capacity(self.column_id_to_index.len());
for (&column_id, &column_index) in &self.column_id_to_index {
let column_name = partition
.dictionary
.lookup_id(column_id)
.expect("Find column name in dictionary");
if column_name != TIME_COLUMN_NAME {
let column_name = Arc::new(column_name.to_string());
match self.columns[column_index] {
Column::Tag(_) => tag_columns.push(column_name),
_ => field_columns.push(column_name),
}
}
// reorder tag_columns to have the prefix columns, if requested
if let Some(prefix_columns) = prefix_columns {
tag_columns = reorder_prefix(prefix_columns, tag_columns)?;
}
// tag columns are always sorted by name (aka sorted by tag
// key) in the output schema, so ensure the columns are sorted
// (the select exprs)
tag_columns.sort();
// Sort the field columns too so that the output always comes out in a predictable order
field_columns.sort();
// TODO avoid materializing all the columns here (ideally
// DataFusion can prune them out)
let data = self.all_to_arrow(partition)?;
@ -538,6 +545,82 @@ impl Table {
})
}
/// Creates a GroupedSeriesSet plan that produces an output table with rows that match the predicate
///
/// The output looks like:
/// (group_tag_column1, group_tag_column2, ... tag_col1, tag_col2, ... field1, field2, ... timestamp)
///
/// The order of the tag_columns is ordered by name.
///
/// The data is sorted on tag_col1, tag_col2, ...) so that all
/// rows for a particular series (groups where all tags are the
/// same) occur together in the plan
///
/// The created plan looks like:
///
/// Projection (select the columns columns needed)
/// Order by (tag_columns, timestamp_column)
/// Filter(predicate)
/// InMemoryScan
pub fn grouped_series_set_plan(
&self,
predicate: Option<&Predicate>,
timestamp_predicate: Option<&TimestampPredicate>,
group_columns: &[String],
partition: &Partition,
) -> Result<GroupedSeriesSetPlan> {
let series_set_plan = self.series_set_plan_impl(
predicate,
timestamp_predicate,
Some(&group_columns),
partition,
)?;
let num_prefix_tag_group_columns = group_columns.len();
Ok(GroupedSeriesSetPlan {
series_set_plan,
num_prefix_tag_group_columns,
})
}
// Returns (tag_columns, field_columns) vectors with the names of
// all tag and field columns, respectively. The vectors are sorted
// by name.
fn tag_and_field_column_names(
&self,
partition: &Partition,
) -> Result<(ArcStringVec, ArcStringVec)> {
let mut tag_columns = Vec::with_capacity(self.column_id_to_index.len());
let mut field_columns = Vec::with_capacity(self.column_id_to_index.len());
for (&column_id, &column_index) in &self.column_id_to_index {
let column_name = partition
.dictionary
.lookup_id(column_id)
.expect("Find column name in dictionary");
if column_name != TIME_COLUMN_NAME {
let column_name = Arc::new(column_name.to_string());
match self.columns[column_index] {
Column::Tag(_) => tag_columns.push(column_name),
_ => field_columns.push(column_name),
}
}
}
// tag columns are always sorted by name (aka sorted by tag
// key) in the output schema, so ensure the columns are sorted
// (the select exprs)
tag_columns.sort();
// Sort the field columns too so that the output always comes
// out in a predictable order
field_columns.sort();
Ok((tag_columns, field_columns))
}
/// Creates a DataFusion predicate of the form:
///
/// `expr AND (range.start <= time and time < range.end)`
@ -768,6 +851,67 @@ impl Table {
}
}
/// Reorders tag_columns so that its prefix matches exactly
/// prefix_columns. Returns an error if there are duplicates, or other
/// untoward inputs
fn reorder_prefix(
prefix_columns: &[String],
tag_columns: Vec<Arc<String>>,
) -> Result<Vec<Arc<String>>> {
// tag_used_set[i[ is true if we have used the value in tag_columns[i]
let mut tag_used_set = vec![false; tag_columns.len()];
// Note that this is an O(N^2) algorithm. We are assuming the
// number of tag columns is reasonably small
// map from prefix_column[idx] -> index in tag_columns
let prefix_map = prefix_columns
.iter()
.map(|pc| {
let found_location = tag_columns
.iter()
.enumerate()
.find(|(_, c)| pc == c.as_ref());
if let Some((index, _)) = found_location {
if tag_used_set[index] {
DuplicateGroupColumn { column_name: pc }.fail()
} else {
tag_used_set[index] = true;
Ok(index)
}
} else {
GroupColumnNotFound {
column_name: pc,
all_tag_column_names: tag_columns
.iter()
.map(|s| s.as_ref() as &str)
.collect::<Vec<_>>()
.as_slice()
.join(", "),
}
.fail()
}
})
.collect::<Result<Vec<_>>>()?;
let mut new_tag_columns = prefix_map
.iter()
.map(|&i| tag_columns[i].clone())
.collect::<Vec<_>>();
new_tag_columns.extend(tag_columns.into_iter().enumerate().filter_map(|(i, c)| {
// already used in prefix
if tag_used_set[i] {
None
} else {
Some(c)
}
}));
Ok(new_tag_columns)
}
/// Traits to help creating DataFuson expressions from strings
trait IntoExpr {
/// Creates a DataFuson expr
@ -981,6 +1125,148 @@ mod tests {
assert_eq!(expected, results, "expected output");
}
#[tokio::test(threaded_scheduler)]
async fn test_grouped_series_set_plan() {
// test that filters are applied reasonably
// setup a test table
let mut partition = Partition::new("dummy_partition_key");
let dictionary = &mut partition.dictionary;
let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
let lp_lines = vec![
"h2o,state=MA,city=Boston temp=70.4 100",
"h2o,state=MA,city=Boston temp=72.4 250",
"h2o,state=CA,city=LA temp=90.0 200",
"h2o,state=CA,city=LA temp=90.0 350",
];
write_lines_to_table(&mut table, dictionary, lp_lines);
let expr = Expr::BinaryExpr {
left: Box::new(Expr::Column("city".into())),
op: Operator::Eq,
right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("LA".into())))),
};
let predicate = Some(Predicate { expr });
let range = Some(TimestampRange::new(190, 210));
let timestamp_predicate = partition
.make_timestamp_predicate(range)
.expect("Made a timestamp predicate");
let group_columns = vec![String::from("state")];
let grouped_series_set_plan = table
.grouped_series_set_plan(
predicate.as_ref(),
timestamp_predicate.as_ref(),
&group_columns,
&partition,
)
.expect("creating the grouped_series set plan");
assert_eq!(grouped_series_set_plan.num_prefix_tag_group_columns, 1);
// run the created plan, ensuring the output is as expected
let results = run_plan(grouped_series_set_plan.series_set_plan.plan).await;
let expected = vec![
"+-------+------+------+------+",
"| state | city | temp | time |",
"+-------+------+------+------+",
"| CA | LA | 90 | 200 |",
"+-------+------+------+------+",
];
assert_eq!(expected, results, "expected output");
}
#[test]
fn test_reorder_prefix() {
assert_eq!(reorder_prefix_ok(&[], &[]), &[] as &[&str]);
assert_eq!(reorder_prefix_ok(&[], &["one"]), &["one"]);
assert_eq!(reorder_prefix_ok(&["one"], &["one"]), &["one"]);
assert_eq!(reorder_prefix_ok(&[], &["one", "two"]), &["one", "two"]);
assert_eq!(
reorder_prefix_ok(&["one"], &["one", "two"]),
&["one", "two"]
);
assert_eq!(
reorder_prefix_ok(&["two"], &["one", "two"]),
&["two", "one"]
);
assert_eq!(
reorder_prefix_ok(&["two", "one"], &["one", "two"]),
&["two", "one"]
);
assert_eq!(
reorder_prefix_ok(&[], &["one", "two", "three"]),
&["one", "two", "three"]
);
assert_eq!(
reorder_prefix_ok(&["one"], &["one", "two", "three"]),
&["one", "two", "three"]
);
assert_eq!(
reorder_prefix_ok(&["two"], &["one", "two", "three"]),
&["two", "one", "three"]
);
assert_eq!(
reorder_prefix_ok(&["three", "one"], &["one", "two", "three"]),
&["three", "one", "two"]
);
// errors
assert_eq!(
reorder_prefix_err(&["one"], &[]),
"Group column \'one\' not found in tag columns: "
);
assert_eq!(
reorder_prefix_err(&["one"], &["two", "three"]),
"Group column \'one\' not found in tag columns: two, three"
);
assert_eq!(
reorder_prefix_err(&["two", "one", "two"], &["one", "two"]),
"Duplicate group column \'two\'"
);
}
fn reorder_prefix_ok(prefix: &[&str], table_columns: &[&str]) -> Vec<String> {
let prefix = prefix.iter().map(|s| s.to_string()).collect::<Vec<_>>();
let table_columns =
Arc::try_unwrap(str_vec_to_arc_vec(table_columns)).expect("unwrap the arc");
let res = reorder_prefix(&prefix, table_columns);
let message = format!("Expected OK, got {:?}", res);
let res = res.expect(&message);
res.into_iter()
.map(|a| Arc::try_unwrap(a).expect("unwrapping arc"))
.collect()
}
// returns the error string or panics if `reorder_prefix` doesn't return an error
fn reorder_prefix_err(prefix: &[&str], table_columns: &[&str]) -> String {
let prefix = prefix.iter().map(|s| s.to_string()).collect::<Vec<_>>();
let table_columns =
Arc::try_unwrap(str_vec_to_arc_vec(table_columns)).expect("unwrap the arc");
let res = reorder_prefix(&prefix, table_columns);
match res {
Ok(r) => {
panic!(
"Expected error result from reorder_prefix_err, but was OK: '{:?}'",
r
);
}
Err(e) => format!("{}", e),
}
}
/// Runs `plan` and returns the output as petty-formatted array of strings
async fn run_plan(plan: LogicalPlan) -> Vec<String> {
// run the created plan, ensuring the output is as expected