From a8cb8755de9ff1bb7459e3b070d058be1fe60fda Mon Sep 17 00:00:00 2001 From: Dom Date: Wed, 12 Jan 2022 14:13:20 +0000 Subject: [PATCH 1/3] feat: new router2 crate This commit adds an almost-empty router2 crate containing enough of a skeleton to plumb into the IOx CLI/server runner. --- Cargo.lock | 14 ++++++++++++++ Cargo.toml | 1 + influxdb_iox/Cargo.toml | 1 + router2/Cargo.toml | 15 +++++++++++++++ router2/src/lib.rs | 28 +++++++++++++++++++++++++++ router2/src/server.rs | 35 ++++++++++++++++++++++++++++++++++ router2/src/server/grpc.rs | 34 +++++++++++++++++++++++++++++++++ router2/src/server/http.rs | 39 ++++++++++++++++++++++++++++++++++++++ 8 files changed, 167 insertions(+) create mode 100644 router2/Cargo.toml create mode 100644 router2/src/lib.rs create mode 100644 router2/src/server.rs create mode 100644 router2/src/server/grpc.rs create mode 100644 router2/src/server/http.rs diff --git a/Cargo.lock b/Cargo.lock index d8a8ca4fc7..04c8683e34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1629,6 +1629,7 @@ dependencies = [ "regex", "reqwest", "router", + "router2", "rustyline", "schema", "serde", @@ -3560,6 +3561,19 @@ dependencies = [ "write_buffer", ] +[[package]] +name = "router2" +version = "0.1.0" +dependencies = [ + "dml", + "generated_types", + "hyper", + "metric", + "thiserror", + "tonic", + "trace", +] + [[package]] name = "rusoto_core" version = "0.47.0" diff --git a/Cargo.toml b/Cargo.toml index 1ab068e6b9..b700996750 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ members = [ "query_tests", "read_buffer", "router", + "router2", "schema", "server", "server_benchmarks", diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index d78e28b40b..c57cbe5efc 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -32,6 +32,7 @@ predicate = { path = "../predicate" } query = { path = "../query" } read_buffer = { path = "../read_buffer" } router = { path = "../router" } +router2 = { path = "../router2" } server = { path = "../server" } time = { path = "../time" } trace = { path = "../trace" } diff --git a/router2/Cargo.toml b/router2/Cargo.toml new file mode 100644 index 0000000000..e2d723c5e4 --- /dev/null +++ b/router2/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "router2" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dml = { path = "../dml" } +generated_types = { path = "../generated_types" } +hyper = "0.14" +metric = { path = "../metric" } +thiserror = "1.0" +tonic = "0.5" +trace = { path = "../trace/" } diff --git a/router2/src/lib.rs b/router2/src/lib.rs new file mode 100644 index 0000000000..859b0d0a46 --- /dev/null +++ b/router2/src/lib.rs @@ -0,0 +1,28 @@ +//! IOx router role implementation. +//! +//! An IOx router is responsible for: +//! +//! * Creating IOx namespaces & synchronising them within the catalog. +//! * Handling writes: +//! * Receiving IOx write/delete requests via HTTP and gRPC endpoints. +//! * Enforcing schema validation & synchronising it within the catalog. +//! * Applying sharding logic. +//! * Push resulting operations into the appropriate kafka topics. +//! + +#![deny( + rustdoc::broken_intra_doc_links, + rust_2018_idioms, + missing_debug_implementations, + unreachable_pub +)] +#![warn( + missing_docs, + clippy::todo, + clippy::dbg_macro, + clippy::clone_on_ref_ptr, + clippy::future_not_send +)] +#![allow(clippy::missing_docs_in_private_items)] + +pub mod server; diff --git a/router2/src/server.rs b/router2/src/server.rs new file mode 100644 index 0000000000..d6b4e16a8c --- /dev/null +++ b/router2/src/server.rs @@ -0,0 +1,35 @@ +//! Router server entrypoint. + +use std::sync::Arc; + +use self::{grpc::GrpcDelegate, http::HttpDelegate}; + +pub mod grpc; +pub mod http; + +/// The [`RouterServer`] manages the lifecycle and contains all state for a +/// `router2` server instance. +#[derive(Debug, Default)] +pub struct RouterServer { + metrics: Arc, + + http: HttpDelegate, + grpc: GrpcDelegate, +} + +impl RouterServer { + /// Get a reference to the router http delegate. + pub fn http(&self) -> &HttpDelegate { + &self.http + } + + /// Get a reference to the router grpc delegate. + pub fn grpc(&self) -> &GrpcDelegate { + &self.grpc + } + + /// Return the [`metric::Registry`] used by the router. + pub fn metric_registry(&self) -> Arc { + Arc::clone(&self.metrics) + } +} diff --git a/router2/src/server/grpc.rs b/router2/src/server/grpc.rs new file mode 100644 index 0000000000..30b2892f90 --- /dev/null +++ b/router2/src/server/grpc.rs @@ -0,0 +1,34 @@ +//! gRPC service implementations for `router2`. + +use generated_types::influxdata::pbdata::v1::*; +use tonic::{Request, Response, Status}; + +/// This type is responsible for managing all gRPC services exposed by +/// `router2`. +#[derive(Debug, Default)] +pub struct GrpcDelegate; + +impl GrpcDelegate { + /// Acquire a [`WriteService`] gRPC service implementation. + /// + /// [`WriteService`]: generated_types::influxdata::pbdata::v1::write_service_server::WriteService. + pub fn write_service( + &self, + ) -> write_service_server::WriteServiceServer { + write_service_server::WriteServiceServer::new(WriteService::default()) + } +} + +#[derive(Debug, Default)] +struct WriteService; + +#[tonic::async_trait] +impl write_service_server::WriteService for WriteService { + /// Receive a gRPC [`WriteRequest`] and dispatch it to the DML handler. + async fn write( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("not implemented")) + } +} diff --git a/router2/src/server/http.rs b/router2/src/server/http.rs new file mode 100644 index 0000000000..9432f121ed --- /dev/null +++ b/router2/src/server/http.rs @@ -0,0 +1,39 @@ +//! HTTP service implementations for `router2`. + +use hyper::{Body, Request, Response, StatusCode}; +use thiserror::Error; + +/// Errors returned by the `router2` HTTP request handler. +#[derive(Debug, Error)] +pub enum Error { + /// The requested path has no registered handler. + #[error("not found")] + NotFound, +} + +impl Error { + /// Convert the error into an appropriate [`StatusCode`] to be returned to + /// the end user. + pub fn as_status_code(&self) -> StatusCode { + match self { + Error::NotFound => StatusCode::NOT_FOUND, + } + } +} + +/// This type is responsible for servicing requests to the `router2` HTTP +/// endpoint. +/// +/// Requests to some paths may be handled externally by the caller - the IOx +/// server runner framework takes care of implementing the heath endpoint, +/// metrics, pprof, etc. +#[derive(Debug, Default)] +pub struct HttpDelegate; + +impl HttpDelegate { + /// Routes `req` to the appropriate handler, if any, returning the handler + /// response. + pub fn route(&self, _req: Request) -> Result, Error> { + unimplemented!() + } +} From 7fc17203e201a76095596c6b48b6f91c692549b4 Mon Sep 17 00:00:00 2001 From: Dom Date: Wed, 12 Jan 2022 14:31:54 +0000 Subject: [PATCH 2/3] refactor: add router2 server mode Plumbs the router2 crate into IOx's CLI & server-runner framework. --- influxdb_iox/src/commands/run/mod.rs | 9 ++ influxdb_iox/src/commands/run/router2.rs | 63 ++++++++++++ influxdb_iox/src/influxdb_ioxd/http/dml.rs | 2 +- influxdb_iox/src/influxdb_ioxd/http/error.rs | 29 +++++- .../src/influxdb_ioxd/server_type/mod.rs | 1 + .../src/influxdb_ioxd/server_type/router2.rs | 95 +++++++++++++++++++ 6 files changed, 196 insertions(+), 3 deletions(-) create mode 100644 influxdb_iox/src/commands/run/router2.rs create mode 100644 influxdb_iox/src/influxdb_ioxd/server_type/router2.rs diff --git a/influxdb_iox/src/commands/run/mod.rs b/influxdb_iox/src/commands/run/mod.rs index 12607ce965..6f8e0a3b63 100644 --- a/influxdb_iox/src/commands/run/mod.rs +++ b/influxdb_iox/src/commands/run/mod.rs @@ -5,6 +5,7 @@ use crate::structopt_blocks::run_config::RunConfig; pub mod database; pub mod router; +pub mod router2; pub mod test; #[derive(Debug, Snafu)] @@ -16,6 +17,9 @@ pub enum Error { #[snafu(display("Error in router subcommand: {}", source))] RouterError { source: router::Error }, + #[snafu(display("Error in router2 subcommand: {}", source))] + Router2Error { source: router2::Error }, + #[snafu(display("Error in test subcommand: {}", source))] TestError { source: test::Error }, } @@ -39,6 +43,7 @@ impl Config { None => &self.database_config.run_config, Some(Command::Database(config)) => &config.run_config, Some(Command::Router(config)) => &config.run_config, + Some(Command::Router2(config)) => &config.run_config, Some(Command::Test(config)) => &config.run_config, } } @@ -52,6 +57,9 @@ enum Command { /// Run the server in routing mode Router(router::Config), + /// Run the server in router2 mode + Router2(router2::Config), + /// Run the server in test mode Test(test::Config), } @@ -68,6 +76,7 @@ pub async fn command(config: Config) -> Result<()> { } Some(Command::Database(config)) => database::command(config).await.context(DatabaseSnafu), Some(Command::Router(config)) => router::command(config).await.context(RouterSnafu), + Some(Command::Router2(config)) => router2::command(config).await.context(Router2Snafu), Some(Command::Test(config)) => test::command(config).await.context(TestSnafu), } } diff --git a/influxdb_iox/src/commands/run/router2.rs b/influxdb_iox/src/commands/run/router2.rs new file mode 100644 index 0000000000..92f7314ddd --- /dev/null +++ b/influxdb_iox/src/commands/run/router2.rs @@ -0,0 +1,63 @@ +//! Implementation of command line option for running router2 + +use std::sync::Arc; + +use crate::{ + influxdb_ioxd::{ + self, + server_type::{ + common_state::{CommonServerState, CommonServerStateError}, + router2::RouterServerType, + }, + }, + structopt_blocks::run_config::RunConfig, +}; +use observability_deps::tracing::*; +use router2::server::RouterServer; +use structopt::StructOpt; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + #[error("Run: {0}")] + Run(#[from] influxdb_ioxd::Error), + + #[error("Cannot setup server: {0}")] + Setup(#[from] crate::influxdb_ioxd::server_type::database::setup::Error), + + #[error("Invalid config: {0}")] + InvalidConfig(#[from] CommonServerStateError), +} + +pub type Result = std::result::Result; + +#[derive(Debug, StructOpt)] +#[structopt( + name = "run", + about = "Runs in router2 mode", + long_about = "Run the IOx router2 server.\n\nThe configuration options below can be \ + set either with the command line flags or with the specified environment \ + variable. If there is a file named '.env' in the current working directory, \ + it is sourced before loading the configuration. + +Configuration is loaded from the following sources (highest precedence first): + - command line arguments + - user set environment variables + - .env file contents + - pre-configured default values" +)] +pub struct Config { + #[structopt(flatten)] + pub(crate) run_config: RunConfig, +} + +pub async fn command(config: Config) -> Result<()> { + let common_state = CommonServerState::from_config(config.run_config.clone())?; + + let router_server = RouterServer::default(); + let server_type = Arc::new(RouterServerType::new(router_server, &common_state)); + + info!("starting router2"); + + Ok(influxdb_ioxd::main(common_state, server_type).await?) +} diff --git a/influxdb_iox/src/influxdb_ioxd/http/dml.rs b/influxdb_iox/src/influxdb_ioxd/http/dml.rs index 5743d0b73c..6550046e78 100644 --- a/influxdb_iox/src/influxdb_ioxd/http/dml.rs +++ b/influxdb_iox/src/influxdb_ioxd/http/dml.rs @@ -338,7 +338,7 @@ pub trait HttpDrivenDml: ServerType { /// - [`route_write_http_request`](Self::route_write_http_request) /// /// Returns `RequestOrResponse::Response` if the request was routed, - /// Returns `RequestOrResponse::Response` if the request did not match (and needs to be handled some other way) + /// Returns `RequestOrResponse::Request` if the request did not match (and needs to be handled some other way) async fn route_dml_http_request( &self, req: Request, diff --git a/influxdb_iox/src/influxdb_ioxd/http/error.rs b/influxdb_iox/src/influxdb_ioxd/http/error.rs index c2d9b01ad8..995f310d49 100644 --- a/influxdb_iox/src/influxdb_ioxd/http/error.rs +++ b/influxdb_iox/src/influxdb_ioxd/http/error.rs @@ -1,4 +1,5 @@ use hyper::{Body, Response, StatusCode}; +use observability_deps::tracing::warn; /// Constants used in API error codes. /// @@ -66,6 +67,30 @@ impl HttpApiErrorCode { } } +impl From for HttpApiErrorCode { + fn from(s: StatusCode) -> Self { + match s { + StatusCode::INTERNAL_SERVER_ERROR => Self::InternalError, + StatusCode::NOT_FOUND => Self::NotFound, + StatusCode::CONFLICT => Self::Conflict, + StatusCode::BAD_REQUEST => Self::Invalid, + StatusCode::UNPROCESSABLE_ENTITY => Self::UnprocessableEntity, + StatusCode::NO_CONTENT => Self::EmptyValue, + StatusCode::SERVICE_UNAVAILABLE => Self::Unavailable, + StatusCode::FORBIDDEN => Self::Forbidden, + StatusCode::TOO_MANY_REQUESTS => Self::TooManyRequests, + StatusCode::UNAUTHORIZED => Self::Unauthorized, + StatusCode::METHOD_NOT_ALLOWED => Self::MethodNotAllowed, + StatusCode::PAYLOAD_TOO_LARGE => Self::RequestTooLarge, + StatusCode::UNSUPPORTED_MEDIA_TYPE => Self::UnsupportedMediaType, + v => { + warn!(code=%v, "returning unexpected status code as internal error"); + Self::InternalError + } + } + } +} + /// Error that is compatible with the Influxdata Cloud 2 HTTP API. /// /// See . @@ -80,9 +105,9 @@ pub struct HttpApiError { impl HttpApiError { /// Create new error from code and message. - pub fn new(code: HttpApiErrorCode, msg: impl Into) -> Self { + pub fn new(code: impl Into, msg: impl Into) -> Self { Self { - code, + code: code.into(), msg: msg.into(), } } diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/mod.rs b/influxdb_iox/src/influxdb_ioxd/server_type/mod.rs index 1d6919a37c..ade35bd93c 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/mod.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/mod.rs @@ -11,6 +11,7 @@ use crate::influxdb_ioxd::{http::error::HttpApiErrorSource, rpc::RpcBuilderInput pub mod common_state; pub mod database; pub mod router; +pub mod router2; pub mod test; #[derive(Debug, Snafu)] diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/router2.rs b/influxdb_iox/src/influxdb_ioxd/server_type/router2.rs new file mode 100644 index 0000000000..7c37c64d82 --- /dev/null +++ b/influxdb_iox/src/influxdb_ioxd/server_type/router2.rs @@ -0,0 +1,95 @@ +use std::{fmt::Display, sync::Arc}; + +use async_trait::async_trait; +use hyper::{Body, Request, Response}; +use metric::Registry; +use router2::server::RouterServer; +use tokio_util::sync::CancellationToken; +use trace::TraceCollector; + +use crate::influxdb_ioxd::{ + http::error::{HttpApiError, HttpApiErrorSource}, + rpc::{add_service, serve_builder, setup_builder, RpcBuilderInput}, + server_type::{common_state::CommonServerState, RpcError, ServerType}, +}; + +#[derive(Debug)] +pub struct RouterServerType { + server: RouterServer, + shutdown: CancellationToken, + trace_collector: Option>, +} + +impl RouterServerType { + pub fn new(server: RouterServer, common_state: &CommonServerState) -> Self { + Self { + server, + shutdown: CancellationToken::new(), + trace_collector: common_state.trace_collector(), + } + } +} + +#[async_trait] +impl ServerType for RouterServerType { + type RouteError = IoxHttpErrorAdaptor; + + /// Return the [`metric::Registry`] used by the router. + fn metric_registry(&self) -> Arc { + self.server.metric_registry() + } + + /// Returns the trace collector for router traces. + fn trace_collector(&self) -> Option> { + self.trace_collector.as_ref().map(Arc::clone) + } + + /// Dispatches `req` to the router [`HttpDelegate`] delegate. + /// + /// [`HttpDelegate`]: router2::server::http::HttpDelegate + async fn route_http_request( + &self, + req: Request, + ) -> Result, Self::RouteError> { + self.server.http().route(req).map_err(IoxHttpErrorAdaptor) + } + + /// Registers the services exposed by the router [`GrpcDelegate`] delegate. + /// + /// [`GrpcDelegate`]: router2::server::grpc::GrpcDelegate + async fn server_grpc(self: Arc, builder_input: RpcBuilderInput) -> Result<(), RpcError> { + let builder = setup_builder!(builder_input, self); + add_service!(builder, self.server.grpc().write_service()); + serve_builder!(builder); + + Ok(()) + } + + async fn join(self: Arc) { + self.shutdown.cancelled().await; + } + + fn shutdown(&self) { + self.shutdown.cancel(); + } +} + +/// This adaptor converts the `router2` http error type into a type that +/// satisfies the requirements of influxdb_ioxd's runner framework, keeping the +/// two decoupled. +#[derive(Debug)] +pub struct IoxHttpErrorAdaptor(router2::server::http::Error); + +impl Display for IoxHttpErrorAdaptor { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +impl std::error::Error for IoxHttpErrorAdaptor {} + +impl HttpApiErrorSource for IoxHttpErrorAdaptor { + fn to_http_api_error(&self) -> HttpApiError { + HttpApiError::new(self.0.as_status_code(), self.to_string()) + } +} From b9bee7f73574c87904cd6f81d7ab0f585f117277 Mon Sep 17 00:00:00 2001 From: Dom Date: Wed, 12 Jan 2022 15:09:06 +0000 Subject: [PATCH 3/3] build: update workspace-hack --- Cargo.lock | 1 + router2/Cargo.toml | 1 + 2 files changed, 2 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 04c8683e34..5ff01fa2e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3572,6 +3572,7 @@ dependencies = [ "thiserror", "tonic", "trace", + "workspace-hack", ] [[package]] diff --git a/router2/Cargo.toml b/router2/Cargo.toml index e2d723c5e4..6ac6ed696b 100644 --- a/router2/Cargo.toml +++ b/router2/Cargo.toml @@ -13,3 +13,4 @@ metric = { path = "../metric" } thiserror = "1.0" tonic = "0.5" trace = { path = "../trace/" } +workspace-hack = { path = "../workspace-hack"}