Merge pull request #4491 from influxdata/dom/delete-shard-no-table

feat: shard delete ops with no table predicate
pull/24376/head
kodiakhq[bot] 2022-05-09 13:53:00 +00:00 committed by GitHub
commit a96fd1cde2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 377 additions and 61 deletions

View File

@ -1,7 +1,10 @@
use std::sync::Arc;
use criterion::{
criterion_group, criterion_main, measurement::WallTime, BenchmarkGroup, Criterion, Throughput,
};
use data_types::DatabaseName;
use mutable_batch::MutableBatch;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use router::sharder::{JumpHash, Sharder};
@ -83,12 +86,13 @@ fn benchmark_sharder(
table: &str,
namespace: &DatabaseName<'_>,
) {
let hasher = JumpHash::new(0..num_buckets);
let hasher = JumpHash::new((0..num_buckets).map(Arc::new));
let batch = MutableBatch::default();
group.throughput(Throughput::Elements(1));
group.bench_function(bench_name, |b| {
b.iter(|| {
hasher.shard(table, namespace, &0);
hasher.shard(table, namespace, &batch);
});
});
}

View File

@ -56,6 +56,12 @@ where
/// The buffering / async return behaviour of the methods on this type are
/// defined by the behaviour of the underlying [write buffer] implementation.
///
/// Operations that require writing to multiple shards may experience partial
/// failures - the op may be successfully written to one shard, while failing
/// to be written to another. Users are expected to retry the partially failed
/// operation to converge the system. The order of writes across multiple shards
/// is non-deterministic.
///
/// [write buffer]: write_buffer::core::WriteBufferWriting
#[derive(Debug)]
pub struct ShardedWriteBuffer<S> {
@ -74,7 +80,7 @@ impl<S> ShardedWriteBuffer<S> {
impl<S> DmlHandler for ShardedWriteBuffer<S>
where
S: Sharder<MutableBatch, Item = Arc<Sequencer>>
+ Sharder<DeletePredicate, Item = Arc<Sequencer>>,
+ Sharder<DeletePredicate, Item = Vec<Arc<Sequencer>>>,
{
type WriteError = ShardError;
type DeleteError = ShardError;
@ -98,7 +104,7 @@ where
// per shard to maximise the size of each write, and therefore increase
// the effectiveness of compression of ops in the write buffer.
for (table, batch) in writes.into_iter() {
let sequencer = Arc::clone(self.sharder.shard(&table, namespace, &batch));
let sequencer = self.sharder.shard(&table, namespace, &batch);
let existing = collated
.entry(sequencer)
@ -135,9 +141,7 @@ where
span_ctx: Option<SpanContext>,
) -> Result<(), ShardError> {
let predicate = predicate.clone();
let sequencer = self.sharder.shard(table_name, namespace, &predicate);
trace!(sequencer_id=%sequencer.id(), %table_name, %namespace, "routing delete to shard");
let sequencers = self.sharder.shard(table_name, namespace, &predicate);
let dml = DmlDelete::new(
namespace,
@ -146,13 +150,14 @@ where
DmlMeta::unsequenced(span_ctx),
);
sequencer
.enqueue(DmlOperation::from(dml))
.await
.map_err(|e| ShardError::WriteBufferErrors {
successes: 0,
errs: vec![e],
})?;
let iter = sequencers.into_iter().map(|s| {
trace!(sequencer_id=%s.id(), %table_name, %namespace, "routing delete to shard");
(s, DmlOperation::from(dml.clone()))
});
// TODO: return sequencer metadata
parallel_enqueue(iter).await?;
Ok(())
}
@ -162,7 +167,7 @@ where
/// the [`DmlOperation`] to its paired [`Sequencer`], executes all the futures
/// in parallel and gathers any errors.
///
/// Returns a list of the sequences that were written
/// Returns a list of the sequences that were written.
async fn parallel_enqueue<T>(v: T) -> Result<Vec<DmlMeta>, ShardError>
where
T: Iterator<Item = (Arc<Sequencer>, DmlOperation)> + Send,
@ -200,7 +205,7 @@ mod tests {
use super::*;
use crate::{
dml_handlers::DmlHandler,
sharder::mock::{MockSharder, MockSharderCall},
sharder::mock::{MockSharder, MockSharderCall, MockSharderPayload},
};
use assert_matches::assert_matches;
use data_types::TimestampRange;
@ -464,4 +469,187 @@ mod tests {
assert_eq!(*d.predicate(), predicate);
});
}
/// A delete issued with an empty table name must be passed to the sharder
/// with that empty name, and must reach the write buffer as a delete op
/// whose table name is `None`.
#[tokio::test]
async fn test_shard_delete_no_table() {
let write_buffer = init_write_buffer(1);
// Grab the state handle before the write buffer is moved into the
// Sequencer below.
let write_buffer_state = write_buffer.state();
let predicate = DeletePredicate {
range: TimestampRange::new(1, 2),
exprs: vec![],
};
// Configure the sharder to return a shard containing the mock write
// buffer.
let shard = Arc::new(Sequencer::new(
0,
Arc::new(write_buffer),
&Default::default(),
));
let sharder = Arc::new(MockSharder::default().with_return([Arc::clone(&shard)]));
let w = ShardedWriteBuffer::new(Arc::clone(&sharder));
// Call the ShardedWriteBuffer and drive the test, passing an empty table
// name.
let ns = DatabaseName::new("namespace").unwrap();
w.delete(&ns, "", &predicate, None)
.await
.expect("delete failed");
// Assert the sharder was called exactly once, with an empty table name
// and a delete predicate payload.
let calls = sharder.calls();
assert_matches!(calls.as_slice(), [MockSharderCall{table_name, payload, ..}] => {
assert_eq!(table_name, "");
assert_matches!(payload, MockSharderPayload::DeletePredicate(..));
});
// The shard's write buffer should observe exactly one op: the delete,
// carrying the original predicate.
//
// The table name should be None as it was specified as an empty string.
let mut got = write_buffer_state.get_messages(shard.id() as _);
assert_eq!(got.len(), 1);
let got = got
.pop()
.unwrap()
.expect("write should have been successful");
assert_matches!(got, DmlOperation::Delete(d) => {
assert_eq!(d.table_name(), None);
assert_eq!(*d.predicate(), predicate);
});
}
/// A sharder for tests that maps every delete to the full set of sequencers
/// it was constructed with, ignoring the table, namespace and predicate.
///
/// Sharding a write with this type panics.
#[derive(Debug)]
struct MultiDeleteSharder(Vec<Arc<Sequencer>>);

impl Sharder<DeletePredicate> for MultiDeleteSharder {
    type Item = Vec<Arc<Sequencer>>;

    /// Return a copy of the configured sequencer list for any delete.
    fn shard(
        &self,
        _: &str,
        _: &DatabaseName<'_>,
        _: &DeletePredicate,
    ) -> Vec<Arc<Sequencer>> {
        self.0.iter().cloned().collect()
    }
}

impl Sharder<MutableBatch> for MultiDeleteSharder {
    type Item = Arc<Sequencer>;

    /// Writes are never expected to be routed through this sharder.
    fn shard(&self, _: &str, _: &DatabaseName<'_>, _: &MutableBatch) -> Arc<Sequencer> {
        unreachable!()
    }
}
/// A delete that the sharder maps to two shards must be observed as a
/// delete op by the write buffer of each shard.
#[tokio::test]
async fn test_shard_delete_multiple_shards() {
    const TABLE: &str = "bananas";

    let predicate = DeletePredicate {
        range: TimestampRange::new(1, 2),
        exprs: vec![],
    };

    // Back each shard with its own mock write buffer, keeping a handle to
    // the buffer state so the ops it receives can be inspected later.
    let buffer_a = init_write_buffer(1);
    let state_a = buffer_a.state();
    let shard_a = Arc::new(Sequencer::new(0, Arc::new(buffer_a), &Default::default()));

    let buffer_b = init_write_buffer(1);
    let state_b = buffer_b.state();
    let shard_b = Arc::new(Sequencer::new(0, Arc::new(buffer_b), &Default::default()));

    let handler = ShardedWriteBuffer::new(MultiDeleteSharder(vec![
        Arc::clone(&shard_a),
        Arc::clone(&shard_b),
    ]));

    // Call the ShardedWriteBuffer and drive the test
    let ns = DatabaseName::new("namespace").unwrap();
    handler
        .delete(&ns, TABLE, &predicate, None)
        .await
        .expect("delete failed");

    // Both write buffers, first then second, should hold exactly one
    // successfully written delete op.
    for (state, shard) in [(state_a, shard_a), (state_b, shard_b)] {
        let mut messages = state.get_messages(shard.id() as _);
        assert_eq!(messages.len(), 1);
        let op = messages
            .pop()
            .unwrap()
            .expect("write should have been successful");
        assert_matches!(op, DmlOperation::Delete(_));
    }
}
/// When a delete is mapped to two shards and one of them fails to accept
/// it, the call must return an error reporting one success and one failure,
/// while the healthy shard still receives the op.
#[tokio::test]
async fn test_shard_delete_multiple_shards_partial_failure() {
    const TABLE: &str = "bananas";

    let predicate = DeletePredicate {
        range: TimestampRange::new(1, 2),
        exprs: vec![],
    };

    // The first shard writes to a working write buffer.
    let healthy_buffer = init_write_buffer(1);
    let healthy_state = healthy_buffer.state();
    let healthy_shard = Arc::new(Sequencer::new(
        0,
        Arc::new(healthy_buffer),
        &Default::default(),
    ));

    // The second shard uses a non-existent sequencer ID to trigger an error
    // on every write.
    let failing_buffer = init_write_buffer(1);
    let failing_shard = Arc::new(Sequencer::new(
        13,
        Arc::new(failing_buffer),
        &Default::default(),
    ));

    let targets = vec![Arc::clone(&healthy_shard), Arc::clone(&failing_shard)];
    let handler = ShardedWriteBuffer::new(MultiDeleteSharder(targets));

    // Call the ShardedWriteBuffer and drive the test
    let ns = DatabaseName::new("namespace").unwrap();
    let outcome = handler.delete(&ns, TABLE, &predicate, None).await;
    let err = outcome.expect_err("delete should fail");
    assert_matches!(err, ShardError::WriteBufferErrors { successes, errs } => {
        assert_eq!(errs.len(), 1);
        assert_eq!(successes, 1);
    });

    // The write buffer for the healthy shard will still observe the delete.
    let delivered = healthy_state.get_messages(healthy_shard.id() as _);
    assert_eq!(delivered.len(), 1);
}
}

View File

@ -1,9 +1,11 @@
use super::Sharder;
use data_types::DatabaseName;
use data_types::{DatabaseName, DeletePredicate};
use mutable_batch::MutableBatch;
use siphasher::sip::SipHasher13;
use std::{
fmt::Debug,
hash::{Hash, Hasher},
sync::Arc,
};
/// A [`JumpHash`] maps operations for a given table in a given namespace
@ -58,6 +60,11 @@ impl<T> JumpHash<T> {
shards,
}
}
/// Return a slice of all the shards this instance is configured with.
pub fn shards(&self) -> &[T] {
&self.shards
}
}
impl<T> JumpHash<T> {
@ -107,27 +114,62 @@ struct HashKey<'a> {
namespace: &'a str,
}
/// A [`JumpHash`] sharder implementation is generic over `P`, the payload type,
/// enabling it to map any type of payload to a shard as it only considers the
/// table name and namespace when making a sharding decision.
impl<T, P> Sharder<P> for JumpHash<T>
/// A [`JumpHash`] sharder mapping a [`MutableBatch`] reference according to
/// the table & namespace it is destined for.
impl<T> Sharder<MutableBatch> for JumpHash<Arc<T>>
where
T: Debug + Send + Sync,
{
type Item = T;
type Item = Arc<T>;
fn shard(&self, table: &str, namespace: &DatabaseName<'_>, _payload: &P) -> &Self::Item {
fn shard(
&self,
table: &str,
namespace: &DatabaseName<'_>,
_payload: &MutableBatch,
) -> Self::Item {
// The derived hash impl for HashKey is hardened against prefix
// collisions when combining the two fields.
self.hash(&HashKey {
Arc::clone(self.hash(&HashKey {
table,
namespace: namespace.as_ref(),
})
}))
}
}
/// [`Sharder`] implementation routing a [`DeletePredicate`] with a
/// [`JumpHash`].
///
/// When the table name is empty the delete is mapped to every configured
/// shard. Otherwise it is mapped to the single shard selected by hashing the
/// table & namespace, the same key used when sharding a write.
impl<T> Sharder<DeletePredicate> for JumpHash<Arc<T>>
where
    T: Debug + Send + Sync,
{
    type Item = Vec<Arc<T>>;

    fn shard(
        &self,
        table: &str,
        namespace: &DatabaseName<'_>,
        _payload: &DeletePredicate,
    ) -> Self::Item {
        if table.is_empty() {
            // No table specified: fan the delete out to all shards.
            self.shards.iter().cloned().collect()
        } else {
            // A table was specified: pick the shard responsible for this
            // (namespace, table) tuple.
            let key = HashKey {
                table,
                namespace: namespace.as_ref(),
            };
            vec![Arc::clone(self.hash(&key))]
        }
    }
}
#[cfg(test)]
mod tests {
use data_types::TimestampRange;
use hashbrown::HashMap;
use super::*;
@ -174,27 +216,62 @@ mod tests {
#[test]
fn test_sharder_impl() {
let hasher = JumpHash::new(0..10_000);
let hasher = JumpHash::new((0..10_000).map(Arc::new));
let a = hasher.shard("table", &DatabaseName::try_from("namespace").unwrap(), &0);
let b = hasher.shard("table", &DatabaseName::try_from("namespace2").unwrap(), &0);
let a = hasher.shard(
"table",
&DatabaseName::try_from("namespace").unwrap(),
&MutableBatch::default(),
);
let b = hasher.shard(
"table",
&DatabaseName::try_from("namespace2").unwrap(),
&MutableBatch::default(),
);
assert_ne!(a, b);
let a = hasher.shard("table", &DatabaseName::try_from("namespace").unwrap(), &0);
let b = hasher.shard("table2", &DatabaseName::try_from("namespace").unwrap(), &0);
let a = hasher.shard(
"table",
&DatabaseName::try_from("namespace").unwrap(),
&MutableBatch::default(),
);
let b = hasher.shard(
"table2",
&DatabaseName::try_from("namespace").unwrap(),
&MutableBatch::default(),
);
assert_ne!(a, b);
let mut batches = mutable_batch_lp::lines_to_batches("cpu a=1i", 42).unwrap();
let batch = batches.remove("cpu").unwrap();
// Assert payloads are ignored for this sharder
let a = hasher.shard("table", &DatabaseName::try_from("namespace").unwrap(), &0);
let b = hasher.shard("table", &DatabaseName::try_from("namespace").unwrap(), &42);
let a = hasher.shard(
"table",
&DatabaseName::try_from("namespace").unwrap(),
&MutableBatch::default(),
);
let b = hasher.shard(
"table",
&DatabaseName::try_from("namespace").unwrap(),
&batch,
);
assert_eq!(a, b);
}
#[test]
fn test_sharder_prefix_collision() {
let hasher = JumpHash::new(0..10_000);
let a = hasher.shard("a", &DatabaseName::try_from("bc").unwrap(), &0);
let b = hasher.shard("ab", &DatabaseName::try_from("c").unwrap(), &0);
let hasher = JumpHash::new((0..10_000).map(Arc::new));
let a = hasher.shard(
"a",
&DatabaseName::try_from("bc").unwrap(),
&MutableBatch::default(),
);
let b = hasher.shard(
"ab",
&DatabaseName::try_from("c").unwrap(),
&MutableBatch::default(),
);
assert_ne!(a, b);
}
@ -212,23 +289,36 @@ mod tests {
// strategy would that accounts for this mapping change.
#[test]
fn test_key_bucket_fixture() {
let hasher = JumpHash::new(0..1000);
let hasher = JumpHash::new((0..1_000).map(Arc::new));
let namespace = DatabaseName::try_from("bananas").unwrap();
assert_eq!(*hasher.shard("42", &namespace, &0), 904);
assert_eq!(*hasher.shard("4242", &namespace, &1), 230);
assert_eq!(*hasher.shard("bananas", &namespace, &2), 183);
let mut batches = mutable_batch_lp::lines_to_batches("cpu a=1i", 42).unwrap();
let batch = batches.remove("cpu").unwrap();
assert_eq!(
*hasher.shard("42", &namespace, &MutableBatch::default()),
904
);
assert_eq!(
*hasher.shard("4242", &namespace, &MutableBatch::default()),
230
);
assert_eq!(*hasher.shard("bananas", &namespace, &batch), 183);
}
#[test]
fn test_distribution() {
let hasher = JumpHash::new(0..100);
let hasher = JumpHash::new((0..100).map(Arc::new));
let namespace = DatabaseName::try_from("bananas").unwrap();
let mut mapping = HashMap::<_, usize>::new();
for i in 0..10_000_000 {
let bucket = hasher.shard(format!("{}", i).as_str(), &namespace, &0);
let bucket = hasher.shard(
format!("{}", i).as_str(),
&namespace,
&MutableBatch::default(),
);
*mapping.entry(bucket).or_default() += 1;
}
@ -241,4 +331,46 @@ mod tests {
// of the total 10M values
assert!(max - min < 5000, "min: {}, max: {}", min, max);
}
/// A delete that names a table must map to exactly one shard, and that
/// shard must be the one a write to the same table & namespace maps to.
#[test]
fn test_delete_with_table() {
    let namespace = DatabaseName::try_from("bananas").unwrap();
    let hasher = JumpHash::new((0..10_000).map(Arc::new));

    let predicate = DeletePredicate {
        range: TimestampRange::new(1, 2),
        exprs: vec![],
    };
    let batch = MutableBatch::default();

    // Use the decimal strings "0" through "99" as table names.
    for table in (0..100_usize).map(|i| i.to_string()) {
        // A delete with a table should map to exactly one shard.
        let mut delete_shards = hasher.shard(table.as_str(), &namespace, &predicate);
        assert_eq!(delete_shards.len(), 1);
        let delete_shard = delete_shards.pop().unwrap();

        // And a write to the same table & namespace MUST map to the same shard.
        let write_shard = hasher.shard(table.as_str(), &namespace, &batch);
        assert_eq!(delete_shard, write_shard);
    }
}
/// A delete with an empty table name must be mapped to every shard the
/// hasher was configured with, in configuration order.
#[test]
fn test_delete_no_table_shards_to_all() {
    let predicate = DeletePredicate {
        range: TimestampRange::new(1, 2),
        exprs: vec![],
    };
    let namespace = DatabaseName::try_from("bananas").unwrap();

    let all_shards: Vec<_> = (0..10_000).map(Arc::new).collect();
    let hasher = JumpHash::new(all_shards.clone());

    let routed = hasher.shard("", &namespace, &predicate);
    assert_eq!(routed, all_shards);
}
}

View File

@ -53,10 +53,6 @@ impl<T> Default for MockSharder<T> {
impl<T> MockSharder<T> {
/// Return the values specified in `ret` in sequence for calls to `shard`,
/// starting from the front.
///
/// # Memory Leak
///
/// Each call to `shard` leaks the memory of the `T` it returns.
pub fn with_return(self, ret: impl Into<VecDeque<T>>) -> Self {
self.0.lock().shard_return = ret.into();
self
@ -78,19 +74,17 @@ where
table: &str,
namespace: &DatabaseName<'_>,
payload: &MutableBatch,
) -> &Self::Item {
) -> Self::Item {
let mut guard = self.0.lock();
guard.record_call(MockSharderCall {
table_name: table.to_string(),
namespace: namespace.to_string(),
payload: MockSharderPayload::MutableBatch(payload.clone()),
});
Box::leak(Box::new(
guard
.shard_return
.pop_front()
.expect("no shard mock value to return"),
))
guard
.shard_return
.pop_front()
.expect("no shard mock value to return")
}
}
@ -98,25 +92,23 @@ impl<T> Sharder<DeletePredicate> for Arc<MockSharder<T>>
where
T: Debug + Send + Sync,
{
type Item = T;
type Item = Vec<T>;
fn shard(
&self,
table: &str,
namespace: &DatabaseName<'_>,
payload: &DeletePredicate,
) -> &Self::Item {
) -> Self::Item {
let mut guard = self.0.lock();
guard.record_call(MockSharderCall {
table_name: table.to_string(),
namespace: namespace.to_string(),
payload: MockSharderPayload::DeletePredicate(payload.clone()),
});
Box::leak(Box::new(
guard
.shard_return
.pop_front()
.expect("no shard mock value to return"),
))
vec![guard
.shard_return
.pop_front()
.expect("no shard mock value to return")]
}
}

View File

@ -19,5 +19,5 @@ pub trait Sharder<P>: Debug + Send + Sync {
type Item: Debug + Send + Sync;
/// Map the specified `payload` to a shard.
fn shard(&self, table: &str, namespace: &DatabaseName<'_>, payload: &P) -> &Self::Item;
fn shard(&self, table: &str, namespace: &DatabaseName<'_>, payload: &P) -> Self::Item;
}