diff --git a/Cargo.lock b/Cargo.lock
index 3537b07ce8..6012035630 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4295,6 +4295,7 @@ dependencies = [
  "generated_types",
  "hashbrown",
  "hyper",
+ "influxdb_line_protocol",
  "iox_catalog",
  "iox_tests",
  "iox_time",
diff --git a/router/Cargo.toml b/router/Cargo.toml
index 8bf6094358..b2c59455b8 100644
--- a/router/Cargo.toml
+++ b/router/Cargo.toml
@@ -45,6 +45,7 @@ write_summary = { path = "../write_summary" }
 [dev-dependencies]
 assert_matches = "1.5"
 criterion = { version = "0.4", default-features = false, features = ["async_tokio", "rayon"]}
+influxdb_line_protocol = { path = "../influxdb_line_protocol" }
 iox_tests = { path = "../iox_tests" }
 once_cell = "1"
 paste = "1.0.9"
diff --git a/router/src/server/http.rs b/router/src/server/http.rs
index 53c8659ed7..85c0db67e3 100644
--- a/router/src/server/http.rs
+++ b/router/src/server/http.rs
@@ -2,8 +2,6 @@
 
 mod delete_predicate;
 
-use std::{str::Utf8Error, time::Instant};
-
 use bytes::{Bytes, BytesMut};
 use data_types::{org_and_bucket_to_database, OrgBucketMappingError};
 use futures::StreamExt;
@@ -16,6 +14,7 @@ use mutable_batch_lp::LinesConverter;
 use observability_deps::tracing::*;
 use predicate::delete_predicate::parse_delete_predicate;
 use serde::Deserialize;
+use std::{str::Utf8Error, time::Instant};
 use thiserror::Error;
 use tokio::sync::{Semaphore, TryAcquireError};
 use trace::ctx::SpanContext;
@@ -370,7 +369,12 @@ where
         let namespace = org_and_bucket_to_database(&write_info.org, &write_info.bucket)
             .map_err(OrgBucketError::MappingFail)?;
 
-        trace!(org=%write_info.org, bucket=%write_info.bucket, %namespace, "processing write request");
+        trace!(
+            org=%write_info.org,
+            bucket=%write_info.bucket,
+            %namespace,
+            "processing write request"
+        );
 
         // Read the HTTP body and convert it to a str.
         let body = self.read_body(req).await?;
@@ -538,23 +542,22 @@ where
 
 #[cfg(test)]
 mod tests {
-    use std::{io::Write, iter, sync::Arc, time::Duration};
-
-    use assert_matches::assert_matches;
-    use data_types::NamespaceId;
-    use flate2::{write::GzEncoder, Compression};
-    use hyper::header::HeaderValue;
-    use metric::{Attributes, Metric};
-    use mutable_batch::column::ColumnData;
-    use mutable_batch_lp::LineWriteError;
-    use test_helpers::timeout::FutureTimeout;
-    use tokio_stream::wrappers::ReceiverStream;
-
     use super::*;
     use crate::{
        dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall},
        namespace_resolver::mock::MockNamespaceResolver,
     };
+    use assert_matches::assert_matches;
+    use data_types::{DatabaseNameError, NamespaceId};
+    use flate2::{write::GzEncoder, Compression};
+    use hyper::header::HeaderValue;
+    use metric::{Attributes, Metric};
+    use mutable_batch::column::ColumnData;
+    use mutable_batch_lp::LineWriteError;
+    use serde::de::Error as _;
+    use std::{io::Write, iter, sync::Arc, time::Duration};
+    use test_helpers::timeout::FutureTimeout;
+    use tokio_stream::wrappers::ReceiverStream;
 
     const MAX_BYTES: usize = 1024;
     const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
@@ -1313,4 +1316,194 @@ mod tests {
         // And the request rejected metric must remain unchanged
         assert_metric_hit(&*metrics, "http_request_limit_rejected", Some(1));
     }
+
+    // The display text of Error gets passed through `ioxd_router::IoxHttpErrorAdaptor` then
+    // `ioxd_common::http::error::HttpApiError` as the JSON "message" value in error response
+    // bodies. These are fixture tests to document error messages that users might see when
+    // making requests to `/api/v2/write`.
+    macro_rules! check_errors {
+        (
+            $(( // This macro expects a list of tuples, each specifying:
+                $variant:ident // - One of the error enum variants
+                $(($data:expr))?, // - If needed, an expression to construct the variant's data
+                $msg:expr $(,)? // - The string expected for `Display`ing this variant
+            )),*,
+        ) => {
+            // Generate code that contains all possible error variants, to ensure a compiler error
+            // if any errors are not explicitly covered in this test.
+            #[test]
+            fn all_error_variants_are_checked() {
+                #[allow(dead_code)]
+                fn all_documented(ensure_all_error_variants_are_checked: Error) {
+                    #[allow(unreachable_patterns)]
+                    match ensure_all_error_variants_are_checked {
+                        $(Error::$variant { .. } => {},)*
+                        // If this test doesn't compile because of a non-exhaustive pattern,
+                        // a variant needs to be added to the `check_errors!` call with the
+                        // expected `to_string()` text.
+                    }
+                }
+            }
+
+            // A test that covers all errors given to this macro.
+            #[tokio::test]
+            async fn error_messages_match() {
+                // Generate an assert for each error given to this macro.
+                $(
+                    let e = Error::$variant $(($data))?;
+                    assert_eq!(e.to_string(), $msg);
+                )*
+            }
+
+            #[test]
+            fn print_out_error_text() {
+                println!("{}", concat!($(stringify!($variant), "\t", $msg, "\n",)*),)
+            }
+        };
+    }
+
+    check_errors! {
+        (
+            NoHandler,
+            "not found",
+        ),
+
+        (InvalidOrgBucket(OrgBucketError::NotSpecified), "no org/bucket destination provided"),
+
+        (
+            InvalidOrgBucket({
+                let e = serde::de::value::Error::custom("[deserialization error]");
+                OrgBucketError::DecodeFail(e)
+            }),
+            "failed to deserialize org/bucket/precision in request: [deserialization error]",
+        ),
+
+        (
+            InvalidOrgBucket(OrgBucketError::MappingFail(OrgBucketMappingError::NotSpecified)),
+            "missing org/bucket value",
+        ),
+
+        (
+            InvalidOrgBucket({
+                let e = DatabaseNameError::LengthConstraint { name: "[too long name]".into() };
+                let e = OrgBucketMappingError::InvalidDatabaseName { source: e };
+                OrgBucketError::MappingFail(e)
+            }),
+            "Invalid database name: \
+            Database name [too long name] length must be between 1 and 64 characters",
+        ),
+
+        (
+            NonUtf8Body(std::str::from_utf8(&[0, 159]).unwrap_err()),
+            "body content is not valid utf8: invalid utf-8 sequence of 1 bytes from index 1",
+        ),
+
+        (
+            NonUtf8ContentHeader({
+                hyper::header::HeaderValue::from_bytes(&[159]).unwrap().to_str().unwrap_err()
+            }),
+            "invalid content-encoding header: failed to convert header to a str",
+        ),
+
+        (
+            InvalidContentEncoding("[invalid content encoding value]".into()),
+            "unacceptable content-encoding: [invalid content encoding value]",
+        ),
+
+        (
+            ClientHangup({
+                let url = "wrong://999.999.999.999:999999".parse().unwrap();
+                hyper::Client::new().get(url).await.unwrap_err()
+            }),
+            "client disconnected",
+        ),
+
+        (
+            RequestSizeExceeded(1337),
+            "max request size (1337 bytes) exceeded",
+        ),
+
+        (
+            InvalidGzip(std::io::Error::new(std::io::ErrorKind::Other, "[io Error]")),
+            "error decoding gzip stream: [io Error]",
+        ),
+
+        (
+            ParseLineProtocol(mutable_batch_lp::Error::LineProtocol {
+                source: influxdb_line_protocol::Error::FieldSetMissing,
+                line: 42,
+            }),
+            "failed to parse line protocol: \
+            error parsing line 42 (1-based): No fields were provided",
+        ),
+
+        (
+            ParseLineProtocol(mutable_batch_lp::Error::Write {
+                source: mutable_batch_lp::LineWriteError::DuplicateTag {
+                    name: "host".into(),
+                },
+                line: 42,
+            }),
+            "failed to parse line protocol: \
+            error writing line 42: \
+            the tag 'host' is specified more than once with conflicting values",
+        ),
+
+        (
+            ParseLineProtocol(mutable_batch_lp::Error::Write {
+                source: mutable_batch_lp::LineWriteError::ConflictedFieldTypes {
+                    name: "bananas".into(),
+                },
+                line: 42,
+            }),
+            "failed to parse line protocol: \
+            error writing line 42: \
+            the field 'bananas' is specified more than once with conflicting types",
+        ),
+
+        (
+            ParseLineProtocol(mutable_batch_lp::Error::EmptyPayload),
+            "failed to parse line protocol: empty write payload",
+        ),
+
+        (
+            ParseLineProtocol(mutable_batch_lp::Error::TimestampOverflow),
+            "failed to parse line protocol: timestamp overflows i64",
+        ),
+
+        (
+            ParseDelete({
+                predicate::delete_predicate::Error::InvalidSyntax { value: "[syntax]".into() }
+            }),
+            "failed to parse delete predicate: Invalid predicate syntax: ([syntax])",
+        ),
+
+        (
+            ParseHttpDelete({
+                delete_predicate::Error::TableInvalid { value: "[table name]".into() }
+            }),
+            "failed to parse delete predicate from http request: \
+            Invalid table name in delete '[table name]'"
+        ),
+
+        (
+            DmlHandler(DmlError::DatabaseNotFound("[database name]".into())),
+            "dml handler error: database [database name] does not exist",
+        ),
+
+        (
+            NamespaceResolver({
+                let e = iox_catalog::interface::Error::NameExists { name: "[name]".into() };
+                crate::namespace_resolver::Error::Lookup(e)
+            }),
+            "failed to resolve namespace ID: \
+            failed to resolve namespace ID: \
+            name [name] already exists",
+        ),
+
+        (
+            RequestLimit,
+            "this service is overloaded, please try again later",
+        ),
+    }
 }
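
Note for reviewers (not part of the diff): the sketch below is a minimal, self-contained illustration of the two checks that `check_errors!` generates, written against a hypothetical two-variant enum `ExampleError` rather than the real `router::server::http::Error`. It shows the same pattern the macro expands to: a `match` whose exhaustiveness breaks the build when a variant is left undocumented, plus fixture assertions pinning each variant's `Display` text.

use thiserror::Error;

/// Hypothetical stand-in for the real `Error` enum in `router/src/server/http.rs`.
#[derive(Debug, Error)]
enum ExampleError {
    #[error("not found")]
    NoHandler,

    #[error("max request size ({0} bytes) exceeded")]
    RequestSizeExceeded(usize),
}

#[cfg(test)]
mod example_tests {
    use super::*;

    // Compile-time check: adding a variant to `ExampleError` without listing it here
    // turns this `match` into a non-exhaustive pattern and breaks the build.
    #[allow(dead_code)]
    fn all_documented(e: ExampleError) {
        match e {
            ExampleError::NoHandler { .. } => {}
            ExampleError::RequestSizeExceeded { .. } => {}
        }
    }

    // Fixture assertions pinning the user-visible `Display` text of each variant,
    // mirroring what `error_messages_match` does for the real error type.
    #[test]
    fn error_messages_match() {
        assert_eq!(ExampleError::NoHandler.to_string(), "not found");
        assert_eq!(
            ExampleError::RequestSizeExceeded(1337).to_string(),
            "max request size (1337 bytes) exceeded"
        );
    }
}

Since test output is captured by default, the `print_out_error_text` test added in the diff should dump its variant/message table when run with `cargo test -p router print_out_error_text -- --nocapture`.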