feat: implement column_values for write buffer database (#339)

pull/24376/head
Andrew Lamb 2020-10-07 10:12:28 -04:00 committed by GitHub
parent 3ba1a95795
commit 9a81bf4d72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 536 additions and 11 deletions

View File

@ -31,6 +31,18 @@ impl TimestampRange {
pub fn new(start: i64, end: i64) -> Self {
Self { start, end }
}
/// Returns true if `v` lies within this range. The start bound is
/// inclusive and the end bound is exclusive.
#[inline]
pub fn contains(&self, v: i64) -> bool {
    (self.start..self.end).contains(&v)
}
/// Returns true if `v` holds a value and this range contains that
/// value. `None` is never considered to be in range.
#[inline]
pub fn contains_opt(&self, v: Option<i64>) -> bool {
    match v {
        Some(ts) => self.contains(ts),
        None => false,
    }
}
}
/// Represents a general purpose predicate for evaluation
@ -151,3 +163,32 @@ pub fn org_and_bucket_to_database(org: impl Into<String>, bucket: &str) -> Strin
//
//#[cfg(test)]
pub mod test;
#[cfg(test)]
mod tests {
    use super::*;

    /// Values just outside, on, and just inside the bounds of the range
    /// 100..200, paired with whether the range should contain them.
    const CASES: [(i64, bool); 6] = [
        (99, false),
        (100, true),
        (101, true),
        (199, true),
        (200, false),
        (201, false),
    ];

    #[test]
    fn test_timestamp_range_contains() {
        let range = TimestampRange::new(100, 200);

        for &(value, expected) in CASES.iter() {
            assert_eq!(range.contains(value), expected, "contains({})", value);
        }
    }

    #[test]
    fn test_timestamp_range_contains_opt() {
        let range = TimestampRange::new(100, 200);

        for &(value, expected) in CASES.iter() {
            assert_eq!(
                range.contains_opt(Some(value)),
                expected,
                "contains_opt(Some({}))",
                value
            );
        }

        // a missing timestamp is never in range
        assert!(!range.contains_opt(None));
    }
}

View File

@ -115,16 +115,44 @@ pub enum Error {
},
#[snafu(display(
"Column ID {} not found in dictionary of partition {}",
column,
"Column name {} not found in dictionary of partition {}",
column_name,
partition
))]
ColumnIdNotFoundInDictionary {
column: u32,
ColumnNameNotFoundInDictionary {
column_name: String,
partition: String,
source: DictionaryError,
},
#[snafu(display(
"Column ID {} not found in dictionary of partition {}",
column_id,
partition
))]
ColumnIdNotFoundInDictionary {
column_id: u32,
partition: String,
source: DictionaryError,
},
#[snafu(display(
"Value ID {} not found in dictionary of partition {}",
value_id,
partition
))]
ColumnValueIdNotFoundInDictionary {
value_id: u32,
partition: String,
source: DictionaryError,
},
#[snafu(display(
"Column '{}' is not a tag column and thus can not list values",
column_name
))]
UnsupportedColumnTypeForListingValues { column_name: String },
#[snafu(display("Table {} not found in partition {}", table, partition))]
TableNotFoundInPartition { table: u32, partition: String },
@ -344,14 +372,26 @@ impl Database for Db {
}
}
/// Returns a plan producing the values of the `column_name` column,
/// optionally restricted to a single table, a timestamp range and a
/// general predicate.
async fn column_values(
&self,
_column_name: &str,
_table: Option<String>,
_range: Option<TimestampRange>,
_predicate: Option<Predicate>,
column_name: &str,
table: Option<String>,
range: Option<TimestampRange>,
predicate: Option<Predicate>,
) -> Result<StringSetPlan, Self::Error> {
unimplemented!("Write buffer database does not yet implement column_values");
match predicate {
None => {
// no general predicate: the visitor gathers the matching
// values itself while walking the tables
let mut visitor = ValueVisitor::new(column_name, range);
self.visit_tables(table, &mut visitor).await?;
Ok(visitor.column_values.into())
}
Some(predicate) => {
// with a predicate: the visitor gathers logical plans which
// are handed back to the caller instead of values
let mut visitor = ValuePredVisitor::new(column_name, range, predicate);
self.visit_tables(table, &mut visitor).await?;
Ok(visitor.plans.into())
}
}
}
async fn table_to_arrow(
@ -564,7 +604,7 @@ impl Visitor for NameVisitor {
for &column_id in &self.partition_column_ids {
let column_name = partition.dictionary.lookup_id(column_id).context(
ColumnIdNotFoundInDictionary {
column: column_id,
column_id,
partition: &partition.key,
},
)?;
@ -636,7 +676,177 @@ impl Visitor for NamePredVisitor {
);
Ok(())
}
} // next partition
}
/// return all values in the `column_name` column
/// in this database, while applying the timestamp range
///
/// Potential optimizations: Run this in parallel (in different
/// futures) for each partition / table, rather than a single one
/// -- but that will require building up parallel hash tables.
struct ValueVisitor<'a> {
/// name of the column whose values are being listed
column_name: &'a str,
// what column id we are looking for
// (looked up from the dictionary of the partition being visited)
column_id: Option<u32>,
/// timestamp predicate for the partition being visited, built from
/// `range`
timestamp_predicate: Option<TimestampPredicate>,
/// dictionary ids of the values found in the partition being
/// visited; cleared at the start of each partition
partition_value_ids: BTreeSet<u32>,
/// the values found so far, as strings
column_values: StringSet,
/// optional timestamp range used to restrict which rows contribute
/// values
range: Option<TimestampRange>,
}
impl<'a> ValueVisitor<'a> {
    /// Creates a visitor that lists the values of `column_name`,
    /// optionally restricted to rows within `range`. The per-partition
    /// state starts out empty.
    fn new(column_name: &'a str, range: Option<TimestampRange>) -> Self {
        let column_values = StringSet::new();
        let partition_value_ids = BTreeSet::new();

        Self {
            column_name,
            column_id: None,
            timestamp_predicate: None,
            partition_value_ids,
            column_values,
            range,
        }
    }
}
impl<'a> Visitor for ValueVisitor<'a> {
    /// Resets the per-partition state: looks up the id of `column_name`
    /// in this partition's dictionary and rebuilds the timestamp
    /// predicate from the requested range.
    fn pre_visit_partition(&mut self, partition: &Partition) -> Result<()> {
        self.partition_value_ids.clear();

        let column_id = partition
            .dictionary
            .lookup_value(self.column_name)
            .context(ColumnNameNotFoundInDictionary {
                column_name: self.column_name,
                partition: &partition.key,
            })?;
        self.column_id = Some(column_id);

        let timestamp_predicate = partition
            .make_timestamp_predicate(self.range)
            .context(PartitionError)?;
        self.timestamp_predicate = timestamp_predicate;

        Ok(())
    }

    /// Records the ids of the non-null values of the column being
    /// listed; every other column is ignored. Errors if the column is
    /// not a tag column.
    fn visit_column(&mut self, table: &Table, column_id: u32, column: &Column) -> Result<()> {
        // only the column being listed is of interest
        if self.column_id != Some(column_id) {
            return Ok(());
        }

        let tag_values = match column {
            Column::Tag(tag_values) => tag_values,
            _ => {
                return UnsupportedColumnTypeForListingValues {
                    column_name: self.column_name,
                }
                .fail();
            }
        };

        // if we have a timestamp predicate, keep only the values whose
        // row timestamp is within range. Otherwise take all values.
        if let Some(pred) = self.timestamp_predicate {
            let time_column = table.column_i64(pred.time_column_id).context(TableError)?;

            for (&column_value_id, &timestamp_value) in tag_values.iter().zip(time_column.iter()) {
                // skip rows whose timestamp is missing or out of range
                if !pred.range.contains_opt(timestamp_value) {
                    continue;
                }
                if let Some(value_id) = column_value_id {
                    self.partition_value_ids.insert(value_id);
                }
            }
        } else {
            // take all non-null values
            self.partition_value_ids
                .extend(tag_values.iter().filter_map(|&value_id| value_id));
        }

        Ok(())
    }

    /// Translates the value ids collected for this partition into
    /// strings and adds them to the overall result.
    fn post_visit_partition(&mut self, partition: &Partition) -> Result<()> {
        // convert all the partition's value ids to Strings
        for &value_id in &self.partition_value_ids {
            let value = partition.dictionary.lookup_id(value_id).context(
                ColumnValueIdNotFoundInDictionary {
                    value_id,
                    partition: &partition.key,
                },
            )?;

            // only make an owned String for values not yet in the set
            if self.column_values.contains(value) {
                continue;
            }
            self.column_values.insert(value.to_string());
        }

        Ok(())
    }
}
/// return all column values for the specified column in this
/// database, while applying the timestamp range and predicate
struct ValuePredVisitor<'a> {
/// name of the column whose values are being listed
column_name: &'a str,
// timestamp predicate for the partition being visited, built from
// `range`
timestamp_predicate: Option<TimestampPredicate>,
/// optional timestamp range used to restrict the rows considered
range: Option<TimestampRange>,
/// general predicate passed on to each table's plan
predicate: Predicate,
/// the plans collected so far
plans: Vec<LogicalPlan>,
}
impl<'a> ValuePredVisitor<'a> {
    /// Creates a visitor that builds plans listing the values of
    /// `column_name` for rows matching `predicate`, optionally
    /// restricted to `range`. No plans are collected yet.
    fn new(column_name: &'a str, range: Option<TimestampRange>, predicate: Predicate) -> Self {
        let plans = Vec::new();

        Self {
            column_name,
            timestamp_predicate: None,
            range,
            predicate,
            plans,
        }
    }
}
impl<'a> Visitor for ValuePredVisitor<'a> {
    /// Rebuilds the timestamp predicate for the partition about to be
    /// visited from the requested range.
    fn pre_visit_partition(&mut self, partition: &Partition) -> Result<()> {
        let timestamp_predicate = partition
            .make_timestamp_predicate(self.range)
            .context(PartitionError)?;
        self.timestamp_predicate = timestamp_predicate;

        Ok(())
    }

    // TODO try and rule out entire tables based on the same criteria
    // as explained on NamePredVisitor
    /// Adds a tag values plan for `table`, unless the table does not
    /// match the timestamp predicate.
    fn pre_visit_table(&mut self, table: &Table, partition: &Partition) -> Result<()> {
        let timestamp_predicate = self.timestamp_predicate.as_ref();

        // only plan for tables that match the timestamp predicate
        let table_matches = table
            .matches_timestamp_predicate(timestamp_predicate)
            .context(TableError)?;

        if table_matches {
            let plan = table
                .tag_values_plan(
                    self.column_name,
                    &self.predicate,
                    timestamp_predicate,
                    partition,
                )
                .context(TableError)?;
            self.plans.push(plan);
        }

        Ok(())
    }
}
// partition_key returns the partition key for the given line. The key will be the prefix of a
// partition name (multiple partitions can exist for each key). It uses the user defined
@ -1203,4 +1413,167 @@ disk bytes=23432323i 1600136510000000000",
assert_eq!(to_set(&["state", "city", "county"]), *actual_tag_keys);
Ok(())
}
// Writes a small line protocol data set into a fresh database and
// checks the result of `column_values` for each combination of
// measurement, timestamp range and predicate restrictions.
#[tokio::test(threaded_scheduler)]
async fn list_column_values() -> Result {
let mut dir = delorean_test_helpers::tmp_dir()?.into_path();
let db = Db::try_with_wal("column_namedb", &mut dir).await?;
// two measurements (h2o and o2); some o2 rows have no `city` tag
let lp_data = "h2o,state=CA,city=LA temp=70.4 100\n\
h2o,state=MA,city=Boston temp=72.4 250\n\
o2,state=MA,city=Boston temp=50.4 200\n\
o2,state=CA temp=79.0 300\n\
o2,state=NY temp=60.8 400\n";
let lines: Vec<_> = parse_lines(lp_data).map(|l| l.unwrap()).collect();
db.write_lines(&lines).await?;
// one call to `column_values` together with its expected outcome
#[derive(Debug)]
struct TestCase<'a> {
description: &'a str,
column_name: &'a str,
measurement: Option<String>,
range: Option<TimestampRange>,
predicate: Option<Predicate>,
expected_column_values: Result<Vec<&'a str>>,
};
let test_cases = vec![
TestCase {
description: "No predicates, 'state' col",
column_name: "state",
measurement: None,
range: None,
predicate: None,
expected_column_values: Ok(vec!["CA", "MA", "NY"]),
},
TestCase {
description: "No predicates, 'city' col",
column_name: "city",
measurement: None,
range: None,
predicate: None,
expected_column_values: Ok(vec!["Boston", "LA"]),
},
TestCase {
description: "Restrictions: timestamp",
column_name: "state",
measurement: None,
range: Some(TimestampRange::new(50, 201)),
predicate: None,
expected_column_values: Ok(vec!["CA", "MA"]),
},
TestCase {
description: "Restrictions: predicate",
column_name: "city",
measurement: None,
range: None,
predicate: make_column_eq_predicate("state", "MA"), // state=MA
expected_column_values: Ok(vec!["Boston"]),
},
TestCase {
description: "Restrictions: timestamp and predicate",
column_name: "state",
measurement: None,
range: Some(TimestampRange::new(150, 301)),
predicate: make_column_eq_predicate("state", "MA"), // state=MA
expected_column_values: Ok(vec!["MA"]),
},
TestCase {
description: "Restrictions: measurement name",
column_name: "state",
measurement: Some("h2o".to_string()),
range: None,
predicate: None,
expected_column_values: Ok(vec!["CA", "MA"]),
},
TestCase {
description: "Restrictions: measurement name, with nulls",
column_name: "city",
measurement: Some("o2".to_string()),
range: None,
predicate: None,
expected_column_values: Ok(vec!["Boston"]),
},
TestCase {
description: "Restrictions: measurement name and timestamp",
column_name: "state",
measurement: Some("o2".to_string()),
range: Some(TimestampRange::new(50, 201)),
predicate: None,
expected_column_values: Ok(vec!["MA"]),
},
TestCase {
description: "Restrictions: measurement name and predicate",
column_name: "state",
measurement: Some("o2".to_string()),
range: None,
predicate: make_column_eq_predicate("state", "NY"), // state=NY
expected_column_values: Ok(vec!["NY"]),
},
TestCase {
description: "Restrictions: measurement name, timestamp and predicate",
column_name: "state",
measurement: Some("o2".to_string()),
range: Some(TimestampRange::new(1, 550)),
predicate: make_column_eq_predicate("state", "NY"), // state=NY
expected_column_values: Ok(vec!["NY"]),
},
TestCase {
description: "Restrictions: measurement name, timestamp and predicate: no match",
column_name: "state",
measurement: Some("o2".to_string()),
range: Some(TimestampRange::new(1, 300)), // filters out the NY row
predicate: make_column_eq_predicate("state", "NY"), // state=NY
expected_column_values: Ok(vec![]),
},
];
for test_case in test_cases.into_iter() {
// render the test case up front: its fields are moved into the
// `column_values` call below
let test_case_str = format!("{:#?}", test_case);
println!("Running test case: {:?}", test_case);
let column_values_plan = db
.column_values(
test_case.column_name,
test_case.measurement,
test_case.range,
test_case.predicate,
)
.await
.expect("Created tag_values plan successfully");
// run the execution plan
let executor = Executor::default();
let actual_column_values = executor.to_string_set(column_values_plan).await;
// an expected Ok must be matched by an Ok holding the same set of
// values; an expected Err is compared by its debug text
let is_match = if let Ok(expected_column_values) = &test_case.expected_column_values {
let expected_column_values = to_set(expected_column_values);
if let Ok(actual_column_values) = &actual_column_values {
**actual_column_values == expected_column_values
} else {
false
}
} else if let Err(e) = &actual_column_values {
// use string compare to compare errors to avoid having to build exact errors
// NOTE(review): `e` is the bare error while the right hand side
// formats the whole expected `Result`; no case above expects an
// error, so this branch is not exercised -- confirm the
// comparison before adding such a case
format!("{:?}", e) == format!("{:?}", test_case.expected_column_values)
} else {
false
};
assert!(
is_match,
"Mismatch\n\
actual_column_values: \n\
{:?}\n\
expected_column_values: \n\
{:?}\n\
Test_case: \n\
{}",
actual_column_values, test_case.expected_column_values, test_case_str
);
}
Ok(())
}
}

View File

@ -64,6 +64,18 @@ pub enum Error {
inserted_value_type: String,
},
#[snafu(display(
"Internal error: Expected column {} to be type {} but was {}",
column_id,
expected_column_type,
actual_column_type
))]
InternalColumnTypeMismatch {
column_id: u32,
expected_column_type: String,
actual_column_type: String,
},
#[snafu(display(
"Column name '{}' not found in dictionary of partition {}",
column_name,
@ -75,6 +87,17 @@ pub enum Error {
source: DictionaryError,
},
#[snafu(display(
"Internal: Column id '{}' not found in dictionary of partition {}",
column_id,
partition
))]
ColumnIdNotFoundInDictionary {
column_id: u32,
partition: String,
source: DictionaryError,
},
#[snafu(display(
"Schema mismatch: for column {}: can't insert {} into column with type {}",
column,
@ -222,6 +245,21 @@ impl Table {
.expect("invalid column id"))
}
/// Returns the values of the column identified by `column_id` as a
/// slice of optional i64s.
///
/// Errors if the column can not be retrieved or if it does not hold
/// i64 values.
pub fn column_i64(&self, column_id: u32) -> Result<&[Option<i64>]> {
    match self.column(column_id)? {
        Column::I64(vals) => Ok(vals),
        other => InternalColumnTypeMismatch {
            column_id,
            expected_column_type: "i64",
            actual_column_type: other.type_description(),
        }
        .fail(),
    }
}
pub fn append_rows(
&mut self,
dictionary: &mut Dictionary,
@ -285,6 +323,7 @@ impl Table {
})
.collect::<Vec<_>>();
// TODO avoid materializing here
let data = self.to_arrow_impl(partition, &requested_columns_with_index)?;
let schema = data.schema();
@ -327,6 +366,59 @@ impl Table {
Ok(plan)
}
/// Builds a DataFusion LogicalPlan that yields the values of
/// `column_name` as a single column, for the rows of this table that
/// pass `predicate` (and `timestamp_predicate`, if any).
///
/// The created plan looks like:
///
/// Projection
/// Filter(predicate)
/// InMemoryScan
pub fn tag_values_plan(
    &self,
    column_name: &str,
    predicate: &Predicate,
    timestamp_predicate: Option<&TimestampPredicate>,
    partition: &Partition,
) -> Result<LogicalPlan> {
    // The timestamp predicate has to be part of the filter expression
    // as well: the table as a whole could not be pruned based on the
    // range, but individual rows may still fall outside of it.
    let base_expr = predicate.expr.clone();
    let filter_expr = if let Some(timestamp_predicate) = timestamp_predicate {
        Self::add_timestamp_predicate_expr(base_expr, timestamp_predicate)
    } else {
        base_expr
    };

    // TODO avoid materializing all the columns here (ideally
    // DataFusion can prune them out)
    let data = self.all_to_arrow(partition)?;

    // the scan is not projected, so both schemas are the same
    let schema = data.schema();
    let projected_schema = schema.clone();

    let scan = LogicalPlan::InMemoryScan {
        data: vec![vec![data]],
        schema,
        projection: None,
        projected_schema,
    };

    // And build the plan!
    LogicalPlanBuilder::from(&scan)
        .filter(filter_expr)
        .context(BuildingPlan)?
        .project(vec![Expr::Column(column_name.into())])
        .context(BuildingPlan)?
        .build()
        .context(BuildingPlan)
}
/// Creates a DataFusion predicate of the form:
///
/// `expr AND (range.start <= time and time < range.end)`
@ -387,6 +479,25 @@ impl Table {
self.to_arrow_impl(partition, &requested_columns_with_index)
}
/// Converts every column of this table to an arrow record batch.
///
/// Column names are looked up in the dictionary of `partition`; an
/// id missing from that dictionary is reported as an error.
pub fn all_to_arrow(&self, partition: &Partition) -> Result<RecordBatch> {
    let mut requested_columns_with_index = Vec::new();

    for (&column_id, &column_index) in self.column_id_to_index.iter() {
        let column_name = partition.dictionary.lookup_id(column_id).context(
            ColumnIdNotFoundInDictionary {
                column_id,
                partition: &partition.key,
            },
        )?;
        requested_columns_with_index.push((column_name, column_index));
    }

    self.to_arrow_impl(partition, &requested_columns_with_index)
}
/// Converts this table to an arrow record batch,
///
/// requested columns with index are tuples of column_name, column_index