From 657ac249e9a6506aea97e4e374a374a0d442b758 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 23 Feb 2022 15:33:47 +0000 Subject: [PATCH] feat: track ingester jobs (#3836) --- Cargo.lock | 1 + influxdb_iox/src/commands/run/ingester.rs | 2 +- ingester/Cargo.toml | 1 + ingester/src/data.rs | 2 + ingester/src/handler.rs | 7 ++- ingester/src/job.rs | 76 +++++++++++++++++++++++ ingester/src/lib.rs | 1 + ingester/src/lifecycle.rs | 61 +++++++++++++----- tracker/src/task.rs | 2 +- 9 files changed, 131 insertions(+), 22 deletions(-) create mode 100644 ingester/src/job.rs diff --git a/Cargo.lock b/Cargo.lock index 868b6ecb4f..c78144025f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2101,6 +2101,7 @@ dependencies = [ "tokio-util 0.7.0", "tonic", "trace", + "tracker", "uuid", "workspace-hack", "write_buffer", diff --git a/influxdb_iox/src/commands/run/ingester.rs b/influxdb_iox/src/commands/run/ingester.rs index ec1eb6196e..4e31c415ea 100644 --- a/influxdb_iox/src/commands/run/ingester.rs +++ b/influxdb_iox/src/commands/run/ingester.rs @@ -201,7 +201,7 @@ pub async fn command(config: Config) -> Result<()> { object_store, write_buffer, Executor::new(config.query_exect_thread_count), - &metric_registry, + Arc::clone(&metric_registry), ) .await?, ); diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml index 8e9e5d2f04..3775a346e7 100644 --- a/ingester/Cargo.toml +++ b/ingester/Cargo.toml @@ -38,6 +38,7 @@ thiserror = "1.0" time = { path = "../time" } tokio = { version = "1.17", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] } tonic = { version = "0.6" } +tracker = { path = "../tracker" } uuid = { version = "0.8", features = ["v4"] } workspace-hack = { path = "../workspace-hack"} write_buffer = { path = "../write_buffer" } diff --git a/ingester/src/data.rs b/ingester/src/data.rs index e1b724a87b..cc3c80f139 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -1418,6 +1418,7 @@ mod tests { let pause_size = w1.size() + 1; let manager = LifecycleManager::new( LifecycleConfig::new(pause_size, 0, 0, Duration::from_secs(1)), + Arc::new(metric::Registry::new()), Arc::new(SystemProvider::new()), ); let should_pause = data @@ -1501,6 +1502,7 @@ mod tests { std::mem::drop(repos); let manager = LifecycleManager::new( LifecycleConfig::new(1, 0, 0, Duration::from_secs(1)), + Arc::new(metric::Registry::new()), Arc::new(SystemProvider::new()), ); diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index 4f4a631ae7..749b0e55ea 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -123,7 +123,7 @@ impl IngestHandlerImpl { object_store: Arc, write_buffer: Arc, exec: Executor, - registry: &metric::Registry, + metric_registry: Arc, ) -> Result { // build the initial ingester data state let mut sequencers = BTreeMap::new(); @@ -139,12 +139,13 @@ impl IngestHandlerImpl { let ingester_data = Arc::clone(&data); let kafka_topic_name = topic.name.clone(); - let ingest_metrics = WriteBufferIngestMetrics::new(registry, &topic.name); + let ingest_metrics = WriteBufferIngestMetrics::new(&metric_registry, &topic.name); // start the lifecycle manager let persister = Arc::clone(&data); let lifecycle_manager = Arc::new(LifecycleManager::new( lifecycle_config, + metric_registry, Arc::new(SystemProvider::new()), )); let manager = Arc::clone(&lifecycle_manager); @@ -710,7 +711,7 @@ mod tests { object_store, reading, Executor::new(1), - &metrics, + Arc::clone(&metrics), ) .await .unwrap(); diff --git a/ingester/src/job.rs b/ingester/src/job.rs new file mode 100644 index 0000000000..55f70fc4bb --- /dev/null +++ b/ingester/src/job.rs @@ -0,0 +1,76 @@ +use std::sync::Arc; + +use iox_catalog::interface::PartitionId; +use parking_lot::Mutex; +use time::TimeProvider; +use tracker::{ + AbstractTaskRegistry, TaskRegistration, TaskRegistry, TaskRegistryWithHistory, + TaskRegistryWithMetrics, TaskTracker, +}; + +const JOB_HISTORY_SIZE: usize = 1000; + +#[derive(Debug)] +pub enum Job { + Persist { partition_id: PartitionId }, +} + +impl Job { + fn name(&self) -> &'static str { + match self { + Self::Persist { .. } => "persist", + } + } + + fn partition_id(&self) -> Option { + match self { + Self::Persist { partition_id, .. } => Some(*partition_id), + } + } +} + +/// The global job registry +#[derive(Debug)] +pub struct JobRegistry { + inner: Mutex>>>, +} + +impl JobRegistry { + pub fn new( + metric_registry: Arc, + time_provider: Arc, + ) -> Self { + let registry = TaskRegistry::new(time_provider); + let registry = TaskRegistryWithHistory::new(registry, JOB_HISTORY_SIZE); + let registry = + TaskRegistryWithMetrics::new(registry, metric_registry, Box::new(f_attributes)); + Self { + inner: Mutex::new(registry), + } + } + + pub fn register(&self, job: Job) -> (TaskTracker, TaskRegistration) { + self.inner.lock().register(job) + } + + /// Reclaims jobs into the historical archive + /// + /// Returns the number of remaining jobs + /// + /// Should be called periodically + pub fn reclaim(&self) -> usize { + let mut lock = self.inner.lock(); + lock.reclaim(); + lock.tracked_len() + } +} + +fn f_attributes(job: &Job) -> metric::Attributes { + let mut attributes = metric::Attributes::from(&[("name", job.name())]); + + if let Some(partition_id) = job.partition_id() { + attributes.insert("partition_id", partition_id.get().to_string()); + } + + attributes +} diff --git a/ingester/src/lib.rs b/ingester/src/lib.rs index 1a2c727503..bdaa85d2ef 100644 --- a/ingester/src/lib.rs +++ b/ingester/src/lib.rs @@ -16,6 +16,7 @@ pub mod compact; pub mod data; pub mod handler; +mod job; pub mod lifecycle; pub mod persist; mod poison; diff --git a/ingester/src/lifecycle.rs b/ingester/src/lifecycle.rs index 8f313864d0..bfe6fc00fa 100644 --- a/ingester/src/lifecycle.rs +++ b/ingester/src/lifecycle.rs @@ -7,6 +7,7 @@ use crate::{ data::Persister, + job::{Job, JobRegistry}, poison::{PoisonCabinet, PoisonPill}, }; use iox_catalog::interface::{PartitionId, SequenceNumber, SequencerId}; @@ -17,6 +18,7 @@ use std::sync::Arc; use std::time::Duration; use time::{Time, TimeProvider}; use tokio_util::sync::CancellationToken; +use tracker::TrackedFutureExt; /// The lifecycle manager keeps track of the size and age of partitions across all sequencers. /// It triggers persistence based on keeping total memory usage around a set amount while @@ -26,6 +28,7 @@ pub struct LifecycleManager { time_provider: Arc, state: Mutex, persist_running: tokio::sync::Mutex<()>, + job_registry: Arc, } /// The configuration options for the lifecycle on the ingester. @@ -113,12 +116,21 @@ pub struct PartitionLifecycleStats { impl LifecycleManager { /// Initialize a new lifecycle manager that will persist when `maybe_persist` is called /// if anything is over the size or age threshold. - pub(crate) fn new(config: LifecycleConfig, time_provider: Arc) -> Self { + pub(crate) fn new( + config: LifecycleConfig, + metric_registry: Arc, + time_provider: Arc, + ) -> Self { + let job_registry = Arc::new(JobRegistry::new( + metric_registry, + Arc::clone(&time_provider), + )); Self { config, time_provider, state: Default::default(), persist_running: Default::default(), + job_registry, } } @@ -229,9 +241,13 @@ impl LifecycleManager { self.remove(s.partition_id); let persister = Arc::clone(persister); + let (_tracker, registration) = self.job_registry.register(Job::Persist { + partition_id: s.partition_id, + }); tokio::task::spawn(async move { persister.persist(s.partition_id).await; }) + .track(registration) }) .collect(); @@ -301,6 +317,8 @@ pub(crate) async fn run_lifecycle_manager( manager.maybe_persist(&persister).await; + manager.job_registry.reclaim(); + tokio::select!( _ = tokio::time::sleep(CHECK_INTERVAL) => {}, _ = shutdown.cancelled() => {}, @@ -313,7 +331,7 @@ mod tests { use super::*; use async_trait::async_trait; use std::collections::BTreeSet; - use time::{MockProvider, SystemProvider}; + use time::MockProvider; #[derive(Default)] struct TestPersister { @@ -358,9 +376,7 @@ mod tests { partition_size_threshold: 5, partition_age_threshold: Duration::from_nanos(0), }; - let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); - let tp = Arc::clone(&time_provider); - let m = LifecycleManager::new(config, tp); + let TestLifecycleManger { m, time_provider } = TestLifecycleManger::new(config); let sequencer_id = SequencerId::new(1); // log first two writes at different times @@ -393,8 +409,7 @@ mod tests { partition_size_threshold: 5, partition_age_threshold: Duration::from_nanos(0), }; - let time_provider = Arc::new(SystemProvider::new()); - let m = LifecycleManager::new(config, time_provider); + let TestLifecycleManger { m, .. } = TestLifecycleManger::new(config); let sequencer_id = SequencerId::new(1); assert!(!m.log_write( @@ -426,9 +441,7 @@ mod tests { partition_size_threshold: 10, partition_age_threshold: Duration::from_nanos(5), }; - let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); - let tp = Arc::clone(&time_provider); - let m = LifecycleManager::new(config, tp); + let TestLifecycleManger { m, time_provider } = TestLifecycleManger::new(config); let partition_id = PartitionId::new(1); let persister = Arc::new(TestPersister::default()); let sequencer_id = SequencerId::new(1); @@ -472,10 +485,9 @@ mod tests { partition_size_threshold: 5, partition_age_threshold: Duration::from_millis(100), }; - let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + let TestLifecycleManger { m, .. } = TestLifecycleManger::new(config); let sequencer_id = SequencerId::new(1); - let m = LifecycleManager::new(config, time_provider); let partition_id = PartitionId::new(1); let persister = Arc::new(TestPersister::default()); m.log_write(partition_id, sequencer_id, SequenceNumber::new(1), 4); @@ -515,8 +527,7 @@ mod tests { partition_age_threshold: Duration::from_millis(1000), }; let sequencer_id = SequencerId::new(1); - let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); - let m = LifecycleManager::new(config, time_provider); + let TestLifecycleManger { m, .. } = TestLifecycleManger::new(config); let partition_id = PartitionId::new(1); let persister = Arc::new(TestPersister::default()); m.log_write(partition_id, sequencer_id, SequenceNumber::new(1), 8); @@ -579,9 +590,7 @@ mod tests { partition_age_threshold: Duration::from_millis(1000), }; let sequencer_id = SequencerId::new(1); - let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); - let tp = Arc::clone(&time_provider); - let m = LifecycleManager::new(config, tp); + let TestLifecycleManger { m, time_provider } = TestLifecycleManger::new(config); let persister = Arc::new(TestPersister::default()); m.log_write(PartitionId::new(1), sequencer_id, SequenceNumber::new(1), 4); time_provider.inc(Duration::from_nanos(1)); @@ -603,4 +612,22 @@ mod tests { vec![(sequencer_id, SequenceNumber::new(3))] ); } + + struct TestLifecycleManger { + m: LifecycleManager, + time_provider: Arc, + } + + impl TestLifecycleManger { + fn new(config: LifecycleConfig) -> Self { + let metric_registry = Arc::new(metric::Registry::new()); + let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + let m = LifecycleManager::new( + config, + metric_registry, + Arc::::clone(&time_provider), + ); + Self { m, time_provider } + } + } } diff --git a/tracker/src/task.rs b/tracker/src/task.rs index c573b004a7..5150cb7f4f 100644 --- a/tracker/src/task.rs +++ b/tracker/src/task.rs @@ -91,7 +91,7 @@ use tokio_util::sync::CancellationToken; pub use future::{TrackedFuture, TrackedFutureExt}; pub use history::TaskRegistryWithHistory; -pub use metrics::TaskRegistryWithMetrics; +pub use metrics::{FAttributes, TaskRegistryWithMetrics}; pub use registry::{AbstractTaskRegistry, TaskId, TaskRegistry}; use tokio::sync::Notify;