From 5473a9489b939f7e6b6005505960effea8a707aa Mon Sep 17 00:00:00 2001 From: praveen-influx Date: Fri, 18 Oct 2024 14:34:27 +0100 Subject: [PATCH] feat: allow telemetry endpoint to be passed in (#25475) Allow the endpoint for telemetry to be passed in via the cli args, e.g ``` --telemetry-endpoint "https://somehost/test/" ``` and the actual endpoint always appends `v3` to it. So, above URL becomes "https://somehost/test/v3" --- influxdb3/src/commands/serve.rs | 12 ++++++++++++ influxdb3_telemetry/src/sender.rs | 20 +++++++++----------- influxdb3_telemetry/src/store.rs | 3 +++ 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index 494f504101..f42dbe05cf 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -262,6 +262,15 @@ pub struct Config { action )] pub disable_parquet_mem_cache: bool, + + /// telemetry server endpoint + #[clap( + long = "telemetry-endpoint", + env = "INFLUXDB3_TELEMETRY_ENDPOINT", + default_value = "localhost", + action + )] + pub telemetry_endpoint: String, } /// Specified size of the Parquet cache in megabytes (MB) @@ -450,6 +459,7 @@ pub async fn command(config: Config) -> Result<()> { catalog.instance_id(), num_cpus, Arc::clone(&write_buffer_impl.persisted_files()), + config.telemetry_endpoint, ) .await; @@ -502,6 +512,7 @@ async fn setup_telemetry_store( instance_id: Arc, num_cpus: usize, persisted_files: Arc, + telemetry_endpoint: String, ) -> Arc { let os = std::env::consts::OS; let influxdb_pkg_version = env!("CARGO_PKG_VERSION"); @@ -520,6 +531,7 @@ async fn setup_telemetry_store( Arc::from(storage_type), num_cpus, persisted_files, + telemetry_endpoint, ) .await } diff --git a/influxdb3_telemetry/src/sender.rs b/influxdb3_telemetry/src/sender.rs index 8f80221c6c..3f3479e980 100644 --- a/influxdb3_telemetry/src/sender.rs +++ b/influxdb3_telemetry/src/sender.rs @@ -14,13 +14,13 @@ pub(crate) struct TelemetrySender { impl TelemetrySender { pub fn new(client: reqwest::Client, base_url: impl IntoUrl) -> Self { - let base_url = base_url + let base_url: Url = base_url .into_url() .expect("Cannot parse telemetry sender url"); Self { client, full_url: base_url - .join("/api/v3") + .join("./v3") .expect("Cannot set the telemetry request path"), } } @@ -34,7 +34,7 @@ impl TelemetrySender { .send() .await .map_err(TelemetryError::CannotSendToTelemetryServer)?; - debug!("Successfully sent telemetry data to server"); + debug!(endpoint = ?self.full_url.as_str(), "Successfully sent telemetry data to server to"); Ok(()) } } @@ -82,14 +82,12 @@ pub(crate) struct TelemetryPayload { /// This function runs in the background and if any call fails /// there is no retrying mechanism and it is ok to lose a few samples pub(crate) async fn send_telemetry_in_background( + full_url: String, store: Arc, duration_secs: Duration, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { - let mut telem_sender = TelemetrySender::new( - reqwest::Client::new(), - "https://telemetry.influxdata.foo.com", - ); + let mut telem_sender = TelemetrySender::new(reqwest::Client::new(), full_url); let mut interval = tokio::time::interval(duration_secs); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -125,7 +123,7 @@ mod tests { let client = reqwest::Client::new(); let mut mock_server = Server::new_async().await; let mut sender = TelemetrySender::new(client, mock_server.url()); - let mock = mock_server.mock("POST", "/api/v3").create_async().await; + let mock = mock_server.mock("POST", "/v3").create_async().await; let telem_payload = create_sample_payload(); let result = sender.try_sending(&telem_payload).await; @@ -136,9 +134,9 @@ mod tests { #[test] fn test_url_join() { - let url = Url::parse("https://foo.com/").unwrap(); - let new_url = url.join("/foo").unwrap(); - assert_eq!("https://foo.com/foo", new_url.as_str()); + let url = Url::parse("https://foo.com/boo/1.html").unwrap(); + let new_url = url.join("./foo").unwrap(); + assert_eq!("https://foo.com/boo/foo", new_url.as_str()); } fn create_sample_payload() -> TelemetryPayload { diff --git a/influxdb3_telemetry/src/store.rs b/influxdb3_telemetry/src/store.rs index 733b8f2d6a..b8ec6fb3f5 100644 --- a/influxdb3_telemetry/src/store.rs +++ b/influxdb3_telemetry/src/store.rs @@ -40,6 +40,7 @@ impl TelemetryStore { storage_type: Arc, cores: usize, persisted_files: Arc, + telemetry_endpoint: String, ) -> Arc { debug!( instance_id = ?instance_id, @@ -58,6 +59,7 @@ impl TelemetryStore { if !cfg!(test) { sample_metrics(store.clone(), Duration::from_secs(SAMPLER_INTERVAL_SECS)).await; send_telemetry_in_background( + telemetry_endpoint, store.clone(), Duration::from_secs(MAIN_SENDER_INTERVAL_SECS), ) @@ -304,6 +306,7 @@ mod tests { Arc::from("Memory"), 10, parqet_file_metrics, + "http://localhost/telemetry".to_owned(), ) .await; // check snapshot