feat: add `cluster_uuid`/`catalog_uuid` to telemetry (#26335)
closes: https://github.com/influxdata/influxdb_pro/issues/764
pull/26350/head
parent
ca123614da
commit
22c196b8fb
|
@ -36,7 +36,7 @@ use influxdb3_server::{
|
|||
};
|
||||
use influxdb3_shutdown::{ShutdownManager, wait_for_signal};
|
||||
use influxdb3_sys_events::SysEventStore;
|
||||
use influxdb3_telemetry::store::TelemetryStore;
|
||||
use influxdb3_telemetry::store::{CreateTelemetryStoreArgs, TelemetryStore};
|
||||
use influxdb3_wal::{Gen1Duration, WalConfig};
|
||||
use influxdb3_write::{
|
||||
WriteBuffer,
|
||||
|
@ -684,6 +684,7 @@ pub async fn command(config: Config) -> Result<()> {
|
|||
Some(Arc::clone(&write_buffer_impl.persisted_files())),
|
||||
config.telemetry_endpoint.as_str(),
|
||||
config.disable_telemetry_upload,
|
||||
catalog.catalog_uuid().to_string(),
|
||||
)
|
||||
.await;
|
||||
|
||||
|
@ -889,6 +890,7 @@ async fn setup_telemetry_store(
|
|||
persisted_files: Option<Arc<PersistedFiles>>,
|
||||
telemetry_endpoint: &str,
|
||||
disable_upload: bool,
|
||||
catalog_uuid: String,
|
||||
) -> Arc<TelemetryStore> {
|
||||
let os = std::env::consts::OS;
|
||||
let influxdb_pkg_version = env!("CARGO_PKG_VERSION");
|
||||
|
@ -905,15 +907,16 @@ async fn setup_telemetry_store(
|
|||
TelemetryStore::new_without_background_runners(persisted_files.map(|p| p as _))
|
||||
} else {
|
||||
debug!("Initializing TelemetryStore with upload enabled for {telemetry_endpoint}.");
|
||||
TelemetryStore::new(
|
||||
TelemetryStore::new(CreateTelemetryStoreArgs {
|
||||
instance_id,
|
||||
Arc::from(os),
|
||||
Arc::from(influx_version),
|
||||
Arc::from(storage_type),
|
||||
num_cpus,
|
||||
persisted_files.map(|p| p as _),
|
||||
telemetry_endpoint.to_string(),
|
||||
)
|
||||
os: Arc::from(os),
|
||||
influx_version: Arc::from(influx_version),
|
||||
storage_type: Arc::from(storage_type),
|
||||
cores: num_cpus,
|
||||
persisted_files: persisted_files.map(|p| p as _),
|
||||
telemetry_endpoint: telemetry_endpoint.to_string(),
|
||||
catalog_uuid,
|
||||
})
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,6 +50,11 @@ pub(crate) struct TelemetryPayload {
|
|||
pub cores: usize,
|
||||
pub product_type: &'static str,
|
||||
pub uptime_secs: u64,
|
||||
// this is the same as catalog_uuid
|
||||
// we call it as catalog_uuid everywhere but we save it as cluster_uuid in telemetry as it's
|
||||
// called cluster_uuid in licensing service. Calling it as `cluster_uuid` here makes it easier
|
||||
// when mapping telemetry and licensing data
|
||||
pub cluster_uuid: Arc<str>,
|
||||
// cpu
|
||||
pub cpu_utilization_percent_min_1m: f32,
|
||||
pub cpu_utilization_percent_max_1m: f32,
|
||||
|
@ -164,6 +169,7 @@ mod tests {
|
|||
storage_type: Arc::from("sample-str"),
|
||||
instance_id: Arc::from("sample-str"),
|
||||
cores: 10,
|
||||
cluster_uuid: Arc::from("cluster_uuid"),
|
||||
product_type: "OSS",
|
||||
cpu_utilization_percent_min_1m: 100.0,
|
||||
cpu_utilization_percent_max_1m: 100.0,
|
||||
|
|
|
@ -14,6 +14,18 @@ use crate::{
|
|||
sender::{TelemetryPayload, send_telemetry_in_background},
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct CreateTelemetryStoreArgs {
|
||||
pub instance_id: Arc<str>,
|
||||
pub os: Arc<str>,
|
||||
pub influx_version: Arc<str>,
|
||||
pub storage_type: Arc<str>,
|
||||
pub cores: usize,
|
||||
pub persisted_files: Option<Arc<dyn ParquetMetrics>>,
|
||||
pub telemetry_endpoint: String,
|
||||
pub catalog_uuid: String,
|
||||
}
|
||||
|
||||
/// This store is responsible for holding all the stats which will be sent in the background
|
||||
/// to the server. There are primarily 4 different types of data held in the store:
|
||||
/// - static info (like instance ids, OS etc): These are passed in to create telemetry store.
|
||||
|
@ -37,13 +49,16 @@ const MAIN_SENDER_INTERVAL_SECS: u64 = 60 * 60;
|
|||
|
||||
impl TelemetryStore {
|
||||
pub async fn new(
|
||||
instance_id: Arc<str>,
|
||||
os: Arc<str>,
|
||||
influx_version: Arc<str>,
|
||||
storage_type: Arc<str>,
|
||||
cores: usize,
|
||||
persisted_files: Option<Arc<dyn ParquetMetrics>>,
|
||||
telemetry_endpoint: String,
|
||||
CreateTelemetryStoreArgs {
|
||||
instance_id,
|
||||
os,
|
||||
influx_version,
|
||||
storage_type,
|
||||
cores,
|
||||
persisted_files,
|
||||
telemetry_endpoint,
|
||||
catalog_uuid,
|
||||
}: CreateTelemetryStoreArgs,
|
||||
) -> Arc<Self> {
|
||||
debug!(
|
||||
instance_id = ?instance_id,
|
||||
|
@ -53,7 +68,14 @@ impl TelemetryStore {
|
|||
cores = ?cores,
|
||||
"Initializing telemetry store"
|
||||
);
|
||||
let inner = TelemetryStoreInner::new(instance_id, os, influx_version, storage_type, cores);
|
||||
let inner = TelemetryStoreInner::new(
|
||||
instance_id,
|
||||
os,
|
||||
influx_version,
|
||||
storage_type,
|
||||
cores,
|
||||
catalog_uuid,
|
||||
);
|
||||
let store = Arc::new(TelemetryStore {
|
||||
inner: parking_lot::Mutex::new(inner),
|
||||
persisted_files,
|
||||
|
@ -83,7 +105,15 @@ impl TelemetryStore {
|
|||
let influx_version = Arc::from("influxdb3-0.1.0");
|
||||
let storage_type = Arc::from("Memory");
|
||||
let cores = 10;
|
||||
let inner = TelemetryStoreInner::new(instance_id, os, influx_version, storage_type, cores);
|
||||
let sample_catalog_uuid = "catalog_uuid".to_owned();
|
||||
let inner = TelemetryStoreInner::new(
|
||||
instance_id,
|
||||
os,
|
||||
influx_version,
|
||||
storage_type,
|
||||
cores,
|
||||
sample_catalog_uuid,
|
||||
);
|
||||
Arc::new(TelemetryStore {
|
||||
inner: parking_lot::Mutex::new(inner),
|
||||
persisted_files,
|
||||
|
@ -146,6 +176,7 @@ struct TelemetryStoreInner {
|
|||
start_timer: Instant,
|
||||
cores: usize,
|
||||
cpu: Cpu,
|
||||
catalog_uuid: String,
|
||||
memory: Memory,
|
||||
// Both write/read events are captured in this bucket
|
||||
// and then later rolledup / summarized in
|
||||
|
@ -168,6 +199,7 @@ impl TelemetryStoreInner {
|
|||
influx_version: Arc<str>,
|
||||
storage_type: Arc<str>,
|
||||
cores: usize,
|
||||
catalog_uuid: String,
|
||||
) -> Self {
|
||||
TelemetryStoreInner {
|
||||
os,
|
||||
|
@ -175,6 +207,7 @@ impl TelemetryStoreInner {
|
|||
influx_version,
|
||||
storage_type,
|
||||
cores,
|
||||
catalog_uuid,
|
||||
start_timer: Instant::now(),
|
||||
cpu: Cpu::default(),
|
||||
memory: Memory::default(),
|
||||
|
@ -198,6 +231,8 @@ impl TelemetryStoreInner {
|
|||
storage_type: Arc::clone(&self.storage_type),
|
||||
cores: self.cores,
|
||||
product_type: "Core",
|
||||
// cluster_uuid == catalog_uuid
|
||||
cluster_uuid: Arc::from(self.catalog_uuid.as_str()),
|
||||
uptime_secs: self.start_timer.elapsed().as_secs(),
|
||||
|
||||
cpu_utilization_percent_min_1m: self.cpu.utilization.min,
|
||||
|
@ -319,15 +354,16 @@ mod tests {
|
|||
async fn test_telemetry_store_cpu_mem() {
|
||||
// create store
|
||||
let parqet_file_metrics = Arc::new(SampleParquetMetrics);
|
||||
let store: Arc<TelemetryStore> = TelemetryStore::new(
|
||||
Arc::from("some-instance-id"),
|
||||
Arc::from("Linux"),
|
||||
Arc::from("Core-v3.0"),
|
||||
Arc::from("Memory"),
|
||||
10,
|
||||
Some(parqet_file_metrics),
|
||||
"http://localhost/telemetry".to_string(),
|
||||
)
|
||||
let store: Arc<TelemetryStore> = TelemetryStore::new(CreateTelemetryStoreArgs {
|
||||
instance_id: Arc::from("some-instance-id"),
|
||||
os: Arc::from("Linux"),
|
||||
influx_version: Arc::from("Core-v3.0"),
|
||||
storage_type: Arc::from("Memory"),
|
||||
cores: 10,
|
||||
persisted_files: Some(parqet_file_metrics),
|
||||
telemetry_endpoint: "http://localhost/telemetry".to_owned(),
|
||||
catalog_uuid: "catalog_but_cluster_uuid".to_owned(),
|
||||
})
|
||||
.await;
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
|
||||
|
@ -335,6 +371,7 @@ mod tests {
|
|||
let snapshot = store.snapshot();
|
||||
assert_eq!("some-instance-id", &*snapshot.instance_id);
|
||||
assert_eq!(1, snapshot.uptime_secs);
|
||||
assert_eq!("catalog_but_cluster_uuid", &*snapshot.cluster_uuid);
|
||||
|
||||
// add cpu/mem and snapshot 1
|
||||
let mem_used_bytes = 123456789;
|
||||
|
|
Loading…
Reference in New Issue