Merge pull request #3746 from influxdata/dom/mb-partitioning

feat(router2): MutableBatch partitioning
pull/24376/head
kodiakhq[bot] 2022-02-16 11:39:28 +00:00 committed by GitHub
commit a3e15ade62
17 changed files with 593 additions and 132 deletions

Cargo.lock (generated, 68 lines changed)

@@ -1889,6 +1889,7 @@ dependencies = [
"logfmt",
"metric",
"metric_exporters",
"mutable_batch",
"mutable_batch_lp",
"mutable_batch_pb",
"mutable_buffer",
@@ -3232,7 +3233,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58"
dependencies = [
"lock_api",
"parking_lot_core 0.9.0",
"parking_lot_core 0.9.1",
]
[[package]]
@@ -3251,15 +3252,15 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.9.0"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2f4f894f3865f6c0e02810fc597300f34dc2510f66400da262d8ae10e75767d"
checksum = "28141e0cc4143da2443301914478dc976a61ffdb3f043058310c70df2fed8954"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-sys 0.29.0",
"windows-sys 0.32.0",
]
[[package]]
@@ -4138,7 +4139,9 @@ dependencies = [
"generated_types",
"hashbrown 0.12.0",
"hyper",
"influxdb_line_protocol",
"iox_catalog",
"lazy_static",
"metric",
"mutable_batch",
"mutable_batch_lp",
@@ -4146,6 +4149,7 @@ dependencies = [
"parking_lot 0.12.0",
"paste",
"predicate",
"pretty_assertions",
"rand",
"schema",
"serde",
@@ -5911,19 +5915,6 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-sys"
version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ceb069ac8b2117d36924190469735767f0990833935ab430155e71a44bafe148"
dependencies = [
"windows_aarch64_msvc 0.29.0",
"windows_i686_gnu 0.29.0",
"windows_i686_msvc 0.29.0",
"windows_x86_64_gnu 0.29.0",
"windows_x86_64_msvc 0.29.0",
]
[[package]]
name = "windows-sys"
version = "0.30.0"
@@ -5938,10 +5929,17 @@ dependencies = [
]
[[package]]
name = "windows_aarch64_msvc"
version = "0.29.0"
name = "windows-sys"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3d027175d00b01e0cbeb97d6ab6ebe03b12330a35786cbaca5252b1c4bf5d9b"
checksum = "3df6e476185f92a12c072be4a189a0210dcdcf512a1891d6dff9edb874deadc6"
dependencies = [
"windows_aarch64_msvc 0.32.0",
"windows_i686_gnu 0.32.0",
"windows_i686_msvc 0.32.0",
"windows_x86_64_gnu 0.32.0",
"windows_x86_64_msvc 0.32.0",
]
[[package]]
name = "windows_aarch64_msvc"
@@ -5950,10 +5948,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29277a4435d642f775f63c7d1faeb927adba532886ce0287bd985bffb16b6bca"
[[package]]
name = "windows_i686_gnu"
version = "0.29.0"
name = "windows_aarch64_msvc"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8793f59f7b8e8b01eda1a652b2697d87b93097198ae85f823b969ca5b89bba58"
checksum = "d8e92753b1c443191654ec532f14c199742964a061be25d77d7a96f09db20bf5"
[[package]]
name = "windows_i686_gnu"
@@ -5962,10 +5960,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1145e1989da93956c68d1864f32fb97c8f561a8f89a5125f6a2b7ea75524e4b8"
[[package]]
name = "windows_i686_msvc"
version = "0.29.0"
name = "windows_i686_gnu"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8602f6c418b67024be2996c512f5f995de3ba417f4c75af68401ab8756796ae4"
checksum = "6a711c68811799e017b6038e0922cb27a5e2f43a2ddb609fe0b6f3eeda9de615"
[[package]]
name = "windows_i686_msvc"
@@ -5974,10 +5972,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4a09e3a0d4753b73019db171c1339cd4362c8c44baf1bcea336235e955954a6"
[[package]]
name = "windows_x86_64_gnu"
version = "0.29.0"
name = "windows_i686_msvc"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3d615f419543e0bd7d2b3323af0d86ff19cbc4f816e6453f36a2c2ce889c354"
checksum = "146c11bb1a02615db74680b32a68e2d61f553cc24c4eb5b4ca10311740e44172"
[[package]]
name = "windows_x86_64_gnu"
@@ -5986,10 +5984,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ca64fcb0220d58db4c119e050e7af03c69e6f4f415ef69ec1773d9aab422d5a"
[[package]]
name = "windows_x86_64_msvc"
version = "0.29.0"
name = "windows_x86_64_gnu"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11d95421d9ed3672c280884da53201a5c46b7b2765ca6faf34b0d71cf34a3561"
checksum = "c912b12f7454c6620635bbff3450962753834be2a594819bd5e945af18ec64bc"
[[package]]
name = "windows_x86_64_msvc"
@@ -5997,6 +5995,12 @@ version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08cabc9f0066848fef4bc6a1c1668e6efce38b661d2aeec75d18d8617eebb5f1"
[[package]]
name = "windows_x86_64_msvc"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316"
[[package]]
name = "winreg"
version = "0.7.0"


@@ -24,6 +24,7 @@ job_registry = { path = "../job_registry" }
logfmt = { path = "../logfmt" }
metric = { path = "../metric" }
metric_exporters = { path = "../metric_exporters" }
mutable_batch = { path = "../mutable_batch" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
mutable_batch_pb = { path = "../mutable_batch_pb" }
mutable_buffer = { path = "../mutable_buffer" }


@@ -14,9 +14,10 @@ use crate::{
},
},
};
use data_types::database_rules::{PartitionTemplate, TemplatePart};
use observability_deps::tracing::*;
use router2::{
dml_handlers::{NamespaceAutocreation, SchemaValidator, ShardedWriteBuffer},
dml_handlers::{NamespaceAutocreation, Partitioner, SchemaValidator, ShardedWriteBuffer},
namespace_cache::{MemoryNamespaceCache, ShardedCache},
sequencer::Sequencer,
server::{http::HttpDelegate, RouterServer},
@@ -93,12 +94,24 @@ pub async fn command(config: Config) -> Result<()> {
)
.await?;
// Initialise a namespace cache to be shared with the schema validator, and
// namespace auto-creator.
let ns_cache = Arc::new(ShardedCache::new(
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
));
// Add the schema validator layer.
let handler_stack =
SchemaValidator::new(write_buffer, Arc::clone(&catalog), Arc::clone(&ns_cache));
// Add a write partitioner into the handler stack that splits by the date
// portion of the write's timestamp.
let handler_stack = Partitioner::new(
handler_stack,
PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
},
);
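A sketch of what this template yields (an editorial illustration, not part of the change): `TemplatePart::TimeFormat` applies strftime-style formatting to each row's timestamp, assumed here to follow chrono semantics, so every row falling on the same UTC date shares one partition key.

use chrono::{TimeZone, Utc};

// 1644347270670952000 ns is one of the timestamps used in the tests below;
// under "%Y-%m-%d" it derives the partition key "2022-02-08".
let key = Utc
    .timestamp_nanos(1_644_347_270_670_952_000)
    .format("%Y-%m-%d")
    .to_string();
assert_eq!(key, "2022-02-08");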
////////////////////////////////////////////////////////////////////////////
//
// THIS CODE IS FOR TESTING ONLY.


@@ -4,8 +4,10 @@ use std::{
};
use async_trait::async_trait;
use hashbrown::HashMap;
use hyper::{Body, Request, Response};
use metric::Registry;
use mutable_batch::MutableBatch;
use router2::{dml_handlers::DmlHandler, server::RouterServer};
use tokio_util::sync::CancellationToken;
use trace::TraceCollector;
@@ -36,7 +38,7 @@ impl<D> RouterServerType<D> {
#[async_trait]
impl<D> ServerType for RouterServerType<D>
where
D: DmlHandler + 'static,
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>> + 'static,
{
type RouteError = IoxHttpErrorAdaptor;


@@ -15,6 +15,7 @@ futures = "0.3.21"
generated_types = { path = "../generated_types" }
hashbrown = "0.12"
hyper = "0.14"
influxdb_line_protocol = { version = "0.1.0", path = "../influxdb_line_protocol" }
iox_catalog = { path = "../iox_catalog" }
metric = { path = "../metric" }
mutable_batch = { path = "../mutable_batch" }
@@ -36,7 +37,9 @@ write_buffer = { path = "../write_buffer" }
[dev-dependencies]
assert_matches = "1.5"
criterion = { version = "0.3.4", features = ["async_tokio", "html_reports"] }
lazy_static = "1.4.0"
paste = "1.0.6"
pretty_assertions = "1.1.0"
rand = "0.8.3"
schema = { path = "../schema" }


@@ -3,8 +3,9 @@ use std::sync::Arc;
use criterion::measurement::WallTime;
use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion, Throughput};
use data_types::database_rules::{PartitionTemplate, TemplatePart};
use hyper::{Body, Request};
use router2::dml_handlers::ShardedWriteBuffer;
use router2::dml_handlers::{Partitioner, ShardedWriteBuffer};
use router2::sequencer::Sequencer;
use router2::server::http::HttpDelegate;
use router2::sharder::JumpHash;
@@ -36,9 +37,15 @@ fn init_write_buffer(n_sequencers: u32) -> ShardedWriteBuffer<JumpHash<Arc<Sequencer>>> {
)
}
fn setup_server() -> HttpDelegate<ShardedWriteBuffer<JumpHash<Arc<Sequencer>>>> {
let write_buffer = init_write_buffer(1);
HttpDelegate::new(1024, write_buffer)
fn setup_server() -> HttpDelegate<Partitioner<ShardedWriteBuffer<JumpHash<Arc<Sequencer>>>>> {
let handler_stack = init_write_buffer(1);
let handler_stack = Partitioner::new(
handler_stack,
PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
},
);
HttpDelegate::new(1024, handler_stack)
}
fn runtime() -> Runtime {
@@ -65,7 +72,7 @@ fn e2e_benchmarks(c: &mut Criterion) {
fn benchmark_e2e(
group: &mut BenchmarkGroup<WallTime>,
http_delegate: &HttpDelegate<ShardedWriteBuffer<JumpHash<Arc<Sequencer>>>>,
http_delegate: &HttpDelegate<Partitioner<ShardedWriteBuffer<JumpHash<Arc<Sequencer>>>>>,
uri: &'static str,
body_str: &'static str,
) {


@@ -1,20 +1,20 @@
use std::{collections::VecDeque, sync::Arc};
use std::{collections::VecDeque, fmt::Debug, sync::Arc};
use async_trait::async_trait;
use data_types::{delete_predicate::DeletePredicate, DatabaseName};
use hashbrown::HashMap;
use mutable_batch::MutableBatch;
use parking_lot::Mutex;
use trace::ctx::SpanContext;
use super::{DmlError, DmlHandler};
/// A captured call to a [`MockDmlHandler`], generic over `W`, the captured
/// [`DmlHandler::WriteInput`] type.
#[derive(Debug, Clone)]
pub enum MockDmlHandlerCall {
pub enum MockDmlHandlerCall<W> {
Write {
namespace: String,
batches: HashMap<String, MutableBatch>,
write_input: W,
},
Delete {
namespace: String,
@@ -23,23 +23,42 @@ pub enum MockDmlHandlerCall {
},
}
#[derive(Debug, Default)]
struct Inner {
calls: Vec<MockDmlHandlerCall>,
#[derive(Debug)]
struct Inner<W> {
calls: Vec<MockDmlHandlerCall<W>>,
write_return: VecDeque<Result<(), DmlError>>,
delete_return: VecDeque<Result<(), DmlError>>,
}
impl Inner {
fn record_call(&mut self, call: MockDmlHandlerCall) {
impl<W> Default for Inner<W> {
fn default() -> Self {
Self {
calls: Default::default(),
write_return: Default::default(),
delete_return: Default::default(),
}
}
}
impl<W> Inner<W> {
fn record_call(&mut self, call: MockDmlHandlerCall<W>) {
self.calls.push(call);
}
}
#[derive(Debug, Default)]
pub struct MockDmlHandler(Mutex<Inner>);
#[derive(Debug)]
pub struct MockDmlHandler<W>(Mutex<Inner<W>>);
impl MockDmlHandler {
impl<W> Default for MockDmlHandler<W> {
fn default() -> Self {
Self(Default::default())
}
}
impl<W> MockDmlHandler<W>
where
W: Clone,
{
pub fn with_write_return(self, ret: impl Into<VecDeque<Result<(), DmlError>>>) -> Self {
self.0.lock().write_return = ret.into();
self
@@ -50,7 +69,7 @@ impl MockDmlHandler {
self
}
pub fn calls(&self) -> Vec<MockDmlHandlerCall> {
pub fn calls(&self) -> Vec<MockDmlHandlerCall<W>> {
self.0.lock().calls.clone()
}
}
@@ -68,21 +87,25 @@ macro_rules! record_and_return {
}
#[async_trait]
impl DmlHandler for Arc<MockDmlHandler> {
impl<W> DmlHandler for Arc<MockDmlHandler<W>>
where
W: Debug + Send + Sync,
{
type WriteError = DmlError;
type DeleteError = DmlError;
type WriteInput = W;
async fn write(
&self,
namespace: DatabaseName<'static>,
batches: HashMap<String, MutableBatch>,
write_input: Self::WriteInput,
_span_ctx: Option<SpanContext>,
) -> Result<(), Self::WriteError> {
record_and_return!(
self,
MockDmlHandlerCall::Write {
namespace: namespace.into(),
batches,
write_input,
},
write_return
)
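To illustrate the now-generic mock, a hypothetical test driving it with a plain `String` write input (a sketch, not part of the change; `MockDmlHandler`, `MockDmlHandlerCall` and `calls()` are as defined above):

let mock = Arc::new(MockDmlHandler::<String>::default().with_write_return([Ok(())]));

mock.write(DatabaseName::new("bananas").unwrap(), "payload".to_string(), None)
    .await
    .expect("write should succeed");

// The captured call records exactly the value the handler observed.
assert_matches::assert_matches!(
    mock.calls().as_slice(),
    [MockDmlHandlerCall::Write { write_input, .. }] => {
        assert_eq!(write_input, "payload");
    }
);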


@@ -15,7 +15,12 @@
//! ║ ║
//! ║ ┌──────────────────┐ ║
//! ║ │ Namespace │ ║
//! ║ │ Autocreation │ ─║─ ─ ─ ─ ─ ─
//! ║ │ Autocreation │─ ─ ─ ─ ─ ─ ─ ┐
//! ║ └──────────────────┘ ║
//! ║ │ ║ │
//! ║ ▼ ║
//! ║ ┌──────────────────┐ ║ │
//! ║ │ Partitioner │ ║
//! ║ └──────────────────┘ ║ │
//! ║ │ ║ ┌─────────────────┐
//! ║ ▼ ║ │ Namespace Cache │
@@ -53,10 +58,14 @@
//! [`NamespaceCache`] as an optimisation, allowing the handler to skip sending
//! requests to the catalog for namespaces that are known to exist.
//!
//! Writes pass through the [`SchemaValidator`] applying schema enforcement (a
//! NOP layer for deletes) which pushes additive schema changes to the catalog
//! and populates the [`NamespaceCache`], converging it to match the set of
//! [`NamespaceSchema`] in the global catalog.
//! Incoming writes (already parsed from line protocol into per-table
//! [`MutableBatch`] instances by the HTTP layer) then pass through the
//! [`Partitioner`], which splits them into per-partition batches before
//! passing each partitioned batch through the rest of the request pipeline.
//!
//! Writes then pass through the [`SchemaValidator`] applying schema enforcement
//! (a NOP layer for deletes) which pushes additive schema changes to the
//! catalog and populates the [`NamespaceCache`], converging it to match the set
//! of [`NamespaceSchema`] in the global catalog.
//!
//! The [`ShardedWriteBuffer`] uses a sharder implementation to direct the DML
//! operations into a fixed set of sequencers.
@@ -78,5 +87,8 @@ pub use sharded_write_buffer::*;
mod ns_autocreation;
pub use ns_autocreation::*;
mod partitioner;
pub use partitioner::*;
#[cfg(test)]
pub mod mock;
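To make the documented ordering concrete, a sketch of how the write path composes (constructor arguments mirror the run command shown earlier; `write_buffer`, `catalog`, `ns_cache` and `partition_template` are assumed to be in scope):

// Input type accepted at each layer, outermost first:
//   Partitioner:        HashMap<String, MutableBatch>
//   SchemaValidator:    Partitioned<HashMap<String, MutableBatch>>
//   ShardedWriteBuffer: Partitioned<HashMap<String, MutableBatch>>
let handler_stack = Partitioner::new(
    SchemaValidator::new(write_buffer, catalog, ns_cache),
    partition_template,
);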


@@ -18,11 +18,12 @@ pub struct NopDmlHandler;
impl DmlHandler for NopDmlHandler {
type WriteError = DmlError;
type DeleteError = DmlError;
type WriteInput = HashMap<String, MutableBatch>;
async fn write(
&self,
namespace: DatabaseName<'static>,
batches: HashMap<String, MutableBatch>,
batches: Self::WriteInput,
_span_ctx: Option<SpanContext>,
) -> Result<(), Self::WriteError> {
info!(%namespace, ?batches, "dropping write operation");


@@ -1,10 +1,8 @@
use std::sync::Arc;
use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use data_types::{delete_predicate::DeletePredicate, DatabaseName};
use hashbrown::HashMap;
use iox_catalog::interface::{Catalog, KafkaTopicId, QueryPoolId};
use mutable_batch::MutableBatch;
use observability_deps::tracing::*;
use thiserror::Error;
use trace::ctx::SpanContext;
@@ -71,19 +69,24 @@ impl<D, C> NamespaceAutocreation<D, C> {
}
#[async_trait]
impl<D, C> DmlHandler for NamespaceAutocreation<D, C>
impl<D, C, T> DmlHandler for NamespaceAutocreation<D, C>
where
D: DmlHandler,
D: DmlHandler<WriteInput = T>,
C: NamespaceCache,
T: Debug + Send + Sync + 'static,
{
type WriteError = NamespaceCreationError;
type DeleteError = NamespaceCreationError;
// This handler accepts any input type, passing it through to the next
// handler unmodified.
type WriteInput = T;
/// Write `batches` to `namespace`.
async fn write(
&self,
namespace: DatabaseName<'static>,
batches: HashMap<String, MutableBatch>,
batches: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<(), Self::WriteError> {
// If the namespace does not exist in the schema cache (populated by the
@@ -173,7 +176,7 @@ mod tests {
);
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::default());
let mock_handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
let mock_handler = Arc::new(MockDmlHandler::<()>::default().with_write_return([Ok(())]));
let creator = NamespaceAutocreation::new(
Arc::clone(&catalog),
@@ -185,7 +188,7 @@
);
creator
.write(ns.clone(), Default::default(), None)
.write(ns.clone(), (), None)
.await
.expect("handler should succeed");
@@ -214,7 +217,7 @@
let cache = Arc::new(MemoryNamespaceCache::default());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::default());
let mock_handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(())]));
let mock_handler = Arc::new(MockDmlHandler::<()>::default().with_write_return([Ok(())]));
let creator = NamespaceAutocreation::new(
Arc::clone(&catalog),
@@ -226,7 +229,7 @@
);
creator
.write(ns.clone(), Default::default(), None)
.write(ns.clone(), (), None)
.await
.expect("handler should succeed");


@@ -0,0 +1,357 @@
use async_trait::async_trait;
use data_types::{
database_rules::PartitionTemplate, delete_predicate::DeletePredicate, DatabaseName,
};
use futures::stream::{FuturesUnordered, TryStreamExt};
use hashbrown::HashMap;
use mutable_batch::{MutableBatch, PartitionWrite, WritePayload};
use observability_deps::tracing::*;
use thiserror::Error;
use trace::ctx::SpanContext;
use super::{DmlError, DmlHandler};
/// An error raised by the [`Partitioner`] handler.
#[derive(Debug, Error)]
pub enum PartitionError {
/// Failed to write to the partitioned table batch.
#[error("error batching into partitioned write: {0}")]
BatchWrite(#[from] mutable_batch::Error),
/// The inner DML handler returned an error.
#[error(transparent)]
Inner(Box<DmlError>),
}
/// A decorator of `T`, tagging it with the partition key derived from it.
#[derive(Debug, PartialEq, Clone)]
pub struct Partitioned<T> {
key: String,
payload: T,
}
impl<T> Partitioned<T> {
/// Wrap `payload` with a partition `key`.
pub fn new(key: String, payload: T) -> Self {
Self { key, payload }
}
/// Get a reference to the partition payload.
pub fn payload(&self) -> &T {
&self.payload
}
/// Unwrap `Self` returning the inner payload `T` and the partition key.
pub fn into_parts(self) -> (String, T) {
(self.key, self.payload)
}
}
/// A [`DmlHandler`] implementation that splits per-table [`MutableBatch`] into
/// partitioned per-table [`MutableBatch`] instances according to a configured
/// [`PartitionTemplate`]. Deletes pass through unmodified.
///
/// Each partition is passed through to the inner DML handler (or chain of
/// handlers) concurrently, aborting if an error occurs. This means a partial
/// write may be observable downstream of the [`Partitioner`] if at least one
/// partitioned write succeeds and at least one fails. When a partial write
/// occurs, the handler returns an error describing the failure.
#[derive(Debug)]
pub struct Partitioner<D> {
partition_template: PartitionTemplate,
inner: D,
}
impl<D> Partitioner<D> {
/// Initialise a new [`Partitioner`], splitting writes according to the
/// specified [`PartitionTemplate`] before calling `inner`.
pub fn new(inner: D, partition_template: PartitionTemplate) -> Self {
Self {
partition_template,
inner,
}
}
}
#[async_trait]
impl<D> DmlHandler for Partitioner<D>
where
D: DmlHandler<WriteInput = Partitioned<HashMap<String, MutableBatch>>>,
{
type WriteError = PartitionError;
type DeleteError = D::DeleteError;
type WriteInput = HashMap<String, MutableBatch>;
/// Partition the per-table [`MutableBatch`] and call the inner handler with
/// each partition.
async fn write(
&self,
namespace: DatabaseName<'static>,
batch: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<(), Self::WriteError> {
// A collection of partition-keyed, per-table MutableBatch instances.
let mut partitions: HashMap<_, HashMap<_, MutableBatch>> = HashMap::default();
for (table_name, batch) in batch {
// Partition the table batch according to the configured partition
// template and write it into the partition-keyed map.
for (partition_key, partition_payload) in
PartitionWrite::partition(&table_name, &batch, &self.partition_template)
{
let partition = partitions.entry(partition_key).or_default();
let table_batch = partition
.raw_entry_mut()
.from_key(&table_name)
.or_insert_with(|| (table_name.to_owned(), MutableBatch::default()));
partition_payload.write_to_batch(table_batch.1)?;
}
}
partitions
.into_iter()
.map(|(key, batch)| {
let p = Partitioned {
key,
payload: batch,
};
let namespace = namespace.clone();
let span_ctx = span_ctx.clone();
async move { self.inner.write(namespace, p, span_ctx).await }
})
.collect::<FuturesUnordered<_>>()
.try_for_each(|_| async move {
trace!("partitioned write complete");
Ok(())
})
.await
.map_err(|e| PartitionError::Inner(Box::new(e.into())))
}
/// Pass the delete request through unmodified to the next handler.
async fn delete<'a>(
&self,
namespace: DatabaseName<'static>,
table_name: impl Into<String> + Send + Sync + 'a,
predicate: DeletePredicate,
span_ctx: Option<SpanContext>,
) -> Result<(), Self::DeleteError> {
self.inner
.delete(namespace, table_name, predicate, span_ctx)
.await
}
}
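A worked example of the fan-out (a sketch; `inner` is assumed to be any handler accepting `Partitioned` input, such as the mock used in the tests below, and `namespace` a valid `DatabaseName`):

let partitioner = Partitioner::new(
    inner,
    PartitionTemplate {
        parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
    },
);

// Two rows with timestamps on different days...
let lp = "cpu val=1i 1\ncpu val=2i 1465839830100400200";
let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42).unwrap();

// ...fan out as two concurrent inner writes: one payload keyed
// "1970-01-01" and one keyed "2016-06-13", each holding a single-row
// "cpu" MutableBatch.
partitioner.write(namespace, writes, None).await?;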
#[cfg(test)]
mod tests {
use std::sync::Arc;
use assert_matches::assert_matches;
use data_types::database_rules::TemplatePart;
use lazy_static::lazy_static;
use time::Time;
use crate::dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall};
use super::*;
lazy_static! {
/// A static default time to use in tests (1971-05-02 UTC).
static ref DEFAULT_TIME: Time = Time::from_timestamp_nanos(42000000000000000);
}
// Generate a test case that partitions "lp" and calls a mock inner DML
// handler for each partition, which returns the values specified in
// "inner_write_returns".
//
// Assert the partition-to-table mapping in "want_writes" and assert the
// handler write() return value in "want_handler_ret".
macro_rules! test_write {
(
$name:ident,
lp = $lp:expr,
inner_write_returns = $inner_write_returns:expr,
want_writes = [$($want_writes:tt)*], // "partition key" => ["mapped", "tables"] or [unchecked] to skip assert
want_handler_ret = $($want_handler_ret:tt)+
) => {
paste::paste! {
#[tokio::test]
async fn [<test_write_ $name>]() {
use pretty_assertions::assert_eq;
let partition_template = PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
};
let inner = Arc::new(MockDmlHandler::default().with_write_return($inner_write_returns));
let partitioner = Partitioner::new(Arc::clone(&inner), partition_template);
let ns = DatabaseName::new("bananas").expect("valid db name");
let (writes, _) = mutable_batch_lp::lines_to_batches_stats($lp, DEFAULT_TIME.timestamp_nanos()).expect("failed to parse test LP");
let handler_ret = partitioner.write(ns.clone(), writes, None).await;
assert_matches!(handler_ret, $($want_handler_ret)+);
// Collect writes into a <partition_key, table_names> map.
let calls = inner.calls().into_iter().map(|v| match v {
MockDmlHandlerCall::Write { namespace, write_input, .. } => {
assert_eq!(namespace, *ns);
// Extract the table names for comparison
let mut tables = write_input
.payload
.keys()
.cloned()
.collect::<Vec<String>>();
tables.sort();
(write_input.key.clone(), tables)
},
MockDmlHandlerCall::Delete { .. } => unreachable!("mock should not observe deletes"),
})
.collect::<HashMap<String, _>>();
test_write!(@assert_writes, calls, $($want_writes)*);
}
}
};
// Generate a NOP that doesn't assert the writes if "unchecked" is
// specified.
//
// This is useful for tests that cause non-deterministic partial writes.
(@assert_writes, $got:ident, unchecked) => { let _x = $got; };
// Generate a block of code that validates tokens in the form of:
//
// key => ["table", "names"]
//
// Matches the partition key / tables names observed by the mock.
(@assert_writes, $got:ident, $($partition_key:expr => $want_tables:expr, )*) => {
// Construct the desired writes, keyed by partition key
#[allow(unused_mut)]
let mut want_writes: HashMap<String, _> = Default::default();
$(
let mut want: Vec<String> = $want_tables.into_iter().map(|t| t.to_string()).collect();
want.sort();
want_writes.insert($partition_key.to_string(), want);
)*
assert_eq!(want_writes, $got);
};
}
test_write!(
single_partition_ok,
lp = "\
bananas,tag1=A,tag2=B val=42i 1\n\
platanos,tag1=A,tag2=B value=42i 2\n\
another,tag1=A,tag2=B value=42i 3\n\
bananas,tag1=A,tag2=B val=42i 2\n\
table,tag1=A,tag2=B val=42i 1\n\
",
inner_write_returns = [Ok(())],
want_writes = [
"1970-01-01" => ["bananas", "platanos", "another", "table"],
],
want_handler_ret = Ok(())
);
test_write!(
single_partition_err,
lp = "\
bananas,tag1=A,tag2=B val=42i 1\n\
platanos,tag1=A,tag2=B value=42i 2\n\
another,tag1=A,tag2=B value=42i 3\n\
bananas,tag1=A,tag2=B val=42i 2\n\
table,tag1=A,tag2=B val=42i 1\n\
",
inner_write_returns = [Err(DmlError::DatabaseNotFound("missing".to_owned()))],
want_writes = [
// Attempted write recorded by the mock
"1970-01-01" => ["bananas", "platanos", "another", "table"],
],
want_handler_ret = Err(PartitionError::Inner(e)) => {
assert_matches!(*e, DmlError::DatabaseNotFound(_));
}
);
test_write!(
multiple_partitions_ok,
lp = "\
bananas,tag1=A,tag2=B val=42i 1\n\
platanos,tag1=A,tag2=B value=42i 1465839830100400200\n\
another,tag1=A,tag2=B value=42i 1465839830100400200\n\
bananas,tag1=A,tag2=B val=42i 2\n\
table,tag1=A,tag2=B val=42i 1644347270670952000\n\
",
inner_write_returns = [Ok(()), Ok(()), Ok(())],
want_writes = [
"1970-01-01" => ["bananas"],
"2016-06-13" => ["platanos", "another"],
"2022-02-08" => ["table"],
],
want_handler_ret = Ok(())
);
test_write!(
multiple_partitions_total_err,
lp = "\
bananas,tag1=A,tag2=B val=42i 1\n\
platanos,tag1=A,tag2=B value=42i 1465839830100400200\n\
another,tag1=A,tag2=B value=42i 1465839830100400200\n\
bananas,tag1=A,tag2=B val=42i 2\n\
table,tag1=A,tag2=B val=42i 1644347270670952000\n\
",
inner_write_returns = [
Err(DmlError::DatabaseNotFound("missing".to_owned())),
Err(DmlError::DatabaseNotFound("missing".to_owned())),
Err(DmlError::DatabaseNotFound("missing".to_owned())),
],
want_writes = [unchecked],
want_handler_ret = Err(PartitionError::Inner(e)) => {
assert_matches!(*e, DmlError::DatabaseNotFound(_));
}
);
test_write!(
multiple_partitions_partial_err,
lp = "\
bananas,tag1=A,tag2=B val=42i 1\n\
platanos,tag1=A,tag2=B value=42i 1465839830100400200\n\
another,tag1=A,tag2=B value=42i 1465839830100400200\n\
bananas,tag1=A,tag2=B val=42i 2\n\
table,tag1=A,tag2=B val=42i 1644347270670952000\n\
",
inner_write_returns = [
Err(DmlError::DatabaseNotFound("missing".to_owned())),
Ok(()),
Ok(()),
],
want_writes = [unchecked],
want_handler_ret = Err(PartitionError::Inner(e)) => {
assert_matches!(*e, DmlError::DatabaseNotFound(_));
}
);
test_write!(
no_specified_timestamp,
lp = "\
bananas,tag1=A,tag2=B val=42i\n\
platanos,tag1=A,tag2=B value=42i\n\
another,tag1=A,tag2=B value=42i\n\
bananas,tag1=A,tag2=B val=42i\n\
table,tag1=A,tag2=B val=42i\n\
",
inner_write_returns = [Ok(())],
want_writes = [
"1971-05-02" => ["bananas", "platanos", "another", "table"],
],
want_handler_ret = Ok(())
);
}


@@ -14,7 +14,7 @@ use trace::ctx::SpanContext;
use crate::namespace_cache::{MemoryNamespaceCache, NamespaceCache};
use super::{DmlError, DmlHandler};
use super::{DmlError, DmlHandler, Partitioned};
/// Errors emitted during schema validation.
#[derive(Debug, Error)]
@@ -106,12 +106,14 @@ impl<D, C> SchemaValidator<D, C> {
#[async_trait]
impl<D, C> DmlHandler for SchemaValidator<D, C>
where
D: DmlHandler,
D: DmlHandler<WriteInput = Partitioned<HashMap<String, MutableBatch>>>,
C: NamespaceCache,
{
type WriteError = SchemaError;
type DeleteError = D::DeleteError;
type WriteInput = Partitioned<HashMap<String, MutableBatch>>;
/// Validate the schema of all the writes in `batches` before passing the
/// request onto the inner handler.
///
@@ -132,7 +134,7 @@
async fn write(
&self,
namespace: DatabaseName<'static>,
batches: HashMap<String, MutableBatch>,
batches: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<(), Self::WriteError> {
let mut repos = self.catalog.repositories().await;
@@ -162,7 +164,7 @@
};
let maybe_new_schema = validate_or_insert_schema(
batches.iter().map(|(k, v)| (k.as_str(), v)),
batches.payload().iter().map(|(k, v)| (k.as_str(), v)),
&schema,
repos.deref_mut(),
)
@@ -238,10 +240,10 @@
}
// Parse `lp` into a table-keyed MutableBatch map.
fn lp_to_writes(lp: &str) -> HashMap<String, MutableBatch> {
fn lp_to_writes(lp: &str) -> Partitioned<HashMap<String, MutableBatch>> {
let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42)
.expect("failed to build test writes from LP");
writes
Partitioned::new("key".to_owned(), writes)
}
/// Initialise an in-memory [`MemCatalog`] and create a single namespace
@@ -361,9 +363,9 @@
assert_matches!(err, SchemaError::Validate(_));
// The mock should observe exactly one write from the first call.
assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Write{namespace, batches}] => {
assert_matches!(mock.calls().as_slice(), [MockDmlHandlerCall::Write{namespace, write_input}] => {
assert_eq!(namespace, NAMESPACE);
let batch = batches.get("bananas").expect("table not found in write");
let batch = write_input.payload().get("bananas").expect("table not found in write");
assert_eq!(batch.rows(), 1);
let col = batch.column("val").expect("column not found in write");
assert_matches!(col.influx_type(), InfluxColumnType::Field(InfluxFieldType::Integer));


@@ -19,6 +19,8 @@ use write_buffer::core::WriteBufferError;
use crate::{dml_handlers::DmlHandler, sequencer::Sequencer, sharder::Sharder};
use super::Partitioned;
/// Errors occurring while writing to one or more write buffer shards.
#[derive(Debug, Error)]
pub enum ShardError {
@@ -81,15 +83,20 @@ where
type WriteError = ShardError;
type DeleteError = ShardError;
type WriteInput = Partitioned<HashMap<String, MutableBatch>>;
/// Shard `writes` and dispatch the resultant DML operations.
async fn write(
&self,
namespace: DatabaseName<'static>,
writes: HashMap<String, MutableBatch>,
writes: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<(), ShardError> {
let mut collated: HashMap<_, HashMap<String, MutableBatch>> = HashMap::new();
// Extract the partition key & DML writes.
let (partition_key, writes) = writes.into_parts();
// Shard each entry in `writes` and collate them into one DML operation
// per shard to maximise the size of each write, and therefore increase
// the effectiveness of compression of ops in the write buffer.
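The hunk elides the collation loop itself; a hypothetical sketch of the shape of that loop, assuming a `Sharder::shard(table, namespace, payload)` method:

for (table, batch) in writes {
    // Route each table batch to a shard, merging batches that land on
    // the same sequencer so one DmlWrite is emitted per shard.
    let sequencer = self.sharder.shard(&table, &namespace, &batch);
    collated.entry(sequencer).or_default().insert(table, batch);
}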
@@ -108,6 +115,7 @@
let dml = DmlWrite::new(&namespace, batch, DmlMeta::unsequenced(span_ctx.clone()));
trace!(
%partition_key,
sequencer_id=%sequencer.id(),
tables=%dml.table_count(),
%namespace,
@@ -199,10 +207,10 @@
use super::*;
// Parse `lp` into a table-keyed MutableBatch map.
fn lp_to_writes(lp: &str) -> HashMap<String, MutableBatch> {
fn lp_to_writes(lp: &str) -> Partitioned<HashMap<String, MutableBatch>> {
let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42)
.expect("failed to build test writes from LP");
writes
Partitioned::new("key".to_owned(), writes)
}
// Init a mock write buffer with the given number of sequencers.


@@ -3,12 +3,10 @@ use std::{error::Error, fmt::Debug};
use async_trait::async_trait;
use data_types::{delete_predicate::DeletePredicate, DatabaseName};
use hashbrown::HashMap;
use mutable_batch::MutableBatch;
use thiserror::Error;
use trace::ctx::SpanContext;
use super::{NamespaceCreationError, SchemaError, ShardError};
use super::{partitioner::PartitionError, NamespaceCreationError, SchemaError, ShardError};
/// Errors emitted by a [`DmlHandler`] implementation during DML request
/// processing.
@@ -30,6 +28,10 @@ pub enum DmlError {
#[error(transparent)]
NamespaceCreation(#[from] NamespaceCreationError),
/// An error partitioning the request.
#[error(transparent)]
Partition(#[from] PartitionError),
/// An unknown error occurred while processing the DML request.
#[error("internal dml handler error: {0}")]
Internal(Box<dyn Error + Send + Sync>),
@@ -38,11 +40,19 @@ pub enum DmlError {
/// A composable, abstract handler of DML requests.
#[async_trait]
pub trait DmlHandler: Debug + Send + Sync {
/// The input type this handler expects for a DML write.
///
/// By allowing handlers to vary their input type, it is possible to
/// construct a chain of [`DmlHandler`] implementations that transform the
/// input request as it progresses through the handler pipeline.
type WriteInput: Debug + Send + Sync;
/// The type of error a [`DmlHandler`] implementation produces for write
/// requests.
///
/// All errors must be mappable into the concrete [`DmlError`] type.
type WriteError: Error + Into<DmlError> + Send;
/// The error type of the delete handler.
type DeleteError: Error + Into<DmlError> + Send;
@@ -50,7 +60,7 @@ pub trait DmlHandler: Debug + Send + Sync {
async fn write(
&self,
namespace: DatabaseName<'static>,
batches: HashMap<String, MutableBatch>,
input: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<(), Self::WriteError>;
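A minimal sketch of a custom layer written against this trait (hypothetical, not part of the change), showing how the associated `WriteInput` type lets each layer declare the payload it accepts before delegating inward:

#[derive(Debug)]
struct PassThrough<D> {
    inner: D,
}

#[async_trait]
impl<D> DmlHandler for PassThrough<D>
where
    D: DmlHandler,
{
    type WriteError = D::WriteError;
    type DeleteError = D::DeleteError;

    // Accept whatever the inner handler accepts; a transforming layer
    // (like the Partitioner) would declare its own type here and map it
    // to D::WriteInput before delegating.
    type WriteInput = D::WriteInput;

    async fn write(
        &self,
        namespace: DatabaseName<'static>,
        input: Self::WriteInput,
        span_ctx: Option<SpanContext>,
    ) -> Result<(), Self::WriteError> {
        self.inner.write(namespace, input, span_ctx).await
    }

    async fn delete<'a>(
        &self,
        namespace: DatabaseName<'static>,
        table_name: impl Into<String> + Send + Sync + 'a,
        predicate: DeletePredicate,
        span_ctx: Option<SpanContext>,
    ) -> Result<(), Self::DeleteError> {
        self.inner
            .delete(namespace, table_name, predicate, span_ctx)
            .await
    }
}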


@@ -6,6 +6,7 @@
//! * Handling writes:
//!     * Receiving IOx write/delete requests via HTTP and gRPC endpoints.
//!     * Enforcing schema validation & synchronising it within the catalog.
//!     * Deriving the partition key of each DML operation.
//!     * Applying sharding logic.
//!     * Pushing the resulting operations into the appropriate Kafka topics.
//!


@@ -3,6 +3,8 @@
use std::sync::Arc;
use crate::dml_handlers::DmlHandler;
use hashbrown::HashMap;
use mutable_batch::MutableBatch;
use trace::TraceCollector;
use self::{grpc::GrpcDelegate, http::HttpDelegate};
@@ -51,7 +53,7 @@ impl<D> RouterServer<D> {
impl<D> RouterServer<D>
where
D: DmlHandler,
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>>,
{
/// Get a reference to the router http delegate.
pub fn http(&self) -> &HttpDelegate<D> {


@@ -6,7 +6,9 @@ use bytes::{Bytes, BytesMut};
use data_types::names::{org_and_bucket_to_database, OrgBucketMappingError};
use futures::StreamExt;
use hashbrown::HashMap;
use hyper::{header::CONTENT_ENCODING, Body, Method, Request, Response, StatusCode};
use mutable_batch::MutableBatch;
use observability_deps::tracing::*;
use predicate::delete_predicate::{parse_delete_predicate, parse_http_delete_request};
use serde::Deserialize;
@@ -14,7 +16,7 @@ use thiserror::Error;
use time::{SystemProvider, TimeProvider};
use trace::ctx::SpanContext;
use crate::dml_handlers::{DmlError, DmlHandler};
use crate::dml_handlers::{DmlError, DmlHandler, PartitionError};
/// Errors returned by the `router2` HTTP request handler.
#[derive(Debug, Error)]
@@ -69,9 +71,7 @@ impl Error {
/// the end user.
pub fn as_status_code(&self) -> StatusCode {
match self {
Error::NoHandler | Error::DmlHandler(DmlError::DatabaseNotFound(_)) => {
StatusCode::NOT_FOUND
}
Error::NoHandler => StatusCode::NOT_FOUND,
Error::InvalidOrgBucket(_) => StatusCode::BAD_REQUEST,
Error::ClientHangup(_) => StatusCode::BAD_REQUEST,
Error::InvalidGzip(_) => StatusCode::BAD_REQUEST,
@@ -85,9 +85,21 @@
// https://www.rfc-editor.org/rfc/rfc7231#section-6.5.13
StatusCode::UNSUPPORTED_MEDIA_TYPE
}
Error::DmlHandler(
DmlError::Internal(_) | DmlError::WriteBuffer(_) | DmlError::NamespaceCreation(_),
) => StatusCode::INTERNAL_SERVER_ERROR,
Error::DmlHandler(err) => StatusCode::from(err),
}
}
}
impl From<&DmlError> for StatusCode {
fn from(e: &DmlError) -> Self {
match e {
DmlError::DatabaseNotFound(_) => StatusCode::NOT_FOUND,
DmlError::Schema(_) => StatusCode::BAD_REQUEST,
DmlError::Internal(_) | DmlError::WriteBuffer(_) | DmlError::NamespaceCreation(_) => {
StatusCode::INTERNAL_SERVER_ERROR
}
DmlError::Partition(PartitionError::BatchWrite(_)) => StatusCode::INTERNAL_SERVER_ERROR,
DmlError::Partition(PartitionError::Inner(err)) => StatusCode::from(&**err),
}
}
}
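For example (a sketch of the recursive mapping, not code from the change), a partitioned write that failed downstream because the database was missing still surfaces as a 404, since the `Inner` arm dereferences the boxed error and re-enters this `From` impl:

let err = DmlError::Partition(PartitionError::Inner(Box::new(
    DmlError::DatabaseNotFound("bananas".to_owned()),
)));
assert_eq!(StatusCode::from(&err), StatusCode::NOT_FOUND);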
@@ -162,7 +174,7 @@ impl<D> HttpDelegate<D, SystemProvider> {
impl<D, T> HttpDelegate<D, T>
where
D: DmlHandler,
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>>,
T: TimeProvider,
{
/// Routes `req` to the appropriate handler, if any, returning the handler