Merge pull request #3446 from influxdata/dom/router2

feat: router2 skeleton
pull/24376/head
kodiakhq[bot] 2022-01-12 17:00:25 +00:00 committed by GitHub
commit ecdb0165d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 365 additions and 3 deletions

15
Cargo.lock generated
View File

@ -1629,6 +1629,7 @@ dependencies = [
"regex",
"reqwest",
"router",
"router2",
"rustyline",
"schema",
"serde",
@ -3560,6 +3561,20 @@ dependencies = [
"write_buffer",
]
[[package]]
name = "router2"
version = "0.1.0"
dependencies = [
"dml",
"generated_types",
"hyper",
"metric",
"thiserror",
"tonic",
"trace",
"workspace-hack",
]
[[package]]
name = "rusoto_core"
version = "0.47.0"

View File

@ -42,6 +42,7 @@ members = [
"query_tests",
"read_buffer",
"router",
"router2",
"schema",
"server",
"server_benchmarks",

View File

@ -32,6 +32,7 @@ predicate = { path = "../predicate" }
query = { path = "../query" }
read_buffer = { path = "../read_buffer" }
router = { path = "../router" }
router2 = { path = "../router2" }
server = { path = "../server" }
time = { path = "../time" }
trace = { path = "../trace" }

View File

@ -5,6 +5,7 @@ use crate::structopt_blocks::run_config::RunConfig;
pub mod database;
pub mod router;
pub mod router2;
pub mod test;
#[derive(Debug, Snafu)]
@ -16,6 +17,9 @@ pub enum Error {
#[snafu(display("Error in router subcommand: {}", source))]
RouterError { source: router::Error },
#[snafu(display("Error in router2 subcommand: {}", source))]
Router2Error { source: router2::Error },
#[snafu(display("Error in test subcommand: {}", source))]
TestError { source: test::Error },
}
@ -39,6 +43,7 @@ impl Config {
None => &self.database_config.run_config,
Some(Command::Database(config)) => &config.run_config,
Some(Command::Router(config)) => &config.run_config,
Some(Command::Router2(config)) => &config.run_config,
Some(Command::Test(config)) => &config.run_config,
}
}
@ -52,6 +57,9 @@ enum Command {
/// Run the server in routing mode
Router(router::Config),
/// Run the server in router2 mode
Router2(router2::Config),
/// Run the server in test mode
Test(test::Config),
}
@ -68,6 +76,7 @@ pub async fn command(config: Config) -> Result<()> {
}
Some(Command::Database(config)) => database::command(config).await.context(DatabaseSnafu),
Some(Command::Router(config)) => router::command(config).await.context(RouterSnafu),
Some(Command::Router2(config)) => router2::command(config).await.context(Router2Snafu),
Some(Command::Test(config)) => test::command(config).await.context(TestSnafu),
}
}

View File

@ -0,0 +1,63 @@
//! Implementation of command line option for running router2
use std::sync::Arc;
use crate::{
influxdb_ioxd::{
self,
server_type::{
common_state::{CommonServerState, CommonServerStateError},
router2::RouterServerType,
},
},
structopt_blocks::run_config::RunConfig,
};
use observability_deps::tracing::*;
use router2::server::RouterServer;
use structopt::StructOpt;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
#[error("Run: {0}")]
Run(#[from] influxdb_ioxd::Error),
#[error("Cannot setup server: {0}")]
Setup(#[from] crate::influxdb_ioxd::server_type::database::setup::Error),
#[error("Invalid config: {0}")]
InvalidConfig(#[from] CommonServerStateError),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, StructOpt)]
#[structopt(
name = "run",
about = "Runs in router2 mode",
long_about = "Run the IOx router2 server.\n\nThe configuration options below can be \
set either with the command line flags or with the specified environment \
variable. If there is a file named '.env' in the current working directory, \
it is sourced before loading the configuration.
Configuration is loaded from the following sources (highest precedence first):
- command line arguments
- user set environment variables
- .env file contents
- pre-configured default values"
)]
pub struct Config {
#[structopt(flatten)]
pub(crate) run_config: RunConfig,
}
pub async fn command(config: Config) -> Result<()> {
let common_state = CommonServerState::from_config(config.run_config.clone())?;
let router_server = RouterServer::default();
let server_type = Arc::new(RouterServerType::new(router_server, &common_state));
info!("starting router2");
Ok(influxdb_ioxd::main(common_state, server_type).await?)
}

View File

@ -338,7 +338,7 @@ pub trait HttpDrivenDml: ServerType {
/// - [`route_write_http_request`](Self::route_write_http_request)
///
/// Returns `RequestOrResponse::Response` if the request was routed,
/// Returns `RequestOrResponse::Response` if the request did not match (and needs to be handled some other way)
/// Returns `RequestOrResponse::Request` if the request did not match (and needs to be handled some other way)
async fn route_dml_http_request(
&self,
req: Request<Body>,

View File

@ -1,4 +1,5 @@
use hyper::{Body, Response, StatusCode};
use observability_deps::tracing::warn;
/// Constants used in API error codes.
///
@ -66,6 +67,30 @@ impl HttpApiErrorCode {
}
}
impl From<StatusCode> for HttpApiErrorCode {
fn from(s: StatusCode) -> Self {
match s {
StatusCode::INTERNAL_SERVER_ERROR => Self::InternalError,
StatusCode::NOT_FOUND => Self::NotFound,
StatusCode::CONFLICT => Self::Conflict,
StatusCode::BAD_REQUEST => Self::Invalid,
StatusCode::UNPROCESSABLE_ENTITY => Self::UnprocessableEntity,
StatusCode::NO_CONTENT => Self::EmptyValue,
StatusCode::SERVICE_UNAVAILABLE => Self::Unavailable,
StatusCode::FORBIDDEN => Self::Forbidden,
StatusCode::TOO_MANY_REQUESTS => Self::TooManyRequests,
StatusCode::UNAUTHORIZED => Self::Unauthorized,
StatusCode::METHOD_NOT_ALLOWED => Self::MethodNotAllowed,
StatusCode::PAYLOAD_TOO_LARGE => Self::RequestTooLarge,
StatusCode::UNSUPPORTED_MEDIA_TYPE => Self::UnsupportedMediaType,
v => {
warn!(code=%v, "returning unexpected status code as internal error");
Self::InternalError
}
}
}
}
/// Error that is compatible with the Influxdata Cloud 2 HTTP API.
///
/// See <https://docs.influxdata.com/influxdb/v2.1/api/#operation/PostWrite>.
@ -80,9 +105,9 @@ pub struct HttpApiError {
impl HttpApiError {
/// Create new error from code and message.
pub fn new(code: HttpApiErrorCode, msg: impl Into<String>) -> Self {
pub fn new(code: impl Into<HttpApiErrorCode>, msg: impl Into<String>) -> Self {
Self {
code,
code: code.into(),
msg: msg.into(),
}
}

View File

@ -11,6 +11,7 @@ use crate::influxdb_ioxd::{http::error::HttpApiErrorSource, rpc::RpcBuilderInput
pub mod common_state;
pub mod database;
pub mod router;
pub mod router2;
pub mod test;
#[derive(Debug, Snafu)]

View File

@ -0,0 +1,95 @@
use std::{fmt::Display, sync::Arc};
use async_trait::async_trait;
use hyper::{Body, Request, Response};
use metric::Registry;
use router2::server::RouterServer;
use tokio_util::sync::CancellationToken;
use trace::TraceCollector;
use crate::influxdb_ioxd::{
http::error::{HttpApiError, HttpApiErrorSource},
rpc::{add_service, serve_builder, setup_builder, RpcBuilderInput},
server_type::{common_state::CommonServerState, RpcError, ServerType},
};
#[derive(Debug)]
pub struct RouterServerType {
server: RouterServer,
shutdown: CancellationToken,
trace_collector: Option<Arc<dyn TraceCollector>>,
}
impl RouterServerType {
pub fn new(server: RouterServer, common_state: &CommonServerState) -> Self {
Self {
server,
shutdown: CancellationToken::new(),
trace_collector: common_state.trace_collector(),
}
}
}
#[async_trait]
impl ServerType for RouterServerType {
type RouteError = IoxHttpErrorAdaptor;
/// Return the [`metric::Registry`] used by the router.
fn metric_registry(&self) -> Arc<Registry> {
self.server.metric_registry()
}
/// Returns the trace collector for router traces.
fn trace_collector(&self) -> Option<Arc<dyn TraceCollector>> {
self.trace_collector.as_ref().map(Arc::clone)
}
/// Dispatches `req` to the router [`HttpDelegate`] delegate.
///
/// [`HttpDelegate`]: router2::server::http::HttpDelegate
async fn route_http_request(
&self,
req: Request<Body>,
) -> Result<Response<Body>, Self::RouteError> {
self.server.http().route(req).map_err(IoxHttpErrorAdaptor)
}
/// Registers the services exposed by the router [`GrpcDelegate`] delegate.
///
/// [`GrpcDelegate`]: router2::server::grpc::GrpcDelegate
async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError> {
let builder = setup_builder!(builder_input, self);
add_service!(builder, self.server.grpc().write_service());
serve_builder!(builder);
Ok(())
}
async fn join(self: Arc<Self>) {
self.shutdown.cancelled().await;
}
fn shutdown(&self) {
self.shutdown.cancel();
}
}
/// This adaptor converts the `router2` http error type into a type that
/// satisfies the requirements of influxdb_ioxd's runner framework, keeping the
/// two decoupled.
#[derive(Debug)]
pub struct IoxHttpErrorAdaptor(router2::server::http::Error);
impl Display for IoxHttpErrorAdaptor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl std::error::Error for IoxHttpErrorAdaptor {}
impl HttpApiErrorSource for IoxHttpErrorAdaptor {
fn to_http_api_error(&self) -> HttpApiError {
HttpApiError::new(self.0.as_status_code(), self.to_string())
}
}

16
router2/Cargo.toml Normal file
View File

@ -0,0 +1,16 @@
[package]
name = "router2"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
dml = { path = "../dml" }
generated_types = { path = "../generated_types" }
hyper = "0.14"
metric = { path = "../metric" }
thiserror = "1.0"
tonic = "0.5"
trace = { path = "../trace/" }
workspace-hack = { path = "../workspace-hack"}

28
router2/src/lib.rs Normal file
View File

@ -0,0 +1,28 @@
//! IOx router role implementation.
//!
//! An IOx router is responsible for:
//!
//! * Creating IOx namespaces & synchronising them within the catalog.
//! * Handling writes:
//! * Receiving IOx write/delete requests via HTTP and gRPC endpoints.
//! * Enforcing schema validation & synchronising it within the catalog.
//! * Applying sharding logic.
//! * Push resulting operations into the appropriate kafka topics.
//!
#![deny(
rustdoc::broken_intra_doc_links,
rust_2018_idioms,
missing_debug_implementations,
unreachable_pub
)]
#![warn(
missing_docs,
clippy::todo,
clippy::dbg_macro,
clippy::clone_on_ref_ptr,
clippy::future_not_send
)]
#![allow(clippy::missing_docs_in_private_items)]
pub mod server;

35
router2/src/server.rs Normal file
View File

@ -0,0 +1,35 @@
//! Router server entrypoint.
use std::sync::Arc;
use self::{grpc::GrpcDelegate, http::HttpDelegate};
pub mod grpc;
pub mod http;
/// The [`RouterServer`] manages the lifecycle and contains all state for a
/// `router2` server instance.
#[derive(Debug, Default)]
pub struct RouterServer {
metrics: Arc<metric::Registry>,
http: HttpDelegate,
grpc: GrpcDelegate,
}
impl RouterServer {
/// Get a reference to the router http delegate.
pub fn http(&self) -> &HttpDelegate {
&self.http
}
/// Get a reference to the router grpc delegate.
pub fn grpc(&self) -> &GrpcDelegate {
&self.grpc
}
/// Return the [`metric::Registry`] used by the router.
pub fn metric_registry(&self) -> Arc<metric::Registry> {
Arc::clone(&self.metrics)
}
}

View File

@ -0,0 +1,34 @@
//! gRPC service implementations for `router2`.
use generated_types::influxdata::pbdata::v1::*;
use tonic::{Request, Response, Status};
/// This type is responsible for managing all gRPC services exposed by
/// `router2`.
#[derive(Debug, Default)]
pub struct GrpcDelegate;
impl GrpcDelegate {
/// Acquire a [`WriteService`] gRPC service implementation.
///
/// [`WriteService`]: generated_types::influxdata::pbdata::v1::write_service_server::WriteService.
pub fn write_service(
&self,
) -> write_service_server::WriteServiceServer<impl write_service_server::WriteService> {
write_service_server::WriteServiceServer::new(WriteService::default())
}
}
#[derive(Debug, Default)]
struct WriteService;
#[tonic::async_trait]
impl write_service_server::WriteService for WriteService {
/// Receive a gRPC [`WriteRequest`] and dispatch it to the DML handler.
async fn write(
&self,
_request: Request<WriteRequest>,
) -> Result<Response<WriteResponse>, Status> {
Err(Status::unimplemented("not implemented"))
}
}

View File

@ -0,0 +1,39 @@
//! HTTP service implementations for `router2`.
use hyper::{Body, Request, Response, StatusCode};
use thiserror::Error;
/// Errors returned by the `router2` HTTP request handler.
#[derive(Debug, Error)]
pub enum Error {
/// The requested path has no registered handler.
#[error("not found")]
NotFound,
}
impl Error {
/// Convert the error into an appropriate [`StatusCode`] to be returned to
/// the end user.
pub fn as_status_code(&self) -> StatusCode {
match self {
Error::NotFound => StatusCode::NOT_FOUND,
}
}
}
/// This type is responsible for servicing requests to the `router2` HTTP
/// endpoint.
///
/// Requests to some paths may be handled externally by the caller - the IOx
/// server runner framework takes care of implementing the heath endpoint,
/// metrics, pprof, etc.
#[derive(Debug, Default)]
pub struct HttpDelegate;
impl HttpDelegate {
/// Routes `req` to the appropriate handler, if any, returning the handler
/// response.
pub fn route(&self, _req: Request<Body>) -> Result<Response<Body>, Error> {
unimplemented!()
}
}