diff --git a/query/src/exec.rs b/query/src/exec.rs index 6fad8c0ae2..c503a7a3d7 100644 --- a/query/src/exec.rs +++ b/query/src/exec.rs @@ -178,6 +178,15 @@ pub fn make_stream_split(input: LogicalPlan, split_expr: Expr) -> LogicalPlan { LogicalPlan::Extension { node } } +/// A type that can provide `IOxExecutionContext` for query +pub trait ExecutionContextProvider { + /// Returns a new execution context suitable for running queries + fn new_query_context( + self: &Arc<Self>, + span_ctx: Option<trace::ctx::SpanContext>, + ) -> IOxExecutionContext; +} + #[cfg(test)] mod tests { use arrow::{ diff --git a/query/src/test.rs b/query/src/test.rs index bc1e1368ef..89761b9b46 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -1,8 +1,9 @@ -//! This module provides a reference implementaton of +//! This module provides a reference implementation of //! [`QueryDatabase`] for use in testing. //! //! AKA it is a Mock +use crate::exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext}; use crate::{ exec::stringset::{StringSet, StringSetRef}, Predicate, PredicateMatch, QueryChunk, QueryChunkMeta, QueryDatabase, @@ -27,9 +28,11 @@ use parking_lot::Mutex; use snafu::Snafu; use std::num::NonZeroU64; use std::{collections::BTreeMap, fmt, sync::Arc}; +use trace::ctx::SpanContext; -#[derive(Debug, Default)] +#[derive(Debug)] pub struct TestDatabase { + executor: Arc<Executor>, /// Partitions which have been saved to this test database /// Key is partition name /// Value is map of chunk_id to chunk @@ -53,8 +56,12 @@ pub enum TestError { pub type Result<T, E = TestError> = std::result::Result<T, E>; impl TestDatabase { - pub fn new() -> Self { - Self::default() + pub fn new(executor: Arc<Executor>) -> Self { + Self { + executor, + partitions: Default::default(), + column_names: Default::default(), + } } /// Add a test chunk to the database @@ -127,6 +134,16 @@ impl QueryDatabase for TestDatabase { } } +impl ExecutionContextProvider for TestDatabase { + fn new_query_context(self: &Arc<Self>, span_ctx: Option<SpanContext>) -> IOxExecutionContext { + // Note: unlike Db this does not register a catalog provider + self.executor + .new_execution_config(ExecutorType::Query) + .with_span_context(span_ctx) + .build() + } +} + #[derive(Debug)] pub struct TestChunk { /// Table name diff --git a/query_tests/src/pruning.rs b/query_tests/src/pruning.rs index bf61885aa9..2e5d980950 100644 --- a/query_tests/src/pruning.rs +++ b/query_tests/src/pruning.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use arrow_util::assert_batches_sorted_eq; use datafusion::logical_plan::{col, lit}; use query::{ - exec::{stringset::StringSet, ExecutorType}, + exec::{stringset::StringSet, ExecutionContextProvider, ExecutorType}, frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner}, predicate::PredicateBuilder, QueryChunk, diff --git a/query_tests/src/sql.rs b/query_tests/src/sql.rs index b9a838b789..17d2807e62 100644 --- a/query_tests/src/sql.rs +++ b/query_tests/src/sql.rs @@ -6,7 +6,7 @@ use super::scenarios::*; use arrow::record_batch::RecordBatch; use arrow_util::assert_batches_sorted_eq; -use query::frontend::sql::SqlQueryPlanner; +use query::{exec::ExecutionContextProvider, frontend::sql::SqlQueryPlanner}; /// runs table_names(predicate) and compares it to the expected /// output diff --git a/server/src/db.rs b/server/src/db.rs index fc043ade28..b41f619c02 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -38,8 +38,11 @@ use parquet_file::{ cleanup::{delete_files as delete_parquet_files, get_unreferenced_parquet_files}, }; use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::PersistenceWindows}; -use query::exec::{ExecutorType, IOxExecutionContext}; -use query::{exec::Executor, predicate::Predicate, QueryDatabase}; +use query::{ + exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext}, + predicate::Predicate, + QueryDatabase, +}; use rand_distr::{Distribution, Poisson}; use snafu::{ensure, OptionExt, ResultExt, Snafu}; use std::{ @@ -480,20 +483,6 @@ impl Db { Arc::clone(&self.exec) } - /// Returns a new execution context suitable for running queries - /// - /// Registers `self` as the default catalog provider - pub fn new_query_context( - self: &Arc<Self>, - span_ctx: Option<SpanContext>, - ) -> IOxExecutionContext { - self.exec - .new_execution_config(ExecutorType::Query) - .with_default_catalog(Arc::<Self>::clone(self)) - .with_span_context(span_ctx) - .build() - } - /// Return the current database rules pub fn rules(&self) -> Arc<DatabaseRules> { Arc::clone(&*self.rules.read()) @@ -1410,6 +1399,16 @@ impl QueryDatabase for Db { } } +impl ExecutionContextProvider for Db { + fn new_query_context(self: &Arc<Self>, span_ctx: Option<SpanContext>) -> IOxExecutionContext { + self.exec + .new_execution_config(ExecutorType::Query) + .with_default_catalog(Arc::<Self>::clone(self)) + .with_span_context(span_ctx) + .build() + } +} + /// Convenience implementation of `CatalogProvider` so the rest of the /// code can use Db as a `CatalogProvider` (e.g. for running /// SQL). even though the implementation lives in `catalog_access` diff --git a/server/src/db/replay.rs b/server/src/db/replay.rs index 5d7025afaa..9d19e7e614 100644 --- a/server/src/db/replay.rs +++ b/server/src/db/replay.rs @@ -431,7 +431,7 @@ mod tests { checkpoint::{PartitionCheckpoint, PersistCheckpointBuilder, ReplayPlanner}, min_max_sequence::OptionalMinMaxSequence, }; - use query::frontend::sql::SqlQueryPlanner; + use query::{exec::ExecutionContextProvider, frontend::sql::SqlQueryPlanner}; use test_helpers::{assert_contains, assert_not_contains, tracing::TracingCapture}; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; diff --git a/server/src/lib.rs b/server/src/lib.rs index 587bab617b..a7a2d7cdac 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -88,7 +88,6 @@ use lifecycle::LockableChunk; use metrics::{KeyValue, MetricObserverBuilder}; use observability_deps::tracing::{error, info, warn}; use parking_lot::RwLock; -use query::exec::Executor; use rand::seq::SliceRandom; use resolver::Resolver; use snafu::{OptionExt, ResultExt, Snafu}; @@ -217,7 +216,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>; #[async_trait] pub trait DatabaseStore: std::fmt::Debug + Send + Sync { /// The type of database that is stored by this DatabaseStore - type Database: query::QueryDatabase; + type Database: query::QueryDatabase + query::exec::ExecutionContextProvider; /// The type of error this DataBase store generates type Error: std::error::Error + Send + Sync + 'static; @@ -232,10 +231,6 @@ pub trait DatabaseStore: std::fmt::Debug + Send + Sync { /// Retrieve the database specified by `name`, creating it if it /// doesn't exist. async fn db_or_create(&self, name: &str) -> Result<Arc<Self::Database>, Self::Error>; - - /// Provide a query executor to use for running queries on - /// databases in this `DatabaseStore` - fn executor(&self) -> Arc<Executor>; } /// A collection of metrics used to instrument the Server. @@ -1226,11 +1221,6 @@ where Ok(db) } - - /// Return a handle to the query executor - fn executor(&self) -> Arc<Executor> { - Arc::clone(self.shared.application.executor()) - } } #[cfg(test)] @@ -1254,7 +1244,7 @@ mod tests { use metrics::TestMetricRegistry; use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi}; use parquet_file::catalog::{test_helpers::TestCatalogState, PreservedCatalog}; - use query::{frontend::sql::SqlQueryPlanner, QueryDatabase}; + use query::{exec::ExecutionContextProvider, frontend::sql::SqlQueryPlanner, QueryDatabase}; use std::{ convert::{Infallible, TryFrom}, sync::{ diff --git a/src/influxdb_ioxd.rs b/src/influxdb_ioxd.rs index ddb6ddd657..506c3abeaa 100644 --- a/src/influxdb_ioxd.rs +++ b/src/influxdb_ioxd.rs @@ -354,6 +354,7 @@ mod tests { use data_types::{database_rules::DatabaseRules, DatabaseName}; use influxdb_iox_client::connection::Connection; use std::convert::TryInto; + use std::num::NonZeroU64; use structopt::StructOpt; use tokio::task::JoinHandle; use trace::span::Span; @@ -663,11 +664,16 @@ mod tests { let (addr, server, join) = tracing_server(&collector).await; let conn = jaeger_client(addr, "34f8495:35e32:0:1").await; + let db_info = influxdb_storage_client::OrgAndBucket::new( + NonZeroU64::new(12).unwrap(), + NonZeroU64::new(344).unwrap(), + ); + let mut management = influxdb_iox_client::management::Client::new(conn.clone()); management .create_database( influxdb_iox_client::management::generated_types::DatabaseRules { - name: "database".to_string(), + name: db_info.db_name().to_string(), ..Default::default() }, ) @@ -676,13 +682,24 @@ mod tests { let mut write = influxdb_iox_client::write::Client::new(conn.clone()); write - .write("database", "cpu,tag0=foo val=1 100\n") + .write(db_info.db_name(), "cpu,tag0=foo val=1 100\n") .await .unwrap(); - let mut flight = influxdb_iox_client::flight::Client::new(conn); + let mut flight = influxdb_iox_client::flight::Client::new(conn.clone()); flight - .perform_query("database", "select * from cpu;") + .perform_query(db_info.db_name(), "select * from cpu;") + .await + .unwrap(); + + let mut storage = influxdb_storage_client::Client::new(conn); + storage + .tag_values(influxdb_storage_client::generated_types::TagValuesRequest { + tags_source: Some(influxdb_storage_client::Client::read_source(&db_info, 1)), + range: None, + predicate: None, + tag_key: "tag0".into(), + }) .await .unwrap(); @@ -693,9 +710,7 @@ mod tests { let root_spans: Vec<_> = spans.iter().filter(|span| &span.name == "IOx").collect(); // Made 3 requests - assert_eq!(root_spans.len(), 3); - - let query_span = root_spans[2]; + assert_eq!(root_spans.len(), 4); let child = |parent: &Span, name: &'static str| -> Option<&Span> { spans.iter().find(|span| { @@ -703,14 +718,24 @@ mod tests { }) }; - let ctx_span = child(query_span, "Query Execution").unwrap(); + // Test SQL + let sql_span = root_spans[2]; + let ctx_span = child(sql_span, "Query Execution").unwrap(); let planner_span = child(ctx_span, "Planner").unwrap(); let sql_span = child(planner_span, "sql").unwrap(); let prepare_sql_span = child(sql_span, "prepare_sql").unwrap(); child(prepare_sql_span, "prepare_plan").unwrap(); child(ctx_span, "collect").unwrap(); + + // Test tag_values + let storage_span = root_spans[3]; + let ctx_span = child(storage_span, "Query Execution").unwrap(); + child(ctx_span, "Planner").unwrap(); + + let to_string_set = child(ctx_span, "to_string_set").unwrap(); + child(to_string_set, "run_logical_plans").unwrap(); } #[tokio::test] diff --git a/src/influxdb_ioxd/http.rs b/src/influxdb_ioxd/http.rs index d7cab734a8..b79044aa3b 100644 --- a/src/influxdb_ioxd/http.rs +++ b/src/influxdb_ioxd/http.rs @@ -24,7 +24,7 @@ use data_types::{ }; use influxdb_iox_client::format::QueryOutputFormat; use influxdb_line_protocol::parse_lines; -use query::QueryDatabase; +use query::{exec::ExecutionContextProvider, QueryDatabase}; use server::{ApplicationState, ConnectionManager, Error, Server as AppServer}; // External crates diff --git a/src/influxdb_ioxd/rpc/flight.rs b/src/influxdb_ioxd/rpc/flight.rs index 8cea57ceba..c91e8aafc2 100644 --- a/src/influxdb_ioxd/rpc/flight.rs +++ b/src/influxdb_ioxd/rpc/flight.rs @@ -1,12 +1,7 @@ //! Implements the native gRPC IOx query API using Arrow Flight +use std::fmt::Debug; use std::{pin::Pin, sync::Arc}; -use futures::Stream; -use observability_deps::tracing::{info, warn}; -use serde::Deserialize; -use snafu::{ResultExt, Snafu}; -use tonic::{Request, Response, Streaming}; - use arrow::{ array::{make_array, ArrayRef, MutableArrayData}, datatypes::{DataType, Field, Schema, SchemaRef}, @@ -18,12 +13,19 @@ use arrow_flight::{ Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket, }; +use futures::Stream; +use serde::Deserialize; +use snafu::{ResultExt, Snafu}; +use tonic::{Request, Response, Streaming}; + use data_types::{DatabaseName, DatabaseNameError}; +use observability_deps::tracing::{info, warn}; +use query::exec::ExecutionContextProvider; use server::{ConnectionManager, Server}; -use std::fmt::Debug; + +use crate::influxdb_ioxd::rpc::error::default_server_error_handler; use super::super::planner::Planner; -use crate::influxdb_ioxd::rpc::error::default_server_error_handler; #[allow(clippy::enum_variant_names)] #[derive(Debug, Snafu)] @@ -362,15 +364,18 @@ fn hydrate_dictionary(array: &ArrayRef) -> Result<ArrayRef, Error> { #[cfg(test)] mod tests { - use super::*; + use std::sync::Arc; + use arrow::array::StringArray; use arrow::{ array::{DictionaryArray, UInt32Array}, datatypes::{DataType, Int32Type}, }; use arrow_flight::utils::flight_data_to_arrow_batch; + use datafusion::physical_plan::limit::truncate_batch; - use std::sync::Arc; + + use super::*; #[test] fn test_deep_clone_array() { diff --git a/src/influxdb_ioxd/rpc/storage/service.rs b/src/influxdb_ioxd/rpc/storage/service.rs index 408e4f4ba0..ca709a8589 100644 --- a/src/influxdb_ioxd/rpc/storage/service.rs +++ b/src/influxdb_ioxd/rpc/storage/service.rs @@ -2,7 +2,7 @@ //! implemented in terms of the [`QueryDatabase`](query::QueryDatabase) and //! [`DatabaseStore`] -use std::{collections::HashMap, sync::Arc}; +use std::collections::HashMap; use snafu::{OptionExt, ResultExt, Snafu}; use tokio::sync::mpsc; @@ -21,7 +21,7 @@ use generated_types::{ use metrics::KeyValue; use observability_deps::tracing::{error, info}; use query::{ - exec::{fieldlist::FieldList, seriesset::Error as SeriesSetError, ExecutorType}, + exec::{fieldlist::FieldList, seriesset::Error as SeriesSetError, ExecutionContextProvider}, predicate::PredicateBuilder, }; use server::DatabaseStore; @@ -38,6 +38,7 @@ use crate::influxdb_ioxd::{ StorageService, }, }; +use trace::ctx::SpanContext; #[derive(Debug, Snafu)] pub enum Error { @@ -223,6 +224,7 @@ where &self, req: tonic::Request<ReadFilterRequest>, ) -> Result<tonic::Response<Self::ReadFilterStream>, Status> { + let span_ctx = req.extensions().get().cloned(); let read_filter_request = req.into_inner(); let db_name = get_database_name(&read_filter_request)?; @@ -241,7 +243,7 @@ where KeyValue::new("db_name", db_name.to_string()), ]; - let results = read_filter_impl(Arc::clone(&self.db_store), db_name, range, predicate) + let results = read_filter_impl(self.db_store.as_ref(), db_name, range, predicate, span_ctx) .await .map_err(|e| { if e.is_internal() { @@ -265,6 +267,7 @@ where &self, req: tonic::Request<ReadGroupRequest>, ) -> Result<tonic::Response<Self::ReadGroupStream>, Status> { + let span_ctx = req.extensions().get().cloned(); let read_group_request = req.into_inner(); let db_name = get_database_name(&read_group_request)?; @@ -314,11 +317,12 @@ where })?; let results = query_group_impl( - Arc::clone(&self.db_store), + self.db_store.as_ref(), db_name, range, predicate, gby_agg, + span_ctx, ) .await .map_err(|e| { @@ -344,6 +348,7 @@ where &self, req: tonic::Request<ReadWindowAggregateRequest>, ) -> Result<tonic::Response<Self::ReadGroupStream>, Status> { + let span_ctx = req.extensions().get().cloned(); let read_window_aggregate_request = req.into_inner(); let db_name = get_database_name(&read_window_aggregate_request)?; @@ -379,11 +384,12 @@ where })?; let results = query_group_impl( - Arc::clone(&self.db_store), + self.db_store.as_ref(), db_name, range, predicate, gby_agg, + span_ctx, ) .await .map_err(|e| { @@ -408,6 +414,7 @@ where &self, req: tonic::Request<TagKeysRequest>, ) -> Result<tonic::Response<Self::TagKeysStream>, Status> { + let span_ctx = req.extensions().get().cloned(); let (tx, rx) = mpsc::channel(4); let tag_keys_request = req.into_inner(); @@ -431,11 +438,12 @@ where let measurement = None; let response = tag_keys_impl( - Arc::clone(&self.db_store), + self.db_store.as_ref(), db_name, measurement, range, predicate, + span_ctx, ) .await .map_err(|e| { @@ -461,6 +469,7 @@ where &self, req: tonic::Request<TagValuesRequest>, ) -> Result<tonic::Response<Self::TagValuesStream>, Status> { + let span_ctx = req.extensions().get().cloned(); let (tx, rx) = mpsc::channel(4); let tag_values_request = req.into_inner(); @@ -495,7 +504,7 @@ where .to_status()); } - measurement_name_impl(Arc::clone(&self.db_store), db_name, range) + measurement_name_impl(self.db_store.as_ref(), db_name, range, span_ctx) .await .map_err(|e| { if e.is_internal() { @@ -508,17 +517,23 @@ where } else if tag_key.is_field() { info!(%db_name, ?range, predicate=%predicate.loggable(), "tag_values with tag_key=[xff] (field name)"); - let fieldlist = - field_names_impl(Arc::clone(&self.db_store), db_name, None, range, predicate) - .await - .map_err(|e| { - if e.is_internal() { - ob.error_with_labels(labels); - } else { - ob.client_error_with_labels(labels); - } - e - })?; + let fieldlist = field_names_impl( + self.db_store.as_ref(), + db_name, + None, + range, + predicate, + span_ctx, + ) + .await + .map_err(|e| { + if e.is_internal() { + ob.error_with_labels(labels); + } else { + ob.client_error_with_labels(labels); + } + e + })?; // Pick out the field names into a Vec<Vec<u8>>for return let values = fieldlist @@ -539,12 +554,13 @@ where info!(%db_name, ?range, %tag_key, predicate=%predicate.loggable(), "tag_values",); tag_values_impl( - Arc::clone(&self.db_store), + self.db_store.as_ref(), db_name, tag_key, measurement, range, predicate, + span_ctx, ) .await }; @@ -610,6 +626,7 @@ where &self, req: tonic::Request<MeasurementNamesRequest>, ) -> Result<tonic::Response<Self::MeasurementNamesStream>, Status> { + let span_ctx = req.extensions().get().cloned(); let (tx, rx) = mpsc::channel(4); let measurement_names_request = req.into_inner(); @@ -641,7 +658,7 @@ where KeyValue::new("db_name", db_name.to_string()), ]; - let response = measurement_name_impl(Arc::clone(&self.db_store), db_name, range) + let response = measurement_name_impl(self.db_store.as_ref(), db_name, range, span_ctx) .await .map_err(|e| { if e.is_internal() { @@ -666,6 +683,7 @@ where &self, req: tonic::Request<MeasurementTagKeysRequest>, ) -> Result<tonic::Response<Self::MeasurementTagKeysStream>, Status> { + let span_ctx = req.extensions().get().cloned(); let (tx, rx) = mpsc::channel(4); let measurement_tag_keys_request = req.into_inner(); @@ -690,11 +708,12 @@ where let measurement = Some(measurement); let response = tag_keys_impl( - Arc::clone(&self.db_store), + self.db_store.as_ref(), db_name, measurement, range, predicate, + span_ctx, ) .await .map_err(|e| { @@ -720,6 +739,7 @@ where &self, req: tonic::Request<MeasurementTagValuesRequest>, ) -> Result<tonic::Response<Self::MeasurementTagValuesStream>, Status> { + let span_ctx = req.extensions().get().cloned(); let (tx, rx) = mpsc::channel(4); let measurement_tag_values_request = req.into_inner(); @@ -745,12 +765,13 @@ where let measurement = Some(measurement); let response = tag_values_impl( - Arc::clone(&self.db_store), + self.db_store.as_ref(), db_name, tag_key, measurement, range, predicate, + span_ctx, ) .await .map_err(|e| { @@ -776,6 +797,7 @@ where &self, req: tonic::Request<MeasurementFieldsRequest>, ) -> Result<tonic::Response<Self::MeasurementFieldsStream>, Status> { + let span_ctx = req.extensions().get().cloned(); let (tx, rx) = mpsc::channel(4); let measurement_fields_request = req.into_inner(); @@ -800,11 +822,12 @@ where let measurement = Some(measurement); let response = field_names_impl( - Arc::clone(&self.db_store), + self.db_store.as_ref(), db_name, measurement, range, predicate, + span_ctx, ) .await .map(|fieldlist| { @@ -857,9 +880,10 @@ fn get_database_name(input: &impl GrpcInputs) -> Result<DatabaseName<'static>, S /// Gathers all measurement names that have data in the specified /// (optional) range async fn measurement_name_impl<T>( - db_store: Arc<T>, + db_store: &T, db_name: DatabaseName<'static>, range: Option<TimestampRange>, + span_ctx: Option<SpanContext>, ) -> Result<StringValuesResponse> where T: DatabaseStore + 'static, @@ -868,7 +892,7 @@ where let db_name = db_name.as_ref(); let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?; - let ctx = db_store.executor().new_context(ExecutorType::Query); + let ctx = db.new_query_context(span_ctx); let plan = Planner::new(&ctx) .table_names(db, predicate) @@ -894,11 +918,12 @@ where /// Return tag keys with optional measurement, timestamp and arbitratry /// predicates async fn tag_keys_impl<T>( - db_store: Arc<T>, + db_store: &T, db_name: DatabaseName<'static>, measurement: Option<String>, range: Option<TimestampRange>, rpc_predicate: Option<Predicate>, + span_ctx: Option<SpanContext>, ) -> Result<StringValuesResponse> where T: DatabaseStore + 'static, @@ -918,7 +943,7 @@ where db_name: db_name.as_str(), })?; - let ctx = db_store.executor().new_context(ExecutorType::Query); + let ctx = db.new_query_context(span_ctx); let tag_key_plan = Planner::new(&ctx) .tag_keys(db, predicate) @@ -948,12 +973,13 @@ where /// Return tag values for tag_name, with optional measurement, timestamp and /// arbitratry predicates async fn tag_values_impl<T>( - db_store: Arc<T>, + db_store: &T, db_name: DatabaseName<'static>, tag_name: String, measurement: Option<String>, range: Option<TimestampRange>, rpc_predicate: Option<Predicate>, + span_ctx: Option<SpanContext>, ) -> Result<StringValuesResponse> where T: DatabaseStore + 'static, @@ -973,7 +999,7 @@ where let tag_name = &tag_name; let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?; - let ctx = db_store.executor().new_context(ExecutorType::Query); + let ctx = db.new_query_context(span_ctx); let tag_value_plan = Planner::new(&ctx) .tag_values(db, tag_name, predicate) @@ -1001,11 +1027,12 @@ where } /// Launch async tasks that materialises the result of executing read_filter. -async fn read_filter_impl<'a, T>( - db_store: Arc<T>, +async fn read_filter_impl<T>( + db_store: &T, db_name: DatabaseName<'static>, range: Option<TimestampRange>, rpc_predicate: Option<Predicate>, + span_ctx: Option<SpanContext>, ) -> Result<Vec<ReadResponse>, Error> where T: DatabaseStore + 'static, @@ -1026,7 +1053,7 @@ where let db_name = owned_db_name.as_str(); let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?; - let ctx = db_store.executor().new_context(ExecutorType::Query); + let ctx = db.new_query_context(span_ctx); // PERF - This used to send responses to the client before execution had // completed, but now it doesn't. We may need to revisit this in the future @@ -1058,11 +1085,12 @@ where /// Launch async tasks that send the result of executing read_group to `tx` async fn query_group_impl<T>( - db_store: Arc<T>, + db_store: &T, db_name: DatabaseName<'static>, range: Option<TimestampRange>, rpc_predicate: Option<Predicate>, gby_agg: GroupByAndAggregate, + span_ctx: Option<SpanContext>, ) -> Result<Vec<ReadResponse>, Error> where T: DatabaseStore + 'static, @@ -1083,7 +1111,7 @@ where let db_name = owned_db_name.as_str(); let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?; - let ctx = db_store.executor().new_context(ExecutorType::Query); + let ctx = db.new_query_context(span_ctx); let planner = Planner::new(&ctx); let grouped_series_set_plan = match gby_agg { @@ -1124,11 +1152,12 @@ where /// Return field names, restricted via optional measurement, timestamp and /// predicate async fn field_names_impl<T>( - db_store: Arc<T>, + db_store: &T, db_name: DatabaseName<'static>, measurement: Option<String>, range: Option<TimestampRange>, rpc_predicate: Option<Predicate>, + span_ctx: Option<SpanContext>, ) -> Result<FieldList> where T: DatabaseStore + 'static, @@ -1146,7 +1175,7 @@ where let db_name = db_name.as_str(); let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?; - let ctx = db_store.executor().new_context(ExecutorType::Query); + let ctx = db.new_query_context(span_ctx); let field_list_plan = Planner::new(&ctx) .field_columns(db, predicate) @@ -1165,10 +1194,11 @@ where #[cfg(test)] mod tests { - use std::num::NonZeroU64; use std::{ collections::BTreeMap, net::{IpAddr, Ipv4Addr, SocketAddr}, + num::NonZeroU64, + sync::Arc, }; use parking_lot::Mutex; @@ -2487,14 +2517,10 @@ mod tests { if let Some(db) = databases.get(name) { Ok(Arc::clone(db)) } else { - let new_db = Arc::new(TestDatabase::new()); + let new_db = Arc::new(TestDatabase::new(Arc::clone(&self.executor))); databases.insert(name.to_string(), Arc::clone(&new_db)); Ok(new_db) } } - - fn executor(&self) -> Arc<Executor> { - Arc::clone(&self.executor) - } } }