diff --git a/ingester/src/data.rs b/ingester/src/data.rs index f21bf15364..4b1bca735d 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -7,11 +7,12 @@ use uuid::Uuid; use crate::server::IngesterServer; use iox_catalog::interface::{ - KafkaPartition, NamespaceId, PartitionId, RepoCollection, SequenceNumber, SequencerId, TableId, + KafkaPartition, KafkaTopicId, NamespaceId, PartitionId, RepoCollection, SequenceNumber, + SequencerId, TableId, }; use mutable_batch::MutableBatch; use parking_lot::RwLock; -use snafu::{ResultExt, Snafu}; +use snafu::{OptionExt, ResultExt, Snafu}; #[derive(Debug, Snafu)] #[allow(missing_copy_implementations, missing_docs)] @@ -27,6 +28,16 @@ pub enum Error { source: iox_catalog::interface::Error, id: KafkaPartition, }, + + #[snafu(display( + "Sequencer record not found for kafka_topic_id {} and kafka_partition {}", + kafka_topic_id, + kafka_partition + ))] + SequencerNotFound { + kafka_topic_id: KafkaTopicId, + kafka_partition: KafkaPartition, + }, } /// A specialized `Error` for Ingester Data errors @@ -52,9 +63,13 @@ impl Sequencers { let topic = ingester.get_topic(); for shard in ingester.get_kafka_partitions() { let sequencer = sequencer_repro - .create_or_get(&topic, shard) //todo: use `get` instead + .get_by_topic_id_and_partition(topic.id, shard) .await - .context(ReadSequencerSnafu { id: shard })?; + .context(ReadSequencerSnafu { id: shard })? + .context(SequencerNotFoundSnafu { + kafka_topic_id: topic.id, + kafka_partition: shard, + })?; // Create empty buffer for each sequencer sequencers.insert(sequencer.id, Arc::new(SequencerData::default())); } @@ -121,7 +136,6 @@ struct DataBuffer { /// . A read request from Querier /// The `buffer` will be empty when this happens. snapshots: Vec>, - /// When a persist is called, data in `buffer` will be moved to a `snapshot` /// and then all `snapshots` will be moved to a `persisting`. /// Both `buffer` and 'snaphots` will be empty when this happens. diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index 626c032836..ea0714d3f7 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -78,6 +78,12 @@ impl KafkaTopicId { } } +impl std::fmt::Display for KafkaTopicId { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + /// Unique ID for a `QueryPool` #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] #[sqlx(transparent)] @@ -153,6 +159,12 @@ impl KafkaPartition { } } +impl std::fmt::Display for KafkaPartition { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + /// Unique ID for a `Partition` #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] #[sqlx(transparent)]