Merge branch 'main' into crepererum/issue2729
commit
f8ec261276
|
@ -1,8 +1,9 @@
|
|||
use std::convert::TryInto;
|
||||
use std::{collections::BTreeSet, convert::TryInto};
|
||||
|
||||
use chrono::DateTime;
|
||||
use data_types::timestamp::TimestampRange;
|
||||
use datafusion::logical_plan::{lit, Column, Expr, Operator};
|
||||
use schema::TIME_COLUMN_NAME;
|
||||
use snafu::{ResultExt, Snafu};
|
||||
use sqlparser::{
|
||||
ast::{BinaryOperator, Expr as SqlParserExpr, Ident, Statement, Value},
|
||||
|
@ -99,6 +100,16 @@ impl DeletePredicate {
|
|||
exprs: delete_exprs,
|
||||
})
|
||||
}
|
||||
|
||||
/// Return all columns participating in the expressions of delete predicate minus the time column if any
|
||||
pub fn all_column_names_but_time<'a>(&'a self, cols: &mut BTreeSet<&'a str>) {
|
||||
for expr in &self.exprs {
|
||||
let col = expr.column();
|
||||
if col != TIME_COLUMN_NAME {
|
||||
cols.insert(col);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DeletePredicate> for crate::predicate::Predicate {
|
||||
|
|
|
@ -254,6 +254,7 @@ impl IOxExecutionContext {
|
|||
let ctx = self.child_ctx("prepare_sql");
|
||||
debug!(text=%sql, "planning SQL query");
|
||||
let logical_plan = ctx.inner.create_logical_plan(sql)?;
|
||||
debug!(plan=%logical_plan.display_graphviz(), "logical plan");
|
||||
ctx.prepare_plan(&logical_plan).await
|
||||
}
|
||||
|
||||
|
|
|
@ -23,7 +23,7 @@ use schema::selection::Selection;
|
|||
use schema::{sort::SortKey, Schema, TIME_COLUMN_NAME};
|
||||
|
||||
use hashbrown::HashMap;
|
||||
use std::{fmt::Debug, sync::Arc};
|
||||
use std::{collections::BTreeSet, fmt::Debug, iter::FromIterator, sync::Arc};
|
||||
|
||||
pub mod exec;
|
||||
pub mod frontend;
|
||||
|
@ -53,6 +53,27 @@ pub trait QueryChunkMeta: Sized {
|
|||
fn has_delete_predicates(&self) -> bool {
|
||||
!self.delete_predicates().is_empty()
|
||||
}
|
||||
|
||||
/// return column names participating in the all delete predicates
|
||||
/// in lexicographical order with one exception that time column is last
|
||||
/// This order is to be consistent with Schema::primary_key
|
||||
fn delete_predicate_columns(&self) -> Vec<&str> {
|
||||
// get all column names but time
|
||||
let mut col_names = BTreeSet::new();
|
||||
for pred in self.delete_predicates() {
|
||||
//let cols = pred.all_column_names_but_time();
|
||||
pred.all_column_names_but_time(&mut col_names);
|
||||
}
|
||||
|
||||
// convert to vector
|
||||
let mut column_names = Vec::from_iter(col_names);
|
||||
|
||||
// Now add time column to the end of the vector
|
||||
// Since time range is a must in the delete predicate, time column must be in this list
|
||||
column_names.push(TIME_COLUMN_NAME);
|
||||
|
||||
column_names
|
||||
}
|
||||
}
|
||||
|
||||
/// A `Database` is the main trait implemented by the IOx subsystems
|
||||
|
|
|
@ -89,6 +89,11 @@ impl From<Error> for DataFusionError {
|
|||
}
|
||||
}
|
||||
|
||||
enum ColumnType {
|
||||
PrimaryKey,
|
||||
DeletePredicate,
|
||||
}
|
||||
|
||||
/// Something that can prune chunks based on their metadata
|
||||
pub trait ChunkPruner<C: QueryChunk>: Sync + Send + std::fmt::Debug {
|
||||
/// prune `chunks`, if possible, based on predicate.
|
||||
|
@ -764,10 +769,30 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
|
|||
predicate: Predicate, // This is the select predicate of the query
|
||||
output_sort_key: &SortKey<'_>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
// Add columns of sort key and delete predicates in the schema of to-be-scanned IOxReadFilterNode
|
||||
// This is needed because columns in select query may not include them yet
|
||||
let mut input_schema = Arc::clone(&output_schema);
|
||||
//
|
||||
// Cols of sort keys (which is a part of the primary key)
|
||||
if !output_sort_key.is_empty() {
|
||||
// build schema of PK
|
||||
let pk_schema = Self::compute_pk_schema(&[Arc::clone(&chunk)]);
|
||||
// Merge it in the output_schema
|
||||
input_schema = Self::compute_input_schema(&input_schema, &pk_schema);
|
||||
}
|
||||
//
|
||||
// Cols of delete predicates
|
||||
if chunk.has_delete_predicates() {
|
||||
// build schema of columns in delete expression
|
||||
let pred_schema = Self::compute_delete_predicate_schema(&[Arc::clone(&chunk)]);
|
||||
// merge that column into the schema
|
||||
input_schema = Self::compute_input_schema(&input_schema, &pred_schema);
|
||||
}
|
||||
|
||||
// Create the bottom node IOxReadFilterNode for this chunk
|
||||
let mut input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
|
||||
Arc::clone(&table_name),
|
||||
output_schema,
|
||||
input_schema,
|
||||
vec![Arc::clone(&chunk)],
|
||||
predicate,
|
||||
));
|
||||
|
@ -794,10 +819,12 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
|
|||
|
||||
// Add the sort operator, SortExec, if needed
|
||||
if !output_sort_key.is_empty() {
|
||||
Self::build_sort_plan(chunk, input, output_sort_key)
|
||||
} else {
|
||||
Ok(input)
|
||||
input = Self::build_sort_plan(chunk, input, output_sort_key)?
|
||||
}
|
||||
|
||||
// Add a projection operator to return only schema of the operator above this in the plan
|
||||
// This is needed for matching column index of that operator
|
||||
Self::add_projection_node_if_needed(output_schema, input)
|
||||
}
|
||||
|
||||
/// Add SortExec operator on top of the input plan of the given chunk
|
||||
|
@ -961,21 +988,35 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
|
|||
.all(|chunk| chunk.delete_predicates().is_empty())
|
||||
}
|
||||
|
||||
/// Find the columns needed in the primary key across schemas
|
||||
/// Find the columns needed in a given ColumnType across schemas
|
||||
///
|
||||
/// Note by the time we get down here, we have already checked
|
||||
/// the chunks for compatible schema, so we use unwrap (perhaps
|
||||
/// famous last words, but true at time of writing)
|
||||
fn compute_pk_schema(chunks: &[Arc<C>]) -> Arc<Schema> {
|
||||
let mut pk_schema_merger = SchemaMerger::new();
|
||||
fn compute_schema_for_column_type(chunks: &[Arc<C>], col_type: ColumnType) -> Arc<Schema> {
|
||||
let mut schema_merger = SchemaMerger::new();
|
||||
for chunk in chunks {
|
||||
let chunk_schema = chunk.schema();
|
||||
let chunk_pk = chunk_schema.primary_key();
|
||||
let chunk_pk_schema = chunk_schema.select_by_names(&chunk_pk).unwrap();
|
||||
pk_schema_merger = pk_schema_merger.merge(&chunk_pk_schema).unwrap();
|
||||
let cols = match col_type {
|
||||
ColumnType::PrimaryKey => chunk_schema.primary_key(),
|
||||
ColumnType::DeletePredicate => chunk.delete_predicate_columns(),
|
||||
};
|
||||
let chunk_cols_schema = chunk_schema.select_by_names(&cols).unwrap();
|
||||
schema_merger = schema_merger.merge(&chunk_cols_schema).unwrap();
|
||||
}
|
||||
let pk_schema = pk_schema_merger.build();
|
||||
Arc::new(pk_schema)
|
||||
let cols_schema = schema_merger.build();
|
||||
|
||||
Arc::new(cols_schema)
|
||||
}
|
||||
|
||||
/// Find the columns needed in chunks' delete predicates across schemas
|
||||
fn compute_delete_predicate_schema(chunks: &[Arc<C>]) -> Arc<Schema> {
|
||||
Self::compute_schema_for_column_type(chunks, ColumnType::DeletePredicate)
|
||||
}
|
||||
|
||||
/// Find the columns needed in chunks' primary keys across schemas
|
||||
fn compute_pk_schema(chunks: &[Arc<C>]) -> Arc<Schema> {
|
||||
Self::compute_schema_for_column_type(chunks, ColumnType::PrimaryKey)
|
||||
}
|
||||
|
||||
/// Find columns required to read from each scan: the output columns + the
|
||||
|
|
|
@ -46,6 +46,7 @@ where
|
|||
return summaries;
|
||||
}
|
||||
};
|
||||
trace!(%filter_expr, "Filter_expr of pruning chunks");
|
||||
|
||||
// TODO: performance optimization: batch the chunk pruning by
|
||||
// grouping the chunks with the same types for all columns
|
||||
|
|
|
@ -13,6 +13,7 @@ use datafusion::{
|
|||
ExecutionPlan, PhysicalExpr,
|
||||
},
|
||||
};
|
||||
use observability_deps::tracing::trace;
|
||||
use schema::sort::SortKey;
|
||||
|
||||
/// Create a logical plan that produces the record batch
|
||||
|
@ -76,6 +77,10 @@ pub fn df_physical_expr(
|
|||
let input_physical_schema = input.schema();
|
||||
let input_logical_schema: DFSchema = input_physical_schema.as_ref().clone().try_into()?;
|
||||
|
||||
trace!(%expr, "logical expression");
|
||||
trace!(%input_logical_schema, "input logical schema");
|
||||
trace!(%input_physical_schema, "input physical schema");
|
||||
|
||||
physical_planner.create_physical_expr(
|
||||
&expr,
|
||||
&input_logical_schema,
|
||||
|
|
|
@ -70,6 +70,29 @@ impl DbSetup for OneDeleteSimpleExprOneChunk {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Setup for many scenario move chunk from from MUB to RUB to OS
|
||||
/// No delete in this case
|
||||
pub struct NoDeleteOneChunk {}
|
||||
#[async_trait]
|
||||
impl DbSetup for NoDeleteOneChunk {
|
||||
async fn make(&self) -> Vec<DbScenario> {
|
||||
let partition_key = "1970-01-01T00";
|
||||
let table_name = "cpu";
|
||||
// chunk data
|
||||
let lp_lines = vec![
|
||||
"cpu,foo=me bar=1 10",
|
||||
"cpu,foo=you bar=2 20",
|
||||
"cpu,foo=me bar=1 30",
|
||||
"cpu,foo=me bar=1 40",
|
||||
];
|
||||
|
||||
// this returns 15 scenarios
|
||||
all_delete_scenarios_for_one_chunk(vec![], vec![], lp_lines, table_name, partition_key)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Setup for multi-expression delete query test with one table and one chunk moved from MUB to RUB to OS
|
||||
pub struct OneDeleteMultiExprsOneChunk {}
|
||||
|
|
|
@ -52,6 +52,20 @@ async fn sql_select_from_cpu() {
|
|||
run_sql_test_case(TwoMeasurements {}, "SELECT * from cpu", &expected).await;
|
||||
}
|
||||
|
||||
// BUG: https://github.com/influxdata/influxdb_iox/issues/2776
|
||||
#[ignore]
|
||||
#[tokio::test]
|
||||
async fn sql_select_from_cpu_min_utf8() {
|
||||
let expected = vec![
|
||||
"+----------------+",
|
||||
"| MIN(cpu.region |",
|
||||
"+----------------+",
|
||||
"| west |",
|
||||
"+----------------+",
|
||||
];
|
||||
run_sql_test_case(TwoMeasurements {}, "SELECT min(region) from cpu", &expected).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_from_cpu_2021() {
|
||||
let expected = vec![
|
||||
|
@ -870,16 +884,56 @@ async fn sql_select_all_different_tags_chunks() {
|
|||
// Delete tests
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_one_expr_delete_all() {
|
||||
// select *
|
||||
let expected = vec!["++", "++"];
|
||||
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteSimpleExprOneChunkDeleteAll {},
|
||||
"SELECT * from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
|
||||
// select a specific column
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteSimpleExprOneChunkDeleteAll {},
|
||||
"SELECT time from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Count
|
||||
let expected = vec![
|
||||
"+-----------------+----------------+-----------------+",
|
||||
"| COUNT(UInt8(1)) | COUNT(cpu.bar) | COUNT(cpu.time) |",
|
||||
"+-----------------+----------------+-----------------+",
|
||||
"| 0 | 0 | 0 |",
|
||||
"+-----------------+----------------+-----------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteSimpleExprOneChunkDeleteAll {},
|
||||
"SELECT count(*), count(bar), count(time) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Min & Max
|
||||
let expected = vec![
|
||||
"+--------------+--------------+---------------+---------------+",
|
||||
"| MIN(cpu.bar) | MAX(cpu.bar) | MIN(cpu.time) | MAX(cpu.time) |",
|
||||
"+--------------+--------------+---------------+---------------+",
|
||||
"| | | | |",
|
||||
"+--------------+--------------+---------------+---------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteSimpleExprOneChunkDeleteAll {},
|
||||
"SELECT min(bar), max(bar), min(time), max(time) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// --------------------------------------------------------
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_one_expr() {
|
||||
let expected = vec![
|
||||
|
@ -896,8 +950,120 @@ async fn sql_select_with_delete_from_one_expr() {
|
|||
&expected,
|
||||
)
|
||||
.await;
|
||||
|
||||
// select all explicit columns
|
||||
let expected = vec![
|
||||
"+--------------------------------+-----+",
|
||||
"| time | bar |",
|
||||
"+--------------------------------+-----+",
|
||||
"| 1970-01-01T00:00:00.000000020Z | 2 |",
|
||||
"+--------------------------------+-----+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteSimpleExprOneChunk {},
|
||||
"SELECT time, bar from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// --------------------------------------------------------
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_one_expr_min_max() {
|
||||
// Min & Max of bar only
|
||||
let expected = vec![
|
||||
"+--------------+--------------+",
|
||||
"| MIN(cpu.bar) | MAX(cpu.bar) |",
|
||||
"+--------------+--------------+",
|
||||
"| 2 | 2 |",
|
||||
"+--------------+--------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteSimpleExprOneChunk {},
|
||||
"SELECT min(bar), max(bar) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_one_expr_time() {
|
||||
// one column, time, and no cover all 2 columns, time and bar, of the delete predicate
|
||||
let expected = vec![
|
||||
"+--------------------------------+",
|
||||
"| time |",
|
||||
"+--------------------------------+",
|
||||
"| 1970-01-01T00:00:00.000000020Z |",
|
||||
"+--------------------------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteSimpleExprOneChunk {},
|
||||
"SELECT time from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// BUG: https://github.com/influxdata/influxdb_iox/issues/2779
|
||||
// inconsistent format returned
|
||||
#[ignore]
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_one_expr_max_time() {
|
||||
// max on one column and no cover all 2 columns, time and bar, of the delete predicate
|
||||
let expected = vec![
|
||||
"+--------------------------------+",
|
||||
"| MAX(cpu.time) |",
|
||||
"+--------------------------------+",
|
||||
"| 1970-01-01T00:00:00.000000020Z |",
|
||||
"+--------------------------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteSimpleExprOneChunk {},
|
||||
"SELECT max(time) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_one_expr_count_max_time() {
|
||||
// Count and max on one column and no cover all 2 columns, time and bar, of the delete predicate
|
||||
let expected = vec![
|
||||
"+-----------------+--------------------------------+",
|
||||
"| COUNT(cpu.time) | MAX(cpu.time) |",
|
||||
"+-----------------+--------------------------------+",
|
||||
"| 1 | 1970-01-01T00:00:00.000000020Z |",
|
||||
"+-----------------+--------------------------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteSimpleExprOneChunk {},
|
||||
"SELECT count(time), max(time) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_one_expr_count_time() {
|
||||
// Count and max on one column and no cover all 2 columns, time and bar, of the delete predicate
|
||||
let expected = vec![
|
||||
"+-----------------+",
|
||||
"| COUNT(cpu.time) |",
|
||||
"+-----------------+",
|
||||
"| 1 |",
|
||||
"+-----------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteSimpleExprOneChunk {},
|
||||
"SELECT count(time) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// --------------------------------------------------------
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_one_expr_with_select_predicate() {
|
||||
// Select predicate does not eliminate extra rows
|
||||
|
@ -925,8 +1091,25 @@ async fn sql_select_with_delete_from_one_expr_with_select_predicate() {
|
|||
&expected,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Count, min and max
|
||||
let expected = vec![
|
||||
"+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+",
|
||||
"| COUNT(cpu.time) | COUNT(UInt8(1)) | COUNT(cpu.bar) | MIN(cpu.bar) | MAX(cpu.bar) | MIN(cpu.time) | MAX(cpu.time) |",
|
||||
"+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+",
|
||||
"| 1 | 1 | 1 | 2 | 2 | 1970-01-01T00:00:00.000000020Z | 1970-01-01T00:00:00.000000020Z |",
|
||||
"+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteSimpleExprOneChunk {},
|
||||
"SELECT count(time), count(*), count(bar), min(bar), max(bar), min(time), max(time) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// --------------------------------------------------------
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_multi_exprs() {
|
||||
let expected = vec![
|
||||
|
@ -944,8 +1127,317 @@ async fn sql_select_with_delete_from_multi_exprs() {
|
|||
&expected,
|
||||
)
|
||||
.await;
|
||||
|
||||
// select one column, delete predicates on all 3 columns of the table
|
||||
let expected = vec![
|
||||
"+-----+", "| bar |", "+-----+", "| 1 |", "| 2 |", "+-----+",
|
||||
];
|
||||
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteMultiExprsOneChunk {},
|
||||
"SELECT bar from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// --------------------------------------------------------
|
||||
|
||||
// tests without delete
|
||||
#[tokio::test]
|
||||
async fn sql_select_without_delete_agg() {
|
||||
// Count, min and max on many columns but not `foo` that is included in delete predicate
|
||||
let expected = vec![
|
||||
"+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+",
|
||||
"| COUNT(cpu.time) | COUNT(UInt8(1)) | COUNT(cpu.bar) | MIN(cpu.bar) | MAX(cpu.bar) | MIN(cpu.time) | MAX(cpu.time) |",
|
||||
"+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+",
|
||||
"| 4 | 4 | 4 | 1 | 2 | 1970-01-01T00:00:00.000000010Z | 1970-01-01T00:00:00.000000040Z |",
|
||||
"+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::NoDeleteOneChunk {},
|
||||
"SELECT count(time), count(*), count(bar), min(bar), max(bar), min(time), max(time) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// BUG: https://github.com/influxdata/influxdb_iox/issues/2776
|
||||
#[ignore]
|
||||
#[tokio::test]
|
||||
async fn sql_select_without_delete_max_foo() {
|
||||
let expected = vec![
|
||||
"+--------------+",
|
||||
"| MAX(cpu.foo) |",
|
||||
"+--------------+",
|
||||
"| you |",
|
||||
"+--------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::NoDeleteOneChunk {},
|
||||
"SELECT max(foo) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// BUG: as as above
|
||||
#[ignore]
|
||||
#[tokio::test]
|
||||
async fn sql_select_without_delete_min_foo() {
|
||||
let expected = vec![
|
||||
"+--------------+",
|
||||
"| MIN(cpu.foo) |",
|
||||
"+--------------+",
|
||||
"| me |",
|
||||
"+--------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::NoDeleteOneChunk {},
|
||||
"SELECT min(foo) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// --------------------------------------------------------
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_multi_exprs_agg() {
|
||||
// Count, min and max on many columns but not `foo` that is included in delete predicate
|
||||
let expected = vec![
|
||||
"+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+",
|
||||
"| COUNT(cpu.time) | COUNT(UInt8(1)) | COUNT(cpu.bar) | MIN(cpu.bar) | MAX(cpu.bar) | MIN(cpu.time) | MAX(cpu.time) |",
|
||||
"+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+",
|
||||
"| 2 | 2 | 2 | 1 | 2 | 1970-01-01T00:00:00.000000020Z | 1970-01-01T00:00:00.000000040Z |",
|
||||
"+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteMultiExprsOneChunk {},
|
||||
"SELECT count(time), count(*), count(bar), min(bar), max(bar), min(time), max(time) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_multi_exprs_count_time() {
|
||||
// Count, min and max on many columns but not `foo` that is included in delete predicate
|
||||
let expected = vec![
|
||||
"+-----------------+",
|
||||
"| COUNT(cpu.time) |",
|
||||
"+-----------------+",
|
||||
"| 2 |",
|
||||
"+-----------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteMultiExprsOneChunk {},
|
||||
"SELECT count(time) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_multi_exprs_count_foo() {
|
||||
let expected = vec![
|
||||
"+----------------+",
|
||||
"| COUNT(cpu.foo) |",
|
||||
"+----------------+",
|
||||
"| 2 |",
|
||||
"+----------------+",
|
||||
];
|
||||
|
||||
// OneDeleteMultiExprsOneChunk's delete predicates include columns foo, bar and time
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteMultiExprsOneChunk {},
|
||||
"SELECT count(foo) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_multi_exprs_count_bar() {
|
||||
let expected = vec![
|
||||
"+----------------+",
|
||||
"| COUNT(cpu.bar) |",
|
||||
"+----------------+",
|
||||
"| 2 |",
|
||||
"+----------------+",
|
||||
];
|
||||
|
||||
// OneDeleteMultiExprsOneChunk's delete predicates include columns foo, bar and time
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteMultiExprsOneChunk {},
|
||||
"SELECT count(bar) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_multi_exprs_count_star() {
|
||||
let expected = vec![
|
||||
"+-----------------+",
|
||||
"| COUNT(UInt8(1)) |",
|
||||
"+-----------------+",
|
||||
"| 2 |",
|
||||
"+-----------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteMultiExprsOneChunk {},
|
||||
"SELECT count(*) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_multi_exprs_min_bar() {
|
||||
let expected = vec![
|
||||
"+--------------+",
|
||||
"| MIN(cpu.bar) |",
|
||||
"+--------------+",
|
||||
"| 1 |",
|
||||
"+--------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteMultiExprsOneChunk {},
|
||||
"SELECT min(bar) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// BUG: https://github.com/influxdata/influxdb_iox/issues/2776
|
||||
#[ignore]
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_multi_exprs_min_foo() {
|
||||
let expected = vec![
|
||||
"+--------------+",
|
||||
"| MIN(cpu.foo) |",
|
||||
"+--------------+",
|
||||
"| me |",
|
||||
"+--------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteMultiExprsOneChunk {},
|
||||
"SELECT min(foo) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_multi_exprs_foo() {
|
||||
let expected = vec![
|
||||
"+-----+", "| foo |", "+-----+", "| me |", "| you |", "+-----+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteMultiExprsOneChunk {},
|
||||
"SELECT foo from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// BUG: https://github.com/influxdata/influxdb_iox/issues/2779
|
||||
// inconsistent format returned
|
||||
#[ignore]
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_multi_exprs_min_time() {
|
||||
let expected = vec![
|
||||
"+--------------------------------+",
|
||||
"| MIN(cpu.time) |",
|
||||
"+--------------------------------+",
|
||||
"| 1970-01-01T00:00:00.000000020Z |",
|
||||
"+--------------------------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteMultiExprsOneChunk {},
|
||||
"SELECT min(time) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_multi_exprs_time() {
|
||||
let expected = vec![
|
||||
"+--------------------------------+",
|
||||
"| time |",
|
||||
"+--------------------------------+",
|
||||
"| 1970-01-01T00:00:00.000000020Z |",
|
||||
"| 1970-01-01T00:00:00.000000040Z |",
|
||||
"+--------------------------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteMultiExprsOneChunk {},
|
||||
"SELECT time from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_multi_exprs_max_bar() {
|
||||
let expected = vec![
|
||||
"+--------------+",
|
||||
"| MAX(cpu.bar) |",
|
||||
"+--------------+",
|
||||
"| 2 |",
|
||||
"+--------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteMultiExprsOneChunk {},
|
||||
"SELECT max(bar) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// BUG: https://github.com/influxdata/influxdb_iox/issues/2776
|
||||
#[ignore]
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_multi_exprs_max_foo() {
|
||||
let expected = vec![
|
||||
"+----------------+",
|
||||
"| COUNT(cpu.foo) |",
|
||||
"+----------------+",
|
||||
"| 2 |",
|
||||
"+----------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteMultiExprsOneChunk {},
|
||||
"SELECT max(foo) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// BUG: https://github.com/influxdata/influxdb_iox/issues/2779
|
||||
// inconsistent format returned
|
||||
#[ignore]
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_multi_exprs_max_time() {
|
||||
let expected = vec![
|
||||
"+--------------------------------+",
|
||||
"| MAX(cpu.time) |",
|
||||
"+--------------------------------+",
|
||||
"| 1970-01-01T00:00:00.000000040Z |",
|
||||
"+--------------------------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::OneDeleteMultiExprsOneChunk {},
|
||||
"SELECT max(time) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// --------------------------------------------------------
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_delete_from_multi_exprs_with_select_predicate() {
|
||||
// not eliminate extra row
|
||||
|
@ -1038,6 +1530,8 @@ async fn sql_select_with_two_deletes_from_multi_exprs_with_select_predicate() {
|
|||
.await;
|
||||
}
|
||||
|
||||
// --------------------------------------------------------
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_three_deletes_from_three_chunks() {
|
||||
let expected = vec![
|
||||
|
@ -1062,10 +1556,102 @@ async fn sql_select_with_three_deletes_from_three_chunks() {
|
|||
.await;
|
||||
}
|
||||
|
||||
// Bug: https://github.com/influxdata/influxdb_iox/issues/2745
|
||||
// --------------------------------------------------------
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_three_deletes_from_three_chunks_count_star() {
|
||||
let expected = vec![
|
||||
"+-----------------+",
|
||||
"| COUNT(UInt8(1)) |",
|
||||
"+-----------------+",
|
||||
"| 7 |",
|
||||
"+-----------------+",
|
||||
];
|
||||
|
||||
run_sql_test_case(
|
||||
scenarios::delete::ThreeDeleteThreeChunks {},
|
||||
"SELECT count(*) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_three_deletes_from_three_chunks_count_time() {
|
||||
let expected = vec![
|
||||
"+-----------------+",
|
||||
"| COUNT(cpu.time) |",
|
||||
"+-----------------+",
|
||||
"| 7 |",
|
||||
"+-----------------+",
|
||||
];
|
||||
|
||||
run_sql_test_case(
|
||||
scenarios::delete::ThreeDeleteThreeChunks {},
|
||||
"SELECT count(time) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_three_deletes_from_three_chunks_count_foo() {
|
||||
let expected = vec![
|
||||
"+----------------+",
|
||||
"| COUNT(cpu.foo) |",
|
||||
"+----------------+",
|
||||
"| 7 |",
|
||||
"+----------------+",
|
||||
];
|
||||
|
||||
run_sql_test_case(
|
||||
scenarios::delete::ThreeDeleteThreeChunks {},
|
||||
"SELECT count(foo) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_three_deletes_from_three_chunks_count_bar() {
|
||||
let expected = vec![
|
||||
"+----------------+",
|
||||
"| COUNT(cpu.bar) |",
|
||||
"+----------------+",
|
||||
"| 7 |",
|
||||
"+----------------+",
|
||||
];
|
||||
|
||||
run_sql_test_case(
|
||||
scenarios::delete::ThreeDeleteThreeChunks {},
|
||||
"SELECT count(bar) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_three_deletes_from_three_chunks_min_bar() {
|
||||
let expected = vec![
|
||||
"+--------------+",
|
||||
"| MIN(cpu.bar) |",
|
||||
"+--------------+",
|
||||
"| 1 |",
|
||||
"+--------------+",
|
||||
];
|
||||
|
||||
run_sql_test_case(
|
||||
scenarios::delete::ThreeDeleteThreeChunks {},
|
||||
"SELECT min(bar) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// BUG: https://github.com/influxdata/influxdb_iox/issues/2776
|
||||
#[ignore]
|
||||
#[tokio::test]
|
||||
async fn sql_select_count_with_three_deletes_from_three_chunks() {
|
||||
async fn sql_select_with_three_deletes_from_three_chunks_max_foo() {
|
||||
let expected = vec![
|
||||
"+-----+-----+--------------------------------+",
|
||||
"| bar | foo | time |",
|
||||
|
@ -1076,12 +1662,32 @@ async fn sql_select_count_with_three_deletes_from_three_chunks() {
|
|||
|
||||
run_sql_test_case(
|
||||
scenarios::delete::ThreeDeleteThreeChunks {},
|
||||
"SELECT count(*) from cpu",
|
||||
"SELECT max(foo) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_three_deletes_from_three_chunks_min_max_time() {
|
||||
let expected = vec![
|
||||
"+--------------------------------+--------------------------------+",
|
||||
"| MIN(cpu.time) | MAX(cpu.time) |",
|
||||
"+--------------------------------+--------------------------------+",
|
||||
"| 1970-01-01T00:00:00.000000040Z | 1970-01-01T00:00:00.000000080Z |",
|
||||
"+--------------------------------+--------------------------------+",
|
||||
];
|
||||
|
||||
run_sql_test_case(
|
||||
scenarios::delete::ThreeDeleteThreeChunks {},
|
||||
"SELECT min(time), max(time) from cpu",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// --------------------------------------------------------
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate() {
|
||||
test_helpers::maybe_start_logging();
|
||||
|
@ -1175,3 +1781,110 @@ async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate()
|
|||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// --------------------------------------------------------
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate_min_bar() {
|
||||
let expected = vec![
|
||||
"+--------------+",
|
||||
"| MIN(cpu.bar) |",
|
||||
"+--------------+",
|
||||
"| 1 |",
|
||||
"+--------------+",
|
||||
];
|
||||
|
||||
run_sql_test_case(
|
||||
scenarios::delete::ThreeDeleteThreeChunks {},
|
||||
"SELECT min(bar) from cpu where foo = 'me' and (bar > 2 or bar = 1.0)",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// BUG: https://github.com/influxdata/influxdb_iox/issues/2776
|
||||
#[ignore]
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate_max_foo() {
|
||||
let expected = vec![
|
||||
"+--------------+",
|
||||
"| MAX(cpu.foo) |",
|
||||
"+--------------+",
|
||||
"| me |",
|
||||
"+--------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::ThreeDeleteThreeChunks {},
|
||||
"SELECT max(foo) from cpu where foo = 'me' and (bar > 2 or bar = 1.0)",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate_min_time() {
|
||||
let expected = vec![
|
||||
"+--------------------------------+",
|
||||
"| MIN(cpu.time) |",
|
||||
"+--------------------------------+",
|
||||
"| 1970-01-01T00:00:00.000000040Z |",
|
||||
"+--------------------------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::ThreeDeleteThreeChunks {},
|
||||
"SELECT min(time) from cpu where foo = 'me' and (bar > 2 or bar = 1.0)",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate_count_bar() {
|
||||
let expected = vec![
|
||||
"+----------------+",
|
||||
"| COUNT(cpu.bar) |",
|
||||
"+----------------+",
|
||||
"| 6 |",
|
||||
"+----------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::ThreeDeleteThreeChunks {},
|
||||
"SELECT count(bar) from cpu where foo = 'me' and (bar > 2 or bar = 1.0);",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate_count_time() {
|
||||
let expected = vec![
|
||||
"+-----------------+",
|
||||
"| COUNT(cpu.time) |",
|
||||
"+-----------------+",
|
||||
"| 6 |",
|
||||
"+-----------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::ThreeDeleteThreeChunks {},
|
||||
"SELECT count(time) from cpu where foo = 'me' and (bar > 2 or bar = 1.0);",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate_count_star() {
|
||||
let expected = vec![
|
||||
"+-----------------+",
|
||||
"| COUNT(UInt8(1)) |",
|
||||
"+-----------------+",
|
||||
"| 6 |",
|
||||
"+-----------------+",
|
||||
];
|
||||
run_sql_test_case(
|
||||
scenarios::delete::ThreeDeleteThreeChunks {},
|
||||
"SELECT count(*) from cpu where foo = 'me' and (bar > 2 or bar = 1.0);",
|
||||
&expected,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue