feat(sharder): round-robin sharder

A "sharder" that simply round-robins requests over the set of T within
it.

This uses thread-local counters to eliminate write contention of the
"index" variable, so that throughput scales linearly with cores. (As
opposed to an atomic which would cause the variable to ping-pong between
core caches with a 6000% reduction in throughput, or slower yet, a
mutex-wrapped counter.)
pull/24376/head
Dom Dwyer 2022-11-14 15:23:21 +01:00
parent bf1681f4fe
commit a4e54f7a0c
No known key found for this signature in database
GPG Key ID: E4C40DBD9157879A
3 changed files with 135 additions and 22 deletions

View File

@ -6,7 +6,7 @@ use criterion::{
use data_types::NamespaceName; use data_types::NamespaceName;
use mutable_batch::MutableBatch; use mutable_batch::MutableBatch;
use rand::{distributions::Alphanumeric, thread_rng, Rng}; use rand::{distributions::Alphanumeric, thread_rng, Rng};
use sharder::{JumpHash, Sharder}; use sharder::{JumpHash, RoundRobin, Sharder};
fn get_random_string(length: usize) -> String { fn get_random_string(length: usize) -> String {
thread_rng() thread_rng()
@ -16,83 +16,98 @@ fn get_random_string(length: usize) -> String {
.collect() .collect()
} }
fn sharder_benchmarks(c: &mut Criterion) { fn sharder_benchmarks(mut c: &mut Criterion) {
let mut group = c.benchmark_group("sharder"); benchmark_impl(&mut c, "jumphash", |num_buckets| {
JumpHash::new((0..num_buckets).map(Arc::new))
});
benchmark_impl(&mut c, "round_robin", |num_buckets| {
RoundRobin::new((0..num_buckets).map(Arc::new))
});
}
fn benchmark_impl<T, F>(c: &mut Criterion, name: &str, init: F)
where
T: Sharder<MutableBatch>,
F: Fn(usize) -> T,
{
let mut group = c.benchmark_group(name);
// benchmark sharder with fixed table name and namespace, with varying number of buckets // benchmark sharder with fixed table name and namespace, with varying number of buckets
benchmark_sharder( benchmark_scenario(
&mut group, &mut group,
1_000,
"basic 1k buckets", "basic 1k buckets",
"table", "table",
&NamespaceName::try_from("namespace").unwrap(), &NamespaceName::try_from("namespace").unwrap(),
init(1_000),
); );
benchmark_sharder( benchmark_scenario(
&mut group, &mut group,
10_000,
"basic 10k buckets", "basic 10k buckets",
"table", "table",
&NamespaceName::try_from("namespace").unwrap(), &NamespaceName::try_from("namespace").unwrap(),
init(10_000),
); );
benchmark_sharder( benchmark_scenario(
&mut group, &mut group,
100_000,
"basic 100k buckets", "basic 100k buckets",
"table", "table",
&NamespaceName::try_from("namespace").unwrap(), &NamespaceName::try_from("namespace").unwrap(),
init(100_000),
); );
benchmark_sharder( benchmark_scenario(
&mut group, &mut group,
1_000_000,
"basic 1M buckets", "basic 1M buckets",
"table", "table",
&NamespaceName::try_from("namespace").unwrap(), &NamespaceName::try_from("namespace").unwrap(),
init(1_000_000),
); );
// benchmark sharder with random table name and namespace of length 16 // benchmark sharder with random table name and namespace of length 16
benchmark_sharder( benchmark_scenario(
&mut group, &mut group,
10_000,
"random with key-length 16", "random with key-length 16",
get_random_string(16).as_str(), get_random_string(16).as_str(),
&NamespaceName::try_from(get_random_string(16)).unwrap(), &NamespaceName::try_from(get_random_string(16)).unwrap(),
init(10_000),
); );
// benchmark sharder with random table name and namespace of length 32 // benchmark sharder with random table name and namespace of length 32
benchmark_sharder( benchmark_scenario(
&mut group, &mut group,
10_000,
"random with key-length 32", "random with key-length 32",
get_random_string(32).as_str(), get_random_string(32).as_str(),
&NamespaceName::try_from(get_random_string(32)).unwrap(), &NamespaceName::try_from(get_random_string(32)).unwrap(),
init(10_000),
); );
// benchmark sharder with random table name and namespace of length 64 // benchmark sharder with random table name and namespace of length 64
benchmark_sharder( benchmark_scenario(
&mut group, &mut group,
10_000,
"random with key-length 64", "random with key-length 64",
get_random_string(64).as_str(), get_random_string(64).as_str(),
&NamespaceName::try_from(get_random_string(64)).unwrap(), &NamespaceName::try_from(get_random_string(64)).unwrap(),
init(10_000),
); );
group.finish(); group.finish();
} }
fn benchmark_sharder( fn benchmark_scenario<T>(
group: &mut BenchmarkGroup<WallTime>, group: &mut BenchmarkGroup<WallTime>,
num_buckets: usize,
bench_name: &str, bench_name: &str,
table: &str, table: &str,
namespace: &NamespaceName<'_>, namespace: &NamespaceName<'_>,
) { sharder: T,
let hasher = JumpHash::new((0..num_buckets).map(Arc::new)); ) where
T: Sharder<MutableBatch>,
{
let batch = MutableBatch::default(); let batch = MutableBatch::default();
group.throughput(Throughput::Elements(1)); group.throughput(Throughput::Elements(1));
group.bench_function(bench_name, |b| { group.bench_function(bench_name, |b| {
b.iter(|| { b.iter(|| {
hasher.shard(table, namespace, &batch); sharder.shard(table, namespace, &batch);
}); });
}); });
} }

View File

@ -20,6 +20,9 @@
mod r#trait; mod r#trait;
pub use r#trait::*; pub use r#trait::*;
mod round_robin;
pub use round_robin::*;
mod jumphash; mod jumphash;
pub use jumphash::*; pub use jumphash::*;

View File

@ -0,0 +1,95 @@
use std::{cell::RefCell, fmt::Debug, sync::Arc};
use crate::Sharder;
thread_local! {
/// A per-thread counter incremented once per call to
/// [`RoundRobin::next()`].
static COUNTER: RefCell<usize> = RefCell::new(0);
}
/// A round-robin sharder (with no data locality) that arbitrarily maps to `T`
/// with an approximately uniform distribution.
///
/// # Distribution
///
/// Requests are distributed uniformly across all shards **per thread**. Given
/// enough requests (where `N` is significantly larger than the number of
/// threads) an approximately uniform distribution is achieved.
#[derive(Debug)]
pub struct RoundRobin<T> {
shards: Vec<T>,
}
impl<T> RoundRobin<T> {
/// Construct a new [`RoundRobin`] sharder that maps requests to each of
/// `shards`.
pub fn new(shards: impl IntoIterator<Item = T>) -> Self {
Self {
shards: shards.into_iter().collect(),
}
}
/// Return the next `T` to be used.
pub fn next(&self) -> &T {
// Grab and increment the current counter.
let counter = COUNTER.with(|cell| {
let mut cell = cell.borrow_mut();
let new_value = cell.wrapping_add(1);
*cell = new_value;
new_value
});
// Reduce it to the range of [0, N) where N is the number of shards in
// this sharder.
let idx = counter % self.shards.len();
self.shards.get(idx).expect("mapped to out-of-bounds shard")
}
}
impl<T, U> Sharder<U> for RoundRobin<Arc<T>>
where
T: Send + Sync + Debug,
U: Send + Sync + Debug,
{
type Item = Arc<T>;
fn shard(
&self,
_table: &str,
_namespace: &data_types::NamespaceName<'_>,
_payload: &U,
) -> Self::Item {
Arc::clone(self.next())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
// Note this is a property test that asserts the round-robin nature of the
// returned results, not the values themselves.
#[test]
fn test_round_robin() {
// Initialise sharder with a set of 5 shards
let shards = ["s1", "s2", "s3", "s4", "s5"];
let sharder = RoundRobin::new(shards.iter().map(Arc::new));
// Request the first N mappings.
#[allow(clippy::needless_collect)] // Incorrect lint
let mappings = (0..shards.len())
.map(|_| sharder.next())
.collect::<Vec<_>>();
// Request another 100 shard mappings, and ensure the shards are
// yielded in round-robin fashion (matching the initial shard
// mappings)
for want in mappings.into_iter().cycle().take(100) {
assert_eq!(want, sharder.next());
}
}
}