diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index 10b52c18f6..8e42bcad38 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -170,8 +170,10 @@ where Ok(()) } -/// Creates or gets records in the catalog for the shared kafka topic, query pool, and sequencers for -/// each of the partitions. +/// Creates or gets records in the catalog for the shared kafka topic, query pool, and sequencers +/// for each of the partitions. +/// +/// Used in tests and when creating an in-memory catalog. pub async fn create_or_get_default_records( kafka_partition_count: i32, txn: &mut dyn Transaction, @@ -180,7 +182,8 @@ pub async fn create_or_get_default_records( let query_pool = txn.query_pools().create_or_get(SHARED_QUERY_POOL).await?; let mut sequencers = BTreeMap::new(); - for partition in 1..=kafka_partition_count { + // Start at 0 to match the one write buffer partition ID used in all-in-one mode + for partition in 0..kafka_partition_count { let sequencer = txn .sequencers() .create_or_get(&kafka_topic, KafkaPartition::new(partition)) @@ -223,7 +226,10 @@ mod tests { let metrics = Arc::new(metric::Registry::default()); let repo = MemCatalog::new(metrics); let mut txn = repo.start_transaction().await.unwrap(); - let (kafka_topic, query_pool, _) = create_or_get_default_records(2, txn.deref_mut()).await.unwrap(); + let (kafka_topic, query_pool, _) = create_or_get_default_records( + 2, + txn.deref_mut() + ).await.unwrap(); let namespace = txn .namespaces()