diff --git a/Cargo.lock b/Cargo.lock index 50e41537bc..bc0513c448 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2300,6 +2300,7 @@ dependencies = [ "client_util", "futures-util", "generated_types", + "observability_deps", "prost 0.11.0", "tonic", "workspace-hack", diff --git a/influxdb_storage_client/Cargo.toml b/influxdb_storage_client/Cargo.toml index 583437f404..e5f5b49b8e 100644 --- a/influxdb_storage_client/Cargo.toml +++ b/influxdb_storage_client/Cargo.toml @@ -6,10 +6,11 @@ edition = "2021" [dependencies] client_util = { path = "../client_util" } -generated_types = { path = "../generated_types", default-features=false } +generated_types = { path = "../generated_types", default-features=false, features=["data_types"] } prost = "0.11" tonic = { version = "0.8" } futures-util = { version = "0.3" } +observability_deps = { path = "../observability_deps"} workspace-hack = { path = "../workspace-hack"} [dev-dependencies] diff --git a/influxdb_storage_client/src/lib.rs b/influxdb_storage_client/src/lib.rs index d4dfffd84e..445622b248 100644 --- a/influxdb_storage_client/src/lib.rs +++ b/influxdb_storage_client/src/lib.rs @@ -21,6 +21,7 @@ use client_util::connection::GrpcConnection; use futures_util::TryStreamExt; use prost::Message; use std::collections::HashMap; +use std::fmt::Debug; /// Re-export generated_types pub mod generated_types { @@ -32,6 +33,7 @@ pub use client_util::connection; use self::connection::Connection; use self::generated_types::*; use ::generated_types::google::protobuf::*; +use observability_deps::tracing::{debug, trace}; use std::num::NonZeroU64; /// InfluxDB IOx deals with database names. The gRPC interface deals @@ -83,7 +85,11 @@ impl Client { /// Creates a new client with the provided connection pub fn new(connection: Connection) -> Self { Self { - inner: storage_client::StorageClient::new(connection.into_grpc_connection()), + inner: storage_client::StorageClient::new( + connection + .log_debug("creating with connection") + .into_grpc_connection(), + ), } } @@ -106,7 +112,12 @@ impl Client { /// return the capabilities of the server as a hash map pub async fn capabilities(&mut self) -> Result>, tonic::Status> { - let response = self.inner.capabilities(Empty {}).await?.into_inner(); + let response = self + .inner + .capabilities(Empty {}) + .await + .log_trace("capabilities response")? + .into_inner(); let CapabilitiesResponse { caps } = response; @@ -125,10 +136,12 @@ impl Client { &mut self, request: MeasurementNamesRequest, ) -> Result, tonic::Status> { + let request = request.log_trace("measurement_names request"); let responses = self .inner .measurement_names(request) - .await? + .await + .log_trace("measurement_names response")? .into_inner() .try_collect() .await?; @@ -142,10 +155,12 @@ impl Client { &mut self, request: ReadWindowAggregateRequest, ) -> Result, tonic::Status> { + let request = request.log_trace("read_window_aggregate request"); let responses: Vec<_> = self .inner .read_window_aggregate(request) - .await? + .await + .log_trace("read_window_aggregate response")? .into_inner() .try_collect() .await?; @@ -159,10 +174,12 @@ impl Client { &mut self, request: TagKeysRequest, ) -> Result, tonic::Status> { + let request = request.log_trace("tag_keys request"); let responses = self .inner .tag_keys(request) - .await? + .await + .log_trace("tag_keys response")? .into_inner() .try_collect() .await?; @@ -176,10 +193,12 @@ impl Client { &mut self, request: MeasurementTagKeysRequest, ) -> Result, tonic::Status> { + let request = request.log_trace("measurement_tag_keys request"); let responses = self .inner .measurement_tag_keys(request) - .await? + .await + .log_trace("measurement_tag_keys response")? .into_inner() .try_collect() .await?; @@ -193,10 +212,12 @@ impl Client { &mut self, request: TagValuesRequest, ) -> Result, tonic::Status> { + let request = request.log_trace("tag_values request"); let responses = self .inner .tag_values(request) - .await? + .await + .log_trace("tag_values response")? .into_inner() .try_collect() .await?; @@ -210,10 +231,12 @@ impl Client { &mut self, request: TagValuesGroupedByMeasurementAndTagKeyRequest, ) -> Result, tonic::Status> { + let request = request.log_trace("tag_values_grouped_by_measurement_and_tag_key request"); let responses: Vec<_> = self .inner .tag_values_grouped_by_measurement_and_tag_key(request) - .await? + .await + .log_trace("tag_values_grouped_by_measurement_and_tag_key response")? .into_inner() .try_collect() .await?; @@ -227,10 +250,12 @@ impl Client { &mut self, request: MeasurementTagValuesRequest, ) -> Result, tonic::Status> { + let request = request.log_trace("measurement_tag_values request"); let responses = self .inner .measurement_tag_values(request) - .await? + .await + .log_trace("measurement_tag_values response")? .into_inner() .try_collect() .await?; @@ -244,10 +269,12 @@ impl Client { &mut self, request: ReadFilterRequest, ) -> Result, tonic::Status> { + let request = request.log_trace("read_filter request"); let responses: Vec<_> = self .inner .read_filter(request) - .await? + .await + .log_trace("read_filter response")? .into_inner() .try_collect() .await?; @@ -261,10 +288,12 @@ impl Client { &mut self, request: ReadGroupRequest, ) -> Result, tonic::Status> { + let request = request.log_trace("read_group request"); let responses: Vec<_> = self .inner .read_group(request) - .await? + .await + .log_trace("read_group response")? .into_inner() .try_collect() .await?; @@ -278,12 +307,14 @@ impl Client { &mut self, request: MeasurementFieldsRequest, ) -> Result, tonic::Status> { + let request = request.log_trace("measurement_fields request"); let measurement_fields_response = self.inner.measurement_fields(request).await?; let responses: Vec<_> = measurement_fields_response .into_inner() .try_collect::>() - .await? + .await + .log_trace("measurement_fields response")? .into_iter() .flat_map(|r| r.fields) .map(|message_field| { @@ -332,3 +363,21 @@ pub fn tag_key_bytes_to_strings(bytes: Vec) -> String { _ => String::from_utf8(bytes).expect("string value response was not utf8"), } } + +/// Logs the specific item +trait Loggable { + fn log_trace(self, msg: &'static str) -> Self; + fn log_debug(self, msg: &'static str) -> Self; +} + +impl Loggable for T { + fn log_trace(self, msg: &'static str) -> Self { + trace!(data=?self, "{}", msg); + self + } + + fn log_debug(self, msg: &'static str) -> Self { + debug!(data=?self, "{}", msg); + self + } +}