feat: propagate span context into storage RPC queries (#2407)

* feat: propagate span context into storage RPC queries

* refactor: create ExecutionContextProvider trait

* chore: cleanup imports

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Raphael Taylor-Davies 2021-08-26 18:11:49 +01:00 committed by GitHub
parent 3603f219fb
commit e3e801d29a
11 changed files with 167 additions and 96 deletions
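
The change threads the caller's tracing context end to end: each storage RPC handler pulls an optional `SpanContext` out of the tonic request extensions (placed there, presumably, by the tracing middleware), and the database builds its `IOxExecutionContext` from it, so planning and execution spans attach to the caller's trace. A minimal sketch of that handler-side pattern; the function name `handle_storage_query` and the bare `Request<()>` payload are illustrative, not part of this diff:

```rust
use std::sync::Arc;

use query::exec::ExecutionContextProvider;
use trace::ctx::SpanContext;

// Illustrative only: this helper is not part of the commit.
async fn handle_storage_query<D: ExecutionContextProvider>(db: Arc<D>, req: tonic::Request<()>) {
    // `None` when the caller sent no trace headers; otherwise the (assumed)
    // tracing middleware has stored the decoded SpanContext in the extensions.
    let span_ctx: Option<SpanContext> = req.extensions().get().cloned();

    // The query context now carries the span, so the downstream "Planner",
    // "collect", etc. spans become children of the caller's trace.
    let ctx = db.new_query_context(span_ctx);
    let _ = ctx; // ... plan and execute the query with `ctx` ...
}
```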


@@ -178,6 +178,15 @@ pub fn make_stream_split(input: LogicalPlan, split_expr: Expr) -> LogicalPlan {
     LogicalPlan::Extension { node }
 }
 
+/// A type that can provide `IOxExecutionContext` for query
+pub trait ExecutionContextProvider {
+    /// Returns a new execution context suitable for running queries
+    fn new_query_context(
+        self: &Arc<Self>,
+        span_ctx: Option<trace::ctx::SpanContext>,
+    ) -> IOxExecutionContext;
+}
+
 #[cfg(test)]
 mod tests {
     use arrow::{
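
Any database type that owns an `Executor` can implement the trait above. A minimal sketch, mirroring the `TestDatabase` impl further down in this diff; the `MyDatabase` type is hypothetical:

```rust
use std::sync::Arc;

use query::exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext};
use trace::ctx::SpanContext;

/// Hypothetical database type, used only for illustration.
#[derive(Debug)]
struct MyDatabase {
    executor: Arc<Executor>,
}

impl ExecutionContextProvider for MyDatabase {
    fn new_query_context(self: &Arc<Self>, span_ctx: Option<SpanContext>) -> IOxExecutionContext {
        // Attach the caller's span (if any) to every context built here.
        self.executor
            .new_execution_config(ExecutorType::Query)
            .with_span_context(span_ctx)
            .build()
    }
}
```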


@@ -1,8 +1,9 @@
-//! This module provides a reference implementaton of
+//! This module provides a reference implementation of
 //! [`QueryDatabase`] for use in testing.
 //!
 //! AKA it is a Mock
+use crate::exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext};
 use crate::{
     exec::stringset::{StringSet, StringSetRef},
     Predicate, PredicateMatch, QueryChunk, QueryChunkMeta, QueryDatabase,
@@ -27,9 +28,11 @@ use parking_lot::Mutex;
 use snafu::Snafu;
 use std::num::NonZeroU64;
 use std::{collections::BTreeMap, fmt, sync::Arc};
+use trace::ctx::SpanContext;
 
-#[derive(Debug, Default)]
+#[derive(Debug)]
 pub struct TestDatabase {
+    executor: Arc<Executor>,
     /// Partitions which have been saved to this test database
     /// Key is partition name
     /// Value is map of chunk_id to chunk
@@ -53,8 +56,12 @@ pub enum TestError {
 pub type Result<T, E = TestError> = std::result::Result<T, E>;
 
 impl TestDatabase {
-    pub fn new() -> Self {
-        Self::default()
+    pub fn new(executor: Arc<Executor>) -> Self {
+        Self {
+            executor,
+            partitions: Default::default(),
+            column_names: Default::default(),
+        }
     }
 
     /// Add a test chunk to the database
@@ -127,6 +134,16 @@ impl QueryDatabase for TestDatabase {
     }
 }
 
+impl ExecutionContextProvider for TestDatabase {
+    fn new_query_context(self: &Arc<Self>, span_ctx: Option<SpanContext>) -> IOxExecutionContext {
+        // Note: unlike Db this does not register a catalog provider
+        self.executor
+            .new_execution_config(ExecutorType::Query)
+            .with_span_context(span_ctx)
+            .build()
+    }
+}
+
 #[derive(Debug)]
 pub struct TestChunk {
     /// Table name
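
Call sites must now hand the mock an executor up front. A sketch of the updated construction, assuming `Executor::new(1)` builds a single-threaded query executor (as other IOx tests do) and that the mock is exported as `query::test::TestDatabase`:

```rust
use std::sync::Arc;

use query::exec::{ExecutionContextProvider, Executor, IOxExecutionContext};
use query::test::TestDatabase;

// Sketch only, under the assumptions named above.
fn test_context() -> IOxExecutionContext {
    let db = Arc::new(TestDatabase::new(Arc::new(Executor::new(1))));
    // `None`: no upstream trace to attach to.
    db.new_query_context(None)
}
```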


@@ -4,7 +4,7 @@ use std::sync::Arc;
 use arrow_util::assert_batches_sorted_eq;
 use datafusion::logical_plan::{col, lit};
 use query::{
-    exec::{stringset::StringSet, ExecutorType},
+    exec::{stringset::StringSet, ExecutionContextProvider, ExecutorType},
     frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner},
     predicate::PredicateBuilder,
     QueryChunk,


@@ -6,7 +6,7 @@
 use super::scenarios::*;
 use arrow::record_batch::RecordBatch;
 use arrow_util::assert_batches_sorted_eq;
-use query::frontend::sql::SqlQueryPlanner;
+use query::{exec::ExecutionContextProvider, frontend::sql::SqlQueryPlanner};
 
 /// runs table_names(predicate) and compares it to the expected
 /// output


@@ -38,8 +38,11 @@ use parquet_file::{
     cleanup::{delete_files as delete_parquet_files, get_unreferenced_parquet_files},
 };
 use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::PersistenceWindows};
-use query::exec::{ExecutorType, IOxExecutionContext};
-use query::{exec::Executor, predicate::Predicate, QueryDatabase};
+use query::{
+    exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext},
+    predicate::Predicate,
+    QueryDatabase,
+};
 use rand_distr::{Distribution, Poisson};
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
 use std::{
@@ -480,20 +483,6 @@ impl Db {
         Arc::clone(&self.exec)
     }
 
-    /// Returns a new execution context suitable for running queries
-    ///
-    /// Registers `self` as the default catalog provider
-    pub fn new_query_context(
-        self: &Arc<Self>,
-        span_ctx: Option<SpanContext>,
-    ) -> IOxExecutionContext {
-        self.exec
-            .new_execution_config(ExecutorType::Query)
-            .with_default_catalog(Arc::<Self>::clone(self))
-            .with_span_context(span_ctx)
-            .build()
-    }
-
     /// Return the current database rules
     pub fn rules(&self) -> Arc<DatabaseRules> {
         Arc::clone(&*self.rules.read())
@@ -1410,6 +1399,16 @@ impl QueryDatabase for Db {
     }
 }
 
+impl ExecutionContextProvider for Db {
+    fn new_query_context(self: &Arc<Self>, span_ctx: Option<SpanContext>) -> IOxExecutionContext {
+        self.exec
+            .new_execution_config(ExecutorType::Query)
+            .with_default_catalog(Arc::<Self>::clone(self))
+            .with_span_context(span_ctx)
+            .build()
+    }
+}
+
 /// Convenience implementation of `CatalogProvider` so the rest of the
 /// code can use Db as a `CatalogProvider` (e.g. for running
 /// SQL). even though the implementation lives in `catalog_access`


@@ -431,7 +431,7 @@ mod tests {
         checkpoint::{PartitionCheckpoint, PersistCheckpointBuilder, ReplayPlanner},
         min_max_sequence::OptionalMinMaxSequence,
     };
-    use query::frontend::sql::SqlQueryPlanner;
+    use query::{exec::ExecutionContextProvider, frontend::sql::SqlQueryPlanner};
     use test_helpers::{assert_contains, assert_not_contains, tracing::TracingCapture};
     use tokio::task::JoinHandle;
     use tokio_util::sync::CancellationToken;


@@ -88,7 +88,6 @@ use lifecycle::LockableChunk;
 use metrics::{KeyValue, MetricObserverBuilder};
 use observability_deps::tracing::{error, info, warn};
 use parking_lot::RwLock;
-use query::exec::Executor;
 use rand::seq::SliceRandom;
 use resolver::Resolver;
 use snafu::{OptionExt, ResultExt, Snafu};
@@ -217,7 +216,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 #[async_trait]
 pub trait DatabaseStore: std::fmt::Debug + Send + Sync {
     /// The type of database that is stored by this DatabaseStore
-    type Database: query::QueryDatabase;
+    type Database: query::QueryDatabase + query::exec::ExecutionContextProvider;
 
     /// The type of error this DataBase store generates
     type Error: std::error::Error + Send + Sync + 'static;
@@ -232,10 +231,6 @@ pub trait DatabaseStore: std::fmt::Debug + Send + Sync {
     /// Retrieve the database specified by `name`, creating it if it
     /// doesn't exist.
     async fn db_or_create(&self, name: &str) -> Result<Arc<Self::Database>, Self::Error>;
-
-    /// Provide a query executor to use for running queries on
-    /// databases in this `DatabaseStore`
-    fn executor(&self) -> Arc<Executor>;
 }
 
 /// A collection of metrics used to instrument the Server.
@@ -1226,11 +1221,6 @@ where
         Ok(db)
     }
-
-    /// Return a handle to the query executor
-    fn executor(&self) -> Arc<Executor> {
-        Arc::clone(self.shared.application.executor())
-    }
 }
 
 #[cfg(test)]
@@ -1254,7 +1244,7 @@ mod tests {
     use metrics::TestMetricRegistry;
     use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
     use parquet_file::catalog::{test_helpers::TestCatalogState, PreservedCatalog};
-    use query::{frontend::sql::SqlQueryPlanner, QueryDatabase};
+    use query::{exec::ExecutionContextProvider, frontend::sql::SqlQueryPlanner, QueryDatabase};
     use std::{
         convert::{Infallible, TryFrom},
         sync::{


@@ -354,6 +354,7 @@ mod tests {
     use data_types::{database_rules::DatabaseRules, DatabaseName};
     use influxdb_iox_client::connection::Connection;
     use std::convert::TryInto;
+    use std::num::NonZeroU64;
     use structopt::StructOpt;
     use tokio::task::JoinHandle;
     use trace::span::Span;
@@ -663,11 +664,16 @@ mod tests {
         let (addr, server, join) = tracing_server(&collector).await;
         let conn = jaeger_client(addr, "34f8495:35e32:0:1").await;
 
+        let db_info = influxdb_storage_client::OrgAndBucket::new(
+            NonZeroU64::new(12).unwrap(),
+            NonZeroU64::new(344).unwrap(),
+        );
+
         let mut management = influxdb_iox_client::management::Client::new(conn.clone());
         management
             .create_database(
                 influxdb_iox_client::management::generated_types::DatabaseRules {
-                    name: "database".to_string(),
+                    name: db_info.db_name().to_string(),
                     ..Default::default()
                 },
             )
@@ -676,13 +682,24 @@ mod tests {
 
         let mut write = influxdb_iox_client::write::Client::new(conn.clone());
         write
-            .write("database", "cpu,tag0=foo val=1 100\n")
+            .write(db_info.db_name(), "cpu,tag0=foo val=1 100\n")
             .await
            .unwrap();
 
-        let mut flight = influxdb_iox_client::flight::Client::new(conn);
+        let mut flight = influxdb_iox_client::flight::Client::new(conn.clone());
         flight
-            .perform_query("database", "select * from cpu;")
+            .perform_query(db_info.db_name(), "select * from cpu;")
+            .await
+            .unwrap();
+
+        let mut storage = influxdb_storage_client::Client::new(conn);
+        storage
+            .tag_values(influxdb_storage_client::generated_types::TagValuesRequest {
+                tags_source: Some(influxdb_storage_client::Client::read_source(&db_info, 1)),
+                range: None,
+                predicate: None,
+                tag_key: "tag0".into(),
+            })
             .await
             .unwrap();
@@ -693,9 +710,7 @@ mod tests {
         let root_spans: Vec<_> = spans.iter().filter(|span| &span.name == "IOx").collect();
 
         // Made 3 requests
-        assert_eq!(root_spans.len(), 3);
-
-        let query_span = root_spans[2];
+        assert_eq!(root_spans.len(), 4);
 
         let child = |parent: &Span, name: &'static str| -> Option<&Span> {
             spans.iter().find(|span| {
@@ -703,14 +718,24 @@ mod tests {
             })
         };
 
-        let ctx_span = child(query_span, "Query Execution").unwrap();
+        // Test SQL
+        let sql_span = root_spans[2];
+        let ctx_span = child(sql_span, "Query Execution").unwrap();
         let planner_span = child(ctx_span, "Planner").unwrap();
         let sql_span = child(planner_span, "sql").unwrap();
         let prepare_sql_span = child(sql_span, "prepare_sql").unwrap();
         child(prepare_sql_span, "prepare_plan").unwrap();
         child(ctx_span, "collect").unwrap();
+
+        // Test tag_values
+        let storage_span = root_spans[3];
+        let ctx_span = child(storage_span, "Query Execution").unwrap();
+        child(ctx_span, "Planner").unwrap();
+
+        let to_string_set = child(ctx_span, "to_string_set").unwrap();
+        child(to_string_set, "run_logical_plans").unwrap();
     }
 
     #[tokio::test]
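
The extended test drives a storage RPC (`tag_values`) over a connection that already carries a Jaeger-style trace context (`trace-id:span-id:parent-id:flags`), then asserts a fourth root span with `Planner`, `to_string_set` and `run_logical_plans` children. For reference, a hedged sketch of how a client could attach such a context itself; the `uber-trace-id` header name is an assumption based on Jaeger's standard propagation format, and the `jaeger_client` helper in this test is assumed to do the equivalent:

```rust
use tonic::metadata::MetadataValue;

// Sketch (assumption): attach a Jaeger-style trace context so server-side
// spans join an existing trace.
fn with_trace_header<T>(mut req: tonic::Request<T>) -> tonic::Request<T> {
    // Format: {trace-id}:{span-id}:{parent-span-id}:{flags}
    req.metadata_mut().insert(
        "uber-trace-id",
        MetadataValue::from_static("34f8495:35e32:0:1"),
    );
    req
}
```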


@@ -24,7 +24,7 @@ use data_types::{
 };
 use influxdb_iox_client::format::QueryOutputFormat;
 use influxdb_line_protocol::parse_lines;
-use query::QueryDatabase;
+use query::{exec::ExecutionContextProvider, QueryDatabase};
 use server::{ApplicationState, ConnectionManager, Error, Server as AppServer};
 
 // External crates


@@ -1,12 +1,7 @@
 //! Implements the native gRPC IOx query API using Arrow Flight
 
-use std::fmt::Debug;
 use std::{pin::Pin, sync::Arc};
 
-use futures::Stream;
-use observability_deps::tracing::{info, warn};
-use serde::Deserialize;
-use snafu::{ResultExt, Snafu};
-use tonic::{Request, Response, Streaming};
 use arrow::{
     array::{make_array, ArrayRef, MutableArrayData},
     datatypes::{DataType, Field, Schema, SchemaRef},
@@ -18,12 +13,19 @@ use arrow_flight::{
     Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
     HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket,
 };
+use futures::Stream;
+use serde::Deserialize;
+use snafu::{ResultExt, Snafu};
+use tonic::{Request, Response, Streaming};
 
 use data_types::{DatabaseName, DatabaseNameError};
+use observability_deps::tracing::{info, warn};
+use query::exec::ExecutionContextProvider;
 use server::{ConnectionManager, Server};
+use std::fmt::Debug;
 
-use crate::influxdb_ioxd::rpc::error::default_server_error_handler;
 use super::super::planner::Planner;
+use crate::influxdb_ioxd::rpc::error::default_server_error_handler;
 
 #[allow(clippy::enum_variant_names)]
 #[derive(Debug, Snafu)]
@@ -362,15 +364,18 @@ fn hydrate_dictionary(array: &ArrayRef) -> Result<ArrayRef, Error> {
 #[cfg(test)]
 mod tests {
-    use super::*;
+    use std::sync::Arc;
+
     use arrow::array::StringArray;
     use arrow::{
         array::{DictionaryArray, UInt32Array},
         datatypes::{DataType, Int32Type},
     };
     use arrow_flight::utils::flight_data_to_arrow_batch;
 
     use datafusion::physical_plan::limit::truncate_batch;
-    use std::sync::Arc;
+
+    use super::*;
 
     #[test]
     fn test_deep_clone_array() {


@@ -2,7 +2,7 @@
 //! implemented in terms of the [`QueryDatabase`](query::QueryDatabase) and
 //! [`DatabaseStore`]
 
-use std::{collections::HashMap, sync::Arc};
+use std::collections::HashMap;
 
 use snafu::{OptionExt, ResultExt, Snafu};
 use tokio::sync::mpsc;
@@ -21,7 +21,7 @@ use generated_types::{
 use metrics::KeyValue;
 use observability_deps::tracing::{error, info};
 use query::{
-    exec::{fieldlist::FieldList, seriesset::Error as SeriesSetError, ExecutorType},
+    exec::{fieldlist::FieldList, seriesset::Error as SeriesSetError, ExecutionContextProvider},
     predicate::PredicateBuilder,
 };
 use server::DatabaseStore;
@@ -38,6 +38,7 @@ use crate::influxdb_ioxd::{
         StorageService,
     },
 };
+use trace::ctx::SpanContext;
 
 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -223,6 +224,7 @@ where
         &self,
         req: tonic::Request<ReadFilterRequest>,
     ) -> Result<tonic::Response<Self::ReadFilterStream>, Status> {
+        let span_ctx = req.extensions().get().cloned();
         let read_filter_request = req.into_inner();
 
         let db_name = get_database_name(&read_filter_request)?;
@@ -241,7 +243,7 @@ where
             KeyValue::new("db_name", db_name.to_string()),
         ];
 
-        let results = read_filter_impl(Arc::clone(&self.db_store), db_name, range, predicate)
+        let results = read_filter_impl(self.db_store.as_ref(), db_name, range, predicate, span_ctx)
             .await
             .map_err(|e| {
                 if e.is_internal() {
@@ -265,6 +267,7 @@ where
         &self,
         req: tonic::Request<ReadGroupRequest>,
     ) -> Result<tonic::Response<Self::ReadGroupStream>, Status> {
+        let span_ctx = req.extensions().get().cloned();
         let read_group_request = req.into_inner();
 
         let db_name = get_database_name(&read_group_request)?;
@@ -314,11 +317,12 @@ where
         })?;
 
         let results = query_group_impl(
-            Arc::clone(&self.db_store),
+            self.db_store.as_ref(),
             db_name,
             range,
             predicate,
             gby_agg,
+            span_ctx,
         )
         .await
         .map_err(|e| {
@@ -344,6 +348,7 @@ where
         &self,
         req: tonic::Request<ReadWindowAggregateRequest>,
     ) -> Result<tonic::Response<Self::ReadGroupStream>, Status> {
+        let span_ctx = req.extensions().get().cloned();
         let read_window_aggregate_request = req.into_inner();
 
         let db_name = get_database_name(&read_window_aggregate_request)?;
@@ -379,11 +384,12 @@ where
         })?;
 
         let results = query_group_impl(
-            Arc::clone(&self.db_store),
+            self.db_store.as_ref(),
             db_name,
             range,
             predicate,
             gby_agg,
+            span_ctx,
         )
         .await
         .map_err(|e| {
@@ -408,6 +414,7 @@ where
         &self,
         req: tonic::Request<TagKeysRequest>,
     ) -> Result<tonic::Response<Self::TagKeysStream>, Status> {
+        let span_ctx = req.extensions().get().cloned();
         let (tx, rx) = mpsc::channel(4);
 
         let tag_keys_request = req.into_inner();
@@ -431,11 +438,12 @@ where
         let measurement = None;
 
         let response = tag_keys_impl(
-            Arc::clone(&self.db_store),
+            self.db_store.as_ref(),
             db_name,
             measurement,
             range,
             predicate,
+            span_ctx,
         )
         .await
         .map_err(|e| {
@@ -461,6 +469,7 @@ where
         &self,
         req: tonic::Request<TagValuesRequest>,
     ) -> Result<tonic::Response<Self::TagValuesStream>, Status> {
+        let span_ctx = req.extensions().get().cloned();
         let (tx, rx) = mpsc::channel(4);
 
         let tag_values_request = req.into_inner();
@@ -495,7 +504,7 @@ where
                 .to_status());
             }
 
-            measurement_name_impl(Arc::clone(&self.db_store), db_name, range)
+            measurement_name_impl(self.db_store.as_ref(), db_name, range, span_ctx)
                 .await
                 .map_err(|e| {
                     if e.is_internal() {
@@ -508,17 +517,23 @@ where
         } else if tag_key.is_field() {
             info!(%db_name, ?range, predicate=%predicate.loggable(), "tag_values with tag_key=[xff] (field name)");
 
-            let fieldlist =
-                field_names_impl(Arc::clone(&self.db_store), db_name, None, range, predicate)
-                    .await
-                    .map_err(|e| {
-                        if e.is_internal() {
-                            ob.error_with_labels(labels);
-                        } else {
-                            ob.client_error_with_labels(labels);
-                        }
-                        e
-                    })?;
+            let fieldlist = field_names_impl(
+                self.db_store.as_ref(),
+                db_name,
+                None,
+                range,
+                predicate,
+                span_ctx,
+            )
+            .await
+            .map_err(|e| {
+                if e.is_internal() {
+                    ob.error_with_labels(labels);
+                } else {
+                    ob.client_error_with_labels(labels);
+                }
+                e
+            })?;
 
             // Pick out the field names into a Vec<Vec<u8>>for return
             let values = fieldlist
@@ -539,12 +554,13 @@ where
             info!(%db_name, ?range, %tag_key, predicate=%predicate.loggable(), "tag_values",);
 
             tag_values_impl(
-                Arc::clone(&self.db_store),
+                self.db_store.as_ref(),
                 db_name,
                 tag_key,
                 measurement,
                 range,
                 predicate,
+                span_ctx,
             )
             .await
         };
@@ -610,6 +626,7 @@ where
         &self,
         req: tonic::Request<MeasurementNamesRequest>,
     ) -> Result<tonic::Response<Self::MeasurementNamesStream>, Status> {
+        let span_ctx = req.extensions().get().cloned();
         let (tx, rx) = mpsc::channel(4);
 
         let measurement_names_request = req.into_inner();
@@ -641,7 +658,7 @@ where
             KeyValue::new("db_name", db_name.to_string()),
         ];
 
-        let response = measurement_name_impl(Arc::clone(&self.db_store), db_name, range)
+        let response = measurement_name_impl(self.db_store.as_ref(), db_name, range, span_ctx)
             .await
             .map_err(|e| {
                 if e.is_internal() {
@@ -666,6 +683,7 @@ where
         &self,
         req: tonic::Request<MeasurementTagKeysRequest>,
     ) -> Result<tonic::Response<Self::MeasurementTagKeysStream>, Status> {
+        let span_ctx = req.extensions().get().cloned();
         let (tx, rx) = mpsc::channel(4);
 
         let measurement_tag_keys_request = req.into_inner();
@@ -690,11 +708,12 @@ where
         let measurement = Some(measurement);
 
         let response = tag_keys_impl(
-            Arc::clone(&self.db_store),
+            self.db_store.as_ref(),
             db_name,
             measurement,
             range,
             predicate,
+            span_ctx,
         )
         .await
         .map_err(|e| {
@@ -720,6 +739,7 @@ where
         &self,
         req: tonic::Request<MeasurementTagValuesRequest>,
     ) -> Result<tonic::Response<Self::MeasurementTagValuesStream>, Status> {
+        let span_ctx = req.extensions().get().cloned();
         let (tx, rx) = mpsc::channel(4);
 
         let measurement_tag_values_request = req.into_inner();
@@ -745,12 +765,13 @@ where
         let measurement = Some(measurement);
 
         let response = tag_values_impl(
-            Arc::clone(&self.db_store),
+            self.db_store.as_ref(),
             db_name,
             tag_key,
             measurement,
             range,
             predicate,
+            span_ctx,
         )
         .await
         .map_err(|e| {
@@ -776,6 +797,7 @@ where
         &self,
         req: tonic::Request<MeasurementFieldsRequest>,
     ) -> Result<tonic::Response<Self::MeasurementFieldsStream>, Status> {
+        let span_ctx = req.extensions().get().cloned();
         let (tx, rx) = mpsc::channel(4);
 
         let measurement_fields_request = req.into_inner();
@@ -800,11 +822,12 @@ where
         let measurement = Some(measurement);
 
         let response = field_names_impl(
-            Arc::clone(&self.db_store),
+            self.db_store.as_ref(),
             db_name,
             measurement,
             range,
             predicate,
+            span_ctx,
         )
         .await
         .map(|fieldlist| {
@@ -857,9 +880,10 @@ fn get_database_name(input: &impl GrpcInputs) -> Result<DatabaseName<'static>, S
 /// Gathers all measurement names that have data in the specified
 /// (optional) range
 async fn measurement_name_impl<T>(
-    db_store: Arc<T>,
+    db_store: &T,
     db_name: DatabaseName<'static>,
     range: Option<TimestampRange>,
+    span_ctx: Option<SpanContext>,
 ) -> Result<StringValuesResponse>
 where
     T: DatabaseStore + 'static,
@@ -868,7 +892,7 @@ where
     let db_name = db_name.as_ref();
     let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?;
 
-    let ctx = db_store.executor().new_context(ExecutorType::Query);
+    let ctx = db.new_query_context(span_ctx);
 
     let plan = Planner::new(&ctx)
         .table_names(db, predicate)
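
Every `*_impl` helper now follows the same shape: borrow the `DatabaseStore`, resolve the database, and ask the database itself for a context that carries the request's span. A condensed skeleton (error handling and the real `Planner` calls omitted; it assumes `DatabaseStore::db` returns an `Option`, which the `.context(DatabaseNotFound { .. })` calls in this diff suggest):

```rust
use query::exec::ExecutionContextProvider;
use server::DatabaseStore;
use trace::ctx::SpanContext;

// Condensed, illustrative skeleton; not part of the commit.
async fn helper_skeleton<T>(db_store: &T, db_name: &str, span_ctx: Option<SpanContext>)
where
    T: DatabaseStore + 'static,
{
    // Assumption: DatabaseStore::db returns Option<Arc<T::Database>>.
    if let Some(db) = db_store.db(db_name) {
        // T::Database: ExecutionContextProvider is guaranteed by the new
        // trait bound on DatabaseStore, so this resolves for any store.
        let ctx = db.new_query_context(span_ctx);
        let _ = ctx; // Planner::new(&ctx) ... build and run plans here
    }
}
```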
@@ -894,11 +918,12 @@ where
 /// Return tag keys with optional measurement, timestamp and arbitratry
 /// predicates
 async fn tag_keys_impl<T>(
-    db_store: Arc<T>,
+    db_store: &T,
     db_name: DatabaseName<'static>,
     measurement: Option<String>,
     range: Option<TimestampRange>,
     rpc_predicate: Option<Predicate>,
+    span_ctx: Option<SpanContext>,
 ) -> Result<StringValuesResponse>
 where
     T: DatabaseStore + 'static,
@@ -918,7 +943,7 @@ where
         db_name: db_name.as_str(),
     })?;
 
-    let ctx = db_store.executor().new_context(ExecutorType::Query);
+    let ctx = db.new_query_context(span_ctx);
 
     let tag_key_plan = Planner::new(&ctx)
         .tag_keys(db, predicate)
@@ -948,12 +973,13 @@ where
 /// Return tag values for tag_name, with optional measurement, timestamp and
 /// arbitratry predicates
 async fn tag_values_impl<T>(
-    db_store: Arc<T>,
+    db_store: &T,
     db_name: DatabaseName<'static>,
     tag_name: String,
     measurement: Option<String>,
     range: Option<TimestampRange>,
     rpc_predicate: Option<Predicate>,
+    span_ctx: Option<SpanContext>,
 ) -> Result<StringValuesResponse>
 where
     T: DatabaseStore + 'static,
@@ -973,7 +999,7 @@ where
     let tag_name = &tag_name;
 
     let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?;
-    let ctx = db_store.executor().new_context(ExecutorType::Query);
+    let ctx = db.new_query_context(span_ctx);
 
     let tag_value_plan = Planner::new(&ctx)
         .tag_values(db, tag_name, predicate)
@@ -1001,11 +1027,12 @@ where
 }
 
 /// Launch async tasks that materialises the result of executing read_filter.
-async fn read_filter_impl<'a, T>(
-    db_store: Arc<T>,
+async fn read_filter_impl<T>(
+    db_store: &T,
     db_name: DatabaseName<'static>,
     range: Option<TimestampRange>,
     rpc_predicate: Option<Predicate>,
+    span_ctx: Option<SpanContext>,
 ) -> Result<Vec<ReadResponse>, Error>
 where
     T: DatabaseStore + 'static,
@@ -1026,7 +1053,7 @@ where
     let db_name = owned_db_name.as_str();
     let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?;
 
-    let ctx = db_store.executor().new_context(ExecutorType::Query);
+    let ctx = db.new_query_context(span_ctx);
 
     // PERF - This used to send responses to the client before execution had
     // completed, but now it doesn't. We may need to revisit this in the future
@@ -1058,11 +1085,12 @@ where
 
 /// Launch async tasks that send the result of executing read_group to `tx`
 async fn query_group_impl<T>(
-    db_store: Arc<T>,
+    db_store: &T,
     db_name: DatabaseName<'static>,
     range: Option<TimestampRange>,
     rpc_predicate: Option<Predicate>,
     gby_agg: GroupByAndAggregate,
+    span_ctx: Option<SpanContext>,
 ) -> Result<Vec<ReadResponse>, Error>
 where
     T: DatabaseStore + 'static,
@@ -1083,7 +1111,7 @@ where
     let db_name = owned_db_name.as_str();
     let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?;
 
-    let ctx = db_store.executor().new_context(ExecutorType::Query);
+    let ctx = db.new_query_context(span_ctx);
 
     let planner = Planner::new(&ctx);
     let grouped_series_set_plan = match gby_agg {
@@ -1124,11 +1152,12 @@ where
 /// Return field names, restricted via optional measurement, timestamp and
 /// predicate
 async fn field_names_impl<T>(
-    db_store: Arc<T>,
+    db_store: &T,
     db_name: DatabaseName<'static>,
     measurement: Option<String>,
     range: Option<TimestampRange>,
     rpc_predicate: Option<Predicate>,
+    span_ctx: Option<SpanContext>,
 ) -> Result<FieldList>
 where
     T: DatabaseStore + 'static,
@@ -1146,7 +1175,7 @@ where
     let db_name = db_name.as_str();
     let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?;
 
-    let ctx = db_store.executor().new_context(ExecutorType::Query);
+    let ctx = db.new_query_context(span_ctx);
 
     let field_list_plan = Planner::new(&ctx)
         .field_columns(db, predicate)
@@ -1165,10 +1194,11 @@ where
 
 #[cfg(test)]
 mod tests {
-    use std::num::NonZeroU64;
     use std::{
         collections::BTreeMap,
         net::{IpAddr, Ipv4Addr, SocketAddr},
+        num::NonZeroU64,
+        sync::Arc,
     };
 
     use parking_lot::Mutex;
@@ -2487,14 +2517,10 @@ mod tests {
            if let Some(db) = databases.get(name) {
                Ok(Arc::clone(db))
            } else {
-                let new_db = Arc::new(TestDatabase::new());
+                let new_db = Arc::new(TestDatabase::new(Arc::clone(&self.executor)));
                databases.insert(name.to_string(), Arc::clone(&new_db));
                Ok(new_db)
            }
        }
-
-        fn executor(&self) -> Arc<Executor> {
-            Arc::clone(&self.executor)
-        }
     }
 }