refactor: associated QueryExec return type

Allow the return type of the QueryExec trait's query_exec() method to be
parametrised by the implementer.

This allows the trait to be reused across different data sources that
return differing concrete types.
pull/24376/head
Dom Dwyer 2022-11-25 21:40:43 +01:00
parent b28bba51f8
commit de6f0468d8
No known key found for this signature in database
GPG Key ID: E4C40DBD9157879A
7 changed files with 32 additions and 15 deletions

View File

@ -17,13 +17,15 @@ impl QueryRunner {
#[async_trait] #[async_trait]
impl QueryExec for QueryRunner { impl QueryExec for QueryRunner {
type Response = QueryResponse;
async fn query_exec( async fn query_exec(
&self, &self,
namespace_id: NamespaceId, namespace_id: NamespaceId,
table_id: TableId, table_id: TableId,
columns: Vec<String>, columns: Vec<String>,
span: Option<Span>, span: Option<Span>,
) -> Result<QueryResponse, QueryError> { ) -> Result<Self::Response, QueryError> {
let mut _span_recorder = SpanRecorder::new(span); let mut _span_recorder = SpanRecorder::new(span);
info!( info!(

View File

@ -4,7 +4,7 @@ use iox_time::{SystemProvider, TimeProvider};
use metric::{DurationHistogram, Metric}; use metric::{DurationHistogram, Metric};
use trace::span::Span; use trace::span::Span;
use super::{response::QueryResponse, QueryExec}; use super::QueryExec;
use crate::query::QueryError; use crate::query::QueryError;
/// An instrumentation decorator over a [`QueryExec`] implementation. /// An instrumentation decorator over a [`QueryExec`] implementation.
@ -49,6 +49,8 @@ where
T: QueryExec, T: QueryExec,
P: TimeProvider, P: TimeProvider,
{ {
type Response = T::Response;
#[inline(always)] #[inline(always)]
async fn query_exec( async fn query_exec(
&self, &self,
@ -56,7 +58,7 @@ where
table_id: TableId, table_id: TableId,
columns: Vec<String>, columns: Vec<String>,
span: Option<Span>, span: Option<Span>,
) -> Result<QueryResponse, QueryError> { ) -> Result<Self::Response, QueryError> {
let t = self.time_provider.now(); let t = self.time_provider.now();
let res = self let res = self
@ -83,7 +85,10 @@ mod tests {
use metric::Attributes; use metric::Attributes;
use super::*; use super::*;
use crate::query::{mock_query_exec::MockQueryExec, response::PartitionStream}; use crate::query::{
mock_query_exec::MockQueryExec,
response::{PartitionStream, QueryResponse},
};
macro_rules! test_metric { macro_rules! test_metric {
( (

View File

@ -19,13 +19,15 @@ impl MockQueryExec {
#[async_trait] #[async_trait]
impl QueryExec for MockQueryExec { impl QueryExec for MockQueryExec {
type Response = QueryResponse;
async fn query_exec( async fn query_exec(
&self, &self,
_namespace_id: NamespaceId, _namespace_id: NamespaceId,
_table_id: TableId, _table_id: TableId,
_columns: Vec<String>, _columns: Vec<String>,
_span: Option<Span>, _span: Option<Span>,
) -> Result<QueryResponse, QueryError> { ) -> Result<Self::Response, QueryError> {
self.response self.response
.lock() .lock()
.take() .take()

View File

@ -4,7 +4,7 @@ use async_trait::async_trait;
use data_types::{NamespaceId, TableId}; use data_types::{NamespaceId, TableId};
use trace::span::{Span, SpanRecorder}; use trace::span::{Span, SpanRecorder};
use super::{response::QueryResponse, QueryExec}; use super::QueryExec;
use crate::query::QueryError; use crate::query::QueryError;
/// An tracing decorator over a [`QueryExec`] implementation. /// An tracing decorator over a [`QueryExec`] implementation.
@ -33,6 +33,8 @@ impl<T> QueryExec for QueryExecTracing<T>
where where
T: QueryExec, T: QueryExec,
{ {
type Response = T::Response;
#[inline(always)] #[inline(always)]
async fn query_exec( async fn query_exec(
&self, &self,
@ -40,7 +42,7 @@ where
table_id: TableId, table_id: TableId,
columns: Vec<String>, columns: Vec<String>,
span: Option<Span>, span: Option<Span>,
) -> Result<QueryResponse, QueryError> { ) -> Result<Self::Response, QueryError> {
let span = span.map(|s| s.child(self.name.clone())); let span = span.map(|s| s.child(self.name.clone()));
let mut recorder = SpanRecorder::new(span.clone()); let mut recorder = SpanRecorder::new(span.clone());

View File

@ -5,8 +5,6 @@ use data_types::{NamespaceId, TableId};
use thiserror::Error; use thiserror::Error;
use trace::span::Span; use trace::span::Span;
use super::response::QueryResponse;
#[derive(Debug, Error)] #[derive(Debug, Error)]
#[allow(missing_copy_implementations)] #[allow(missing_copy_implementations)]
pub(crate) enum QueryError { pub(crate) enum QueryError {
@ -19,13 +17,15 @@ pub(crate) enum QueryError {
#[async_trait] #[async_trait]
pub(crate) trait QueryExec: Send + Sync + Debug { pub(crate) trait QueryExec: Send + Sync + Debug {
type Response: Send + Debug;
async fn query_exec( async fn query_exec(
&self, &self,
namespace_id: NamespaceId, namespace_id: NamespaceId,
table_id: TableId, table_id: TableId,
columns: Vec<String>, columns: Vec<String>,
span: Option<Span>, span: Option<Span>,
) -> Result<QueryResponse, QueryError>; ) -> Result<Self::Response, QueryError>;
} }
#[async_trait] #[async_trait]
@ -33,13 +33,15 @@ impl<T> QueryExec for Arc<T>
where where
T: QueryExec, T: QueryExec,
{ {
type Response = T::Response;
async fn query_exec( async fn query_exec(
&self, &self,
namespace_id: NamespaceId, namespace_id: NamespaceId,
table_id: TableId, table_id: TableId,
columns: Vec<String>, columns: Vec<String>,
span: Option<Span>, span: Option<Span>,
) -> Result<QueryResponse, QueryError> { ) -> Result<Self::Response, QueryError> {
self.deref() self.deref()
.query_exec(namespace_id, table_id, columns, span) .query_exec(namespace_id, table_id, columns, span)
.await .await

View File

@ -13,7 +13,11 @@ use generated_types::influxdata::iox::{
use iox_catalog::interface::Catalog; use iox_catalog::interface::Catalog;
use service_grpc_catalog::CatalogService; use service_grpc_catalog::CatalogService;
use crate::{dml_sink::DmlSink, init::IngesterRpcInterface, query::QueryExec}; use crate::{
dml_sink::DmlSink,
init::IngesterRpcInterface,
query::{response::QueryResponse, QueryExec},
};
use self::rpc_write::RpcWrite; use self::rpc_write::RpcWrite;
@ -31,7 +35,7 @@ pub(crate) struct GrpcDelegate<D, Q> {
impl<D, Q> GrpcDelegate<D, Q> impl<D, Q> GrpcDelegate<D, Q>
where where
D: DmlSink + 'static, D: DmlSink + 'static,
Q: QueryExec + 'static, Q: QueryExec<Response = QueryResponse> + 'static,
{ {
/// Initialise a new [`GrpcDelegate`]. /// Initialise a new [`GrpcDelegate`].
pub(crate) fn new(dml_sink: Arc<D>, query_exec: Arc<Q>) -> Self { pub(crate) fn new(dml_sink: Arc<D>, query_exec: Arc<Q>) -> Self {
@ -47,7 +51,7 @@ where
impl<D, Q> IngesterRpcInterface for GrpcDelegate<D, Q> impl<D, Q> IngesterRpcInterface for GrpcDelegate<D, Q>
where where
D: DmlSink + 'static, D: DmlSink + 'static,
Q: QueryExec + 'static, Q: QueryExec<Response = QueryResponse> + 'static,
{ {
type CatalogHandler = CatalogService; type CatalogHandler = CatalogService;
type WriteHandler = RpcWrite<Arc<D>>; type WriteHandler = RpcWrite<Arc<D>>;

View File

@ -133,7 +133,7 @@ type TonicStream<T> = Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + Send
#[tonic::async_trait] #[tonic::async_trait]
impl<Q> Flight for FlightService<Q> impl<Q> Flight for FlightService<Q>
where where
Q: QueryExec + 'static, Q: QueryExec<Response = QueryResponse> + 'static,
{ {
type HandshakeStream = TonicStream<HandshakeResponse>; type HandshakeStream = TonicStream<HandshakeResponse>;
type ListFlightsStream = TonicStream<FlightInfo>; type ListFlightsStream = TonicStream<FlightInfo>;