refactor: enable gRPC handler

Plumbs the gRPC write handler into the existing router2 server.
pull/24376/head
Dom Dwyer 2022-02-23 15:42:59 +00:00
parent 26c43f0a2c
commit 7c5ba34d44
6 changed files with 52 additions and 17 deletions

View File

@ -21,7 +21,7 @@ use router2::{
},
namespace_cache::{metrics::InstrumentedCache, MemoryNamespaceCache, ShardedCache},
sequencer::Sequencer,
server::{http::HttpDelegate, RouterServer},
server::{grpc::GrpcDelegate, http::HttpDelegate, RouterServer},
sharder::JumpHash,
};
use thiserror::Error;
@ -198,17 +198,16 @@ pub async fn command(config: Config) -> Result<()> {
let handler_stack =
InstrumentationDecorator::new("request", Arc::clone(&metrics), handler_stack);
// Initialise the API delegates, sharing the handler stack between them.
let handler_stack = Arc::new(handler_stack);
let http = HttpDelegate::new(
config.run_config.max_http_request_size,
handler_stack,
Arc::clone(&handler_stack),
&metrics,
);
let router_server = RouterServer::new(
http,
Default::default(),
metrics,
common_state.trace_collector(),
);
let grpc = GrpcDelegate::new(handler_stack, Arc::clone(&metrics));
let router_server = RouterServer::new(http, grpc, metrics, common_state.trace_collector());
let server_type = Arc::new(RouterServerType::new(router_server, &common_state));
info!("starting router2");

View File

@ -68,7 +68,7 @@ fn e2e_benchmarks(c: &mut Criterion) {
let handler_stack =
schema_validator.and_then(partitioner.and_then(FanOutAdaptor::new(write_buffer)));
HttpDelegate::new(1024, handler_stack, &metrics)
HttpDelegate::new(1024, Arc::new(handler_stack), &metrics)
};
let body_str = "platanos,tag1=A,tag2=B val=42i 123456";

View File

@ -1,8 +1,10 @@
use std::{collections::VecDeque, fmt::Debug};
use super::{DmlError, DmlHandler};
use async_trait::async_trait;
use data_types2::{DatabaseName, DeletePredicate};
use parking_lot::Mutex;
use std::{collections::VecDeque, fmt::Debug, sync::Arc};
use trace::ctx::SpanContext;
/// A captured call to a [`MockDmlHandler`], generic over `W`, the captured
@ -84,7 +86,7 @@ macro_rules! record_and_return {
}
#[async_trait]
impl<W> DmlHandler for Arc<MockDmlHandler<W>>
impl<W> DmlHandler for MockDmlHandler<W>
where
W: Debug + Send + Sync,
{

View File

@ -1,6 +1,6 @@
use async_trait::async_trait;
use data_types2::{DatabaseName, DeletePredicate};
use std::{error::Error, fmt::Debug};
use std::{error::Error, fmt::Debug, sync::Arc};
use thiserror::Error;
use trace::ctx::SpanContext;
@ -75,3 +75,36 @@ pub trait DmlHandler: Debug + Send + Sync {
span_ctx: Option<SpanContext>,
) -> Result<(), Self::DeleteError>;
}
#[async_trait]
impl<T> DmlHandler for Arc<T>
where
T: DmlHandler,
{
type WriteInput = T::WriteInput;
type WriteOutput = T::WriteOutput;
type WriteError = T::WriteError;
type DeleteError = T::DeleteError;
async fn write(
&self,
namespace: &DatabaseName<'static>,
input: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
(**self).write(namespace, input, span_ctx).await
}
/// Delete the data specified in `delete`.
async fn delete(
&self,
namespace: &DatabaseName<'static>,
table_name: &str,
predicate: &DeletePredicate,
span_ctx: Option<SpanContext>,
) -> Result<(), Self::DeleteError> {
(**self)
.delete(namespace, table_name, predicate, span_ctx)
.await
}
}

View File

@ -1,6 +1,9 @@
//! HTTP service implementations for `router2`.
use std::{str::Utf8Error, sync::Arc};
use crate::dml_handlers::{DmlError, DmlHandler, PartitionError};
use bytes::{Bytes, BytesMut};
use data_types2::{org_and_bucket_to_database, OrgBucketMappingError};
use futures::StreamExt;
@ -11,7 +14,6 @@ use mutable_batch::MutableBatch;
use observability_deps::tracing::*;
use predicate::delete_predicate::{parse_delete_predicate, parse_http_delete_request};
use serde::Deserialize;
use std::str::Utf8Error;
use thiserror::Error;
use time::{SystemProvider, TimeProvider};
use trace::ctx::SpanContext;
@ -78,7 +80,6 @@ impl Error {
Error::ParseLineProtocol(_) => StatusCode::BAD_REQUEST,
Error::ParseDelete(_) => StatusCode::BAD_REQUEST,
Error::RequestSizeExceeded(_) => StatusCode::PAYLOAD_TOO_LARGE,
Error::DmlHandler(DmlError::Schema(_)) => StatusCode::BAD_REQUEST,
Error::InvalidContentEncoding(_) => {
// https://www.rfc-editor.org/rfc/rfc7231#section-6.5.13
StatusCode::UNSUPPORTED_MEDIA_TYPE
@ -151,7 +152,7 @@ impl<T> TryFrom<&Request<T>> for OrgBucketInfo {
pub struct HttpDelegate<D, T = SystemProvider> {
max_request_bytes: usize,
time_provider: T,
dml_handler: D,
dml_handler: Arc<D>,
write_metric_lines: U64Counter,
write_metric_fields: U64Counter,
@ -166,7 +167,7 @@ impl<D> HttpDelegate<D, SystemProvider> {
///
/// HTTP request bodies are limited to `max_request_bytes` in size,
/// returning an error if exceeded.
pub fn new(max_request_bytes: usize, dml_handler: D, metrics: &metric::Registry) -> Self {
pub fn new(max_request_bytes: usize, dml_handler: Arc<D>, metrics: &metric::Registry) -> Self {
let write_metric_lines = metrics
.register_metric::<U64Counter>(
"http_write_lines_total",

View File

@ -113,7 +113,7 @@ impl TestContext {
let handler_stack =
InstrumentationDecorator::new("request", Arc::clone(&metrics), handler_stack);
let delegate = HttpDelegate::new(1024, handler_stack, &metrics);
let delegate = HttpDelegate::new(1024, Arc::new(handler_stack), &metrics);
Self {
delegate,