diff --git a/ingester2/src/buffer_tree/namespace.rs b/ingester2/src/buffer_tree/namespace.rs index 2ba74481fd..2fe285b1ae 100644 --- a/ingester2/src/buffer_tree/namespace.rs +++ b/ingester2/src/buffer_tree/namespace.rs @@ -5,7 +5,7 @@ pub(crate) mod name_resolver; use std::sync::Arc; use async_trait::async_trait; -use data_types::{NamespaceId, TableId}; +use data_types::{NamespaceId, ShardId, TableId}; use dml::DmlOperation; use metric::U64Counter; use observability_deps::tracing::warn; @@ -77,6 +77,8 @@ pub(crate) struct NamespaceData { partition_provider: Arc, post_write_observer: Arc, + + transition_shard_id: ShardId, } impl NamespaceData { @@ -88,6 +90,7 @@ impl NamespaceData { partition_provider: Arc, post_write_observer: Arc, metrics: &metric::Registry, + transition_shard_id: ShardId, ) -> Self { let table_count = metrics .register_metric::( @@ -104,6 +107,7 @@ impl NamespaceData { table_count, partition_provider, post_write_observer, + transition_shard_id, } } @@ -163,6 +167,7 @@ where Arc::clone(&self.namespace_name), Arc::clone(&self.partition_provider), Arc::clone(&self.post_write_observer), + self.transition_shard_id, )) }); @@ -226,7 +231,7 @@ where mod tests { use std::{sync::Arc, time::Duration}; - use data_types::{PartitionId, PartitionKey, ShardIndex}; + use data_types::{PartitionId, PartitionKey, ShardId, ShardIndex}; use metric::{Attributes, Metric}; use super::*; @@ -246,6 +251,7 @@ mod tests { const TABLE_ID: TableId = TableId::new(44); const NAMESPACE_NAME: &str = "platanos"; const NAMESPACE_ID: NamespaceId = NamespaceId::new(42); + const TRANSITION_SHARD_ID: ShardId = ShardId::new(84); #[tokio::test] async fn test_namespace_init_table() { @@ -266,6 +272,7 @@ mod tests { TableName::from(TABLE_NAME) })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, ), )); @@ -276,6 +283,7 @@ mod tests { partition_provider, Arc::new(MockPostWriteObserver::default()), &metrics, + TRANSITION_SHARD_ID, ); // Assert the namespace name was stored diff --git a/ingester2/src/buffer_tree/partition.rs b/ingester2/src/buffer_tree/partition.rs index cff687302f..c86d4f7d02 100644 --- a/ingester2/src/buffer_tree/partition.rs +++ b/ingester2/src/buffer_tree/partition.rs @@ -2,7 +2,7 @@ use std::{collections::VecDeque, sync::Arc}; -use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, TableId}; +use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId}; use mutable_batch::MutableBatch; use observability_deps::tracing::*; use schema::sort::SortKey; @@ -89,6 +89,8 @@ pub(crate) struct PartitionData { /// The number of persist operations completed over the lifetime of this /// [`PartitionData`]. completed_persistence_count: u64, + + transition_shard_id: ShardId, } impl PartitionData { @@ -102,6 +104,7 @@ impl PartitionData { table_id: TableId, table_name: Arc>, sort_key: SortKeyState, + transition_shard_id: ShardId, ) -> Self { Self { partition_id: id, @@ -115,6 +118,7 @@ impl PartitionData { persisting: VecDeque::with_capacity(1), started_persistence_count: BatchIdent::default(), completed_persistence_count: 0, + transition_shard_id, } } @@ -293,6 +297,11 @@ impl PartitionData { &self.partition_key } + /// Return the transition_shard_id for this partition. + pub(crate) fn transition_shard_id(&self) -> ShardId { + self.transition_shard_id + } + /// Return the [`NamespaceId`] this partition is a part of. pub(crate) fn namespace_id(&self) -> NamespaceId { self.namespace_id @@ -344,6 +353,7 @@ mod tests { use crate::{buffer_tree::partition::resolver::SortKeyResolver, test_util::populate_catalog}; const PARTITION_ID: PartitionId = PartitionId::new(1); + const TRANSITION_SHARD_ID: ShardId = ShardId::new(84); lazy_static! { static ref PARTITION_KEY: PartitionKey = PartitionKey::from("platanos"); @@ -369,6 +379,7 @@ mod tests { TABLE_NAME.clone() })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, ); // And no data should be returned when queried. @@ -449,6 +460,7 @@ mod tests { TABLE_NAME.clone() })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, ); assert!(p.get_query_data().is_none()); @@ -599,6 +611,7 @@ mod tests { TABLE_NAME.clone() })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, ); // Perform the initial write. @@ -777,6 +790,7 @@ mod tests { TABLE_NAME.clone() })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, ); // Perform the initial write. @@ -957,6 +971,7 @@ mod tests { TableName::from("platanos") })), starting_state, + TRANSITION_SHARD_ID, ); let want = Some(SortKey::from_columns(["banana", "platanos", "time"])); @@ -1016,6 +1031,7 @@ mod tests { TableName::from("platanos") })), starting_state, + shard_id, ); let want = Some(SortKey::from_columns(["banana", "platanos", "time"])); @@ -1040,6 +1056,7 @@ mod tests { TABLE_NAME.clone() })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, ); // Perform out of order writes. @@ -1087,6 +1104,7 @@ mod tests { TABLE_NAME.clone() })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, ); assert!(p.mark_persisting().is_none()); @@ -1106,6 +1124,7 @@ mod tests { TABLE_NAME.clone() })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, ); let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1; @@ -1132,6 +1151,7 @@ mod tests { TABLE_NAME.clone() })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, ); assert!(p.get_query_data().is_none()); diff --git a/ingester2/src/buffer_tree/partition/resolver/cache.rs b/ingester2/src/buffer_tree/partition/resolver/cache.rs index adf00420ff..0323ef3f40 100644 --- a/ingester2/src/buffer_tree/partition/resolver/cache.rs +++ b/ingester2/src/buffer_tree/partition/resolver/cache.rs @@ -2,7 +2,9 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use async_trait::async_trait; use backoff::BackoffConfig; -use data_types::{NamespaceId, Partition, PartitionId, PartitionKey, SequenceNumber, TableId}; +use data_types::{ + NamespaceId, Partition, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId, +}; use iox_catalog::interface::Catalog; use observability_deps::tracing::debug; use parking_lot::Mutex; @@ -164,6 +166,7 @@ where namespace_name: Arc>, table_id: TableId, table_name: Arc>, + transition_shard_id: ShardId, ) -> PartitionData { // Use the cached PartitionKey instead of the caller's partition_key, // instead preferring to reuse the already-shared Arc in the cache. @@ -193,6 +196,7 @@ where table_id, table_name, SortKeyState::Deferred(Arc::new(sort_key_resolver)), + transition_shard_id, ); } @@ -206,6 +210,7 @@ where namespace_name, table_id, table_name, + transition_shard_id, ) .await } @@ -213,12 +218,11 @@ where #[cfg(test)] mod tests { + use data_types::ShardId; use iox_catalog::mem::MemCatalog; use super::*; - use crate::{ - buffer_tree::partition::resolver::mock::MockPartitionProvider, TRANSITION_SHARD_ID, - }; + use crate::buffer_tree::partition::resolver::mock::MockPartitionProvider; const PARTITION_KEY: &str = "bananas"; const PARTITION_ID: PartitionId = PartitionId::new(42); @@ -226,6 +230,7 @@ mod tests { const NAMESPACE_NAME: &str = "ns-bananas"; const TABLE_ID: TableId = TableId::new(3); const TABLE_NAME: &str = "platanos"; + const TRANSITION_SHARD_ID: ShardId = ShardId::new(84); fn new_cache

( inner: MockPartitionProvider, @@ -257,6 +262,7 @@ mod tests { TableName::from(TABLE_NAME) })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, ); let inner = MockPartitionProvider::default().with_partition(data); @@ -272,6 +278,7 @@ mod tests { Arc::new(DeferredLoad::new(Duration::from_secs(1), async { TableName::from(TABLE_NAME) })), + TRANSITION_SHARD_ID, ) .await; @@ -310,6 +317,7 @@ mod tests { Arc::new(DeferredLoad::new(Duration::from_secs(1), async { TableName::from(TABLE_NAME) })), + TRANSITION_SHARD_ID, ) .await; @@ -347,6 +355,7 @@ mod tests { TableName::from(TABLE_NAME) })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, )); let partition = Partition { @@ -370,6 +379,7 @@ mod tests { Arc::new(DeferredLoad::new(Duration::from_secs(1), async { TableName::from(TABLE_NAME) })), + TRANSITION_SHARD_ID, ) .await; @@ -393,6 +403,7 @@ mod tests { TableName::from(TABLE_NAME) })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, )); let partition = Partition { @@ -416,6 +427,7 @@ mod tests { Arc::new(DeferredLoad::new(Duration::from_secs(1), async { TableName::from(TABLE_NAME) })), + TRANSITION_SHARD_ID, ) .await; diff --git a/ingester2/src/buffer_tree/partition/resolver/catalog.rs b/ingester2/src/buffer_tree/partition/resolver/catalog.rs index 4710a3d3d2..c105fda28a 100644 --- a/ingester2/src/buffer_tree/partition/resolver/catalog.rs +++ b/ingester2/src/buffer_tree/partition/resolver/catalog.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use async_trait::async_trait; use backoff::{Backoff, BackoffConfig}; -use data_types::{NamespaceId, Partition, PartitionKey, TableId}; +use data_types::{NamespaceId, Partition, PartitionKey, ShardId, TableId}; use iox_catalog::interface::Catalog; use observability_deps::tracing::debug; @@ -17,7 +17,6 @@ use crate::{ table::TableName, }, deferred_load::DeferredLoad, - TRANSITION_SHARD_ID, }; /// A [`PartitionProvider`] implementation that hits the [`Catalog`] to resolve @@ -43,12 +42,13 @@ impl CatalogPartitionResolver { &self, partition_key: PartitionKey, table_id: TableId, + transition_shard_id: ShardId, ) -> Result { self.catalog .repositories() .await .partitions() - .create_or_get(partition_key, TRANSITION_SHARD_ID, table_id) + .create_or_get(partition_key, transition_shard_id, table_id) .await } } @@ -62,16 +62,18 @@ impl PartitionProvider for CatalogPartitionResolver { namespace_name: Arc>, table_id: TableId, table_name: Arc>, + transition_shard_id: ShardId, ) -> PartitionData { debug!( %partition_key, %table_id, %table_name, + %transition_shard_id, "upserting partition in catalog" ); let p = Backoff::new(&self.backoff_config) .retry_all_errors("resolve partition", || { - self.get(partition_key.clone(), table_id) + self.get(partition_key.clone(), table_id, transition_shard_id) }) .await .expect("retry forever"); @@ -87,6 +89,7 @@ impl PartitionProvider for CatalogPartitionResolver { table_id, table_name, SortKeyState::Provided(p.sort_key()), + transition_shard_id, ) } } @@ -99,7 +102,6 @@ mod tests { use data_types::ShardIndex; use super::*; - use crate::TRANSITION_SHARD_ID; const TABLE_NAME: &str = "bananas"; const NAMESPACE_NAME: &str = "ns-bananas"; @@ -111,7 +113,7 @@ mod tests { let catalog: Arc = Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics))); - let (_shard_id, namespace_id, table_id) = { + let (shard_id, namespace_id, table_id) = { let mut repos = catalog.repositories().await; let t = repos.topics().create_or_get("platanos").await.unwrap(); let q = repos.query_pools().create_or_get("platanos").await.unwrap(); @@ -126,7 +128,6 @@ mod tests { .create_or_get(&t, ShardIndex::new(0)) .await .unwrap(); - assert_eq!(shard.id, TRANSITION_SHARD_ID); let table = repos .tables() @@ -151,6 +152,7 @@ mod tests { Arc::new(DeferredLoad::new(Duration::from_secs(1), async { TableName::from(TABLE_NAME) })), + shard_id, ) .await; diff --git a/ingester2/src/buffer_tree/partition/resolver/mock.rs b/ingester2/src/buffer_tree/partition/resolver/mock.rs index 65981c82d6..9b25d8554e 100644 --- a/ingester2/src/buffer_tree/partition/resolver/mock.rs +++ b/ingester2/src/buffer_tree/partition/resolver/mock.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; -use data_types::{NamespaceId, PartitionKey, TableId}; +use data_types::{NamespaceId, PartitionKey, ShardId, TableId}; use parking_lot::Mutex; use super::r#trait::PartitionProvider; @@ -54,6 +54,7 @@ impl PartitionProvider for MockPartitionProvider { namespace_name: Arc>, table_id: TableId, table_name: Arc>, + _transition_shard_id: ShardId, ) -> PartitionData { let p = self .partitions diff --git a/ingester2/src/buffer_tree/partition/resolver/trait.rs b/ingester2/src/buffer_tree/partition/resolver/trait.rs index b92bfda153..80322380b9 100644 --- a/ingester2/src/buffer_tree/partition/resolver/trait.rs +++ b/ingester2/src/buffer_tree/partition/resolver/trait.rs @@ -1,7 +1,7 @@ use std::{fmt::Debug, sync::Arc}; use async_trait::async_trait; -use data_types::{NamespaceId, PartitionKey, TableId}; +use data_types::{NamespaceId, PartitionKey, ShardId, TableId}; use crate::{ buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableName}, @@ -24,6 +24,7 @@ pub(crate) trait PartitionProvider: Send + Sync + Debug { namespace_name: Arc>, table_id: TableId, table_name: Arc>, + transition_shard_id: ShardId, ) -> PartitionData; } @@ -39,6 +40,7 @@ where namespace_name: Arc>, table_id: TableId, table_name: Arc>, + transition_shard_id: ShardId, ) -> PartitionData { (**self) .get_partition( @@ -47,6 +49,7 @@ where namespace_name, table_id, table_name, + transition_shard_id, ) .await } @@ -56,11 +59,13 @@ where mod tests { use std::{sync::Arc, time::Duration}; - use data_types::PartitionId; + use data_types::{PartitionId, ShardId}; use super::*; use crate::buffer_tree::partition::{resolver::mock::MockPartitionProvider, SortKeyState}; + const TRANSITION_SHARD_ID: ShardId = ShardId::new(84); + #[tokio::test] async fn test_arc_impl() { let key = PartitionKey::from("bananas"); @@ -81,6 +86,7 @@ mod tests { table_id, Arc::clone(&table_name), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, ); let mock = Arc::new(MockPartitionProvider::default().with_partition(data)); @@ -92,6 +98,7 @@ mod tests { Arc::clone(&namespace_name), table_id, Arc::clone(&table_name), + TRANSITION_SHARD_ID, ) .await; assert_eq!(got.partition_id(), partition); diff --git a/ingester2/src/buffer_tree/root.rs b/ingester2/src/buffer_tree/root.rs index e1e78c89f9..76c27e9f12 100644 --- a/ingester2/src/buffer_tree/root.rs +++ b/ingester2/src/buffer_tree/root.rs @@ -1,7 +1,7 @@ use std::{fmt::Debug, sync::Arc}; use async_trait::async_trait; -use data_types::{NamespaceId, TableId}; +use data_types::{NamespaceId, ShardId, TableId}; use dml::DmlOperation; use metric::U64Counter; use parking_lot::Mutex; @@ -97,6 +97,7 @@ pub(crate) struct BufferTree { namespace_count: U64Counter, post_write_observer: Arc, + transition_shard_id: ShardId, } impl BufferTree @@ -110,6 +111,7 @@ where partition_provider: Arc, post_write_observer: Arc, metrics: Arc, + transition_shard_id: ShardId, ) -> Self { let namespace_count = metrics .register_metric::( @@ -126,6 +128,7 @@ where partition_provider, post_write_observer, namespace_count, + transition_shard_id, } } @@ -176,6 +179,7 @@ where Arc::clone(&self.partition_provider), Arc::clone(&self.post_write_observer), &self.metrics, + self.transition_shard_id, )) }); @@ -239,6 +243,7 @@ mod tests { const TABLE_NAME: &str = "bananas"; const NAMESPACE_NAME: &str = "platanos"; const NAMESPACE_ID: NamespaceId = NamespaceId::new(42); + const TRANSITION_SHARD_ID: ShardId = ShardId::new(84); #[tokio::test] async fn test_namespace_init_table() { @@ -259,6 +264,7 @@ mod tests { TableName::from(TABLE_NAME) })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, ), )); @@ -270,6 +276,7 @@ mod tests { partition_provider, Arc::new(MockPostWriteObserver::default()), &metrics, + TRANSITION_SHARD_ID, ); // Assert the namespace name was stored @@ -339,6 +346,7 @@ mod tests { partition_provider, Arc::new(MockPostWriteObserver::default()), Arc::new(metric::Registry::default()), + TRANSITION_SHARD_ID, ); // Write the provided DmlWrites @@ -383,6 +391,7 @@ mod tests { TableName::from(TABLE_NAME) })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, )], writes = [make_write_op( &PartitionKey::from("p1"), @@ -418,6 +427,7 @@ mod tests { TableName::from(TABLE_NAME) })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, ), PartitionData::new( PartitionId::new(1), @@ -431,6 +441,7 @@ mod tests { TableName::from(TABLE_NAME) })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, ) ], writes = [ @@ -478,6 +489,7 @@ mod tests { TableName::from(TABLE_NAME) })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, ), PartitionData::new( PartitionId::new(1), @@ -491,6 +503,7 @@ mod tests { TableName::from(TABLE_NAME) })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, ) ], writes = [ @@ -537,6 +550,7 @@ mod tests { TableName::from(TABLE_NAME) })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, ), PartitionData::new( PartitionId::new(1), @@ -550,6 +564,7 @@ mod tests { TableName::from(TABLE_NAME) })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, ) ], writes = [ @@ -597,6 +612,7 @@ mod tests { TableName::from(TABLE_NAME) })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, )], writes = [ make_write_op( @@ -646,6 +662,7 @@ mod tests { TableName::from(TABLE_NAME) })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, )) .with_partition(PartitionData::new( PartitionId::new(0), @@ -659,6 +676,7 @@ mod tests { TableName::from(TABLE_NAME) })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, )), ); @@ -671,6 +689,7 @@ mod tests { partition_provider, Arc::new(MockPostWriteObserver::default()), Arc::clone(&metrics), + TRANSITION_SHARD_ID, ); // Write data to partition p1, in table "bananas". @@ -740,6 +759,7 @@ mod tests { TableName::from(TABLE_NAME) })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, )) .with_partition(PartitionData::new( PartitionId::new(1), @@ -753,6 +773,7 @@ mod tests { TableName::from(TABLE_NAME) })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, )) .with_partition(PartitionData::new( PartitionId::new(2), @@ -766,6 +787,7 @@ mod tests { TableName::from("another_table") })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, )), ); @@ -776,6 +798,7 @@ mod tests { partition_provider, Arc::new(MockPostWriteObserver::default()), Arc::clone(&Arc::new(metric::Registry::default())), + TRANSITION_SHARD_ID, ); assert_eq!(buf.partitions().count(), 0); @@ -849,6 +872,7 @@ mod tests { TableName::from(TABLE_NAME) })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, ), )); @@ -859,6 +883,7 @@ mod tests { partition_provider, Arc::new(MockPostWriteObserver::default()), Arc::new(metric::Registry::default()), + TRANSITION_SHARD_ID, ); // Query the empty tree @@ -932,6 +957,7 @@ mod tests { TableName::from(TABLE_NAME) })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, )) .with_partition(PartitionData::new( PartitionId::new(1), @@ -945,6 +971,7 @@ mod tests { TableName::from(TABLE_NAME) })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, )), ); @@ -955,6 +982,7 @@ mod tests { partition_provider, Arc::new(MockPostWriteObserver::default()), Arc::new(metric::Registry::default()), + TRANSITION_SHARD_ID, ); // Write data to partition p1, in table "bananas". diff --git a/ingester2/src/buffer_tree/table.rs b/ingester2/src/buffer_tree/table.rs index 6d294ec42d..287bc9653f 100644 --- a/ingester2/src/buffer_tree/table.rs +++ b/ingester2/src/buffer_tree/table.rs @@ -5,7 +5,7 @@ pub(crate) mod name_resolver; use std::{fmt::Debug, sync::Arc}; use async_trait::async_trait; -use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, TableId}; +use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId}; use datafusion_util::MemoryStream; use mutable_batch::MutableBatch; use parking_lot::{Mutex, RwLock}; @@ -116,6 +116,7 @@ pub(crate) struct TableData { partition_data: RwLock, post_write_observer: Arc, + transition_shard_id: ShardId, } impl TableData { @@ -136,6 +137,7 @@ impl TableData { namespace_name: Arc>, partition_provider: Arc, post_write_observer: Arc, + transition_shard_id: ShardId, ) -> Self { Self { table_id, @@ -145,6 +147,7 @@ impl TableData { partition_data: Default::default(), partition_provider, post_write_observer, + transition_shard_id, } } @@ -218,6 +221,7 @@ where Arc::clone(&self.namespace_name), self.table_id, Arc::clone(&self.table_name), + self.transition_shard_id, ) .await; // Add the double-referenced partition to the map. @@ -319,6 +323,7 @@ mod tests { const NAMESPACE_ID: NamespaceId = NamespaceId::new(42); const PARTITION_KEY: &str = "platanos"; const PARTITION_ID: PartitionId = PartitionId::new(0); + const TRANSITION_SHARD_ID: ShardId = ShardId::new(84); #[tokio::test] async fn test_partition_double_ref() { @@ -337,6 +342,7 @@ mod tests { TableName::from(TABLE_NAME) })), SortKeyState::Provided(None), + TRANSITION_SHARD_ID, ), )); @@ -351,6 +357,7 @@ mod tests { })), partition_provider, Arc::new(MockPostWriteObserver::default()), + TRANSITION_SHARD_ID, ); let batch = lines_to_batches(r#"bananas,bat=man value=24 42"#, 0) diff --git a/ingester2/src/init.rs b/ingester2/src/init.rs index f0666d56e3..8435b71fd9 100644 --- a/ingester2/src/init.rs +++ b/ingester2/src/init.rs @@ -27,7 +27,7 @@ use crate::{ server::grpc::GrpcDelegate, timestamp_oracle::TimestampOracle, wal::{rotate_task::periodic_rotation, wal_sink::WalSink}, - TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX, + TRANSITION_SHARD_INDEX, }; /// Acquire opaque handles to the Ingester RPC service implementations. @@ -168,7 +168,8 @@ pub async fn new( .create_or_get("iox-shared") .await .expect("get topic"); - txn.shards() + let transition_shard = txn + .shards() .create_or_get(&topic, TRANSITION_SHARD_INDEX) .await .expect("create transition shard"); @@ -199,7 +200,7 @@ pub async fn new( .repositories() .await .partitions() - .most_recent_n(40_000, &[TRANSITION_SHARD_ID]) + .most_recent_n(40_000, &[transition_shard.id]) .await .map_err(InitError::PreWarmPartitions)?; @@ -242,6 +243,7 @@ pub async fn new( partition_provider, Arc::new(hot_partition_persister), Arc::clone(&metrics), + transition_shard.id, )); // Initialise the WAL diff --git a/ingester2/src/lib.rs b/ingester2/src/lib.rs index 66fab929e7..6dc1405389 100644 --- a/ingester2/src/lib.rs +++ b/ingester2/src/lib.rs @@ -44,7 +44,7 @@ missing_docs )] -use data_types::{ShardId, ShardIndex}; +use data_types::ShardIndex; /// A macro to conditionally prepend `pub` to the inner tokens for benchmarking /// purposes, should the `benches` feature be enabled. @@ -60,9 +60,9 @@ macro_rules! maybe_pub { }; } -/// During the testing of ingester2, the catalog will require a ShardId for -/// various operations. This is a const value for these occasions. -const TRANSITION_SHARD_ID: ShardId = ShardId::new(1); +/// During the testing of ingester2, the catalog will require a ShardIndex for +/// various operations. This is a const value for these occasions. Look up the ShardId for this +/// ShardIndex when needed. const TRANSITION_SHARD_INDEX: ShardIndex = ShardIndex::new(1); /// Ingester initialisation methods & types. diff --git a/ingester2/src/persist/context.rs b/ingester2/src/persist/context.rs index 8dab09823f..5c85786b70 100644 --- a/ingester2/src/persist/context.rs +++ b/ingester2/src/persist/context.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use backoff::Backoff; use data_types::{ CompactionLevel, NamespaceId, ParquetFileParams, PartitionId, PartitionKey, SequenceNumber, - TableId, + ShardId, TableId, }; use iox_catalog::interface::get_table_schema_by_id; use iox_time::{SystemProvider, TimeProvider}; @@ -25,7 +25,6 @@ use crate::{ }, deferred_load::DeferredLoad, persist::compact::{compact_persisting_batch, CompactedStream}, - TRANSITION_SHARD_ID, }; use super::handle::Inner; @@ -79,6 +78,8 @@ pub(super) struct Context { table_id: TableId, partition_id: PartitionId, + transition_shard_id: ShardId, + // The partition key for this partition partition_key: PartitionKey, @@ -162,6 +163,7 @@ impl Context { enqueued_at, dequeued_at: Instant::now(), permit, + transition_shard_id: guard.transition_shard_id(), } }; @@ -233,7 +235,7 @@ impl Context { let iox_metadata = IoxMetadata { object_store_id, creation_timestamp: SystemProvider::new().now(), - shard_id: TRANSITION_SHARD_ID, + shard_id: self.transition_shard_id, namespace_id: self.namespace_id, namespace_name: Arc::clone(&*self.namespace_name.get().await), table_id: self.table_id, diff --git a/ingester2/src/persist/handle.rs b/ingester2/src/persist/handle.rs index 69d11bbdf4..61daf81b48 100644 --- a/ingester2/src/persist/handle.rs +++ b/ingester2/src/persist/handle.rs @@ -430,7 +430,7 @@ mod tests { use std::{sync::Arc, time::Duration}; use assert_matches::assert_matches; - use data_types::{NamespaceId, PartitionId, PartitionKey, TableId}; + use data_types::{NamespaceId, PartitionId, PartitionKey, ShardId, TableId}; use dml::DmlOperation; use iox_catalog::mem::MemCatalog; use lazy_static::lazy_static; @@ -460,6 +460,7 @@ mod tests { const TABLE_ID: TableId = TableId::new(2442); const TABLE_NAME: &str = "banana-report"; const NAMESPACE_NAME: &str = "platanos"; + const TRANSITION_SHARD_ID: ShardId = ShardId::new(84); lazy_static! { static ref EXEC: Arc = Arc::new(Executor::new_testing()); @@ -492,10 +493,12 @@ mod tests { TABLE_ID, Arc::clone(&TABLE_NAME_LOADER), sort_key, + TRANSITION_SHARD_ID, )), ), Arc::new(MockPostWriteObserver::default()), Default::default(), + TRANSITION_SHARD_ID, ); buffer_tree diff --git a/ingester2/src/test_util.rs b/ingester2/src/test_util.rs index a659543589..b249c8e4ce 100644 --- a/ingester2/src/test_util.rs +++ b/ingester2/src/test_util.rs @@ -78,8 +78,6 @@ pub(crate) async fn populate_catalog( .unwrap() .id; - assert_eq!(shard_id, crate::TRANSITION_SHARD_ID); - (shard_id, ns_id, table_id) }