From dbd27f648fbedfdc5dcc25f87b8a992b76cf0b00 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 25 Aug 2022 16:56:14 -0400 Subject: [PATCH] refactor: Rename more mentions of Kafka to their other name where appropriate --- influxdb_iox/src/commands/catalog.rs | 2 +- ingester/src/handler.rs | 2 +- ingester/src/lifecycle.rs | 6 +- ingester/src/stream_handler/handler.rs | 64 +++++++++---------- .../stream_handler/sink_instrumentation.rs | 12 ++-- iox_catalog/src/lib.rs | 6 +- ioxd_querier/src/lib.rs | 2 +- router/src/lib.rs | 2 +- test_helpers_end_to_end/src/database.rs | 2 +- 9 files changed, 49 insertions(+), 49 deletions(-) diff --git a/influxdb_iox/src/commands/catalog.rs b/influxdb_iox/src/commands/catalog.rs index 5f4dde29c9..719bff6727 100644 --- a/influxdb_iox/src/commands/catalog.rs +++ b/influxdb_iox/src/commands/catalog.rs @@ -40,7 +40,7 @@ enum Command { /// Run database migrations Setup(Setup), - /// Manage kafka topic + /// Manage topic Topic(topic::Config), } diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index 8a46092c5d..52823371d5 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -83,7 +83,7 @@ fn shared_handle(handle: JoinHandle<()>) -> SharedJoinHandle { /// persistence and answer queries #[derive(Debug)] pub struct IngestHandlerImpl { - /// Kafka Topic assigned to this ingester + /// Topic assigned to this ingester #[allow(dead_code)] topic: TopicMetadata, diff --git a/ingester/src/lifecycle.rs b/ingester/src/lifecycle.rs index 522437a680..fe22c10c2c 100644 --- a/ingester/src/lifecycle.rs +++ b/ingester/src/lifecycle.rs @@ -129,8 +129,8 @@ pub struct LifecycleManager { /// The configuration options for the lifecycle on the ingester. #[derive(Debug, Clone, Copy)] pub struct LifecycleConfig { - /// The ingester will pause pulling data from Kafka if it hits this amount of memory used, waiting - /// until persistence evicts partitions from memory. + /// The ingester will pause pulling data from the write buffer if it hits this amount of memory + /// used, waiting until persistence evicts partitions from memory. pause_ingest_size: usize, /// When the ingester hits this threshold, the lifecycle manager will persist the largest /// partitions currently buffered until it falls below this threshold. An ingester running @@ -147,7 +147,7 @@ pub struct LifecycleConfig { partition_size_threshold: usize, /// If an individual partitiion has had data buffered for longer than this period of time, the /// manager will persist it. This setting is to ensure we have an upper bound on how far back - /// we will need to read in Kafka on restart or recovery. + /// we will need to read in the write buffer on restart or recovery. partition_age_threshold: Duration, /// If an individual partition hasn't received a write for longer than this period of time, the /// manager will persist it. This is to ensure that cold partitions get cleared out to make diff --git a/ingester/src/stream_handler/handler.rs b/ingester/src/stream_handler/handler.rs index 73150c9f82..bdd8cba909 100644 --- a/ingester/src/stream_handler/handler.rs +++ b/ingester/src/stream_handler/handler.rs @@ -64,7 +64,7 @@ pub struct SequencedStreamHandler { shard_reset_count: U64Counter, /// Log context fields - otherwise unused. - kafka_topic_name: String, + topic_name: String, shard_index: ShardIndex, skip_to_oldest_available: bool, @@ -83,7 +83,7 @@ impl SequencedStreamHandler { current_sequence_number: SequenceNumber, sink: O, lifecycle_handle: LifecycleHandleImpl, - kafka_topic_name: String, + topic_name: String, shard_index: ShardIndex, metrics: &metric::Registry, skip_to_oldest_available: bool, @@ -92,7 +92,7 @@ impl SequencedStreamHandler { let time_to_be_readable = metrics.register_metric::( "ingester_ttbr", "duration of time between producer writing to consumer putting into queryable cache", - ).recorder(metric_attrs(shard_index, &kafka_topic_name, None, false)); + ).recorder(metric_attrs(shard_index, &topic_name, None, false)); // Lifecycle-driven ingest pause duration let pause_duration = metrics @@ -109,31 +109,31 @@ impl SequencedStreamHandler { ); let shard_unknown_sequence_number_count = ingest_errors.recorder(metric_attrs( shard_index, - &kafka_topic_name, + &topic_name, Some("shard_unknown_sequence_number"), true, )); let shard_invalid_data_count = ingest_errors.recorder(metric_attrs( shard_index, - &kafka_topic_name, + &topic_name, Some("shard_invalid_data"), true, )); let shard_unknown_error_count = ingest_errors.recorder(metric_attrs( shard_index, - &kafka_topic_name, + &topic_name, Some("shard_unknown_error"), true, )); let sink_apply_error_count = ingest_errors.recorder(metric_attrs( shard_index, - &kafka_topic_name, + &topic_name, Some("sink_apply_error"), true, )); let skipped_sequence_number_amount = ingest_errors.recorder(metric_attrs( shard_index, - &kafka_topic_name, + &topic_name, Some("skipped_sequence_number_amount"), true, )); @@ -144,7 +144,7 @@ impl SequencedStreamHandler { "shard_reset_count", "how often a shard was already reset", ) - .recorder(metric_attrs(shard_index, &kafka_topic_name, None, true)); + .recorder(metric_attrs(shard_index, &topic_name, None, true)); Self { write_buffer_stream_handler, @@ -160,7 +160,7 @@ impl SequencedStreamHandler { sink_apply_error_count, skipped_sequence_number_amount, shard_reset_count, - kafka_topic_name, + topic_name, shard_index, skip_to_oldest_available, } @@ -183,7 +183,7 @@ impl SequencedStreamHandler { sink_apply_error_count: self.sink_apply_error_count, skipped_sequence_number_amount: self.skipped_sequence_number_amount, shard_reset_count: self.shard_reset_count, - kafka_topic_name: self.kafka_topic_name, + topic_name: self.topic_name, shard_index: self.shard_index, skip_to_oldest_available: self.skip_to_oldest_available, } @@ -220,7 +220,7 @@ where next = stream.next().fuse() => next, _ = shutdown_fut => { info!( - kafka_topic=%self.kafka_topic_name, + kafka_topic=%self.topic_name, shard_index=%self.shard_index, "stream handler shutdown", ); @@ -260,7 +260,7 @@ where if self.skip_to_oldest_available && sequence_number_before_reset.is_none() { warn!( error=%e, - kafka_topic=%self.kafka_topic_name, + kafka_topic=%self.topic_name, shard_index=%self.shard_index, potential_data_loss=true, "reset stream" @@ -273,7 +273,7 @@ where } else { error!( error=%e, - kafka_topic=%self.kafka_topic_name, + kafka_topic=%self.topic_name, shard_index=%self.shard_index, potential_data_loss=true, "unable to read from desired sequence number offset" @@ -285,7 +285,7 @@ where Some(Err(e)) if e.kind() == WriteBufferErrorKind::IO => { warn!( error=%e, - kafka_topic=%self.kafka_topic_name, + kafka_topic=%self.topic_name, shard_index=%self.shard_index, "I/O error reading from shard" ); @@ -300,7 +300,7 @@ where // be applied/persisted. error!( error=%e, - kafka_topic=%self.kafka_topic_name, + kafka_topic=%self.topic_name, shard_index=%self.shard_index, potential_data_loss=true, "unable to deserialize dml operation" @@ -316,13 +316,13 @@ Shard Index {:?} stream for topic {} has a high watermark BEFORE the sequence nu is either a bug (see https://github.com/influxdata/rskafka/issues/147 for example) or means that \ someone re-created the shard and data is lost. In both cases, it's better to panic than to try \ something clever.", - self.shard_index, self.kafka_topic_name, + self.shard_index, self.topic_name, ) } Some(Err(e)) => { error!( error=%e, - kafka_topic=%self.kafka_topic_name, + kafka_topic=%self.topic_name, shard_index=%self.shard_index, potential_data_loss=true, "unhandled error converting write buffer data to DmlOperation", @@ -334,7 +334,7 @@ something clever.", None => { panic!( "shard index {:?} stream for topic {} ended without graceful shutdown", - self.shard_index, self.kafka_topic_name + self.shard_index, self.topic_name ); } }; @@ -349,7 +349,7 @@ something clever.", if let Some(op) = op { // Emit per-op debug info. trace!( - kafka_topic=%self.kafka_topic_name, + kafka_topic=%self.topic_name, shard_index=%self.shard_index, op_size=op.size(), op_namespace=op.namespace(), @@ -366,7 +366,7 @@ something clever.", let should_pause = match self.sink.apply(op).await { Ok(should_pause) => { trace!( - kafka_topic=%self.kafka_topic_name, + kafka_topic=%self.topic_name, shard_index=%self.shard_index, %should_pause, "successfully applied dml operation" @@ -376,7 +376,7 @@ something clever.", Err(e) => { error!( error=%e, - kafka_topic=%self.kafka_topic_name, + kafka_topic=%self.topic_name, shard_index=%self.shard_index, potential_data_loss=true, "failed to apply dml operation" @@ -404,7 +404,7 @@ something clever.", let started_at = self.time_provider.now(); warn!( - kafka_topic=%self.kafka_topic_name, + kafka_topic=%self.topic_name, shard_index=%self.shard_index, "pausing ingest until persistence has run" ); @@ -429,7 +429,7 @@ something clever.", .unwrap_or_else(|| "unknown".to_string()); info!( - kafka_topic=%self.kafka_topic_name, + kafka_topic=%self.topic_name, shard_index=%self.shard_index, pause_duration=%duration_str, "resuming ingest" @@ -483,7 +483,7 @@ mod tests { static TEST_TIME: Lazy