refactor: move protobuf conversion logic to generated_types (#1437)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-05-06 16:49:27 +01:00 committed by GitHub
parent 884baf7329
commit 216903a949
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 1637 additions and 1549 deletions

6
Cargo.lock generated
View File

@@ -774,17 +774,14 @@ name = "data_types"
version = "0.1.0"
dependencies = [
"chrono",
"generated_types",
"influxdb_line_protocol",
"observability_deps",
"percent-encoding",
"prost",
"regex",
"serde",
"serde_regex",
"snafu",
"test_helpers",
"tonic",
]
[[package]]
@@ -1202,6 +1199,7 @@ name = "generated_types"
version = "0.1.0"
dependencies = [
"bytes",
"data_types",
"flatbuffers",
"futures",
"google_types",
@@ -1209,8 +1207,10 @@ dependencies = [
"prost",
"prost-build",
"prost-types",
"regex",
"serde",
"serde_json",
"thiserror",
"tonic",
"tonic-build",
]

View File

@@ -8,15 +8,12 @@ readme = "README.md"
[dependencies] # In alphabetical order
chrono = { version = "0.4", features = ["serde"] }
generated_types = { path = "../generated_types" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
percent-encoding = "2.1.0"
prost = "0.7"
regex = "1.4"
serde = { version = "1.0", features = ["rc"] }
serde = { version = "1.0", features = ["rc", "derive"] }
serde_regex = "1.1"
snafu = "0.6"
tonic = { version = "0.4.0" }
observability_deps = { path = "../observability_deps" }
[dev-dependencies] # In alphabetical order

View File

@@ -1,12 +1,7 @@
//! Module contains a representation of chunk metadata
use std::{
convert::{TryFrom, TryInto},
sync::Arc,
};
use std::sync::Arc;
use crate::field_validation::FromField;
use chrono::{DateTime, Utc};
use generated_types::{google::FieldViolation, influxdata::iox::management::v1 as management};
use serde::{Deserialize, Serialize};
/// Which storage system is a chunk located in?
@@ -100,221 +95,3 @@ impl ChunkSummary {
}
}
}
/// Conversion code to management API chunk structure
impl From<ChunkSummary> for management::Chunk {
    fn from(summary: ChunkSummary) -> Self {
        // Destructure exhaustively so adding a field to ChunkSummary causes a
        // compile error here instead of being silently dropped.
        let ChunkSummary {
            partition_key,
            table_name,
            id,
            storage,
            estimated_bytes,
            row_count,
            time_of_first_write,
            time_of_last_write,
            time_closed,
        } = summary;
        let storage: management::ChunkStorage = storage.into();
        let storage = storage.into(); // convert to i32
        let estimated_bytes = estimated_bytes as u64;
        let row_count = row_count as u64;
        // Take ownership of the String when this is the last Arc reference,
        // otherwise clone the underlying data.
        let partition_key = match Arc::try_unwrap(partition_key) {
            // no one else has a reference so take the string
            Ok(partition_key) => partition_key,
            // some other reference exists to this string, so clone it
            Err(partition_key) => partition_key.as_ref().clone(),
        };
        let table_name = match Arc::try_unwrap(table_name) {
            // no one else has a reference so take the string
            Ok(table_name) => table_name,
            // some other reference exists to this string, so clone it
            Err(table_name) => table_name.as_ref().clone(),
        };
        // Optional timestamps are mapped into their protobuf representation
        let time_of_first_write = time_of_first_write.map(|t| t.into());
        let time_of_last_write = time_of_last_write.map(|t| t.into());
        let time_closed = time_closed.map(|t| t.into());
        Self {
            partition_key,
            table_name,
            id,
            storage,
            estimated_bytes,
            row_count,
            time_of_first_write,
            time_of_last_write,
            time_closed,
        }
    }
}
/// Variant-by-variant mapping of the internal storage enum onto the
/// management API enum; infallible in this direction.
impl From<ChunkStorage> for management::ChunkStorage {
    fn from(storage: ChunkStorage) -> Self {
        match storage {
            ChunkStorage::OpenMutableBuffer => Self::OpenMutableBuffer,
            ChunkStorage::ClosedMutableBuffer => Self::ClosedMutableBuffer,
            ChunkStorage::ReadBuffer => Self::ReadBuffer,
            ChunkStorage::ReadBufferAndObjectStore => Self::ReadBufferAndObjectStore,
            ChunkStorage::ObjectStoreOnly => Self::ObjectStoreOnly,
        }
    }
}
/// Conversion code from management API chunk structure
impl TryFrom<management::Chunk> for ChunkSummary {
    type Error = FieldViolation;
    fn try_from(proto: management::Chunk) -> Result<Self, Self::Error> {
        // Use prost enum conversion
        let storage = proto.storage().scope("storage")?;
        // Each optional timestamp is converted separately so the resulting
        // FieldViolation names the exact field that held the bad value.
        let time_of_first_write = proto
            .time_of_first_write
            .map(TryInto::try_into)
            .transpose()
            .map_err(|_| FieldViolation {
                field: "time_of_first_write".to_string(),
                description: "Timestamp must be positive".to_string(),
            })?;
        let time_of_last_write = proto
            .time_of_last_write
            .map(TryInto::try_into)
            .transpose()
            .map_err(|_| FieldViolation {
                field: "time_of_last_write".to_string(),
                description: "Timestamp must be positive".to_string(),
            })?;
        let time_closed = proto
            .time_closed
            .map(TryInto::try_into)
            .transpose()
            .map_err(|_| FieldViolation {
                field: "time_closed".to_string(),
                description: "Timestamp must be positive".to_string(),
            })?;
        // Destructure the remaining plain fields (timestamps/storage already
        // consumed above, hence the `..`)
        let management::Chunk {
            partition_key,
            table_name,
            id,
            estimated_bytes,
            row_count,
            ..
        } = proto;
        let estimated_bytes = estimated_bytes as usize;
        let row_count = row_count as usize;
        let partition_key = Arc::new(partition_key);
        let table_name = Arc::new(table_name);
        Ok(Self {
            partition_key,
            table_name,
            id,
            storage,
            estimated_bytes,
            row_count,
            time_of_first_write,
            time_of_last_write,
            time_closed,
        })
    }
}
/// Inverse storage-enum mapping; `Unspecified` is rejected as a
/// required-field violation.
impl TryFrom<management::ChunkStorage> for ChunkStorage {
    type Error = FieldViolation;
    fn try_from(proto: management::ChunkStorage) -> Result<Self, Self::Error> {
        match proto {
            management::ChunkStorage::OpenMutableBuffer => Ok(Self::OpenMutableBuffer),
            management::ChunkStorage::ClosedMutableBuffer => Ok(Self::ClosedMutableBuffer),
            management::ChunkStorage::ReadBuffer => Ok(Self::ReadBuffer),
            management::ChunkStorage::ReadBufferAndObjectStore => {
                Ok(Self::ReadBufferAndObjectStore)
            }
            management::ChunkStorage::ObjectStoreOnly => Ok(Self::ObjectStoreOnly),
            management::ChunkStorage::Unspecified => Err(FieldViolation::required("")),
        }
    }
}
#[cfg(test)]
mod test {
    use super::*;
    /// A fully-populated protobuf chunk converts to the equivalent summary.
    #[test]
    fn valid_proto_to_summary() {
        let proto = management::Chunk {
            partition_key: "foo".to_string(),
            table_name: "bar".to_string(),
            id: 42,
            estimated_bytes: 1234,
            row_count: 321,
            storage: management::ChunkStorage::ObjectStoreOnly.into(),
            time_of_first_write: None,
            time_of_last_write: None,
            time_closed: None,
        };
        let summary = ChunkSummary::try_from(proto).expect("conversion successful");
        let expected = ChunkSummary {
            partition_key: Arc::new("foo".to_string()),
            table_name: Arc::new("bar".to_string()),
            id: 42,
            estimated_bytes: 1234,
            row_count: 321,
            storage: ChunkStorage::ObjectStoreOnly,
            time_of_first_write: None,
            time_of_last_write: None,
            time_closed: None,
        };
        assert_eq!(
            summary, expected,
            "Actual:\n\n{:?}\n\nExpected:\n\n{:?}\n\n",
            summary, expected
        );
    }
    /// The reverse direction: a summary converts to the expected protobuf.
    #[test]
    fn valid_summary_to_proto() {
        let summary = ChunkSummary {
            partition_key: Arc::new("foo".to_string()),
            table_name: Arc::new("bar".to_string()),
            id: 42,
            estimated_bytes: 1234,
            row_count: 321,
            storage: ChunkStorage::ObjectStoreOnly,
            time_of_first_write: None,
            time_of_last_write: None,
            time_closed: None,
        };
        let proto = management::Chunk::try_from(summary).expect("conversion successful");
        let expected = management::Chunk {
            partition_key: "foo".to_string(),
            table_name: "bar".to_string(),
            id: 42,
            estimated_bytes: 1234,
            row_count: 321,
            storage: management::ChunkStorage::ObjectStoreOnly.into(),
            time_of_first_write: None,
            time_of_last_write: None,
            time_closed: None,
        };
        assert_eq!(
            proto, expected,
            "Actual:\n\n{:?}\n\nExpected:\n\n{:?}\n\n",
            proto, expected
        );
    }
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,112 +0,0 @@
//! A collection of extension traits for types that
//! implement TryInto<U, Error=FieldViolation>
//!
//! Allows associating field context with the generated errors
//! as they propagate up the struct topology
use generated_types::google::FieldViolation;
use std::convert::TryInto;
/// An extension trait that adds the method `scope` to any type
/// implementing `TryInto<U, Error = FieldViolation>`
pub(crate) trait FromField<T> {
    /// Convert `self`, attaching `field` as context to any `FieldViolation`
    fn scope(self, field: impl Into<String>) -> Result<T, FieldViolation>;
}
// Blanket impl: anything fallibly convertible with a FieldViolation error
// automatically gains `scope`.
impl<T, U> FromField<U> for T
where
    T: TryInto<U, Error = FieldViolation>,
{
    /// Try to convert type using TryInto calling `FieldViolation::scope`
    /// on any returned error
    fn scope(self, field: impl Into<String>) -> Result<U, FieldViolation> {
        self.try_into().map_err(|e| e.scope(field))
    }
}
/// An extension trait that adds the methods `optional` and `required` to any
/// Option containing a type implementing `TryInto<U, Error = FieldViolation>`
pub trait FromFieldOpt<T> {
    /// Try to convert inner type, if any, using TryInto calling
    /// `FieldViolation::scope` on any error encountered
    ///
    /// Returns None if empty
    fn optional(self, field: impl Into<String>) -> Result<Option<T>, FieldViolation>;
    /// Try to convert inner type, using TryInto calling `FieldViolation::scope`
    /// on any error encountered
    ///
    /// Returns an error if empty
    fn required(self, field: impl Into<String>) -> Result<T, FieldViolation>;
}
impl<T, U> FromFieldOpt<U> for Option<T>
where
    T: TryInto<U, Error = FieldViolation>,
{
    fn optional(self, field: impl Into<String>) -> Result<Option<U>, FieldViolation> {
        // Convert if present; None passes through untouched
        self.map(|t| t.scope(field)).transpose()
    }
    fn required(self, field: impl Into<String>) -> Result<U, FieldViolation> {
        match self {
            // Absence of a required field is itself a violation
            None => Err(FieldViolation::required(field)),
            Some(t) => t.scope(field),
        }
    }
}
/// An extension trait that adds the methods `optional` and `required` to any
/// String
///
/// Prost will default string fields to empty, whereas IOx sometimes
/// uses Option<String>, this helper aids mapping between them
///
/// TODO: Review mixed use of Option<String> and String in IOX
pub(crate) trait FromFieldString {
    /// Returns a Ok if the String is not empty
    fn required(self, field: impl Into<String>) -> Result<String, FieldViolation>;
    /// Wraps non-empty strings in Some(_), returns None for empty strings
    fn optional(self) -> Option<String>;
}
impl FromFieldString for String {
    fn required(self, field: impl Into<Self>) -> Result<String, FieldViolation> {
        // Empty string is prost's "unset" — treat it as a missing field
        if self.is_empty() {
            return Err(FieldViolation::required(field));
        }
        Ok(self)
    }
    fn optional(self) -> Option<String> {
        if self.is_empty() {
            return None;
        }
        Some(self)
    }
}
/// An extension trait that adds the method `vec_field` to any Vec of a type
/// implementing `TryInto<U, Error = FieldViolation>`
pub(crate) trait FromFieldVec<T> {
    /// Converts to a `Vec<U>`, short-circuiting on the first error and
    /// returning a correctly scoped `FieldViolation` for where the error
    /// was encountered
    fn vec_field(self, field: impl Into<String>) -> Result<T, FieldViolation>;
}
impl<T, U> FromFieldVec<Vec<U>> for Vec<T>
where
    T: TryInto<U, Error = FieldViolation>,
{
    fn vec_field(self, field: impl Into<String>) -> Result<Vec<U>, FieldViolation> {
        // Scope each element's error with its index first, then wrap the
        // whole violation with the containing field name (e.g. "parts.3")
        let res: Result<_, _> = self
            .into_iter()
            .enumerate()
            .map(|(i, t)| t.scope(i.to_string()))
            .collect();
        res.map_err(|e| e.scope(field))
    }
}

View File

@@ -1,9 +1,4 @@
use generated_types::google::{protobuf::Any, FieldViolation, FieldViolationExt};
use generated_types::{
google::longrunning, influxdata::iox::management::v1 as management, protobuf_type_url_eq,
};
use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
/// Metadata associated with a set of background tasks
/// Used in combination with TrackerRegistry
@@ -32,67 +27,6 @@ pub enum Job {
},
}
/// Convert the internal `Job` description into its management API form;
/// each variant maps field-for-field onto the corresponding message.
impl From<Job> for management::operation_metadata::Job {
    fn from(job: Job) -> Self {
        match job {
            Job::Dummy { nanos } => Self::Dummy(management::Dummy { nanos }),
            Job::CloseChunk {
                db_name,
                partition_key,
                table_name,
                chunk_id,
            } => Self::CloseChunk(management::CloseChunk {
                db_name,
                partition_key,
                table_name,
                chunk_id,
            }),
            Job::WriteChunk {
                db_name,
                partition_key,
                table_name,
                chunk_id,
            } => Self::WriteChunk(management::WriteChunk {
                db_name,
                partition_key,
                table_name,
                chunk_id,
            }),
        }
    }
}
/// Inverse of the conversion above; infallible because every management
/// API variant has a direct internal counterpart.
impl From<management::operation_metadata::Job> for Job {
    fn from(value: management::operation_metadata::Job) -> Self {
        use management::operation_metadata::Job;
        match value {
            Job::Dummy(management::Dummy { nanos }) => Self::Dummy { nanos },
            Job::CloseChunk(management::CloseChunk {
                db_name,
                partition_key,
                table_name,
                chunk_id,
            }) => Self::CloseChunk {
                db_name,
                partition_key,
                table_name,
                chunk_id,
            },
            Job::WriteChunk(management::WriteChunk {
                db_name,
                partition_key,
                table_name,
                chunk_id,
            }) => Self::WriteChunk {
                db_name,
                partition_key,
                table_name,
                chunk_id,
            },
        }
    }
}
impl Job {
/// Returns the database name associated with this job, if any
pub fn db_name(&self) -> Option<&str> {
@@ -169,45 +103,3 @@ pub struct Operation {
/// The status of the running operation
pub status: OperationStatus,
}
/// Decode a `google.longrunning.Operation` into the internal `Operation`,
/// first validating that its `Any` metadata payload carries IOx
/// `OperationMetadata`.
impl TryFrom<longrunning::Operation> for Operation {
    type Error = FieldViolation;
    fn try_from(operation: longrunning::Operation) -> Result<Self, Self::Error> {
        let metadata: Any = operation
            .metadata
            .ok_or_else(|| FieldViolation::required("metadata"))?;
        // Reject payloads whose type URL is not the expected metadata type
        if !protobuf_type_url_eq(&metadata.type_url, management::OPERATION_METADATA) {
            return Err(FieldViolation {
                field: "metadata.type_url".to_string(),
                description: "Unexpected field type".to_string(),
            });
        }
        let meta: management::OperationMetadata =
            prost::Message::decode(metadata.value).field("metadata.value")?;
        // Derive coarse status from the longrunning result; a gRPC
        // `Cancelled` status code maps to the dedicated Cancelled state
        let status = match &operation.result {
            None => OperationStatus::Running,
            Some(longrunning::operation::Result::Response(_)) => OperationStatus::Complete,
            Some(longrunning::operation::Result::Error(status)) => {
                if status.code == tonic::Code::Cancelled as i32 {
                    OperationStatus::Cancelled
                } else {
                    OperationStatus::Errored
                }
            }
        };
        Ok(Self {
            // The operation name doubles as the task id — parse failures are
            // reported against "name"
            id: operation.name.parse().field("name")?,
            task_count: meta.task_count,
            pending_count: meta.pending_count,
            wall_time: std::time::Duration::from_nanos(meta.wall_nanos),
            cpu_time: std::time::Duration::from_nanos(meta.cpu_nanos),
            job: meta.job.map(Into::into),
            status,
        })
    }
}

View File

@@ -16,7 +16,6 @@ mod database_name;
pub use database_name::*;
pub mod database_rules;
pub mod error;
pub mod field_validation;
pub mod http;
pub mod job;
pub mod names;

View File

@@ -6,17 +6,20 @@ edition = "2018"
[dependencies] # In alphabetical order
bytes = { version = "1.0", features = ["serde"] }
data_types = { path = "../data_types" }
# See docs/regenerating_flatbuffers.md about updating generated code when updating the
# version of the flatbuffers crate
flatbuffers = "0.8"
futures = "0.3"
google_types = { path = "../google_types" }
observability_deps = { path = "../observability_deps" }
prost = "0.7"
prost-types = "0.7"
tonic = "0.4"
observability_deps = { path = "../observability_deps" }
google_types = { path = "../google_types" }
regex = "1.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.44"
thiserror = "1.0.23"
tonic = "0.4"
[build-dependencies] # In alphabetical order
tonic-build = "0.4"

View File

@@ -0,0 +1,223 @@
use crate::google::{FieldViolation, FromField};
use crate::influxdata::iox::management::v1 as management;
use data_types::chunk::{ChunkStorage, ChunkSummary};
use std::convert::{TryFrom, TryInto};
use std::sync::Arc;
/// Conversion code to management API chunk structure
impl From<ChunkSummary> for management::Chunk {
    fn from(summary: ChunkSummary) -> Self {
        // Destructure exhaustively so adding a field to ChunkSummary causes a
        // compile error here instead of being silently dropped.
        let ChunkSummary {
            partition_key,
            table_name,
            id,
            storage,
            estimated_bytes,
            row_count,
            time_of_first_write,
            time_of_last_write,
            time_closed,
        } = summary;
        let storage: management::ChunkStorage = storage.into();
        let storage = storage.into(); // convert to i32
        let estimated_bytes = estimated_bytes as u64;
        let row_count = row_count as u64;
        // Take ownership of the String when this is the last Arc reference,
        // otherwise clone the underlying data.
        let partition_key = match Arc::try_unwrap(partition_key) {
            // no one else has a reference so take the string
            Ok(partition_key) => partition_key,
            // some other reference exists to this string, so clone it
            Err(partition_key) => partition_key.as_ref().clone(),
        };
        let table_name = match Arc::try_unwrap(table_name) {
            // no one else has a reference so take the string
            Ok(table_name) => table_name,
            // some other reference exists to this string, so clone it
            Err(table_name) => table_name.as_ref().clone(),
        };
        // Optional timestamps are mapped into their protobuf representation
        let time_of_first_write = time_of_first_write.map(|t| t.into());
        let time_of_last_write = time_of_last_write.map(|t| t.into());
        let time_closed = time_closed.map(|t| t.into());
        Self {
            partition_key,
            table_name,
            id,
            storage,
            estimated_bytes,
            row_count,
            time_of_first_write,
            time_of_last_write,
            time_closed,
        }
    }
}
/// Variant-by-variant mapping of the internal storage enum onto the
/// management API enum; infallible in this direction.
impl From<ChunkStorage> for management::ChunkStorage {
    fn from(storage: ChunkStorage) -> Self {
        match storage {
            ChunkStorage::OpenMutableBuffer => Self::OpenMutableBuffer,
            ChunkStorage::ClosedMutableBuffer => Self::ClosedMutableBuffer,
            ChunkStorage::ReadBuffer => Self::ReadBuffer,
            ChunkStorage::ReadBufferAndObjectStore => Self::ReadBufferAndObjectStore,
            ChunkStorage::ObjectStoreOnly => Self::ObjectStoreOnly,
        }
    }
}
/// Conversion code from management API chunk structure
impl TryFrom<management::Chunk> for ChunkSummary {
    type Error = FieldViolation;
    fn try_from(proto: management::Chunk) -> Result<Self, Self::Error> {
        // Use prost enum conversion
        let storage = proto.storage().scope("storage")?;
        // Each optional timestamp is converted separately so the resulting
        // FieldViolation names the exact field that held the bad value.
        let time_of_first_write = proto
            .time_of_first_write
            .map(TryInto::try_into)
            .transpose()
            .map_err(|_| FieldViolation {
                field: "time_of_first_write".to_string(),
                description: "Timestamp must be positive".to_string(),
            })?;
        let time_of_last_write = proto
            .time_of_last_write
            .map(TryInto::try_into)
            .transpose()
            .map_err(|_| FieldViolation {
                field: "time_of_last_write".to_string(),
                description: "Timestamp must be positive".to_string(),
            })?;
        let time_closed = proto
            .time_closed
            .map(TryInto::try_into)
            .transpose()
            .map_err(|_| FieldViolation {
                field: "time_closed".to_string(),
                description: "Timestamp must be positive".to_string(),
            })?;
        // Destructure the remaining plain fields (timestamps/storage already
        // consumed above, hence the `..`)
        let management::Chunk {
            partition_key,
            table_name,
            id,
            estimated_bytes,
            row_count,
            ..
        } = proto;
        let estimated_bytes = estimated_bytes as usize;
        let row_count = row_count as usize;
        let partition_key = Arc::new(partition_key);
        let table_name = Arc::new(table_name);
        Ok(Self {
            partition_key,
            table_name,
            id,
            storage,
            estimated_bytes,
            row_count,
            time_of_first_write,
            time_of_last_write,
            time_closed,
        })
    }
}
/// Inverse storage-enum mapping; `Unspecified` is rejected as a
/// required-field violation.
impl TryFrom<management::ChunkStorage> for ChunkStorage {
    type Error = FieldViolation;
    fn try_from(proto: management::ChunkStorage) -> Result<Self, Self::Error> {
        match proto {
            management::ChunkStorage::OpenMutableBuffer => Ok(Self::OpenMutableBuffer),
            management::ChunkStorage::ClosedMutableBuffer => Ok(Self::ClosedMutableBuffer),
            management::ChunkStorage::ReadBuffer => Ok(Self::ReadBuffer),
            management::ChunkStorage::ReadBufferAndObjectStore => {
                Ok(Self::ReadBufferAndObjectStore)
            }
            management::ChunkStorage::ObjectStoreOnly => Ok(Self::ObjectStoreOnly),
            management::ChunkStorage::Unspecified => Err(FieldViolation::required("")),
        }
    }
}
#[cfg(test)]
mod test {
    use super::*;
    /// A fully-populated protobuf chunk converts to the equivalent summary.
    #[test]
    fn valid_proto_to_summary() {
        let proto = management::Chunk {
            partition_key: "foo".to_string(),
            table_name: "bar".to_string(),
            id: 42,
            estimated_bytes: 1234,
            row_count: 321,
            storage: management::ChunkStorage::ObjectStoreOnly.into(),
            time_of_first_write: None,
            time_of_last_write: None,
            time_closed: None,
        };
        let summary = ChunkSummary::try_from(proto).expect("conversion successful");
        let expected = ChunkSummary {
            partition_key: Arc::new("foo".to_string()),
            table_name: Arc::new("bar".to_string()),
            id: 42,
            estimated_bytes: 1234,
            row_count: 321,
            storage: ChunkStorage::ObjectStoreOnly,
            time_of_first_write: None,
            time_of_last_write: None,
            time_closed: None,
        };
        assert_eq!(
            summary, expected,
            "Actual:\n\n{:?}\n\nExpected:\n\n{:?}\n\n",
            summary, expected
        );
    }
    /// The reverse direction: a summary converts to the expected protobuf.
    #[test]
    fn valid_summary_to_proto() {
        let summary = ChunkSummary {
            partition_key: Arc::new("foo".to_string()),
            table_name: Arc::new("bar".to_string()),
            id: 42,
            estimated_bytes: 1234,
            row_count: 321,
            storage: ChunkStorage::ObjectStoreOnly,
            time_of_first_write: None,
            time_of_last_write: None,
            time_closed: None,
        };
        let proto = management::Chunk::try_from(summary).expect("conversion successful");
        let expected = management::Chunk {
            partition_key: "foo".to_string(),
            table_name: "bar".to_string(),
            id: 42,
            estimated_bytes: 1234,
            row_count: 321,
            storage: management::ChunkStorage::ObjectStoreOnly.into(),
            time_of_first_write: None,
            time_of_last_write: None,
            time_closed: None,
        };
        assert_eq!(
            proto, expected,
            "Actual:\n\n{:?}\n\nExpected:\n\n{:?}\n\n",
            proto, expected
        );
    }
}

View File

@@ -0,0 +1,189 @@
use std::convert::{TryFrom, TryInto};
use thiserror::Error;
use data_types::database_rules::{ColumnType, ColumnValue, DatabaseRules, Order};
use data_types::DatabaseName;
use crate::google::{FieldViolation, FieldViolationExt, FromFieldOpt};
use crate::influxdata::iox::management::v1 as management;
mod lifecycle;
mod partition;
mod shard;
mod write_buffer;
/// Convert internal database rules into the management API message.
/// Sub-configs that always exist internally are wrapped in `Some`;
/// genuinely optional ones keep their optionality.
impl From<DatabaseRules> for management::DatabaseRules {
    fn from(rules: DatabaseRules) -> Self {
        Self {
            name: rules.name.into(),
            partition_template: Some(rules.partition_template.into()),
            write_buffer_config: rules.write_buffer_config.map(Into::into),
            lifecycle_rules: Some(rules.lifecycle_rules.into()),
            shard_config: rules.shard_config.map(Into::into),
        }
    }
}
/// Validate and convert a management API message into internal rules.
impl TryFrom<management::DatabaseRules> for DatabaseRules {
    type Error = FieldViolation;
    fn try_from(proto: management::DatabaseRules) -> Result<Self, Self::Error> {
        // Validate the database name first; errors are scoped to "name"
        let name = DatabaseName::new(proto.name.clone()).field("name")?;
        let write_buffer_config = proto.write_buffer_config.optional("write_buffer_config")?;
        // Missing sub-messages fall back to their Default implementations
        let lifecycle_rules = proto
            .lifecycle_rules
            .optional("lifecycle_rules")?
            .unwrap_or_default();
        let partition_template = proto
            .partition_template
            .optional("partition_template")?
            .unwrap_or_default();
        // NOTE(review): no `?` here — unlike the fields above, a
        // FieldViolation inside shard_config is silently discarded and the
        // config becomes None. Confirm this is intentional.
        let shard_config = proto
            .shard_config
            .optional("shard_config")
            .unwrap_or_default();
        Ok(Self {
            name,
            partition_template,
            write_buffer_config,
            lifecycle_rules,
            shard_config,
        })
    }
}
/// Error returned when deserialising protobuf-encoded database rules
#[derive(Debug, Error)]
pub enum DecodeError {
    /// The bytes were not a valid protobuf message
    #[error("failed to decode protobuf: {0}")]
    DecodeError(#[from] prost::DecodeError),
    /// The message decoded but failed domain validation
    #[error("validation failed: {0}")]
    ValidationError(#[from] FieldViolation),
}
/// Error returned when serialising database rules to protobuf
#[derive(Debug, Error)]
pub enum EncodeError {
    #[error("failed to encode protobuf: {0}")]
    EncodeError(#[from] prost::EncodeError),
}
/// Decode protobuf bytes and validate them into `DatabaseRules`
pub fn decode_database_rules(bytes: prost::bytes::Bytes) -> Result<DatabaseRules, DecodeError> {
    let message: management::DatabaseRules = prost::Message::decode(bytes)?;
    Ok(message.try_into()?)
}
/// Encode `DatabaseRules` as protobuf into the provided buffer
pub fn encode_database_rules(
    rules: DatabaseRules,
    bytes: &mut prost::bytes::BytesMut,
) -> Result<(), EncodeError> {
    let encoded: management::DatabaseRules = rules.into();
    Ok(prost::Message::encode(&encoded, bytes)?)
}
impl From<Order> for management::Order {
    fn from(o: Order) -> Self {
        match o {
            Order::Asc => Self::Asc,
            Order::Desc => Self::Desc,
        }
    }
}
impl TryFrom<management::Order> for Order {
    type Error = FieldViolation;
    fn try_from(proto: management::Order) -> Result<Self, Self::Error> {
        // NOTE(review): `Unspecified` is leniently mapped to the default
        // here, whereas ColumnType/Aggregate below treat it as an error —
        // confirm the asymmetry is intentional.
        Ok(match proto {
            management::Order::Unspecified => Self::default(),
            management::Order::Asc => Self::Asc,
            management::Order::Desc => Self::Desc,
        })
    }
}
impl From<ColumnType> for management::ColumnType {
    fn from(t: ColumnType) -> Self {
        match t {
            ColumnType::I64 => Self::I64,
            ColumnType::U64 => Self::U64,
            ColumnType::F64 => Self::F64,
            ColumnType::String => Self::String,
            ColumnType::Bool => Self::Bool,
        }
    }
}
impl TryFrom<management::ColumnType> for ColumnType {
    type Error = FieldViolation;
    fn try_from(proto: management::ColumnType) -> Result<Self, Self::Error> {
        Ok(match proto {
            // An unset column type is a required-field violation; the field
            // name is filled in by the caller via `scope`
            management::ColumnType::Unspecified => return Err(FieldViolation::required("")),
            management::ColumnType::I64 => Self::I64,
            management::ColumnType::U64 => Self::U64,
            management::ColumnType::F64 => Self::F64,
            management::ColumnType::String => Self::String,
            management::ColumnType::Bool => Self::Bool,
        })
    }
}
impl From<ColumnValue> for management::Aggregate {
    fn from(v: ColumnValue) -> Self {
        match v {
            ColumnValue::Min => Self::Min,
            ColumnValue::Max => Self::Max,
        }
    }
}
impl TryFrom<management::Aggregate> for ColumnValue {
    type Error = FieldViolation;
    fn try_from(proto: management::Aggregate) -> Result<Self, Self::Error> {
        use management::Aggregate;
        Ok(match proto {
            Aggregate::Unspecified => return Err(FieldViolation::required("")),
            Aggregate::Min => Self::Min,
            Aggregate::Max => Self::Max,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use data_types::database_rules::LifecycleRules;
    /// A message carrying only a name round-trips, with missing sub-messages
    /// materialised as defaults on the way back out.
    #[test]
    fn test_database_rules_defaults() {
        let protobuf = management::DatabaseRules {
            name: "database".to_string(),
            ..Default::default()
        };
        let rules: DatabaseRules = protobuf.clone().try_into().unwrap();
        let back: management::DatabaseRules = rules.clone().into();
        assert_eq!(rules.name.as_str(), protobuf.name.as_str());
        assert_eq!(protobuf.name, back.name);
        assert_eq!(rules.partition_template.parts.len(), 0);
        // These will be defaulted as optionality not preserved on non-protobuf
        // DatabaseRules
        assert_eq!(back.partition_template, Some(Default::default()));
        assert_eq!(back.lifecycle_rules, Some(LifecycleRules::default().into()));
        // These should be none as preserved on non-protobuf DatabaseRules
        assert!(back.write_buffer_config.is_none());
        assert!(back.shard_config.is_none());
    }
}

View File

@@ -0,0 +1,280 @@
use std::convert::{TryFrom, TryInto};
use std::num::NonZeroU64;
use data_types::database_rules::{LifecycleRules, Sort, SortOrder};
use crate::google::protobuf::Empty;
use crate::google::{FieldViolation, FromField, FromFieldOpt, FromFieldString};
use crate::influxdata::iox::management::v1 as management;
/// Convert internal lifecycle rules to the management API message.
/// Optional numeric settings are encoded as 0 when unset — presumably
/// because proto3 scalar fields carry no presence information.
impl From<LifecycleRules> for management::LifecycleRules {
    fn from(config: LifecycleRules) -> Self {
        Self {
            mutable_linger_seconds: config
                .mutable_linger_seconds
                .map(Into::into)
                .unwrap_or_default(),
            mutable_minimum_age_seconds: config
                .mutable_minimum_age_seconds
                .map(Into::into)
                .unwrap_or_default(),
            mutable_size_threshold: config
                .mutable_size_threshold
                .map(|x| x.get() as u64)
                .unwrap_or_default(),
            buffer_size_soft: config
                .buffer_size_soft
                .map(|x| x.get() as u64)
                .unwrap_or_default(),
            buffer_size_hard: config
                .buffer_size_hard
                .map(|x| x.get() as u64)
                .unwrap_or_default(),
            sort_order: Some(config.sort_order.into()),
            drop_non_persisted: config.drop_non_persisted,
            persist: config.persist,
            immutable: config.immutable,
            worker_backoff_millis: config.worker_backoff_millis.map_or(0, NonZeroU64::get),
        }
    }
}
/// Inverse conversion: a wire value of 0 fails the NonZero conversion and
/// becomes `None`, mirroring the 0-means-unset encoding above.
impl TryFrom<management::LifecycleRules> for LifecycleRules {
    type Error = FieldViolation;
    fn try_from(proto: management::LifecycleRules) -> Result<Self, Self::Error> {
        Ok(Self {
            mutable_linger_seconds: proto.mutable_linger_seconds.try_into().ok(),
            mutable_minimum_age_seconds: proto.mutable_minimum_age_seconds.try_into().ok(),
            mutable_size_threshold: (proto.mutable_size_threshold as usize).try_into().ok(),
            buffer_size_soft: (proto.buffer_size_soft as usize).try_into().ok(),
            buffer_size_hard: (proto.buffer_size_hard as usize).try_into().ok(),
            sort_order: proto.sort_order.optional("sort_order")?.unwrap_or_default(),
            drop_non_persisted: proto.drop_non_persisted,
            persist: proto.persist,
            immutable: proto.immutable,
            worker_backoff_millis: NonZeroU64::new(proto.worker_backoff_millis),
        })
    }
}
impl From<SortOrder> for management::lifecycle_rules::SortOrder {
    fn from(ps: SortOrder) -> Self {
        let order: management::Order = ps.order.into();
        Self {
            order: order as _, // store as the raw i32 prost uses for enums
            sort: Some(ps.sort.into()),
        }
    }
}
impl TryFrom<management::lifecycle_rules::SortOrder> for SortOrder {
    type Error = FieldViolation;
    fn try_from(proto: management::lifecycle_rules::SortOrder) -> Result<Self, Self::Error> {
        Ok(Self {
            order: proto.order().scope("order")?,
            // A missing sort falls back to the default rather than erroring
            sort: proto.sort.optional("sort")?.unwrap_or_default(),
        })
    }
}
impl From<Sort> for management::lifecycle_rules::sort_order::Sort {
    fn from(ps: Sort) -> Self {
        use management::lifecycle_rules::sort_order::ColumnSort;
        match ps {
            // Marker variants carry no payload, hence the Empty messages
            Sort::LastWriteTime => Self::LastWriteTime(Empty {}),
            Sort::CreatedAtTime => Self::CreatedAtTime(Empty {}),
            Sort::Column(column_name, column_type, column_value) => {
                let column_type: management::ColumnType = column_type.into();
                let column_value: management::Aggregate = column_value.into();
                Self::Column(ColumnSort {
                    column_name,
                    column_type: column_type as _,
                    column_value: column_value as _,
                })
            }
        }
    }
}
impl TryFrom<management::lifecycle_rules::sort_order::Sort> for Sort {
    type Error = FieldViolation;
    fn try_from(proto: management::lifecycle_rules::sort_order::Sort) -> Result<Self, Self::Error> {
        use management::lifecycle_rules::sort_order::Sort;
        Ok(match proto {
            Sort::LastWriteTime(_) => Self::LastWriteTime,
            Sort::CreatedAtTime(_) => Self::CreatedAtTime,
            Sort::Column(column_sort) => {
                // All three column fields are mandatory; errors are scoped
                // under "column." so callers see the full path
                let column_type = column_sort.column_type().scope("column.column_type")?;
                let column_value = column_sort.column_value().scope("column.column_value")?;
                Self::Column(
                    column_sort.column_name.required("column.column_name")?,
                    column_type,
                    column_value,
                )
            }
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use data_types::database_rules::{ColumnType, ColumnValue, Order};
    /// Full round-trip of a populated LifecycleRules message; every scalar
    /// field must survive the proto -> internal -> proto trip.
    #[test]
    fn lifecycle_rules() {
        let protobuf = management::LifecycleRules {
            mutable_linger_seconds: 123,
            mutable_minimum_age_seconds: 5345,
            mutable_size_threshold: 232,
            buffer_size_soft: 353,
            buffer_size_hard: 232,
            sort_order: None,
            drop_non_persisted: true,
            persist: true,
            immutable: true,
            worker_backoff_millis: 1000,
        };
        let config: LifecycleRules = protobuf.clone().try_into().unwrap();
        let back: management::LifecycleRules = config.clone().into();
        assert_eq!(config.sort_order, SortOrder::default());
        assert_eq!(
            config.mutable_linger_seconds.unwrap().get(),
            protobuf.mutable_linger_seconds
        );
        assert_eq!(
            config.mutable_minimum_age_seconds.unwrap().get(),
            protobuf.mutable_minimum_age_seconds
        );
        assert_eq!(
            config.mutable_size_threshold.unwrap().get(),
            protobuf.mutable_size_threshold as usize
        );
        assert_eq!(
            config.buffer_size_soft.unwrap().get(),
            protobuf.buffer_size_soft as usize
        );
        assert_eq!(
            config.buffer_size_hard.unwrap().get(),
            protobuf.buffer_size_hard as usize
        );
        assert_eq!(config.drop_non_persisted, protobuf.drop_non_persisted);
        assert_eq!(config.immutable, protobuf.immutable);
        assert_eq!(back.mutable_linger_seconds, protobuf.mutable_linger_seconds);
        assert_eq!(
            back.mutable_minimum_age_seconds,
            protobuf.mutable_minimum_age_seconds
        );
        assert_eq!(back.mutable_size_threshold, protobuf.mutable_size_threshold);
        assert_eq!(back.buffer_size_soft, protobuf.buffer_size_soft);
        assert_eq!(back.buffer_size_hard, protobuf.buffer_size_hard);
        assert_eq!(back.drop_non_persisted, protobuf.drop_non_persisted);
        assert_eq!(back.immutable, protobuf.immutable);
        assert_eq!(back.worker_backoff_millis, protobuf.worker_backoff_millis);
    }
    /// A backoff of 0 (unset) must round-trip back to 0.
    #[test]
    fn default_background_worker_backoff_millis() {
        let protobuf = management::LifecycleRules {
            worker_backoff_millis: 0,
            ..Default::default()
        };
        let config: LifecycleRules = protobuf.clone().try_into().unwrap();
        let back: management::LifecycleRules = config.into();
        assert_eq!(back.worker_backoff_millis, protobuf.worker_backoff_millis);
    }
    /// A default (all-unset) SortOrder message decodes to the defaults.
    #[test]
    fn sort_order_default() {
        let protobuf: management::lifecycle_rules::SortOrder = Default::default();
        let config: SortOrder = protobuf.try_into().unwrap();
        assert_eq!(config, SortOrder::default());
        assert_eq!(config.order, Order::default());
        assert_eq!(config.sort, Sort::default());
    }
    /// A populated SortOrder round-trips exactly.
    #[test]
    fn sort_order() {
        use management::lifecycle_rules::sort_order;
        let protobuf = management::lifecycle_rules::SortOrder {
            order: management::Order::Asc as _,
            sort: Some(sort_order::Sort::CreatedAtTime(Empty {})),
        };
        let config: SortOrder = protobuf.clone().try_into().unwrap();
        let back: management::lifecycle_rules::SortOrder = config.clone().into();
        assert_eq!(protobuf, back);
        assert_eq!(config.order, Order::Asc);
        assert_eq!(config.sort, Sort::CreatedAtTime);
    }
    /// Each Sort variant decodes to its internal counterpart.
    #[test]
    fn sort() {
        use management::lifecycle_rules::sort_order;
        let created_at: Sort = sort_order::Sort::CreatedAtTime(Empty {})
            .try_into()
            .unwrap();
        let last_write: Sort = sort_order::Sort::LastWriteTime(Empty {})
            .try_into()
            .unwrap();
        let column: Sort = sort_order::Sort::Column(sort_order::ColumnSort {
            column_name: "column".to_string(),
            column_type: management::ColumnType::Bool as _,
            column_value: management::Aggregate::Min as _,
        })
        .try_into()
        .unwrap();
        assert_eq!(created_at, Sort::CreatedAtTime);
        assert_eq!(last_write, Sort::LastWriteTime);
        assert_eq!(
            column,
            Sort::Column("column".to_string(), ColumnType::Bool, ColumnValue::Min)
        );
    }
    /// Missing column fields are reported in order: type, then value, then
    /// name — each with the fully scoped field path.
    #[test]
    fn partition_sort_column_sort() {
        use management::lifecycle_rules::sort_order;
        let res: Result<Sort, _> = sort_order::Sort::Column(Default::default()).try_into();
        let err1 = res.expect_err("expected failure");
        let res: Result<Sort, _> = sort_order::Sort::Column(sort_order::ColumnSort {
            column_type: management::ColumnType::F64 as _,
            ..Default::default()
        })
        .try_into();
        let err2 = res.expect_err("expected failure");
        let res: Result<Sort, _> = sort_order::Sort::Column(sort_order::ColumnSort {
            column_type: management::ColumnType::F64 as _,
            column_value: management::Aggregate::Max as _,
            ..Default::default()
        })
        .try_into();
        let err3 = res.expect_err("expected failure");
        assert_eq!(err1.field, "column.column_type");
        assert_eq!(err1.description, "Field is required");
        assert_eq!(err2.field, "column.column_value");
        assert_eq!(err2.description, "Field is required");
        assert_eq!(err3.field, "column.column_name");
        assert_eq!(err3.description, "Field is required");
    }
}

View File

@ -0,0 +1,181 @@
use std::convert::TryFrom;
use data_types::database_rules::{PartitionTemplate, RegexCapture, StrftimeColumn, TemplatePart};
use crate::google::protobuf::Empty;
use crate::google::{FieldViolation, FromFieldOpt, FromFieldString, FromFieldVec};
use crate::influxdata::iox::management::v1 as management;
impl From<PartitionTemplate> for management::PartitionTemplate {
    /// Convert each domain-level template part into its protobuf form.
    fn from(pt: PartitionTemplate) -> Self {
        let parts = pt.parts.into_iter().map(Into::into).collect();
        Self { parts }
    }
}
impl TryFrom<management::PartitionTemplate> for PartitionTemplate {
    type Error = FieldViolation;

    /// Convert every protobuf part; any violation is scoped under `parts`
    /// with the failing element's index.
    fn try_from(proto: management::PartitionTemplate) -> Result<Self, Self::Error> {
        Ok(Self {
            parts: proto.parts.vec_field("parts")?,
        })
    }
}
impl From<TemplatePart> for management::partition_template::part::Part {
    /// Map each domain template part onto the protobuf oneof variant.
    fn from(part: TemplatePart) -> Self {
        use management::partition_template::part::ColumnFormat;

        match part {
            TemplatePart::Table => Self::Table(Empty {}),
            TemplatePart::Column(column) => Self::Column(column),
            TemplatePart::TimeFormat(format) => Self::Time(format),
            // Both regex and strftime parts share the ColumnFormat message;
            // the domain `regex` field travels as protobuf `format`.
            TemplatePart::RegexCapture(rc) => Self::Regex(ColumnFormat {
                column: rc.column,
                format: rc.regex,
            }),
            TemplatePart::StrftimeColumn(sc) => Self::StrfTime(ColumnFormat {
                column: sc.column,
                format: sc.format,
            }),
        }
    }
}
impl TryFrom<management::partition_template::part::Part> for TemplatePart {
    type Error = FieldViolation;

    /// Convert a protobuf oneof variant back into a domain `TemplatePart`.
    ///
    /// String sub-fields follow the prost convention that an empty string
    /// means "unset", so each is validated via `required` with a scoped
    /// field path (e.g. "regex.column").
    fn try_from(proto: management::partition_template::part::Part) -> Result<Self, Self::Error> {
        use management::partition_template::part::{ColumnFormat, Part};

        Ok(match proto {
            // `Table` carries no payload.
            Part::Table(_) => Self::Table,
            Part::Column(column) => Self::Column(column.required("column")?),
            Part::Regex(ColumnFormat { column, format }) => Self::RegexCapture(RegexCapture {
                column: column.required("regex.column")?,
                regex: format.required("regex.format")?,
            }),
            Part::StrfTime(ColumnFormat { column, format }) => {
                Self::StrftimeColumn(StrftimeColumn {
                    column: column.required("strf_time.column")?,
                    format: format.required("strf_time.format")?,
                })
            }
            Part::Time(format) => Self::TimeFormat(format.required("time")?),
        })
    }
}
impl From<TemplatePart> for management::partition_template::Part {
    /// Wrap the oneof variant in its outer protobuf `Part` message.
    fn from(part: TemplatePart) -> Self {
        let part = Some(part.into());
        Self { part }
    }
}
impl TryFrom<management::partition_template::Part> for TemplatePart {
    type Error = FieldViolation;

    /// The oneof `part` field is mandatory; an unset oneof is rejected
    /// with a "part is required" violation.
    fn try_from(proto: management::partition_template::Part) -> Result<Self, Self::Error> {
        proto.part.required("part")
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use data_types::database_rules::DatabaseRules;
    use std::convert::TryInto;

    // An explicitly empty partition template round-trips unchanged.
    #[test]
    fn test_partition_template_default() {
        let protobuf = management::DatabaseRules {
            name: "database".to_string(),
            partition_template: Some(management::PartitionTemplate { parts: vec![] }),
            ..Default::default()
        };
        let rules: DatabaseRules = protobuf.clone().try_into().unwrap();
        let back: management::DatabaseRules = rules.clone().into();
        assert_eq!(rules.partition_template.parts.len(), 0);
        assert_eq!(protobuf.partition_template, back.partition_template);
    }

    // A `Part` whose oneof is unset is rejected with a fully-scoped path.
    #[test]
    fn test_partition_template_no_part() {
        let protobuf = management::DatabaseRules {
            name: "database".to_string(),
            partition_template: Some(management::PartitionTemplate {
                parts: vec![Default::default()],
            }),
            ..Default::default()
        };
        let res: Result<DatabaseRules, _> = protobuf.try_into();
        let err = res.expect_err("expected failure");
        assert_eq!(&err.field, "partition_template.parts.0.part");
        assert_eq!(&err.description, "Field is required");
    }

    // Time, Table, and Regex variants all convert in both directions.
    #[test]
    fn test_partition_template() {
        use management::partition_template::part::{ColumnFormat, Part};
        let protobuf = management::PartitionTemplate {
            parts: vec![
                management::partition_template::Part {
                    part: Some(Part::Time("time".to_string())),
                },
                management::partition_template::Part {
                    part: Some(Part::Table(Empty {})),
                },
                management::partition_template::Part {
                    part: Some(Part::Regex(ColumnFormat {
                        column: "column".to_string(),
                        format: "format".to_string(),
                    })),
                },
            ],
        };
        let pt: PartitionTemplate = protobuf.clone().try_into().unwrap();
        let back: management::PartitionTemplate = pt.clone().into();
        assert_eq!(
            pt.parts,
            vec![
                TemplatePart::TimeFormat("time".to_string()),
                TemplatePart::Table,
                TemplatePart::RegexCapture(RegexCapture {
                    column: "column".to_string(),
                    regex: "format".to_string()
                })
            ]
        );
        assert_eq!(protobuf, back);
    }

    // Empty (prost-default) sub-fields of a regex part are "required" errors,
    // reported with the index-scoped field path of the offending part.
    #[test]
    fn test_partition_template_empty() {
        use management::partition_template::part::{ColumnFormat, Part};
        let protobuf = management::PartitionTemplate {
            parts: vec![management::partition_template::Part {
                part: Some(Part::Regex(ColumnFormat {
                    ..Default::default()
                })),
            }],
        };
        let res: Result<PartitionTemplate, _> = protobuf.try_into();
        let err = res.expect_err("expected failure");
        assert_eq!(&err.field, "parts.0.part.regex.column");
        assert_eq!(&err.description, "Field is required");
    }
}

View File

@ -0,0 +1,381 @@
use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::sync::Arc;
use regex::Regex;
use data_types::database_rules::{HashRing, Matcher, MatcherToShard, NodeGroup, ShardConfig};
use data_types::server_id::ServerId;
use crate::google::FieldViolation;
use crate::influxdata::iox::management::v1 as management;
impl From<ShardConfig> for management::ShardConfig {
    /// Serialize a domain shard config into its protobuf form.
    fn from(shard_config: ShardConfig) -> Self {
        let specific_targets = shard_config
            .specific_targets
            .into_iter()
            .map(Into::into)
            .collect();

        // The shard map is behind an Arc, so entries are cloned out rather
        // than consumed.
        let shards = shard_config
            .shards
            .iter()
            .map(|(shard_id, node_group)| {
                (
                    *shard_id,
                    from_node_group_for_management_node_group(node_group.clone()),
                )
            })
            .collect();

        Self {
            specific_targets,
            hash_ring: shard_config.hash_ring.map(Into::into),
            ignore_errors: shard_config.ignore_errors,
            shards,
        }
    }
}
impl TryFrom<management::ShardConfig> for ShardConfig {
    type Error = FieldViolation;

    /// Parse a protobuf shard config, validating every matcher and node
    /// group; conversion short-circuits on the first violation.
    fn try_from(proto: management::ShardConfig) -> Result<Self, Self::Error> {
        Ok(Self {
            specific_targets: proto
                .specific_targets
                .into_iter()
                .map(TryInto::try_into)
                .collect::<Result<Vec<MatcherToShard>, _>>()?,
            // `Option<Result<_, _>>` -> `Result<Option<_>, _>`: `transpose`
            // is the idiomatic form of `map_or(Ok(None), |r| r.map(Some))`.
            hash_ring: proto.hash_ring.map(TryInto::try_into).transpose()?,
            ignore_errors: proto.ignore_errors,
            shards: Arc::new(
                proto
                    .shards
                    .into_iter()
                    .map(|(k, v)| {
                        try_from_management_node_group_for_node_group(v).map(|ng| (k, ng))
                    })
                    .collect::<Result<HashMap<u32, NodeGroup>, FieldViolation>>()?,
            ),
        })
    }
}
/// Returns `None` when `v` equals its type's `Default` value, and
/// `Some(v)` otherwise.
fn none_if_default<T: Default + PartialEq>(v: T) -> Option<T> {
    if v != T::default() {
        Some(v)
    } else {
        None
    }
}
impl From<MatcherToShard> for management::MatcherToShard {
    /// A default matcher is elided (`None`) rather than serialized.
    fn from(matcher_to_shard: MatcherToShard) -> Self {
        let matcher = none_if_default(matcher_to_shard.matcher.into());
        Self {
            matcher,
            shard: matcher_to_shard.shard,
        }
    }
}
impl TryFrom<management::MatcherToShard> for MatcherToShard {
    type Error = FieldViolation;

    fn try_from(proto: management::MatcherToShard) -> Result<Self, Self::Error> {
        // An absent matcher is treated the same as an explicit default one.
        let matcher = proto.matcher.unwrap_or_default().try_into()?;
        Ok(Self {
            matcher,
            shard: proto.shard,
        })
    }
}
impl From<HashRing> for management::HashRing {
    /// Serialize a hash ring; the consistent hasher converts into the raw
    /// shard list via `Into`.
    fn from(hash_ring: HashRing) -> Self {
        let shards = hash_ring.shards.into();
        Self {
            table_name: hash_ring.table_name,
            columns: hash_ring.columns,
            shards,
        }
    }
}
impl TryFrom<management::HashRing> for HashRing {
    type Error = FieldViolation;

    // Infallible in practice: every field converts directly, but the
    // fallible signature keeps it uniform with the other conversions.
    fn try_from(proto: management::HashRing) -> Result<Self, Self::Error> {
        let shards = proto.shards.into();
        Ok(Self {
            table_name: proto.table_name,
            columns: proto.columns,
            shards,
        })
    }
}
// cannot (and/or don't know how to) add impl From inside prost generated code
fn from_node_group_for_management_node_group(node_group: NodeGroup) -> management::NodeGroup {
    let nodes = node_group
        .into_iter()
        .map(|server_id| management::node_group::Node {
            id: server_id.get_u32(),
        })
        .collect();
    management::NodeGroup { nodes }
}
/// Parse a protobuf node group into a `NodeGroup` of validated server ids.
///
/// Each raw u32 must be a valid (nonzero) `ServerId`; the collect
/// short-circuits on the first invalid id.
fn try_from_management_node_group_for_node_group(
    proto: management::NodeGroup,
) -> Result<NodeGroup, FieldViolation> {
    proto
        .nodes
        .into_iter()
        .map(|i| {
            ServerId::try_from(i.id).map_err(|_| FieldViolation {
                field: "id".to_string(),
                description: "Node ID must be nonzero".to_string(),
            })
        })
        .collect()
}
impl From<Matcher> for management::Matcher {
    fn from(matcher: Matcher) -> Self {
        // Absent regex / predicate serialize as empty strings, the protobuf
        // scalar default.
        let table_name_regex = match matcher.table_name_regex {
            Some(re) => re.to_string(),
            None => String::new(),
        };
        let predicate = matcher.predicate.unwrap_or_default();
        Self {
            table_name_regex,
            predicate,
        }
    }
}
impl TryFrom<management::Matcher> for Matcher {
    type Error = FieldViolation;

    /// Parse a protobuf matcher; an invalid regex is reported as a
    /// violation on `table_name_regex` with the compiler's message.
    fn try_from(proto: management::Matcher) -> Result<Self, Self::Error> {
        // Empty string is the protobuf default and means "no regex";
        // `.as_str()` replaces the non-idiomatic `&proto.x as &str` cast.
        let table_name_regex = match proto.table_name_regex.as_str() {
            "" => None,
            re => Some(Regex::new(re).map_err(|e| FieldViolation {
                field: "table_name_regex".to_string(),
                description: e.to_string(),
            })?),
        };

        // Likewise an empty predicate means "no predicate".
        let predicate = match proto.predicate {
            p if p.is_empty() => None,
            p => Some(p),
        };

        Ok(Self {
            table_name_regex,
            predicate,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use data_types::consistent_hasher::ConsistentHasher;
    use data_types::database_rules::DatabaseRules;

    // A default protobuf matcher becomes "no regex, no predicate" and
    // round-trips back to empty fields.
    #[test]
    fn test_matcher_default() {
        let protobuf = management::Matcher {
            ..Default::default()
        };
        let matcher: Matcher = protobuf.clone().try_into().unwrap();
        let back: management::Matcher = matcher.clone().into();
        assert!(matcher.table_name_regex.is_none());
        assert_eq!(protobuf.table_name_regex, back.table_name_regex);
        assert_eq!(matcher.predicate, None);
        assert_eq!(protobuf.predicate, back.predicate);
    }

    // A non-empty regex string compiles and round-trips textually.
    #[test]
    fn test_matcher_regexp() {
        let protobuf = management::Matcher {
            table_name_regex: "^foo$".into(),
            ..Default::default()
        };
        let matcher: Matcher = protobuf.clone().try_into().unwrap();
        let back: management::Matcher = matcher.clone().into();
        assert_eq!(matcher.table_name_regex.unwrap().to_string(), "^foo$");
        assert_eq!(protobuf.table_name_regex, back.table_name_regex);
    }

    // An invalid regex is rejected, naming the offending field.
    #[test]
    fn test_matcher_bad_regexp() {
        let protobuf = management::Matcher {
            table_name_regex: "*".into(),
            ..Default::default()
        };
        let matcher: Result<Matcher, FieldViolation> = protobuf.try_into();
        assert!(matcher.is_err());
        assert_eq!(matcher.err().unwrap().field, "table_name_regex");
    }

    // A default hash ring is empty in every dimension and round-trips.
    #[test]
    fn test_hash_ring_default() {
        let protobuf = management::HashRing {
            ..Default::default()
        };
        let hash_ring: HashRing = protobuf.clone().try_into().unwrap();
        let back: management::HashRing = hash_ring.clone().into();
        assert_eq!(hash_ring.table_name, false);
        assert_eq!(protobuf.table_name, back.table_name);
        assert!(hash_ring.columns.is_empty());
        assert_eq!(protobuf.columns, back.columns);
        assert!(hash_ring.shards.is_empty());
        assert_eq!(protobuf.shards, back.shards);
    }

    // Shard ids populate the consistent hasher; `find` resolves keys
    // against the configured shards.
    #[test]
    fn test_hash_ring_nodes() {
        let protobuf = management::HashRing {
            shards: vec![1, 2],
            ..Default::default()
        };
        let hash_ring: HashRing = protobuf.try_into().unwrap();
        assert_eq!(hash_ring.shards.len(), 2);
        assert_eq!(hash_ring.shards.find(1), Some(2));
        assert_eq!(hash_ring.shards.find(2), Some(1));
    }

    // An absent matcher falls back to the default matcher instead of
    // failing conversion.
    #[test]
    fn test_matcher_to_shard_default() {
        let protobuf = management::MatcherToShard {
            ..Default::default()
        };
        let matcher_to_shard: MatcherToShard = protobuf.clone().try_into().unwrap();
        let back: management::MatcherToShard = matcher_to_shard.clone().into();
        assert_eq!(
            matcher_to_shard.matcher,
            Matcher {
                ..Default::default()
            }
        );
        assert_eq!(protobuf.matcher, back.matcher);
        assert_eq!(matcher_to_shard.shard, 0);
        assert_eq!(protobuf.shard, back.shard);
    }

    // A default shard config is empty everywhere and round-trips unchanged.
    #[test]
    fn test_shard_config_default() {
        let protobuf = management::ShardConfig {
            ..Default::default()
        };
        let shard_config: ShardConfig = protobuf.clone().try_into().unwrap();
        let back: management::ShardConfig = shard_config.clone().into();
        assert!(shard_config.specific_targets.is_empty());
        assert_eq!(protobuf.specific_targets, back.specific_targets);
        assert!(shard_config.hash_ring.is_none());
        assert_eq!(protobuf.hash_ring, back.hash_ring);
        assert_eq!(shard_config.ignore_errors, false);
        assert_eq!(protobuf.ignore_errors, back.ignore_errors);
        assert!(shard_config.shards.is_empty());
        assert_eq!(protobuf.shards, back.shards);
    }

    // A shard config embedded in full database rules is preserved.
    #[test]
    fn test_database_rules_shard_config() {
        let protobuf = management::DatabaseRules {
            name: "database".to_string(),
            shard_config: Some(management::ShardConfig {
                ..Default::default()
            }),
            ..Default::default()
        };
        let rules: DatabaseRules = protobuf.try_into().unwrap();
        let back: management::DatabaseRules = rules.into();
        assert!(back.shard_config.is_some());
    }

    // Node groups keep their per-shard membership through conversion.
    #[test]
    fn test_shard_config_shards() {
        let protobuf = management::ShardConfig {
            shards: vec![
                (
                    1,
                    management::NodeGroup {
                        nodes: vec![
                            management::node_group::Node { id: 10 },
                            management::node_group::Node { id: 11 },
                            management::node_group::Node { id: 12 },
                        ],
                    },
                ),
                (
                    2,
                    management::NodeGroup {
                        nodes: vec![management::node_group::Node { id: 20 }],
                    },
                ),
            ]
            .into_iter()
            .collect(),
            ..Default::default()
        };
        let shard_config: ShardConfig = protobuf.try_into().unwrap();
        assert_eq!(shard_config.shards.len(), 2);
        assert_eq!(shard_config.shards[&1].len(), 3);
        assert_eq!(shard_config.shards[&2].len(), 1);
    }

    // End-to-end: specific targets plus a hash ring produce the expected
    // fully-typed domain config.
    #[test]
    fn test_sharder() {
        let protobuf = management::ShardConfig {
            specific_targets: vec![management::MatcherToShard {
                matcher: Some(management::Matcher {
                    table_name_regex: "pu\\d.$".to_string(),
                    ..Default::default()
                }),
                shard: 1,
            }],
            hash_ring: Some(management::HashRing {
                table_name: true,
                columns: vec!["t1".to_string(), "t2".to_string()],
                shards: vec![1, 2, 3, 4],
            }),
            ..Default::default()
        };
        let shard_config: ShardConfig = protobuf.try_into().unwrap();
        assert_eq!(
            shard_config,
            ShardConfig {
                specific_targets: vec![MatcherToShard {
                    matcher: data_types::database_rules::Matcher {
                        table_name_regex: Some(Regex::new("pu\\d.$").unwrap()),
                        predicate: None
                    },
                    shard: 1
                }],
                hash_ring: Some(HashRing {
                    table_name: true,
                    columns: vec!["t1".to_string(), "t2".to_string(),],
                    shards: ConsistentHasher::new(&[1, 2, 3, 4])
                }),
                ..Default::default()
            }
        );
    }
}

View File

@ -0,0 +1,119 @@
use std::convert::{TryFrom, TryInto};
use data_types::database_rules::{WriteBufferConfig, WriteBufferRollover};
use crate::google::{FieldViolation, FromField};
use crate::influxdata::iox::management::v1 as management;
impl From<WriteBufferConfig> for management::WriteBufferConfig {
    /// Serialize a write-buffer config. Note the field-name difference:
    /// domain `store_segments` maps to protobuf `persist_segments`; sizes
    /// are widened from `usize` to `u64`.
    fn from(config: WriteBufferConfig) -> Self {
        let rollover = management::write_buffer_config::Rollover::from(config.buffer_rollover);
        Self {
            buffer_size: config.buffer_size as u64,
            segment_size: config.segment_size as u64,
            buffer_rollover: rollover as _,
            persist_segments: config.store_segments,
            close_segment_after: config.close_segment_after.map(Into::into),
        }
    }
}
impl TryFrom<management::WriteBufferConfig> for WriteBufferConfig {
    type Error = FieldViolation;

    /// Parse a protobuf write-buffer config.
    ///
    /// An unspecified rollover becomes a "buffer_rollover is required"
    /// violation; a close-segment duration that fails conversion (the
    /// tests exercise negative values) is rejected.
    fn try_from(proto: management::WriteBufferConfig) -> Result<Self, Self::Error> {
        let buffer_rollover = proto.buffer_rollover().scope("buffer_rollover")?;
        // NOTE(review): this field path is camelCase, unlike the snake_case
        // used everywhere else. The tests assert this exact string, so
        // changing it is a breaking change — confirm before "fixing".
        let close_segment_after = proto
            .close_segment_after
            .map(TryInto::try_into)
            .transpose()
            .map_err(|_| FieldViolation {
                field: "closeSegmentAfter".to_string(),
                description: "Duration must be positive".to_string(),
            })?;
        Ok(Self {
            buffer_size: proto.buffer_size as usize,
            segment_size: proto.segment_size as usize,
            buffer_rollover,
            store_segments: proto.persist_segments,
            close_segment_after,
        })
    }
}
impl From<WriteBufferRollover> for management::write_buffer_config::Rollover {
fn from(rollover: WriteBufferRollover) -> Self {
match rollover {
WriteBufferRollover::DropOldSegment => Self::DropOldSegment,
WriteBufferRollover::DropIncoming => Self::DropIncoming,
WriteBufferRollover::ReturnError => Self::ReturnError,
}
}
}
impl TryFrom<management::write_buffer_config::Rollover> for WriteBufferRollover {
    type Error = FieldViolation;

    fn try_from(proto: management::write_buffer_config::Rollover) -> Result<Self, Self::Error> {
        use management::write_buffer_config::Rollover;
        match proto {
            // The zero value means the field was never set by the sender.
            Rollover::Unspecified => Err(FieldViolation::required("")),
            Rollover::DropOldSegment => Ok(Self::DropOldSegment),
            Rollover::DropIncoming => Ok(Self::DropIncoming),
            Rollover::ReturnError => Ok(Self::ReturnError),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // The rollover is mandatory: a default config is rejected.
    #[test]
    fn test_write_buffer_config_default() {
        let protobuf: management::WriteBufferConfig = Default::default();
        let res: Result<WriteBufferConfig, _> = protobuf.try_into();
        let err = res.expect_err("expected failure");
        assert_eq!(&err.field, "buffer_rollover");
        assert_eq!(&err.description, "Field is required");
    }

    // A set rollover round-trips through the domain type unchanged.
    #[test]
    fn test_write_buffer_config_rollover() {
        let protobuf = management::WriteBufferConfig {
            buffer_rollover: management::write_buffer_config::Rollover::DropIncoming as _,
            ..Default::default()
        };
        let config: WriteBufferConfig = protobuf.clone().try_into().unwrap();
        let back: management::WriteBufferConfig = config.clone().into();
        assert_eq!(config.buffer_rollover, WriteBufferRollover::DropIncoming);
        assert_eq!(protobuf, back);
    }

    // Negative close-segment durations are rejected. Note the error path
    // asserts the camelCase field name "closeSegmentAfter".
    #[test]
    fn test_write_buffer_config_negative_duration() {
        use crate::google::protobuf::Duration;
        let protobuf = management::WriteBufferConfig {
            buffer_rollover: management::write_buffer_config::Rollover::DropOldSegment as _,
            close_segment_after: Some(Duration {
                seconds: -1,
                nanos: -40,
            }),
            ..Default::default()
        };
        let res: Result<WriteBufferConfig, _> = protobuf.try_into();
        let err = res.expect_err("expected failure");
        assert_eq!(&err.field, "closeSegmentAfter");
        assert_eq!(&err.description, "Duration must be positive");
    }
}

View File

@ -24,13 +24,8 @@ pub mod longrunning {
use self::protobuf::Any;
use observability_deps::tracing::error;
use prost::{
bytes::{Bytes, BytesMut},
Message,
};
use std::convert::{TryFrom, TryInto};
use std::iter::FromIterator;
use tonic::Status;
use prost::{bytes::BytesMut, Message};
use std::convert::TryInto;
// A newtype struct to provide conversion into tonic::Status
struct EncodeError(prost::EncodeError);
@ -300,3 +295,107 @@ impl From<QuotaFailure> for tonic::Status {
)
}
}
/// An extension trait that adds the method `scope` to any type
/// implementing `TryInto<U, Error = FieldViolation>`
pub(crate) trait FromField<T> {
    /// Converts `self` into `T`, prefixing any resulting
    /// `FieldViolation`'s field path with `field`.
    fn scope(self, field: impl Into<String>) -> Result<T, FieldViolation>;
}
impl<T, U> FromField<U> for T
where
    T: TryInto<U, Error = FieldViolation>,
{
    /// Try to convert type using TryInto calling `FieldViolation::scope`
    /// on any returned error
    fn scope(self, field: impl Into<String>) -> Result<U, FieldViolation> {
        match self.try_into() {
            Ok(value) => Ok(value),
            Err(violation) => Err(violation.scope(field)),
        }
    }
}
/// An extension trait that adds the methods `optional` and `required` to any
/// Option containing a type implementing `TryInto<U, Error = FieldViolation>`
///
/// `None` maps to `Ok(None)` for `optional` and to a
/// `FieldViolation::required(field)` error for `required`.
pub trait FromFieldOpt<T> {
    /// Try to convert inner type, if any, using TryInto calling
    /// `FieldViolation::scope` on any error encountered
    ///
    /// Returns None if empty
    fn optional(self, field: impl Into<String>) -> Result<Option<T>, FieldViolation>;

    /// Try to convert inner type, using TryInto calling `FieldViolation::scope`
    /// on any error encountered
    ///
    /// Returns an error if empty
    fn required(self, field: impl Into<String>) -> Result<T, FieldViolation>;
}
impl<T, U> FromFieldOpt<U> for Option<T>
where
    T: TryInto<U, Error = FieldViolation>,
{
    /// Convert the inner value when present; an empty option is simply
    /// `Ok(None)`.
    fn optional(self, field: impl Into<String>) -> Result<Option<U>, FieldViolation> {
        match self {
            None => Ok(None),
            Some(inner) => inner.scope(field).map(Some),
        }
    }

    /// Convert the inner value; an empty option is a "required" violation
    /// on `field`.
    fn required(self, field: impl Into<String>) -> Result<U, FieldViolation> {
        match self {
            Some(inner) => inner.scope(field),
            None => Err(FieldViolation::required(field)),
        }
    }
}
/// An extension trait that adds the methods `optional` and `required` to any
/// String
///
/// Prost will default string fields to empty, whereas IOx sometimes
/// uses Option<String>, this helper aids mapping between them
///
/// TODO: Review mixed use of Option<String> and String in IOX
pub(crate) trait FromFieldString {
    /// Returns a Ok if the String is not empty
    fn required(self, field: impl Into<String>) -> Result<String, FieldViolation>;

    /// Wraps non-empty strings in Some(_), returns None for empty strings
    fn optional(self) -> Option<String>;
}
impl FromFieldString for String {
    /// Returns the string when non-empty, otherwise a "required" violation
    /// on `field`.
    ///
    /// Signature normalized to `impl Into<String>` to match the trait
    /// declaration (the previous `impl Into<Self>` was equivalent but
    /// inconsistent).
    fn required(self, field: impl Into<String>) -> Result<String, FieldViolation> {
        if self.is_empty() {
            Err(FieldViolation::required(field))
        } else {
            Ok(self)
        }
    }

    /// Maps the prost default (empty string) to `None`, anything else to
    /// `Some(self)`.
    fn optional(self) -> Option<String> {
        if self.is_empty() {
            None
        } else {
            Some(self)
        }
    }
}
/// An extension trait that adds the method `vec_field` to any Vec of a type
/// implementing `TryInto<U, Error = FieldViolation>`
pub(crate) trait FromFieldVec<T> {
    /// Converts to a `Vec<U>`, short-circuiting on the first error and
    /// returning a correctly scoped `FieldViolation` for where the error
    /// was encountered
    ///
    /// The resulting field path is `field.<index>...`.
    fn vec_field(self, field: impl Into<String>) -> Result<T, FieldViolation>;
}
impl<T, U> FromFieldVec<Vec<U>> for Vec<T>
where
    T: TryInto<U, Error = FieldViolation>,
{
    /// Convert every element, scoping each failure first by its index and
    /// then by `field`, short-circuiting on the first error.
    fn vec_field(self, field: impl Into<String>) -> Result<Vec<U>, FieldViolation> {
        self.into_iter()
            .enumerate()
            .map(|(index, element)| element.scope(index.to_string()))
            .collect::<Result<Vec<U>, FieldViolation>>()
            .map_err(|violation| violation.scope(field))
    }
}

108
generated_types/src/job.rs Normal file
View File

@ -0,0 +1,108 @@
use crate::google::{longrunning, protobuf::Any, FieldViolation, FieldViolationExt};
use crate::influxdata::iox::management::v1 as management;
use crate::protobuf_type_url_eq;
use data_types::job::{Job, OperationStatus};
use std::convert::TryFrom;
impl From<Job> for management::operation_metadata::Job {
    /// Convert a domain `Job` into the protobuf operation-metadata oneof.
    ///
    /// Variants and their fields map 1:1 by name; the reverse mapping
    /// lives in `From<management::operation_metadata::Job> for Job`.
    fn from(job: Job) -> Self {
        match job {
            Job::Dummy { nanos } => Self::Dummy(management::Dummy { nanos }),
            Job::CloseChunk {
                db_name,
                partition_key,
                table_name,
                chunk_id,
            } => Self::CloseChunk(management::CloseChunk {
                db_name,
                partition_key,
                table_name,
                chunk_id,
            }),
            Job::WriteChunk {
                db_name,
                partition_key,
                table_name,
                chunk_id,
            } => Self::WriteChunk(management::WriteChunk {
                db_name,
                partition_key,
                table_name,
                chunk_id,
            }),
        }
    }
}
impl From<management::operation_metadata::Job> for Job {
    /// Convert a protobuf operation-metadata oneof back into a domain
    /// `Job`; the inverse of `From<Job>` above, also a 1:1 field mapping.
    fn from(value: management::operation_metadata::Job) -> Self {
        use management::operation_metadata::Job;
        match value {
            Job::Dummy(management::Dummy { nanos }) => Self::Dummy { nanos },
            Job::CloseChunk(management::CloseChunk {
                db_name,
                partition_key,
                table_name,
                chunk_id,
            }) => Self::CloseChunk {
                db_name,
                partition_key,
                table_name,
                chunk_id,
            },
            Job::WriteChunk(management::WriteChunk {
                db_name,
                partition_key,
                table_name,
                chunk_id,
            }) => Self::WriteChunk {
                db_name,
                partition_key,
                table_name,
                chunk_id,
            },
        }
    }
}
impl TryFrom<longrunning::Operation> for data_types::job::Operation {
    type Error = FieldViolation;

    /// Build a domain `Operation` from a `google.longrunning.Operation`.
    ///
    /// Requires `metadata` to be present and to carry a
    /// `management::OperationMetadata` payload (checked by type URL before
    /// decoding).
    fn try_from(operation: longrunning::Operation) -> Result<Self, Self::Error> {
        let metadata: Any = operation
            .metadata
            .ok_or_else(|| FieldViolation::required("metadata"))?;

        // Reject payloads of any other protobuf type.
        if !protobuf_type_url_eq(&metadata.type_url, management::OPERATION_METADATA) {
            return Err(FieldViolation {
                field: "metadata.type_url".to_string(),
                description: "Unexpected field type".to_string(),
            });
        }

        let meta: management::OperationMetadata =
            prost::Message::decode(metadata.value).field("metadata.value")?;

        // Map the longrunning result oneof onto the coarse status enum:
        // no result yet => Running; any response => Complete; an error
        // status is Cancelled only for the gRPC `Cancelled` code, and
        // Errored otherwise.
        let status = match &operation.result {
            None => OperationStatus::Running,
            Some(longrunning::operation::Result::Response(_)) => OperationStatus::Complete,
            Some(longrunning::operation::Result::Error(status)) => {
                if status.code == tonic::Code::Cancelled as i32 {
                    OperationStatus::Cancelled
                } else {
                    OperationStatus::Errored
                }
            }
        };

        Ok(Self {
            // The operation id is parsed from the operation's name.
            id: operation.name.parse().field("name")?,
            task_count: meta.task_count,
            pending_count: meta.pending_count,
            wall_time: std::time::Duration::from_nanos(meta.wall_nanos),
            cpu_time: std::time::Duration::from_nanos(meta.cpu_nanos),
            job: meta.job.map(Into::into),
            status,
        })
    }
}

View File

@ -2,13 +2,6 @@
// crates because of all the generated code it contains that we don't have much
// control over.
#![deny(broken_intra_doc_links)]
#![allow(
unused_imports,
clippy::redundant_static_lifetimes,
clippy::redundant_closure,
clippy::redundant_field_names,
clippy::clone_on_ref_ptr
)]
/// This module imports the generated protobuf code into a Rust module
/// hierarchy that matches the namespace hierarchy of the protobuf
@ -99,8 +92,7 @@ pub fn protobuf_type_url(protobuf_type: &str) -> String {
/// Protobuf file descriptor containing all generated types.
/// Useful in gRPC reflection.
pub const FILE_DESCRIPTOR_SET: &'static [u8] =
tonic::include_file_descriptor_set!("proto_descriptor");
pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("proto_descriptor");
/// Compares the protobuf type URL found within a google.protobuf.Any
/// message to an expected Protobuf package and message name
@ -124,7 +116,10 @@ pub fn protobuf_type_url_eq(url: &str, protobuf_type: &str) -> bool {
pub use com::github::influxdata::idpe::storage::read::*;
pub use influxdata::platform::storage::*;
pub mod chunk;
pub mod database_rules;
pub mod google;
pub mod job;
#[cfg(test)]
mod tests {

View File

@ -100,6 +100,7 @@ use crate::{
db::Db,
};
use data_types::database_rules::{NodeGroup, ShardId};
use generated_types::database_rules::{decode_database_rules, encode_database_rules};
use influxdb_iox_client::{connection::Builder, write};
use rand::seq::SliceRandom;
use std::collections::HashMap;
@ -145,7 +146,7 @@ pub enum Error {
IdNotSet,
#[snafu(display("error serializing configuration {}", source))]
ErrorSerializing {
source: data_types::database_rules::Error,
source: generated_types::database_rules::EncodeError,
},
#[snafu(display("error deserializing configuration {}", source))]
ErrorDeserializing { source: serde_json::Error },
@ -420,7 +421,7 @@ impl<M: ConnectionManager> Server<M> {
let location = object_store_path_for_database_config(&self.root_path()?, &rules.name);
let mut data = BytesMut::new();
rules.encode(&mut data).context(ErrorSerializing)?;
encode_database_rules(rules, &mut data).context(ErrorSerializing)?;
let len = data.len();
@ -486,7 +487,7 @@ impl<M: ConnectionManager> Server<M> {
let res = res.unwrap().freeze();
match DatabaseRules::decode(res) {
match decode_database_rules(res) {
Err(e) => {
error!("error parsing database config {:?} from store: {}", path, e)
}
@ -1058,7 +1059,7 @@ mod tests {
.unwrap()
.freeze();
let read_rules = DatabaseRules::decode(read_data).unwrap();
let read_rules = decode_database_rules(read_data).unwrap();
assert_eq!(rules, read_rules);

View File

@ -2,12 +2,9 @@ use std::convert::{TryFrom, TryInto};
use std::fmt::Debug;
use std::sync::Arc;
use data_types::{
database_rules::DatabaseRules, field_validation::FromFieldOpt, server_id::ServerId,
DatabaseName,
};
use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName};
use generated_types::google::{
AlreadyExists, FieldViolation, FieldViolationExt, InternalError, NotFound,
AlreadyExists, FieldViolation, FieldViolationExt, FromFieldOpt, InternalError, NotFound,
};
use generated_types::influxdata::iox::management::v1::*;
use observability_deps::tracing::info;