From aaeb0d4c843a41aaf2a9377170ac3ea7af03b7fc Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 8 Oct 2020 11:21:54 -0400 Subject: [PATCH] refactor: implement automatic error conversion for errors that do not have lots of context (#341) * refactor: implement automatic error conversion for errors that do not have lots of context * fix: implement code review suggestions --- delorean_write_buffer/src/database.rs | 91 +++++++++++++------------- delorean_write_buffer/src/partition.rs | 3 - 2 files changed, 45 insertions(+), 49 deletions(-) diff --git a/delorean_write_buffer/src/database.rs b/delorean_write_buffer/src/database.rs index 68dcdc9438..87e716fd74 100644 --- a/delorean_write_buffer/src/database.rs +++ b/delorean_write_buffer/src/database.rs @@ -86,11 +86,11 @@ pub enum Error { #[snafu(display("Partition {} is full", partition))] PartitionFull { partition: String }, - #[snafu(display("Error in partition: {}", source))] - PartitionError { source: crate::partition::Error }, - - #[snafu(display("Error in table: {}", source))] - TableError { source: crate::table::Error }, + #[snafu(display("Error in {}: {}", source_module, source))] + PassThrough { + source_module: &'static str, + source: Box, + }, #[snafu(display( "Table name {} not found in dictionary of partition {}", @@ -190,6 +190,24 @@ pub enum Error { GenericQueryError { message: String, query: String }, } +impl From for Error { + fn from(e: crate::table::Error) -> Self { + Self::PassThrough { + source_module: "Table", + source: Box::new(e), + } + } +} + +impl From for Error { + fn from(e: crate::partition::Error) -> Self { + Self::PassThrough { + source_module: "Partition", + source: Box::new(e), + } + } +} + pub type Result = std::result::Result; #[derive(Debug)] @@ -300,10 +318,10 @@ impl Database for Db { .expect("partition key should have been inserted"); match partitions.iter_mut().find(|p| p.should_write(key)) { - Some(p) => p.write_entry(&entry).context(PartitionError)?, + Some(p) => p.write_entry(&entry)?, None => { let mut p = Partition::new(key); - p.write_entry(&entry).context(PartitionError)?; + p.write_entry(&entry)?; partitions.push(p) } } @@ -327,14 +345,9 @@ impl Database for Db { let partitions = self.partitions.read().await; let mut table_names: BTreeSet = BTreeSet::new(); for partition in partitions.iter() { - let timestamp_predicate = partition - .make_timestamp_predicate(range) - .context(PartitionError)?; + let timestamp_predicate = partition.make_timestamp_predicate(range)?; for (table_name_symbol, table) in &partition.tables { - if table - .matches_timestamp_predicate(timestamp_predicate.as_ref()) - .context(TableError)? - { + if table.matches_timestamp_predicate(timestamp_predicate.as_ref())? { let table_name = partition.dictionary.lookup_id(*table_name_symbol).context( TableIdNotFoundInDictionary { table: *table_name_symbol, @@ -401,13 +414,12 @@ impl Database for Db { ) -> Result, Self::Error> { let partitions = self.partitions.read().await; - partitions + let batches = partitions .iter() - .map(|p| { - p.table_to_arrow(table_name, columns) - .context(PartitionError) - }) - .collect::>>() + .map(|p| p.table_to_arrow(table_name, columns)) + .collect::, crate::partition::Error>>()?; + + Ok(batches) } async fn query(&self, query: &str) -> Result, Self::Error> { @@ -548,9 +560,7 @@ impl Db { for partition in partitions.iter() { // The id of the timestamp column is specific to each partition - let timestamp_predicate = partition - .make_timestamp_predicate(range) - .context(PartitionError)?; + let timestamp_predicate = partition.make_timestamp_predicate(range)?; let ts_pred = timestamp_predicate.as_ref(); @@ -569,9 +579,7 @@ impl Db { for table in partition.tables.values() { // Apply predicates, if any, to skip processing the table entirely if table.matches_id_predicate(&table_symbol) - && table - .matches_timestamp_predicate(ts_pred) - .context(TableError)? + && table.matches_timestamp_predicate(ts_pred)? { visitor.pre_visit_table(table, partition, ts_pred)?; @@ -618,10 +626,7 @@ impl Visitor for NameVisitor { ts_pred: Option<&TimestampPredicate>, ) -> Result<()> { if let Column::Tag(column) = column { - if table - .column_matches_timestamp_predicate(column, ts_pred) - .context(TableError)? - { + if table.column_matches_timestamp_predicate(column, ts_pred)? { self.partition_column_ids.insert(column_id); } } @@ -685,11 +690,8 @@ impl Visitor for NamePredVisitor { partition: &Partition, ts_pred: Option<&TimestampPredicate>, ) -> Result<()> { - self.plans.push( - table - .tag_column_names_plan(&self.predicate, ts_pred, partition) - .context(TableError)?, - ); + self.plans + .push(table.tag_column_names_plan(&self.predicate, ts_pred, partition)?); Ok(()) } } @@ -761,8 +763,7 @@ impl<'a> Visitor for ValueVisitor<'a> { } Some(pred) => { // filter out all values that don't match the timestmap - let time_column = - table.column_i64(pred.time_column_id).context(TableError)?; + let time_column = table.column_i64(pred.time_column_id)?; column .iter() @@ -834,18 +835,16 @@ impl<'a> Visitor for ValuePredVisitor<'a> { ts_pred: Option<&TimestampPredicate>, ) -> Result<()> { // skip table entirely if there are no rows that fall in the timestamp - if !table - .matches_timestamp_predicate(ts_pred) - .context(TableError)? - { + if !table.matches_timestamp_predicate(ts_pred)? { return Ok(()); } - self.plans.push( - table - .tag_values_plan(self.column_name, &self.predicate, ts_pred, partition) - .context(TableError)?, - ); + self.plans.push(table.tag_values_plan( + self.column_name, + &self.predicate, + ts_pred, + partition, + )?); Ok(()) } } diff --git a/delorean_write_buffer/src/partition.rs b/delorean_write_buffer/src/partition.rs index edf982f5bc..714fde4d05 100644 --- a/delorean_write_buffer/src/partition.rs +++ b/delorean_write_buffer/src/partition.rs @@ -37,9 +37,6 @@ pub enum Error { source: crate::table::Error, }, - #[snafu(display("Table Error:: {}", source))] - TableError { source: crate::table::Error }, - #[snafu(display("Table Error in '{}': {}", table_name, source))] NamedTableError { table_name: String,