fix: Change the sharder to return error instead of panicking for no shards

pull/24376/head
Carol (Nichols || Goulding) 2022-06-15 11:09:57 -04:00
parent e9cdaffe74
commit 03f6f59a9b
12 changed files with 110 additions and 77 deletions
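
For context before the per-file diffs: `JumpHash::new` (and the constructors layered on it, such as `ShardedCache::new` and the querier's `create_sharder`) now returns `Result<Self, Error>` instead of panicking when given zero shards, and every call site handles the failure with `?`, `.unwrap()`, or `.expect(...)`. A minimal standalone sketch of the new shape (the `Error::NoShards` name and the `new` signature mirror this commit; the struct body and `main` are illustrative only, not the crate's hashing internals):

```rust
#[derive(Debug)]
pub enum Error {
    // Mirrors sharder::Error::NoShards introduced in this commit.
    NoShards,
}

pub struct JumpHash<T> {
    shards: Vec<T>,
}

impl<T> JumpHash<T> {
    // Before this commit the constructor asserted `!shards.is_empty()`
    // and panicked; now the empty case surfaces as a recoverable error.
    pub fn new(shards: impl IntoIterator<Item = T>) -> Result<Self, Error> {
        let shards: Vec<T> = shards.into_iter().collect();
        if shards.is_empty() {
            return Err(Error::NoShards);
        }
        Ok(Self { shards })
    }
}

fn main() {
    // Call sites pick their own failure policy, as the diffs below do:
    // `?` in production paths, `.unwrap()`/`.expect(...)` in tests/benches.
    let hasher = JumpHash::new(0..10).unwrap();
    assert_eq!(hasher.shards.len(), 10);
    assert!(matches!(
        JumpHash::<i32>::new(std::iter::empty()),
        Err(Error::NoShards)
    ));
}
```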

Cargo.lock (generated)

@@ -4945,6 +4945,8 @@ dependencies = [
  "parking_lot 0.12.1",
  "rand",
  "siphasher",
+ "snafu",
+ "test_helpers",
  "workspace-hack",
 ]


@@ -63,7 +63,7 @@ mod tests {
     async fn test_get_namespaces_empty() {
         let catalog = TestCatalog::new();
 
-        // QuerierDatabase::new panics if there are no sequencers in the catalog
+        // QuerierDatabase::new returns an error if there are no sequencers in the catalog
         catalog.create_sequencer(0).await;
 
         let catalog_cache = Arc::new(QuerierCatalogCache::new(
@@ -98,7 +98,7 @@
     async fn test_get_namespaces() {
         let catalog = TestCatalog::new();
 
-        // QuerierDatabase::new panics if there are no sequencers in the catalog
+        // QuerierDatabase::new returns an error if there are no sequencers in the catalog
         catalog.create_sequencer(0).await;
 
         let catalog_cache = Arc::new(QuerierCatalogCache::new(


@@ -49,6 +49,9 @@ pub enum Error {
     #[error("Catalog DSN error: {0}")]
     CatalogDsn(#[from] clap_blocks::catalog_dsn::Error),
+
+    #[error("failed to initialize sharded cache: {0}")]
+    Sharder(#[from] sharder::Error),
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -175,7 +178,7 @@ pub async fn create_router_server_type(
     let ns_cache = Arc::new(InstrumentedCache::new(
         Arc::new(ShardedCache::new(
             std::iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
-        )),
+        )?),
         &*metrics,
     ));
@@ -319,13 +322,12 @@ async fn init_write_buffer(
         "connected to write buffer topic",
     );
 
-    Ok(ShardedWriteBuffer::new(
+    Ok(ShardedWriteBuffer::new(JumpHash::new(
         shards
             .into_iter()
             .map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer), &metrics))
-            .map(Arc::new)
-            .collect::<JumpHash<_>>(),
-    ))
+            .map(Arc::new),
+    )?))
 }
 
 /// Pre-populate `cache` with the all existing schemas in `catalog`.
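
Note the two error-wrapping styles this commit uses: the router server-type file above relies on thiserror-style derives, where `#[from]` generates a `From<sharder::Error>` impl so a plain `?` converts the error, while the querier file below uses snafu and needs an explicit `.context(...)`. A self-contained sketch of the `#[from]` route, assuming the `thiserror` crate as the `#[error(...)]` attributes suggest (`ShardError` is a stand-in for `sharder::Error`):

```rust
use thiserror::Error;

// Stand-in for sharder::Error.
#[derive(Debug, Error)]
pub enum ShardError {
    #[error("Cannot initialize sharder with no shards")]
    NoShards,
}

// Mirrors the Error::Sharder variant added above: `#[from]` derives
// `From<ShardError> for Error`, which is what lets `?` convert.
#[derive(Debug, Error)]
pub enum Error {
    #[error("failed to initialize sharded cache: {0}")]
    Sharder(#[from] ShardError),
}

fn new_cache(shards: usize) -> Result<Vec<usize>, ShardError> {
    if shards == 0 {
        return Err(ShardError::NoShards);
    }
    Ok((0..shards).collect())
}

fn create_router(shards: usize) -> Result<Vec<usize>, Error> {
    // Plain `?` suffices: From<ShardError> is in scope via #[from].
    Ok(new_cache(shards)?)
}

fn main() {
    assert!(create_router(10).is_ok());
    assert!(matches!(
        create_router(0),
        Err(Error::Sharder(ShardError::NoShards))
    ));
}
```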


@@ -30,6 +30,9 @@ pub enum Error {
     Catalog {
         source: iox_catalog::interface::Error,
     },
+
+    #[snafu(display("Sharder error: {source}"))]
+    Sharder { source: sharder::Error },
 }
 
 /// Database for the querier.
@@ -202,16 +205,15 @@ pub async fn create_sharder(catalog: &dyn Catalog) -> Result<JumpHash<Arc<KafkaPartition>>> {
         .map(|sequencer| sequencer.kafka_partition)
         .collect();
 
-    Ok(shards.into_iter().map(Arc::new).collect())
+    JumpHash::new(shards.into_iter().map(Arc::new)).context(SharderSnafu)
 }
 
 #[cfg(test)]
 mod tests {
-    use iox_tests::util::TestCatalog;
-
-    use crate::create_ingester_connection_for_testing;
-
     use super::*;
+    use crate::create_ingester_connection_for_testing;
+    use iox_tests::util::TestCatalog;
+    use test_helpers::assert_error;
 
     #[tokio::test]
     #[should_panic(
@@ -239,7 +241,6 @@ mod tests {
     }
 
     #[tokio::test]
-    #[should_panic(expected = "cannot initialise sharder with no shards")]
     async fn sequencers_in_catalog_are_required_for_startup() {
         let catalog = TestCatalog::new();
 
@@ -250,22 +251,26 @@
             usize::MAX,
         ));
 
-        QuerierDatabase::new(
-            catalog_cache,
-            catalog.metric_registry(),
-            ParquetStorage::new(catalog.object_store()),
-            catalog.exec(),
-            create_ingester_connection_for_testing(),
-            QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
-        )
-        .await
-        .unwrap();
+        assert_error!(
+            QuerierDatabase::new(
+                catalog_cache,
+                catalog.metric_registry(),
+                ParquetStorage::new(catalog.object_store()),
+                catalog.exec(),
+                create_ingester_connection_for_testing(),
+                QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
+            )
+            .await,
+            Error::Sharder {
+                source: sharder::Error::NoShards
+            },
+        );
     }
 
     #[tokio::test]
     async fn test_namespace() {
         let catalog = TestCatalog::new();
 
-        // QuerierDatabase::new panics if there are no sequencers in the catalog
+        // QuerierDatabase::new returns an error if there are no sequencers in the catalog
         catalog.create_sequencer(0).await;
 
         let catalog_cache = Arc::new(CatalogCache::new(
@@ -294,7 +299,7 @@
     #[tokio::test]
     async fn test_namespaces() {
         let catalog = TestCatalog::new();
 
-        // QuerierDatabase::new panics if there are no sequencers in the catalog
+        // QuerierDatabase::new returns an error if there are no sequencers in the catalog
         catalog.create_sequencer(0).await;
 
         let catalog_cache = Arc::new(CatalogCache::new(
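
The querier file above uses snafu's context selectors to wrap the new error type: deriving `Snafu` on an enum with a `Sharder { source: sharder::Error }` variant generates a `SharderSnafu` selector, and `.context(SharderSnafu)` converts the inner error at the call site. A self-contained sketch of that pattern with snafu 0.7 (`InnerError` is a stand-in for `sharder::Error`; only the variant and selector names mirror the diff):

```rust
use snafu::{ResultExt, Snafu};

// Stand-in for sharder::Error.
#[derive(Debug, Snafu)]
pub enum InnerError {
    #[snafu(display("Cannot initialize sharder with no shards"))]
    NoShards,
}

// Mirrors the querier's Error: the derive generates a `SharderSnafu`
// context selector from the `Sharder` variant name.
#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("Sharder error: {source}"))]
    Sharder { source: InnerError },
}

fn build(shards: usize) -> Result<Vec<usize>, InnerError> {
    // A unit variant's selector can produce the error directly.
    if shards == 0 {
        return NoShardsSnafu.fail();
    }
    Ok((0..shards).collect())
}

fn create_sharder(shards: usize) -> Result<Vec<usize>, Error> {
    // `.context(SharderSnafu)` wraps InnerError into Error::Sharder,
    // matching the one-line body of create_sharder in the diff.
    build(shards).context(SharderSnafu)
}

fn main() {
    assert!(create_sharder(4).is_ok());
    let err = create_sharder(0).unwrap_err();
    assert_eq!(
        err.to_string(),
        "Sharder error: Cannot initialize sharder with no shards"
    );
}
```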


@@ -172,7 +172,7 @@
         Arc::clone(&metric_registry),
         usize::MAX,
     ));
 
-    // QuerierDatabase::new panics if there are no sequencers in the catalog
+    // QuerierDatabase::new returns an error if there are no sequencers in the catalog
     {
         let mut repos = catalog.repositories().await;


@@ -35,11 +35,13 @@ fn init_write_buffer(n_sequencers: u32) -> ShardedWriteBuffer<JumpHash<Arc<Sequencer>>> {
     let shards: BTreeSet<_> = write_buffer.sequencer_ids();
 
     ShardedWriteBuffer::new(
-        shards
-            .into_iter()
-            .map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer), &Default::default()))
-            .map(Arc::new)
-            .collect::<JumpHash<_>>(),
+        JumpHash::new(
+            shards
+                .into_iter()
+                .map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer), &Default::default()))
+                .map(Arc::new),
+        )
+        .expect("failed to init sharder"),
     )
 }
@@ -57,9 +59,12 @@ fn e2e_benchmarks(c: &mut Criterion) {
     let delegate = {
         let metrics = Arc::new(metric::Registry::new());
         let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
-        let ns_cache = Arc::new(ShardedCache::new(
-            iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
-        ));
+        let ns_cache = Arc::new(
+            ShardedCache::new(
+                iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
+            )
+            .unwrap(),
+        );
 
         let write_buffer = init_write_buffer(1);
         let schema_validator =


@@ -42,9 +42,10 @@ fn bench(group: &mut BenchmarkGroup<WallTime>, tables: usize, columns_per_table:
     let metrics = Arc::new(metric::Registry::default());
     let catalog = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
-    let ns_cache = Arc::new(ShardedCache::new(
-        iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
-    ));
+    let ns_cache = Arc::new(
+        ShardedCache::new(iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10))
+            .unwrap(),
+    );
 
     let validator = SchemaValidator::new(catalog, ns_cache, &*metrics);
 
     for i in 0..65_000 {


@@ -12,10 +12,10 @@ pub struct ShardedCache<T> {
 impl<T> ShardedCache<T> {
     /// initialise a [`ShardedCache`] splitting the keyspace over the given
     /// instances of `T`.
-    pub fn new(shards: impl IntoIterator<Item = T>) -> Self {
-        Self {
-            shards: JumpHash::new(shards),
-        }
+    pub fn new(shards: impl IntoIterator<Item = T>) -> Result<Self, sharder::Error> {
+        Ok(Self {
+            shards: JumpHash::new(shards)?,
+        })
     }
 }
@@ -71,9 +71,12 @@ mod tests {
         // The number of shards to hash into.
         const SHARDS: usize = 10;
 
-        let cache = Arc::new(ShardedCache::new(
-            iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(SHARDS),
-        ));
+        let cache = Arc::new(
+            ShardedCache::new(
+                iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(SHARDS),
+            )
+            .unwrap(),
+        );
 
         // Build a set of namespace -> unique integer to validate the shard
         // mapping later.


@@ -83,17 +83,22 @@ impl TestContext {
         let shards: BTreeSet<_> = write_buffer.sequencer_ids();
         let sharded_write_buffer = ShardedWriteBuffer::new(
-            shards
-                .into_iter()
-                .map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer), &metrics))
-                .map(Arc::new)
-                .collect::<JumpHash<_>>(),
+            JumpHash::new(
+                shards
+                    .into_iter()
+                    .map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer), &metrics))
+                    .map(Arc::new),
+            )
+            .unwrap(),
         );
 
         let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
-        let ns_cache = Arc::new(ShardedCache::new(
-            iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
-        ));
+        let ns_cache = Arc::new(
+            ShardedCache::new(
+                iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
+            )
+            .unwrap(),
+        );
 
         let ns_creator = NamespaceAutocreation::new(
             Arc::clone(&catalog),


@@ -8,6 +8,7 @@ data_types = { path = "../data_types" }
 mutable_batch = { path = "../mutable_batch" }
 parking_lot = "0.12"
 siphasher = "0.3"
+snafu = "0.7"
 workspace-hack = { path = "../workspace-hack"}
 
 [dev-dependencies]
@@ -15,6 +16,7 @@ criterion = { version = "0.3.4", features = ["async_tokio", "html_reports"] }
 hashbrown = "0.12"
 mutable_batch_lp = { path = "../mutable_batch_lp" }
 rand = "0.8.3"
+test_helpers = { path = "../test_helpers" }
 
 [[bench]]
 name = "sharder"


@@ -86,7 +86,7 @@ fn benchmark_sharder(
     table: &str,
     namespace: &DatabaseName<'_>,
 ) {
-    let hasher = JumpHash::new((0..num_buckets).map(Arc::new));
+    let hasher = JumpHash::new((0..num_buckets).map(Arc::new)).unwrap();
     let batch = MutableBatch::default();
 
     group.throughput(Throughput::Elements(1));


@@ -2,12 +2,20 @@ use super::Sharder;
 use data_types::{DatabaseName, DeletePredicate};
 use mutable_batch::MutableBatch;
 use siphasher::sip::SipHasher13;
+use snafu::{ensure, Snafu};
 use std::{
     fmt::Debug,
     hash::{Hash, Hasher},
     sync::Arc,
 };
 
+#[derive(Snafu, Debug)]
+#[allow(missing_docs)]
+pub enum Error {
+    #[snafu(display("Cannot initialize sharder with no shards"))]
+    NoShards,
+}
+
 /// A [`JumpHash`] maps operations for a given table in a given namespace
 /// consistently to the same shard, irrespective of the operation itself with
 /// near perfect distribution.
@@ -39,7 +47,7 @@ impl<T> JumpHash<T> {
     /// # Panics
     ///
    /// This constructor panics if the number of elements in `shards` is 0.
-    pub fn new(shards: impl IntoIterator<Item = T>) -> Self {
+    pub fn new(shards: impl IntoIterator<Item = T>) -> Result<Self, Error> {
         // A randomly generated static siphash key to ensure all router
         // instances hash the same input to the same u64 sharding key.
         //
@@ -50,15 +58,12 @@
         ];
 
         let shards = shards.into_iter().collect::<Vec<_>>();
-        assert!(
-            !shards.is_empty(),
-            "cannot initialise sharder with no shards"
-        );
+        ensure!(!shards.is_empty(), NoShardsSnafu,);
 
-        Self {
+        Ok(Self {
             hasher: SipHasher13::new_with_key(&key),
             shards,
-        }
+        })
     }
 
     /// Return a slice of all the shards this instance is configured with,
@@ -102,12 +107,6 @@ impl<T> JumpHash<T> {
     }
 }
 
-impl<T> FromIterator<T> for JumpHash<T> {
-    fn from_iter<U: IntoIterator<Item = T>>(iter: U) -> Self {
-        Self::new(iter)
-    }
-}
-
 #[derive(Hash)]
 struct HashKey<'a> {
     table: &'a str,
@@ -169,17 +168,18 @@
 #[cfg(test)]
 mod tests {
-    use super::*;
-
     use data_types::TimestampRange;
     use hashbrown::HashMap;
+    use super::*;
+    use std::iter;
+    use test_helpers::assert_error;
 
     #[test]
     fn test_consistent_hashing() {
         const NUM_TESTS: usize = 10_000;
         const NUM_SHARDS: usize = 10;
 
-        let hasher = JumpHash::new(0..NUM_SHARDS);
+        let hasher = JumpHash::new(0..NUM_SHARDS).unwrap();
 
         // Create a HashMap<key, shard> to verify against.
         let mappings = (0..NUM_TESTS)
@@ -198,7 +198,7 @@
             .all(|(&key, &value)| hasher.hash(key) == value));
 
         // Reinitialise the hasher with the same (default) key
-        let hasher = JumpHash::new(0..NUM_SHARDS);
+        let hasher = JumpHash::new(0..NUM_SHARDS).unwrap();
 
         // And assert the mappings are the same
         assert!(mappings
@@ -206,7 +206,9 @@
             .all(|(&key, &value)| hasher.hash(key) == value));
 
         // Reinitialise the hasher with the a different key
-        let hasher = JumpHash::new(0..NUM_SHARDS).with_seed_key(&[42; 16]);
+        let hasher = JumpHash::new(0..NUM_SHARDS)
+            .unwrap()
+            .with_seed_key(&[42; 16]);
 
         // And assert the mappings are the NOT all same (some may be the same)
         assert!(!mappings
@@ -216,7 +218,7 @@
     #[test]
     fn test_sharder_impl() {
-        let hasher = JumpHash::new((0..10_000).map(Arc::new));
+        let hasher = JumpHash::new((0..10_000).map(Arc::new)).unwrap();
 
         let a = hasher.shard(
             "table",
@@ -261,7 +263,7 @@
     #[test]
     fn test_sharder_prefix_collision() {
-        let hasher = JumpHash::new((0..10_000).map(Arc::new));
+        let hasher = JumpHash::new((0..10_000).map(Arc::new)).unwrap();
 
         let a = hasher.shard(
             "a",
             &DatabaseName::try_from("bc").unwrap(),
@@ -289,7 +291,7 @@
     // strategy would that accounts for this mapping change.
     #[test]
     fn test_key_bucket_fixture() {
-        let hasher = JumpHash::new((0..1_000).map(Arc::new));
+        let hasher = JumpHash::new((0..1_000).map(Arc::new)).unwrap();
 
         let namespace = DatabaseName::try_from("bananas").unwrap();
 
         let mut batches = mutable_batch_lp::lines_to_batches("cpu a=1i", 42).unwrap();
@@ -308,7 +310,7 @@
     #[test]
     fn test_distribution() {
-        let hasher = JumpHash::new((0..100).map(Arc::new));
+        let hasher = JumpHash::new((0..100).map(Arc::new)).unwrap();
 
         let namespace = DatabaseName::try_from("bananas").unwrap();
 
         let mut mapping = HashMap::<_, usize>::new();
@@ -336,7 +338,7 @@
     fn test_delete_with_table() {
         let namespace = DatabaseName::try_from("bananas").unwrap();
 
-        let hasher = JumpHash::new((0..10_000).map(Arc::new));
+        let hasher = JumpHash::new((0..10_000).map(Arc::new)).unwrap();
 
         let predicate = DeletePredicate {
             range: TimestampRange::new(1, 2),
@@ -362,7 +364,7 @@
         let namespace = DatabaseName::try_from("bananas").unwrap();
         let shards = (0..10_000).map(Arc::new).collect::<Vec<_>>();
 
-        let hasher = JumpHash::new(shards.clone());
+        let hasher = JumpHash::new(shards.clone()).unwrap();
 
         let predicate = DeletePredicate {
             range: TimestampRange::new(1, 2),
@@ -373,4 +375,10 @@
         assert_eq!(got, shards);
     }
+
+    #[test]
+    fn no_shards() {
+        let shards: iter::Empty<i32> = iter::empty();
+        assert_error!(JumpHash::new(shards), Error::NoShards,);
+    }
 }
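
A closing note on the `FromIterator` removal in this last file: `from_iter` must return `Self`, so it has no channel for `Error::NoShards`; that is why the `.collect::<JumpHash<_>>()` call sites throughout the router, benchmarks, and tests above all migrate to `JumpHash::new(...)` plus explicit error handling. A tiny sketch of the incompatibility (illustrative types, not the crate's code):

```rust
#[derive(Debug)]
pub enum Error {
    NoShards,
}

pub struct Sharder<T> {
    shards: Vec<T>,
}

impl<T> Sharder<T> {
    // The fallible constructor every call site now goes through.
    pub fn new(shards: impl IntoIterator<Item = T>) -> Result<Self, Error> {
        let shards: Vec<T> = shards.into_iter().collect();
        if shards.is_empty() {
            return Err(Error::NoShards);
        }
        Ok(Self { shards })
    }
}

// A FromIterator impl has nowhere to put the error, because its
// signature is fixed by the standard library:
//
// impl<T> FromIterator<T> for Sharder<T> {
//     fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
//         // Forced to panic (or silently accept an empty set), which is
//         // exactly the behaviour this commit removes.
//         Sharder::new(iter).expect("no way to return Error::NoShards")
//     }
// }

fn main() {
    // Call sites therefore migrate from `iter.collect::<Sharder<_>>()` to:
    let sharder = Sharder::new(0..4).unwrap();
    assert_eq!(sharder.shards.len(), 4);
}
```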