feat: add tracing support tag_keys
parent
998e205c2c
commit
6a6fbf73ae
|
@ -457,9 +457,14 @@ impl QueryChunk for DbChunk {
|
|||
|
||||
fn column_names(
|
||||
&self,
|
||||
mut ctx: IOxExecutionContext,
|
||||
predicate: &Predicate,
|
||||
columns: Selection<'_>,
|
||||
) -> Result<Option<StringSet>, Self::Error> {
|
||||
ctx.set_metadata("storage", self.state.state_name());
|
||||
ctx.set_metadata("projection", format!("{}", columns));
|
||||
ctx.set_metadata("predicate", format!("{}", &predicate));
|
||||
|
||||
match &self.state {
|
||||
State::MutableBuffer { chunk, .. } => {
|
||||
if !predicate.is_empty() {
|
||||
|
@ -477,17 +482,20 @@ impl QueryChunk for DbChunk {
|
|||
return Ok(None);
|
||||
}
|
||||
};
|
||||
ctx.set_metadata("rb_predicate", format!("{}", &rb_predicate));
|
||||
|
||||
self.access_recorder.record_access();
|
||||
// TODO(edd): wire up delete predicates to be pushed down to
|
||||
// the read buffer.
|
||||
Ok(Some(
|
||||
chunk
|
||||
.column_names(rb_predicate, vec![], columns, BTreeSet::new())
|
||||
.context(ReadBufferChunkSnafu {
|
||||
chunk_id: self.id(),
|
||||
})?,
|
||||
))
|
||||
|
||||
let names = chunk
|
||||
.column_names(rb_predicate, vec![], columns, BTreeSet::new())
|
||||
.context(ReadBufferChunkSnafu {
|
||||
chunk_id: self.id(),
|
||||
})?;
|
||||
ctx.set_metadata("output_values", names.len() as i64);
|
||||
|
||||
Ok(Some(names))
|
||||
}
|
||||
State::ParquetFile { chunk, .. } => {
|
||||
if !predicate.is_empty() {
|
||||
|
@ -619,7 +627,11 @@ mod tests {
|
|||
|
||||
let t2 = time.inc(Duration::from_secs(1));
|
||||
let column_names = snapshot
|
||||
.column_names(&Default::default(), Selection::All)
|
||||
.column_names(
|
||||
IOxExecutionContext::default(),
|
||||
&Default::default(),
|
||||
Selection::All,
|
||||
)
|
||||
.unwrap()
|
||||
.is_some();
|
||||
let m4 = chunk.access_recorder().get_metrics();
|
||||
|
|
|
@ -75,7 +75,8 @@ impl Planner {
|
|||
where
|
||||
D: QueryDatabase + 'static,
|
||||
{
|
||||
let planner = InfluxRpcPlanner::default();
|
||||
let planner =
|
||||
InfluxRpcPlanner::new().with_execution_context(self.ctx.child_ctx("influxrpc_planner"));
|
||||
|
||||
self.ctx
|
||||
.run(async move {
|
||||
|
|
|
@ -191,6 +191,7 @@ impl QueryChunk for QueryableBatch {
|
|||
/// this Chunk. Returns `None` otherwise
|
||||
fn column_names(
|
||||
&self,
|
||||
_ctx: IOxExecutionContext,
|
||||
_predicate: &Predicate,
|
||||
_columns: Selection<'_>,
|
||||
) -> Result<Option<StringSet>, Self::Error> {
|
||||
|
|
|
@ -333,6 +333,7 @@ impl InfluxRpcPlanner {
|
|||
where
|
||||
D: QueryDatabase + 'static,
|
||||
{
|
||||
let ctx = self.ctx.child_ctx("tag_keys planning");
|
||||
debug!(?rpc_predicate, "planning tag_keys");
|
||||
|
||||
// Special case predicates that span the entire valid timestamp range
|
||||
|
@ -385,7 +386,11 @@ impl InfluxRpcPlanner {
|
|||
if !do_full_plan {
|
||||
// filter the columns further from the predicate
|
||||
let maybe_names = chunk
|
||||
.column_names(predicate, selection)
|
||||
.column_names(
|
||||
ctx.child_ctx("column_names execution"),
|
||||
predicate,
|
||||
selection,
|
||||
)
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(FindingColumnNamesSnafu)?;
|
||||
|
||||
|
|
|
@ -191,6 +191,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync {
|
|||
/// this Chunk. Returns `None` otherwise
|
||||
fn column_names(
|
||||
&self,
|
||||
ctx: IOxExecutionContext,
|
||||
predicate: &Predicate,
|
||||
columns: Selection<'_>,
|
||||
) -> Result<Option<StringSet>, Self::Error>;
|
||||
|
|
|
@ -961,6 +961,7 @@ impl QueryChunk for TestChunk {
|
|||
|
||||
fn column_names(
|
||||
&self,
|
||||
_ctx: IOxExecutionContext,
|
||||
predicate: &Predicate,
|
||||
selection: Selection<'_>,
|
||||
) -> Result<Option<StringSet>, Self::Error> {
|
||||
|
|
Loading…
Reference in New Issue