feat: write partitioner

Implements a write partitioning DML handler that splits per-table
MutableBatch instances into per-partition, per-table MutableBatch and
concurrently calls the inner DML handler with each.
pull/24376/head
Dom Dwyer 2022-02-09 13:31:13 +00:00
parent 5c254339fa
commit 92218ce8aa
6 changed files with 420 additions and 40 deletions

67
Cargo.lock generated
View File

@ -3231,7 +3231,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58"
dependencies = [
"lock_api",
"parking_lot_core 0.9.0",
"parking_lot_core 0.9.1",
]
[[package]]
@ -3250,15 +3250,15 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.9.0"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2f4f894f3865f6c0e02810fc597300f34dc2510f66400da262d8ae10e75767d"
checksum = "28141e0cc4143da2443301914478dc976a61ffdb3f043058310c70df2fed8954"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-sys 0.29.0",
"windows-sys 0.32.0",
]
[[package]]
@ -4137,7 +4137,9 @@ dependencies = [
"generated_types",
"hashbrown 0.12.0",
"hyper",
"influxdb_line_protocol",
"iox_catalog",
"lazy_static",
"metric",
"mutable_batch",
"mutable_batch_lp",
@ -4145,6 +4147,7 @@ dependencies = [
"parking_lot 0.12.0",
"paste",
"predicate",
"pretty_assertions",
"rand",
"schema",
"serde",
@ -5881,19 +5884,6 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-sys"
version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ceb069ac8b2117d36924190469735767f0990833935ab430155e71a44bafe148"
dependencies = [
"windows_aarch64_msvc 0.29.0",
"windows_i686_gnu 0.29.0",
"windows_i686_msvc 0.29.0",
"windows_x86_64_gnu 0.29.0",
"windows_x86_64_msvc 0.29.0",
]
[[package]]
name = "windows-sys"
version = "0.30.0"
@ -5908,10 +5898,17 @@ dependencies = [
]
[[package]]
name = "windows_aarch64_msvc"
version = "0.29.0"
name = "windows-sys"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3d027175d00b01e0cbeb97d6ab6ebe03b12330a35786cbaca5252b1c4bf5d9b"
checksum = "3df6e476185f92a12c072be4a189a0210dcdcf512a1891d6dff9edb874deadc6"
dependencies = [
"windows_aarch64_msvc 0.32.0",
"windows_i686_gnu 0.32.0",
"windows_i686_msvc 0.32.0",
"windows_x86_64_gnu 0.32.0",
"windows_x86_64_msvc 0.32.0",
]
[[package]]
name = "windows_aarch64_msvc"
@ -5920,10 +5917,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29277a4435d642f775f63c7d1faeb927adba532886ce0287bd985bffb16b6bca"
[[package]]
name = "windows_i686_gnu"
version = "0.29.0"
name = "windows_aarch64_msvc"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8793f59f7b8e8b01eda1a652b2697d87b93097198ae85f823b969ca5b89bba58"
checksum = "d8e92753b1c443191654ec532f14c199742964a061be25d77d7a96f09db20bf5"
[[package]]
name = "windows_i686_gnu"
@ -5932,10 +5929,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1145e1989da93956c68d1864f32fb97c8f561a8f89a5125f6a2b7ea75524e4b8"
[[package]]
name = "windows_i686_msvc"
version = "0.29.0"
name = "windows_i686_gnu"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8602f6c418b67024be2996c512f5f995de3ba417f4c75af68401ab8756796ae4"
checksum = "6a711c68811799e017b6038e0922cb27a5e2f43a2ddb609fe0b6f3eeda9de615"
[[package]]
name = "windows_i686_msvc"
@ -5944,10 +5941,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4a09e3a0d4753b73019db171c1339cd4362c8c44baf1bcea336235e955954a6"
[[package]]
name = "windows_x86_64_gnu"
version = "0.29.0"
name = "windows_i686_msvc"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3d615f419543e0bd7d2b3323af0d86ff19cbc4f816e6453f36a2c2ce889c354"
checksum = "146c11bb1a02615db74680b32a68e2d61f553cc24c4eb5b4ca10311740e44172"
[[package]]
name = "windows_x86_64_gnu"
@ -5956,10 +5953,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ca64fcb0220d58db4c119e050e7af03c69e6f4f415ef69ec1773d9aab422d5a"
[[package]]
name = "windows_x86_64_msvc"
version = "0.29.0"
name = "windows_x86_64_gnu"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11d95421d9ed3672c280884da53201a5c46b7b2765ca6faf34b0d71cf34a3561"
checksum = "c912b12f7454c6620635bbff3450962753834be2a594819bd5e945af18ec64bc"
[[package]]
name = "windows_x86_64_msvc"
@ -5967,6 +5964,12 @@ version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08cabc9f0066848fef4bc6a1c1668e6efce38b661d2aeec75d18d8617eebb5f1"
[[package]]
name = "windows_x86_64_msvc"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316"
[[package]]
name = "winreg"
version = "0.7.0"

View File

@ -15,6 +15,7 @@ futures = "0.3.21"
generated_types = { path = "../generated_types" }
hashbrown = "0.12"
hyper = "0.14"
influxdb_line_protocol = { version = "0.1.0", path = "../influxdb_line_protocol" }
iox_catalog = { path = "../iox_catalog" }
metric = { path = "../metric" }
mutable_batch = { path = "../mutable_batch" }
@ -36,7 +37,9 @@ write_buffer = { path = "../write_buffer" }
[dev-dependencies]
assert_matches = "1.5"
criterion = { version = "0.3.4", features = ["async_tokio", "html_reports"] }
lazy_static = "1.4.0"
paste = "1.0.6"
pretty_assertions = "1.1.0"
rand = "0.8.3"
schema = { path = "../schema" }

View File

@ -78,5 +78,8 @@ pub use sharded_write_buffer::*;
mod ns_autocreation;
pub use ns_autocreation::*;
mod partitioner;
pub use partitioner::*;
#[cfg(test)]
pub mod mock;

View File

@ -0,0 +1,357 @@
use async_trait::async_trait;
use data_types::{
database_rules::PartitionTemplate, delete_predicate::DeletePredicate, DatabaseName,
};
use futures::stream::{FuturesUnordered, TryStreamExt};
use hashbrown::HashMap;
use mutable_batch::{MutableBatch, PartitionWrite, WritePayload};
use observability_deps::tracing::*;
use thiserror::Error;
use trace::ctx::SpanContext;
use super::{DmlError, DmlHandler};
/// An error raised by the [`Partitioner`] handler.
#[derive(Debug, Error)]
pub enum PartitionError {
/// Failed to write to the partitioned table batch.
#[error("error batching into partitioned write: {0}")]
BatchWrite(#[from] mutable_batch::Error),
/// The inner DML handler returned an error.
#[error(transparent)]
Inner(Box<DmlError>),
}
/// A decorator of `T`, tagging it with the partition key derived from it.
#[derive(Debug, PartialEq, Clone)]
pub struct Partitioned<T> {
key: String,
payload: T,
}
impl<T> Partitioned<T> {
/// Wrap `payload` with a partition `key`.
pub fn new(key: String, payload: T) -> Self {
Self { key, payload }
}
/// Get a reference to the partition payload.
pub fn payload(&self) -> &T {
&self.payload
}
/// Unwrap `Self` returning the inner payload `T` and the partition key.
pub fn into_parts(self) -> (String, T) {
(self.key, self.payload)
}
}
/// A [`DmlHandler`] implementation that splits per-table [`MutableBatch`] into
/// partitioned per-table [`MutableBatch`] instances according to a configured
/// [`PartitionTemplate`]. Deletes pass through unmodified.
///
/// Each partition is passed through to the inner DML handler (or chain of
/// handlers) concurrently, aborting if an error occurs. This may allow a
/// partial write to be observable down-stream of the [`Partitioner`] if at
/// least one partitioned write succeeds and at least one partitioned write
/// fails. When a partial write occurs, the handler returns an error describing
/// the failure.
#[derive(Debug)]
pub struct Partitioner<D> {
partition_template: PartitionTemplate,
inner: D,
}
impl<D> Partitioner<D> {
/// Initialise a new [`Partitioner`], splitting writes according to the
/// specified [`PartitionTemplate`] before calling `inner`.
pub fn new(inner: D, partition_template: PartitionTemplate) -> Self {
Self {
partition_template,
inner,
}
}
}
#[async_trait]
impl<D> DmlHandler for Partitioner<D>
where
D: DmlHandler<WriteInput = Partitioned<HashMap<String, MutableBatch>>>,
{
type WriteError = PartitionError;
type DeleteError = D::DeleteError;
type WriteInput = HashMap<String, MutableBatch>;
/// Partition the per-table [`MutableBatch`] and call the inner handler with
/// each partition.
async fn write(
&self,
namespace: DatabaseName<'static>,
batch: Self::WriteInput,
span_ctx: Option<SpanContext>,
) -> Result<(), Self::WriteError> {
// A collection of partition-keyed, per-table MutableBatch instances.
let mut partitions: HashMap<_, HashMap<_, MutableBatch>> = HashMap::default();
for (table_name, batch) in batch {
// Partition the table batch according to the configured partition
// template and write it into the partition-keyed map.
for (partition_key, partition_payload) in
PartitionWrite::partition(&table_name, &batch, &self.partition_template)
{
let partition = partitions.entry(partition_key).or_default();
let table_batch = partition
.raw_entry_mut()
.from_key(&table_name)
.or_insert_with(|| (table_name.to_owned(), MutableBatch::default()));
partition_payload.write_to_batch(table_batch.1)?;
}
}
partitions
.into_iter()
.map(|(key, batch)| {
let p = Partitioned {
key,
payload: batch,
};
let namespace = namespace.clone();
let span_ctx = span_ctx.clone();
async move { self.inner.write(namespace, p, span_ctx).await }
})
.collect::<FuturesUnordered<_>>()
.try_for_each(|_| async move {
trace!("partitioned write complete");
Ok(())
})
.await
.map_err(|e| PartitionError::Inner(Box::new(e.into())))
}
/// Pass the delete request through unmodified to the next handler.
async fn delete<'a>(
&self,
namespace: DatabaseName<'static>,
table_name: impl Into<String> + Send + Sync + 'a,
predicate: DeletePredicate,
span_ctx: Option<SpanContext>,
) -> Result<(), Self::DeleteError> {
self.inner
.delete(namespace, table_name, predicate, span_ctx)
.await
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use assert_matches::assert_matches;
use data_types::database_rules::TemplatePart;
use lazy_static::lazy_static;
use time::Time;
use crate::dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall};
use super::*;
lazy_static! {
/// A static default time to use in tests (1971-05-02 UTC).
static ref DEFAULT_TIME: Time = Time::from_timestamp_nanos(42000000000000000);
}
// Generate a test case that partitions "lp" and calls a mock inner DML
// handler for each partition, which returns the values specified in
// "inner_write_returns".
//
// Assert the partition-to-table mapping in "want_writes" and assert the
// handler write() return value in "want_handler_ret".
macro_rules! test_write {
(
$name:ident,
lp = $lp:expr,
inner_write_returns = $inner_write_returns:expr,
want_writes = [$($want_writes:tt)*], // "partition key" => ["mapped", "tables"] or [unchecked] to skip assert
want_handler_ret = $($want_handler_ret:tt)+
) => {
paste::paste! {
#[tokio::test]
async fn [<test_write_ $name>]() {
use pretty_assertions::assert_eq;
let partition_template = PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
};
let inner = Arc::new(MockDmlHandler::default().with_write_return($inner_write_returns));
let partitioner = Partitioner::new(Arc::clone(&inner), partition_template);
let ns = DatabaseName::new("bananas").expect("valid db name");
let (writes, _) = mutable_batch_lp::lines_to_batches_stats($lp, DEFAULT_TIME.timestamp_nanos()).expect("failed to parse test LP");
let handler_ret = partitioner.write(ns.clone(), writes, None).await;
assert_matches!(handler_ret, $($want_handler_ret)+);
// Collect writes into a <partition_key, table_names> map.
let calls = inner.calls().into_iter().map(|v| match v {
MockDmlHandlerCall::Write { namespace, write_input, .. } => {
assert_eq!(namespace, *ns);
// Extract the table names for comparison
let mut tables = write_input
.payload
.keys()
.cloned()
.collect::<Vec<String>>();
tables.sort();
(write_input.key.clone(), tables)
},
MockDmlHandlerCall::Delete { .. } => unreachable!("mock should not observe deletes"),
})
.collect::<HashMap<String, _>>();
test_write!(@assert_writes, calls, $($want_writes)*);
}
}
};
// Generate a NOP that doesn't assert the writes if "unchecked" is
// specified.
//
// This is useful for tests that cause non-deterministic partial writes.
(@assert_writes, $got:ident, unchecked) => { let _x = $got; };
// Generate a block of code that validates tokens in the form of:
//
// key => ["table", "names"]
//
// Matches the partition key / tables names observed by the mock.
(@assert_writes, $got:ident, $($partition_key:expr => $want_tables:expr, )*) => {
// Construct the desired writes, keyed by partition key
#[allow(unused_mut)]
let mut want_writes: HashMap<String, _> = Default::default();
$(
let mut want: Vec<String> = $want_tables.into_iter().map(|t| t.to_string()).collect();
want.sort();
want_writes.insert($partition_key.to_string(), want);
)*
assert_eq!(want_writes, $got);
};
}
test_write!(
single_partition_ok,
lp = "\
bananas,tag1=A,tag2=B val=42i 1\n\
platanos,tag1=A,tag2=B value=42i 2\n\
another,tag1=A,tag2=B value=42i 3\n\
bananas,tag1=A,tag2=B val=42i 2\n\
table,tag1=A,tag2=B val=42i 1\n\
",
inner_write_returns = [Ok(())],
want_writes = [
"1970-01-01" => ["bananas", "platanos", "another", "table"],
],
want_handler_ret = Ok(())
);
test_write!(
single_partition_err,
lp = "\
bananas,tag1=A,tag2=B val=42i 1\n\
platanos,tag1=A,tag2=B value=42i 2\n\
another,tag1=A,tag2=B value=42i 3\n\
bananas,tag1=A,tag2=B val=42i 2\n\
table,tag1=A,tag2=B val=42i 1\n\
",
inner_write_returns = [Err(DmlError::DatabaseNotFound("missing".to_owned()))],
want_writes = [
// Attempted write recorded by the mock
"1970-01-01" => ["bananas", "platanos", "another", "table"],
],
want_handler_ret = Err(PartitionError::Inner(e)) => {
assert_matches!(*e, DmlError::DatabaseNotFound(_));
}
);
test_write!(
multiple_partitions_ok,
lp = "\
bananas,tag1=A,tag2=B val=42i 1\n\
platanos,tag1=A,tag2=B value=42i 1465839830100400200\n\
another,tag1=A,tag2=B value=42i 1465839830100400200\n\
bananas,tag1=A,tag2=B val=42i 2\n\
table,tag1=A,tag2=B val=42i 1644347270670952000\n\
",
inner_write_returns = [Ok(()), Ok(()), Ok(())],
want_writes = [
"1970-01-01" => ["bananas"],
"2016-06-13" => ["platanos", "another"],
"2022-02-08" => ["table"],
],
want_handler_ret = Ok(())
);
test_write!(
multiple_partitions_total_err,
lp = "\
bananas,tag1=A,tag2=B val=42i 1\n\
platanos,tag1=A,tag2=B value=42i 1465839830100400200\n\
another,tag1=A,tag2=B value=42i 1465839830100400200\n\
bananas,tag1=A,tag2=B val=42i 2\n\
table,tag1=A,tag2=B val=42i 1644347270670952000\n\
",
inner_write_returns = [
Err(DmlError::DatabaseNotFound("missing".to_owned())),
Err(DmlError::DatabaseNotFound("missing".to_owned())),
Err(DmlError::DatabaseNotFound("missing".to_owned())),
],
want_writes = [unchecked],
want_handler_ret = Err(PartitionError::Inner(e)) => {
assert_matches!(*e, DmlError::DatabaseNotFound(_));
}
);
test_write!(
multiple_partitions_partial_err,
lp = "\
bananas,tag1=A,tag2=B val=42i 1\n\
platanos,tag1=A,tag2=B value=42i 1465839830100400200\n\
another,tag1=A,tag2=B value=42i 1465839830100400200\n\
bananas,tag1=A,tag2=B val=42i 2\n\
table,tag1=A,tag2=B val=42i 1644347270670952000\n\
",
inner_write_returns = [
Err(DmlError::DatabaseNotFound("missing".to_owned())),
Ok(()),
Ok(()),
],
want_writes = [unchecked],
want_handler_ret = Err(PartitionError::Inner(e)) => {
assert_matches!(*e, DmlError::DatabaseNotFound(_));
}
);
test_write!(
no_specified_timestamp,
lp = "\
bananas,tag1=A,tag2=B val=42i\n\
platanos,tag1=A,tag2=B value=42i\n\
another,tag1=A,tag2=B value=42i\n\
bananas,tag1=A,tag2=B val=42i\n\
table,tag1=A,tag2=B val=42i\n\
",
inner_write_returns = [Ok(())],
want_writes = [
"1971-05-02" => ["bananas", "platanos", "another", "table"],
],
want_handler_ret = Ok(())
);
}

View File

@ -6,7 +6,7 @@ use data_types::{delete_predicate::DeletePredicate, DatabaseName};
use thiserror::Error;
use trace::ctx::SpanContext;
use super::{NamespaceCreationError, SchemaError, ShardError};
use super::{partitioner::PartitionError, NamespaceCreationError, SchemaError, ShardError};
/// Errors emitted by a [`DmlHandler`] implementation during DML request
/// processing.
@ -28,6 +28,10 @@ pub enum DmlError {
#[error(transparent)]
NamespaceCreation(#[from] NamespaceCreationError),
/// An error partitioning the request.
#[error(transparent)]
Partition(#[from] PartitionError),
/// An unknown error occured while processing the DML request.
#[error("internal dml handler error: {0}")]
Internal(Box<dyn Error + Send + Sync>),

View File

@ -16,7 +16,7 @@ use thiserror::Error;
use time::{SystemProvider, TimeProvider};
use trace::ctx::SpanContext;
use crate::dml_handlers::{DmlError, DmlHandler};
use crate::dml_handlers::{DmlError, DmlHandler, PartitionError};
/// Errors returned by the `router2` HTTP request handler.
#[derive(Debug, Error)]
@ -71,9 +71,7 @@ impl Error {
/// the end user.
pub fn as_status_code(&self) -> StatusCode {
match self {
Error::NoHandler | Error::DmlHandler(DmlError::DatabaseNotFound(_)) => {
StatusCode::NOT_FOUND
}
Error::NoHandler => StatusCode::NOT_FOUND,
Error::InvalidOrgBucket(_) => StatusCode::BAD_REQUEST,
Error::ClientHangup(_) => StatusCode::BAD_REQUEST,
Error::InvalidGzip(_) => StatusCode::BAD_REQUEST,
@ -87,9 +85,21 @@ impl Error {
// https://www.rfc-editor.org/rfc/rfc7231#section-6.5.13
StatusCode::UNSUPPORTED_MEDIA_TYPE
}
Error::DmlHandler(
DmlError::Internal(_) | DmlError::WriteBuffer(_) | DmlError::NamespaceCreation(_),
) => StatusCode::INTERNAL_SERVER_ERROR,
Error::DmlHandler(err) => StatusCode::from(err),
}
}
}
impl From<&DmlError> for StatusCode {
fn from(e: &DmlError) -> Self {
match e {
DmlError::DatabaseNotFound(_) => StatusCode::NOT_FOUND,
DmlError::Schema(_) => StatusCode::BAD_REQUEST,
DmlError::Internal(_) | DmlError::WriteBuffer(_) | DmlError::NamespaceCreation(_) => {
StatusCode::INTERNAL_SERVER_ERROR
}
DmlError::Partition(PartitionError::BatchWrite(_)) => StatusCode::INTERNAL_SERVER_ERROR,
DmlError::Partition(PartitionError::Inner(err)) => StatusCode::from(&**err),
}
}
}