diff --git a/Cargo.lock b/Cargo.lock
index 27b12ccb60..5d82ae3ae6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4945,6 +4945,8 @@ dependencies = [
  "parking_lot 0.12.1",
  "rand",
  "siphasher",
+ "snafu",
+ "test_helpers",
  "workspace-hack",
 ]
 
diff --git a/ioxd_querier/src/rpc/namespace.rs b/ioxd_querier/src/rpc/namespace.rs
index b2172e63bd..765a09b095 100644
--- a/ioxd_querier/src/rpc/namespace.rs
+++ b/ioxd_querier/src/rpc/namespace.rs
@@ -63,7 +63,7 @@ mod tests {
     async fn test_get_namespaces_empty() {
         let catalog = TestCatalog::new();
 
-        // QuerierDatabase::new panics if there are no sequencers in the catalog
+        // QuerierDatabase::new returns an error if there are no sequencers in the catalog
         catalog.create_sequencer(0).await;
 
         let catalog_cache = Arc::new(QuerierCatalogCache::new(
@@ -98,7 +98,7 @@ mod tests {
     async fn test_get_namespaces() {
         let catalog = TestCatalog::new();
 
-        // QuerierDatabase::new panics if there are no sequencers in the catalog
+        // QuerierDatabase::new returns an error if there are no sequencers in the catalog
         catalog.create_sequencer(0).await;
 
         let catalog_cache = Arc::new(QuerierCatalogCache::new(
diff --git a/ioxd_router/src/lib.rs b/ioxd_router/src/lib.rs
index fca2a8a486..2b6fb091a4 100644
--- a/ioxd_router/src/lib.rs
+++ b/ioxd_router/src/lib.rs
@@ -49,6 +49,9 @@ pub enum Error {
 
     #[error("Catalog DSN error: {0}")]
     CatalogDsn(#[from] clap_blocks::catalog_dsn::Error),
+
+    #[error("failed to initialize sharded cache: {0}")]
+    Sharder(#[from] sharder::Error),
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -175,7 +178,7 @@ pub async fn create_router_server_type(
     let ns_cache = Arc::new(InstrumentedCache::new(
         Arc::new(ShardedCache::new(
             std::iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
-        )),
+        )?),
         &*metrics,
     ));
 
@@ -319,13 +322,12 @@ async fn init_write_buffer(
         "connected to write buffer topic",
     );
 
-    Ok(ShardedWriteBuffer::new(
+    Ok(ShardedWriteBuffer::new(JumpHash::new(
         shards
             .into_iter()
             .map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer), &metrics))
-            .map(Arc::new)
-            .collect::<JumpHash<_>>(),
-    ))
+            .map(Arc::new),
+    )?))
 }
 
 /// Pre-populate `cache` with the all existing schemas in `catalog`.
diff --git a/querier/src/database.rs b/querier/src/database.rs
index cacfcf2dc0..f8dbda424c 100644
--- a/querier/src/database.rs
+++ b/querier/src/database.rs
@@ -30,6 +30,9 @@ pub enum Error {
     Catalog {
         source: iox_catalog::interface::Error,
     },
+
+    #[snafu(display("Sharder error: {source}"))]
+    Sharder { source: sharder::Error },
 }
 
 /// Database for the querier.
@@ -202,16 +205,15 @@ pub async fn create_sharder(catalog: &dyn Catalog) -> Result<JumpHash<Arc<KafkaPartition>>> {
-    Ok(JumpHash::new(shards.into_iter().map(Arc::new)))
+    JumpHash::new(shards.into_iter().map(Arc::new)).context(SharderSnafu)
 }
diff --git a/router/benches/e2e.rs b/router/benches/e2e.rs
--- a/router/benches/e2e.rs
+++ b/router/benches/e2e.rs
@@ … @@ fn init_write_buffer(…) -> ShardedWriteBuffer<…> {
     let shards: BTreeSet<_> = write_buffer.sequencer_ids();
 
     ShardedWriteBuffer::new(
-        shards
-            .into_iter()
-            .map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer), &Default::default()))
-            .map(Arc::new)
-            .collect::<JumpHash<_>>(),
+        JumpHash::new(
+            shards
+                .into_iter()
+                .map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer), &Default::default()))
+                .map(Arc::new),
+        )
+        .expect("failed to init sharder"),
     )
 }
@@ -57,9 +59,12 @@ fn e2e_benchmarks(c: &mut Criterion) {
     let delegate = {
         let metrics = Arc::new(metric::Registry::new());
         let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
-        let ns_cache = Arc::new(ShardedCache::new(
-            iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
-        ));
+        let ns_cache = Arc::new(
+            ShardedCache::new(
+                iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
+            )
+            .unwrap(),
+        );
 
         let write_buffer = init_write_buffer(1);
         let schema_validator =
diff --git a/router/benches/schema_validator.rs b/router/benches/schema_validator.rs
index 20bd9e9cc6..0bb91f9c14 100644
--- a/router/benches/schema_validator.rs
+++ b/router/benches/schema_validator.rs
@@ -42,9 +42,10 @@ fn bench(group: &mut BenchmarkGroup<WallTime>, tables: usize, columns_per_table:
     let metrics = Arc::new(metric::Registry::default());
     let catalog = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
-    let ns_cache = Arc::new(ShardedCache::new(
-        iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
-    ));
+    let ns_cache = Arc::new(
+        ShardedCache::new(iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10))
+            .unwrap(),
+    );
 
     let validator = SchemaValidator::new(catalog, ns_cache, &*metrics);
 
     for i in 0..65_000 {
diff --git a/router/src/namespace_cache/sharded_cache.rs b/router/src/namespace_cache/sharded_cache.rs
index 48dbb453e7..119730988b 100644
--- a/router/src/namespace_cache/sharded_cache.rs
+++ b/router/src/namespace_cache/sharded_cache.rs
@@ -12,10 +12,10 @@ pub struct ShardedCache<T> {
 
 impl<T> ShardedCache<T> {
     /// initialise a [`ShardedCache`] splitting the keyspace over the given
     /// instances of `T`.
-    pub fn new(shards: impl IntoIterator<Item = T>) -> Self {
-        Self {
-            shards: JumpHash::new(shards),
-        }
+    pub fn new(shards: impl IntoIterator<Item = T>) -> Result<Self, sharder::Error> {
+        Ok(Self {
+            shards: JumpHash::new(shards)?,
+        })
     }
 }
@@ -71,9 +71,12 @@ mod tests {
         // The number of shards to hash into.
         const SHARDS: usize = 10;
 
-        let cache = Arc::new(ShardedCache::new(
-            iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(SHARDS),
-        ));
+        let cache = Arc::new(
+            ShardedCache::new(
+                iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(SHARDS),
+            )
+            .unwrap(),
+        );
 
         // Build a set of namespace -> unique integer to validate the shard
         // mapping later.
diff --git a/router/tests/http.rs b/router/tests/http.rs
index 7dbcfef149..2692c0b9a4 100644
--- a/router/tests/http.rs
+++ b/router/tests/http.rs
@@ -83,17 +83,22 @@ impl TestContext {
         let shards: BTreeSet<_> = write_buffer.sequencer_ids();
 
         let sharded_write_buffer = ShardedWriteBuffer::new(
-            shards
-                .into_iter()
-                .map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer), &metrics))
-                .map(Arc::new)
-                .collect::<JumpHash<_>>(),
+            JumpHash::new(
+                shards
+                    .into_iter()
+                    .map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer), &metrics))
+                    .map(Arc::new),
+            )
+            .unwrap(),
         );
 
         let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
-        let ns_cache = Arc::new(ShardedCache::new(
-            iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
-        ));
+        let ns_cache = Arc::new(
+            ShardedCache::new(
+                iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
+            )
+            .unwrap(),
+        );
 
         let ns_creator = NamespaceAutocreation::new(
             Arc::clone(&catalog),
diff --git a/sharder/Cargo.toml b/sharder/Cargo.toml
index 1b1b62ee3b..478992352f 100644
--- a/sharder/Cargo.toml
+++ b/sharder/Cargo.toml
@@ -8,6 +8,7 @@ data_types = { path = "../data_types" }
 mutable_batch = { path = "../mutable_batch" }
 parking_lot = "0.12"
 siphasher = "0.3"
+snafu = "0.7"
 workspace-hack = { path = "../workspace-hack"}
 
 [dev-dependencies]
@@ -15,6 +16,7 @@ criterion = { version = "0.3.4", features = ["async_tokio", "html_reports"] }
 hashbrown = "0.12"
 mutable_batch_lp = { path = "../mutable_batch_lp" }
 rand = "0.8.3"
+test_helpers = { path = "../test_helpers" }
 
 [[bench]]
 name = "sharder"
diff --git a/sharder/benches/sharder.rs b/sharder/benches/sharder.rs
index 92919519bb..b945a02086 100644
--- a/sharder/benches/sharder.rs
+++ b/sharder/benches/sharder.rs
@@ -86,7 +86,7 @@ fn benchmark_sharder(
     table: &str,
     namespace: &DatabaseName<'_>,
 ) {
-    let hasher = JumpHash::new((0..num_buckets).map(Arc::new));
+    let hasher = JumpHash::new((0..num_buckets).map(Arc::new)).unwrap();
     let batch = MutableBatch::default();
 
     group.throughput(Throughput::Elements(1));
diff --git a/sharder/src/jumphash.rs b/sharder/src/jumphash.rs
index c0708ccdcc..6d6b20e113 100644
--- a/sharder/src/jumphash.rs
+++ b/sharder/src/jumphash.rs
@@ -2,12 +2,20 @@ use super::Sharder;
 use data_types::{DatabaseName, DeletePredicate};
 use mutable_batch::MutableBatch;
 use siphasher::sip::SipHasher13;
+use snafu::{ensure, Snafu};
 use std::{
     fmt::Debug,
     hash::{Hash, Hasher},
     sync::Arc,
 };
 
+#[derive(Snafu, Debug)]
+#[allow(missing_docs)]
+pub enum Error {
+    #[snafu(display("Cannot initialize sharder with no shards"))]
+    NoShards,
+}
+
 /// A [`JumpHash`] maps operations for a given table in a given namespace
 /// consistently to the same shard, irrespective of the operation itself with
 /// near perfect distribution.
@@ -39,7 +47,7 @@ impl<T> JumpHash<T> {
-    /// # Panics
+    /// # Errors
     ///
-    /// This constructor panics if the number of elements in `shards` is 0.
-    pub fn new(shards: impl IntoIterator<Item = T>) -> Self {
+    /// This constructor returns an error if the number of elements in `shards` is 0.
+    pub fn new(shards: impl IntoIterator<Item = T>) -> Result<Self, Error> {
         // A randomly generated static siphash key to ensure all router
         // instances hash the same input to the same u64 sharding key.
         //
@@ -50,15 +58,12 @@ impl<T> JumpHash<T> {
         ];
 
         let shards = shards.into_iter().collect::<Vec<_>>();
-        assert!(
-            !shards.is_empty(),
-            "cannot initialise sharder with no shards"
-        );
+        ensure!(!shards.is_empty(), NoShardsSnafu,);
 
-        Self {
+        Ok(Self {
             hasher: SipHasher13::new_with_key(&key),
             shards,
-        }
+        })
     }
 
     /// Return a slice of all the shards this instance is configured with,
@@ -102,12 +107,6 @@ impl<T> JumpHash<T> {
     }
 }
 
-impl<T> FromIterator<T> for JumpHash<T> {
-    fn from_iter<U: IntoIterator<Item = T>>(iter: U) -> Self {
-        Self::new(iter)
-    }
-}
-
 #[derive(Hash)]
 struct HashKey<'a> {
     table: &'a str,
@@ -169,17 +168,18 @@ where
 
 #[cfg(test)]
 mod tests {
+    use super::*;
     use data_types::TimestampRange;
    use hashbrown::HashMap;
-
-    use super::*;
+    use std::iter;
+    use test_helpers::assert_error;
 
     #[test]
     fn test_consistent_hashing() {
         const NUM_TESTS: usize = 10_000;
         const NUM_SHARDS: usize = 10;
 
-        let hasher = JumpHash::new(0..NUM_SHARDS);
+        let hasher = JumpHash::new(0..NUM_SHARDS).unwrap();
 
         // Create a HashMap to verify against.
         let mappings = (0..NUM_TESTS)
@@ -198,7 +198,7 @@ mod tests {
             .all(|(&key, &value)| hasher.hash(key) == value));
 
         // Reinitialise the hasher with the same (default) key
-        let hasher = JumpHash::new(0..NUM_SHARDS);
+        let hasher = JumpHash::new(0..NUM_SHARDS).unwrap();
 
         // And assert the mappings are the same
         assert!(mappings
@@ -206,7 +206,9 @@
             .all(|(&key, &value)| hasher.hash(key) == value));
 
         // Reinitialise the hasher with the a different key
-        let hasher = JumpHash::new(0..NUM_SHARDS).with_seed_key(&[42; 16]);
+        let hasher = JumpHash::new(0..NUM_SHARDS)
+            .unwrap()
+            .with_seed_key(&[42; 16]);
 
         // And assert the mappings are the NOT all same (some may be the same)
         assert!(!mappings
@@ -216,7 +218,7 @@
 
     #[test]
     fn test_sharder_impl() {
-        let hasher = JumpHash::new((0..10_000).map(Arc::new));
+        let hasher = JumpHash::new((0..10_000).map(Arc::new)).unwrap();
 
         let a = hasher.shard(
             "table",
@@ -261,7 +263,7 @@
 
     #[test]
     fn test_sharder_prefix_collision() {
-        let hasher = JumpHash::new((0..10_000).map(Arc::new));
+        let hasher = JumpHash::new((0..10_000).map(Arc::new)).unwrap();
         let a = hasher.shard(
             "a",
             &DatabaseName::try_from("bc").unwrap(),
@@ -289,7 +291,7 @@
     // strategy would that accounts for this mapping change.
     #[test]
     fn test_key_bucket_fixture() {
-        let hasher = JumpHash::new((0..1_000).map(Arc::new));
+        let hasher = JumpHash::new((0..1_000).map(Arc::new)).unwrap();
         let namespace = DatabaseName::try_from("bananas").unwrap();
 
         let mut batches = mutable_batch_lp::lines_to_batches("cpu a=1i", 42).unwrap();
@@ -308,7 +310,7 @@
 
     #[test]
     fn test_distribution() {
-        let hasher = JumpHash::new((0..100).map(Arc::new));
+        let hasher = JumpHash::new((0..100).map(Arc::new)).unwrap();
         let namespace = DatabaseName::try_from("bananas").unwrap();
 
         let mut mapping = HashMap::<_, usize>::new();
@@ -336,7 +338,7 @@
     fn test_delete_with_table() {
         let namespace = DatabaseName::try_from("bananas").unwrap();
 
-        let hasher = JumpHash::new((0..10_000).map(Arc::new));
+        let hasher = JumpHash::new((0..10_000).map(Arc::new)).unwrap();
 
         let predicate = DeletePredicate {
             range: TimestampRange::new(1, 2),
@@ -362,7 +364,7 @@
         let namespace = DatabaseName::try_from("bananas").unwrap();
 
         let shards = (0..10_000).map(Arc::new).collect::<Vec<_>>();
-        let hasher = JumpHash::new(shards.clone());
+        let hasher = JumpHash::new(shards.clone()).unwrap();
 
         let predicate = DeletePredicate {
             range: TimestampRange::new(1, 2),
@@ -373,4 +375,10 @@
 
         assert_eq!(got, shards);
     }
+
+    #[test]
+    fn no_shards() {
+        let shards: iter::Empty<usize> = iter::empty();
+        assert_error!(JumpHash::new(shards), Error::NoShards,);
+    }
 }
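
For reference, a minimal sketch of what the change means for callers; this is not part of the diff, and the `build_hasher` helper below is a hypothetical name. After this patch `JumpHash::new` returns `Result<Self, sharder::Error>` instead of panicking, so an empty shard set surfaces as `Error::NoShards` for the caller to propagate (as `ioxd_router` and `querier` do above via `?` and `.context(SharderSnafu)`):

```rust
use std::sync::Arc;

use sharder::JumpHash;

// Hypothetical helper: builds a hasher over `num_shards` shard handles,
// propagating `sharder::Error` to the caller instead of aborting.
fn build_hasher(num_shards: usize) -> Result<JumpHash<Arc<usize>>, sharder::Error> {
    JumpHash::new((0..num_shards).map(Arc::new))
}

fn main() {
    // An empty shard set is now a recoverable error, not a panic.
    assert!(build_hasher(0).is_err());

    // A non-empty shard set constructs exactly as before, modulo the
    // trailing `?`/`unwrap()` at each call site.
    assert!(build_hasher(10).is_ok());
}
```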
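The error plumbing itself is the stock snafu 0.7 pattern adopted in `jumphash.rs` (snafu is pinned to `0.7` in `sharder/Cargo.toml` above): `#[derive(Snafu)]` generates a `NoShardsSnafu` context selector for the `NoShards` variant, and `ensure!` returns early with that error when its condition is false. A standalone sketch with illustrative names (`check` is not from the diff):

```rust
use snafu::{ensure, Snafu};

#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("Cannot initialize sharder with no shards"))]
    NoShards,
}

// `ensure!(cond, NoShardsSnafu)` expands to an early `return Err(...)`
// built from the `NoShardsSnafu` selector when `cond` is false.
fn check(shard_count: usize) -> Result<(), Error> {
    ensure!(shard_count > 0, NoShardsSnafu);
    Ok(())
}

fn main() {
    assert!(matches!(check(0), Err(Error::NoShards)));
    assert!(check(3).is_ok());
}
```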