From 0b343bcf1963be1abcebcde70d66985746969707 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Thu, 13 Jan 2022 19:06:45 +0000 Subject: [PATCH] feat: add RAII token to time query completion --- db/src/access.rs | 13 ++++++++++--- db/src/lib.rs | 8 ++++++-- db/src/query_log.rs | 5 +++++ query/src/lib.rs | 31 ++++++++++++++++++++++++++++++- query/src/test.rs | 9 ++++++++- 5 files changed, 59 insertions(+), 7 deletions(-) diff --git a/db/src/access.rs b/db/src/access.rs index 60dfb89eff..e5c32b417c 100644 --- a/db/src/access.rs +++ b/db/src/access.rs @@ -22,7 +22,7 @@ use predicate::predicate::{Predicate, PredicateBuilder}; use query::{ provider::{ChunkPruner, ProviderBuilder}, pruning::{prune_chunks, PruningObserver}, - QueryChunk, QueryChunkMeta, QueryDatabase, DEFAULT_SCHEMA, + QueryChunk, QueryChunkMeta, QueryCompletedToken, QueryDatabase, DEFAULT_SCHEMA, }; use schema::Schema; use std::{any::Any, sync::Arc}; @@ -234,8 +234,15 @@ impl QueryDatabase for QueryCatalogAccess { .map(|table| Arc::clone(&table.schema().read())) } - fn record_query(&self, query_type: impl Into, query_text: impl Into) { - self.query_log.push(query_type, query_text) + fn record_query( + &self, + query_type: impl Into, + query_text: impl Into, + ) -> QueryCompletedToken<'_> { + // When the query token is dropped the query entry's completion time + // will be set. + let entry = self.query_log.push(query_type, query_text); + QueryCompletedToken::new(move || self.query_log.set_completed(Arc::clone(&entry))) } } diff --git a/db/src/lib.rs b/db/src/lib.rs index ca33364cea..d6d321035f 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -45,7 +45,7 @@ use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::Persisten use predicate::predicate::Predicate; use query::{ exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext}, - QueryDatabase, + QueryCompletedToken, QueryDatabase, }; use rand_distr::{Distribution, Poisson}; use schema::selection::Selection; @@ -1224,7 +1224,11 @@ impl QueryDatabase for Db { self.catalog_access.table_schema(table_name) } - fn record_query(&self, query_type: impl Into, query_text: impl Into) { + fn record_query( + &self, + query_type: impl Into, + query_text: impl Into, + ) -> QueryCompletedToken<'_> { self.catalog_access.record_query(query_type, query_text) } } diff --git a/db/src/query_log.rs b/db/src/query_log.rs index 8b58197a86..bc21dc594a 100644 --- a/db/src/query_log.rs +++ b/db/src/query_log.rs @@ -104,6 +104,11 @@ impl QueryLog { let log = self.log.lock(); log.clone() } + + /// Marks the provided query entry as completed using the current time. + pub fn set_completed(&self, entry: Arc) { + entry.set_completed(self.time_provider.now()) + } } #[cfg(test)] diff --git a/query/src/lib.rs b/query/src/lib.rs index 9120886731..143faabe5a 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -77,6 +77,31 @@ pub trait QueryChunkMeta: Sized { } } +/// A `QueryCompletedToken` is returned by `record_query` implementations of +/// a `QueryDatabase`. It is used to trigger side-effects (such as query timing) +/// on query completion. +pub struct QueryCompletedToken<'a> { + f: Box, +} + +impl<'a> Debug for QueryCompletedToken<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("QueryCompletedToken").finish() + } +} + +impl<'a> QueryCompletedToken<'a> { + pub fn new(f: impl Fn() + Send + 'a) -> Self { + Self { f: Box::new(f) } + } +} + +impl<'a> Drop for QueryCompletedToken<'a> { + fn drop(&mut self) { + (self.f)() + } +} + /// A `Database` is the main trait implemented by the IOx subsystems /// that store actual data. /// @@ -100,7 +125,11 @@ pub trait QueryDatabase: Debug + Send + Sync { fn chunk_summaries(&self) -> Vec; /// Record that particular type of query was run / planned - fn record_query(&self, query_type: impl Into, query_text: impl Into); + fn record_query( + &self, + query_type: impl Into, + query_text: impl Into, + ) -> QueryCompletedToken<'_>; } /// Collection of data that shares the same partition key diff --git a/query/src/test.rs b/query/src/test.rs index 33b0ad0c34..b9989be2e5 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -4,6 +4,7 @@ //! AKA it is a Mock use crate::exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext}; +use crate::QueryCompletedToken; use crate::{ exec::stringset::{StringSet, StringSetRef}, Predicate, PredicateMatch, QueryChunk, QueryChunkMeta, QueryDatabase, @@ -145,7 +146,13 @@ impl QueryDatabase for TestDatabase { found_one.then(|| Arc::new(merger.build())) } - fn record_query(&self, _query_type: impl Into, _query_text: impl Into) {} + fn record_query( + &self, + _query_type: impl Into, + _query_text: impl Into, + ) -> QueryCompletedToken<'_> { + QueryCompletedToken::new(|| {}) + } } impl ExecutionContextProvider for TestDatabase {