From afcd2d859d15f94ce56c39e02694b275fb852efb Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 26 Jun 2023 13:12:15 -0400 Subject: [PATCH] refactor: Use test constants in more places So that when I change the type of PartitionIds to TransitionPartitionId, I don't have to update all these places that just need an arbitrary partition ID or related values. These test constants probably didn't exist when these tests were created. --- .../partition/resolver/coalesce.rs | 1 - ingester/src/dml_sink/instrumentation.rs | 51 ++++----------- ingester/src/dml_sink/tracing.rs | 65 ++++++------------- ingester/src/persist/compact.rs | 29 +++++---- ingester/src/persist/completion_observer.rs | 14 ++-- ingester/src/persist/file_metrics.rs | 25 +++---- ingester/src/query/result_instrumentation.rs | 32 ++++----- ingester/src/server/grpc/query.rs | 18 +++-- ingester/src/server/grpc/rpc_write.rs | 63 ++++++++++-------- ingester/src/test_util.rs | 9 ++- ingester/src/wal/reference_tracker/handle.rs | 18 ++--- ingester/src/wal/wal_sink.rs | 59 ++++++++--------- 12 files changed, 169 insertions(+), 215 deletions(-) diff --git a/ingester/src/buffer_tree/partition/resolver/coalesce.rs b/ingester/src/buffer_tree/partition/resolver/coalesce.rs index 4c8459fc60..b89ca460e4 100644 --- a/ingester/src/buffer_tree/partition/resolver/coalesce.rs +++ b/ingester/src/buffer_tree/partition/resolver/coalesce.rs @@ -271,7 +271,6 @@ mod tests { }; use assert_matches::assert_matches; - use data_types::PartitionId; use futures::Future; use futures::{stream::FuturesUnordered, StreamExt}; use lazy_static::lazy_static; diff --git a/ingester/src/dml_sink/instrumentation.rs b/ingester/src/dml_sink/instrumentation.rs index 1a5b4c3697..34319cb360 100644 --- a/ingester/src/dml_sink/instrumentation.rs +++ b/ingester/src/dml_sink/instrumentation.rs @@ -70,41 +70,16 @@ where #[cfg(test)] mod tests { - use std::{sync::Arc, time::Duration}; - - use assert_matches::assert_matches; - use data_types::{NamespaceId, PartitionId, PartitionKey, TableId}; - use lazy_static::lazy_static; - use metric::Attributes; - use super::*; use crate::{ - buffer_tree::{namespace::NamespaceName, table::TableName}, - deferred_load::DeferredLoad, dml_sink::{mock_sink::MockDmlSink, DmlError}, - test_util::make_write_op, + test_util::{ + make_write_op, ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID, + ARBITRARY_TABLE_NAME, + }, }; - - const PARTITION_ID: PartitionId = PartitionId::new(42); - const NAMESPACE_ID: NamespaceId = NamespaceId::new(24); - const TABLE_ID: TableId = TableId::new(2442); - const TABLE_NAME: &str = "banana-report"; - const NAMESPACE_NAME: &str = "platanos"; - - lazy_static! { - static ref PARTITION_KEY: PartitionKey = PartitionKey::from("bananas"); - static ref NAMESPACE_NAME_LOADER: Arc> = - Arc::new(DeferredLoad::new( - Duration::from_secs(1), - async { NamespaceName::from(NAMESPACE_NAME) }, - &metric::Registry::default(), - )); - static ref TABLE_NAME_LOADER: Arc> = Arc::new(DeferredLoad::new( - Duration::from_secs(1), - async { TableName::from(TABLE_NAME) }, - &metric::Registry::default(), - )); - } + use assert_matches::assert_matches; + use metric::Attributes; const LAYER_NAME: &str = "test-bananas"; @@ -123,12 +98,12 @@ mod tests { let decorator = DmlSinkInstrumentation::new(LAYER_NAME, mock, &metrics); let op = IngestOp::Write(make_write_op( - &PARTITION_KEY, - NAMESPACE_ID, - TABLE_NAME, - TABLE_ID, + &ARBITRARY_PARTITION_KEY, + ARBITRARY_NAMESPACE_ID, + &ARBITRARY_TABLE_NAME, + ARBITRARY_TABLE_ID, 42, - "banana-report,tag=1 v=2 42424242", + &format!("{},tag=1 v=2 42424242", &*ARBITRARY_TABLE_NAME), None, )); @@ -141,7 +116,9 @@ mod tests { // Validate the histogram with the specified attributes saw // an observation let histogram = metrics - .get_instrument::>("ingester_dml_sink_apply_duration") + .get_instrument::>( + "ingester_dml_sink_apply_duration" + ) .expect("failed to find metric") .get_observer(&Attributes::from(&$want_metric_attr)) .expect("failed to find attributes") diff --git a/ingester/src/dml_sink/tracing.rs b/ingester/src/dml_sink/tracing.rs index c4172a0dbf..d47db8f49f 100644 --- a/ingester/src/dml_sink/tracing.rs +++ b/ingester/src/dml_sink/tracing.rs @@ -55,42 +55,17 @@ where #[cfg(test)] mod tests { - use std::{sync::Arc, time::Duration}; - - use assert_matches::assert_matches; - use data_types::{NamespaceId, PartitionId, PartitionKey, TableId}; - use lazy_static::lazy_static; - use trace::{ctx::SpanContext, span::SpanStatus, RingBufferTraceCollector, TraceCollector}; - - use crate::{ - buffer_tree::{namespace::NamespaceName, table::TableName}, - deferred_load::DeferredLoad, - dml_sink::{mock_sink::MockDmlSink, DmlError}, - test_util::make_write_op, - }; - use super::*; - - const PARTITION_ID: PartitionId = PartitionId::new(42); - const NAMESPACE_ID: NamespaceId = NamespaceId::new(24); - const TABLE_ID: TableId = TableId::new(2442); - const TABLE_NAME: &str = "banana-report"; - const NAMESPACE_NAME: &str = "platanos"; - - lazy_static! { - static ref PARTITION_KEY: PartitionKey = PartitionKey::from("bananas"); - static ref NAMESPACE_NAME_LOADER: Arc> = - Arc::new(DeferredLoad::new( - Duration::from_secs(1), - async { NamespaceName::from(NAMESPACE_NAME) }, - &metric::Registry::default(), - )); - static ref TABLE_NAME_LOADER: Arc> = Arc::new(DeferredLoad::new( - Duration::from_secs(1), - async { TableName::from(TABLE_NAME) }, - &metric::Registry::default(), - )); - } + use crate::{ + dml_sink::{mock_sink::MockDmlSink, DmlError}, + test_util::{ + make_write_op, ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID, + ARBITRARY_TABLE_NAME, + }, + }; + use assert_matches::assert_matches; + use std::sync::Arc; + use trace::{ctx::SpanContext, span::SpanStatus, RingBufferTraceCollector, TraceCollector}; #[track_caller] fn assert_trace(name: impl Into, status: SpanStatus, traces: &dyn TraceCollector) { @@ -120,12 +95,12 @@ mod tests { let span = SpanContext::new(Arc::clone(&traces)); let op = IngestOp::Write(make_write_op( - &PARTITION_KEY, - NAMESPACE_ID, - TABLE_NAME, - TABLE_ID, + &ARBITRARY_PARTITION_KEY, + ARBITRARY_NAMESPACE_ID, + &ARBITRARY_TABLE_NAME, + ARBITRARY_TABLE_ID, 42, - "banana-report,tag=1 v=2 42424242", + &format!("{},tag=1 v=2 42424242", &*ARBITRARY_TABLE_NAME), Some(span), )); @@ -148,12 +123,12 @@ mod tests { let span = SpanContext::new(Arc::clone(&traces)); let op = IngestOp::Write(make_write_op( - &PARTITION_KEY, - NAMESPACE_ID, - TABLE_NAME, - TABLE_ID, + &ARBITRARY_PARTITION_KEY, + ARBITRARY_NAMESPACE_ID, + &ARBITRARY_TABLE_NAME, + ARBITRARY_TABLE_ID, 42, - "banana-report,tag=1 v=2 42424242", + &format!("{},tag=1 v=2 42424242", &*ARBITRARY_TABLE_NAME), Some(span), )); diff --git a/ingester/src/persist/compact.rs b/ingester/src/persist/compact.rs index 6a78cadd16..c56f6056c0 100644 --- a/ingester/src/persist/compact.rs +++ b/ingester/src/persist/compact.rs @@ -107,12 +107,12 @@ pub(super) async fn compact_persisting_batch( mod tests { use arrow::record_batch::RecordBatch; use arrow_util::assert_batches_eq; - use data_types::PartitionId; use iox_query::test::{raw_data, TestChunk}; use mutable_batch_lp::lines_to_batches; use schema::Projection; use super::*; + use crate::test_util::ARBITRARY_PARTITION_ID; // this test was added to guard against https://github.com/influxdata/influxdb_iox/issues/3782 // where if sending in a single row it would compact into an output of two batches, one of @@ -127,7 +127,7 @@ mod tests { .to_arrow(Projection::All) .unwrap(); - let batch = QueryAdaptor::new(PartitionId::new(1), vec![Arc::new(batch)]); + let batch = QueryAdaptor::new(ARBITRARY_PARTITION_ID, vec![Arc::new(batch)]); // verify PK let schema = batch.schema(); @@ -162,7 +162,7 @@ mod tests { async fn test_compact_batch_on_one_record_batch_no_dupilcates() { // create input data let batch = QueryAdaptor::new( - PartitionId::new(1), + ARBITRARY_PARTITION_ID, create_one_record_batch_with_influxtype_no_duplicates().await, ); @@ -211,7 +211,7 @@ mod tests { async fn test_compact_batch_no_sort_key() { // create input data let batch = QueryAdaptor::new( - PartitionId::new(1), + ARBITRARY_PARTITION_ID, create_batches_with_influxtype_different_cardinality().await, ); @@ -265,7 +265,7 @@ mod tests { async fn test_compact_batch_with_specified_sort_key() { // create input data let batch = QueryAdaptor::new( - PartitionId::new(1), + ARBITRARY_PARTITION_ID, create_batches_with_influxtype_different_cardinality().await, ); @@ -324,7 +324,7 @@ mod tests { async fn test_compact_batch_new_column_for_sort_key() { // create input data let batch = QueryAdaptor::new( - PartitionId::new(1), + ARBITRARY_PARTITION_ID, create_batches_with_influxtype_different_cardinality().await, ); @@ -387,7 +387,7 @@ mod tests { async fn test_compact_batch_missing_column_for_sort_key() { // create input data let batch = QueryAdaptor::new( - PartitionId::new(1), + ARBITRARY_PARTITION_ID, create_batches_with_influxtype_different_cardinality().await, ); @@ -449,7 +449,7 @@ mod tests { // create input data let batch = QueryAdaptor::new( - PartitionId::new(1), + ARBITRARY_PARTITION_ID, create_one_row_record_batch_with_influxtype().await, ); @@ -490,7 +490,7 @@ mod tests { async fn test_compact_one_batch_with_duplicates() { // create input data let batch = QueryAdaptor::new( - PartitionId::new(1), + ARBITRARY_PARTITION_ID, create_one_record_batch_with_influxtype_duplicates().await, ); @@ -538,7 +538,10 @@ mod tests { #[tokio::test] async fn test_compact_many_batches_same_columns_with_duplicates() { // create many-batches input data - let batch = QueryAdaptor::new(PartitionId::new(1), create_batches_with_influxtype().await); + let batch = QueryAdaptor::new( + ARBITRARY_PARTITION_ID, + create_batches_with_influxtype().await, + ); // verify PK let schema = batch.schema(); @@ -583,7 +586,7 @@ mod tests { async fn test_compact_many_batches_different_columns_with_duplicates() { // create many-batches input data let batch = QueryAdaptor::new( - PartitionId::new(1), + ARBITRARY_PARTITION_ID, create_batches_with_influxtype_different_columns().await, ); @@ -634,7 +637,7 @@ mod tests { async fn test_compact_many_batches_different_columns_different_order_with_duplicates() { // create many-batches input data let batch = QueryAdaptor::new( - PartitionId::new(1), + ARBITRARY_PARTITION_ID, create_batches_with_influxtype_different_columns_different_order().await, ); @@ -688,7 +691,7 @@ mod tests { async fn test_compact_many_batches_same_columns_different_types() { // create many-batches input data let batch = QueryAdaptor::new( - PartitionId::new(1), + ARBITRARY_PARTITION_ID, create_batches_with_influxtype_same_columns_different_type().await, ); diff --git a/ingester/src/persist/completion_observer.rs b/ingester/src/persist/completion_observer.rs index d28cee17ee..a9e898ed49 100644 --- a/ingester/src/persist/completion_observer.rs +++ b/ingester/src/persist/completion_observer.rs @@ -156,19 +156,15 @@ pub(crate) mod mock { #[cfg(test)] mod tests { - use data_types::{ColumnId, ColumnSet, SequenceNumber, Timestamp}; - use super::*; - - const NAMESPACE_ID: NamespaceId = NamespaceId::new(1); - const TABLE_ID: TableId = TableId::new(1); - const PARTITION_ID: PartitionId = PartitionId::new(1); + use crate::test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_TABLE_ID}; + use data_types::{ColumnId, ColumnSet, SequenceNumber, Timestamp}; fn arbitrary_file_meta() -> ParquetFileParams { ParquetFileParams { - namespace_id: NAMESPACE_ID, - table_id: TABLE_ID, - partition_id: PARTITION_ID, + namespace_id: ARBITRARY_NAMESPACE_ID, + table_id: ARBITRARY_TABLE_ID, + partition_id: ARBITRARY_PARTITION_ID, partition_hash_id: None, object_store_id: Default::default(), min_time: Timestamp::new(42), diff --git a/ingester/src/persist/file_metrics.rs b/ingester/src/persist/file_metrics.rs index 5d2c58b80d..54f268f926 100644 --- a/ingester/src/persist/file_metrics.rs +++ b/ingester/src/persist/file_metrics.rs @@ -148,21 +148,16 @@ where #[cfg(test)] mod tests { - use std::sync::Arc; - + use super::*; + use crate::{ + persist::completion_observer::mock::MockCompletionObserver, + test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_TABLE_ID}, + }; use data_types::{ - sequence_number_set::SequenceNumberSet, ColumnId, ColumnSet, NamespaceId, - ParquetFileParams, PartitionId, TableId, Timestamp, + sequence_number_set::SequenceNumberSet, ColumnId, ColumnSet, ParquetFileParams, Timestamp, }; use metric::assert_histogram; - - use crate::persist::completion_observer::mock::MockCompletionObserver; - - use super::*; - - const NAMESPACE_ID: NamespaceId = NamespaceId::new(1); - const TABLE_ID: TableId = TableId::new(1); - const PARTITION_ID: PartitionId = PartitionId::new(1); + use std::sync::Arc; #[tokio::test] async fn test_persisted_file_metrics() { @@ -172,9 +167,9 @@ mod tests { let decorator = ParquetFileInstrumentation::new(Arc::clone(&inner), &metrics); let meta = ParquetFileParams { - namespace_id: NAMESPACE_ID, - table_id: TABLE_ID, - partition_id: PARTITION_ID, + namespace_id: ARBITRARY_NAMESPACE_ID, + table_id: ARBITRARY_TABLE_ID, + partition_id: ARBITRARY_PARTITION_ID, partition_hash_id: None, object_store_id: Default::default(), min_time: Timestamp::new(Duration::from_secs(1_000).as_nanos() as _), diff --git a/ingester/src/query/result_instrumentation.rs b/ingester/src/query/result_instrumentation.rs index 11e41282f1..a412059b82 100644 --- a/ingester/src/query/result_instrumentation.rs +++ b/ingester/src/query/result_instrumentation.rs @@ -426,20 +426,22 @@ struct MetricState { #[cfg(test)] mod tests { - use std::{sync::Arc, time::Duration}; - - use crate::{make_batch, make_partition_stream, query::mock_query_exec::MockQueryExec}; - use super::*; - + use crate::{ + make_batch, make_partition_stream, + query::mock_query_exec::MockQueryExec, + test_util::{ + ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY, + ARBITRARY_TABLE_ID, + }, + }; use arrow::array::{Float32Array, Int64Array}; - use data_types::{PartitionHashId, PartitionId, PartitionKey}; + use data_types::PartitionHashId; use futures::{stream, StreamExt}; use iox_time::MockProvider; use metric::{assert_histogram, Attributes}; + use std::{sync::Arc, time::Duration}; - const NAMESPACE_ID: NamespaceId = NamespaceId::new(42); - const TABLE_ID: TableId = TableId::new(42); const TIME_STEP: Duration = Duration::from_secs(42); /// A query against a table that has been persisted / no longer contains any @@ -451,10 +453,10 @@ mod tests { // Construct a stream with no batches. let stream = PartitionStream::new(stream::iter([PartitionResponse::new( vec![], - PartitionId::new(42), + ARBITRARY_PARTITION_ID, Some(PartitionHashId::new( - TABLE_ID, - &PartitionKey::from("arbitrary"), + ARBITRARY_TABLE_ID, + &ARBITRARY_PARTITION_KEY, )), 42, )])); @@ -465,7 +467,7 @@ mod tests { .with_time_provider(Arc::clone(&mock_time)); let response = layer - .query_exec(NAMESPACE_ID, TABLE_ID, vec![], None) + .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) .await .expect("query should succeed"); @@ -546,7 +548,7 @@ mod tests { .with_time_provider(Arc::clone(&mock_time)); let response = layer - .query_exec(NAMESPACE_ID, TABLE_ID, vec![], None) + .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) .await .expect("query should succeed"); @@ -626,7 +628,7 @@ mod tests { .with_time_provider(Arc::clone(&mock_time)); let response = layer - .query_exec(NAMESPACE_ID, TABLE_ID, vec![], None) + .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) .await .expect("query should succeed"); @@ -706,7 +708,7 @@ mod tests { .with_time_provider(Arc::clone(&mock_time)); let response = layer - .query_exec(NAMESPACE_ID, TABLE_ID, vec![], None) + .query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None) .await .expect("query should succeed"); diff --git a/ingester/src/server/grpc/query.rs b/ingester/src/server/grpc/query.rs index ca055b8306..77d457f368 100644 --- a/ingester/src/server/grpc/query.rs +++ b/ingester/src/server/grpc/query.rs @@ -375,22 +375,20 @@ fn encode_response( #[cfg(test)] mod tests { - use arrow::array::{Float64Array, Int32Array}; - use arrow_flight::decode::{DecodedPayload, FlightRecordBatchStream}; - use assert_matches::assert_matches; - use bytes::Bytes; - use data_types::PartitionKey; - use tonic::Code; - + use super::*; use crate::{ make_batch, query::{ mock_query_exec::MockQueryExec, partition_response::PartitionResponse, response::PartitionStream, }, + test_util::ARBITRARY_PARTITION_KEY, }; - - use super::*; + use arrow::array::{Float64Array, Int32Array}; + use arrow_flight::decode::{DecodedPayload, FlightRecordBatchStream}; + use assert_matches::assert_matches; + use bytes::Bytes; + use tonic::Code; #[tokio::test] async fn limits_concurrent_queries() { @@ -430,7 +428,7 @@ mod tests { let ingester_id = IngesterId::new(); let partition_hash_id = Some(PartitionHashId::new( TableId::new(3), - &PartitionKey::from("arbitrary"), + &ARBITRARY_PARTITION_KEY, )); let (batch1, schema1) = make_batch!( Float64Array("float" => vec![1.1, 2.2, 3.3]), diff --git a/ingester/src/server/grpc/rpc_write.rs b/ingester/src/server/grpc/rpc_write.rs index 8809ac5872..844e5e23bd 100644 --- a/ingester/src/server/grpc/rpc_write.rs +++ b/ingester/src/server/grpc/rpc_write.rs @@ -220,21 +220,21 @@ where #[cfg(test)] mod tests { - use std::sync::Arc; - use assert_matches::assert_matches; use data_types::SequenceNumber; use generated_types::influxdata::pbdata::v1::{ column::{SemanticType, Values}, Column, DatabaseBatch, TableBatch, }; + use std::sync::Arc; use super::*; - use crate::dml_payload::IngestOp; - use crate::dml_sink::mock_sink::MockDmlSink; + use crate::{ + dml_payload::IngestOp, + dml_sink::mock_sink::MockDmlSink, + test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID}, + }; - const NAMESPACE_ID: NamespaceId = NamespaceId::new(42); - const PARTITION_KEY: &str = "bananas"; const PERSIST_QUEUE_DEPTH: usize = 42; macro_rules! test_rpc_write { @@ -261,7 +261,13 @@ mod tests { .write(Request::new($request)) .await; - assert_eq!(ret.is_err(), $want_err, "wanted handler error {} got {:?}", $want_err, ret); + assert_eq!( + ret.is_err(), + $want_err, + "wanted handler error {} got {:?}", + $want_err, + ret + ); assert_matches!(mock.get_calls().as_slice(), $($want_calls)+); } } @@ -272,10 +278,10 @@ mod tests { apply_ok, request = proto::WriteRequest { payload: Some(DatabaseBatch { - database_id: NAMESPACE_ID.get(), - partition_key: PARTITION_KEY.to_string(), + database_id: ARBITRARY_NAMESPACE_ID.get(), + partition_key: ARBITRARY_PARTITION_KEY.to_string(), table_batches: vec![TableBatch { - table_id: 42, + table_id: ARBITRARY_TABLE_ID.get(), columns: vec![Column { column_name: "time".to_string(), semantic_type: SemanticType::Time.into(), @@ -299,10 +305,13 @@ mod tests { want_err = false, want_calls = [IngestOp::Write(w)] => { // Assert the various IngestOp properties match the expected values - assert_eq!(w.namespace(), NAMESPACE_ID); + assert_eq!(w.namespace(), ARBITRARY_NAMESPACE_ID); assert_eq!(w.tables().count(), 1); - assert_eq!(*w.partition_key(), PartitionKey::from(PARTITION_KEY)); - assert_eq!(w.tables().next().unwrap().1.partitioned_data().sequence_number(), SequenceNumber::new(1)); + assert_eq!(w.partition_key(), &*ARBITRARY_PARTITION_KEY); + assert_eq!( + w.tables().next().unwrap().1.partitioned_data().sequence_number(), + SequenceNumber::new(1) + ); } ); @@ -318,8 +327,8 @@ mod tests { no_tables, request = proto::WriteRequest { payload: Some(DatabaseBatch { - database_id: NAMESPACE_ID.get(), - partition_key: PARTITION_KEY.to_string(), + database_id: ARBITRARY_NAMESPACE_ID.get(), + partition_key: ARBITRARY_PARTITION_KEY.to_string(), table_batches: vec![], }), }, @@ -332,10 +341,10 @@ mod tests { batch_error, request = proto::WriteRequest { payload: Some(DatabaseBatch { - database_id: NAMESPACE_ID.get(), - partition_key: PARTITION_KEY.to_string(), + database_id: ARBITRARY_NAMESPACE_ID.get(), + partition_key: ARBITRARY_PARTITION_KEY.to_string(), table_batches: vec![TableBatch { - table_id: 42, + table_id: ARBITRARY_TABLE_ID.get(), columns: vec![Column { column_name: "time".to_string(), semantic_type: SemanticType::Time.into(), @@ -373,10 +382,10 @@ mod tests { let req = proto::WriteRequest { payload: Some(DatabaseBatch { - database_id: NAMESPACE_ID.get(), - partition_key: PARTITION_KEY.to_string(), + database_id: ARBITRARY_NAMESPACE_ID.get(), + partition_key: ARBITRARY_PARTITION_KEY.to_string(), table_batches: vec![TableBatch { - table_id: 42, + table_id: ARBITRARY_TABLE_ID.get(), columns: vec![Column { column_name: "time".to_string(), semantic_type: SemanticType::Time.into(), @@ -430,10 +439,10 @@ mod tests { let req = proto::WriteRequest { payload: Some(DatabaseBatch { - database_id: NAMESPACE_ID.get(), - partition_key: PARTITION_KEY.to_string(), + database_id: ARBITRARY_NAMESPACE_ID.get(), + partition_key: ARBITRARY_PARTITION_KEY.to_string(), table_batches: vec![TableBatch { - table_id: 42, + table_id: ARBITRARY_TABLE_ID.get(), columns: vec![Column { column_name: "time".to_string(), semantic_type: SemanticType::Time.into(), @@ -486,10 +495,10 @@ mod tests { let req = proto::WriteRequest { payload: Some(DatabaseBatch { - database_id: NAMESPACE_ID.get(), - partition_key: PARTITION_KEY.to_string(), + database_id: ARBITRARY_NAMESPACE_ID.get(), + partition_key: ARBITRARY_PARTITION_KEY.to_string(), table_batches: vec![TableBatch { - table_id: 42, + table_id: ARBITRARY_TABLE_ID.get(), columns: vec![Column { column_name: "time".to_string(), semantic_type: SemanticType::Time.into(), diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index a27bc811df..2fe3b5b1c4 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -229,7 +229,10 @@ macro_rules! make_partition_stream { $( let (batch, this_schema) = $batch; batches.push(batch); - schema = Schema::try_merge([schema, (*this_schema).clone()]).expect("incompatible batch schemas"); + schema = Schema::try_merge([ + schema, + (*this_schema).clone() + ]).expect("incompatible batch schemas"); )+ drop(schema); @@ -241,11 +244,11 @@ macro_rules! make_partition_stream { // batches are in a different partition, not what the actual identifier // values are. This will go away when the ingester no longer sends // PartitionIds. - PartitionId::new($id), + data_types::PartitionId::new($id), Some( PartitionHashId::new( TableId::new($id), - &PartitionKey::from("arbitrary") + &*ARBITRARY_PARTITION_KEY ) ), 42, diff --git a/ingester/src/wal/reference_tracker/handle.rs b/ingester/src/wal/reference_tracker/handle.rs index 2128cb2b36..c6c08f9863 100644 --- a/ingester/src/wal/reference_tracker/handle.rs +++ b/ingester/src/wal/reference_tracker/handle.rs @@ -204,22 +204,18 @@ impl WalReferenceHandle { #[cfg(test)] mod tests { - - use std::{pin::Pin, task::Poll, time::Duration}; - + use super::*; + use crate::test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_TABLE_ID}; use assert_matches::assert_matches; use async_trait::async_trait; - use data_types::{ - ColumnId, ColumnSet, NamespaceId, ParquetFileParams, PartitionId, TableId, Timestamp, - }; + use data_types::{ColumnId, ColumnSet, ParquetFileParams, Timestamp}; use futures::{task::Context, Future, FutureExt}; use metric::{assert_counter, U64Gauge}; use parking_lot::Mutex; + use std::{pin::Pin, task::Poll, time::Duration}; use test_helpers::timeout::FutureTimeout; use tokio::sync::Notify; - use super::*; - /// A mock file deleter that records the IDs it was asked to delete. #[derive(Debug, Default)] struct MockWalDeleter { @@ -265,9 +261,9 @@ mod tests { { Arc::new(CompletedPersist::new( ParquetFileParams { - namespace_id: NamespaceId::new(1), - table_id: TableId::new(2), - partition_id: PartitionId::new(3), + namespace_id: ARBITRARY_NAMESPACE_ID, + table_id: ARBITRARY_TABLE_ID, + partition_id: ARBITRARY_PARTITION_ID, partition_hash_id: None, object_store_id: Default::default(), min_time: Timestamp::new(42), diff --git a/ingester/src/wal/wal_sink.rs b/ingester/src/wal/wal_sink.rs index f9d9154898..f3c22dcad0 100644 --- a/ingester/src/wal/wal_sink.rs +++ b/ingester/src/wal/wal_sink.rs @@ -131,26 +131,21 @@ impl WalAppender for Arc { #[cfg(test)] mod tests { - use core::{future::Future, marker::Send, pin::Pin}; - use std::{future, sync::Arc}; - - use assert_matches::assert_matches; - use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId}; - use mutable_batch_lp::lines_to_batches; - use wal::Wal; - + use super::*; use crate::{ dml_payload::write::{PartitionedData, TableData, WriteOperation}, dml_sink::mock_sink::MockDmlSink, - test_util::make_write_op, + test_util::{ + make_write_op, ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID, + ARBITRARY_TABLE_NAME, + }, }; - - use super::*; - - const TABLE_ID: TableId = TableId::new(44); - const TABLE_NAME: &str = "bananas"; - const NAMESPACE_NAME: &str = "platanos"; - const NAMESPACE_ID: NamespaceId = NamespaceId::new(42); + use assert_matches::assert_matches; + use core::{future::Future, marker::Send, pin::Pin}; + use data_types::{SequenceNumber, TableId}; + use mutable_batch_lp::lines_to_batches; + use std::{future, sync::Arc}; + use wal::Wal; #[tokio::test] async fn test_append() { @@ -161,22 +156,25 @@ mod tests { // Generate a test op containing writes for multiple tables that will // be appended and read back let mut tables_by_name = lines_to_batches( - "bananas,region=Madrid temp=35 4242424242\n\ + &format!( + "{},region=Madrid temp=35 4242424242\n\ banani,region=Iceland temp=25 7676767676", + &*ARBITRARY_TABLE_NAME + ), 0, ) .expect("invalid line proto"); let op = WriteOperation::new( - NAMESPACE_ID, + ARBITRARY_NAMESPACE_ID, [ ( - TABLE_ID, + ARBITRARY_TABLE_ID, TableData::new( - TABLE_ID, + ARBITRARY_TABLE_ID, PartitionedData::new( SequenceNumber::new(42), tables_by_name - .remove(TABLE_NAME) + .remove(ARBITRARY_TABLE_NAME.as_ref()) .expect("table does not exist in LP"), ), ), @@ -196,7 +194,7 @@ mod tests { ] .into_iter() .collect(), - PartitionKey::from("p1"), + ARBITRARY_PARTITION_KEY.clone(), None, ); @@ -240,7 +238,7 @@ mod tests { assert_eq!(read_op.sequence_number, 42); assert_eq!( read_op.table_write_sequence_numbers, - [(TABLE_ID, 42), (SECOND_TABLE_ID, 42)] + [(ARBITRARY_TABLE_ID, 42), (SECOND_TABLE_ID, 42)] .into_iter() .collect::>() ); @@ -249,7 +247,7 @@ mod tests { // The payload should match the serialised form of the "op" originally // wrote above. - let want = encode_write_op(NAMESPACE_ID, &op); + let want = encode_write_op(ARBITRARY_NAMESPACE_ID, &op); assert_eq!(want, *payload); } @@ -279,12 +277,15 @@ mod tests { // Generate the test op let op = make_write_op( - &PartitionKey::from("p1"), - NAMESPACE_ID, - TABLE_NAME, - TABLE_ID, + &ARBITRARY_PARTITION_KEY, + ARBITRARY_NAMESPACE_ID, + &ARBITRARY_TABLE_NAME, + ARBITRARY_TABLE_ID, 42, - r#"bananas,region=Madrid temp=35 4242424242"#, + &format!( + r#"{},region=Madrid temp=35 4242424242"#, + &*ARBITRARY_TABLE_NAME + ), None, );