From 9977f174b720c1e431695da7068c5e582eff4d67 Mon Sep 17 00:00:00 2001
From: NGA-TRAN
Date: Wed, 19 Jan 2022 12:51:04 -0500
Subject: [PATCH] refactor: use wrapper ID

---
 ingester/src/data.rs   | 17 +++++------------
 ingester/src/server.rs | 26 ++++++++++++++++++--------
 2 files changed, 23 insertions(+), 20 deletions(-)

diff --git a/ingester/src/data.rs b/ingester/src/data.rs
index d53a96b0a3..91454c8eba 100644
--- a/ingester/src/data.rs
+++ b/ingester/src/data.rs
@@ -7,7 +7,7 @@ use uuid::Uuid;
 
 use crate::server::IngesterServer;
 use iox_catalog::interface::{
-    KafkaPartition, NamespaceId, PartitionId, RepoCollection, SequencerId, TableId,
+    KafkaPartition, NamespaceId, PartitionId, RepoCollection, SequenceNumber, SequencerId, TableId,
 };
 use mutable_batch::MutableBatch;
 use parking_lot::RwLock;
@@ -46,17 +46,10 @@ impl Sequencers {
     pub async fn initialize<T: RepoCollection + Send + Sync>(
         ingester: &IngesterServer<'_, T>,
     ) -> Result<Self> {
-        // Get kafka topic from the catalog
-        let topic_name = ingester.get_topic();
-        let kafka_topic_repro = ingester.iox_catalog.kafka_topic();
-        let topic = kafka_topic_repro
-            .create_or_get(topic_name.as_str()) //todo: use `get` instead
-            .await
-            .context(ReadTopicSnafu { name: topic_name })?;
-
         // Get sequencer ids from the catalog
         let sequencer_repro = ingester.iox_catalog.sequencer();
         let mut sequencers = BTreeMap::default();
+        let topic = ingester.get_topic();
         for shard in ingester.get_kafka_partitions() {
             let sequencer = sequencer_repro
                 .create_or_get(&topic, shard) //todo: use `get` instead
@@ -150,7 +143,7 @@ struct DataBuffer {
 /// helps the ingester keep the batches of data in thier ingesting order
 struct BufferBatch {
     /// Sequencer number of the ingesting data
-    pub sequencer_number: u64,
+    pub sequencer_number: SequenceNumber,
     /// Ingesting data
     pub data: MutableBatch,
 }
@@ -158,9 +151,9 @@ struct BufferBatch {
 /// SnapshotBatch contains data of many contiguous BufferBatches
 struct SnapshotBatch {
     /// Min sequencer number of its comebined BufferBatches
-    pub min_sequencer_number: u64,
+    pub min_sequencer_number: SequenceNumber,
     /// Max sequencer number of its comebined BufferBatches
-    pub max_sequencer_number: u64,
+    pub max_sequencer_number: SequenceNumber,
     /// Data of its comebined BufferBatches kept in one RecordBatch
     pub data: RecordBatch,
 }
diff --git a/ingester/src/server.rs b/ingester/src/server.rs
index 6019b77262..11ce6dc553 100644
--- a/ingester/src/server.rs
+++ b/ingester/src/server.rs
@@ -3,7 +3,7 @@
 
 use std::sync::Arc;
 
-use iox_catalog::interface::{KafkaPartition, RepoCollection};
+use iox_catalog::interface::{KafkaPartition, KafkaTopic, KafkaTopicId, RepoCollection};
 
 /// The [`IngesterServer`] manages the lifecycle and contains all state for
 /// an `ingester` server instance.
@@ -11,9 +11,9 @@ pub struct IngesterServer<'a, T>
 where
     T: RepoCollection + Send + Sync,
 {
-    // Kafka Topic assigned to this ingester
-    kafka_topic_name: String,
-    // Kafka Partitions (Shards) assigned to this INgester
+    /// Kafka Topic assigned to this ingester
+    kafka_topic: KafkaTopic,
+    /// Kafka Partitions (Shards) assigned to this INgester
     kafka_partitions: Vec<KafkaPartition>,
     /// Catalog of this ingester
     pub iox_catalog: &'a Arc<T>,
@@ -24,17 +24,27 @@ where
     T: RepoCollection + Send + Sync,
 {
     /// Initialize the Ingester
-    pub fn new(topic_name: String, shard_ids: Vec<KafkaPartition>, catalog: &'a Arc<T>) -> Self {
+    pub fn new(topic: KafkaTopic, shard_ids: Vec<KafkaPartition>, catalog: &'a Arc<T>) -> Self {
         Self {
-            kafka_topic_name: topic_name,
+            kafka_topic: topic,
             kafka_partitions: shard_ids,
             iox_catalog: catalog,
         }
     }
 
+    /// Return a kafka topic
+    pub fn get_topic(&self) -> KafkaTopic {
+        self.kafka_topic.clone()
+    }
+
+    /// Return a kafka topic id
+    pub fn get_topic_id(&self) -> KafkaTopicId {
+        self.kafka_topic.id
+    }
+
     /// Return a kafka topic name
-    pub fn get_topic(&self) -> String {
-        self.kafka_topic_name.clone()
+    pub fn get_topic_name(&self) -> String {
+        self.kafka_topic.name.clone()
     }
 
     /// Return Kafka Partitions