feat: add tracing for tag_values
parent
6a6fbf73ae
commit
787a848bf5
|
@ -510,9 +510,14 @@ impl QueryChunk for DbChunk {
|
|||
|
||||
fn column_values(
|
||||
&self,
|
||||
mut ctx: IOxExecutionContext,
|
||||
column_name: &str,
|
||||
predicate: &Predicate,
|
||||
) -> Result<Option<StringSet>, Self::Error> {
|
||||
ctx.set_metadata("storage", self.state.state_name());
|
||||
ctx.set_metadata("column_name", column_name.to_string());
|
||||
ctx.set_metadata("predicate", format!("{}", &predicate));
|
||||
|
||||
match &self.state {
|
||||
State::MutableBuffer { .. } => {
|
||||
// There is no advantage to manually implementing this
|
||||
|
@ -527,6 +532,7 @@ impl QueryChunk for DbChunk {
|
|||
return Ok(None);
|
||||
}
|
||||
};
|
||||
ctx.set_metadata("rb_predicate", format!("{}", &rb_predicate));
|
||||
|
||||
self.access_recorder.record_access();
|
||||
let mut values = chunk
|
||||
|
@ -553,6 +559,7 @@ impl QueryChunk for DbChunk {
|
|||
column_name
|
||||
),
|
||||
})?;
|
||||
ctx.set_metadata("output_values", values.len() as i64);
|
||||
|
||||
Ok(Some(values))
|
||||
}
|
||||
|
@ -638,7 +645,7 @@ mod tests {
|
|||
|
||||
let t3 = time.inc(Duration::from_secs(1));
|
||||
let column_values = snapshot
|
||||
.column_values("tag", &Default::default())
|
||||
.column_values(IOxExecutionContext::default(), "tag", &Default::default())
|
||||
.unwrap()
|
||||
.is_some();
|
||||
let m5 = chunk.access_recorder().get_metrics();
|
||||
|
|
|
@ -99,7 +99,8 @@ impl Planner {
|
|||
D: QueryDatabase + 'static,
|
||||
{
|
||||
let tag_name = tag_name.into();
|
||||
let planner = InfluxRpcPlanner::default();
|
||||
let planner =
|
||||
InfluxRpcPlanner::new().with_execution_context(self.ctx.child_ctx("influxrpc_planner"));
|
||||
|
||||
self.ctx
|
||||
.run(async move {
|
||||
|
|
|
@ -205,6 +205,7 @@ impl QueryChunk for QueryableBatch {
|
|||
/// The requested columns must all have String type.
|
||||
fn column_values(
|
||||
&self,
|
||||
_ctx: IOxExecutionContext,
|
||||
_column_name: &str,
|
||||
_predicate: &Predicate,
|
||||
) -> Result<Option<StringSet>, Self::Error> {
|
||||
|
|
|
@ -444,7 +444,13 @@ impl InfluxRpcPlanner {
|
|||
.table_schema(table_name)
|
||||
.context(TableRemovedSnafu { table_name })?;
|
||||
|
||||
let plan = self.tag_keys_plan(table_name, schema, predicate, chunks)?;
|
||||
let plan = self.tag_keys_plan(
|
||||
ctx.child_ctx("tag_keys_plan"),
|
||||
table_name,
|
||||
schema,
|
||||
predicate,
|
||||
chunks,
|
||||
)?;
|
||||
|
||||
if let Some(plan) = plan {
|
||||
builder = builder.append_other(plan)
|
||||
|
@ -472,6 +478,7 @@ impl InfluxRpcPlanner {
|
|||
where
|
||||
D: QueryDatabase + 'static,
|
||||
{
|
||||
let ctx = self.ctx.child_ctx("tag_values planning");
|
||||
debug!(?rpc_predicate, tag_name, "planning tag_values");
|
||||
|
||||
// The basic algorithm is:
|
||||
|
@ -541,7 +548,11 @@ impl InfluxRpcPlanner {
|
|||
if !do_full_plan {
|
||||
// try and get the list of values directly from metadata
|
||||
let maybe_values = chunk
|
||||
.column_values(tag_name, predicate)
|
||||
.column_values(
|
||||
self.ctx.child_ctx("tag_values execution"),
|
||||
tag_name,
|
||||
predicate,
|
||||
)
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(FindingColumnValuesSnafu)?;
|
||||
|
||||
|
@ -591,8 +602,13 @@ impl InfluxRpcPlanner {
|
|||
.table_schema(table_name)
|
||||
.context(TableRemovedSnafu { table_name })?;
|
||||
|
||||
let scan_and_filter =
|
||||
self.scan_and_filter(table_name, schema, predicate, chunks)?;
|
||||
let scan_and_filter = self.scan_and_filter(
|
||||
ctx.child_ctx("scan_and_filter planning"),
|
||||
table_name,
|
||||
schema,
|
||||
predicate,
|
||||
chunks,
|
||||
)?;
|
||||
|
||||
// if we have any data to scan, make a plan!
|
||||
if let Some(TableScanAndFilter {
|
||||
|
@ -872,6 +888,7 @@ impl InfluxRpcPlanner {
|
|||
/// ```
|
||||
fn tag_keys_plan<C>(
|
||||
&self,
|
||||
ctx: IOxExecutionContext,
|
||||
table_name: &str,
|
||||
schema: Arc<Schema>,
|
||||
predicate: &Predicate,
|
||||
|
@ -880,7 +897,13 @@ impl InfluxRpcPlanner {
|
|||
where
|
||||
C: QueryChunk + 'static,
|
||||
{
|
||||
let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?;
|
||||
let scan_and_filter = self.scan_and_filter(
|
||||
ctx.child_ctx("scan_and_filter planning"),
|
||||
table_name,
|
||||
schema,
|
||||
predicate,
|
||||
chunks,
|
||||
)?;
|
||||
|
||||
let TableScanAndFilter {
|
||||
plan_builder,
|
||||
|
@ -948,7 +971,13 @@ impl InfluxRpcPlanner {
|
|||
where
|
||||
C: QueryChunk + 'static,
|
||||
{
|
||||
let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?;
|
||||
let scan_and_filter = self.scan_and_filter(
|
||||
self.ctx.child_ctx("scan_and_filter planning"),
|
||||
table_name,
|
||||
schema,
|
||||
predicate,
|
||||
chunks,
|
||||
)?;
|
||||
let TableScanAndFilter {
|
||||
plan_builder,
|
||||
schema,
|
||||
|
@ -1007,7 +1036,13 @@ impl InfluxRpcPlanner {
|
|||
C: QueryChunk + 'static,
|
||||
{
|
||||
debug!(%table_name, "Creating table_name full plan");
|
||||
let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?;
|
||||
let scan_and_filter = self.scan_and_filter(
|
||||
self.ctx.child_ctx("scan_and_filter planning"),
|
||||
table_name,
|
||||
schema,
|
||||
predicate,
|
||||
chunks,
|
||||
)?;
|
||||
let TableScanAndFilter {
|
||||
plan_builder,
|
||||
schema,
|
||||
|
@ -1054,7 +1089,13 @@ impl InfluxRpcPlanner {
|
|||
C: QueryChunk + 'static,
|
||||
{
|
||||
let table_name = table_name.as_ref();
|
||||
let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?;
|
||||
let scan_and_filter = self.scan_and_filter(
|
||||
self.ctx.child_ctx("scan_and_filter planning"),
|
||||
table_name,
|
||||
schema,
|
||||
predicate,
|
||||
chunks,
|
||||
)?;
|
||||
|
||||
let TableScanAndFilter {
|
||||
plan_builder,
|
||||
|
@ -1163,7 +1204,13 @@ impl InfluxRpcPlanner {
|
|||
where
|
||||
C: QueryChunk + 'static,
|
||||
{
|
||||
let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?;
|
||||
let scan_and_filter = self.scan_and_filter(
|
||||
self.ctx.child_ctx("scan_and_filter planning"),
|
||||
table_name,
|
||||
schema,
|
||||
predicate,
|
||||
chunks,
|
||||
)?;
|
||||
|
||||
let TableScanAndFilter {
|
||||
plan_builder,
|
||||
|
@ -1277,7 +1324,13 @@ impl InfluxRpcPlanner {
|
|||
C: QueryChunk + 'static,
|
||||
{
|
||||
let table_name = table_name.into();
|
||||
let scan_and_filter = self.scan_and_filter(&table_name, schema, predicate, chunks)?;
|
||||
let scan_and_filter = self.scan_and_filter(
|
||||
self.ctx.child_ctx("scan_and_filter planning"),
|
||||
&table_name,
|
||||
schema,
|
||||
predicate,
|
||||
chunks,
|
||||
)?;
|
||||
|
||||
let TableScanAndFilter {
|
||||
plan_builder,
|
||||
|
@ -1346,6 +1399,7 @@ impl InfluxRpcPlanner {
|
|||
/// ```
|
||||
fn scan_and_filter<C>(
|
||||
&self,
|
||||
ctx: IOxExecutionContext,
|
||||
table_name: &str,
|
||||
schema: Arc<Schema>,
|
||||
predicate: &Predicate,
|
||||
|
@ -1354,8 +1408,6 @@ impl InfluxRpcPlanner {
|
|||
where
|
||||
C: QueryChunk + 'static,
|
||||
{
|
||||
let ctx = self.ctx.child_ctx("scan_and_filter");
|
||||
|
||||
// Scan all columns to begin with (DataFusion projection
|
||||
// push-down optimization will prune out unneeded columns later)
|
||||
let projection = None;
|
||||
|
|
|
@ -203,6 +203,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync {
|
|||
/// The requested columns must all have String type.
|
||||
fn column_values(
|
||||
&self,
|
||||
ctx: IOxExecutionContext,
|
||||
column_name: &str,
|
||||
predicate: &Predicate,
|
||||
) -> Result<Option<StringSet>, Self::Error>;
|
||||
|
|
|
@ -952,6 +952,7 @@ impl QueryChunk for TestChunk {
|
|||
|
||||
fn column_values(
|
||||
&self,
|
||||
_ctx: IOxExecutionContext,
|
||||
_column_name: &str,
|
||||
_predicate: &Predicate,
|
||||
) -> Result<Option<StringSet>, Self::Error> {
|
||||
|
|
Loading…
Reference in New Issue