feat: Extract sharder to its own crate

pull/24376/head
Carol (Nichols || Goulding) 2022-06-09 15:10:16 -04:00
parent 3bd24b67ba
commit 6417e7dc2a
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
17 changed files with 77 additions and 28 deletions

18
Cargo.lock generated
View File

@ -2562,6 +2562,7 @@ dependencies = [
"object_store",
"observability_deps",
"router",
"sharder",
"thiserror",
"tokio",
"tokio-util",
@ -4321,7 +4322,7 @@ dependencies = [
"service_grpc_catalog",
"service_grpc_object_store",
"service_grpc_schema",
"siphasher",
"sharder",
"test_helpers",
"thiserror",
"tokio",
@ -4924,6 +4925,21 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "sharder"
version = "0.1.0"
dependencies = [
"criterion",
"data_types",
"hashbrown 0.12.1",
"mutable_batch",
"mutable_batch_lp",
"parking_lot 0.12.1",
"rand",
"siphasher",
"workspace-hack",
]
[[package]]
name = "shlex"
version = "1.1.0"

View File

@ -60,6 +60,7 @@ members = [
"service_grpc_catalog",
"service_grpc_schema",
"service_grpc_testing",
"sharder",
"sqlx-hotswap-pool",
"test_helpers",
"test_helpers_end_to_end",

View File

@ -14,6 +14,7 @@ mutable_batch = { path = "../mutable_batch" }
object_store = "0.0.1"
observability_deps = { path = "../observability_deps" }
router = { path = "../router" }
sharder = { path = "../sharder" }
trace = { path = "../trace" }
write_buffer = { path = "../write_buffer" }
write_summary = { path = "../write_summary" }

View File

@ -27,8 +27,8 @@ use router::{
},
sequencer::Sequencer,
server::{grpc::GrpcDelegate, http::HttpDelegate, RouterServer},
sharder::JumpHash,
};
use sharder::JumpHash;
use std::{
collections::BTreeSet,
fmt::{Debug, Display},

View File

@ -29,7 +29,7 @@ serde = "1.0"
serde_urlencoded = "0.7"
service_grpc_schema = { path = "../service_grpc_schema" }
service_grpc_object_store = { path = "../service_grpc_object_store" }
siphasher = "0.3"
sharder = { path = "../sharder" }
thiserror = "1.0"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
tonic = "0.7"
@ -49,10 +49,6 @@ schema = { path = "../schema" }
test_helpers = { version = "0.1.0", path = "../test_helpers", features = ["future_timeout"] }
tokio-stream = { version = "0.1.9", default_features = false, features = [] }
[[bench]]
name = "sharder"
harness = false
[[bench]]
name = "schema_validator"
harness = false

View File

@ -10,8 +10,8 @@ use router::{
namespace_cache::{MemoryNamespaceCache, ShardedCache},
sequencer::Sequencer,
server::http::HttpDelegate,
sharder::JumpHash,
};
use sharder::JumpHash;
use std::{collections::BTreeSet, iter, sync::Arc};
use tokio::runtime::Runtime;
use write_buffer::{

View File

@ -1,7 +1,7 @@
//! Logic to shard writes/deletes and push them into a write buffer sequencer.
use super::Partitioned;
use crate::{dml_handlers::DmlHandler, sequencer::Sequencer, sharder::Sharder};
use crate::{dml_handlers::DmlHandler, sequencer::Sequencer};
use async_trait::async_trait;
use data_types::{DatabaseName, DeletePredicate, NonEmptyString};
use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
@ -9,6 +9,7 @@ use futures::{stream::FuturesUnordered, StreamExt};
use hashbrown::HashMap;
use mutable_batch::MutableBatch;
use observability_deps::tracing::*;
use sharder::Sharder;
use std::{
fmt::{Debug, Display},
sync::Arc,
@ -203,12 +204,10 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::{
dml_handlers::DmlHandler,
sharder::mock::{MockSharder, MockSharderCall, MockSharderPayload},
};
use crate::dml_handlers::DmlHandler;
use assert_matches::assert_matches;
use data_types::TimestampRange;
use sharder::mock::{MockSharder, MockSharderCall, MockSharderPayload};
use std::sync::Arc;
use write_buffer::mock::{MockBufferForWriting, MockBufferSharedState};

View File

@ -9,7 +9,6 @@
//! * Deriving the partition key of each DML operation.
//! * Applying sharding logic.
//! * Push resulting operations into the appropriate kafka topics.
//!
#![deny(
rustdoc::broken_intra_doc_links,
@ -30,4 +29,3 @@ pub mod dml_handlers;
pub mod namespace_cache;
pub mod sequencer;
pub mod server;
pub mod sharder;

View File

@ -1,6 +1,6 @@
use super::NamespaceCache;
use crate::sharder::JumpHash;
use data_types::{DatabaseName, NamespaceSchema};
use sharder::JumpHash;
use std::sync::Arc;
/// A decorator sharding the [`NamespaceCache`] keyspace into a set of `T`.

View File

@ -1,10 +0,0 @@
//! Sharder logic to consistently map operations to a specific sequencer.
mod r#trait;
pub use r#trait::*;
mod jumphash;
pub use jumphash::*;
#[cfg(test)]
pub mod mock;

View File

@ -15,8 +15,8 @@ use router::{
namespace_cache::{MemoryNamespaceCache, ShardedCache},
sequencer::Sequencer,
server::http::HttpDelegate,
sharder::JumpHash,
};
use sharder::JumpHash;
use std::{collections::BTreeSet, iter, string::String, sync::Arc};
use write_buffer::{
core::WriteBufferWriting,

21
sharder/Cargo.toml Normal file
View File

@ -0,0 +1,21 @@
[package]
name = "sharder"
version = "0.1.0"
edition = "2021"
[dependencies]
data_types = { path = "../data_types" }
hashbrown = "0.12"
mutable_batch = { path = "../mutable_batch" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
parking_lot = "0.12"
siphasher = "0.3"
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
criterion = { version = "0.3.4", features = ["async_tokio", "html_reports"] }
rand = "0.8.3"
[[bench]]
name = "sharder"
harness = false

View File

@ -6,7 +6,7 @@ use criterion::{
use data_types::DatabaseName;
use mutable_batch::MutableBatch;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use router::sharder::{JumpHash, Sharder};
use sharder::{JumpHash, Sharder};
fn get_random_string(length: usize) -> String {
thread_rng()

27
sharder/src/lib.rs Normal file
View File

@ -0,0 +1,27 @@
//! IOx sharder implementation.
//!
//! Given a table and a namespace, assign a consistent shard from the set of shards.
#![deny(
rustdoc::broken_intra_doc_links,
rust_2018_idioms,
missing_debug_implementations,
unreachable_pub
)]
#![warn(
missing_docs,
clippy::todo,
clippy::dbg_macro,
clippy::clone_on_ref_ptr,
clippy::future_not_send
)]
#![allow(clippy::missing_docs_in_private_items)]
mod r#trait;
pub use r#trait::*;
mod jumphash;
pub use jumphash::*;
#[allow(missing_docs)]
pub mod mock;