From fdcb8baec1cb83cd5d066fc4b3fbc53d17253456 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Fri, 26 Feb 2021 10:10:47 +0000 Subject: [PATCH] feat: conversion to/from data_types and generated_types (#848) --- data_types/src/database_rules.rs | 938 ++++++++++++++++++++++++++++- data_types/src/field_validation.rs | 112 ++++ data_types/src/lib.rs | 2 + 3 files changed, 1047 insertions(+), 5 deletions(-) create mode 100644 data_types/src/field_validation.rs diff --git a/data_types/src/database_rules.rs b/data_types/src/database_rules.rs index c11160d4e1..f66ba8ad93 100644 --- a/data_types/src/database_rules.rs +++ b/data_types/src/database_rules.rs @@ -1,9 +1,19 @@ -use influxdb_line_protocol::ParsedLine; +use std::convert::{TryFrom, TryInto}; use chrono::{DateTime, TimeZone, Utc}; use serde::{Deserialize, Serialize}; use snafu::Snafu; +use generated_types::google::protobuf::Empty; +use generated_types::{ + google::{FieldViolation, FieldViolationExt}, + influxdata::iox::management::v1 as management, +}; +use influxdb_line_protocol::ParsedLine; + +use crate::field_validation::{FromField, FromFieldOpt, FromFieldString, FromFieldVec}; +use crate::DatabaseName; + #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Error in {}: {}", source_module, source))] @@ -22,7 +32,7 @@ pub struct DatabaseRules { /// The unencoded name of the database. This gets put in by the create /// database call, so an empty default is fine. #[serde(default)] - pub name: String, + pub name: String, // TODO: Use DatabaseName here /// Template that generates a partition key for each row inserted into the /// db #[serde(default)] @@ -137,6 +147,82 @@ impl Partitioner for DatabaseRules { } } +impl From for management::DatabaseRules { + fn from(rules: DatabaseRules) -> Self { + let subscriptions: Vec = + rules.subscriptions.into_iter().map(Into::into).collect(); + + let replication_config = management::ReplicationConfig { + replications: rules.replication, + replication_count: rules.replication_count as _, + replication_queue_max_size: rules.replication_queue_max_size as _, + }; + + let query_config = management::QueryConfig { + query_local: rules.query_local, + primary: rules.primary_query_group.unwrap_or_default(), + secondaries: rules.secondary_query_groups, + read_only_partitions: rules.read_only_partitions, + }; + + Self { + name: rules.name, + partition_template: Some(rules.partition_template.into()), + replication_config: Some(replication_config), + subscription_config: Some(management::SubscriptionConfig { subscriptions }), + query_config: Some(query_config), + wal_buffer_config: rules.wal_buffer_config.map(Into::into), + mutable_buffer_config: rules.mutable_buffer_config.map(Into::into), + } + } +} + +impl TryFrom for DatabaseRules { + type Error = FieldViolation; + + fn try_from(proto: management::DatabaseRules) -> Result { + DatabaseName::new(&proto.name).field("name")?; + + let subscriptions = proto + .subscription_config + .map(|s| { + s.subscriptions + .vec_field("subscription_config.subscriptions") + }) + .transpose()? + .unwrap_or_default(); + + let wal_buffer_config = proto.wal_buffer_config.optional("wal_buffer_config")?; + + let mutable_buffer_config = proto + .mutable_buffer_config + .optional("mutable_buffer_config")?; + + let partition_template = proto + .partition_template + .optional("partition_template")? + .unwrap_or_default(); + + let query = proto.query_config.unwrap_or_default(); + let replication = proto.replication_config.unwrap_or_default(); + + Ok(Self { + name: proto.name, + partition_template, + replication: replication.replications, + replication_count: replication.replication_count as _, + replication_queue_max_size: replication.replication_queue_max_size as _, + subscriptions, + query_local: query.query_local, + primary_query_group: query.primary.optional(), + secondary_query_groups: query.secondaries, + read_only_partitions: query.read_only_partitions, + wal_buffer_config, + mutable_buffer_config, + }) + } +} + /// MutableBufferConfig defines the configuration for the in-memory database /// that is hot for writes as they arrive. Operators can define rules for /// evicting data once the mutable buffer passes a set memory threshold. @@ -190,6 +276,47 @@ impl Default for MutableBufferConfig { } } +impl From for management::MutableBufferConfig { + fn from(config: MutableBufferConfig) -> Self { + Self { + buffer_size: config.buffer_size as _, + reject_if_not_persisted: config.reject_if_not_persisted, + partition_drop_order: Some(config.partition_drop_order.into()), + persist_after_cold_seconds: config.persist_after_cold_seconds.unwrap_or_default(), + } + } +} + +impl TryFrom for MutableBufferConfig { + type Error = FieldViolation; + + fn try_from(proto: management::MutableBufferConfig) -> Result { + let partition_drop_order = proto + .partition_drop_order + .optional("partition_drop_order")? + .unwrap_or_default(); + + let buffer_size = if proto.buffer_size == 0 { + DEFAULT_MUTABLE_BUFFER_SIZE + } else { + proto.buffer_size as usize + }; + + let persist_after_cold_seconds = if proto.persist_after_cold_seconds == 0 { + None + } else { + Some(proto.persist_after_cold_seconds) + }; + + Ok(Self { + buffer_size, + reject_if_not_persisted: proto.reject_if_not_persisted, + partition_drop_order, + persist_after_cold_seconds, + }) + } +} + /// This struct specifies the rules for the order to sort partitions /// from the mutable buffer. This is used to determine which order to drop them /// in. The last partition in the list will be dropped, until enough space has @@ -204,7 +331,7 @@ impl Default for MutableBufferConfig { /// sort: PartitionSort::CreatedAtTime, /// }; /// ``` -#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] +#[derive(Debug, Default, Serialize, Deserialize, Eq, PartialEq, Clone)] pub struct PartitionSortRules { /// Sort partitions by this order. Last will be dropped. pub order: Order, @@ -213,6 +340,30 @@ pub struct PartitionSortRules { pub sort: PartitionSort, } +impl From for management::mutable_buffer_config::PartitionDropOrder { + fn from(ps: PartitionSortRules) -> Self { + let order: management::Order = ps.order.into(); + + Self { + order: order as _, + sort: Some(ps.sort.into()), + } + } +} + +impl TryFrom for PartitionSortRules { + type Error = FieldViolation; + + fn try_from( + proto: management::mutable_buffer_config::PartitionDropOrder, + ) -> Result { + Ok(Self { + order: proto.order().scope("order")?, + sort: proto.sort.optional("sort")?.unwrap_or_default(), + }) + } +} + /// What to sort the partition by. #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] pub enum PartitionSort { @@ -235,6 +386,57 @@ pub enum PartitionSort { Column(String, ColumnType, ColumnValue), } +impl Default for PartitionSort { + fn default() -> Self { + Self::LastWriteTime + } +} + +impl From for management::mutable_buffer_config::partition_drop_order::Sort { + fn from(ps: PartitionSort) -> Self { + use management::mutable_buffer_config::partition_drop_order::ColumnSort; + + match ps { + PartitionSort::LastWriteTime => Self::LastWriteTime(Empty {}), + PartitionSort::CreatedAtTime => Self::CreatedAtTime(Empty {}), + PartitionSort::Column(column_name, column_type, column_value) => { + let column_type: management::ColumnType = column_type.into(); + let column_value: management::Aggregate = column_value.into(); + + Self::Column(ColumnSort { + column_name, + column_type: column_type as _, + column_value: column_value as _, + }) + } + } + } +} + +impl TryFrom for PartitionSort { + type Error = FieldViolation; + + fn try_from( + proto: management::mutable_buffer_config::partition_drop_order::Sort, + ) -> Result { + use management::mutable_buffer_config::partition_drop_order::Sort; + + Ok(match proto { + Sort::LastWriteTime(_) => Self::LastWriteTime, + Sort::CreatedAtTime(_) => Self::CreatedAtTime, + Sort::Column(column_sort) => { + let column_type = column_sort.column_type().scope("column.column_type")?; + let column_value = column_sort.column_value().scope("column.column_value")?; + Self::Column( + column_sort.column_name.required("column.column_name")?, + column_type, + column_value, + ) + } + }) + } +} + /// The sort order. #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] pub enum Order { @@ -242,6 +444,33 @@ pub enum Order { Desc, } +impl Default for Order { + fn default() -> Self { + Self::Asc + } +} + +impl From for management::Order { + fn from(o: Order) -> Self { + match o { + Order::Asc => Self::Asc, + Order::Desc => Self::Desc, + } + } +} + +impl TryFrom for Order { + type Error = FieldViolation; + + fn try_from(proto: management::Order) -> Result { + Ok(match proto { + management::Order::Unspecified => Self::default(), + management::Order::Asc => Self::Asc, + management::Order::Desc => Self::Desc, + }) + } +} + /// Use columns of this type. #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] pub enum ColumnType { @@ -252,6 +481,33 @@ pub enum ColumnType { Bool, } +impl From for management::ColumnType { + fn from(t: ColumnType) -> Self { + match t { + ColumnType::I64 => Self::I64, + ColumnType::U64 => Self::U64, + ColumnType::F64 => Self::F64, + ColumnType::String => Self::String, + ColumnType::Bool => Self::Bool, + } + } +} + +impl TryFrom for ColumnType { + type Error = FieldViolation; + + fn try_from(proto: management::ColumnType) -> Result { + Ok(match proto { + management::ColumnType::Unspecified => return Err(FieldViolation::required("")), + management::ColumnType::I64 => Self::I64, + management::ColumnType::U64 => Self::U64, + management::ColumnType::F64 => Self::F64, + management::ColumnType::String => Self::String, + management::ColumnType::Bool => Self::Bool, + }) + } +} + /// Use either the min or max summary statistic. #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] pub enum ColumnValue { @@ -259,6 +515,29 @@ pub enum ColumnValue { Max, } +impl From for management::Aggregate { + fn from(v: ColumnValue) -> Self { + match v { + ColumnValue::Min => Self::Min, + ColumnValue::Max => Self::Max, + } + } +} + +impl TryFrom for ColumnValue { + type Error = FieldViolation; + + fn try_from(proto: management::Aggregate) -> Result { + use management::Aggregate; + + Ok(match proto { + Aggregate::Unspecified => return Err(FieldViolation::required("")), + Aggregate::Min => Self::Min, + Aggregate::Max => Self::Max, + }) + } +} + /// WalBufferConfig defines the configuration for buffering data from the WAL in /// memory. This buffer is used for asynchronous replication and to collect /// segments before sending them to object storage. @@ -294,6 +573,45 @@ pub struct WalBufferConfig { pub close_segment_after: Option, } +impl From for management::WalBufferConfig { + fn from(rollover: WalBufferConfig) -> Self { + let buffer_rollover: management::wal_buffer_config::Rollover = + rollover.buffer_rollover.into(); + + Self { + buffer_size: rollover.buffer_size, + segment_size: rollover.segment_size, + buffer_rollover: buffer_rollover as _, + persist_segments: rollover.store_segments, + close_segment_after: rollover.close_segment_after.map(Into::into), + } + } +} + +impl TryFrom for WalBufferConfig { + type Error = FieldViolation; + + fn try_from(proto: management::WalBufferConfig) -> Result { + let buffer_rollover = proto.buffer_rollover().scope("buffer_rollover")?; + let close_segment_after = proto + .close_segment_after + .map(TryInto::try_into) + .transpose() + .map_err(|_| FieldViolation { + field: "closeSegmentAfter".to_string(), + description: "Duration must be positive".to_string(), + })?; + + Ok(Self { + buffer_size: proto.buffer_size, + segment_size: proto.segment_size, + buffer_rollover, + store_segments: proto.persist_segments, + close_segment_after, + }) + } +} + /// WalBufferRollover defines the behavior of what should happen if a write /// comes in that would cause the buffer to exceed its max size AND the oldest /// segment can't be dropped because it has not yet been persisted. @@ -311,6 +629,30 @@ pub enum WalBufferRollover { ReturnError, } +impl From for management::wal_buffer_config::Rollover { + fn from(rollover: WalBufferRollover) -> Self { + match rollover { + WalBufferRollover::DropOldSegment => Self::DropOldSegment, + WalBufferRollover::DropIncoming => Self::DropIncoming, + WalBufferRollover::ReturnError => Self::ReturnError, + } + } +} + +impl TryFrom for WalBufferRollover { + type Error = FieldViolation; + + fn try_from(proto: management::wal_buffer_config::Rollover) -> Result { + use management::wal_buffer_config::Rollover; + Ok(match proto { + Rollover::Unspecified => return Err(FieldViolation::required("")), + Rollover::DropOldSegment => Self::DropOldSegment, + Rollover::DropIncoming => Self::DropIncoming, + Rollover::ReturnError => Self::ReturnError, + }) + } +} + /// `PartitionTemplate` is used to compute the partition key of each row that /// gets written. It can consist of the table name, a column name and its value, /// a formatted time, or a string column and regex captures of its value. For @@ -353,6 +695,23 @@ impl PartitionTemplate { } } +impl From for management::PartitionTemplate { + fn from(pt: PartitionTemplate) -> Self { + Self { + parts: pt.parts.into_iter().map(Into::into).collect(), + } + } +} + +impl TryFrom for PartitionTemplate { + type Error = FieldViolation; + + fn try_from(proto: management::PartitionTemplate) -> Result { + let parts = proto.parts.vec_field("parts")?; + Ok(Self { parts }) + } +} + /// `TemplatePart` specifies what part of a row should be used to compute this /// part of a partition key. #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] @@ -380,6 +739,67 @@ pub struct StrftimeColumn { format: String, } +impl From for management::partition_template::part::Part { + fn from(part: TemplatePart) -> Self { + use management::partition_template::part::ColumnFormat; + + match part { + TemplatePart::Table => Self::Table(Empty {}), + TemplatePart::Column(column) => Self::Column(column), + TemplatePart::RegexCapture(RegexCapture { column, regex }) => { + Self::Regex(ColumnFormat { + column, + format: regex, + }) + } + TemplatePart::StrftimeColumn(StrftimeColumn { column, format }) => { + Self::StrfTime(ColumnFormat { column, format }) + } + TemplatePart::TimeFormat(format) => Self::Time(format), + } + } +} + +impl TryFrom for TemplatePart { + type Error = FieldViolation; + + fn try_from(proto: management::partition_template::part::Part) -> Result { + use management::partition_template::part::{ColumnFormat, Part}; + + Ok(match proto { + Part::Table(_) => Self::Table, + Part::Column(column) => Self::Column(column.required("column")?), + Part::Regex(ColumnFormat { column, format }) => Self::RegexCapture(RegexCapture { + column: column.required("regex.column")?, + regex: format.required("regex.format")?, + }), + Part::StrfTime(ColumnFormat { column, format }) => { + Self::StrftimeColumn(StrftimeColumn { + column: column.required("strf_time.column")?, + format: format.required("strf_time.format")?, + }) + } + Part::Time(format) => Self::TimeFormat(format.required("time")?), + }) + } +} + +impl From for management::partition_template::Part { + fn from(part: TemplatePart) -> Self { + Self { + part: Some(part.into()), + } + } +} + +impl TryFrom for TemplatePart { + type Error = FieldViolation; + + fn try_from(proto: management::partition_template::Part) -> Result { + proto.part.required("part") + } +} + /// `PartitionId` is the object storage identifier for a specific partition. It /// should be a path that can be used against an object store to locate all the /// files and subdirectories for a partition. It takes the form of `/ for management::subscription_config::Subscription { + fn from(s: Subscription) -> Self { + Self { + name: s.name, + host_group_id: s.host_group_id, + matcher: Some(s.matcher.into()), + } + } +} + +impl TryFrom for Subscription { + type Error = FieldViolation; + + fn try_from(proto: management::subscription_config::Subscription) -> Result { + Ok(Self { + name: proto.name.required("name")?, + host_group_id: proto.host_group_id.required("host_group_id")?, + matcher: proto.matcher.optional("matcher")?.unwrap_or_default(), + }) + } +} + /// `Matcher` specifies the rule against the table name and/or a predicate /// against the row to determine if it matches the write rule. -#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] +#[derive(Debug, Default, Serialize, Deserialize, Eq, PartialEq, Clone)] pub struct Matcher { pub tables: MatchTables, // TODO: make this work with query::Predicate @@ -413,6 +855,26 @@ pub struct Matcher { pub predicate: Option, } +impl From for management::Matcher { + fn from(m: Matcher) -> Self { + Self { + predicate: m.predicate.unwrap_or_default(), + table_matcher: Some(m.tables.into()), + } + } +} + +impl TryFrom for Matcher { + type Error = FieldViolation; + + fn try_from(proto: management::Matcher) -> Result { + Ok(Self { + tables: proto.table_matcher.required("table_matcher")?, + predicate: proto.predicate.optional(), + }) + } +} + /// `MatchTables` looks at the table name of a row to determine if it should /// match the rule. #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] @@ -424,6 +886,35 @@ pub enum MatchTables { Regex(String), } +impl Default for MatchTables { + fn default() -> Self { + Self::All + } +} + +impl From for management::matcher::TableMatcher { + fn from(m: MatchTables) -> Self { + match m { + MatchTables::All => Self::All(Empty {}), + MatchTables::Table(table) => Self::Table(table), + MatchTables::Regex(regex) => Self::Regex(regex), + } + } +} + +impl TryFrom for MatchTables { + type Error = FieldViolation; + + fn try_from(proto: management::matcher::TableMatcher) -> Result { + use management::matcher::TableMatcher; + Ok(match proto { + TableMatcher::All(_) => Self::All, + TableMatcher::Table(table) => Self::Table(table.required("table_matcher.table")?), + TableMatcher::Regex(regex) => Self::Regex(regex.required("table_matcher.regex")?), + }) + } +} + pub type HostGroupId = String; #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] @@ -435,9 +926,10 @@ pub struct HostGroup { #[cfg(test)] mod tests { - use super::*; use influxdb_line_protocol::parse_lines; + use super::*; + type TestError = Box; type Result = std::result::Result; @@ -598,4 +1090,440 @@ mod tests { fn parse_line(line: &str) -> ParsedLine<'_> { parsed_lines(line).pop().unwrap() } + + #[test] + fn test_database_rules_defaults() { + let protobuf = management::DatabaseRules { + name: "database".to_string(), + ..Default::default() + }; + + let rules: DatabaseRules = protobuf.clone().try_into().unwrap(); + let back: management::DatabaseRules = rules.clone().into(); + + assert_eq!(rules.name, protobuf.name); + assert_eq!(protobuf.name, back.name); + + assert_eq!(rules.partition_template.parts.len(), 0); + assert_eq!(rules.subscriptions.len(), 0); + assert!(rules.primary_query_group.is_none()); + assert_eq!(rules.read_only_partitions.len(), 0); + assert_eq!(rules.secondary_query_groups.len(), 0); + + // These will be defaulted as optionality not preserved on non-protobuf + // DatabaseRules + assert_eq!(back.replication_config, Some(Default::default())); + assert_eq!(back.subscription_config, Some(Default::default())); + assert_eq!(back.query_config, Some(Default::default())); + assert_eq!(back.partition_template, Some(Default::default())); + + // These should be none as preserved on non-protobuf DatabaseRules + assert!(back.wal_buffer_config.is_none()); + assert!(back.mutable_buffer_config.is_none()); + } + + #[test] + fn test_database_rules_query() { + let readonly = vec!["readonly1".to_string(), "readonly2".to_string()]; + let secondaries = vec!["secondary1".to_string(), "secondary2".to_string()]; + + let protobuf = management::DatabaseRules { + name: "database".to_string(), + query_config: Some(management::QueryConfig { + query_local: true, + primary: "primary".to_string(), + secondaries: secondaries.clone(), + read_only_partitions: readonly.clone(), + }), + ..Default::default() + }; + + let rules: DatabaseRules = protobuf.clone().try_into().unwrap(); + let back: management::DatabaseRules = rules.clone().into(); + + assert_eq!(rules.name, protobuf.name); + assert_eq!(protobuf.name, back.name); + + assert_eq!(rules.read_only_partitions, readonly); + assert_eq!(rules.primary_query_group, Some("primary".to_string())); + assert_eq!(rules.secondary_query_groups, secondaries); + assert_eq!(rules.subscriptions.len(), 0); + assert_eq!(rules.partition_template.parts.len(), 0); + + // Should be the same as was specified + assert_eq!(back.query_config, protobuf.query_config); + assert!(back.wal_buffer_config.is_none()); + assert!(back.mutable_buffer_config.is_none()); + + // These will be defaulted as optionality not preserved on non-protobuf + // DatabaseRules + assert_eq!(back.replication_config, Some(Default::default())); + assert_eq!(back.subscription_config, Some(Default::default())); + assert_eq!(back.partition_template, Some(Default::default())); + } + + #[test] + fn test_query_config_default() { + let protobuf = management::DatabaseRules { + name: "database".to_string(), + query_config: Some(Default::default()), + ..Default::default() + }; + + let rules: DatabaseRules = protobuf.clone().try_into().unwrap(); + let back: management::DatabaseRules = rules.clone().into(); + + assert!(rules.primary_query_group.is_none()); + assert_eq!(rules.secondary_query_groups.len(), 0); + assert_eq!(rules.read_only_partitions.len(), 0); + assert_eq!(rules.query_local, false); + + assert_eq!(protobuf.query_config, back.query_config); + } + + #[test] + fn test_partition_template_default() { + let protobuf = management::DatabaseRules { + name: "database".to_string(), + partition_template: Some(management::PartitionTemplate { parts: vec![] }), + ..Default::default() + }; + + let rules: DatabaseRules = protobuf.clone().try_into().unwrap(); + let back: management::DatabaseRules = rules.clone().into(); + + assert_eq!(rules.partition_template.parts.len(), 0); + assert_eq!(protobuf.partition_template, back.partition_template); + } + + #[test] + fn test_partition_template_no_part() { + let protobuf = management::DatabaseRules { + name: "database".to_string(), + partition_template: Some(management::PartitionTemplate { + parts: vec![Default::default()], + }), + ..Default::default() + }; + + let res: Result = protobuf.try_into(); + let err = res.expect_err("expected failure"); + + assert_eq!(&err.field, "partition_template.parts.0.part"); + assert_eq!(&err.description, "Field is required"); + } + + #[test] + fn test_partition_template() { + use management::partition_template::part::{ColumnFormat, Part}; + + let protobuf = management::PartitionTemplate { + parts: vec![ + management::partition_template::Part { + part: Some(Part::Time("time".to_string())), + }, + management::partition_template::Part { + part: Some(Part::Table(Empty {})), + }, + management::partition_template::Part { + part: Some(Part::Regex(ColumnFormat { + column: "column".to_string(), + format: "format".to_string(), + })), + }, + ], + }; + + let pt: PartitionTemplate = protobuf.clone().try_into().unwrap(); + let back: management::PartitionTemplate = pt.clone().into(); + + assert_eq!( + pt.parts, + vec![ + TemplatePart::TimeFormat("time".to_string()), + TemplatePart::Table, + TemplatePart::RegexCapture(RegexCapture { + column: "column".to_string(), + regex: "format".to_string() + }) + ] + ); + assert_eq!(protobuf, back); + } + + #[test] + fn test_partition_template_empty() { + use management::partition_template::part::{ColumnFormat, Part}; + + let protobuf = management::PartitionTemplate { + parts: vec![management::partition_template::Part { + part: Some(Part::Regex(ColumnFormat { + ..Default::default() + })), + }], + }; + + let res: Result = protobuf.try_into(); + let err = res.expect_err("expected failure"); + + assert_eq!(&err.field, "parts.0.part.regex.column"); + assert_eq!(&err.description, "Field is required"); + } + + #[test] + fn test_wal_buffer_config_default() { + let protobuf: management::WalBufferConfig = Default::default(); + + let res: Result = protobuf.try_into(); + let err = res.expect_err("expected failure"); + + assert_eq!(&err.field, "buffer_rollover"); + assert_eq!(&err.description, "Field is required"); + } + + #[test] + fn test_wal_buffer_config_rollover() { + let protobuf = management::WalBufferConfig { + buffer_rollover: management::wal_buffer_config::Rollover::DropIncoming as _, + ..Default::default() + }; + + let config: WalBufferConfig = protobuf.clone().try_into().unwrap(); + let back: management::WalBufferConfig = config.clone().into(); + + assert_eq!(config.buffer_rollover, WalBufferRollover::DropIncoming); + assert_eq!(protobuf, back); + } + + #[test] + fn test_wal_buffer_config_negative_duration() { + use generated_types::google::protobuf::Duration; + + let protobuf = management::WalBufferConfig { + buffer_rollover: management::wal_buffer_config::Rollover::DropOldSegment as _, + close_segment_after: Some(Duration { + seconds: -1, + nanos: -40, + }), + ..Default::default() + }; + + let res: Result = protobuf.try_into(); + let err = res.expect_err("expected failure"); + + assert_eq!(&err.field, "closeSegmentAfter"); + assert_eq!(&err.description, "Duration must be positive"); + } + + #[test] + fn test_matcher_default() { + let protobuf: management::Matcher = Default::default(); + + let res: Result = protobuf.try_into(); + let err = res.expect_err("expected failure"); + + assert_eq!(&err.field, "table_matcher"); + assert_eq!(&err.description, "Field is required"); + } + + #[test] + fn test_matcher() { + let protobuf = management::Matcher { + predicate: Default::default(), + table_matcher: Some(management::matcher::TableMatcher::Regex( + "regex".to_string(), + )), + }; + let matcher: Matcher = protobuf.try_into().unwrap(); + + assert_eq!(matcher.tables, MatchTables::Regex("regex".to_string())); + assert!(matcher.predicate.is_none()); + } + + #[test] + fn test_subscription_default() { + let pb_matcher = Some(management::Matcher { + predicate: "predicate1".to_string(), + table_matcher: Some(management::matcher::TableMatcher::Table( + "table".to_string(), + )), + }); + + let matcher = Matcher { + tables: MatchTables::Table("table".to_string()), + predicate: Some("predicate1".to_string()), + }; + + let subscription_config = management::SubscriptionConfig { + subscriptions: vec![ + management::subscription_config::Subscription { + name: "subscription1".to_string(), + host_group_id: "host group".to_string(), + matcher: pb_matcher.clone(), + }, + management::subscription_config::Subscription { + name: "subscription2".to_string(), + host_group_id: "host group".to_string(), + matcher: pb_matcher, + }, + ], + }; + + let protobuf = management::DatabaseRules { + name: "database".to_string(), + subscription_config: Some(subscription_config), + ..Default::default() + }; + + let rules: DatabaseRules = protobuf.clone().try_into().unwrap(); + let back: management::DatabaseRules = rules.clone().into(); + + assert_eq!(protobuf.subscription_config, back.subscription_config); + assert_eq!( + rules.subscriptions, + vec![ + Subscription { + name: "subscription1".to_string(), + host_group_id: "host group".to_string(), + matcher: matcher.clone() + }, + Subscription { + name: "subscription2".to_string(), + host_group_id: "host group".to_string(), + matcher + } + ] + ) + } + + #[test] + fn mutable_buffer_config_default() { + let protobuf: management::MutableBufferConfig = Default::default(); + + let config: MutableBufferConfig = protobuf.try_into().unwrap(); + let back: management::MutableBufferConfig = config.clone().into(); + + assert_eq!(config.buffer_size, DEFAULT_MUTABLE_BUFFER_SIZE); + assert_eq!(config.persist_after_cold_seconds, None); + assert_eq!(config.partition_drop_order, PartitionSortRules::default()); + assert!(!config.reject_if_not_persisted); + + assert_eq!(back.reject_if_not_persisted, config.reject_if_not_persisted); + assert_eq!(back.buffer_size as usize, config.buffer_size); + assert_eq!( + back.partition_drop_order, + Some(PartitionSortRules::default().into()) + ); + assert_eq!(back.persist_after_cold_seconds, 0); + } + + #[test] + fn mutable_buffer_config() { + let protobuf = management::MutableBufferConfig { + buffer_size: 32, + reject_if_not_persisted: true, + partition_drop_order: Some(management::mutable_buffer_config::PartitionDropOrder { + order: management::Order::Desc as _, + sort: None, + }), + persist_after_cold_seconds: 439, + }; + + let config: MutableBufferConfig = protobuf.clone().try_into().unwrap(); + let back: management::MutableBufferConfig = config.clone().into(); + + assert_eq!(config.buffer_size, protobuf.buffer_size as usize); + assert_eq!( + config.persist_after_cold_seconds, + Some(protobuf.persist_after_cold_seconds) + ); + assert_eq!(config.partition_drop_order.order, Order::Desc); + assert!(config.reject_if_not_persisted); + + assert_eq!(back.reject_if_not_persisted, config.reject_if_not_persisted); + assert_eq!(back.buffer_size as usize, config.buffer_size); + assert_eq!( + back.persist_after_cold_seconds, + protobuf.persist_after_cold_seconds + ); + } + + #[test] + fn partition_drop_order_default() { + let protobuf: management::mutable_buffer_config::PartitionDropOrder = Default::default(); + let config: PartitionSortRules = protobuf.try_into().unwrap(); + + assert_eq!(config, PartitionSortRules::default()); + assert_eq!(config.order, Order::default()); + assert_eq!(config.sort, PartitionSort::default()); + } + + #[test] + fn partition_drop_order() { + use management::mutable_buffer_config::{partition_drop_order::Sort, PartitionDropOrder}; + let protobuf = PartitionDropOrder { + order: management::Order::Asc as _, + sort: Some(Sort::CreatedAtTime(Empty {})), + }; + let config: PartitionSortRules = protobuf.clone().try_into().unwrap(); + let back: PartitionDropOrder = config.clone().into(); + + assert_eq!(protobuf, back); + assert_eq!(config.order, Order::Asc); + assert_eq!(config.sort, PartitionSort::CreatedAtTime); + } + + #[test] + fn partition_sort() { + use management::mutable_buffer_config::partition_drop_order::{ColumnSort, Sort}; + + let created_at: PartitionSort = Sort::CreatedAtTime(Empty {}).try_into().unwrap(); + let last_write: PartitionSort = Sort::LastWriteTime(Empty {}).try_into().unwrap(); + let column: PartitionSort = Sort::Column(ColumnSort { + column_name: "column".to_string(), + column_type: management::ColumnType::Bool as _, + column_value: management::Aggregate::Min as _, + }) + .try_into() + .unwrap(); + + assert_eq!(created_at, PartitionSort::CreatedAtTime); + assert_eq!(last_write, PartitionSort::LastWriteTime); + assert_eq!( + column, + PartitionSort::Column("column".to_string(), ColumnType::Bool, ColumnValue::Min) + ); + } + + #[test] + fn partition_sort_column_sort() { + use management::mutable_buffer_config::partition_drop_order::{ColumnSort, Sort}; + + let res: Result = Sort::Column(Default::default()).try_into(); + let err1 = res.expect_err("expected failure"); + + let res: Result = Sort::Column(ColumnSort { + column_type: management::ColumnType::F64 as _, + ..Default::default() + }) + .try_into(); + let err2 = res.expect_err("expected failure"); + + let res: Result = Sort::Column(ColumnSort { + column_type: management::ColumnType::F64 as _, + column_value: management::Aggregate::Max as _, + ..Default::default() + }) + .try_into(); + let err3 = res.expect_err("expected failure"); + + assert_eq!(err1.field, "column.column_type"); + assert_eq!(err1.description, "Field is required"); + + assert_eq!(err2.field, "column.column_value"); + assert_eq!(err2.description, "Field is required"); + + assert_eq!(err3.field, "column.column_name"); + assert_eq!(err3.description, "Field is required"); + } } diff --git a/data_types/src/field_validation.rs b/data_types/src/field_validation.rs new file mode 100644 index 0000000000..49ae635a6d --- /dev/null +++ b/data_types/src/field_validation.rs @@ -0,0 +1,112 @@ +//! A collection of extension traits for types that +//! implement TryInto +//! +//! Allows associating field context with the generated errors +//! as they propagate up the struct topology + +use generated_types::google::FieldViolation; +use std::convert::TryInto; + +/// An extension trait that adds the method `scope` to any type +/// implementing `TryInto` +pub(crate) trait FromField { + fn scope(self, field: impl Into) -> Result; +} + +impl FromField for T +where + T: TryInto, +{ + /// Try to convert type using TryInto calling `FieldViolation::scope` + /// on any returned error + fn scope(self, field: impl Into) -> Result { + self.try_into().map_err(|e| e.scope(field)) + } +} + +/// An extension trait that adds the methods `optional` and `required` to any +/// Option containing a type implementing `TryInto` +pub(crate) trait FromFieldOpt { + /// Try to convert inner type, if any, using TryInto calling + /// `FieldViolation::scope` on any error encountered + /// + /// Returns None if empty + fn optional(self, field: impl Into) -> Result, FieldViolation>; + + /// Try to convert inner type, using TryInto calling `FieldViolation::scope` + /// on any error encountered + /// + /// Returns an error if empty + fn required(self, field: impl Into) -> Result; +} + +impl FromFieldOpt for Option +where + T: TryInto, +{ + fn optional(self, field: impl Into) -> Result, FieldViolation> { + self.map(|t| t.scope(field)).transpose() + } + + fn required(self, field: impl Into) -> Result { + match self { + None => Err(FieldViolation::required(field)), + Some(t) => t.scope(field), + } + } +} + +/// An extension trait that adds the methods `optional` and `required` to any +/// String +/// +/// Prost will default string fields to empty, whereas IOx sometimes +/// uses Option, this helper aids mapping between them +/// +/// TODO: Review mixed use of Option and String in IOX +pub(crate) trait FromFieldString { + /// Returns a Ok if the String is not empty + fn required(self, field: impl Into) -> Result; + + /// Wraps non-empty strings in Some(_), returns None for empty strings + fn optional(self) -> Option; +} + +impl FromFieldString for String { + fn required(self, field: impl Into) -> Result { + if self.is_empty() { + return Err(FieldViolation::required(field)); + } + Ok(self) + } + + fn optional(self) -> Option { + if self.is_empty() { + return None; + } + Some(self) + } +} + +/// An extension trait that adds the method `vec_field` to any Vec of a type +/// implementing `TryInto` +pub(crate) trait FromFieldVec { + /// Converts to a `Vec`, short-circuiting on the first error and + /// returning a correctly scoped `FieldViolation` for where the error + /// was encountered + fn vec_field(self, field: impl Into) -> Result; +} + +impl FromFieldVec> for Vec +where + T: TryInto, +{ + fn vec_field(self, field: impl Into) -> Result, FieldViolation> { + let res: Result<_, _> = self + .into_iter() + .enumerate() + .map(|(i, t)| t.scope(i.to_string())) + .collect(); + + res.map_err(|e| e.scope(field)) + } +} diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 7cf358c180..d3766053b6 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -32,3 +32,5 @@ pub mod wal; mod database_name; pub use database_name::*; + +pub(crate) mod field_validation;