Merge branch 'main' into crepererum/issue1513-b
commit
0e09b20ca8
|
@ -902,7 +902,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "datafusion"
|
||||
version = "4.0.0-SNAPSHOT"
|
||||
source = "git+https://github.com/apache/arrow-datafusion.git?rev=c794f2df539a10524566cb02b6158ee46cb1459a#c794f2df539a10524566cb02b6158ee46cb1459a"
|
||||
source = "git+https://github.com/apache/arrow-datafusion.git?rev=16011120a1b73798049c5be49f9548b00f8a0a00#16011120a1b73798049c5be49f9548b00f8a0a00"
|
||||
dependencies = [
|
||||
"ahash 0.7.4",
|
||||
"arrow",
|
||||
|
@ -3771,9 +3771,9 @@ checksum = "7fdf1b9db47230893d76faad238fd6097fd6d6a9245cd7a4d90dbd639536bbd2"
|
|||
|
||||
[[package]]
|
||||
name = "signal-hook-registry"
|
||||
version = "1.3.0"
|
||||
version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "16f1d0fef1604ba8f7a073c7e701f213e056707210e9020af4528e0101ce11a6"
|
||||
checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
|
|
@ -9,4 +9,4 @@ description = "Re-exports datafusion at a specific version"
|
|||
|
||||
# Rename to workaround doctest bug
|
||||
# Turn off optional datafusion features (function packages)
|
||||
upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev = "c794f2df539a10524566cb02b6158ee46cb1459a", default-features = false, package = "datafusion" }
|
||||
upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev = "16011120a1b73798049c5be49f9548b00f8a0a00", default-features = false, package = "datafusion" }
|
||||
|
|
|
@ -120,13 +120,6 @@ impl Chunk {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
// Add the table name in this chunk to `names` if it is not already present
|
||||
pub fn all_table_names(&self, names: &mut BTreeSet<String>) {
|
||||
if !names.contains(self.table_name.as_ref()) {
|
||||
names.insert(self.table_name.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a queryable snapshot of this chunk
|
||||
#[cfg(not(feature = "nocache"))]
|
||||
pub fn snapshot(&self) -> Arc<ChunkSnapshot> {
|
||||
|
|
|
@ -72,11 +72,6 @@ impl ChunkSnapshot {
|
|||
self.batch.num_rows() == 0
|
||||
}
|
||||
|
||||
/// Return true if this snapshot has the specified table name
|
||||
pub fn has_table(&self, table_name: &str) -> bool {
|
||||
self.table_name.as_ref() == table_name
|
||||
}
|
||||
|
||||
/// Return Schema for the specified table / columns
|
||||
pub fn table_schema(&self, table_name: &str, selection: Selection<'_>) -> Result<Schema> {
|
||||
// Temporary #1295
|
||||
|
@ -102,7 +97,7 @@ impl ChunkSnapshot {
|
|||
/// Returns a list of tables with writes matching the given timestamp_range
|
||||
pub fn table_names(&self, timestamp_range: Option<TimestampRange>) -> BTreeSet<String> {
|
||||
let mut ret = BTreeSet::new();
|
||||
if self.matches_predicate(×tamp_range) {
|
||||
if self.has_timerange(×tamp_range) {
|
||||
ret.insert(self.table_name.to_string());
|
||||
}
|
||||
ret
|
||||
|
@ -222,7 +217,7 @@ impl ChunkSnapshot {
|
|||
self.batch.num_rows()
|
||||
}
|
||||
|
||||
fn matches_predicate(&self, timestamp_range: &Option<TimestampRange>) -> bool {
|
||||
pub fn has_timerange(&self, timestamp_range: &Option<TimestampRange>) -> bool {
|
||||
let timestamp_range = match timestamp_range {
|
||||
Some(t) => t,
|
||||
None => return true,
|
||||
|
|
|
@ -134,14 +134,9 @@ impl Chunk {
|
|||
self.table.full_schema()
|
||||
}
|
||||
|
||||
// Return all tables of this chunk whose timestamp overlaps with the give one
|
||||
pub fn table_names(
|
||||
&self,
|
||||
timestamp_range: Option<TimestampRange>,
|
||||
) -> impl Iterator<Item = String> + '_ {
|
||||
std::iter::once(&self.table)
|
||||
.filter(move |table| table.matches_predicate(×tamp_range))
|
||||
.map(|table| table.name().to_string())
|
||||
// Return true if the table in this chunk contains values within the time range
|
||||
pub fn has_timerange(&self, timestamp_range: Option<&TimestampRange>) -> bool {
|
||||
self.table.matches_predicate(timestamp_range)
|
||||
}
|
||||
|
||||
// Return the columns names that belong to the given column
|
||||
|
|
|
@ -110,7 +110,7 @@ impl Table {
|
|||
}
|
||||
|
||||
// Check if 2 time ranges overlap
|
||||
pub fn matches_predicate(&self, timestamp_range: &Option<TimestampRange>) -> bool {
|
||||
pub fn matches_predicate(&self, timestamp_range: Option<&TimestampRange>) -> bool {
|
||||
match (self.timestamp_range, timestamp_range) {
|
||||
(Some(a), Some(b)) => !a.disjoint(b),
|
||||
(None, Some(_)) => false, /* If this chunk doesn't have a time column it can't match */
|
||||
|
|
|
@ -145,6 +145,7 @@ impl IOxExecutionContext {
|
|||
/// Prepare a SQL statement for execution. This assumes that any
|
||||
/// tables referenced in the SQL have been registered with this context
|
||||
pub fn prepare_sql(&mut self, sql: &str) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
debug!(text=%sql, "SQL");
|
||||
let logical_plan = self.inner.sql(sql)?.to_logical_plan();
|
||||
self.prepare_plan(&logical_plan)
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ use observability_deps::tracing::debug;
|
|||
use snafu::{ensure, ResultExt, Snafu};
|
||||
|
||||
use crate::{
|
||||
exec::{field::FieldColumns, make_schema_pivot, stringset::StringSet},
|
||||
exec::{field::FieldColumns, make_schema_pivot},
|
||||
func::{
|
||||
selectors::{selector_first, selector_last, selector_max, selector_min, SelectorOutput},
|
||||
window::make_window_bound_expr,
|
||||
|
@ -30,7 +30,7 @@ use crate::{
|
|||
seriesset::{SeriesSetPlan, SeriesSetPlans},
|
||||
stringset::{Error as StringSetError, StringSetPlan, StringSetPlanBuilder},
|
||||
},
|
||||
predicate::{Predicate, PredicateBuilder},
|
||||
predicate::{Predicate, PredicateMatch},
|
||||
provider::ProviderBuilder,
|
||||
util::schema_has_all_expr_columns,
|
||||
Database, PartitionChunk,
|
||||
|
@ -196,21 +196,31 @@ impl InfluxRpcPlanner {
|
|||
let mut builder = StringSetPlanBuilder::new();
|
||||
|
||||
for chunk in database.chunks(&predicate) {
|
||||
let new_table_names = chunk
|
||||
.table_names(&predicate, builder.known_strings())
|
||||
// Try and apply the predicate using only metadata
|
||||
let pred_result = chunk
|
||||
.apply_predicate(&predicate)
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(TableNamePlan)?;
|
||||
.context(CheckingChunkPredicate {
|
||||
chunk_id: chunk.id(),
|
||||
})?;
|
||||
|
||||
builder = match new_table_names {
|
||||
Some(new_table_names) => builder.append(new_table_names.into()),
|
||||
None => {
|
||||
builder = match pred_result {
|
||||
PredicateMatch::AtLeastOne => builder.append_table(chunk.table_name()),
|
||||
// no match, ignore table
|
||||
PredicateMatch::Zero => builder,
|
||||
// can't evaluate predicate, need a new plan
|
||||
PredicateMatch::Unknown => {
|
||||
// TODO: General purpose plans for
|
||||
// table_names. For now, if we couldn't figure out
|
||||
// the table names from only metadata, generate an
|
||||
// error
|
||||
// table_names. For now, return an error
|
||||
debug!(
|
||||
chunk = chunk.id(),
|
||||
?predicate,
|
||||
table_name = chunk.table_name(),
|
||||
"can not evaluate predicate"
|
||||
);
|
||||
return UnsupportedPredicateForTableNames { predicate }.fail();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
let plan = builder.build().context(CreatingStringSet)?;
|
||||
|
@ -241,51 +251,60 @@ impl InfluxRpcPlanner {
|
|||
|
||||
let mut known_columns = BTreeSet::new();
|
||||
for chunk in database.chunks(&predicate) {
|
||||
// try and get the table names that have rows that match the predicate
|
||||
let table_names = self.chunk_table_names(chunk.as_ref(), &predicate)?;
|
||||
// Try and apply the predicate using only metadata
|
||||
let pred_result = chunk
|
||||
.apply_predicate(&predicate)
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(CheckingChunkPredicate {
|
||||
chunk_id: chunk.id(),
|
||||
})?;
|
||||
|
||||
for table_name in table_names {
|
||||
debug!(
|
||||
table_name = table_name.as_str(),
|
||||
chunk_id = chunk.id(),
|
||||
"finding columns in table"
|
||||
);
|
||||
if matches!(pred_result, PredicateMatch::Zero) {
|
||||
continue;
|
||||
}
|
||||
let table_name = chunk.table_name();
|
||||
let chunk_id = chunk.id();
|
||||
debug!(table_name, chunk_id, "finding columns in table");
|
||||
|
||||
// get only tag columns from metadata
|
||||
let schema = chunk
|
||||
.table_schema(Selection::All)
|
||||
.expect("to be able to get table schema");
|
||||
let column_names: Vec<&str> = schema
|
||||
.tags_iter()
|
||||
.map(|f| f.name().as_str())
|
||||
.collect::<Vec<&str>>();
|
||||
// get only tag columns from metadata
|
||||
let schema = chunk
|
||||
.table_schema(Selection::All)
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(GettingTableSchema {
|
||||
table_name,
|
||||
chunk_id,
|
||||
})?;
|
||||
|
||||
let selection = Selection::Some(&column_names);
|
||||
let column_names: Vec<&str> = schema
|
||||
.tags_iter()
|
||||
.map(|f| f.name().as_str())
|
||||
.collect::<Vec<&str>>();
|
||||
|
||||
// filter the columns further from the predicate
|
||||
let maybe_names = chunk
|
||||
.column_names(&predicate, selection)
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(FindingColumnNames)?;
|
||||
let selection = Selection::Some(&column_names);
|
||||
|
||||
match maybe_names {
|
||||
Some(mut names) => {
|
||||
debug!(names=?names, chunk_id = chunk.id(), "column names found from metadata");
|
||||
known_columns.append(&mut names);
|
||||
}
|
||||
None => {
|
||||
debug!(
|
||||
table_name = table_name.as_str(),
|
||||
chunk_id = chunk.id(),
|
||||
"column names need full plan"
|
||||
);
|
||||
// can't get columns only from metadata, need
|
||||
// a general purpose plan
|
||||
need_full_plans
|
||||
.entry(table_name)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(Arc::clone(&chunk));
|
||||
}
|
||||
// filter the columns further from the predicate
|
||||
let maybe_names = chunk
|
||||
.column_names(&predicate, selection)
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(FindingColumnNames)?;
|
||||
|
||||
match maybe_names {
|
||||
Some(mut names) => {
|
||||
debug!(names=?names, chunk_id = chunk.id(), "column names found from metadata");
|
||||
known_columns.append(&mut names);
|
||||
}
|
||||
None => {
|
||||
debug!(
|
||||
table_name,
|
||||
chunk_id = chunk.id(),
|
||||
"column names need full plan"
|
||||
);
|
||||
// can't get columns only from metadata, need
|
||||
// a general purpose plan
|
||||
need_full_plans
|
||||
.entry(table_name.to_string())
|
||||
.or_insert_with(Vec::new)
|
||||
.push(Arc::clone(&chunk));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -346,70 +365,79 @@ impl InfluxRpcPlanner {
|
|||
|
||||
let mut known_values = BTreeSet::new();
|
||||
for chunk in database.chunks(&predicate) {
|
||||
let table_names = self.chunk_table_names(chunk.as_ref(), &predicate)?;
|
||||
// Try and apply the predicate using only metadata
|
||||
let pred_result = chunk
|
||||
.apply_predicate(&predicate)
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(CheckingChunkPredicate {
|
||||
chunk_id: chunk.id(),
|
||||
})?;
|
||||
|
||||
for table_name in table_names {
|
||||
debug!(
|
||||
table_name = table_name.as_str(),
|
||||
chunk_id = chunk.id(),
|
||||
"finding columns in table"
|
||||
);
|
||||
if matches!(pred_result, PredicateMatch::Zero) {
|
||||
continue;
|
||||
}
|
||||
let table_name = chunk.table_name();
|
||||
let chunk_id = chunk.id();
|
||||
debug!(table_name, chunk_id, "finding columns in table");
|
||||
|
||||
// use schema to validate column type
|
||||
let schema = chunk
|
||||
.table_schema(Selection::All)
|
||||
.expect("to be able to get table schema");
|
||||
// use schema to validate column type
|
||||
let schema = chunk
|
||||
.table_schema(Selection::All)
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(GettingTableSchema {
|
||||
table_name,
|
||||
chunk_id,
|
||||
})?;
|
||||
|
||||
// Skip this table if the tag_name is not a column in this table
|
||||
let idx = if let Some(idx) = schema.find_index_of(tag_name) {
|
||||
idx
|
||||
} else {
|
||||
continue;
|
||||
};
|
||||
// Skip this table if the tag_name is not a column in this table
|
||||
let idx = if let Some(idx) = schema.find_index_of(tag_name) {
|
||||
idx
|
||||
} else {
|
||||
continue;
|
||||
};
|
||||
|
||||
// Validate that this really is a Tag column
|
||||
let (influx_column_type, field) = schema.field(idx);
|
||||
ensure!(
|
||||
matches!(influx_column_type, Some(InfluxColumnType::Tag)),
|
||||
InvalidTagColumn {
|
||||
tag_name,
|
||||
influx_column_type,
|
||||
}
|
||||
);
|
||||
ensure!(
|
||||
influx_column_type
|
||||
.unwrap()
|
||||
.valid_arrow_type(field.data_type()),
|
||||
InternalInvalidTagType {
|
||||
tag_name,
|
||||
data_type: field.data_type().clone(),
|
||||
}
|
||||
);
|
||||
// Validate that this really is a Tag column
|
||||
let (influx_column_type, field) = schema.field(idx);
|
||||
ensure!(
|
||||
matches!(influx_column_type, Some(InfluxColumnType::Tag)),
|
||||
InvalidTagColumn {
|
||||
tag_name,
|
||||
influx_column_type,
|
||||
}
|
||||
);
|
||||
ensure!(
|
||||
influx_column_type
|
||||
.unwrap()
|
||||
.valid_arrow_type(field.data_type()),
|
||||
InternalInvalidTagType {
|
||||
tag_name,
|
||||
data_type: field.data_type().clone(),
|
||||
}
|
||||
);
|
||||
|
||||
// try and get the list of values directly from metadata
|
||||
let maybe_values = chunk
|
||||
.column_values(tag_name, &predicate)
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(FindingColumnValues)?;
|
||||
// try and get the list of values directly from metadata
|
||||
let maybe_values = chunk
|
||||
.column_values(tag_name, &predicate)
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(FindingColumnValues)?;
|
||||
|
||||
match maybe_values {
|
||||
Some(mut names) => {
|
||||
debug!(names=?names, chunk_id = chunk.id(), "column values found from metadata");
|
||||
known_values.append(&mut names);
|
||||
}
|
||||
None => {
|
||||
debug!(
|
||||
table_name = table_name.as_str(),
|
||||
chunk_id = chunk.id(),
|
||||
"need full plan to find column values"
|
||||
);
|
||||
// can't get columns only from metadata, need
|
||||
// a general purpose plan
|
||||
need_full_plans
|
||||
.entry(table_name)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(Arc::clone(&chunk));
|
||||
}
|
||||
match maybe_values {
|
||||
Some(mut names) => {
|
||||
debug!(names=?names, chunk_id = chunk.id(), "column values found from metadata");
|
||||
known_values.append(&mut names);
|
||||
}
|
||||
None => {
|
||||
debug!(
|
||||
table_name,
|
||||
chunk_id = chunk.id(),
|
||||
"need full plan to find column values"
|
||||
);
|
||||
// can't get columns only from metadata, need
|
||||
// a general purpose plan
|
||||
need_full_plans
|
||||
.entry(table_name.to_string())
|
||||
.or_insert_with(Vec::new)
|
||||
.push(Arc::clone(&chunk));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -609,7 +637,7 @@ impl InfluxRpcPlanner {
|
|||
Ok(ss_plans.into())
|
||||
}
|
||||
|
||||
/// Creates a map of table_name --> Chunks that have that table
|
||||
/// Creates a map of table_name --> Chunks that have that table that *may* pass the predicate
|
||||
fn group_chunks_by_table<C>(
|
||||
&self,
|
||||
predicate: &Predicate,
|
||||
|
@ -620,56 +648,32 @@ impl InfluxRpcPlanner {
|
|||
{
|
||||
let mut table_chunks = BTreeMap::new();
|
||||
for chunk in chunks {
|
||||
let table_names = self.chunk_table_names(chunk.as_ref(), &predicate)?;
|
||||
for table_name in table_names {
|
||||
table_chunks
|
||||
.entry(table_name)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(Arc::clone(&chunk));
|
||||
// Try and apply the predicate using only metadata
|
||||
let pred_result = chunk
|
||||
.apply_predicate(&predicate)
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(CheckingChunkPredicate {
|
||||
chunk_id: chunk.id(),
|
||||
})?;
|
||||
|
||||
match pred_result {
|
||||
PredicateMatch::AtLeastOne |
|
||||
// have to include chunk as we can't rule it out
|
||||
PredicateMatch::Unknown => {
|
||||
let table_name = chunk.table_name().to_string();
|
||||
table_chunks
|
||||
.entry(table_name)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(Arc::clone(&chunk));
|
||||
}
|
||||
// Skip chunk here based on metadata
|
||||
PredicateMatch::Zero => {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(table_chunks)
|
||||
}
|
||||
|
||||
/// Find all the table names in the specified chunk that pass the predicate
|
||||
fn chunk_table_names<C>(&self, chunk: &C, predicate: &Predicate) -> Result<BTreeSet<String>>
|
||||
where
|
||||
C: PartitionChunk + 'static,
|
||||
{
|
||||
let no_tables = StringSet::new();
|
||||
|
||||
// try and get the table names that have rows that match the predicate
|
||||
let table_names = chunk
|
||||
.table_names(&predicate, &no_tables)
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(TableNamePlan)?;
|
||||
|
||||
debug!(table_names=?table_names, chunk_id = chunk.id(), "chunk tables");
|
||||
|
||||
let table_names = match table_names {
|
||||
Some(table_names) => {
|
||||
debug!("found table names with original predicate");
|
||||
table_names
|
||||
}
|
||||
None => {
|
||||
// couldn't find table names with predicate, get all chunk tables,
|
||||
// fall back to filtering ourself
|
||||
let table_name_predicate = if let Some(table_names) = &predicate.table_names {
|
||||
PredicateBuilder::new().tables(table_names).build()
|
||||
} else {
|
||||
Predicate::default()
|
||||
};
|
||||
chunk
|
||||
.table_names(&table_name_predicate, &no_tables)
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(InternalTableNamePlanForDefault)?
|
||||
// unwrap the Option (table didn't match)
|
||||
.unwrap_or(no_tables)
|
||||
}
|
||||
};
|
||||
Ok(table_names)
|
||||
}
|
||||
|
||||
/// Creates a DataFusion LogicalPlan that returns column *names* as a
|
||||
/// single column of Strings for a specific table
|
||||
///
|
||||
|
@ -1109,11 +1113,11 @@ impl InfluxRpcPlanner {
|
|||
let chunk_id = chunk.id();
|
||||
|
||||
// check that it is consistent with this table_name
|
||||
assert!(
|
||||
chunk.has_table(table_name),
|
||||
"Chunk {} did not have table {}, while trying to make a plan for it",
|
||||
assert_eq!(
|
||||
chunk.table_name(),
|
||||
table_name,
|
||||
"Chunk {} expected table mismatch",
|
||||
chunk.id(),
|
||||
table_name
|
||||
);
|
||||
|
||||
let chunk_table_schema = chunk
|
||||
|
|
|
@ -11,6 +11,7 @@ use data_types::chunk_metadata::ChunkSummary;
|
|||
use datafusion::physical_plan::SendableRecordBatchStream;
|
||||
use exec::{stringset::StringSet, Executor};
|
||||
use internal_types::{schema::Schema, selection::Selection};
|
||||
use predicate::PredicateMatch;
|
||||
|
||||
use std::{fmt::Debug, sync::Arc};
|
||||
|
||||
|
@ -59,32 +60,16 @@ pub trait PartitionChunk: Debug + Send + Sync {
|
|||
/// particular partition.
|
||||
fn id(&self) -> u32;
|
||||
|
||||
/// Returns true if this chunk contains data for the specified table
|
||||
fn has_table(&self, table_name: &str) -> bool;
|
||||
/// Returns the name of the table stored in this chunk
|
||||
fn table_name(&self) -> &str;
|
||||
|
||||
/// Returns all table names from this chunk that have at least one
|
||||
/// row that matches the `predicate` and are not already in `known_tables`.
|
||||
/// Returns the result of applying the `predicate` to the chunk
|
||||
/// using an efficient, but inexact method, based on metadata.
|
||||
///
|
||||
/// If the predicate cannot be evaluated (e.g it has predicates
|
||||
/// that cannot be directly evaluated in the chunk), `None` is
|
||||
/// returned.
|
||||
///
|
||||
/// `known_tables` is a list of table names already known to be in
|
||||
/// other chunks from the same partition. It may be empty or
|
||||
/// contain `table_names` not in this chunk.
|
||||
fn table_names(
|
||||
&self,
|
||||
predicate: &Predicate,
|
||||
known_tables: &StringSet,
|
||||
) -> Result<Option<StringSet>, Self::Error>;
|
||||
|
||||
/// Adds all table names from this chunk without any predicate to
|
||||
/// `known_tables)
|
||||
///
|
||||
/// `known_tables` is a list of table names already known to be in
|
||||
/// other chunks from the same partition. It may be empty or
|
||||
/// contain `table_names` not in this chunk.
|
||||
fn all_table_names(&self, known_tables: &mut StringSet);
|
||||
/// NOTE: This method is suitable for calling during planning, and
|
||||
/// may return PredicateMatch::Unknown for certain types of
|
||||
/// predicates.
|
||||
fn apply_predicate(&self, predicate: &Predicate) -> Result<PredicateMatch, Self::Error>;
|
||||
|
||||
/// Returns a set of Strings with column names from the specified
|
||||
/// table that have at least one row that matches `predicate`, if
|
||||
|
|
|
@ -101,6 +101,14 @@ impl StringSetPlanBuilder {
|
|||
&self.strings
|
||||
}
|
||||
|
||||
/// Append the name of a table to the known strings
|
||||
pub fn append_table(mut self, table_name: &str) -> Self {
|
||||
if !self.strings.contains(table_name) {
|
||||
self.strings.insert(table_name.to_string());
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Append the strings from the passed plan into ourselves if possible, or
|
||||
/// passes on the plan
|
||||
pub fn append(mut self, other: StringSetPlan) -> Self {
|
||||
|
|
|
@ -28,6 +28,20 @@ pub const EMPTY_PREDICATE: Predicate = Predicate {
|
|||
partition_key: None,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
/// The result of evaluating a predicate on a set of rows
|
||||
pub enum PredicateMatch {
|
||||
/// There is at least one row that matches the predicate
|
||||
AtLeastOne,
|
||||
|
||||
/// There are exactly zero rows that match the predicate
|
||||
Zero,
|
||||
|
||||
/// There *may* be rows that match, OR there *may* be no rows that
|
||||
/// match
|
||||
Unknown,
|
||||
}
|
||||
|
||||
/// Represents a parsed predicate for evaluation by the
|
||||
/// TSDatabase InfluxDB IOx query engine.
|
||||
///
|
||||
|
|
|
@ -13,6 +13,7 @@ use datafusion::{
|
|||
physical_plan::ExecutionPlan,
|
||||
};
|
||||
use internal_types::schema::{builder::SchemaMerger, Schema};
|
||||
use observability_deps::tracing::debug;
|
||||
|
||||
use crate::{predicate::PredicateBuilder, util::project_schema, PartitionChunk};
|
||||
|
||||
|
@ -198,6 +199,8 @@ impl<C: PartitionChunk + 'static> TableProvider for ChunkTableProvider<C> {
|
|||
filters: &[Expr],
|
||||
_limit: Option<usize>,
|
||||
) -> std::result::Result<Arc<dyn ExecutionPlan>, DataFusionError> {
|
||||
debug!("Input Filters to Scan: {:#?}", filters);
|
||||
|
||||
// Note that `filters` don't actually need to be evaluated in
|
||||
// the scan for the plans to be correct, they are an extra
|
||||
// optimization for providers which can offer them
|
||||
|
|
|
@ -14,7 +14,7 @@ use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBa
|
|||
use crate::exec::Executor;
|
||||
use crate::{
|
||||
exec::stringset::{StringSet, StringSetRef},
|
||||
Database, DatabaseStore, PartitionChunk, Predicate,
|
||||
Database, DatabaseStore, PartitionChunk, Predicate, PredicateMatch,
|
||||
};
|
||||
|
||||
use internal_types::{
|
||||
|
@ -133,6 +133,9 @@ pub struct TestChunk {
|
|||
|
||||
/// A saved error that is returned instead of actual results
|
||||
saved_error: Option<String>,
|
||||
|
||||
/// Return value for apply_predicate, if desired
|
||||
predicate_match: Option<PredicateMatch>,
|
||||
}
|
||||
|
||||
impl TestChunk {
|
||||
|
@ -150,6 +153,12 @@ impl TestChunk {
|
|||
self
|
||||
}
|
||||
|
||||
/// specify that any call to apply_predicate should return this value
|
||||
pub fn with_predicate_match(mut self, predicate_match: PredicateMatch) -> Self {
|
||||
self.predicate_match = Some(predicate_match);
|
||||
self
|
||||
}
|
||||
|
||||
/// Checks the saved error, and returns it if any, otherwise returns OK
|
||||
fn check_error(&self) -> Result<()> {
|
||||
if let Some(message) = self.saved_error.as_ref() {
|
||||
|
@ -307,6 +316,10 @@ impl PartitionChunk for TestChunk {
|
|||
self.id
|
||||
}
|
||||
|
||||
fn table_name(&self) -> &str {
|
||||
self.table_name.as_deref().unwrap()
|
||||
}
|
||||
|
||||
fn read_filter(
|
||||
&self,
|
||||
predicate: &Predicate,
|
||||
|
@ -322,29 +335,31 @@ impl PartitionChunk for TestChunk {
|
|||
Ok(Box::pin(stream))
|
||||
}
|
||||
|
||||
fn table_names(
|
||||
&self,
|
||||
predicate: &Predicate,
|
||||
_known_tables: &StringSet,
|
||||
) -> Result<Option<StringSet>, Self::Error> {
|
||||
fn apply_predicate(&self, predicate: &Predicate) -> Result<PredicateMatch> {
|
||||
self.check_error()?;
|
||||
|
||||
// save the predicate
|
||||
self.predicates.lock().push(predicate.clone());
|
||||
|
||||
// do basic filtering based on table name predicate.
|
||||
// check if there is a saved result to return
|
||||
if let Some(&predicate_match) = self.predicate_match.as_ref() {
|
||||
return Ok(predicate_match);
|
||||
}
|
||||
|
||||
Ok(self
|
||||
// otherwise fall back to basic filtering based on table name predicate.
|
||||
let predicate_match = self
|
||||
.table_name
|
||||
.as_ref()
|
||||
.filter(|table_name| predicate.should_include_table(&table_name))
|
||||
.map(|table_name| std::iter::once(table_name.to_string()).collect::<StringSet>()))
|
||||
}
|
||||
.map(|table_name| {
|
||||
if !predicate.should_include_table(&table_name) {
|
||||
PredicateMatch::Zero
|
||||
} else {
|
||||
PredicateMatch::Unknown
|
||||
}
|
||||
})
|
||||
.unwrap_or(PredicateMatch::Unknown);
|
||||
|
||||
fn all_table_names(&self, known_tables: &mut StringSet) {
|
||||
if let Some(table_name) = self.table_name.as_ref() {
|
||||
known_tables.insert(table_name.to_string());
|
||||
}
|
||||
Ok(predicate_match)
|
||||
}
|
||||
|
||||
fn table_schema(&self, selection: Selection<'_>) -> Result<Schema, Self::Error> {
|
||||
|
@ -366,13 +381,6 @@ impl PartitionChunk for TestChunk {
|
|||
Ok(None)
|
||||
}
|
||||
|
||||
fn has_table(&self, table_name: &str) -> bool {
|
||||
self.table_name
|
||||
.as_ref()
|
||||
.map(|n| n == table_name)
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn column_names(
|
||||
&self,
|
||||
predicate: &Predicate,
|
||||
|
|
|
@ -1228,6 +1228,7 @@ impl CatalogState for Catalog {
|
|||
.set_written_to_object_store(parquet_chunk)
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(CatalogStateFailure { path: info.path })?;
|
||||
debug!(%partition_key, %table_name, %chunk_id, "chunk marked WRITTEN. Persisting to object store complete");
|
||||
}
|
||||
Err(catalog::Error::UnknownTable { .. }) | Err(catalog::Error::UnknownChunk { .. }) => {
|
||||
// table unknown => that's ok, create chunk in "object store only" stage which will also create the table
|
||||
|
@ -1238,6 +1239,7 @@ impl CatalogState for Catalog {
|
|||
.create_object_store_only_chunk(chunk_id, parquet_chunk)
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(CatalogStateFailure { path: info.path })?;
|
||||
debug!(%partition_key, %table_name, %chunk_id, "recovered chunk from persisted catalog");
|
||||
}
|
||||
Err(e) => {
|
||||
// Other unknown error => bail
|
||||
|
@ -1248,7 +1250,6 @@ impl CatalogState for Catalog {
|
|||
}
|
||||
}
|
||||
|
||||
debug!(%partition_key, %table_name, %chunk_id, "chunk marked WRITTEN. Persisting to object store complete");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
@ -271,14 +271,7 @@ impl Chunk {
|
|||
chunk: Arc<parquet_file::chunk::Chunk>,
|
||||
metrics: ChunkMetrics,
|
||||
) -> Self {
|
||||
// workaround until https://github.com/influxdata/influxdb_iox/issues/1295 is fixed
|
||||
let table_name = Arc::from(
|
||||
chunk
|
||||
.table_names(None)
|
||||
.next()
|
||||
.expect("chunk must have exactly 1 table")
|
||||
.as_ref(),
|
||||
);
|
||||
let table_name = Arc::from(chunk.table_name());
|
||||
|
||||
// Cache table summary + schema
|
||||
let meta = Arc::new(ChunkMetadata {
|
||||
|
|
|
@ -133,11 +133,7 @@ impl Partition {
|
|||
chunk_id: u32,
|
||||
chunk: Arc<parquet_file::chunk::Chunk>,
|
||||
) -> Result<Arc<RwLock<Chunk>>> {
|
||||
// workaround until https://github.com/influxdata/influxdb_iox/issues/1295 is fixed
|
||||
let table_name = chunk
|
||||
.table_names(None)
|
||||
.next()
|
||||
.expect("chunk must have exactly 1 table");
|
||||
let table_name = chunk.table_name().to_string();
|
||||
|
||||
let chunk = Arc::new(self.metrics.new_lock(Chunk::new_object_store_only(
|
||||
chunk_id,
|
||||
|
|
|
@ -12,7 +12,11 @@ use mutable_buffer::chunk::snapshot::ChunkSnapshot;
|
|||
use object_store::path::Path;
|
||||
use observability_deps::tracing::debug;
|
||||
use parquet_file::chunk::Chunk as ParquetChunk;
|
||||
use query::{exec::stringset::StringSet, predicate::Predicate, PartitionChunk};
|
||||
use query::{
|
||||
exec::stringset::StringSet,
|
||||
predicate::{Predicate, PredicateMatch},
|
||||
PartitionChunk,
|
||||
};
|
||||
use read_buffer::Chunk as ReadBufferChunk;
|
||||
|
||||
use super::{
|
||||
|
@ -205,26 +209,35 @@ impl PartitionChunk for DbChunk {
|
|||
self.id
|
||||
}
|
||||
|
||||
fn all_table_names(&self, known_tables: &mut StringSet) {
|
||||
// TODO remove this function (use name from TableSummary directly!)
|
||||
let table_name = &self.meta.table_summary.name;
|
||||
if !known_tables.contains(table_name) {
|
||||
known_tables.insert(table_name.to_string());
|
||||
}
|
||||
fn table_name(&self) -> &str {
|
||||
self.table_name.as_ref()
|
||||
}
|
||||
|
||||
fn table_names(
|
||||
&self,
|
||||
predicate: &Predicate,
|
||||
_known_tables: &StringSet, // TODO: Should this be being used?
|
||||
) -> Result<Option<StringSet>, Self::Error> {
|
||||
let names = match &self.state {
|
||||
fn apply_predicate(&self, predicate: &Predicate) -> Result<PredicateMatch> {
|
||||
if !predicate.should_include_table(self.table_name().as_ref()) {
|
||||
return Ok(PredicateMatch::Zero);
|
||||
}
|
||||
|
||||
// TODO apply predicate pruning here...
|
||||
|
||||
let pred_result = match &self.state {
|
||||
State::MutableBuffer { chunk, .. } => {
|
||||
if predicate.has_exprs() {
|
||||
// TODO: Support more predicates
|
||||
return Ok(None);
|
||||
PredicateMatch::Unknown
|
||||
} else if chunk.has_timerange(&predicate.range) {
|
||||
// Note: this isn't precise / correct: if the
|
||||
// chunk has the timerange, some other part of the
|
||||
// predicate may rule out the rows, and thus
|
||||
// without further work this clause should return
|
||||
// "Unknown" rather than falsely claiming that
|
||||
// there is at least one row:
|
||||
//
|
||||
// https://github.com/influxdata/influxdb_iox/issues/1590
|
||||
PredicateMatch::AtLeastOne
|
||||
} else {
|
||||
PredicateMatch::Zero
|
||||
}
|
||||
chunk.table_names(predicate.range)
|
||||
}
|
||||
State::ReadBuffer { chunk, .. } => {
|
||||
// If not supported, ReadBuffer can't answer with
|
||||
|
@ -233,23 +246,35 @@ impl PartitionChunk for DbChunk {
|
|||
Ok(rb_predicate) => rb_predicate,
|
||||
Err(e) => {
|
||||
debug!(?predicate, %e, "read buffer predicate not supported for table_names, falling back");
|
||||
return Ok(None);
|
||||
return Ok(PredicateMatch::Unknown);
|
||||
}
|
||||
};
|
||||
|
||||
chunk.table_names(&rb_predicate, &BTreeSet::new())
|
||||
// TODO align API in read_buffer
|
||||
let table_names = chunk.table_names(&rb_predicate, &BTreeSet::new());
|
||||
if !table_names.is_empty() {
|
||||
// As above, this should really be "Unknown" rather than AtLeastOne
|
||||
// for precision / correctness.
|
||||
PredicateMatch::AtLeastOne
|
||||
} else {
|
||||
PredicateMatch::Zero
|
||||
}
|
||||
}
|
||||
State::ParquetFile { chunk, .. } => {
|
||||
if predicate.has_exprs() {
|
||||
// TODO: Support more predicates
|
||||
PredicateMatch::Unknown
|
||||
} else if chunk.has_timerange(predicate.range.as_ref()) {
|
||||
// As above, this should really be "Unknown" rather than AtLeastOne
|
||||
// for precision / correctness.
|
||||
PredicateMatch::AtLeastOne
|
||||
} else {
|
||||
PredicateMatch::Zero
|
||||
}
|
||||
}
|
||||
State::ParquetFile { chunk, .. } => chunk.table_names(predicate.range).collect(),
|
||||
};
|
||||
|
||||
// Prune out tables that should not be
|
||||
// present (based on additional table restrictions of the Predicate)
|
||||
Ok(Some(
|
||||
names
|
||||
.into_iter()
|
||||
.filter(|table_name| predicate.should_include_table(table_name))
|
||||
.collect(),
|
||||
))
|
||||
Ok(pred_result)
|
||||
}
|
||||
|
||||
fn table_schema(&self, selection: Selection<'_>) -> Result<Schema, Self::Error> {
|
||||
|
@ -262,10 +287,6 @@ impl PartitionChunk for DbChunk {
|
|||
})
|
||||
}
|
||||
|
||||
fn has_table(&self, table_name: &str) -> bool {
|
||||
table_name == self.meta.table_summary.name
|
||||
}
|
||||
|
||||
fn read_filter(
|
||||
&self,
|
||||
predicate: &Predicate,
|
||||
|
@ -275,6 +296,8 @@ impl PartitionChunk for DbChunk {
|
|||
// Predicate is not required to be applied for correctness. We only pushed it down
|
||||
// when possible for performance gain
|
||||
|
||||
debug!("Input Predicate to read_filter: {:#?}", predicate);
|
||||
|
||||
match &self.state {
|
||||
State::MutableBuffer { chunk, .. } => {
|
||||
let batch = chunk
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
|
||||
use super::scenarios::*;
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use arrow_util::assert_batches_sorted_eq;
|
||||
use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
|
||||
use query::frontend::sql::SqlQueryPlanner;
|
||||
use std::sync::Arc;
|
||||
|
||||
|
@ -40,6 +40,35 @@ macro_rules! run_sql_test_case {
|
|||
};
|
||||
}
|
||||
|
||||
/// runs table_names(predicate) and compares it to the expected
|
||||
/// output
|
||||
macro_rules! run_sql_explain_test_case {
|
||||
($DB_SETUP:expr, $SQL:expr, $EXPECTED_LINES:expr) => {
|
||||
test_helpers::maybe_start_logging();
|
||||
let sql = $SQL.to_string();
|
||||
for scenario in $DB_SETUP.make().await {
|
||||
let DbScenario {
|
||||
scenario_name, db, ..
|
||||
} = scenario;
|
||||
let db = Arc::new(db);
|
||||
|
||||
println!("Running scenario '{}'", scenario_name);
|
||||
println!("SQL: '{:#?}'", sql);
|
||||
let planner = SqlQueryPlanner::default();
|
||||
let executor = db.executor();
|
||||
|
||||
let physical_plan = planner
|
||||
.query(db, &sql, executor.as_ref())
|
||||
.expect("built plan successfully");
|
||||
|
||||
let results: Vec<RecordBatch> =
|
||||
executor.collect(physical_plan).await.expect("Running plan");
|
||||
|
||||
assert_batches_eq!($EXPECTED_LINES, &results);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_from_cpu() {
|
||||
let expected = vec![
|
||||
|
@ -409,10 +438,8 @@ async fn sql_select_with_schema_merge_subset() {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_predicate_pushdown() {
|
||||
async fn sql_predicate_pushdown_correctness() {
|
||||
// Test 1: Select everything
|
||||
//
|
||||
// Check correctness
|
||||
let expected = vec![
|
||||
"+-------+--------+-------------------------------+-----------+",
|
||||
"| count | system | time | town |",
|
||||
|
@ -432,17 +459,7 @@ async fn sql_predicate_pushdown() {
|
|||
&expected
|
||||
);
|
||||
|
||||
// TODO: Make push-down predicates shown in explain verbose. Ticket #1538 - actively working on this
|
||||
// Check the plan
|
||||
// run_sql_test_case!(
|
||||
// TwoMeasurementsPredicatePushDown {},
|
||||
// "EXPLAIN VERBOSE SELECT * from restaurant",
|
||||
// &expected
|
||||
// );
|
||||
|
||||
// Test 2: One push-down expression: count > 200
|
||||
//
|
||||
// Check correctness
|
||||
let expected = vec![
|
||||
"+-------+--------+-------------------------------+-----------+",
|
||||
"| count | system | time | town |",
|
||||
|
@ -461,16 +478,7 @@ async fn sql_predicate_pushdown() {
|
|||
&expected
|
||||
);
|
||||
|
||||
// Check the plan
|
||||
// run_sql_test_case!(
|
||||
// TwoMeasurementsPredicatePushDown {},
|
||||
// "EXPLAIN VERBOSE SELECT * from restaurant where count > 200",
|
||||
// &expected
|
||||
// );
|
||||
|
||||
// Test 3: Two push-down expression: count > 200 and town != 'tewsbury'
|
||||
//
|
||||
// Check correctness
|
||||
let expected = vec![
|
||||
"+-------+--------+-------------------------------+-----------+",
|
||||
"| count | system | time | town |",
|
||||
|
@ -488,17 +496,8 @@ async fn sql_predicate_pushdown() {
|
|||
&expected
|
||||
);
|
||||
|
||||
// Check the plan
|
||||
// run_sql_test_case!(
|
||||
// TwoMeasurementsPredicatePushDown {},
|
||||
// "EXPLAIN VERBOSE SELECT * from restaurant where count > 200 and town != 'tewsbury'",
|
||||
// &expected
|
||||
// );
|
||||
|
||||
// Test 4: Still two push-down expression: count > 200 and town != 'tewsbury'
|
||||
// even though the results are different
|
||||
//
|
||||
// Check correctness
|
||||
let expected = vec![
|
||||
"+-------+--------+-------------------------------+-----------+",
|
||||
"| count | system | time | town |",
|
||||
|
@ -516,8 +515,6 @@ async fn sql_predicate_pushdown() {
|
|||
);
|
||||
|
||||
// Test 5: three push-down expression: count > 200 and town != 'tewsbury' and count < 40000
|
||||
//
|
||||
// Check correctness
|
||||
let expected = vec![
|
||||
"+-------+--------+-------------------------------+-----------+",
|
||||
"| count | system | time | town |",
|
||||
|
@ -534,8 +531,6 @@ async fn sql_predicate_pushdown() {
|
|||
);
|
||||
|
||||
// Test 6: two push-down expression: count > 200 and count < 40000
|
||||
//
|
||||
// Check correctness
|
||||
let expected = vec![
|
||||
"+-------+--------+-------------------------------+-----------+",
|
||||
"| count | system | time | town |",
|
||||
|
@ -554,8 +549,6 @@ async fn sql_predicate_pushdown() {
|
|||
);
|
||||
|
||||
// Test 7: two push-down expression on float: system > 4.0 and system < 7.0
|
||||
//
|
||||
// Check correctness
|
||||
let expected = vec![
|
||||
"+-------+--------+-------------------------------+-----------+",
|
||||
"| count | system | time | town |",
|
||||
|
@ -575,8 +568,6 @@ async fn sql_predicate_pushdown() {
|
|||
);
|
||||
|
||||
// Test 8: two push-down expression on float: system > 5.0 and system < 7.0
|
||||
//
|
||||
// Check correctness
|
||||
let expected = vec![
|
||||
"+-------+--------+-------------------------------+----------+",
|
||||
"| count | system | time | town |",
|
||||
|
@ -593,8 +584,6 @@ async fn sql_predicate_pushdown() {
|
|||
);
|
||||
|
||||
// Test 9: three push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0
|
||||
//
|
||||
// Check correctness
|
||||
let expected = vec![
|
||||
"+-------+--------+-------------------------------+----------+",
|
||||
"| count | system | time | town |",
|
||||
|
@ -611,8 +600,6 @@ async fn sql_predicate_pushdown() {
|
|||
|
||||
// Test 10: three push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0
|
||||
// even though there are more expressions,(count = 632 or town = 'reading'), in the filter
|
||||
//
|
||||
// Check correctness
|
||||
let expected = vec![
|
||||
"+-------+--------+-------------------------------+---------+",
|
||||
"| count | system | time | town |",
|
||||
|
@ -626,11 +613,8 @@ async fn sql_predicate_pushdown() {
|
|||
&expected
|
||||
);
|
||||
|
||||
// Test 11: three push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0
|
||||
// After DF ticket, https://github.com/apache/arrow-datafusion/issues/383 is done,
|
||||
// there will be more pushed-down predicate time > to_timestamp('1970-01-01T00:00:00.000000120+00:00')
|
||||
//
|
||||
// Check correctness
|
||||
// Test 11: three push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0 and
|
||||
// time > to_timestamp('1970-01-01T00:00:00.000000120+00:00') rewritten to time GT INT(130)
|
||||
let expected = vec!["++", "++"];
|
||||
run_sql_test_case!(
|
||||
TwoMeasurementsPredicatePushDown {},
|
||||
|
@ -671,3 +655,306 @@ async fn sql_predicate_pushdown() {
|
|||
&expected
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_predicate_pushdown_explain() {
|
||||
// Test 1: Select everything
|
||||
let expected = vec![
|
||||
"+-----------------------------------------+--------------------------------------------------------------------------+",
|
||||
"| plan_type | plan |",
|
||||
"+-----------------------------------------+--------------------------------------------------------------------------+",
|
||||
"| logical_plan | Projection: #count, #system, #time, #town |",
|
||||
"| | TableScan: restaurant projection=None |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
|
||||
"| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
|
||||
"+-----------------------------------------+--------------------------------------------------------------------------+",
|
||||
];
|
||||
run_sql_explain_test_case!(
|
||||
TwoMeasurementsPredicatePushDown {},
|
||||
"EXPLAIN VERBOSE SELECT * from restaurant",
|
||||
&expected
|
||||
);
|
||||
|
||||
// Test 2: One push-down expression: count > 200
|
||||
// TODO: Make push-down predicates shown in explain verbose. Ticket #1538
|
||||
let expected = vec![
|
||||
"+-----------------------------------------+----------------------------------------------------------------------------+",
|
||||
"| plan_type | plan |",
|
||||
"+-----------------------------------------+----------------------------------------------------------------------------+",
|
||||
"| logical_plan | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #count Gt Int64(200) |",
|
||||
"| | TableScan: restaurant projection=None |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #count Gt Int64(200) |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #count Gt Int64(200) |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
|
||||
"| | FilterExec: CAST(count AS Int64) > 200 |",
|
||||
"| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
|
||||
"+-----------------------------------------+----------------------------------------------------------------------------+",
|
||||
];
|
||||
run_sql_explain_test_case!(
|
||||
TwoMeasurementsPredicatePushDown {},
|
||||
"EXPLAIN VERBOSE SELECT * from restaurant where count > 200",
|
||||
&expected
|
||||
);
|
||||
|
||||
// Check the plan
|
||||
let expected = vec![
|
||||
"+-----------------------------------------+----------------------------------------------------------------------------+",
|
||||
"| plan_type | plan |",
|
||||
"+-----------------------------------------+----------------------------------------------------------------------------+",
|
||||
"| logical_plan | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #count Gt Int64(200) |",
|
||||
"| | TableScan: restaurant projection=None |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #count Gt Int64(200) |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #count Gt Int64(200) |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
|
||||
"| | FilterExec: CAST(count AS Int64) > 200 |",
|
||||
"| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
|
||||
"+-----------------------------------------+----------------------------------------------------------------------------+",
|
||||
];
|
||||
run_sql_explain_test_case!(
|
||||
TwoMeasurementsPredicatePushDown {},
|
||||
"EXPLAIN VERBOSE SELECT * from restaurant where count > 200",
|
||||
&expected
|
||||
);
|
||||
|
||||
// Test 3: Two push-down expression: count > 200 and town != 'tewsbury'
|
||||
let expected = vec![
|
||||
"+-----------------------------------------+-----------------------------------------------------------------------------+",
|
||||
"| plan_type | plan |",
|
||||
"+-----------------------------------------+-----------------------------------------------------------------------------+",
|
||||
"| logical_plan | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") |",
|
||||
"| | TableScan: restaurant projection=None |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
|
||||
"| | FilterExec: CAST(count AS Int64) > 200 AND CAST(town AS Utf8) != tewsbury |",
|
||||
"| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
|
||||
"+-----------------------------------------+-----------------------------------------------------------------------------+",
|
||||
];
|
||||
run_sql_explain_test_case!(
|
||||
TwoMeasurementsPredicatePushDown {},
|
||||
"EXPLAIN VERBOSE SELECT * from restaurant where count > 200 and town != 'tewsbury'",
|
||||
&expected
|
||||
);
|
||||
|
||||
// Test 4: Still two push-down expression: count > 200 and town != 'tewsbury'
|
||||
// even though the results are different
|
||||
let expected = vec![
|
||||
"+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+",
|
||||
"| plan_type | plan |",
|
||||
"+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+",
|
||||
"| logical_plan | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") |",
|
||||
"| | TableScan: restaurant projection=None |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
|
||||
"| | FilterExec: CAST(count AS Int64) > 200 AND CAST(town AS Utf8) != tewsbury AND system = CAST(5 AS Float64) OR CAST(town AS Utf8) = lawrence |",
|
||||
"| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
|
||||
"+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+",
|
||||
];
|
||||
run_sql_explain_test_case!(
|
||||
TwoMeasurementsPredicatePushDown {},
|
||||
"EXPLAIN VERBOSE SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence')",
|
||||
&expected
|
||||
);
|
||||
|
||||
// Test 5: three push-down expression: count > 200 and town != 'tewsbury' and count < 40000
|
||||
let expected = vec![
|
||||
"+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
|
||||
"| plan_type | plan |",
|
||||
"+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
|
||||
"| logical_plan | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") And #count Lt Int64(40000) |",
|
||||
"| | TableScan: restaurant projection=None |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") And #count Lt Int64(40000) |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #count Gt Int64(200) And #town NotEq Utf8(\"tewsbury\") And #system Eq Int64(5) Or #town Eq Utf8(\"lawrence\") And #count Lt Int64(40000) |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
|
||||
"| | FilterExec: CAST(count AS Int64) > 200 AND CAST(town AS Utf8) != tewsbury AND system = CAST(5 AS Float64) OR CAST(town AS Utf8) = lawrence AND CAST(count AS Int64) < 40000 |",
|
||||
"| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
|
||||
"+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
|
||||
];
|
||||
run_sql_explain_test_case!(
|
||||
TwoMeasurementsPredicatePushDown {},
|
||||
"EXPLAIN VERBOSE SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence') and count < 40000",
|
||||
&expected
|
||||
);
|
||||
|
||||
// Test 6: two push-down expression: count > 200 and count < 40000
|
||||
let expected = vec![
|
||||
"+-----------------------------------------+----------------------------------------------------------------------------+",
|
||||
"| plan_type | plan |",
|
||||
"+-----------------------------------------+----------------------------------------------------------------------------+",
|
||||
"| logical_plan | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #count Gt Int64(200) And #count Lt Int64(40000) |",
|
||||
"| | TableScan: restaurant projection=None |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #count Gt Int64(200) And #count Lt Int64(40000) |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #count Gt Int64(200) And #count Lt Int64(40000) |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
|
||||
"| | FilterExec: CAST(count AS Int64) > 200 AND CAST(count AS Int64) < 40000 |",
|
||||
"| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
|
||||
"+-----------------------------------------+----------------------------------------------------------------------------+",
|
||||
];
|
||||
run_sql_explain_test_case!(
|
||||
TwoMeasurementsPredicatePushDown {},
|
||||
"EXPLAIN VERBOSE SELECT * from restaurant where count > 200 and count < 40000",
|
||||
&expected
|
||||
);
|
||||
|
||||
// Test 7: two push-down expression on float: system > 4.0 and system < 7.0
|
||||
let expected = vec![
|
||||
"+-----------------------------------------+----------------------------------------------------------------------------+",
|
||||
"| plan_type | plan |",
|
||||
"+-----------------------------------------+----------------------------------------------------------------------------+",
|
||||
"| logical_plan | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #system Gt Float64(4) And #system Lt Float64(7) |",
|
||||
"| | TableScan: restaurant projection=None |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #system Gt Float64(4) And #system Lt Float64(7) |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #system Gt Float64(4) And #system Lt Float64(7) |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
|
||||
"| | FilterExec: system > 4 AND system < 7 |",
|
||||
"| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
|
||||
"+-----------------------------------------+----------------------------------------------------------------------------+",
|
||||
];
|
||||
run_sql_explain_test_case!(
|
||||
TwoMeasurementsPredicatePushDown {},
|
||||
"EXPLAIN VERBOSE SELECT * from restaurant where system > 4.0 and system < 7.0",
|
||||
&expected
|
||||
);
|
||||
|
||||
// Test 8: two push-down expression on float: system > 5.0 and system < 7.0
|
||||
let expected = vec![
|
||||
"+-----------------------------------------+----------------------------------------------------------------------------+",
|
||||
"| plan_type | plan |",
|
||||
"+-----------------------------------------+----------------------------------------------------------------------------+",
|
||||
"| logical_plan | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #system Gt Float64(5) And #system Lt Float64(7) |",
|
||||
"| | TableScan: restaurant projection=None |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #system Gt Float64(5) And #system Lt Float64(7) |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #system Gt Float64(5) And #system Lt Float64(7) |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
|
||||
"| | FilterExec: system > 5 AND system < 7 |",
|
||||
"| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
|
||||
"+-----------------------------------------+----------------------------------------------------------------------------+",
|
||||
];
|
||||
run_sql_explain_test_case!(
|
||||
TwoMeasurementsPredicatePushDown {},
|
||||
"EXPLAIN VERBOSE SELECT * from restaurant where system > 5.0 and system < 7.0",
|
||||
&expected
|
||||
);
|
||||
|
||||
// Test 9: three push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0
|
||||
let expected = vec![
|
||||
"+-----------------------------------------+--------------------------------------------------------------------------------------------+",
|
||||
"| plan_type | plan |",
|
||||
"+-----------------------------------------+--------------------------------------------------------------------------------------------+",
|
||||
"| logical_plan | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #system Gt Float64(5) And #town NotEq Utf8(\"tewsbury\") And Float64(7) Gt #system |",
|
||||
"| | TableScan: restaurant projection=None |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #system Gt Float64(5) And #town NotEq Utf8(\"tewsbury\") And Float64(7) Gt #system |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #system Gt Float64(5) And #town NotEq Utf8(\"tewsbury\") And Float64(7) Gt #system |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
|
||||
"| | FilterExec: system > 5 AND CAST(town AS Utf8) != tewsbury AND 7 > system |",
|
||||
"| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
|
||||
"+-----------------------------------------+--------------------------------------------------------------------------------------------+",
|
||||
];
|
||||
run_sql_explain_test_case!(
|
||||
TwoMeasurementsPredicatePushDown {},
|
||||
"EXPLAIN VERBOSE SELECT * from restaurant where system > 5.0 and town != 'tewsbury' and 7.0 > system",
|
||||
&expected
|
||||
);
|
||||
|
||||
// Test 10: three push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0
|
||||
// even though there are more expressions,(count = 632 or town = 'reading'), in the filter
|
||||
let expected = vec![
|
||||
"+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------+",
|
||||
"| plan_type | plan |",
|
||||
"+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------+",
|
||||
"| logical_plan | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #system Gt Float64(5) And Utf8(\"tewsbury\") NotEq #town And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") |",
|
||||
"| | TableScan: restaurant projection=None |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #system Gt Float64(5) And Utf8(\"tewsbury\") NotEq #town And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: #system Gt Float64(5) And Utf8(\"tewsbury\") NotEq #town And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
|
||||
"| | FilterExec: system > 5 AND tewsbury != CAST(town AS Utf8) AND system < 7 AND CAST(count AS Int64) = 632 OR CAST(town AS Utf8) = reading |",
|
||||
"| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
|
||||
"+-----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------+",
|
||||
];
|
||||
run_sql_explain_test_case!(
|
||||
TwoMeasurementsPredicatePushDown {},
|
||||
"EXPLAIN VERBOSE SELECT * from restaurant where system > 5.0 and 'tewsbury' != town and system < 7.0 and (count = 632 or town = 'reading')",
|
||||
&expected
|
||||
);
|
||||
|
||||
// Test 11: four push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0 and
|
||||
// time > to_timestamp('1970-01-01T00:00:00.000000120+00:00') rewritten to time GT INT(130)
|
||||
let expected = vec![
|
||||
"+-----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
|
||||
"| plan_type | plan |",
|
||||
"+-----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
|
||||
"| logical_plan | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: Float64(5) Lt #system And #town NotEq Utf8(\"tewsbury\") And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") And #time Gt totimestamp(Utf8(\"1970-01-01T00:00:00.000000130+00:00\")) |",
|
||||
"| | TableScan: restaurant projection=None |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: Float64(5) Lt #system And #town NotEq Utf8(\"tewsbury\") And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") And #time Gt totimestamp(Utf8(\"1970-01-01T00:00:00.000000130+00:00\")) |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
|
||||
"| | Filter: Float64(5) Lt #system And #town NotEq Utf8(\"tewsbury\") And #system Lt Float64(7) And #count Eq Int64(632) Or #town Eq Utf8(\"reading\") And #time Gt totimestamp(Utf8(\"1970-01-01T00:00:00.000000130+00:00\")) |",
|
||||
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
|
||||
"| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
|
||||
"| | FilterExec: 5 < system AND CAST(town AS Utf8) != tewsbury AND system < 7 AND CAST(count AS Int64) = 632 OR CAST(town AS Utf8) = reading AND time > totimestamp(1970-01-01T00:00:00.000000130+00:00) |",
|
||||
"| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
|
||||
"+-----------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
|
||||
];
|
||||
run_sql_explain_test_case!(
|
||||
TwoMeasurementsPredicatePushDown {},
|
||||
"EXPLAIN VERBOSE SELECT * from restaurant where 5.0 < system and town != 'tewsbury' and system < 7.0 and (count = 632 or town = 'reading') and time > to_timestamp('1970-01-01T00:00:00.000000130+00:00')",
|
||||
&expected
|
||||
);
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ macro_rules! run_table_schema_test_case {
|
|||
let predicate = PredicateBuilder::new().table(table_name).build();
|
||||
|
||||
for chunk in db.chunks(&predicate) {
|
||||
if chunk.has_table(table_name) {
|
||||
if chunk.table_name().as_ref() == table_name {
|
||||
chunks_with_table += 1;
|
||||
let actual_schema = chunk.table_schema(selection.clone()).unwrap();
|
||||
|
||||
|
|
|
@ -98,6 +98,9 @@ pub enum ApplicationError {
|
|||
WritingPoints {
|
||||
org: String,
|
||||
bucket_name: String,
|
||||
tables: Vec<String>,
|
||||
num_lines: usize,
|
||||
body_size: usize,
|
||||
source: Box<dyn std::error::Error + Send + Sync>,
|
||||
},
|
||||
|
||||
|
@ -373,19 +376,18 @@ where
|
|||
// The API-global error handler, handles ApplicationErrors originating from
|
||||
// individual routes and middlewares, along with errors from the router itself
|
||||
async fn error_handler(err: RouterError<ApplicationError>, req: RequestInfo) -> Response<Body> {
|
||||
let method = req.method().clone();
|
||||
let uri = req.uri().clone();
|
||||
let span_id = req.headers().get("x-b3-spanid");
|
||||
let content_length = req.headers().get("content-length");
|
||||
error!(error = ?err, error_message = ?err.to_string(), method = ?method, uri = ?uri, ?span_id, ?content_length, "Error while handling request");
|
||||
|
||||
match err {
|
||||
RouterError::HandleRequest(e, _)
|
||||
| RouterError::HandlePreMiddlewareRequest(e)
|
||||
| RouterError::HandlePostMiddlewareWithInfoRequest(e)
|
||||
| RouterError::HandlePostMiddlewareWithoutInfoRequest(e) => {
|
||||
error!(error = ?e, error_message = ?e.to_string(), "Error while handling request");
|
||||
e.response()
|
||||
}
|
||||
| RouterError::HandlePostMiddlewareWithoutInfoRequest(e) => e.response(),
|
||||
_ => {
|
||||
let method = req.method().clone();
|
||||
let uri = req.uri().clone();
|
||||
error!(error = ?err, error_message = ?err.to_string(), method = ?method, uri = ?uri, "Error while handling request");
|
||||
|
||||
let json = serde_json::json!({"error": err.to_string()}).to_string();
|
||||
Response::builder()
|
||||
.status(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
|
@ -539,11 +541,20 @@ where
|
|||
server::Error::DatabaseNotFound { .. } => ApplicationError::DatabaseNotFound {
|
||||
name: db_name.to_string(),
|
||||
},
|
||||
_ => ApplicationError::WritingPoints {
|
||||
org: write_info.org.clone(),
|
||||
bucket_name: write_info.bucket.clone(),
|
||||
source: Box::new(e),
|
||||
},
|
||||
_ => {
|
||||
let tables = lines
|
||||
.iter()
|
||||
.map(|i| i.series.measurement.to_string())
|
||||
.collect();
|
||||
ApplicationError::WritingPoints {
|
||||
org: write_info.org.clone(),
|
||||
bucket_name: write_info.bucket.clone(),
|
||||
tables,
|
||||
num_lines: lines.len(),
|
||||
body_size: body.len(),
|
||||
source: Box::new(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
})?;
|
||||
|
||||
|
|
|
@ -1171,7 +1171,7 @@ mod tests {
|
|||
use super::*;
|
||||
use datafusion::logical_plan::{col, lit, Expr};
|
||||
use panic_logging::SendPanicsToTracing;
|
||||
use query::{test::TestChunk, test::TestDatabaseStore};
|
||||
use query::{predicate::PredicateMatch, test::TestChunk, test::TestDatabaseStore};
|
||||
use std::{
|
||||
convert::TryFrom,
|
||||
net::{IpAddr, Ipv4Addr, SocketAddr},
|
||||
|
@ -1252,8 +1252,13 @@ mod tests {
|
|||
let db_info = OrgAndBucket::new(123, 456);
|
||||
let partition_id = 1;
|
||||
|
||||
let chunk0 = TestChunk::new(0).with_table("h2o");
|
||||
let chunk1 = TestChunk::new(1).with_table("o2");
|
||||
let chunk0 = TestChunk::new(0)
|
||||
.with_predicate_match(PredicateMatch::AtLeastOne)
|
||||
.with_table("h2o");
|
||||
|
||||
let chunk1 = TestChunk::new(1)
|
||||
.with_predicate_match(PredicateMatch::AtLeastOne)
|
||||
.with_table("o2");
|
||||
|
||||
fixture
|
||||
.test_storage
|
||||
|
@ -1409,7 +1414,9 @@ mod tests {
|
|||
let db_info = OrgAndBucket::new(123, 456);
|
||||
let partition_id = 1;
|
||||
|
||||
let chunk = TestChunk::new(0).with_error("Sugar we are going down");
|
||||
let chunk = TestChunk::new(0)
|
||||
.with_table("my_table")
|
||||
.with_error("Sugar we are going down");
|
||||
|
||||
fixture
|
||||
.test_storage
|
||||
|
@ -1535,6 +1542,7 @@ mod tests {
|
|||
|
||||
let chunk = TestChunk::new(0)
|
||||
// predicate specifies m4, so this is filtered out
|
||||
.with_table("my_table")
|
||||
.with_error("This is an error");
|
||||
|
||||
fixture
|
||||
|
@ -1640,7 +1648,9 @@ mod tests {
|
|||
tag_key: [0].into(),
|
||||
};
|
||||
|
||||
let chunk = TestChunk::new(0).with_table("h2o");
|
||||
let chunk = TestChunk::new(0)
|
||||
.with_predicate_match(PredicateMatch::AtLeastOne)
|
||||
.with_table("h2o");
|
||||
|
||||
fixture
|
||||
.test_storage
|
||||
|
@ -1717,7 +1727,9 @@ mod tests {
|
|||
let db_info = OrgAndBucket::new(123, 456);
|
||||
let partition_id = 1;
|
||||
|
||||
let chunk = TestChunk::new(0).with_error("Sugar we are going down");
|
||||
let chunk = TestChunk::new(0)
|
||||
.with_table("my_table")
|
||||
.with_error("Sugar we are going down");
|
||||
|
||||
fixture
|
||||
.test_storage
|
||||
|
@ -1835,7 +1847,9 @@ mod tests {
|
|||
let db_info = OrgAndBucket::new(123, 456);
|
||||
let partition_id = 1;
|
||||
|
||||
let chunk = TestChunk::new(0).with_error("Sugar we are going down");
|
||||
let chunk = TestChunk::new(0)
|
||||
.with_table("my_table")
|
||||
.with_error("Sugar we are going down");
|
||||
|
||||
fixture
|
||||
.test_storage
|
||||
|
@ -1985,7 +1999,9 @@ mod tests {
|
|||
let db_info = OrgAndBucket::new(123, 456);
|
||||
let partition_id = 1;
|
||||
|
||||
let chunk = TestChunk::new(0).with_error("Sugar we are going down");
|
||||
let chunk = TestChunk::new(0)
|
||||
.with_table("my_table")
|
||||
.with_error("Sugar we are going down");
|
||||
|
||||
fixture
|
||||
.test_storage
|
||||
|
@ -2077,7 +2093,9 @@ mod tests {
|
|||
let db_info = OrgAndBucket::new(123, 456);
|
||||
let partition_id = 1;
|
||||
|
||||
let chunk = TestChunk::new(0).with_error("Sugar we are going down");
|
||||
let chunk = TestChunk::new(0)
|
||||
.with_table("my_table")
|
||||
.with_error("Sugar we are going down");
|
||||
|
||||
fixture
|
||||
.test_storage
|
||||
|
@ -2287,7 +2305,9 @@ mod tests {
|
|||
let db_info = OrgAndBucket::new(123, 456);
|
||||
let partition_id = 1;
|
||||
|
||||
let chunk = TestChunk::new(0).with_error("Sugar we are going down");
|
||||
let chunk = TestChunk::new(0)
|
||||
.with_table("my_table")
|
||||
.with_error("Sugar we are going down");
|
||||
|
||||
fixture
|
||||
.test_storage
|
||||
|
|
Loading…
Reference in New Issue