diff --git a/Cargo.lock b/Cargo.lock index 7eb215ca1d..286b5742ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -774,17 +774,14 @@ name = "data_types" version = "0.1.0" dependencies = [ "chrono", - "generated_types", "influxdb_line_protocol", "observability_deps", "percent-encoding", - "prost", "regex", "serde", "serde_regex", "snafu", "test_helpers", - "tonic", ] [[package]] @@ -1202,6 +1199,7 @@ name = "generated_types" version = "0.1.0" dependencies = [ "bytes", + "data_types", "flatbuffers", "futures", "google_types", @@ -1209,8 +1207,10 @@ dependencies = [ "prost", "prost-build", "prost-types", + "regex", "serde", "serde_json", + "thiserror", "tonic", "tonic-build", ] diff --git a/data_types/Cargo.toml b/data_types/Cargo.toml index 3bc511915a..8a02fb79f0 100644 --- a/data_types/Cargo.toml +++ b/data_types/Cargo.toml @@ -8,15 +8,12 @@ readme = "README.md" [dependencies] # In alphabetical order chrono = { version = "0.4", features = ["serde"] } -generated_types = { path = "../generated_types" } influxdb_line_protocol = { path = "../influxdb_line_protocol" } percent-encoding = "2.1.0" -prost = "0.7" regex = "1.4" -serde = { version = "1.0", features = ["rc"] } +serde = { version = "1.0", features = ["rc", "derive"] } serde_regex = "1.1" snafu = "0.6" -tonic = { version = "0.4.0" } observability_deps = { path = "../observability_deps" } [dev-dependencies] # In alphabetical order diff --git a/data_types/src/chunk.rs b/data_types/src/chunk.rs index 60d17f15a0..1eedfaded9 100644 --- a/data_types/src/chunk.rs +++ b/data_types/src/chunk.rs @@ -1,12 +1,7 @@ //! Module contains a representation of chunk metadata -use std::{ - convert::{TryFrom, TryInto}, - sync::Arc, -}; +use std::sync::Arc; -use crate::field_validation::FromField; use chrono::{DateTime, Utc}; -use generated_types::{google::FieldViolation, influxdata::iox::management::v1 as management}; use serde::{Deserialize, Serialize}; /// Which storage system is a chunk located in? 
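Note: the dependency edits above invert the crate relationship: `data_types` no longer depends on `generated_types`; instead `generated_types` depends on `data_types` and takes over the protobuf conversion code. The pattern being relocated throughout this diff is the usual `From`/`TryFrom` pair between a domain type and its prost-generated message. A minimal sketch of that shape, using a hypothetical `Shape` enum rather than any type from this PR:

```rust
use std::convert::TryFrom;

// Hypothetical domain type, for illustration only.
#[derive(Debug, PartialEq)]
enum Shape {
    Circle,
    Square,
}

mod proto {
    // Stand-in for a prost-generated enum; real code is generated from a .proto file.
    #[derive(Debug, Clone, Copy, PartialEq)]
    pub enum Shape {
        Unspecified = 0,
        Circle = 1,
        Square = 2,
    }
}

/// Domain -> proto is infallible, so it gets a `From` impl.
impl From<Shape> for proto::Shape {
    fn from(s: Shape) -> Self {
        match s {
            Shape::Circle => Self::Circle,
            Shape::Square => Self::Square,
        }
    }
}

/// Proto -> domain must reject `Unspecified`, so it gets a `TryFrom` impl.
impl TryFrom<proto::Shape> for Shape {
    type Error = String; // the PR uses `FieldViolation` here

    fn try_from(p: proto::Shape) -> Result<Self, Self::Error> {
        match p {
            proto::Shape::Unspecified => Err("field is required".to_string()),
            proto::Shape::Circle => Ok(Self::Circle),
            proto::Shape::Square => Ok(Self::Square),
        }
    }
}

fn main() {
    assert_eq!(proto::Shape::from(Shape::Circle), proto::Shape::Circle);
    assert_eq!(Shape::try_from(proto::Shape::Square), Ok(Shape::Square));
    assert!(Shape::try_from(proto::Shape::Unspecified).is_err());
}
```

The `Unspecified = 0` variant is why the proto-to-domain direction is fallible: prost decodes an unset or unknown enum field to 0, which the domain model has no representation for.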
@@ -100,221 +95,3 @@ impl ChunkSummary { } } } - -/// Conversion code to management API chunk structure -impl From for management::Chunk { - fn from(summary: ChunkSummary) -> Self { - let ChunkSummary { - partition_key, - table_name, - id, - storage, - estimated_bytes, - row_count, - time_of_first_write, - time_of_last_write, - time_closed, - } = summary; - - let storage: management::ChunkStorage = storage.into(); - let storage = storage.into(); // convert to i32 - - let estimated_bytes = estimated_bytes as u64; - let row_count = row_count as u64; - - let partition_key = match Arc::try_unwrap(partition_key) { - // no one else has a reference so take the string - Ok(partition_key) => partition_key, - // some other reference exists to this string, so clone it - Err(partition_key) => partition_key.as_ref().clone(), - }; - let table_name = match Arc::try_unwrap(table_name) { - // no one else has a reference so take the string - Ok(table_name) => table_name, - // some other reference exists to this string, so clone it - Err(table_name) => table_name.as_ref().clone(), - }; - - let time_of_first_write = time_of_first_write.map(|t| t.into()); - let time_of_last_write = time_of_last_write.map(|t| t.into()); - let time_closed = time_closed.map(|t| t.into()); - - Self { - partition_key, - table_name, - id, - storage, - estimated_bytes, - row_count, - time_of_first_write, - time_of_last_write, - time_closed, - } - } -} - -impl From for management::ChunkStorage { - fn from(storage: ChunkStorage) -> Self { - match storage { - ChunkStorage::OpenMutableBuffer => Self::OpenMutableBuffer, - ChunkStorage::ClosedMutableBuffer => Self::ClosedMutableBuffer, - ChunkStorage::ReadBuffer => Self::ReadBuffer, - ChunkStorage::ReadBufferAndObjectStore => Self::ReadBufferAndObjectStore, - ChunkStorage::ObjectStoreOnly => Self::ObjectStoreOnly, - } - } -} - -/// Conversion code from management API chunk structure -impl TryFrom for ChunkSummary { - type Error = FieldViolation; - - fn try_from(proto: management::Chunk) -> Result { - // Use prost enum conversion - let storage = proto.storage().scope("storage")?; - - let time_of_first_write = proto - .time_of_first_write - .map(TryInto::try_into) - .transpose() - .map_err(|_| FieldViolation { - field: "time_of_first_write".to_string(), - description: "Timestamp must be positive".to_string(), - })?; - - let time_of_last_write = proto - .time_of_last_write - .map(TryInto::try_into) - .transpose() - .map_err(|_| FieldViolation { - field: "time_of_last_write".to_string(), - description: "Timestamp must be positive".to_string(), - })?; - - let time_closed = proto - .time_closed - .map(TryInto::try_into) - .transpose() - .map_err(|_| FieldViolation { - field: "time_closed".to_string(), - description: "Timestamp must be positive".to_string(), - })?; - - let management::Chunk { - partition_key, - table_name, - id, - estimated_bytes, - row_count, - .. 
- } = proto; - - let estimated_bytes = estimated_bytes as usize; - let row_count = row_count as usize; - let partition_key = Arc::new(partition_key); - let table_name = Arc::new(table_name); - - Ok(Self { - partition_key, - table_name, - id, - storage, - estimated_bytes, - row_count, - time_of_first_write, - time_of_last_write, - time_closed, - }) - } -} - -impl TryFrom for ChunkStorage { - type Error = FieldViolation; - - fn try_from(proto: management::ChunkStorage) -> Result { - match proto { - management::ChunkStorage::OpenMutableBuffer => Ok(Self::OpenMutableBuffer), - management::ChunkStorage::ClosedMutableBuffer => Ok(Self::ClosedMutableBuffer), - management::ChunkStorage::ReadBuffer => Ok(Self::ReadBuffer), - management::ChunkStorage::ReadBufferAndObjectStore => { - Ok(Self::ReadBufferAndObjectStore) - } - management::ChunkStorage::ObjectStoreOnly => Ok(Self::ObjectStoreOnly), - management::ChunkStorage::Unspecified => Err(FieldViolation::required("")), - } - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn valid_proto_to_summary() { - let proto = management::Chunk { - partition_key: "foo".to_string(), - table_name: "bar".to_string(), - id: 42, - estimated_bytes: 1234, - row_count: 321, - storage: management::ChunkStorage::ObjectStoreOnly.into(), - time_of_first_write: None, - time_of_last_write: None, - time_closed: None, - }; - - let summary = ChunkSummary::try_from(proto).expect("conversion successful"); - let expected = ChunkSummary { - partition_key: Arc::new("foo".to_string()), - table_name: Arc::new("bar".to_string()), - id: 42, - estimated_bytes: 1234, - row_count: 321, - storage: ChunkStorage::ObjectStoreOnly, - time_of_first_write: None, - time_of_last_write: None, - time_closed: None, - }; - - assert_eq!( - summary, expected, - "Actual:\n\n{:?}\n\nExpected:\n\n{:?}\n\n", - summary, expected - ); - } - - #[test] - fn valid_summary_to_proto() { - let summary = ChunkSummary { - partition_key: Arc::new("foo".to_string()), - table_name: Arc::new("bar".to_string()), - id: 42, - estimated_bytes: 1234, - row_count: 321, - storage: ChunkStorage::ObjectStoreOnly, - time_of_first_write: None, - time_of_last_write: None, - time_closed: None, - }; - - let proto = management::Chunk::try_from(summary).expect("conversion successful"); - - let expected = management::Chunk { - partition_key: "foo".to_string(), - table_name: "bar".to_string(), - id: 42, - estimated_bytes: 1234, - row_count: 321, - storage: management::ChunkStorage::ObjectStoreOnly.into(), - time_of_first_write: None, - time_of_last_write: None, - time_closed: None, - }; - - assert_eq!( - proto, expected, - "Actual:\n\n{:?}\n\nExpected:\n\n{:?}\n\n", - proto, expected - ); - } -} diff --git a/data_types/src/database_rules.rs b/data_types/src/database_rules.rs index 225e809f90..c7ad9f7bb1 100644 --- a/data_types/src/database_rules.rs +++ b/data_types/src/database_rules.rs @@ -1,21 +1,11 @@ -use crate::{ - consistent_hasher::ConsistentHasher, - field_validation::{FromField, FromFieldOpt, FromFieldString, FromFieldVec}, - server_id::ServerId, - DatabaseName, -}; +use crate::{consistent_hasher::ConsistentHasher, server_id::ServerId, DatabaseName}; use chrono::{DateTime, TimeZone, Utc}; -use generated_types::{ - google::{protobuf::Empty, FieldViolation, FieldViolationExt}, - influxdata::iox::management::v1 as management, -}; use influxdb_line_protocol::ParsedLine; use regex::Regex; use snafu::{OptionExt, Snafu}; use std::num::NonZeroU64; use std::{ collections::HashMap, - convert::{TryFrom, TryInto}, hash::{Hash, 
Hasher}, num::{NonZeroU32, NonZeroUsize}, sync::Arc, @@ -29,15 +19,6 @@ pub enum Error { source: Box, }, - #[snafu(context(false))] - ProstDecodeError { source: prost::DecodeError }, - - #[snafu(context(false))] - ProstEncodeError { source: prost::EncodeError }, - - #[snafu(context(false))] - FieldViolation { source: FieldViolation }, - #[snafu(display("No sharding rule matches line: {}", line))] NoShardingRuleMatches { line: String }, @@ -99,18 +80,6 @@ impl DatabaseRules { } } -impl DatabaseRules { - pub fn decode(bytes: prost::bytes::Bytes) -> Result { - let message: management::DatabaseRules = prost::Message::decode(bytes)?; - Ok(message.try_into()?) - } - - pub fn encode(self, bytes: &mut prost::bytes::BytesMut) -> Result<()> { - let encoded: management::DatabaseRules = self.into(); - Ok(prost::Message::encode(&encoded, bytes)?) - } -} - /// Generates a partition key based on the line and the default time. pub trait Partitioner { fn partition_key( @@ -126,51 +95,6 @@ impl Partitioner for DatabaseRules { } } -impl From for management::DatabaseRules { - fn from(rules: DatabaseRules) -> Self { - Self { - name: rules.name.into(), - partition_template: Some(rules.partition_template.into()), - write_buffer_config: rules.write_buffer_config.map(Into::into), - lifecycle_rules: Some(rules.lifecycle_rules.into()), - shard_config: rules.shard_config.map(Into::into), - } - } -} - -impl TryFrom for DatabaseRules { - type Error = FieldViolation; - - fn try_from(proto: management::DatabaseRules) -> Result { - let name = DatabaseName::new(proto.name.clone()).field("name")?; - - let write_buffer_config = proto.write_buffer_config.optional("write_buffer_config")?; - - let lifecycle_rules = proto - .lifecycle_rules - .optional("lifecycle_rules")? - .unwrap_or_default(); - - let partition_template = proto - .partition_template - .optional("partition_template")? 
- .unwrap_or_default(); - - let shard_config = proto - .shard_config - .optional("shard_config") - .unwrap_or_default(); - - Ok(Self { - name, - partition_template, - write_buffer_config, - lifecycle_rules, - shard_config, - }) - } -} - /// Configures how data automatically flows through the system #[derive(Debug, Default, Eq, PartialEq, Clone)] pub struct LifecycleRules { @@ -221,57 +145,6 @@ pub struct LifecycleRules { pub worker_backoff_millis: Option, } -impl From for management::LifecycleRules { - fn from(config: LifecycleRules) -> Self { - Self { - mutable_linger_seconds: config - .mutable_linger_seconds - .map(Into::into) - .unwrap_or_default(), - mutable_minimum_age_seconds: config - .mutable_minimum_age_seconds - .map(Into::into) - .unwrap_or_default(), - mutable_size_threshold: config - .mutable_size_threshold - .map(|x| x.get() as u64) - .unwrap_or_default(), - buffer_size_soft: config - .buffer_size_soft - .map(|x| x.get() as u64) - .unwrap_or_default(), - buffer_size_hard: config - .buffer_size_hard - .map(|x| x.get() as u64) - .unwrap_or_default(), - sort_order: Some(config.sort_order.into()), - drop_non_persisted: config.drop_non_persisted, - persist: config.persist, - immutable: config.immutable, - worker_backoff_millis: config.worker_backoff_millis.map_or(0, NonZeroU64::get), - } - } -} - -impl TryFrom for LifecycleRules { - type Error = FieldViolation; - - fn try_from(proto: management::LifecycleRules) -> Result { - Ok(Self { - mutable_linger_seconds: proto.mutable_linger_seconds.try_into().ok(), - mutable_minimum_age_seconds: proto.mutable_minimum_age_seconds.try_into().ok(), - mutable_size_threshold: (proto.mutable_size_threshold as usize).try_into().ok(), - buffer_size_soft: (proto.buffer_size_soft as usize).try_into().ok(), - buffer_size_hard: (proto.buffer_size_hard as usize).try_into().ok(), - sort_order: proto.sort_order.optional("sort_order")?.unwrap_or_default(), - drop_non_persisted: proto.drop_non_persisted, - persist: proto.persist, - immutable: proto.immutable, - worker_backoff_millis: NonZeroU64::new(proto.worker_backoff_millis), - }) - } -} - /// This struct specifies the rules for the order to sort partitions /// from the mutable buffer. This is used to determine which order to drop them /// in. The last partition in the list will be dropped, until enough space has @@ -295,28 +168,6 @@ pub struct SortOrder { pub sort: Sort, } -impl From for management::lifecycle_rules::SortOrder { - fn from(ps: SortOrder) -> Self { - let order: management::Order = ps.order.into(); - - Self { - order: order as _, - sort: Some(ps.sort.into()), - } - } -} - -impl TryFrom for SortOrder { - type Error = FieldViolation; - - fn try_from(proto: management::lifecycle_rules::SortOrder) -> Result { - Ok(Self { - order: proto.order().scope("order")?, - sort: proto.sort.optional("sort")?.unwrap_or_default(), - }) - } -} - /// What to sort the partition by. 
#[derive(Debug, Eq, PartialEq, Clone)] pub enum Sort { @@ -345,49 +196,6 @@ impl Default for Sort { } } -impl From for management::lifecycle_rules::sort_order::Sort { - fn from(ps: Sort) -> Self { - use management::lifecycle_rules::sort_order::ColumnSort; - - match ps { - Sort::LastWriteTime => Self::LastWriteTime(Empty {}), - Sort::CreatedAtTime => Self::CreatedAtTime(Empty {}), - Sort::Column(column_name, column_type, column_value) => { - let column_type: management::ColumnType = column_type.into(); - let column_value: management::Aggregate = column_value.into(); - - Self::Column(ColumnSort { - column_name, - column_type: column_type as _, - column_value: column_value as _, - }) - } - } - } -} - -impl TryFrom for Sort { - type Error = FieldViolation; - - fn try_from(proto: management::lifecycle_rules::sort_order::Sort) -> Result { - use management::lifecycle_rules::sort_order::Sort; - - Ok(match proto { - Sort::LastWriteTime(_) => Self::LastWriteTime, - Sort::CreatedAtTime(_) => Self::CreatedAtTime, - Sort::Column(column_sort) => { - let column_type = column_sort.column_type().scope("column.column_type")?; - let column_value = column_sort.column_value().scope("column.column_value")?; - Self::Column( - column_sort.column_name.required("column.column_name")?, - column_type, - column_value, - ) - } - }) - } -} - /// The sort order. #[derive(Debug, Eq, PartialEq, Clone)] pub enum Order { @@ -401,27 +209,6 @@ impl Default for Order { } } -impl From for management::Order { - fn from(o: Order) -> Self { - match o { - Order::Asc => Self::Asc, - Order::Desc => Self::Desc, - } - } -} - -impl TryFrom for Order { - type Error = FieldViolation; - - fn try_from(proto: management::Order) -> Result { - Ok(match proto { - management::Order::Unspecified => Self::default(), - management::Order::Asc => Self::Asc, - management::Order::Desc => Self::Desc, - }) - } -} - /// Use columns of this type. #[derive(Debug, Eq, PartialEq, Clone)] pub enum ColumnType { @@ -432,33 +219,6 @@ pub enum ColumnType { Bool, } -impl From for management::ColumnType { - fn from(t: ColumnType) -> Self { - match t { - ColumnType::I64 => Self::I64, - ColumnType::U64 => Self::U64, - ColumnType::F64 => Self::F64, - ColumnType::String => Self::String, - ColumnType::Bool => Self::Bool, - } - } -} - -impl TryFrom for ColumnType { - type Error = FieldViolation; - - fn try_from(proto: management::ColumnType) -> Result { - Ok(match proto { - management::ColumnType::Unspecified => return Err(FieldViolation::required("")), - management::ColumnType::I64 => Self::I64, - management::ColumnType::U64 => Self::U64, - management::ColumnType::F64 => Self::F64, - management::ColumnType::String => Self::String, - management::ColumnType::Bool => Self::Bool, - }) - } -} - /// Use either the min or max summary statistic. #[derive(Debug, Eq, PartialEq, Clone)] pub enum ColumnValue { @@ -466,29 +226,6 @@ pub enum ColumnValue { Max, } -impl From for management::Aggregate { - fn from(v: ColumnValue) -> Self { - match v { - ColumnValue::Min => Self::Min, - ColumnValue::Max => Self::Max, - } - } -} - -impl TryFrom for ColumnValue { - type Error = FieldViolation; - - fn try_from(proto: management::Aggregate) -> Result { - use management::Aggregate; - - Ok(match proto { - Aggregate::Unspecified => return Err(FieldViolation::required("")), - Aggregate::Min => Self::Min, - Aggregate::Max => Self::Max, - }) - } -} - /// `WriteBufferConfig` defines the configuration for buffering data from writes /// in memory. 
This buffer is used for asynchronous replication and to collect /// segments before sending them to object storage. @@ -524,45 +261,6 @@ pub struct WriteBufferConfig { pub close_segment_after: Option, } -impl From for management::WriteBufferConfig { - fn from(rollover: WriteBufferConfig) -> Self { - let buffer_rollover: management::write_buffer_config::Rollover = - rollover.buffer_rollover.into(); - - Self { - buffer_size: rollover.buffer_size as u64, - segment_size: rollover.segment_size as u64, - buffer_rollover: buffer_rollover as _, - persist_segments: rollover.store_segments, - close_segment_after: rollover.close_segment_after.map(Into::into), - } - } -} - -impl TryFrom for WriteBufferConfig { - type Error = FieldViolation; - - fn try_from(proto: management::WriteBufferConfig) -> Result { - let buffer_rollover = proto.buffer_rollover().scope("buffer_rollover")?; - let close_segment_after = proto - .close_segment_after - .map(TryInto::try_into) - .transpose() - .map_err(|_| FieldViolation { - field: "closeSegmentAfter".to_string(), - description: "Duration must be positive".to_string(), - })?; - - Ok(Self { - buffer_size: proto.buffer_size as usize, - segment_size: proto.segment_size as usize, - buffer_rollover, - store_segments: proto.persist_segments, - close_segment_after, - }) - } -} - /// `WriteBufferRollover` defines the behavior of what should happen if a write /// comes in that would cause the buffer to exceed its max size AND the oldest /// segment can't be dropped because it has not yet been persisted. @@ -580,30 +278,6 @@ pub enum WriteBufferRollover { ReturnError, } -impl From for management::write_buffer_config::Rollover { - fn from(rollover: WriteBufferRollover) -> Self { - match rollover { - WriteBufferRollover::DropOldSegment => Self::DropOldSegment, - WriteBufferRollover::DropIncoming => Self::DropIncoming, - WriteBufferRollover::ReturnError => Self::ReturnError, - } - } -} - -impl TryFrom for WriteBufferRollover { - type Error = FieldViolation; - - fn try_from(proto: management::write_buffer_config::Rollover) -> Result { - use management::write_buffer_config::Rollover; - Ok(match proto { - Rollover::Unspecified => return Err(FieldViolation::required("")), - Rollover::DropOldSegment => Self::DropOldSegment, - Rollover::DropIncoming => Self::DropIncoming, - Rollover::ReturnError => Self::ReturnError, - }) - } -} - /// `PartitionTemplate` is used to compute the partition key of each row that /// gets written. It can consist of the table name, a column name and its value, /// a formatted time, or a string column and regex captures of its value. For @@ -642,23 +316,6 @@ impl Partitioner for PartitionTemplate { } } -impl From for management::PartitionTemplate { - fn from(pt: PartitionTemplate) -> Self { - Self { - parts: pt.parts.into_iter().map(Into::into).collect(), - } - } -} - -impl TryFrom for PartitionTemplate { - type Error = FieldViolation; - - fn try_from(proto: management::PartitionTemplate) -> Result { - let parts = proto.parts.vec_field("parts")?; - Ok(Self { parts }) - } -} - /// `TemplatePart` specifies what part of a row should be used to compute this /// part of a partition key. #[derive(Debug, Eq, PartialEq, Clone)] @@ -683,8 +340,8 @@ pub enum TemplatePart { /// key. 
#[derive(Debug, Eq, PartialEq, Clone)] pub struct RegexCapture { - column: String, - regex: String, + pub column: String, + pub regex: String, } /// [`StrftimeColumn`] is used to create a time-based partition key off some /// "2021-04-14 12:24:21" #[derive(Debug, Eq, PartialEq, Clone)] pub struct StrftimeColumn { - column: String, - format: String, -} - -impl From<TemplatePart> for management::partition_template::part::Part { - fn from(part: TemplatePart) -> Self { - use management::partition_template::part::ColumnFormat; - - match part { - TemplatePart::Table => Self::Table(Empty {}), - TemplatePart::Column(column) => Self::Column(column), - TemplatePart::RegexCapture(RegexCapture { column, regex }) => { - Self::Regex(ColumnFormat { - column, - format: regex, - }) - } - TemplatePart::StrftimeColumn(StrftimeColumn { column, format }) => { - Self::StrfTime(ColumnFormat { column, format }) - } - TemplatePart::TimeFormat(format) => Self::Time(format), - } - } -} - -impl TryFrom<management::partition_template::part::Part> for TemplatePart { - type Error = FieldViolation; - - fn try_from(proto: management::partition_template::part::Part) -> Result<Self, Self::Error> { - use management::partition_template::part::{ColumnFormat, Part}; - - Ok(match proto { - Part::Table(_) => Self::Table, - Part::Column(column) => Self::Column(column.required("column")?), - Part::Regex(ColumnFormat { column, format }) => Self::RegexCapture(RegexCapture { - column: column.required("regex.column")?, - regex: format.required("regex.format")?, - }), - Part::StrfTime(ColumnFormat { column, format }) => { - Self::StrftimeColumn(StrftimeColumn { - column: column.required("strf_time.column")?, - format: format.required("strf_time.format")?, - }) - } - Part::Time(format) => Self::TimeFormat(format.required("time")?), - }) - } -} - -impl From<TemplatePart> for management::partition_template::Part { - fn from(part: TemplatePart) -> Self { - Self { - part: Some(part.into()), - } - } -} - -impl TryFrom<management::partition_template::Part> for TemplatePart { - type Error = FieldViolation; - - fn try_from(proto: management::partition_template::Part) -> Result<Self, Self::Error> { - proto.part.required("part") - } + pub column: String, + pub format: String, } /// ShardId maps to a nodegroup that holds the shard. @@ -906,164 +502,6 @@ impl Matcher { } } -impl From<ShardConfig> for management::ShardConfig { - fn from(shard_config: ShardConfig) -> Self { - Self { - specific_targets: shard_config - .specific_targets - .into_iter() - .map(|i| i.into()) - .collect(), - hash_ring: shard_config.hash_ring.map(|i| i.into()), - ignore_errors: shard_config.ignore_errors, - shards: shard_config - .shards - .iter() - .map(|(k, v)| (*k, from_node_group_for_management_node_group(v.clone()))) - .collect(), - } - } -} - -impl TryFrom<management::ShardConfig> for ShardConfig { - type Error = FieldViolation; - - fn try_from(proto: management::ShardConfig) -> Result<Self, Self::Error> { - Ok(Self { - specific_targets: proto - .specific_targets - .into_iter() - .map(|i| i.try_into()) - .collect::<Result<Vec<_>, _>>()?, - hash_ring: proto - .hash_ring - .map(|i| i.try_into()) - .map_or(Ok(None), |r| r.map(Some))?, - ignore_errors: proto.ignore_errors, - shards: Arc::new( - proto - .shards - .into_iter() - .map(|(k, v)| { - try_from_management_node_group_for_node_group(v).map(|ng| (k, ng)) - }) - .collect::<Result<HashMap<u32, NodeGroup>, FieldViolation>>()?, - ), - }) - } -} - -/// Returns None if v matches its default value. 
-fn none_if_default(v: T) -> Option { - if v == Default::default() { - None - } else { - Some(v) - } -} - -impl From for management::MatcherToShard { - fn from(matcher_to_shard: MatcherToShard) -> Self { - Self { - matcher: none_if_default(matcher_to_shard.matcher.into()), - shard: matcher_to_shard.shard, - } - } -} - -impl TryFrom for MatcherToShard { - type Error = FieldViolation; - - fn try_from(proto: management::MatcherToShard) -> Result { - Ok(Self { - matcher: proto.matcher.unwrap_or_default().try_into()?, - shard: proto.shard, - }) - } -} - -impl From for management::HashRing { - fn from(hash_ring: HashRing) -> Self { - Self { - table_name: hash_ring.table_name, - columns: hash_ring.columns, - shards: hash_ring.shards.into(), - } - } -} - -impl TryFrom for HashRing { - type Error = FieldViolation; - - fn try_from(proto: management::HashRing) -> Result { - Ok(Self { - table_name: proto.table_name, - columns: proto.columns, - shards: proto.shards.into(), - }) - } -} - -// cannot (and/or don't know how to) add impl From inside prost generated code -fn from_node_group_for_management_node_group(node_group: NodeGroup) -> management::NodeGroup { - management::NodeGroup { - nodes: node_group - .into_iter() - .map(|id| management::node_group::Node { id: id.get_u32() }) - .collect(), - } -} - -fn try_from_management_node_group_for_node_group( - proto: management::NodeGroup, -) -> Result { - proto - .nodes - .into_iter() - .map(|i| { - ServerId::try_from(i.id).map_err(|_| FieldViolation { - field: "id".to_string(), - description: "Node ID must be nonzero".to_string(), - }) - }) - .collect() -} - -impl From for management::Matcher { - fn from(matcher: Matcher) -> Self { - Self { - table_name_regex: matcher - .table_name_regex - .map(|r| r.to_string()) - .unwrap_or_default(), - predicate: matcher.predicate.unwrap_or_default(), - } - } -} - -impl TryFrom for Matcher { - type Error = FieldViolation; - - fn try_from(proto: management::Matcher) -> Result { - let table_name_regex = match &proto.table_name_regex as &str { - "" => None, - re => Some(Regex::new(re).map_err(|e| FieldViolation { - field: "table_name_regex".to_string(), - description: e.to_string(), - })?), - }; - let predicate = match proto.predicate { - p if p.is_empty() => None, - p => Some(p), - }; - - Ok(Self { - table_name_regex, - predicate, - }) - } -} - /// `PartitionId` is the object storage identifier for a specific partition. It /// should be a path that can be used against an object store to locate all the /// files and subdirectories for a partition. 
It takes the form of `/ Vec> { - parse_lines(lp).map(|l| l.unwrap()).collect() - } - - fn parse_line(line: &str) -> ParsedLine<'_> { - parsed_lines(line).pop().unwrap() - } - - #[test] - fn test_database_rules_defaults() { - let protobuf = management::DatabaseRules { - name: "database".to_string(), - ..Default::default() - }; - - let rules: DatabaseRules = protobuf.clone().try_into().unwrap(); - let back: management::DatabaseRules = rules.clone().into(); - - assert_eq!(rules.name.as_str(), protobuf.name.as_str()); - assert_eq!(protobuf.name, back.name); - - assert_eq!(rules.partition_template.parts.len(), 0); - - // These will be defaulted as optionality not preserved on non-protobuf - // DatabaseRules - assert_eq!(back.partition_template, Some(Default::default())); - assert_eq!(back.lifecycle_rules, Some(LifecycleRules::default().into())); - - // These should be none as preserved on non-protobuf DatabaseRules - assert!(back.write_buffer_config.is_none()); - assert!(back.shard_config.is_none()); - } - - #[test] - fn test_partition_template_default() { - let protobuf = management::DatabaseRules { - name: "database".to_string(), - partition_template: Some(management::PartitionTemplate { parts: vec![] }), - ..Default::default() - }; - - let rules: DatabaseRules = protobuf.clone().try_into().unwrap(); - let back: management::DatabaseRules = rules.clone().into(); - - assert_eq!(rules.partition_template.parts.len(), 0); - assert_eq!(protobuf.partition_template, back.partition_template); - } - - #[test] - fn test_partition_template_no_part() { - let protobuf = management::DatabaseRules { - name: "database".to_string(), - partition_template: Some(management::PartitionTemplate { - parts: vec![Default::default()], - }), - ..Default::default() - }; - - let res: Result = protobuf.try_into(); - let err = res.expect_err("expected failure"); - - assert_eq!(&err.field, "partition_template.parts.0.part"); - assert_eq!(&err.description, "Field is required"); - } - - #[test] - fn test_partition_template() { - use management::partition_template::part::{ColumnFormat, Part}; - - let protobuf = management::PartitionTemplate { - parts: vec![ - management::partition_template::Part { - part: Some(Part::Time("time".to_string())), - }, - management::partition_template::Part { - part: Some(Part::Table(Empty {})), - }, - management::partition_template::Part { - part: Some(Part::Regex(ColumnFormat { - column: "column".to_string(), - format: "format".to_string(), - })), - }, - ], - }; - - let pt: PartitionTemplate = protobuf.clone().try_into().unwrap(); - let back: management::PartitionTemplate = pt.clone().into(); - - assert_eq!( - pt.parts, - vec![ - TemplatePart::TimeFormat("time".to_string()), - TemplatePart::Table, - TemplatePart::RegexCapture(RegexCapture { - column: "column".to_string(), - regex: "format".to_string() - }) - ] - ); - assert_eq!(protobuf, back); - } - - #[test] - fn test_partition_template_empty() { - use management::partition_template::part::{ColumnFormat, Part}; - - let protobuf = management::PartitionTemplate { - parts: vec![management::partition_template::Part { - part: Some(Part::Regex(ColumnFormat { - ..Default::default() - })), - }], - }; - - let res: Result = protobuf.try_into(); - let err = res.expect_err("expected failure"); - - assert_eq!(&err.field, "parts.0.part.regex.column"); - assert_eq!(&err.description, "Field is required"); - } - - #[test] - fn test_write_buffer_config_default() { - let protobuf: management::WriteBufferConfig = Default::default(); - - let res: Result = 
protobuf.try_into(); - let err = res.expect_err("expected failure"); - - assert_eq!(&err.field, "buffer_rollover"); - assert_eq!(&err.description, "Field is required"); - } - - #[test] - fn test_write_buffer_config_rollover() { - let protobuf = management::WriteBufferConfig { - buffer_rollover: management::write_buffer_config::Rollover::DropIncoming as _, - ..Default::default() - }; - - let config: WriteBufferConfig = protobuf.clone().try_into().unwrap(); - let back: management::WriteBufferConfig = config.clone().into(); - - assert_eq!(config.buffer_rollover, WriteBufferRollover::DropIncoming); - assert_eq!(protobuf, back); - } - - #[test] - fn test_write_buffer_config_negative_duration() { - use generated_types::google::protobuf::Duration; - - let protobuf = management::WriteBufferConfig { - buffer_rollover: management::write_buffer_config::Rollover::DropOldSegment as _, - close_segment_after: Some(Duration { - seconds: -1, - nanos: -40, - }), - ..Default::default() - }; - - let res: Result = protobuf.try_into(); - let err = res.expect_err("expected failure"); - - assert_eq!(&err.field, "closeSegmentAfter"); - assert_eq!(&err.description, "Duration must be positive"); - } - - #[test] - fn lifecycle_rules() { - let protobuf = management::LifecycleRules { - mutable_linger_seconds: 123, - mutable_minimum_age_seconds: 5345, - mutable_size_threshold: 232, - buffer_size_soft: 353, - buffer_size_hard: 232, - sort_order: None, - drop_non_persisted: true, - persist: true, - immutable: true, - worker_backoff_millis: 1000, - }; - - let config: LifecycleRules = protobuf.clone().try_into().unwrap(); - let back: management::LifecycleRules = config.clone().into(); - - assert_eq!(config.sort_order, SortOrder::default()); - assert_eq!( - config.mutable_linger_seconds.unwrap().get(), - protobuf.mutable_linger_seconds - ); - assert_eq!( - config.mutable_minimum_age_seconds.unwrap().get(), - protobuf.mutable_minimum_age_seconds - ); - assert_eq!( - config.mutable_size_threshold.unwrap().get(), - protobuf.mutable_size_threshold as usize - ); - assert_eq!( - config.buffer_size_soft.unwrap().get(), - protobuf.buffer_size_soft as usize - ); - assert_eq!( - config.buffer_size_hard.unwrap().get(), - protobuf.buffer_size_hard as usize - ); - assert_eq!(config.drop_non_persisted, protobuf.drop_non_persisted); - assert_eq!(config.immutable, protobuf.immutable); - - assert_eq!(back.mutable_linger_seconds, protobuf.mutable_linger_seconds); - assert_eq!( - back.mutable_minimum_age_seconds, - protobuf.mutable_minimum_age_seconds - ); - assert_eq!(back.mutable_size_threshold, protobuf.mutable_size_threshold); - assert_eq!(back.buffer_size_soft, protobuf.buffer_size_soft); - assert_eq!(back.buffer_size_hard, protobuf.buffer_size_hard); - assert_eq!(back.drop_non_persisted, protobuf.drop_non_persisted); - assert_eq!(back.immutable, protobuf.immutable); - assert_eq!(back.worker_backoff_millis, protobuf.worker_backoff_millis); - } - - #[test] - fn default_background_worker_backoff_millis() { - let protobuf = management::LifecycleRules { - worker_backoff_millis: 0, - ..Default::default() - }; - - let config: LifecycleRules = protobuf.clone().try_into().unwrap(); - let back: management::LifecycleRules = config.into(); - assert_eq!(back.worker_backoff_millis, protobuf.worker_backoff_millis); - } - - #[test] - fn sort_order_default() { - let protobuf: management::lifecycle_rules::SortOrder = Default::default(); - let config: SortOrder = protobuf.try_into().unwrap(); - - assert_eq!(config, SortOrder::default()); - 
assert_eq!(config.order, Order::default()); - assert_eq!(config.sort, Sort::default()); - } - - #[test] - fn sort_order() { - use management::lifecycle_rules::sort_order; - let protobuf = management::lifecycle_rules::SortOrder { - order: management::Order::Asc as _, - sort: Some(sort_order::Sort::CreatedAtTime(Empty {})), - }; - let config: SortOrder = protobuf.clone().try_into().unwrap(); - let back: management::lifecycle_rules::SortOrder = config.clone().into(); - - assert_eq!(protobuf, back); - assert_eq!(config.order, Order::Asc); - assert_eq!(config.sort, Sort::CreatedAtTime); - } - - #[test] - fn sort() { - use management::lifecycle_rules::sort_order; - - let created_at: Sort = sort_order::Sort::CreatedAtTime(Empty {}) - .try_into() - .unwrap(); - let last_write: Sort = sort_order::Sort::LastWriteTime(Empty {}) - .try_into() - .unwrap(); - let column: Sort = sort_order::Sort::Column(sort_order::ColumnSort { - column_name: "column".to_string(), - column_type: management::ColumnType::Bool as _, - column_value: management::Aggregate::Min as _, - }) - .try_into() - .unwrap(); - - assert_eq!(created_at, Sort::CreatedAtTime); - assert_eq!(last_write, Sort::LastWriteTime); - assert_eq!( - column, - Sort::Column("column".to_string(), ColumnType::Bool, ColumnValue::Min) - ); - } - - #[test] - fn partition_sort_column_sort() { - use management::lifecycle_rules::sort_order; - - let res: Result = sort_order::Sort::Column(Default::default()).try_into(); - let err1 = res.expect_err("expected failure"); - - let res: Result = sort_order::Sort::Column(sort_order::ColumnSort { - column_type: management::ColumnType::F64 as _, - ..Default::default() - }) - .try_into(); - let err2 = res.expect_err("expected failure"); - - let res: Result = sort_order::Sort::Column(sort_order::ColumnSort { - column_type: management::ColumnType::F64 as _, - column_value: management::Aggregate::Max as _, - ..Default::default() - }) - .try_into(); - let err3 = res.expect_err("expected failure"); - - assert_eq!(err1.field, "column.column_type"); - assert_eq!(err1.description, "Field is required"); - - assert_eq!(err2.field, "column.column_value"); - assert_eq!(err2.description, "Field is required"); - - assert_eq!(err3.field, "column.column_name"); - assert_eq!(err3.description, "Field is required"); - } - - #[test] - fn test_matcher_default() { - let protobuf = management::Matcher { - ..Default::default() - }; - - let matcher: Matcher = protobuf.clone().try_into().unwrap(); - let back: management::Matcher = matcher.clone().into(); - - assert!(matcher.table_name_regex.is_none()); - assert_eq!(protobuf.table_name_regex, back.table_name_regex); - - assert_eq!(matcher.predicate, None); - assert_eq!(protobuf.predicate, back.predicate); - } - - #[test] - fn test_matcher_regexp() { - let protobuf = management::Matcher { - table_name_regex: "^foo$".into(), - ..Default::default() - }; - - let matcher: Matcher = protobuf.clone().try_into().unwrap(); - let back: management::Matcher = matcher.clone().into(); - - assert_eq!(matcher.table_name_regex.unwrap().to_string(), "^foo$"); - assert_eq!(protobuf.table_name_regex, back.table_name_regex); - } - - #[test] - fn test_matcher_bad_regexp() { - let protobuf = management::Matcher { - table_name_regex: "*".into(), - ..Default::default() - }; - - let matcher: Result = protobuf.try_into(); - assert!(matcher.is_err()); - assert_eq!(matcher.err().unwrap().field, "table_name_regex"); - } - - #[test] - fn test_hash_ring_default() { - let protobuf = management::HashRing { - ..Default::default() - 
}; - - let hash_ring: HashRing = protobuf.clone().try_into().unwrap(); - let back: management::HashRing = hash_ring.clone().into(); - - assert_eq!(hash_ring.table_name, false); - assert_eq!(protobuf.table_name, back.table_name); - assert!(hash_ring.columns.is_empty()); - assert_eq!(protobuf.columns, back.columns); - assert!(hash_ring.shards.is_empty()); - assert_eq!(protobuf.shards, back.shards); - } - - #[test] - fn test_hash_ring_nodes() { - let protobuf = management::HashRing { - shards: vec![1, 2], - ..Default::default() - }; - - let hash_ring: HashRing = protobuf.try_into().unwrap(); - - assert_eq!(hash_ring.shards.len(), 2); - assert_eq!(hash_ring.shards.find(1), Some(2)); - assert_eq!(hash_ring.shards.find(2), Some(1)); - } - - #[test] - fn test_matcher_to_shard_default() { - let protobuf = management::MatcherToShard { - ..Default::default() - }; - - let matcher_to_shard: MatcherToShard = protobuf.clone().try_into().unwrap(); - let back: management::MatcherToShard = matcher_to_shard.clone().into(); - - assert_eq!( - matcher_to_shard.matcher, - Matcher { - ..Default::default() - } - ); - assert_eq!(protobuf.matcher, back.matcher); - - assert_eq!(matcher_to_shard.shard, 0); - assert_eq!(protobuf.shard, back.shard); - } - - #[test] - fn test_shard_config_default() { - let protobuf = management::ShardConfig { - ..Default::default() - }; - - let shard_config: ShardConfig = protobuf.clone().try_into().unwrap(); - let back: management::ShardConfig = shard_config.clone().into(); - - assert!(shard_config.specific_targets.is_empty()); - assert_eq!(protobuf.specific_targets, back.specific_targets); - - assert!(shard_config.hash_ring.is_none()); - assert_eq!(protobuf.hash_ring, back.hash_ring); - - assert_eq!(shard_config.ignore_errors, false); - assert_eq!(protobuf.ignore_errors, back.ignore_errors); - - assert!(shard_config.shards.is_empty()); - assert_eq!(protobuf.shards, back.shards); - } - - #[test] - fn test_database_rules_shard_config() { - let protobuf = management::DatabaseRules { - name: "database".to_string(), - shard_config: Some(management::ShardConfig { - ..Default::default() - }), - ..Default::default() - }; - - let rules: DatabaseRules = protobuf.try_into().unwrap(); - let back: management::DatabaseRules = rules.into(); - - assert!(back.shard_config.is_some()); - } - - #[test] - fn test_shard_config_shards() { - let protobuf = management::ShardConfig { - shards: vec![ - ( - 1, - management::NodeGroup { - nodes: vec![ - management::node_group::Node { id: 10 }, - management::node_group::Node { id: 11 }, - management::node_group::Node { id: 12 }, - ], - }, - ), - ( - 2, - management::NodeGroup { - nodes: vec![management::node_group::Node { id: 20 }], - }, - ), - ] - .into_iter() - .collect(), - ..Default::default() - }; - - let shard_config: ShardConfig = protobuf.try_into().unwrap(); - - assert_eq!(shard_config.shards.len(), 2); - assert_eq!(shard_config.shards[&1].len(), 3); - assert_eq!(shard_config.shards[&2].len(), 1); - } - #[test] + #[allow(clippy::trivial_regex)] fn test_sharder() { - let protobuf = management::ShardConfig { - specific_targets: vec![management::MatcherToShard { - matcher: Some(management::Matcher { - table_name_regex: "pu$".to_string(), - ..Default::default() - }), + let shards: Vec<_> = (1000..1000000).collect(); + let shard_config = ShardConfig { + specific_targets: vec![MatcherToShard { + matcher: Matcher { + table_name_regex: Some(Regex::new("pu$").unwrap()), + predicate: None, + }, shard: 1, }], - hash_ring: Some(management::HashRing { + hash_ring: 
Some(HashRing { table_name: true, columns: vec!["t1", "t2", "f1", "f2"] .into_iter() .map(|i| i.to_string()) .collect(), - // in practice we won't have that many shards - // but for tests it's better to have more distinct values - // so we don't hide bugs due to sheer luck. - shards: (1000..1000000).collect(), + shards: ConsistentHasher::new(&shards), }), ..Default::default() }; - let shard_config: ShardConfig = protobuf.try_into().unwrap(); - // hit the specific targets let line = parse_line("cpu,t1=1,t2=2,t3=3 f1=1,f2=2,f3=3 10"); @@ -1771,22 +723,29 @@ mod tests { #[test] fn test_sharder_no_shards() { - let protobuf = management::ShardConfig { - hash_ring: Some(management::HashRing { + let shard_config = ShardConfig { + hash_ring: Some(HashRing { table_name: true, columns: vec!["t1", "t2", "f1", "f2"] .into_iter() .map(|i| i.to_string()) .collect(), - shards: vec![], + shards: ConsistentHasher::new(&[]), }), ..Default::default() }; - let shard_config: ShardConfig = protobuf.try_into().unwrap(); - let line = parse_line("cpu,t1=1,t2=2,t3=3 f1=1,f2=2,f3=3 10"); let err = shard_config.shard(&line).unwrap_err(); + assert!(matches!(err, Error::NoShardsDefined)); } + + fn parsed_lines(lp: &str) -> Vec<ParsedLine<'_>> { + parse_lines(lp).map(|l| l.unwrap()).collect() + } + + fn parse_line(line: &str) -> ParsedLine<'_> { + parsed_lines(line).pop().unwrap() + } } diff --git a/data_types/src/field_validation.rs b/data_types/src/field_validation.rs deleted file mode 100644 index 29f5be1d6c..0000000000 --- a/data_types/src/field_validation.rs +++ /dev/null @@ -1,112 +0,0 @@ -//! A collection of extension traits for types that -//! implement TryInto<U, Error = FieldViolation> -//! -//! Allows associating field context with the generated errors -//! as they propagate up the struct topology - -use generated_types::google::FieldViolation; -use std::convert::TryInto; - -/// An extension trait that adds the method `scope` to any type -/// implementing `TryInto<U, Error = FieldViolation>` -pub(crate) trait FromField<T> { - fn scope(self, field: impl Into<String>) -> Result<T, FieldViolation>; -} - -impl<T, U> FromField<U> for T -where - T: TryInto<U, Error = FieldViolation>, -{ - /// Try to convert type using TryInto calling `FieldViolation::scope` - /// on any returned error - fn scope(self, field: impl Into<String>) -> Result<U, FieldViolation> { - self.try_into().map_err(|e| e.scope(field)) - } -} - -/// An extension trait that adds the methods `optional` and `required` to any -/// Option containing a type implementing `TryInto<U, Error = FieldViolation>` -pub trait FromFieldOpt<T> { - /// Try to convert inner type, if any, using TryInto calling - /// `FieldViolation::scope` on any error encountered - /// - /// Returns None if empty - fn optional(self, field: impl Into<String>) -> Result<Option<T>, FieldViolation>; - - /// Try to convert inner type, using TryInto calling `FieldViolation::scope` - /// on any error encountered - /// - /// Returns an error if empty - fn required(self, field: impl Into<String>) -> Result<T, FieldViolation>; -} - -impl<T, U> FromFieldOpt<U> for Option<T> -where - T: TryInto<U, Error = FieldViolation>, -{ - fn optional(self, field: impl Into<String>) -> Result<Option<U>, FieldViolation> { - self.map(|t| t.scope(field)).transpose() - } - - fn required(self, field: impl Into<String>) -> Result<U, FieldViolation> { - match self { - None => Err(FieldViolation::required(field)), - Some(t) => t.scope(field), - } - } -} - -/// An extension trait that adds the methods `optional` and `required` to any -/// String -/// -/// Prost will default string fields to empty, whereas IOx sometimes -/// uses Option<String>, this helper aids mapping between them -/// -/// TODO: Review mixed use of Option and String in IOX -pub(crate) trait FromFieldString { - /// Returns Ok if the String is not empty - fn required(self, field: impl Into<String>) -> Result<String, FieldViolation>; - - /// Wraps non-empty strings in Some(_), returns None for empty strings - fn optional(self) -> Option<String>; -} - -impl FromFieldString for String { - fn required(self, field: impl Into<String>) -> Result<String, FieldViolation> { - if self.is_empty() { - return Err(FieldViolation::required(field)); - } - Ok(self) - } - - fn optional(self) -> Option<String> { - if self.is_empty() { - return None; - } - Some(self) - } -} - -/// An extension trait that adds the method `vec_field` to any Vec of a type -/// implementing `TryInto<U, Error = FieldViolation>` -pub(crate) trait FromFieldVec<T> { - /// Converts to a `Vec<U>`, short-circuiting on the first error and - /// returning a correctly scoped `FieldViolation` for where the error - /// was encountered - fn vec_field(self, field: impl Into<String>) -> Result<T, FieldViolation>; -} - -impl<T, U> FromFieldVec<Vec<U>> for Vec<T> -where - T: TryInto<U, Error = FieldViolation>, -{ - fn vec_field(self, field: impl Into<String>) -> Result<Vec<U>, FieldViolation> { - let res: Result<_, _> = self - .into_iter() - .enumerate() - .map(|(i, t)| t.scope(i.to_string())) - .collect(); - - res.map_err(|e| e.scope(field)) - } -} diff --git a/data_types/src/job.rs b/data_types/src/job.rs index 3351c44887..2e2c0747b1 100644 --- a/data_types/src/job.rs +++ b/data_types/src/job.rs @@ -1,9 +1,4 @@ -use generated_types::google::{protobuf::Any, FieldViolation, FieldViolationExt}; -use generated_types::{ - google::longrunning, influxdata::iox::management::v1 as management, protobuf_type_url_eq, -}; use serde::{Deserialize, Serialize}; -use std::convert::TryFrom; /// Metadata associated with a set of background tasks /// Used in combination with TrackerRegistry @@ -32,67 +27,6 @@ pub enum Job { }, } -impl From<Job> for management::operation_metadata::Job { - fn from(job: Job) -> Self { - match job { - Job::Dummy { nanos } => Self::Dummy(management::Dummy { nanos }), - Job::CloseChunk { - db_name, - partition_key, - table_name, - chunk_id, - } => Self::CloseChunk(management::CloseChunk { - db_name, - partition_key, - table_name, - chunk_id, - }), - Job::WriteChunk { - db_name, - partition_key, - table_name, - chunk_id, - } => Self::WriteChunk(management::WriteChunk { - db_name, - partition_key, - table_name, - chunk_id, - }), - } - } -} - -impl From<management::operation_metadata::Job> for Job { - fn from(value: management::operation_metadata::Job) -> Self { - use management::operation_metadata::Job; - match value { - Job::Dummy(management::Dummy { nanos }) => Self::Dummy { nanos }, - Job::CloseChunk(management::CloseChunk { - db_name, - partition_key, - table_name, - chunk_id, - }) => Self::CloseChunk { - db_name, - partition_key, - table_name, - chunk_id, - }, - Job::WriteChunk(management::WriteChunk { - db_name, - partition_key, - table_name, - chunk_id, - }) => Self::WriteChunk { - db_name, - partition_key, - table_name, - chunk_id, - }, - } - } -} - impl Job { /// Returns the database name associated with this job, if any pub fn db_name(&self) -> Option<&str> { @@ -169,45 +103,3 @@ pub struct Operation { /// The status of the running operation pub status: OperationStatus, } - -impl TryFrom<longrunning::Operation> for Operation { - type Error = FieldViolation; - - fn try_from(operation: longrunning::Operation) -> Result<Self, Self::Error> { - let metadata: Any = operation - .metadata - .ok_or_else(|| FieldViolation::required("metadata"))?; - - if !protobuf_type_url_eq(&metadata.type_url, management::OPERATION_METADATA) { - return Err(FieldViolation { - field: "metadata.type_url".to_string(), - description: "Unexpected field type".to_string(), - }); - } - - let meta: management::OperationMetadata = - prost::Message::decode(metadata.value).field("metadata.value")?; - 
- let status = match &operation.result { - None => OperationStatus::Running, - Some(longrunning::operation::Result::Response(_)) => OperationStatus::Complete, - Some(longrunning::operation::Result::Error(status)) => { - if status.code == tonic::Code::Cancelled as i32 { - OperationStatus::Cancelled - } else { - OperationStatus::Errored - } - } - }; - - Ok(Self { - id: operation.name.parse().field("name")?, - task_count: meta.task_count, - pending_count: meta.pending_count, - wall_time: std::time::Duration::from_nanos(meta.wall_nanos), - cpu_time: std::time::Duration::from_nanos(meta.cpu_nanos), - job: meta.job.map(Into::into), - status, - }) - } -} diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 44abe8ad22..72a0b2bb2c 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -16,7 +16,6 @@ mod database_name; pub use database_name::*; pub mod database_rules; pub mod error; -pub mod field_validation; pub mod http; pub mod job; pub mod names; diff --git a/generated_types/Cargo.toml b/generated_types/Cargo.toml index 5fb4c7395e..0ccecf5c6b 100644 --- a/generated_types/Cargo.toml +++ b/generated_types/Cargo.toml @@ -6,17 +6,20 @@ edition = "2018" [dependencies] # In alphabetical order bytes = { version = "1.0", features = ["serde"] } +data_types = { path = "../data_types" } # See docs/regenerating_flatbuffers.md about updating generated code when updating the # version of the flatbuffers crate flatbuffers = "0.8" futures = "0.3" +google_types = { path = "../google_types" } +observability_deps = { path = "../observability_deps" } prost = "0.7" prost-types = "0.7" -tonic = "0.4" -observability_deps = { path = "../observability_deps" } -google_types = { path = "../google_types" } +regex = "1.4" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.44" +thiserror = "1.0.23" +tonic = "0.4" [build-dependencies] # In alphabetical order tonic-build = "0.4" diff --git a/generated_types/src/chunk.rs b/generated_types/src/chunk.rs new file mode 100644 index 0000000000..b5e18d73e7 --- /dev/null +++ b/generated_types/src/chunk.rs @@ -0,0 +1,223 @@ +use crate::google::{FieldViolation, FromField}; +use crate::influxdata::iox::management::v1 as management; +use data_types::chunk::{ChunkStorage, ChunkSummary}; +use std::convert::{TryFrom, TryInto}; +use std::sync::Arc; + +/// Conversion code to management API chunk structure +impl From<ChunkSummary> for management::Chunk { + fn from(summary: ChunkSummary) -> Self { + let ChunkSummary { + partition_key, + table_name, + id, + storage, + estimated_bytes, + row_count, + time_of_first_write, + time_of_last_write, + time_closed, + } = summary; + + let storage: management::ChunkStorage = storage.into(); + let storage = storage.into(); // convert to i32 + + let estimated_bytes = estimated_bytes as u64; + let row_count = row_count as u64; + + let partition_key = match Arc::try_unwrap(partition_key) { + // no one else has a reference so take the string + Ok(partition_key) => partition_key, + // some other reference exists to this string, so clone it + Err(partition_key) => partition_key.as_ref().clone(), + }; + let table_name = match Arc::try_unwrap(table_name) { + // no one else has a reference so take the string + Ok(table_name) => table_name, + // some other reference exists to this string, so clone it + Err(table_name) => table_name.as_ref().clone(), + }; + + let time_of_first_write = time_of_first_write.map(|t| t.into()); + let time_of_last_write = time_of_last_write.map(|t| t.into()); + let time_closed = time_closed.map(|t| t.into()); + + Self { + partition_key, + table_name, + id, + storage, + estimated_bytes, + row_count, + time_of_first_write, + time_of_last_write, + time_closed, + } + } +} + +impl From<ChunkStorage> for management::ChunkStorage { + fn from(storage: ChunkStorage) -> Self { + match storage { + ChunkStorage::OpenMutableBuffer => Self::OpenMutableBuffer, + ChunkStorage::ClosedMutableBuffer => Self::ClosedMutableBuffer, + ChunkStorage::ReadBuffer => Self::ReadBuffer, + ChunkStorage::ReadBufferAndObjectStore => Self::ReadBufferAndObjectStore, + ChunkStorage::ObjectStoreOnly => Self::ObjectStoreOnly, + } + } +} + +/// Conversion code from management API chunk structure +impl TryFrom<management::Chunk> for ChunkSummary { + type Error = FieldViolation; + + fn try_from(proto: management::Chunk) -> Result<Self, Self::Error> { + // Use prost enum conversion + let storage = proto.storage().scope("storage")?; + + let time_of_first_write = proto + .time_of_first_write + .map(TryInto::try_into) + .transpose() + .map_err(|_| FieldViolation { + field: "time_of_first_write".to_string(), + description: "Timestamp must be positive".to_string(), + })?; + + let time_of_last_write = proto + .time_of_last_write + .map(TryInto::try_into) + .transpose() + .map_err(|_| FieldViolation { + field: "time_of_last_write".to_string(), + description: "Timestamp must be positive".to_string(), + })?; + + let time_closed = proto + .time_closed + .map(TryInto::try_into) + .transpose() + .map_err(|_| FieldViolation { + field: "time_closed".to_string(), + description: "Timestamp must be positive".to_string(), + })?; + + let management::Chunk { + partition_key, + table_name, + id, + estimated_bytes, + row_count, + .. + } = proto; + + let estimated_bytes = estimated_bytes as usize; + let row_count = row_count as usize; + let partition_key = Arc::new(partition_key); + let table_name = Arc::new(table_name); + + Ok(Self { + partition_key, + table_name, + id, + storage, + estimated_bytes, + row_count, + time_of_first_write, + time_of_last_write, + time_closed, + }) + } +} + +impl TryFrom<management::ChunkStorage> for ChunkStorage { + type Error = FieldViolation; + + fn try_from(proto: management::ChunkStorage) -> Result<Self, Self::Error> { + match proto { + management::ChunkStorage::OpenMutableBuffer => Ok(Self::OpenMutableBuffer), + management::ChunkStorage::ClosedMutableBuffer => Ok(Self::ClosedMutableBuffer), + management::ChunkStorage::ReadBuffer => Ok(Self::ReadBuffer), + management::ChunkStorage::ReadBufferAndObjectStore => { + Ok(Self::ReadBufferAndObjectStore) + } + management::ChunkStorage::ObjectStoreOnly => Ok(Self::ObjectStoreOnly), + management::ChunkStorage::Unspecified => Err(FieldViolation::required("")), + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn valid_proto_to_summary() { + let proto = management::Chunk { + partition_key: "foo".to_string(), + table_name: "bar".to_string(), + id: 42, + estimated_bytes: 1234, + row_count: 321, + storage: management::ChunkStorage::ObjectStoreOnly.into(), + time_of_first_write: None, + time_of_last_write: None, + time_closed: None, + }; + + let summary = ChunkSummary::try_from(proto).expect("conversion successful"); + let expected = ChunkSummary { + partition_key: Arc::new("foo".to_string()), + table_name: Arc::new("bar".to_string()), + id: 42, + estimated_bytes: 1234, + row_count: 321, + storage: ChunkStorage::ObjectStoreOnly, + time_of_first_write: None, + time_of_last_write: None, + time_closed: None, + }; + + assert_eq!( + summary, expected, + "Actual:\n\n{:?}\n\nExpected:\n\n{:?}\n\n", + summary, expected + ); + } + + #[test] + fn valid_summary_to_proto() { + let summary = ChunkSummary { + partition_key: Arc::new("foo".to_string()), + table_name: Arc::new("bar".to_string()), + id: 42, + estimated_bytes: 1234, + row_count: 321, + storage: ChunkStorage::ObjectStoreOnly, + time_of_first_write: None, + time_of_last_write: None, + time_closed: None, + }; + + let proto = management::Chunk::try_from(summary).expect("conversion successful"); + + let expected = management::Chunk { + partition_key: "foo".to_string(), + table_name: "bar".to_string(), + id: 42, + estimated_bytes: 1234, + row_count: 321, + storage: management::ChunkStorage::ObjectStoreOnly.into(), + time_of_first_write: None, + time_of_last_write: None, + time_closed: None, + }; + + assert_eq!( + proto, expected, + "Actual:\n\n{:?}\n\nExpected:\n\n{:?}\n\n", + proto, expected + ); + } +} diff --git a/generated_types/src/database_rules.rs b/generated_types/src/database_rules.rs new file mode 100644 index 0000000000..69a0542bd3 --- /dev/null +++ b/generated_types/src/database_rules.rs @@ -0,0 +1,189 @@ +use std::convert::{TryFrom, TryInto}; + +use thiserror::Error; + +use data_types::database_rules::{ColumnType, ColumnValue, DatabaseRules, Order}; +use data_types::DatabaseName; + +use crate::google::{FieldViolation, FieldViolationExt, FromFieldOpt}; +use crate::influxdata::iox::management::v1 as management; + +mod lifecycle; +mod partition; +mod shard; +mod write_buffer; + +impl From<DatabaseRules> for management::DatabaseRules { + fn from(rules: DatabaseRules) -> Self { + Self { + name: rules.name.into(), + partition_template: Some(rules.partition_template.into()), + write_buffer_config: rules.write_buffer_config.map(Into::into), + lifecycle_rules: Some(rules.lifecycle_rules.into()), + shard_config: rules.shard_config.map(Into::into), + } + } +} + +impl TryFrom<management::DatabaseRules> for DatabaseRules { + type Error = FieldViolation; + + fn try_from(proto: management::DatabaseRules) -> Result<Self, Self::Error> { + let name = DatabaseName::new(proto.name.clone()).field("name")?; + + let write_buffer_config = proto.write_buffer_config.optional("write_buffer_config")?; + + let lifecycle_rules = proto + .lifecycle_rules + .optional("lifecycle_rules")? + .unwrap_or_default(); + + let partition_template = proto + .partition_template + .optional("partition_template")? + .unwrap_or_default(); + + let shard_config = proto + .shard_config + .optional("shard_config") + .unwrap_or_default(); + + Ok(Self { + name, + partition_template, + write_buffer_config, + lifecycle_rules, + shard_config, + }) + } +} + +#[derive(Debug, Error)] +pub enum DecodeError { + #[error("failed to decode protobuf: {0}")] + DecodeError(#[from] prost::DecodeError), + + #[error("validation failed: {0}")] + ValidationError(#[from] FieldViolation), +} + +#[derive(Debug, Error)] +pub enum EncodeError { + #[error("failed to encode protobuf: {0}")] + EncodeError(#[from] prost::EncodeError), +} + +pub fn decode_database_rules(bytes: prost::bytes::Bytes) -> Result<DatabaseRules, DecodeError> { + let message: management::DatabaseRules = prost::Message::decode(bytes)?; + Ok(message.try_into()?) +} + +pub fn encode_database_rules( + rules: DatabaseRules, + bytes: &mut prost::bytes::BytesMut, +) -> Result<(), EncodeError> { + let encoded: management::DatabaseRules = rules.into(); + Ok(prost::Message::encode(&encoded, bytes)?) 
diff --git a/generated_types/src/database_rules.rs b/generated_types/src/database_rules.rs
new file mode 100644
index 0000000000..69a0542bd3
--- /dev/null
+++ b/generated_types/src/database_rules.rs
@@ -0,0 +1,189 @@
+use std::convert::{TryFrom, TryInto};
+
+use thiserror::Error;
+
+use data_types::database_rules::{ColumnType, ColumnValue, DatabaseRules, Order};
+use data_types::DatabaseName;
+
+use crate::google::{FieldViolation, FieldViolationExt, FromFieldOpt};
+use crate::influxdata::iox::management::v1 as management;
+
+mod lifecycle;
+mod partition;
+mod shard;
+mod write_buffer;
+
+impl From<DatabaseRules> for management::DatabaseRules {
+    fn from(rules: DatabaseRules) -> Self {
+        Self {
+            name: rules.name.into(),
+            partition_template: Some(rules.partition_template.into()),
+            write_buffer_config: rules.write_buffer_config.map(Into::into),
+            lifecycle_rules: Some(rules.lifecycle_rules.into()),
+            shard_config: rules.shard_config.map(Into::into),
+        }
+    }
+}
+
+impl TryFrom<management::DatabaseRules> for DatabaseRules {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::DatabaseRules) -> Result<Self, Self::Error> {
+        let name = DatabaseName::new(proto.name.clone()).field("name")?;
+
+        let write_buffer_config = proto.write_buffer_config.optional("write_buffer_config")?;
+
+        let lifecycle_rules = proto
+            .lifecycle_rules
+            .optional("lifecycle_rules")?
+            .unwrap_or_default();
+
+        let partition_template = proto
+            .partition_template
+            .optional("partition_template")?
+            .unwrap_or_default();
+
+        let shard_config = proto
+            .shard_config
+            .optional("shard_config")
+            .unwrap_or_default();
+
+        Ok(Self {
+            name,
+            partition_template,
+            write_buffer_config,
+            lifecycle_rules,
+            shard_config,
+        })
+    }
+}
+
+#[derive(Debug, Error)]
+pub enum DecodeError {
+    #[error("failed to decode protobuf: {0}")]
+    DecodeError(#[from] prost::DecodeError),
+
+    #[error("validation failed: {0}")]
+    ValidationError(#[from] FieldViolation),
+}
+
+#[derive(Debug, Error)]
+pub enum EncodeError {
+    #[error("failed to encode protobuf: {0}")]
+    EncodeError(#[from] prost::EncodeError),
+}
+
+pub fn decode_database_rules(bytes: prost::bytes::Bytes) -> Result<DatabaseRules, DecodeError> {
+    let message: management::DatabaseRules = prost::Message::decode(bytes)?;
+    Ok(message.try_into()?)
+}
+
+pub fn encode_database_rules(
+    rules: DatabaseRules,
+    bytes: &mut prost::bytes::BytesMut,
+) -> Result<(), EncodeError> {
+    let encoded: management::DatabaseRules = rules.into();
+    Ok(prost::Message::encode(&encoded, bytes)?)
+}
+
+impl From<Order> for management::Order {
+    fn from(o: Order) -> Self {
+        match o {
+            Order::Asc => Self::Asc,
+            Order::Desc => Self::Desc,
+        }
+    }
+}
+
+impl TryFrom<management::Order> for Order {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::Order) -> Result<Self, Self::Error> {
+        Ok(match proto {
+            management::Order::Unspecified => Self::default(),
+            management::Order::Asc => Self::Asc,
+            management::Order::Desc => Self::Desc,
+        })
+    }
+}
+
+impl From<ColumnType> for management::ColumnType {
+    fn from(t: ColumnType) -> Self {
+        match t {
+            ColumnType::I64 => Self::I64,
+            ColumnType::U64 => Self::U64,
+            ColumnType::F64 => Self::F64,
+            ColumnType::String => Self::String,
+            ColumnType::Bool => Self::Bool,
+        }
+    }
+}
+
+impl TryFrom<management::ColumnType> for ColumnType {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::ColumnType) -> Result<Self, Self::Error> {
+        Ok(match proto {
+            management::ColumnType::Unspecified => return Err(FieldViolation::required("")),
+            management::ColumnType::I64 => Self::I64,
+            management::ColumnType::U64 => Self::U64,
+            management::ColumnType::F64 => Self::F64,
+            management::ColumnType::String => Self::String,
+            management::ColumnType::Bool => Self::Bool,
+        })
+    }
+}
+
+impl From<ColumnValue> for management::Aggregate {
+    fn from(v: ColumnValue) -> Self {
+        match v {
+            ColumnValue::Min => Self::Min,
+            ColumnValue::Max => Self::Max,
+        }
+    }
+}
+
+impl TryFrom<management::Aggregate> for ColumnValue {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::Aggregate) -> Result<Self, Self::Error> {
+        use management::Aggregate;
+
+        Ok(match proto {
+            Aggregate::Unspecified => return Err(FieldViolation::required("")),
+            Aggregate::Min => Self::Min,
+            Aggregate::Max => Self::Max,
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use data_types::database_rules::LifecycleRules;
+
+    #[test]
+    fn test_database_rules_defaults() {
+        let protobuf = management::DatabaseRules {
+            name: "database".to_string(),
+            ..Default::default()
+        };
+
+        let rules: DatabaseRules = protobuf.clone().try_into().unwrap();
+        let back: management::DatabaseRules = rules.clone().into();
+
+        assert_eq!(rules.name.as_str(), protobuf.name.as_str());
+        assert_eq!(protobuf.name, back.name);
+
+        assert_eq!(rules.partition_template.parts.len(), 0);
+
+        // These will be defaulted as optionality not preserved on non-protobuf
+        // DatabaseRules
+        assert_eq!(back.partition_template, Some(Default::default()));
+        assert_eq!(back.lifecycle_rules, Some(LifecycleRules::default().into()));
+
+        // These should be none as preserved on non-protobuf DatabaseRules
+        assert!(back.write_buffer_config.is_none());
+        assert!(back.shard_config.is_none());
+    }
+}
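
`encode_database_rules`/`decode_database_rules` are the new public entry points that the `server` crate switches to later in this diff. A sketch of the round trip through bytes, assuming these crates as dependencies and assuming `FieldViolation` implements `std::error::Error` (which the `#[from]` in `DecodeError` above requires); the default-valued protobuf is just a convenient way to obtain valid rules:

```rust
use std::convert::TryInto;

use data_types::database_rules::DatabaseRules;
use generated_types::database_rules::{decode_database_rules, encode_database_rules};
use generated_types::influxdata::iox::management::v1 as management;
use prost::bytes::BytesMut;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Obtain valid internal rules by validating a protobuf message,
    // exactly as the gRPC handlers do.
    let proto = management::DatabaseRules {
        name: "example_db".to_string(),
        ..Default::default()
    };
    let rules: DatabaseRules = proto.try_into()?;

    // Persist: internal -> protobuf -> bytes.
    let mut buf = BytesMut::new();
    encode_database_rules(rules, &mut buf)?;

    // Load: bytes -> protobuf -> validated internal representation.
    let decoded = decode_database_rules(buf.freeze())?;
    assert_eq!(decoded.name.as_str(), "example_db");
    Ok(())
}
```
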
diff --git a/generated_types/src/database_rules/lifecycle.rs b/generated_types/src/database_rules/lifecycle.rs
new file mode 100644
index 0000000000..99c8d9aae0
--- /dev/null
+++ b/generated_types/src/database_rules/lifecycle.rs
@@ -0,0 +1,280 @@
+use std::convert::{TryFrom, TryInto};
+use std::num::NonZeroU64;
+
+use data_types::database_rules::{LifecycleRules, Sort, SortOrder};
+
+use crate::google::protobuf::Empty;
+use crate::google::{FieldViolation, FromField, FromFieldOpt, FromFieldString};
+use crate::influxdata::iox::management::v1 as management;
+
+impl From<LifecycleRules> for management::LifecycleRules {
+    fn from(config: LifecycleRules) -> Self {
+        Self {
+            mutable_linger_seconds: config
+                .mutable_linger_seconds
+                .map(Into::into)
+                .unwrap_or_default(),
+            mutable_minimum_age_seconds: config
+                .mutable_minimum_age_seconds
+                .map(Into::into)
+                .unwrap_or_default(),
+            mutable_size_threshold: config
+                .mutable_size_threshold
+                .map(|x| x.get() as u64)
+                .unwrap_or_default(),
+            buffer_size_soft: config
+                .buffer_size_soft
+                .map(|x| x.get() as u64)
+                .unwrap_or_default(),
+            buffer_size_hard: config
+                .buffer_size_hard
+                .map(|x| x.get() as u64)
+                .unwrap_or_default(),
+            sort_order: Some(config.sort_order.into()),
+            drop_non_persisted: config.drop_non_persisted,
+            persist: config.persist,
+            immutable: config.immutable,
+            worker_backoff_millis: config.worker_backoff_millis.map_or(0, NonZeroU64::get),
+        }
+    }
+}
+
+impl TryFrom<management::LifecycleRules> for LifecycleRules {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::LifecycleRules) -> Result<Self, Self::Error> {
+        Ok(Self {
+            mutable_linger_seconds: proto.mutable_linger_seconds.try_into().ok(),
+            mutable_minimum_age_seconds: proto.mutable_minimum_age_seconds.try_into().ok(),
+            mutable_size_threshold: (proto.mutable_size_threshold as usize).try_into().ok(),
+            buffer_size_soft: (proto.buffer_size_soft as usize).try_into().ok(),
+            buffer_size_hard: (proto.buffer_size_hard as usize).try_into().ok(),
+            sort_order: proto.sort_order.optional("sort_order")?.unwrap_or_default(),
+            drop_non_persisted: proto.drop_non_persisted,
+            persist: proto.persist,
+            immutable: proto.immutable,
+            worker_backoff_millis: NonZeroU64::new(proto.worker_backoff_millis),
+        })
+    }
+}
+
+impl From<SortOrder> for management::lifecycle_rules::SortOrder {
+    fn from(ps: SortOrder) -> Self {
+        let order: management::Order = ps.order.into();
+
+        Self {
+            order: order as _,
+            sort: Some(ps.sort.into()),
+        }
+    }
+}
+
+impl TryFrom<management::lifecycle_rules::SortOrder> for SortOrder {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::lifecycle_rules::SortOrder) -> Result<Self, Self::Error> {
+        Ok(Self {
+            order: proto.order().scope("order")?,
+            sort: proto.sort.optional("sort")?.unwrap_or_default(),
+        })
+    }
+}
+
+impl From<Sort> for management::lifecycle_rules::sort_order::Sort {
+    fn from(ps: Sort) -> Self {
+        use management::lifecycle_rules::sort_order::ColumnSort;
+
+        match ps {
+            Sort::LastWriteTime => Self::LastWriteTime(Empty {}),
+            Sort::CreatedAtTime => Self::CreatedAtTime(Empty {}),
+            Sort::Column(column_name, column_type, column_value) => {
+                let column_type: management::ColumnType = column_type.into();
+                let column_value: management::Aggregate = column_value.into();
+
+                Self::Column(ColumnSort {
+                    column_name,
+                    column_type: column_type as _,
+                    column_value: column_value as _,
+                })
+            }
+        }
+    }
+}
+
+impl TryFrom<management::lifecycle_rules::sort_order::Sort> for Sort {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::lifecycle_rules::sort_order::Sort) -> Result<Self, Self::Error> {
+        use management::lifecycle_rules::sort_order::Sort;
+
+        Ok(match proto {
+            Sort::LastWriteTime(_) => Self::LastWriteTime,
+            Sort::CreatedAtTime(_) => Self::CreatedAtTime,
+            Sort::Column(column_sort) => {
+                let column_type = column_sort.column_type().scope("column.column_type")?;
+                let column_value = column_sort.column_value().scope("column.column_value")?;
+                Self::Column(
+                    column_sort.column_name.required("column.column_name")?,
+                    column_type,
+                    column_value,
+                )
+            }
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use data_types::database_rules::{ColumnType, ColumnValue, Order};
+
+    #[test]
+    fn lifecycle_rules() {
+        let protobuf = management::LifecycleRules {
+            mutable_linger_seconds: 123,
+            mutable_minimum_age_seconds: 5345,
+            mutable_size_threshold: 232,
+            buffer_size_soft: 353,
+            buffer_size_hard: 232,
+            sort_order: None,
+            drop_non_persisted: true,
+            persist: true,
+            immutable: true,
+            worker_backoff_millis: 1000,
+        };
+
+        let config: LifecycleRules = protobuf.clone().try_into().unwrap();
+        let back: management::LifecycleRules = config.clone().into();
+
+        assert_eq!(config.sort_order, SortOrder::default());
+        assert_eq!(
+            config.mutable_linger_seconds.unwrap().get(),
+            protobuf.mutable_linger_seconds
+        );
+        assert_eq!(
+            config.mutable_minimum_age_seconds.unwrap().get(),
+            protobuf.mutable_minimum_age_seconds
+        );
+        assert_eq!(
+            config.mutable_size_threshold.unwrap().get(),
+            protobuf.mutable_size_threshold as usize
+        );
+        assert_eq!(
+            config.buffer_size_soft.unwrap().get(),
+            protobuf.buffer_size_soft as usize
+        );
+        assert_eq!(
+            config.buffer_size_hard.unwrap().get(),
+            protobuf.buffer_size_hard as usize
+        );
+        assert_eq!(config.drop_non_persisted, protobuf.drop_non_persisted);
+        assert_eq!(config.immutable, protobuf.immutable);
+
+        assert_eq!(back.mutable_linger_seconds, protobuf.mutable_linger_seconds);
+        assert_eq!(
+            back.mutable_minimum_age_seconds,
+            protobuf.mutable_minimum_age_seconds
+        );
+        assert_eq!(back.mutable_size_threshold, protobuf.mutable_size_threshold);
+        assert_eq!(back.buffer_size_soft, protobuf.buffer_size_soft);
+        assert_eq!(back.buffer_size_hard, protobuf.buffer_size_hard);
+        assert_eq!(back.drop_non_persisted, protobuf.drop_non_persisted);
+        assert_eq!(back.immutable, protobuf.immutable);
+        assert_eq!(back.worker_backoff_millis, protobuf.worker_backoff_millis);
+    }
+
+    #[test]
+    fn default_background_worker_backoff_millis() {
+        let protobuf = management::LifecycleRules {
+            worker_backoff_millis: 0,
+            ..Default::default()
+        };
+
+        let config: LifecycleRules = protobuf.clone().try_into().unwrap();
+        let back: management::LifecycleRules = config.into();
+        assert_eq!(back.worker_backoff_millis, protobuf.worker_backoff_millis);
+    }
+
+    #[test]
+    fn sort_order_default() {
+        let protobuf: management::lifecycle_rules::SortOrder = Default::default();
+        let config: SortOrder = protobuf.try_into().unwrap();
+
+        assert_eq!(config, SortOrder::default());
+        assert_eq!(config.order, Order::default());
+        assert_eq!(config.sort, Sort::default());
+    }
+
+    #[test]
+    fn sort_order() {
+        use management::lifecycle_rules::sort_order;
+        let protobuf = management::lifecycle_rules::SortOrder {
+            order: management::Order::Asc as _,
+            sort: Some(sort_order::Sort::CreatedAtTime(Empty {})),
+        };
+        let config: SortOrder = protobuf.clone().try_into().unwrap();
+        let back: management::lifecycle_rules::SortOrder = config.clone().into();
+
+        assert_eq!(protobuf, back);
+        assert_eq!(config.order, Order::Asc);
+        assert_eq!(config.sort, Sort::CreatedAtTime);
+    }
+
+    #[test]
+    fn sort() {
+        use management::lifecycle_rules::sort_order;
+
+        let created_at: Sort = sort_order::Sort::CreatedAtTime(Empty {})
+            .try_into()
+            .unwrap();
+        let last_write: Sort = sort_order::Sort::LastWriteTime(Empty {})
+            .try_into()
+            .unwrap();
+        let column: Sort = sort_order::Sort::Column(sort_order::ColumnSort {
+            column_name: "column".to_string(),
+            column_type: management::ColumnType::Bool as _,
+            column_value: management::Aggregate::Min as _,
+        })
+        .try_into()
+        .unwrap();
+
+        assert_eq!(created_at, Sort::CreatedAtTime);
+        assert_eq!(last_write, Sort::LastWriteTime);
+        assert_eq!(
+            column,
+            Sort::Column("column".to_string(), ColumnType::Bool, ColumnValue::Min)
+        );
+    }
+
+    #[test]
+    fn partition_sort_column_sort() {
+        use management::lifecycle_rules::sort_order;
+
+        let res: Result<Sort, FieldViolation> =
+            sort_order::Sort::Column(Default::default()).try_into();
+        let err1 = res.expect_err("expected failure");
+
+        let res: Result<Sort, FieldViolation> =
+            sort_order::Sort::Column(sort_order::ColumnSort {
+                column_type: management::ColumnType::F64 as _,
+                ..Default::default()
+            })
+            .try_into();
+        let err2 = res.expect_err("expected failure");
+
+        let res: Result<Sort, FieldViolation> =
+            sort_order::Sort::Column(sort_order::ColumnSort {
+                column_type: management::ColumnType::F64 as _,
+                column_value: management::Aggregate::Max as _,
+                ..Default::default()
+            })
+            .try_into();
+        let err3 = res.expect_err("expected failure");
+
+        assert_eq!(err1.field, "column.column_type");
+        assert_eq!(err1.description, "Field is required");
+
+        assert_eq!(err2.field, "column.column_value");
+        assert_eq!(err2.description, "Field is required");
+
+        assert_eq!(err3.field, "column.column_name");
+        assert_eq!(err3.description, "Field is required");
+    }
+}
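
Worth calling out: the protobuf encodes the optional numeric knobs as plain integers where `0` means "unset", and the conversions above map `0 <-> None` through the `NonZero*` types. A small sketch of that behaviour (values illustrative):

```rust
use std::convert::TryInto;

use data_types::database_rules::LifecycleRules;
use generated_types::influxdata::iox::management::v1 as management;

fn main() {
    let proto = management::LifecycleRules {
        mutable_linger_seconds: 0, // 0 encodes "not set"
        buffer_size_hard: 4096,
        ..Default::default()
    };

    let rules: LifecycleRules = proto.try_into().expect("valid rules");

    // 0 became None; the non-zero size survived as Some(NonZero...).
    assert!(rules.mutable_linger_seconds.is_none());
    assert_eq!(rules.buffer_size_hard.map(|x| x.get()), Some(4096));
}
```
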
diff --git a/generated_types/src/database_rules/partition.rs b/generated_types/src/database_rules/partition.rs
new file mode 100644
index 0000000000..85eeb44d70
--- /dev/null
+++ b/generated_types/src/database_rules/partition.rs
@@ -0,0 +1,181 @@
+use std::convert::TryFrom;
+
+use data_types::database_rules::{PartitionTemplate, RegexCapture, StrftimeColumn, TemplatePart};
+
+use crate::google::protobuf::Empty;
+use crate::google::{FieldViolation, FromFieldOpt, FromFieldString, FromFieldVec};
+use crate::influxdata::iox::management::v1 as management;
+
+impl From<PartitionTemplate> for management::PartitionTemplate {
+    fn from(pt: PartitionTemplate) -> Self {
+        Self {
+            parts: pt.parts.into_iter().map(Into::into).collect(),
+        }
+    }
+}
+
+impl TryFrom<management::PartitionTemplate> for PartitionTemplate {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::PartitionTemplate) -> Result<Self, Self::Error> {
+        let parts = proto.parts.vec_field("parts")?;
+        Ok(Self { parts })
+    }
+}
+
+impl From<TemplatePart> for management::partition_template::part::Part {
+    fn from(part: TemplatePart) -> Self {
+        use management::partition_template::part::ColumnFormat;
+
+        match part {
+            TemplatePart::Table => Self::Table(Empty {}),
+            TemplatePart::Column(column) => Self::Column(column),
+            TemplatePart::RegexCapture(RegexCapture { column, regex }) => {
+                Self::Regex(ColumnFormat {
+                    column,
+                    format: regex,
+                })
+            }
+            TemplatePart::StrftimeColumn(StrftimeColumn { column, format }) => {
+                Self::StrfTime(ColumnFormat { column, format })
+            }
+            TemplatePart::TimeFormat(format) => Self::Time(format),
+        }
+    }
+}
+
+impl TryFrom<management::partition_template::part::Part> for TemplatePart {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::partition_template::part::Part) -> Result<Self, Self::Error> {
+        use management::partition_template::part::{ColumnFormat, Part};
+
+        Ok(match proto {
+            Part::Table(_) => Self::Table,
+            Part::Column(column) => Self::Column(column.required("column")?),
+            Part::Regex(ColumnFormat { column, format }) => Self::RegexCapture(RegexCapture {
+                column: column.required("regex.column")?,
+                regex: format.required("regex.format")?,
+            }),
+            Part::StrfTime(ColumnFormat { column, format }) => {
+                Self::StrftimeColumn(StrftimeColumn {
+                    column: column.required("strf_time.column")?,
+                    format: format.required("strf_time.format")?,
+                })
+            }
+            Part::Time(format) => Self::TimeFormat(format.required("time")?),
+        })
+    }
+}
+
+impl From<TemplatePart> for management::partition_template::Part {
+    fn from(part: TemplatePart) -> Self {
+        Self {
+            part: Some(part.into()),
+        }
+    }
+}
+
+impl TryFrom<management::partition_template::Part> for TemplatePart {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::partition_template::Part) -> Result<Self, Self::Error> {
+        proto.part.required("part")
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use data_types::database_rules::DatabaseRules;
+    use std::convert::TryInto;
+
+    #[test]
+    fn test_partition_template_default() {
+        let protobuf = management::DatabaseRules {
+            name: "database".to_string(),
+            partition_template: Some(management::PartitionTemplate { parts: vec![] }),
+            ..Default::default()
+        };
+
+        let rules: DatabaseRules = protobuf.clone().try_into().unwrap();
+        let back: management::DatabaseRules = rules.clone().into();
+
+        assert_eq!(rules.partition_template.parts.len(), 0);
+        assert_eq!(protobuf.partition_template, back.partition_template);
+    }
+
+    #[test]
+    fn test_partition_template_no_part() {
+        let protobuf = management::DatabaseRules {
+            name: "database".to_string(),
+            partition_template: Some(management::PartitionTemplate {
+                parts: vec![Default::default()],
+            }),
+            ..Default::default()
+        };
+
+        let res: Result<DatabaseRules, FieldViolation> = protobuf.try_into();
+        let err = res.expect_err("expected failure");
+
+        assert_eq!(&err.field, "partition_template.parts.0.part");
+        assert_eq!(&err.description, "Field is required");
+    }
+
+    #[test]
+    fn test_partition_template() {
+        use management::partition_template::part::{ColumnFormat, Part};
+
+        let protobuf = management::PartitionTemplate {
+            parts: vec![
+                management::partition_template::Part {
+                    part: Some(Part::Time("time".to_string())),
+                },
+                management::partition_template::Part {
+                    part: Some(Part::Table(Empty {})),
+                },
+                management::partition_template::Part {
+                    part: Some(Part::Regex(ColumnFormat {
+                        column: "column".to_string(),
+                        format: "format".to_string(),
+                    })),
+                },
+            ],
+        };
+
+        let pt: PartitionTemplate = protobuf.clone().try_into().unwrap();
+        let back: management::PartitionTemplate = pt.clone().into();
+
+        assert_eq!(
+            pt.parts,
+            vec![
+                TemplatePart::TimeFormat("time".to_string()),
+                TemplatePart::Table,
+                TemplatePart::RegexCapture(RegexCapture {
+                    column: "column".to_string(),
+                    regex: "format".to_string()
+                })
+            ]
+        );
+        assert_eq!(protobuf, back);
+    }
+
+    #[test]
+    fn test_partition_template_empty() {
+        use management::partition_template::part::{ColumnFormat, Part};
+
+        let protobuf = management::PartitionTemplate {
+            parts: vec![management::partition_template::Part {
+                part: Some(Part::Regex(ColumnFormat {
+                    ..Default::default()
+                })),
+            }],
+        };
+
+        let res: Result<PartitionTemplate, FieldViolation> = protobuf.try_into();
+        let err = res.expect_err("expected failure");
+
+        assert_eq!(&err.field, "parts.0.part.regex.column");
+        assert_eq!(&err.description, "Field is required");
+    }
+}
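
A sketch of the template conversion in isolation, using only types introduced in this file (the `%Y-%m-%d` format string is illustrative):

```rust
use std::convert::TryInto;

use data_types::database_rules::{PartitionTemplate, TemplatePart};
use generated_types::influxdata::iox::management::v1 as management;

fn main() {
    use management::partition_template::part::Part;

    // Partition by a strftime rendering of the time column.
    let proto = management::PartitionTemplate {
        parts: vec![management::partition_template::Part {
            part: Some(Part::Time("%Y-%m-%d".to_string())),
        }],
    };

    let template: PartitionTemplate = proto.try_into().expect("valid template");
    assert_eq!(
        template.parts,
        vec![TemplatePart::TimeFormat("%Y-%m-%d".to_string())]
    );
}
```
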
diff --git a/generated_types/src/database_rules/shard.rs b/generated_types/src/database_rules/shard.rs
new file mode 100644
index 0000000000..db716ffed1
--- /dev/null
+++ b/generated_types/src/database_rules/shard.rs
@@ -0,0 +1,381 @@
+use std::collections::HashMap;
+use std::convert::{TryFrom, TryInto};
+use std::sync::Arc;
+
+use regex::Regex;
+
+use data_types::database_rules::{HashRing, Matcher, MatcherToShard, NodeGroup, ShardConfig};
+use data_types::server_id::ServerId;
+
+use crate::google::FieldViolation;
+use crate::influxdata::iox::management::v1 as management;
+
+impl From<ShardConfig> for management::ShardConfig {
+    fn from(shard_config: ShardConfig) -> Self {
+        Self {
+            specific_targets: shard_config
+                .specific_targets
+                .into_iter()
+                .map(|i| i.into())
+                .collect(),
+            hash_ring: shard_config.hash_ring.map(|i| i.into()),
+            ignore_errors: shard_config.ignore_errors,
+            shards: shard_config
+                .shards
+                .iter()
+                .map(|(k, v)| (*k, from_node_group_for_management_node_group(v.clone())))
+                .collect(),
+        }
+    }
+}
+
+impl TryFrom<management::ShardConfig> for ShardConfig {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::ShardConfig) -> Result<Self, Self::Error> {
+        Ok(Self {
+            specific_targets: proto
+                .specific_targets
+                .into_iter()
+                .map(|i| i.try_into())
+                .collect::<Result<Vec<_>, _>>()?,
+            hash_ring: proto
+                .hash_ring
+                .map(|i| i.try_into())
+                .map_or(Ok(None), |r| r.map(Some))?,
+            ignore_errors: proto.ignore_errors,
+            shards: Arc::new(
+                proto
+                    .shards
+                    .into_iter()
+                    .map(|(k, v)| {
+                        try_from_management_node_group_for_node_group(v).map(|ng| (k, ng))
+                    })
+                    .collect::<Result<HashMap<_, _>, FieldViolation>>()?,
+            ),
+        })
+    }
+}
+
+/// Returns none if v matches its default value.
+fn none_if_default<T: Default + PartialEq>(v: T) -> Option<T> {
+    if v == Default::default() {
+        None
+    } else {
+        Some(v)
+    }
+}
+
+impl From<MatcherToShard> for management::MatcherToShard {
+    fn from(matcher_to_shard: MatcherToShard) -> Self {
+        Self {
+            matcher: none_if_default(matcher_to_shard.matcher.into()),
+            shard: matcher_to_shard.shard,
+        }
+    }
+}
+
+impl TryFrom<management::MatcherToShard> for MatcherToShard {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::MatcherToShard) -> Result<Self, Self::Error> {
+        Ok(Self {
+            matcher: proto.matcher.unwrap_or_default().try_into()?,
+            shard: proto.shard,
+        })
+    }
+}
+
+impl From<HashRing> for management::HashRing {
+    fn from(hash_ring: HashRing) -> Self {
+        Self {
+            table_name: hash_ring.table_name,
+            columns: hash_ring.columns,
+            shards: hash_ring.shards.into(),
+        }
+    }
+}
+
+impl TryFrom<management::HashRing> for HashRing {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::HashRing) -> Result<Self, Self::Error> {
+        Ok(Self {
+            table_name: proto.table_name,
+            columns: proto.columns,
+            shards: proto.shards.into(),
+        })
+    }
+}
+
+// cannot (and/or don't know how to) add impl From inside prost generated code
+fn from_node_group_for_management_node_group(node_group: NodeGroup) -> management::NodeGroup {
+    management::NodeGroup {
+        nodes: node_group
+            .into_iter()
+            .map(|id| management::node_group::Node { id: id.get_u32() })
+            .collect(),
+    }
+}
+
+fn try_from_management_node_group_for_node_group(
+    proto: management::NodeGroup,
+) -> Result<NodeGroup, FieldViolation> {
+    proto
+        .nodes
+        .into_iter()
+        .map(|i| {
+            ServerId::try_from(i.id).map_err(|_| FieldViolation {
+                field: "id".to_string(),
+                description: "Node ID must be nonzero".to_string(),
+            })
+        })
+        .collect()
+}
+
+impl From<Matcher> for management::Matcher {
+    fn from(matcher: Matcher) -> Self {
+        Self {
+            table_name_regex: matcher
+                .table_name_regex
+                .map(|r| r.to_string())
+                .unwrap_or_default(),
+            predicate: matcher.predicate.unwrap_or_default(),
+        }
+    }
+}
+
+impl TryFrom<management::Matcher> for Matcher {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::Matcher) -> Result<Self, Self::Error> {
+        let table_name_regex = match &proto.table_name_regex as &str {
+            "" => None,
+            re => Some(Regex::new(re).map_err(|e| FieldViolation {
+                field: "table_name_regex".to_string(),
+                description: e.to_string(),
+            })?),
+        };
+        let predicate = match proto.predicate {
+            p if p.is_empty() => None,
+            p => Some(p),
+        };
+
+        Ok(Self {
+            table_name_regex,
+            predicate,
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use data_types::consistent_hasher::ConsistentHasher;
+    use data_types::database_rules::DatabaseRules;
+
+    #[test]
+    fn test_matcher_default() {
+        let protobuf = management::Matcher {
+            ..Default::default()
+        };
+
+        let matcher: Matcher = protobuf.clone().try_into().unwrap();
+        let back: management::Matcher = matcher.clone().into();
+
+        assert!(matcher.table_name_regex.is_none());
+        assert_eq!(protobuf.table_name_regex, back.table_name_regex);
+
+        assert_eq!(matcher.predicate, None);
+        assert_eq!(protobuf.predicate, back.predicate);
+    }
+
+    #[test]
+    fn test_matcher_regexp() {
+        let protobuf = management::Matcher {
+            table_name_regex: "^foo$".into(),
+            ..Default::default()
+        };
+
+        let matcher: Matcher = protobuf.clone().try_into().unwrap();
+        let back: management::Matcher = matcher.clone().into();
+
+        assert_eq!(matcher.table_name_regex.unwrap().to_string(), "^foo$");
+        assert_eq!(protobuf.table_name_regex, back.table_name_regex);
+    }
+
+    #[test]
+    fn test_matcher_bad_regexp() {
+        let protobuf = management::Matcher {
+            table_name_regex: "*".into(),
+            ..Default::default()
+        };
+
+        let matcher: Result<Matcher, FieldViolation> = protobuf.try_into();
+        assert!(matcher.is_err());
+        assert_eq!(matcher.err().unwrap().field, "table_name_regex");
+    }
+
+    #[test]
+    fn test_hash_ring_default() {
+        let protobuf = management::HashRing {
+            ..Default::default()
+        };
+
+        let hash_ring: HashRing = protobuf.clone().try_into().unwrap();
+        let back: management::HashRing = hash_ring.clone().into();
+
+        assert_eq!(hash_ring.table_name, false);
+        assert_eq!(protobuf.table_name, back.table_name);
+        assert!(hash_ring.columns.is_empty());
+        assert_eq!(protobuf.columns, back.columns);
+        assert!(hash_ring.shards.is_empty());
+        assert_eq!(protobuf.shards, back.shards);
+    }
+
+    #[test]
+    fn test_hash_ring_nodes() {
+        let protobuf = management::HashRing {
+            shards: vec![1, 2],
+            ..Default::default()
+        };
+
+        let hash_ring: HashRing = protobuf.try_into().unwrap();
+
+        assert_eq!(hash_ring.shards.len(), 2);
+        assert_eq!(hash_ring.shards.find(1), Some(2));
+        assert_eq!(hash_ring.shards.find(2), Some(1));
+    }
+
+    #[test]
+    fn test_matcher_to_shard_default() {
+        let protobuf = management::MatcherToShard {
+            ..Default::default()
+        };
+
+        let matcher_to_shard: MatcherToShard = protobuf.clone().try_into().unwrap();
+        let back: management::MatcherToShard = matcher_to_shard.clone().into();
+
+        assert_eq!(
+            matcher_to_shard.matcher,
+            Matcher {
+                ..Default::default()
+            }
+        );
+        assert_eq!(protobuf.matcher, back.matcher);
+
+        assert_eq!(matcher_to_shard.shard, 0);
+        assert_eq!(protobuf.shard, back.shard);
+    }
+
+    #[test]
+    fn test_shard_config_default() {
+        let protobuf = management::ShardConfig {
+            ..Default::default()
+        };
+
+        let shard_config: ShardConfig = protobuf.clone().try_into().unwrap();
+        let back: management::ShardConfig = shard_config.clone().into();
+
+        assert!(shard_config.specific_targets.is_empty());
+        assert_eq!(protobuf.specific_targets, back.specific_targets);
+
+        assert!(shard_config.hash_ring.is_none());
+        assert_eq!(protobuf.hash_ring, back.hash_ring);
+
+        assert_eq!(shard_config.ignore_errors, false);
+        assert_eq!(protobuf.ignore_errors, back.ignore_errors);
+
+        assert!(shard_config.shards.is_empty());
+        assert_eq!(protobuf.shards, back.shards);
+    }
+
+    #[test]
+    fn test_database_rules_shard_config() {
+        let protobuf = management::DatabaseRules {
+            name: "database".to_string(),
+            shard_config: Some(management::ShardConfig {
+                ..Default::default()
+            }),
+            ..Default::default()
+        };
+
+        let rules: DatabaseRules = protobuf.try_into().unwrap();
+        let back: management::DatabaseRules = rules.into();
+
+        assert!(back.shard_config.is_some());
+    }
+
+    #[test]
+    fn test_shard_config_shards() {
+        let protobuf = management::ShardConfig {
+            shards: vec![
+                (
+                    1,
+                    management::NodeGroup {
+                        nodes: vec![
+                            management::node_group::Node { id: 10 },
+                            management::node_group::Node { id: 11 },
+                            management::node_group::Node { id: 12 },
+                        ],
+                    },
+                ),
+                (
+                    2,
+                    management::NodeGroup {
+                        nodes: vec![management::node_group::Node { id: 20 }],
+                    },
+                ),
+            ]
+            .into_iter()
+            .collect(),
+            ..Default::default()
+        };
+
+        let shard_config: ShardConfig = protobuf.try_into().unwrap();
+
+        assert_eq!(shard_config.shards.len(), 2);
+        assert_eq!(shard_config.shards[&1].len(), 3);
+        assert_eq!(shard_config.shards[&2].len(), 1);
+    }
+
+    #[test]
+    fn test_sharder() {
+        let protobuf = management::ShardConfig {
+            specific_targets: vec![management::MatcherToShard {
+                matcher: Some(management::Matcher {
+                    table_name_regex: "pu\\d.$".to_string(),
+                    ..Default::default()
+                }),
+                shard: 1,
+            }],
+            hash_ring: Some(management::HashRing {
+                table_name: true,
+                columns: vec!["t1".to_string(), "t2".to_string()],
+                shards: vec![1, 2, 3, 4],
+            }),
+            ..Default::default()
+        };
+
+        let shard_config: ShardConfig = protobuf.try_into().unwrap();
+
+        assert_eq!(
+            shard_config,
+            ShardConfig {
+                specific_targets: vec![MatcherToShard {
+                    matcher: data_types::database_rules::Matcher {
+                        table_name_regex: Some(Regex::new("pu\\d.$").unwrap()),
+                        predicate: None
+                    },
+                    shard: 1
+                }],
+                hash_ring: Some(HashRing {
+                    table_name: true,
+                    columns: vec!["t1".to_string(), "t2".to_string(),],
+                    shards: ConsistentHasher::new(&[1, 2, 3, 4])
+                }),
+                ..Default::default()
+            }
+        );
+    }
+}
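
The `Matcher` conversion is the one place in this file where validation does real work: the regex is compiled eagerly during conversion, so a bad pattern is rejected at the API boundary rather than at first use. A minimal sketch (the deliberately broken pattern is illustrative):

```rust
use std::convert::TryInto;

use data_types::database_rules::Matcher;
use generated_types::google::FieldViolation;
use generated_types::influxdata::iox::management::v1 as management;

fn main() {
    let proto = management::Matcher {
        table_name_regex: "(unclosed".to_string(), // not a valid regex
        ..Default::default()
    };

    // The FieldViolation names the offending field.
    let res: Result<Matcher, FieldViolation> = proto.try_into();
    let err = res.err().expect("invalid regex must be rejected");
    assert_eq!(err.field, "table_name_regex");
}
```
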
"buffer_rollover"); + assert_eq!(&err.description, "Field is required"); + } + + #[test] + fn test_write_buffer_config_rollover() { + let protobuf = management::WriteBufferConfig { + buffer_rollover: management::write_buffer_config::Rollover::DropIncoming as _, + ..Default::default() + }; + + let config: WriteBufferConfig = protobuf.clone().try_into().unwrap(); + let back: management::WriteBufferConfig = config.clone().into(); + + assert_eq!(config.buffer_rollover, WriteBufferRollover::DropIncoming); + assert_eq!(protobuf, back); + } + + #[test] + fn test_write_buffer_config_negative_duration() { + use crate::google::protobuf::Duration; + + let protobuf = management::WriteBufferConfig { + buffer_rollover: management::write_buffer_config::Rollover::DropOldSegment as _, + close_segment_after: Some(Duration { + seconds: -1, + nanos: -40, + }), + ..Default::default() + }; + + let res: Result = protobuf.try_into(); + let err = res.expect_err("expected failure"); + + assert_eq!(&err.field, "closeSegmentAfter"); + assert_eq!(&err.description, "Duration must be positive"); + } +} diff --git a/generated_types/src/google.rs b/generated_types/src/google.rs index ecd09e1478..bc9211d0ee 100644 --- a/generated_types/src/google.rs +++ b/generated_types/src/google.rs @@ -24,13 +24,8 @@ pub mod longrunning { use self::protobuf::Any; use observability_deps::tracing::error; -use prost::{ - bytes::{Bytes, BytesMut}, - Message, -}; -use std::convert::{TryFrom, TryInto}; -use std::iter::FromIterator; -use tonic::Status; +use prost::{bytes::BytesMut, Message}; +use std::convert::TryInto; // A newtype struct to provide conversion into tonic::Status struct EncodeError(prost::EncodeError); @@ -300,3 +295,107 @@ impl From for tonic::Status { ) } } + +/// An extension trait that adds the method `scope` to any type +/// implementing `TryInto` +pub(crate) trait FromField { + fn scope(self, field: impl Into) -> Result; +} + +impl FromField for T +where + T: TryInto, +{ + /// Try to convert type using TryInto calling `FieldViolation::scope` + /// on any returned error + fn scope(self, field: impl Into) -> Result { + self.try_into().map_err(|e| e.scope(field)) + } +} + +/// An extension trait that adds the methods `optional` and `required` to any +/// Option containing a type implementing `TryInto` +pub trait FromFieldOpt { + /// Try to convert inner type, if any, using TryInto calling + /// `FieldViolation::scope` on any error encountered + /// + /// Returns None if empty + fn optional(self, field: impl Into) -> Result, FieldViolation>; + + /// Try to convert inner type, using TryInto calling `FieldViolation::scope` + /// on any error encountered + /// + /// Returns an error if empty + fn required(self, field: impl Into) -> Result; +} + +impl FromFieldOpt for Option +where + T: TryInto, +{ + fn optional(self, field: impl Into) -> Result, FieldViolation> { + self.map(|t| t.scope(field)).transpose() + } + + fn required(self, field: impl Into) -> Result { + match self { + None => Err(FieldViolation::required(field)), + Some(t) => t.scope(field), + } + } +} + +/// An extension trait that adds the methods `optional` and `required` to any +/// String +/// +/// Prost will default string fields to empty, whereas IOx sometimes +/// uses Option, this helper aids mapping between them +/// +/// TODO: Review mixed use of Option and String in IOX +pub(crate) trait FromFieldString { + /// Returns a Ok if the String is not empty + fn required(self, field: impl Into) -> Result; + + /// Wraps non-empty strings in Some(_), returns None for 
diff --git a/generated_types/src/google.rs b/generated_types/src/google.rs
index ecd09e1478..bc9211d0ee 100644
--- a/generated_types/src/google.rs
+++ b/generated_types/src/google.rs
@@ -24,13 +24,8 @@ pub mod longrunning {
 use self::protobuf::Any;
 use observability_deps::tracing::error;
-use prost::{
-    bytes::{Bytes, BytesMut},
-    Message,
-};
-use std::convert::{TryFrom, TryInto};
-use std::iter::FromIterator;
-use tonic::Status;
+use prost::{bytes::BytesMut, Message};
+use std::convert::TryInto;
 
 // A newtype struct to provide conversion into tonic::Status
 struct EncodeError(prost::EncodeError);
@@ -300,3 +295,107 @@ impl From<EncodeError> for tonic::Status {
         )
     }
 }
+
+/// An extension trait that adds the method `scope` to any type
+/// implementing `TryInto<U, Error = FieldViolation>`
+pub(crate) trait FromField<U> {
+    fn scope(self, field: impl Into<String>) -> Result<U, FieldViolation>;
+}
+
+impl<T, U> FromField<U> for T
+where
+    T: TryInto<U, Error = FieldViolation>,
+{
+    /// Try to convert type using TryInto calling `FieldViolation::scope`
+    /// on any returned error
+    fn scope(self, field: impl Into<String>) -> Result<U, FieldViolation> {
+        self.try_into().map_err(|e| e.scope(field))
+    }
+}
+
+/// An extension trait that adds the methods `optional` and `required` to any
+/// Option containing a type implementing `TryInto<U, Error = FieldViolation>`
+pub trait FromFieldOpt<U> {
+    /// Try to convert inner type, if any, using TryInto calling
+    /// `FieldViolation::scope` on any error encountered
+    ///
+    /// Returns None if empty
+    fn optional(self, field: impl Into<String>) -> Result<Option<U>, FieldViolation>;
+
+    /// Try to convert inner type, using TryInto calling `FieldViolation::scope`
+    /// on any error encountered
+    ///
+    /// Returns an error if empty
+    fn required(self, field: impl Into<String>) -> Result<U, FieldViolation>;
+}
+
+impl<T, U> FromFieldOpt<U> for Option<T>
+where
+    T: TryInto<U, Error = FieldViolation>,
+{
+    fn optional(self, field: impl Into<String>) -> Result<Option<U>, FieldViolation> {
+        self.map(|t| t.scope(field)).transpose()
+    }
+
+    fn required(self, field: impl Into<String>) -> Result<U, FieldViolation> {
+        match self {
+            None => Err(FieldViolation::required(field)),
+            Some(t) => t.scope(field),
+        }
+    }
+}
+
+/// An extension trait that adds the methods `optional` and `required` to any
+/// String
+///
+/// Prost will default string fields to empty, whereas IOx sometimes
+/// uses Option<String>, this helper aids mapping between them
+///
+/// TODO: Review mixed use of Option<String> and String in IOx
+pub(crate) trait FromFieldString {
+    /// Returns an Ok if the String is not empty
+    fn required(self, field: impl Into<String>) -> Result<String, FieldViolation>;
+
+    /// Wraps non-empty strings in Some(_), returns None for empty strings
+    fn optional(self) -> Option<String>;
+}
+
+impl FromFieldString for String {
+    fn required(self, field: impl Into<String>) -> Result<String, FieldViolation> {
+        if self.is_empty() {
+            return Err(FieldViolation::required(field));
+        }
+        Ok(self)
+    }
+
+    fn optional(self) -> Option<String> {
+        if self.is_empty() {
+            return None;
+        }
+        Some(self)
+    }
+}
+
+/// An extension trait that adds the method `vec_field` to any Vec of a type
+/// implementing `TryInto<U, Error = FieldViolation>`
+pub(crate) trait FromFieldVec<U> {
+    /// Converts to a `Vec<U>`, short-circuiting on the first error and
+    /// returning a correctly scoped `FieldViolation` for where the error
+    /// was encountered
+    fn vec_field(self, field: impl Into<String>) -> Result<U, FieldViolation>;
+}
+
+impl<T, U> FromFieldVec<Vec<U>> for Vec<T>
+where
+    T: TryInto<U, Error = FieldViolation>,
+{
+    fn vec_field(self, field: impl Into<String>) -> Result<Vec<U>, FieldViolation> {
+        let res: Result<_, _> = self
+            .into_iter()
+            .enumerate()
+            .map(|(i, t)| t.scope(i.to_string()))
+            .collect();
+
+        res.map_err(|e| e.scope(field))
+    }
+}
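
Of these helpers only `FromFieldOpt` is exported outside the crate; the rest stay `pub(crate)`. A self-contained sketch of how `required` scopes nested violations, producing the dotted paths seen in the tests above (`Limit` is a hypothetical type invented for this example, and it assumes `FieldViolation::required` is callable from outside the crate):

```rust
use std::convert::TryFrom;

use generated_types::google::{FieldViolation, FromFieldOpt};

// A hypothetical domain type, defined here only for the sketch.
#[derive(Debug)]
struct Limit(u32);

impl TryFrom<u32> for Limit {
    type Error = FieldViolation;

    fn try_from(v: u32) -> Result<Self, Self::Error> {
        if v == 0 {
            return Err(FieldViolation::required("value"));
        }
        Ok(Self(v))
    }
}

fn main() {
    // `required` turns a missing optional field into a violation
    // carrying the field name...
    let res: Result<Limit, FieldViolation> = Option::<u32>::None.required("limit");
    assert_eq!(res.expect_err("unset field").field, "limit");

    // ...and prefixes violations from the inner conversion with the
    // enclosing field name, the same mechanism that builds paths like
    // "parts.0.part.regex.column" elsewhere in this diff.
    let res: Result<Limit, FieldViolation> = Some(0_u32).required("limit");
    assert_eq!(res.expect_err("invalid field").field, "limit.value");
}
```
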
diff --git a/generated_types/src/job.rs b/generated_types/src/job.rs
new file mode 100644
index 0000000000..8bfe0c6ca2
--- /dev/null
+++ b/generated_types/src/job.rs
@@ -0,0 +1,108 @@
+use crate::google::{longrunning, protobuf::Any, FieldViolation, FieldViolationExt};
+use crate::influxdata::iox::management::v1 as management;
+use crate::protobuf_type_url_eq;
+use data_types::job::{Job, OperationStatus};
+use std::convert::TryFrom;
+
+impl From<Job> for management::operation_metadata::Job {
+    fn from(job: Job) -> Self {
+        match job {
+            Job::Dummy { nanos } => Self::Dummy(management::Dummy { nanos }),
+            Job::CloseChunk {
+                db_name,
+                partition_key,
+                table_name,
+                chunk_id,
+            } => Self::CloseChunk(management::CloseChunk {
+                db_name,
+                partition_key,
+                table_name,
+                chunk_id,
+            }),
+            Job::WriteChunk {
+                db_name,
+                partition_key,
+                table_name,
+                chunk_id,
+            } => Self::WriteChunk(management::WriteChunk {
+                db_name,
+                partition_key,
+                table_name,
+                chunk_id,
+            }),
+        }
+    }
+}
+
+impl From<management::operation_metadata::Job> for Job {
+    fn from(value: management::operation_metadata::Job) -> Self {
+        use management::operation_metadata::Job;
+        match value {
+            Job::Dummy(management::Dummy { nanos }) => Self::Dummy { nanos },
+            Job::CloseChunk(management::CloseChunk {
+                db_name,
+                partition_key,
+                table_name,
+                chunk_id,
+            }) => Self::CloseChunk {
+                db_name,
+                partition_key,
+                table_name,
+                chunk_id,
+            },
+            Job::WriteChunk(management::WriteChunk {
+                db_name,
+                partition_key,
+                table_name,
+                chunk_id,
+            }) => Self::WriteChunk {
+                db_name,
+                partition_key,
+                table_name,
+                chunk_id,
+            },
+        }
+    }
+}
+
+impl TryFrom<longrunning::Operation> for data_types::job::Operation {
+    type Error = FieldViolation;
+
+    fn try_from(operation: longrunning::Operation) -> Result<Self, Self::Error> {
+        let metadata: Any = operation
+            .metadata
+            .ok_or_else(|| FieldViolation::required("metadata"))?;
+
+        if !protobuf_type_url_eq(&metadata.type_url, management::OPERATION_METADATA) {
+            return Err(FieldViolation {
+                field: "metadata.type_url".to_string(),
+                description: "Unexpected field type".to_string(),
+            });
+        }
+
+        let meta: management::OperationMetadata =
+            prost::Message::decode(metadata.value).field("metadata.value")?;
+
+        let status = match &operation.result {
+            None => OperationStatus::Running,
+            Some(longrunning::operation::Result::Response(_)) => OperationStatus::Complete,
+            Some(longrunning::operation::Result::Error(status)) => {
+                if status.code == tonic::Code::Cancelled as i32 {
+                    OperationStatus::Cancelled
+                } else {
+                    OperationStatus::Errored
+                }
+            }
+        };
+
+        Ok(Self {
+            id: operation.name.parse().field("name")?,
+            task_count: meta.task_count,
+            pending_count: meta.pending_count,
+            wall_time: std::time::Duration::from_nanos(meta.wall_nanos),
+            cpu_time: std::time::Duration::from_nanos(meta.cpu_nanos),
+            job: meta.job.map(Into::into),
+            status,
+        })
+    }
+}
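
The `Job` oneof mapping is infallible in both directions, unlike the `TryFrom<longrunning::Operation>` conversion above, which validates the `Any` metadata. A minimal round-trip sketch (names and ids illustrative):

```rust
use data_types::job::Job;
use generated_types::influxdata::iox::management::v1 as management;

fn main() {
    let job = Job::CloseChunk {
        db_name: "db".to_string(),
        partition_key: "2021-03-01".to_string(),
        table_name: "cpu".to_string(),
        chunk_id: 7,
    };

    // Internal -> protobuf oneof variant and back again, both via
    // plain `From` impls.
    let proto: management::operation_metadata::Job = job.into();
    let back: Job = proto.into();

    match back {
        Job::CloseChunk { chunk_id, .. } => assert_eq!(chunk_id, 7),
        _ => panic!("round trip must preserve the variant"),
    }
}
```
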
diff --git a/generated_types/src/lib.rs b/generated_types/src/lib.rs
index 16553cab28..2233777097 100644
--- a/generated_types/src/lib.rs
+++ b/generated_types/src/lib.rs
@@ -2,13 +2,6 @@
 // crates because of all the generated code it contains that we don't have much
 // control over.
 #![deny(broken_intra_doc_links)]
-#![allow(
-    unused_imports,
-    clippy::redundant_static_lifetimes,
-    clippy::redundant_closure,
-    clippy::redundant_field_names,
-    clippy::clone_on_ref_ptr
-)]
 
 /// This module imports the generated protobuf code into a Rust module
 /// hierarchy that matches the namespace hierarchy of the protobuf
@@ -99,8 +92,7 @@ pub fn protobuf_type_url(protobuf_type: &str) -> String {
 
 /// Protobuf file descriptor containing all generated types.
 /// Useful in gRPC reflection.
-pub const FILE_DESCRIPTOR_SET: &'static [u8] =
-    tonic::include_file_descriptor_set!("proto_descriptor");
+pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("proto_descriptor");
 
 /// Compares the protobuf type URL found within a google.protobuf.Any
 /// message to an expected Protobuf package and message name
@@ -124,7 +116,10 @@ pub fn protobuf_type_url_eq(url: &str, protobuf_type: &str) -> bool {
 pub use com::github::influxdata::idpe::storage::read::*;
 pub use influxdata::platform::storage::*;
 
+pub mod chunk;
+pub mod database_rules;
 pub mod google;
+pub mod job;
 
 #[cfg(test)]
 mod tests {
diff --git a/server/src/lib.rs b/server/src/lib.rs
index b7dad0b0e6..1976a29dbe 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -100,6 +100,7 @@ use crate::{
     db::Db,
 };
 use data_types::database_rules::{NodeGroup, ShardId};
+use generated_types::database_rules::{decode_database_rules, encode_database_rules};
 use influxdb_iox_client::{connection::Builder, write};
 use rand::seq::SliceRandom;
 use std::collections::HashMap;
@@ -145,7 +146,7 @@ pub enum Error {
     IdNotSet,
     #[snafu(display("error serializing configuration {}", source))]
     ErrorSerializing {
-        source: data_types::database_rules::Error,
+        source: generated_types::database_rules::EncodeError,
     },
     #[snafu(display("error deserializing configuration {}", source))]
     ErrorDeserializing { source: serde_json::Error },
@@ -420,7 +421,7 @@ impl Server {
         let location = object_store_path_for_database_config(&self.root_path()?, &rules.name);
 
         let mut data = BytesMut::new();
-        rules.encode(&mut data).context(ErrorSerializing)?;
+        encode_database_rules(rules, &mut data).context(ErrorSerializing)?;
 
         let len = data.len();
 
@@ -486,7 +487,7 @@ impl Server {
 
         let res = res.unwrap().freeze();
 
-        match DatabaseRules::decode(res) {
+        match decode_database_rules(res) {
             Err(e) => {
                 error!("error parsing database config {:?} from store: {}", path, e)
             }
@@ -1058,7 +1059,7 @@ mod tests {
             .unwrap()
            .freeze();
 
-        let read_rules = DatabaseRules::decode(read_data).unwrap();
+        let read_rules = decode_database_rules(read_data).unwrap();
 
         assert_eq!(rules, read_rules);
 
diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs
index 10c67e4058..cf61d622f6 100644
--- a/src/influxdb_ioxd/rpc/management.rs
+++ b/src/influxdb_ioxd/rpc/management.rs
@@ -2,12 +2,9 @@ use std::convert::{TryFrom, TryInto};
 use std::fmt::Debug;
 use std::sync::Arc;
 
-use data_types::{
-    database_rules::DatabaseRules, field_validation::FromFieldOpt, server_id::ServerId,
-    DatabaseName,
-};
+use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName};
 use generated_types::google::{
-    AlreadyExists, FieldViolation, FieldViolationExt, InternalError, NotFound,
+    AlreadyExists, FieldViolation, FieldViolationExt, FromFieldOpt, InternalError, NotFound,
 };
 use generated_types::influxdata::iox::management::v1::*;
 use observability_deps::tracing::info;