refactor: use wrapper ID

pull/24376/head
NGA-TRAN 2022-01-19 12:51:04 -05:00
parent 9d61580136
commit 9977f174b7
2 changed files with 23 additions and 20 deletions

View File

@ -7,7 +7,7 @@ use uuid::Uuid;
use crate::server::IngesterServer;
use iox_catalog::interface::{
KafkaPartition, NamespaceId, PartitionId, RepoCollection, SequencerId, TableId,
KafkaPartition, NamespaceId, PartitionId, RepoCollection, SequenceNumber, SequencerId, TableId,
};
use mutable_batch::MutableBatch;
use parking_lot::RwLock;
@ -46,17 +46,10 @@ impl Sequencers {
pub async fn initialize<T: RepoCollection + Send + Sync>(
ingester: &IngesterServer<'_, T>,
) -> Result<Self> {
// Get kafka topic from the catalog
let topic_name = ingester.get_topic();
let kafka_topic_repro = ingester.iox_catalog.kafka_topic();
let topic = kafka_topic_repro
.create_or_get(topic_name.as_str()) //todo: use `get` instead
.await
.context(ReadTopicSnafu { name: topic_name })?;
// Get sequencer ids from the catalog
let sequencer_repro = ingester.iox_catalog.sequencer();
let mut sequencers = BTreeMap::default();
let topic = ingester.get_topic();
for shard in ingester.get_kafka_partitions() {
let sequencer = sequencer_repro
.create_or_get(&topic, shard) //todo: use `get` instead
@ -150,7 +143,7 @@ struct DataBuffer {
/// helps the ingester keep the batches of data in thier ingesting order
struct BufferBatch {
/// Sequencer number of the ingesting data
pub sequencer_number: u64,
pub sequencer_number: SequenceNumber,
/// Ingesting data
pub data: MutableBatch,
}
@ -158,9 +151,9 @@ struct BufferBatch {
/// SnapshotBatch contains data of many contiguous BufferBatches
struct SnapshotBatch {
/// Min sequencer number of its comebined BufferBatches
pub min_sequencer_number: u64,
pub min_sequencer_number: SequenceNumber,
/// Max sequencer number of its comebined BufferBatches
pub max_sequencer_number: u64,
pub max_sequencer_number: SequenceNumber,
/// Data of its comebined BufferBatches kept in one RecordBatch
pub data: RecordBatch,
}

View File

@ -3,7 +3,7 @@
use std::sync::Arc;
use iox_catalog::interface::{KafkaPartition, RepoCollection};
use iox_catalog::interface::{KafkaPartition, KafkaTopic, KafkaTopicId, RepoCollection};
/// The [`IngesterServer`] manages the lifecycle and contains all state for
/// an `ingester` server instance.
@ -11,9 +11,9 @@ pub struct IngesterServer<'a, T>
where
T: RepoCollection + Send + Sync,
{
// Kafka Topic assigned to this ingester
kafka_topic_name: String,
// Kafka Partitions (Shards) assigned to this INgester
/// Kafka Topic assigned to this ingester
kafka_topic: KafkaTopic,
/// Kafka Partitions (Shards) assigned to this INgester
kafka_partitions: Vec<KafkaPartition>,
/// Catalog of this ingester
pub iox_catalog: &'a Arc<T>,
@ -24,17 +24,27 @@ where
T: RepoCollection + Send + Sync,
{
/// Initialize the Ingester
pub fn new(topic_name: String, shard_ids: Vec<KafkaPartition>, catalog: &'a Arc<T>) -> Self {
pub fn new(topic: KafkaTopic, shard_ids: Vec<KafkaPartition>, catalog: &'a Arc<T>) -> Self {
Self {
kafka_topic_name: topic_name,
kafka_topic: topic,
kafka_partitions: shard_ids,
iox_catalog: catalog,
}
}
/// Return a kafka topic
pub fn get_topic(&self) -> KafkaTopic {
self.kafka_topic.clone()
}
/// Return a kafka topic id
pub fn get_topic_id(&self) -> KafkaTopicId {
self.kafka_topic.id
}
/// Return a kafka topic name
pub fn get_topic(&self) -> String {
self.kafka_topic_name.clone()
pub fn get_topic_name(&self) -> String {
self.kafka_topic.name.clone()
}
/// Return Kafka Partitions