diff --git a/ingester/src/buffer_tree/partition.rs b/ingester/src/buffer_tree/partition.rs index bfd8065cdb..817d639059 100644 --- a/ingester/src/buffer_tree/partition.rs +++ b/ingester/src/buffer_tree/partition.rs @@ -449,7 +449,7 @@ mod tests { use super::*; use crate::{ buffer_tree::partition::resolver::SortKeyResolver, - test_util::{populate_catalog, PartitionDataBuilder, ARBITRARY_CATALOG_PARTITION_ID}, + test_util::{populate_catalog, PartitionDataBuilder, ARBITRARY_TRANSITION_PARTITION_ID}, }; // Write some data and read it back from the buffer. @@ -473,7 +473,7 @@ mod tests { let data = p .get_query_data(&OwnedProjection::default()) .expect("should return data"); - assert_eq!(data.partition_id(), ARBITRARY_CATALOG_PARTITION_ID); + assert_eq!(data.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID); let expected = [ "+--------+--------+----------+--------------------------------+", @@ -496,7 +496,7 @@ mod tests { let data = p .get_query_data(&OwnedProjection::default()) .expect("should contain data"); - assert_eq!(data.partition_id(), ARBITRARY_CATALOG_PARTITION_ID); + assert_eq!(data.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID); let expected = [ "+--------+--------+----------+--------------------------------+", @@ -532,7 +532,7 @@ mod tests { // And validate the data being persisted. assert_eq!( persisting_data.partition_id(), - ARBITRARY_CATALOG_PARTITION_ID + &*ARBITRARY_TRANSITION_PARTITION_ID ); assert_eq!(persisting_data.record_batches().len(), 1); let expected = [ @@ -561,7 +561,7 @@ mod tests { let data = p .get_query_data(&OwnedProjection::default()) .expect("must have data"); - assert_eq!(data.partition_id(), ARBITRARY_CATALOG_PARTITION_ID); + assert_eq!(data.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID); assert_eq!(data.record_batches().len(), 2); let expected = [ "+--------+--------+----------+--------------------------------+", @@ -589,7 +589,7 @@ mod tests { let data = p .get_query_data(&OwnedProjection::default()) .expect("must have data"); - assert_eq!(data.partition_id(), ARBITRARY_CATALOG_PARTITION_ID); + assert_eq!(data.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID); assert_eq!(data.record_batches().len(), 1); let expected = [ "+--------+--------+---------+--------------------------------+", diff --git a/ingester/src/buffer_tree/root.rs b/ingester/src/buffer_tree/root.rs index 7b461e04b9..96a2782325 100644 --- a/ingester/src/buffer_tree/root.rs +++ b/ingester/src/buffer_tree/root.rs @@ -238,7 +238,7 @@ mod tests { use assert_matches::assert_matches; use data_types::{ partition_template::{test_table_partition_override, TemplatePart}, - PartitionHashId, PartitionId, PartitionKey, + PartitionId, PartitionKey, TransitionPartitionId, }; use datafusion::{ assert_batches_eq, assert_batches_sorted_eq, @@ -265,7 +265,7 @@ mod tests { defer_namespace_name_1_ms, make_write_op, PartitionDataBuilder, ARBITRARY_CATALOG_PARTITION_ID, ARBITRARY_NAMESPACE_ID, ARBITRARY_NAMESPACE_NAME, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME, - ARBITRARY_TABLE_PROVIDER, + ARBITRARY_TABLE_PROVIDER, ARBITRARY_TRANSITION_PARTITION_ID, }, }; @@ -1307,10 +1307,7 @@ mod tests { let partition = partitions.pop().unwrap(); // Ensure the partition hash ID is sent. - assert_eq!( - partition.partition_hash_id().unwrap(), - &PartitionHashId::new(ARBITRARY_TABLE_ID, &ARBITRARY_PARTITION_KEY) - ); + assert_eq!(partition.id(), &*ARBITRARY_TRANSITION_PARTITION_ID); // Perform the partition read let batches = partition.into_record_batches(); @@ -1383,6 +1380,9 @@ mod tests { let partition = partitions.pop().unwrap(); // Ensure the partition hash ID is NOT sent. - assert!(partition.partition_hash_id().is_none()); + assert_eq!( + partition.id(), + &TransitionPartitionId::Deprecated(ARBITRARY_CATALOG_PARTITION_ID), + ); } } diff --git a/ingester/src/buffer_tree/table.rs b/ingester/src/buffer_tree/table.rs index 2307b1a423..f20403fdcf 100644 --- a/ingester/src/buffer_tree/table.rs +++ b/ingester/src/buffer_tree/table.rs @@ -273,11 +273,10 @@ where let partitions = self.partitions().into_iter().filter_map(move |p| { let mut span = span.child("partition read"); - let (id, hash_id, completed_persistence_count, data, partition_key) = { + let (id, completed_persistence_count, data, partition_key) = { let mut p = p.lock(); ( - p.partition_id(), - p.partition_hash_id().cloned(), + p.transition_partition_id(), p.completed_persistence_count(), p.get_query_data(&projection), p.partition_key().clone(), @@ -286,7 +285,7 @@ where let ret = match data { Some(data) => { - assert_eq!(id, data.partition_id()); + assert_eq!(&id, data.partition_id()); // Potentially prune out this partition if the partition // template & derived partition key can be used to match @@ -324,11 +323,10 @@ where PartitionResponse::new( data.into_record_batches(), id, - hash_id, completed_persistence_count, ) } - None => PartitionResponse::new(vec![], id, hash_id, completed_persistence_count), + None => PartitionResponse::new(vec![], id, completed_persistence_count), }; span.ok("read partition data"); diff --git a/ingester/src/persist/context.rs b/ingester/src/persist/context.rs index 21fa4d3f16..e9df2f89ae 100644 --- a/ingester/src/persist/context.rs +++ b/ingester/src/persist/context.rs @@ -1,9 +1,6 @@ use std::sync::Arc; -use data_types::{ - NamespaceId, ParquetFileParams, PartitionHashId, PartitionId, PartitionKey, TableId, - TransitionPartitionId, -}; +use data_types::{NamespaceId, ParquetFileParams, PartitionKey, TableId, TransitionPartitionId}; use observability_deps::tracing::*; use parking_lot::Mutex; use schema::sort::SortKey; @@ -68,8 +65,8 @@ impl PersistRequest { ) } - /// Return the partition ID of the persisting data. - pub(super) fn partition_id(&self) -> PartitionId { + /// Return the partition identifier of the persisting data. + pub(super) fn partition_id(&self) -> &TransitionPartitionId { self.data.partition_id() } } @@ -88,8 +85,7 @@ pub(super) struct Context { /// IDs loaded from the partition at construction time. namespace_id: NamespaceId, table_id: TableId, - partition_id: PartitionId, - partition_hash_id: Option, + partition_id: TransitionPartitionId, // The partition key for this partition partition_key: PartitionKey, @@ -137,7 +133,7 @@ impl Context { /// Locks the [`PartitionData`] in `req` to read various properties which /// are then cached in the [`Context`]. pub(super) fn new(req: PersistRequest) -> Self { - let partition_id = req.data.partition_id(); + let partition_id = req.data.partition_id().clone(); // Obtain the partition lock and load the immutable values that will be // used during this persistence. @@ -153,7 +149,7 @@ impl Context { let p = Arc::clone(&partition); let guard = p.lock(); - assert_eq!(partition_id, guard.partition_id()); + assert_eq!(partition_id, guard.transition_partition_id()); Self { partition, @@ -161,7 +157,6 @@ impl Context { namespace_id: guard.namespace_id(), table_id: guard.table_id(), partition_id, - partition_hash_id: guard.partition_hash_id().cloned(), partition_key: guard.partition_key().clone(), namespace_name: Arc::clone(guard.namespace_name()), table: Arc::clone(guard.table()), @@ -292,16 +287,8 @@ impl Context { self.table_id } - pub(super) fn partition_id(&self) -> PartitionId { - self.partition_id - } - - pub(super) fn partition_hash_id(&self) -> Option { - self.partition_hash_id.clone() - } - - pub(super) fn transition_partition_id(&self) -> TransitionPartitionId { - TransitionPartitionId::from((self.partition_id, self.partition_hash_id.as_ref())) + pub(super) fn partition_id(&self) -> &TransitionPartitionId { + &self.partition_id } pub(super) fn partition_key(&self) -> &PartitionKey { diff --git a/ingester/src/persist/drain_buffer.rs b/ingester/src/persist/drain_buffer.rs index 5c1c1e0e66..1493d9fc91 100644 --- a/ingester/src/persist/drain_buffer.rs +++ b/ingester/src/persist/drain_buffer.rs @@ -32,7 +32,7 @@ where let data = p.lock().mark_persisting()?; debug!( - partition_id=data.partition_id().get(), + partition_id=%data.partition_id(), lock_wait=?Instant::now().duration_since(t), "read data for persistence" ); diff --git a/ingester/src/persist/handle.rs b/ingester/src/persist/handle.rs index ef32ee4c43..79719f2e3a 100644 --- a/ingester/src/persist/handle.rs +++ b/ingester/src/persist/handle.rs @@ -312,7 +312,7 @@ impl PersistHandle { fn assign_worker(&self, r: PersistRequest) { debug!( - partition_id = r.partition_id().get(), + partition_id = %r.partition_id(), "enqueue persist job to assigned worker" ); @@ -357,8 +357,8 @@ impl PersistQueue for PersistHandle { partition: Arc>, data: PersistingData, ) -> oneshot::Receiver<()> { - let partition_id = data.partition_id().get(); - debug!(partition_id, "enqueuing persistence task"); + let partition_id = data.partition_id().clone(); + debug!(%partition_id, "enqueuing persistence task"); // Record a starting timestamp, and increment the number of persist jobs // before waiting on the semaphore - this ensures the difference between @@ -435,7 +435,7 @@ impl PersistQueue for PersistHandle { if let Some(new_sort_key) = adjust_sort_key_columns(&v, &data_primary_key).1 { // This persist operation will require a sort key update. trace!( - partition_id, + %partition_id, old_sort_key = %v, %new_sort_key, "persist job will require sort key update" @@ -444,7 +444,7 @@ impl PersistQueue for PersistHandle { } else { // This persist operation will not require a sort key // update. - debug!(partition_id, "enqueue persist job to global work queue"); + debug!(%partition_id, "enqueue persist job to global work queue"); self.global_queue.send(r).await.expect("no persist workers"); } } @@ -452,7 +452,7 @@ impl PersistQueue for PersistHandle { // If no sort key is known (either because it was unresolved, or // not yet set), the task must be serialised w.r.t other persist // jobs for the same partition. - trace!(partition_id, "persist job has no known sort key"); + trace!(%partition_id, "persist job has no known sort key"); self.assign_worker(r); } } @@ -499,9 +499,9 @@ mod tests { tests::{assert_metric_counter, assert_metric_gauge}, }, test_util::{ - make_write_op, PartitionDataBuilder, ARBITRARY_CATALOG_PARTITION_ID, - ARBITRARY_NAMESPACE_ID, ARBITRARY_NAMESPACE_NAME, ARBITRARY_PARTITION_KEY, - ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_PROVIDER, + make_write_op, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID, ARBITRARY_NAMESPACE_NAME, + ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME, + ARBITRARY_TABLE_PROVIDER, ARBITRARY_TRANSITION_PARTITION_ID, }, }; @@ -591,7 +591,7 @@ mod tests { .expect("message was not found in either worker") } }; - assert_eq!(msg.partition_id(), ARBITRARY_CATALOG_PARTITION_ID); + assert_eq!(msg.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID); // Drop the message, and ensure the notification becomes inactive. drop(msg); @@ -611,7 +611,7 @@ mod tests { let msg = assigned_worker .try_recv() .expect("message was not found in either worker"); - assert_eq!(msg.partition_id(), ARBITRARY_CATALOG_PARTITION_ID); + assert_eq!(msg.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID); } /// A test that ensures the correct destination of a partition that has no @@ -677,7 +677,7 @@ mod tests { .expect("message was not found in either worker") } }; - assert_eq!(msg.partition_id(), ARBITRARY_CATALOG_PARTITION_ID); + assert_eq!(msg.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID); // Drop the message, and ensure the notification becomes inactive. drop(msg); @@ -698,7 +698,7 @@ mod tests { let msg = assigned_worker .try_recv() .expect("message was not found in either worker"); - assert_eq!(msg.partition_id(), ARBITRARY_CATALOG_PARTITION_ID); + assert_eq!(msg.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID); } /// A test that ensures the correct destination of a partition that has an @@ -765,7 +765,7 @@ mod tests { .expect("message was not found in either worker") } }; - assert_eq!(msg.partition_id(), ARBITRARY_CATALOG_PARTITION_ID); + assert_eq!(msg.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID); // Drop the message, and ensure the notification becomes inactive. drop(msg); @@ -786,7 +786,7 @@ mod tests { let msg = assigned_worker .try_recv() .expect("message was not found in either worker"); - assert_eq!(msg.partition_id(), ARBITRARY_CATALOG_PARTITION_ID); + assert_eq!(msg.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID); } /// A test that a partition that does not require a sort key update is @@ -845,7 +845,7 @@ mod tests { let msg = global_rx .try_recv() .expect("task should be in global queue"); - assert_eq!(msg.partition_id(), ARBITRARY_CATALOG_PARTITION_ID); + assert_eq!(msg.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID); // Drop the message, and ensure the notification becomes inactive. drop(msg); @@ -866,7 +866,7 @@ mod tests { let msg = global_rx .try_recv() .expect("task should be in global queue"); - assert_eq!(msg.partition_id(), ARBITRARY_CATALOG_PARTITION_ID); + assert_eq!(msg.partition_id(), &*ARBITRARY_TRANSITION_PARTITION_ID); } /// A test that a ensures tasks waiting to be enqueued (waiting on the diff --git a/ingester/src/persist/worker.rs b/ingester/src/persist/worker.rs index f640528a33..e5f61b4d95 100644 --- a/ingester/src/persist/worker.rs +++ b/ingester/src/persist/worker.rs @@ -278,12 +278,7 @@ where let pool = worker_state.exec.pool(); let (md, file_size) = worker_state .store - .upload( - record_stream, - &ctx.transition_partition_id(), - &iox_metadata, - pool, - ) + .upload(record_stream, &ctx.partition_id(), &iox_metadata, pool) .await .expect("unexpected fatal persist error"); @@ -376,11 +371,7 @@ where let mut repos = catalog.repositories().await; match repos .partitions() - .cas_sort_key( - &ctx.transition_partition_id(), - old_sort_key.clone(), - &new_sort_key_str, - ) + .cas_sort_key(&ctx.partition_id(), old_sort_key.clone(), &new_sort_key_str) .await { Ok(_) => ControlFlow::Break(Ok(())), diff --git a/ingester/src/query/partition_response.rs b/ingester/src/query/partition_response.rs index 910d83c9c9..9b8d8eab07 100644 --- a/ingester/src/query/partition_response.rs +++ b/ingester/src/query/partition_response.rs @@ -3,7 +3,7 @@ //! [`QueryResponse`]: super::response::QueryResponse use arrow::record_batch::RecordBatch; -use data_types::{PartitionHashId, PartitionId}; +use data_types::TransitionPartitionId; /// Response data for a single partition. #[derive(Debug)] @@ -12,10 +12,7 @@ pub(crate) struct PartitionResponse { batches: Vec, /// Partition ID. - id: PartitionId, - - /// Partition hash ID, if stored in the database. - partition_hash_id: Option, + id: TransitionPartitionId, /// Count of persisted Parquet files for this partition by this ingester instance. completed_persistence_count: u64, @@ -24,24 +21,18 @@ pub(crate) struct PartitionResponse { impl PartitionResponse { pub(crate) fn new( data: Vec, - id: PartitionId, - partition_hash_id: Option, + id: TransitionPartitionId, completed_persistence_count: u64, ) -> Self { Self { batches: data, id, - partition_hash_id, completed_persistence_count, } } - pub(crate) fn id(&self) -> PartitionId { - self.id - } - - pub(crate) fn partition_hash_id(&self) -> Option<&PartitionHashId> { - self.partition_hash_id.as_ref() + pub(crate) fn id(&self) -> &TransitionPartitionId { + &self.id } pub(crate) fn completed_persistence_count(&self) -> u64 { diff --git a/ingester/src/query/result_instrumentation.rs b/ingester/src/query/result_instrumentation.rs index 50c0abf636..dafff05190 100644 --- a/ingester/src/query/result_instrumentation.rs +++ b/ingester/src/query/result_instrumentation.rs @@ -332,8 +332,7 @@ where *this.partition_count += 1; // Extract all the fields of the PartitionResponse - let id = p.id(); - let hash_id = p.partition_hash_id().cloned(); + let id = p.id().clone(); let persist_count = p.completed_persistence_count(); // And wrap the underlying stream of RecordBatch for this @@ -346,12 +345,7 @@ where this.record_batch_count .fetch_add(data.len(), Ordering::Relaxed); - Poll::Ready(Some(PartitionResponse::new( - data, - id, - hash_id, - persist_count, - ))) + Poll::Ready(Some(PartitionResponse::new(data, id, persist_count))) } Poll::Ready(None) => { // Record the wall clock timestamp of the stream end. @@ -435,12 +429,10 @@ mod tests { make_batch, make_partition_stream, query::mock_query_exec::MockQueryExec, test_util::{ - ARBITRARY_CATALOG_PARTITION_ID, ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_HASH_ID, - ARBITRARY_TABLE_ID, + ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, ARBITRARY_TRANSITION_PARTITION_ID, }, }; use arrow::array::{Float32Array, Int64Array}; - use data_types::PartitionHashId; use futures::{stream, StreamExt}; use iox_time::MockProvider; use metric::{assert_histogram, Attributes}; @@ -457,8 +449,7 @@ mod tests { // Construct a stream with no batches. let stream = PartitionStream::new(stream::iter([PartitionResponse::new( vec![], - ARBITRARY_CATALOG_PARTITION_ID, - Some(ARBITRARY_PARTITION_HASH_ID.clone()), + ARBITRARY_TRANSITION_PARTITION_ID.clone(), 42, )])); diff --git a/ingester/src/query_adaptor.rs b/ingester/src/query_adaptor.rs index 34895513f6..70c30a5e94 100644 --- a/ingester/src/query_adaptor.rs +++ b/ingester/src/query_adaptor.rs @@ -88,10 +88,10 @@ impl QueryAdaptor { self.data } - /// Returns the partition ID from which the data this [`QueryAdaptor`] was + /// Returns the partition identifier from which the data this [`QueryAdaptor`] was /// sourced from. - pub(crate) fn partition_id(&self) -> PartitionId { - self.partition_id + pub(crate) fn partition_id(&self) -> &TransitionPartitionId { + &self.transition_partition_id } /// Number of rows, useful for building stats diff --git a/ingester/src/server/grpc/query.rs b/ingester/src/server/grpc/query.rs index 0b8fd037f6..f1a1c3fe5f 100644 --- a/ingester/src/server/grpc/query.rs +++ b/ingester/src/server/grpc/query.rs @@ -6,7 +6,7 @@ use arrow_flight::{ FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket, }; -use data_types::{NamespaceId, PartitionHashId, PartitionId, TableId}; +use data_types::{NamespaceId, TableId, TransitionPartitionId}; use flatbuffers::FlatBufferBuilder; use futures::{Stream, StreamExt, TryStreamExt}; use ingester_query_grpc::influxdata::iox::ingester::v1 as proto; @@ -303,10 +303,8 @@ where /// Encode the partition information as a None flight data with meatadata fn encode_partition( - // Partition ID. - partition_id: PartitionId, - // Partition hash ID. - partition_hash_id: Option, + // Partition identifier. + partition_id: TransitionPartitionId, // Count of persisted Parquet files for the [`PartitionData`] instance this // [`PartitionResponse`] was generated from. // @@ -316,9 +314,17 @@ fn encode_partition( ingester_id: IngesterId, ) -> Result { let mut bytes = bytes::BytesMut::new(); + + let (partition_id, partition_hash_id) = match partition_id { + TransitionPartitionId::Deterministic(hash_id) => { + (None, Some(hash_id.as_bytes().to_owned())) + } + TransitionPartitionId::Deprecated(partition_id) => (Some(partition_id.get()), None), + }; + let app_metadata = proto::IngesterQueryResponseMetadata { - partition_id: Some(partition_id.get()), - partition_hash_id: partition_hash_id.map(|hash_id| hash_id.as_bytes().to_owned()), + partition_id, + partition_hash_id, ingester_uuid: ingester_id.to_string(), completed_persistence_count, }; @@ -352,18 +358,12 @@ fn encode_response( frame_encoding_duration_metric: Arc, ) -> impl Stream> { response.into_partition_stream().flat_map(move |partition| { - let partition_id = partition.id(); - let partition_hash_id = partition.partition_hash_id().cloned(); + let partition_id = partition.id().clone(); let completed_persistence_count = partition.completed_persistence_count(); // prefix payload data w/ metadata for that particular partition let head = futures::stream::once(async move { - encode_partition( - partition_id, - partition_hash_id, - completed_persistence_count, - ingester_id, - ) + encode_partition(partition_id, completed_persistence_count, ingester_id) }); // An output vector of FlightDataEncoder streams, each entry stream with @@ -402,25 +402,24 @@ mod tests { mock_query_exec::MockQueryExec, partition_response::PartitionResponse, response::PartitionStream, }, - test_util::ARBITRARY_PARTITION_HASH_ID, + test_util::{ARBITRARY_PARTITION_HASH_ID, ARBITRARY_TRANSITION_PARTITION_ID}, }; use arrow::array::{Float64Array, Int32Array}; use arrow_flight::decode::{DecodedPayload, FlightRecordBatchStream}; use assert_matches::assert_matches; use bytes::Bytes; + use data_types::PartitionId; use tonic::Code; #[tokio::test] - async fn sends_partition_hash_id_if_present() { + async fn sends_only_partition_hash_id_if_present() { let ingester_id = IngesterId::new(); - // let partition_hash_id = PartitionHashId::new(TableId::new(3), &ARBITRARY_PARTITION_KEY); let flight = FlightService::new( MockQueryExec::default().with_result(Ok(QueryResponse::new(PartitionStream::new( futures::stream::iter([PartitionResponse::new( vec![], - PartitionId::new(2), - Some(ARBITRARY_PARTITION_HASH_ID.clone()), + ARBITRARY_TRANSITION_PARTITION_ID.clone(), 42, )]), )))), @@ -447,7 +446,7 @@ mod tests { let md_actual = proto::IngesterQueryResponseMetadata::decode(flight_data[0].app_metadata()).unwrap(); let md_expected = proto::IngesterQueryResponseMetadata { - partition_id: Some(2), + partition_id: None, partition_hash_id: Some(ARBITRARY_PARTITION_HASH_ID.as_bytes().to_vec()), ingester_uuid: ingester_id.to_string(), completed_persistence_count: 42, @@ -462,8 +461,7 @@ mod tests { MockQueryExec::default().with_result(Ok(QueryResponse::new(PartitionStream::new( futures::stream::iter([PartitionResponse::new( vec![], - PartitionId::new(2), - None, + TransitionPartitionId::Deprecated(PartitionId::new(2)), 42, )]), )))), @@ -562,8 +560,7 @@ mod tests { batch3.clone(), batch4.clone(), ], - PartitionId::new(2), - partition_hash_id.clone(), + ARBITRARY_TRANSITION_PARTITION_ID.clone(), 42, )]), )))), @@ -591,7 +588,7 @@ mod tests { let md_actual = proto::IngesterQueryResponseMetadata::decode(flight_data[0].app_metadata()).unwrap(); let md_expected = proto::IngesterQueryResponseMetadata { - partition_id: Some(2), + partition_id: None, partition_hash_id: partition_hash_id.map(|hash_id| hash_id.as_bytes().to_vec()), ingester_uuid: ingester_id.to_string(), completed_persistence_count: 42, diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 7003c0749c..3d0e4b9523 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -256,6 +256,7 @@ macro_rules! make_partition_stream { query::{response::PartitionStream, partition_response::PartitionResponse}, test_util::ARBITRARY_PARTITION_KEY, }; + use data_types::{PartitionHashId, TableId, TransitionPartitionId}; use futures::stream; PartitionStream::new(stream::iter([ @@ -274,14 +275,7 @@ macro_rules! make_partition_stream { PartitionResponse::new( batches, - // Using the $id as both the PartitionId and the TableId in the - // PartitionHashId is a temporary way to reduce duplication in tests where - // the important part is which batches are in the same partition and which - // batches are in a different partition, not what the actual identifier - // values are. This will go away when the ingester no longer sends - // PartitionIds. - data_types::PartitionId::new($id), - Some( + TransitionPartitionId::Deterministic( PartitionHashId::new( TableId::new($id), &*ARBITRARY_PARTITION_KEY