feat: Log gRPC predicates in a human readable form (#507)
* feat: log predicate passed to storage system * fix: clippypull/24376/head
parent
60f933c3f7
commit
a2e5af1508
|
@ -4,7 +4,7 @@
|
|||
//! RPCPredicate --> query::Predicates
|
||||
//!
|
||||
//! Aggregates / windows --> query::GroupByAndAggregate
|
||||
use std::convert::TryFrom;
|
||||
use std::{convert::TryFrom, fmt};
|
||||
|
||||
use arrow_deps::datafusion::{
|
||||
logical_plan::{binary_expr, Expr, Operator},
|
||||
|
@ -615,6 +615,151 @@ pub fn convert_group_type(group: i32) -> Result<RPCGroup> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Creates a representation of some struct (in another crate that we
|
||||
/// don't control) suitable for logging with `std::fmt::Display`)
|
||||
pub trait Loggable<'a> {
|
||||
fn loggable(&'a self) -> Box<dyn fmt::Display + 'a>;
|
||||
}
|
||||
|
||||
impl<'a> Loggable<'a> for Option<RPCPredicate> {
|
||||
fn loggable(&'a self) -> Box<dyn fmt::Display + 'a> {
|
||||
Box::new(displayable_predicate(self.as_ref()))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Loggable<'a> for RPCPredicate {
|
||||
fn loggable(&'a self) -> Box<dyn fmt::Display + 'a> {
|
||||
Box::new(displayable_predicate(Some(self)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a struct that can format gRPC predicate (aka `RPCPredicates`) for Display
|
||||
///
|
||||
/// For example:
|
||||
/// let pred = RPCPredicate (...);
|
||||
/// println!("The predicate is {:?}", loggable_predicate(pred));
|
||||
///
|
||||
pub fn displayable_predicate(pred: Option<&RPCPredicate>) -> impl fmt::Display + '_ {
|
||||
struct Wrapper<'a>(Option<&'a RPCPredicate>);
|
||||
|
||||
impl<'a> fmt::Display for Wrapper<'a> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self.0 {
|
||||
None => write!(f, "<NONE>"),
|
||||
Some(pred) => format_predicate(pred, f),
|
||||
}
|
||||
}
|
||||
}
|
||||
Wrapper(pred)
|
||||
}
|
||||
|
||||
fn format_predicate<'a>(pred: &'a RPCPredicate, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match &pred.root {
|
||||
Some(r) => format_node(r, f),
|
||||
None => write!(f, "root: <NONE>"),
|
||||
}
|
||||
}
|
||||
|
||||
fn format_node<'a>(node: &'a RPCNode, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
let value = match &node.value {
|
||||
None => {
|
||||
write!(f, "node: NONE")?;
|
||||
return Ok(());
|
||||
}
|
||||
Some(value) => value,
|
||||
};
|
||||
|
||||
match node.children.len() {
|
||||
0 => {
|
||||
format_value(value, f)?;
|
||||
}
|
||||
// print using infix notation
|
||||
// (child0 <op> child1)
|
||||
2 => {
|
||||
write!(f, "(")?;
|
||||
format_node(&node.children[0], f)?;
|
||||
write!(f, " ")?;
|
||||
format_value(value, f)?;
|
||||
write!(f, " ")?;
|
||||
format_node(&node.children[1], f)?;
|
||||
write!(f, ")")?;
|
||||
}
|
||||
// print func notation
|
||||
// <op>(child0, chold1, ...)
|
||||
_ => {
|
||||
format_value(value, f)?;
|
||||
write!(f, "(")?;
|
||||
for (i, child) in node.children.iter().enumerate() {
|
||||
if i > 0 {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
format_node(child, f)?;
|
||||
}
|
||||
write!(f, ")")?;
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn format_value<'a>(value: &'a RPCValue, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
use RPCValue::*;
|
||||
match value {
|
||||
StringValue(s) => write!(f, "\"{}\"", s),
|
||||
BoolValue(b) => write!(f, "{}", b),
|
||||
IntValue(i) => write!(f, "{}", i),
|
||||
UintValue(u) => write!(f, "{}", u),
|
||||
FloatValue(fval) => write!(f, "{}", fval),
|
||||
RegexValue(r) => write!(f, "RegEx:{}", r),
|
||||
TagRefValue(bytes) => {
|
||||
let temp = String::from_utf8_lossy(bytes);
|
||||
let sval = match *bytes.as_slice() {
|
||||
[0] => "_m[0x00]",
|
||||
[255] => "_f[0xff]",
|
||||
_ => &temp,
|
||||
};
|
||||
write!(f, "TagRef:{}", sval)
|
||||
}
|
||||
FieldRefValue(d) => write!(f, "FieldRef:{}", d),
|
||||
Logical(v) => format_logical(*v, f),
|
||||
Comparison(v) => format_comparison(*v, f),
|
||||
}
|
||||
}
|
||||
|
||||
fn format_logical(v: i32, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
if v == RPCLogical::And as i32 {
|
||||
write!(f, "AND")
|
||||
} else if v == RPCLogical::Or as i32 {
|
||||
write!(f, "Or")
|
||||
} else {
|
||||
write!(f, "UNKNOWN_LOGICAL:{}", v)
|
||||
}
|
||||
}
|
||||
|
||||
fn format_comparison(v: i32, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
if v == RPCComparison::Equal as i32 {
|
||||
write!(f, "==")
|
||||
} else if v == RPCComparison::NotEqual as i32 {
|
||||
write!(f, "!=")
|
||||
} else if v == RPCComparison::StartsWith as i32 {
|
||||
write!(f, "StartsWith")
|
||||
} else if v == RPCComparison::Regex as i32 {
|
||||
write!(f, "RegEx")
|
||||
} else if v == RPCComparison::NotRegex as i32 {
|
||||
write!(f, "NotRegex")
|
||||
} else if v == RPCComparison::Lt as i32 {
|
||||
write!(f, "<")
|
||||
} else if v == RPCComparison::Lte as i32 {
|
||||
write!(f, "<=")
|
||||
} else if v == RPCComparison::Gt as i32 {
|
||||
write!(f, ">")
|
||||
} else if v == RPCComparison::Gte as i32 {
|
||||
write!(f, ">=")
|
||||
} else {
|
||||
write!(f, "UNKNOWN_COMPARISON:{}", v)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use generated_types::node::Type as RPCNodeType;
|
||||
|
@ -1206,4 +1351,74 @@ mod tests {
|
|||
offset: offset.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_displayable_predicate_none() {
|
||||
let rpc_pred = None;
|
||||
|
||||
assert_eq!(
|
||||
"<NONE>",
|
||||
format!("{}", displayable_predicate(rpc_pred.as_ref()))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_displayable_predicate_root_none() {
|
||||
let rpc_pred = Some(RPCPredicate { root: None });
|
||||
|
||||
assert_eq!(
|
||||
"root: <NONE>",
|
||||
format!("{}", displayable_predicate(rpc_pred.as_ref()))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_displayable_predicate_two_args() {
|
||||
let (comparison, _) = make_host_comparison();
|
||||
let rpc_pred = Some(RPCPredicate {
|
||||
root: Some(comparison),
|
||||
});
|
||||
assert_eq!(
|
||||
"(FieldRef:host > 5)",
|
||||
format!("{}", displayable_predicate(rpc_pred.as_ref()))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_displayable_predicate_three_args() {
|
||||
// Make one with more than two children (not sure if this ever happens)
|
||||
let node = RPCNode {
|
||||
node_type: RPCNodeType::LogicalExpression as i32,
|
||||
children: vec![
|
||||
make_tag_ref_node(b"tag1", "val1"),
|
||||
make_tag_ref_node(b"tag2", "val2"),
|
||||
make_tag_ref_node(b"tag3", "val3"),
|
||||
],
|
||||
value: Some(RPCValue::Logical(RPCLogical::And as i32)),
|
||||
};
|
||||
let rpc_pred = Some(RPCPredicate { root: Some(node) });
|
||||
assert_eq!(
|
||||
"AND((TagRef:tag1 == \"val1\"), (TagRef:tag2 == \"val2\"), (TagRef:tag3 == \"val3\"))",
|
||||
format!("{}", displayable_predicate(rpc_pred.as_ref()))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_displayable_predicate_mesurement_and_field() {
|
||||
// Make one with more than two children (not sure if this ever happens)
|
||||
let node = RPCNode {
|
||||
node_type: RPCNodeType::LogicalExpression as i32,
|
||||
children: vec![
|
||||
make_tag_ref_node(&[0], "val1"),
|
||||
make_tag_ref_node(b"tag2", "val2"),
|
||||
make_tag_ref_node(&[255], "val3"),
|
||||
],
|
||||
value: Some(RPCValue::Logical(RPCLogical::And as i32)),
|
||||
};
|
||||
let rpc_pred = Some(RPCPredicate { root: Some(node) });
|
||||
assert_eq!(
|
||||
"AND((TagRef:_m[0x00] == \"val1\"), (TagRef:tag2 == \"val2\"), (TagRef:_f[0xff] == \"val3\"))",
|
||||
format!("{}", displayable_predicate(rpc_pred.as_ref()))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ use data_types::error::ErrorLogger;
|
|||
use generated_types::{node, Node};
|
||||
use query::group_by::GroupByAndAggregate;
|
||||
|
||||
use crate::server::rpc::expr::{self, AddRPCNode, SpecialTagKeys};
|
||||
use crate::server::rpc::expr::{self, AddRPCNode, Loggable, SpecialTagKeys};
|
||||
use crate::server::rpc::input::GrpcInputs;
|
||||
|
||||
use query::{
|
||||
|
@ -288,7 +288,12 @@ where
|
|||
predicate,
|
||||
} = read_filter_request;
|
||||
|
||||
info!("read_filter for database {}, range: {:?}", db_name, range);
|
||||
info!(
|
||||
"read_filter for database {}, range: {:?}, predicate: {}",
|
||||
db_name,
|
||||
range,
|
||||
predicate.loggable()
|
||||
);
|
||||
|
||||
read_filter_impl(
|
||||
tx.clone(),
|
||||
|
@ -327,8 +332,9 @@ where
|
|||
} = read_group_request;
|
||||
|
||||
info!(
|
||||
"read_group for database {}, range: {:?}, group_keys: {:?}, group: {:?}, aggregate: {:?}",
|
||||
db_name, range, group_keys, group, aggregate
|
||||
"read_group for database {}, range: {:?}, group_keys: {:?}, group: {:?}, aggregate: {:?}, predicate: {}",
|
||||
db_name, range, group_keys, group, aggregate,
|
||||
predicate.loggable()
|
||||
);
|
||||
|
||||
warn!("read_group implementation not yet complete: https://github.com/influxdata/influxdb_iox/issues/448");
|
||||
|
@ -383,8 +389,9 @@ where
|
|||
} = read_window_aggregate_request;
|
||||
|
||||
info!(
|
||||
"read_window_aggregate for database {}, range: {:?}, window_every: {:?}, offset: {:?}, aggregate: {:?}, window: {:?}",
|
||||
db_name, range, window_every, offset, aggregate, window
|
||||
"read_window_aggregate for database {}, range: {:?}, window_every: {:?}, offset: {:?}, aggregate: {:?}, window: {:?}, predicate: {}",
|
||||
db_name, range, window_every, offset, aggregate, window,
|
||||
predicate.loggable()
|
||||
);
|
||||
|
||||
let aggregate_string = format!(
|
||||
|
@ -428,7 +435,12 @@ where
|
|||
predicate,
|
||||
} = tag_keys_request;
|
||||
|
||||
info!("tag_keys for database {}, range: {:?}", db_name, range);
|
||||
info!(
|
||||
"tag_keys for database {}, range: {:?}, predicate: {}",
|
||||
db_name,
|
||||
range,
|
||||
predicate.loggable()
|
||||
);
|
||||
|
||||
let measurement = None;
|
||||
|
||||
|
@ -474,8 +486,9 @@ where
|
|||
// Special case a request for 'tag_key=_measurement" means to list all measurements
|
||||
let response = if tag_key.is_measurement() {
|
||||
info!(
|
||||
"tag_values with tag_key=[x00] for database {}, range: {:?} --> returning measurement_names",
|
||||
db_name, range
|
||||
"tag_values with tag_key=[x00] for database {}, range: {:?}, predicate: {} --> returning measurement_names",
|
||||
db_name, range,
|
||||
predicate.loggable()
|
||||
);
|
||||
|
||||
if predicate.is_some() {
|
||||
|
@ -486,8 +499,11 @@ where
|
|||
.await
|
||||
} else {
|
||||
info!(
|
||||
"tag_values for database {}, range: {:?}, tag_key: {}",
|
||||
db_name, range, tag_key
|
||||
"tag_values for database {}, range: {:?}, tag_key: {}, predicate: {}",
|
||||
db_name,
|
||||
range,
|
||||
tag_key,
|
||||
predicate.loggable()
|
||||
);
|
||||
|
||||
tag_values_impl(
|
||||
|
@ -583,8 +599,10 @@ where
|
|||
}
|
||||
|
||||
info!(
|
||||
"measurement_names for database {}, range: {:?}",
|
||||
db_name, range
|
||||
"measurement_names for database {}, range: {:?}, predicate: {}",
|
||||
db_name,
|
||||
range,
|
||||
predicate.loggable()
|
||||
);
|
||||
|
||||
let response =
|
||||
|
@ -619,8 +637,11 @@ where
|
|||
} = measurement_tag_keys_request;
|
||||
|
||||
info!(
|
||||
"measurement_tag_keys for database {}, range: {:?}, measurement: {}",
|
||||
db_name, range, measurement
|
||||
"measurement_tag_keys for database {}, range: {:?}, measurement: {}, predicate: {}",
|
||||
db_name,
|
||||
range,
|
||||
measurement,
|
||||
predicate.loggable()
|
||||
);
|
||||
|
||||
let measurement = Some(measurement);
|
||||
|
@ -664,8 +685,9 @@ where
|
|||
} = measurement_tag_values_request;
|
||||
|
||||
info!(
|
||||
"measurement_tag_values for database {}, range: {:?}, measurement: {}, tag_key: {}",
|
||||
db_name, range, measurement, tag_key
|
||||
"measurement_tag_values for database {}, range: {:?}, measurement: {}, tag_key: {}, predicate: {}",
|
||||
db_name, range, measurement, tag_key,
|
||||
predicate.loggable()
|
||||
);
|
||||
|
||||
let measurement = Some(measurement);
|
||||
|
@ -709,8 +731,10 @@ where
|
|||
} = measurement_fields_request;
|
||||
|
||||
info!(
|
||||
"measurement_fields for database {}, range: {:?}",
|
||||
db_name, range
|
||||
"measurement_fields for database {}, range: {:?}, predicate: {}",
|
||||
db_name,
|
||||
range,
|
||||
predicate.loggable()
|
||||
);
|
||||
|
||||
let measurement = measurement;
|
||||
|
|
Loading…
Reference in New Issue