test: retention test for querier inthe query_tests (#6220)

pull/24376/head
Nga Tran 2022-11-23 12:04:14 -05:00 committed by GitHub
parent 71b29ae442
commit 52d70b060a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 177 additions and 4 deletions

1
Cargo.lock generated
View File

@ -4143,6 +4143,7 @@ dependencies = [
"iox_catalog", "iox_catalog",
"iox_query", "iox_query",
"iox_tests", "iox_tests",
"iox_time",
"itertools", "itertools",
"mutable_batch", "mutable_batch",
"mutable_batch_lp", "mutable_batch_lp",

View File

@ -23,6 +23,7 @@ ingester = { path = "../ingester" }
iox_catalog = { path = "../iox_catalog" } iox_catalog = { path = "../iox_catalog" }
iox_query = { path = "../iox_query" } iox_query = { path = "../iox_query" }
iox_tests = { path = "../iox_tests" } iox_tests = { path = "../iox_tests" }
iox_time = { path = "../iox_time" }
itertools = "0.10" itertools = "0.10"
mutable_batch = { path = "../mutable_batch" } mutable_batch = { path = "../mutable_batch" }
mutable_batch_lp = { path = "../mutable_batch_lp" } mutable_batch_lp = { path = "../mutable_batch_lp" }

View File

@ -0,0 +1,71 @@
-- Test Setup: ThreeChunksWithRetention
-- SQL: SELECT * FROM cpu order by host, load, time;
+------+------+----------------------+
| host | load | time |
+------+------+----------------------+
| a | 1 | 1970-01-01T00:00:00Z |
| b | 2 | 1970-01-01T00:00:00Z |
| bb | 21 | 1970-01-01T00:00:00Z |
+------+------+----------------------+
-- SQL: EXPLAIN SELECT * FROM cpu order by host, load, time;
-- Results After Normalizing UUIDs
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: cpu.host ASC NULLS LAST, cpu.load ASC NULLS LAST, cpu.time ASC NULLS LAST |
| | Projection: cpu.host, cpu.load, cpu.time |
| | TableScan: cpu projection=[host, load, time] |
| physical_plan | SortExec: [host@0 ASC NULLS LAST,load@1 ASC NULLS LAST,time@2 ASC NULLS LAST] |
| | CoalescePartitionsExec |
| | ProjectionExec: expr=[host@0 as host, load@1 as load, time@2 as time] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | DeduplicateExec: [host@0 ASC,time@2 ASC] |
| | SortPreservingMergeExec: [host@0 ASC,time@2 ASC] |
| | SortExec: [host@0 ASC,time@2 ASC] |
| | UnionExec |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet], output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] |
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: SELECT * FROM cpu WHERE host != 'b' ORDER BY host,time;
+------+------+----------------------+
| host | load | time |
+------+------+----------------------+
| a | 1 | 1970-01-01T00:00:00Z |
| bb | 21 | 1970-01-01T00:00:00Z |
+------+------+----------------------+
-- SQL: EXPLAIN SELECT * FROM cpu WHERE host != 'b' ORDER BY host,time;
-- Results After Normalizing UUIDs
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: cpu.host ASC NULLS LAST, cpu.time ASC NULLS LAST |
| | Projection: cpu.host, cpu.load, cpu.time |
| | Filter: cpu.host != Dictionary(Int32, Utf8("b")) |
| | TableScan: cpu projection=[host, load, time], partial_filters=[cpu.host != Dictionary(Int32, Utf8("b"))] |
| physical_plan | SortExec: [host@0 ASC NULLS LAST,time@2 ASC NULLS LAST] |
| | CoalescePartitionsExec |
| | ProjectionExec: expr=[host@0 as host, load@1 as load, time@2 as time] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: host@0 != b |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | DeduplicateExec: [host@0 ASC,time@2 ASC] |
| | SortPreservingMergeExec: [host@0 ASC,time@2 ASC] |
| | SortExec: [host@0 ASC,time@2 ASC] |
| | UnionExec |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], predicate=host_min@0 != b OR b != host_max@1, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet], predicate=host_min@0 != b OR b != host_max@1, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] |
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

View File

@ -0,0 +1,16 @@
-- Test for retention policy
-- IOX_SETUP: ThreeChunksWithRetention
-- should return 3 rows
SELECT * FROM cpu order by host, load, time;
-- should see only 2 chunks with predicate pushed down to ParquetExec
-- IOX_COMPARE: uuid
EXPLAIN SELECT * FROM cpu order by host, load, time;
-- should return 2 rows
SELECT * FROM cpu WHERE host != 'b' ORDER BY host,time;
-- should see only 2 chunks with predicate pushed down to ParquetExec
-- IOX_COMPARE: uuid
EXPLAIN SELECT * FROM cpu WHERE host != 'b' ORDER BY host,time;

View File

@ -132,6 +132,22 @@ async fn test_cases_pushdown_sql() {
.expect("flush worked"); .expect("flush worked");
} }
#[tokio::test]
// Tests from "retention.sql",
async fn test_cases_retention_sql() {
test_helpers::maybe_start_logging();
let input_path = Path::new("cases").join("in").join("retention.sql");
let mut runner = Runner::new();
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test] #[tokio::test]
// Tests from "selectors.sql", // Tests from "selectors.sql",
async fn test_cases_selectors_sql() { async fn test_cases_selectors_sql() {

View File

@ -68,6 +68,7 @@ pub fn get_all_setups() -> &'static HashMap<String, Arc<dyn DbSetup>> {
register_setup!(AllTypes), register_setup!(AllTypes),
register_setup!(TwoChunksDedupWeirdnessParquet), register_setup!(TwoChunksDedupWeirdnessParquet),
register_setup!(TwoChunksDedupWeirdnessParquetIngester), register_setup!(TwoChunksDedupWeirdnessParquetIngester),
register_setup!(ThreeChunksWithRetention),
] ]
.into_iter() .into_iter()
.map(|(name, setup)| (name.to_string(), setup as Arc<dyn DbSetup>)) .map(|(name, setup)| (name.to_string(), setup as Arc<dyn DbSetup>))

View File

@ -1,12 +1,16 @@
//! Library of test scenarios that can be used in query_tests //! Library of test scenarios that can be used in query_tests
use super::{ use super::{
util::{all_scenarios_for_one_chunk, make_two_chunk_scenarios, ChunkStage}, util::{
all_scenarios_for_one_chunk, make_n_chunks_scenario_with_retention,
make_two_chunk_scenarios, ChunkStage,
},
DbScenario, DbSetup, DbScenario, DbSetup,
}; };
use crate::scenarios::util::{make_n_chunks_scenario, ChunkData}; use crate::scenarios::util::{make_n_chunks_scenario, ChunkData};
use async_trait::async_trait; use async_trait::async_trait;
use iox_query::frontend::sql::SqlQueryPlanner; use iox_query::frontend::sql::SqlQueryPlanner;
use iox_time::{MockProvider, Time, TimeProvider};
#[derive(Debug)] #[derive(Debug)]
pub struct MeasurementWithMaxTime {} pub struct MeasurementWithMaxTime {}
@ -961,6 +965,59 @@ impl DbSetup for MeasurementForDefect2890 {
} }
} }
// Test data for retention policy
#[derive(Debug)]
pub struct ThreeChunksWithRetention {}
#[async_trait]
impl DbSetup for ThreeChunksWithRetention {
async fn make(&self) -> Vec<DbScenario> {
// Same time provider as the one used in n_chunks scenarios
let time_provider = MockProvider::new(Time::from_timestamp(0, 0).unwrap());
let retention_period_1_hour_ns = 3600 * 1_000_000_000;
let inside_retention = time_provider.now().timestamp_nanos(); // now
let outside_retention = inside_retention - retention_period_1_hour_ns - 10; // over one hour ago
let partition_key = "1970-01-01T00"; //"test_partition";
let l1 = format!("cpu,host=a load=1 {}", inside_retention);
let l2 = format!("cpu,host=aa load=11 {}", outside_retention);
let lp_partially_inside = vec![l1.as_str(), l2.as_str()];
let l3 = format!("cpu,host=b load=2 {}", inside_retention);
let l4 = format!("cpu,host=bb load=21 {}", inside_retention);
let lp_fully_inside = vec![l3.as_str(), l4.as_str()];
let l5 = format!("cpu,host=z load=3 {}", outside_retention);
let l6 = format!("cpu,host=zz load=31 {}", outside_retention);
let lp_fully_outside = vec![l5.as_str(), l6.as_str()];
let c_partially_inside = ChunkData {
lp_lines: lp_partially_inside,
partition_key,
chunk_stage: Some(ChunkStage::Parquet),
..Default::default()
};
let c_fully_inside = ChunkData {
lp_lines: lp_fully_inside,
partition_key,
chunk_stage: Some(ChunkStage::Parquet),
..Default::default()
};
let c_fully_outside = ChunkData {
lp_lines: lp_fully_outside,
partition_key,
chunk_stage: Some(ChunkStage::Parquet),
..Default::default()
};
make_n_chunks_scenario_with_retention(
&[c_partially_inside, c_fully_inside, c_fully_outside],
Some(retention_period_1_hour_ns),
)
.await
}
}
#[derive(Debug)] #[derive(Debug)]
pub struct TwoChunksMissingColumns {} pub struct TwoChunksMissingColumns {}
#[async_trait] #[async_trait]

View File

@ -381,8 +381,14 @@ pub async fn make_two_chunk_scenarios(
]) ])
.await .await
} }
pub async fn make_n_chunks_scenario(chunks: &[ChunkData<'_, '_>]) -> Vec<DbScenario> { pub async fn make_n_chunks_scenario(chunks: &[ChunkData<'_, '_>]) -> Vec<DbScenario> {
make_n_chunks_scenario_with_retention(chunks, None).await
}
pub async fn make_n_chunks_scenario_with_retention(
chunks: &[ChunkData<'_, '_>],
retention_period_ns: Option<i64>,
) -> Vec<DbScenario> {
let n_stages_unset = chunks let n_stages_unset = chunks
.iter() .iter()
.filter(|chunk| chunk.chunk_stage.is_none()) .filter(|chunk| chunk.chunk_stage.is_none())
@ -439,7 +445,7 @@ pub async fn make_n_chunks_scenario(chunks: &[ChunkData<'_, '_>]) -> Vec<DbScena
// build scenario // build scenario
let mut scenario_name = format!("{} chunks:", chunks.len()); let mut scenario_name = format!("{} chunks:", chunks.len());
let mut mock_ingester = MockIngester::new().await; let mut mock_ingester = MockIngester::new_with_retention(retention_period_ns).await;
for chunk_data in chunks { for chunk_data in chunks {
let name = make_chunk(&mut mock_ingester, chunk_data).await; let name = make_chunk(&mut mock_ingester, chunk_data).await;
@ -651,10 +657,14 @@ static GLOBAL_EXEC: Lazy<Arc<DedicatedExecutors>> =
impl MockIngester { impl MockIngester {
/// Create new empty ingester. /// Create new empty ingester.
async fn new() -> Self { async fn new() -> Self {
Self::new_with_retention(None).await
}
async fn new_with_retention(retention_period_ns: Option<i64>) -> Self {
let exec = Arc::clone(&GLOBAL_EXEC); let exec = Arc::clone(&GLOBAL_EXEC);
let catalog = TestCatalog::with_execs(exec, 4); let catalog = TestCatalog::with_execs(exec, 4);
let ns = catalog let ns = catalog
.create_namespace_with_retention("test_db", None) .create_namespace_with_retention("test_db", retention_period_ns)
.await; .await;
let shard = ns.create_shard(1).await; let shard = ns.create_shard(1).await;