revert: "fix: Revert "feat: remove routerify (#2586)" (#2623)" (#2649)

This reverts commit 18941fcbff.
pull/24376/head
Raphael Taylor-Davies 2021-09-28 15:42:17 +01:00 committed by GitHub
parent 46363c7893
commit daa2ec2f4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 76 additions and 197 deletions

14
Cargo.lock generated
View File

@ -1700,7 +1700,6 @@ dependencies = [
"rdkafka",
"read_buffer",
"reqwest",
"routerify",
"rustyline",
"serde",
"serde_json",
@ -3510,19 +3509,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "routerify"
version = "2.0.0-beta-2"
source = "git+https://github.com/influxdata/routerify?rev=274e250#274e250e556b968ad02259282c0433e445db4200"
dependencies = [
"http",
"hyper",
"lazy_static",
"percent-encoding",
"regex",
"thiserror",
]
[[package]]
name = "rusoto_core"
version = "0.47.0"

View File

@ -142,8 +142,6 @@ prettytable-rs = "0.8"
pprof = { version = "^0.5", default-features = false, features = ["flamegraph", "protobuf"], optional = true }
prost = "0.8"
rustyline = { version = "9.0", default-features = false }
# Forked to upgrade hyper and tokio
routerify = { git = "https://github.com/influxdata/routerify", rev = "274e250" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.67"
serde_urlencoded = "0.7.0"
@ -200,3 +198,4 @@ aws = ["object_store/aws"] # Optional AWS / S3 object store support
# Cargo cannot currently implement mutually exclusive features so let's force every build
# to pick either heappy or jemalloc_replacing_malloc feature at least until we figure out something better.
jemalloc_replacing_malloc = ["tikv-jemalloc-sys"]

View File

@ -16,8 +16,6 @@ mod heappy;
#[cfg(feature = "pprof")]
mod pprof;
mod tower;
mod metrics;
// Influx crates
@ -38,13 +36,13 @@ use futures::{self, StreamExt};
use http::header::{CONTENT_ENCODING, CONTENT_TYPE};
use hyper::{http::HeaderValue, Body, Method, Request, Response, StatusCode};
use observability_deps::tracing::{debug, error};
use routerify::{prelude::*, Middleware, RequestInfo, Router, RouterError};
use serde::Deserialize;
use snafu::{OptionExt, ResultExt, Snafu};
use trace_http::ctx::TraceHeaderParser;
use crate::influxdb_ioxd::http::metrics::LineProtocolMetrics;
use hyper::server::conn::AddrIncoming;
use hyper::server::conn::{AddrIncoming, AddrStream};
use std::convert::Infallible;
use std::num::NonZeroI32;
use std::{
fmt::Debug,
@ -52,7 +50,9 @@ use std::{
sync::Arc,
};
use tokio_util::sync::CancellationToken;
use tower::Layer;
use trace::TraceCollector;
use trace_http::tower::TraceLayer;
/// Constants used in API error codes.
///
@ -350,79 +350,56 @@ impl From<server::Error> for ApplicationError {
}
}
#[derive(Debug)]
struct Server<M>
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
application: Arc<ApplicationState>,
app_server: Arc<AppServer<M>>,
lp_metrics: Arc<LineProtocolMetrics>,
max_request_size: usize,
}
fn router<M>(
application: Arc<ApplicationState>,
app_server: Arc<AppServer<M>>,
max_request_size: usize,
) -> Router<Body, ApplicationError>
async fn route_request<M>(
server: Arc<Server<M>>,
mut req: Request<Body>,
) -> Result<Response<Body>, Infallible>
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
let server = Server {
app_server,
max_request_size,
lp_metrics: Arc::new(LineProtocolMetrics::new(
application.metric_registry().as_ref(),
)),
};
// we don't need the authorization header and we don't want to accidentally log it.
req.headers_mut().remove("authorization");
debug!(request = ?req,"Processing request");
// Create a router and specify the the handlers.
Router::builder()
.data(server)
.data(application)
.middleware(Middleware::pre(|mut req| async move {
// we don't need the authorization header and we don't want to accidentally log it.
req.headers_mut().remove("authorization");
debug!(request = ?req,"Processing request");
Ok(req)
}))
.middleware(Middleware::post(|res| async move {
debug!(response = ?res, "Successfully processed request");
Ok(res)
})) // this endpoint is for API backward compatibility with InfluxDB 2.x
.post("/api/v2/write", write::<M>)
.get("/health", health::<M>)
.get("/metrics", handle_metrics::<M>)
.get("/api/v3/query", query::<M>)
.get("/debug/pprof", pprof_home::<M>)
.get("/debug/pprof/profile", pprof_profile::<M>)
.get("/debug/pprof/allocs", pprof_heappy_profile::<M>)
// Specify the error handler to handle any errors caused by
// a route or any middleware.
.err_handler_with_info(error_handler)
.build()
.unwrap()
}
// The API-global error handler, handles ApplicationErrors originating from
// individual routes and middlewares, along with errors from the router itself
async fn error_handler(err: RouterError<ApplicationError>, req: RequestInfo) -> Response<Body> {
let method = req.method().clone();
let uri = req.uri().clone();
let span_id = req.headers().get("x-b3-spanid");
let content_length = req.headers().get("content-length");
error!(error = ?err, error_message = ?err.to_string(), method = ?method, uri = ?uri, ?span_id, ?content_length, "Error while handling request");
let content_length = req.headers().get("content-length").cloned();
match err {
RouterError::HandleRequest(e, _)
| RouterError::HandlePreMiddlewareRequest(e)
| RouterError::HandlePostMiddlewareWithInfoRequest(e)
| RouterError::HandlePostMiddlewareWithoutInfoRequest(e) => e.response(),
_ => {
let json = serde_json::json!({"error": err.to_string()}).to_string();
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(json))
.unwrap()
let response = match (method.clone(), uri.path()) {
(Method::GET, "/health") => health(),
(Method::GET, "/metrics") => handle_metrics(server.application.as_ref()),
(Method::POST, "/api/v2/write") => write(req, server.as_ref()).await,
(Method::GET, "/api/v3/query") => query(req, server.as_ref()).await,
(Method::GET, "/debug/pprof") => pprof_home(req).await,
(Method::GET, "/debug/pprof/profile") => pprof_profile(req).await,
(Method::GET, "/debug/pprof/allocs") => pprof_heappy_profile(req).await,
(method, path) => Err(ApplicationError::RouteNotFound {
method,
path: path.to_string(),
}),
};
// TODO: Move logging to TraceLayer
match response {
Ok(response) => {
debug!(?response, "Successfully processed request");
Ok(response)
}
Err(error) => {
error!(%error, %method, %uri, ?content_length, "Error while handling request");
Ok(error.response())
}
}
}
@ -486,7 +463,10 @@ async fn parse_body(req: hyper::Request<Body>, max_size: usize) -> Result<Bytes,
}
}
async fn write<M>(req: Request<Body>) -> Result<Response<Body>, ApplicationError>
async fn write<M>(
req: Request<Body>,
server: &Server<M>,
) -> Result<Response<Body>, ApplicationError>
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
@ -494,7 +474,8 @@ where
app_server: server,
lp_metrics,
max_request_size,
} = req.data::<Server<M>>().expect("server state");
..
} = server;
let max_request_size = *max_request_size;
let server = Arc::clone(server);
@ -575,8 +556,9 @@ fn default_format() -> String {
async fn query<M: ConnectionManager + Send + Sync + Debug + 'static>(
req: Request<Body>,
server: &Server<M>,
) -> Result<Response<Body>, ApplicationError> {
let server = Arc::clone(&req.data::<Server<M>>().expect("server state").app_server);
let server = &server.app_server;
let uri_query = req.uri().query().context(ExpectedQueryString {})?;
@ -617,20 +599,12 @@ async fn query<M: ConnectionManager + Send + Sync + Debug + 'static>(
Ok(response)
}
async fn health<M: ConnectionManager + Send + Sync + Debug + 'static>(
_req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> {
fn health() -> Result<Response<Body>, ApplicationError> {
let response_body = "OK";
Ok(Response::new(Body::from(response_body.to_string())))
}
async fn handle_metrics<M: ConnectionManager + Send + Sync + Debug + 'static>(
req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> {
let application = req
.data::<Arc<ApplicationState>>()
.expect("application state");
fn handle_metrics(application: &ApplicationState) -> Result<Response<Body>, ApplicationError> {
let mut body: Vec<u8> = Default::default();
let mut reporter = metric_exporters::PrometheusTextEncoder::new(&mut body);
application.metric_registry().report(&mut reporter);
@ -638,18 +612,7 @@ async fn handle_metrics<M: ConnectionManager + Send + Sync + Debug + 'static>(
Ok(Response::new(Body::from(body)))
}
#[derive(Deserialize, Debug)]
/// Arguments in the query string of the request to /snapshot
struct SnapshotInfo {
org: String,
bucket: String,
partition: String,
table_name: String,
}
async fn pprof_home<M: ConnectionManager + Send + Sync + Debug + 'static>(
req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> {
async fn pprof_home(req: Request<Body>) -> Result<Response<Body>, ApplicationError> {
let default_host = HeaderValue::from_static("localhost");
let host = req
.headers()
@ -715,9 +678,7 @@ impl PProfAllocsArgs {
}
#[cfg(feature = "pprof")]
async fn pprof_profile<M: ConnectionManager + Send + Sync + Debug + 'static>(
req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> {
async fn pprof_profile(req: Request<Body>) -> Result<Response<Body>, ApplicationError> {
use ::pprof::protos::Message;
let query_string = req.uri().query().unwrap_or_default();
let query: PProfArgs =
@ -758,17 +719,13 @@ async fn pprof_profile<M: ConnectionManager + Send + Sync + Debug + 'static>(
}
#[cfg(not(feature = "pprof"))]
async fn pprof_profile<M: ConnectionManager + Send + Sync + Debug + 'static>(
_req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> {
async fn pprof_profile(_req: Request<Body>) -> Result<Response<Body>, ApplicationError> {
PProfIsNotCompiled {}.fail()
}
// If heappy support is enabled, call it
#[cfg(feature = "heappy")]
async fn pprof_heappy_profile<M: ConnectionManager + Send + Sync + Debug + 'static>(
req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> {
async fn pprof_heappy_profile(req: Request<Body>) -> Result<Response<Body>, ApplicationError> {
let query_string = req.uri().query().unwrap_or_default();
let query: PProfAllocsArgs =
serde_urlencoded::from_str(query_string).context(InvalidQueryString { query_string })?;
@ -802,16 +759,14 @@ async fn pprof_heappy_profile<M: ConnectionManager + Send + Sync + Debug + 'stat
// Return error if heappy not enabled
#[cfg(not(feature = "heappy"))]
async fn pprof_heappy_profile<M: ConnectionManager + Send + Sync + Debug + 'static>(
_req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> {
async fn pprof_heappy_profile(_req: Request<Body>) -> Result<Response<Body>, ApplicationError> {
HeappyIsNotCompiled {}.fail()
}
pub async fn serve<M>(
addr: AddrIncoming,
application: Arc<ApplicationState>,
server: Arc<AppServer<M>>,
app_server: Arc<AppServer<M>>,
shutdown: CancellationToken,
max_request_size: usize,
trace_header_parser: TraceHeaderParser,
@ -821,16 +776,29 @@ where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
let metric_registry = Arc::clone(application.metric_registry());
let router = router(application, server, max_request_size);
let new_service = tower::MakeService::new(
router,
trace_header_parser,
trace_collector,
metric_registry,
);
let trace_layer = TraceLayer::new(trace_header_parser, metric_registry, trace_collector, false);
let lp_metrics = Arc::new(LineProtocolMetrics::new(
application.metric_registry().as_ref(),
));
let server = Arc::new(Server {
application,
app_server,
lp_metrics,
max_request_size,
});
hyper::Server::builder(addr)
.serve(new_service)
.serve(hyper::service::make_service_fn(|_conn: &AddrStream| {
let server = Arc::clone(&server);
let service = hyper::service::service_fn(move |request: Request<_>| {
route_request(Arc::clone(&server), request)
});
let service = trace_layer.layer(service);
futures::future::ready(Ok::<_, Infallible>(service))
}))
.with_graceful_shutdown(shutdown.cancelled())
.await
}

View File

@ -1,74 +0,0 @@
use std::convert::Infallible;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures::future::BoxFuture;
use futures::{ready, FutureExt};
use hyper::server::conn::AddrStream;
use hyper::Body;
use routerify::{RequestService, Router, RouterService};
use trace::TraceCollector;
use trace_http::ctx::TraceHeaderParser;
use trace_http::tower::{TraceLayer, TraceService};
use super::ApplicationError;
use tower::Layer;
/// `MakeService` can be thought of as a hyper-compatible connection factory
///
/// Specifically it implements the necessary trait to be used with `hyper::server::Builder::serve`
pub struct MakeService {
inner: RouterService<Body, ApplicationError>,
trace_layer: trace_http::tower::TraceLayer,
}
impl MakeService {
pub fn new(
router: Router<Body, ApplicationError>,
trace_header_parser: TraceHeaderParser,
collector: Option<Arc<dyn TraceCollector>>,
metric_registry: Arc<metric::Registry>,
) -> Self {
Self {
inner: RouterService::new(router).unwrap(),
trace_layer: TraceLayer::new(trace_header_parser, metric_registry, collector, false),
}
}
}
impl tower::Service<&AddrStream> for MakeService {
type Response = Service;
type Error = Infallible;
type Future = MakeServiceFuture;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, conn: &AddrStream) -> Self::Future {
MakeServiceFuture {
inner: self.inner.call(conn),
trace_layer: self.trace_layer.clone(),
}
}
}
/// A future produced by `MakeService` that resolves to a `Service`
pub struct MakeServiceFuture {
inner: BoxFuture<'static, Result<RequestService<Body, ApplicationError>, Infallible>>,
trace_layer: trace_http::tower::TraceLayer,
}
impl Future for MakeServiceFuture {
type Output = Result<Service, Infallible>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let maybe_service = ready!(self.inner.poll_unpin(cx));
Poll::Ready(maybe_service.map(|service| self.trace_layer.layer(service)))
}
}
pub type Service = TraceService<RequestService<Body, ApplicationError>>;