Merge pull request #5780 from influxdata/dom/test-cleanup

test: refactor ingester helpers
kodiakhq[bot] 2022-09-30 16:43:18 +00:00 committed by GitHub
commit fc0634792b
7 changed files with 106 additions and 83 deletions


@@ -208,7 +208,7 @@ mod tests {
             TABLE_NAME.into(),
             None,
         );
-        let inner = MockPartitionProvider::default().with_partition(PARTITION_KEY.into(), data);
+        let inner = MockPartitionProvider::default().with_partition(data);

         let cache = PartitionCache::new(inner, []);
         let got = cache
@@ -277,18 +277,15 @@
     async fn test_miss_partition_key() {
         let other_key = PartitionKey::from("test");
         let other_key_id = PartitionId::new(99);
-        let inner = MockPartitionProvider::default().with_partition(
+        let inner = MockPartitionProvider::default().with_partition(PartitionData::new(
+            other_key_id,
             other_key.clone(),
-            PartitionData::new(
-                other_key_id,
-                PARTITION_KEY.into(),
-                SHARD_ID,
-                NAMESPACE_ID,
-                TABLE_ID,
-                TABLE_NAME.into(),
-                None,
-            ),
-        );
+            SHARD_ID,
+            NAMESPACE_ID,
+            TABLE_ID,
+            TABLE_NAME.into(),
+            None,
+        ));

         let partition = Partition {
             id: PARTITION_ID,
@@ -319,18 +316,15 @@
     #[tokio::test]
     async fn test_miss_table_id() {
         let other_table = TableId::new(1234);
-        let inner = MockPartitionProvider::default().with_partition(
+        let inner = MockPartitionProvider::default().with_partition(PartitionData::new(
+            PARTITION_ID,
             PARTITION_KEY.into(),
-            PartitionData::new(
-                PARTITION_ID,
-                PARTITION_KEY.into(),
-                SHARD_ID,
-                NAMESPACE_ID,
-                other_table,
-                TABLE_NAME.into(),
-                None,
-            ),
-        );
+            SHARD_ID,
+            NAMESPACE_ID,
+            other_table,
+            TABLE_NAME.into(),
+            None,
+        ));

         let partition = Partition {
             id: PARTITION_ID,
@@ -361,18 +355,15 @@
     #[tokio::test]
     async fn test_miss_shard_id() {
         let other_shard = ShardId::new(1234);
-        let inner = MockPartitionProvider::default().with_partition(
+        let inner = MockPartitionProvider::default().with_partition(PartitionData::new(
+            PARTITION_ID,
             PARTITION_KEY.into(),
-            PartitionData::new(
-                PARTITION_ID,
-                PARTITION_KEY.into(),
-                other_shard,
-                NAMESPACE_ID,
-                TABLE_ID,
-                TABLE_NAME.into(),
-                None,
-            ),
-        );
+            other_shard,
+            NAMESPACE_ID,
+            TABLE_ID,
+            TABLE_NAME.into(),
+            None,
+        ));

         let partition = Partition {
             id: PARTITION_ID,
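
The pattern repeats across these tests: each now builds a complete PartitionData up front and passes it alone, since the mock derives the lookup key from the data itself. A minimal sketch of the new call shape, reusing the module-level test constants (PARTITION_ID, PARTITION_KEY, SHARD_ID, NAMESPACE_ID, TABLE_ID, TABLE_NAME) visible in the hunks above:

```rust
// Sketch mirroring the refactored test setup; the constants are the
// module-level fixtures assumed from the surrounding test code.
let data = PartitionData::new(
    PARTITION_ID,
    PARTITION_KEY.into(),
    SHARD_ID,
    NAMESPACE_ID,
    TABLE_ID,
    TABLE_NAME.into(),
    None,
);

// No separate key argument any more: the provider reads the key, shard
// and table out of `data`, so the two can no longer disagree.
let inner = MockPartitionProvider::default().with_partition(data);
```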


@@ -20,22 +20,25 @@ pub(crate) struct MockPartitionProvider {
 impl MockPartitionProvider {
     /// A builder helper for [`Self::insert()`].
     #[must_use]
-    pub(crate) fn with_partition(
-        mut self,
-        partition_key: PartitionKey,
-        data: PartitionData,
-    ) -> Self {
-        self.insert(partition_key, data);
+    pub(crate) fn with_partition(mut self, data: PartitionData) -> Self {
+        self.insert(data);
         self
     }

     /// Add `data` to the mock state, returning it when asked for the specified
     /// `(key, shard, table)` triplet.
-    pub(crate) fn insert(&mut self, partition_key: PartitionKey, data: PartitionData) {
+    pub(crate) fn insert(&mut self, data: PartitionData) {
         assert!(
             self.partitions
                 .lock()
-                .insert((partition_key, data.shard_id(), data.table_id()), data)
+                .insert(
+                    (
+                        data.partition_key().clone(),
+                        data.shard_id(),
+                        data.table_id()
+                    ),
+                    data
+                )
                 .is_none(),
             "overwriting an existing mock PartitionData"
        );
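
The refactored insert() derives its composite map key from the value it stores, rather than trusting a caller-supplied key. A self-contained sketch of that pattern with simplified stand-in types (not the ingester's real ones):

```rust
use std::collections::HashMap;

// Simplified stand-ins for PartitionKey/ShardId/TableId; the point is
// the pattern, not the real types.
type Key = (String, i64, i64);

struct Data {
    partition_key: String,
    shard_id: i64,
    table_id: i64,
}

// Derive the map key from the value itself, as insert() now does, so the
// key and the stored data can never disagree.
fn insert(map: &mut HashMap<Key, Data>, data: Data) {
    let key = (data.partition_key.clone(), data.shard_id, data.table_id);
    assert!(
        map.insert(key, data).is_none(),
        "overwriting an existing mock entry"
    );
}

fn main() {
    let mut map = HashMap::new();
    insert(
        &mut map,
        Data {
            partition_key: "1970-01-01".to_string(),
            shard_id: 1,
            table_id: 2,
        },
    );
    assert_eq!(map.len(), 1);
}
```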


@@ -71,7 +71,7 @@ mod tests {
             None,
         );
-        let mock = Arc::new(MockPartitionProvider::default().with_partition(key.clone(), data));
+        let mock = Arc::new(MockPartitionProvider::default().with_partition(data));

         let got = mock
             .get_partition(

@@ -5,6 +5,8 @@
 //! some absolute number and individual Parquet files that get persisted below some number. It
 //! is expected that they may be above or below the absolute thresholds.

+pub mod mock_handle;
+
 use std::{collections::BTreeMap, sync::Arc, time::Duration};

 use data_types::{NamespaceId, PartitionId, SequenceNumber, ShardId, TableId};


@@ -0,0 +1,31 @@
+//! A mock [`LifecycleHandle`] impl for testing.
+
+use data_types::{NamespaceId, PartitionId, SequenceNumber, ShardId, TableId};
+
+use super::LifecycleHandle;
+
+/// Special [`LifecycleHandle`] that never persists and always accepts more data.
+///
+/// This is useful to control persists manually.
+#[derive(Debug, Default, Clone, Copy)]
+pub struct NoopLifecycleHandle;
+
+impl LifecycleHandle for NoopLifecycleHandle {
+    fn log_write(
+        &self,
+        _partition_id: PartitionId,
+        _shard_id: ShardId,
+        _namespace_id: NamespaceId,
+        _table_id: TableId,
+        _sequence_number: SequenceNumber,
+        _bytes_written: usize,
+        _rows_written: usize,
+    ) -> bool {
+        // do NOT pause ingest
+        false
+    }
+
+    fn can_resume_ingest(&self) -> bool {
+        true
+    }
+}
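
A hedged sketch of how a test might drive the new mock. The ID constructors come from the data_types crate shown in the imports above; the write sizes are arbitrary:

```rust
// Assumes the trait and mock are in scope, e.g.:
//   use ingester::lifecycle::{mock_handle::NoopLifecycleHandle, LifecycleHandle};
let lifecycle = NoopLifecycleHandle;

// log_write() never requests a pause, no matter how much was written...
let should_pause = lifecycle.log_write(
    PartitionId::new(1),
    ShardId::new(2),
    NamespaceId::new(3),
    TableId::new(4),
    SequenceNumber::new(5),
    1_000_000, // bytes written (arbitrary)
    10_000,    // rows written (arbitrary)
);
assert!(!should_pause);

// ...and ingest is always resumable, so the test triggers persists itself.
assert!(lifecycle.can_resume_ingest());
```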


@@ -626,7 +626,8 @@ pub(crate) async fn make_ingester_data(two_partitions: bool, loc: DataLocation)
     // Make data for one shard and two tables
     let shard_index = ShardIndex::new(1);
-    let shard_id = populate_catalog(&*catalog).await;
+    let (shard_id, _, _) =
+        populate_catalog(&*catalog, shard_index, TEST_NAMESPACE, TEST_TABLE).await;

     let ingester = IngesterData::new(
         object_store,
@@ -693,7 +694,8 @@ pub(crate) async fn make_ingester_data_with_tombstones(loc: DataLocation) -> Ing
     // Make data for one shard and two tables
     let shard_index = ShardIndex::new(0);
-    let shard_id = populate_catalog(&*catalog).await;
+    let (shard_id, _, _) =
+        populate_catalog(&*catalog, shard_index, TEST_NAMESPACE, TEST_TABLE).await;

     let ingester = IngesterData::new(
         object_store,
@@ -745,6 +747,7 @@ pub(crate) fn make_partitions(two_partitions: bool, shard_index: ShardIndex) ->
         ops.push(DmlOperation::Write(make_write_op(
             &PartitionKey::from(TEST_PARTITION_2),
             shard_index,
+            TEST_NAMESPACE,
             seq_num,
             r#"test_table,city=Medford day="sun",temp=55 22"#,
         )));
@@ -753,6 +756,7 @@ pub(crate) fn make_partitions(two_partitions: bool, shard_index: ShardIndex) ->
         ops.push(DmlOperation::Write(make_write_op(
             &PartitionKey::from(TEST_PARTITION_2),
             shard_index,
+            TEST_NAMESPACE,
             seq_num,
             r#"test_table,city=Reading day="mon",temp=58 40"#,
         )));
@@ -761,6 +765,7 @@ pub(crate) fn make_partitions(two_partitions: bool, shard_index: ShardIndex) ->
         ops.push(DmlOperation::Write(make_write_op(
             &PartitionKey::from(TEST_PARTITION_1),
             shard_index,
+            TEST_NAMESPACE,
             seq_num,
             r#"test_table,city=Medford day="sun",temp=55 22"#,
         )));
@@ -769,6 +774,7 @@ pub(crate) fn make_partitions(two_partitions: bool, shard_index: ShardIndex) ->
         ops.push(DmlOperation::Write(make_write_op(
             &PartitionKey::from(TEST_PARTITION_1),
             shard_index,
+            TEST_NAMESPACE,
             seq_num,
             r#"test_table,city=Reading day="mon",temp=58 40"#,
         )));
@@ -878,6 +884,7 @@ async fn make_one_partition_with_tombstones(
         DmlOperation::Write(make_write_op(
             &PartitionKey::from(TEST_PARTITION_1),
             shard_index,
+            TEST_NAMESPACE,
             seq_num,
             r#"test_table,city=Medford day="sun",temp=55 22"#,
         )),
@@ -893,6 +900,7 @@ async fn make_one_partition_with_tombstones(
         DmlOperation::Write(make_write_op(
             &PartitionKey::from(TEST_PARTITION_1),
             shard_index,
+            TEST_NAMESPACE,
             seq_num,
             r#"test_table,city=Reading day="mon",temp=58 40"#,
         )),
@@ -902,14 +910,15 @@ async fn make_one_partition_with_tombstones(
         .unwrap();
 }

-fn make_write_op(
+pub(crate) fn make_write_op(
     partition_key: &PartitionKey,
     shard_index: ShardIndex,
+    namespace: &str,
     sequence_number: i64,
     lines: &str,
 ) -> DmlWrite {
     DmlWrite::new(
-        TEST_NAMESPACE.to_string(),
+        namespace.to_string(),
         lines_to_batches(lines, 0).unwrap(),
         Some(partition_key.clone()),
         DmlMeta::sequenced(
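
With the namespace promoted to a parameter and the helper made pub(crate), callers outside this module can build writes for any namespace. A sketch of a call under the new signature, using the test constants and a line-protocol string from this file:

```rust
// Illustrative only; mirrors the call sites updated in the hunks above.
let op = DmlOperation::Write(make_write_op(
    &PartitionKey::from(TEST_PARTITION_1),
    ShardIndex::new(0),
    TEST_NAMESPACE, // previously hard-coded inside the helper
    42,             // sequence number
    r#"test_table,city=Boston day="sun",temp=60 36"#,
));
```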
@@ -954,6 +963,7 @@ fn make_first_partition_data(
     out.push(DmlOperation::Write(make_write_op(
         partition_key,
         shard_index,
+        TEST_NAMESPACE,
         seq_num,
         r#"test_table,city=Boston day="sun",temp=60 36"#,
     )));
@@ -962,6 +972,7 @@ fn make_first_partition_data(
     out.push(DmlOperation::Write(make_write_op(
         partition_key,
         shard_index,
+        TEST_NAMESPACE,
         seq_num,
         r#"test_table,city=Andover day="tue",temp=56 30"#,
     )));
@@ -972,6 +983,7 @@ fn make_first_partition_data(
     out.push(DmlOperation::Write(make_write_op(
         partition_key,
         shard_index,
+        TEST_NAMESPACE,
         seq_num,
         r#"test_table,city=Andover day="mon" 46"#,
     )));
@@ -980,6 +992,7 @@ fn make_first_partition_data(
     out.push(DmlOperation::Write(make_write_op(
         partition_key,
         shard_index,
+        TEST_NAMESPACE,
         seq_num,
         r#"test_table,city=Medford day="wed" 26"#,
     )));
@@ -991,6 +1004,7 @@ fn make_first_partition_data(
     out.push(DmlOperation::Write(make_write_op(
         partition_key,
         shard_index,
+        TEST_NAMESPACE,
         seq_num,
         r#"test_table,city=Boston day="mon" 38"#,
     )));
@@ -999,6 +1013,7 @@ fn make_first_partition_data(
     out.push(DmlOperation::Write(make_write_op(
         partition_key,
         shard_index,
+        TEST_NAMESPACE,
         seq_num,
         r#"test_table,city=Wilmington day="mon" 35"#,
     )));
@@ -1007,15 +1022,19 @@ fn make_first_partition_data(
     (out, SequenceNumber::new(seq_num))
 }

-async fn populate_catalog(catalog: &dyn Catalog) -> ShardId {
+pub(crate) async fn populate_catalog(
+    catalog: &dyn Catalog,
+    shard_index: ShardIndex,
+    namespace: &str,
+    table: &str,
+) -> (ShardId, NamespaceId, TableId) {
     let mut c = catalog.repositories().await;
-    let topic = c.topics().create_or_get("whatevs").await.unwrap();
-    let query_pool = c.query_pools().create_or_get("whatevs").await.unwrap();
-    let shard_index = ShardIndex::new(0);
+    let topic = c.topics().create_or_get("kafka-topic").await.unwrap();
+    let query_pool = c.query_pools().create_or_get("query-pool").await.unwrap();
     let ns_id = c
         .namespaces()
         .create(
-            TEST_NAMESPACE,
+            namespace,
             iox_catalog::INFINITE_RETENTION_POLICY,
             topic.id,
             query_pool.id,
@@ -1023,10 +1042,12 @@ async fn populate_catalog(catalog: &dyn Catalog) -> ShardId {
         .await
         .unwrap()
         .id;
-    c.tables().create_or_get(TEST_TABLE, ns_id).await.unwrap();
-    c.shards()
+    let table_id = c.tables().create_or_get(table, ns_id).await.unwrap().id;
+    let shard_id = c
+        .shards()
         .create_or_get(&topic, shard_index)
         .await
         .unwrap()
-        .id
+        .id;
+    (shard_id, ns_id, table_id)
 }
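
The reworked helper takes the shard index, namespace, and table as arguments and returns all three created catalog IDs as a tuple, which is what the updated call sites above destructure. A sketch of a call:

```rust
// As called from make_ingester_data() above; TEST_NAMESPACE and
// TEST_TABLE are this file's existing test constants.
let (shard_id, ns_id, table_id) =
    populate_catalog(&*catalog, ShardIndex::new(1), TEST_NAMESPACE, TEST_TABLE).await;
```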


@@ -3,8 +3,8 @@ use super::DbScenario;
 use async_trait::async_trait;
 use backoff::BackoffConfig;
 use data_types::{
-    DeletePredicate, IngesterMapping, NamespaceId, NonEmptyString, ParquetFileId, PartitionId,
-    PartitionKey, Sequence, SequenceNumber, ShardId, ShardIndex, TableId, TombstoneId,
+    DeletePredicate, IngesterMapping, NonEmptyString, ParquetFileId, PartitionId, PartitionKey,
+    Sequence, SequenceNumber, ShardIndex, TombstoneId,
 };
 use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
 use futures::StreamExt;
@@ -18,7 +18,7 @@ use ingester::{
         partition::resolver::CatalogPartitionResolver, FlatIngesterQueryResponse, IngesterData,
         IngesterQueryResponse, Persister,
     },
-    lifecycle::LifecycleHandle,
+    lifecycle::mock_handle::NoopLifecycleHandle,
     querier_handler::prepare_data_to_querier,
 };
 use iox_catalog::interface::get_schema_by_name;
@@ -964,31 +964,6 @@ impl MockIngester {
     }
 }

-/// Special [`LifecycleHandle`] that never persists and always accepts more data.
-///
-/// This is useful to control persists manually.
-struct NoopLifecycleHandle {}
-
-impl LifecycleHandle for NoopLifecycleHandle {
-    fn log_write(
-        &self,
-        _partition_id: PartitionId,
-        _shard_id: ShardId,
-        _namespace_id: NamespaceId,
-        _table_id: TableId,
-        _sequence_number: SequenceNumber,
-        _bytes_written: usize,
-        _rows_written: usize,
-    ) -> bool {
-        // do NOT pause ingest
-        false
-    }
-
-    fn can_resume_ingest(&self) -> bool {
-        true
-    }
-}
-
 #[async_trait]
 impl IngesterFlightClient for MockIngester {
     async fn query(