diff --git a/Cargo.lock b/Cargo.lock index 7834ded9bd..9d28ea14ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1700,7 +1700,6 @@ dependencies = [ "rdkafka", "read_buffer", "reqwest", - "routerify", "rustyline", "serde", "serde_json", @@ -3510,19 +3509,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "routerify" -version = "2.0.0-beta-2" -source = "git+https://github.com/influxdata/routerify?rev=274e250#274e250e556b968ad02259282c0433e445db4200" -dependencies = [ - "http", - "hyper", - "lazy_static", - "percent-encoding", - "regex", - "thiserror", -] - [[package]] name = "rusoto_core" version = "0.47.0" diff --git a/Cargo.toml b/Cargo.toml index f136cb4bb5..b8a27779f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -142,8 +142,6 @@ prettytable-rs = "0.8" pprof = { version = "^0.5", default-features = false, features = ["flamegraph", "protobuf"], optional = true } prost = "0.8" rustyline = { version = "9.0", default-features = false } -# Forked to upgrade hyper and tokio -routerify = { git = "https://github.com/influxdata/routerify", rev = "274e250" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.67" serde_urlencoded = "0.7.0" @@ -200,3 +198,4 @@ aws = ["object_store/aws"] # Optional AWS / S3 object store support # Cargo cannot currently implement mutually exclusive features so let's force every build # to pick either heappy or jemalloc_replacing_malloc feature at least until we figure out something better. jemalloc_replacing_malloc = ["tikv-jemalloc-sys"] + diff --git a/src/influxdb_ioxd/http.rs b/src/influxdb_ioxd/http.rs index cf55cac709..992594c5e1 100644 --- a/src/influxdb_ioxd/http.rs +++ b/src/influxdb_ioxd/http.rs @@ -16,8 +16,6 @@ mod heappy; #[cfg(feature = "pprof")] mod pprof; -mod tower; - mod metrics; // Influx crates @@ -38,13 +36,13 @@ use futures::{self, StreamExt}; use http::header::{CONTENT_ENCODING, CONTENT_TYPE}; use hyper::{http::HeaderValue, Body, Method, Request, Response, StatusCode}; use observability_deps::tracing::{debug, error}; -use routerify::{prelude::*, Middleware, RequestInfo, Router, RouterError}; use serde::Deserialize; use snafu::{OptionExt, ResultExt, Snafu}; use trace_http::ctx::TraceHeaderParser; use crate::influxdb_ioxd::http::metrics::LineProtocolMetrics; -use hyper::server::conn::AddrIncoming; +use hyper::server::conn::{AddrIncoming, AddrStream}; +use std::convert::Infallible; use std::num::NonZeroI32; use std::{ fmt::Debug, @@ -52,7 +50,9 @@ use std::{ sync::Arc, }; use tokio_util::sync::CancellationToken; +use tower::Layer; use trace::TraceCollector; +use trace_http::tower::TraceLayer; /// Constants used in API error codes. /// @@ -350,79 +350,56 @@ impl From<server::Error> for ApplicationError { } } +#[derive(Debug)] struct Server<M> where M: ConnectionManager + Send + Sync + Debug + 'static, { + application: Arc<ApplicationState>, app_server: Arc<AppServer<M>>, lp_metrics: Arc<LineProtocolMetrics>, max_request_size: usize, } -fn router<M>( - application: Arc<ApplicationState>, - app_server: Arc<AppServer<M>>, - max_request_size: usize, -) -> Router<Body, ApplicationError> +async fn route_request<M>( + server: Arc<Server<M>>, + mut req: Request<Body>, +) -> Result<Response<Body>, Infallible> where M: ConnectionManager + Send + Sync + Debug + 'static, { - let server = Server { - app_server, - max_request_size, - lp_metrics: Arc::new(LineProtocolMetrics::new( - application.metric_registry().as_ref(), - )), - }; + // we don't need the authorization header and we don't want to accidentally log it. + req.headers_mut().remove("authorization"); + debug!(request = ?req,"Processing request"); - // Create a router and specify the the handlers. - Router::builder() - .data(server) - .data(application) - .middleware(Middleware::pre(|mut req| async move { - // we don't need the authorization header and we don't want to accidentally log it. - req.headers_mut().remove("authorization"); - debug!(request = ?req,"Processing request"); - Ok(req) - })) - .middleware(Middleware::post(|res| async move { - debug!(response = ?res, "Successfully processed request"); - Ok(res) - })) // this endpoint is for API backward compatibility with InfluxDB 2.x - .post("/api/v2/write", write::<M>) - .get("/health", health::<M>) - .get("/metrics", handle_metrics::<M>) - .get("/api/v3/query", query::<M>) - .get("/debug/pprof", pprof_home::<M>) - .get("/debug/pprof/profile", pprof_profile::<M>) - .get("/debug/pprof/allocs", pprof_heappy_profile::<M>) - // Specify the error handler to handle any errors caused by - // a route or any middleware. - .err_handler_with_info(error_handler) - .build() - .unwrap() -} - -// The API-global error handler, handles ApplicationErrors originating from -// individual routes and middlewares, along with errors from the router itself -async fn error_handler(err: RouterError<ApplicationError>, req: RequestInfo) -> Response<Body> { let method = req.method().clone(); let uri = req.uri().clone(); - let span_id = req.headers().get("x-b3-spanid"); - let content_length = req.headers().get("content-length"); - error!(error = ?err, error_message = ?err.to_string(), method = ?method, uri = ?uri, ?span_id, ?content_length, "Error while handling request"); + let content_length = req.headers().get("content-length").cloned(); - match err { - RouterError::HandleRequest(e, _) - | RouterError::HandlePreMiddlewareRequest(e) - | RouterError::HandlePostMiddlewareWithInfoRequest(e) - | RouterError::HandlePostMiddlewareWithoutInfoRequest(e) => e.response(), - _ => { - let json = serde_json::json!({"error": err.to_string()}).to_string(); - Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(Body::from(json)) - .unwrap() + let response = match (method.clone(), uri.path()) { + (Method::GET, "/health") => health(), + (Method::GET, "/metrics") => handle_metrics(server.application.as_ref()), + (Method::POST, "/api/v2/write") => write(req, server.as_ref()).await, + (Method::GET, "/api/v3/query") => query(req, server.as_ref()).await, + (Method::GET, "/debug/pprof") => pprof_home(req).await, + (Method::GET, "/debug/pprof/profile") => pprof_profile(req).await, + (Method::GET, "/debug/pprof/allocs") => pprof_heappy_profile(req).await, + + (method, path) => Err(ApplicationError::RouteNotFound { + method, + path: path.to_string(), + }), + }; + + // TODO: Move logging to TraceLayer + match response { + Ok(response) => { + debug!(?response, "Successfully processed request"); + Ok(response) + } + Err(error) => { + error!(%error, %method, %uri, ?content_length, "Error while handling request"); + Ok(error.response()) } } } @@ -486,7 +463,10 @@ async fn parse_body(req: hyper::Request<Body>, max_size: usize) -> Result<Bytes, } } -async fn write<M>(req: Request<Body>) -> Result<Response<Body>, ApplicationError> +async fn write<M>( + req: Request<Body>, + server: &Server<M>, +) -> Result<Response<Body>, ApplicationError> where M: ConnectionManager + Send + Sync + Debug + 'static, { @@ -494,7 +474,8 @@ where app_server: server, lp_metrics, max_request_size, - } = req.data::<Server<M>>().expect("server state"); + .. + } = server; let max_request_size = *max_request_size; let server = Arc::clone(server); @@ -575,8 +556,9 @@ fn default_format() -> String { async fn query<M: ConnectionManager + Send + Sync + Debug + 'static>( req: Request<Body>, + server: &Server<M>, ) -> Result<Response<Body>, ApplicationError> { - let server = Arc::clone(&req.data::<Server<M>>().expect("server state").app_server); + let server = &server.app_server; let uri_query = req.uri().query().context(ExpectedQueryString {})?; @@ -617,20 +599,12 @@ async fn query<M: ConnectionManager + Send + Sync + Debug + 'static>( Ok(response) } -async fn health<M: ConnectionManager + Send + Sync + Debug + 'static>( - _req: Request<Body>, -) -> Result<Response<Body>, ApplicationError> { +fn health() -> Result<Response<Body>, ApplicationError> { let response_body = "OK"; Ok(Response::new(Body::from(response_body.to_string()))) } -async fn handle_metrics<M: ConnectionManager + Send + Sync + Debug + 'static>( - req: Request<Body>, -) -> Result<Response<Body>, ApplicationError> { - let application = req - .data::<Arc<ApplicationState>>() - .expect("application state"); - +fn handle_metrics(application: &ApplicationState) -> Result<Response<Body>, ApplicationError> { let mut body: Vec<u8> = Default::default(); let mut reporter = metric_exporters::PrometheusTextEncoder::new(&mut body); application.metric_registry().report(&mut reporter); @@ -638,18 +612,7 @@ async fn handle_metrics<M: ConnectionManager + Send + Sync + Debug + 'static>( Ok(Response::new(Body::from(body))) } -#[derive(Deserialize, Debug)] -/// Arguments in the query string of the request to /snapshot -struct SnapshotInfo { - org: String, - bucket: String, - partition: String, - table_name: String, -} - -async fn pprof_home<M: ConnectionManager + Send + Sync + Debug + 'static>( - req: Request<Body>, -) -> Result<Response<Body>, ApplicationError> { +async fn pprof_home(req: Request<Body>) -> Result<Response<Body>, ApplicationError> { let default_host = HeaderValue::from_static("localhost"); let host = req .headers() @@ -715,9 +678,7 @@ impl PProfAllocsArgs { } #[cfg(feature = "pprof")] -async fn pprof_profile<M: ConnectionManager + Send + Sync + Debug + 'static>( - req: Request<Body>, -) -> Result<Response<Body>, ApplicationError> { +async fn pprof_profile(req: Request<Body>) -> Result<Response<Body>, ApplicationError> { use ::pprof::protos::Message; let query_string = req.uri().query().unwrap_or_default(); let query: PProfArgs = @@ -758,17 +719,13 @@ async fn pprof_profile<M: ConnectionManager + Send + Sync + Debug + 'static>( } #[cfg(not(feature = "pprof"))] -async fn pprof_profile<M: ConnectionManager + Send + Sync + Debug + 'static>( - _req: Request<Body>, -) -> Result<Response<Body>, ApplicationError> { +async fn pprof_profile(_req: Request<Body>) -> Result<Response<Body>, ApplicationError> { PProfIsNotCompiled {}.fail() } // If heappy support is enabled, call it #[cfg(feature = "heappy")] -async fn pprof_heappy_profile<M: ConnectionManager + Send + Sync + Debug + 'static>( - req: Request<Body>, -) -> Result<Response<Body>, ApplicationError> { +async fn pprof_heappy_profile(req: Request<Body>) -> Result<Response<Body>, ApplicationError> { let query_string = req.uri().query().unwrap_or_default(); let query: PProfAllocsArgs = serde_urlencoded::from_str(query_string).context(InvalidQueryString { query_string })?; @@ -802,16 +759,14 @@ async fn pprof_heappy_profile<M: ConnectionManager + Send + Sync + Debug + 'stat // Return error if heappy not enabled #[cfg(not(feature = "heappy"))] -async fn pprof_heappy_profile<M: ConnectionManager + Send + Sync + Debug + 'static>( - _req: Request<Body>, -) -> Result<Response<Body>, ApplicationError> { +async fn pprof_heappy_profile(_req: Request<Body>) -> Result<Response<Body>, ApplicationError> { HeappyIsNotCompiled {}.fail() } pub async fn serve<M>( addr: AddrIncoming, application: Arc<ApplicationState>, - server: Arc<AppServer<M>>, + app_server: Arc<AppServer<M>>, shutdown: CancellationToken, max_request_size: usize, trace_header_parser: TraceHeaderParser, @@ -821,16 +776,29 @@ where M: ConnectionManager + Send + Sync + Debug + 'static, { let metric_registry = Arc::clone(application.metric_registry()); - let router = router(application, server, max_request_size); - let new_service = tower::MakeService::new( - router, - trace_header_parser, - trace_collector, - metric_registry, - ); + + let trace_layer = TraceLayer::new(trace_header_parser, metric_registry, trace_collector, false); + let lp_metrics = Arc::new(LineProtocolMetrics::new( + application.metric_registry().as_ref(), + )); + + let server = Arc::new(Server { + application, + app_server, + lp_metrics, + max_request_size, + }); hyper::Server::builder(addr) - .serve(new_service) + .serve(hyper::service::make_service_fn(|_conn: &AddrStream| { + let server = Arc::clone(&server); + let service = hyper::service::service_fn(move |request: Request<_>| { + route_request(Arc::clone(&server), request) + }); + + let service = trace_layer.layer(service); + futures::future::ready(Ok::<_, Infallible>(service)) + })) .with_graceful_shutdown(shutdown.cancelled()) .await } diff --git a/src/influxdb_ioxd/http/tower.rs b/src/influxdb_ioxd/http/tower.rs deleted file mode 100644 index 83b5f3676a..0000000000 --- a/src/influxdb_ioxd/http/tower.rs +++ /dev/null @@ -1,74 +0,0 @@ -use std::convert::Infallible; -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - -use futures::future::BoxFuture; -use futures::{ready, FutureExt}; -use hyper::server::conn::AddrStream; -use hyper::Body; -use routerify::{RequestService, Router, RouterService}; - -use trace::TraceCollector; -use trace_http::ctx::TraceHeaderParser; -use trace_http::tower::{TraceLayer, TraceService}; - -use super::ApplicationError; -use tower::Layer; - -/// `MakeService` can be thought of as a hyper-compatible connection factory -/// -/// Specifically it implements the necessary trait to be used with `hyper::server::Builder::serve` -pub struct MakeService { - inner: RouterService<Body, ApplicationError>, - trace_layer: trace_http::tower::TraceLayer, -} - -impl MakeService { - pub fn new( - router: Router<Body, ApplicationError>, - trace_header_parser: TraceHeaderParser, - collector: Option<Arc<dyn TraceCollector>>, - metric_registry: Arc<metric::Registry>, - ) -> Self { - Self { - inner: RouterService::new(router).unwrap(), - trace_layer: TraceLayer::new(trace_header_parser, metric_registry, collector, false), - } - } -} - -impl tower::Service<&AddrStream> for MakeService { - type Response = Service; - type Error = Infallible; - type Future = MakeServiceFuture; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - self.inner.poll_ready(cx) - } - - fn call(&mut self, conn: &AddrStream) -> Self::Future { - MakeServiceFuture { - inner: self.inner.call(conn), - trace_layer: self.trace_layer.clone(), - } - } -} - -/// A future produced by `MakeService` that resolves to a `Service` -pub struct MakeServiceFuture { - inner: BoxFuture<'static, Result<RequestService<Body, ApplicationError>, Infallible>>, - trace_layer: trace_http::tower::TraceLayer, -} - -impl Future for MakeServiceFuture { - type Output = Result<Service, Infallible>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let maybe_service = ready!(self.inner.poll_unpin(cx)); - Poll::Ready(maybe_service.map(|service| self.trace_layer.layer(service))) - } -} - -pub type Service = TraceService<RequestService<Body, ApplicationError>>;