From b76c1e1ad61a85ef7edd458e1009b92b35109dc3 Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)" <carol.nichols@gmail.com>
Date: Thu, 5 May 2022 11:16:19 -0400
Subject: [PATCH] fix: Remove now-unused DML sharding and related types

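The DML sharding helpers are no longer used, so this removes the
data_types::router module (ShardId, ShardConfig, MatcherToShard,
HashRing, Matcher, the query/write sink types, and Router) along with
the shard() methods on DmlOperation, DmlWrite, and DmlDelete, the
shard_table helper, and the associated sharding tests.

Before this change, a caller split a write into per-shard writes
roughly like this (a sketch of the removed API, for reference only):

    let sharded: BTreeMap<ShardId, DmlWrite> = write.shard(&config);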
---
 data_types/src/lib.rs    |   1 -
 data_types/src/router.rs | 141 ------------------
 dml/src/lib.rs           | 302 ---------------------------------------
 3 files changed, 444 deletions(-)
 delete mode 100644 data_types/src/router.rs

diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs
index 74d600006b..7da948f108 100644
--- a/data_types/src/lib.rs
+++ b/data_types/src/lib.rs
@@ -15,7 +15,6 @@ pub mod consistent_hasher;
 pub mod error;
 pub mod job;
 pub mod partition_metadata;
-pub mod router;
 pub mod server_id;
 pub mod timestamp;
 pub mod write_buffer;
diff --git a/data_types/src/router.rs b/data_types/src/router.rs
deleted file mode 100644
index 5b1ff68a27..0000000000
--- a/data_types/src/router.rs
+++ /dev/null
@@ -1,141 +0,0 @@
-use std::collections::BTreeMap;
-
-use regex::Regex;
-
-use crate::{
-    consistent_hasher::ConsistentHasher, server_id::ServerId, write_buffer::WriteBufferConnection,
-};
-
-#[derive(Debug, Eq, PartialEq, Hash, PartialOrd, Ord, Clone, Copy)]
-pub struct ShardId(u32);
-
-impl ShardId {
-    pub fn new(id: u32) -> Self {
-        Self(id)
-    }
-
-    pub fn get(&self) -> u32 {
-        self.0
-    }
-}
-
-impl std::fmt::Display for ShardId {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "ShardId({})", self.get())
-    }
-}
-
-/// ShardConfig defines rules for assigning a line/row to an individual
-/// host or a group of hosts. A shard is a logical concept, used to split
-/// data into mutually exclusive areas. The rough order of organization
-/// is: database -> shard -> partition -> chunk. For example, you could
-/// shard based on table name and assign each table to 1 of 10 shards.
-/// Within each shard you would then have partitions, which would likely
-/// be based on time. This makes it possible to horizontally scale out
-/// writes.
-#[derive(Debug, Eq, PartialEq, Clone, Default)]
-pub struct ShardConfig {
-    /// Each matcher, if any, is evaluated in order. If there is a match,
-    /// the row is routed to the given target; otherwise the hash ring is
-    /// evaluated. This is useful for overriding the hash-ring function on
-    /// a hot spot. For example, suppose you use the table name as the
-    /// input to the hash function and your ring has 4 slots. If two very
-    /// hot tables get assigned to the same slot, you can override that by
-    /// adding a specific matcher that pulls one of those tables over to a
-    /// different node.
-    pub specific_targets: Vec<MatcherToShard>,
-
-    /// An optional default hasher which will route to one in a collection of
-    /// nodes.
-    pub hash_ring: Option<HashRing>,
-}
-
-/// Maps a matcher to a specific shard. If the line/row matches,
-/// it should be sent to that shard.
-#[derive(Debug, Eq, PartialEq, Clone)]
-pub struct MatcherToShard {
-    pub matcher: Matcher,
-    pub shard: ShardId,
-}
-
-/// HashRing is a rule for creating a hash key for a row and mapping that to
-/// an individual node on a ring.
-#[derive(Debug, Eq, PartialEq, Clone, Default)]
-pub struct HashRing {
-    /// ring of shard ids
-    pub shards: ConsistentHasher<ShardId>,
-}
-
-/// A matcher is used to match routing rules or subscriptions on a row-by-row
-/// (or line) basis.
-#[derive(Debug, Clone, Default)]
-pub struct Matcher {
-    /// If provided, match when the table name matches the regex
-    pub table_name_regex: Option<Regex>,
-}
-
-impl PartialEq for Matcher {
-    fn eq(&self, other: &Self) -> bool {
-        // this is kind of janky, but it's only used during tests and should get the job
-        // done
-        format!("{:?}", self.table_name_regex) == format!("{:?}", other.table_name_regex)
-    }
-}
-impl Eq for Matcher {}
-
-/// Sinks for query requests.
-///
-/// Queries are sent to one of these sinks and the resulting data is received from it.
-///
-/// Note that query results flow in the opposite direction (i.e. a query sink is a result source).
-#[derive(Debug, Eq, PartialEq, Clone, Default)]
-pub struct QuerySinks {
-    pub grpc_remotes: Vec<ServerId>,
-}
-
-#[derive(Debug, Eq, PartialEq, Clone)]
-pub enum WriteSinkVariant {
-    /// gRPC-based remote, addressed by its server ID.
-    GrpcRemote(ServerId),
-
-    /// Write buffer connection.
-    WriteBuffer(WriteBufferConnection),
-}
-
-/// Sink for write requests (i.e. new data).
-///
-/// Data is sent to this sink and a status is received from it.
-#[derive(Debug, Eq, PartialEq, Clone)]
-pub struct WriteSink {
-    pub sink: WriteSinkVariant,
-
-    /// If set, errors during writing to this sink are ignored and do NOT lead to an overall failure.
-    pub ignore_errors: bool,
-}
-
-/// Set of write sinks.
-#[derive(Debug, Eq, PartialEq, Clone)]
-pub struct WriteSinkSet {
-    /// Sinks within the set.
-    pub sinks: Vec<WriteSink>,
-}
-
-/// Router for writes and queries.
-#[derive(Debug, Eq, PartialEq, Clone)]
-pub struct Router {
-    /// Router name.
-    ///
-    /// The name corresponds to the database name on the database node.
-    ///
-    /// The router name is unique for this router node.
-    pub name: String,
-
-    /// Write sharder.
-    pub write_sharder: ShardConfig,
-
-    /// Sinks for write requests.
-    pub write_sinks: BTreeMap<ShardId, WriteSinkSet>,
-
-    /// Sinks for query requests.
-    pub query_sinks: QuerySinks,
-}
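
For reference, the removed router types fit together roughly as follows:
specific matchers were checked first, and the consistent-hash ring was the
fallback. A minimal sketch using the removed API (the regex, table name,
shard numbers, and the helper names example_config and fallback_shard are
illustrative only, not part of the crate):

    use data_types::consistent_hasher::ConsistentHasher;
    use data_types::router::{HashRing, Matcher, MatcherToShard, ShardConfig, ShardId};
    use regex::Regex;

    fn example_config() -> ShardConfig {
        ShardConfig {
            // Hot-spot override: tables matching "some_foo" always go to shard 2.
            specific_targets: vec![MatcherToShard {
                matcher: Matcher {
                    table_name_regex: Some(Regex::new("some_foo").unwrap()),
                },
                shard: ShardId::new(2),
            }],
            // Everything else is hashed onto one of three ring slots.
            hash_ring: Some(HashRing {
                shards: ConsistentHasher::new(&[
                    ShardId::new(11),
                    ShardId::new(12),
                    ShardId::new(13),
                ]),
            }),
        }
    }

    // Tables not matched by a specific matcher fell back to the hash ring.
    fn fallback_shard(config: &ShardConfig, table: &str) -> Option<ShardId> {
        config
            .hash_ring
            .as_ref()
            .and_then(|ring| ring.shards.find(table))
    }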
diff --git a/dml/src/lib.rs b/dml/src/lib.rs
index c7750fdfcf..dcd4239a9b 100644
--- a/dml/src/lib.rs
+++ b/dml/src/lib.rs
@@ -11,12 +11,10 @@
     clippy::clone_on_ref_ptr
 )]
 
-use data_types::router::{ShardConfig, ShardId};
 use data_types2::{DeletePredicate, NonEmptyString, Sequence, StatValues, Statistics};
 use hashbrown::HashMap;
 use iox_time::Time;
 use mutable_batch::MutableBatch;
-use std::collections::{BTreeMap, HashSet};
 use trace::ctx::SpanContext;
 
 /// Metadata information about a DML operation
@@ -126,22 +124,6 @@ impl DmlOperation {
         }
     }
 
-    /// Shards this [`DmlOperation`]
-    pub fn shard(self, config: &ShardConfig) -> BTreeMap<ShardId, Self> {
-        match self {
-            DmlOperation::Write(write) => write
-                .shard(config)
-                .into_iter()
-                .map(|(shard, write)| (shard, Self::Write(write)))
-                .collect(),
-            DmlOperation::Delete(delete) => delete
-                .shard(config)
-                .into_iter()
-                .map(|(shard, delete)| (shard, Self::Delete(delete)))
-                .collect(),
-        }
-    }
-
     /// Return the approximate memory size of the operation, in bytes.
     ///
     /// This includes `Self`.
@@ -278,31 +260,6 @@ impl DmlWrite {
         self.max_timestamp
     }
 
-    /// Shards this [`DmlWrite`]
-    pub fn shard(self, config: &ShardConfig) -> BTreeMap<ShardId, Self> {
-        let mut batches: HashMap<ShardId, HashMap<String, MutableBatch>> = HashMap::new();
-
-        for (table, batch) in self.tables {
-            if let Some(shard_id) = shard_table(&table, config) {
-                assert!(batches
-                    .entry(shard_id)
-                    .or_default()
-                    .insert(table, batch.clone())
-                    .is_none());
-            }
-        }
-
-        batches
-            .into_iter()
-            .map(|(shard_id, tables)| {
-                (
-                    shard_id,
-                    Self::new(&self.namespace, tables, self.meta.clone()),
-                )
-            })
-            .collect()
-    }
-
     /// Return the approximate memory size of the write, in bytes.
     ///
     /// This includes `Self`.
@@ -368,32 +325,6 @@ impl DmlDelete {
         self.meta = meta
     }
 
-    /// Shards this [`DmlDelete`]
-    pub fn shard(self, config: &ShardConfig) -> BTreeMap<ShardId, Self> {
-        if let Some(table) = self.table_name() {
-            if let Some(shard_id) = shard_table(table, config) {
-                BTreeMap::from([(shard_id, self)])
-            } else {
-                BTreeMap::default()
-            }
-        } else {
-            let shards: HashSet<ShardId> =
-                config
-                    .specific_targets
-                    .iter()
-                    .map(|matcher2shard| matcher2shard.shard)
-                    .chain(config.hash_ring.iter().flat_map(|hashring| {
-                        Vec::<ShardId>::from(hashring.shards.clone()).into_iter()
-                    }))
-                    .collect();
-
-            shards
-                .into_iter()
-                .map(|shard| (shard, self.clone()))
-                .collect()
-        }
-    }
-
     /// Return the approximate memory size of the delete, in bytes.
     ///
     /// This includes `Self`.
@@ -409,25 +340,6 @@ impl DmlDelete {
     }
 }
 
-/// Shard based only on the table name
-fn shard_table(table: &str, config: &ShardConfig) -> Option<ShardId> {
-    for matcher2shard in &config.specific_targets {
-        if let Some(regex) = &matcher2shard.matcher.table_name_regex {
-            if regex.is_match(table) {
-                return Some(matcher2shard.shard);
-            }
-        }
-    }
-
-    if let Some(hash_ring) = &config.hash_ring {
-        if let Some(id) = hash_ring.shards.find(table) {
-            return Some(id);
-        }
-    }
-
-    None
-}
-
 /// Test utilities
 pub mod test_util {
     use arrow_util::display::pretty_format_batches;
@@ -479,217 +391,3 @@ pub mod test_util {
         }
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::test_util::assert_writes_eq;
-    use data_types::{
-        consistent_hasher::ConsistentHasher,
-        router::{HashRing, Matcher, MatcherToShard},
-    };
-    use data_types2::TimestampRange;
-    use mutable_batch_lp::lines_to_batches;
-    use regex::Regex;
-
-    #[test]
-    fn test_write_sharding() {
-        let config = ShardConfig {
-            specific_targets: vec![
-                MatcherToShard {
-                    matcher: Matcher {
-                        table_name_regex: None,
-                    },
-                    shard: ShardId::new(1),
-                },
-                MatcherToShard {
-                    matcher: Matcher {
-                        table_name_regex: Some(Regex::new("some_foo").unwrap()),
-                    },
-                    shard: ShardId::new(2),
-                },
-                MatcherToShard {
-                    matcher: Matcher {
-                        table_name_regex: Some(Regex::new("other").unwrap()),
-                    },
-                    shard: ShardId::new(3),
-                },
-                MatcherToShard {
-                    matcher: Matcher {
-                        table_name_regex: Some(Regex::new("some_.*").unwrap()),
-                    },
-                    shard: ShardId::new(4),
-                },
-                MatcherToShard {
-                    matcher: Matcher {
-                        table_name_regex: Some(Regex::new("baz").unwrap()),
-                    },
-                    shard: ShardId::new(2),
-                },
-            ],
-            hash_ring: Some(HashRing {
-                shards: ConsistentHasher::new(&[
-                    ShardId::new(11),
-                    ShardId::new(12),
-                    ShardId::new(13),
-                ]),
-            }),
-        };
-
-        let meta = DmlMeta::unsequenced(None);
-        let write = db_write(
-            &[
-                "some_foo x=1 10",
-                "some_foo x=2 20",
-                "some_bar y=3 30",
-                "other z=4 40",
-                "rnd1 r=5 50",
-                "rnd2 r=6 60",
-                "rnd3 r=7 70",
-                "baz b=8 80",
-            ],
-            &meta,
-        );
-
-        let actual = write.shard(&config);
-        let expected = BTreeMap::from([
-            (
-                ShardId::new(2),
-                db_write(&["some_foo x=1 10", "some_foo x=2 20", "baz b=8 80"], &meta),
-            ),
-            (ShardId::new(3), db_write(&["other z=4 40"], &meta)),
-            (ShardId::new(4), db_write(&["some_bar y=3 30"], &meta)),
-            (ShardId::new(11), db_write(&["rnd1 r=5 50"], &meta)),
-            (ShardId::new(12), db_write(&["rnd3 r=7 70"], &meta)),
-            (ShardId::new(13), db_write(&["rnd2 r=6 60"], &meta)),
-        ]);
-
-        let actual_shard_ids: Vec<_> = actual.keys().cloned().collect();
-        let expected_shard_ids: Vec<_> = expected.keys().cloned().collect();
-        assert_eq!(actual_shard_ids, expected_shard_ids);
-
-        for (actual_write, expected_write) in actual.values().zip(expected.values()) {
-            assert_writes_eq(actual_write, expected_write);
-        }
-    }
-
-    #[test]
-    fn test_write_no_match() {
-        let config = ShardConfig::default();
-
-        let meta = DmlMeta::default();
-        let write = db_write(&["foo x=1 10"], &meta);
-
-        let actual = write.shard(&config);
-        assert!(actual.is_empty());
-    }
-
-    #[test]
-    fn test_delete_sharding() {
-        let config = ShardConfig {
-            specific_targets: vec![
-                MatcherToShard {
-                    matcher: Matcher {
-                        table_name_regex: None,
-                    },
-                    shard: ShardId::new(1),
-                },
-                MatcherToShard {
-                    matcher: Matcher {
-                        table_name_regex: Some(Regex::new("some_foo").unwrap()),
-                    },
-                    shard: ShardId::new(2),
-                },
-                MatcherToShard {
-                    matcher: Matcher {
-                        table_name_regex: Some(Regex::new("some_.*").unwrap()),
-                    },
-                    shard: ShardId::new(3),
-                },
-            ],
-            hash_ring: Some(HashRing {
-                shards: ConsistentHasher::new(&[
-                    ShardId::new(11),
-                    ShardId::new(12),
-                    ShardId::new(13),
-                ]),
-            }),
-        };
-
-        // Deletes w/o table name go to all shards
-        let meta = DmlMeta::unsequenced(None);
-        let delete = DmlDelete::new(
-            "test_db",
-            DeletePredicate {
-                range: TimestampRange::new(1, 2),
-                exprs: vec![],
-            },
-            None,
-            meta,
-        );
-
-        let actual = delete.clone().shard(&config);
-        let expected = BTreeMap::from([
-            (ShardId::new(1), delete.clone()),
-            (ShardId::new(2), delete.clone()),
-            (ShardId::new(3), delete.clone()),
-            (ShardId::new(11), delete.clone()),
-            (ShardId::new(12), delete.clone()),
-            (ShardId::new(13), delete),
-        ]);
-        assert_sharded_deletes_eq(&actual, &expected);
-
-        // Deletes are matched by table name regex
-        let meta = DmlMeta::unsequenced(None);
-        let delete = DmlDelete::new(
-            "test_db",
-            DeletePredicate {
-                range: TimestampRange::new(3, 4),
-                exprs: vec![],
-            },
-            Some(NonEmptyString::new("some_foo").unwrap()),
-            meta,
-        );
-
-        let actual = delete.clone().shard(&config);
-        let expected = BTreeMap::from([(ShardId::new(2), delete)]);
-        assert_sharded_deletes_eq(&actual, &expected);
-
-        // Deletes can be matched by hash-ring
-        let meta = DmlMeta::unsequenced(None);
-        let delete = DmlDelete::new(
-            "test_db",
-            DeletePredicate {
-                range: TimestampRange::new(5, 6),
-                exprs: vec![],
-            },
-            Some(NonEmptyString::new("bar").unwrap()),
-            meta,
-        );
-
-        let actual = delete.clone().shard(&config);
-        let expected = BTreeMap::from([(ShardId::new(13), delete)]);
-        assert_sharded_deletes_eq(&actual, &expected);
-    }
-
-    fn db_write(lines: &[&str], meta: &DmlMeta) -> DmlWrite {
-        DmlWrite::new(
-            "test_db",
-            lines_to_batches(&lines.join("\n"), 0).unwrap(),
-            meta.clone(),
-        )
-    }
-
-    fn assert_sharded_deletes_eq(
-        actual: &BTreeMap<ShardId, DmlDelete>,
-        expected: &BTreeMap<ShardId, DmlDelete>,
-    ) {
-        let actual_shard_ids: Vec<_> = actual.keys().cloned().collect();
-        let expected_shard_ids: Vec<_> = expected.keys().cloned().collect();
-        assert_eq!(actual_shard_ids, expected_shard_ids);
-
-        for (actual_delete, expected_delete) in actual.values().zip(expected.values()) {
-            assert_eq!(actual_delete, expected_delete);
-        }
-    }
-}
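
Finally, a condensed sketch of how the removed DmlWrite::shard entry point
was exercised, adapted from the deleted tests above (the database name and
line-protocol rows are illustrative, and shard_writes is an illustrative
wrapper rather than a function from the crate):

    use data_types::consistent_hasher::ConsistentHasher;
    use data_types::router::{HashRing, ShardConfig, ShardId};
    use dml::{DmlMeta, DmlWrite};
    use mutable_batch_lp::lines_to_batches;
    use std::collections::BTreeMap;

    fn shard_writes() {
        // With no specific matchers, every table is routed via the hash ring.
        let config = ShardConfig {
            specific_targets: vec![],
            hash_ring: Some(HashRing {
                shards: ConsistentHasher::new(&[ShardId::new(1), ShardId::new(2)]),
            }),
        };

        let meta = DmlMeta::unsequenced(None);
        let write = DmlWrite::new(
            "test_db",
            lines_to_batches("cpu x=1 10\nmem y=2 20", 0).unwrap(),
            meta,
        );

        // Each table landed in exactly one shard; tables that matched nothing
        // were dropped from the result.
        let sharded: BTreeMap<ShardId, DmlWrite> = write.shard(&config);
        for shard_id in sharded.keys() {
            println!("write routed to {}", shard_id);
        }
    }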