fix: Set up the transition shard in the in-mem catalog
parent
5e01261a87
commit
584d3ab0e7
|
@ -8,14 +8,14 @@ use crate::{
|
|||
Result, ShardRepo, SoftDeletedRows, TableRepo, TopicMetadataRepo, Transaction,
|
||||
},
|
||||
metrics::MetricDecorator,
|
||||
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES,
|
||||
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, SHARED_TOPIC_ID, SHARED_TOPIC_NAME,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use data_types::{
|
||||
Column, ColumnId, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile,
|
||||
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, QueryPool, QueryPoolId,
|
||||
SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table, TableId, Timestamp,
|
||||
TopicId, TopicMetadata,
|
||||
TopicId, TopicMetadata, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX,
|
||||
};
|
||||
use iox_time::{SystemProvider, TimeProvider};
|
||||
use observability_deps::tracing::warn;
|
||||
|
@ -123,7 +123,35 @@ impl Display for MemCatalog {
|
|||
#[async_trait]
|
||||
impl Catalog for MemCatalog {
|
||||
async fn setup(&self) -> Result<(), Error> {
|
||||
// nothing to do
|
||||
let guard = Arc::clone(&self.collections).lock_owned().await;
|
||||
let stage = guard.clone();
|
||||
let mut transaction = MemTxn {
|
||||
inner: MemTxnInner::Txn {
|
||||
guard,
|
||||
stage,
|
||||
finalized: false,
|
||||
},
|
||||
time_provider: self.time_provider(),
|
||||
};
|
||||
let stage = transaction.stage();
|
||||
|
||||
// We need to manually insert the topic here so that we can create the transition shard
|
||||
// below.
|
||||
let topic = TopicMetadata {
|
||||
id: SHARED_TOPIC_ID,
|
||||
name: SHARED_TOPIC_NAME.to_string(),
|
||||
};
|
||||
stage.topics.push(topic.clone());
|
||||
|
||||
// The transition shard must exist and must have magic ID and INDEX.
|
||||
let shard = Shard {
|
||||
id: TRANSITION_SHARD_ID,
|
||||
topic_id: topic.id,
|
||||
shard_index: TRANSITION_SHARD_INDEX,
|
||||
min_unpersisted_sequence_number: SequenceNumber::new(0),
|
||||
};
|
||||
stage.shards.push(shard);
|
||||
transaction.commit_inplace().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue