From d78591f52ea236c151a681efc9773133e56f75bd Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 18 Oct 2022 06:36:11 -0400 Subject: [PATCH] feat: add more logging to SQL execution via flight (#5877) * refactor: Extract do_get implementation into its own function * feat: add more logging to SQL execution Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- service_grpc_flight/src/lib.rs | 107 +++++++++++++++++++++------------ 1 file changed, 67 insertions(+), 40 deletions(-) diff --git a/service_grpc_flight/src/lib.rs b/service_grpc_flight/src/lib.rs index 8703ccdc0b..6f1bc41a82 100644 --- a/service_grpc_flight/src/lib.rs +++ b/service_grpc_flight/src/lib.rs @@ -16,13 +16,13 @@ use iox_query::{ exec::{ExecutionContextProvider, IOxSessionContext}, QueryCompletedToken, QueryDatabase, }; -use observability_deps::tracing::{info, warn}; +use observability_deps::tracing::{debug, info, warn}; use pin_project::{pin_project, pinned_drop}; use prost::Message; use serde::Deserialize; use service_common::{planner::Planner, QueryDatabaseProvider}; use snafu::{ResultExt, Snafu}; -use std::{fmt::Debug, pin::Pin, sync::Arc, task::Poll}; +use std::{fmt::Debug, pin::Pin, sync::Arc, task::Poll, time::Instant}; use tokio::task::JoinHandle; use tonic::{Request, Response, Streaming}; use trace::{ctx::SpanContext, span::SpanExt}; @@ -174,6 +174,46 @@ where FlightServer::new(FlightService { server }) } +impl FlightService +where + S: QueryDatabaseProvider, +{ + async fn run_query( + &self, + span_ctx: Option, + permit: InstrumentedAsyncOwnedSemaphorePermit, + sql_query: String, + database_name: String, + ) -> Result>, tonic::Status> { + let database = DatabaseName::new(&database_name).context(InvalidDatabaseNameSnafu)?; + + let db = self + .server + .db(&database, span_ctx.child_span("get namespace")) + .await + .ok_or_else(|| tonic::Status::not_found(format!("Unknown namespace: {database}")))?; + + let ctx = db.new_query_context(span_ctx); + let query_completed_token = db.record_query(&ctx, "sql", Box::new(sql_query.clone())); + + let physical_plan = Planner::new(&ctx) + .sql(sql_query) + .await + .context(PlanningSnafu)?; + + let output = GetStream::new( + ctx, + physical_plan, + database_name, + query_completed_token, + permit, + ) + .await?; + + Ok(Response::new(Box::pin(output) as TonicStream)) + } +} + #[tonic::async_trait] impl Flight for FlightService where @@ -199,57 +239,44 @@ where request: Request, ) -> Result, tonic::Status> { let external_span_ctx: Option = request.extensions().get().cloned(); + let trace = external_span_ctx.format_jaeger(); let span_ctx: Option = request.extensions().get().cloned(); let ticket = request.into_inner(); // decode ticket - let read_info = match ReadInfo::decode_protobuf(&ticket.ticket) { - Ok(read_info) => read_info, - Err(_) => { - // try legacy json - ReadInfo::decode_json(&ticket.ticket)? - } + let read_info = ReadInfo::decode_protobuf(&ticket.ticket).or_else(|_e| { + // try legacy json + ReadInfo::decode_json(&ticket.ticket) + }); + + if let Err(e) = &read_info { + info!(%e, "Error decoding database and SQL query name from flight ticket"); }; + let ReadInfo { + database_name, + sql_query, + } = read_info?; let permit = self .server .acquire_semaphore(span_ctx.child_span("query rate limit semaphore")) .await; - info!( - db_name=%read_info.database_name, - sql_query=%read_info.sql_query, - trace=%external_span_ctx.format_jaeger(), - "flight do_get", - ); - let database = - DatabaseName::new(&read_info.database_name).context(InvalidDatabaseNameSnafu)?; + // Log after we acquire the permit and are about to start execution + let start = Instant::now(); + info!(db_name=%database_name, %sql_query, %trace, "Running SQL via flight do_get"); - let db = self - .server - .db(&database, span_ctx.child_span("get namespace")) - .await - .ok_or_else(|| tonic::Status::not_found(format!("Unknown namespace: {database}")))?; + let response = self + .run_query(span_ctx, permit, sql_query.clone(), database_name.clone()) + .await; - let ctx = db.new_query_context(span_ctx); - let query_completed_token = - db.record_query(&ctx, "sql", Box::new(read_info.sql_query.clone())); - - let physical_plan = Planner::new(&ctx) - .sql(&read_info.sql_query) - .await - .context(PlanningSnafu)?; - - let output = GetStream::new( - ctx, - physical_plan, - read_info.database_name, - query_completed_token, - permit, - ) - .await?; - - Ok(Response::new(Box::pin(output) as Self::DoGetStream)) + if let Err(e) = &response { + info!(db_name=%database_name, %sql_query, %trace, %e, "Error running SQL query"); + } else { + let elapsed = Instant::now() - start; + debug!(db_name=%database_name,%sql_query,%trace, ?elapsed, "Completed SQL query successfully"); + } + response } async fn handshake(