fix: Remove db crate from query_tests
parent
b4894c2b46
commit
96f0c88b48
|
@ -1,27 +1,22 @@
|
|||
This crate contains "integration" tests for the query
|
||||
engine. Specifically, it runs queries against a fully created `Db`
|
||||
instance, records the output, and compares it to expected output.
|
||||
This crate contains "integration" tests for the query engine. Specifically, it runs queries against
|
||||
a fully created database instance, records the output, and compares it to expected output.
|
||||
|
||||
This crate does not (yet) run queries against a fully functioning
|
||||
server (aka run of `influxdb_iox` binary) but that may be added at
|
||||
some future time.
|
||||
|
||||
Some tests simply have their inputs and outputs hard coded into
|
||||
`#[test]` annotated tests as is rust's norm
|
||||
|
||||
The tests in `src/runner` are driven somewhat more dynamically based on input files
|
||||
Some tests simply have their inputs and outputs hard coded into `#[test]` annotated tests as is
|
||||
Rust's norm.
|
||||
|
||||
The tests in `src/runner` are driven somewhat more dynamically based on input files.
|
||||
|
||||
# Cookbook: Adding a new Test
|
||||
|
||||
How do you make a new test:
|
||||
How to make a new test:
|
||||
|
||||
1. Add a new file .sql to the `cases/in` directory
|
||||
2. Regenerate file: `(cd generate && cargo run)`
|
||||
2. Run the tests `` cargo test -p query_tests`
|
||||
3. You will get a failure message that contains examples of how to update the files
|
||||
|
||||
|
||||
## Example output
|
||||
|
||||
```
|
||||
Possibly helpful commands:
|
||||
# See diff
|
||||
|
@ -32,9 +27,11 @@ Possibly helpful commands:
|
|||
|
||||
# Cookbook: Adding a new test scenario
|
||||
|
||||
Each test can be defined in terms of a "setup" (a set of actions taken to prepare the state of database)
|
||||
Each test can be defined in terms of a "setup" (a set of actions taken to prepare the state of
|
||||
database).
|
||||
|
||||
In the future we envision more fine grained control of these setups (by implementing some of the database commands as IOX_TEST commands) but for now they are hard coded.
|
||||
In the future, we envision more fine grained control of these setups (by implementing some of the
|
||||
database commands as IOX_TEST commands), but for now they are hard coded.
|
||||
|
||||
The SQL files refer to the setups with a specially formatted comment:
|
||||
|
||||
|
@ -42,4 +39,4 @@ The SQL files refer to the setups with a specially formatted comment:
|
|||
-- IOX_SETUP: OneMeasurementFourChunksWithDuplicates
|
||||
```
|
||||
|
||||
To add a new setup, follow the pattern in scenario.rs of `get_all_setups`;
|
||||
To add a new setup, follow the pattern in scenario.rs of `get_all_setups`.
|
||||
|
|
|
@ -1,7 +0,0 @@
|
|||
-- Test Setup: OneMeasurementAllChunksDropped
|
||||
-- SQL: SELECT * from information_schema.tables where table_schema = 'iox';
|
||||
+---------------+--------------+------------+------------+
|
||||
| table_catalog | table_schema | table_name | table_type |
|
||||
+---------------+--------------+------------+------------+
|
||||
| public | iox | h2o | BASE TABLE |
|
||||
+---------------+--------------+------------+------------+
|
|
@ -1,4 +0,0 @@
|
|||
-- IOX_SETUP: OneMeasurementAllChunksDropped
|
||||
|
||||
-- list information schema (show that all the chunks were dropped)
|
||||
SELECT * from information_schema.tables where table_schema = 'iox';
|
|
@ -58,10 +58,10 @@ fn find_sql_files(root: &Path) -> Vec<PathBuf> {
|
|||
sqls
|
||||
}
|
||||
|
||||
/// writes out what will be the rust test file that lists out .sqls
|
||||
/// Writes out what will be the Rust test file that lists out .sqls
|
||||
fn make_cases_rs(sqls: &[PathBuf]) -> Vec<String> {
|
||||
let mut output_lines: Vec<String> = vec![r#"
|
||||
//! This file is auto generated by build.rs
|
||||
//! This file is auto generated by query_tests/generate.
|
||||
//! Do not edit manually --> will result in sadness
|
||||
use std::path::Path;
|
||||
use crate::runner::Runner;"#
|
||||
|
|
|
@ -1,129 +0,0 @@
|
|||
use arrow_util::assert_batches_sorted_eq;
|
||||
use db::{test_helpers::write_lp, utils::TestDb};
|
||||
use object_store::{DynObjectStore, ObjectStoreImpl, ObjectStoreIntegration};
|
||||
use query::{
|
||||
exec::{ExecutionContextProvider, IOxSessionContext},
|
||||
frontend::sql::SqlQueryPlanner,
|
||||
QueryChunk,
|
||||
};
|
||||
use std::{
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_query_cancellation_slow_store() {
|
||||
let object_store = ObjectStoreImpl::new_in_memory_throttled(Default::default());
|
||||
let throttle_config = match &object_store.integration {
|
||||
ObjectStoreIntegration::InMemoryThrottled(t) => Arc::clone(&t.config),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let object_store: Arc<DynObjectStore> = Arc::new(object_store);
|
||||
|
||||
// create test DB
|
||||
let test_db = TestDb::builder()
|
||||
.object_store(Arc::clone(&object_store))
|
||||
.build()
|
||||
.await;
|
||||
let db = test_db.db;
|
||||
|
||||
let partition_key = "1970-01-01T00";
|
||||
let table_name = "cpu";
|
||||
|
||||
// create persisted chunk
|
||||
let data = "cpu,region=west user=23.2 100";
|
||||
write_lp(&db, data);
|
||||
db.rollover_partition(table_name, partition_key)
|
||||
.await
|
||||
.unwrap();
|
||||
db.compact_partition(table_name, partition_key)
|
||||
.await
|
||||
.unwrap();
|
||||
let id = db
|
||||
.persist_partition("cpu", partition_key, true)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.id();
|
||||
|
||||
// unload read buffer from persisted chunk so that object store access is required
|
||||
db.unload_read_buffer(table_name, partition_key, id)
|
||||
.unwrap();
|
||||
|
||||
// create in-memory chunk
|
||||
let data = "cpu,region=east user=0.1 42";
|
||||
write_lp(&db, data);
|
||||
|
||||
// make store access really slow
|
||||
throttle_config.lock().unwrap().wait_get_per_call = Duration::from_secs(1_000);
|
||||
|
||||
// setup query context
|
||||
let ctx = db.new_query_context(None);
|
||||
wait_for_tasks(&ctx, 0).await;
|
||||
|
||||
// query fast part
|
||||
let expected_fast = vec![
|
||||
"+--------+--------------------------------+------+",
|
||||
"| region | time | user |",
|
||||
"+--------+--------------------------------+------+",
|
||||
"| east | 1970-01-01T00:00:00.000000042Z | 0.1 |",
|
||||
"+--------+--------------------------------+------+",
|
||||
];
|
||||
let query_fast = "select * from cpu where region='east'";
|
||||
let physical_plan = SqlQueryPlanner::default()
|
||||
.query(query_fast, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let batches = ctx.collect(physical_plan).await.unwrap();
|
||||
assert_batches_sorted_eq!(&expected_fast, &batches);
|
||||
wait_for_tasks(&ctx, 0).await;
|
||||
|
||||
// query blocked part
|
||||
let query_slow = "select * from cpu where region='west'";
|
||||
let physical_plan = SqlQueryPlanner::default()
|
||||
.query(query_slow, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let ctx_captured = ctx.child_ctx("slow");
|
||||
let passed = Arc::new(AtomicBool::new(false));
|
||||
let passed_captured = Arc::clone(&passed);
|
||||
let join_handle = tokio::spawn(async move {
|
||||
ctx_captured.collect(physical_plan).await.unwrap();
|
||||
passed_captured.store(true, Ordering::SeqCst);
|
||||
});
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
assert!(!passed.load(Ordering::SeqCst));
|
||||
wait_for_tasks(&ctx, 1).await;
|
||||
|
||||
// querying fast part should not be blocked
|
||||
let physical_plan = SqlQueryPlanner::default()
|
||||
.query(query_fast, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let batches = ctx.collect(physical_plan).await.unwrap();
|
||||
assert_batches_sorted_eq!(&expected_fast, &batches);
|
||||
wait_for_tasks(&ctx, 1).await;
|
||||
|
||||
// canceling the blocking query should free resources again
|
||||
// cancelation might take a short while
|
||||
join_handle.abort();
|
||||
wait_for_tasks(&ctx, 0).await;
|
||||
assert!(!passed.load(Ordering::SeqCst));
|
||||
}
|
||||
|
||||
/// Wait up to 10s for correct task count.
|
||||
async fn wait_for_tasks(ctx: &IOxSessionContext, n: usize) {
|
||||
tokio::time::timeout(Duration::from_secs(10), async {
|
||||
loop {
|
||||
if dbg!(ctx.tasks()) == n {
|
||||
return;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(1)).await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
|
@ -1,23 +1,9 @@
|
|||
|
||||
//! This file is auto generated by build.rs
|
||||
//! This file is auto generated by query_tests/generate.
|
||||
//! Do not edit manually --> will result in sadness
|
||||
use std::path::Path;
|
||||
use crate::runner::Runner;
|
||||
|
||||
#[tokio::test]
|
||||
// Tests from "all_chunks_dropped.sql",
|
||||
async fn test_cases_all_chunks_dropped_sql() {
|
||||
let input_path = Path::new("cases").join("in").join("all_chunks_dropped.sql");
|
||||
let mut runner = Runner::new();
|
||||
runner
|
||||
.run(input_path)
|
||||
.await
|
||||
.expect("test failed");
|
||||
runner
|
||||
.flush()
|
||||
.expect("flush worked");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
// Tests from "basic.sql",
|
||||
async fn test_cases_basic_sql() {
|
||||
|
@ -103,9 +89,9 @@ async fn test_cases_delete_two_del_multi_expr_one_chunk_sql() {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
// Tests from "new_sql_system_tables.sql",
|
||||
async fn test_cases_new_sql_system_tables_sql() {
|
||||
let input_path = Path::new("cases").join("in").join("new_sql_system_tables.sql");
|
||||
// Tests from "duplicates.sql",
|
||||
async fn test_cases_duplicates_sql() {
|
||||
let input_path = Path::new("cases").join("in").join("duplicates.sql");
|
||||
let mut runner = Runner::new();
|
||||
runner
|
||||
.run(input_path)
|
||||
|
@ -117,9 +103,9 @@ async fn test_cases_new_sql_system_tables_sql() {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
// Tests from "duplicates.sql",
|
||||
async fn test_cases_duplicates_sql() {
|
||||
let input_path = Path::new("cases").join("in").join("duplicates.sql");
|
||||
// Tests from "new_sql_system_tables.sql",
|
||||
async fn test_cases_new_sql_system_tables_sql() {
|
||||
let input_path = Path::new("cases").join("in").join("new_sql_system_tables.sql");
|
||||
let mut runner = Runner::new();
|
||||
runner
|
||||
.run(input_path)
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
use std::{any::Any, sync::Arc};
|
||||
|
||||
use datafusion::catalog::catalog::CatalogProvider;
|
||||
use db::Db;
|
||||
use querier::QuerierNamespace;
|
||||
use query::{exec::ExecutionContextProvider, QueryDatabase};
|
||||
|
||||
|
@ -20,20 +19,6 @@ pub trait AbstractDb: CatalogProvider + ExecutionContextProvider + QueryDatabase
|
|||
fn as_query_database(&self) -> &dyn QueryDatabase;
|
||||
}
|
||||
|
||||
impl AbstractDb for Db {
|
||||
fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
|
||||
self as _
|
||||
}
|
||||
|
||||
fn as_catalog_provider_arc(self: Arc<Self>) -> Arc<dyn CatalogProvider> {
|
||||
self as _
|
||||
}
|
||||
|
||||
fn as_query_database(&self) -> &dyn QueryDatabase {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl AbstractDb for QuerierNamespace {
|
||||
fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
|
||||
self as _
|
||||
|
|
|
@ -50,12 +50,6 @@ async fn run_field_columns_test_case<D>(
|
|||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_field_columns_empty_database() {
|
||||
let expected_fields = FieldList::default();
|
||||
run_field_columns_test_case(NoData {}, InfluxRpcPredicate::default(), expected_fields).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_field_columns_no_predicate() {
|
||||
let predicate = PredicateBuilder::default()
|
||||
|
|
|
@ -3,8 +3,8 @@ use std::sync::Arc;
|
|||
|
||||
#[cfg(test)]
|
||||
use crate::scenarios::{
|
||||
DbScenario, DbSetup, NoData, TwoMeasurements, TwoMeasurementsManyFields,
|
||||
TwoMeasurementsWithDelete, TwoMeasurementsWithDeleteAll,
|
||||
DbScenario, DbSetup, TwoMeasurements, TwoMeasurementsManyFields, TwoMeasurementsWithDelete,
|
||||
TwoMeasurementsWithDeleteAll,
|
||||
};
|
||||
use crate::{
|
||||
db::AbstractDb,
|
||||
|
@ -93,13 +93,6 @@ async fn run_read_filter_error_case<D>(
|
|||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_read_filter_no_data_no_pred() {
|
||||
let expected_results = vec![] as Vec<&str>;
|
||||
|
||||
run_read_filter_test_case(NoData {}, InfluxRpcPredicate::default(), expected_results).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_read_filter_data_no_pred() {
|
||||
let expected_results = vec![
|
||||
|
|
|
@ -4,7 +4,7 @@ use crate::{
|
|||
scenarios::{
|
||||
AnotherMeasurementForAggs, DbScenario, DbSetup, MeasurementForDefect2691,
|
||||
MeasurementForGroupByField, MeasurementForGroupKeys, MeasurementForMax, MeasurementForMin,
|
||||
MeasurementForSelectors, NoData, OneMeasurementForAggs, OneMeasurementNoTags2,
|
||||
MeasurementForSelectors, OneMeasurementForAggs, OneMeasurementNoTags2,
|
||||
OneMeasurementNoTagsWithDelete, OneMeasurementNoTagsWithDeleteAllWithAndWithoutChunk,
|
||||
TwoMeasurementForAggs, TwoMeasurementsManyFields, TwoMeasurementsManyFieldsOneChunk,
|
||||
},
|
||||
|
@ -60,22 +60,6 @@ async fn run_read_group_test_case<D>(
|
|||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_read_group_no_data_no_pred() {
|
||||
let agg = Aggregate::Mean;
|
||||
let group_columns = vec![] as Vec<&str>;
|
||||
let expected_results = vec![] as Vec<&str>;
|
||||
|
||||
run_read_group_test_case(
|
||||
NoData {},
|
||||
InfluxRpcPredicate::default(),
|
||||
agg,
|
||||
group_columns,
|
||||
expected_results,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_read_group_data_no_tag_columns() {
|
||||
// Count
|
||||
|
|
|
@ -49,24 +49,6 @@ async fn run_read_window_aggregate_test_case<D>(
|
|||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_read_window_aggregate_no_data_no_pred() {
|
||||
let agg = Aggregate::Mean;
|
||||
let every = WindowDuration::from_nanoseconds(200);
|
||||
let offset = WindowDuration::from_nanoseconds(0);
|
||||
let expected_results = vec![] as Vec<&str>;
|
||||
|
||||
run_read_window_aggregate_test_case(
|
||||
NoData {},
|
||||
InfluxRpcPredicate::default(),
|
||||
agg,
|
||||
every,
|
||||
offset,
|
||||
expected_results,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_read_window_aggregate_nanoseconds() {
|
||||
let predicate = PredicateBuilder::default()
|
||||
|
@ -159,28 +141,6 @@ async fn test_read_window_aggregate_nanoseconds_measurement_count() {
|
|||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_read_window_aggregate_months() {
|
||||
let agg = Aggregate::Mean;
|
||||
let every = WindowDuration::from_months(1, false);
|
||||
let offset = WindowDuration::from_months(0, false);
|
||||
|
||||
// note the name of the field is "temp" even though it is the average
|
||||
let expected_results = vec![
|
||||
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [1585699200000000000, 1588291200000000000], values: [70.5, 72.5]",
|
||||
];
|
||||
|
||||
run_read_window_aggregate_test_case(
|
||||
MeasurementForWindowAggregateMonths {},
|
||||
InfluxRpcPredicate::default(),
|
||||
agg,
|
||||
every,
|
||||
offset,
|
||||
expected_results,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// See https://github.com/influxdata/influxdb_iox/issues/2697
|
||||
#[tokio::test]
|
||||
async fn test_grouped_series_set_plan_group_aggregate_min_defect_2697() {
|
||||
|
|
|
@ -51,11 +51,6 @@ async fn run_table_names_test_case<D>(
|
|||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_table_names_no_data_no_pred() {
|
||||
run_table_names_test_case(NoData {}, InfluxRpcPredicate::default(), vec![]).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_table_names_no_data_pred() {
|
||||
run_table_names_test_case(
|
||||
|
|
|
@ -1,22 +1,16 @@
|
|||
//! This module contains "end to end" tests for the query layer.
|
||||
//!
|
||||
//! These tests consist of loading the same data in several
|
||||
//! "scenarios" (different distributions across the Mutable Buffer,
|
||||
//! Immutable Buffer, and (eventually) Parquet files, running queries
|
||||
//! against it and verifying the same answer is produced in all scenarios
|
||||
//! These tests consist of loading the same data in several "scenarios", running queries against it
|
||||
//! and verifying the same answer is produced in all scenarios.
|
||||
|
||||
// Actual tests
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod cancellation;
|
||||
#[cfg(test)]
|
||||
#[rustfmt::skip]
|
||||
mod cases;
|
||||
#[cfg(test)]
|
||||
pub mod influxrpc;
|
||||
#[cfg(test)]
|
||||
pub mod pruning;
|
||||
#[cfg(test)]
|
||||
mod runner;
|
||||
#[cfg(test)]
|
||||
pub mod sql;
|
||||
|
|
|
@ -1,150 +0,0 @@
|
|||
use arrow_util::assert_batches_sorted_eq;
|
||||
use datafusion::logical_plan::{col, lit};
|
||||
use db::{
|
||||
test_helpers::write_lp,
|
||||
utils::{make_db, TestDb},
|
||||
};
|
||||
use metric::{Attributes, Metric, U64Counter};
|
||||
use predicate::rpc_predicate::InfluxRpcPredicate;
|
||||
use predicate::PredicateBuilder;
|
||||
use query::{
|
||||
exec::{stringset::StringSet, ExecutionContextProvider},
|
||||
frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner},
|
||||
};
|
||||
|
||||
async fn setup() -> TestDb {
|
||||
// Test that partition pruning is connected up
|
||||
let test_db = make_db().await;
|
||||
let db = &test_db.db;
|
||||
|
||||
// Chunk 0 has bar:[1-2]
|
||||
write_lp(db, "cpu bar=1 10");
|
||||
write_lp(db, "cpu bar=2 20");
|
||||
|
||||
let partition_key = "1970-01-01T00";
|
||||
db.compact_open_chunk("cpu", partition_key).await.unwrap();
|
||||
|
||||
// Chunk 1 has bar:[3-3] (going to get pruned)
|
||||
write_lp(db, "cpu bar=3 10");
|
||||
write_lp(db, "cpu bar=3 100");
|
||||
write_lp(db, "cpu bar=3 1000");
|
||||
|
||||
let partition_key = "1970-01-01T00";
|
||||
db.compact_open_chunk("cpu", partition_key).await.unwrap();
|
||||
|
||||
test_db
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn chunk_pruning_sql() {
|
||||
::test_helpers::maybe_start_logging();
|
||||
// Test that partition pruning is connected up
|
||||
let TestDb {
|
||||
db,
|
||||
metric_registry,
|
||||
..
|
||||
} = setup().await;
|
||||
|
||||
let expected = vec![
|
||||
"+-----+--------------------------------+",
|
||||
"| bar | time |",
|
||||
"+-----+--------------------------------+",
|
||||
"| 1 | 1970-01-01T00:00:00.000000010Z |",
|
||||
"| 2 | 1970-01-01T00:00:00.000000020Z |",
|
||||
"+-----+--------------------------------+",
|
||||
];
|
||||
let query = "select * from cpu where bar < 3.0";
|
||||
|
||||
let ctx = db.new_query_context(None);
|
||||
let physical_plan = SqlQueryPlanner::default().query(query, &ctx).await.unwrap();
|
||||
let batches = ctx.collect(physical_plan).await.unwrap();
|
||||
|
||||
assert_batches_sorted_eq!(&expected, &batches);
|
||||
|
||||
let database_attributes = Attributes::from(&[("db_name", "placeholder")]);
|
||||
// Validate that the chunk was pruned using the metrics
|
||||
let pruned_chunks = metric_registry
|
||||
.get_instrument::<Metric<U64Counter>>("query_access_pruned_chunks")
|
||||
.unwrap()
|
||||
.get_observer(&database_attributes)
|
||||
.unwrap()
|
||||
.fetch();
|
||||
assert_eq!(pruned_chunks, 1);
|
||||
|
||||
// Validate that the chunk was pruned using the metrics
|
||||
let pruned_rows = metric_registry
|
||||
.get_instrument::<Metric<U64Counter>>("query_access_pruned_rows")
|
||||
.unwrap()
|
||||
.get_observer(&database_attributes)
|
||||
.unwrap()
|
||||
.fetch();
|
||||
assert_eq!(pruned_rows, 3);
|
||||
|
||||
// Validate that it recorded that pruning took place
|
||||
let prune_count = metric_registry
|
||||
.get_instrument::<Metric<U64Counter>>("query_access_prune")
|
||||
.unwrap()
|
||||
.get_observer(&database_attributes)
|
||||
.unwrap()
|
||||
.fetch();
|
||||
assert_eq!(prune_count, 2);
|
||||
|
||||
// Validate that it recorded that the catalog was snapshotted
|
||||
let snapshot_count = metric_registry
|
||||
.get_instrument::<Metric<U64Counter>>("query_access_catalog_snapshot")
|
||||
.unwrap()
|
||||
.get_observer(&database_attributes)
|
||||
.unwrap()
|
||||
.fetch();
|
||||
assert_eq!(snapshot_count, 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn chunk_pruning_influxrpc() {
|
||||
::test_helpers::maybe_start_logging();
|
||||
// Test that partition pruning is connected up
|
||||
let TestDb {
|
||||
db,
|
||||
metric_registry,
|
||||
..
|
||||
} = setup().await;
|
||||
|
||||
let predicate = PredicateBuilder::new()
|
||||
// bar < 3.0
|
||||
.add_expr(col("bar").lt(lit(3.0)))
|
||||
.build();
|
||||
let rpc_predicate = InfluxRpcPredicate::new(None, predicate);
|
||||
|
||||
let mut expected = StringSet::new();
|
||||
expected.insert("cpu".into());
|
||||
|
||||
let ctx = db.executor().new_context(query::exec::ExecutorType::Query);
|
||||
|
||||
let plan = InfluxRpcPlanner::default()
|
||||
.table_names(db.as_ref(), rpc_predicate)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let result = ctx.to_string_set(plan).await.unwrap();
|
||||
|
||||
assert_eq!(&expected, result.as_ref());
|
||||
|
||||
let attributes = Attributes::from(&[("db_name", "placeholder")]);
|
||||
// Validate that the chunk was pruned using the metrics
|
||||
let pruned_chunks = metric_registry
|
||||
.get_instrument::<Metric<U64Counter>>("query_access_pruned_chunks")
|
||||
.unwrap()
|
||||
.get_observer(&attributes)
|
||||
.unwrap()
|
||||
.fetch();
|
||||
assert_eq!(pruned_chunks, 1);
|
||||
|
||||
// Validate that the chunk was pruned using the metrics
|
||||
let pruned_rows = metric_registry
|
||||
.get_instrument::<Metric<U64Counter>>("query_access_pruned_rows")
|
||||
.unwrap()
|
||||
.get_observer(&attributes)
|
||||
.unwrap()
|
||||
.fetch();
|
||||
assert_eq!(pruned_rows, 3);
|
||||
}
|
|
@ -53,7 +53,6 @@ pub fn get_all_setups() -> &'static HashMap<String, Arc<dyn DbSetup>> {
|
|||
register_setup!(TwoMeasurementsPredicatePushDown),
|
||||
register_setup!(TwoMeasurementsManyFieldsOneChunk),
|
||||
register_setup!(OneMeasurementFourChunksWithDuplicates),
|
||||
register_setup!(OneMeasurementAllChunksDropped),
|
||||
register_setup!(ThreeDeleteThreeChunks),
|
||||
register_setup!(OneDeleteSimpleExprOneChunkDeleteAll),
|
||||
register_setup!(OneDeleteSimpleExprOneChunk),
|
||||
|
|
|
@ -1,23 +1,15 @@
|
|||
//! Library of test scenarios that can be used in query_tests
|
||||
use super::{
|
||||
util::{all_scenarios_for_one_chunk, make_two_chunk_scenarios, ChunkStageNew},
|
||||
DbScenario, DbSetup,
|
||||
};
|
||||
use crate::scenarios::util::{make_n_chunks_scenario_new, ChunkDataNew};
|
||||
use async_trait::async_trait;
|
||||
use data_types::{
|
||||
delete_predicate::{DeleteExpr, DeletePredicate},
|
||||
timestamp::TimestampRange,
|
||||
};
|
||||
use db::{
|
||||
test_helpers::write_lp,
|
||||
utils::{
|
||||
count_mutable_buffer_chunks, count_object_store_chunks, count_read_buffer_chunks, make_db,
|
||||
},
|
||||
};
|
||||
use query::{frontend::sql::SqlQueryPlanner, QueryChunk};
|
||||
|
||||
use crate::scenarios::util::{make_n_chunks_scenario_new, ChunkDataNew};
|
||||
|
||||
use super::{
|
||||
util::{all_scenarios_for_one_chunk, make_two_chunk_scenarios, ChunkStageNew},
|
||||
DbScenario, DbSetup,
|
||||
};
|
||||
use query::frontend::sql::SqlQueryPlanner;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MeasurementWithMaxTime {}
|
||||
|
@ -37,127 +29,6 @@ impl DbSetup for MeasurementWithMaxTime {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: rewrite this for NG. Need to see whether we will need to test empty DB and if so how
|
||||
// <https://github.com/influxdata/influxdb_iox/issues/4488>
|
||||
/// No data
|
||||
#[derive(Debug)]
|
||||
pub struct NoData {}
|
||||
#[async_trait]
|
||||
impl DbSetup for NoData {
|
||||
async fn make(&self) -> Vec<DbScenario> {
|
||||
let partition_key = "1970-01-01T00";
|
||||
let table_name = "cpu";
|
||||
|
||||
// Scenario 1: No data in the DB yet
|
||||
//
|
||||
let db = make_db().await.db;
|
||||
let scenario1 = DbScenario {
|
||||
scenario_name: "New, Empty Database".into(),
|
||||
db,
|
||||
};
|
||||
|
||||
// Scenario 2: listing partitions (which may create an entry in a map)
|
||||
// in an empty database
|
||||
//
|
||||
let db = make_db().await.db;
|
||||
assert_eq!(count_mutable_buffer_chunks(&db), 0);
|
||||
assert_eq!(count_read_buffer_chunks(&db), 0);
|
||||
assert_eq!(count_object_store_chunks(&db), 0);
|
||||
let scenario2 = DbScenario {
|
||||
scenario_name: "New, Empty Database after partitions are listed".into(),
|
||||
db,
|
||||
};
|
||||
|
||||
// Scenario 3: the database has had data loaded into RB and then deleted
|
||||
//
|
||||
let db = make_db().await.db;
|
||||
let data = "cpu,region=west user=23.2 100";
|
||||
write_lp(&db, data);
|
||||
// move data out of open chunk
|
||||
db.rollover_partition(table_name, partition_key)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(count_mutable_buffer_chunks(&db), 1); //
|
||||
assert_eq!(count_read_buffer_chunks(&db), 0); // nothing yet
|
||||
assert_eq!(count_object_store_chunks(&db), 0); // nothing yet
|
||||
|
||||
// Now load the closed chunk into the RB
|
||||
let chunk_id = db
|
||||
.compact_partition(table_name, partition_key)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.id();
|
||||
assert_eq!(count_mutable_buffer_chunks(&db), 0); // open chunk only
|
||||
assert_eq!(count_read_buffer_chunks(&db), 1); // close chunk only
|
||||
assert_eq!(count_object_store_chunks(&db), 0); // nothing yet
|
||||
|
||||
// drop chunk
|
||||
db.drop_chunk(table_name, partition_key, chunk_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(count_mutable_buffer_chunks(&db), 0); // open chunk only
|
||||
assert_eq!(count_read_buffer_chunks(&db), 0); // nothing after dropping chunk 0
|
||||
assert_eq!(count_object_store_chunks(&db), 0); // still nothing
|
||||
|
||||
let scenario3 = DbScenario {
|
||||
scenario_name: "Empty Database after drop chunk that is in read buffer".into(),
|
||||
db,
|
||||
};
|
||||
|
||||
// Scenario 4: the database has had data loaded into RB & Object Store and then deleted
|
||||
//
|
||||
let db = make_db().await.db;
|
||||
let data = "cpu,region=west user=23.2 100";
|
||||
write_lp(&db, data);
|
||||
// move data out of open chunk
|
||||
db.rollover_partition(table_name, partition_key)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(count_mutable_buffer_chunks(&db), 1); // 1 open chunk
|
||||
assert_eq!(count_read_buffer_chunks(&db), 0); // nothing yet
|
||||
assert_eq!(count_object_store_chunks(&db), 0); // nothing yet
|
||||
|
||||
// Now load the closed chunk into the RB
|
||||
db.compact_partition(table_name, partition_key)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(count_mutable_buffer_chunks(&db), 0); // open chunk only
|
||||
assert_eq!(count_read_buffer_chunks(&db), 1); // close chunk only
|
||||
assert_eq!(count_object_store_chunks(&db), 0); // nothing yet
|
||||
|
||||
// Now write the data in RB to object store but keep it in RB
|
||||
let chunk_id = db
|
||||
.persist_partition("cpu", partition_key, true)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.id();
|
||||
assert_eq!(count_mutable_buffer_chunks(&db), 0); // open chunk only
|
||||
assert_eq!(count_read_buffer_chunks(&db), 1); // closed chunk only
|
||||
assert_eq!(count_object_store_chunks(&db), 1); // close chunk only
|
||||
|
||||
// drop chunk
|
||||
db.drop_chunk(table_name, partition_key, chunk_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(count_mutable_buffer_chunks(&db), 0);
|
||||
assert_eq!(count_read_buffer_chunks(&db), 0);
|
||||
assert_eq!(count_object_store_chunks(&db), 0);
|
||||
|
||||
let scenario4 = DbScenario {
|
||||
scenario_name:
|
||||
"Empty Database after drop chunk that is in both read buffer and object store"
|
||||
.into(),
|
||||
db,
|
||||
};
|
||||
|
||||
vec![scenario1, scenario2, scenario3, scenario4]
|
||||
}
|
||||
}
|
||||
|
||||
/// a measurement with timestamps in 2021
|
||||
#[derive(Debug)]
|
||||
pub struct OneMeasurementRealisticTimes {}
|
||||
|
@ -806,36 +677,6 @@ impl DbSetup for EndToEndTestWithDelete {
|
|||
}
|
||||
}
|
||||
|
||||
/// This creates two chunks but then drops them all. This should keep the tables.
|
||||
#[derive(Debug)]
|
||||
pub struct OneMeasurementAllChunksDropped {}
|
||||
#[async_trait]
|
||||
impl DbSetup for OneMeasurementAllChunksDropped {
|
||||
async fn make(&self) -> Vec<DbScenario> {
|
||||
let db = make_db().await.db;
|
||||
|
||||
let partition_key = "1970-01-01T00";
|
||||
let table_name = "h2o";
|
||||
|
||||
let lp_lines = vec!["h2o,state=MA temp=70.4 50"];
|
||||
write_lp(&db, &lp_lines.join("\n"));
|
||||
let chunk_id = db
|
||||
.compact_open_chunk(table_name, partition_key)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.id();
|
||||
db.drop_chunk(table_name, partition_key, chunk_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
vec![DbScenario {
|
||||
scenario_name: "one measurement but all chunks are dropped".into(),
|
||||
db,
|
||||
}]
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct TwoMeasurementsMultiSeries {}
|
||||
#[async_trait]
|
||||
|
@ -1309,68 +1150,6 @@ impl DbSetup for MeasurementForWindowAggregate {
|
|||
}
|
||||
}
|
||||
|
||||
pub struct MeasurementForWindowAggregateMonths {}
|
||||
#[async_trait]
|
||||
impl DbSetup for MeasurementForWindowAggregateMonths {
|
||||
async fn make(&self) -> Vec<DbScenario> {
|
||||
// Note the lines are written into 4 different partititions (as we are
|
||||
// partitioned by day, effectively)
|
||||
let lp_lines = vec![
|
||||
"h2o,state=MA,city=Boston temp=70.0 1583020800000000000", // 2020-03-01T00:00:00Z
|
||||
"h2o,state=MA,city=Boston temp=71.0 1583107920000000000", // 2020-03-02T00:12:00Z
|
||||
"h2o,state=MA,city=Boston temp=72.0 1585699200000000000", // 2020-04-01T00:00:00Z
|
||||
"h2o,state=MA,city=Boston temp=73.0 1585785600000000000", // 2020-04-02T00:00:00Z
|
||||
];
|
||||
// partition keys are: ["2020-03-02T00", "2020-03-01T00", "2020-04-01T00",
|
||||
// "2020-04-02T00"]
|
||||
|
||||
let db = make_db().await.db;
|
||||
let data = lp_lines.join("\n");
|
||||
write_lp(&db, &data);
|
||||
let scenario1 = DbScenario {
|
||||
scenario_name: "Data in 4 partitions, open chunks of mutable buffer".into(),
|
||||
db,
|
||||
};
|
||||
|
||||
let db = make_db().await.db;
|
||||
let data = lp_lines.join("\n");
|
||||
write_lp(&db, &data);
|
||||
db.rollover_partition("h2o", "2020-03-01T00").await.unwrap();
|
||||
db.rollover_partition("h2o", "2020-03-02T00").await.unwrap();
|
||||
let scenario2 = DbScenario {
|
||||
scenario_name:
|
||||
"Data in 4 partitions, two open chunk and two closed chunks of mutable buffer"
|
||||
.into(),
|
||||
db,
|
||||
};
|
||||
|
||||
let db = make_db().await.db;
|
||||
let data = lp_lines.join("\n");
|
||||
write_lp(&db, &data);
|
||||
// roll over and load chunks into both RUB and OS
|
||||
db.persist_partition("h2o", "2020-03-01T00", true)
|
||||
.await
|
||||
.unwrap();
|
||||
db.persist_partition("h2o", "2020-03-02T00", true)
|
||||
.await
|
||||
.unwrap();
|
||||
db.persist_partition("h2o", "2020-04-01T00", true)
|
||||
.await
|
||||
.unwrap();
|
||||
db.persist_partition("h2o", "2020-04-02T00", true)
|
||||
.await
|
||||
.unwrap();
|
||||
let scenario3 = DbScenario {
|
||||
scenario_name: "Data in 4 partitions, 4 closed chunks in mutable buffer".into(),
|
||||
db,
|
||||
};
|
||||
|
||||
// TODO: Add a scenario for OS only in #1342
|
||||
|
||||
vec![scenario1, scenario2, scenario3]
|
||||
}
|
||||
}
|
||||
|
||||
// Test data to validate fix for:
|
||||
// https://github.com/influxdata/influxdb_iox/issues/2697
|
||||
pub struct MeasurementForDefect2697 {}
|
||||
|
|
Loading…
Reference in New Issue