refactor: make `QueryChunk` object-safe

This makes it much easier to dyn-type database implementations. The only
real change is that `QueryChunk::Error` becomes opaque: nobody inspects
that error anyway, it is only printed to the user.

This is a follow-up to #4053.

Ref #3934.
pull/24376/head
Marco Neumann 2022-03-18 11:40:31 +01:00
parent a122b1e2ca
commit 169fa2fb2f
8 changed files with 71 additions and 284 deletions
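
As a note on the pattern behind this refactor: an associated `Error` type is what blocks convenient dyn-typing, because `dyn QueryChunk` would have to be spelled `dyn QueryChunk<Error = SomeConcreteError>`, so implementations with different error types could never share one trait-object type. A minimal sketch of the before/after shape, with simplified hypothetical names (`ChunkBefore`, `ChunkAfter`, and `ChunkError` are illustrations, not the actual IOx items):

```rust
use std::error::Error;

// Before (simplified): usable as `dyn ChunkBefore<Error = E>` only, so a
// heterogeneous collection of chunks cannot be expressed.
trait ChunkBefore {
    type Error: Error + Send + Sync + 'static;
    fn scan(&self) -> Result<(), Self::Error>;
}

// After: one opaque, boxed error type shared by every implementation.
type ChunkError = Box<dyn Error + Send + Sync + 'static>;

trait ChunkAfter {
    fn scan(&self) -> Result<(), ChunkError>;
}

// The dyn-typing the commit message refers to now works directly:
fn scan_all(chunks: &[Box<dyn ChunkAfter>]) -> Result<(), ChunkError> {
    for chunk in chunks {
        chunk.scan()?;
    }
    Ok(())
}
```

The `'static` bound added to `QueryChunk` itself (see the `query` crate diff below) serves the same goal: it lets chunks live behind `Arc<dyn QueryChunk>`.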


@@ -12,7 +12,7 @@ use parquet_file::{chunk::ParquetChunk, metadata::IoxMetadata};
 use predicate::{Predicate, PredicateMatch};
 use query::{
     exec::{stringset::StringSet, IOxExecutionContext},
-    QueryChunk, QueryChunkMeta,
+    QueryChunk, QueryChunkError, QueryChunkMeta,
 };
 use schema::{merge::SchemaMerger, selection::Selection, sort::SortKey, Schema};
 use snafu::{ResultExt, Snafu};
@@ -102,8 +102,6 @@ impl QueryChunkMeta for QueryableParquetChunk {
 }
 
 impl QueryChunk for QueryableParquetChunk {
-    type Error = Error;
-
     // Todo: This function should not be used in this NG chunk context
     // For now, since we also use scan for both OG and NG, the chunk id
     // is used as second key in build_deduplicate_plan_for_overlapped_chunks
@@ -141,7 +139,7 @@ impl QueryChunk for QueryableParquetChunk {
     fn apply_predicate_to_metadata(
         &self,
         _predicate: &Predicate,
-    ) -> Result<PredicateMatch, Self::Error> {
+    ) -> Result<PredicateMatch, QueryChunkError> {
         Ok(PredicateMatch::Unknown)
     }
 
@@ -154,7 +152,7 @@ impl QueryChunk for QueryableParquetChunk {
         _ctx: IOxExecutionContext,
         _predicate: &Predicate,
         _columns: Selection<'_>,
-    ) -> Result<Option<StringSet>, Self::Error> {
+    ) -> Result<Option<StringSet>, QueryChunkError> {
         Ok(None)
     }
 
@@ -168,7 +166,7 @@ impl QueryChunk for QueryableParquetChunk {
         _ctx: IOxExecutionContext,
         _column_name: &str,
         _predicate: &Predicate,
-    ) -> Result<Option<StringSet>, Self::Error> {
+    ) -> Result<Option<StringSet>, QueryChunkError> {
         Ok(None)
     }
 
@@ -190,7 +188,7 @@ impl QueryChunk for QueryableParquetChunk {
         mut ctx: IOxExecutionContext,
         predicate: &Predicate,
         selection: Selection<'_>,
-    ) -> Result<SendableRecordBatchStream, Self::Error> {
+    ) -> Result<SendableRecordBatchStream, QueryChunkError> {
         ctx.set_metadata("storage", "compactor");
         ctx.set_metadata("projection", format!("{}", selection));
         trace!(?selection, "selection");
@@ -198,6 +196,7 @@ impl QueryChunk for QueryableParquetChunk {
         self.data
             .read_filter(predicate, selection)
             .context(ReadParquetSnafu)
+            .map_err(|e| Box::new(e) as _)
     }
 
     /// Returns chunk type
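
The `.map_err(|e| Box::new(e) as _)` lines added throughout this commit all do the same thing: box the implementation's concrete error and unsize-coerce it to the opaque `QueryChunkError`, with the `as _` target inferred from the function's return type. A self-contained sketch of just that conversion (`ReadError` is a made-up stand-in for a concrete error such as the snafu `Error` types in these files):

```rust
use std::{error::Error, fmt};

type QueryChunkError = Box<dyn Error + Send + Sync + 'static>;

#[derive(Debug)]
struct ReadError(String);

impl fmt::Display for ReadError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "read error: {}", self.0)
    }
}

impl Error for ReadError {}

fn read() -> Result<(), ReadError> {
    Err(ReadError("boom".into()))
}

fn read_boxed() -> Result<(), QueryChunkError> {
    // `Box::new(e)` yields `Box<ReadError>`; `as _` coerces it to
    // `Box<dyn Error + Send + Sync>`, inferred from the return type.
    read().map_err(|e| Box::new(e) as _)
}

fn main() {
    assert!(read_boxed().is_err());
}
```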


@@ -17,6 +17,7 @@ use parquet_file::chunk::ParquetChunk;
 use partition_metadata::TableSummary;
 use predicate::{Predicate, PredicateMatch};
 use query::exec::IOxExecutionContext;
+use query::QueryChunkError;
 use query::{exec::stringset::StringSet, QueryChunk, QueryChunkMeta};
 use read_buffer::RBChunk;
 use schema::InfluxColumnType;
@@ -290,8 +291,6 @@ impl DbChunk {
 }
 
 impl QueryChunk for DbChunk {
-    type Error = Error;
-
     fn id(&self) -> ChunkId {
         self.addr.chunk_id
     }
@@ -313,7 +312,10 @@ impl QueryChunk for DbChunk {
         matches!(self.state, State::MutableBuffer { .. })
     }
 
-    fn apply_predicate_to_metadata(&self, predicate: &Predicate) -> Result<PredicateMatch> {
+    fn apply_predicate_to_metadata(
+        &self,
+        predicate: &Predicate,
+    ) -> Result<PredicateMatch, QueryChunkError> {
         let pred_result = match &self.state {
             State::MutableBuffer { chunk, .. } => {
                 if predicate.has_exprs() || chunk.has_timerange(&predicate.range) {
@@ -373,7 +375,7 @@ impl QueryChunk for DbChunk {
         mut ctx: IOxExecutionContext,
         predicate: &Predicate,
         selection: Selection<'_>,
-    ) -> Result<SendableRecordBatchStream, Self::Error> {
+    ) -> Result<SendableRecordBatchStream, QueryChunkError> {
         // Predicate is not required to be applied for correctness. We only pushed it down
         // when possible for performance gain
 
@@ -451,6 +453,7 @@ impl QueryChunk for DbChunk {
                     .context(ParquetFileChunkSnafu {
                         chunk_id: self.id(),
                     })
+                    .map_err(|e| Box::new(e) as _)
             }
         }
     }
@@ -460,7 +463,7 @@ impl QueryChunk for DbChunk {
         mut ctx: IOxExecutionContext,
         predicate: &Predicate,
         columns: Selection<'_>,
-    ) -> Result<Option<StringSet>, Self::Error> {
+    ) -> Result<Option<StringSet>, QueryChunkError> {
         ctx.set_metadata("storage", self.state.state_name());
         ctx.set_metadata("projection", format!("{}", columns));
         ctx.set_metadata("predicate", format!("{}", &predicate));
@@ -513,7 +516,7 @@ impl QueryChunk for DbChunk {
         mut ctx: IOxExecutionContext,
         column_name: &str,
         predicate: &Predicate,
-    ) -> Result<Option<StringSet>, Self::Error> {
+    ) -> Result<Option<StringSet>, QueryChunkError> {
         ctx.set_metadata("storage", self.state.state_name());
         ctx.set_metadata("column_name", column_name.to_string());
         ctx.set_metadata("predicate", format!("{}", &predicate));


@@ -1381,7 +1381,7 @@ mod tests {
     use predicate::{PredicateBuilder, PredicateMatch};
     use query::{
         exec::Executor,
-        test::{TestChunk, TestDatabase, TestError},
+        test::{TestChunk, TestDatabase},
     };
     use test_helpers::{assert_contains, tracing::TracingCapture};
 
@@ -1465,7 +1465,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
            .add_chunk("my_partition_key", Arc::new(chunk0))
            .add_chunk("my_partition_key", Arc::new(chunk1));
 
@@ -1567,7 +1566,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk0))
             .add_chunk("my_partition_key", Arc::new(chunk1));
 
@@ -1617,7 +1615,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk));
 
         let source = Some(StorageClient::read_source(&db_info, 1));
@@ -1662,7 +1659,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk0))
             .add_chunk("my_partition_key", Arc::new(chunk1));
 
@@ -1724,7 +1720,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk));
 
         let source = Some(StorageClient::read_source(&db_info, 1));
@@ -1766,7 +1761,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk));
 
         let source = Some(StorageClient::read_source(&db_info, 1));
@@ -1816,7 +1810,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk));
 
         let tag_values = vec!["h2o"];
@@ -1848,7 +1841,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk));
 
         let source = Some(StorageClient::read_source(&db_info, 1));
@@ -1887,7 +1879,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk));
 
         let source = Some(StorageClient::read_source(&db_info, 1));
@@ -1956,7 +1947,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk1))
             .add_chunk("my_partition_key", Arc::new(chunk2));
 
@@ -2173,7 +2163,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk));
 
         let source = Some(StorageClient::read_source(&db_info, 1));
@@ -2216,7 +2205,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk));
 
         let source = Some(StorageClient::read_source(&db_info, 1));
@@ -2257,7 +2245,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk));
 
         let source = Some(StorageClient::read_source(&db_info, 1));
@@ -2359,7 +2346,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk));
 
         let source = Some(StorageClient::read_source(&db_info, 1));
@@ -2424,7 +2410,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk));
 
         let source = Some(StorageClient::read_source(&db_info, 1));
@@ -2476,7 +2461,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk));
 
         let source = Some(StorageClient::read_source(&db_info, 1));
@@ -2516,7 +2500,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk));
 
         let source = Some(StorageClient::read_source(&db_info, 1));
@@ -2559,7 +2542,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk));
 
         let source = Some(StorageClient::read_source(&db_info, 1));
@@ -2610,7 +2592,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk));
 
         let source = Some(StorageClient::read_source(&db_info, 1));
@@ -2664,7 +2645,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk));
 
         let source = Some(StorageClient::read_source(&db_info, 1));
@@ -2724,7 +2704,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk));
 
         let source = Some(StorageClient::read_source(&db_info, 1));
@@ -2778,7 +2757,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk));
 
         let source = Some(StorageClient::read_source(&db_info, 1));
@@ -2819,7 +2797,6 @@ mod tests {
             .test_storage
             .db_or_create(db_info.db_name())
             .await
-            .unwrap()
             .add_chunk("my_partition_key", Arc::new(chunk));
 
         let source = Some(StorageClient::read_source(&db_info, 1));
@@ -2999,7 +2976,6 @@ mod tests {
             .test_storage
             .db_or_create(db_name)
             .await
-            .expect("getting db")
             .get_chunk(partition_key, ChunkId::new_test(chunk_id))
             .unwrap()
             .predicates();
@@ -3025,15 +3001,15 @@ mod tests {
             Self::default()
         }
 
-        async fn db_or_create(&self, name: &str) -> Result<Arc<TestDatabase>, TestError> {
+        async fn db_or_create(&self, name: &str) -> Arc<TestDatabase> {
             let mut databases = self.databases.lock();
 
             if let Some(db) = databases.get(name) {
-                Ok(Arc::clone(db))
+                Arc::clone(db)
             } else {
                 let new_db = Arc::new(TestDatabase::new(Arc::clone(&self.executor)));
                 databases.insert(name.to_string(), Arc::clone(&new_db));
-                Ok(new_db)
+                new_db
            }
        }
    }


@@ -24,7 +24,7 @@ use predicate::{Predicate, PredicateMatch};
 use query::{
     exec::{stringset::StringSet, IOxExecutionContext},
     util::{df_physical_expr_from_schema_and_expr, MissingColumnsToNull},
-    QueryChunk, QueryChunkMeta,
+    QueryChunk, QueryChunkError, QueryChunkMeta,
 };
 use schema::{merge::merge_record_batch_schemas, selection::Selection, sort::SortKey, Schema};
 use snafu::{ResultExt, Snafu};
@@ -127,8 +127,6 @@ impl QueryChunkMeta for QueryableBatch {
 }
 
 impl QueryChunk for QueryableBatch {
-    type Error = Error;
-
     // This function should not be used in QueryBatch context
     fn id(&self) -> ChunkId {
         // always return id 0 for debugging mode
@@ -162,7 +160,7 @@ impl QueryChunk for QueryableBatch {
     fn apply_predicate_to_metadata(
         &self,
         _predicate: &Predicate,
-    ) -> Result<PredicateMatch, Self::Error> {
+    ) -> Result<PredicateMatch, QueryChunkError> {
         Ok(PredicateMatch::Unknown)
     }
 
@@ -175,7 +173,7 @@ impl QueryChunk for QueryableBatch {
         _ctx: IOxExecutionContext,
         _predicate: &Predicate,
         _columns: Selection<'_>,
-    ) -> Result<Option<StringSet>, Self::Error> {
+    ) -> Result<Option<StringSet>, QueryChunkError> {
         Ok(None)
     }
 
@@ -189,7 +187,7 @@ impl QueryChunk for QueryableBatch {
         _ctx: IOxExecutionContext,
         _column_name: &str,
         _predicate: &Predicate,
-    ) -> Result<Option<StringSet>, Self::Error> {
+    ) -> Result<Option<StringSet>, QueryChunkError> {
         Ok(None)
     }
 
@@ -211,7 +209,7 @@ impl QueryChunk for QueryableBatch {
         mut ctx: IOxExecutionContext,
         predicate: &Predicate,
         selection: Selection<'_>,
-    ) -> Result<SendableRecordBatchStream, Self::Error> {
+    ) -> Result<SendableRecordBatchStream, QueryChunkError> {
         ctx.set_metadata("storage", "ingester");
         ctx.set_metadata("projection", format!("{}", selection));
         trace!(?selection, "selection");


@@ -266,12 +266,11 @@ impl InfluxRpcPlanner {
                 .push(Arc::clone(&chunk));
         } else {
             // Try and apply the predicate using only metadata
-            let pred_result = chunk
-                .apply_predicate_to_metadata(predicate)
-                .map_err(|e| Box::new(e) as _)
-                .context(CheckingChunkPredicateSnafu {
-                    chunk_id: chunk.id(),
-                })?;
+            let pred_result = chunk.apply_predicate_to_metadata(predicate).context(
+                CheckingChunkPredicateSnafu {
+                    chunk_id: chunk.id(),
+                },
+            )?;
 
             match pred_result {
                 PredicateMatch::AtLeastOneNonNullField => {
@@ -362,12 +361,11 @@ impl InfluxRpcPlanner {
         let mut do_full_plan = chunk.has_delete_predicates();
 
         // Try and apply the predicate using only metadata
-        let pred_result = chunk
-            .apply_predicate_to_metadata(predicate)
-            .map_err(|e| Box::new(e) as _)
-            .context(CheckingChunkPredicateSnafu {
-                chunk_id: chunk.id(),
-            })?;
+        let pred_result = chunk.apply_predicate_to_metadata(predicate).context(
+            CheckingChunkPredicateSnafu {
+                chunk_id: chunk.id(),
+            },
+        )?;
 
         if matches!(pred_result, PredicateMatch::Zero) {
             continue;
@@ -391,7 +389,6 @@ impl InfluxRpcPlanner {
                     predicate,
                     selection,
                 )
-                .map_err(|e| Box::new(e) as _)
                 .context(FindingColumnNamesSnafu)?;
 
             match maybe_names {
@@ -505,12 +502,11 @@ impl InfluxRpcPlanner {
         let mut do_full_plan = chunk.has_delete_predicates();
 
         // Try and apply the predicate using only metadata
-        let pred_result = chunk
-            .apply_predicate_to_metadata(predicate)
-            .map_err(|e| Box::new(e) as _)
-            .context(CheckingChunkPredicateSnafu {
-                chunk_id: chunk.id(),
-            })?;
+        let pred_result = chunk.apply_predicate_to_metadata(predicate).context(
+            CheckingChunkPredicateSnafu {
+                chunk_id: chunk.id(),
+            },
+        )?;
 
         if matches!(pred_result, PredicateMatch::Zero) {
             continue;
@@ -553,7 +549,6 @@ impl InfluxRpcPlanner {
                     tag_name,
                     predicate,
                 )
-                .map_err(|e| Box::new(e) as _)
                 .context(FindingColumnValuesSnafu)?;
 
             match maybe_values {
@@ -1517,9 +1512,9 @@ where
     let mut filtered = Vec::with_capacity(chunks.len());
     for chunk in chunks {
         // Try and apply the predicate using only metadata
-        let pred_result = chunk
-            .apply_predicate_to_metadata(predicate)
-            .map_err(|e| Box::new(e) as _)
-            .context(CheckingChunkPredicateSnafu {
-                chunk_id: chunk.id(),
-            })?;
+        let pred_result =
+            chunk
+                .apply_predicate_to_metadata(predicate)
+                .context(CheckingChunkPredicateSnafu {
+                    chunk_id: chunk.id(),
+                })?;
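
The planner no longer needs its own boxing shims because the chunk methods already return the boxed `QueryChunkError`, and snafu's `.context()` only requires the incoming error to convert into the variant's `source` field. A hedged sketch of that interaction (snafu 0.7-style context selectors); the error enum here is illustrative, mirroring only the `CheckingChunkPredicateSnafu` shape visible above, not the actual planner error type:

```rust
use snafu::{ResultExt, Snafu};

type QueryChunkError = Box<dyn std::error::Error + Send + Sync + 'static>;

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("checking chunk predicate for chunk {}: {}", chunk_id, source))]
    CheckingChunkPredicate {
        // The source field is already the boxed type, so no `map_err`
        // is needed before `.context()`.
        source: QueryChunkError,
        chunk_id: u32,
    },
}

// Stand-in for `chunk.apply_predicate_to_metadata(predicate)`.
fn apply_predicate_to_metadata() -> Result<bool, QueryChunkError> {
    Ok(true)
}

fn plan() -> Result<bool, Error> {
    apply_predicate_to_metadata().context(CheckingChunkPredicateSnafu { chunk_id: 42u32 })
}

fn main() {
    assert!(plan().is_ok());
}
```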


@@ -39,7 +39,7 @@ use schema::sort::SortKeyBuilder;
 
 /// Trait for an object (designed to be a Chunk) which can provide
 /// metadata
-pub trait QueryChunkMeta: Sized {
+pub trait QueryChunkMeta {
     /// Return a reference to the summary of the data
     fn summary(&self) -> Option<&TableSummary>;
 
@@ -153,10 +153,11 @@ pub trait QueryDatabase: QueryDatabaseMeta + Debug + Send + Sync {
     ) -> QueryCompletedToken;
 }
 
-/// Collection of data that shares the same partition key
-pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync {
-    type Error: std::error::Error + Send + Sync + 'static;
+/// Error type for [`QueryChunk`] operations.
+pub type QueryChunkError = Box<dyn std::error::Error + Send + Sync + 'static>;
 
+/// Collection of data that shares the same partition key
+pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static {
     /// returns the Id of this chunk. Ids are unique within a
     /// particular partition.
     fn id(&self) -> ChunkId;
@@ -180,7 +181,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync {
     fn apply_predicate_to_metadata(
         &self,
         predicate: &Predicate,
-    ) -> Result<PredicateMatch, Self::Error>;
+    ) -> Result<PredicateMatch, QueryChunkError>;
 
     /// Returns a set of Strings with column names from the specified
     /// table that have at least one row that matches `predicate`, if
@@ -191,7 +192,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync {
         ctx: IOxExecutionContext,
         predicate: &Predicate,
         columns: Selection<'_>,
-    ) -> Result<Option<StringSet>, Self::Error>;
+    ) -> Result<Option<StringSet>, QueryChunkError>;
 
     /// Return a set of Strings containing the distinct values in the
     /// specified columns. If the predicate can be evaluated entirely
@@ -203,7 +204,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync {
         ctx: IOxExecutionContext,
         column_name: &str,
         predicate: &Predicate,
-    ) -> Result<Option<StringSet>, Self::Error>;
+    ) -> Result<Option<StringSet>, QueryChunkError>;
 
     /// Provides access to raw `QueryChunk` data as an
     /// asynchronous stream of `RecordBatch`es filtered by a *required*
@@ -223,7 +224,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync {
         ctx: IOxExecutionContext,
         predicate: &Predicate,
         selection: Selection<'_>,
-    ) -> Result<SendableRecordBatchStream, Self::Error>;
+    ) -> Result<SendableRecordBatchStream, QueryChunkError>;
 
     /// Returns chunk type which is either MUB, RUB, OS
     fn chunk_type(&self) -> &str;
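
With `type Error` gone and the `'static` bound added, `QueryChunk` is now usable as a trait object, which the `query_tests` change at the bottom of this commit relies on (`Arc<dyn QueryChunk>`). A small illustrative fragment, assuming the trait and the `ChunkId` import above:

```rust
use std::sync::Arc;

// Chunks of any concrete type (parquet, read buffer, test, ...) can now
// share one dyn-typed collection.
fn chunk_ids(chunks: &[Arc<dyn QueryChunk>]) -> Vec<ChunkId> {
    chunks.iter().map(|c| c.id()).collect()
}
```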


@@ -8,7 +8,7 @@ use crate::{
     exec::stringset::{StringSet, StringSetRef},
     Predicate, PredicateMatch, QueryChunk, QueryChunkMeta, QueryDatabase,
 };
-use crate::{QueryCompletedToken, QueryText};
+use crate::{QueryChunkError, QueryCompletedToken, QueryText};
 use arrow::array::UInt64Array;
 use arrow::{
     array::{ArrayRef, DictionaryArray, Int64Array, StringArray, TimestampNanosecondArray},
@@ -32,7 +32,6 @@ use schema::selection::Selection;
 use schema::{
     builder::SchemaBuilder, merge::SchemaMerger, sort::SortKey, InfluxColumnType, Schema,
 };
-use snafu::Snafu;
 use std::num::NonZeroU64;
 use std::{collections::BTreeMap, fmt, sync::Arc};
 use trace::ctx::SpanContext;
@@ -52,19 +51,6 @@ pub struct TestDatabase {
     chunks_predicate: Mutex<Predicate>,
 }
 
-#[derive(Snafu, Debug)]
-pub enum TestError {
-    #[snafu(display("Test database error: {}", message))]
-    General { message: String },
-
-    #[snafu(display("Test error writing to database: {}", source))]
-    DatabaseWrite {
-        source: Box<dyn std::error::Error + Send + Sync + 'static>,
-    },
-}
-
-pub type Result<T, E = TestError> = std::result::Result<T, E>;
-
 impl TestDatabase {
     pub fn new(executor: Arc<Executor>) -> Self {
         Self {
@@ -316,9 +302,9 @@ impl TestChunk {
     }
 
     /// Checks the saved error, and returns it if any, otherwise returns OK
-    fn check_error(&self) -> Result<()> {
+    fn check_error(&self) -> Result<(), QueryChunkError> {
         if let Some(message) = self.saved_error.as_ref() {
-            GeneralSnafu { message }.fail()
+            Err(message.clone().into())
         } else {
             Ok(())
         }
@@ -874,8 +860,6 @@ impl fmt::Display for TestChunk {
 }
 
 impl QueryChunk for TestChunk {
-    type Error = TestError;
-
     fn id(&self) -> ChunkId {
         self.id
     }
@@ -902,7 +886,7 @@ impl QueryChunk for TestChunk {
         _ctx: IOxExecutionContext,
         predicate: &Predicate,
         _selection: Selection<'_>,
-    ) -> Result<SendableRecordBatchStream, Self::Error> {
+    ) -> Result<SendableRecordBatchStream, QueryChunkError> {
         self.check_error()?;
 
         // save the predicate
@@ -916,7 +900,10 @@ impl QueryChunk for TestChunk {
         "Test Chunk"
     }
 
-    fn apply_predicate_to_metadata(&self, predicate: &Predicate) -> Result<PredicateMatch> {
+    fn apply_predicate_to_metadata(
+        &self,
+        predicate: &Predicate,
+    ) -> Result<PredicateMatch, QueryChunkError> {
         self.check_error()?;
 
         // save the predicate
@@ -935,7 +922,7 @@ impl QueryChunk for TestChunk {
         _ctx: IOxExecutionContext,
         _column_name: &str,
         _predicate: &Predicate,
-    ) -> Result<Option<StringSet>, Self::Error> {
+    ) -> Result<Option<StringSet>, QueryChunkError> {
         // Model not being able to get column values from metadata
         Ok(None)
     }
@@ -945,7 +932,7 @@ impl QueryChunk for TestChunk {
         _ctx: IOxExecutionContext,
         predicate: &Predicate,
         selection: Selection<'_>,
-    ) -> Result<Option<StringSet>, Self::Error> {
+    ) -> Result<Option<StringSet>, QueryChunkError> {
        self.check_error()?;

        // save the predicate
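
The deleted `TestError` enum is not replaced by anything: `Err(message.clone().into())` relies on the standard library's `From<String>` impl for `Box<dyn Error + Send + Sync>`, so a plain string is enough for a test error. A runnable sketch of just that conversion:

```rust
type QueryChunkError = Box<dyn std::error::Error + Send + Sync + 'static>;

fn check_error(saved_error: Option<&String>) -> Result<(), QueryChunkError> {
    if let Some(message) = saved_error {
        // `String` converts into the boxed error via a std `From` impl.
        Err(message.clone().into())
    } else {
        Ok(())
    }
}

fn main() {
    assert!(check_error(None).is_ok());
    let err = check_error(Some(&"boom".to_string())).unwrap_err();
    assert_eq!(err.to_string(), "boom");
}
```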


@@ -1,45 +1,16 @@
-use std::{
-    any::Any,
-    fmt::{Debug, Display},
-    sync::Arc,
-};
+use std::{any::Any, fmt::Debug, sync::Arc};
 
 use async_trait::async_trait;
 use data_types::chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder};
 use datafusion::catalog::catalog::CatalogProvider;
-use db::{chunk::DbChunk, Db};
+use db::Db;
 use predicate::rpc_predicate::QueryDatabaseMeta;
 use query::{
     exec::{ExecutionContextProvider, IOxExecutionContext},
-    QueryChunk, QueryChunkMeta, QueryDatabase,
+    QueryChunk, QueryChunkError, QueryChunkMeta, QueryDatabase,
 };
 
-use self::sealed::{AbstractChunkInterface, AbstractDbInterface};
-
-pub struct Error(String);
-
-impl Error {
-    fn new<E>(e: E) -> Self
-    where
-        E: std::error::Error,
-    {
-        Self(e.to_string())
-    }
-}
-
-impl Debug for Error {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_tuple("Error").field(&self.0).finish()
-    }
-}
-
-impl Display for Error {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        std::fmt::Display::fmt(&self.0, f)
-    }
-}
-
-impl std::error::Error for Error {}
+use self::sealed::AbstractDbInterface;
 
 /// Abstract database used during testing.
 ///
@@ -116,17 +87,9 @@ impl QueryDatabaseMeta for AbstractDb {
 }
 
 #[derive(Debug)]
-pub struct AbstractChunk(Box<dyn AbstractChunkInterface>);
-
-impl AbstractChunk {
-    fn create_old(chunk: Arc<DbChunk>) -> Self {
-        Self(Box::new(OldChunk(chunk)))
-    }
-}
+pub struct AbstractChunk(Arc<dyn QueryChunk>);
 
 impl QueryChunk for AbstractChunk {
-    type Error = Error;
-
     fn id(&self) -> ChunkId {
         self.0.id()
     }
@@ -146,7 +109,7 @@ impl QueryChunk for AbstractChunk {
     fn apply_predicate_to_metadata(
         &self,
         predicate: &predicate::Predicate,
-    ) -> Result<predicate::PredicateMatch, Self::Error> {
+    ) -> Result<predicate::PredicateMatch, QueryChunkError> {
         self.0.apply_predicate_to_metadata(predicate)
     }
 
@@ -155,7 +118,7 @@ impl QueryChunk for AbstractChunk {
         ctx: IOxExecutionContext,
         predicate: &predicate::Predicate,
         columns: schema::selection::Selection<'_>,
-    ) -> Result<Option<query::exec::stringset::StringSet>, Self::Error> {
+    ) -> Result<Option<query::exec::stringset::StringSet>, QueryChunkError> {
         self.0.column_names(ctx, predicate, columns)
     }
 
@@ -164,7 +127,7 @@ impl QueryChunk for AbstractChunk {
         ctx: IOxExecutionContext,
         column_name: &str,
         predicate: &predicate::Predicate,
-    ) -> Result<Option<query::exec::stringset::StringSet>, Self::Error> {
+    ) -> Result<Option<query::exec::stringset::StringSet>, QueryChunkError> {
         self.0.column_values(ctx, column_name, predicate)
     }
 
@@ -173,7 +136,7 @@ impl QueryChunk for AbstractChunk {
         ctx: IOxExecutionContext,
         predicate: &predicate::Predicate,
         selection: schema::selection::Selection<'_>,
-    ) -> Result<datafusion::physical_plan::SendableRecordBatchStream, Self::Error> {
+    ) -> Result<datafusion::physical_plan::SendableRecordBatchStream, QueryChunkError> {
         self.0.read_filter(ctx, predicate, selection)
     }
 
@@ -235,54 +198,6 @@ mod sealed {
         fn table_schema(&self, table_name: &str) -> Option<Arc<schema::Schema>>;
     }
-
-    pub trait AbstractChunkInterface: Debug + Send + Sync + 'static {
-        fn id(&self) -> ChunkId;
-
-        fn addr(&self) -> ChunkAddr;
-
-        fn table_name(&self) -> &str;
-
-        fn may_contain_pk_duplicates(&self) -> bool;
-
-        fn apply_predicate_to_metadata(
-            &self,
-            predicate: &predicate::Predicate,
-        ) -> Result<predicate::PredicateMatch, Error>;
-
-        fn column_names(
-            &self,
-            ctx: IOxExecutionContext,
-            predicate: &predicate::Predicate,
-            columns: schema::selection::Selection<'_>,
-        ) -> Result<Option<query::exec::stringset::StringSet>, Error>;
-
-        fn column_values(
-            &self,
-            ctx: IOxExecutionContext,
-            column_name: &str,
-            predicate: &predicate::Predicate,
-        ) -> Result<Option<query::exec::stringset::StringSet>, Error>;
-
-        fn read_filter(
-            &self,
-            ctx: IOxExecutionContext,
-            predicate: &predicate::Predicate,
-            selection: schema::selection::Selection<'_>,
-        ) -> Result<datafusion::physical_plan::SendableRecordBatchStream, Error>;
-
-        fn chunk_type(&self) -> &str;
-
-        fn order(&self) -> ChunkOrder;
-
-        fn summary(&self) -> Option<&data_types::partition_metadata::TableSummary>;
-
-        fn schema(&self) -> Arc<schema::Schema>;
-
-        fn sort_key(&self) -> Option<&schema::sort::SortKey>;
-
-        fn delete_predicates(&self) -> &[Arc<data_types::delete_predicate::DeletePredicate>];
-    }
 }
 
 #[derive(Debug)]
@@ -311,7 +226,7 @@ impl AbstractDbInterface for OldDb {
             .chunks(table_name, predicate)
             .await
             .into_iter()
-            .map(|c| Arc::new(AbstractChunk::create_old(c)))
+            .map(|c| Arc::new(AbstractChunk(c as _)))
             .collect()
     }
 
@@ -332,90 +247,3 @@ impl AbstractDbInterface for OldDb {
         self.0.table_schema(table_name)
     }
 }
-
-#[derive(Debug)]
-struct OldChunk(Arc<DbChunk>);
-
-impl AbstractChunkInterface for OldChunk {
-    fn id(&self) -> ChunkId {
-        self.0.id()
-    }
-
-    fn addr(&self) -> ChunkAddr {
-        self.0.addr().clone()
-    }
-
-    fn table_name(&self) -> &str {
-        self.0.table_name()
-    }
-
-    fn may_contain_pk_duplicates(&self) -> bool {
-        self.0.may_contain_pk_duplicates()
-    }
-
-    fn apply_predicate_to_metadata(
-        &self,
-        predicate: &predicate::Predicate,
-    ) -> Result<predicate::PredicateMatch, Error> {
-        self.0
-            .apply_predicate_to_metadata(predicate)
-            .map_err(Error::new)
-    }
-
-    fn column_names(
-        &self,
-        ctx: IOxExecutionContext,
-        predicate: &predicate::Predicate,
-        columns: schema::selection::Selection<'_>,
-    ) -> Result<Option<query::exec::stringset::StringSet>, Error> {
-        self.0
-            .column_names(ctx, predicate, columns)
-            .map_err(Error::new)
-    }
-
-    fn column_values(
-        &self,
-        ctx: IOxExecutionContext,
-        column_name: &str,
-        predicate: &predicate::Predicate,
-    ) -> Result<Option<query::exec::stringset::StringSet>, Error> {
-        self.0
-            .column_values(ctx, column_name, predicate)
-            .map_err(Error::new)
-    }
-
-    fn read_filter(
-        &self,
-        ctx: IOxExecutionContext,
-        predicate: &predicate::Predicate,
-        selection: schema::selection::Selection<'_>,
-    ) -> Result<datafusion::physical_plan::SendableRecordBatchStream, Error> {
-        self.0
-            .read_filter(ctx, predicate, selection)
-            .map_err(Error::new)
-    }
-
-    fn chunk_type(&self) -> &str {
-        self.0.chunk_type()
-    }
-
-    fn order(&self) -> ChunkOrder {
-        self.0.order()
-    }
-
-    fn summary(&self) -> Option<&data_types::partition_metadata::TableSummary> {
-        self.0.summary()
-    }
-
-    fn schema(&self) -> Arc<schema::Schema> {
-        self.0.schema()
-    }
-
-    fn sort_key(&self) -> Option<&schema::sort::SortKey> {
-        self.0.sort_key()
-    }
-
-    fn delete_predicates(&self) -> &[Arc<data_types::delete_predicate::DeletePredicate>] {
-        self.0.delete_predicates()
-    }
-}