Merge pull request #2130 from influxdata/dom/datatypes-serde

feat: DatabaseRules (de)serialisation support
kodiakhq[bot] 2021-07-27 16:15:56 +00:00 committed by GitHub
commit 6759bffefe
4 changed files with 40 additions and 16 deletions
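
The change itself is mechanical: Serialize/Deserialize derives are added to DatabaseRules and to every type reachable from it, and the new serde_regex dependency bridges the one field (Matcher::table_name_regex) whose type has no native serde support. A minimal sketch of the round trip this enables (the module paths and the use of serde_json as the concrete format are assumptions for illustration, not part of this PR):

    use data_types::database_rules::DatabaseRules;
    use data_types::DatabaseName;

    fn roundtrip() -> Result<(), serde_json::Error> {
        // Same construction as the test added at the bottom of this PR.
        let rules = DatabaseRules::new(DatabaseName::new("bananas").unwrap());

        // Serialize the rules, read them back, and check nothing was lost.
        let encoded = serde_json::to_string(&rules)?;
        let decoded: DatabaseRules = serde_json::from_str(&encoded)?;
        assert_eq!(rules, decoded);
        Ok(())
    }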

Cargo.lock (generated)

@@ -832,6 +832,7 @@ dependencies = [
  "percent-encoding",
  "regex",
  "serde",
+ "serde_regex",
  "snafu",
  "test_helpers",
 ]
@@ -3751,6 +3752,16 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "serde_regex"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a8136f1a4ea815d7eac4101cfd0b16dc0cb5e1fe1b8609dfd728058656b7badf"
+dependencies = [
+ "regex",
+ "serde",
+]
+
 [[package]]
 name = "serde_urlencoded"
 version = "0.7.0"

data_types/Cargo.toml

@@ -15,6 +15,7 @@ once_cell = { version = "1.4.0", features = ["parking_lot"] }
 percent-encoding = "2.1.0"
 regex = "1.4"
 serde = { version = "1.0", features = ["rc", "derive"] }
+serde_regex = "1.1"
 snafu = "0.6"
 
 [dev-dependencies] # In alphabetical order
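
serde_regex is needed because regex::Regex implements neither Serialize nor Deserialize; the crate round-trips a pattern through its string representation and, in the 1.1 release pulled in here, also handles wrappers such as Option<Regex>. A short sketch of the field attribute used later in this diff (the struct is illustrative, not the real Matcher):

    use regex::Regex;
    use serde::{Deserialize, Serialize};

    #[derive(Debug, Serialize, Deserialize)]
    struct TableFilter {
        // Serialized as the pattern string (or null for None).
        #[serde(with = "serde_regex")]
        table_name_regex: Option<Regex>,
    }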

data_types/src/consistent_hasher.rs

@@ -1,6 +1,8 @@
 use std::collections::hash_map::DefaultHasher;
 use std::hash::{Hash, Hasher};
 
+use serde::{Deserialize, Serialize};
+
 /// A ConsistentHasher implements a simple consistent hashing mechanism
 /// that maps a point to the nearest "node" N.
 ///
@@ -11,7 +13,7 @@ use std::hash::{Hash, Hasher};
 ///
 /// e.g. you can use it find the ShardID in vector of ShardIds
 /// that is closest to a given hash value.
-#[derive(Debug, Eq, PartialEq, Clone, Default)]
+#[derive(Debug, Eq, PartialEq, Clone, Default, Serialize, Deserialize)]
 pub struct ConsistentHasher<T>
 where
     T: Copy + Hash,
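
ConsistentHasher<T> is generic, but no manual bounds are required: serde's derive adds T: Serialize (and T: Deserialize<'de>) to the generated impls alongside the struct's existing Copy + Hash bounds. A minimal analogue, assuming a Vec-backed layout (the real field set is not visible in this diff):

    use std::hash::Hash;

    use serde::{Deserialize, Serialize};

    // The generated impls look roughly like
    //   impl<T: Copy + Hash + Serialize> Serialize for Ring<T> { ... }
    // so Ring<ShardId> is serializable whenever ShardId is.
    #[derive(Debug, Default, Serialize, Deserialize)]
    struct Ring<T>
    where
        T: Copy + Hash,
    {
        nodes: Vec<T>, // hypothetical field; stands in for the real internals
    }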

data_types/src/database_rules.rs

@@ -9,6 +9,7 @@ use std::{
 use chrono::{TimeZone, Utc};
 use regex::Regex;
+use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, Snafu};
 
 use influxdb_line_protocol::ParsedLine;
@@ -34,7 +35,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 
 /// DatabaseRules contains the rules for replicating data, sending data to
 /// subscribers, and querying data for a single database.
-#[derive(Debug, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
 pub struct DatabaseRules {
     /// The name of the database
     pub name: DatabaseName<'static>,
@@ -58,13 +59,13 @@ pub struct DatabaseRules {
     pub write_buffer_connection: Option<WriteBufferConnection>,
 }
 
-#[derive(Debug, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
 pub enum WriteBufferConnection {
     Writing(String),
     Reading(String),
 }
 
-#[derive(Debug, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
 pub enum RoutingRules {
     // A routing config defines the target where all data plane operations for
     // a given database are delegated to.
@@ -120,7 +121,7 @@ pub const DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS: u32 = 30 * 60;
 pub const DEFAULT_LATE_ARRIVE_WINDOW_SECONDS: u32 = 5 * 60;
 
 /// Configures how data automatically flows through the system
-#[derive(Debug, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
 pub struct LifecycleRules {
     /// Once the total amount of buffered data in memory reaches this size start
     /// dropping data from memory
@@ -211,7 +212,7 @@ impl Default for LifecycleRules {
 ///
 /// The key is constructed in order of the template parts; thus ordering changes
 /// what partition key is generated.
-#[derive(Debug, Default, Eq, PartialEq, Clone)]
+#[derive(Debug, Default, Eq, PartialEq, Clone, Serialize, Deserialize)]
 pub struct PartitionTemplate {
     pub parts: Vec<TemplatePart>,
 }
@@ -244,7 +245,7 @@ impl Partitioner for PartitionTemplate {
 
 /// `TemplatePart` specifies what part of a row should be used to compute this
 /// part of a partition key.
-#[derive(Debug, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
 pub enum TemplatePart {
     /// The name of a table
     Table,
@@ -264,7 +265,7 @@ pub enum TemplatePart {
 
 /// `RegexCapture` is for pulling parts of a string column into the partition
 /// key.
-#[derive(Debug, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
 pub struct RegexCapture {
     pub column: String,
     pub regex: String,
@@ -279,7 +280,7 @@ pub struct RegexCapture {
 /// For example, a time format of "%Y-%m-%d %H:%M:%S" will produce
 /// partition key parts such as "2021-03-14 12:25:21" and
 /// "2021-04-14 12:24:21"
-#[derive(Debug, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
 pub struct StrftimeColumn {
     pub column: String,
     pub format: String,
@@ -287,7 +288,7 @@ pub struct StrftimeColumn {
 
 /// A routing config defines the destination where to route all data plane operations
 /// for a given database.
-#[derive(Debug, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
 pub struct RoutingConfig {
     pub sink: Sink,
 }
@@ -309,7 +310,7 @@ pub trait Sharder {
 /// based on table name and assign to 1 of 10 shards. Within each
 /// shard you would have partitions, which would likely be based off time.
 /// This makes it possible to horizontally scale out writes.
-#[derive(Debug, Eq, PartialEq, Clone, Default)]
+#[derive(Debug, Eq, PartialEq, Clone, Default, Serialize, Deserialize)]
 pub struct ShardConfig {
     /// Each matcher, if any, is evaluated in order.
     /// If there is a match, the route will be evaluated to
@@ -333,7 +334,7 @@ pub struct ShardConfig {
 }
 
 /// Configuration for a specific IOx sink
-#[derive(Debug, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
 pub enum Sink {
     Iox(NodeGroup),
     Kafka(KafkaProducer),
@@ -384,7 +385,7 @@ impl Sharder for ShardConfig {
 /// Maps a matcher with specific shard. If the line/row matches
 /// it should be sent to the group.
-#[derive(Debug, Eq, PartialEq, Clone, Default)]
+#[derive(Debug, Eq, PartialEq, Clone, Default, Serialize, Deserialize)]
 pub struct MatcherToShard {
     pub matcher: Matcher,
     pub shard: ShardId,
 }
@@ -393,12 +394,12 @@ pub struct MatcherToShard {
 /// A collection of IOx nodes
 pub type NodeGroup = Vec<ServerId>;
 
-#[derive(Debug, Eq, PartialEq, Clone)]
+#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
 pub struct KafkaProducer {}
 
 /// HashRing is a rule for creating a hash key for a row and mapping that to
 /// an individual node on a ring.
-#[derive(Debug, Eq, PartialEq, Clone, Default)]
+#[derive(Debug, Eq, PartialEq, Clone, Default, Serialize, Deserialize)]
 pub struct HashRing {
     /// If true the table name will be included in the hash key
     pub table_name: bool,
@@ -410,9 +411,10 @@ pub struct HashRing {
 /// A matcher is used to match routing rules or subscriptions on a row-by-row
 /// (or line) basis.
-#[derive(Debug, Clone, Default)]
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
 pub struct Matcher {
     /// if provided, match if the table name matches against the regex
+    #[serde(with = "serde_regex")]
     pub table_name_regex: Option<Regex>,
     // paul: what should we use for predicate matching here against a single row/line?
     pub predicate: Option<String>,
 }
@@ -455,6 +457,7 @@ pub type PartitionId = String;
 #[cfg(test)]
 mod tests {
     use influxdb_line_protocol::parse_lines;
+    use serde::{Deserialize, Serialize};
 
     use super::*;
 
@@ -724,4 +727,11 @@ mod tests {
     fn parse_line(line: &str) -> ParsedLine<'_> {
         parsed_lines(line).pop().unwrap()
     }
+
+    #[test]
+    fn test_assert_serde() {
+        fn ensure<'de, T: Serialize + Deserialize<'de>>(_t: T) {}
+
+        ensure(DatabaseRules::new(DatabaseName::new("bananas").unwrap()));
+    }
 }
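
The new test is effectively a compile-time assertion: ensure never inspects serialized output (that is serde's job), it simply fails to build if DatabaseRules, or anything it transitively contains, stops implementing Serialize and Deserialize. The same trick generalizes to other types; a hedged sketch with an assumed type path:

    // Fails at compile time (rather than at runtime) if T loses its derives.
    fn assert_serde<T: serde::Serialize + serde::de::DeserializeOwned>() {}

    fn checks() {
        assert_serde::<data_types::database_rules::ShardConfig>(); // path assumed
    }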