Merge pull request #3460 from influxdata/er/feat/query_duration

feat: add query completion duration to queries system table
pull/24376/head
kodiakhq[bot] 2022-01-14 10:49:36 +00:00 committed by GitHub
commit e1d7b71d6a
10 changed files with 285 additions and 77 deletions

View File

@ -1,6 +1,6 @@
use arrow::array::{ArrayRef, TimestampNanosecondArray};
use arrow::array::{ArrayRef, DurationNanosecondArray, TimestampNanosecondArray};
use arrow::datatypes::{DataType, TimeUnit};
use arrow::error::Result;
use arrow::error::{ArrowError, Result};
use arrow::record_batch::RecordBatch;
use comfy_table::{Cell, Table};
@ -41,6 +41,21 @@ fn array_value_to_string(column: &ArrayRef, row: usize) -> Result<String> {
let use_z = true;
Ok(ts.to_rfc3339_opts(SecondsFormat::AutoSi, use_z))
}
// TODO(edd): see https://github.com/apache/arrow-rs/issues/1168
DataType::Duration(TimeUnit::Nanosecond) if column.is_valid(row) => {
let dur_column = column
.as_any()
.downcast_ref::<DurationNanosecondArray>()
.unwrap();
let duration = std::time::Duration::from_nanos(
dur_column
.value(row)
.try_into()
.map_err(|e| ArrowError::InvalidArgumentError(format!("{:?}", e)))?,
);
Ok(format!("{:?}", duration))
}
_ => {
// fallback to arrow's default printing for other types
arrow::util::display::array_value_to_string(column, row)
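
Aside: the `format!("{:?}", duration)` arm above relies on `std::time::Duration`'s `Debug` impl, which renders a humanized string rather than struct fields. A minimal sketch of what that produces (plain Rust, not part of this diff):

use std::time::Duration;

fn main() {
    // Debug output of Duration is already human readable
    println!("{:?}", Duration::from_secs(4));              // prints: 4s
    println!("{:?}", Duration::from_millis(200));          // prints: 200ms
    println!("{:?}", Duration::from_nanos(1_000_000_001)); // prints: 1.000000001s
}

This is why the completed_duration column in the system-table test further down renders as a plain value such as "4s".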

View File

@ -22,7 +22,7 @@ use predicate::predicate::{Predicate, PredicateBuilder};
use query::{
provider::{ChunkPruner, ProviderBuilder},
pruning::{prune_chunks, PruningObserver},
QueryChunk, QueryChunkMeta, QueryDatabase, DEFAULT_SCHEMA,
QueryChunk, QueryChunkMeta, QueryCompletedToken, QueryDatabase, DEFAULT_SCHEMA,
};
use schema::Schema;
use std::{any::Any, sync::Arc};
@ -234,8 +234,15 @@ impl QueryDatabase for QueryCatalogAccess {
.map(|table| Arc::clone(&table.schema().read()))
}
fn record_query(&self, query_type: impl Into<String>, query_text: impl Into<String>) {
self.query_log.push(query_type, query_text)
fn record_query(
&self,
query_type: impl Into<String>,
query_text: impl Into<String>,
) -> QueryCompletedToken<'_> {
// When the query token is dropped the query entry's completion time
// will be set.
let entry = self.query_log.push(query_type, query_text);
QueryCompletedToken::new(move || self.query_log.set_completed(entry))
}
}

View File

@ -45,7 +45,7 @@ use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::Persisten
use predicate::predicate::Predicate;
use query::{
exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext},
QueryDatabase,
QueryCompletedToken, QueryDatabase,
};
use rand_distr::{Distribution, Poisson};
use schema::selection::Selection;
@ -1224,7 +1224,11 @@ impl QueryDatabase for Db {
self.catalog_access.table_schema(table_name)
}
fn record_query(&self, query_type: impl Into<String>, query_text: impl Into<String>) {
fn record_query(
&self,
query_type: impl Into<String>,
query_text: impl Into<String>,
) -> QueryCompletedToken<'_> {
self.catalog_access.record_query(query_type, query_text)
}
}

View File

@ -1,10 +1,17 @@
//! Ring buffer of queries that have been run with some brief information
use std::{collections::VecDeque, sync::Arc};
use std::{
collections::VecDeque,
sync::{atomic, Arc},
time::Duration,
};
use parking_lot::Mutex;
use time::{Time, TimeProvider};
// The query duration used for queries still running.
const UNCOMPLETED_DURATION: i64 = -1;
/// Information about a single query that was executed
#[derive(Debug)]
pub struct QueryLogEntry {
@ -16,6 +23,10 @@ pub struct QueryLogEntry {
/// Time at which the query was run
pub issue_time: Time,
/// Duration in nanoseconds query took to complete (-1 is a sentinel value
/// indicating query not completed).
query_completed_duration: atomic::AtomicI64,
}
impl QueryLogEntry {
@ -25,11 +36,28 @@ impl QueryLogEntry {
query_type,
query_text,
issue_time,
query_completed_duration: UNCOMPLETED_DURATION.into(),
}
}
pub fn query_completed_duration(&self) -> Option<Duration> {
match self
.query_completed_duration
.load(atomic::Ordering::Relaxed)
{
UNCOMPLETED_DURATION => None,
d => Some(Duration::from_nanos(d as u64)),
}
}
pub fn set_completed(&self, now: Time) {
let dur = now - self.issue_time;
self.query_completed_duration
.store(dur.as_nanos() as i64, atomic::Ordering::Relaxed);
}
}
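
Aside: the entry stores the duration in an AtomicI64 with a -1 sentinel rather than a Mutex<Option<Duration>>, so completion can be recorded later through a shared Arc<QueryLogEntry> without taking a lock. A standalone sketch of that encoding (hypothetical names, not part of this diff):

use std::sync::atomic::{AtomicI64, Ordering};
use std::time::Duration;

const UNCOMPLETED: i64 = -1;

struct Completion(AtomicI64);

impl Completion {
    fn new() -> Self {
        Self(AtomicI64::new(UNCOMPLETED))
    }

    // Lock-free write through a shared reference.
    fn set(&self, d: Duration) {
        self.0.store(d.as_nanos() as i64, Ordering::Relaxed)
    }

    // None until set() has been called.
    fn get(&self) -> Option<Duration> {
        match self.0.load(Ordering::Relaxed) {
            UNCOMPLETED => None,
            n => Some(Duration::from_nanos(n as u64)),
        }
    }
}

fn main() {
    let c = Completion::new();
    assert_eq!(c.get(), None);
    c.set(Duration::from_secs(4));
    assert_eq!(c.get(), Some(Duration::from_secs(4)));
}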
/// Stores a fixed number `QueryExcutions` -- handles locking
/// Stores a fixed number `QueryExecutions` -- handles locking
/// internally so can be shared across multiple
#[derive(Debug)]
pub struct QueryLog {
@ -49,17 +77,21 @@ impl QueryLog {
}
}
pub fn push(&self, query_type: impl Into<String>, query_text: impl Into<String>) {
if self.max_size == 0 {
return;
}
pub fn push(
&self,
query_type: impl Into<String>,
query_text: impl Into<String>,
) -> Arc<QueryLogEntry> {
let entry = Arc::new(QueryLogEntry::new(
query_type.into(),
query_text.into(),
self.time_provider.now(),
));
if self.max_size == 0 {
return entry;
}
let mut log = self.log.lock();
// enforce limit
@ -67,11 +99,52 @@ impl QueryLog {
log.pop_front();
}
log.push_back(entry);
log.push_back(Arc::clone(&entry));
entry
}
pub fn entries(&self) -> VecDeque<Arc<QueryLogEntry>> {
let log = self.log.lock();
log.clone()
}
/// Marks the provided query entry as completed using the current time.
pub fn set_completed(&self, entry: Arc<QueryLogEntry>) {
entry.set_completed(self.time_provider.now())
}
}
#[cfg(test)]
mod test_super {
use time::MockProvider;
use super::*;
#[test]
fn test_query_log_entry_completed() {
let time_provider = MockProvider::new(Time::from_timestamp_millis(100));
let entry = Arc::new(QueryLogEntry::new(
"sql".into(),
"SELECT 1".into(),
time_provider.now(),
));
// query has not completed
assert_eq!(entry.query_completed_duration(), None);
// when the query completes at the same time it's issued
entry.set_completed(time_provider.now());
assert_eq!(
entry.query_completed_duration(),
Some(Duration::from_millis(0))
);
// when the query completes some time in the future.
time_provider.set(Time::from_timestamp_millis(300));
entry.set_completed(time_provider.now());
assert_eq!(
entry.query_completed_duration(),
Some(Duration::from_millis(200))
);
}
}

View File

@ -3,7 +3,7 @@ use crate::{
system_tables::IoxSystemTable,
};
use arrow::{
array::{StringArray, TimestampNanosecondArray},
array::{DurationNanosecondArray, StringArray, TimestampNanosecondArray},
datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit},
error::Result,
record_batch::RecordBatch,
@ -47,6 +47,11 @@ fn queries_schema() -> SchemaRef {
),
Field::new("query_type", DataType::Utf8, false),
Field::new("query_text", DataType::Utf8, false),
Field::new(
"completed_duration",
DataType::Duration(TimeUnit::Nanosecond),
false,
),
]))
}
@ -70,12 +75,18 @@ fn from_query_log_entries(
.map(|e| Some(&e.query_text))
.collect::<StringArray>();
let query_runtime = entries
.iter()
.map(|e| e.query_completed_duration().map(|d| d.as_nanos() as i64))
.collect::<DurationNanosecondArray>();
RecordBatch::try_new(
schema,
vec![
Arc::new(issue_time),
Arc::new(query_type),
Arc::new(query_text),
Arc::new(query_runtime),
],
)
}
@ -94,19 +105,36 @@ mod tests {
query_log.push("sql", "select * from foo");
time_provider.inc(std::time::Duration::from_secs(24 * 60 * 60));
query_log.push("sql", "select * from bar");
query_log.push("read_filter", "json goop");
let read_filter_entry = query_log.push("read_filter", "json goop");
let expected = vec![
"+----------------------+-------------+-------------------+",
"| issue_time | query_type | query_text |",
"+----------------------+-------------+-------------------+",
"| 1996-12-19T16:39:57Z | sql | select * from foo |",
"| 1996-12-20T16:39:57Z | sql | select * from bar |",
"| 1996-12-20T16:39:57Z | read_filter | json goop |",
"+----------------------+-------------+-------------------+",
"+----------------------+-------------+-------------------+--------------------+",
"| issue_time | query_type | query_text | completed_duration |",
"+----------------------+-------------+-------------------+--------------------+",
"| 1996-12-19T16:39:57Z | sql | select * from foo | |",
"| 1996-12-20T16:39:57Z | sql | select * from bar | |",
"| 1996-12-20T16:39:57Z | read_filter | json goop | |",
"+----------------------+-------------+-------------------+--------------------+",
];
let schema = queries_schema();
let batch = from_query_log_entries(schema.clone(), query_log.entries()).unwrap();
assert_batches_eq!(&expected, &[batch]);
// mark one of the queries completed after 4s
let now = Time::from_rfc3339("1996-12-20T16:40:01+00:00").unwrap();
read_filter_entry.set_completed(now);
let expected = vec![
"+----------------------+-------------+-------------------+--------------------+",
"| issue_time | query_type | query_text | completed_duration |",
"+----------------------+-------------+-------------------+--------------------+",
"| 1996-12-19T16:39:57Z | sql | select * from foo | |",
"| 1996-12-20T16:39:57Z | sql | select * from bar | |",
"| 1996-12-20T16:39:57Z | read_filter | json goop | 4s |",
"+----------------------+-------------+-------------------+--------------------+",
];
let batch = from_query_log_entries(schema, query_log.entries()).unwrap();
assert_batches_eq!(&expected, &[batch]);
}

View File

@ -252,7 +252,7 @@ async fn query(
let db = server.db(&db_name)?;
db.record_query("sql", &q);
let _query_completed_token = db.record_query("sql", &q);
let ctx = db.new_query_context(req.extensions().get().cloned());
let physical_plan = Planner::new(&ctx).sql(&q).await.context(PlanningSnafu)?;
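
Note the binding: the token is assigned to `_query_completed_token`, a named underscore-prefixed variable, rather than discarded with `let _ = ...`. With `let _` the token would be dropped immediately and the recorded duration would be near zero; a named binding keeps it alive to the end of the handler, so completion is recorded only after the query has actually run. A tiny sketch of the difference (not IOx code):

struct Token;

impl Drop for Token {
    fn drop(&mut self) {
        println!("completion recorded");
    }
}

fn main() {
    let _ = Token; // dropped right here, before any work happens
    println!("query runs (too late for the first token)");

    let _token = Token; // kept alive ...
    println!("query runs");
} // ... _token dropped here, after the work

The same binding pattern is used in the Flight and storage gRPC handlers below.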

View File

@ -172,7 +172,7 @@ impl Flight for FlightService {
.db(&database)
.map_err(default_server_error_handler)?;
db.record_query("sql", &read_info.sql_query);
let _query_completed_token = db.record_query("sql", &read_info.sql_query);
let ctx = db.new_query_context(span_ctx);

View File

@ -245,9 +245,9 @@ where
.db_store
.db(&db_name)
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
db.record_query("read_filter", defer_json(&req));
let _query_completed_token = db.record_query("read_filter", defer_json(&req));
let results = read_filter_impl(db, db_name, req, span_ctx)
let results = read_filter_impl(Arc::clone(&db), db_name, req, span_ctx)
.await?
.into_iter()
.map(Ok)
@ -270,7 +270,7 @@ where
.db_store
.db(&db_name)
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
db.record_query("read_group", defer_json(&req));
let _query_completed_token = db.record_query("read_group", defer_json(&req));
let ReadGroupRequest {
read_source: _read_source,
@ -295,12 +295,19 @@ where
let gby_agg = expr::make_read_group_aggregate(aggregate, group, group_keys)
.context(ConvertingReadGroupAggregateSnafu { aggregate_string })?;
let results = query_group_impl(db, db_name, range, predicate, gby_agg, span_ctx)
.await
.map_err(|e| e.to_status())?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
let results = query_group_impl(
Arc::clone(&db),
db_name,
range,
predicate,
gby_agg,
span_ctx,
)
.await
.map_err(|e| e.to_status())?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
Ok(tonic::Response::new(futures::stream::iter(results)))
}
@ -320,7 +327,7 @@ where
.db_store
.db(&db_name)
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
db.record_query("read_window_aggregate", defer_json(&req));
let _query_completed_token = db.record_query("read_window_aggregate", defer_json(&req));
let ReadWindowAggregateRequest {
read_source: _read_source,
@ -342,12 +349,19 @@ where
let gby_agg = expr::make_read_window_aggregate(aggregate, window_every, offset, window)
.context(ConvertingWindowAggregateSnafu { aggregate_string })?;
let results = query_group_impl(db, db_name, range, predicate, gby_agg, span_ctx)
.await
.map_err(|e| e.to_status())?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
let results = query_group_impl(
Arc::clone(&db),
db_name,
range,
predicate,
gby_agg,
span_ctx,
)
.await
.map_err(|e| e.to_status())?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
Ok(tonic::Response::new(futures::stream::iter(results)))
}
@ -368,7 +382,7 @@ where
.db_store
.db(&db_name)
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
db.record_query("tag_keys", defer_json(&req));
let _query_completed_token = db.record_query("tag_keys", defer_json(&req));
let TagKeysRequest {
tags_source: _tag_source,
@ -380,9 +394,16 @@ where
let measurement = None;
let response = tag_keys_impl(db, db_name, measurement, range, predicate, span_ctx)
.await
.map_err(|e| e.to_status());
let response = tag_keys_impl(
Arc::clone(&db),
db_name,
measurement,
range,
predicate,
span_ctx,
)
.await
.map_err(|e| e.to_status());
tx.send(response)
.await
@ -407,7 +428,7 @@ where
.db_store
.db(&db_name)
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
db.record_query("tag_values", defer_json(&req));
let _query_completed_token = db.record_query("tag_values", defer_json(&req));
let TagValuesRequest {
tags_source: _tag_source,
@ -430,11 +451,13 @@ where
.to_status());
}
measurement_name_impl(db, db_name, range, predicate, span_ctx).await
measurement_name_impl(Arc::clone(&db), db_name, range, predicate, span_ctx).await
} else if tag_key.is_field() {
info!(%db_name, ?range, predicate=%predicate.loggable(), "tag_values with tag_key=[xff] (field name)");
let fieldlist = field_names_impl(db, db_name, None, range, predicate, span_ctx).await?;
let fieldlist =
field_names_impl(Arc::clone(&db), db_name, None, range, predicate, span_ctx)
.await?;
// Pick out the field names into a Vec<Vec<u8>>for return
let values = fieldlist
@ -450,7 +473,7 @@ where
info!(%db_name, ?range, %tag_key, predicate=%predicate.loggable(), "tag_values",);
tag_values_impl(
db,
Arc::clone(&db),
db_name,
tag_key,
measurement,
@ -486,20 +509,24 @@ where
.db_store
.db(&db_name)
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
db.record_query(
let _query_completed_token = db.record_query(
"tag_values_grouped_by_measurement_and_tag_key",
defer_json(&req),
);
info!(%db_name, ?req.measurement_patterns, ?req.tag_key_predicate, predicate=%req.condition.loggable(), "tag_values_grouped_by_measurement_and_tag_key");
let results =
tag_values_grouped_by_measurement_and_tag_key_impl(db, db_name, req, span_ctx)
.await
.map_err(|e| e.to_status())?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
let results = tag_values_grouped_by_measurement_and_tag_key_impl(
Arc::clone(&db),
db_name,
req,
span_ctx,
)
.await
.map_err(|e| e.to_status())?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
Ok(tonic::Response::new(futures::stream::iter(results)))
}
@ -565,7 +592,7 @@ where
.db_store
.db(&db_name)
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
db.record_query("measurement_names", defer_json(&req));
let _query_completed_token = db.record_query("measurement_names", defer_json(&req));
let MeasurementNamesRequest {
source: _source,
@ -575,7 +602,7 @@ where
info!(%db_name, ?range, predicate=%predicate.loggable(), "measurement_names");
let response = measurement_name_impl(db, db_name, range, predicate, span_ctx)
let response = measurement_name_impl(Arc::clone(&db), db_name, range, predicate, span_ctx)
.await
.map_err(|e| e.to_status());
@ -602,7 +629,7 @@ where
.db_store
.db(&db_name)
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
db.record_query("measurement_tag_keys", defer_json(&req));
let _query_completed_token = db.record_query("measurement_tag_keys", defer_json(&req));
let MeasurementTagKeysRequest {
source: _source,
@ -615,9 +642,16 @@ where
let measurement = Some(measurement);
let response = tag_keys_impl(db, db_name, measurement, range, predicate, span_ctx)
.await
.map_err(|e| e.to_status());
let response = tag_keys_impl(
Arc::clone(&db),
db_name,
measurement,
range,
predicate,
span_ctx,
)
.await
.map_err(|e| e.to_status());
tx.send(response)
.await
@ -642,7 +676,7 @@ where
.db_store
.db(&db_name)
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
db.record_query("measurement_tag_values", defer_json(&req));
let _query_completed_token = db.record_query("measurement_tag_values", defer_json(&req));
let MeasurementTagValuesRequest {
source: _source,
@ -657,7 +691,7 @@ where
let measurement = Some(measurement);
let response = tag_values_impl(
db,
Arc::clone(&db),
db_name,
tag_key,
measurement,
@ -691,7 +725,7 @@ where
.db_store
.db(&db_name)
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
db.record_query("measurement_fields", defer_json(&req));
let _query_completed_token = db.record_query("measurement_fields", defer_json(&req));
let MeasurementFieldsRequest {
source: _source,
@ -704,14 +738,21 @@ where
let measurement = Some(measurement);
let response = field_names_impl(db, db_name, measurement, range, predicate, span_ctx)
.await
.map(|fieldlist| {
fieldlist_to_measurement_fields_response(fieldlist)
.context(ConvertingFieldListSnafu)
.map_err(|e| e.to_status())
})
.map_err(|e| e.to_status())?;
let response = field_names_impl(
Arc::clone(&db),
db_name,
measurement,
range,
predicate,
span_ctx,
)
.await
.map(|fieldlist| {
fieldlist_to_measurement_fields_response(fieldlist)
.context(ConvertingFieldListSnafu)
.map_err(|e| e.to_status())
})
.map_err(|e| e.to_status())?;
tx.send(response)
.await
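
The repeated `Arc::clone(&db)` changes in this file follow from the new return type: the token returned by `record_query` borrows the database for its lifetime, so the `Arc<Db>` handle can no longer be moved into the downstream `*_impl` helpers while the token is alive. A minimal illustration of the borrow (hypothetical types, not IOx code):

use std::sync::Arc;

struct Db;
struct Token<'a>(&'a Db);

impl Db {
    fn record_query(&self) -> Token<'_> {
        Token(self)
    }
}

fn consume(_db: Arc<Db>) {}

fn main() {
    let db = Arc::new(Db);
    let _token = db.record_query(); // borrows *db through the Arc
    // consume(db);                 // error[E0505]: cannot move out of `db` because it is borrowed
    consume(Arc::clone(&db));       // cloning the handle sidesteps the move
}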

View File

@ -77,6 +77,35 @@ pub trait QueryChunkMeta: Sized {
}
}
/// A `QueryCompletedToken` is returned by `record_query` implementations of
/// a `QueryDatabase`. It is used to trigger side-effects (such as query timing)
/// on query completion.
pub struct QueryCompletedToken<'a> {
f: Option<Box<dyn FnOnce() + Send + 'a>>,
}
impl<'a> Debug for QueryCompletedToken<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("QueryCompletedToken").finish()
}
}
impl<'a> QueryCompletedToken<'a> {
pub fn new(f: impl FnOnce() + Send + 'a) -> Self {
Self {
f: Some(Box::new(f)),
}
}
}
impl<'a> Drop for QueryCompletedToken<'a> {
fn drop(&mut self) {
if let Some(f) = self.f.take() {
(f)()
}
}
}
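
A self-contained sketch of the drop-hook pattern the token implements (not IOx code): the wrapped FnOnce runs exactly once, when the token goes out of scope.

use std::sync::atomic::{AtomicBool, Ordering};

struct OnDrop<'a>(Option<Box<dyn FnOnce() + Send + 'a>>);

impl<'a> Drop for OnDrop<'a> {
    fn drop(&mut self) {
        if let Some(f) = self.0.take() {
            f()
        }
    }
}

fn main() {
    let completed = AtomicBool::new(false);
    {
        let _token = OnDrop(Some(Box::new(|| completed.store(true, Ordering::Relaxed))));
        // the "query" runs while the token is alive
        assert!(!completed.load(Ordering::Relaxed));
    } // token dropped here -> the closure fires
    assert!(completed.load(Ordering::Relaxed));
}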
/// A `Database` is the main trait implemented by the IOx subsystems
/// that store actual data.
///
@ -100,7 +129,11 @@ pub trait QueryDatabase: Debug + Send + Sync {
fn chunk_summaries(&self) -> Vec<ChunkSummary>;
/// Record that particular type of query was run / planned
fn record_query(&self, query_type: impl Into<String>, query_text: impl Into<String>);
fn record_query(
&self,
query_type: impl Into<String>,
query_text: impl Into<String>,
) -> QueryCompletedToken<'_>;
}
/// Collection of data that shares the same partition key

View File

@ -4,6 +4,7 @@
//! AKA it is a Mock
use crate::exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext};
use crate::QueryCompletedToken;
use crate::{
exec::stringset::{StringSet, StringSetRef},
Predicate, PredicateMatch, QueryChunk, QueryChunkMeta, QueryDatabase,
@ -145,7 +146,13 @@ impl QueryDatabase for TestDatabase {
found_one.then(|| Arc::new(merger.build()))
}
fn record_query(&self, _query_type: impl Into<String>, _query_text: impl Into<String>) {}
fn record_query(
&self,
_query_type: impl Into<String>,
_query_text: impl Into<String>,
) -> QueryCompletedToken<'_> {
QueryCompletedToken::new(|| {})
}
}
impl ExecutionContextProvider for TestDatabase {