fix: Remove now-unused DML sharding and related types
parent
94be7407ba
commit
b76c1e1ad6
|
@ -15,7 +15,6 @@ pub mod consistent_hasher;
|
|||
pub mod error;
|
||||
pub mod job;
|
||||
pub mod partition_metadata;
|
||||
pub mod router;
|
||||
pub mod server_id;
|
||||
pub mod timestamp;
|
||||
pub mod write_buffer;
|
||||
|
|
|
@ -1,141 +0,0 @@
|
|||
use std::collections::BTreeMap;
|
||||
|
||||
use regex::Regex;
|
||||
|
||||
use crate::{
|
||||
consistent_hasher::ConsistentHasher, server_id::ServerId, write_buffer::WriteBufferConnection,
|
||||
};
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, Hash, PartialOrd, Ord, Clone, Copy)]
|
||||
pub struct ShardId(u32);
|
||||
|
||||
impl ShardId {
|
||||
pub fn new(id: u32) -> Self {
|
||||
Self(id)
|
||||
}
|
||||
|
||||
pub fn get(&self) -> u32 {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ShardId {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "ShardId({})", self.get())
|
||||
}
|
||||
}
|
||||
|
||||
/// ShardConfig defines rules for assigning a line/row to an individual
|
||||
/// host or a group of hosts. A shard
|
||||
/// is a logical concept, but the usage is meant to split data into
|
||||
/// mutually exclusive areas. The rough order of organization is:
|
||||
/// database -> shard -> partition -> chunk. For example, you could shard
|
||||
/// based on table name and assign to 1 of 10 shards. Within each
|
||||
/// shard you would have partitions, which would likely be based off time.
|
||||
/// This makes it possible to horizontally scale out writes.
|
||||
#[derive(Debug, Eq, PartialEq, Clone, Default)]
|
||||
pub struct ShardConfig {
|
||||
/// Each matcher, if any, is evaluated in order.
|
||||
/// If there is a match, the route will be evaluated to
|
||||
/// the given targets, otherwise the hash ring will be evaluated.
|
||||
/// This is useful for overriding the hashring function on some hot spot. For
|
||||
/// example, if you use the table name as the input to the hash function
|
||||
/// and your ring has 4 slots. If two tables that are very hot get
|
||||
/// assigned to the same slot you can override that by putting in a
|
||||
/// specific matcher to pull that table over to a different node.
|
||||
pub specific_targets: Vec<MatcherToShard>,
|
||||
|
||||
/// An optional default hasher which will route to one in a collection of
|
||||
/// nodes.
|
||||
pub hash_ring: Option<HashRing>,
|
||||
}
|
||||
|
||||
/// Maps a matcher with specific shard. If the line/row matches
|
||||
/// it should be sent to the group.
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub struct MatcherToShard {
|
||||
pub matcher: Matcher,
|
||||
pub shard: ShardId,
|
||||
}
|
||||
|
||||
/// HashRing is a rule for creating a hash key for a row and mapping that to
|
||||
/// an individual node on a ring.
|
||||
#[derive(Debug, Eq, PartialEq, Clone, Default)]
|
||||
pub struct HashRing {
|
||||
/// ring of shard ids
|
||||
pub shards: ConsistentHasher<ShardId>,
|
||||
}
|
||||
|
||||
/// A matcher is used to match routing rules or subscriptions on a row-by-row
|
||||
/// (or line) basis.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct Matcher {
|
||||
/// if provided, match if the table name matches against the regex
|
||||
pub table_name_regex: Option<Regex>,
|
||||
}
|
||||
|
||||
impl PartialEq for Matcher {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
// this is kind of janky, but it's only used during tests and should get the job
|
||||
// done
|
||||
format!("{:?}", self.table_name_regex) == format!("{:?}", other.table_name_regex)
|
||||
}
|
||||
}
|
||||
impl Eq for Matcher {}
|
||||
|
||||
/// Sinks for query requests.
|
||||
///
|
||||
/// Queries are sent to one of these sinks and the resulting data is received from it.
|
||||
///
|
||||
/// Note that the query results are flowing into the opposite direction (aka a query sink is a result source).
|
||||
#[derive(Debug, Eq, PartialEq, Clone, Default)]
|
||||
pub struct QuerySinks {
|
||||
pub grpc_remotes: Vec<ServerId>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub enum WriteSinkVariant {
|
||||
/// gRPC-based remote, addressed by its server ID.
|
||||
GrpcRemote(ServerId),
|
||||
|
||||
/// Write buffer connection.
|
||||
WriteBuffer(WriteBufferConnection),
|
||||
}
|
||||
|
||||
/// Sink of write requests aka new data.
|
||||
///
|
||||
/// Data is sent to this sink and a status is received from it.
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub struct WriteSink {
|
||||
pub sink: WriteSinkVariant,
|
||||
|
||||
/// If set, errors during writing to this sink are ignored and do NOT lead to an overall failure.
|
||||
pub ignore_errors: bool,
|
||||
}
|
||||
|
||||
/// Set of write sinks.
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub struct WriteSinkSet {
|
||||
/// Sinks within the set.
|
||||
pub sinks: Vec<WriteSink>,
|
||||
}
|
||||
|
||||
/// Router for writes and queries.
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub struct Router {
|
||||
/// Router name.
|
||||
///
|
||||
/// The name corresponds to the database name on the database node.
|
||||
///
|
||||
/// The router name is unique for this router node.
|
||||
pub name: String,
|
||||
|
||||
/// Write sharder.
|
||||
pub write_sharder: ShardConfig,
|
||||
|
||||
/// Sinks for write requests.
|
||||
pub write_sinks: BTreeMap<ShardId, WriteSinkSet>,
|
||||
|
||||
/// Sinks for query requests.
|
||||
pub query_sinks: QuerySinks,
|
||||
}
|
302
dml/src/lib.rs
302
dml/src/lib.rs
|
@ -11,12 +11,10 @@
|
|||
clippy::clone_on_ref_ptr
|
||||
)]
|
||||
|
||||
use data_types::router::{ShardConfig, ShardId};
|
||||
use data_types2::{DeletePredicate, NonEmptyString, Sequence, StatValues, Statistics};
|
||||
use hashbrown::HashMap;
|
||||
use iox_time::Time;
|
||||
use mutable_batch::MutableBatch;
|
||||
use std::collections::{BTreeMap, HashSet};
|
||||
use trace::ctx::SpanContext;
|
||||
|
||||
/// Metadata information about a DML operation
|
||||
|
@ -126,22 +124,6 @@ impl DmlOperation {
|
|||
}
|
||||
}
|
||||
|
||||
/// Shards this [`DmlOperation`]
|
||||
pub fn shard(self, config: &ShardConfig) -> BTreeMap<ShardId, Self> {
|
||||
match self {
|
||||
DmlOperation::Write(write) => write
|
||||
.shard(config)
|
||||
.into_iter()
|
||||
.map(|(shard, write)| (shard, Self::Write(write)))
|
||||
.collect(),
|
||||
DmlOperation::Delete(delete) => delete
|
||||
.shard(config)
|
||||
.into_iter()
|
||||
.map(|(shard, delete)| (shard, Self::Delete(delete)))
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the approximate memory size of the operation, in bytes.
|
||||
///
|
||||
/// This includes `Self`.
|
||||
|
@ -278,31 +260,6 @@ impl DmlWrite {
|
|||
self.max_timestamp
|
||||
}
|
||||
|
||||
/// Shards this [`DmlWrite`]
|
||||
pub fn shard(self, config: &ShardConfig) -> BTreeMap<ShardId, Self> {
|
||||
let mut batches: HashMap<ShardId, HashMap<String, MutableBatch>> = HashMap::new();
|
||||
|
||||
for (table, batch) in self.tables {
|
||||
if let Some(shard_id) = shard_table(&table, config) {
|
||||
assert!(batches
|
||||
.entry(shard_id)
|
||||
.or_default()
|
||||
.insert(table, batch.clone())
|
||||
.is_none());
|
||||
}
|
||||
}
|
||||
|
||||
batches
|
||||
.into_iter()
|
||||
.map(|(shard_id, tables)| {
|
||||
(
|
||||
shard_id,
|
||||
Self::new(&self.namespace, tables, self.meta.clone()),
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Return the approximate memory size of the write, in bytes.
|
||||
///
|
||||
/// This includes `Self`.
|
||||
|
@ -368,32 +325,6 @@ impl DmlDelete {
|
|||
self.meta = meta
|
||||
}
|
||||
|
||||
/// Shards this [`DmlDelete`]
|
||||
pub fn shard(self, config: &ShardConfig) -> BTreeMap<ShardId, Self> {
|
||||
if let Some(table) = self.table_name() {
|
||||
if let Some(shard_id) = shard_table(table, config) {
|
||||
BTreeMap::from([(shard_id, self)])
|
||||
} else {
|
||||
BTreeMap::default()
|
||||
}
|
||||
} else {
|
||||
let shards: HashSet<ShardId> =
|
||||
config
|
||||
.specific_targets
|
||||
.iter()
|
||||
.map(|matcher2shard| matcher2shard.shard)
|
||||
.chain(config.hash_ring.iter().flat_map(|hashring| {
|
||||
Vec::<ShardId>::from(hashring.shards.clone()).into_iter()
|
||||
}))
|
||||
.collect();
|
||||
|
||||
shards
|
||||
.into_iter()
|
||||
.map(|shard| (shard, self.clone()))
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the approximate memory size of the delete, in bytes.
|
||||
///
|
||||
/// This includes `Self`.
|
||||
|
@ -409,25 +340,6 @@ impl DmlDelete {
|
|||
}
|
||||
}
|
||||
|
||||
/// Shard only based on table name
|
||||
fn shard_table(table: &str, config: &ShardConfig) -> Option<ShardId> {
|
||||
for matcher2shard in &config.specific_targets {
|
||||
if let Some(regex) = &matcher2shard.matcher.table_name_regex {
|
||||
if regex.is_match(table) {
|
||||
return Some(matcher2shard.shard);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(hash_ring) = &config.hash_ring {
|
||||
if let Some(id) = hash_ring.shards.find(table) {
|
||||
return Some(id);
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
/// Test utilities
|
||||
pub mod test_util {
|
||||
use arrow_util::display::pretty_format_batches;
|
||||
|
@ -479,217 +391,3 @@ pub mod test_util {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::test_util::assert_writes_eq;
|
||||
use data_types::{
|
||||
consistent_hasher::ConsistentHasher,
|
||||
router::{HashRing, Matcher, MatcherToShard},
|
||||
};
|
||||
use data_types2::TimestampRange;
|
||||
use mutable_batch_lp::lines_to_batches;
|
||||
use regex::Regex;
|
||||
|
||||
#[test]
|
||||
fn test_write_sharding() {
|
||||
let config = ShardConfig {
|
||||
specific_targets: vec![
|
||||
MatcherToShard {
|
||||
matcher: Matcher {
|
||||
table_name_regex: None,
|
||||
},
|
||||
shard: ShardId::new(1),
|
||||
},
|
||||
MatcherToShard {
|
||||
matcher: Matcher {
|
||||
table_name_regex: Some(Regex::new("some_foo").unwrap()),
|
||||
},
|
||||
shard: ShardId::new(2),
|
||||
},
|
||||
MatcherToShard {
|
||||
matcher: Matcher {
|
||||
table_name_regex: Some(Regex::new("other").unwrap()),
|
||||
},
|
||||
shard: ShardId::new(3),
|
||||
},
|
||||
MatcherToShard {
|
||||
matcher: Matcher {
|
||||
table_name_regex: Some(Regex::new("some_.*").unwrap()),
|
||||
},
|
||||
shard: ShardId::new(4),
|
||||
},
|
||||
MatcherToShard {
|
||||
matcher: Matcher {
|
||||
table_name_regex: Some(Regex::new("baz").unwrap()),
|
||||
},
|
||||
shard: ShardId::new(2),
|
||||
},
|
||||
],
|
||||
hash_ring: Some(HashRing {
|
||||
shards: ConsistentHasher::new(&[
|
||||
ShardId::new(11),
|
||||
ShardId::new(12),
|
||||
ShardId::new(13),
|
||||
]),
|
||||
}),
|
||||
};
|
||||
|
||||
let meta = DmlMeta::unsequenced(None);
|
||||
let write = db_write(
|
||||
&[
|
||||
"some_foo x=1 10",
|
||||
"some_foo x=2 20",
|
||||
"some_bar y=3 30",
|
||||
"other z=4 40",
|
||||
"rnd1 r=5 50",
|
||||
"rnd2 r=6 60",
|
||||
"rnd3 r=7 70",
|
||||
"baz b=8 80",
|
||||
],
|
||||
&meta,
|
||||
);
|
||||
|
||||
let actual = write.shard(&config);
|
||||
let expected = BTreeMap::from([
|
||||
(
|
||||
ShardId::new(2),
|
||||
db_write(&["some_foo x=1 10", "some_foo x=2 20", "baz b=8 80"], &meta),
|
||||
),
|
||||
(ShardId::new(3), db_write(&["other z=4 40"], &meta)),
|
||||
(ShardId::new(4), db_write(&["some_bar y=3 30"], &meta)),
|
||||
(ShardId::new(11), db_write(&["rnd1 r=5 50"], &meta)),
|
||||
(ShardId::new(12), db_write(&["rnd3 r=7 70"], &meta)),
|
||||
(ShardId::new(13), db_write(&["rnd2 r=6 60"], &meta)),
|
||||
]);
|
||||
|
||||
let actual_shard_ids: Vec<_> = actual.keys().cloned().collect();
|
||||
let expected_shard_ids: Vec<_> = expected.keys().cloned().collect();
|
||||
assert_eq!(actual_shard_ids, expected_shard_ids);
|
||||
|
||||
for (actual_write, expected_write) in actual.values().zip(expected.values()) {
|
||||
assert_writes_eq(actual_write, expected_write);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_write_no_match() {
|
||||
let config = ShardConfig::default();
|
||||
|
||||
let meta = DmlMeta::default();
|
||||
let write = db_write(&["foo x=1 10"], &meta);
|
||||
|
||||
let actual = write.shard(&config);
|
||||
assert!(actual.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_delete_sharding() {
|
||||
let config = ShardConfig {
|
||||
specific_targets: vec![
|
||||
MatcherToShard {
|
||||
matcher: Matcher {
|
||||
table_name_regex: None,
|
||||
},
|
||||
shard: ShardId::new(1),
|
||||
},
|
||||
MatcherToShard {
|
||||
matcher: Matcher {
|
||||
table_name_regex: Some(Regex::new("some_foo").unwrap()),
|
||||
},
|
||||
shard: ShardId::new(2),
|
||||
},
|
||||
MatcherToShard {
|
||||
matcher: Matcher {
|
||||
table_name_regex: Some(Regex::new("some_.*").unwrap()),
|
||||
},
|
||||
shard: ShardId::new(3),
|
||||
},
|
||||
],
|
||||
hash_ring: Some(HashRing {
|
||||
shards: ConsistentHasher::new(&[
|
||||
ShardId::new(11),
|
||||
ShardId::new(12),
|
||||
ShardId::new(13),
|
||||
]),
|
||||
}),
|
||||
};
|
||||
|
||||
// Deletes w/o table name go to all shards
|
||||
let meta = DmlMeta::unsequenced(None);
|
||||
let delete = DmlDelete::new(
|
||||
"test_db",
|
||||
DeletePredicate {
|
||||
range: TimestampRange::new(1, 2),
|
||||
exprs: vec![],
|
||||
},
|
||||
None,
|
||||
meta,
|
||||
);
|
||||
|
||||
let actual = delete.clone().shard(&config);
|
||||
let expected = BTreeMap::from([
|
||||
(ShardId::new(1), delete.clone()),
|
||||
(ShardId::new(2), delete.clone()),
|
||||
(ShardId::new(3), delete.clone()),
|
||||
(ShardId::new(11), delete.clone()),
|
||||
(ShardId::new(12), delete.clone()),
|
||||
(ShardId::new(13), delete),
|
||||
]);
|
||||
assert_sharded_deletes_eq(&actual, &expected);
|
||||
|
||||
// Deletes are matched by table name regex
|
||||
let meta = DmlMeta::unsequenced(None);
|
||||
let delete = DmlDelete::new(
|
||||
"test_db",
|
||||
DeletePredicate {
|
||||
range: TimestampRange::new(3, 4),
|
||||
exprs: vec![],
|
||||
},
|
||||
Some(NonEmptyString::new("some_foo").unwrap()),
|
||||
meta,
|
||||
);
|
||||
|
||||
let actual = delete.clone().shard(&config);
|
||||
let expected = BTreeMap::from([(ShardId::new(2), delete)]);
|
||||
assert_sharded_deletes_eq(&actual, &expected);
|
||||
|
||||
// Deletes can be matched by hash-ring
|
||||
let meta = DmlMeta::unsequenced(None);
|
||||
let delete = DmlDelete::new(
|
||||
"test_db",
|
||||
DeletePredicate {
|
||||
range: TimestampRange::new(5, 6),
|
||||
exprs: vec![],
|
||||
},
|
||||
Some(NonEmptyString::new("bar").unwrap()),
|
||||
meta,
|
||||
);
|
||||
|
||||
let actual = delete.clone().shard(&config);
|
||||
let expected = BTreeMap::from([(ShardId::new(13), delete)]);
|
||||
assert_sharded_deletes_eq(&actual, &expected);
|
||||
}
|
||||
|
||||
fn db_write(lines: &[&str], meta: &DmlMeta) -> DmlWrite {
|
||||
DmlWrite::new(
|
||||
"test_db",
|
||||
lines_to_batches(&lines.join("\n"), 0).unwrap(),
|
||||
meta.clone(),
|
||||
)
|
||||
}
|
||||
|
||||
fn assert_sharded_deletes_eq(
|
||||
actual: &BTreeMap<ShardId, DmlDelete>,
|
||||
expected: &BTreeMap<ShardId, DmlDelete>,
|
||||
) {
|
||||
let actual_shard_ids: Vec<_> = actual.keys().cloned().collect();
|
||||
let expected_shard_ids: Vec<_> = expected.keys().cloned().collect();
|
||||
assert_eq!(actual_shard_ids, expected_shard_ids);
|
||||
|
||||
for (actual_delete, expected_delete) in actual.values().zip(expected.values()) {
|
||||
assert_eq!(actual_delete, expected_delete);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue