feat: add `cluster_uuid`/`catalog_uuid` to telemetry

closes: https://github.com/influxdata/influxdb_pro/issues/764
praveen/telem-cluster-uuid
Praveen Kumar 2025-04-27 13:14:59 +01:00
parent d30f26618c
commit f664973b0f
No known key found for this signature in database
GPG Key ID: CB9E05780A79EA5A
3 changed files with 73 additions and 27 deletions

View File

@ -36,7 +36,7 @@ use influxdb3_server::{
}; };
use influxdb3_shutdown::{ShutdownManager, wait_for_signal}; use influxdb3_shutdown::{ShutdownManager, wait_for_signal};
use influxdb3_sys_events::SysEventStore; use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore; use influxdb3_telemetry::store::{CreateTelemetryStoreArgs, TelemetryStore};
use influxdb3_wal::{Gen1Duration, WalConfig}; use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{ use influxdb3_write::{
WriteBuffer, WriteBuffer,
@ -684,6 +684,7 @@ pub async fn command(config: Config) -> Result<()> {
Some(Arc::clone(&write_buffer_impl.persisted_files())), Some(Arc::clone(&write_buffer_impl.persisted_files())),
config.telemetry_endpoint.as_str(), config.telemetry_endpoint.as_str(),
config.disable_telemetry_upload, config.disable_telemetry_upload,
catalog.catalog_uuid().to_string(),
) )
.await; .await;
@ -889,6 +890,7 @@ async fn setup_telemetry_store(
persisted_files: Option<Arc<PersistedFiles>>, persisted_files: Option<Arc<PersistedFiles>>,
telemetry_endpoint: &str, telemetry_endpoint: &str,
disable_upload: bool, disable_upload: bool,
catalog_uuid: String,
) -> Arc<TelemetryStore> { ) -> Arc<TelemetryStore> {
let os = std::env::consts::OS; let os = std::env::consts::OS;
let influxdb_pkg_version = env!("CARGO_PKG_VERSION"); let influxdb_pkg_version = env!("CARGO_PKG_VERSION");
@ -905,15 +907,16 @@ async fn setup_telemetry_store(
TelemetryStore::new_without_background_runners(persisted_files.map(|p| p as _)) TelemetryStore::new_without_background_runners(persisted_files.map(|p| p as _))
} else { } else {
debug!("Initializing TelemetryStore with upload enabled for {telemetry_endpoint}."); debug!("Initializing TelemetryStore with upload enabled for {telemetry_endpoint}.");
TelemetryStore::new( TelemetryStore::new(CreateTelemetryStoreArgs {
instance_id, instance_id,
Arc::from(os), os: Arc::from(os),
Arc::from(influx_version), influx_version: Arc::from(influx_version),
Arc::from(storage_type), storage_type: Arc::from(storage_type),
num_cpus, cores: num_cpus,
persisted_files.map(|p| p as _), persisted_files: persisted_files.map(|p| p as _),
telemetry_endpoint.to_string(), telemetry_endpoint: telemetry_endpoint.to_string(),
) catalog_uuid,
})
.await .await
} }
} }

View File

@ -50,6 +50,11 @@ pub(crate) struct TelemetryPayload {
pub cores: usize, pub cores: usize,
pub product_type: &'static str, pub product_type: &'static str,
pub uptime_secs: u64, pub uptime_secs: u64,
// this is the same as catalog_uuid
// we call it catalog_uuid everywhere, but we save it as cluster_uuid in telemetry because it's
// called cluster_uuid in the licensing service. Calling it `cluster_uuid` here makes it easier
// to map telemetry data to licensing data
pub cluster_uuid: Arc<str>,
// cpu // cpu
pub cpu_utilization_percent_min_1m: f32, pub cpu_utilization_percent_min_1m: f32,
pub cpu_utilization_percent_max_1m: f32, pub cpu_utilization_percent_max_1m: f32,
@ -164,6 +169,7 @@ mod tests {
storage_type: Arc::from("sample-str"), storage_type: Arc::from("sample-str"),
instance_id: Arc::from("sample-str"), instance_id: Arc::from("sample-str"),
cores: 10, cores: 10,
cluster_uuid: Arc::from("cluster_uuid"),
product_type: "OSS", product_type: "OSS",
cpu_utilization_percent_min_1m: 100.0, cpu_utilization_percent_min_1m: 100.0,
cpu_utilization_percent_max_1m: 100.0, cpu_utilization_percent_max_1m: 100.0,

View File

@ -14,6 +14,18 @@ use crate::{
sender::{TelemetryPayload, send_telemetry_in_background}, sender::{TelemetryPayload, send_telemetry_in_background},
}; };
#[derive(Debug)]
pub struct CreateTelemetryStoreArgs {
pub instance_id: Arc<str>,
pub os: Arc<str>,
pub influx_version: Arc<str>,
pub storage_type: Arc<str>,
pub cores: usize,
pub persisted_files: Option<Arc<dyn ParquetMetrics>>,
pub telemetry_endpoint: String,
pub catalog_uuid: String,
}
/// This store is responsible for holding all the stats which will be sent in the background /// This store is responsible for holding all the stats which will be sent in the background
/// to the server. There are primarily 4 different types of data held in the store: /// to the server. There are primarily 4 different types of data held in the store:
/// - static info (like instance ids, OS etc): These are passed in to create telemetry store. /// - static info (like instance ids, OS etc): These are passed in to create telemetry store.
@ -37,13 +49,16 @@ const MAIN_SENDER_INTERVAL_SECS: u64 = 60 * 60;
impl TelemetryStore { impl TelemetryStore {
pub async fn new( pub async fn new(
instance_id: Arc<str>, CreateTelemetryStoreArgs {
os: Arc<str>, instance_id,
influx_version: Arc<str>, os,
storage_type: Arc<str>, influx_version,
cores: usize, storage_type,
persisted_files: Option<Arc<dyn ParquetMetrics>>, cores,
telemetry_endpoint: String, persisted_files,
telemetry_endpoint,
catalog_uuid,
}: CreateTelemetryStoreArgs,
) -> Arc<Self> { ) -> Arc<Self> {
debug!( debug!(
instance_id = ?instance_id, instance_id = ?instance_id,
@ -53,7 +68,14 @@ impl TelemetryStore {
cores = ?cores, cores = ?cores,
"Initializing telemetry store" "Initializing telemetry store"
); );
let inner = TelemetryStoreInner::new(instance_id, os, influx_version, storage_type, cores); let inner = TelemetryStoreInner::new(
instance_id,
os,
influx_version,
storage_type,
cores,
catalog_uuid,
);
let store = Arc::new(TelemetryStore { let store = Arc::new(TelemetryStore {
inner: parking_lot::Mutex::new(inner), inner: parking_lot::Mutex::new(inner),
persisted_files, persisted_files,
@ -83,7 +105,15 @@ impl TelemetryStore {
let influx_version = Arc::from("influxdb3-0.1.0"); let influx_version = Arc::from("influxdb3-0.1.0");
let storage_type = Arc::from("Memory"); let storage_type = Arc::from("Memory");
let cores = 10; let cores = 10;
let inner = TelemetryStoreInner::new(instance_id, os, influx_version, storage_type, cores); let sample_catalog_uuid = "catalog_uuid".to_owned();
let inner = TelemetryStoreInner::new(
instance_id,
os,
influx_version,
storage_type,
cores,
sample_catalog_uuid,
);
Arc::new(TelemetryStore { Arc::new(TelemetryStore {
inner: parking_lot::Mutex::new(inner), inner: parking_lot::Mutex::new(inner),
persisted_files, persisted_files,
@ -146,6 +176,7 @@ struct TelemetryStoreInner {
start_timer: Instant, start_timer: Instant,
cores: usize, cores: usize,
cpu: Cpu, cpu: Cpu,
catalog_uuid: String,
memory: Memory, memory: Memory,
// Both write/read events are captured in this bucket // Both write/read events are captured in this bucket
// and then later rolledup / summarized in // and then later rolledup / summarized in
@ -168,6 +199,7 @@ impl TelemetryStoreInner {
influx_version: Arc<str>, influx_version: Arc<str>,
storage_type: Arc<str>, storage_type: Arc<str>,
cores: usize, cores: usize,
catalog_uuid: String,
) -> Self { ) -> Self {
TelemetryStoreInner { TelemetryStoreInner {
os, os,
@ -175,6 +207,7 @@ impl TelemetryStoreInner {
influx_version, influx_version,
storage_type, storage_type,
cores, cores,
catalog_uuid,
start_timer: Instant::now(), start_timer: Instant::now(),
cpu: Cpu::default(), cpu: Cpu::default(),
memory: Memory::default(), memory: Memory::default(),
@ -198,6 +231,8 @@ impl TelemetryStoreInner {
storage_type: Arc::clone(&self.storage_type), storage_type: Arc::clone(&self.storage_type),
cores: self.cores, cores: self.cores,
product_type: "Core", product_type: "Core",
// cluster_uuid == catalog_uuid
cluster_uuid: Arc::from(self.catalog_uuid.as_str()),
uptime_secs: self.start_timer.elapsed().as_secs(), uptime_secs: self.start_timer.elapsed().as_secs(),
cpu_utilization_percent_min_1m: self.cpu.utilization.min, cpu_utilization_percent_min_1m: self.cpu.utilization.min,
@ -319,15 +354,16 @@ mod tests {
async fn test_telemetry_store_cpu_mem() { async fn test_telemetry_store_cpu_mem() {
// create store // create store
let parqet_file_metrics = Arc::new(SampleParquetMetrics); let parqet_file_metrics = Arc::new(SampleParquetMetrics);
let store: Arc<TelemetryStore> = TelemetryStore::new( let store: Arc<TelemetryStore> = TelemetryStore::new(CreateTelemetryStoreArgs {
Arc::from("some-instance-id"), instance_id: Arc::from("some-instance-id"),
Arc::from("Linux"), os: Arc::from("Linux"),
Arc::from("Core-v3.0"), influx_version: Arc::from("Core-v3.0"),
Arc::from("Memory"), storage_type: Arc::from("Memory"),
10, cores: 10,
Some(parqet_file_metrics), persisted_files: Some(parqet_file_metrics),
"http://localhost/telemetry".to_string(), telemetry_endpoint: "http://localhost/telemetry".to_owned(),
) catalog_uuid: "catalog_but_cluster_uuid".to_owned(),
})
.await; .await;
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_secs(1)).await;
@ -335,6 +371,7 @@ mod tests {
let snapshot = store.snapshot(); let snapshot = store.snapshot();
assert_eq!("some-instance-id", &*snapshot.instance_id); assert_eq!("some-instance-id", &*snapshot.instance_id);
assert_eq!(1, snapshot.uptime_secs); assert_eq!(1, snapshot.uptime_secs);
assert_eq!("catalog_but_cluster_uuid", &*snapshot.cluster_uuid);
// add cpu/mem and snapshot 1 // add cpu/mem and snapshot 1
let mem_used_bytes = 123456789; let mem_used_bytes = 123456789;