Merge branch 'main' into dom/gossip-basic

pull/24376/head
Dom 2023-07-12 15:53:49 +01:00 committed by GitHub
commit 1fce6d7b86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 16 additions and 8 deletions

View File

@ -19,7 +19,7 @@ use tokio::sync::{Semaphore, TryAcquireError};
use tonic::{Request, Response, Streaming};
use trace::{
ctx::SpanContext,
span::{Span, SpanExt},
span::{Span, SpanExt, SpanRecorder},
};
mod instrumentation;
@ -175,7 +175,7 @@ where
request: Request<Ticket>,
) -> Result<Response<Self::DoGetStream>, tonic::Status> {
let span_ctx: Option<SpanContext> = request.extensions().get().cloned();
let span = span_ctx.child_span("ingester query");
let mut query_recorder = SpanRecorder::new(span_ctx.child_span("ingester query"));
// Acquire and hold a permit for the duration of this request, or return
// an error if the existing requests have already exhausted the
@ -211,16 +211,23 @@ where
let response = match self
.query_handler
.query_exec(namespace_id, table_id, projection, span.clone(), predicate)
.query_exec(
namespace_id,
table_id,
projection,
query_recorder.child_span("query exec"),
predicate,
)
.await
{
Ok(v) => v,
Err(e @ (QueryError::TableNotFound(_, _) | QueryError::NamespaceNotFound(_))) => {
debug!(
error=%e,
%namespace_id,
%table_id,
"query error, no buffered data found");
error=%e,
%namespace_id,
%table_id,
"no buffered data found for query"
);
return Err(e)?;
}
@ -229,11 +236,12 @@ where
let output = encode_response(
response,
self.ingester_id,
span,
query_recorder.child_span("serialise response"),
Arc::clone(&self.query_request_frame_encoding_duration),
)
.map_err(tonic::Status::from);
query_recorder.ok("query exec complete - streaming results");
Ok(Response::new(Box::pin(output) as Self::DoGetStream))
}