refactor: apply timestamp predicate in visit code (#340)
parent
9a81bf4d72
commit
5400c55b2a
|
@ -360,13 +360,13 @@ impl Database for Db {
|
|||
) -> Result<StringSetPlan, Self::Error> {
|
||||
match predicate {
|
||||
None => {
|
||||
let mut visitor = NameVisitor::new(range);
|
||||
self.visit_tables(table, &mut visitor).await?;
|
||||
let mut visitor = NameVisitor::new();
|
||||
self.visit_tables(table, range, &mut visitor).await?;
|
||||
Ok(visitor.column_names.into())
|
||||
}
|
||||
Some(predicate) => {
|
||||
let mut visitor = NamePredVisitor::new(range, predicate);
|
||||
self.visit_tables(table, &mut visitor).await?;
|
||||
let mut visitor = NamePredVisitor::new(predicate);
|
||||
self.visit_tables(table, range, &mut visitor).await?;
|
||||
Ok(visitor.plans.into())
|
||||
}
|
||||
}
|
||||
|
@ -382,13 +382,13 @@ impl Database for Db {
|
|||
) -> Result<StringSetPlan, Self::Error> {
|
||||
match predicate {
|
||||
None => {
|
||||
let mut visitor = ValueVisitor::new(column_name, range);
|
||||
self.visit_tables(table, &mut visitor).await?;
|
||||
let mut visitor = ValueVisitor::new(column_name);
|
||||
self.visit_tables(table, range, &mut visitor).await?;
|
||||
Ok(visitor.column_values.into())
|
||||
}
|
||||
Some(predicate) => {
|
||||
let mut visitor = ValuePredVisitor::new(column_name, range, predicate);
|
||||
self.visit_tables(table, &mut visitor).await?;
|
||||
let mut visitor = ValuePredVisitor::new(column_name, predicate);
|
||||
self.visit_tables(table, range, &mut visitor).await?;
|
||||
Ok(visitor.plans.into())
|
||||
}
|
||||
}
|
||||
|
@ -497,12 +497,23 @@ trait Visitor {
|
|||
}
|
||||
|
||||
// called once before any column in a Table is visited
|
||||
fn pre_visit_table(&mut self, _table: &Table, _partition: &Partition) -> Result<()> {
|
||||
fn pre_visit_table(
|
||||
&mut self,
|
||||
_table: &Table,
|
||||
_partition: &Partition,
|
||||
_ts_pred: Option<&TimestampPredicate>,
|
||||
) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// called every time a column is visited
|
||||
fn visit_column(&mut self, _table: &Table, _column_id: u32, _column: &Column) -> Result<()> {
|
||||
fn visit_column(
|
||||
&mut self,
|
||||
_table: &Table,
|
||||
_column_id: u32,
|
||||
_column: &Column,
|
||||
_ts_pred: Option<&TimestampPredicate>,
|
||||
) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -518,12 +529,31 @@ trait Visitor {
|
|||
}
|
||||
|
||||
impl Db {
|
||||
/// Traverse this database's tables, calling the relevant functions, in
|
||||
/// order, of `visitor`, as described on the Visitor trait
|
||||
async fn visit_tables<V: Visitor>(&self, table: Option<String>, visitor: &mut V) -> Result<()> {
|
||||
/// Traverse this database's tables, calling the relevant
|
||||
/// functions, in order, of `visitor`, as described on the Visitor
|
||||
/// trait.
|
||||
///
|
||||
/// If table is specified, skips tables that do not match the
|
||||
/// name.
|
||||
///
|
||||
/// If range is specified, skips tables which do not have any
|
||||
/// values within the timestamp range.
|
||||
async fn visit_tables<V: Visitor>(
|
||||
&self,
|
||||
table: Option<String>,
|
||||
range: Option<TimestampRange>,
|
||||
visitor: &mut V,
|
||||
) -> Result<()> {
|
||||
let partitions = self.partitions.read().await;
|
||||
|
||||
for partition in partitions.iter() {
|
||||
// The id of the timestamp column is specific to each partition
|
||||
let timestamp_predicate = partition
|
||||
.make_timestamp_predicate(range)
|
||||
.context(PartitionError)?;
|
||||
|
||||
let ts_pred = timestamp_predicate.as_ref();
|
||||
|
||||
// ids are relative to a specific partition
|
||||
let table_symbol = match &table {
|
||||
Some(name) => Some(partition.dictionary.lookup_value(&name).context(
|
||||
|
@ -535,22 +565,27 @@ impl Db {
|
|||
None => None,
|
||||
};
|
||||
|
||||
// The table iterator is the list of tables we want to process
|
||||
let table_iter = partition
|
||||
.tables
|
||||
.values()
|
||||
// filter out any tables that don't pass the predicates
|
||||
.filter(|table| table.matches_id_predicate(&table_symbol));
|
||||
|
||||
visitor.pre_visit_partition(partition)?;
|
||||
for table in table_iter {
|
||||
visitor.pre_visit_table(table, partition)?;
|
||||
for table in partition.tables.values() {
|
||||
// Apply predicates, if any, to skip processing the table entirely
|
||||
if table.matches_id_predicate(&table_symbol)
|
||||
&& table
|
||||
.matches_timestamp_predicate(ts_pred)
|
||||
.context(TableError)?
|
||||
{
|
||||
visitor.pre_visit_table(table, partition, ts_pred)?;
|
||||
|
||||
for (column_id, column_index) in &table.column_id_to_index {
|
||||
visitor.visit_column(table, *column_id, &table.columns[*column_index])?
|
||||
for (column_id, column_index) in &table.column_id_to_index {
|
||||
visitor.visit_column(
|
||||
table,
|
||||
*column_id,
|
||||
&table.columns[*column_index],
|
||||
ts_pred,
|
||||
)?
|
||||
}
|
||||
|
||||
visitor.post_visit_table(table, partition)?;
|
||||
}
|
||||
|
||||
visitor.post_visit_table(table, partition)?;
|
||||
}
|
||||
visitor.post_visit_partition(partition)?;
|
||||
} // next partition
|
||||
|
@ -561,28 +596,30 @@ impl Db {
|
|||
|
||||
/// return all column names in this database, while applying the timestamp range
|
||||
struct NameVisitor {
|
||||
timestamp_predicate: Option<TimestampPredicate>,
|
||||
column_names: StringSet,
|
||||
partition_column_ids: BTreeSet<u32>,
|
||||
range: Option<TimestampRange>,
|
||||
}
|
||||
|
||||
impl NameVisitor {
|
||||
fn new(range: Option<TimestampRange>) -> Self {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
timestamp_predicate: None,
|
||||
column_names: StringSet::new(),
|
||||
partition_column_ids: BTreeSet::new(),
|
||||
range,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Visitor for NameVisitor {
|
||||
fn visit_column(&mut self, table: &Table, column_id: u32, column: &Column) -> Result<()> {
|
||||
fn visit_column(
|
||||
&mut self,
|
||||
table: &Table,
|
||||
column_id: u32,
|
||||
column: &Column,
|
||||
ts_pred: Option<&TimestampPredicate>,
|
||||
) -> Result<()> {
|
||||
if let Column::Tag(column) = column {
|
||||
if table
|
||||
.column_matches_timestamp_predicate(column, self.timestamp_predicate.as_ref())
|
||||
.column_matches_timestamp_predicate(column, ts_pred)
|
||||
.context(TableError)?
|
||||
{
|
||||
self.partition_column_ids.insert(column_id);
|
||||
|
@ -591,11 +628,8 @@ impl Visitor for NameVisitor {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn pre_visit_partition(&mut self, partition: &Partition) -> Result<()> {
|
||||
fn pre_visit_partition(&mut self, _partition: &Partition) -> Result<()> {
|
||||
self.partition_column_ids.clear();
|
||||
self.timestamp_predicate = partition
|
||||
.make_timestamp_predicate(self.range)
|
||||
.context(PartitionError)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -631,17 +665,13 @@ impl Visitor for NameVisitor {
|
|||
/// predicate values, e.g., if you have env = "us-west" and a
|
||||
/// table's env column has the range ["eu-south", "us-north"].
|
||||
struct NamePredVisitor {
|
||||
timestamp_predicate: Option<TimestampPredicate>,
|
||||
range: Option<TimestampRange>,
|
||||
predicate: Predicate,
|
||||
plans: Vec<LogicalPlan>,
|
||||
}
|
||||
|
||||
impl NamePredVisitor {
|
||||
fn new(range: Option<TimestampRange>, predicate: Predicate) -> Self {
|
||||
fn new(predicate: Predicate) -> Self {
|
||||
Self {
|
||||
timestamp_predicate: None,
|
||||
range,
|
||||
predicate,
|
||||
plans: Vec::new(),
|
||||
}
|
||||
|
@ -649,29 +679,15 @@ impl NamePredVisitor {
|
|||
}
|
||||
|
||||
impl Visitor for NamePredVisitor {
|
||||
fn pre_visit_partition(&mut self, partition: &Partition) -> Result<()> {
|
||||
self.timestamp_predicate = partition
|
||||
.make_timestamp_predicate(self.range)
|
||||
.context(PartitionError)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn pre_visit_table(&mut self, table: &Table, partition: &Partition) -> Result<()> {
|
||||
// skip table entirely if there are no rows that fall in the timestamp
|
||||
if !table
|
||||
.matches_timestamp_predicate(self.timestamp_predicate.as_ref())
|
||||
.context(TableError)?
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
fn pre_visit_table(
|
||||
&mut self,
|
||||
table: &Table,
|
||||
partition: &Partition,
|
||||
ts_pred: Option<&TimestampPredicate>,
|
||||
) -> Result<()> {
|
||||
self.plans.push(
|
||||
table
|
||||
.tag_column_names_plan(
|
||||
&self.predicate,
|
||||
self.timestamp_predicate.as_ref(),
|
||||
partition,
|
||||
)
|
||||
.tag_column_names_plan(&self.predicate, ts_pred, partition)
|
||||
.context(TableError)?,
|
||||
);
|
||||
Ok(())
|
||||
|
@ -688,21 +704,17 @@ struct ValueVisitor<'a> {
|
|||
column_name: &'a str,
|
||||
// what column id we are looking for
|
||||
column_id: Option<u32>,
|
||||
timestamp_predicate: Option<TimestampPredicate>,
|
||||
partition_value_ids: BTreeSet<u32>,
|
||||
column_values: StringSet,
|
||||
range: Option<TimestampRange>,
|
||||
}
|
||||
|
||||
impl<'a> ValueVisitor<'a> {
|
||||
fn new(column_name: &'a str, range: Option<TimestampRange>) -> Self {
|
||||
fn new(column_name: &'a str) -> Self {
|
||||
Self {
|
||||
column_name,
|
||||
column_id: None,
|
||||
timestamp_predicate: None,
|
||||
column_values: StringSet::new(),
|
||||
partition_value_ids: BTreeSet::new(),
|
||||
range,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -721,14 +733,16 @@ impl<'a> Visitor for ValueVisitor<'a> {
|
|||
})?,
|
||||
);
|
||||
|
||||
self.timestamp_predicate = partition
|
||||
.make_timestamp_predicate(self.range)
|
||||
.context(PartitionError)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn visit_column(&mut self, table: &Table, column_id: u32, column: &Column) -> Result<()> {
|
||||
fn visit_column(
|
||||
&mut self,
|
||||
table: &Table,
|
||||
column_id: u32,
|
||||
column: &Column,
|
||||
ts_pred: Option<&TimestampPredicate>,
|
||||
) -> Result<()> {
|
||||
if Some(column_id) != self.column_id {
|
||||
return Ok(());
|
||||
}
|
||||
|
@ -738,7 +752,7 @@ impl<'a> Visitor for ValueVisitor<'a> {
|
|||
// if we have a timestamp prediate, find all values
|
||||
// that the timestamp is within range. Otherwise take
|
||||
// all values.
|
||||
match self.timestamp_predicate {
|
||||
match ts_pred {
|
||||
None => {
|
||||
// take all non-null values
|
||||
column.iter().filter_map(|&s| s).for_each(|value_id| {
|
||||
|
@ -796,19 +810,14 @@ impl<'a> Visitor for ValueVisitor<'a> {
|
|||
/// database, while applying the timestamp range and predicate
|
||||
struct ValuePredVisitor<'a> {
|
||||
column_name: &'a str,
|
||||
// what column id we are looking for
|
||||
timestamp_predicate: Option<TimestampPredicate>,
|
||||
range: Option<TimestampRange>,
|
||||
predicate: Predicate,
|
||||
plans: Vec<LogicalPlan>,
|
||||
}
|
||||
|
||||
impl<'a> ValuePredVisitor<'a> {
|
||||
fn new(column_name: &'a str, range: Option<TimestampRange>, predicate: Predicate) -> Self {
|
||||
fn new(column_name: &'a str, predicate: Predicate) -> Self {
|
||||
Self {
|
||||
column_name,
|
||||
timestamp_predicate: None,
|
||||
range,
|
||||
predicate,
|
||||
plans: Vec::new(),
|
||||
}
|
||||
|
@ -816,19 +825,17 @@ impl<'a> ValuePredVisitor<'a> {
|
|||
}
|
||||
|
||||
impl<'a> Visitor for ValuePredVisitor<'a> {
|
||||
fn pre_visit_partition(&mut self, partition: &Partition) -> Result<()> {
|
||||
self.timestamp_predicate = partition
|
||||
.make_timestamp_predicate(self.range)
|
||||
.context(PartitionError)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// TODO try and rule out entire tables based on the same critera
|
||||
// as explained on NamePredVisitor
|
||||
fn pre_visit_table(&mut self, table: &Table, partition: &Partition) -> Result<()> {
|
||||
fn pre_visit_table(
|
||||
&mut self,
|
||||
table: &Table,
|
||||
partition: &Partition,
|
||||
ts_pred: Option<&TimestampPredicate>,
|
||||
) -> Result<()> {
|
||||
// skip table entirely if there are no rows that fall in the timestamp
|
||||
if !table
|
||||
.matches_timestamp_predicate(self.timestamp_predicate.as_ref())
|
||||
.matches_timestamp_predicate(ts_pred)
|
||||
.context(TableError)?
|
||||
{
|
||||
return Ok(());
|
||||
|
@ -836,12 +843,7 @@ impl<'a> Visitor for ValuePredVisitor<'a> {
|
|||
|
||||
self.plans.push(
|
||||
table
|
||||
.tag_values_plan(
|
||||
self.column_name,
|
||||
&self.predicate,
|
||||
self.timestamp_predicate.as_ref(),
|
||||
partition,
|
||||
)
|
||||
.tag_values_plan(self.column_name, &self.predicate, ts_pred, partition)
|
||||
.context(TableError)?,
|
||||
);
|
||||
Ok(())
|
||||
|
|
Loading…
Reference in New Issue