feat: add RAII token to time query completion
parent
6a842fc105
commit
0b343bcf19
|
@ -22,7 +22,7 @@ use predicate::predicate::{Predicate, PredicateBuilder};
|
|||
use query::{
|
||||
provider::{ChunkPruner, ProviderBuilder},
|
||||
pruning::{prune_chunks, PruningObserver},
|
||||
QueryChunk, QueryChunkMeta, QueryDatabase, DEFAULT_SCHEMA,
|
||||
QueryChunk, QueryChunkMeta, QueryCompletedToken, QueryDatabase, DEFAULT_SCHEMA,
|
||||
};
|
||||
use schema::Schema;
|
||||
use std::{any::Any, sync::Arc};
|
||||
|
@ -234,8 +234,15 @@ impl QueryDatabase for QueryCatalogAccess {
|
|||
.map(|table| Arc::clone(&table.schema().read()))
|
||||
}
|
||||
|
||||
fn record_query(&self, query_type: impl Into<String>, query_text: impl Into<String>) {
|
||||
self.query_log.push(query_type, query_text)
|
||||
fn record_query(
|
||||
&self,
|
||||
query_type: impl Into<String>,
|
||||
query_text: impl Into<String>,
|
||||
) -> QueryCompletedToken<'_> {
|
||||
// When the query token is dropped the query entry's completion time
|
||||
// will be set.
|
||||
let entry = self.query_log.push(query_type, query_text);
|
||||
QueryCompletedToken::new(move || self.query_log.set_completed(Arc::clone(&entry)))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -45,7 +45,7 @@ use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::Persisten
|
|||
use predicate::predicate::Predicate;
|
||||
use query::{
|
||||
exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext},
|
||||
QueryDatabase,
|
||||
QueryCompletedToken, QueryDatabase,
|
||||
};
|
||||
use rand_distr::{Distribution, Poisson};
|
||||
use schema::selection::Selection;
|
||||
|
@ -1224,7 +1224,11 @@ impl QueryDatabase for Db {
|
|||
self.catalog_access.table_schema(table_name)
|
||||
}
|
||||
|
||||
fn record_query(&self, query_type: impl Into<String>, query_text: impl Into<String>) {
|
||||
fn record_query(
|
||||
&self,
|
||||
query_type: impl Into<String>,
|
||||
query_text: impl Into<String>,
|
||||
) -> QueryCompletedToken<'_> {
|
||||
self.catalog_access.record_query(query_type, query_text)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -104,6 +104,11 @@ impl QueryLog {
|
|||
let log = self.log.lock();
|
||||
log.clone()
|
||||
}
|
||||
|
||||
/// Marks the provided query entry as completed using the current time.
|
||||
pub fn set_completed(&self, entry: Arc<QueryLogEntry>) {
|
||||
entry.set_completed(self.time_provider.now())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
@ -77,6 +77,31 @@ pub trait QueryChunkMeta: Sized {
|
|||
}
|
||||
}
|
||||
|
||||
/// A `QueryCompletedToken` is returned by `record_query` implementations of
|
||||
/// a `QueryDatabase`. It is used to trigger side-effects (such as query timing)
|
||||
/// on query completion.
|
||||
pub struct QueryCompletedToken<'a> {
|
||||
f: Box<dyn Fn() + Send + 'a>,
|
||||
}
|
||||
|
||||
impl<'a> Debug for QueryCompletedToken<'a> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("QueryCompletedToken").finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> QueryCompletedToken<'a> {
|
||||
pub fn new(f: impl Fn() + Send + 'a) -> Self {
|
||||
Self { f: Box::new(f) }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Drop for QueryCompletedToken<'a> {
|
||||
fn drop(&mut self) {
|
||||
(self.f)()
|
||||
}
|
||||
}
|
||||
|
||||
/// A `Database` is the main trait implemented by the IOx subsystems
|
||||
/// that store actual data.
|
||||
///
|
||||
|
@ -100,7 +125,11 @@ pub trait QueryDatabase: Debug + Send + Sync {
|
|||
fn chunk_summaries(&self) -> Vec<ChunkSummary>;
|
||||
|
||||
/// Record that particular type of query was run / planned
|
||||
fn record_query(&self, query_type: impl Into<String>, query_text: impl Into<String>);
|
||||
fn record_query(
|
||||
&self,
|
||||
query_type: impl Into<String>,
|
||||
query_text: impl Into<String>,
|
||||
) -> QueryCompletedToken<'_>;
|
||||
}
|
||||
|
||||
/// Collection of data that shares the same partition key
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
//! AKA it is a Mock
|
||||
|
||||
use crate::exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext};
|
||||
use crate::QueryCompletedToken;
|
||||
use crate::{
|
||||
exec::stringset::{StringSet, StringSetRef},
|
||||
Predicate, PredicateMatch, QueryChunk, QueryChunkMeta, QueryDatabase,
|
||||
|
@ -145,7 +146,13 @@ impl QueryDatabase for TestDatabase {
|
|||
found_one.then(|| Arc::new(merger.build()))
|
||||
}
|
||||
|
||||
fn record_query(&self, _query_type: impl Into<String>, _query_text: impl Into<String>) {}
|
||||
fn record_query(
|
||||
&self,
|
||||
_query_type: impl Into<String>,
|
||||
_query_text: impl Into<String>,
|
||||
) -> QueryCompletedToken<'_> {
|
||||
QueryCompletedToken::new(|| {})
|
||||
}
|
||||
}
|
||||
|
||||
impl ExecutionContextProvider for TestDatabase {
|
||||
|
|
Loading…
Reference in New Issue