From c1a5e1b5fdd3b89960fe8ceca4d0124bd5ad09fe Mon Sep 17 00:00:00 2001 From: praveen-influx Date: Fri, 20 Sep 2024 19:20:54 +0100 Subject: [PATCH] feat(telemetry): added basic types (#25374) - `TelemetryStore` is exposed for holding telemetry samples - added influxdb3_telemetry dependency to influxdb3 crate --- Cargo.lock | 16 ++++ Cargo.toml | 1 + influxdb3/Cargo.toml | 1 + influxdb3_telemetry/Cargo.toml | 20 +++++ influxdb3_telemetry/src/lib.rs | 142 +++++++++++++++++++++++++++++++++ 5 files changed, 180 insertions(+) create mode 100644 influxdb3_telemetry/Cargo.toml create mode 100644 influxdb3_telemetry/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index de074f0ed0..85af2c56f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2580,6 +2580,7 @@ dependencies = [ "influxdb3_client", "influxdb3_process", "influxdb3_server", + "influxdb3_telemetry", "influxdb3_wal", "influxdb3_write", "influxdb_iox_client", @@ -2768,6 +2769,21 @@ dependencies = [ "urlencoding 1.3.3", ] +[[package]] +name = "influxdb3_telemetry" +version = "0.1.0" +dependencies = [ + "futures", + "futures-util", + "observability_deps", + "parking_lot", + "reqwest 0.11.27", + "serde", + "serde_json", + "test-log", + "tokio", +] + [[package]] name = "influxdb3_wal" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 8c9c0f8e79..f7688eca17 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "influxdb3_wal", "influxdb3_write", "iox_query_influxql_rewrite", + "influxdb3_telemetry", ] default-members = ["influxdb3"] diff --git a/influxdb3/Cargo.toml b/influxdb3/Cargo.toml index 6be8d5d90c..9510000b95 100644 --- a/influxdb3/Cargo.toml +++ b/influxdb3/Cargo.toml @@ -30,6 +30,7 @@ influxdb3_process = { path = "../influxdb3_process", default-features = false } influxdb3_server = { path = "../influxdb3_server" } influxdb3_wal = { path = "../influxdb3_wal" } influxdb3_write = { path = "../influxdb3_write" } +influxdb3_telemetry = { path = "../influxdb3_telemetry" } # Crates.io dependencies anyhow.workspace = true diff --git a/influxdb3_telemetry/Cargo.toml b/influxdb3_telemetry/Cargo.toml new file mode 100644 index 0000000000..3970de849c --- /dev/null +++ b/influxdb3_telemetry/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "influxdb3_telemetry" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +tokio.workspace = true +serde.workspace = true +serde_json.workspace = true +observability_deps.workspace = true +futures.workspace = true +futures-util.workspace = true +reqwest.workspace = true +parking_lot.workspace = true + +[dev-dependencies] +test-log.workspace = true + diff --git a/influxdb3_telemetry/src/lib.rs b/influxdb3_telemetry/src/lib.rs new file mode 100644 index 0000000000..19dd0594f3 --- /dev/null +++ b/influxdb3_telemetry/src/lib.rs @@ -0,0 +1,142 @@ +use observability_deps::tracing::error; +use serde::Serialize; +use std::{sync::Arc, time::Duration}; + +/// This store is responsible for holding all the stats which +/// will be sent in the background to the server. +pub struct TelemetryStore { + inner: parking_lot::Mutex, +} + +impl TelemetryStore { + pub async fn new( + instance_id: String, + os: String, + influx_version: String, + storage_type: String, + cores: u32, + ) -> Arc { + let inner = TelemetryStoreInner::new(instance_id, os, influx_version, storage_type, cores); + let store = Arc::new(TelemetryStore { + inner: parking_lot::Mutex::new(inner), + }); + send_telemetry_in_background(store.clone()).await; + store + } + + pub fn add_cpu_utilization(&self, value: u32) { + let mut inner_store = self.inner.lock(); + inner_store.cpu_utilization_percent = Some(value); + } + + pub fn snapshot(&self) -> ExternalTelemetry { + let inner_store = self.inner.lock(); + inner_store.snapshot() + } +} + +struct TelemetryStoreInner { + instance_id: String, + os: String, + influx_version: String, + storage_type: String, + cores: u32, + // just for explanation + cpu_utilization_percent: Option, +} + +impl TelemetryStoreInner { + pub fn new( + instance_id: String, + os: String, + influx_version: String, + storage_type: String, + cores: u32, + ) -> Self { + TelemetryStoreInner { + os, + instance_id, + influx_version, + storage_type, + cores, + cpu_utilization_percent: None, + } + } + + pub fn snapshot(&self) -> ExternalTelemetry { + ExternalTelemetry { + os: self.os.clone(), + version: self.influx_version.clone(), + instance_id: self.instance_id.clone(), + storage_type: self.storage_type.clone(), + cores: self.cores, + cpu_utilization_percent: self.cpu_utilization_percent, + } + } +} + +#[derive(Serialize)] +pub struct ExternalTelemetry { + pub os: String, + pub version: String, + pub storage_type: String, + pub instance_id: String, + pub cores: u32, + pub cpu_utilization_percent: Option, +} + +async fn send_telemetry_in_background(store: Arc) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + let client = reqwest::Client::new(); + // TODO: Pass in the duration rather than hardcode it to 1hr + let mut interval = tokio::time::interval(Duration::from_secs(60 * 60)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + interval.tick().await; + let telemetry = store.snapshot(); + let maybe_json = serde_json::to_vec(&telemetry); + match maybe_json { + Ok(json) => { + // TODO: wire it up to actual telemetry sender + let _res = client + .post("https://telemetry.influxdata.endpoint.com") + .body(json) + .send() + .await; + } + Err(e) => { + error!(error = ?e, "Cannot send telemetry"); + } + } + } + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test_log::test(tokio::test)] + async fn test_telemetry_handle_creation() { + // create store + let store: Arc = TelemetryStore::new( + "some-instance-id".to_owned(), + "Linux".to_owned(), + "OSS-v3.0".to_owned(), + "Memory".to_owned(), + 10, + ) + .await; + // check snapshot + let snapshot = store.snapshot(); + assert_eq!("some-instance-id", snapshot.instance_id); + + // add cpu utilization + store.add_cpu_utilization(89); + + // check snapshot again + let snapshot = store.snapshot(); + assert_eq!(Some(89), snapshot.cpu_utilization_percent); + } +}