feat: Add API for ShardConfig

pull/24376/head
Marko Mikulicic 2021-03-19 10:19:13 +01:00 committed by kodiakhq[bot]
parent 9817dbef2f
commit cf51a1a3f1
3 changed files with 336 additions and 2 deletions

View File

@ -758,7 +758,7 @@ pub struct ShardConfig {
/// Maps a matcher with specific target group. If the line/row matches
/// it should be sent to the group.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Default)]
pub struct MatcherToTargets {
pub matcher: Matcher,
pub target: NodeGroup,
@ -781,7 +781,7 @@ pub struct HashRing {
/// A matcher is used to match routing rules or subscriptions on a row-by-row
/// (or line) basis.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
pub struct Matcher {
/// if provided, match if the table name matches against the regex
#[serde(with = "serde_regex")]
@ -800,6 +800,147 @@ impl PartialEq for Matcher {
}
impl Eq for Matcher {}
impl From<ShardConfig> for management::ShardConfig {
fn from(shard_config: ShardConfig) -> Self {
Self {
specific_targets: shard_config.specific_targets.map(|i| i.into()),
hash_ring: shard_config.hash_ring.map(|i| i.into()),
ignore_errors: shard_config.ignore_errors,
}
}
}
impl TryFrom<management::ShardConfig> for ShardConfig {
type Error = FieldViolation;
fn try_from(proto: management::ShardConfig) -> Result<Self, Self::Error> {
Ok(Self {
specific_targets: proto
.specific_targets
.map(|i| i.try_into())
.map_or(Ok(None), |r| r.map(Some))?,
hash_ring: proto
.hash_ring
.map(|i| i.try_into())
.map_or(Ok(None), |r| r.map(Some))?,
ignore_errors: proto.ignore_errors,
})
}
}
/// Returns none if v matches its default value.
fn none_if_default<T: Default + PartialEq>(v: T) -> Option<T> {
if v == Default::default() {
None
} else {
Some(v)
}
}
impl From<MatcherToTargets> for management::MatcherToTargets {
fn from(matcher_to_targets: MatcherToTargets) -> Self {
Self {
matcher: none_if_default(matcher_to_targets.matcher.into()),
target: none_if_default(from_node_group_for_management_node_group(
matcher_to_targets.target,
)),
}
}
}
impl TryFrom<management::MatcherToTargets> for MatcherToTargets {
type Error = FieldViolation;
fn try_from(proto: management::MatcherToTargets) -> Result<Self, Self::Error> {
Ok(Self {
matcher: proto.matcher.unwrap_or_default().try_into()?,
target: try_from_management_node_group_for_node_group(
proto.target.unwrap_or_default(),
)?,
})
}
}
impl From<HashRing> for management::HashRing {
fn from(hash_ring: HashRing) -> Self {
Self {
table_name: hash_ring.table_name,
columns: hash_ring.columns,
node_groups: hash_ring
.node_groups
.into_iter()
.map(from_node_group_for_management_node_group)
.collect(),
}
}
}
impl TryFrom<management::HashRing> for HashRing {
type Error = FieldViolation;
fn try_from(proto: management::HashRing) -> Result<Self, Self::Error> {
Ok(Self {
table_name: proto.table_name,
columns: proto.columns,
node_groups: proto
.node_groups
.into_iter()
.map(try_from_management_node_group_for_node_group)
.collect::<Result<Vec<_>, _>>()?,
})
}
}
// cannot (and/or don't know how to) add impl From inside prost generated code
fn from_node_group_for_management_node_group(node_group: NodeGroup) -> management::NodeGroup {
management::NodeGroup {
nodes: node_group
.into_iter()
.map(|id| management::node_group::Node { id })
.collect(),
}
}
fn try_from_management_node_group_for_node_group(
proto: management::NodeGroup,
) -> Result<NodeGroup, FieldViolation> {
Ok(proto.nodes.into_iter().map(|i| i.id).collect())
}
impl From<Matcher> for management::Matcher {
fn from(matcher: Matcher) -> Self {
Self {
table_name_regex: matcher
.table_name_regex
.map_or_else(|| "".into(), |r| r.to_string()),
predicate: matcher.predicate.unwrap_or_else(|| "".into()),
}
}
}
impl TryFrom<management::Matcher> for Matcher {
type Error = FieldViolation;
fn try_from(proto: management::Matcher) -> Result<Self, Self::Error> {
let table_name_regex = match &proto.table_name_regex as &str {
"" => None,
re => Some(Regex::new(re).map_err(|e| FieldViolation {
field: "table_name_regex".to_string(),
description: e.to_string(),
})?),
};
let predicate = match proto.predicate {
p if p.is_empty() => None,
p => Some(p),
};
Ok(Self {
table_name_regex,
predicate,
})
}
}
/// `PartitionId` is the object storage identifier for a specific partition. It
/// should be a path that can be used against an object store to locate all the
/// files and subdirectories for a partition. It takes the form of `/<writer
@ -1262,4 +1403,128 @@ mod tests {
assert_eq!(err3.field, "column.column_name");
assert_eq!(err3.description, "Field is required");
}
#[test]
fn test_matcher_default() {
let protobuf = management::Matcher {
..Default::default()
};
let matcher: Matcher = protobuf.clone().try_into().unwrap();
let back: management::Matcher = matcher.clone().into();
assert!(matcher.table_name_regex.is_none());
assert_eq!(protobuf.table_name_regex, back.table_name_regex);
assert_eq!(matcher.predicate, None);
assert_eq!(protobuf.predicate, back.predicate);
}
#[test]
fn test_matcher_regexp() {
let protobuf = management::Matcher {
table_name_regex: "^foo$".into(),
..Default::default()
};
let matcher: Matcher = protobuf.clone().try_into().unwrap();
let back: management::Matcher = matcher.clone().into();
assert_eq!(matcher.table_name_regex.unwrap().to_string(), "^foo$");
assert_eq!(protobuf.table_name_regex, back.table_name_regex);
}
#[test]
fn test_matcher_bad_regexp() {
let protobuf = management::Matcher {
table_name_regex: "*".into(),
..Default::default()
};
let matcher: Result<Matcher, FieldViolation> = protobuf.clone().try_into();
assert!(matcher.is_err());
assert_eq!(matcher.err().unwrap().field, "table_name_regex");
}
#[test]
fn test_hash_ring_default() {
let protobuf = management::HashRing {
..Default::default()
};
let hash_ring: HashRing = protobuf.clone().try_into().unwrap();
let back: management::HashRing = hash_ring.clone().into();
assert_eq!(hash_ring.table_name, false);
assert_eq!(protobuf.table_name, back.table_name);
assert!(hash_ring.columns.is_empty());
assert_eq!(protobuf.columns, back.columns);
assert!(hash_ring.node_groups.is_empty());
assert_eq!(protobuf.node_groups, back.node_groups);
}
#[test]
fn test_hash_ring_nodes() {
let protobuf = management::HashRing {
node_groups: vec![
management::NodeGroup {
nodes: vec![
management::node_group::Node { id: 10 },
management::node_group::Node { id: 11 },
management::node_group::Node { id: 12 },
],
},
management::NodeGroup {
nodes: vec![management::node_group::Node { id: 20 }],
},
],
..Default::default()
};
let hash_ring: HashRing = protobuf.clone().try_into().unwrap();
assert_eq!(hash_ring.node_groups.len(), 2);
assert_eq!(hash_ring.node_groups[0].len(), 3);
assert_eq!(hash_ring.node_groups[1].len(), 1);
}
#[test]
fn test_matcher_to_targets_default() {
let protobuf = management::MatcherToTargets {
..Default::default()
};
let matcher_to_targets: MatcherToTargets = protobuf.clone().try_into().unwrap();
let back: management::MatcherToTargets = matcher_to_targets.clone().into();
assert_eq!(
matcher_to_targets.matcher,
Matcher {
..Default::default()
}
);
assert_eq!(protobuf.matcher, back.matcher);
assert_eq!(matcher_to_targets.target, Vec::<WriterId>::new());
assert_eq!(protobuf.target, back.target);
}
#[test]
fn test_shard_config_default() {
let protobuf = management::ShardConfig {
..Default::default()
};
let shard_config: ShardConfig = protobuf.clone().try_into().unwrap();
let back: management::ShardConfig = shard_config.clone().into();
assert!(shard_config.specific_targets.is_none());
assert_eq!(protobuf.specific_targets, back.specific_targets);
assert!(shard_config.hash_ring.is_none());
assert_eq!(protobuf.hash_ring, back.hash_ring);
assert_eq!(shard_config.ignore_errors, false);
assert_eq!(protobuf.ignore_errors, back.ignore_errors);
}
}

View File

@ -37,6 +37,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
management_path.join("chunk.proto"),
management_path.join("partition.proto"),
management_path.join("service.proto"),
management_path.join("shard.proto"),
management_path.join("jobs.proto"),
write_path.join("service.proto"),
root.join("grpc/health/v1/service.proto"),

View File

@ -0,0 +1,68 @@
syntax = "proto3";
package influxdata.iox.management.v1;
// NOTE: documentation is manually synced from data_types/src/database_rules.rs
// `ShardConfig` defines rules for assigning a line/row to an individual
// host or a group of hosts. A shard
// is a logical concept, but the usage is meant to split data into
// mutually exclusive areas. The rough order of organization is:
// database -> shard -> partition -> chunk. For example, you could shard
// based on table name and assign to 1 of 10 shards. Within each
// shard you would have partitions, which would likely be based off time.
// This makes it possible to horizontally scale out writes.
message ShardConfig {
/// An optional matcher. If there is a match, the route will be evaluated to
/// the given targets, otherwise the hash ring will be evaluated. This is
/// useful for overriding the hashring function on some hot spot. For
/// example, if you use the table name as the input to the hash function
/// and your ring has 4 slots. If two tables that are very hot get
/// assigned to the same slot you can override that by putting in a
/// specific matcher to pull that table over to a different node.
MatcherToTargets specific_targets = 1;
/// An optional default hasher which will route to one in a collection of
/// nodes.
HashRing hash_ring = 2;
/// If set to true the router will ignore any errors sent by the remote
/// targets in this route. That is, the write request will succeed
/// regardless of this route's success.
bool ignore_errors = 3;
}
// Maps a matcher with specific target group. If the line/row matches
// it should be sent to the group.
message MatcherToTargets {
Matcher matcher = 1;
NodeGroup target = 2;
}
/// A matcher is used to match routing rules or subscriptions on a row-by-row
/// (or line) basis.
message Matcher {
// if provided, match if the table name matches against the regex
string table_name_regex = 1;
// paul: what should we use for predicate matching here against a single row/line?
string predicate = 2;
}
// A collection of IOx nodes
message NodeGroup {
message Node {
uint32 id = 1;
}
repeated Node nodes = 1;
}
// HashRing is a rule for creating a hash key for a row and mapping that to
// an individual node on a ring.
message HashRing {
// If true the table name will be included in the hash key
bool table_name = 1;
// include the values of these columns in the hash key
repeated string columns = 2;
// ring of node groups. Each group holds a shard
repeated NodeGroup node_groups = 3;
}