test: chunks in ingester stage (#4415)

* refactor: document and improve `MockIngesterConnection`

* refactor: split `OldOneMeasurementFourChunksWithDuplicates` for `EXPLAIN` queries

* fix: mark "IngesterPartition" chunks as unsorted

* fix: "group by" queries may require sorted comparison

* refactor: re-export a few more types from querier

* fix: ensure that test parquet files are de-duped

* test: chunks in ingester stage

* docs: explain test code
pull/24376/head
Marco Neumann 2022-04-26 09:55:19 +02:00 committed by GitHub
parent 4b47d723b1
commit 2337935660
19 changed files with 370 additions and 82 deletions

Cargo.lock generated

@ -2647,6 +2647,7 @@ dependencies = [
"arrow",
"bytes",
"data_types2",
"datafusion 0.1.0",
"iox_catalog",
"iox_object_store",
"iox_time",
@ -4623,6 +4624,7 @@ dependencies = [
"arrow_util",
"async-trait",
"data_types",
"data_types2",
"datafusion 0.1.0",
"datafusion_util",
"db",


@ -9,6 +9,7 @@ description = "IOx NG test utils and tests"
arrow = "12"
bytes = "1.0"
data_types2 = { path = "../data_types2" }
datafusion = { path = "../datafusion" }
iox_catalog = { path = "../iox_catalog" }
iox_object_store = { path = "../iox_object_store" }
metric = { path = "../metric" }


@ -10,6 +10,7 @@ use data_types2::{
ParquetFileParams, ParquetFileWithMetadata, Partition, PartitionId, QueryPool, SequenceNumber,
Sequencer, SequencerId, Table, TableId, Timestamp, Tombstone, TombstoneId,
};
use datafusion::physical_plan::metrics::Count;
use iox_catalog::{
interface::{Catalog, PartitionRepo, INITIAL_COMPACTION_LEVEL},
mem::MemCatalog,
@ -19,7 +20,7 @@ use iox_time::{MockProvider, Time, TimeProvider};
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use object_store::{DynObjectStore, ObjectStoreImpl};
use parquet_file::metadata::{IoxMetadata, IoxParquetMetaData};
use query::exec::Executor;
use query::{exec::Executor, provider::RecordBatchDeduplicator, util::arrow_sort_key_exprs};
use schema::{
selection::Selection,
sort::{adjust_sort_key_columns, SortKey, SortKeyBuilder},
@ -472,6 +473,7 @@ impl TestPartition {
let row_count = record_batch.num_rows();
assert!(row_count > 0, "Parquet file must have at least 1 row");
let (record_batch, sort_key) = sort_batch(record_batch, schema);
let record_batch = dedup_batch(record_batch, &sort_key);
let object_store_id = Uuid::new_v4();
let min_sequence_number = SequenceNumber::new(min_seq);
@ -755,3 +757,16 @@ fn sort_batch(record_batch: RecordBatch, schema: Schema) -> (RecordBatch, SortKe
(record_batch, sort_key)
}
fn dedup_batch(record_batch: RecordBatch, sort_key: &SortKey) -> RecordBatch {
let schema = record_batch.schema();
let sort_keys = arrow_sort_key_exprs(sort_key, &schema);
let mut deduplicator = RecordBatchDeduplicator::new(sort_keys, Count::default(), None);
let mut batches = vec![deduplicator.push(record_batch).unwrap()];
if let Some(batch) = deduplicator.finish().unwrap() {
batches.push(batch);
}
RecordBatch::concat(&schema, &batches).unwrap()
}
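
For orientation, here is a minimal standalone sketch of the deduplication step above. It is illustrative only: plain tuples stand in for Arrow record batches, and a simple "last row wins" rule replaces the per-column merge that `RecordBatchDeduplicator` actually performs. The input must already be sorted by primary key, which `sort_batch` guarantees before `dedup_batch` runs.

// Not IOx code: a simplified model of what `dedup_batch` does to sorted test data.
fn dedup_sorted<K: PartialEq, V>(rows: Vec<(K, V)>) -> Vec<(K, V)> {
    let mut out: Vec<(K, V)> = Vec::new();
    for (key, value) in rows {
        let is_dup = out.last().map_or(false, |(last_key, _)| *last_key == key);
        if is_dup {
            // duplicate primary key: the later row replaces the earlier one
            *out.last_mut().unwrap() = (key, value);
        } else {
            // new primary key: keep the row
            out.push((key, value));
        }
    }
    out
}

fn main() {
    let rows = vec![("cpu,host=a", 1), ("cpu,host=a", 2), ("cpu,host=b", 3)];
    // the duplicated "cpu,host=a" key collapses to its later row
    assert_eq!(dedup_sorted(rows), vec![("cpu,host=a", 2), ("cpu,host=b", 3)]);
}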


@ -6,18 +6,18 @@ use parking_lot::Mutex;
use super::IngesterConnection;
/// IngesterConnection for testing
#[derive(Debug)]
pub(crate) struct MockIngesterConnection {
#[derive(Debug, Default)]
pub struct MockIngesterConnection {
next_response: Mutex<Option<super::Result<Vec<Arc<super::IngesterPartition>>>>>,
}
impl MockIngesterConnection {
/// Create connection w/ an empty response.
pub fn new() -> Self {
Self {
next_response: Mutex::new(None),
}
Self::default()
}
/// Set next response for this connection.
pub fn next_response(&self, response: super::Result<Vec<Arc<super::IngesterPartition>>>) {
*self.next_response.lock() = Some(response);
}


@ -28,7 +28,8 @@ pub use cache::CatalogCache as QuerierCatalogCache;
pub use database::QuerierDatabase;
pub use handler::{QuerierHandler, QuerierHandlerImpl};
pub use ingester::{
create_ingester_connection, create_ingester_connection_for_testing, IngesterConnection,
create_ingester_connection, create_ingester_connection_for_testing, Error as IngesterError,
IngesterConnection, IngesterPartition,
};
pub use namespace::QuerierNamespace;
pub use server::QuerierServer;


@ -38,6 +38,7 @@ pub mod overlap;
mod physical;
use self::overlap::{group_potential_duplicates, group_potential_duplicates_og};
pub(crate) use deduplicate::DeduplicateExec;
pub use deduplicate::RecordBatchDeduplicator;
pub(crate) use physical::IOxReadFilterNode;
#[derive(Debug, Snafu)]


@ -11,7 +11,7 @@ use arrow::{
use async_trait::async_trait;
use datafusion_util::AdapterStream;
use self::algo::RecordBatchDeduplicator;
pub use self::algo::RecordBatchDeduplicator;
use datafusion::{
error::{DataFusionError, Result},
execution::context::TaskContext,


@ -22,7 +22,7 @@ use crate::provider::deduplicate::key_ranges::key_ranges;
// [`RecordBatch`]es which are already sorted on a primary key,
// including primary keys which straddle RecordBatch boundaries
#[derive(Debug)]
pub(crate) struct RecordBatchDeduplicator {
pub struct RecordBatchDeduplicator {
sort_keys: Vec<PhysicalSortExpr>,
last_batch: Option<RecordBatch>,
num_dupes: metrics::Count,


@ -8,8 +8,10 @@ description = "Tests of the query engine against different database configuratio
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
arrow = { version = "12", features = ["prettyprint"] }
async-trait = "0.1"
data_types = { path = "../data_types" }
data_types2 = { path = "../data_types2" }
datafusion = { path = "../datafusion" }
datafusion_util = { path = "../datafusion_util" }
db = { path = "../db" }


@ -108,6 +108,7 @@
| 1970-01-01T00:00:00.000000080Z |
+--------------------------------+
-- SQL: SELECT foo, min(time) from cpu group by foo;
-- Results After Sorting
+-----+--------------------------------+
| foo | MIN(cpu.time) |
+-----+--------------------------------+


@ -28,6 +28,7 @@ SELECT max(foo) from cpu;
SELECT min(time) from cpu;
SELECT max(time) from cpu;
-- IOX_COMPARE: sorted
SELECT foo, min(time) from cpu group by foo;
SELECT bar, max(time) as max_time from cpu group by bar order by bar, max_time;
SELECT max(time) as max_time from cpu group by bar order by max_time;
@ -72,4 +73,4 @@ SELECT time, bar from cpu where bar >= 1.0 order by bar, time;
SELECT * from cpu where foo = 'you' order by bar, foo, time;
SELECT min(bar) as mi, max(time) as ma from cpu where foo = 'you' order by mi, ma;
SELECT min(bar) as mi, max(time) as ma from cpu where foo = 'you' order by mi, ma;
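
The `-- IOX_COMPARE: sorted` directive above exists because a `GROUP BY` without an `ORDER BY` may return its groups in any order, so the expected output is compared as a sorted set of rows rather than as an exact line sequence. A minimal sketch of that comparison idea (illustrative only, not the actual test runner):

// Illustrative helper: order-insensitive comparison of result rows.
fn assert_rows_equal_unordered(mut expected: Vec<String>, mut actual: Vec<String>) {
    expected.sort();
    actual.sort();
    assert_eq!(expected, actual);
}

fn main() {
    // `GROUP BY foo` may emit the "me" and "you" groups in either order
    let expected = vec!["me,10".to_string(), "you,20".to_string()];
    let actual = vec!["you,20".to_string(), "me,10".to_string()];
    assert_rows_equal_unordered(expected, actual);
}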


@ -1,4 +1,4 @@
-- Test Setup: OneMeasurementFourChunksWithDuplicates
-- Test Setup: OldOneMeasurementFourChunksWithDuplicates
-- SQL: explain select time, state, city, min_temp, max_temp, area from h2o order by time, state, city;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |


@ -1,5 +1,5 @@
-- Test for predicate push down explains
-- IOX_SETUP: OneMeasurementFourChunksWithDuplicates
-- IOX_SETUP: OldOneMeasurementFourChunksWithDuplicates
-- Plan with order by
explain select time, state, city, min_temp, max_temp, area from h2o order by time, state, city;


@ -116,20 +116,6 @@ async fn test_cases_delete_two_del_multi_expr_one_chunk_sql() {
.expect("flush worked");
}
#[tokio::test]
// Tests from "duplicates.sql",
async fn test_cases_duplicates_sql() {
let input_path = Path::new("cases").join("in").join("duplicates.sql");
let mut runner = Runner::new();
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
// Tests from "new_sql_system_tables.sql",
async fn test_cases_new_sql_system_tables_sql() {
@ -158,6 +144,20 @@ async fn test_cases_no_stats_plans_sql() {
.expect("flush worked");
}
#[tokio::test]
// Tests from "old_duplicates.sql",
async fn test_cases_old_duplicates_sql() {
let input_path = Path::new("cases").join("in").join("old_duplicates.sql");
let mut runner = Runner::new();
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
// Tests from "old_sql_system_tables.sql",
async fn test_cases_old_sql_system_tables_sql() {


@ -55,6 +55,7 @@ pub fn get_all_setups() -> &'static HashMap<String, Arc<dyn DbSetup>> {
register_setup!(OldTwoMeasurementsManyFieldsTwoChunks),
register_setup!(OldTwoMeasurementsManyFieldsOneRubChunk),
register_setup!(OneMeasurementFourChunksWithDuplicates),
register_setup!(OldOneMeasurementFourChunksWithDuplicates),
register_setup!(OneMeasurementAllChunksDropped),
register_setup!(ChunkOrder),
register_setup!(ThreeDeleteThreeChunks),


@ -5,7 +5,7 @@ use data_types::timestamp::TimestampRange;
use async_trait::async_trait;
use super::util::{make_n_chunks_scenario_new, ChunkDataNew, PredNew};
use super::util::{make_n_chunks_scenario_new, ChunkDataNew, DeleteTimeNew, PredNew};
use super::{DbScenario, DbSetup};
use crate::scenarios::util::{
all_scenarios_for_one_chunk, make_different_stage_chunks_with_deletes_scenario_old,
@ -427,21 +427,46 @@ impl DbSetup for ThreeDeleteThreeChunks {
&mut make_n_chunks_scenario_new(&[
ChunkDataNew {
lp_lines: lp_lines_1,
preds: vec![PredNew::end(&pred1)],
preds: vec![
PredNew {
predicate: &pred1,
delete_time: DeleteTimeNew::End,
},
PredNew {
predicate: &pred2,
delete_time: DeleteTimeNew::End,
},
PredNew {
predicate: &pred3,
delete_time: DeleteTimeNew::End,
},
],
delete_table_name: Some(table_name),
partition_key,
..Default::default()
},
ChunkDataNew {
lp_lines: lp_lines_2,
preds: vec![PredNew::end(&pred2)],
preds: vec![
PredNew {
predicate: &pred2,
delete_time: DeleteTimeNew::End,
},
PredNew {
predicate: &pred3,
delete_time: DeleteTimeNew::End,
},
],
delete_table_name: Some(table_name),
partition_key,
..Default::default()
},
ChunkDataNew {
lp_lines: lp_lines_3,
preds: vec![PredNew::end(&pred3)],
preds: vec![PredNew {
predicate: &pred3,
delete_time: DeleteTimeNew::End,
}],
delete_table_name: Some(table_name),
partition_key,
..Default::default()


@ -850,6 +850,25 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicates {
}
}
#[derive(Debug)]
/// Setup for four chunks with duplicates, for deduplication tests
///
/// This scenario is OG-specific and can be used for `EXPLAIN` plans and system tables.
pub struct OldOneMeasurementFourChunksWithDuplicates {}
#[async_trait]
impl DbSetup for OldOneMeasurementFourChunksWithDuplicates {
async fn make(&self) -> Vec<DbScenario> {
let scenarios: Vec<_> = OneMeasurementFourChunksWithDuplicates {}
.make()
.await
.into_iter()
.filter(|s| s.scenario_name == "Data in four chunks with duplicates")
.collect();
assert_eq!(scenarios.len(), 1);
scenarios
}
}
#[derive(Debug)]
/// This has a single scenario with all the life cycle operations to
/// test queries that depend on that


@ -1,6 +1,9 @@
//! This module contains util functions for testing scenarios
use super::DbScenario;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use data_types::{chunk_metadata::ChunkId, delete_predicate::DeletePredicate};
use data_types2::SequenceNumber;
use datafusion_util::batch_filter;
use db::test_helpers::chunk_ids_rub;
use db::{
@ -12,11 +15,15 @@ use iox_catalog::interface::get_schema_by_name;
use iox_tests::util::{TestCatalog, TestNamespace};
use itertools::Itertools;
use predicate::PredicateBuilder;
use querier::{create_ingester_connection_for_testing, QuerierNamespace};
use query::util::df_physical_expr_from_schema_and_expr;
use querier::{IngesterConnection, IngesterError, IngesterPartition, QuerierNamespace};
use query::util::{
compute_timenanosecond_min_max_for_one_record_batch, df_physical_expr_from_schema_and_expr,
};
use query::QueryChunk;
use schema::merge::SchemaMerger;
use schema::selection::Selection;
use std::any::Any;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::Write;
use std::{fmt::Display, sync::Arc};
@ -64,6 +71,23 @@ impl<'a, 'b> ChunkDataNew<'a, 'b> {
..self
}
}
/// Replace [`DeleteTimeNew::Begin`] and [`DeleteTimeNew::End`] with values that correspond to the linked [`ChunkStageNew`].
fn replace_begin_and_end_delete_times(self) -> Self {
Self {
preds: self
.preds
.into_iter()
.map(|pred| {
pred.replace_begin_and_end_delete_times(
self.chunk_stage
.expect("chunk stage must be set at this point"),
)
})
.collect(),
..self
}
}
}
#[derive(Debug, Clone, Copy)]
@ -99,16 +123,36 @@ impl ChunkStageOld {
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChunkStageNew {
/// In parquet file.
Parquet,
/// In ingester.
Ingester,
}
impl Display for ChunkStageNew {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Parquet => write!(f, "Parquet"),
Self::Ingester => write!(f, "Ingester"),
}
}
}
impl PartialOrd for ChunkStageNew {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
match (self, other) {
// allow multiple parquet chunks (for the same partition). sequence numbers will be used for ordering.
(Self::Parquet, Self::Parquet) => Some(Ordering::Equal),
// "parquet" chunks are older (i.e. come earlier) than chunks that still life in the ingester
(Self::Parquet, Self::Ingester) => Some(Ordering::Less),
(Self::Ingester, Self::Parquet) => Some(Ordering::Greater),
// it's impossible for two chunks (for the same partition) to both be in the ingester stage
(Self::Ingester, Self::Ingester) => None,
}
}
}
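
To make the effect of this partial order concrete, here is a small self-contained sketch (using a local stand-in enum, not the IOx type, plus the itertools crate this module already depends on) of the stage-combination filter applied further down in `make_n_chunks_scenario_new`: two parquet chunks, or a parquet chunk followed by an ingester chunk, are allowed, while two ingester chunks are rejected.

use itertools::Itertools;
use std::cmp::Ordering;

// local stand-in with the same partial order as ChunkStageNew above
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Stage {
    Parquet,
    Ingester,
}

impl PartialOrd for Stage {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        match (self, other) {
            (Self::Parquet, Self::Parquet) => Some(Ordering::Equal),
            (Self::Parquet, Self::Ingester) => Some(Ordering::Less),
            (Self::Ingester, Self::Parquet) => Some(Ordering::Greater),
            // at most one ingester chunk per partition, so this pair is unordered
            (Self::Ingester, Self::Ingester) => None,
        }
    }
}

fn main() {
    let valid: Vec<Vec<Stage>> = [Stage::Parquet, Stage::Ingester]
        .into_iter()
        .combinations_with_replacement(2)
        // same filter as in make_n_chunks_scenario_new: keep non-decreasing stage sequences
        .filter(|stages| {
            stages
                .windows(2)
                .all(|w| w[0].partial_cmp(&w[1]).map(|o| o.is_le()).unwrap_or_default())
        })
        .collect();

    assert_eq!(
        valid,
        vec![
            vec![Stage::Parquet, Stage::Parquet],
            vec![Stage::Parquet, Stage::Ingester],
        ]
    );
}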
@ -116,7 +160,7 @@ impl Display for ChunkStageNew {
impl ChunkStageNew {
/// return the list of all chunk types
pub fn all() -> Vec<Self> {
vec![Self::Parquet]
vec![Self::Parquet, Self::Ingester]
}
}
@ -157,23 +201,18 @@ pub struct PredOld<'a> {
#[derive(Debug, Clone)]
pub struct PredNew<'a> {
/// Delete predicate
predicate: &'a DeletePredicate,
pub predicate: &'a DeletePredicate,
/// At which chunk stage this predicate is applied
delete_time: DeleteTimeNew,
pub delete_time: DeleteTimeNew,
}
impl<'a> PredNew<'a> {
pub fn begin(predicate: &'a DeletePredicate) -> Self {
/// Replace [`DeleteTimeNew::Begin`] and [`DeleteTimeNew::End`] with values that correspond to the linked [`ChunkStageNew`].
fn replace_begin_and_end_delete_times(self, stage: ChunkStageNew) -> Self {
Self {
predicate,
delete_time: DeleteTimeNew::begin(),
}
}
pub fn end(predicate: &'a DeletePredicate) -> Self {
Self {
predicate,
delete_time: DeleteTimeNew::end(),
delete_time: self.delete_time.replace_begin_and_end_delete_times(stage),
..self
}
}
}
@ -239,8 +278,19 @@ impl DeleteTimeOld {
}
}
/// Describes when a delete predicate was applied.
///
/// # Ordering
/// Compared to [`ChunkStageNew`], the ordering here may seem a bit confusing. While the latest payload / LP data
/// resides in the ingester and is not yet available as a parquet file, the latest tombstones apply to parquet files and
/// were (past tense!) NOT applied while the LP data was in the ingester.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DeleteTimeNew {
/// Special delete time which marks the first time that could be used for deletion.
///
/// May depend on [`ChunkStageNew`].
Begin,
/// Delete predicate is added while the chunk is still in ingester memory.
Ingester {
/// Flag if the tombstone also exists in the catalog.
@ -255,6 +305,11 @@ pub enum DeleteTimeNew {
/// Delete predicate is added to chunks at their parquet stage
Parquet,
/// Special delete time which marks the last time that could be used for deletion.
///
/// May depend on [`ChunkStageNew`].
End,
}
impl DeleteTimeNew {
@ -270,23 +325,46 @@ impl DeleteTimeNew {
},
Self::Parquet,
],
ChunkStageNew::Ingester => vec![
Self::Ingester {
also_in_catalog: true,
},
Self::Ingester {
also_in_catalog: false,
},
],
}
}
pub fn begin() -> Self {
/// Replace [`DeleteTimeNew::Begin`] and [`DeleteTimeNew::End`] with values that correspond to the linked [`ChunkStageNew`].
fn replace_begin_and_end_delete_times(self, stage: ChunkStageNew) -> Self {
match self {
Self::Begin => Self::begin_for(stage),
Self::End => Self::end_for(stage),
other @ (Self::Ingester { .. } | Self::Parquet) => other,
}
}
fn begin_for(_stage: ChunkStageNew) -> Self {
Self::Ingester {
also_in_catalog: true,
}
}
pub fn end() -> Self {
Self::Parquet
fn end_for(stage: ChunkStageNew) -> Self {
match stage {
ChunkStageNew::Ingester => Self::Ingester {
also_in_catalog: true,
},
ChunkStageNew::Parquet => Self::Parquet,
}
}
}
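
A short usage sketch of the Begin/End resolution above (hypothetical test code, assuming `DeleteTimeNew` and `ChunkStageNew` are in scope): `Begin` always resolves to the earliest possible delete time, while `End` resolves to the last stage the chunk actually reaches.

#[test]
fn delete_time_begin_end_resolution_example() {
    // Begin: earliest possible time, regardless of the final chunk stage
    assert_eq!(
        DeleteTimeNew::Begin.replace_begin_and_end_delete_times(ChunkStageNew::Parquet),
        DeleteTimeNew::Ingester { also_in_catalog: true }
    );

    // End for a parquet chunk: the delete arrives at the parquet stage
    assert_eq!(
        DeleteTimeNew::End.replace_begin_and_end_delete_times(ChunkStageNew::Parquet),
        DeleteTimeNew::Parquet
    );

    // End for an ingester chunk: the ingester stage is also the last stage
    assert_eq!(
        DeleteTimeNew::End.replace_begin_and_end_delete_times(ChunkStageNew::Ingester),
        DeleteTimeNew::Ingester { also_in_catalog: true }
    );
}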
impl Display for DeleteTimeNew {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Begin => write!(f, "Begin"),
Self::Ingester {
also_in_catalog: false,
} => write!(f, "Ingester w/o catalog entry"),
@ -294,6 +372,7 @@ impl Display for DeleteTimeNew {
also_in_catalog: true,
} => write!(f, "Ingester w/ catalog entry"),
Self::Parquet => write!(f, "Parquet"),
Self::End => write!(f, "End"),
}
}
}
@ -306,7 +385,7 @@ pub enum DeleteTime {
impl DeleteTime {
/// Return all DeleteTime at and after the given chunk stage
pub fn all_from_and_before(chunk_stage: ChunkStage) -> Vec<Self> {
fn all_from_and_before(chunk_stage: ChunkStage) -> Vec<Self> {
match chunk_stage {
ChunkStage::Old(chunk_stage) => DeleteTimeOld::all_from_and_before(chunk_stage)
.into_iter()
@ -319,17 +398,17 @@ impl DeleteTime {
}
}
pub fn begin_for(chunk_stage: ChunkStage) -> Self {
fn begin_for(chunk_stage: ChunkStage) -> Self {
match chunk_stage {
ChunkStage::Old(_) => Self::Old(DeleteTimeOld::begin()),
ChunkStage::New(_) => Self::New(DeleteTimeNew::begin()),
ChunkStage::New(chunk_stage) => Self::New(DeleteTimeNew::begin_for(chunk_stage)),
}
}
pub fn end_for(chunk_stage: ChunkStage) -> Self {
fn end_for(chunk_stage: ChunkStage) -> Self {
match chunk_stage {
ChunkStage::Old(_) => Self::Old(DeleteTimeOld::end()),
ChunkStage::New(_) => Self::New(DeleteTimeNew::end()),
ChunkStage::New(chunk_stage) => Self::New(DeleteTimeNew::end_for(chunk_stage)),
}
}
}
@ -698,9 +777,11 @@ async fn make_chunk_with_deletes_at_different_stages_new(
delete_table_name,
partition_key,
};
let scenario_name = make_ng_chunk(Arc::clone(&ns), chunk_data).await;
let mut ingester_connection = MockIngesterConnection::default();
let scenario_name =
make_ng_chunk(Arc::clone(&ns), chunk_data, &mut ingester_connection, 0).await;
let db = make_querier_namespace(ns).await;
let db = make_querier_namespace(ns, ingester_connection).await;
DbScenario { scenario_name, db }
}
@ -1091,14 +1172,25 @@ pub async fn make_n_chunks_scenario_new(chunks: &[ChunkDataNew<'_, '_>]) -> Vec<
for stages in ChunkStageNew::all()
.into_iter()
.permutations(n_stages_unset)
.combinations_with_replacement(n_stages_unset)
{
// filter out unordered stages
if !stages.windows(2).all(|stages| {
stages[0]
.partial_cmp(&stages[1])
.map(|o| o.is_le())
.unwrap_or_default()
}) {
continue;
}
let catalog = TestCatalog::new();
let ns = catalog.create_namespace("test_db").await;
let mut scenario_name = format!("{} chunks:", chunks.len());
let mut stages_it = stages.iter();
let mut ingester_connection = MockIngesterConnection::default();
for chunk_data in chunks {
for (i, chunk_data) in chunks.iter().enumerate() {
let mut chunk_data = chunk_data.clone();
if chunk_data.chunk_stage.is_none() {
@ -1106,14 +1198,23 @@ pub async fn make_n_chunks_scenario_new(chunks: &[ChunkDataNew<'_, '_>]) -> Vec<
chunk_data = chunk_data.with_chunk_stage(*chunk_stage);
}
let name = make_ng_chunk(Arc::clone(&ns), chunk_data).await;
let chunk_data = chunk_data.replace_begin_and_end_delete_times();
write!(&mut scenario_name, "{}", name).unwrap();
let sequence_number_offset = (i as i64) * 1000;
let name = make_ng_chunk(
Arc::clone(&ns),
chunk_data,
&mut ingester_connection,
sequence_number_offset,
)
.await;
write!(&mut scenario_name, ", {}", name).unwrap();
}
assert!(stages_it.next().is_none(), "generated too many stages");
let db = make_querier_namespace(ns).await;
let db = make_querier_namespace(ns, ingester_connection).await;
scenarios.push(DbScenario { scenario_name, db });
}
@ -1159,7 +1260,12 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
vec![scenario1, scenario2]
}
async fn make_ng_chunk(ns: Arc<TestNamespace>, chunk: ChunkDataNew<'_, '_>) -> String {
async fn make_ng_chunk(
ns: Arc<TestNamespace>,
chunk: ChunkDataNew<'_, '_>,
ingester_connection: &mut MockIngesterConnection,
sequence_number_offset: i64,
) -> String {
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
let chunk_stage = chunk.chunk_stage.expect("chunk stage should be set");
@ -1245,6 +1351,59 @@ async fn make_ng_chunk(ns: Arc<TestNamespace>, chunk: ChunkDataNew<'_, '_>) -> S
let mut batch = batch.to_arrow(Selection::All).unwrap();
match chunk_stage {
ChunkStageNew::Ingester => {
// process delete predicates
if let Some(delete_table_name) = chunk.delete_table_name {
if delete_table_name == table_name {
for pred in &chunk.preds {
match pred.delete_time {
DeleteTimeNew::Ingester { .. } => {
batch = materialize_delete_predicate(batch, pred.predicate);
}
other @ DeleteTimeNew::Parquet => {
panic!("Cannot have delete time '{other}' for ingester chunk")
}
DeleteTimeNew::Begin | DeleteTimeNew::End => {
unreachable!("Begin/end cases should have been replaced with concrete instances at this point")
}
}
}
}
}
// create ingester partition
if batch.num_rows() > 0 {
let partition = partitions.get(&table_name).unwrap();
let chunk_id = ChunkId::new();
let namespace_name = Arc::from(ns.namespace.name.clone());
let table_name = Arc::from(table_name);
let partition_id = partition.partition.id;
let sequencer_id = partition.sequencer.sequencer.id;
let old_gen_partition_key = Arc::from(format!(
"{}-{}",
sequencer_id, partition.partition.partition_key
));
let expected_schema = Arc::new(schema);
let parquet_max_sequence_number = Some(SequenceNumber::new(i64::MAX)); // do NOT exclude any parquet files in other chunks
let tombstone_max_sequence_number = Some(SequenceNumber::new(i64::MAX)); // do NOT exclude any delete predicates in other chunks
let batches = vec![batch];
ingester_connection.push(Arc::new(
IngesterPartition::try_new(
chunk_id,
namespace_name,
table_name,
partition_id,
sequencer_id,
old_gen_partition_key,
expected_schema,
parquet_max_sequence_number,
tombstone_max_sequence_number,
batches,
)
.unwrap(),
));
}
}
ChunkStageNew::Parquet => {
// model delete predicates that are materialized (applied) by the ingester,
// during parquet file creation
@ -1253,35 +1412,27 @@ async fn make_ng_chunk(ns: Arc<TestNamespace>, chunk: ChunkDataNew<'_, '_>) -> S
for pred in &chunk.preds {
match pred.delete_time {
DeleteTimeNew::Ingester { .. } => {
let mut predicate = PredicateBuilder::new().build();
predicate.merge_delete_predicates(&[Arc::new(
pred.predicate.clone().into(),
)]);
if let Some(expr) = predicate.filter_expr() {
let df_phy_expr = df_physical_expr_from_schema_and_expr(
schema.as_arrow(),
expr,
)
.unwrap();
batch = batch_filter(&batch, &df_phy_expr).unwrap();
}
batch = materialize_delete_predicate(batch, pred.predicate);
}
DeleteTimeNew::Parquet => {
// will be attached AFTER the chunk was created
}
DeleteTimeNew::Begin | DeleteTimeNew::End => {
unreachable!("Begin/end cases should have been replaced with concrete instances at this point")
}
}
}
}
}
// create parquet file
let parquet_file_seq_number = 1000;
let parquet_file_seq_number = sequence_number_offset + (chunk.preds.len() as i64);
if batch.num_rows() > 0 {
let partition = partitions.get(&table_name).unwrap();
let min_seq = parquet_file_seq_number;
let max_seq = parquet_file_seq_number;
let min_time = 0;
let max_time = 0;
let (min_time, max_time) =
compute_timenanosecond_min_max_for_one_record_batch(&batch).unwrap();
let file_size_bytes = None; // don't mock/override
let creation_time = 1;
partition
@ -1315,7 +1466,9 @@ async fn make_ng_chunk(ns: Arc<TestNamespace>, chunk: ChunkDataNew<'_, '_>) -> S
DeleteTimeNew::Ingester {
also_in_catalog: true,
} => {
let sequence_number = i as i64;
let sequence_number = parquet_file_seq_number
- (chunk.preds.len() as i64)
+ (i as i64);
assert!(sequence_number < parquet_file_seq_number);
let min_time = pred.predicate.range.start();
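
A quick worked example of the sequence numbering above (illustrative numbers only): the chunk at index 1 gets offset 1000; with three delete predicates the parquet file gets sequence number 1003 and the catalog tombstones get 1000, 1001 and 1002, all strictly below the file's sequence number, which models tombstones that were already materialized into the file.

// standalone check of the arithmetic, runnable on its own
fn main() {
    let sequence_number_offset = 1_i64 * 1000; // chunk index i = 1
    let n_preds = 3_i64; // chunk.preds.len()
    let parquet_file_seq_number = sequence_number_offset + n_preds;
    assert_eq!(parquet_file_seq_number, 1003);

    for i in 0..n_preds {
        let tombstone_seq = parquet_file_seq_number - n_preds + i; // 1000, 1001, 1002
        assert!(tombstone_seq < parquet_file_seq_number);
    }
}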
@ -1348,6 +1501,9 @@ async fn make_ng_chunk(ns: Arc<TestNamespace>, chunk: ChunkDataNew<'_, '_>) -> S
)
.await;
}
DeleteTimeNew::Begin | DeleteTimeNew::End => {
unreachable!("Begin/end cases should have been replaced with concrete instances at this point")
}
}
}
}
@ -1359,12 +1515,39 @@ async fn make_ng_chunk(ns: Arc<TestNamespace>, chunk: ChunkDataNew<'_, '_>) -> S
let mut name = format!("NG Chunk {}", chunk_stage);
let n_preds = chunk.preds.len();
if n_preds > 0 {
write!(name, " with {} deletes", n_preds).unwrap();
let delete_names: Vec<_> = chunk
.preds
.iter()
.map(|p| p.delete_time.to_string())
.collect();
write!(
name,
" with {} delete(s) ({})",
n_preds,
delete_names.join(", ")
)
.unwrap();
}
name
}
async fn make_querier_namespace(ns: Arc<TestNamespace>) -> Arc<QuerierNamespace> {
fn materialize_delete_predicate(record_batch: RecordBatch, pred: &DeletePredicate) -> RecordBatch {
let mut predicate = PredicateBuilder::new().build();
predicate.merge_delete_predicates(&[Arc::new(pred.clone().into())]);
if let Some(expr) = predicate.filter_expr() {
let df_phy_expr =
df_physical_expr_from_schema_and_expr(record_batch.schema(), expr).unwrap();
batch_filter(&record_batch, &df_phy_expr).unwrap()
} else {
record_batch
}
}
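
Conceptually, materializing a delete predicate means dropping the rows the predicate selects and keeping everything else. A minimal standalone illustration (plain structs and a bare time range instead of record batches and IOx predicates; the `Row` type is hypothetical):

use std::ops::RangeInclusive;

#[derive(Debug, PartialEq)]
struct Row {
    time: i64,
    bar: f64,
}

// keep only the rows NOT matched by the delete predicate
fn apply_delete(rows: Vec<Row>, deleted_times: RangeInclusive<i64>) -> Vec<Row> {
    rows.into_iter()
        .filter(|row| !deleted_times.contains(&row.time))
        .collect()
}

fn main() {
    let rows = vec![Row { time: 10, bar: 1.0 }, Row { time: 20, bar: 2.0 }];
    // delete everything with 15 <= time <= 25: only the first row survives
    assert_eq!(apply_delete(rows, 15..=25), vec![Row { time: 10, bar: 1.0 }]);
}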
async fn make_querier_namespace(
ns: Arc<TestNamespace>,
ingester_connection: MockIngesterConnection,
) -> Arc<QuerierNamespace> {
let mut repos = ns.catalog.catalog.repositories().await;
let schema = Arc::new(
get_schema_by_name(&ns.namespace.name, repos.as_mut())
@ -1372,6 +1555,8 @@ async fn make_querier_namespace(ns: Arc<TestNamespace>) -> Arc<QuerierNamespace>
.unwrap(),
);
let ingester_connection = Arc::new(ingester_connection);
Arc::new(QuerierNamespace::new_testing(
ns.catalog.catalog(),
ns.catalog.object_store(),
@ -1380,6 +1565,40 @@ async fn make_querier_namespace(ns: Arc<TestNamespace>) -> Arc<QuerierNamespace>
ns.namespace.name.clone().into(),
schema,
ns.catalog.exec(),
create_ingester_connection_for_testing(),
ingester_connection,
))
}
#[derive(Debug, Default)]
struct MockIngesterConnection {
responses: Vec<Arc<IngesterPartition>>,
}
impl MockIngesterConnection {
fn push(&mut self, p: Arc<IngesterPartition>) {
self.responses.push(p);
}
}
#[async_trait]
impl IngesterConnection for MockIngesterConnection {
async fn partitions(
&self,
_namespace_name: Arc<str>,
table_name: Arc<str>,
_columns: Vec<String>,
_predicate: &predicate::Predicate,
_expected_schema: Arc<schema::Schema>,
) -> Result<Vec<Arc<IngesterPartition>>, IngesterError> {
Ok(self
.responses
.iter()
.filter(|p| p.table_name() == table_name.as_ref())
.cloned()
.collect())
}
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}
}


@ -71,7 +71,7 @@ async fn run_table_schema_test_case<D>(
}
fn is_unsorted_chunk_type(chunk: &dyn QueryChunk) -> bool {
chunk.chunk_type() == "MUB"
(chunk.chunk_type() == "MUB") || (chunk.chunk_type() == "IngesterPartition")
}
#[tokio::test]