feat: add sequencer get_by_kafka_topic_id_and_partition to catalog

pull/24376/head
Paul Dix 2022-01-19 16:45:06 -05:00
parent ad0fc25e55
commit 172d75c6d7
3 changed files with 61 additions and 0 deletions

View File

@ -326,6 +326,13 @@ pub trait SequencerRepo {
partition: KafkaPartition,
) -> Result<Sequencer>;
/// get the sequencer record by `KafkaTopicId` and `KafkaPartition`
async fn get_by_topic_id_and_partition(
&self,
topic_id: KafkaTopicId,
partition: KafkaPartition,
) -> Result<Option<Sequencer>>;
/// list all sequencers
async fn list(&self) -> Result<Vec<Sequencer>>;
@ -970,6 +977,22 @@ pub(crate) mod test_helpers {
.collect::<BTreeMap<_, _>>();
assert_eq!(created, listed);
// get by the sequencer id and partition
let kafka_partition = KafkaPartition::new(1);
let sequencer = sequencer_repo
.get_by_topic_id_and_partition(kafka.id, kafka_partition)
.await
.unwrap()
.unwrap();
assert_eq!(kafka.id, sequencer.kafka_topic_id);
assert_eq!(kafka_partition, sequencer.kafka_partition);
let sequencer = sequencer_repo
.get_by_topic_id_and_partition(kafka.id, KafkaPartition::new(523))
.await
.unwrap();
assert!(sequencer.is_none());
}
async fn test_partition<T: RepoCollection + Send + Sync>(repo: &T) {

View File

@ -287,6 +287,20 @@ impl SequencerRepo for MemCatalog {
Ok(*sequencer)
}
async fn get_by_topic_id_and_partition(
&self,
topic_id: KafkaTopicId,
partition: KafkaPartition,
) -> Result<Option<Sequencer>> {
let collections = self.collections.lock().expect("mutex poisoned");
let sequencer = collections
.sequencers
.iter()
.find(|s| s.kafka_topic_id == topic_id && s.kafka_partition == partition)
.cloned();
Ok(sequencer)
}
async fn list(&self) -> Result<Vec<Sequencer>> {
let collections = self.collections.lock().expect("mutex poisoned");
Ok(collections.sequencers.clone())

View File

@ -329,6 +329,30 @@ impl SequencerRepo for PostgresCatalog {
})
}
async fn get_by_topic_id_and_partition(
&self,
topic_id: KafkaTopicId,
partition: KafkaPartition,
) -> Result<Option<Sequencer>> {
let rec = sqlx::query_as::<_, Sequencer>(
r#"
SELECT * FROM sequencer WHERE kafka_topic_id = $1 AND kafka_partition = $2;
"#,
)
.bind(topic_id) // $1
.bind(partition) // $2
.fetch_one(&self.pool)
.await;
if let Err(sqlx::Error::RowNotFound) = rec {
return Ok(None);
}
let sequencer = rec.map_err(|e| Error::SqlxError { source: e })?;
Ok(Some(sequencer))
}
async fn list(&self) -> Result<Vec<Sequencer>> {
sqlx::query_as::<_, Sequencer>(r#"SELECT * FROM sequencer;"#)
.fetch_all(&self.pool)