feat: add more logging to SQL execution via flight (#5877)

* refactor: Extract do_get implementation into its own function

* feat: add more logging to SQL execution

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2022-10-18 06:36:11 -04:00 committed by GitHub
parent 819dbe9e0c
commit d78591f52e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 67 additions and 40 deletions

View File

@ -16,13 +16,13 @@ use iox_query::{
exec::{ExecutionContextProvider, IOxSessionContext},
QueryCompletedToken, QueryDatabase,
};
use observability_deps::tracing::{info, warn};
use observability_deps::tracing::{debug, info, warn};
use pin_project::{pin_project, pinned_drop};
use prost::Message;
use serde::Deserialize;
use service_common::{planner::Planner, QueryDatabaseProvider};
use snafu::{ResultExt, Snafu};
use std::{fmt::Debug, pin::Pin, sync::Arc, task::Poll};
use std::{fmt::Debug, pin::Pin, sync::Arc, task::Poll, time::Instant};
use tokio::task::JoinHandle;
use tonic::{Request, Response, Streaming};
use trace::{ctx::SpanContext, span::SpanExt};
@ -174,6 +174,46 @@ where
FlightServer::new(FlightService { server })
}
impl<S> FlightService<S>
where
S: QueryDatabaseProvider,
{
async fn run_query(
&self,
span_ctx: Option<SpanContext>,
permit: InstrumentedAsyncOwnedSemaphorePermit,
sql_query: String,
database_name: String,
) -> Result<Response<TonicStream<FlightData>>, tonic::Status> {
let database = DatabaseName::new(&database_name).context(InvalidDatabaseNameSnafu)?;
let db = self
.server
.db(&database, span_ctx.child_span("get namespace"))
.await
.ok_or_else(|| tonic::Status::not_found(format!("Unknown namespace: {database}")))?;
let ctx = db.new_query_context(span_ctx);
let query_completed_token = db.record_query(&ctx, "sql", Box::new(sql_query.clone()));
let physical_plan = Planner::new(&ctx)
.sql(sql_query)
.await
.context(PlanningSnafu)?;
let output = GetStream::new(
ctx,
physical_plan,
database_name,
query_completed_token,
permit,
)
.await?;
Ok(Response::new(Box::pin(output) as TonicStream<FlightData>))
}
}
#[tonic::async_trait]
impl<S> Flight for FlightService<S>
where
@ -199,57 +239,44 @@ where
request: Request<Ticket>,
) -> Result<Response<Self::DoGetStream>, tonic::Status> {
let external_span_ctx: Option<RequestLogContext> = request.extensions().get().cloned();
let trace = external_span_ctx.format_jaeger();
let span_ctx: Option<SpanContext> = request.extensions().get().cloned();
let ticket = request.into_inner();
// decode ticket
let read_info = match ReadInfo::decode_protobuf(&ticket.ticket) {
Ok(read_info) => read_info,
Err(_) => {
// try legacy json
ReadInfo::decode_json(&ticket.ticket)?
}
let read_info = ReadInfo::decode_protobuf(&ticket.ticket).or_else(|_e| {
// try legacy json
ReadInfo::decode_json(&ticket.ticket)
});
if let Err(e) = &read_info {
info!(%e, "Error decoding database and SQL query name from flight ticket");
};
let ReadInfo {
database_name,
sql_query,
} = read_info?;
let permit = self
.server
.acquire_semaphore(span_ctx.child_span("query rate limit semaphore"))
.await;
info!(
db_name=%read_info.database_name,
sql_query=%read_info.sql_query,
trace=%external_span_ctx.format_jaeger(),
"flight do_get",
);
let database =
DatabaseName::new(&read_info.database_name).context(InvalidDatabaseNameSnafu)?;
// Log after we acquire the permit and are about to start execution
let start = Instant::now();
info!(db_name=%database_name, %sql_query, %trace, "Running SQL via flight do_get");
let db = self
.server
.db(&database, span_ctx.child_span("get namespace"))
.await
.ok_or_else(|| tonic::Status::not_found(format!("Unknown namespace: {database}")))?;
let response = self
.run_query(span_ctx, permit, sql_query.clone(), database_name.clone())
.await;
let ctx = db.new_query_context(span_ctx);
let query_completed_token =
db.record_query(&ctx, "sql", Box::new(read_info.sql_query.clone()));
let physical_plan = Planner::new(&ctx)
.sql(&read_info.sql_query)
.await
.context(PlanningSnafu)?;
let output = GetStream::new(
ctx,
physical_plan,
read_info.database_name,
query_completed_token,
permit,
)
.await?;
Ok(Response::new(Box::pin(output) as Self::DoGetStream))
if let Err(e) = &response {
info!(db_name=%database_name, %sql_query, %trace, %e, "Error running SQL query");
} else {
let elapsed = Instant::now() - start;
debug!(db_name=%database_name,%sql_query,%trace, ?elapsed, "Completed SQL query successfully");
}
response
}
async fn handshake(