From 211bee5886eb2ffa41e84794c7c345e4610d2d06 Mon Sep 17 00:00:00 2001
From: Edd Robinson
Date: Thu, 13 Jan 2022 16:29:14 +0000
Subject: [PATCH 1/6] feat: add support for setting complete time

---
 db/src/query_log.rs | 79 +++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 72 insertions(+), 7 deletions(-)

diff --git a/db/src/query_log.rs b/db/src/query_log.rs
index d20365ed5b..22a0378611 100644
--- a/db/src/query_log.rs
+++ b/db/src/query_log.rs
@@ -1,6 +1,10 @@
 //! Ring buffer of queries that have been run with some brief information
 
-use std::{collections::VecDeque, sync::Arc};
+use std::{
+    collections::VecDeque,
+    sync::{atomic, Arc},
+    time::Duration,
+};
 
 use parking_lot::Mutex;
 use time::{Time, TimeProvider};
@@ -16,6 +20,10 @@ pub struct QueryLogEntry {
 
     /// Time at which the query was run
     pub issue_time: Time,
+
+    /// Duration in nanoseconds query took to complete (-1 is a sentinel value
+    /// indicating query not completed).
+    query_completed_duration: atomic::AtomicI64,
 }
 
 impl QueryLogEntry {
@@ -25,8 +33,25 @@ impl QueryLogEntry {
             query_type,
             query_text,
             issue_time,
+            query_completed_duration: (-1_i64).into(),
         }
     }
+
+    pub fn query_completed_duration(&self) -> Option<Duration> {
+        match self
+            .query_completed_duration
+            .load(atomic::Ordering::Relaxed)
+        {
+            -1 => None,
+            d => Some(Duration::from_nanos(d as u64)),
+        }
+    }
+
+    fn set_completed(&self, now: Time) {
+        let dur = now - self.issue_time;
+        self.query_completed_duration
+            .store(dur.as_nanos() as i64, atomic::Ordering::Relaxed);
+    }
 }
 
 /// Stores a fixed number `QueryExcutions` -- handles locking
@@ -49,17 +74,21 @@ impl QueryLog {
         }
     }
 
-    pub fn push(&self, query_type: impl Into<String>, query_text: impl Into<String>) {
-        if self.max_size == 0 {
-            return;
-        }
-
+    pub fn push(
+        &self,
+        query_type: impl Into<String>,
+        query_text: impl Into<String>,
+    ) -> Arc<QueryLogEntry> {
         let entry = Arc::new(QueryLogEntry::new(
            query_type.into(),
            query_text.into(),
            self.time_provider.now(),
        ));
 
+        if self.max_size == 0 {
+            return entry;
+        }
+
         let mut log = self.log.lock();
 
         // enforce limit
@@ -67,7 +96,8 @@ impl QueryLog {
             log.pop_front();
         }
 
-        log.push_back(entry);
+        log.push_back(Arc::clone(&entry));
+        entry
     }
 
     pub fn entries(&self) -> VecDeque<Arc<QueryLogEntry>> {
         let log = self.log.lock();
         log.clone()
     }
 }
+
+#[cfg(test)]
+mod test_super {
+    use time::MockProvider;
+
+    use super::*;
+
+    #[test]
+    fn test_query_log_entry_completed() {
+        let time_provider = MockProvider::new(Time::from_timestamp_millis(100));
+
+        let entry = Arc::new(QueryLogEntry::new(
+            "sql".into(),
+            "SELECT 1".into(),
+            time_provider.now(),
+        ));
+        // query has not completed
+        assert_eq!(entry.query_completed_duration(), None);
+
+        // when the query completes at the same time it's issued
+        entry.set_completed(time_provider.now());
+        assert_eq!(
+            entry.query_completed_duration(),
+            Some(Duration::from_millis(0))
+        );
+
+        // when the query completes some time in the future.
+        time_provider.set(Time::from_timestamp_millis(200));
+        entry.set_completed(time_provider.now());
+        assert_eq!(
+            entry.query_completed_duration(),
+            Some(Duration::from_millis(100))
+        );
+    }
+}

From 6a842fc105aea6302e7f4e0c053691975f48c5aa Mon Sep 17 00:00:00 2001
From: Edd Robinson
Date: Thu, 13 Jan 2022 17:16:06 +0000
Subject: [PATCH 2/6] feat: add completed duration to system table

---
 arrow_util/src/display.rs       | 19 ++++++++++++--
 db/src/query_log.rs             |  6 ++---
 db/src/system_tables/queries.rs | 46 ++++++++++++++++++++++++++-------
 3 files changed, 57 insertions(+), 14 deletions(-)

diff --git a/arrow_util/src/display.rs b/arrow_util/src/display.rs
index 98e40090f2..b8bd54606a 100644
--- a/arrow_util/src/display.rs
+++ b/arrow_util/src/display.rs
@@ -1,6 +1,6 @@
-use arrow::array::{ArrayRef, TimestampNanosecondArray};
+use arrow::array::{ArrayRef, DurationMillisecondArray, TimestampNanosecondArray};
 use arrow::datatypes::{DataType, TimeUnit};
-use arrow::error::Result;
+use arrow::error::{ArrowError, Result};
 use arrow::record_batch::RecordBatch;
 use comfy_table::{Cell, Table};
 
@@ -41,6 +41,21 @@ fn array_value_to_string(column: &ArrayRef, row: usize) -> Result<String> {
             let use_z = true;
             Ok(ts.to_rfc3339_opts(SecondsFormat::AutoSi, use_z))
         }
+        // TODO(edd): see https://github.com/apache/arrow-rs/issues/1168
+        DataType::Duration(TimeUnit::Millisecond) if column.is_valid(row) => {
+            let dur_column = column
+                .as_any()
+                .downcast_ref::<DurationMillisecondArray>()
+                .unwrap();
+
+            let duration = std::time::Duration::from_millis(
+                dur_column
+                    .value(row)
+                    .try_into()
+                    .map_err(|e| ArrowError::InvalidArgumentError(format!("{:?}", e)))?,
+            );
+            Ok(format!("{:?}", duration))
+        }
         _ => {
             // fallback to arrow's default printing for other types
             arrow::util::display::array_value_to_string(column, row)
diff --git a/db/src/query_log.rs b/db/src/query_log.rs
index 22a0378611..8b58197a86 100644
--- a/db/src/query_log.rs
+++ b/db/src/query_log.rs
@@ -47,7 +47,7 @@ impl QueryLogEntry {
         }
     }
 
-    fn set_completed(&self, now: Time) {
+    pub fn set_completed(&self, now: Time) {
         let dur = now - self.issue_time;
         self.query_completed_duration
             .store(dur.as_nanos() as i64, atomic::Ordering::Relaxed);
@@ -132,11 +132,11 @@ mod test_super {
         );
 
         // when the query completes some time in the future.
-        time_provider.set(Time::from_timestamp_millis(200));
+        time_provider.set(Time::from_timestamp_millis(300));
         entry.set_completed(time_provider.now());
         assert_eq!(
             entry.query_completed_duration(),
-            Some(Duration::from_millis(100))
+            Some(Duration::from_millis(200))
         );
     }
 }
diff --git a/db/src/system_tables/queries.rs b/db/src/system_tables/queries.rs
index dacfdb8162..8749d01e20 100644
--- a/db/src/system_tables/queries.rs
+++ b/db/src/system_tables/queries.rs
@@ -3,7 +3,7 @@ use crate::{
     system_tables::IoxSystemTable,
 };
 use arrow::{
-    array::{StringArray, TimestampNanosecondArray},
+    array::{DurationMillisecondArray, StringArray, TimestampNanosecondArray},
     datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit},
     error::Result,
     record_batch::RecordBatch,
@@ -47,6 +47,11 @@ fn queries_schema() -> SchemaRef {
         ),
         Field::new("query_type", DataType::Utf8, false),
         Field::new("query_text", DataType::Utf8, false),
+        Field::new(
+            "completed_duration",
+            DataType::Duration(TimeUnit::Millisecond),
+            false,
+        ),
     ]))
 }
 
@@ -70,12 +75,18 @@ fn from_query_log_entries(
         .map(|e| Some(&e.query_text))
         .collect::<StringArray>();
 
+    let query_runtime = entries
+        .iter()
+        .map(|e| e.query_completed_duration().map(|d| d.as_millis() as i64))
+        .collect::<DurationMillisecondArray>();
+
     RecordBatch::try_new(
         schema,
         vec![
             Arc::new(issue_time),
             Arc::new(query_type),
             Arc::new(query_text),
+            Arc::new(query_runtime),
         ],
     )
 }
@@ -94,19 +105,35 @@ mod tests {
         query_log.push("sql", "select * from foo");
         time_provider.inc(std::time::Duration::from_secs(24 * 60 * 60));
         query_log.push("sql", "select * from bar");
-        query_log.push("read_filter", "json goop");
+        let read_filter_entry = query_log.push("read_filter", "json goop");
 
         let expected = vec![
-            "+----------------------+-------------+-------------------+",
-            "| issue_time           | query_type  | query_text        |",
-            "+----------------------+-------------+-------------------+",
-            "| 1996-12-19T16:39:57Z | sql         | select * from foo |",
-            "| 1996-12-20T16:39:57Z | sql         | select * from bar |",
-            "| 1996-12-20T16:39:57Z | read_filter | json goop         |",
-            "+----------------------+-------------+-------------------+",
+            "+----------------------+-------------+-------------------+--------------------+",
+            "| issue_time           | query_type  | query_text        | completed_duration |",
+            "+----------------------+-------------+-------------------+--------------------+",
+            "| 1996-12-19T16:39:57Z | sql         | select * from foo |                    |",
+            "| 1996-12-20T16:39:57Z | sql         | select * from bar |                    |",
+            "| 1996-12-20T16:39:57Z | read_filter | json goop         |                    |",
+            "+----------------------+-------------+-------------------+--------------------+",
         ];
 
         let schema = queries_schema();
-        let batch = from_query_log_entries(schema, query_log.entries()).unwrap();
+        let batch = from_query_log_entries(schema.clone(), query_log.entries()).unwrap();
+        assert_batches_eq!(&expected, &[batch]);
+
+        // mark one of the queries completed after 4s
+        let now = Time::from_rfc3339("1996-12-20T16:40:01+00:00").unwrap();
+        read_filter_entry.set_completed(now);
+
+        let expected = vec![
+            "+----------------------+-------------+-------------------+--------------------+",
+            "| issue_time           | query_type  | query_text        | completed_duration |",
+            "+----------------------+-------------+-------------------+--------------------+",
+            "| 1996-12-19T16:39:57Z | sql         | select * from foo |                    |",
+            "| 1996-12-20T16:39:57Z | sql         | select * from bar |                    |",
+            "| 1996-12-20T16:39:57Z | read_filter | json goop         | 4s                 |",
+            "+----------------------+-------------+-------------------+--------------------+",
+        ];
+        let batch = from_query_log_entries(schema, query_log.entries()).unwrap();
         assert_batches_eq!(&expected, &[batch]);
     }

From 0b343bcf1963be1abcebcde70d66985746969707 Mon Sep 17 00:00:00 2001
From: Edd Robinson
Date: Thu, 13 Jan 2022 19:06:45 +0000
Subject: [PATCH 3/6] feat: add RAII token to time query completion

---
 db/src/access.rs    | 13 ++++++++++---
 db/src/lib.rs       |  8 ++++++--
 db/src/query_log.rs |  5 +++++
 query/src/lib.rs    | 31 ++++++++++++++++++++++++++++++-
 query/src/test.rs   |  9 ++++++++-
 5 files changed, 59 insertions(+), 7 deletions(-)

diff --git a/db/src/access.rs b/db/src/access.rs
index 60dfb89eff..e5c32b417c 100644
--- a/db/src/access.rs
+++ b/db/src/access.rs
@@ -22,7 +22,7 @@ use predicate::predicate::{Predicate, PredicateBuilder};
 use query::{
     provider::{ChunkPruner, ProviderBuilder},
     pruning::{prune_chunks, PruningObserver},
-    QueryChunk, QueryChunkMeta, QueryDatabase, DEFAULT_SCHEMA,
+    QueryChunk, QueryChunkMeta, QueryCompletedToken, QueryDatabase, DEFAULT_SCHEMA,
 };
 use schema::Schema;
 use std::{any::Any, sync::Arc};
@@ -234,8 +234,15 @@ impl QueryDatabase for QueryCatalogAccess {
             .map(|table| Arc::clone(&table.schema().read()))
     }
 
-    fn record_query(&self, query_type: impl Into<String>, query_text: impl Into<String>) {
-        self.query_log.push(query_type, query_text)
+    fn record_query(
+        &self,
+        query_type: impl Into<String>,
+        query_text: impl Into<String>,
+    ) -> QueryCompletedToken<'_> {
+        // When the query token is dropped the query entry's completion time
+        // will be set.
+        let entry = self.query_log.push(query_type, query_text);
+        QueryCompletedToken::new(move || self.query_log.set_completed(Arc::clone(&entry)))
     }
 }
 
diff --git a/db/src/lib.rs b/db/src/lib.rs
index ca33364cea..d6d321035f 100644
--- a/db/src/lib.rs
+++ b/db/src/lib.rs
@@ -45,7 +45,7 @@ use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::Persisten
 use predicate::predicate::Predicate;
 use query::{
     exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext},
-    QueryDatabase,
+    QueryCompletedToken, QueryDatabase,
 };
 use rand_distr::{Distribution, Poisson};
 use schema::selection::Selection;
@@ -1224,7 +1224,11 @@ impl QueryDatabase for Db {
         self.catalog_access.table_schema(table_name)
     }
 
-    fn record_query(&self, query_type: impl Into<String>, query_text: impl Into<String>) {
+    fn record_query(
+        &self,
+        query_type: impl Into<String>,
+        query_text: impl Into<String>,
+    ) -> QueryCompletedToken<'_> {
         self.catalog_access.record_query(query_type, query_text)
     }
 }
diff --git a/db/src/query_log.rs b/db/src/query_log.rs
index 8b58197a86..bc21dc594a 100644
--- a/db/src/query_log.rs
+++ b/db/src/query_log.rs
@@ -104,6 +104,11 @@ impl QueryLog {
         let log = self.log.lock();
         log.clone()
     }
+
+    /// Marks the provided query entry as completed using the current time.
+    pub fn set_completed(&self, entry: Arc<QueryLogEntry>) {
+        entry.set_completed(self.time_provider.now())
+    }
 }
 
 #[cfg(test)]
diff --git a/query/src/lib.rs b/query/src/lib.rs
index 9120886731..143faabe5a 100644
--- a/query/src/lib.rs
+++ b/query/src/lib.rs
@@ -77,6 +77,31 @@ pub trait QueryChunkMeta: Sized {
     }
 }
 
+/// A `QueryCompletedToken` is returned by `record_query` implementations of
+/// a `QueryDatabase`. It is used to trigger side-effects (such as query timing)
+/// on query completion.
+pub struct QueryCompletedToken<'a> {
+    f: Box<dyn Fn() + Send + 'a>,
+}
+
+impl<'a> Debug for QueryCompletedToken<'a> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("QueryCompletedToken").finish()
+    }
+}
+
+impl<'a> QueryCompletedToken<'a> {
+    pub fn new(f: impl Fn() + Send + 'a) -> Self {
+        Self { f: Box::new(f) }
+    }
+}
+
+impl<'a> Drop for QueryCompletedToken<'a> {
+    fn drop(&mut self) {
+        (self.f)()
+    }
+}
+
 /// A `Database` is the main trait implemented by the IOx subsystems
 /// that store actual data.
 ///
@@ -100,7 +125,11 @@ pub trait QueryDatabase: Debug + Send + Sync {
     fn chunk_summaries(&self) -> Vec<ChunkSummary>;
 
     /// Record that particular type of query was run / planned
-    fn record_query(&self, query_type: impl Into<String>, query_text: impl Into<String>);
+    fn record_query(
+        &self,
+        query_type: impl Into<String>,
+        query_text: impl Into<String>,
+    ) -> QueryCompletedToken<'_>;
 }
 
 /// Collection of data that shares the same partition key
diff --git a/query/src/test.rs b/query/src/test.rs
index 33b0ad0c34..b9989be2e5 100644
--- a/query/src/test.rs
+++ b/query/src/test.rs
@@ -4,6 +4,7 @@
 //! AKA it is a Mock
 
 use crate::exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext};
+use crate::QueryCompletedToken;
 use crate::{
     exec::stringset::{StringSet, StringSetRef},
     Predicate, PredicateMatch, QueryChunk, QueryChunkMeta, QueryDatabase,
@@ -145,7 +146,13 @@ impl QueryDatabase for TestDatabase {
         found_one.then(|| Arc::new(merger.build()))
     }
 
-    fn record_query(&self, _query_type: impl Into<String>, _query_text: impl Into<String>) {}
+    fn record_query(
+        &self,
+        _query_type: impl Into<String>,
+        _query_text: impl Into<String>,
+    ) -> QueryCompletedToken<'_> {
+        QueryCompletedToken::new(|| {})
+    }
 }
 
 impl ExecutionContextProvider for TestDatabase {

From 36ec6019f98fc50fba221adecf593806e21a5ff6 Mon Sep 17 00:00:00 2001
From: Edd Robinson
Date: Thu, 13 Jan 2022 19:23:31 +0000
Subject: [PATCH 4/6] feat: wire up token to query frontends

---
 .../server_type/database/http.rs              |   2 +-
 .../server_type/database/rpc/flight.rs        |   2 +-
 .../database/rpc/storage/service.rs           | 139 ++++++++++++------
 3 files changed, 92 insertions(+), 51 deletions(-)

diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs
index 783273b0ef..2c5216dd95 100644
--- a/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs
+++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs
@@ -252,7 +252,7 @@ async fn query(
 
     let db = server.db(&db_name)?;
 
-    db.record_query("sql", &q);
+    let _query_completed_token = db.record_query("sql", &q);
 
     let ctx = db.new_query_context(req.extensions().get().cloned());
     let physical_plan = Planner::new(&ctx).sql(&q).await.context(PlanningSnafu)?;
diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/flight.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/flight.rs
index 00d9416bcb..287da93276 100644
--- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/flight.rs
+++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/flight.rs
@@ -172,7 +172,7 @@ impl Flight for FlightService {
             .db(&database)
             .map_err(default_server_error_handler)?;
 
-        db.record_query("sql", &read_info.sql_query);
+        let _query_completed_token = db.record_query("sql", &read_info.sql_query);
 
         let ctx = db.new_query_context(span_ctx);
 
diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/storage/service.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/storage/service.rs
index d4057493f9..016cbbdf6d 100644
--- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/storage/service.rs
+++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/storage/service.rs
@@ -245,9 +245,9 @@ where
             .db_store
             .db(&db_name)
             .context(DatabaseNotFoundSnafu { db_name: &db_name })?;
-        db.record_query("read_filter", defer_json(&req));
+        let _query_completed_token = db.record_query("read_filter", defer_json(&req));
 
-        let results = read_filter_impl(db, db_name, req, span_ctx)
+        let results = read_filter_impl(Arc::clone(&db), db_name, req, span_ctx)
             .await?
             .into_iter()
             .map(Ok)
             .collect::<Vec<_>>();
 
         Ok(tonic::Response::new(futures::stream::iter(results)))
     }
@@ -270,7 +270,7 @@ where
             .db_store
             .db(&db_name)
             .context(DatabaseNotFoundSnafu { db_name: &db_name })?;
-        db.record_query("read_group", defer_json(&req));
+        let _query_completed_token = db.record_query("read_group", defer_json(&req));
 
         let ReadGroupRequest {
             read_source: _read_source,
@@ -295,12 +295,19 @@ where
         let gby_agg = expr::make_read_group_aggregate(aggregate, group, group_keys)
             .context(ConvertingReadGroupAggregateSnafu { aggregate_string })?;
 
-        let results = query_group_impl(db, db_name, range, predicate, gby_agg, span_ctx)
-            .await
-            .map_err(|e| e.to_status())?
-            .into_iter()
-            .map(Ok)
-            .collect::<Vec<_>>();
+        let results = query_group_impl(
+            Arc::clone(&db),
+            db_name,
+            range,
+            predicate,
+            gby_agg,
+            span_ctx,
+        )
+        .await
+        .map_err(|e| e.to_status())?
+        .into_iter()
+        .map(Ok)
+        .collect::<Vec<_>>();
 
         Ok(tonic::Response::new(futures::stream::iter(results)))
     }
@@ -320,7 +327,7 @@ where
             .db_store
             .db(&db_name)
             .context(DatabaseNotFoundSnafu { db_name: &db_name })?;
-        db.record_query("read_window_aggregate", defer_json(&req));
+        let _query_completed_token = db.record_query("read_window_aggregate", defer_json(&req));
 
         let ReadWindowAggregateRequest {
             read_source: _read_source,
@@ -342,12 +349,19 @@ where
         let gby_agg = expr::make_read_window_aggregate(aggregate, window_every, offset, window)
             .context(ConvertingWindowAggregateSnafu { aggregate_string })?;
 
-        let results = query_group_impl(db, db_name, range, predicate, gby_agg, span_ctx)
-            .await
-            .map_err(|e| e.to_status())?
-            .into_iter()
-            .map(Ok)
-            .collect::<Vec<_>>();
+        let results = query_group_impl(
+            Arc::clone(&db),
+            db_name,
+            range,
+            predicate,
+            gby_agg,
+            span_ctx,
+        )
+        .await
+        .map_err(|e| e.to_status())?
+        .into_iter()
+        .map(Ok)
+        .collect::<Vec<_>>();
 
         Ok(tonic::Response::new(futures::stream::iter(results)))
     }
@@ -368,7 +382,7 @@ where
             .db_store
             .db(&db_name)
             .context(DatabaseNotFoundSnafu { db_name: &db_name })?;
-        db.record_query("tag_keys", defer_json(&req));
+        let _query_completed_token = db.record_query("tag_keys", defer_json(&req));
 
         let TagKeysRequest {
             tags_source: _tag_source,
@@ -380,9 +394,16 @@ where
 
         let measurement = None;
 
-        let response = tag_keys_impl(db, db_name, measurement, range, predicate, span_ctx)
-            .await
-            .map_err(|e| e.to_status());
+        let response = tag_keys_impl(
+            Arc::clone(&db),
+            db_name,
+            measurement,
+            range,
+            predicate,
+            span_ctx,
+        )
+        .await
+        .map_err(|e| e.to_status());
 
         tx.send(response)
             .await
@@ -407,7 +428,7 @@ where
             .db_store
             .db(&db_name)
             .context(DatabaseNotFoundSnafu { db_name: &db_name })?;
-        db.record_query("tag_values", defer_json(&req));
+        let _query_completed_token = db.record_query("tag_values", defer_json(&req));
 
         let TagValuesRequest {
             tags_source: _tag_source,
@@ -430,11 +451,13 @@ where
                 .to_status());
             }
 
-            measurement_name_impl(db, db_name, range, predicate, span_ctx).await
+            measurement_name_impl(Arc::clone(&db), db_name, range, predicate, span_ctx).await
         } else if tag_key.is_field() {
             info!(%db_name, ?range, predicate=%predicate.loggable(), "tag_values with tag_key=[xff] (field name)");
 
-            let fieldlist = field_names_impl(db, db_name, None, range, predicate, span_ctx).await?;
+            let fieldlist =
+                field_names_impl(Arc::clone(&db), db_name, None, range, predicate, span_ctx)
+                    .await?;
 
             // Pick out the field names into a Vec<Vec<u8>>for return
             let values = fieldlist
@@ -450,7 +473,7 @@ where
             info!(%db_name, ?range, %tag_key, predicate=%predicate.loggable(), "tag_values",);
 
             tag_values_impl(
-                db,
+                Arc::clone(&db),
                 db_name,
                 tag_key,
                 measurement,
@@ -486,20 +509,24 @@ where
             .db_store
             .db(&db_name)
             .context(DatabaseNotFoundSnafu { db_name: &db_name })?;
-        db.record_query(
+        let _query_completed_token = db.record_query(
             "tag_values_grouped_by_measurement_and_tag_key",
             defer_json(&req),
         );
 
         info!(%db_name, ?req.measurement_patterns, ?req.tag_key_predicate, predicate=%req.condition.loggable(), "tag_values_grouped_by_measurement_and_tag_key");
 
-        let results =
-            tag_values_grouped_by_measurement_and_tag_key_impl(db, db_name, req, span_ctx)
-                .await
-                .map_err(|e| e.to_status())?
-                .into_iter()
-                .map(Ok)
-                .collect::<Vec<_>>();
+        let results = tag_values_grouped_by_measurement_and_tag_key_impl(
+            Arc::clone(&db),
+            db_name,
+            req,
+            span_ctx,
+        )
+        .await
+        .map_err(|e| e.to_status())?
+        .into_iter()
+        .map(Ok)
+        .collect::<Vec<_>>();
 
         Ok(tonic::Response::new(futures::stream::iter(results)))
     }
@@ -565,7 +592,7 @@ where
             .db_store
             .db(&db_name)
             .context(DatabaseNotFoundSnafu { db_name: &db_name })?;
-        db.record_query("measurement_names", defer_json(&req));
+        let _query_completed_token = db.record_query("measurement_names", defer_json(&req));
 
         let MeasurementNamesRequest {
             source: _source,
@@ -575,7 +602,7 @@ where
 
         info!(%db_name, ?range, predicate=%predicate.loggable(), "measurement_names");
 
-        let response = measurement_name_impl(db, db_name, range, predicate, span_ctx)
+        let response = measurement_name_impl(Arc::clone(&db), db_name, range, predicate, span_ctx)
            .await
            .map_err(|e| e.to_status());
 
         tx.send(response)
             .await
@@ -602,7 +629,7 @@ where
             .db_store
             .db(&db_name)
             .context(DatabaseNotFoundSnafu { db_name: &db_name })?;
-        db.record_query("measurement_tag_keys", defer_json(&req));
+        let _query_completed_token = db.record_query("measurement_tag_keys", defer_json(&req));
 
         let MeasurementTagKeysRequest {
             source: _source,
@@ -615,9 +642,16 @@ where
 
         let measurement = Some(measurement);
 
-        let response = tag_keys_impl(db, db_name, measurement, range, predicate, span_ctx)
-            .await
-            .map_err(|e| e.to_status());
+        let response = tag_keys_impl(
+            Arc::clone(&db),
+            db_name,
+            measurement,
+            range,
+            predicate,
+            span_ctx,
+        )
+        .await
+        .map_err(|e| e.to_status());
 
         tx.send(response)
             .await
@@ -642,7 +676,7 @@ where
             .db_store
             .db(&db_name)
             .context(DatabaseNotFoundSnafu { db_name: &db_name })?;
-        db.record_query("measurement_tag_values", defer_json(&req));
+        let _query_completed_token = db.record_query("measurement_tag_values", defer_json(&req));
 
         let MeasurementTagValuesRequest {
             source: _source,
@@ -657,7 +691,7 @@ where
         let measurement = Some(measurement);
 
         let response = tag_values_impl(
-            db,
+            Arc::clone(&db),
             db_name,
             tag_key,
             measurement,
@@ -691,7 +725,7 @@ where
             .db_store
             .db(&db_name)
             .context(DatabaseNotFoundSnafu { db_name: &db_name })?;
-        db.record_query("measurement_fields", defer_json(&req));
+        let _query_completed_token = db.record_query("measurement_fields", defer_json(&req));
 
         let MeasurementFieldsRequest {
             source: _source,
@@ -704,14 +738,21 @@ where
 
         let measurement = Some(measurement);
 
-        let response = field_names_impl(db, db_name, measurement, range, predicate, span_ctx)
-            .await
-            .map(|fieldlist| {
-                fieldlist_to_measurement_fields_response(fieldlist)
-                    .context(ConvertingFieldListSnafu)
-                    .map_err(|e| e.to_status())
-            })
-            .map_err(|e| e.to_status())?;
+        let response = field_names_impl(
+            Arc::clone(&db),
+            db_name,
+            measurement,
+            range,
+            predicate,
+            span_ctx,
+        )
+        .await
+        .map(|fieldlist| {
+            fieldlist_to_measurement_fields_response(fieldlist)
+                .context(ConvertingFieldListSnafu)
+                .map_err(|e| e.to_status())
+        })
+        .map_err(|e| e.to_status())?;
 
         tx.send(response)
             .await

From 9283432a0fef1f533a32a505b73eca3e60c14f2f Mon Sep 17 00:00:00 2001
From: Edd Robinson
Date: Thu, 13 Jan 2022 19:36:00 +0000
Subject: [PATCH 5/6] refactor: display as ns

---
 arrow_util/src/display.rs       | 8 ++++----
 db/src/system_tables/queries.rs | 8 ++++----
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/arrow_util/src/display.rs b/arrow_util/src/display.rs
index b8bd54606a..380e375c31 100644
--- a/arrow_util/src/display.rs
+++ b/arrow_util/src/display.rs
@@ -1,4 +1,4 @@
-use arrow::array::{ArrayRef, DurationMillisecondArray, TimestampNanosecondArray};
+use arrow::array::{ArrayRef, DurationNanosecondArray, TimestampNanosecondArray};
 use arrow::datatypes::{DataType, TimeUnit};
 use arrow::error::{ArrowError, Result};
 use arrow::record_batch::RecordBatch;
@@ -42,13 +42,13 @@ fn array_value_to_string(column: &ArrayRef, row: usize) -> Result<String> {
             Ok(ts.to_rfc3339_opts(SecondsFormat::AutoSi, use_z))
         }
         // TODO(edd): see https://github.com/apache/arrow-rs/issues/1168
-        DataType::Duration(TimeUnit::Millisecond) if column.is_valid(row) => {
+        DataType::Duration(TimeUnit::Nanosecond) if column.is_valid(row) => {
             let dur_column = column
                 .as_any()
-                .downcast_ref::<DurationMillisecondArray>()
+                .downcast_ref::<DurationNanosecondArray>()
                 .unwrap();
 
-            let duration = std::time::Duration::from_millis(
+            let duration = std::time::Duration::from_nanos(
                 dur_column
                     .value(row)
                     .try_into()
diff --git a/db/src/system_tables/queries.rs b/db/src/system_tables/queries.rs
index 8749d01e20..a4c18f1fd4 100644
--- a/db/src/system_tables/queries.rs
+++ b/db/src/system_tables/queries.rs
@@ -3,7 +3,7 @@ use crate::{
     system_tables::IoxSystemTable,
 };
 use arrow::{
-    array::{DurationMillisecondArray, StringArray, TimestampNanosecondArray},
+    array::{DurationNanosecondArray, StringArray, TimestampNanosecondArray},
     datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit},
     error::Result,
     record_batch::RecordBatch,
@@ -49,7 +49,7 @@ fn queries_schema() -> SchemaRef {
         Field::new("query_text", DataType::Utf8, false),
         Field::new(
             "completed_duration",
-            DataType::Duration(TimeUnit::Millisecond),
+            DataType::Duration(TimeUnit::Nanosecond),
             false,
         ),
     ]))
@@ -77,8 +77,8 @@ fn from_query_log_entries(
     let query_runtime = entries
         .iter()
-        .map(|e| e.query_completed_duration().map(|d| d.as_millis() as i64))
-        .collect::<DurationMillisecondArray>();
+        .map(|e| e.query_completed_duration().map(|d| d.as_nanos() as i64))
+        .collect::<DurationNanosecondArray>();
 
     RecordBatch::try_new(
         schema,
         vec![

From cdb4f43d628cc97e5963ed4e12d3d68fbf79950a Mon Sep 17 00:00:00 2001
From: Edd Robinson
Date: Fri, 14 Jan 2022 10:41:27 +0000
Subject: [PATCH 6/6] refactor: address feedback

---
 db/src/access.rs    |  2 +-
 db/src/query_log.rs |  9 ++++++---
 query/src/lib.rs    | 12 ++++++++----
 3 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/db/src/access.rs b/db/src/access.rs
index e5c32b417c..324546a164 100644
--- a/db/src/access.rs
+++ b/db/src/access.rs
@@ -242,7 +242,7 @@ impl QueryDatabase for QueryCatalogAccess {
         // When the query token is dropped the query entry's completion time
         // will be set.
         let entry = self.query_log.push(query_type, query_text);
-        QueryCompletedToken::new(move || self.query_log.set_completed(Arc::clone(&entry)))
+        QueryCompletedToken::new(move || self.query_log.set_completed(entry))
     }
 }
 
diff --git a/db/src/query_log.rs b/db/src/query_log.rs
index bc21dc594a..917b63a292 100644
--- a/db/src/query_log.rs
+++ b/db/src/query_log.rs
@@ -9,6 +9,9 @@ use std::{
 use parking_lot::Mutex;
 use time::{Time, TimeProvider};
 
+// The query duration used for queries still running.
+const UNCOMPLETED_DURATION: i64 = -1;
+
 /// Information about a single query that was executed
 #[derive(Debug)]
 pub struct QueryLogEntry {
@@ -33,7 +36,7 @@ impl QueryLogEntry {
             query_type,
             query_text,
             issue_time,
-            query_completed_duration: (-1_i64).into(),
+            query_completed_duration: UNCOMPLETED_DURATION.into(),
         }
     }
@@ -42,7 +45,7 @@ impl QueryLogEntry {
             .query_completed_duration
             .load(atomic::Ordering::Relaxed)
         {
-            -1 => None,
+            UNCOMPLETED_DURATION => None,
             d => Some(Duration::from_nanos(d as u64)),
         }
     }
@@ -54,7 +57,7 @@ impl QueryLogEntry {
     }
 }
 
-/// Stores a fixed number `QueryExcutions` -- handles locking
+/// Stores a fixed number `QueryExecutions` -- handles locking
 /// internally so can be shared across multiple
 #[derive(Debug)]
 pub struct QueryLog {
diff --git a/query/src/lib.rs b/query/src/lib.rs
index 143faabe5a..701274bb5f 100644
--- a/query/src/lib.rs
+++ b/query/src/lib.rs
@@ -81,7 +81,7 @@ pub trait QueryChunkMeta: Sized {
 /// a `QueryDatabase`. It is used to trigger side-effects (such as query timing)
 /// on query completion.
 pub struct QueryCompletedToken<'a> {
-    f: Box<dyn Fn() + Send + 'a>,
+    f: Option<Box<dyn FnOnce() + Send + 'a>>,
 }
 
 impl<'a> Debug for QueryCompletedToken<'a> {
@@ -91,14 +91,18 @@ impl<'a> Debug for QueryCompletedToken<'a> {
     }
 }
 
 impl<'a> QueryCompletedToken<'a> {
-    pub fn new(f: impl Fn() + Send + 'a) -> Self {
-        Self { f: Box::new(f) }
+    pub fn new(f: impl FnOnce() + Send + 'a) -> Self {
+        Self {
+            f: Some(Box::new(f)),
+        }
     }
 }
 
 impl<'a> Drop for QueryCompletedToken<'a> {
     fn drop(&mut self) {
-        (self.f)()
+        if let Some(f) = self.f.take() {
+            (f)()
+        }
     }
 }
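
The mechanism the six patches build up can be distilled into a short,
self-contained sketch, using only the Rust standard library. Here
`std::time::Instant` stands in for IOx's mockable `TimeProvider`, and `main`
stands in for a query frontend holding `_query_completed_token`; the names
mirror the patches above, but this is an illustration of the RAII-token
pattern rather than the IOx code itself.

    use std::sync::atomic::{AtomicI64, Ordering};
    use std::sync::Arc;
    use std::time::{Duration, Instant};

    /// Sentinel stored while the query has not yet completed.
    const UNCOMPLETED_DURATION: i64 = -1;

    struct QueryLogEntry {
        issue_time: Instant,
        completed_duration_ns: AtomicI64,
    }

    impl QueryLogEntry {
        fn new() -> Self {
            Self {
                issue_time: Instant::now(),
                completed_duration_ns: AtomicI64::new(UNCOMPLETED_DURATION),
            }
        }

        /// Lock-free read, so the entry can be inspected (e.g. by a system
        /// table) while the query is still in flight.
        fn query_completed_duration(&self) -> Option<Duration> {
            match self.completed_duration_ns.load(Ordering::Relaxed) {
                UNCOMPLETED_DURATION => None,
                d => Some(Duration::from_nanos(d as u64)),
            }
        }

        fn set_completed(&self) {
            let d = self.issue_time.elapsed().as_nanos() as i64;
            self.completed_duration_ns.store(d, Ordering::Relaxed);
        }
    }

    /// RAII guard: runs its closure exactly once, when dropped.
    struct QueryCompletedToken<'a> {
        f: Option<Box<dyn FnOnce() + Send + 'a>>,
    }

    impl<'a> QueryCompletedToken<'a> {
        fn new(f: impl FnOnce() + Send + 'a) -> Self {
            Self {
                f: Some(Box::new(f)),
            }
        }
    }

    impl<'a> Drop for QueryCompletedToken<'a> {
        fn drop(&mut self) {
            if let Some(f) = self.f.take() {
                f()
            }
        }
    }

    fn main() {
        let entry = Arc::new(QueryLogEntry::new());

        {
            let e = Arc::clone(&entry);
            // Held by the query frontend for the lifetime of the request;
            // dropping it -- on a normal return or an early error path --
            // records the completion time.
            let _token = QueryCompletedToken::new(move || e.set_completed());
        }

        assert!(entry.query_completed_duration().is_some());
    }

The `Option<Box<dyn FnOnce() + Send>>` shape is the point of patch 6:
`Drop::drop` only receives `&mut self`, so `take()` is what allows the
one-shot closure to be moved out and consumed exactly once, however the
guard goes out of scope.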