refactor: Use test constants in more places

So that when I change the type of partition IDs from PartitionId to
TransitionPartitionId, I don't have to update all these places that just
need an arbitrary partition ID or related values.

These test constants probably didn't exist when these tests
were created.
pull/24376/head
Carol (Nichols || Goulding) 2023-06-26 13:12:15 -04:00
parent 7e30c91ceb
commit afcd2d859d
12 changed files with 169 additions and 215 deletions
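
For orientation: every hunk below swaps a locally defined test constant for a shared one exported by the ingester crate's test_util module. Here is a minimal sketch of what those shared definitions might look like, assuming the types and names visible in this diff; the concrete values, the "arbitrary" strings, and the TableName import path are illustrative guesses, not the repository's actual code:

// Hypothetical sketch of the shared constants in the ingester crate's
// test_util module. The names come from this diff; the values, strings,
// and the TableName import path are assumptions for illustration.
use data_types::{NamespaceId, PartitionId, PartitionKey, TableId};
use lazy_static::lazy_static;

use crate::buffer_tree::table::TableName; // assumed path

pub(crate) const ARBITRARY_NAMESPACE_ID: NamespaceId = NamespaceId::new(1);
pub(crate) const ARBITRARY_TABLE_ID: TableId = TableId::new(1);
// The one definition to update when PartitionId becomes TransitionPartitionId:
pub(crate) const ARBITRARY_PARTITION_ID: PartitionId = PartitionId::new(1);

lazy_static! {
    // PartitionKey and TableName allocate, so they cannot be `const`.
    pub(crate) static ref ARBITRARY_PARTITION_KEY: PartitionKey =
        PartitionKey::from("arbitrary");
    pub(crate) static ref ARBITRARY_TABLE_NAME: TableName =
        TableName::from("arbitrary-table");
}

With definitions like these in one place, a test that merely needs some partition ID imports ARBITRARY_PARTITION_ID instead of minting PartitionId::new(1) locally, so the eventual type change touches one definition rather than every test module.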


@@ -271,7 +271,6 @@ mod tests {
     };
     use assert_matches::assert_matches;
-    use data_types::PartitionId;
     use futures::Future;
     use futures::{stream::FuturesUnordered, StreamExt};
     use lazy_static::lazy_static;


@@ -70,41 +70,16 @@ where
 #[cfg(test)]
 mod tests {
-    use std::{sync::Arc, time::Duration};
-
-    use assert_matches::assert_matches;
-    use data_types::{NamespaceId, PartitionId, PartitionKey, TableId};
-    use lazy_static::lazy_static;
-    use metric::Attributes;
-
     use super::*;
     use crate::{
-        buffer_tree::{namespace::NamespaceName, table::TableName},
-        deferred_load::DeferredLoad,
         dml_sink::{mock_sink::MockDmlSink, DmlError},
-        test_util::make_write_op,
+        test_util::{
+            make_write_op, ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID,
+            ARBITRARY_TABLE_NAME,
+        },
     };
-
-    const PARTITION_ID: PartitionId = PartitionId::new(42);
-    const NAMESPACE_ID: NamespaceId = NamespaceId::new(24);
-    const TABLE_ID: TableId = TableId::new(2442);
-    const TABLE_NAME: &str = "banana-report";
-    const NAMESPACE_NAME: &str = "platanos";
-
-    lazy_static! {
-        static ref PARTITION_KEY: PartitionKey = PartitionKey::from("bananas");
-        static ref NAMESPACE_NAME_LOADER: Arc<DeferredLoad<NamespaceName>> =
-            Arc::new(DeferredLoad::new(
-                Duration::from_secs(1),
-                async { NamespaceName::from(NAMESPACE_NAME) },
-                &metric::Registry::default(),
-            ));
-        static ref TABLE_NAME_LOADER: Arc<DeferredLoad<TableName>> = Arc::new(DeferredLoad::new(
-            Duration::from_secs(1),
-            async { TableName::from(TABLE_NAME) },
-            &metric::Registry::default(),
-        ));
-    }
+    use assert_matches::assert_matches;
+    use metric::Attributes;

     const LAYER_NAME: &str = "test-bananas";
@@ -123,12 +98,12 @@ mod tests {
         let decorator = DmlSinkInstrumentation::new(LAYER_NAME, mock, &metrics);

         let op = IngestOp::Write(make_write_op(
-            &PARTITION_KEY,
-            NAMESPACE_ID,
-            TABLE_NAME,
-            TABLE_ID,
+            &ARBITRARY_PARTITION_KEY,
+            ARBITRARY_NAMESPACE_ID,
+            &ARBITRARY_TABLE_NAME,
+            ARBITRARY_TABLE_ID,
             42,
-            "banana-report,tag=1 v=2 42424242",
+            &format!("{},tag=1 v=2 42424242", &*ARBITRARY_TABLE_NAME),
             None,
         ));
@@ -141,7 +116,9 @@ mod tests {
             // Validate the histogram with the specified attributes saw
             // an observation
             let histogram = metrics
-                .get_instrument::<Metric<DurationHistogram>>("ingester_dml_sink_apply_duration")
+                .get_instrument::<Metric<DurationHistogram>>(
+                    "ingester_dml_sink_apply_duration"
+                )
                 .expect("failed to find metric")
                 .get_observer(&Attributes::from(&$want_metric_attr))
                 .expect("failed to find attributes")


@@ -55,42 +55,17 @@ where
 #[cfg(test)]
 mod tests {
-    use std::{sync::Arc, time::Duration};
-
-    use assert_matches::assert_matches;
-    use data_types::{NamespaceId, PartitionId, PartitionKey, TableId};
-    use lazy_static::lazy_static;
-    use trace::{ctx::SpanContext, span::SpanStatus, RingBufferTraceCollector, TraceCollector};
-
-    use crate::{
-        buffer_tree::{namespace::NamespaceName, table::TableName},
-        deferred_load::DeferredLoad,
-        dml_sink::{mock_sink::MockDmlSink, DmlError},
-        test_util::make_write_op,
-    };
-
     use super::*;
-
-    const PARTITION_ID: PartitionId = PartitionId::new(42);
-    const NAMESPACE_ID: NamespaceId = NamespaceId::new(24);
-    const TABLE_ID: TableId = TableId::new(2442);
-    const TABLE_NAME: &str = "banana-report";
-    const NAMESPACE_NAME: &str = "platanos";
-
-    lazy_static! {
-        static ref PARTITION_KEY: PartitionKey = PartitionKey::from("bananas");
-        static ref NAMESPACE_NAME_LOADER: Arc<DeferredLoad<NamespaceName>> =
-            Arc::new(DeferredLoad::new(
-                Duration::from_secs(1),
-                async { NamespaceName::from(NAMESPACE_NAME) },
-                &metric::Registry::default(),
-            ));
-        static ref TABLE_NAME_LOADER: Arc<DeferredLoad<TableName>> = Arc::new(DeferredLoad::new(
-            Duration::from_secs(1),
-            async { TableName::from(TABLE_NAME) },
-            &metric::Registry::default(),
-        ));
-    }
+    use crate::{
+        dml_sink::{mock_sink::MockDmlSink, DmlError},
+        test_util::{
+            make_write_op, ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID,
+            ARBITRARY_TABLE_NAME,
+        },
+    };
+    use assert_matches::assert_matches;
+    use std::sync::Arc;
+    use trace::{ctx::SpanContext, span::SpanStatus, RingBufferTraceCollector, TraceCollector};

     #[track_caller]
     fn assert_trace(name: impl Into<String>, status: SpanStatus, traces: &dyn TraceCollector) {
@@ -120,12 +95,12 @@ mod tests {
         let span = SpanContext::new(Arc::clone(&traces));

         let op = IngestOp::Write(make_write_op(
-            &PARTITION_KEY,
-            NAMESPACE_ID,
-            TABLE_NAME,
-            TABLE_ID,
+            &ARBITRARY_PARTITION_KEY,
+            ARBITRARY_NAMESPACE_ID,
+            &ARBITRARY_TABLE_NAME,
+            ARBITRARY_TABLE_ID,
             42,
-            "banana-report,tag=1 v=2 42424242",
+            &format!("{},tag=1 v=2 42424242", &*ARBITRARY_TABLE_NAME),
             Some(span),
         ));
@@ -148,12 +123,12 @@ mod tests {
         let span = SpanContext::new(Arc::clone(&traces));

         let op = IngestOp::Write(make_write_op(
-            &PARTITION_KEY,
-            NAMESPACE_ID,
-            TABLE_NAME,
-            TABLE_ID,
+            &ARBITRARY_PARTITION_KEY,
+            ARBITRARY_NAMESPACE_ID,
+            &ARBITRARY_TABLE_NAME,
+            ARBITRARY_TABLE_ID,
             42,
-            "banana-report,tag=1 v=2 42424242",
+            &format!("{},tag=1 v=2 42424242", &*ARBITRARY_TABLE_NAME),
             Some(span),
         ));


@@ -107,12 +107,12 @@ pub(super) async fn compact_persisting_batch(
 mod tests {
     use arrow::record_batch::RecordBatch;
     use arrow_util::assert_batches_eq;
-    use data_types::PartitionId;
     use iox_query::test::{raw_data, TestChunk};
     use mutable_batch_lp::lines_to_batches;
     use schema::Projection;

     use super::*;
+    use crate::test_util::ARBITRARY_PARTITION_ID;

     // this test was added to guard against https://github.com/influxdata/influxdb_iox/issues/3782
     // where if sending in a single row it would compact into an output of two batches, one of
@@ -127,7 +127,7 @@ mod tests {
             .to_arrow(Projection::All)
             .unwrap();

-        let batch = QueryAdaptor::new(PartitionId::new(1), vec![Arc::new(batch)]);
+        let batch = QueryAdaptor::new(ARBITRARY_PARTITION_ID, vec![Arc::new(batch)]);

         // verify PK
         let schema = batch.schema();
@@ -162,7 +162,7 @@
     async fn test_compact_batch_on_one_record_batch_no_dupilcates() {
         // create input data
         let batch = QueryAdaptor::new(
-            PartitionId::new(1),
+            ARBITRARY_PARTITION_ID,
             create_one_record_batch_with_influxtype_no_duplicates().await,
         );
@@ -211,7 +211,7 @@
     async fn test_compact_batch_no_sort_key() {
         // create input data
         let batch = QueryAdaptor::new(
-            PartitionId::new(1),
+            ARBITRARY_PARTITION_ID,
             create_batches_with_influxtype_different_cardinality().await,
         );
@@ -265,7 +265,7 @@
     async fn test_compact_batch_with_specified_sort_key() {
         // create input data
         let batch = QueryAdaptor::new(
-            PartitionId::new(1),
+            ARBITRARY_PARTITION_ID,
             create_batches_with_influxtype_different_cardinality().await,
         );
@@ -324,7 +324,7 @@
     async fn test_compact_batch_new_column_for_sort_key() {
         // create input data
         let batch = QueryAdaptor::new(
-            PartitionId::new(1),
+            ARBITRARY_PARTITION_ID,
             create_batches_with_influxtype_different_cardinality().await,
         );
@@ -387,7 +387,7 @@
     async fn test_compact_batch_missing_column_for_sort_key() {
         // create input data
         let batch = QueryAdaptor::new(
-            PartitionId::new(1),
+            ARBITRARY_PARTITION_ID,
             create_batches_with_influxtype_different_cardinality().await,
         );
@@ -449,7 +449,7 @@
         // create input data
         let batch = QueryAdaptor::new(
-            PartitionId::new(1),
+            ARBITRARY_PARTITION_ID,
             create_one_row_record_batch_with_influxtype().await,
         );
@@ -490,7 +490,7 @@
     async fn test_compact_one_batch_with_duplicates() {
         // create input data
         let batch = QueryAdaptor::new(
-            PartitionId::new(1),
+            ARBITRARY_PARTITION_ID,
             create_one_record_batch_with_influxtype_duplicates().await,
         );
@@ -538,7 +538,10 @@
     #[tokio::test]
     async fn test_compact_many_batches_same_columns_with_duplicates() {
         // create many-batches input data
-        let batch = QueryAdaptor::new(PartitionId::new(1), create_batches_with_influxtype().await);
+        let batch = QueryAdaptor::new(
+            ARBITRARY_PARTITION_ID,
+            create_batches_with_influxtype().await,
+        );

         // verify PK
         let schema = batch.schema();
@@ -583,7 +586,7 @@
     async fn test_compact_many_batches_different_columns_with_duplicates() {
         // create many-batches input data
         let batch = QueryAdaptor::new(
-            PartitionId::new(1),
+            ARBITRARY_PARTITION_ID,
             create_batches_with_influxtype_different_columns().await,
         );
@@ -634,7 +637,7 @@
     async fn test_compact_many_batches_different_columns_different_order_with_duplicates() {
         // create many-batches input data
         let batch = QueryAdaptor::new(
-            PartitionId::new(1),
+            ARBITRARY_PARTITION_ID,
             create_batches_with_influxtype_different_columns_different_order().await,
         );
@@ -688,7 +691,7 @@
     async fn test_compact_many_batches_same_columns_different_types() {
         // create many-batches input data
         let batch = QueryAdaptor::new(
-            PartitionId::new(1),
+            ARBITRARY_PARTITION_ID,
             create_batches_with_influxtype_same_columns_different_type().await,
         );


@@ -156,19 +156,15 @@ pub(crate) mod mock {
 #[cfg(test)]
 mod tests {
-    use data_types::{ColumnId, ColumnSet, SequenceNumber, Timestamp};
-
     use super::*;
-
-    const NAMESPACE_ID: NamespaceId = NamespaceId::new(1);
-    const TABLE_ID: TableId = TableId::new(1);
-    const PARTITION_ID: PartitionId = PartitionId::new(1);
+    use crate::test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_TABLE_ID};
+    use data_types::{ColumnId, ColumnSet, SequenceNumber, Timestamp};

     fn arbitrary_file_meta() -> ParquetFileParams {
         ParquetFileParams {
-            namespace_id: NAMESPACE_ID,
-            table_id: TABLE_ID,
-            partition_id: PARTITION_ID,
+            namespace_id: ARBITRARY_NAMESPACE_ID,
+            table_id: ARBITRARY_TABLE_ID,
+            partition_id: ARBITRARY_PARTITION_ID,
             partition_hash_id: None,
             object_store_id: Default::default(),
             min_time: Timestamp::new(42),


@@ -148,21 +148,16 @@ where
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
-
+    use super::*;
+    use crate::{
+        persist::completion_observer::mock::MockCompletionObserver,
+        test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_TABLE_ID},
+    };
     use data_types::{
-        sequence_number_set::SequenceNumberSet, ColumnId, ColumnSet, NamespaceId,
-        ParquetFileParams, PartitionId, TableId, Timestamp,
+        sequence_number_set::SequenceNumberSet, ColumnId, ColumnSet, ParquetFileParams, Timestamp,
     };
     use metric::assert_histogram;
-
-    use crate::persist::completion_observer::mock::MockCompletionObserver;
-
-    use super::*;
-
-    const NAMESPACE_ID: NamespaceId = NamespaceId::new(1);
-    const TABLE_ID: TableId = TableId::new(1);
-    const PARTITION_ID: PartitionId = PartitionId::new(1);
+    use std::sync::Arc;

     #[tokio::test]
     async fn test_persisted_file_metrics() {
@@ -172,9 +167,9 @@ mod tests {
         let decorator = ParquetFileInstrumentation::new(Arc::clone(&inner), &metrics);

         let meta = ParquetFileParams {
-            namespace_id: NAMESPACE_ID,
-            table_id: TABLE_ID,
-            partition_id: PARTITION_ID,
+            namespace_id: ARBITRARY_NAMESPACE_ID,
+            table_id: ARBITRARY_TABLE_ID,
+            partition_id: ARBITRARY_PARTITION_ID,
             partition_hash_id: None,
             object_store_id: Default::default(),
             min_time: Timestamp::new(Duration::from_secs(1_000).as_nanos() as _),


@@ -426,20 +426,22 @@ struct MetricState {
 #[cfg(test)]
 mod tests {
-    use std::{sync::Arc, time::Duration};
-
-    use crate::{make_batch, make_partition_stream, query::mock_query_exec::MockQueryExec};
-
     use super::*;
+    use crate::{
+        make_batch, make_partition_stream,
+        query::mock_query_exec::MockQueryExec,
+        test_util::{
+            ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY,
+            ARBITRARY_TABLE_ID,
+        },
+    };
     use arrow::array::{Float32Array, Int64Array};
-    use data_types::{PartitionHashId, PartitionId, PartitionKey};
+    use data_types::PartitionHashId;
     use futures::{stream, StreamExt};
     use iox_time::MockProvider;
     use metric::{assert_histogram, Attributes};
+    use std::{sync::Arc, time::Duration};

-    const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
-    const TABLE_ID: TableId = TableId::new(42);
     const TIME_STEP: Duration = Duration::from_secs(42);

     /// A query against a table that has been persisted / no longer contains any
@@ -451,10 +453,10 @@ mod tests {
         // Construct a stream with no batches.
         let stream = PartitionStream::new(stream::iter([PartitionResponse::new(
             vec![],
-            PartitionId::new(42),
+            ARBITRARY_PARTITION_ID,
             Some(PartitionHashId::new(
-                TABLE_ID,
-                &PartitionKey::from("arbitrary"),
+                ARBITRARY_TABLE_ID,
+                &ARBITRARY_PARTITION_KEY,
             )),
             42,
         )]));
@@ -465,7 +467,7 @@
             .with_time_provider(Arc::clone(&mock_time));

         let response = layer
-            .query_exec(NAMESPACE_ID, TABLE_ID, vec![], None)
+            .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None)
             .await
             .expect("query should succeed");
@@ -546,7 +548,7 @@
             .with_time_provider(Arc::clone(&mock_time));

         let response = layer
-            .query_exec(NAMESPACE_ID, TABLE_ID, vec![], None)
+            .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None)
             .await
             .expect("query should succeed");
@@ -626,7 +628,7 @@
             .with_time_provider(Arc::clone(&mock_time));

         let response = layer
-            .query_exec(NAMESPACE_ID, TABLE_ID, vec![], None)
+            .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None)
             .await
             .expect("query should succeed");
@@ -706,7 +708,7 @@
             .with_time_provider(Arc::clone(&mock_time));

         let response = layer
-            .query_exec(NAMESPACE_ID, TABLE_ID, vec![], None)
+            .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None)
             .await
             .expect("query should succeed");


@@ -375,22 +375,20 @@ fn encode_response(
 #[cfg(test)]
 mod tests {
-    use arrow::array::{Float64Array, Int32Array};
-    use arrow_flight::decode::{DecodedPayload, FlightRecordBatchStream};
-    use assert_matches::assert_matches;
-    use bytes::Bytes;
-    use data_types::PartitionKey;
-    use tonic::Code;
-
+    use super::*;
     use crate::{
         make_batch,
         query::{
             mock_query_exec::MockQueryExec, partition_response::PartitionResponse,
             response::PartitionStream,
         },
+        test_util::ARBITRARY_PARTITION_KEY,
     };
-
-    use super::*;
+    use arrow::array::{Float64Array, Int32Array};
+    use arrow_flight::decode::{DecodedPayload, FlightRecordBatchStream};
+    use assert_matches::assert_matches;
+    use bytes::Bytes;
+    use tonic::Code;

     #[tokio::test]
     async fn limits_concurrent_queries() {
@@ -430,7 +428,7 @@ mod tests {
         let ingester_id = IngesterId::new();
         let partition_hash_id = Some(PartitionHashId::new(
             TableId::new(3),
-            &PartitionKey::from("arbitrary"),
+            &ARBITRARY_PARTITION_KEY,
         ));
         let (batch1, schema1) = make_batch!(
             Float64Array("float" => vec![1.1, 2.2, 3.3]),


@@ -220,21 +220,21 @@ where
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
-
     use assert_matches::assert_matches;
     use data_types::SequenceNumber;
     use generated_types::influxdata::pbdata::v1::{
         column::{SemanticType, Values},
         Column, DatabaseBatch, TableBatch,
     };
+    use std::sync::Arc;

     use super::*;
-    use crate::dml_payload::IngestOp;
-    use crate::dml_sink::mock_sink::MockDmlSink;
-
-    const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
-    const PARTITION_KEY: &str = "bananas";
+    use crate::{
+        dml_payload::IngestOp,
+        dml_sink::mock_sink::MockDmlSink,
+        test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID},
+    };

     const PERSIST_QUEUE_DEPTH: usize = 42;

     macro_rules! test_rpc_write {
@@ -261,7 +261,13 @@ mod tests {
                 .write(Request::new($request))
                 .await;

-            assert_eq!(ret.is_err(), $want_err, "wanted handler error {} got {:?}", $want_err, ret);
+            assert_eq!(
+                ret.is_err(),
+                $want_err,
+                "wanted handler error {} got {:?}",
+                $want_err,
+                ret
+            );
             assert_matches!(mock.get_calls().as_slice(), $($want_calls)+);
         }
     }
@@ -272,10 +278,10 @@ mod tests {
         apply_ok,
         request = proto::WriteRequest {
             payload: Some(DatabaseBatch {
-                database_id: NAMESPACE_ID.get(),
-                partition_key: PARTITION_KEY.to_string(),
+                database_id: ARBITRARY_NAMESPACE_ID.get(),
+                partition_key: ARBITRARY_PARTITION_KEY.to_string(),
                 table_batches: vec![TableBatch {
-                    table_id: 42,
+                    table_id: ARBITRARY_TABLE_ID.get(),
                     columns: vec![Column {
                         column_name: "time".to_string(),
                         semantic_type: SemanticType::Time.into(),
@@ -299,10 +305,13 @@ mod tests {
         want_err = false,
         want_calls = [IngestOp::Write(w)] => {
             // Assert the various IngestOp properties match the expected values
-            assert_eq!(w.namespace(), NAMESPACE_ID);
+            assert_eq!(w.namespace(), ARBITRARY_NAMESPACE_ID);
             assert_eq!(w.tables().count(), 1);
-            assert_eq!(*w.partition_key(), PartitionKey::from(PARTITION_KEY));
-            assert_eq!(w.tables().next().unwrap().1.partitioned_data().sequence_number(), SequenceNumber::new(1));
+            assert_eq!(w.partition_key(), &*ARBITRARY_PARTITION_KEY);
+            assert_eq!(
+                w.tables().next().unwrap().1.partitioned_data().sequence_number(),
+                SequenceNumber::new(1)
+            );
         }
     );
@@ -318,8 +327,8 @@ mod tests {
         no_tables,
         request = proto::WriteRequest {
             payload: Some(DatabaseBatch {
-                database_id: NAMESPACE_ID.get(),
-                partition_key: PARTITION_KEY.to_string(),
+                database_id: ARBITRARY_NAMESPACE_ID.get(),
+                partition_key: ARBITRARY_PARTITION_KEY.to_string(),
                 table_batches: vec![],
             }),
         },
@@ -332,10 +341,10 @@ mod tests {
         batch_error,
         request = proto::WriteRequest {
             payload: Some(DatabaseBatch {
-                database_id: NAMESPACE_ID.get(),
-                partition_key: PARTITION_KEY.to_string(),
+                database_id: ARBITRARY_NAMESPACE_ID.get(),
+                partition_key: ARBITRARY_PARTITION_KEY.to_string(),
                 table_batches: vec![TableBatch {
-                    table_id: 42,
+                    table_id: ARBITRARY_TABLE_ID.get(),
                     columns: vec![Column {
                         column_name: "time".to_string(),
                         semantic_type: SemanticType::Time.into(),
@@ -373,10 +382,10 @@ mod tests {
         let req = proto::WriteRequest {
             payload: Some(DatabaseBatch {
-                database_id: NAMESPACE_ID.get(),
-                partition_key: PARTITION_KEY.to_string(),
+                database_id: ARBITRARY_NAMESPACE_ID.get(),
+                partition_key: ARBITRARY_PARTITION_KEY.to_string(),
                 table_batches: vec![TableBatch {
-                    table_id: 42,
+                    table_id: ARBITRARY_TABLE_ID.get(),
                     columns: vec![Column {
                         column_name: "time".to_string(),
                         semantic_type: SemanticType::Time.into(),
@@ -430,10 +439,10 @@ mod tests {
         let req = proto::WriteRequest {
             payload: Some(DatabaseBatch {
-                database_id: NAMESPACE_ID.get(),
-                partition_key: PARTITION_KEY.to_string(),
+                database_id: ARBITRARY_NAMESPACE_ID.get(),
+                partition_key: ARBITRARY_PARTITION_KEY.to_string(),
                 table_batches: vec![TableBatch {
-                    table_id: 42,
+                    table_id: ARBITRARY_TABLE_ID.get(),
                     columns: vec![Column {
                         column_name: "time".to_string(),
                         semantic_type: SemanticType::Time.into(),
@@ -486,10 +495,10 @@ mod tests {
         let req = proto::WriteRequest {
             payload: Some(DatabaseBatch {
-                database_id: NAMESPACE_ID.get(),
-                partition_key: PARTITION_KEY.to_string(),
+                database_id: ARBITRARY_NAMESPACE_ID.get(),
+                partition_key: ARBITRARY_PARTITION_KEY.to_string(),
                 table_batches: vec![TableBatch {
-                    table_id: 42,
+                    table_id: ARBITRARY_TABLE_ID.get(),
                     columns: vec![Column {
                         column_name: "time".to_string(),
                         semantic_type: SemanticType::Time.into(),


@@ -229,7 +229,10 @@ macro_rules! make_partition_stream {
             $(
                 let (batch, this_schema) = $batch;
                 batches.push(batch);
-                schema = Schema::try_merge([schema, (*this_schema).clone()]).expect("incompatible batch schemas");
+                schema = Schema::try_merge([
+                    schema,
+                    (*this_schema).clone()
+                ]).expect("incompatible batch schemas");
             )+

             drop(schema);
@@ -241,11 +244,11 @@
                     // batches are in a different partition, not what the actual identifier
                     // values are. This will go away when the ingester no longer sends
                     // PartitionIds.
-                    PartitionId::new($id),
+                    data_types::PartitionId::new($id),
                     Some(
                         PartitionHashId::new(
                             TableId::new($id),
-                            &PartitionKey::from("arbitrary")
+                            &*ARBITRARY_PARTITION_KEY
                         )
                     ),
                     42,


@@ -204,22 +204,18 @@ impl WalReferenceHandle {
 #[cfg(test)]
 mod tests {
-    use std::{pin::Pin, task::Poll, time::Duration};
-
+    use super::*;
+    use crate::test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_TABLE_ID};
     use assert_matches::assert_matches;
     use async_trait::async_trait;
-    use data_types::{
-        ColumnId, ColumnSet, NamespaceId, ParquetFileParams, PartitionId, TableId, Timestamp,
-    };
+    use data_types::{ColumnId, ColumnSet, ParquetFileParams, Timestamp};
     use futures::{task::Context, Future, FutureExt};
     use metric::{assert_counter, U64Gauge};
     use parking_lot::Mutex;
+    use std::{pin::Pin, task::Poll, time::Duration};
     use test_helpers::timeout::FutureTimeout;
     use tokio::sync::Notify;

-    use super::*;
-
     /// A mock file deleter that records the IDs it was asked to delete.
     #[derive(Debug, Default)]
     struct MockWalDeleter {
@@ -265,9 +261,9 @@ mod tests {
     {
         Arc::new(CompletedPersist::new(
             ParquetFileParams {
-                namespace_id: NamespaceId::new(1),
-                table_id: TableId::new(2),
-                partition_id: PartitionId::new(3),
+                namespace_id: ARBITRARY_NAMESPACE_ID,
+                table_id: ARBITRARY_TABLE_ID,
+                partition_id: ARBITRARY_PARTITION_ID,
                 partition_hash_id: None,
                 object_store_id: Default::default(),
                 min_time: Timestamp::new(42),


@@ -131,26 +131,21 @@ impl WalAppender for Arc<wal::Wal> {
 #[cfg(test)]
 mod tests {
-    use core::{future::Future, marker::Send, pin::Pin};
-    use std::{future, sync::Arc};
-
-    use assert_matches::assert_matches;
-    use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId};
-    use mutable_batch_lp::lines_to_batches;
-    use wal::Wal;
-
+    use super::*;
     use crate::{
         dml_payload::write::{PartitionedData, TableData, WriteOperation},
         dml_sink::mock_sink::MockDmlSink,
-        test_util::make_write_op,
+        test_util::{
+            make_write_op, ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID,
+            ARBITRARY_TABLE_NAME,
+        },
     };
-
-    use super::*;
-
-    const TABLE_ID: TableId = TableId::new(44);
-    const TABLE_NAME: &str = "bananas";
-    const NAMESPACE_NAME: &str = "platanos";
-    const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
+    use assert_matches::assert_matches;
+    use core::{future::Future, marker::Send, pin::Pin};
+    use data_types::{SequenceNumber, TableId};
+    use mutable_batch_lp::lines_to_batches;
+    use std::{future, sync::Arc};
+    use wal::Wal;

     #[tokio::test]
     async fn test_append() {
@@ -161,22 +156,25 @@ mod tests {
         // Generate a test op containing writes for multiple tables that will
         // be appended and read back
         let mut tables_by_name = lines_to_batches(
-            "bananas,region=Madrid temp=35 4242424242\n\
-             banani,region=Iceland temp=25 7676767676",
+            &format!(
+                "{},region=Madrid temp=35 4242424242\n\
+                 banani,region=Iceland temp=25 7676767676",
+                &*ARBITRARY_TABLE_NAME
+            ),
             0,
         )
         .expect("invalid line proto");

         let op = WriteOperation::new(
-            NAMESPACE_ID,
+            ARBITRARY_NAMESPACE_ID,
             [
                 (
-                    TABLE_ID,
+                    ARBITRARY_TABLE_ID,
                     TableData::new(
-                        TABLE_ID,
+                        ARBITRARY_TABLE_ID,
                         PartitionedData::new(
                             SequenceNumber::new(42),
                             tables_by_name
-                                .remove(TABLE_NAME)
+                                .remove(ARBITRARY_TABLE_NAME.as_ref())
                                 .expect("table does not exist in LP"),
                         ),
                     ),
@@ -196,7 +194,7 @@ mod tests {
             ]
             .into_iter()
             .collect(),
-            PartitionKey::from("p1"),
+            ARBITRARY_PARTITION_KEY.clone(),
             None,
         );
@@ -240,7 +238,7 @@ mod tests {
         assert_eq!(read_op.sequence_number, 42);
         assert_eq!(
             read_op.table_write_sequence_numbers,
-            [(TABLE_ID, 42), (SECOND_TABLE_ID, 42)]
+            [(ARBITRARY_TABLE_ID, 42), (SECOND_TABLE_ID, 42)]
                 .into_iter()
                 .collect::<std::collections::HashMap<TableId, u64>>()
        );
@@ -249,7 +247,7 @@ mod tests {
         // The payload should match the serialised form of the "op" originally
         // wrote above.
-        let want = encode_write_op(NAMESPACE_ID, &op);
+        let want = encode_write_op(ARBITRARY_NAMESPACE_ID, &op);

         assert_eq!(want, *payload);
     }
@@ -279,12 +277,15 @@ mod tests {
         // Generate the test op
         let op = make_write_op(
-            &PartitionKey::from("p1"),
-            NAMESPACE_ID,
-            TABLE_NAME,
-            TABLE_ID,
+            &ARBITRARY_PARTITION_KEY,
+            ARBITRARY_NAMESPACE_ID,
+            &ARBITRARY_TABLE_NAME,
+            ARBITRARY_TABLE_ID,
             42,
-            r#"bananas,region=Madrid temp=35 4242424242"#,
+            &format!(
+                r#"{},region=Madrid temp=35 4242424242"#,
+                &*ARBITRARY_TABLE_NAME
+            ),
             None,
         );