diff --git a/delorean_write_buffer/src/database.rs b/delorean_write_buffer/src/database.rs index 95c5fe99e7..7e51579304 100644 --- a/delorean_write_buffer/src/database.rs +++ b/delorean_write_buffer/src/database.rs @@ -1,8 +1,9 @@ use delorean_generated_types::wal as wb; use delorean_line_parser::ParsedLine; use delorean_storage::{ - exec::GroupedSeriesSetPlans, exec::SeriesSetPlan, exec::SeriesSetPlans, exec::StringSet, - exec::StringSetPlan, Database, Predicate, TimestampRange, + exec::GroupedSeriesSetPlan, exec::GroupedSeriesSetPlans, exec::SeriesSetPlan, + exec::SeriesSetPlans, exec::StringSet, exec::StringSetPlan, Database, Predicate, + TimestampRange, }; use delorean_wal::WalBuilder; use delorean_wal_writer::{start_wal_sync_task, Error as WalWriterError, WalDetails}; @@ -459,11 +460,13 @@ impl Database for Db { async fn query_groups( &self, - _range: Option, - _predicate: Option, - _group_columns: Vec, + range: Option, + predicate: Option, + group_columns: Vec, ) -> Result { - unimplemented!("query_groups unimplemented as part of write buffer database"); + let mut visitor = GroupsVisitor::new(predicate, group_columns); + self.visit_tables(None, range, &mut visitor).await?; + Ok(visitor.plans.into()) } async fn table_to_arrow( @@ -941,6 +944,45 @@ impl Visitor for SeriesVisitor { } } +/// Return DataFusion plans to calculate series that pass the +/// specified predicate, grouped according to grouped_columns +/// +/// TODO: Handle _f= and _m= predicates +/// specially (by filtering entire tables and selecting fields) +struct GroupsVisitor { + predicate: Option, + group_columns: Vec, + plans: Vec, +} + +impl GroupsVisitor { + fn new(predicate: Option, group_columns: Vec) -> Self { + Self { + predicate, + group_columns, + plans: Vec::new(), + } + } +} + +impl Visitor for GroupsVisitor { + fn pre_visit_table( + &mut self, + table: &Table, + partition: &Partition, + ts_pred: Option<&TimestampPredicate>, + ) -> Result<()> { + 
self.plans.push(table.grouped_series_set_plan( + self.predicate.as_ref(), + ts_pred, + &self.group_columns, + partition, + )?); + + Ok(()) + } +} + // partition_key returns the partition key for the given line. The key will be the prefix of a // partition name (multiple partitions can exist for each key). It uses the user defined // partitioning rules to construct this key diff --git a/delorean_write_buffer/src/table.rs b/delorean_write_buffer/src/table.rs index b67ccc2193..ac57c55bc4 100644 --- a/delorean_write_buffer/src/table.rs +++ b/delorean_write_buffer/src/table.rs @@ -1,5 +1,8 @@ use delorean_generated_types::wal as wb; -use delorean_storage::{exec::make_schema_pivot, exec::SeriesSetPlan, Predicate, TimestampRange}; +use delorean_storage::{ + exec::make_schema_pivot, exec::GroupedSeriesSetPlan, exec::SeriesSetPlan, Predicate, + TimestampRange, +}; use std::{collections::HashMap, sync::Arc}; @@ -142,6 +145,19 @@ pub enum Error { #[snafu(display("Row insert to table {} missing column name", table))] ColumnNameNotInRow { table: u32 }, + + #[snafu(display( + "Group column '{}' not found in tag columns: {}", + column_name, + all_tag_column_names + ))] + GroupColumnNotFound { + column_name: String, + all_tag_column_names: String, + }, + + #[snafu(display("Duplicate group column '{}'", column_name))] + DuplicateGroupColumn { column_name: String }, } pub type Result = std::result::Result; @@ -160,6 +176,8 @@ pub struct Table { pub columns: Vec, } +type ArcStringVec = Vec>; + impl Table { pub fn new(id: u32) -> Self { Self { @@ -427,17 +445,27 @@ impl Table { /// The data is sorted on tag_col1, tag_col2, ...) 
so that all /// rows for a particular series (groups where all tags are the /// same) occur together in the plan - /// + pub fn series_set_plan( + &self, + predicate: Option<&Predicate>, + timestamp_predicate: Option<&TimestampPredicate>, + partition: &Partition, + ) -> Result { + self.series_set_plan_impl(predicate, timestamp_predicate, None, partition) + } + + /// Creates the plans for computing series set, pulling prefix_columns, if any, as a prefix of the ordering /// The created plan looks like: /// /// Projection (select the columns columns needed) /// Order by (tag_columns, timestamp_column) /// Filter(predicate) /// InMemoryScan - pub fn series_set_plan( + pub fn series_set_plan_impl( &self, predicate: Option<&Predicate>, timestamp_predicate: Option<&TimestampPredicate>, + prefix_columns: Option<&[String]>, partition: &Partition, ) -> Result { // Note we also need to add a timestamp predicate to this @@ -461,34 +489,13 @@ impl Table { .to_string(); let table_name = Arc::new(table_name); + let (mut tag_columns, field_columns) = self.tag_and_field_column_names(partition)?; - let mut field_columns = Vec::with_capacity(self.column_id_to_index.len()); - let mut tag_columns = Vec::with_capacity(self.column_id_to_index.len()); - - for (&column_id, &column_index) in &self.column_id_to_index { - let column_name = partition - .dictionary - .lookup_id(column_id) - .expect("Find column name in dictionary"); - - if column_name != TIME_COLUMN_NAME { - let column_name = Arc::new(column_name.to_string()); - - match self.columns[column_index] { - Column::Tag(_) => tag_columns.push(column_name), - _ => field_columns.push(column_name), - } - } + // reorder tag_columns to have the prefix columns, if requested + if let Some(prefix_columns) = prefix_columns { + tag_columns = reorder_prefix(prefix_columns, tag_columns)?; } - // tag columns are always sorted by name (aka sorted by tag - // key) in the output schema, so ensure the columns are sorted - // (the select exprs) - 
tag_columns.sort(); - - // Sort the field columns too so that the output always comes out in a predictable order - field_columns.sort(); - // TODO avoid materializing all the columns here (ideally // DataFusion can prune them out) let data = self.all_to_arrow(partition)?; @@ -538,6 +545,82 @@ impl Table { }) } + /// Creates a GroupedSeriesSet plan that produces an output table with rows that match the predicate + /// + /// The output looks like: + /// (group_tag_column1, group_tag_column2, ... tag_col1, tag_col2, ... field1, field2, ... timestamp) + /// + /// The tag_columns are ordered by name. + /// + /// The data is sorted on (tag_col1, tag_col2, ...) so that all + /// rows for a particular series (groups where all tags are the + /// same) occur together in the plan + /// + /// The created plan looks like: + /// + /// Projection (select the columns needed) + /// Order by (tag_columns, timestamp_column) + /// Filter(predicate) + /// InMemoryScan + pub fn grouped_series_set_plan( + &self, + predicate: Option<&Predicate>, + timestamp_predicate: Option<&TimestampPredicate>, + group_columns: &[String], + partition: &Partition, + ) -> Result { + let series_set_plan = self.series_set_plan_impl( + predicate, + timestamp_predicate, + Some(&group_columns), + partition, + )?; + let num_prefix_tag_group_columns = group_columns.len(); + + Ok(GroupedSeriesSetPlan { + series_set_plan, + num_prefix_tag_group_columns, + }) + } + + // Returns (tag_columns, field_columns) vectors with the names of + // all tag and field columns, respectively. The vectors are sorted + // by name.
+ fn tag_and_field_column_names( + &self, + partition: &Partition, + ) -> Result<(ArcStringVec, ArcStringVec)> { + let mut tag_columns = Vec::with_capacity(self.column_id_to_index.len()); + let mut field_columns = Vec::with_capacity(self.column_id_to_index.len()); + + for (&column_id, &column_index) in &self.column_id_to_index { + let column_name = partition + .dictionary + .lookup_id(column_id) + .expect("Find column name in dictionary"); + + if column_name != TIME_COLUMN_NAME { + let column_name = Arc::new(column_name.to_string()); + + match self.columns[column_index] { + Column::Tag(_) => tag_columns.push(column_name), + _ => field_columns.push(column_name), + } + } + } + + // tag columns are always sorted by name (aka sorted by tag + // key) in the output schema, so ensure the columns are sorted + // (the select exprs) + tag_columns.sort(); + + // Sort the field columns too so that the output always comes + // out in a predictable order + field_columns.sort(); + + Ok((tag_columns, field_columns)) + } + /// Creates a DataFusion predicate of the form: /// /// `expr AND (range.start <= time and time < range.end)` @@ -768,6 +851,67 @@ impl Table { } } +/// Reorders tag_columns so that its prefix matches exactly +/// prefix_columns. Returns an error if there are duplicates, or other +/// untoward inputs +fn reorder_prefix( + prefix_columns: &[String], + tag_columns: Vec>, +) -> Result>> { + // tag_used_set[i] is true if we have used the value in tag_columns[i] + let mut tag_used_set = vec![false; tag_columns.len()]; + + // Note that this is an O(N^2) algorithm.
We are assuming the + // number of tag columns is reasonably small + + // map from prefix_column[idx] -> index in tag_columns + let prefix_map = prefix_columns + .iter() + .map(|pc| { + let found_location = tag_columns + .iter() + .enumerate() + .find(|(_, c)| pc == c.as_ref()); + + if let Some((index, _)) = found_location { + if tag_used_set[index] { + DuplicateGroupColumn { column_name: pc }.fail() + } else { + tag_used_set[index] = true; + Ok(index) + } + } else { + GroupColumnNotFound { + column_name: pc, + all_tag_column_names: tag_columns + .iter() + .map(|s| s.as_ref() as &str) + .collect::>() + .as_slice() + .join(", "), + } + .fail() + } + }) + .collect::>>()?; + + let mut new_tag_columns = prefix_map + .iter() + .map(|&i| tag_columns[i].clone()) + .collect::>(); + + new_tag_columns.extend(tag_columns.into_iter().enumerate().filter_map(|(i, c)| { + // already used in prefix + if tag_used_set[i] { + None + } else { + Some(c) + } + })); + + Ok(new_tag_columns) +} + /// Traits to help creating DataFuson expressions from strings trait IntoExpr { /// Creates a DataFuson expr @@ -981,6 +1125,148 @@ mod tests { assert_eq!(expected, results, "expected output"); } + #[tokio::test(threaded_scheduler)] + async fn test_grouped_series_set_plan() { + // test that filters are applied reasonably + + // setup a test table + let mut partition = Partition::new("dummy_partition_key"); + let dictionary = &mut partition.dictionary; + let mut table = Table::new(dictionary.lookup_value_or_insert("table_name")); + + let lp_lines = vec![ + "h2o,state=MA,city=Boston temp=70.4 100", + "h2o,state=MA,city=Boston temp=72.4 250", + "h2o,state=CA,city=LA temp=90.0 200", + "h2o,state=CA,city=LA temp=90.0 350", + ]; + + write_lines_to_table(&mut table, dictionary, lp_lines); + + let expr = Expr::BinaryExpr { + left: Box::new(Expr::Column("city".into())), + op: Operator::Eq, + right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("LA".into())))), + }; + let predicate = Some(Predicate { expr 
}); + + let range = Some(TimestampRange::new(190, 210)); + let timestamp_predicate = partition + .make_timestamp_predicate(range) + .expect("Made a timestamp predicate"); + + let group_columns = vec![String::from("state")]; + let grouped_series_set_plan = table + .grouped_series_set_plan( + predicate.as_ref(), + timestamp_predicate.as_ref(), + &group_columns, + &partition, + ) + .expect("creating the grouped_series set plan"); + + assert_eq!(grouped_series_set_plan.num_prefix_tag_group_columns, 1); + + // run the created plan, ensuring the output is as expected + let results = run_plan(grouped_series_set_plan.series_set_plan.plan).await; + + let expected = vec![ + "+-------+------+------+------+", + "| state | city | temp | time |", + "+-------+------+------+------+", + "| CA | LA | 90 | 200 |", + "+-------+------+------+------+", + ]; + + assert_eq!(expected, results, "expected output"); + } + + #[test] + fn test_reorder_prefix() { + assert_eq!(reorder_prefix_ok(&[], &[]), &[] as &[&str]); + + assert_eq!(reorder_prefix_ok(&[], &["one"]), &["one"]); + assert_eq!(reorder_prefix_ok(&["one"], &["one"]), &["one"]); + + assert_eq!(reorder_prefix_ok(&[], &["one", "two"]), &["one", "two"]); + assert_eq!( + reorder_prefix_ok(&["one"], &["one", "two"]), + &["one", "two"] + ); + assert_eq!( + reorder_prefix_ok(&["two"], &["one", "two"]), + &["two", "one"] + ); + assert_eq!( + reorder_prefix_ok(&["two", "one"], &["one", "two"]), + &["two", "one"] + ); + + assert_eq!( + reorder_prefix_ok(&[], &["one", "two", "three"]), + &["one", "two", "three"] + ); + assert_eq!( + reorder_prefix_ok(&["one"], &["one", "two", "three"]), + &["one", "two", "three"] + ); + assert_eq!( + reorder_prefix_ok(&["two"], &["one", "two", "three"]), + &["two", "one", "three"] + ); + assert_eq!( + reorder_prefix_ok(&["three", "one"], &["one", "two", "three"]), + &["three", "one", "two"] + ); + + // errors + assert_eq!( + reorder_prefix_err(&["one"], &[]), + "Group column \'one\' not found in tag columns: " 
+ ); + assert_eq!( + reorder_prefix_err(&["one"], &["two", "three"]), + "Group column \'one\' not found in tag columns: two, three" + ); + assert_eq!( + reorder_prefix_err(&["two", "one", "two"], &["one", "two"]), + "Duplicate group column \'two\'" + ); + } + + fn reorder_prefix_ok(prefix: &[&str], table_columns: &[&str]) -> Vec { + let prefix = prefix.iter().map(|s| s.to_string()).collect::>(); + let table_columns = + Arc::try_unwrap(str_vec_to_arc_vec(table_columns)).expect("unwrap the arc"); + + let res = reorder_prefix(&prefix, table_columns); + let message = format!("Expected OK, got {:?}", res); + let res = res.expect(&message); + + res.into_iter() + .map(|a| Arc::try_unwrap(a).expect("unwrapping arc")) + .collect() + } + + // returns the error string or panics if `reorder_prefix` doesn't return an error + fn reorder_prefix_err(prefix: &[&str], table_columns: &[&str]) -> String { + let prefix = prefix.iter().map(|s| s.to_string()).collect::>(); + let table_columns = + Arc::try_unwrap(str_vec_to_arc_vec(table_columns)).expect("unwrap the arc"); + + let res = reorder_prefix(&prefix, table_columns); + + match res { + Ok(r) => { + panic!( + "Expected error result from reorder_prefix_err, but was OK: '{:?}'", + r + ); + } + Err(e) => format!("{}", e), + } + } + /// Runs `plan` and returns the output as petty-formatted array of strings async fn run_plan(plan: LogicalPlan) -> Vec { // run the created plan, ensuring the output is as expected