From 22c196b8fb1e78cef5978bab36ad805794e632db Mon Sep 17 00:00:00 2001 From: praveen-influx Date: Mon, 28 Apr 2025 18:37:37 +0100 Subject: [PATCH] feat: add `cluser_uuid`/`catalog_uuid` to telemetry (#26335) closes: https://github.com/influxdata/influxdb_pro/issues/764 --- influxdb3/src/commands/serve.rs | 21 +++++---- influxdb3_telemetry/src/sender.rs | 6 +++ influxdb3_telemetry/src/store.rs | 73 +++++++++++++++++++++++-------- 3 files changed, 73 insertions(+), 27 deletions(-) diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index ae22a7b63f..12e6278bf2 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -36,7 +36,7 @@ use influxdb3_server::{ }; use influxdb3_shutdown::{ShutdownManager, wait_for_signal}; use influxdb3_sys_events::SysEventStore; -use influxdb3_telemetry::store::TelemetryStore; +use influxdb3_telemetry::store::{CreateTelemetryStoreArgs, TelemetryStore}; use influxdb3_wal::{Gen1Duration, WalConfig}; use influxdb3_write::{ WriteBuffer, @@ -684,6 +684,7 @@ pub async fn command(config: Config) -> Result<()> { Some(Arc::clone(&write_buffer_impl.persisted_files())), config.telemetry_endpoint.as_str(), config.disable_telemetry_upload, + catalog.catalog_uuid().to_string(), ) .await; @@ -889,6 +890,7 @@ async fn setup_telemetry_store( persisted_files: Option>, telemetry_endpoint: &str, disable_upload: bool, + catalog_uuid: String, ) -> Arc { let os = std::env::consts::OS; let influxdb_pkg_version = env!("CARGO_PKG_VERSION"); @@ -905,15 +907,16 @@ async fn setup_telemetry_store( TelemetryStore::new_without_background_runners(persisted_files.map(|p| p as _)) } else { debug!("Initializing TelemetryStore with upload enabled for {telemetry_endpoint}."); - TelemetryStore::new( + TelemetryStore::new(CreateTelemetryStoreArgs { instance_id, - Arc::from(os), - Arc::from(influx_version), - Arc::from(storage_type), - num_cpus, - persisted_files.map(|p| p as _), - telemetry_endpoint.to_string(), - ) + os: Arc::from(os), + influx_version: Arc::from(influx_version), + storage_type: Arc::from(storage_type), + cores: num_cpus, + persisted_files: persisted_files.map(|p| p as _), + telemetry_endpoint: telemetry_endpoint.to_string(), + catalog_uuid, + }) .await } } diff --git a/influxdb3_telemetry/src/sender.rs b/influxdb3_telemetry/src/sender.rs index 7dff445bfc..13176f441c 100644 --- a/influxdb3_telemetry/src/sender.rs +++ b/influxdb3_telemetry/src/sender.rs @@ -50,6 +50,11 @@ pub(crate) struct TelemetryPayload { pub cores: usize, pub product_type: &'static str, pub uptime_secs: u64, + // this is the same as catalog_uuid + // we call it as catalog_uuid everywhere but we save it as cluster_uuid in telemetry as it's + // called cluster_uuid in licensing service. Calling it as `cluster_uuid` here makes it easier + // when mapping telemetry and licensing data + pub cluster_uuid: Arc, // cpu pub cpu_utilization_percent_min_1m: f32, pub cpu_utilization_percent_max_1m: f32, @@ -164,6 +169,7 @@ mod tests { storage_type: Arc::from("sample-str"), instance_id: Arc::from("sample-str"), cores: 10, + cluster_uuid: Arc::from("cluster_uuid"), product_type: "OSS", cpu_utilization_percent_min_1m: 100.0, cpu_utilization_percent_max_1m: 100.0, diff --git a/influxdb3_telemetry/src/store.rs b/influxdb3_telemetry/src/store.rs index 3c50c9135d..29d4d0dd5a 100644 --- a/influxdb3_telemetry/src/store.rs +++ b/influxdb3_telemetry/src/store.rs @@ -14,6 +14,18 @@ use crate::{ sender::{TelemetryPayload, send_telemetry_in_background}, }; +#[derive(Debug)] +pub struct CreateTelemetryStoreArgs { + pub instance_id: Arc, + pub os: Arc, + pub influx_version: Arc, + pub storage_type: Arc, + pub cores: usize, + pub persisted_files: Option>, + pub telemetry_endpoint: String, + pub catalog_uuid: String, +} + /// This store is responsible for holding all the stats which will be sent in the background /// to the server. There are primarily 4 different types of data held in the store: /// - static info (like instance ids, OS etc): These are passed in to create telemetry store. @@ -37,13 +49,16 @@ const MAIN_SENDER_INTERVAL_SECS: u64 = 60 * 60; impl TelemetryStore { pub async fn new( - instance_id: Arc, - os: Arc, - influx_version: Arc, - storage_type: Arc, - cores: usize, - persisted_files: Option>, - telemetry_endpoint: String, + CreateTelemetryStoreArgs { + instance_id, + os, + influx_version, + storage_type, + cores, + persisted_files, + telemetry_endpoint, + catalog_uuid, + }: CreateTelemetryStoreArgs, ) -> Arc { debug!( instance_id = ?instance_id, @@ -53,7 +68,14 @@ impl TelemetryStore { cores = ?cores, "Initializing telemetry store" ); - let inner = TelemetryStoreInner::new(instance_id, os, influx_version, storage_type, cores); + let inner = TelemetryStoreInner::new( + instance_id, + os, + influx_version, + storage_type, + cores, + catalog_uuid, + ); let store = Arc::new(TelemetryStore { inner: parking_lot::Mutex::new(inner), persisted_files, @@ -83,7 +105,15 @@ impl TelemetryStore { let influx_version = Arc::from("influxdb3-0.1.0"); let storage_type = Arc::from("Memory"); let cores = 10; - let inner = TelemetryStoreInner::new(instance_id, os, influx_version, storage_type, cores); + let sample_catalog_uuid = "catalog_uuid".to_owned(); + let inner = TelemetryStoreInner::new( + instance_id, + os, + influx_version, + storage_type, + cores, + sample_catalog_uuid, + ); Arc::new(TelemetryStore { inner: parking_lot::Mutex::new(inner), persisted_files, @@ -146,6 +176,7 @@ struct TelemetryStoreInner { start_timer: Instant, cores: usize, cpu: Cpu, + catalog_uuid: String, memory: Memory, // Both write/read events are captured in this bucket // and then later rolledup / summarized in @@ -168,6 +199,7 @@ impl TelemetryStoreInner { influx_version: Arc, storage_type: Arc, cores: usize, + catalog_uuid: String, ) -> Self { TelemetryStoreInner { os, @@ -175,6 +207,7 @@ impl TelemetryStoreInner { influx_version, storage_type, cores, + catalog_uuid, start_timer: Instant::now(), cpu: Cpu::default(), memory: Memory::default(), @@ -198,6 +231,8 @@ impl TelemetryStoreInner { storage_type: Arc::clone(&self.storage_type), cores: self.cores, product_type: "Core", + // cluster_uuid == catalog_uuid + cluster_uuid: Arc::from(self.catalog_uuid.as_str()), uptime_secs: self.start_timer.elapsed().as_secs(), cpu_utilization_percent_min_1m: self.cpu.utilization.min, @@ -319,15 +354,16 @@ mod tests { async fn test_telemetry_store_cpu_mem() { // create store let parqet_file_metrics = Arc::new(SampleParquetMetrics); - let store: Arc = TelemetryStore::new( - Arc::from("some-instance-id"), - Arc::from("Linux"), - Arc::from("Core-v3.0"), - Arc::from("Memory"), - 10, - Some(parqet_file_metrics), - "http://localhost/telemetry".to_string(), - ) + let store: Arc = TelemetryStore::new(CreateTelemetryStoreArgs { + instance_id: Arc::from("some-instance-id"), + os: Arc::from("Linux"), + influx_version: Arc::from("Core-v3.0"), + storage_type: Arc::from("Memory"), + cores: 10, + persisted_files: Some(parqet_file_metrics), + telemetry_endpoint: "http://localhost/telemetry".to_owned(), + catalog_uuid: "catalog_but_cluster_uuid".to_owned(), + }) .await; tokio::time::sleep(Duration::from_secs(1)).await; @@ -335,6 +371,7 @@ mod tests { let snapshot = store.snapshot(); assert_eq!("some-instance-id", &*snapshot.instance_id); assert_eq!(1, snapshot.uptime_secs); + assert_eq!("catalog_but_cluster_uuid", &*snapshot.cluster_uuid); // add cpu/mem and snapshot 1 let mem_used_bytes = 123456789;