diff --git a/Cargo.lock b/Cargo.lock index 8a685db051..b9d975abb3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1889,6 +1889,7 @@ dependencies = [ "logfmt", "metric", "metric_exporters", + "mutable_batch", "mutable_batch_lp", "mutable_batch_pb", "mutable_buffer", @@ -3232,7 +3233,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" dependencies = [ "lock_api", - "parking_lot_core 0.9.0", + "parking_lot_core 0.9.1", ] [[package]] @@ -3251,15 +3252,15 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.0" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2f4f894f3865f6c0e02810fc597300f34dc2510f66400da262d8ae10e75767d" +checksum = "28141e0cc4143da2443301914478dc976a61ffdb3f043058310c70df2fed8954" dependencies = [ "cfg-if", "libc", "redox_syscall", "smallvec", - "windows-sys 0.29.0", + "windows-sys 0.32.0", ] [[package]] @@ -4138,7 +4139,9 @@ dependencies = [ "generated_types", "hashbrown 0.12.0", "hyper", + "influxdb_line_protocol", "iox_catalog", + "lazy_static", "metric", "mutable_batch", "mutable_batch_lp", @@ -4146,6 +4149,7 @@ dependencies = [ "parking_lot 0.12.0", "paste", "predicate", + "pretty_assertions", "rand", "schema", "serde", @@ -5911,19 +5915,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" -[[package]] -name = "windows-sys" -version = "0.29.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ceb069ac8b2117d36924190469735767f0990833935ab430155e71a44bafe148" -dependencies = [ - "windows_aarch64_msvc 0.29.0", - "windows_i686_gnu 0.29.0", - "windows_i686_msvc 0.29.0", - "windows_x86_64_gnu 0.29.0", - "windows_x86_64_msvc 0.29.0", -] - [[package]] name = "windows-sys" version = "0.30.0" @@ -5938,10 +5929,17 @@ dependencies = [ ] [[package]] -name = "windows_aarch64_msvc" -version = "0.29.0" +name = "windows-sys" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3d027175d00b01e0cbeb97d6ab6ebe03b12330a35786cbaca5252b1c4bf5d9b" +checksum = "3df6e476185f92a12c072be4a189a0210dcdcf512a1891d6dff9edb874deadc6" +dependencies = [ + "windows_aarch64_msvc 0.32.0", + "windows_i686_gnu 0.32.0", + "windows_i686_msvc 0.32.0", + "windows_x86_64_gnu 0.32.0", + "windows_x86_64_msvc 0.32.0", +] [[package]] name = "windows_aarch64_msvc" @@ -5950,10 +5948,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "29277a4435d642f775f63c7d1faeb927adba532886ce0287bd985bffb16b6bca" [[package]] -name = "windows_i686_gnu" -version = "0.29.0" +name = "windows_aarch64_msvc" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8793f59f7b8e8b01eda1a652b2697d87b93097198ae85f823b969ca5b89bba58" +checksum = "d8e92753b1c443191654ec532f14c199742964a061be25d77d7a96f09db20bf5" [[package]] name = "windows_i686_gnu" @@ -5962,10 +5960,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1145e1989da93956c68d1864f32fb97c8f561a8f89a5125f6a2b7ea75524e4b8" [[package]] -name = "windows_i686_msvc" -version = "0.29.0" +name = "windows_i686_gnu" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8602f6c418b67024be2996c512f5f995de3ba417f4c75af68401ab8756796ae4" +checksum = 
"6a711c68811799e017b6038e0922cb27a5e2f43a2ddb609fe0b6f3eeda9de615" [[package]] name = "windows_i686_msvc" @@ -5974,10 +5972,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4a09e3a0d4753b73019db171c1339cd4362c8c44baf1bcea336235e955954a6" [[package]] -name = "windows_x86_64_gnu" -version = "0.29.0" +name = "windows_i686_msvc" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3d615f419543e0bd7d2b3323af0d86ff19cbc4f816e6453f36a2c2ce889c354" +checksum = "146c11bb1a02615db74680b32a68e2d61f553cc24c4eb5b4ca10311740e44172" [[package]] name = "windows_x86_64_gnu" @@ -5986,10 +5984,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ca64fcb0220d58db4c119e050e7af03c69e6f4f415ef69ec1773d9aab422d5a" [[package]] -name = "windows_x86_64_msvc" -version = "0.29.0" +name = "windows_x86_64_gnu" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11d95421d9ed3672c280884da53201a5c46b7b2765ca6faf34b0d71cf34a3561" +checksum = "c912b12f7454c6620635bbff3450962753834be2a594819bd5e945af18ec64bc" [[package]] name = "windows_x86_64_msvc" @@ -5997,6 +5995,12 @@ version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08cabc9f0066848fef4bc6a1c1668e6efce38b661d2aeec75d18d8617eebb5f1" +[[package]] +name = "windows_x86_64_msvc" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316" + [[package]] name = "winreg" version = "0.7.0" diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index 632c30894b..dde32fec3d 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -24,6 +24,7 @@ job_registry = { path = "../job_registry" } logfmt = { path = "../logfmt" } metric = { path = "../metric" } metric_exporters = { path = "../metric_exporters" } +mutable_batch = { path = "../mutable_batch" } mutable_batch_lp = { path = "../mutable_batch_lp" } mutable_batch_pb = { path = "../mutable_batch_pb" } mutable_buffer = { path = "../mutable_buffer" } diff --git a/influxdb_iox/src/commands/run/router2.rs b/influxdb_iox/src/commands/run/router2.rs index b9b2d9c9ee..a609f6a8c5 100644 --- a/influxdb_iox/src/commands/run/router2.rs +++ b/influxdb_iox/src/commands/run/router2.rs @@ -14,9 +14,10 @@ use crate::{ }, }, }; +use data_types::database_rules::{PartitionTemplate, TemplatePart}; use observability_deps::tracing::*; use router2::{ - dml_handlers::{NamespaceAutocreation, SchemaValidator, ShardedWriteBuffer}, + dml_handlers::{NamespaceAutocreation, Partitioner, SchemaValidator, ShardedWriteBuffer}, namespace_cache::{MemoryNamespaceCache, ShardedCache}, sequencer::Sequencer, server::{http::HttpDelegate, RouterServer}, @@ -93,12 +94,24 @@ pub async fn command(config: Config) -> Result<()> { ) .await?; + // Initialise a namespace cache to be shared with the schema validator, and + // namespace auto-creator. let ns_cache = Arc::new(ShardedCache::new( iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10), )); + // Add the schema validator layer. let handler_stack = SchemaValidator::new(write_buffer, Arc::clone(&catalog), Arc::clone(&ns_cache)); + // Add a write partitioner into the handler stack that splits by the date + // portion of the write's timestamp. 
+ let handler_stack = Partitioner::new( + handler_stack, + PartitionTemplate { + parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())], + }, + ); + //////////////////////////////////////////////////////////////////////////// // // THIS CODE IS FOR TESTING ONLY. diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/router2.rs b/influxdb_iox/src/influxdb_ioxd/server_type/router2.rs index de43783f9d..70630b4159 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/router2.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/router2.rs @@ -4,8 +4,10 @@ use std::{ }; use async_trait::async_trait; +use hashbrown::HashMap; use hyper::{Body, Request, Response}; use metric::Registry; +use mutable_batch::MutableBatch; use router2::{dml_handlers::DmlHandler, server::RouterServer}; use tokio_util::sync::CancellationToken; use trace::TraceCollector; @@ -36,7 +38,7 @@ impl RouterServerType { #[async_trait] impl ServerType for RouterServerType where - D: DmlHandler + 'static, + D: DmlHandler> + 'static, { type RouteError = IoxHttpErrorAdaptor; diff --git a/router2/Cargo.toml b/router2/Cargo.toml index 7314a52bd2..78f6b3aaab 100644 --- a/router2/Cargo.toml +++ b/router2/Cargo.toml @@ -15,6 +15,7 @@ futures = "0.3.21" generated_types = { path = "../generated_types" } hashbrown = "0.12" hyper = "0.14" +influxdb_line_protocol = { version = "0.1.0", path = "../influxdb_line_protocol" } iox_catalog = { path = "../iox_catalog" } metric = { path = "../metric" } mutable_batch = { path = "../mutable_batch" } @@ -36,7 +37,9 @@ write_buffer = { path = "../write_buffer" } [dev-dependencies] assert_matches = "1.5" criterion = { version = "0.3.4", features = ["async_tokio", "html_reports"] } +lazy_static = "1.4.0" paste = "1.0.6" +pretty_assertions = "1.1.0" rand = "0.8.3" schema = { path = "../schema" } diff --git a/router2/benches/e2e.rs b/router2/benches/e2e.rs index 7cc9d1b8ca..f29afa0582 100644 --- a/router2/benches/e2e.rs +++ b/router2/benches/e2e.rs @@ -3,8 +3,9 @@ use std::sync::Arc; use criterion::measurement::WallTime; use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion, Throughput}; +use data_types::database_rules::{PartitionTemplate, TemplatePart}; use hyper::{Body, Request}; -use router2::dml_handlers::ShardedWriteBuffer; +use router2::dml_handlers::{Partitioner, ShardedWriteBuffer}; use router2::sequencer::Sequencer; use router2::server::http::HttpDelegate; use router2::sharder::JumpHash; @@ -36,9 +37,15 @@ fn init_write_buffer(n_sequencers: u32) -> ShardedWriteBuffer HttpDelegate>>> { - let write_buffer = init_write_buffer(1); - HttpDelegate::new(1024, write_buffer) +fn setup_server() -> HttpDelegate>>>> { + let handler_stack = init_write_buffer(1); + let handler_stack = Partitioner::new( + handler_stack, + PartitionTemplate { + parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())], + }, + ); + HttpDelegate::new(1024, handler_stack) } fn runtime() -> Runtime { @@ -65,7 +72,7 @@ fn e2e_benchmarks(c: &mut Criterion) { fn benchmark_e2e( group: &mut BenchmarkGroup, - http_delegate: &HttpDelegate>>>, + http_delegate: &HttpDelegate>>>>, uri: &'static str, body_str: &'static str, ) { diff --git a/router2/src/dml_handlers/mock.rs b/router2/src/dml_handlers/mock.rs index 9431a4bbf8..5a3e520fc9 100644 --- a/router2/src/dml_handlers/mock.rs +++ b/router2/src/dml_handlers/mock.rs @@ -1,20 +1,20 @@ -use std::{collections::VecDeque, sync::Arc}; +use std::{collections::VecDeque, fmt::Debug, sync::Arc}; use async_trait::async_trait; use 
data_types::{delete_predicate::DeletePredicate, DatabaseName}; -use hashbrown::HashMap; -use mutable_batch::MutableBatch; use parking_lot::Mutex; use trace::ctx::SpanContext; use super::{DmlError, DmlHandler}; +/// A captured call to a [`MockDmlHandler`], generic over `W`, the captured +/// [`DmlHandler::WriteInput`] type. #[derive(Debug, Clone)] -pub enum MockDmlHandlerCall { +pub enum MockDmlHandlerCall { Write { namespace: String, - batches: HashMap, + write_input: W, }, Delete { namespace: String, @@ -23,23 +23,42 @@ pub enum MockDmlHandlerCall { }, } -#[derive(Debug, Default)] -struct Inner { - calls: Vec, +#[derive(Debug)] +struct Inner { + calls: Vec>, write_return: VecDeque>, delete_return: VecDeque>, } -impl Inner { - fn record_call(&mut self, call: MockDmlHandlerCall) { +impl Default for Inner { + fn default() -> Self { + Self { + calls: Default::default(), + write_return: Default::default(), + delete_return: Default::default(), + } + } +} + +impl Inner { + fn record_call(&mut self, call: MockDmlHandlerCall) { self.calls.push(call); } } -#[derive(Debug, Default)] -pub struct MockDmlHandler(Mutex); +#[derive(Debug)] +pub struct MockDmlHandler(Mutex>); -impl MockDmlHandler { +impl Default for MockDmlHandler { + fn default() -> Self { + Self(Default::default()) + } +} + +impl MockDmlHandler +where + W: Clone, +{ pub fn with_write_return(self, ret: impl Into>>) -> Self { self.0.lock().write_return = ret.into(); self @@ -50,7 +69,7 @@ impl MockDmlHandler { self } - pub fn calls(&self) -> Vec { + pub fn calls(&self) -> Vec> { self.0.lock().calls.clone() } } @@ -68,21 +87,25 @@ macro_rules! record_and_return { } #[async_trait] -impl DmlHandler for Arc { +impl DmlHandler for Arc> +where + W: Debug + Send + Sync, +{ type WriteError = DmlError; type DeleteError = DmlError; + type WriteInput = W; async fn write( &self, namespace: DatabaseName<'static>, - batches: HashMap, + write_input: Self::WriteInput, _span_ctx: Option, ) -> Result<(), Self::WriteError> { record_and_return!( self, MockDmlHandlerCall::Write { namespace: namespace.into(), - batches, + write_input, }, write_return ) diff --git a/router2/src/dml_handlers/mod.rs b/router2/src/dml_handlers/mod.rs index eeb9d6ca2b..ae498bd9f3 100644 --- a/router2/src/dml_handlers/mod.rs +++ b/router2/src/dml_handlers/mod.rs @@ -4,44 +4,49 @@ //! processing handler chain: //! //! ```text -//! ┌──────────────┐ ┌──────────────┐ -//! │ HTTP API │ │ gRPC API │ -//! └──────────────┘ └──────────────┘ -//! │ │ -//! └─────────┬─────────┘ -//! │ -//! ▼ -//! ╔═ DmlHandler Stack ═════╗ -//! ║ ║ -//! ║ ┌──────────────────┐ ║ -//! ║ │ Namespace │ ║ -//! ║ │ Autocreation │ ─║─ ─ ─ ─ ─ ─ -//! ║ └──────────────────┘ ║ │ -//! ║ │ ║ ┌─────────────────┐ -//! ║ ▼ ║ │ Namespace Cache │ -//! ║ ┌──────────────────┐ ║ └─────────────────┘ -//! ║ │ Schema │ ║ │ -//! ║ │ Validation │ ─║─ ─ ─ ─ ─ ─ -//! ║ └──────────────────┘ ║ -//! ║ │ ║ -//! ║ ▼ ║ -//! ┌───────┐ ║ ┌──────────────────┐ ║ -//! │Sharder│◀ ─ ─ ▶│ShardedWriteBuffer│ ║ -//! └───────┘ ║ └──────────────────┘ ║ -//! ║ │ ║ -//! ╚════════════│═══════════╝ -//! │ -//! ▼ -//! ┌──────────────┐ -//! │ Write Buffer │ -//! └──────────────┘ -//! │ -//! │ -//! ┌────────▼─────┐ -//! │ Kafka ├┐ -//! └┬─────────────┘├┐ -//! └┬─────────────┘│ +//! ┌──────────────┐ ┌──────────────┐ +//! │ HTTP API │ │ gRPC API │ +//! └──────────────┘ └──────────────┘ +//! │ │ +//! └─────────┬─────────┘ +//! │ +//! ▼ +//! ╔═ DmlHandler Stack ═════╗ +//! ║ ║ +//! ║ ┌──────────────────┐ ║ +//! ║ │ Namespace │ ║ +//! 
║ │ Autocreation │─ ─ ─ ─ ─ ─ ─ ┐ +//! ║ └──────────────────┘ ║ +//! ║ │ ║ │ +//! ║ ▼ ║ +//! ║ ┌──────────────────┐ ║ │ +//! ║ │ Partitioner │ ║ +//! ║ └──────────────────┘ ║ │ +//! ║ │ ║ ┌─────────────────┐ +//! ║ ▼ ║ │ Namespace Cache │ +//! ║ ┌──────────────────┐ ║ └─────────────────┘ +//! ║ │ Schema │ ║ │ +//! ║ │ Validation │ ─║─ ─ ─ ─ ─ ─ +//! ║ └──────────────────┘ ║ +//! ║ │ ║ +//! ║ ▼ ║ +//! ┌───────┐ ║ ┌──────────────────┐ ║ +//! │Sharder│◀ ─ ─ ▶│ShardedWriteBuffer│ ║ +//! └───────┘ ║ └──────────────────┘ ║ +//! ║ │ ║ +//! ╚════════════│═══════════╝ +//! │ +//! ▼ +//! ┌──────────────┐ +//! │ Write Buffer │ //! └──────────────┘ +//! │ +//! │ +//! ┌────────▼─────┐ +//! │ Kafka ├┐ +//! └┬─────────────┘├┐ +//! └┬─────────────┘│ +//! └──────────────┘ //! ``` //! //! The HTTP / gRPC APIs decode their respective request format and funnel the @@ -53,10 +58,14 @@ //! [`NamespaceCache`] as an optimisation, allowing the handler to skip sending //! requests to the catalog for namespaces that are known to exist. //! -//! Writes pass through the [`SchemaValidator`] applying schema enforcement (a -//! NOP layer for deletes) which pushes additive schema changes to the catalog -//! and populates the [`NamespaceCache`], converging it to match the set of -//! [`NamespaceSchema`] in the global catalog. +//! Incoming line-protocol writes then pass through the [`Partitioner`], parsing +//! the LP and splitting them into batches per partition, before passing each +//! partitioned batch through the rest of the request pipeline. +//! +//! Writes then pass through the [`SchemaValidator`] applying schema enforcement +//! (a NOP layer for deletes) which pushes additive schema changes to the +//! catalog and populates the [`NamespaceCache`], converging it to match the set +//! of [`NamespaceSchema`] in the global catalog. //! //! The [`ShardedWriteBuffer`] uses a sharder implementation to direct the DML //! operations into a fixed set of sequencers. 
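For orientation, the new layer is wired into the stack in `influxdb_iox/src/commands/run/router2.rs` immediately above the schema validator, so each write is split into per-partition batches before validation and sharding. Condensed from that file (the namespace auto-creation layer and the surrounding setup, which provide `write_buffer`, `catalog` and `ns_cache`, are elided here):

```rust
// Add the schema validator layer.
let handler_stack =
    SchemaValidator::new(write_buffer, Arc::clone(&catalog), Arc::clone(&ns_cache));

// Add a write partitioner into the handler stack that splits by the date
// portion of the write's timestamp.
let handler_stack = Partitioner::new(
    handler_stack,
    PartitionTemplate {
        parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
    },
);
```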
@@ -78,5 +87,8 @@ pub use sharded_write_buffer::*; mod ns_autocreation; pub use ns_autocreation::*; +mod partitioner; +pub use partitioner::*; + #[cfg(test)] pub mod mock; diff --git a/router2/src/dml_handlers/nop.rs b/router2/src/dml_handlers/nop.rs index 5d2a98245c..5139ef3ee7 100644 --- a/router2/src/dml_handlers/nop.rs +++ b/router2/src/dml_handlers/nop.rs @@ -18,11 +18,12 @@ pub struct NopDmlHandler; impl DmlHandler for NopDmlHandler { type WriteError = DmlError; type DeleteError = DmlError; + type WriteInput = HashMap; async fn write( &self, namespace: DatabaseName<'static>, - batches: HashMap, + batches: Self::WriteInput, _span_ctx: Option, ) -> Result<(), Self::WriteError> { info!(%namespace, ?batches, "dropping write operation"); diff --git a/router2/src/dml_handlers/ns_autocreation.rs b/router2/src/dml_handlers/ns_autocreation.rs index 3048dd1539..fa9226e4bc 100644 --- a/router2/src/dml_handlers/ns_autocreation.rs +++ b/router2/src/dml_handlers/ns_autocreation.rs @@ -1,10 +1,8 @@ -use std::sync::Arc; +use std::{fmt::Debug, sync::Arc}; use async_trait::async_trait; use data_types::{delete_predicate::DeletePredicate, DatabaseName}; -use hashbrown::HashMap; use iox_catalog::interface::{Catalog, KafkaTopicId, QueryPoolId}; -use mutable_batch::MutableBatch; use observability_deps::tracing::*; use thiserror::Error; use trace::ctx::SpanContext; @@ -71,19 +69,24 @@ impl NamespaceAutocreation { } #[async_trait] -impl DmlHandler for NamespaceAutocreation +impl DmlHandler for NamespaceAutocreation where - D: DmlHandler, + D: DmlHandler, C: NamespaceCache, + T: Debug + Send + Sync + 'static, { type WriteError = NamespaceCreationError; type DeleteError = NamespaceCreationError; + // This handler accepts any input type, passing it through to the next + // handler unmodified. + type WriteInput = T; + /// Write `batches` to `namespace`. 
async fn write( &self, namespace: DatabaseName<'static>, - batches: HashMap, + batches: Self::WriteInput, span_ctx: Option, ) -> Result<(), Self::WriteError> { // If the namespace does not exist in the schema cache (populated by the @@ -173,7 +176,7 @@ mod tests { ); let catalog: Arc = Arc::new(MemCatalog::default()); - let mock_handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())])); + let mock_handler = Arc::new(MockDmlHandler::<()>::default().with_write_return([Ok(())])); let creator = NamespaceAutocreation::new( Arc::clone(&catalog), @@ -185,7 +188,7 @@ mod tests { ); creator - .write(ns.clone(), Default::default(), None) + .write(ns.clone(), (), None) .await .expect("handler should succeed"); @@ -214,7 +217,7 @@ mod tests { let cache = Arc::new(MemoryNamespaceCache::default()); let catalog: Arc = Arc::new(MemCatalog::default()); - let mock_handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())])); + let mock_handler = Arc::new(MockDmlHandler::<()>::default().with_write_return([Ok(())])); let creator = NamespaceAutocreation::new( Arc::clone(&catalog), @@ -226,7 +229,7 @@ mod tests { ); creator - .write(ns.clone(), Default::default(), None) + .write(ns.clone(), (), None) .await .expect("handler should succeed"); diff --git a/router2/src/dml_handlers/partitioner.rs b/router2/src/dml_handlers/partitioner.rs new file mode 100644 index 0000000000..0d925454b5 --- /dev/null +++ b/router2/src/dml_handlers/partitioner.rs @@ -0,0 +1,357 @@ +use async_trait::async_trait; +use data_types::{ + database_rules::PartitionTemplate, delete_predicate::DeletePredicate, DatabaseName, +}; +use futures::stream::{FuturesUnordered, TryStreamExt}; +use hashbrown::HashMap; +use mutable_batch::{MutableBatch, PartitionWrite, WritePayload}; +use observability_deps::tracing::*; +use thiserror::Error; +use trace::ctx::SpanContext; + +use super::{DmlError, DmlHandler}; + +/// An error raised by the [`Partitioner`] handler. +#[derive(Debug, Error)] +pub enum PartitionError { + /// Failed to write to the partitioned table batch. + #[error("error batching into partitioned write: {0}")] + BatchWrite(#[from] mutable_batch::Error), + + /// The inner DML handler returned an error. + #[error(transparent)] + Inner(Box), +} + +/// A decorator of `T`, tagging it with the partition key derived from it. +#[derive(Debug, PartialEq, Clone)] +pub struct Partitioned { + key: String, + payload: T, +} + +impl Partitioned { + /// Wrap `payload` with a partition `key`. + pub fn new(key: String, payload: T) -> Self { + Self { key, payload } + } + + /// Get a reference to the partition payload. + pub fn payload(&self) -> &T { + &self.payload + } + + /// Unwrap `Self` returning the inner payload `T` and the partition key. + pub fn into_parts(self) -> (String, T) { + (self.key, self.payload) + } +} + +/// A [`DmlHandler`] implementation that splits per-table [`MutableBatch`] into +/// partitioned per-table [`MutableBatch`] instances according to a configured +/// [`PartitionTemplate`]. Deletes pass through unmodified. +/// +/// Each partition is passed through to the inner DML handler (or chain of +/// handlers) concurrently, aborting if an error occurs. This may allow a +/// partial write to be observable down-stream of the [`Partitioner`] if at +/// least one partitioned write succeeds and at least one partitioned write +/// fails. When a partial write occurs, the handler returns an error describing +/// the failure. 
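A rough sketch of how downstream layers consume the `Partitioned` wrapper defined above; the test name, the literal "2022-02-08" key (the shape a `%Y-%m-%d` template would derive), and the empty batch map are purely illustrative and not part of this diff:

```rust
use hashbrown::HashMap;
use mutable_batch::MutableBatch;
use router2::dml_handlers::Partitioned;

#[test]
fn partitioned_wrapper_round_trip() {
    // One partition's payload: the per-table batches, tagged with the key
    // derived from the write timestamps.
    let tables: HashMap<String, MutableBatch> = HashMap::default();
    let partitioned = Partitioned::new("2022-02-08".to_owned(), tables);

    // Layers such as the SchemaValidator borrow the payload...
    assert!(partitioned.payload().is_empty());

    // ...while the ShardedWriteBuffer takes ownership of both halves when
    // building the DML write for each shard.
    let (partition_key, tables) = partitioned.into_parts();
    assert_eq!(partition_key, "2022-02-08");
    assert!(tables.is_empty());
}
```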
+#[derive(Debug)] +pub struct Partitioner { + partition_template: PartitionTemplate, + inner: D, +} + +impl Partitioner { + /// Initialise a new [`Partitioner`], splitting writes according to the + /// specified [`PartitionTemplate`] before calling `inner`. + pub fn new(inner: D, partition_template: PartitionTemplate) -> Self { + Self { + partition_template, + inner, + } + } +} + +#[async_trait] +impl DmlHandler for Partitioner +where + D: DmlHandler>>, +{ + type WriteError = PartitionError; + type DeleteError = D::DeleteError; + + type WriteInput = HashMap; + + /// Partition the per-table [`MutableBatch`] and call the inner handler with + /// each partition. + async fn write( + &self, + namespace: DatabaseName<'static>, + batch: Self::WriteInput, + span_ctx: Option, + ) -> Result<(), Self::WriteError> { + // A collection of partition-keyed, per-table MutableBatch instances. + let mut partitions: HashMap<_, HashMap<_, MutableBatch>> = HashMap::default(); + + for (table_name, batch) in batch { + // Partition the table batch according to the configured partition + // template and write it into the partition-keyed map. + for (partition_key, partition_payload) in + PartitionWrite::partition(&table_name, &batch, &self.partition_template) + { + let partition = partitions.entry(partition_key).or_default(); + let table_batch = partition + .raw_entry_mut() + .from_key(&table_name) + .or_insert_with(|| (table_name.to_owned(), MutableBatch::default())); + + partition_payload.write_to_batch(table_batch.1)?; + } + } + + partitions + .into_iter() + .map(|(key, batch)| { + let p = Partitioned { + key, + payload: batch, + }; + + let namespace = namespace.clone(); + let span_ctx = span_ctx.clone(); + async move { self.inner.write(namespace, p, span_ctx).await } + }) + .collect::>() + .try_for_each(|_| async move { + trace!("partitioned write complete"); + Ok(()) + }) + .await + .map_err(|e| PartitionError::Inner(Box::new(e.into()))) + } + + /// Pass the delete request through unmodified to the next handler. + async fn delete<'a>( + &self, + namespace: DatabaseName<'static>, + table_name: impl Into + Send + Sync + 'a, + predicate: DeletePredicate, + span_ctx: Option, + ) -> Result<(), Self::DeleteError> { + self.inner + .delete(namespace, table_name, predicate, span_ctx) + .await + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use assert_matches::assert_matches; + use data_types::database_rules::TemplatePart; + use lazy_static::lazy_static; + use time::Time; + + use crate::dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall}; + + use super::*; + + lazy_static! { + /// A static default time to use in tests (1971-05-02 UTC). + static ref DEFAULT_TIME: Time = Time::from_timestamp_nanos(42000000000000000); + } + + // Generate a test case that partitions "lp" and calls a mock inner DML + // handler for each partition, which returns the values specified in + // "inner_write_returns". + // + // Assert the partition-to-table mapping in "want_writes" and assert the + // handler write() return value in "want_handler_ret". + macro_rules! test_write { + ( + $name:ident, + lp = $lp:expr, + inner_write_returns = $inner_write_returns:expr, + want_writes = [$($want_writes:tt)*], // "partition key" => ["mapped", "tables"] or [unchecked] to skip assert + want_handler_ret = $($want_handler_ret:tt)+ + ) => { + paste::paste! 
{ + #[tokio::test] + async fn []() { + use pretty_assertions::assert_eq; + + let partition_template = PartitionTemplate { + parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())], + }; + + let inner = Arc::new(MockDmlHandler::default().with_write_return($inner_write_returns)); + let partitioner = Partitioner::new(Arc::clone(&inner), partition_template); + let ns = DatabaseName::new("bananas").expect("valid db name"); + + let (writes, _) = mutable_batch_lp::lines_to_batches_stats($lp, DEFAULT_TIME.timestamp_nanos()).expect("failed to parse test LP"); + + let handler_ret = partitioner.write(ns.clone(), writes, None).await; + assert_matches!(handler_ret, $($want_handler_ret)+); + + // Collect writes into a map. + let calls = inner.calls().into_iter().map(|v| match v { + MockDmlHandlerCall::Write { namespace, write_input, .. } => { + assert_eq!(namespace, *ns); + + // Extract the table names for comparison + let mut tables = write_input + .payload + .keys() + .cloned() + .collect::>(); + + tables.sort(); + + (write_input.key.clone(), tables) + }, + MockDmlHandlerCall::Delete { .. } => unreachable!("mock should not observe deletes"), + }) + .collect::>(); + + test_write!(@assert_writes, calls, $($want_writes)*); + } + } + }; + + // Generate a NOP that doesn't assert the writes if "unchecked" is + // specified. + // + // This is useful for tests that cause non-deterministic partial writes. + (@assert_writes, $got:ident, unchecked) => { let _x = $got; }; + + // Generate a block of code that validates tokens in the form of: + // + // key => ["table", "names"] + // + // Matches the partition key / tables names observed by the mock. + (@assert_writes, $got:ident, $($partition_key:expr => $want_tables:expr, )*) => { + // Construct the desired writes, keyed by partition key + #[allow(unused_mut)] + let mut want_writes: HashMap = Default::default(); + $( + let mut want: Vec = $want_tables.into_iter().map(|t| t.to_string()).collect(); + want.sort(); + want_writes.insert($partition_key.to_string(), want); + )* + + assert_eq!(want_writes, $got); + }; + } + + test_write!( + single_partition_ok, + lp = "\ + bananas,tag1=A,tag2=B val=42i 1\n\ + platanos,tag1=A,tag2=B value=42i 2\n\ + another,tag1=A,tag2=B value=42i 3\n\ + bananas,tag1=A,tag2=B val=42i 2\n\ + table,tag1=A,tag2=B val=42i 1\n\ + ", + inner_write_returns = [Ok(())], + want_writes = [ + "1970-01-01" => ["bananas", "platanos", "another", "table"], + ], + want_handler_ret = Ok(()) + ); + + test_write!( + single_partition_err, + lp = "\ + bananas,tag1=A,tag2=B val=42i 1\n\ + platanos,tag1=A,tag2=B value=42i 2\n\ + another,tag1=A,tag2=B value=42i 3\n\ + bananas,tag1=A,tag2=B val=42i 2\n\ + table,tag1=A,tag2=B val=42i 1\n\ + ", + inner_write_returns = [Err(DmlError::DatabaseNotFound("missing".to_owned()))], + want_writes = [ + // Attempted write recorded by the mock + "1970-01-01" => ["bananas", "platanos", "another", "table"], + ], + want_handler_ret = Err(PartitionError::Inner(e)) => { + assert_matches!(*e, DmlError::DatabaseNotFound(_)); + } + ); + + test_write!( + multiple_partitions_ok, + lp = "\ + bananas,tag1=A,tag2=B val=42i 1\n\ + platanos,tag1=A,tag2=B value=42i 1465839830100400200\n\ + another,tag1=A,tag2=B value=42i 1465839830100400200\n\ + bananas,tag1=A,tag2=B val=42i 2\n\ + table,tag1=A,tag2=B val=42i 1644347270670952000\n\ + ", + inner_write_returns = [Ok(()), Ok(()), Ok(())], + want_writes = [ + "1970-01-01" => ["bananas"], + "2016-06-13" => ["platanos", "another"], + "2022-02-08" => ["table"], + ], + want_handler_ret = Ok(()) + 
); + + test_write!( + multiple_partitions_total_err, + lp = "\ + bananas,tag1=A,tag2=B val=42i 1\n\ + platanos,tag1=A,tag2=B value=42i 1465839830100400200\n\ + another,tag1=A,tag2=B value=42i 1465839830100400200\n\ + bananas,tag1=A,tag2=B val=42i 2\n\ + table,tag1=A,tag2=B val=42i 1644347270670952000\n\ + ", + inner_write_returns = [ + Err(DmlError::DatabaseNotFound("missing".to_owned())), + Err(DmlError::DatabaseNotFound("missing".to_owned())), + Err(DmlError::DatabaseNotFound("missing".to_owned())), + ], + want_writes = [unchecked], + want_handler_ret = Err(PartitionError::Inner(e)) => { + assert_matches!(*e, DmlError::DatabaseNotFound(_)); + } + ); + + test_write!( + multiple_partitions_partial_err, + lp = "\ + bananas,tag1=A,tag2=B val=42i 1\n\ + platanos,tag1=A,tag2=B value=42i 1465839830100400200\n\ + another,tag1=A,tag2=B value=42i 1465839830100400200\n\ + bananas,tag1=A,tag2=B val=42i 2\n\ + table,tag1=A,tag2=B val=42i 1644347270670952000\n\ + ", + inner_write_returns = [ + Err(DmlError::DatabaseNotFound("missing".to_owned())), + Ok(()), + Ok(()), + ], + want_writes = [unchecked], + want_handler_ret = Err(PartitionError::Inner(e)) => { + assert_matches!(*e, DmlError::DatabaseNotFound(_)); + } + ); + + test_write!( + no_specified_timestamp, + lp = "\ + bananas,tag1=A,tag2=B val=42i\n\ + platanos,tag1=A,tag2=B value=42i\n\ + another,tag1=A,tag2=B value=42i\n\ + bananas,tag1=A,tag2=B val=42i\n\ + table,tag1=A,tag2=B val=42i\n\ + ", + inner_write_returns = [Ok(())], + want_writes = [ + "1971-05-02" => ["bananas", "platanos", "another", "table"], + ], + want_handler_ret = Ok(()) + ); +} diff --git a/router2/src/dml_handlers/schema_validation.rs b/router2/src/dml_handlers/schema_validation.rs index 1c010fc749..b700841bc4 100644 --- a/router2/src/dml_handlers/schema_validation.rs +++ b/router2/src/dml_handlers/schema_validation.rs @@ -14,7 +14,7 @@ use trace::ctx::SpanContext; use crate::namespace_cache::{MemoryNamespaceCache, NamespaceCache}; -use super::{DmlError, DmlHandler}; +use super::{DmlError, DmlHandler, Partitioned}; /// Errors emitted during schema validation. #[derive(Debug, Error)] @@ -106,12 +106,14 @@ impl SchemaValidator { #[async_trait] impl DmlHandler for SchemaValidator where - D: DmlHandler, + D: DmlHandler>>, C: NamespaceCache, { type WriteError = SchemaError; type DeleteError = D::DeleteError; + type WriteInput = Partitioned>; + /// Validate the schema of all the writes in `batches` before passing the /// request onto the inner handler. /// @@ -132,7 +134,7 @@ where async fn write( &self, namespace: DatabaseName<'static>, - batches: HashMap, + batches: Self::WriteInput, span_ctx: Option, ) -> Result<(), Self::WriteError> { let mut repos = self.catalog.repositories().await; @@ -162,7 +164,7 @@ where }; let maybe_new_schema = validate_or_insert_schema( - batches.iter().map(|(k, v)| (k.as_str(), v)), + batches.payload().iter().map(|(k, v)| (k.as_str(), v)), &schema, repos.deref_mut(), ) @@ -238,10 +240,10 @@ mod tests { } // Parse `lp` into a table-keyed MutableBatch map. - fn lp_to_writes(lp: &str) -> HashMap { + fn lp_to_writes(lp: &str) -> Partitioned> { let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42) .expect("failed to build test writes from LP"); - writes + Partitioned::new("key".to_owned(), writes) } /// Initialise an in-memory [`MemCatalog`] and create a single namespace @@ -361,9 +363,9 @@ mod tests { assert_matches!(err, SchemaError::Validate(_)); // THe mock should observe exactly one write from the first call. 
- assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Write{namespace, batches}] => { + assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Write{namespace, write_input}] => { assert_eq!(namespace, NAMESPACE); - let batch = batches.get("bananas").expect("table not found in write"); + let batch = write_input.payload().get("bananas").expect("table not found in write"); assert_eq!(batch.rows(), 1); let col = batch.column("val").expect("column not found in write"); assert_matches!(col.influx_type(), InfluxColumnType::Field(InfluxFieldType::Integer)); diff --git a/router2/src/dml_handlers/sharded_write_buffer.rs b/router2/src/dml_handlers/sharded_write_buffer.rs index 9d3c04ac21..05bb64afeb 100644 --- a/router2/src/dml_handlers/sharded_write_buffer.rs +++ b/router2/src/dml_handlers/sharded_write_buffer.rs @@ -19,6 +19,8 @@ use write_buffer::core::WriteBufferError; use crate::{dml_handlers::DmlHandler, sequencer::Sequencer, sharder::Sharder}; +use super::Partitioned; + /// Errors occurring while writing to one or more write buffer shards. #[derive(Debug, Error)] pub enum ShardError { @@ -81,15 +83,20 @@ where type WriteError = ShardError; type DeleteError = ShardError; + type WriteInput = Partitioned>; + /// Shard `writes` and dispatch the resultant DML operations. async fn write( &self, namespace: DatabaseName<'static>, - writes: HashMap, + writes: Self::WriteInput, span_ctx: Option, ) -> Result<(), ShardError> { let mut collated: HashMap<_, HashMap> = HashMap::new(); + // Extract the partition key & DML writes. + let (partition_key, writes) = writes.into_parts(); + // Shard each entry in `writes` and collate them into one DML operation // per shard to maximise the size of each write, and therefore increase // the effectiveness of compression of ops in the write buffer. @@ -108,6 +115,7 @@ where let dml = DmlWrite::new(&namespace, batch, DmlMeta::unsequenced(span_ctx.clone())); trace!( + %partition_key, sequencer_id=%sequencer.id(), tables=%dml.table_count(), %namespace, @@ -199,10 +207,10 @@ mod tests { use super::*; // Parse `lp` into a table-keyed MutableBatch map. - fn lp_to_writes(lp: &str) -> HashMap { + fn lp_to_writes(lp: &str) -> Partitioned> { let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42) .expect("failed to build test writes from LP"); - writes + Partitioned::new("key".to_owned(), writes) } // Init a mock write buffer with the given number of sequencers. diff --git a/router2/src/dml_handlers/trait.rs b/router2/src/dml_handlers/trait.rs index f19d02cb6b..d40ff784b4 100644 --- a/router2/src/dml_handlers/trait.rs +++ b/router2/src/dml_handlers/trait.rs @@ -3,12 +3,10 @@ use std::{error::Error, fmt::Debug}; use async_trait::async_trait; use data_types::{delete_predicate::DeletePredicate, DatabaseName}; -use hashbrown::HashMap; -use mutable_batch::MutableBatch; use thiserror::Error; use trace::ctx::SpanContext; -use super::{NamespaceCreationError, SchemaError, ShardError}; +use super::{partitioner::PartitionError, NamespaceCreationError, SchemaError, ShardError}; /// Errors emitted by a [`DmlHandler`] implementation during DML request /// processing. @@ -30,6 +28,10 @@ pub enum DmlError { #[error(transparent)] NamespaceCreation(#[from] NamespaceCreationError), + /// An error partitioning the request. + #[error(transparent)] + Partition(#[from] PartitionError), + /// An unknown error occured while processing the DML request. 
#[error("internal dml handler error: {0}")] Internal(Box), @@ -38,11 +40,19 @@ pub enum DmlError { /// A composable, abstract handler of DML requests. #[async_trait] pub trait DmlHandler: Debug + Send + Sync { + /// The input type this handler expects for a DML write. + /// + /// By allowing handlers to vary their input type, it is possible to + /// construct a chain of [`DmlHandler`] implementations that transform the + /// input request as it progresses through the handler pipeline. + type WriteInput: Debug + Send + Sync; + /// The type of error a [`DmlHandler`] implementation produces for write /// requests. /// /// All errors must be mappable into the concrete [`DmlError`] type. type WriteError: Error + Into + Send; + /// The error type of the delete handler. type DeleteError: Error + Into + Send; @@ -50,7 +60,7 @@ pub trait DmlHandler: Debug + Send + Sync { async fn write( &self, namespace: DatabaseName<'static>, - batches: HashMap, + input: Self::WriteInput, span_ctx: Option, ) -> Result<(), Self::WriteError>; diff --git a/router2/src/lib.rs b/router2/src/lib.rs index 80e69d4dcf..14c884743d 100644 --- a/router2/src/lib.rs +++ b/router2/src/lib.rs @@ -6,6 +6,7 @@ //! * Handling writes: //! * Receiving IOx write/delete requests via HTTP and gRPC endpoints. //! * Enforcing schema validation & synchronising it within the catalog. +//! * Deriving the partition key of each DML operation. //! * Applying sharding logic. //! * Push resulting operations into the appropriate kafka topics. //! diff --git a/router2/src/server.rs b/router2/src/server.rs index a5fd990c34..12977b7b11 100644 --- a/router2/src/server.rs +++ b/router2/src/server.rs @@ -3,6 +3,8 @@ use std::sync::Arc; use crate::dml_handlers::DmlHandler; +use hashbrown::HashMap; +use mutable_batch::MutableBatch; use trace::TraceCollector; use self::{grpc::GrpcDelegate, http::HttpDelegate}; @@ -51,7 +53,7 @@ impl RouterServer { impl RouterServer where - D: DmlHandler, + D: DmlHandler>, { /// Get a reference to the router http delegate. pub fn http(&self) -> &HttpDelegate { diff --git a/router2/src/server/http.rs b/router2/src/server/http.rs index 1052cdae17..3cf66b3068 100644 --- a/router2/src/server/http.rs +++ b/router2/src/server/http.rs @@ -6,7 +6,9 @@ use bytes::{Bytes, BytesMut}; use data_types::names::{org_and_bucket_to_database, OrgBucketMappingError}; use futures::StreamExt; +use hashbrown::HashMap; use hyper::{header::CONTENT_ENCODING, Body, Method, Request, Response, StatusCode}; +use mutable_batch::MutableBatch; use observability_deps::tracing::*; use predicate::delete_predicate::{parse_delete_predicate, parse_http_delete_request}; use serde::Deserialize; @@ -14,7 +16,7 @@ use thiserror::Error; use time::{SystemProvider, TimeProvider}; use trace::ctx::SpanContext; -use crate::dml_handlers::{DmlError, DmlHandler}; +use crate::dml_handlers::{DmlError, DmlHandler, PartitionError}; /// Errors returned by the `router2` HTTP request handler. #[derive(Debug, Error)] @@ -69,9 +71,7 @@ impl Error { /// the end user. 
pub fn as_status_code(&self) -> StatusCode { match self { - Error::NoHandler | Error::DmlHandler(DmlError::DatabaseNotFound(_)) => { - StatusCode::NOT_FOUND - } + Error::NoHandler => StatusCode::NOT_FOUND, Error::InvalidOrgBucket(_) => StatusCode::BAD_REQUEST, Error::ClientHangup(_) => StatusCode::BAD_REQUEST, Error::InvalidGzip(_) => StatusCode::BAD_REQUEST, @@ -85,9 +85,21 @@ impl Error { // https://www.rfc-editor.org/rfc/rfc7231#section-6.5.13 StatusCode::UNSUPPORTED_MEDIA_TYPE } - Error::DmlHandler( - DmlError::Internal(_) | DmlError::WriteBuffer(_) | DmlError::NamespaceCreation(_), - ) => StatusCode::INTERNAL_SERVER_ERROR, + Error::DmlHandler(err) => StatusCode::from(err), + } + } +} + +impl From<&DmlError> for StatusCode { + fn from(e: &DmlError) -> Self { + match e { + DmlError::DatabaseNotFound(_) => StatusCode::NOT_FOUND, + DmlError::Schema(_) => StatusCode::BAD_REQUEST, + DmlError::Internal(_) | DmlError::WriteBuffer(_) | DmlError::NamespaceCreation(_) => { + StatusCode::INTERNAL_SERVER_ERROR + } + DmlError::Partition(PartitionError::BatchWrite(_)) => StatusCode::INTERNAL_SERVER_ERROR, + DmlError::Partition(PartitionError::Inner(err)) => StatusCode::from(&**err), } } } @@ -162,7 +174,7 @@ impl HttpDelegate { impl HttpDelegate where - D: DmlHandler, + D: DmlHandler>, T: TimeProvider, { /// Routes `req` to the appropriate handler, if any, returning the handler