feat: Add debug/tracing logging into the influxrpc_storage_client (#5916)
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
fa220589bd
commit
5a63cf1f33
|
|
@ -2300,6 +2300,7 @@ dependencies = [
|
|||
"client_util",
|
||||
"futures-util",
|
||||
"generated_types",
|
||||
"observability_deps",
|
||||
"prost 0.11.0",
|
||||
"tonic",
|
||||
"workspace-hack",
|
||||
|
|
|
|||
|
|
@ -6,10 +6,11 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
client_util = { path = "../client_util" }
|
||||
generated_types = { path = "../generated_types", default-features=false }
|
||||
generated_types = { path = "../generated_types", default-features=false, features=["data_types"] }
|
||||
prost = "0.11"
|
||||
tonic = { version = "0.8" }
|
||||
futures-util = { version = "0.3" }
|
||||
observability_deps = { path = "../observability_deps"}
|
||||
workspace-hack = { path = "../workspace-hack"}
|
||||
|
||||
[dev-dependencies]
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ use client_util::connection::GrpcConnection;
|
|||
use futures_util::TryStreamExt;
|
||||
use prost::Message;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Debug;
|
||||
|
||||
/// Re-export generated_types
|
||||
pub mod generated_types {
|
||||
|
|
@ -32,6 +33,7 @@ pub use client_util::connection;
|
|||
use self::connection::Connection;
|
||||
use self::generated_types::*;
|
||||
use ::generated_types::google::protobuf::*;
|
||||
use observability_deps::tracing::{debug, trace};
|
||||
use std::num::NonZeroU64;
|
||||
|
||||
/// InfluxDB IOx deals with database names. The gRPC interface deals
|
||||
|
|
@ -83,7 +85,11 @@ impl Client {
|
|||
/// Creates a new client with the provided connection
|
||||
pub fn new(connection: Connection) -> Self {
|
||||
Self {
|
||||
inner: storage_client::StorageClient::new(connection.into_grpc_connection()),
|
||||
inner: storage_client::StorageClient::new(
|
||||
connection
|
||||
.log_debug("creating with connection")
|
||||
.into_grpc_connection(),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -106,7 +112,12 @@ impl Client {
|
|||
|
||||
/// return the capabilities of the server as a hash map
|
||||
pub async fn capabilities(&mut self) -> Result<HashMap<String, Vec<String>>, tonic::Status> {
|
||||
let response = self.inner.capabilities(Empty {}).await?.into_inner();
|
||||
let response = self
|
||||
.inner
|
||||
.capabilities(Empty {})
|
||||
.await
|
||||
.log_trace("capabilities response")?
|
||||
.into_inner();
|
||||
|
||||
let CapabilitiesResponse { caps } = response;
|
||||
|
||||
|
|
@ -125,10 +136,12 @@ impl Client {
|
|||
&mut self,
|
||||
request: MeasurementNamesRequest,
|
||||
) -> Result<Vec<String>, tonic::Status> {
|
||||
let request = request.log_trace("measurement_names request");
|
||||
let responses = self
|
||||
.inner
|
||||
.measurement_names(request)
|
||||
.await?
|
||||
.await
|
||||
.log_trace("measurement_names response")?
|
||||
.into_inner()
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
|
@ -142,10 +155,12 @@ impl Client {
|
|||
&mut self,
|
||||
request: ReadWindowAggregateRequest,
|
||||
) -> Result<Vec<read_response::frame::Data>, tonic::Status> {
|
||||
let request = request.log_trace("read_window_aggregate request");
|
||||
let responses: Vec<_> = self
|
||||
.inner
|
||||
.read_window_aggregate(request)
|
||||
.await?
|
||||
.await
|
||||
.log_trace("read_window_aggregate response")?
|
||||
.into_inner()
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
|
@ -159,10 +174,12 @@ impl Client {
|
|||
&mut self,
|
||||
request: TagKeysRequest,
|
||||
) -> Result<Vec<String>, tonic::Status> {
|
||||
let request = request.log_trace("tag_keys request");
|
||||
let responses = self
|
||||
.inner
|
||||
.tag_keys(request)
|
||||
.await?
|
||||
.await
|
||||
.log_trace("tag_keys response")?
|
||||
.into_inner()
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
|
@ -176,10 +193,12 @@ impl Client {
|
|||
&mut self,
|
||||
request: MeasurementTagKeysRequest,
|
||||
) -> Result<Vec<String>, tonic::Status> {
|
||||
let request = request.log_trace("measurement_tag_keys request");
|
||||
let responses = self
|
||||
.inner
|
||||
.measurement_tag_keys(request)
|
||||
.await?
|
||||
.await
|
||||
.log_trace("measurement_tag_keys response")?
|
||||
.into_inner()
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
|
@ -193,10 +212,12 @@ impl Client {
|
|||
&mut self,
|
||||
request: TagValuesRequest,
|
||||
) -> Result<Vec<String>, tonic::Status> {
|
||||
let request = request.log_trace("tag_values request");
|
||||
let responses = self
|
||||
.inner
|
||||
.tag_values(request)
|
||||
.await?
|
||||
.await
|
||||
.log_trace("tag_values response")?
|
||||
.into_inner()
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
|
@ -210,10 +231,12 @@ impl Client {
|
|||
&mut self,
|
||||
request: TagValuesGroupedByMeasurementAndTagKeyRequest,
|
||||
) -> Result<Vec<TagValuesResponse>, tonic::Status> {
|
||||
let request = request.log_trace("tag_values_grouped_by_measurement_and_tag_key request");
|
||||
let responses: Vec<_> = self
|
||||
.inner
|
||||
.tag_values_grouped_by_measurement_and_tag_key(request)
|
||||
.await?
|
||||
.await
|
||||
.log_trace("tag_values_grouped_by_measurement_and_tag_key response")?
|
||||
.into_inner()
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
|
@ -227,10 +250,12 @@ impl Client {
|
|||
&mut self,
|
||||
request: MeasurementTagValuesRequest,
|
||||
) -> Result<Vec<String>, tonic::Status> {
|
||||
let request = request.log_trace("measurement_tag_values request");
|
||||
let responses = self
|
||||
.inner
|
||||
.measurement_tag_values(request)
|
||||
.await?
|
||||
.await
|
||||
.log_trace("measurement_tag_values response")?
|
||||
.into_inner()
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
|
@ -244,10 +269,12 @@ impl Client {
|
|||
&mut self,
|
||||
request: ReadFilterRequest,
|
||||
) -> Result<Vec<read_response::frame::Data>, tonic::Status> {
|
||||
let request = request.log_trace("read_filter request");
|
||||
let responses: Vec<_> = self
|
||||
.inner
|
||||
.read_filter(request)
|
||||
.await?
|
||||
.await
|
||||
.log_trace("read_filter response")?
|
||||
.into_inner()
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
|
@ -261,10 +288,12 @@ impl Client {
|
|||
&mut self,
|
||||
request: ReadGroupRequest,
|
||||
) -> Result<Vec<read_response::frame::Data>, tonic::Status> {
|
||||
let request = request.log_trace("read_group request");
|
||||
let responses: Vec<_> = self
|
||||
.inner
|
||||
.read_group(request)
|
||||
.await?
|
||||
.await
|
||||
.log_trace("read_group response")?
|
||||
.into_inner()
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
|
@ -278,12 +307,14 @@ impl Client {
|
|||
&mut self,
|
||||
request: MeasurementFieldsRequest,
|
||||
) -> Result<Vec<String>, tonic::Status> {
|
||||
let request = request.log_trace("measurement_fields request");
|
||||
let measurement_fields_response = self.inner.measurement_fields(request).await?;
|
||||
|
||||
let responses: Vec<_> = measurement_fields_response
|
||||
.into_inner()
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?
|
||||
.await
|
||||
.log_trace("measurement_fields response")?
|
||||
.into_iter()
|
||||
.flat_map(|r| r.fields)
|
||||
.map(|message_field| {
|
||||
|
|
@ -332,3 +363,21 @@ pub fn tag_key_bytes_to_strings(bytes: Vec<u8>) -> String {
|
|||
_ => String::from_utf8(bytes).expect("string value response was not utf8"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Logs the specific item
|
||||
trait Loggable {
|
||||
fn log_trace(self, msg: &'static str) -> Self;
|
||||
fn log_debug(self, msg: &'static str) -> Self;
|
||||
}
|
||||
|
||||
impl<T: Debug> Loggable for T {
|
||||
fn log_trace(self, msg: &'static str) -> Self {
|
||||
trace!(data=?self, "{}", msg);
|
||||
self
|
||||
}
|
||||
|
||||
fn log_debug(self, msg: &'static str) -> Self {
|
||||
debug!(data=?self, "{}", msg);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue