Merge pull request #972 from influxdata/pd-routing-rules
feat: add configuration for routing rulespull/24376/head
commit
b81300b36c
|
@ -803,7 +803,9 @@ dependencies = [
|
||||||
"generated_types",
|
"generated_types",
|
||||||
"influxdb_line_protocol",
|
"influxdb_line_protocol",
|
||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
|
"regex",
|
||||||
"serde",
|
"serde",
|
||||||
|
"serde_regex",
|
||||||
"snafu",
|
"snafu",
|
||||||
"test_helpers",
|
"test_helpers",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
@ -3158,6 +3160,16 @@ dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "serde_regex"
|
||||||
|
version = "1.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a8136f1a4ea815d7eac4101cfd0b16dc0cb5e1fe1b8609dfd728058656b7badf"
|
||||||
|
dependencies = [
|
||||||
|
"regex",
|
||||||
|
"serde",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde_urlencoded"
|
name = "serde_urlencoded"
|
||||||
version = "0.6.1"
|
version = "0.6.1"
|
||||||
|
|
|
@ -15,6 +15,8 @@ percent-encoding = "2.1.0"
|
||||||
serde = "1.0"
|
serde = "1.0"
|
||||||
snafu = "0.6"
|
snafu = "0.6"
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
|
regex = "1.4"
|
||||||
|
serde_regex = "1.1"
|
||||||
|
|
||||||
[dev-dependencies] # In alphabetical order
|
[dev-dependencies] # In alphabetical order
|
||||||
criterion = "0.3"
|
criterion = "0.3"
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
use std::convert::{TryFrom, TryInto};
|
use std::convert::{TryFrom, TryInto};
|
||||||
|
|
||||||
use chrono::{DateTime, TimeZone, Utc};
|
use chrono::{DateTime, TimeZone, Utc};
|
||||||
|
use regex::Regex;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use snafu::Snafu;
|
use snafu::Snafu;
|
||||||
|
|
||||||
|
@ -51,6 +52,16 @@ pub struct DatabaseRules {
|
||||||
/// in object storage.
|
/// in object storage.
|
||||||
#[serde(default = "MutableBufferConfig::default_option")]
|
#[serde(default = "MutableBufferConfig::default_option")]
|
||||||
pub mutable_buffer_config: Option<MutableBufferConfig>,
|
pub mutable_buffer_config: Option<MutableBufferConfig>,
|
||||||
|
|
||||||
|
/// An optional config to split writes into different "shards". A shard
|
||||||
|
/// is a logical concept, but the usage is meant to split data into
|
||||||
|
/// mutually exclusive areas. The rough order of organization is:
|
||||||
|
/// database -> shard -> partition -> chunk. For example, you could shard
|
||||||
|
/// based on table name and assign to 1 of 10 shards. Within each
|
||||||
|
/// shard you would have partitions, which would likely be based off time.
|
||||||
|
/// This makes it possible to horizontally scale out writes.
|
||||||
|
#[serde(default)]
|
||||||
|
pub shard_config: Option<ShardConfig>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DatabaseRules {
|
impl DatabaseRules {
|
||||||
|
@ -118,6 +129,7 @@ impl TryFrom<management::DatabaseRules> for DatabaseRules {
|
||||||
partition_template,
|
partition_template,
|
||||||
wal_buffer_config,
|
wal_buffer_config,
|
||||||
mutable_buffer_config,
|
mutable_buffer_config,
|
||||||
|
shard_config: None,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -701,6 +713,77 @@ impl TryFrom<management::partition_template::Part> for TemplatePart {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// ShardConfig defines rules for assigning a line/row to an individual
|
||||||
|
/// host or a group of hosts. A shard
|
||||||
|
/// is a logical concept, but the usage is meant to split data into
|
||||||
|
/// mutually exclusive areas. The rough order of organization is:
|
||||||
|
/// database -> shard -> partition -> chunk. For example, you could shard
|
||||||
|
/// based on table name and assign to 1 of 10 shards. Within each
|
||||||
|
/// shard you would have partitions, which would likely be based off time.
|
||||||
|
/// This makes it possible to horizontally scale out writes.
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
|
||||||
|
pub struct ShardConfig {
|
||||||
|
/// An optional matcher. If there is a match, the route will be evaluated to
|
||||||
|
/// the given targets, otherwise the hash ring will be evaluated. This is
|
||||||
|
/// useful for overriding the hashring function on some hot spot. For
|
||||||
|
/// example, if you use the table name as the input to the hash function
|
||||||
|
/// and your ring has 4 slots. If two tables that are very hot get
|
||||||
|
/// assigned to the same slot you can override that by putting in a
|
||||||
|
/// specific matcher to pull that table over to a different node.
|
||||||
|
pub specific_targets: Option<MatcherToTargets>,
|
||||||
|
/// An optional default hasher which will route to one in a collection of
|
||||||
|
/// nodes.
|
||||||
|
pub hash_ring: Option<HashRing>,
|
||||||
|
/// If set to true the router will ignore any errors sent by the remote
|
||||||
|
/// targets in this route. That is, the write request will succeed
|
||||||
|
/// regardless of this route's success.
|
||||||
|
pub ignore_errors: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Maps a matcher with specific target group. If the line/row matches
|
||||||
|
/// it should be sent to the group.
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
|
||||||
|
pub struct MatcherToTargets {
|
||||||
|
pub matcher: Matcher,
|
||||||
|
pub target: NodeGroup,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A collection of IOx nodes
|
||||||
|
pub type NodeGroup = Vec<WriterId>;
|
||||||
|
|
||||||
|
/// HashRing is a rule for creating a hash key for a row and mapping that to
|
||||||
|
/// an individual node on a ring.
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
|
||||||
|
pub struct HashRing {
|
||||||
|
/// If true the table name will be included in the hash key
|
||||||
|
pub table_name: bool,
|
||||||
|
/// include the values of these columns in the hash key
|
||||||
|
pub columns: Vec<String>,
|
||||||
|
/// ring of node groups. Each group holds a shard
|
||||||
|
pub node_groups: Vec<NodeGroup>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A matcher is used to match routing rules or subscriptions on a row-by-row
|
||||||
|
/// (or line) basis.
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
|
pub struct Matcher {
|
||||||
|
/// if provided, match if the table name matches against the regex
|
||||||
|
#[serde(with = "serde_regex")]
|
||||||
|
pub table_name_regex: Option<Regex>,
|
||||||
|
// paul: what should we use for predicate matching here against a single row/line?
|
||||||
|
pub predicate: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartialEq for Matcher {
|
||||||
|
fn eq(&self, other: &Self) -> bool {
|
||||||
|
// this is kind of janky, but it's only used during tests and should get the job
|
||||||
|
// done
|
||||||
|
format!("{:?}{:?}", self.table_name_regex, self.predicate)
|
||||||
|
== format!("{:?}{:?}", other.table_name_regex, other.predicate)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl Eq for Matcher {}
|
||||||
|
|
||||||
/// `PartitionId` is the object storage identifier for a specific partition. It
|
/// `PartitionId` is the object storage identifier for a specific partition. It
|
||||||
/// should be a path that can be used against an object store to locate all the
|
/// should be a path that can be used against an object store to locate all the
|
||||||
/// files and subdirectories for a partition. It takes the form of `/<writer
|
/// files and subdirectories for a partition. It takes the form of `/<writer
|
||||||
|
|
Loading…
Reference in New Issue