test: run SOME query tests for querier (#4098)
This includes some type changes to dispatch between OG and NG and allows some tests to be run against the NG querier. It only covers parquet files though, so the scope is somewhat limited. For #3934.
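The OG/NG dispatch is done by wrapping the existing (OG) scenario types and the new NG ones in enums and matching on the variant. Below is a minimal, self-contained sketch of that pattern; the stage names mirror the diff further down, while `all_stages` and the `main` driver are illustrative only and not part of this change:

```rust
use std::fmt;

// OG chunk life-cycle stages and the single NG stage, mirroring the diff below.
#[derive(Debug, Clone, Copy)]
enum ChunkStageOld {
    Mubo,
    Mubf,
    Rub,
    RubOs,
    Os,
}

#[derive(Debug, Clone, Copy)]
enum ChunkStageNew {
    Parquet,
}

// Wrapper enum: shared test code stores this and dispatches per variant.
#[derive(Debug, Clone, Copy)]
enum ChunkStage {
    Old(ChunkStageOld),
    New(ChunkStageNew),
}

impl fmt::Display for ChunkStage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Old(s) => write!(f, "Old: {:?}", s),
            Self::New(s) => write!(f, "New: {:?}", s),
        }
    }
}

// Illustrative helper (not in the PR): enumerate every stage so one test loop
// covers both the OG and the NG backend.
fn all_stages() -> Vec<ChunkStage> {
    let old = [
        ChunkStageOld::Mubo,
        ChunkStageOld::Mubf,
        ChunkStageOld::Rub,
        ChunkStageOld::RubOs,
        ChunkStageOld::Os,
    ];
    let new = [ChunkStageNew::Parquet];
    old.iter()
        .copied()
        .map(ChunkStage::Old)
        .chain(new.iter().copied().map(ChunkStage::New))
        .collect()
}

fn main() {
    for stage in all_stages() {
        println!("would build one scenario per stage: {}", stage);
    }
}
```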
parent c3ef56588f
commit 89206e013c
@@ -4237,10 +4237,13 @@ dependencies = [
 "data_types",
 "datafusion 0.1.0",
 "db",
 "iox_tests",
 "metric",
 "mutable_batch_lp",
 "object_store",
 "once_cell",
 "predicate",
 "querier",
 "query",
 "schema",
 "snafu",

@@ -1,3 +1,4 @@
//! Caches used by the querier.
use backoff::BackoffConfig;
use iox_catalog::interface::Catalog;
use std::sync::Arc;

@@ -1,3 +1,4 @@
//! Namespace cache.
use std::{collections::HashMap, sync::Arc};

use backoff::{Backoff, BackoffConfig};

@@ -1,3 +1,4 @@
//! Partition cache.
use std::{collections::HashMap, sync::Arc};

use backoff::{Backoff, BackoffConfig};

@@ -1,3 +1,4 @@
//! Processed tombstone cache.
use std::{collections::HashMap, sync::Arc, time::Duration};

use backoff::{Backoff, BackoffConfig};

@@ -1,3 +1,4 @@
//! Table cache.
use std::{collections::HashMap, sync::Arc, time::Duration};

use backoff::{Backoff, BackoffConfig};

@@ -13,7 +13,7 @@

pub use client_util::connection;

mod cache;
pub mod cache;
mod cache_system;
mod chunk;
pub mod database;

@@ -12,10 +12,13 @@ async-trait = "0.1"
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
db = { path = "../db" }
iox_tests = { path = "../iox_tests" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
once_cell = { version = "1.10.0", features = ["parking_lot"] }
predicate = { path = "../predicate" }
schema = { path = "../schema" }
trace = { path = "../trace" }
querier = { path = "../querier" }
query = { path = "../query" }
workspace-hack = { path = "../workspace-hack"}

@@ -57,6 +57,7 @@
| 1            |
+--------------+
-- SQL: SELECT foo from cpu;
-- Results After Sorting
+-----+
| foo |
+-----+

@@ -158,11 +159,12 @@
| you | 1970-01-01T00:00:00.000000020Z |
+-----+--------------------------------+
-- SQL: SELECT time from cpu;
-- Results After Sorting
+--------------------------------+
| time                           |
+--------------------------------+
| 1970-01-01T00:00:00.000000040Z |
| 1970-01-01T00:00:00.000000020Z |
| 1970-01-01T00:00:00.000000040Z |
+--------------------------------+
-- SQL: SELECT max(bar) from cpu order by 1;
+--------------+

@@ -20,6 +20,7 @@ SELECT count(*) from cpu;

SELECT min(bar) from cpu;

-- IOX_COMPARE: sorted
SELECT foo from cpu;

SELECT min(foo) as min_foo from cpu order by min_foo;

@@ -41,7 +42,7 @@ SELECT bar, min(time) as min_time from cpu group by bar order by bar, min_time;
SELECT max(time) as max_time from cpu group by foo order by max_time;
SELECT foo, max(time) as max_time from cpu group by foo order by foo, max_time;


-- IOX_COMPARE: sorted
SELECT time from cpu;

SELECT max(bar) from cpu order by 1;

@@ -2,6 +2,7 @@ use std::{any::Any, sync::Arc};

use datafusion::catalog::catalog::CatalogProvider;
use db::Db;
use querier::namespace::QuerierNamespace;
use query::{exec::ExecutionContextProvider, QueryDatabase};

/// Abstract database used during testing.

@@ -32,3 +33,17 @@ impl AbstractDb for Db {
        self
    }
}

impl AbstractDb for QuerierNamespace {
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self as _
    }

    fn as_catalog_provider_arc(self: Arc<Self>) -> Arc<dyn CatalogProvider> {
        self as _
    }

    fn as_query_database(&self) -> &dyn QueryDatabase {
        self
    }
}

@@ -5,13 +5,12 @@ use data_types::timestamp::TimestampRange;

use async_trait::async_trait;

use crate::scenarios::util::{
    all_scenarios_for_one_chunk, make_different_stage_chunks_with_deletes_scenario, ChunkData,
    ChunkStage,
};

use super::util::make_os_chunks_and_then_compact_with_different_scenarios_with_delete;
use super::{DbScenario, DbSetup};
use crate::scenarios::util::{
    all_scenarios_for_one_chunk, make_different_stage_chunks_with_deletes_scenario_old,
    make_os_chunks_and_then_compact_with_different_scenarios_with_delete, ChunkDataOld,
    ChunkStageOld,
};

// =========================================================================================================================
// DELETE TEST SETUPS: chunk lp data, how many chunks, their types, how many delete predicates and when they happen

@@ -258,21 +257,21 @@ impl DbSetup for ThreeDeleteThreeChunks {
        // ----------------------
        // 3 chunks: MUB, RUB, OS
        let lp = vec![
            ChunkData {
            ChunkDataOld {
                lp_lines: lp_lines_1.clone(),
                chunk_stage: ChunkStage::Os,
                chunk_stage: ChunkStageOld::Os,
            },
            ChunkData {
            ChunkDataOld {
                lp_lines: lp_lines_2.clone(),
                chunk_stage: ChunkStage::Rub,
                chunk_stage: ChunkStageOld::Rub,
            },
            ChunkData {
            ChunkDataOld {
                lp_lines: lp_lines_3.clone(),
                chunk_stage: ChunkStage::Mubo,
                chunk_stage: ChunkStageOld::Mubo,
            },
        ];
        let preds = vec![&pred1, &pred2, &pred3];
        let scenario_mub_rub_os = make_different_stage_chunks_with_deletes_scenario(
        let scenario_mub_rub_os = make_different_stage_chunks_with_deletes_scenario_old(
            lp,
            preds.clone(),
            table_name,

@@ -283,20 +282,20 @@ impl DbSetup for ThreeDeleteThreeChunks {
        // ----------------------
        // 3 chunks: 1 MUB open, 1 MUB frozen, 1 RUB
        let lp = vec![
            ChunkData {
            ChunkDataOld {
                lp_lines: lp_lines_1.clone(),
                chunk_stage: ChunkStage::Rub,
                chunk_stage: ChunkStageOld::Rub,
            },
            ChunkData {
            ChunkDataOld {
                lp_lines: lp_lines_2.clone(),
                chunk_stage: ChunkStage::Mubf,
                chunk_stage: ChunkStageOld::Mubf,
            },
            ChunkData {
            ChunkDataOld {
                lp_lines: lp_lines_3.clone(),
                chunk_stage: ChunkStage::Mubo,
                chunk_stage: ChunkStageOld::Mubo,
            },
        ];
        let scenario_2mub_rub = make_different_stage_chunks_with_deletes_scenario(
        let scenario_2mub_rub = make_different_stage_chunks_with_deletes_scenario_old(
            lp,
            preds.clone(),
            table_name,

@@ -307,20 +306,20 @@ impl DbSetup for ThreeDeleteThreeChunks {
        // ----------------------
        // 3 chunks: 2 MUB, 1 OS
        let lp = vec![
            ChunkData {
            ChunkDataOld {
                lp_lines: lp_lines_1.clone(),
                chunk_stage: ChunkStage::Os,
                chunk_stage: ChunkStageOld::Os,
            },
            ChunkData {
            ChunkDataOld {
                lp_lines: lp_lines_2.clone(),
                chunk_stage: ChunkStage::Mubf,
                chunk_stage: ChunkStageOld::Mubf,
            },
            ChunkData {
            ChunkDataOld {
                lp_lines: lp_lines_3.clone(),
                chunk_stage: ChunkStage::Mubo,
                chunk_stage: ChunkStageOld::Mubo,
            },
        ];
        let scenario_2mub_os = make_different_stage_chunks_with_deletes_scenario(
        let scenario_2mub_os = make_different_stage_chunks_with_deletes_scenario_old(
            lp,
            preds.clone(),
            table_name,

@@ -331,20 +330,20 @@ impl DbSetup for ThreeDeleteThreeChunks {
        // ----------------------
        // 3 chunks: 2 RUB, 1 OS
        let lp = vec![
            ChunkData {
            ChunkDataOld {
                lp_lines: lp_lines_1.clone(),
                chunk_stage: ChunkStage::Os,
                chunk_stage: ChunkStageOld::Os,
            },
            ChunkData {
            ChunkDataOld {
                lp_lines: lp_lines_2.clone(),
                chunk_stage: ChunkStage::Rub,
                chunk_stage: ChunkStageOld::Rub,
            },
            ChunkData {
            ChunkDataOld {
                lp_lines: lp_lines_3.clone(),
                chunk_stage: ChunkStage::Rub,
                chunk_stage: ChunkStageOld::Rub,
            },
        ];
        let scenario_2rub_os = make_different_stage_chunks_with_deletes_scenario(
        let scenario_2rub_os = make_different_stage_chunks_with_deletes_scenario_old(
            lp,
            preds.clone(),
            table_name,

@@ -355,20 +354,20 @@ impl DbSetup for ThreeDeleteThreeChunks {
        // ----------------------
        // 3 chunks: RUB, 2 OS
        let lp = vec![
            ChunkData {
            ChunkDataOld {
                lp_lines: lp_lines_1.clone(),
                chunk_stage: ChunkStage::Os,
                chunk_stage: ChunkStageOld::Os,
            },
            ChunkData {
            ChunkDataOld {
                lp_lines: lp_lines_2.clone(),
                chunk_stage: ChunkStage::Os,
                chunk_stage: ChunkStageOld::Os,
            },
            ChunkData {
            ChunkDataOld {
                lp_lines: lp_lines_3.clone(),
                chunk_stage: ChunkStage::Rub,
                chunk_stage: ChunkStageOld::Rub,
            },
        ];
        let scenario_rub_2os = make_different_stage_chunks_with_deletes_scenario(
        let scenario_rub_2os = make_different_stage_chunks_with_deletes_scenario_old(
            lp,
            preds.clone(),
            table_name,

@@ -379,20 +378,20 @@ impl DbSetup for ThreeDeleteThreeChunks {
        // ----------------------
        // 3 chunks: 3 OS
        let lp = vec![
            ChunkData {
            ChunkDataOld {
                lp_lines: lp_lines_1.clone(),
                chunk_stage: ChunkStage::Os,
                chunk_stage: ChunkStageOld::Os,
            },
            ChunkData {
            ChunkDataOld {
                lp_lines: lp_lines_2.clone(),
                chunk_stage: ChunkStage::Os,
                chunk_stage: ChunkStageOld::Os,
            },
            ChunkData {
            ChunkDataOld {
                lp_lines: lp_lines_3.clone(),
                chunk_stage: ChunkStage::Os,
                chunk_stage: ChunkStageOld::Os,
            },
        ];
        let scenario_3os = make_different_stage_chunks_with_deletes_scenario(
        let scenario_3os = make_different_stage_chunks_with_deletes_scenario_old(
            lp,
            preds.clone(),
            table_name,

@@ -8,6 +8,9 @@ use db::{
    Db,
};
use query::QueryChunk;
use schema::merge::SchemaMerger;
use schema::selection::Selection;
use std::collections::BTreeMap;
use std::{fmt::Display, sync::Arc};

// Structs, enums, and functions used to exhaust all test scenarios of chunk life cycle

@@ -15,15 +18,29 @@ use std::{fmt::Display, sync::Arc};

// STRUCTs & ENUMs
#[derive(Debug, Clone)]
pub struct ChunkData<'a> {
pub struct ChunkDataOld<'a> {
    /// Line protocol data of this chunk
    pub lp_lines: Vec<&'a str>,
    /// which stage this chunk will be created
    pub chunk_stage: ChunkStage,
    pub chunk_stage: ChunkStageOld,
}

#[derive(Debug, Clone)]
pub enum ChunkStage {
pub struct ChunkDataNew<'a> {
    /// Line protocol data of this chunk
    pub lp_lines: Vec<&'a str>,
    /// which stage this chunk will be created
    pub chunk_stage: ChunkStageNew,
}

#[derive(Debug, Clone)]
pub enum ChunkData<'a> {
    Old(ChunkDataOld<'a>),
    New(ChunkDataNew<'a>),
}

#[derive(Debug, Clone, Copy)]
pub enum ChunkStageOld {
    /// Open MUB
    Mubo,
    /// Frozen MUB

@@ -36,14 +53,57 @@ pub enum ChunkStage {
    Os,
}

impl Display for ChunkStageOld {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Mubo => write!(f, "Open MUB"),
            Self::Mubf => write!(f, "Frozen MUB"),
            Self::Rub => write!(f, "RUB"),
            Self::RubOs => write!(f, "RUB & OS"),
            Self::Os => write!(f, "OS"),
        }
    }
}

impl ChunkStageOld {
    /// return the list of all chunk types
    pub fn all() -> Vec<Self> {
        vec![Self::Mubo, Self::Mubf, Self::Rub, Self::RubOs, Self::Os]
    }
}

#[derive(Debug, Clone, Copy)]
pub enum ChunkStageNew {
    /// In parquet file.
    Parquet,
}

impl Display for ChunkStageNew {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Parquet => write!(f, "Parquet"),
        }
    }
}

impl ChunkStageNew {
    /// return the list of all chunk types
    pub fn all() -> Vec<Self> {
        vec![Self::Parquet]
    }
}

#[derive(Debug, Clone, Copy)]
pub enum ChunkStage {
    Old(ChunkStageOld),
    New(ChunkStageNew),
}

impl Display for ChunkStage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ChunkStage::Mubo => write!(f, "Open MUB"),
            ChunkStage::Mubf => write!(f, "Frozen MUB"),
            ChunkStage::Rub => write!(f, "RUB"),
            ChunkStage::RubOs => write!(f, "RUB & OS"),
            ChunkStage::Os => write!(f, "OS"),
            Self::Old(stage) => write!(f, "Old: {}", stage),
            Self::New(stage) => write!(f, "New: {}", stage),
        }
    }
}

@@ -51,20 +111,53 @@ impl Display for ChunkStage {
impl ChunkStage {
    /// return the list of all chunk types
    pub fn all() -> Vec<Self> {
        vec![Self::Mubo, Self::Mubf, Self::Rub, Self::RubOs, Self::Os]
        ChunkStageOld::all()
            .into_iter()
            .map(ChunkStage::Old)
            .chain(ChunkStageNew::all().into_iter().map(ChunkStage::New))
            .collect()
    }
}

#[derive(Debug, Clone)]
pub struct Pred<'a> {
pub struct PredOld<'a> {
    /// Delete predicate
    predicate: &'a DeletePredicate,
    /// At which chunk stage this predicate is applied
    delete_time: DeleteTime,
    delete_time: DeleteTimeOld,
}

#[derive(Debug, Clone, PartialEq)]
pub enum DeleteTime {
#[derive(Debug, Clone)]
pub struct PredNew<'a> {
    /// Delete predicate
    predicate: &'a DeletePredicate,
    /// At which chunk stage this predicate is applied
    delete_time: DeleteTimeNew,
}

#[derive(Debug, Clone)]
pub enum Pred<'a> {
    Old(PredOld<'a>),
    New(PredNew<'a>),
}

impl<'a> Pred<'a> {
    fn new(predicate: &'a DeletePredicate, delete_time: DeleteTime) -> Self {
        match delete_time {
            DeleteTime::Old(delete_time) => Self::Old(PredOld {
                predicate,
                delete_time,
            }),
            DeleteTime::New(delete_time) => Self::New(PredNew {
                predicate,
                delete_time,
            }),
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DeleteTimeOld {
    /// Delete predicate happens after all chunks created
    /// and moved to their corresponding stages
    End,

@@ -80,26 +173,84 @@ pub enum DeleteTime {
    Os,
}

impl DeleteTimeOld {
    /// Return all DeleteTime at and after the given chunk stage
    pub fn all_from_and_before(chunk_stage: ChunkStageOld) -> Vec<Self> {
        match chunk_stage {
            ChunkStageOld::Mubo => vec![Self::Mubo],
            ChunkStageOld::Mubf => vec![Self::Mubo, Self::Mubf],
            ChunkStageOld::Rub => {
                vec![Self::Mubo, Self::Mubf, Self::Rub]
            }
            ChunkStageOld::RubOs => vec![Self::Mubo, Self::Mubf, Self::Rub, Self::RubOs],
            ChunkStageOld::Os => vec![Self::Mubo, Self::Mubf, Self::Rub, Self::RubOs, Self::Os],
        }
    }

    pub fn begin() -> Self {
        Self::Mubo
    }

    pub fn end() -> Self {
        Self::End
    }
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DeleteTimeNew {
    /// Delete predicate is added to chunks at their parquet stage
    Parquet,
}

impl DeleteTimeNew {
    /// Return all DeleteTime at and after the given chunk stage
    pub fn all_from_and_before(chunk_stage: ChunkStageNew) -> Vec<DeleteTimeNew> {
        match chunk_stage {
            ChunkStageNew::Parquet => vec![Self::Parquet],
        }
    }

    pub fn begin() -> Self {
        Self::Parquet
    }

    pub fn end() -> Self {
        Self::Parquet
    }
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DeleteTime {
    Old(DeleteTimeOld),
    New(DeleteTimeNew),
}

impl DeleteTime {
    /// Return all DeleteTime at and after the given chunk stage
    pub fn all_from_and_before(chunk_stage: ChunkStage) -> Vec<DeleteTime> {
    pub fn all_from_and_before(chunk_stage: ChunkStage) -> Vec<Self> {
        match chunk_stage {
            ChunkStage::Mubo => vec![DeleteTime::Mubo],
            ChunkStage::Mubf => vec![DeleteTime::Mubo, DeleteTime::Mubf],
            ChunkStage::Rub => vec![DeleteTime::Mubo, DeleteTime::Mubf, DeleteTime::Rub],
            ChunkStage::RubOs => vec![
                DeleteTime::Mubo,
                DeleteTime::Mubf,
                DeleteTime::Rub,
                DeleteTime::RubOs,
            ],
            ChunkStage::Os => vec![
                DeleteTime::Mubo,
                DeleteTime::Mubf,
                DeleteTime::Rub,
                DeleteTime::RubOs,
                DeleteTime::Os,
            ],
            ChunkStage::Old(chunk_stage) => DeleteTimeOld::all_from_and_before(chunk_stage)
                .into_iter()
                .map(Self::Old)
                .collect(),
            ChunkStage::New(chunk_stage) => DeleteTimeNew::all_from_and_before(chunk_stage)
                .into_iter()
                .map(Self::New)
                .collect(),
        }
    }

    pub fn begin_for(chunk_stage: ChunkStage) -> Self {
        match chunk_stage {
            ChunkStage::Old(_) => Self::Old(DeleteTimeOld::begin()),
            ChunkStage::New(_) => Self::New(DeleteTimeNew::begin()),
        }
    }

    pub fn end_for(chunk_stage: ChunkStage) -> Self {
        match chunk_stage {
            ChunkStage::Old(_) => Self::Old(DeleteTimeOld::end()),
            ChunkStage::New(_) => Self::New(DeleteTimeNew::end()),
        }
    }
}

@@ -121,15 +272,6 @@ pub async fn all_scenarios_for_one_chunk(
    // Partition of the chunk
    partition_key: &str,
) -> Vec<DbScenario> {
    // Make delete predicates that happen when all chunks in their final stages
    let end_preds: Vec<Pred> = at_end_preds
        .iter()
        .map(|p| Pred {
            predicate: *p,
            delete_time: DeleteTime::End,
        })
        .collect();

    let mut scenarios = vec![];
    // Go over chunk stages
    for chunk_stage in ChunkStage::all() {

@@ -137,19 +279,22 @@ pub async fn all_scenarios_for_one_chunk(
        // all stages at and before that in the life cycle to the chunk
        // But only need to get all delete times if chunk_stage_preds is not empty,
        // otherwise, produce only one scenario of each chunk stage
        let mut delete_times = vec![DeleteTime::Mubo];
        let mut delete_times = vec![DeleteTime::begin_for(chunk_stage)];
        if !chunk_stage_preds.is_empty() {
            delete_times = DeleteTime::all_from_and_before(chunk_stage.clone())
            delete_times = DeleteTime::all_from_and_before(chunk_stage)
        };

        // Make delete predicates that happen when all chunks in their final stages
        let end_preds: Vec<Pred> = at_end_preds
            .iter()
            .map(|p| Pred::new(*p, DeleteTime::end_for(chunk_stage)))
            .collect();

        for delete_time in delete_times {
            // make delete predicate with time it happens
            let mut preds: Vec<Pred> = chunk_stage_preds
                .iter()
                .map(|p| Pred {
                    predicate: *p,
                    delete_time: delete_time.clone(),
                })
                .map(|p| Pred::new(*p, delete_time))
                .collect();
            // extend at-end predicates
            preds.extend(end_preds.clone());

@@ -158,7 +303,7 @@ pub async fn all_scenarios_for_one_chunk(
            scenarios.push(
                make_chunk_with_deletes_at_different_stages(
                    lp_lines.clone(),
                    chunk_stage.clone(),
                    chunk_stage,
                    preds,
                    delete_table_name,
                    partition_key,

@@ -174,12 +319,59 @@ pub async fn all_scenarios_for_one_chunk(
/// Build a chunk that may move with life cycle before/after deletes
/// Note that the only chunk in this function can be moved to different stages and delete predicates
/// can be applied at different stages when the chunk is moved.
pub async fn make_chunk_with_deletes_at_different_stages(
async fn make_chunk_with_deletes_at_different_stages(
    lp_lines: Vec<&str>,
    chunk_stage: ChunkStage,
    preds: Vec<Pred<'_>>,
    delete_table_name: &str,
    partition_key: &str,
) -> DbScenario {
    match chunk_stage {
        ChunkStage::Old(chunk_stage) => {
            let preds: Vec<_> = preds
                .into_iter()
                .map(|p| match p {
                    Pred::Old(pred) => pred,
                    Pred::New(_) => panic!("mixed new and old"),
                })
                .collect();

            make_chunk_with_deletes_at_different_stages_old(
                lp_lines,
                chunk_stage,
                preds,
                delete_table_name,
                partition_key,
            )
            .await
        }
        ChunkStage::New(chunk_stage) => {
            let preds: Vec<_> = preds
                .into_iter()
                .map(|p| match p {
                    Pred::Old(_) => panic!("mixed new and old"),
                    Pred::New(pred) => pred,
                })
                .collect();

            make_chunk_with_deletes_at_different_stages_new(
                lp_lines,
                chunk_stage,
                preds,
                delete_table_name,
                partition_key,
            )
            .await
        }
    }
}

async fn make_chunk_with_deletes_at_different_stages_old(
    lp_lines: Vec<&str>,
    chunk_stage: ChunkStageOld,
    preds: Vec<PredOld<'_>>,
    delete_table_name: &str,
    partition_key: &str,
) -> DbScenario {
    let db = make_db().await.db;

@@ -198,7 +390,7 @@ pub async fn make_chunk_with_deletes_at_different_stages(
    let mut display = "".to_string();
    let mut count = 0;
    for pred in &preds {
        if pred.delete_time == DeleteTime::Mubo {
        if pred.delete_time == DeleteTimeOld::Mubo {
            db.delete(delete_table_name, Arc::new(pred.predicate.clone()))
                .unwrap();
            deleted = true;

@@ -212,7 +404,7 @@ pub async fn make_chunk_with_deletes_at_different_stages(
    // ----------------------
    // Freeze MUB if requested
    match chunk_stage {
        ChunkStage::Mubf | ChunkStage::Rub | ChunkStage::RubOs | ChunkStage::Os => {
        ChunkStageOld::Mubf | ChunkStageOld::Rub | ChunkStageOld::RubOs | ChunkStageOld::Os => {
            // Since mub are frozen at delete, no need to do it in that case for table of deleted data
            if !deleted {
                db.rollover_partition(delete_table_name, partition_key)

@@ -244,7 +436,7 @@ pub async fn make_chunk_with_deletes_at_different_stages(
    // Apply delete predicate
    count = 0;
    for pred in &preds {
        if pred.delete_time == DeleteTime::Mubf {
        if pred.delete_time == DeleteTimeOld::Mubf {
            db.delete(delete_table_name, Arc::new(pred.predicate.clone()))
                .unwrap();
            count += 1;

@@ -257,7 +449,7 @@ pub async fn make_chunk_with_deletes_at_different_stages(
    // ----------------------
    // Move MUB to RUB if requested
    match chunk_stage {
        ChunkStage::Rub | ChunkStage::RubOs | ChunkStage::Os => {
        ChunkStageOld::Rub | ChunkStageOld::RubOs | ChunkStageOld::Os => {
            let mut no_more_data = false;
            for table in &tables {
                // Compact this MUB of this table

@@ -297,7 +489,7 @@ pub async fn make_chunk_with_deletes_at_different_stages(
    // Apply delete predicate
    count = 0;
    for pred in &preds {
        if pred.delete_time == DeleteTime::Rub {
        if pred.delete_time == DeleteTimeOld::Rub {
            db.delete(delete_table_name, Arc::new(pred.predicate.clone()))
                .unwrap();
            count += 1;

@@ -310,7 +502,7 @@ pub async fn make_chunk_with_deletes_at_different_stages(
    // ----------------------
    // Persist RUB to OS if requested
    match chunk_stage {
        ChunkStage::RubOs | ChunkStage::Os => {
        ChunkStageOld::RubOs | ChunkStageOld::Os => {
            let mut no_more_data = false;
            for table in &tables {
                // Persist RUB of this table

@@ -358,7 +550,7 @@ pub async fn make_chunk_with_deletes_at_different_stages(
    // Apply delete predicate
    count = 0;
    for pred in &preds {
        if pred.delete_time == DeleteTime::RubOs {
        if pred.delete_time == DeleteTimeOld::RubOs {
            db.delete(delete_table_name, Arc::new(pred.predicate.clone()))
                .unwrap();
            count = 1;

@@ -370,7 +562,7 @@ pub async fn make_chunk_with_deletes_at_different_stages(

    // ----------------------
    // Unload RUB
    if let ChunkStage::Os = chunk_stage {
    if let ChunkStageOld::Os = chunk_stage {
        for table in &tables {
            // retrieve its chunk_id first
            let rub_chunk_ids = chunk_ids_rub(&db, Some(table.as_str()), Some(partition_key));

@@ -390,7 +582,7 @@ pub async fn make_chunk_with_deletes_at_different_stages(
    // Apply delete predicate
    count = 0;
    for pred in &preds {
        if pred.delete_time == DeleteTime::Os || pred.delete_time == DeleteTime::End {
        if pred.delete_time == DeleteTimeOld::Os || pred.delete_time == DeleteTimeOld::End {
            db.delete(delete_table_name, Arc::new(pred.predicate.clone()))
                .unwrap();
            count += 1;

@@ -410,6 +602,146 @@ pub async fn make_chunk_with_deletes_at_different_stages(
    DbScenario { scenario_name, db }
}

async fn make_chunk_with_deletes_at_different_stages_new(
    lp_lines: Vec<&str>,
    chunk_stage: ChunkStageNew,
    preds: Vec<PredNew<'_>>,
    delete_table_name: &str,
    partition_key: &str,
) -> DbScenario {
    use iox_tests::util::TestCatalog;
    use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
    use querier::{cache::CatalogCache, namespace::QuerierNamespace};

    // detect table names and schemas from LP lines
    let (lp_lines_grouped, schemas) = {
        let mut lp_lines_grouped: BTreeMap<_, Vec<_>> = BTreeMap::new();
        let mut schemas: BTreeMap<_, SchemaMerger> = BTreeMap::new();

        for lp in lp_lines {
            let (table_name, batch) = lp_to_mutable_batch(lp);

            lp_lines_grouped
                .entry(table_name.clone())
                .or_default()
                .push(lp);

            let schema = batch.schema(Selection::All).unwrap();
            let merger = schemas.entry(table_name).or_default();
            *merger = merger.clone().merge(&schema).unwrap();
        }

        let schemas: BTreeMap<_, _> = schemas
            .into_iter()
            .map(|(table_name, merger)| (table_name, merger.build()))
            .collect();

        (lp_lines_grouped, schemas)
    };

    // set up catalog
    let catalog = TestCatalog::new();
    let ns = catalog.create_namespace("test_db").await;
    let sequencer = ns.create_sequencer(1).await;
    let tables = {
        // need to use a temporary vector because BTree iterators ain't `Send`
        let table_names: Vec<_> = lp_lines_grouped.keys().cloned().collect();

        let mut tables = BTreeMap::new();
        for table_name in table_names {
            let table = ns.create_table(&table_name).await;
            tables.insert(table_name, table);
        }
        tables
    };
    let partitions = {
        // need to use a temporary vector because BTree iterators ain't `Send`
        let tables: Vec<_> = tables.values().cloned().collect();

        let mut partitions = BTreeMap::new();
        for table in tables {
            let partition = table
                .with_sequencer(&sequencer)
                .create_partition(partition_key)
                .await;
            partitions.insert(table.table.name.clone(), partition);
        }
        partitions
    };
    for (table_name, schema) in schemas {
        let table = tables.get(&table_name).unwrap();

        for (t, field) in schema.iter() {
            let t = t.unwrap();
            table.create_column(field.name(), t.into()).await;
        }
    }

    // create chunks
    match chunk_stage {
        ChunkStageNew::Parquet => {
            // need to use a temporary vector because BTree iterators ain't `Send`
            let lp_lines_grouped: Vec<_> = lp_lines_grouped.into_iter().collect();

            for (table_name, lp_lines) in lp_lines_grouped {
                let partition = partitions.get(&table_name).unwrap();
                let min_seq = 1;
                let max_seq = 1;
                let min_time = 0;
                let max_time = 0;
                partition
                    .create_parquet_file_with_min_max(
                        &lp_lines.join("\n"),
                        min_seq,
                        max_seq,
                        min_time,
                        max_time,
                    )
                    .await;
            }
        }
    }

    // attach delete predicates
    let n_preds = preds.len();
    if let Some(table) = tables.get(delete_table_name) {
        for (i, pred) in preds.into_iter().enumerate() {
            match pred.delete_time {
                DeleteTimeNew::Parquet => {
                    // parquet files got created w/ sequence number = 1
                    let sequence_number = 2 + i;

                    let min_time = pred.predicate.range.start();
                    let max_time = pred.predicate.range.end();
                    let predicate = pred.predicate.expr_sql_string();
                    table
                        .with_sequencer(&sequencer)
                        .create_tombstone(sequence_number as i64, min_time, max_time, &predicate)
                        .await;
                }
            }
        }
    }

    let catalog_cache = Arc::new(CatalogCache::new(
        catalog.catalog(),
        catalog.time_provider(),
    ));
    let db = Arc::new(QuerierNamespace::new(
        catalog_cache,
        ns.namespace.name.clone().into(),
        ns.namespace.id,
        catalog.metric_registry(),
        catalog.object_store(),
        catalog.time_provider(),
        catalog.exec(),
    ));
    db.sync().await;

    let scenario_name = format!("NG Chunk {} with {} deletes", chunk_stage, n_preds);
    DbScenario { scenario_name, db }
}

/// Build many chunks which are in different stages
// Note that, after a lot of thoughts, I decided to have 2 separated functions, this one and the one above.
// The above tests delete predicates before and/or after a chunk is moved to different stages, while

@@ -417,7 +749,17 @@ pub async fn make_chunk_with_deletes_at_different_stages(
// Even though these 2 functions have some overlapped code, merging them in one
// function will created a much more complicated cases to handle
pub async fn make_different_stage_chunks_with_deletes_scenario(
    data: Vec<ChunkData<'_>>,
    _data: Vec<ChunkData<'_>>,
    _preds: Vec<&DeletePredicate>,
    _table_name: &str,
    _partition_key: &str,
) -> DbScenario {
    // this is used by `delete.rs` but currently that only generates OG data
    unimplemented!()
}

pub async fn make_different_stage_chunks_with_deletes_scenario_old(
    data: Vec<ChunkDataOld<'_>>,
    preds: Vec<&DeletePredicate>,
    table_name: &str,
    partition_key: &str,

@@ -439,7 +781,7 @@ pub async fn make_different_stage_chunks_with_deletes_scenario(
        // ----------
        // freeze MUB
        match chunk_data.chunk_stage {
            ChunkStage::Mubf | ChunkStage::Rub | ChunkStage::RubOs | ChunkStage::Os => {
            ChunkStageOld::Mubf | ChunkStageOld::Rub | ChunkStageOld::RubOs | ChunkStageOld::Os => {
                let chunk = db
                    .rollover_partition(table_name, partition_key)
                    .await

@@ -453,7 +795,7 @@ pub async fn make_different_stage_chunks_with_deletes_scenario(
        // ----------
        // Move MUB to RUB
        match chunk_data.chunk_stage {
            ChunkStage::Rub | ChunkStage::RubOs | ChunkStage::Os => {
            ChunkStageOld::Rub | ChunkStageOld::RubOs | ChunkStageOld::Os => {
                let chunk = db
                    .compact_chunks(table_name, partition_key, |chunk| chunk.id() == chunk_id)
                    .await

@@ -467,7 +809,7 @@ pub async fn make_different_stage_chunks_with_deletes_scenario(
        // ----------
        // Move RUB to OS
        match chunk_data.chunk_stage {
            ChunkStage::RubOs | ChunkStage::Os => {
            ChunkStageOld::RubOs | ChunkStageOld::Os => {
                let chunk = db
                    .persist_partition(table_name, partition_key, true)
                    .await

@@ -480,7 +822,7 @@ pub async fn make_different_stage_chunks_with_deletes_scenario(

        // ----------
        // Unload RUB
        if let ChunkStage::Os = chunk_data.chunk_stage {
        if let ChunkStageOld::Os = chunk_data.chunk_stage {
            db.unload_read_buffer(table_name, partition_key, chunk_id)
                .unwrap();
        }

@@ -578,14 +920,14 @@ pub async fn make_contiguous_os_chunks(
    // Define they are OS
    let mut chunk_data_vec = vec![];
    for lp_lines in lp_lines_vec {
        let chunk_data = ChunkData {
        let chunk_data = ChunkDataOld {
            lp_lines: lp_lines.clone(),
            chunk_stage: ChunkStage::Os,
            chunk_stage: ChunkStageOld::Os,
        };
        chunk_data_vec.push(chunk_data);
    }
    // Make db with those OS chunks
    let scenario = make_different_stage_chunks_with_deletes_scenario(
    let scenario = make_different_stage_chunks_with_deletes_scenario_old(
        chunk_data_vec,
        vec![], // not delete anything yet
        table_name,