refactor: reduce test duplication in `querier/src/table/mod.rs` (#4698)

* refactor: reduce test duplication in `querier/src/table/mod.rs`
* fix: Apply suggestions from code review
  Co-authored-by: Jake Goulding <jake.goulding@integer32.com>
* fix: Update querier/src/table/test_util.rs
  Co-authored-by: Jake Goulding <jake.goulding@integer32.com>
* fix: use now_nanos()
* refactor: Add TestQuerierTable
* refactor: rename functions for explicitness
  Co-authored-by: Jake Goulding <jake.goulding@integer32.com>

branch: pull/24376/head
parent 6af32b7750
commit d0903b11bb
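In effect, the hand-rolled mock wiring that every test repeated is collapsed into two helpers: a `TestQuerierTable` wrapper and a fleshed-out `IngesterPartitionBuilder`. A condensed before/after sketch, stitched together from the hunks below (every name comes from this diff; the shared catalog/namespace/table/schema/sequencer/partition setup is elided):

    // Before: each test downcast the mock ingester connection and primed it by hand.
    let querier_table = querier_table(&catalog, &table).await;
    querier_table
        .ingester_connection
        .as_any()
        .downcast_ref::<MockIngesterConnection>()
        .unwrap()
        .next_response(Ok(vec![/* hand-built IngesterPartition */]));
    let chunks = querier_table.chunks(&Predicate::default()).await.unwrap();

    // After: the builder and the wrapper hide that wiring.
    let ingester_partition =
        IngesterPartitionBuilder::new(&ns, &table, &schema, &sequencer, &partition)
            .with_lp(["table foo=1i 1"])
            .build_with_max_parquet_sequence_number(Some(SequenceNumber::new(2)));
    let querier_table = TestQuerierTable::new(&catalog, &table)
        .await
        .with_ingester_partition(ingester_partition);
    let chunks = querier_table.chunks().await.unwrap();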
querier/src/table/mod.rs

@@ -281,13 +281,12 @@ mod tests {
     use super::*;
     use crate::{
         ingester::{test_util::MockIngesterConnection, IngesterPartition},
-        table::test_util::{lp_to_record_batch, querier_table, IngesterPartitionBuilder},
+        table::test_util::{querier_table, IngesterPartitionBuilder},
     };

     #[tokio::test]
     async fn test_parquet_chunks() {
         maybe_start_logging();
-        let pred = Predicate::default();
         let catalog = TestCatalog::new();

         let ns = catalog.create_namespace("ns").await;
@@ -314,73 +313,31 @@ mod tests {
         table1.create_column("foo", ColumnType::I64).await;
         table2.create_column("foo", ColumnType::I64).await;

-        let querier_table = querier_table(&catalog, &table1).await;
+        let querier_table = TestQuerierTable::new(&catalog, &table1).await;

         // no parquet files yet
-        assert!(querier_table.chunks(&pred).await.unwrap().is_empty());
+        assert!(querier_table.chunks().await.unwrap().is_empty());

         let file111 = partition11
-            .create_parquet_file_with_min_max(
-                "table1 foo=1 11",
-                1,
-                2,
-                now().timestamp_nanos(),
-                now().timestamp_nanos(),
-            )
+            .create_parquet_file_with_min_max("table1 foo=1 11", 1, 2, now_nanos(), now_nanos())
             .await;
         let file112 = partition11
-            .create_parquet_file_with_min_max(
-                "table1 foo=2 22",
-                3,
-                4,
-                now().timestamp_nanos(),
-                now().timestamp_nanos(),
-            )
+            .create_parquet_file_with_min_max("table1 foo=2 22", 3, 4, now_nanos(), now_nanos())
             .await;
         let file113 = partition11
-            .create_parquet_file_with_min_max(
-                "table1 foo=3 33",
-                5,
-                6,
-                now().timestamp_nanos(),
-                now().timestamp_nanos(),
-            )
+            .create_parquet_file_with_min_max("table1 foo=3 33", 5, 6, now_nanos(), now_nanos())
             .await;
         let file114 = partition11
-            .create_parquet_file_with_min_max(
-                "table1 foo=4 44",
-                7,
-                8,
-                now().timestamp_nanos(),
-                now().timestamp_nanos(),
-            )
+            .create_parquet_file_with_min_max("table1 foo=4 44", 7, 8, now_nanos(), now_nanos())
             .await;
         let file115 = partition11
-            .create_parquet_file_with_min_max(
-                "table1 foo=5 55",
-                9,
-                10,
-                now().timestamp_nanos(),
-                now().timestamp_nanos(),
-            )
+            .create_parquet_file_with_min_max("table1 foo=5 55", 9, 10, now_nanos(), now_nanos())
             .await;
         let file121 = partition12
-            .create_parquet_file_with_min_max(
-                "table1 foo=5 55",
-                1,
-                2,
-                now().timestamp_nanos(),
-                now().timestamp_nanos(),
-            )
+            .create_parquet_file_with_min_max("table1 foo=5 55", 1, 2, now_nanos(), now_nanos())
            .await;
         let _file211 = partition21
-            .create_parquet_file_with_min_max(
-                "table2 foo=6 66",
-                1,
-                2,
-                now().timestamp_nanos(),
-                now().timestamp_nanos(),
-            )
+            .create_parquet_file_with_min_max("table2 foo=6 66", 1, 2, now_nanos(), now_nanos())
             .await;

         file111.flag_for_delete().await;
@@ -397,14 +354,14 @@ mod tests {
         tombstone2.mark_processed(&file112).await;

         // As we have now made new parquet files, force a cache refresh
-        querier_table.clear_parquet_cache();
-        querier_table.clear_tombstone_cache();
+        querier_table.inner().clear_parquet_cache();
+        querier_table.inner().clear_tombstone_cache();

         // now we have some files
         // this contains all files except for:
         // - file111: marked for delete
         // - file221: wrong table
-        let mut chunks = querier_table.chunks(&pred).await.unwrap();
+        let mut chunks = querier_table.chunks().await.unwrap();
         chunks.sort_by_key(|c| c.id());
         assert_eq!(chunks.len(), 5);

@@ -446,14 +403,13 @@ mod tests {
     #[tokio::test]
     async fn test_compactor_collision() {
         maybe_start_logging();
-        let pred = Predicate::default();
         let catalog = TestCatalog::new();

         let ns = catalog.create_namespace("ns").await;
         let table = ns.create_table("table").await;
         let sequencer = ns.create_sequencer(1).await;
         let partition = table.with_sequencer(&sequencer).create_partition("k").await;
         table.create_column("foo", ColumnType::I64).await;
+        let schema = make_schema(&table).await;

         // create a parquet file that cannot be processed by the querier:
         //
@@ -478,45 +434,24 @@ mod tests {
         // row-level sequence numbers.

         partition
-            .create_parquet_file_with_min_max(
-                "table foo=1 11",
-                1,
-                2,
-                now().timestamp_nanos(),
-                now().timestamp_nanos(),
-            )
+            .create_parquet_file_with_min_max("table foo=1 11", 1, 2, now_nanos(), now_nanos())
             .await;

-        let querier_table = querier_table(&catalog, &table).await;
-
-        querier_table
-            .ingester_connection
-            .as_any()
-            .downcast_ref::<MockIngesterConnection>()
-            .unwrap()
-            .next_response(Ok(vec![IngesterPartition::try_new(
-                Arc::from("ingester"),
-                ChunkId::new(),
-                Arc::from(ns.namespace.name.clone()),
-                Arc::from(table.table.name.clone()),
-                partition.partition.id,
-                sequencer.sequencer.id,
-                Arc::new(SchemaBuilder::new().build().unwrap()),
-                Some(SequenceNumber::new(1)),
-                None,
-                Arc::new(None),
-                vec![],
-            )
-            .unwrap()]));
-
-        let err = querier_table.chunks(&pred).await.unwrap_err();
+        let builder = IngesterPartitionBuilder::new(&ns, &table, &schema, &sequencer, &partition);
+        let ingester_partition =
+            builder.build_with_max_parquet_sequence_number(Some(SequenceNumber::new(1)));
+
+        let querier_table = TestQuerierTable::new(&catalog, &table)
+            .await
+            .with_ingester_partition(ingester_partition);
+
+        let err = querier_table.chunks().await.unwrap_err();
         assert_matches!(err, Error::StateFusion { .. });
     }

     #[tokio::test]
     async fn test_state_reconcile() {
         maybe_start_logging();
-        let pred = Predicate::default();
         let catalog = TestCatalog::new();

         let ns = catalog.create_namespace("ns").await;
||||
|
@ -533,46 +468,22 @@ mod tests {
|
|||
|
||||
// kept because max sequence number <= 2
|
||||
let file1 = partition1
|
||||
.create_parquet_file_with_min_max(
|
||||
"table foo=1 11",
|
||||
1,
|
||||
2,
|
||||
now().timestamp_nanos(),
|
||||
now().timestamp_nanos(),
|
||||
)
|
||||
.create_parquet_file_with_min_max("table foo=1 11", 1, 2, now_nanos(), now_nanos())
|
||||
.await;
|
||||
|
||||
// pruned because min sequence number > 2
|
||||
partition1
|
||||
.create_parquet_file_with_min_max(
|
||||
"table foo=2 22",
|
||||
3,
|
||||
3,
|
||||
now().timestamp_nanos(),
|
||||
now().timestamp_nanos(),
|
||||
)
|
||||
.create_parquet_file_with_min_max("table foo=2 22", 3, 3, now_nanos(), now_nanos())
|
||||
.await;
|
||||
|
||||
// kept because max sequence number <= 3
|
||||
let file2 = partition2
|
||||
.create_parquet_file_with_min_max(
|
||||
"table foo=1 11",
|
||||
1,
|
||||
3,
|
||||
now().timestamp_nanos(),
|
||||
now().timestamp_nanos(),
|
||||
)
|
||||
.create_parquet_file_with_min_max("table foo=1 11", 1, 3, now_nanos(), now_nanos())
|
||||
.await;
|
||||
|
||||
// pruned because min sequence number > 3
|
||||
partition2
|
||||
.create_parquet_file_with_min_max(
|
||||
"table foo=2 22",
|
||||
4,
|
||||
4,
|
||||
now().timestamp_nanos(),
|
||||
now().timestamp_nanos(),
|
||||
)
|
||||
.create_parquet_file_with_min_max("table foo=2 22", 4, 4, now_nanos(), now_nanos())
|
||||
.await;
|
||||
|
||||
// partition1: kept because sequence number <= 10
|
||||
|
@@ -596,65 +507,45 @@ mod tests {
             .create_tombstone(12, 1, 100, "foo=3")
             .await;

-        let querier_table = querier_table(&catalog, &table).await;
-
-        let ingester_chunk_id1 = ChunkId::new_test(u128::MAX - 1);
-        let ingester_chunk_id2 = ChunkId::new_test(u128::MAX);
-        querier_table
-            .ingester_connection
-            .as_any()
-            .downcast_ref::<MockIngesterConnection>()
-            .unwrap()
-            .next_response(Ok(vec![
-                // this chunk is kept
-                IngesterPartition::try_new(
-                    Arc::from("ingester"),
-                    ingester_chunk_id1,
-                    Arc::from(ns.namespace.name.clone()),
-                    Arc::from(table.table.name.clone()),
-                    partition1.partition.id,
-                    sequencer.sequencer.id,
-                    Arc::new(
-                        SchemaBuilder::new()
-                            .influx_field("foo", InfluxFieldType::Integer)
-                            .timestamp()
-                            .build()
-                            .unwrap(),
-                    ),
-                    // parquet max persisted sequence number
-                    Some(SequenceNumber::new(2)),
-                    // tombstone max persisted sequence number
-                    Some(SequenceNumber::new(10)),
-                    Arc::new(None),
-                    vec![lp_to_record_batch("table foo=3i 33")],
-                )
-                .unwrap(),
-                // this chunk is filtered out because it has no record batches but the reconciling still takes place
-                IngesterPartition::try_new(
-                    Arc::from("ingester"),
-                    ingester_chunk_id2,
-                    Arc::from(ns.namespace.name.clone()),
-                    Arc::from(table.table.name.clone()),
-                    partition2.partition.id,
-                    sequencer.sequencer.id,
-                    Arc::new(
-                        SchemaBuilder::new()
-                            .influx_field("foo", InfluxFieldType::Integer)
-                            .timestamp()
-                            .build()
-                            .unwrap(),
-                    ),
-                    // parquet max persisted sequence number
-                    Some(SequenceNumber::new(3)),
-                    // tombstone max persisted sequence number
-                    Some(SequenceNumber::new(11)),
-                    Arc::new(None),
-                    vec![],
-                )
-                .unwrap(),
-            ]));
-
-        let mut chunks = querier_table.chunks(&pred).await.unwrap();
+        let schema = Arc::new(
+            SchemaBuilder::new()
+                .influx_field("foo", InfluxFieldType::Integer)
+                .timestamp()
+                .build()
+                .unwrap(),
+        );
+
+        let ingester_chunk_id1 = u128::MAX - 1;
+
+        let builder1 = IngesterPartitionBuilder::new(&ns, &table, &schema, &sequencer, &partition1);
+        let builder2 = IngesterPartitionBuilder::new(&ns, &table, &schema, &sequencer, &partition2);
+
+        let querier_table = TestQuerierTable::new(&catalog, &table)
+            .await
+            .with_ingester_partition(
+                // this chunk is kept
+                builder1
+                    .with_ingester_chunk_id(ingester_chunk_id1)
+                    .with_lp(["table foo=3i 33"])
+                    .build(
+                        // parquet max persisted sequence number
+                        Some(SequenceNumber::new(2)),
+                        // tombstone max persisted sequence number
+                        Some(SequenceNumber::new(10)),
+                    ),
+            )
+            .with_ingester_partition(
+                // this chunk is filtered out because it has no record batches but the reconciling still takes place
+                builder2.with_ingester_chunk_id(u128::MAX).build(
+                    // parquet max persisted sequence number
+                    Some(SequenceNumber::new(3)),
+                    // tombstone max persisted sequence number
+                    Some(SequenceNumber::new(11)),
+                ),
+            );
+
+        let mut chunks = querier_table.chunks().await.unwrap();
         chunks.sort_by_key(|c| c.id());

         // three chunks (two parquet files and one for the in-mem ingester data)
@@ -669,7 +560,7 @@ mod tests {
             chunks[1].id(),
             ChunkId::new_test(file2.parquet_file.id.get() as u128),
         );
-        assert_eq!(chunks[2].id(), ingester_chunk_id1);
+        assert_eq!(chunks[2].id(), ChunkId::new_test(ingester_chunk_id1));

         // check delete predicates
         // parquet chunks have predicate attached
@@ -682,7 +573,6 @@ mod tests {
     #[tokio::test]
     async fn test_ingester_overlap_detection() {
         maybe_start_logging();
-        let pred = Predicate::default();
         let catalog = TestCatalog::new();

         let ns = catalog.create_namespace("ns").await;
@@ -697,12 +587,6 @@ mod tests {
             .create_partition("k2")
             .await;

-        let querier_table = querier_table(&catalog, &table).await;
-
-        let ingester_chunk_id1 = ChunkId::new_test(1);
-        let ingester_chunk_id2 = ChunkId::new_test(2);
-        let ingester_chunk_id3 = ChunkId::new_test(3);
-
         let schema = Arc::new(
             SchemaBuilder::new()
                 .influx_field("foo", InfluxFieldType::Integer)
@@ -711,63 +595,45 @@ mod tests {
                 .unwrap(),
         );

-        querier_table
-            .ingester_connection
-            .as_any()
-            .downcast_ref::<MockIngesterConnection>()
-            .unwrap()
-            .next_response(Ok(vec![
-                IngesterPartition::try_new(
-                    Arc::from("ingester1"),
-                    ingester_chunk_id1,
-                    Arc::from(ns.namespace.name.clone()),
-                    Arc::from(table.table.name.clone()),
-                    partition1.partition.id,
-                    sequencer.sequencer.id,
-                    Arc::clone(&schema),
-                    // parquet max persisted sequence number
-                    None,
-                    // tombstone max persisted sequence number
-                    None,
-                    Arc::new(None),
-                    vec![lp_to_record_batch("table foo=1i 1")],
-                )
-                .unwrap(),
-                IngesterPartition::try_new(
-                    Arc::from("ingester1"),
-                    ingester_chunk_id2,
-                    Arc::from(ns.namespace.name.clone()),
-                    Arc::from(table.table.name.clone()),
-                    partition2.partition.id,
-                    sequencer.sequencer.id,
-                    Arc::clone(&schema),
-                    // parquet max persisted sequence number
-                    None,
-                    // tombstone max persisted sequence number
-                    None,
-                    Arc::new(None),
-                    vec![lp_to_record_batch("table foo=2i 2")],
-                )
-                .unwrap(),
-                IngesterPartition::try_new(
-                    Arc::from("ingester2"),
-                    ingester_chunk_id3,
-                    Arc::from(ns.namespace.name.clone()),
-                    Arc::from(table.table.name.clone()),
-                    partition1.partition.id,
-                    sequencer.sequencer.id,
-                    Arc::clone(&schema),
-                    // parquet max persisted sequence number
-                    None,
-                    // tombstone max persisted sequence number
-                    None,
-                    Arc::new(None),
-                    vec![lp_to_record_batch("table foo=3i 3")],
-                )
-                .unwrap(),
-            ]));
-
-        let err = querier_table.chunks(&pred).await.unwrap_err();
+        let builder1 = IngesterPartitionBuilder::new(&ns, &table, &schema, &sequencer, &partition1);
+        let builder2 = IngesterPartitionBuilder::new(&ns, &table, &schema, &sequencer, &partition2);
+
+        let querier_table = TestQuerierTable::new(&catalog, &table)
+            .await
+            .with_ingester_partition(
+                builder1
+                    .clone()
+                    .with_ingester_chunk_id(1)
+                    .with_lp(vec!["table foo=1i 1"])
+                    .build(
+                        // parquet max persisted sequence number
+                        None, // tombstone max persisted sequence number
+                        None,
+                    ),
+            )
+            .with_ingester_partition(
+                builder2
+                    .with_ingester_chunk_id(2)
+                    .with_lp(vec!["table foo=2i 2"])
+                    .build(
+                        // parquet max persisted sequence number
+                        None, // tombstone max persisted sequence number
+                        None,
+                    ),
+            )
+            .with_ingester_partition(
+                builder1
+                    .with_ingester_chunk_id(3)
+                    .with_lp(vec!["table foo=3i 3"])
+                    .build(
+                        // parquet max persisted sequence number
+                        None, // tombstone max persisted sequence number
+                        None,
+                    ),
+            );
+
+        let err = querier_table.chunks().await.unwrap_err();
         assert_matches!(err, Error::IngestersOverlap { .. });
     }

@@ -780,32 +646,34 @@ mod tests {
         let sequencer = ns.create_sequencer(1).await;
         let partition = table.with_sequencer(&sequencer).create_partition("k").await;
         let schema = make_schema(&table).await;
-        let now = now().timestamp_nanos();

-        let querier_table = querier_table(&catalog, &table).await;
-        let builder =
-            IngesterPartitionBuilder::new(ns, table, schema, sequencer, Arc::clone(&partition));
+        let builder = IngesterPartitionBuilder::new(&ns, &table, &schema, &sequencer, &partition)
+            .with_lp(["table foo=1i 1"]);

         // Parquet file with max sequence number 2
         partition
-            .create_parquet_file_with_min_max("table1 foo=1 11", 1, 2, now, now)
+            .create_parquet_file_with_min_max("table1 foo=1 11", 1, 2, now_nanos(), now_nanos())
             .await;

         let ingester_partition =
             builder.build_with_max_parquet_sequence_number(Some(SequenceNumber::new(2)));

+        let querier_table = TestQuerierTable::new(&catalog, &table)
+            .await
+            .with_ingester_partition(ingester_partition);
+
         // Expect 2 chunks: one for ingester, and one from parquet file
-        let chunks = chunks_with_ingester_partition(&querier_table, &ingester_partition).await;
+        let chunks = querier_table.chunks().await.unwrap();
         assert_eq!(chunks.len(), 2);

         // Now, make a second chunk with max sequence number 3
         partition
-            .create_parquet_file_with_min_max("table1 foo=1 22", 2, 3, now, now)
+            .create_parquet_file_with_min_max("table1 foo=1 22", 2, 3, now_nanos(), now_nanos())
             .await;

         // With the same ingester response, still expect 2 chunks: one
         // for ingester, and one from parquet file
-        let chunks = chunks_with_ingester_partition(&querier_table, &ingester_partition).await;
+        let chunks = querier_table.chunks().await.unwrap();
         assert_eq!(chunks.len(), 2);

         // update the ingester response to return a new max parquet
@@ -813,8 +681,12 @@ mod tests {
         let ingester_partition =
             builder.build_with_max_parquet_sequence_number(Some(SequenceNumber::new(3)));

+        let querier_table = querier_table
+            .clear_ingester_partitions()
+            .with_ingester_partition(ingester_partition);
+
         // expect the second file is found, resulting in three chunks
-        let chunks = chunks_with_ingester_partition(&querier_table, &ingester_partition).await;
+        let chunks = querier_table.chunks().await.unwrap();
         assert_eq!(chunks.len(), 3);
     }

@@ -827,20 +699,15 @@ mod tests {
         let sequencer = ns.create_sequencer(1).await;
         let partition = table.with_sequencer(&sequencer).create_partition("k").await;
         let schema = make_schema(&table).await;
-        let now = now().timestamp_nanos();
+        // Expect 1 chunk with one delete predicate
+        let querier_table = TestQuerierTable::new(&catalog, &table).await;

-        let querier_table = querier_table(&catalog, &table).await;
-        let builder = IngesterPartitionBuilder::new(
-            ns,
-            Arc::clone(&table),
-            schema,
-            Arc::clone(&sequencer),
-            Arc::clone(&partition),
-        );
+        let builder = IngesterPartitionBuilder::new(&ns, &table, &schema, &sequencer, &partition)
+            .with_lp(["table foo=1i 1"]);

         // parquet file with max sequence number 1
         partition
-            .create_parquet_file_with_min_max("table1 foo=1 11", 1, 1, now, now)
+            .create_parquet_file_with_min_max("table1 foo=1 11", 1, 1, now_nanos(), now_nanos())
             .await;

         // tombstone with max sequence number 2
@@ -854,9 +721,9 @@ mod tests {
         let ingester_partition =
             builder.build(max_parquet_sequence_number, max_tombstone_sequence_number);

-        // Expect 1 chunk with one delete predicate
-        let deletes =
-            num_deletes(chunks_with_ingester_partition(&querier_table, &ingester_partition).await);
+        let querier_table = querier_table.with_ingester_partition(ingester_partition);
+
+        let deletes = num_deletes(querier_table.chunks().await.unwrap());
         assert_eq!(&deletes, &[1, 0]);

         // Now, make a second tombstone with max sequence number 3
@@ -867,18 +734,19 @@ mod tests {

         // With the same ingester response, still expect 1 delete
         // (because cache is not cleared)
-        let deletes =
-            num_deletes(chunks_with_ingester_partition(&querier_table, &ingester_partition).await);
+        let deletes = num_deletes(querier_table.chunks().await.unwrap());
         assert_eq!(&deletes, &[1, 0]);

         // update the ingester response to return a new max delete sequence number
         let max_tombstone_sequence_number = Some(SequenceNumber::new(3));
         let ingester_partition =
             builder.build(max_parquet_sequence_number, max_tombstone_sequence_number);
+        let querier_table = querier_table
+            .clear_ingester_partitions()
+            .with_ingester_partition(ingester_partition);

         // second tombstone should be found
-        let deletes =
-            num_deletes(chunks_with_ingester_partition(&querier_table, &ingester_partition).await);
+        let deletes = num_deletes(querier_table.chunks().await.unwrap());
         assert_eq!(&deletes, &[2, 0]);
     }

@@ -895,20 +763,55 @@ mod tests {
         )
     }

-    /// Invokes querier_table.chunks modeling the ingester sending `next_response`
-    async fn chunks_with_ingester_partition(
-        querier_table: &QuerierTable,
-        ingester_partition: &IngesterPartition,
-    ) -> Vec<Arc<dyn QueryChunk>> {
-        let pred = Predicate::default();
-        querier_table
-            .ingester_connection
-            .as_any()
-            .downcast_ref::<MockIngesterConnection>()
-            .unwrap()
-            .next_response(Ok(vec![ingester_partition.clone()]));
-
-        querier_table.chunks(&pred).await.unwrap()
-    }
+    /// A `QuerierTable` and some number of `IngesterPartitions` that
+    /// are fed to the ingester connection on the next call to
+    /// `chunks()`
+    struct TestQuerierTable {
+        /// The underlying table
+        querier_table: QuerierTable,
+
+        /// Ingester partitions
+        ingester_partitions: Vec<IngesterPartition>,
+    }
+
+    impl TestQuerierTable {
+        /// Create a new wrapped [`QuerierTable`]
+        async fn new(catalog: &Arc<TestCatalog>, table: &Arc<TestTable>) -> Self {
+            Self {
+                querier_table: querier_table(catalog, table).await,
+                ingester_partitions: vec![],
+            }
+        }
+
+        /// Return a reference to the inner table
+        fn inner(&self) -> &QuerierTable {
+            &self.querier_table
+        }
+
+        /// Add the `ingester_partition` to the ingester response processed by the table
+        fn with_ingester_partition(mut self, ingester_partition: IngesterPartition) -> Self {
+            self.ingester_partitions.push(ingester_partition);
+            self
+        }
+
+        /// Clears ingester partitions for the next response from the ingester
+        fn clear_ingester_partitions(mut self) -> Self {
+            self.ingester_partitions.clear();
+            self
+        }
+
+        /// Invokes querier_table.chunks modeling the ingester sending the partitions in this table
+        async fn chunks(&self) -> Result<Vec<Arc<dyn QueryChunk>>> {
+            let pred = Predicate::default();
+
+            self.querier_table
+                .ingester_connection
+                .as_any()
+                .downcast_ref::<MockIngesterConnection>()
+                .unwrap()
+                .next_response(Ok(self.ingester_partitions.clone()));
+
+            self.querier_table.chunks(&pred).await
+        }
+    }

     /// returns the number of deletes in each chunk
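A note on the design of `TestQuerierTable` above: `with_ingester_partition` and `clear_ingester_partitions` consume `self` and return `Self`, so tests chain them and rebind the variable, while `chunks()` re-primes the `MockIngesterConnection` with the accumulated partitions on every call. That is what lets the cache-refresh tests call `chunks()` repeatedly without ever downcasting the mock themselves.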
@@ -919,4 +822,9 @@ mod tests {
             .map(|chunk| chunk.delete_predicates().len())
             .collect()
     }
+
+    /// returns the value of now as nanoseconds since the epoch
+    fn now_nanos() -> i64 {
+        now().timestamp_nanos()
+    }
 }
querier/src/table/test_util.rs

@@ -68,35 +68,48 @@ pub(crate) struct IngesterPartitionBuilder {
     partition_sort_key: Arc<Option<SortKey>>,

     /// Data returned from the partition, in line protocol format
-    lp: String,
+    lp: Vec<String>,
 }

 impl IngesterPartitionBuilder {
     pub(crate) fn new(
-        ns: Arc<TestNamespace>,
-        table: Arc<TestTable>,
-        schema: Arc<Schema>,
-        sequencer: Arc<TestSequencer>,
-        partition: Arc<TestPartition>,
+        ns: &Arc<TestNamespace>,
+        table: &Arc<TestTable>,
+        schema: &Arc<Schema>,
+        sequencer: &Arc<TestSequencer>,
+        partition: &Arc<TestPartition>,
     ) -> Self {
         Self {
-            ns,
-            table,
-            schema,
-            sequencer,
-            partition,
+            ns: Arc::clone(ns),
+            table: Arc::clone(table),
+            schema: Arc::clone(schema),
+            sequencer: Arc::clone(sequencer),
+            partition: Arc::clone(partition),
             ingester_name: Arc::from("ingester1"),
             partition_sort_key: Arc::new(None),
             ingester_chunk_id: 1,
-            lp: "table foo=1i 1".to_string(),
+            lp: Vec::new(),
         }
     }

     /// Set the partition chunk id to use when creating partitions
     pub(crate) fn with_ingester_chunk_id(mut self, ingester_chunk_id: u128) -> Self {
         self.ingester_chunk_id = ingester_chunk_id;
         self
     }

+    /// Set the line protocol that will be present in this partition
+    /// with an iterator of `AsRef<str>`s
+    pub(crate) fn with_lp(mut self, lp: impl IntoIterator<Item = impl AsRef<str>>) -> Self {
+        self.lp = lp.into_iter().map(|s| s.as_ref().to_string()).collect();
+        self
+    }
+
     /// Create an ingester partition with the specified max parquet sequence number
     pub(crate) fn build_with_max_parquet_sequence_number(
         &self,
         parquet_max_sequence_number: Option<SequenceNumber>,
-    ) -> Arc<IngesterPartition> {
+    ) -> IngesterPartition {
         let tombstone_max_sequence_number = None;

         self.build(parquet_max_sequence_number, tombstone_max_sequence_number)
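Because `with_lp` accepts `impl IntoIterator<Item = impl AsRef<str>>`, call sites may pass an array literal (`.with_lp(["table foo=1i 1"])`) or a `Vec` (`.with_lp(vec!["table foo=2i 2"])`); both spellings appear in the `mod.rs` hunks above.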
@@ -107,22 +120,22 @@ impl IngesterPartitionBuilder {
         &self,
         parquet_max_sequence_number: Option<SequenceNumber>,
         tombstone_max_sequence_number: Option<SequenceNumber>,
-    ) -> Arc<IngesterPartition> {
-        Arc::new(
-            IngesterPartition::try_new(
-                Arc::clone(&self.ingester_name),
-                ChunkId::new_test(self.ingester_chunk_id),
-                Arc::from(self.ns.namespace.name.as_str()),
-                Arc::from(self.table.table.name.as_str()),
-                self.partition.partition.id,
-                self.sequencer.sequencer.id,
-                Arc::clone(&self.schema),
-                parquet_max_sequence_number,
-                tombstone_max_sequence_number,
-                Arc::clone(&self.partition_sort_key),
-                vec![lp_to_record_batch(&self.lp)],
-            )
-            .unwrap(),
-        )
+    ) -> IngesterPartition {
+        let data = self.lp.iter().map(|lp| lp_to_record_batch(lp)).collect();
+
+        IngesterPartition::try_new(
+            Arc::clone(&self.ingester_name),
+            ChunkId::new_test(self.ingester_chunk_id),
+            Arc::from(self.ns.namespace.name.as_str()),
+            Arc::from(self.table.table.name.as_str()),
+            self.partition.partition.id,
+            self.sequencer.sequencer.id,
+            Arc::clone(&self.schema),
+            parquet_max_sequence_number,
+            tombstone_max_sequence_number,
+            Arc::clone(&self.partition_sort_key),
+            data,
+        )
+        .unwrap()
     }
 }
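Dropping the `Arc` from the return type of `build` and `build_with_max_parquet_sequence_number` lines up with the new `TestQuerierTable`, which stores plain `IngesterPartition` values in a `Vec` and clones the whole vector into `next_response`; the shared-ownership wrapper no longer bought any sharing.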