refactor: update ingester to get sequencer record and not attempt to create

pull/24376/head
Paul Dix 2022-01-19 17:15:10 -05:00
parent 172d75c6d7
commit 860e5a30ca
2 changed files with 31 additions and 5 deletions

View File

@ -7,11 +7,12 @@ use uuid::Uuid;
use crate::server::IngesterServer;
use iox_catalog::interface::{
KafkaPartition, NamespaceId, PartitionId, RepoCollection, SequenceNumber, SequencerId, TableId,
KafkaPartition, KafkaTopicId, NamespaceId, PartitionId, RepoCollection, SequenceNumber,
SequencerId, TableId,
};
use mutable_batch::MutableBatch;
use parking_lot::RwLock;
use snafu::{ResultExt, Snafu};
use snafu::{OptionExt, ResultExt, Snafu};
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
@ -27,6 +28,16 @@ pub enum Error {
source: iox_catalog::interface::Error,
id: KafkaPartition,
},
#[snafu(display(
"Sequencer record not found for kafka_topic_id {} and kafka_partition {}",
kafka_topic_id,
kafka_partition
))]
SequencerNotFound {
kafka_topic_id: KafkaTopicId,
kafka_partition: KafkaPartition,
},
}
/// A specialized `Error` for Ingester Data errors
@ -52,9 +63,13 @@ impl Sequencers {
let topic = ingester.get_topic();
for shard in ingester.get_kafka_partitions() {
let sequencer = sequencer_repro
.create_or_get(&topic, shard) //todo: use `get` instead
.get_by_topic_id_and_partition(topic.id, shard)
.await
.context(ReadSequencerSnafu { id: shard })?;
.context(ReadSequencerSnafu { id: shard })?
.context(SequencerNotFoundSnafu {
kafka_topic_id: topic.id,
kafka_partition: shard,
})?;
// Create empty buffer for each sequencer
sequencers.insert(sequencer.id, Arc::new(SequencerData::default()));
}
@ -121,7 +136,6 @@ struct DataBuffer {
/// . A read request from Querier
/// The `buffer` will be empty when this happens.
snapshots: Vec<Arc<SnapshotBatch>>,
/// When a persist is called, data in `buffer` will be moved to a `snapshot`
/// and then all `snapshots` will be moved to a `persisting`.
/// Both `buffer` and 'snaphots` will be empty when this happens.

View File

@ -78,6 +78,12 @@ impl KafkaTopicId {
}
}
impl std::fmt::Display for KafkaTopicId {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
/// Unique ID for a `QueryPool`
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
@ -153,6 +159,12 @@ impl KafkaPartition {
}
}
impl std::fmt::Display for KafkaPartition {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
/// Unique ID for a `Partition`
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]