From 80b12d417c408340909fad139526288b9989532e Mon Sep 17 00:00:00 2001 From: Dom Date: Fri, 14 Jan 2022 11:34:42 +0000 Subject: [PATCH 1/6] feat: abstract DML handler Defines the DmlHandler trait responsible for processing a request in some abstract way, decoupling the HTTP/gRPC request handlers from the underlying routing logic. --- Cargo.lock | 12 +++++ dml/src/lib.rs | 12 +++++ router2/Cargo.toml | 8 ++++ router2/src/dml_handler/mock.rs | 78 ++++++++++++++++++++++++++++++++ router2/src/dml_handler/mod.rs | 14 ++++++ router2/src/dml_handler/nop.rs | 28 ++++++++++++ router2/src/dml_handler/trait.rs | 32 +++++++++++++ router2/src/lib.rs | 1 + 8 files changed, 185 insertions(+) create mode 100644 router2/src/dml_handler/mock.rs create mode 100644 router2/src/dml_handler/mod.rs create mode 100644 router2/src/dml_handler/nop.rs create mode 100644 router2/src/dml_handler/trait.rs diff --git a/Cargo.lock b/Cargo.lock index 36d1d2a0cb..28b48b7b3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,6 +163,12 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "assert_matches" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" + [[package]] name = "async-stream" version = "0.3.2" @@ -3627,10 +3633,16 @@ dependencies = [ name = "router2" version = "0.1.0" dependencies = [ + "assert_matches", + "async-trait", "dml", "generated_types", "hyper", "metric", + "mutable_batch_lp", + "observability_deps", + "parking_lot", + "paste", "thiserror", "tonic", "trace", diff --git a/dml/src/lib.rs b/dml/src/lib.rs index b2a0982aad..7de1cd8074 100644 --- a/dml/src/lib.rs +++ b/dml/src/lib.rs @@ -131,6 +131,18 @@ impl DmlOperation { } } +impl From for DmlOperation { + fn from(v: DmlWrite) -> Self { + Self::Write(v) + } +} + +impl From for DmlOperation { + fn from(v: DmlDelete) -> Self { + Self::Delete(v) + } +} + /// A collection of writes to potentially multiple tables within the same database #[derive(Debug, Clone)] pub struct DmlWrite { diff --git a/router2/Cargo.toml b/router2/Cargo.toml index 6c29667850..cc758a8c0b 100644 --- a/router2/Cargo.toml +++ b/router2/Cargo.toml @@ -6,11 +6,19 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = "0.1" dml = { path = "../dml" } generated_types = { path = "../generated_types" } hyper = "0.14" metric = { path = "../metric" } +mutable_batch_lp = { path = "../mutable_batch_lp" } +observability_deps = { path = "../observability_deps" } +parking_lot = "0.11" thiserror = "1.0" tonic = "0.6" trace = { path = "../trace/" } workspace-hack = { path = "../workspace-hack"} + +[dev-dependencies] +assert_matches = "1.5" +paste = "1.0.6" diff --git a/router2/src/dml_handler/mock.rs b/router2/src/dml_handler/mock.rs new file mode 100644 index 0000000000..20d84e85dc --- /dev/null +++ b/router2/src/dml_handler/mock.rs @@ -0,0 +1,78 @@ +use std::{collections::VecDeque, sync::Arc}; + +use async_trait::async_trait; +use dml::DmlOperation; +use mutable_batch_lp::PayloadStatistics; +use parking_lot::Mutex; + +use super::{DmlError, DmlHandler}; + +#[derive(Debug, Clone)] +pub enum MockDmlHandlerCall { + Dispatch { + db_name: String, + op: DmlOperation, + payload_stats: PayloadStatistics, + body_len: usize, + }, +} + +#[derive(Debug, Default)] +struct Inner { + calls: Vec, + dispatch_return: VecDeque>, +} + +impl Inner { + fn record_call(&mut self, call: MockDmlHandlerCall) { + 
self.calls.push(call); + } +} + +#[derive(Debug, Default)] +pub struct MockDmlHandler(Mutex); + +impl MockDmlHandler { + pub fn with_dispatch_return(self, ret: impl Into>>) -> Self { + self.0.lock().dispatch_return = ret.into(); + self + } + + pub fn calls(&self) -> Vec { + self.0.lock().calls.clone() + } +} + +/// Mock helper to record a call and return the pre-configured value. +/// +/// Pushes `$call` to call record, popping `self.state.$return` and returning it +/// to the caller. If no value exists, the pop attempt causes a panic. +macro_rules! record_and_return { + ($self:ident, $call:expr, $return:ident) => {{ + let mut guard = $self.0.lock(); + guard.record_call($call); + guard.$return.pop_front().expect("no mock value to return") + }}; +} + +#[async_trait] +impl DmlHandler for Arc { + async fn dispatch<'a>( + &'a self, + db_name: impl Into + Send + Sync + 'a, + op: impl Into + Send + Sync + 'a, + payload_stats: PayloadStatistics, + body_len: usize, + ) -> Result<(), DmlError> { + record_and_return!( + self, + MockDmlHandlerCall::Dispatch { + db_name: db_name.into(), + op: op.into().clone(), + payload_stats, + body_len, + }, + dispatch_return + ) + } +} diff --git a/router2/src/dml_handler/mod.rs b/router2/src/dml_handler/mod.rs new file mode 100644 index 0000000000..8295396e2b --- /dev/null +++ b/router2/src/dml_handler/mod.rs @@ -0,0 +1,14 @@ +//! DML operation handling. +//! +//! An IOx node operating as a router exposes multiple API interfaces (HTTP, +//! gRPC) which funnel requests into a common [`DmlHandler`] implementation, +//! responsible for processing the request before pushing the results into the +//! appropriate write buffer sink. + +mod r#trait; +pub use r#trait::*; + +pub mod nop; + +#[cfg(test)] +pub mod mock; diff --git a/router2/src/dml_handler/nop.rs b/router2/src/dml_handler/nop.rs new file mode 100644 index 0000000000..024a97d59a --- /dev/null +++ b/router2/src/dml_handler/nop.rs @@ -0,0 +1,28 @@ +//! A NOP implementation of [`DmlHandler`]. + +use async_trait::async_trait; +use dml::DmlOperation; +use mutable_batch_lp::PayloadStatistics; +use observability_deps::tracing::*; + +use super::{DmlError, DmlHandler}; + +/// A [`DmlHandler`] implementation that does nothing. +#[derive(Debug, Default)] +pub struct NopDmlHandler; + +#[async_trait] +impl DmlHandler for NopDmlHandler { + async fn dispatch<'a>( + &'a self, + db_name: impl Into + Send + Sync + 'a, + op: impl Into + Send + Sync + 'a, + _payload_stats: PayloadStatistics, + _body_len: usize, + ) -> Result<(), DmlError> { + let db_name = db_name.into(); + let op = op.into(); + info!(%db_name, ?op, "dropping dml operation"); + Ok(()) + } +} diff --git a/router2/src/dml_handler/trait.rs b/router2/src/dml_handler/trait.rs new file mode 100644 index 0000000000..bb87c7b616 --- /dev/null +++ b/router2/src/dml_handler/trait.rs @@ -0,0 +1,32 @@ +use std::fmt::Debug; + +use async_trait::async_trait; +use dml::DmlOperation; +use mutable_batch_lp::PayloadStatistics; +use thiserror::Error; + +/// Errors emitted by a [`DmlHandler`] implementation during DML request +/// processing. +#[derive(Debug, Error)] +pub enum DmlError { + /// The database specified by the caller does not exist. + #[error("database {0} does not exist")] + DatabaseNotFound(String), + + /// An unknown error occured while processing the DML request. + #[error("internal dml handler error: {0}")] + Internal(Box), +} + +/// An abstract handler of [`DmlOperation`] requests. 
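+///
+/// The sketch below shows roughly how a caller is expected to drive the
+/// trait; it is illustrative only, and the `handler`, `db_name`, `op`,
+/// `stats` and `body_len` bindings are assumptions rather than part of this
+/// patch:
+///
+/// ```ignore
+/// // Hand the request over to the routing logic, surfacing any DmlError.
+/// handler.dispatch(db_name, op, stats, body_len).await?;
+/// ```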
+#[async_trait] +pub trait DmlHandler: Debug + Send + Sync { + /// Apply `op` to `db_name`. + async fn dispatch<'a>( + &'a self, + db_name: impl Into + Send + Sync + 'a, + op: impl Into + Send + Sync + 'a, + payload_stats: PayloadStatistics, + body_len: usize, + ) -> Result<(), DmlError>; +} diff --git a/router2/src/lib.rs b/router2/src/lib.rs index 859b0d0a46..52c529394c 100644 --- a/router2/src/lib.rs +++ b/router2/src/lib.rs @@ -25,4 +25,5 @@ )] #![allow(clippy::missing_docs_in_private_items)] +pub mod dml_handler; pub mod server; From 40a290f6f789d0b54612a74c1cc300069cde8f65 Mon Sep 17 00:00:00 2001 From: Dom Date: Fri, 14 Jan 2022 13:34:16 +0000 Subject: [PATCH 2/6] feat: router2 HTTP handlers Implements the HTTP v2 write API endpoint for router2. --- Cargo.lock | 8 + influxdb_iox/src/commands/run/router2.rs | 11 +- .../src/influxdb_ioxd/server_type/router2.rs | 28 +- router2/Cargo.toml | 8 + router2/src/server.rs | 39 +- router2/src/server/http.rs | 485 +++++++++++++++++- 6 files changed, 548 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 28b48b7b3e..49cb080e8b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3635,7 +3635,11 @@ version = "0.1.0" dependencies = [ "assert_matches", "async-trait", + "bytes", + "data_types", "dml", + "flate2", + "futures", "generated_types", "hyper", "metric", @@ -3643,7 +3647,11 @@ dependencies = [ "observability_deps", "parking_lot", "paste", + "serde", + "serde_urlencoded", "thiserror", + "time 0.1.0", + "tokio", "tonic", "trace", "workspace-hack", diff --git a/influxdb_iox/src/commands/run/router2.rs b/influxdb_iox/src/commands/run/router2.rs index 43a4a6c3b6..f7c61a54db 100644 --- a/influxdb_iox/src/commands/run/router2.rs +++ b/influxdb_iox/src/commands/run/router2.rs @@ -13,7 +13,10 @@ use crate::{ }, }; use observability_deps::tracing::*; -use router2::server::RouterServer; +use router2::{ + dml_handler::nop::NopDmlHandler, + server::{http::HttpDelegate, RouterServer}, +}; use thiserror::Error; #[derive(Debug, Error)] @@ -53,7 +56,11 @@ pub struct Config { pub async fn command(config: Config) -> Result<()> { let common_state = CommonServerState::from_config(config.run_config.clone())?; - let router_server = RouterServer::default(); + let http = HttpDelegate::new( + config.run_config.max_http_request_size, + NopDmlHandler::default(), + ); + let router_server = RouterServer::new(http, Default::default()); let server_type = Arc::new(RouterServerType::new(router_server, &common_state)); info!("starting router2"); diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/router2.rs b/influxdb_iox/src/influxdb_ioxd/server_type/router2.rs index 7c37c64d82..a8562c1080 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/router2.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/router2.rs @@ -1,9 +1,12 @@ -use std::{fmt::Display, sync::Arc}; +use std::{ + fmt::{Debug, Display}, + sync::Arc, +}; use async_trait::async_trait; use hyper::{Body, Request, Response}; use metric::Registry; -use router2::server::RouterServer; +use router2::{dml_handler::DmlHandler, server::RouterServer}; use tokio_util::sync::CancellationToken; use trace::TraceCollector; @@ -14,14 +17,14 @@ use crate::influxdb_ioxd::{ }; #[derive(Debug)] -pub struct RouterServerType { - server: RouterServer, +pub struct RouterServerType { + server: RouterServer, shutdown: CancellationToken, trace_collector: Option>, } -impl RouterServerType { - pub fn new(server: RouterServer, common_state: &CommonServerState) -> Self { +impl RouterServerType { + pub fn new(server: 
RouterServer, common_state: &CommonServerState) -> Self { Self { server, shutdown: CancellationToken::new(), @@ -31,7 +34,10 @@ impl RouterServerType { } #[async_trait] -impl ServerType for RouterServerType { +impl ServerType for RouterServerType +where + D: DmlHandler + 'static, +{ type RouteError = IoxHttpErrorAdaptor; /// Return the [`metric::Registry`] used by the router. @@ -51,7 +57,11 @@ impl ServerType for RouterServerType { &self, req: Request, ) -> Result, Self::RouteError> { - self.server.http().route(req).map_err(IoxHttpErrorAdaptor) + self.server + .http() + .route(req) + .await + .map_err(IoxHttpErrorAdaptor) } /// Registers the services exposed by the router [`GrpcDelegate`] delegate. @@ -82,7 +92,7 @@ pub struct IoxHttpErrorAdaptor(router2::server::http::Error); impl Display for IoxHttpErrorAdaptor { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.0.fmt(f) + Display::fmt(&self.0, f) } } diff --git a/router2/Cargo.toml b/router2/Cargo.toml index cc758a8c0b..b028e9acde 100644 --- a/router2/Cargo.toml +++ b/router2/Cargo.toml @@ -7,14 +7,22 @@ edition = "2021" [dependencies] async-trait = "0.1" +bytes = "1.1" +data_types = { path = "../data_types" } dml = { path = "../dml" } +flate2 = "1.0" +futures = "0.3.19" generated_types = { path = "../generated_types" } hyper = "0.14" metric = { path = "../metric" } mutable_batch_lp = { path = "../mutable_batch_lp" } observability_deps = { path = "../observability_deps" } parking_lot = "0.11" +serde = "1.0" +serde_urlencoded = "0.7" thiserror = "1.0" +time = { path = "../time" } +tokio = { version = "1", features = ["rt-multi-thread", "macros"] } tonic = "0.6" trace = { path = "../trace/" } workspace-hack = { path = "../workspace-hack"} diff --git a/router2/src/server.rs b/router2/src/server.rs index d6b4e16a8c..376b8c3d06 100644 --- a/router2/src/server.rs +++ b/router2/src/server.rs @@ -2,6 +2,8 @@ use std::sync::Arc; +use crate::dml_handler::DmlHandler; + use self::{grpc::GrpcDelegate, http::HttpDelegate}; pub mod grpc; @@ -10,22 +12,22 @@ pub mod http; /// The [`RouterServer`] manages the lifecycle and contains all state for a /// `router2` server instance. #[derive(Debug, Default)] -pub struct RouterServer { +pub struct RouterServer { metrics: Arc, - http: HttpDelegate, + http: HttpDelegate, grpc: GrpcDelegate, } -impl RouterServer { - /// Get a reference to the router http delegate. - pub fn http(&self) -> &HttpDelegate { - &self.http - } - - /// Get a reference to the router grpc delegate. - pub fn grpc(&self) -> &GrpcDelegate { - &self.grpc +impl RouterServer { + /// Initialise a new [`RouterServer`] using the provided HTTP and gRPC + /// handlers. + pub fn new(http: HttpDelegate, grpc: GrpcDelegate) -> Self { + Self { + metrics: Default::default(), + http, + grpc, + } } /// Return the [`metric::Registry`] used by the router. @@ -33,3 +35,18 @@ impl RouterServer { Arc::clone(&self.metrics) } } + +impl RouterServer +where + D: DmlHandler, +{ + /// Get a reference to the router http delegate. + pub fn http(&self) -> &HttpDelegate { + &self.http + } + + /// Get a reference to the router grpc delegate. + pub fn grpc(&self) -> &GrpcDelegate { + &self.grpc + } +} diff --git a/router2/src/server/http.rs b/router2/src/server/http.rs index 9432f121ed..11e6b078e2 100644 --- a/router2/src/server/http.rs +++ b/router2/src/server/http.rs @@ -1,14 +1,62 @@ //! HTTP service implementations for `router2`. 
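+//!
+//! The main entry point added here is the v2 write endpoint. Assuming a
+//! router listening on `localhost:8080` (the address is illustrative, not
+//! part of this change), a write looks roughly like:
+//!
+//! ```text
+//! curl -XPOST "http://localhost:8080/api/v2/write?org=example&bucket=test" \
+//!     --data-binary 'platanos,tag1=A,tag2=B val=42i 123456'
+//! ```
+//!
+//! A successful write returns 204 No Content.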
-use hyper::{Body, Request, Response, StatusCode}; +use std::str::Utf8Error; + +use bytes::{Bytes, BytesMut}; +use data_types::names::{org_and_bucket_to_database, OrgBucketMappingError}; +use dml::{DmlMeta, DmlWrite}; +use futures::StreamExt; +use hyper::{header::CONTENT_ENCODING, Body, Method, Request, Response, StatusCode}; +use observability_deps::tracing::*; +use serde::Deserialize; use thiserror::Error; +use time::{SystemProvider, TimeProvider}; +use trace::ctx::SpanContext; + +use crate::dml_handler::{DmlError, DmlHandler}; /// Errors returned by the `router2` HTTP request handler. #[derive(Debug, Error)] pub enum Error { /// The requested path has no registered handler. #[error("not found")] - NotFound, + NoHandler, + + /// An error with the org/bucket in the request. + #[error(transparent)] + InvalidOrgBucket(#[from] OrgBucketError), + + /// The request body content is not valid utf8. + #[error("body content is not valid utf8: {0}")] + NonUtf8Body(Utf8Error), + + /// The `Content-Encoding` header is invalid and cannot be read. + #[error("invalid content-encoding header: {0}")] + NonUtf8ContentHeader(hyper::header::ToStrError), + + /// The specified `Content-Encoding` is not acceptable. + #[error("unacceptable content-encoding: {0}")] + InvalidContentEncoding(String), + + /// The client disconnected. + #[error("client disconnected")] + ClientHangup(hyper::Error), + + /// The client sent a request body that exceeds the configured maximum. + #[error("max request size ({0} bytes) exceeded")] + RequestSizeExceeded(usize), + + /// Decoding a gzip-compressed stream of data failed. + #[error("error decoding gzip stream: {0}")] + InvalidGzip(std::io::Error), + + /// Failure to decode the provided line protocol. + #[error("failed to parse line protocol: {0}")] + ParseLineProtocol(mutable_batch_lp::Error), + + /// An error returned from the [`DmlHandler`]. + #[error("dml handler error: {0}")] + DmlHandler(#[from] DmlError), } impl Error { @@ -16,11 +64,65 @@ impl Error { /// the end user. pub fn as_status_code(&self) -> StatusCode { match self { - Error::NotFound => StatusCode::NOT_FOUND, + Error::NoHandler | Error::DmlHandler(DmlError::DatabaseNotFound(_)) => { + StatusCode::NOT_FOUND + } + Error::InvalidOrgBucket(_) => StatusCode::BAD_REQUEST, + Error::ClientHangup(_) => StatusCode::BAD_REQUEST, + Error::InvalidGzip(_) => StatusCode::BAD_REQUEST, + Error::NonUtf8ContentHeader(_) => StatusCode::BAD_REQUEST, + Error::NonUtf8Body(_) => StatusCode::BAD_REQUEST, + Error::InvalidContentEncoding(_) => { + // https://www.rfc-editor.org/rfc/rfc7231#section-6.5.13 + StatusCode::UNSUPPORTED_MEDIA_TYPE + } + Error::RequestSizeExceeded(_) => StatusCode::PAYLOAD_TOO_LARGE, + Error::ParseLineProtocol(_) => StatusCode::BAD_REQUEST, + Error::DmlHandler(DmlError::Internal(_)) => StatusCode::INTERNAL_SERVER_ERROR, } } } +/// Errors returned when decoding the organisation / bucket information from a +/// HTTP request and deriving the database name from it. +#[derive(Debug, Error)] +pub enum OrgBucketError { + /// The request contains no org/bucket destination information. + #[error("no org/bucket destination provided")] + NotSpecified, + + /// The request contains invalid parameters. + #[error("failed to deserialise org/bucket in request: {0}")] + DecodeFail(#[from] serde::de::value::Error), + + /// The provided org/bucket could not be converted into a database name. 
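+    ///
+    /// This typically means the org/bucket pair cannot be combined into a
+    /// valid database name, for example because the result would be too long
+    /// (see the `invalid_org_bucket` test below).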
+ #[error(transparent)] + MappingFail(#[from] OrgBucketMappingError), +} + +#[derive(Debug, Deserialize)] +/// Org & bucket identifiers for a DML operation. +pub struct OrgBucketInfo { + org: String, + bucket: String, +} + +impl TryFrom<&Request> for OrgBucketInfo { + type Error = OrgBucketError; + + fn try_from(req: &Request) -> Result { + let query = req.uri().query().ok_or(OrgBucketError::NotSpecified)?; + let got: OrgBucketInfo = serde_urlencoded::from_str(query)?; + + // An empty org or bucket is not acceptable. + if got.org.is_empty() || got.bucket.is_empty() { + return Err(OrgBucketError::NotSpecified); + } + + Ok(got) + } +} + /// This type is responsible for servicing requests to the `router2` HTTP /// endpoint. /// @@ -28,12 +130,377 @@ impl Error { /// server runner framework takes care of implementing the heath endpoint, /// metrics, pprof, etc. #[derive(Debug, Default)] -pub struct HttpDelegate; +pub struct HttpDelegate { + max_request_bytes: usize, + time_provider: T, + dml_handler: D, +} -impl HttpDelegate { - /// Routes `req` to the appropriate handler, if any, returning the handler - /// response. - pub fn route(&self, _req: Request) -> Result, Error> { - unimplemented!() +impl HttpDelegate { + /// Initialise a new [`HttpDelegate`] passing valid requests to the + /// specified `dml_handler`. + /// + /// HTTP request bodies are limited to `max_request_bytes` in size, + /// returning an error if exceeded. + pub fn new(max_request_bytes: usize, dml_handler: D) -> Self { + Self { + max_request_bytes, + time_provider: SystemProvider::default(), + dml_handler, + } } } + +impl HttpDelegate +where + D: DmlHandler, + T: TimeProvider, +{ + /// Routes `req` to the appropriate handler, if any, returning the handler + /// response. + pub async fn route(&self, req: Request) -> Result, Error> { + match (req.method(), req.uri().path()) { + (&Method::POST, "/api/v2/write") => self.write_handler(req).await, + (&Method::POST, "/api/v2/delete") => self.delete_handler(req).await, + _ => return Err(Error::NoHandler), + } + .map(|_| response_no_content()) + } + + async fn write_handler(&self, req: Request) -> Result<(), Error> { + let span_ctx: Option = req.extensions().get().cloned(); + + let account = OrgBucketInfo::try_from(&req)?; + let db_name = org_and_bucket_to_database(&account.org, &account.bucket) + .map_err(OrgBucketError::MappingFail)?; + + trace!(org=%account.org, bucket=%account.bucket, db_name=%db_name, "processing write request"); + + // Read the HTTP body and convert it to a str. + let body = self.read_body(req).await?; + let body = std::str::from_utf8(&body).map_err(Error::NonUtf8Body)?; + + // The time, in nanoseconds since the epoch, to assign to any points that don't + // contain a timestamp + let default_time = self.time_provider.now().timestamp_nanos(); + + let (tables, stats) = match mutable_batch_lp::lines_to_batches_stats(body, default_time) { + Ok(v) => v, + Err(mutable_batch_lp::Error::EmptyPayload) => { + debug!("nothing to write"); + return Ok(()); + } + Err(e) => return Err(Error::ParseLineProtocol(e)), + }; + + let op = DmlWrite::new(tables, DmlMeta::unsequenced(span_ctx)); + self.dml_handler + .dispatch(db_name, op, stats, body.len()) + .await?; + + Ok(()) + } + + async fn delete_handler(&self, _req: Request) -> Result<(), Error> { + unimplemented!() + } + + /// Parse the request's body into raw bytes, applying the configured size + /// limits and decoding any content encoding. 
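+    ///
+    /// Bodies sent with `Content-Encoding: gzip` are decompressed, and the
+    /// size limit is also applied to the decompressed data to guard against
+    /// decompression bombs (see the `max_request_size_truncation` test).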
+ async fn read_body(&self, req: hyper::Request) -> Result { + let encoding = req + .headers() + .get(&CONTENT_ENCODING) + .map(|v| v.to_str().map_err(Error::NonUtf8ContentHeader)) + .transpose()?; + let ungzip = match encoding { + None => false, + Some("gzip") => true, + Some(v) => return Err(Error::InvalidContentEncoding(v.to_string())), + }; + + let mut payload = req.into_body(); + + let mut body = BytesMut::new(); + while let Some(chunk) = payload.next().await { + let chunk = chunk.map_err(Error::ClientHangup)?; + // limit max size of in-memory payload + if (body.len() + chunk.len()) > self.max_request_bytes { + return Err(Error::RequestSizeExceeded(self.max_request_bytes)); + } + body.extend_from_slice(&chunk); + } + let body = body.freeze(); + + // If the body is not compressed, return early. + if !ungzip { + return Ok(body); + } + + // Unzip the gzip-encoded content + use std::io::Read; + let decoder = flate2::read::GzDecoder::new(&body[..]); + + // Read at most max_request_bytes bytes to prevent a decompression bomb + // based DoS. + // + // In order to detect if the entire stream ahs been read, or truncated, + // read an extra byte beyond the limit and check the resulting data + // length - see the max_request_size_truncation test. + let mut decoder = decoder.take(self.max_request_bytes as u64 + 1); + let mut decoded_data = Vec::new(); + decoder + .read_to_end(&mut decoded_data) + .map_err(Error::InvalidGzip)?; + + // If the length is max_size+1, the body is at least max_size+1 bytes in + // length, and possibly longer, but truncated. + if decoded_data.len() > self.max_request_bytes { + return Err(Error::RequestSizeExceeded(self.max_request_bytes)); + } + + Ok(decoded_data.into()) + } +} + +fn response_no_content() -> Response { + Response::builder() + .status(StatusCode::NO_CONTENT) + .body(Body::empty()) + .unwrap() +} + +#[cfg(test)] +mod tests { + use std::{io::Write, iter, sync::Arc}; + + use assert_matches::assert_matches; + use dml::DmlOperation; + use flate2::{write::GzEncoder, Compression}; + use hyper::header::HeaderValue; + + use crate::dml_handler::mock::{MockDmlHandler, MockDmlHandlerCall}; + + use super::*; + + const MAX_BYTES: usize = 1024; + + // Generate two write handler tests - one for a plain request and one with a + // gzip-encoded body (and appropriate header), asserting the handler return + // value & write op. + macro_rules! test_write_handler { + ( + $name:ident, + query_string = $query_string:expr, // Request URI query string + body = $body:expr, // Request body content + dml_handler = $dml_handler:expr, // DML handler response (if called) + want_write_db = $want_write_db:expr, // Expected write DB name (empty if no write) + want_return = $($want_return:tt )+ // Expected HTTP response + ) => { + // Generate the two test cases by feed the same inputs, but varying + // the encoding. + test_write_handler!( + $name, + encoding=plain, + query_string = $query_string, + body = $body, + dml_handler = $dml_handler, + want_write_db = $want_write_db, + want_return = $($want_return)+ + ); + test_write_handler!( + $name, + encoding=gzip, + query_string = $query_string, + body = $body, + dml_handler = $dml_handler, + want_write_db = $want_write_db, + want_return = $($want_return)+ + ); + }; + + // Actual test body generator. + ( + $name:ident, + encoding = $encoding:tt, + query_string = $query_string:expr, + body = $body:expr, + dml_handler = $dml_handler:expr, + want_write_db = $want_write_db:expr, + want_return = $($want_return:tt )+ + ) => { + paste::paste! 
{ + #[tokio::test] + async fn []() { + let body = $body; + let want_body_len = body.len(); + + // Optionally generate a fragment of code to encode the body + let body = test_write_handler!(encoding=$encoding, body); + + #[allow(unused_mut)] + let mut request = Request::builder() + .uri(format!("https://bananas.example/api/v2/write{}", $query_string)) + .method("POST") + .body(Body::from(body)) + .unwrap(); + + // Optionally modify request to account for the desired + // encoding + test_write_handler!(encoding_header=$encoding, request); + + let dml_handler = Arc::new(MockDmlHandler::default().with_dispatch_return($dml_handler)); + let delegate = HttpDelegate::new(MAX_BYTES, Arc::clone(&dml_handler)); + + let got = delegate.route(request).await; + assert_matches!(got, $($want_return)+); + + let calls = dml_handler.calls(); + if !$want_write_db.is_empty() { + assert_eq!(calls.len(), 1); + + // Validate the write op + let op = assert_matches!(&calls[0], MockDmlHandlerCall::Dispatch{ db_name, op, body_len, .. } => { + assert_eq!(db_name, $want_write_db); + assert_eq!(*body_len, want_body_len); + op + }); + assert_matches!(op, DmlOperation::Write(_)); + } else { + assert!(calls.is_empty()); + } + } + } + }; + (encoding=plain, $body:ident) => { + $body + }; + (encoding=gzip, $body:ident) => {{ + // Apply gzip compression to the body + let mut e = GzEncoder::new(Vec::new(), Compression::default()); + e.write_all(&$body).unwrap(); + e.finish().expect("failed to compress test body") + }}; + (encoding_header=plain, $request:ident) => {}; + (encoding_header=gzip, $request:ident) => {{ + // Set the gzip content encoding + $request + .headers_mut() + .insert(CONTENT_ENCODING, HeaderValue::from_static("gzip")); + }}; + } + + test_write_handler!( + ok, + query_string = "?org=bananas&bucket=test", + body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(), + dml_handler = [Ok(())], + want_write_db = "bananas_test", + want_return = Ok(r) => { + assert_eq!(r.status(), StatusCode::NO_CONTENT); + } + ); + + test_write_handler!( + no_query_params, + query_string = "", + body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(), + dml_handler = [Ok(())], + want_write_db = "", // None + want_return = Err(Error::InvalidOrgBucket(OrgBucketError::NotSpecified)) + ); + + test_write_handler!( + no_org_bucket, + query_string = "?", + body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(), + dml_handler = [Ok(())], + want_write_db = "", // None + want_return = Err(Error::InvalidOrgBucket(OrgBucketError::DecodeFail(_))) + ); + + test_write_handler!( + empty_org_bucket, + query_string = "?org=&bucket=", + body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(), + dml_handler = [Ok(())], + want_write_db = "", // None + want_return = Err(Error::InvalidOrgBucket(OrgBucketError::NotSpecified)) + ); + + test_write_handler!( + invalid_org_bucket, + query_string = format!( + "?org=test&bucket={}", + iter::repeat("A").take(1000).collect::() + ), + body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(), + dml_handler = [Ok(())], + want_write_db = "", // None + want_return = Err(Error::InvalidOrgBucket(OrgBucketError::MappingFail(_))) + ); + + test_write_handler!( + invalid_line_protocol, + query_string = "?org=bananas&bucket=test", + body = "not line protocol".as_bytes(), + dml_handler = [Ok(())], + want_write_db = "", // None + want_return = Err(Error::ParseLineProtocol(_)) + ); + + test_write_handler!( + non_utf8_body, + query_string = "?org=bananas&bucket=test", + body = vec![0xc3, 0x28], + dml_handler = [Ok(())], + 
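+        // The body above (0xC3 0x28) is not valid UTF-8, so the request must
+        // be rejected before the DML handler is ever invoked, hence no
+        // expected write: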
want_write_db = "", // None + want_return = Err(Error::NonUtf8Body(_)) + ); + + test_write_handler!( + max_request_size_truncation, + query_string = "?org=bananas&bucket=test", + body = { + // Generate a LP string in the form of: + // + // bananas,A=AAAAAAAAAA(repeated)... B=42 + // ^ + // | + // MAX_BYTES boundary + // + // So that reading MAX_BYTES number of bytes produces the string: + // + // bananas,A=AAAAAAAAAA(repeated)... + // + // Effectively trimming off the " B=42" suffix. + let body = "bananas,A="; + iter::once(body) + .chain(iter::repeat("A").take(MAX_BYTES - body.len())) + .chain(iter::once(" B=42\n")) + .flat_map(|s| s.bytes()) + .collect::>() + }, + dml_handler = [Ok(())], + want_write_db = "", // None + want_return = Err(Error::RequestSizeExceeded(_)) + ); + + test_write_handler!( + db_not_found, + query_string = "?org=bananas&bucket=test", + body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(), + dml_handler = [Err(DmlError::DatabaseNotFound("bananas_test".to_string()))], + want_write_db = "bananas_test", + want_return = Err(Error::DmlHandler(DmlError::DatabaseNotFound(_))) + ); + + test_write_handler!( + dml_handler_error, + query_string = "?org=bananas&bucket=test", + body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(), + dml_handler = [Err(DmlError::Internal("💣".into()))], + want_write_db = "bananas_test", + want_return = Err(Error::DmlHandler(DmlError::Internal(_))) + ); +} From 7f99d18dd135f85f566a4eef1b193023fa792100 Mon Sep 17 00:00:00 2001 From: Dom Date: Fri, 14 Jan 2022 16:01:53 +0000 Subject: [PATCH 3/6] refactor: clippy --- router2/src/dml_handler/mock.rs | 2 +- router2/src/server/http.rs | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/router2/src/dml_handler/mock.rs b/router2/src/dml_handler/mock.rs index 20d84e85dc..73b44e058b 100644 --- a/router2/src/dml_handler/mock.rs +++ b/router2/src/dml_handler/mock.rs @@ -68,7 +68,7 @@ impl DmlHandler for Arc { self, MockDmlHandlerCall::Dispatch { db_name: db_name.into(), - op: op.into().clone(), + op: op.into(), payload_stats, body_len, }, diff --git a/router2/src/server/http.rs b/router2/src/server/http.rs index 11e6b078e2..1042ad71e2 100644 --- a/router2/src/server/http.rs +++ b/router2/src/server/http.rs @@ -430,10 +430,7 @@ mod tests { test_write_handler!( invalid_org_bucket, - query_string = format!( - "?org=test&bucket={}", - iter::repeat("A").take(1000).collect::() - ), + query_string = format!("?org=test&bucket={}", "A".repeat(1000)), body = "platanos,tag1=A,tag2=B val=42i 123456".as_bytes(), dml_handler = [Ok(())], want_write_db = "", // None From 885c831aff2d01bdd3a43273c433d35ae898f68f Mon Sep 17 00:00:00 2001 From: Dom Date: Fri, 14 Jan 2022 16:52:03 +0000 Subject: [PATCH 4/6] refactor: avoid constructing DmlOperation Instead of converting the set of MutableBatches into a DmlOperation to shard into more DmlOperation instances, the sharder can operate directly on the MutableBatches. 
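Concretely, the write path in http.rs changes from building a DmlWrite to
handing the parsed batches straight to the handler (excerpted from the diff
below):

    // before
    let op = DmlWrite::new(tables, DmlMeta::unsequenced(span_ctx));
    self.dml_handler
        .dispatch(db_name, op, stats, body.len())
        .await?;

    // after
    self.dml_handler
        .write(db_name, batches, stats, body.len(), span_ctx)
        .await?;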
--- Cargo.lock | 2 ++ router2/Cargo.toml | 2 ++ router2/src/dml_handler/mock.rs | 17 +++++++++++------ router2/src/dml_handler/nop.rs | 17 ++++++++++------- router2/src/dml_handler/trait.rs | 15 ++++++++++----- router2/src/server/http.rs | 15 ++++++--------- 6 files changed, 41 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 49cb080e8b..4257d2a4d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3641,8 +3641,10 @@ dependencies = [ "flate2", "futures", "generated_types", + "hashbrown", "hyper", "metric", + "mutable_batch", "mutable_batch_lp", "observability_deps", "parking_lot", diff --git a/router2/Cargo.toml b/router2/Cargo.toml index b028e9acde..0c9701a800 100644 --- a/router2/Cargo.toml +++ b/router2/Cargo.toml @@ -13,8 +13,10 @@ dml = { path = "../dml" } flate2 = "1.0" futures = "0.3.19" generated_types = { path = "../generated_types" } +hashbrown = "0.11" hyper = "0.14" metric = { path = "../metric" } +mutable_batch = { path = "../mutable_batch" } mutable_batch_lp = { path = "../mutable_batch_lp" } observability_deps = { path = "../observability_deps" } parking_lot = "0.11" diff --git a/router2/src/dml_handler/mock.rs b/router2/src/dml_handler/mock.rs index 73b44e058b..36dbd29eb3 100644 --- a/router2/src/dml_handler/mock.rs +++ b/router2/src/dml_handler/mock.rs @@ -1,9 +1,13 @@ use std::{collections::VecDeque, sync::Arc}; use async_trait::async_trait; -use dml::DmlOperation; +use data_types::DatabaseName; + +use hashbrown::HashMap; +use mutable_batch::MutableBatch; use mutable_batch_lp::PayloadStatistics; use parking_lot::Mutex; +use trace::ctx::SpanContext; use super::{DmlError, DmlHandler}; @@ -11,7 +15,7 @@ use super::{DmlError, DmlHandler}; pub enum MockDmlHandlerCall { Dispatch { db_name: String, - op: DmlOperation, + batches: HashMap, payload_stats: PayloadStatistics, body_len: usize, }, @@ -57,18 +61,19 @@ macro_rules! record_and_return { #[async_trait] impl DmlHandler for Arc { - async fn dispatch<'a>( + async fn write<'a>( &'a self, - db_name: impl Into + Send + Sync + 'a, - op: impl Into + Send + Sync + 'a, + db_name: DatabaseName<'_>, + batches: HashMap, payload_stats: PayloadStatistics, body_len: usize, + _span_ctx: Option, ) -> Result<(), DmlError> { record_and_return!( self, MockDmlHandlerCall::Dispatch { db_name: db_name.into(), - op: op.into(), + batches, payload_stats, body_len, }, diff --git a/router2/src/dml_handler/nop.rs b/router2/src/dml_handler/nop.rs index 024a97d59a..5b4eff2abf 100644 --- a/router2/src/dml_handler/nop.rs +++ b/router2/src/dml_handler/nop.rs @@ -1,9 +1,13 @@ //! A NOP implementation of [`DmlHandler`]. 
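+//!
+//! Writes are logged at `info` level and then dropped, which makes this a
+//! convenient placeholder sink (the `router2` run command currently wires it
+//! in via `HttpDelegate::new`).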
use async_trait::async_trait; -use dml::DmlOperation; +use data_types::DatabaseName; + +use hashbrown::HashMap; +use mutable_batch::MutableBatch; use mutable_batch_lp::PayloadStatistics; use observability_deps::tracing::*; +use trace::ctx::SpanContext; use super::{DmlError, DmlHandler}; @@ -13,16 +17,15 @@ pub struct NopDmlHandler; #[async_trait] impl DmlHandler for NopDmlHandler { - async fn dispatch<'a>( + async fn write<'a>( &'a self, - db_name: impl Into + Send + Sync + 'a, - op: impl Into + Send + Sync + 'a, + db_name: DatabaseName<'_>, + batches: HashMap, _payload_stats: PayloadStatistics, _body_len: usize, + _span_ctx: Option, ) -> Result<(), DmlError> { - let db_name = db_name.into(); - let op = op.into(); - info!(%db_name, ?op, "dropping dml operation"); + info!(%db_name, ?batches, "dropping write operation"); Ok(()) } } diff --git a/router2/src/dml_handler/trait.rs b/router2/src/dml_handler/trait.rs index bb87c7b616..da5aedd755 100644 --- a/router2/src/dml_handler/trait.rs +++ b/router2/src/dml_handler/trait.rs @@ -1,9 +1,13 @@ use std::fmt::Debug; use async_trait::async_trait; -use dml::DmlOperation; +use data_types::DatabaseName; + +use hashbrown::HashMap; +use mutable_batch::MutableBatch; use mutable_batch_lp::PayloadStatistics; use thiserror::Error; +use trace::ctx::SpanContext; /// Errors emitted by a [`DmlHandler`] implementation during DML request /// processing. @@ -21,12 +25,13 @@ pub enum DmlError { /// An abstract handler of [`DmlOperation`] requests. #[async_trait] pub trait DmlHandler: Debug + Send + Sync { - /// Apply `op` to `db_name`. - async fn dispatch<'a>( + /// Write `batches` to `db_name`. + async fn write<'a>( &'a self, - db_name: impl Into + Send + Sync + 'a, - op: impl Into + Send + Sync + 'a, + db_name: DatabaseName<'_>, + batches: HashMap, payload_stats: PayloadStatistics, body_len: usize, + span_ctx: Option, ) -> Result<(), DmlError>; } diff --git a/router2/src/server/http.rs b/router2/src/server/http.rs index 1042ad71e2..06ed587609 100644 --- a/router2/src/server/http.rs +++ b/router2/src/server/http.rs @@ -4,7 +4,7 @@ use std::str::Utf8Error; use bytes::{Bytes, BytesMut}; use data_types::names::{org_and_bucket_to_database, OrgBucketMappingError}; -use dml::{DmlMeta, DmlWrite}; + use futures::StreamExt; use hyper::{header::CONTENT_ENCODING, Body, Method, Request, Response, StatusCode}; use observability_deps::tracing::*; @@ -184,7 +184,7 @@ where // contain a timestamp let default_time = self.time_provider.now().timestamp_nanos(); - let (tables, stats) = match mutable_batch_lp::lines_to_batches_stats(body, default_time) { + let (batches, stats) = match mutable_batch_lp::lines_to_batches_stats(body, default_time) { Ok(v) => v, Err(mutable_batch_lp::Error::EmptyPayload) => { debug!("nothing to write"); @@ -193,9 +193,8 @@ where Err(e) => return Err(Error::ParseLineProtocol(e)), }; - let op = DmlWrite::new(tables, DmlMeta::unsequenced(span_ctx)); self.dml_handler - .dispatch(db_name, op, stats, body.len()) + .write(db_name, batches, stats, body.len(), span_ctx) .await?; Ok(()) @@ -275,7 +274,7 @@ mod tests { use std::{io::Write, iter, sync::Arc}; use assert_matches::assert_matches; - use dml::DmlOperation; + use flate2::{write::GzEncoder, Compression}; use hyper::header::HeaderValue; @@ -360,12 +359,10 @@ mod tests { assert_eq!(calls.len(), 1); // Validate the write op - let op = assert_matches!(&calls[0], MockDmlHandlerCall::Dispatch{ db_name, op, body_len, .. } => { + assert_matches!(&calls[0], MockDmlHandlerCall::Dispatch{ db_name, body_len, .. 
} => { assert_eq!(db_name, $want_write_db); assert_eq!(*body_len, want_body_len); - op - }); - assert_matches!(op, DmlOperation::Write(_)); + }) } else { assert!(calls.is_empty()); } From 7badf37250d43c03ed9e9c21aa0cefad06078d44 Mon Sep 17 00:00:00 2001 From: Dom Date: Mon, 17 Jan 2022 11:46:09 +0000 Subject: [PATCH 5/6] refactor: db_name -> namespace Renames all "database name" references to "namespace". --- router2/src/dml_handler/mock.rs | 6 +++--- router2/src/dml_handler/nop.rs | 4 ++-- router2/src/dml_handler/trait.rs | 4 ++-- router2/src/server/http.rs | 12 ++++++------ 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/router2/src/dml_handler/mock.rs b/router2/src/dml_handler/mock.rs index 36dbd29eb3..b71680a71b 100644 --- a/router2/src/dml_handler/mock.rs +++ b/router2/src/dml_handler/mock.rs @@ -14,7 +14,7 @@ use super::{DmlError, DmlHandler}; #[derive(Debug, Clone)] pub enum MockDmlHandlerCall { Dispatch { - db_name: String, + namespace: String, batches: HashMap, payload_stats: PayloadStatistics, body_len: usize, @@ -63,7 +63,7 @@ macro_rules! record_and_return { impl DmlHandler for Arc { async fn write<'a>( &'a self, - db_name: DatabaseName<'_>, + namespace: DatabaseName<'_>, batches: HashMap, payload_stats: PayloadStatistics, body_len: usize, @@ -72,7 +72,7 @@ impl DmlHandler for Arc { record_and_return!( self, MockDmlHandlerCall::Dispatch { - db_name: db_name.into(), + namespace: namespace.into(), batches, payload_stats, body_len, diff --git a/router2/src/dml_handler/nop.rs b/router2/src/dml_handler/nop.rs index 5b4eff2abf..a8cd807346 100644 --- a/router2/src/dml_handler/nop.rs +++ b/router2/src/dml_handler/nop.rs @@ -19,13 +19,13 @@ pub struct NopDmlHandler; impl DmlHandler for NopDmlHandler { async fn write<'a>( &'a self, - db_name: DatabaseName<'_>, + namespace: DatabaseName<'_>, batches: HashMap, _payload_stats: PayloadStatistics, _body_len: usize, _span_ctx: Option, ) -> Result<(), DmlError> { - info!(%db_name, ?batches, "dropping write operation"); + info!(%namespace, ?batches, "dropping write operation"); Ok(()) } } diff --git a/router2/src/dml_handler/trait.rs b/router2/src/dml_handler/trait.rs index da5aedd755..382d208923 100644 --- a/router2/src/dml_handler/trait.rs +++ b/router2/src/dml_handler/trait.rs @@ -25,10 +25,10 @@ pub enum DmlError { /// An abstract handler of [`DmlOperation`] requests. #[async_trait] pub trait DmlHandler: Debug + Send + Sync { - /// Write `batches` to `db_name`. + /// Write `batches` to `namespace`. async fn write<'a>( &'a self, - db_name: DatabaseName<'_>, + namespace: DatabaseName<'_>, batches: HashMap, payload_stats: PayloadStatistics, body_len: usize, diff --git a/router2/src/server/http.rs b/router2/src/server/http.rs index 06ed587609..2a013e3f10 100644 --- a/router2/src/server/http.rs +++ b/router2/src/server/http.rs @@ -171,10 +171,10 @@ where let span_ctx: Option = req.extensions().get().cloned(); let account = OrgBucketInfo::try_from(&req)?; - let db_name = org_and_bucket_to_database(&account.org, &account.bucket) + let namespace = org_and_bucket_to_database(&account.org, &account.bucket) .map_err(OrgBucketError::MappingFail)?; - trace!(org=%account.org, bucket=%account.bucket, db_name=%db_name, "processing write request"); + trace!(org=%account.org, bucket=%account.bucket, %namespace, "processing write request"); // Read the HTTP body and convert it to a str. 
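+        // (read_body also enforces the configured request size limit and
+        // transparently decompresses gzip-encoded payloads.)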
let body = self.read_body(req).await?; @@ -194,7 +194,7 @@ where }; self.dml_handler - .write(db_name, batches, stats, body.len(), span_ctx) + .write(namespace, batches, stats, body.len(), span_ctx) .await?; Ok(()) @@ -274,7 +274,7 @@ mod tests { use std::{io::Write, iter, sync::Arc}; use assert_matches::assert_matches; - + use flate2::{write::GzEncoder, Compression}; use hyper::header::HeaderValue; @@ -359,8 +359,8 @@ mod tests { assert_eq!(calls.len(), 1); // Validate the write op - assert_matches!(&calls[0], MockDmlHandlerCall::Dispatch{ db_name, body_len, .. } => { - assert_eq!(db_name, $want_write_db); + assert_matches!(&calls[0], MockDmlHandlerCall::Dispatch{ namespace, body_len, .. } => { + assert_eq!(namespace, $want_write_db); assert_eq!(*body_len, want_body_len); }) } else { From 1b7369e743b3e601c624bc666f419a64cf9bdeca Mon Sep 17 00:00:00 2001 From: Dom Date: Mon, 17 Jan 2022 11:55:23 +0000 Subject: [PATCH 6/6] docs: fix broken doc link --- router2/src/dml_handler/trait.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/router2/src/dml_handler/trait.rs b/router2/src/dml_handler/trait.rs index 382d208923..c2e4676d4e 100644 --- a/router2/src/dml_handler/trait.rs +++ b/router2/src/dml_handler/trait.rs @@ -22,7 +22,7 @@ pub enum DmlError { Internal(Box), } -/// An abstract handler of [`DmlOperation`] requests. +/// An abstract handler of DML requests. #[async_trait] pub trait DmlHandler: Debug + Send + Sync { /// Write `batches` to `namespace`.