fix: Start Kafka Partition IDs for default records at 0, not 1
In the all-in-one command, only one write buffer partition is supported, and it's specified using Kafka Partition ID 0: ``` // All-in-one mode only supports one write buffer partition. let write_buffer_partition_range_start = 0; let write_buffer_partition_range_end = 0; ``` When using all-in-one mode with an ephemeral, in-memory catalog, `create_or_get_default_records` is what puts records into the catalog that need to match the write buffer configuration.pull/24376/head
parent
311d4c1f9a
commit
a96976db46
|
@ -170,8 +170,10 @@ where
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Creates or gets records in the catalog for the shared kafka topic, query pool, and sequencers for
|
||||
/// each of the partitions.
|
||||
/// Creates or gets records in the catalog for the shared kafka topic, query pool, and sequencers
|
||||
/// for each of the partitions.
|
||||
///
|
||||
/// Used in tests and when creating an in-memory catalog.
|
||||
pub async fn create_or_get_default_records(
|
||||
kafka_partition_count: i32,
|
||||
txn: &mut dyn Transaction,
|
||||
|
@ -180,7 +182,8 @@ pub async fn create_or_get_default_records(
|
|||
let query_pool = txn.query_pools().create_or_get(SHARED_QUERY_POOL).await?;
|
||||
|
||||
let mut sequencers = BTreeMap::new();
|
||||
for partition in 1..=kafka_partition_count {
|
||||
// Start at 0 to match the one write buffer partition ID used in all-in-one mode
|
||||
for partition in 0..kafka_partition_count {
|
||||
let sequencer = txn
|
||||
.sequencers()
|
||||
.create_or_get(&kafka_topic, KafkaPartition::new(partition))
|
||||
|
@ -223,7 +226,10 @@ mod tests {
|
|||
let metrics = Arc::new(metric::Registry::default());
|
||||
let repo = MemCatalog::new(metrics);
|
||||
let mut txn = repo.start_transaction().await.unwrap();
|
||||
let (kafka_topic, query_pool, _) = create_or_get_default_records(2, txn.deref_mut()).await.unwrap();
|
||||
let (kafka_topic, query_pool, _) = create_or_get_default_records(
|
||||
2,
|
||||
txn.deref_mut()
|
||||
).await.unwrap();
|
||||
|
||||
let namespace = txn
|
||||
.namespaces()
|
||||
|
|
Loading…
Reference in New Issue