diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index e69be745a6..8beb2d3418 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -8,14 +8,14 @@ use crate::{ Result, ShardRepo, SoftDeletedRows, TableRepo, TopicMetadataRepo, Transaction, }, metrics::MetricDecorator, - DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, + DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, SHARED_TOPIC_ID, SHARED_TOPIC_NAME, }; use async_trait::async_trait; use data_types::{ Column, ColumnId, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table, TableId, Timestamp, - TopicId, TopicMetadata, + TopicId, TopicMetadata, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX, }; use iox_time::{SystemProvider, TimeProvider}; use observability_deps::tracing::warn; @@ -123,7 +123,35 @@ impl Display for MemCatalog { #[async_trait] impl Catalog for MemCatalog { async fn setup(&self) -> Result<(), Error> { - // nothing to do + let guard = Arc::clone(&self.collections).lock_owned().await; + let stage = guard.clone(); + let mut transaction = MemTxn { + inner: MemTxnInner::Txn { + guard, + stage, + finalized: false, + }, + time_provider: self.time_provider(), + }; + let stage = transaction.stage(); + + // We need to manually insert the topic here so that we can create the transition shard + // below. + let topic = TopicMetadata { + id: SHARED_TOPIC_ID, + name: SHARED_TOPIC_NAME.to_string(), + }; + stage.topics.push(topic.clone()); + + // The transition shard must exist and must have magic ID and INDEX. + let shard = Shard { + id: TRANSITION_SHARD_ID, + topic_id: topic.id, + shard_index: TRANSITION_SHARD_INDEX, + min_unpersisted_sequence_number: SequenceNumber::new(0), + }; + stage.shards.push(shard); + transaction.commit_inplace().await?; Ok(()) }