From 6417e7dc2a9ee17e5bca13b1faf97f801091adce Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 9 Jun 2022 15:10:16 -0400 Subject: [PATCH] feat: Extract sharder to its own crate --- Cargo.lock | 18 ++++++++++++- Cargo.toml | 1 + ioxd_router/Cargo.toml | 1 + ioxd_router/src/lib.rs | 2 +- router/Cargo.toml | 6 +---- router/benches/e2e.rs | 2 +- .../src/dml_handlers/sharded_write_buffer.rs | 9 +++---- router/src/lib.rs | 2 -- router/src/namespace_cache/sharded_cache.rs | 2 +- router/src/sharder/mod.rs | 10 ------- router/tests/http.rs | 2 +- sharder/Cargo.toml | 21 +++++++++++++++ {router => sharder}/benches/sharder.rs | 2 +- .../src/sharder => sharder/src}/jumphash.rs | 0 sharder/src/lib.rs | 27 +++++++++++++++++++ {router/src/sharder => sharder/src}/mock.rs | 0 {router/src/sharder => sharder/src}/trait.rs | 0 17 files changed, 77 insertions(+), 28 deletions(-) delete mode 100644 router/src/sharder/mod.rs create mode 100644 sharder/Cargo.toml rename {router => sharder}/benches/sharder.rs (98%) rename {router/src/sharder => sharder/src}/jumphash.rs (100%) create mode 100644 sharder/src/lib.rs rename {router/src/sharder => sharder/src}/mock.rs (100%) rename {router/src/sharder => sharder/src}/trait.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index 78e0072cfe..67fab05e59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2562,6 +2562,7 @@ dependencies = [ "object_store", "observability_deps", "router", + "sharder", "thiserror", "tokio", "tokio-util", @@ -4321,7 +4322,7 @@ dependencies = [ "service_grpc_catalog", "service_grpc_object_store", "service_grpc_schema", - "siphasher", + "sharder", "test_helpers", "thiserror", "tokio", @@ -4924,6 +4925,21 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "sharder" +version = "0.1.0" +dependencies = [ + "criterion", + "data_types", + "hashbrown 0.12.1", + "mutable_batch", + "mutable_batch_lp", + "parking_lot 0.12.1", + "rand", + "siphasher", + "workspace-hack", +] + [[package]] name = "shlex" version = "1.1.0" diff --git a/Cargo.toml b/Cargo.toml index be786ec3ea..b9a42a186b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,6 +60,7 @@ members = [ "service_grpc_catalog", "service_grpc_schema", "service_grpc_testing", + "sharder", "sqlx-hotswap-pool", "test_helpers", "test_helpers_end_to_end", diff --git a/ioxd_router/Cargo.toml b/ioxd_router/Cargo.toml index d735909f1a..7225e44f34 100644 --- a/ioxd_router/Cargo.toml +++ b/ioxd_router/Cargo.toml @@ -14,6 +14,7 @@ mutable_batch = { path = "../mutable_batch" } object_store = "0.0.1" observability_deps = { path = "../observability_deps" } router = { path = "../router" } +sharder = { path = "../sharder" } trace = { path = "../trace" } write_buffer = { path = "../write_buffer" } write_summary = { path = "../write_summary" } diff --git a/ioxd_router/src/lib.rs b/ioxd_router/src/lib.rs index 7dee342606..fca2a8a486 100644 --- a/ioxd_router/src/lib.rs +++ b/ioxd_router/src/lib.rs @@ -27,8 +27,8 @@ use router::{ }, sequencer::Sequencer, server::{grpc::GrpcDelegate, http::HttpDelegate, RouterServer}, - sharder::JumpHash, }; +use sharder::JumpHash; use std::{ collections::BTreeSet, fmt::{Debug, Display}, diff --git a/router/Cargo.toml b/router/Cargo.toml index 764cf569bf..de70069e07 100644 --- a/router/Cargo.toml +++ b/router/Cargo.toml @@ -29,7 +29,7 @@ serde = "1.0" serde_urlencoded = "0.7" service_grpc_schema = { path = "../service_grpc_schema" } service_grpc_object_store = { path = "../service_grpc_object_store" } -siphasher = "0.3" +sharder = { path = "../sharder" } thiserror = "1.0" tokio = { version = "1", features = ["rt-multi-thread", "macros"] } tonic = "0.7" @@ -49,10 +49,6 @@ schema = { path = "../schema" } test_helpers = { version = "0.1.0", path = "../test_helpers", features = ["future_timeout"] } tokio-stream = { version = "0.1.9", default_features = false, features = [] } -[[bench]] -name = "sharder" -harness = false - [[bench]] name = "schema_validator" harness = false diff --git a/router/benches/e2e.rs b/router/benches/e2e.rs index 903fbd4913..18bcce8bd3 100644 --- a/router/benches/e2e.rs +++ b/router/benches/e2e.rs @@ -10,8 +10,8 @@ use router::{ namespace_cache::{MemoryNamespaceCache, ShardedCache}, sequencer::Sequencer, server::http::HttpDelegate, - sharder::JumpHash, }; +use sharder::JumpHash; use std::{collections::BTreeSet, iter, sync::Arc}; use tokio::runtime::Runtime; use write_buffer::{ diff --git a/router/src/dml_handlers/sharded_write_buffer.rs b/router/src/dml_handlers/sharded_write_buffer.rs index 62c05fc2c9..73de4ee132 100644 --- a/router/src/dml_handlers/sharded_write_buffer.rs +++ b/router/src/dml_handlers/sharded_write_buffer.rs @@ -1,7 +1,7 @@ //! Logic to shard writes/deletes and push them into a write buffer sequencer. use super::Partitioned; -use crate::{dml_handlers::DmlHandler, sequencer::Sequencer, sharder::Sharder}; +use crate::{dml_handlers::DmlHandler, sequencer::Sequencer}; use async_trait::async_trait; use data_types::{DatabaseName, DeletePredicate, NonEmptyString}; use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite}; @@ -9,6 +9,7 @@ use futures::{stream::FuturesUnordered, StreamExt}; use hashbrown::HashMap; use mutable_batch::MutableBatch; use observability_deps::tracing::*; +use sharder::Sharder; use std::{ fmt::{Debug, Display}, sync::Arc, @@ -203,12 +204,10 @@ where #[cfg(test)] mod tests { use super::*; - use crate::{ - dml_handlers::DmlHandler, - sharder::mock::{MockSharder, MockSharderCall, MockSharderPayload}, - }; + use crate::dml_handlers::DmlHandler; use assert_matches::assert_matches; use data_types::TimestampRange; + use sharder::mock::{MockSharder, MockSharderCall, MockSharderPayload}; use std::sync::Arc; use write_buffer::mock::{MockBufferForWriting, MockBufferSharedState}; diff --git a/router/src/lib.rs b/router/src/lib.rs index 14c884743d..cc880b760f 100644 --- a/router/src/lib.rs +++ b/router/src/lib.rs @@ -9,7 +9,6 @@ //! * Deriving the partition key of each DML operation. //! * Applying sharding logic. //! * Push resulting operations into the appropriate kafka topics. -//! #![deny( rustdoc::broken_intra_doc_links, @@ -30,4 +29,3 @@ pub mod dml_handlers; pub mod namespace_cache; pub mod sequencer; pub mod server; -pub mod sharder; diff --git a/router/src/namespace_cache/sharded_cache.rs b/router/src/namespace_cache/sharded_cache.rs index b28043f84e..48dbb453e7 100644 --- a/router/src/namespace_cache/sharded_cache.rs +++ b/router/src/namespace_cache/sharded_cache.rs @@ -1,6 +1,6 @@ use super::NamespaceCache; -use crate::sharder::JumpHash; use data_types::{DatabaseName, NamespaceSchema}; +use sharder::JumpHash; use std::sync::Arc; /// A decorator sharding the [`NamespaceCache`] keyspace into a set of `T`. diff --git a/router/src/sharder/mod.rs b/router/src/sharder/mod.rs deleted file mode 100644 index fa30436fad..0000000000 --- a/router/src/sharder/mod.rs +++ /dev/null @@ -1,10 +0,0 @@ -//! Sharder logic to consistently map operations to a specific sequencer. - -mod r#trait; -pub use r#trait::*; - -mod jumphash; -pub use jumphash::*; - -#[cfg(test)] -pub mod mock; diff --git a/router/tests/http.rs b/router/tests/http.rs index 116c21c850..7dbcfef149 100644 --- a/router/tests/http.rs +++ b/router/tests/http.rs @@ -15,8 +15,8 @@ use router::{ namespace_cache::{MemoryNamespaceCache, ShardedCache}, sequencer::Sequencer, server::http::HttpDelegate, - sharder::JumpHash, }; +use sharder::JumpHash; use std::{collections::BTreeSet, iter, string::String, sync::Arc}; use write_buffer::{ core::WriteBufferWriting, diff --git a/sharder/Cargo.toml b/sharder/Cargo.toml new file mode 100644 index 0000000000..7f9e5f2990 --- /dev/null +++ b/sharder/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "sharder" +version = "0.1.0" +edition = "2021" + +[dependencies] +data_types = { path = "../data_types" } +hashbrown = "0.12" +mutable_batch = { path = "../mutable_batch" } +mutable_batch_lp = { path = "../mutable_batch_lp" } +parking_lot = "0.12" +siphasher = "0.3" +workspace-hack = { path = "../workspace-hack"} + +[dev-dependencies] +criterion = { version = "0.3.4", features = ["async_tokio", "html_reports"] } +rand = "0.8.3" + +[[bench]] +name = "sharder" +harness = false diff --git a/router/benches/sharder.rs b/sharder/benches/sharder.rs similarity index 98% rename from router/benches/sharder.rs rename to sharder/benches/sharder.rs index 7e5f353e2e..92919519bb 100644 --- a/router/benches/sharder.rs +++ b/sharder/benches/sharder.rs @@ -6,7 +6,7 @@ use criterion::{ use data_types::DatabaseName; use mutable_batch::MutableBatch; use rand::{distributions::Alphanumeric, thread_rng, Rng}; -use router::sharder::{JumpHash, Sharder}; +use sharder::{JumpHash, Sharder}; fn get_random_string(length: usize) -> String { thread_rng() diff --git a/router/src/sharder/jumphash.rs b/sharder/src/jumphash.rs similarity index 100% rename from router/src/sharder/jumphash.rs rename to sharder/src/jumphash.rs diff --git a/sharder/src/lib.rs b/sharder/src/lib.rs new file mode 100644 index 0000000000..3631637619 --- /dev/null +++ b/sharder/src/lib.rs @@ -0,0 +1,27 @@ +//! IOx sharder implementation. +//! +//! Given a table and a namespace, assign a consistent shard from the set of shards. + +#![deny( + rustdoc::broken_intra_doc_links, + rust_2018_idioms, + missing_debug_implementations, + unreachable_pub +)] +#![warn( + missing_docs, + clippy::todo, + clippy::dbg_macro, + clippy::clone_on_ref_ptr, + clippy::future_not_send +)] +#![allow(clippy::missing_docs_in_private_items)] + +mod r#trait; +pub use r#trait::*; + +mod jumphash; +pub use jumphash::*; + +#[allow(missing_docs)] +pub mod mock; diff --git a/router/src/sharder/mock.rs b/sharder/src/mock.rs similarity index 100% rename from router/src/sharder/mock.rs rename to sharder/src/mock.rs diff --git a/router/src/sharder/trait.rs b/sharder/src/trait.rs similarity index 100% rename from router/src/sharder/trait.rs rename to sharder/src/trait.rs