Merge pull request #742 from influxdata/routerify-error-handling

feat: consistent global error handling and logging
pull/24376/head
Raphael Taylor-Davies 2021-02-04 15:04:25 +00:00 committed by GitHub
commit 42de58497e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 35 additions and 131 deletions

2
Cargo.lock generated
View File

@ -2869,7 +2869,7 @@ dependencies = [
[[package]]
name = "routerify"
version = "2.0.0-beta-2"
source = "git+https://github.com/influxdata/routerify?rev=bfe198e#bfe198e006d85b1b648ac96101d676b620d6f569"
source = "git+https://github.com/influxdata/routerify?rev=274e250#274e250e556b968ad02259282c0433e445db4200"
dependencies = [
"http",
"hyper",

View File

@ -57,7 +57,7 @@ bytes = "1.0"
hyper = "0.14"
# Forked to upgrade hyper and tokio
routerify = { git = "https://github.com/influxdata/routerify", rev = "bfe198e" }
routerify = { git = "https://github.com/influxdata/routerify", rev = "274e250" }
tokio = { version = "1.0", features=["macros", "rt-multi-thread"] }
tokio-stream = {version = "0.1.2", features=["net"]}

View File

@ -26,7 +26,7 @@ use bytes::{Bytes, BytesMut};
use futures::{self, StreamExt};
use http::header::CONTENT_ENCODING;
use hyper::{Body, Method, Request, Response, StatusCode};
use routerify::{prelude::*, Middleware, RequestInfo, Router, RouterService};
use routerify::{prelude::*, Middleware, RequestInfo, Router, RouterError, RouterService};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use tracing::{debug, error, info};
@ -146,8 +146,8 @@ pub enum ApplicationError {
}
impl ApplicationError {
pub fn response(&self) -> Result<Response<Body>, Self> {
Ok(match self {
pub fn response(&self) -> Response<Body> {
match self {
Self::BucketByName { .. } => self.internal_error(),
Self::BucketMappingError { .. } => self.internal_error(),
Self::WritingPoints { .. } => self.internal_error(),
@ -171,7 +171,7 @@ impl ApplicationError {
Self::ErrorCreatingDatabase { .. } => self.bad_request(),
Self::DatabaseNameError { .. } => self.bad_request(),
Self::DatabaseNotFound { .. } => self.not_found(),
})
}
}
fn bad_request(&self) -> Response<Body> {
@ -247,14 +247,14 @@ where
info!(response = ?res, "Successfully processed request");
Ok(res)
})) // this endpoint is for API backward compatibility with InfluxDB 2.x
.post("/api/v2/write", write_handler::<M>)
.post("/api/v2/write", write::<M>)
.get("/ping", ping)
.get("/api/v2/read", read_handler::<M>)
.put("/iox/api/v1/databases/:name", create_database_handler::<M>)
.get("/iox/api/v1/databases/:name", get_database_handler::<M>)
.put("/iox/api/v1/id", set_writer_handler::<M>)
.get("/api/v1/partitions", list_partitions_handler::<M>)
.post("/api/v1/snapshot", snapshot_partition_handler::<M>)
.get("/api/v2/read", read::<M>)
.put("/iox/api/v1/databases/:name", create_database::<M>)
.get("/iox/api/v1/databases/:name", get_database::<M>)
.put("/iox/api/v1/id", set_writer::<M>)
.get("/api/v1/partitions", list_partitions::<M>)
.post("/api/v1/snapshot", snapshot_partition::<M>)
// Specify the error handler to handle any errors caused by
// a route or any middleware.
.err_handler_with_info(error_handler)
@ -262,19 +262,29 @@ where
.unwrap()
}
// the Routerify error handler. This should be the handler of last resort.
// Errors should be handled with responses built in the individual handlers for
// specific ApplicationError(s)
async fn error_handler(err: routerify::Error, req: RequestInfo) -> Response<Body> {
let method = req.method().clone();
let uri = req.uri().clone();
error!(error = ?err, error_message = ?err.to_string(), method = ?method, uri = ?uri, "Error while handling request");
// The API-global error handler, handles ApplicationErrors originating from
// individual routes and middlewares, along with errors from the router itself
async fn error_handler(err: RouterError<ApplicationError>, req: RequestInfo) -> Response<Body> {
match err {
RouterError::HandleRequest(e, _)
| RouterError::HandlePreMiddlewareRequest(e)
| RouterError::HandlePostMiddlewareWithInfoRequest(e)
| RouterError::HandlePostMiddlewareWithoutInfoRequest(e) => {
error!(error = ?e, error_message = ?e.to_string(), "Error while handling request");
e.response()
}
_ => {
let method = req.method().clone();
let uri = req.uri().clone();
error!(error = ?err, error_message = ?err.to_string(), method = ?method, uri = ?uri, "Error while handling request");
let json = serde_json::json!({"error": err.to_string()}).to_string();
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(json))
.unwrap()
let json = serde_json::json!({"error": err.to_string()}).to_string();
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(json))
.unwrap()
}
}
}
#[derive(Debug, Deserialize)]
@ -336,20 +346,6 @@ async fn parse_body(req: hyper::Request<Body>) -> Result<Bytes, ApplicationError
}
}
#[tracing::instrument(level = "debug")]
async fn write_handler<M>(req: Request<Body>) -> Result<Response<Body>, ApplicationError>
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
match write::<M>(req).await {
Err(e) => {
error!(error = ?e, error_message = ?e.to_string(), "Error while handling request");
e.response()
}
res => res,
}
}
#[tracing::instrument(level = "debug")]
async fn write<M>(req: Request<Body>) -> Result<Response<Body>, ApplicationError>
where
@ -410,21 +406,6 @@ struct ReadInfo {
sql_query: String,
}
#[tracing::instrument(level = "debug")]
async fn read_handler<M>(req: Request<Body>) -> Result<Response<Body>, ApplicationError>
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
match read::<M>(req).await {
Err(e) => {
error!(error = ?e, error_message = ?e.to_string(), "Error while handling request");
e.response()
}
res => res,
}
}
// TODO: figure out how to stream read results out rather than rendering the
// whole thing in mem
#[tracing::instrument(level = "debug")]
@ -467,21 +448,6 @@ async fn read<M: ConnectionManager + Send + Sync + Debug + 'static>(
Ok(Response::new(Body::from(results.into_bytes())))
}
#[tracing::instrument(level = "debug")]
async fn create_database_handler<M>(req: Request<Body>) -> Result<Response<Body>, ApplicationError>
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
match create_database::<M>(req).await {
Err(e) => {
error!(error = ?e, error_message = ?e.to_string(), "Error while handling request");
e.response()
}
res => res,
}
}
#[tracing::instrument(level = "debug")]
async fn create_database<M: ConnectionManager + Send + Sync + Debug + 'static>(
req: Request<Body>,
@ -508,21 +474,6 @@ async fn create_database<M: ConnectionManager + Send + Sync + Debug + 'static>(
Ok(Response::new(Body::empty()))
}
#[tracing::instrument(level = "debug")]
async fn get_database_handler<M>(req: Request<Body>) -> Result<Response<Body>, ApplicationError>
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
match get_database::<M>(req).await {
Err(e) => {
error!(error = ?e, error_message = ?e.to_string(), "Error while handling request");
e.response()
}
res => res,
}
}
#[tracing::instrument(level = "debug")]
async fn get_database<M: ConnectionManager + Send + Sync + Debug + 'static>(
req: Request<Body>,
@ -553,21 +504,6 @@ async fn get_database<M: ConnectionManager + Send + Sync + Debug + 'static>(
Ok(response)
}
#[tracing::instrument(level = "debug")]
async fn set_writer_handler<M>(req: Request<Body>) -> Result<Response<Body>, ApplicationError>
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
match set_writer::<M>(req).await {
Err(e) => {
error!(error = ?e, error_message = ?e.to_string(), "Error while handling request");
e.response()
}
res => res,
}
}
#[tracing::instrument(level = "debug")]
async fn set_writer<M: ConnectionManager + Send + Sync + Debug + 'static>(
req: Request<Body>,
@ -615,21 +551,6 @@ struct DatabaseInfo {
bucket: String,
}
#[tracing::instrument(level = "debug")]
async fn list_partitions_handler<M>(req: Request<Body>) -> Result<Response<Body>, ApplicationError>
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
match list_partitions::<M>(req).await {
Err(e) => {
error!(error = ?e, error_message = ?e.to_string(), "Error while handling request");
e.response()
}
res => res,
}
}
#[tracing::instrument(level = "debug")]
async fn list_partitions<M: ConnectionManager + Send + Sync + Debug + 'static>(
req: Request<Body>,
@ -674,23 +595,6 @@ struct SnapshotInfo {
partition: String,
}
#[tracing::instrument(level = "debug")]
async fn snapshot_partition_handler<M>(
req: Request<Body>,
) -> Result<Response<Body>, ApplicationError>
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
match snapshot_partition::<M>(req).await {
Err(e) => {
error!(error = ?e, error_message = ?e.to_string(), "Error while handling request");
e.response()
}
res => res,
}
}
#[tracing::instrument(level = "debug")]
async fn snapshot_partition<M: ConnectionManager + Send + Sync + Debug + 'static>(
req: Request<Body>,