diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs
index 74d600006b..7da948f108 100644
--- a/data_types/src/lib.rs
+++ b/data_types/src/lib.rs
@@ -15,7 +15,6 @@ pub mod consistent_hasher;
 pub mod error;
 pub mod job;
 pub mod partition_metadata;
-pub mod router;
 pub mod server_id;
 pub mod timestamp;
 pub mod write_buffer;
diff --git a/data_types/src/router.rs b/data_types/src/router.rs
deleted file mode 100644
index 5b1ff68a27..0000000000
--- a/data_types/src/router.rs
+++ /dev/null
@@ -1,141 +0,0 @@
-use std::collections::BTreeMap;
-
-use regex::Regex;
-
-use crate::{
-    consistent_hasher::ConsistentHasher, server_id::ServerId, write_buffer::WriteBufferConnection,
-};
-
-#[derive(Debug, Eq, PartialEq, Hash, PartialOrd, Ord, Clone, Copy)]
-pub struct ShardId(u32);
-
-impl ShardId {
-    pub fn new(id: u32) -> Self {
-        Self(id)
-    }
-
-    pub fn get(&self) -> u32 {
-        self.0
-    }
-}
-
-impl std::fmt::Display for ShardId {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "ShardId({})", self.get())
-    }
-}
-
-/// ShardConfig defines rules for assigning a line/row to an individual
-/// host or a group of hosts. A shard
-/// is a logical concept, but the usage is meant to split data into
-/// mutually exclusive areas. The rough order of organization is:
-/// database -> shard -> partition -> chunk. For example, you could shard
-/// based on table name and assign to 1 of 10 shards. Within each
-/// shard you would have partitions, which would likely be based off time.
-/// This makes it possible to horizontally scale out writes.
-#[derive(Debug, Eq, PartialEq, Clone, Default)]
-pub struct ShardConfig {
-    /// Each matcher, if any, is evaluated in order.
-    /// If there is a match, the route will be evaluated to
-    /// the given targets, otherwise the hash ring will be evaluated.
-    /// This is useful for overriding the hashring function on some hot spot. For
-    /// example, if you use the table name as the input to the hash function
-    /// and your ring has 4 slots. If two tables that are very hot get
-    /// assigned to the same slot you can override that by putting in a
-    /// specific matcher to pull that table over to a different node.
-    pub specific_targets: Vec<MatcherToShard>,
-
-    /// An optional default hasher which will route to one in a collection of
-    /// nodes.
-    pub hash_ring: Option<HashRing>,
-}
-
-/// Maps a matcher with specific shard. If the line/row matches
-/// it should be sent to the group.
-#[derive(Debug, Eq, PartialEq, Clone)]
-pub struct MatcherToShard {
-    pub matcher: Matcher,
-    pub shard: ShardId,
-}
-
-/// HashRing is a rule for creating a hash key for a row and mapping that to
-/// an individual node on a ring.
-#[derive(Debug, Eq, PartialEq, Clone, Default)]
-pub struct HashRing {
-    /// ring of shard ids
-    pub shards: ConsistentHasher<ShardId>,
-}
-
-/// A matcher is used to match routing rules or subscriptions on a row-by-row
-/// (or line) basis.
-#[derive(Debug, Clone, Default)]
-pub struct Matcher {
-    /// if provided, match if the table name matches against the regex
-    pub table_name_regex: Option<Regex>,
-}
-
-impl PartialEq for Matcher {
-    fn eq(&self, other: &Self) -> bool {
-        // this is kind of janky, but it's only used during tests and should get the job
-        // done
-        format!("{:?}", self.table_name_regex) == format!("{:?}", other.table_name_regex)
-    }
-}
-impl Eq for Matcher {}
-
-/// Sinks for query requests.
-///
-/// Queries are sent to one of these sinks and the resulting data is received from it.
-///
-/// Note that the query results are flowing into the opposite direction (aka a query sink is a result source).
-#[derive(Debug, Eq, PartialEq, Clone, Default)]
-pub struct QuerySinks {
-    pub grpc_remotes: Vec<ServerId>,
-}
-
-#[derive(Debug, Eq, PartialEq, Clone)]
-pub enum WriteSinkVariant {
-    /// gRPC-based remote, addressed by its server ID.
-    GrpcRemote(ServerId),
-
-    /// Write buffer connection.
-    WriteBuffer(WriteBufferConnection),
-}
-
-/// Sink of write requests aka new data.
-///
-/// Data is sent to this sink and a status is received from it.
-#[derive(Debug, Eq, PartialEq, Clone)]
-pub struct WriteSink {
-    pub sink: WriteSinkVariant,
-
-    /// If set, errors during writing to this sink are ignored and do NOT lead to an overall failure.
-    pub ignore_errors: bool,
-}
-
-/// Set of write sinks.
-#[derive(Debug, Eq, PartialEq, Clone)]
-pub struct WriteSinkSet {
-    /// Sinks within the set.
-    pub sinks: Vec<WriteSink>,
-}
-
-/// Router for writes and queries.
-#[derive(Debug, Eq, PartialEq, Clone)]
-pub struct Router {
-    /// Router name.
-    ///
-    /// The name corresponds to the database name on the database node.
-    ///
-    /// The router name is unique for this router node.
-    pub name: String,
-
-    /// Write sharder.
-    pub write_sharder: ShardConfig,
-
-    /// Sinks for write requests.
-    pub write_sinks: BTreeMap<ShardId, WriteSinkSet>,
-
-    /// Sinks for query requests.
-    pub query_sinks: QuerySinks,
-}
diff --git a/dml/src/lib.rs b/dml/src/lib.rs
index c7750fdfcf..dcd4239a9b 100644
--- a/dml/src/lib.rs
+++ b/dml/src/lib.rs
@@ -11,12 +11,10 @@
     clippy::clone_on_ref_ptr
 )]
 
-use data_types::router::{ShardConfig, ShardId};
 use data_types2::{DeletePredicate, NonEmptyString, Sequence, StatValues, Statistics};
 use hashbrown::HashMap;
 use iox_time::Time;
 use mutable_batch::MutableBatch;
-use std::collections::{BTreeMap, HashSet};
 use trace::ctx::SpanContext;
 
 /// Metadata information about a DML operation
@@ -126,22 +124,6 @@ impl DmlOperation {
         }
     }
 
-    /// Shards this [`DmlOperation`]
-    pub fn shard(self, config: &ShardConfig) -> BTreeMap<ShardId, Self> {
-        match self {
-            DmlOperation::Write(write) => write
-                .shard(config)
-                .into_iter()
-                .map(|(shard, write)| (shard, Self::Write(write)))
-                .collect(),
-            DmlOperation::Delete(delete) => delete
-                .shard(config)
-                .into_iter()
-                .map(|(shard, delete)| (shard, Self::Delete(delete)))
-                .collect(),
-        }
-    }
-
     /// Return the approximate memory size of the operation, in bytes.
     ///
     /// This includes `Self`.
@@ -278,31 +260,6 @@ impl DmlWrite {
         self.max_timestamp
     }
 
-    /// Shards this [`DmlWrite`]
-    pub fn shard(self, config: &ShardConfig) -> BTreeMap<ShardId, Self> {
-        let mut batches: HashMap<ShardId, HashMap<String, MutableBatch>> = HashMap::new();
-
-        for (table, batch) in self.tables {
-            if let Some(shard_id) = shard_table(&table, config) {
-                assert!(batches
-                    .entry(shard_id)
-                    .or_default()
-                    .insert(table, batch.clone())
-                    .is_none());
-            }
-        }
-
-        batches
-            .into_iter()
-            .map(|(shard_id, tables)| {
-                (
-                    shard_id,
-                    Self::new(&self.namespace, tables, self.meta.clone()),
-                )
-            })
-            .collect()
-    }
-
     /// Return the approximate memory size of the write, in bytes.
     ///
     /// This includes `Self`.
@@ -368,32 +325,6 @@ impl DmlDelete {
         self.meta = meta
     }
 
-    /// Shards this [`DmlDelete`]
-    pub fn shard(self, config: &ShardConfig) -> BTreeMap<ShardId, Self> {
-        if let Some(table) = self.table_name() {
-            if let Some(shard_id) = shard_table(table, config) {
-                BTreeMap::from([(shard_id, self)])
-            } else {
-                BTreeMap::default()
-            }
-        } else {
-            let shards: HashSet<ShardId> =
-                config
-                    .specific_targets
-                    .iter()
-                    .map(|matcher2shard| matcher2shard.shard)
-                    .chain(config.hash_ring.iter().flat_map(|hashring| {
-                        Vec::<ShardId>::from(hashring.shards.clone()).into_iter()
-                    }))
-                    .collect();
-
-            shards
-                .into_iter()
-                .map(|shard| (shard, self.clone()))
-                .collect()
-        }
-    }
-
     /// Return the approximate memory size of the delete, in bytes.
     ///
     /// This includes `Self`.
@@ -409,25 +340,6 @@ impl DmlDelete {
     }
 }
 
-/// Shard only based on table name
-fn shard_table(table: &str, config: &ShardConfig) -> Option<ShardId> {
-    for matcher2shard in &config.specific_targets {
-        if let Some(regex) = &matcher2shard.matcher.table_name_regex {
-            if regex.is_match(table) {
-                return Some(matcher2shard.shard);
-            }
-        }
-    }
-
-    if let Some(hash_ring) = &config.hash_ring {
-        if let Some(id) = hash_ring.shards.find(table) {
-            return Some(id);
-        }
-    }
-
-    None
-}
-
 /// Test utilities
 pub mod test_util {
     use arrow_util::display::pretty_format_batches;
@@ -479,217 +391,3 @@ pub mod test_util {
         }
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::test_util::assert_writes_eq;
-    use data_types::{
-        consistent_hasher::ConsistentHasher,
-        router::{HashRing, Matcher, MatcherToShard},
-    };
-    use data_types2::TimestampRange;
-    use mutable_batch_lp::lines_to_batches;
-    use regex::Regex;
-
-    #[test]
-    fn test_write_sharding() {
-        let config = ShardConfig {
-            specific_targets: vec![
-                MatcherToShard {
-                    matcher: Matcher {
-                        table_name_regex: None,
-                    },
-                    shard: ShardId::new(1),
-                },
-                MatcherToShard {
-                    matcher: Matcher {
-                        table_name_regex: Some(Regex::new("some_foo").unwrap()),
-                    },
-                    shard: ShardId::new(2),
-                },
-                MatcherToShard {
-                    matcher: Matcher {
-                        table_name_regex: Some(Regex::new("other").unwrap()),
-                    },
-                    shard: ShardId::new(3),
-                },
-                MatcherToShard {
-                    matcher: Matcher {
-                        table_name_regex: Some(Regex::new("some_.*").unwrap()),
-                    },
-                    shard: ShardId::new(4),
-                },
-                MatcherToShard {
-                    matcher: Matcher {
-                        table_name_regex: Some(Regex::new("baz").unwrap()),
-                    },
-                    shard: ShardId::new(2),
-                },
-            ],
-            hash_ring: Some(HashRing {
-                shards: ConsistentHasher::new(&[
-                    ShardId::new(11),
-                    ShardId::new(12),
-                    ShardId::new(13),
-                ]),
-            }),
-        };
-
-        let meta = DmlMeta::unsequenced(None);
-        let write = db_write(
-            &[
-                "some_foo x=1 10",
-                "some_foo x=2 20",
-                "some_bar y=3 30",
-                "other z=4 40",
-                "rnd1 r=5 50",
-                "rnd2 r=6 60",
-                "rnd3 r=7 70",
-                "baz b=8 80",
-            ],
-            &meta,
-        );
-
-        let actual = write.shard(&config);
-        let expected = BTreeMap::from([
-            (
-                ShardId::new(2),
-                db_write(&["some_foo x=1 10", "some_foo x=2 20", "baz b=8 80"], &meta),
-            ),
-            (ShardId::new(3), db_write(&["other z=4 40"], &meta)),
-            (ShardId::new(4), db_write(&["some_bar y=3 30"], &meta)),
-            (ShardId::new(11), db_write(&["rnd1 r=5 50"], &meta)),
-            (ShardId::new(12), db_write(&["rnd3 r=7 70"], &meta)),
-            (ShardId::new(13), db_write(&["rnd2 r=6 60"], &meta)),
-        ]);
-
-        let actual_shard_ids: Vec<_> = actual.keys().cloned().collect();
-        let expected_shard_ids: Vec<_> = expected.keys().cloned().collect();
-        assert_eq!(actual_shard_ids, expected_shard_ids);
-
-        for (actual_write, expected_write) in actual.values().zip(expected.values()) {
-            assert_writes_eq(actual_write, expected_write);
-        }
-    }
-
-    #[test]
-    fn test_write_no_match() {
-        let config = ShardConfig::default();
-
-        let meta = DmlMeta::default();
-        let write = db_write(&["foo x=1 10"], &meta);
-
-        let actual = write.shard(&config);
-        assert!(actual.is_empty());
-    }
-
-    #[test]
-    fn test_delete_sharding() {
-        let config = ShardConfig {
-            specific_targets: vec![
-                MatcherToShard {
-                    matcher: Matcher {
-                        table_name_regex: None,
-                    },
-                    shard: ShardId::new(1),
-                },
-                MatcherToShard {
-                    matcher: Matcher {
-                        table_name_regex: Some(Regex::new("some_foo").unwrap()),
-                    },
-                    shard: ShardId::new(2),
-                },
-                MatcherToShard {
-                    matcher: Matcher {
-                        table_name_regex: Some(Regex::new("some_.*").unwrap()),
-                    },
-                    shard: ShardId::new(3),
-                },
-            ],
-            hash_ring: Some(HashRing {
-                shards: ConsistentHasher::new(&[
-                    ShardId::new(11),
-                    ShardId::new(12),
-                    ShardId::new(13),
-                ]),
-            }),
-        };
-
-        // Deletes w/o table name go to all shards
-        let meta = DmlMeta::unsequenced(None);
-        let delete = DmlDelete::new(
-            "test_db",
-            DeletePredicate {
-                range: TimestampRange::new(1, 2),
-                exprs: vec![],
-            },
-            None,
-            meta,
-        );
-
-        let actual = delete.clone().shard(&config);
-        let expected = BTreeMap::from([
-            (ShardId::new(1), delete.clone()),
-            (ShardId::new(2), delete.clone()),
-            (ShardId::new(3), delete.clone()),
-            (ShardId::new(11), delete.clone()),
-            (ShardId::new(12), delete.clone()),
-            (ShardId::new(13), delete),
-        ]);
-        assert_sharded_deletes_eq(&actual, &expected);
-
-        // Deletes are matched by table name regex
-        let meta = DmlMeta::unsequenced(None);
-        let delete = DmlDelete::new(
-            "test_db",
-            DeletePredicate {
-                range: TimestampRange::new(3, 4),
-                exprs: vec![],
-            },
-            Some(NonEmptyString::new("some_foo").unwrap()),
-            meta,
-        );
-
-        let actual = delete.clone().shard(&config);
-        let expected = BTreeMap::from([(ShardId::new(2), delete)]);
-        assert_sharded_deletes_eq(&actual, &expected);
-
-        // Deletes can be matched by hash-ring
-        let meta = DmlMeta::unsequenced(None);
-        let delete = DmlDelete::new(
-            "test_db",
-            DeletePredicate {
-                range: TimestampRange::new(5, 6),
-                exprs: vec![],
-            },
-            Some(NonEmptyString::new("bar").unwrap()),
-            meta,
-        );
-
-        let actual = delete.clone().shard(&config);
-        let expected = BTreeMap::from([(ShardId::new(13), delete)]);
-        assert_sharded_deletes_eq(&actual, &expected);
-    }
-
-    fn db_write(lines: &[&str], meta: &DmlMeta) -> DmlWrite {
-        DmlWrite::new(
-            "test_db",
-            lines_to_batches(&lines.join("\n"), 0).unwrap(),
-            meta.clone(),
-        )
-    }
-
-    fn assert_sharded_deletes_eq(
-        actual: &BTreeMap<ShardId, DmlDelete>,
-        expected: &BTreeMap<ShardId, DmlDelete>,
-    ) {
-        let actual_shard_ids: Vec<_> = actual.keys().cloned().collect();
-        let expected_shard_ids: Vec<_> = expected.keys().cloned().collect();
-        assert_eq!(actual_shard_ids, expected_shard_ids);
-
-        for (actual_delete, expected_delete) in actual.values().zip(expected.values()) {
-            assert_eq!(actual_delete, expected_delete);
-        }
-    }
-}
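For readers of this removal, the heart of the deleted router was `shard_table`: evaluate the `specific_targets` regex matchers in order, and only fall back to the consistent-hash ring when no matcher fires (this is the hot-spot override the `ShardConfig` docs describe). The sketch below restates that rule in standalone form; it is an illustration, not the deleted code: the types are hypothetical stand-ins, and since `ConsistentHasher` is not part of this diff, a plain hash-modulo lookup is assumed in place of `hash_ring.shards.find(table)`.

```rust
use regex::Regex;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical stand-ins for the deleted `MatcherToShard` / `ShardConfig`.
struct MatcherToShard {
    table_name_regex: Option<Regex>,
    shard: u32,
}

struct ShardConfig {
    specific_targets: Vec<MatcherToShard>,
    ring_shards: Vec<u32>, // stand-in for `ConsistentHasher<ShardId>`
}

/// Mirrors the removed `shard_table`: specific matchers first, ring second.
fn shard_table(table: &str, config: &ShardConfig) -> Option<u32> {
    // 1. Matchers are evaluated in order; the first regex that matches wins.
    for target in &config.specific_targets {
        if let Some(regex) = &target.table_name_regex {
            if regex.is_match(table) {
                return Some(target.shard);
            }
        }
    }

    // 2. No matcher fired: consult the ring. The deleted code called
    //    `hash_ring.shards.find(table)`; hash-modulo is only a placeholder.
    if config.ring_shards.is_empty() {
        return None;
    }
    let mut hasher = DefaultHasher::new();
    table.hash(&mut hasher);
    let idx = (hasher.finish() as usize) % config.ring_shards.len();
    Some(config.ring_shards[idx])
}
```

The ordering is what makes the override work: a hot table that hashes onto an overloaded ring slot can be pulled to a dedicated shard with one extra matcher, without touching the ring.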
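One behavior of the removed `DmlDelete::shard` that is easy to miss in the diff: a delete without a table name fanned out to every reachable shard, i.e. the union of all matcher targets and all ring members. Reusing the hypothetical types from the sketch above:

```rust
use std::collections::HashSet;

/// Every shard a table-less delete must be sent to: all specific targets
/// plus all ring members, deduplicated (mirrors the removed fan-out logic).
fn all_shards(config: &ShardConfig) -> HashSet<u32> {
    config
        .specific_targets
        .iter()
        .map(|target| target.shard)
        .chain(config.ring_shards.iter().copied())
        .collect()
}
```

This matches `test_delete_sharding` above, where the table-less delete lands on shards 1, 2, 3, 11, 12, and 13.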