diff --git a/Cargo.lock b/Cargo.lock index 5b8f371489..75be61e910 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2869,7 +2869,7 @@ dependencies = [ [[package]] name = "routerify" version = "2.0.0-beta-2" -source = "git+https://github.com/influxdata/routerify?rev=bfe198e#bfe198e006d85b1b648ac96101d676b620d6f569" +source = "git+https://github.com/influxdata/routerify?rev=274e250#274e250e556b968ad02259282c0433e445db4200" dependencies = [ "http", "hyper", diff --git a/Cargo.toml b/Cargo.toml index 9d65ef9d11..2e8a5e3297 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,7 +57,7 @@ bytes = "1.0" hyper = "0.14" # Forked to upgrade hyper and tokio -routerify = { git = "https://github.com/influxdata/routerify", rev = "bfe198e" } +routerify = { git = "https://github.com/influxdata/routerify", rev = "274e250" } tokio = { version = "1.0", features=["macros", "rt-multi-thread"] } tokio-stream = {version = "0.1.2", features=["net"]} diff --git a/src/influxdb_ioxd/http_routes.rs b/src/influxdb_ioxd/http_routes.rs index 5f46f634ff..fd7ef4e7d8 100644 --- a/src/influxdb_ioxd/http_routes.rs +++ b/src/influxdb_ioxd/http_routes.rs @@ -26,7 +26,7 @@ use bytes::{Bytes, BytesMut}; use futures::{self, StreamExt}; use http::header::CONTENT_ENCODING; use hyper::{Body, Method, Request, Response, StatusCode}; -use routerify::{prelude::*, Middleware, RequestInfo, Router, RouterService}; +use routerify::{prelude::*, Middleware, RequestInfo, Router, RouterError, RouterService}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, Snafu}; use tracing::{debug, error, info}; @@ -146,8 +146,8 @@ pub enum ApplicationError { } impl ApplicationError { - pub fn response(&self) -> Result, Self> { - Ok(match self { + pub fn response(&self) -> Response { + match self { Self::BucketByName { .. } => self.internal_error(), Self::BucketMappingError { .. } => self.internal_error(), Self::WritingPoints { .. } => self.internal_error(), @@ -171,7 +171,7 @@ impl ApplicationError { Self::ErrorCreatingDatabase { .. } => self.bad_request(), Self::DatabaseNameError { .. } => self.bad_request(), Self::DatabaseNotFound { .. } => self.not_found(), - }) + } } fn bad_request(&self) -> Response { @@ -247,14 +247,14 @@ where info!(response = ?res, "Successfully processed request"); Ok(res) })) // this endpoint is for API backward compatibility with InfluxDB 2.x - .post("/api/v2/write", write_handler::) + .post("/api/v2/write", write::) .get("/ping", ping) - .get("/api/v2/read", read_handler::) - .put("/iox/api/v1/databases/:name", create_database_handler::) - .get("/iox/api/v1/databases/:name", get_database_handler::) - .put("/iox/api/v1/id", set_writer_handler::) - .get("/api/v1/partitions", list_partitions_handler::) - .post("/api/v1/snapshot", snapshot_partition_handler::) + .get("/api/v2/read", read::) + .put("/iox/api/v1/databases/:name", create_database::) + .get("/iox/api/v1/databases/:name", get_database::) + .put("/iox/api/v1/id", set_writer::) + .get("/api/v1/partitions", list_partitions::) + .post("/api/v1/snapshot", snapshot_partition::) // Specify the error handler to handle any errors caused by // a route or any middleware. .err_handler_with_info(error_handler) @@ -262,19 +262,29 @@ where .unwrap() } -// the Routerify error handler. This should be the handler of last resort. -// Errors should be handled with responses built in the individual handlers for -// specific ApplicationError(s) -async fn error_handler(err: routerify::Error, req: RequestInfo) -> Response { - let method = req.method().clone(); - let uri = req.uri().clone(); - error!(error = ?err, error_message = ?err.to_string(), method = ?method, uri = ?uri, "Error while handling request"); +// The API-global error handler, handles ApplicationErrors originating from +// individual routes and middlewares, along with errors from the router itself +async fn error_handler(err: RouterError, req: RequestInfo) -> Response { + match err { + RouterError::HandleRequest(e, _) + | RouterError::HandlePreMiddlewareRequest(e) + | RouterError::HandlePostMiddlewareWithInfoRequest(e) + | RouterError::HandlePostMiddlewareWithoutInfoRequest(e) => { + error!(error = ?e, error_message = ?e.to_string(), "Error while handling request"); + e.response() + } + _ => { + let method = req.method().clone(); + let uri = req.uri().clone(); + error!(error = ?err, error_message = ?err.to_string(), method = ?method, uri = ?uri, "Error while handling request"); - let json = serde_json::json!({"error": err.to_string()}).to_string(); - Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(Body::from(json)) - .unwrap() + let json = serde_json::json!({"error": err.to_string()}).to_string(); + Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(json)) + .unwrap() + } + } } #[derive(Debug, Deserialize)] @@ -336,20 +346,6 @@ async fn parse_body(req: hyper::Request) -> Result(req: Request) -> Result, ApplicationError> -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ - match write::(req).await { - Err(e) => { - error!(error = ?e, error_message = ?e.to_string(), "Error while handling request"); - e.response() - } - res => res, - } -} - #[tracing::instrument(level = "debug")] async fn write(req: Request) -> Result, ApplicationError> where @@ -410,21 +406,6 @@ struct ReadInfo { sql_query: String, } -#[tracing::instrument(level = "debug")] -async fn read_handler(req: Request) -> Result, ApplicationError> -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ - match read::(req).await { - Err(e) => { - error!(error = ?e, error_message = ?e.to_string(), "Error while handling request"); - - e.response() - } - res => res, - } -} - // TODO: figure out how to stream read results out rather than rendering the // whole thing in mem #[tracing::instrument(level = "debug")] @@ -467,21 +448,6 @@ async fn read( Ok(Response::new(Body::from(results.into_bytes()))) } -#[tracing::instrument(level = "debug")] -async fn create_database_handler(req: Request) -> Result, ApplicationError> -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ - match create_database::(req).await { - Err(e) => { - error!(error = ?e, error_message = ?e.to_string(), "Error while handling request"); - - e.response() - } - res => res, - } -} - #[tracing::instrument(level = "debug")] async fn create_database( req: Request, @@ -508,21 +474,6 @@ async fn create_database( Ok(Response::new(Body::empty())) } -#[tracing::instrument(level = "debug")] -async fn get_database_handler(req: Request) -> Result, ApplicationError> -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ - match get_database::(req).await { - Err(e) => { - error!(error = ?e, error_message = ?e.to_string(), "Error while handling request"); - - e.response() - } - res => res, - } -} - #[tracing::instrument(level = "debug")] async fn get_database( req: Request, @@ -553,21 +504,6 @@ async fn get_database( Ok(response) } -#[tracing::instrument(level = "debug")] -async fn set_writer_handler(req: Request) -> Result, ApplicationError> -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ - match set_writer::(req).await { - Err(e) => { - error!(error = ?e, error_message = ?e.to_string(), "Error while handling request"); - - e.response() - } - res => res, - } -} - #[tracing::instrument(level = "debug")] async fn set_writer( req: Request, @@ -615,21 +551,6 @@ struct DatabaseInfo { bucket: String, } -#[tracing::instrument(level = "debug")] -async fn list_partitions_handler(req: Request) -> Result, ApplicationError> -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ - match list_partitions::(req).await { - Err(e) => { - error!(error = ?e, error_message = ?e.to_string(), "Error while handling request"); - - e.response() - } - res => res, - } -} - #[tracing::instrument(level = "debug")] async fn list_partitions( req: Request, @@ -674,23 +595,6 @@ struct SnapshotInfo { partition: String, } -#[tracing::instrument(level = "debug")] -async fn snapshot_partition_handler( - req: Request, -) -> Result, ApplicationError> -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ - match snapshot_partition::(req).await { - Err(e) => { - error!(error = ?e, error_message = ?e.to_string(), "Error while handling request"); - - e.response() - } - res => res, - } -} - #[tracing::instrument(level = "debug")] async fn snapshot_partition( req: Request,