feat: allow telemetry endpoint to be passed in (#25475)
Allow the endpoint for telemetry to be passed in via the cli args, e.g ``` --telemetry-endpoint "https://somehost/test/" ``` and the actual endpoint always appends `v3` to it. So, above URL becomes "https://somehost/test/v3"pull/25123/merge
parent
10b6a2810d
commit
5473a9489b
|
@ -262,6 +262,15 @@ pub struct Config {
|
|||
action
|
||||
)]
|
||||
pub disable_parquet_mem_cache: bool,
|
||||
|
||||
/// telemetry server endpoint
|
||||
#[clap(
|
||||
long = "telemetry-endpoint",
|
||||
env = "INFLUXDB3_TELEMETRY_ENDPOINT",
|
||||
default_value = "localhost",
|
||||
action
|
||||
)]
|
||||
pub telemetry_endpoint: String,
|
||||
}
|
||||
|
||||
/// Specified size of the Parquet cache in megabytes (MB)
|
||||
|
@ -450,6 +459,7 @@ pub async fn command(config: Config) -> Result<()> {
|
|||
catalog.instance_id(),
|
||||
num_cpus,
|
||||
Arc::clone(&write_buffer_impl.persisted_files()),
|
||||
config.telemetry_endpoint,
|
||||
)
|
||||
.await;
|
||||
|
||||
|
@ -502,6 +512,7 @@ async fn setup_telemetry_store(
|
|||
instance_id: Arc<str>,
|
||||
num_cpus: usize,
|
||||
persisted_files: Arc<PersistedFiles>,
|
||||
telemetry_endpoint: String,
|
||||
) -> Arc<TelemetryStore> {
|
||||
let os = std::env::consts::OS;
|
||||
let influxdb_pkg_version = env!("CARGO_PKG_VERSION");
|
||||
|
@ -520,6 +531,7 @@ async fn setup_telemetry_store(
|
|||
Arc::from(storage_type),
|
||||
num_cpus,
|
||||
persisted_files,
|
||||
telemetry_endpoint,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
|
|
@ -14,13 +14,13 @@ pub(crate) struct TelemetrySender {
|
|||
|
||||
impl TelemetrySender {
|
||||
pub fn new(client: reqwest::Client, base_url: impl IntoUrl) -> Self {
|
||||
let base_url = base_url
|
||||
let base_url: Url = base_url
|
||||
.into_url()
|
||||
.expect("Cannot parse telemetry sender url");
|
||||
Self {
|
||||
client,
|
||||
full_url: base_url
|
||||
.join("/api/v3")
|
||||
.join("./v3")
|
||||
.expect("Cannot set the telemetry request path"),
|
||||
}
|
||||
}
|
||||
|
@ -34,7 +34,7 @@ impl TelemetrySender {
|
|||
.send()
|
||||
.await
|
||||
.map_err(TelemetryError::CannotSendToTelemetryServer)?;
|
||||
debug!("Successfully sent telemetry data to server");
|
||||
debug!(endpoint = ?self.full_url.as_str(), "Successfully sent telemetry data to server to");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -82,14 +82,12 @@ pub(crate) struct TelemetryPayload {
|
|||
/// This function runs in the background and if any call fails
|
||||
/// there is no retrying mechanism and it is ok to lose a few samples
|
||||
pub(crate) async fn send_telemetry_in_background(
|
||||
full_url: String,
|
||||
store: Arc<TelemetryStore>,
|
||||
duration_secs: Duration,
|
||||
) -> tokio::task::JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
let mut telem_sender = TelemetrySender::new(
|
||||
reqwest::Client::new(),
|
||||
"https://telemetry.influxdata.foo.com",
|
||||
);
|
||||
let mut telem_sender = TelemetrySender::new(reqwest::Client::new(), full_url);
|
||||
let mut interval = tokio::time::interval(duration_secs);
|
||||
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
|
||||
|
@ -125,7 +123,7 @@ mod tests {
|
|||
let client = reqwest::Client::new();
|
||||
let mut mock_server = Server::new_async().await;
|
||||
let mut sender = TelemetrySender::new(client, mock_server.url());
|
||||
let mock = mock_server.mock("POST", "/api/v3").create_async().await;
|
||||
let mock = mock_server.mock("POST", "/v3").create_async().await;
|
||||
let telem_payload = create_sample_payload();
|
||||
|
||||
let result = sender.try_sending(&telem_payload).await;
|
||||
|
@ -136,9 +134,9 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_url_join() {
|
||||
let url = Url::parse("https://foo.com/").unwrap();
|
||||
let new_url = url.join("/foo").unwrap();
|
||||
assert_eq!("https://foo.com/foo", new_url.as_str());
|
||||
let url = Url::parse("https://foo.com/boo/1.html").unwrap();
|
||||
let new_url = url.join("./foo").unwrap();
|
||||
assert_eq!("https://foo.com/boo/foo", new_url.as_str());
|
||||
}
|
||||
|
||||
fn create_sample_payload() -> TelemetryPayload {
|
||||
|
|
|
@ -40,6 +40,7 @@ impl TelemetryStore {
|
|||
storage_type: Arc<str>,
|
||||
cores: usize,
|
||||
persisted_files: Arc<dyn ParquetMetrics>,
|
||||
telemetry_endpoint: String,
|
||||
) -> Arc<Self> {
|
||||
debug!(
|
||||
instance_id = ?instance_id,
|
||||
|
@ -58,6 +59,7 @@ impl TelemetryStore {
|
|||
if !cfg!(test) {
|
||||
sample_metrics(store.clone(), Duration::from_secs(SAMPLER_INTERVAL_SECS)).await;
|
||||
send_telemetry_in_background(
|
||||
telemetry_endpoint,
|
||||
store.clone(),
|
||||
Duration::from_secs(MAIN_SENDER_INTERVAL_SECS),
|
||||
)
|
||||
|
@ -304,6 +306,7 @@ mod tests {
|
|||
Arc::from("Memory"),
|
||||
10,
|
||||
parqet_file_metrics,
|
||||
"http://localhost/telemetry".to_owned(),
|
||||
)
|
||||
.await;
|
||||
// check snapshot
|
||||
|
|
Loading…
Reference in New Issue