diff --git a/data_types/src/database_rules.rs b/data_types/src/database_rules.rs
index c11160d4e1..f66ba8ad93 100644
--- a/data_types/src/database_rules.rs
+++ b/data_types/src/database_rules.rs
@@ -1,9 +1,19 @@
-use influxdb_line_protocol::ParsedLine;
+use std::convert::{TryFrom, TryInto};
 
 use chrono::{DateTime, TimeZone, Utc};
 use serde::{Deserialize, Serialize};
 use snafu::Snafu;
 
+use generated_types::google::protobuf::Empty;
+use generated_types::{
+    google::{FieldViolation, FieldViolationExt},
+    influxdata::iox::management::v1 as management,
+};
+use influxdb_line_protocol::ParsedLine;
+
+use crate::field_validation::{FromField, FromFieldOpt, FromFieldString, FromFieldVec};
+use crate::DatabaseName;
+
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("Error in {}: {}", source_module, source))]
@@ -22,7 +32,7 @@ pub struct DatabaseRules {
     /// The unencoded name of the database. This gets put in by the create
     /// database call, so an empty default is fine.
     #[serde(default)]
-    pub name: String,
+    pub name: String, // TODO: Use DatabaseName here
     /// Template that generates a partition key for each row inserted into the
     /// db
     #[serde(default)]
@@ -137,6 +147,82 @@ impl Partitioner for DatabaseRules {
     }
 }
 
+impl From<DatabaseRules> for management::DatabaseRules {
+    fn from(rules: DatabaseRules) -> Self {
+        let subscriptions: Vec<management::subscription_config::Subscription> =
+            rules.subscriptions.into_iter().map(Into::into).collect();
+
+        let replication_config = management::ReplicationConfig {
+            replications: rules.replication,
+            replication_count: rules.replication_count as _,
+            replication_queue_max_size: rules.replication_queue_max_size as _,
+        };
+
+        let query_config = management::QueryConfig {
+            query_local: rules.query_local,
+            primary: rules.primary_query_group.unwrap_or_default(),
+            secondaries: rules.secondary_query_groups,
+            read_only_partitions: rules.read_only_partitions,
+        };
+
+        Self {
+            name: rules.name,
+            partition_template: Some(rules.partition_template.into()),
+            replication_config: Some(replication_config),
+            subscription_config: Some(management::SubscriptionConfig { subscriptions }),
+            query_config: Some(query_config),
+            wal_buffer_config: rules.wal_buffer_config.map(Into::into),
+            mutable_buffer_config: rules.mutable_buffer_config.map(Into::into),
+        }
+    }
+}
+
+impl TryFrom<management::DatabaseRules> for DatabaseRules {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::DatabaseRules) -> Result<Self, Self::Error> {
+        DatabaseName::new(&proto.name).field("name")?;
+
+        let subscriptions = proto
+            .subscription_config
+            .map(|s| {
+                s.subscriptions
+                    .vec_field("subscription_config.subscriptions")
+            })
+            .transpose()?
+            .unwrap_or_default();
+
+        let wal_buffer_config = proto.wal_buffer_config.optional("wal_buffer_config")?;
+
+        let mutable_buffer_config = proto
+            .mutable_buffer_config
+            .optional("mutable_buffer_config")?;
+
+        let partition_template = proto
+            .partition_template
+            .optional("partition_template")?
+            .unwrap_or_default();
+
+        let query = proto.query_config.unwrap_or_default();
+        let replication = proto.replication_config.unwrap_or_default();
+
+        Ok(Self {
+            name: proto.name,
+            partition_template,
+            replication: replication.replications,
+            replication_count: replication.replication_count as _,
+            replication_queue_max_size: replication.replication_queue_max_size as _,
+            subscriptions,
+            query_local: query.query_local,
+            primary_query_group: query.primary.optional(),
+            secondary_query_groups: query.secondaries,
+            read_only_partitions: query.read_only_partitions,
+            wal_buffer_config,
+            mutable_buffer_config,
+        })
+    }
+}
+
 /// MutableBufferConfig defines the configuration for the in-memory database
 /// that is hot for writes as they arrive. Operators can define rules for
 /// evicting data once the mutable buffer passes a set memory threshold.
@@ -190,6 +276,47 @@ impl Default for MutableBufferConfig {
     }
 }
 
+impl From<MutableBufferConfig> for management::MutableBufferConfig {
+    fn from(config: MutableBufferConfig) -> Self {
+        Self {
+            buffer_size: config.buffer_size as _,
+            reject_if_not_persisted: config.reject_if_not_persisted,
+            partition_drop_order: Some(config.partition_drop_order.into()),
+            persist_after_cold_seconds: config.persist_after_cold_seconds.unwrap_or_default(),
+        }
+    }
+}
+
+impl TryFrom<management::MutableBufferConfig> for MutableBufferConfig {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::MutableBufferConfig) -> Result<Self, Self::Error> {
+        let partition_drop_order = proto
+            .partition_drop_order
+            .optional("partition_drop_order")?
+            .unwrap_or_default();
+
+        let buffer_size = if proto.buffer_size == 0 {
+            DEFAULT_MUTABLE_BUFFER_SIZE
+        } else {
+            proto.buffer_size as usize
+        };
+
+        let persist_after_cold_seconds = if proto.persist_after_cold_seconds == 0 {
+            None
+        } else {
+            Some(proto.persist_after_cold_seconds)
+        };
+
+        Ok(Self {
+            buffer_size,
+            reject_if_not_persisted: proto.reject_if_not_persisted,
+            partition_drop_order,
+            persist_after_cold_seconds,
+        })
+    }
+}
+
 /// This struct specifies the rules for the order to sort partitions
 /// from the mutable buffer. This is used to determine which order to drop them
 /// in. The last partition in the list will be dropped, until enough space has
@@ -204,7 +331,7 @@ impl Default for MutableBufferConfig {
 ///     sort: PartitionSort::CreatedAtTime,
 /// };
 /// ```
-#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
+#[derive(Debug, Default, Serialize, Deserialize, Eq, PartialEq, Clone)]
 pub struct PartitionSortRules {
     /// Sort partitions by this order. Last will be dropped.
     pub order: Order,
@@ -213,6 +340,30 @@ pub struct PartitionSortRules {
     pub sort: PartitionSort,
 }
 
+impl From<PartitionSortRules> for management::mutable_buffer_config::PartitionDropOrder {
+    fn from(ps: PartitionSortRules) -> Self {
+        let order: management::Order = ps.order.into();
+
+        Self {
+            order: order as _,
+            sort: Some(ps.sort.into()),
+        }
+    }
+}
+
+impl TryFrom<management::mutable_buffer_config::PartitionDropOrder> for PartitionSortRules {
+    type Error = FieldViolation;
+
+    fn try_from(
+        proto: management::mutable_buffer_config::PartitionDropOrder,
+    ) -> Result<Self, Self::Error> {
+        Ok(Self {
+            order: proto.order().scope("order")?,
+            sort: proto.sort.optional("sort")?.unwrap_or_default(),
+        })
+    }
+}
+
 /// What to sort the partition by.
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
 pub enum PartitionSort {
@@ -235,6 +386,57 @@ pub enum PartitionSort {
     Column(String, ColumnType, ColumnValue),
 }
 
+impl Default for PartitionSort {
+    fn default() -> Self {
+        Self::LastWriteTime
+    }
+}
+
+impl From<PartitionSort> for management::mutable_buffer_config::partition_drop_order::Sort {
+    fn from(ps: PartitionSort) -> Self {
+        use management::mutable_buffer_config::partition_drop_order::ColumnSort;
+
+        match ps {
+            PartitionSort::LastWriteTime => Self::LastWriteTime(Empty {}),
+            PartitionSort::CreatedAtTime => Self::CreatedAtTime(Empty {}),
+            PartitionSort::Column(column_name, column_type, column_value) => {
+                let column_type: management::ColumnType = column_type.into();
+                let column_value: management::Aggregate = column_value.into();
+
+                Self::Column(ColumnSort {
+                    column_name,
+                    column_type: column_type as _,
+                    column_value: column_value as _,
+                })
+            }
+        }
+    }
+}
+
+impl TryFrom<management::mutable_buffer_config::partition_drop_order::Sort> for PartitionSort {
+    type Error = FieldViolation;
+
+    fn try_from(
+        proto: management::mutable_buffer_config::partition_drop_order::Sort,
+    ) -> Result<Self, Self::Error> {
+        use management::mutable_buffer_config::partition_drop_order::Sort;
+
+        Ok(match proto {
+            Sort::LastWriteTime(_) => Self::LastWriteTime,
+            Sort::CreatedAtTime(_) => Self::CreatedAtTime,
+            Sort::Column(column_sort) => {
+                let column_type = column_sort.column_type().scope("column.column_type")?;
+                let column_value = column_sort.column_value().scope("column.column_value")?;
+                Self::Column(
+                    column_sort.column_name.required("column.column_name")?,
+                    column_type,
+                    column_value,
+                )
+            }
+        })
+    }
+}
+
 /// The sort order.
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
 pub enum Order {
@@ -242,6 +444,33 @@ pub enum Order {
     Desc,
 }
 
+impl Default for Order {
+    fn default() -> Self {
+        Self::Asc
+    }
+}
+
+impl From<Order> for management::Order {
+    fn from(o: Order) -> Self {
+        match o {
+            Order::Asc => Self::Asc,
+            Order::Desc => Self::Desc,
+        }
+    }
+}
+
+impl TryFrom<management::Order> for Order {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::Order) -> Result<Self, Self::Error> {
+        Ok(match proto {
+            management::Order::Unspecified => Self::default(),
+            management::Order::Asc => Self::Asc,
+            management::Order::Desc => Self::Desc,
+        })
+    }
+}
+
 /// Use columns of this type.
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
 pub enum ColumnType {
@@ -252,6 +481,33 @@ pub enum ColumnType {
     Bool,
 }
 
+impl From<ColumnType> for management::ColumnType {
+    fn from(t: ColumnType) -> Self {
+        match t {
+            ColumnType::I64 => Self::I64,
+            ColumnType::U64 => Self::U64,
+            ColumnType::F64 => Self::F64,
+            ColumnType::String => Self::String,
+            ColumnType::Bool => Self::Bool,
+        }
+    }
+}
+
+impl TryFrom<management::ColumnType> for ColumnType {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::ColumnType) -> Result<Self, Self::Error> {
+        Ok(match proto {
+            management::ColumnType::Unspecified => return Err(FieldViolation::required("")),
+            management::ColumnType::I64 => Self::I64,
+            management::ColumnType::U64 => Self::U64,
+            management::ColumnType::F64 => Self::F64,
+            management::ColumnType::String => Self::String,
+            management::ColumnType::Bool => Self::Bool,
+        })
+    }
+}
+
 /// Use either the min or max summary statistic.
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
 pub enum ColumnValue {
@@ -259,6 +515,29 @@ pub enum ColumnValue {
     Max,
 }
 
+impl From<ColumnValue> for management::Aggregate {
+    fn from(v: ColumnValue) -> Self {
+        match v {
+            ColumnValue::Min => Self::Min,
+            ColumnValue::Max => Self::Max,
+        }
+    }
+}
+
+impl TryFrom<management::Aggregate> for ColumnValue {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::Aggregate) -> Result<Self, Self::Error> {
+        use management::Aggregate;
+
+        Ok(match proto {
+            Aggregate::Unspecified => return Err(FieldViolation::required("")),
+            Aggregate::Min => Self::Min,
+            Aggregate::Max => Self::Max,
+        })
+    }
+}
+
 /// WalBufferConfig defines the configuration for buffering data from the WAL in
 /// memory. This buffer is used for asynchronous replication and to collect
 /// segments before sending them to object storage.
@@ -294,6 +573,45 @@ pub struct WalBufferConfig {
     pub close_segment_after: Option<std::time::Duration>,
 }
 
+impl From<WalBufferConfig> for management::WalBufferConfig {
+    fn from(rollover: WalBufferConfig) -> Self {
+        let buffer_rollover: management::wal_buffer_config::Rollover =
+            rollover.buffer_rollover.into();
+
+        Self {
+            buffer_size: rollover.buffer_size,
+            segment_size: rollover.segment_size,
+            buffer_rollover: buffer_rollover as _,
+            persist_segments: rollover.store_segments,
+            close_segment_after: rollover.close_segment_after.map(Into::into),
+        }
+    }
+}
+
+impl TryFrom<management::WalBufferConfig> for WalBufferConfig {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::WalBufferConfig) -> Result<Self, Self::Error> {
+        let buffer_rollover = proto.buffer_rollover().scope("buffer_rollover")?;
+        let close_segment_after = proto
+            .close_segment_after
+            .map(TryInto::try_into)
+            .transpose()
+            .map_err(|_| FieldViolation {
+                field: "closeSegmentAfter".to_string(),
+                description: "Duration must be positive".to_string(),
+            })?;
+
+        Ok(Self {
+            buffer_size: proto.buffer_size,
+            segment_size: proto.segment_size,
+            buffer_rollover,
+            store_segments: proto.persist_segments,
+            close_segment_after,
+        })
+    }
+}
+
 /// WalBufferRollover defines the behavior of what should happen if a write
 /// comes in that would cause the buffer to exceed its max size AND the oldest
 /// segment can't be dropped because it has not yet been persisted.
@@ -311,6 +629,30 @@ pub enum WalBufferRollover {
     ReturnError,
 }
 
+impl From<WalBufferRollover> for management::wal_buffer_config::Rollover {
+    fn from(rollover: WalBufferRollover) -> Self {
+        match rollover {
+            WalBufferRollover::DropOldSegment => Self::DropOldSegment,
+            WalBufferRollover::DropIncoming => Self::DropIncoming,
+            WalBufferRollover::ReturnError => Self::ReturnError,
+        }
+    }
+}
+
+impl TryFrom<management::wal_buffer_config::Rollover> for WalBufferRollover {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::wal_buffer_config::Rollover) -> Result<Self, Self::Error> {
+        use management::wal_buffer_config::Rollover;
+        Ok(match proto {
+            Rollover::Unspecified => return Err(FieldViolation::required("")),
+            Rollover::DropOldSegment => Self::DropOldSegment,
+            Rollover::DropIncoming => Self::DropIncoming,
+            Rollover::ReturnError => Self::ReturnError,
+        })
+    }
+}
+
 /// `PartitionTemplate` is used to compute the partition key of each row that
 /// gets written. It can consist of the table name, a column name and its value,
 /// a formatted time, or a string column and regex captures of its value. For
@@ -353,6 +695,23 @@ impl PartitionTemplate {
     }
 }
 
+impl From<PartitionTemplate> for management::PartitionTemplate {
+    fn from(pt: PartitionTemplate) -> Self {
+        Self {
+            parts: pt.parts.into_iter().map(Into::into).collect(),
+        }
+    }
+}
+
+impl TryFrom<management::PartitionTemplate> for PartitionTemplate {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::PartitionTemplate) -> Result<Self, Self::Error> {
+        let parts = proto.parts.vec_field("parts")?;
+        Ok(Self { parts })
+    }
+}
+
 /// `TemplatePart` specifies what part of a row should be used to compute this
 /// part of a partition key.
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
@@ -380,6 +739,67 @@ pub struct StrftimeColumn {
     format: String,
 }
 
+impl From<TemplatePart> for management::partition_template::part::Part {
+    fn from(part: TemplatePart) -> Self {
+        use management::partition_template::part::ColumnFormat;
+
+        match part {
+            TemplatePart::Table => Self::Table(Empty {}),
+            TemplatePart::Column(column) => Self::Column(column),
+            TemplatePart::RegexCapture(RegexCapture { column, regex }) => {
+                Self::Regex(ColumnFormat {
+                    column,
+                    format: regex,
+                })
+            }
+            TemplatePart::StrftimeColumn(StrftimeColumn { column, format }) => {
+                Self::StrfTime(ColumnFormat { column, format })
+            }
+            TemplatePart::TimeFormat(format) => Self::Time(format),
+        }
+    }
+}
+
+impl TryFrom<management::partition_template::part::Part> for TemplatePart {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::partition_template::part::Part) -> Result<Self, Self::Error> {
+        use management::partition_template::part::{ColumnFormat, Part};
+
+        Ok(match proto {
+            Part::Table(_) => Self::Table,
+            Part::Column(column) => Self::Column(column.required("column")?),
+            Part::Regex(ColumnFormat { column, format }) => Self::RegexCapture(RegexCapture {
+                column: column.required("regex.column")?,
+                regex: format.required("regex.format")?,
+            }),
+            Part::StrfTime(ColumnFormat { column, format }) => {
+                Self::StrftimeColumn(StrftimeColumn {
+                    column: column.required("strf_time.column")?,
+                    format: format.required("strf_time.format")?,
+                })
+            }
+            Part::Time(format) => Self::TimeFormat(format.required("time")?),
+        })
+    }
+}
+
+impl From<TemplatePart> for management::partition_template::Part {
+    fn from(part: TemplatePart) -> Self {
+        Self {
+            part: Some(part.into()),
+        }
+    }
+}
+
+impl TryFrom<management::partition_template::Part> for TemplatePart {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::partition_template::Part) -> Result<Self, Self::Error> {
+        proto.part.required("part")
+    }
+}
+
 /// `PartitionId` is the object storage identifier for a specific partition. It
 /// should be a path that can be used against an object store to locate all the
 /// files and subdirectories for a partition. It takes the form of `/<writer
@@ -403,9 +823,31 @@ pub struct Subscription {
     pub matcher: Matcher,
 }
 
+impl From<Subscription> for management::subscription_config::Subscription {
+    fn from(s: Subscription) -> Self {
+        Self {
+            name: s.name,
+            host_group_id: s.host_group_id,
+            matcher: Some(s.matcher.into()),
+        }
+    }
+}
+
+impl TryFrom<management::subscription_config::Subscription> for Subscription {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::subscription_config::Subscription) -> Result<Self, Self::Error> {
+        Ok(Self {
+            name: proto.name.required("name")?,
+            host_group_id: proto.host_group_id.required("host_group_id")?,
+            matcher: proto.matcher.optional("matcher")?.unwrap_or_default(),
+        })
+    }
+}
+
 /// `Matcher` specifies the rule against the table name and/or a predicate
 /// against the row to determine if it matches the write rule.
-#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
+#[derive(Debug, Default, Serialize, Deserialize, Eq, PartialEq, Clone)]
 pub struct Matcher {
     pub tables: MatchTables,
     // TODO: make this work with query::Predicate
@@ -413,6 +855,26 @@ pub struct Matcher {
     pub predicate: Option<String>,
 }
 
+impl From<Matcher> for management::Matcher {
+    fn from(m: Matcher) -> Self {
+        Self {
+            predicate: m.predicate.unwrap_or_default(),
+            table_matcher: Some(m.tables.into()),
+        }
+    }
+}
+
+impl TryFrom<management::Matcher> for Matcher {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::Matcher) -> Result<Self, Self::Error> {
+        Ok(Self {
+            tables: proto.table_matcher.required("table_matcher")?,
+            predicate: proto.predicate.optional(),
+        })
+    }
+}
+
 /// `MatchTables` looks at the table name of a row to determine if it should
 /// match the rule.
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
@@ -424,6 +886,35 @@ pub enum MatchTables {
     Regex(String),
 }
 
+impl Default for MatchTables {
+    fn default() -> Self {
+        Self::All
+    }
+}
+
+impl From<MatchTables> for management::matcher::TableMatcher {
+    fn from(m: MatchTables) -> Self {
+        match m {
+            MatchTables::All => Self::All(Empty {}),
+            MatchTables::Table(table) => Self::Table(table),
+            MatchTables::Regex(regex) => Self::Regex(regex),
+        }
+    }
+}
+
+impl TryFrom<management::matcher::TableMatcher> for MatchTables {
+    type Error = FieldViolation;
+
+    fn try_from(proto: management::matcher::TableMatcher) -> Result<Self, Self::Error> {
+        use management::matcher::TableMatcher;
+        Ok(match proto {
+            TableMatcher::All(_) => Self::All,
+            TableMatcher::Table(table) => Self::Table(table.required("table_matcher.table")?),
+            TableMatcher::Regex(regex) => Self::Regex(regex.required("table_matcher.regex")?),
+        })
+    }
+}
+
 pub type HostGroupId = String;
 
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
@@ -435,9 +926,10 @@ pub struct HostGroup {
 
 #[cfg(test)]
 mod tests {
-    use super::*;
     use influxdb_line_protocol::parse_lines;
 
+    use super::*;
+
     type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
     type Result<T = (), E = TestError> = std::result::Result<T, E>;
 
@@ -598,4 +1090,440 @@ mod tests {
     fn parse_line(line: &str) -> ParsedLine<'_> {
         parsed_lines(line).pop().unwrap()
     }
+
+    #[test]
+    fn test_database_rules_defaults() {
+        let protobuf = management::DatabaseRules {
+            name: "database".to_string(),
+            ..Default::default()
+        };
+
+        let rules: DatabaseRules = protobuf.clone().try_into().unwrap();
+        let back: management::DatabaseRules = rules.clone().into();
+
+        assert_eq!(rules.name, protobuf.name);
+        assert_eq!(protobuf.name, back.name);
+
+        assert_eq!(rules.partition_template.parts.len(), 0);
+        assert_eq!(rules.subscriptions.len(), 0);
+        assert!(rules.primary_query_group.is_none());
+        assert_eq!(rules.read_only_partitions.len(), 0);
+        assert_eq!(rules.secondary_query_groups.len(), 0);
+
+        // These will be defaulted as optionality not preserved on non-protobuf
+        // DatabaseRules
+        assert_eq!(back.replication_config, Some(Default::default()));
+        assert_eq!(back.subscription_config, Some(Default::default()));
+        assert_eq!(back.query_config, Some(Default::default()));
+        assert_eq!(back.partition_template, Some(Default::default()));
+
+        // These should be none as preserved on non-protobuf DatabaseRules
+        assert!(back.wal_buffer_config.is_none());
+        assert!(back.mutable_buffer_config.is_none());
+    }
+
+    #[test]
+    fn test_database_rules_query() {
+        let readonly = vec!["readonly1".to_string(), "readonly2".to_string()];
+        let secondaries = vec!["secondary1".to_string(), "secondary2".to_string()];
+
+        let protobuf = management::DatabaseRules {
+            name: "database".to_string(),
+            query_config: Some(management::QueryConfig {
+                query_local: true,
+                primary: "primary".to_string(),
+                secondaries: secondaries.clone(),
+                read_only_partitions: readonly.clone(),
+            }),
+            ..Default::default()
+        };
+
+        let rules: DatabaseRules = protobuf.clone().try_into().unwrap();
+        let back: management::DatabaseRules = rules.clone().into();
+
+        assert_eq!(rules.name, protobuf.name);
+        assert_eq!(protobuf.name, back.name);
+
+        assert_eq!(rules.read_only_partitions, readonly);
+        assert_eq!(rules.primary_query_group, Some("primary".to_string()));
+        assert_eq!(rules.secondary_query_groups, secondaries);
+        assert_eq!(rules.subscriptions.len(), 0);
+        assert_eq!(rules.partition_template.parts.len(), 0);
+
+        // Should be the same as was specified
+        assert_eq!(back.query_config, protobuf.query_config);
+        assert!(back.wal_buffer_config.is_none());
+        assert!(back.mutable_buffer_config.is_none());
+
+        // These will be defaulted as optionality not preserved on non-protobuf
+        // DatabaseRules
+        assert_eq!(back.replication_config, Some(Default::default()));
+        assert_eq!(back.subscription_config, Some(Default::default()));
+        assert_eq!(back.partition_template, Some(Default::default()));
+    }
+
+    #[test]
+    fn test_query_config_default() {
+        let protobuf = management::DatabaseRules {
+            name: "database".to_string(),
+            query_config: Some(Default::default()),
+            ..Default::default()
+        };
+
+        let rules: DatabaseRules = protobuf.clone().try_into().unwrap();
+        let back: management::DatabaseRules = rules.clone().into();
+
+        assert!(rules.primary_query_group.is_none());
+        assert_eq!(rules.secondary_query_groups.len(), 0);
+        assert_eq!(rules.read_only_partitions.len(), 0);
+        assert_eq!(rules.query_local, false);
+
+        assert_eq!(protobuf.query_config, back.query_config);
+    }
+
+    #[test]
+    fn test_partition_template_default() {
+        let protobuf = management::DatabaseRules {
+            name: "database".to_string(),
+            partition_template: Some(management::PartitionTemplate { parts: vec![] }),
+            ..Default::default()
+        };
+
+        let rules: DatabaseRules = protobuf.clone().try_into().unwrap();
+        let back: management::DatabaseRules = rules.clone().into();
+
+        assert_eq!(rules.partition_template.parts.len(), 0);
+        assert_eq!(protobuf.partition_template, back.partition_template);
+    }
+
+    #[test]
+    fn test_partition_template_no_part() {
+        let protobuf = management::DatabaseRules {
+            name: "database".to_string(),
+            partition_template: Some(management::PartitionTemplate {
+                parts: vec![Default::default()],
+            }),
+            ..Default::default()
+        };
+
+        let res: Result<DatabaseRules, _> = protobuf.try_into();
+        let err = res.expect_err("expected failure");
+
+        assert_eq!(&err.field, "partition_template.parts.0.part");
+        assert_eq!(&err.description, "Field is required");
+    }
+
+    #[test]
+    fn test_partition_template() {
+        use management::partition_template::part::{ColumnFormat, Part};
+
+        let protobuf = management::PartitionTemplate {
+            parts: vec![
+                management::partition_template::Part {
+                    part: Some(Part::Time("time".to_string())),
+                },
+                management::partition_template::Part {
+                    part: Some(Part::Table(Empty {})),
+                },
+                management::partition_template::Part {
+                    part: Some(Part::Regex(ColumnFormat {
+                        column: "column".to_string(),
+                        format: "format".to_string(),
+                    })),
+                },
+            ],
+        };
+
+        let pt: PartitionTemplate = protobuf.clone().try_into().unwrap();
+        let back: management::PartitionTemplate = pt.clone().into();
+
+        assert_eq!(
+            pt.parts,
+            vec![
+                TemplatePart::TimeFormat("time".to_string()),
+                TemplatePart::Table,
+                TemplatePart::RegexCapture(RegexCapture {
+                    column: "column".to_string(),
+                    regex: "format".to_string()
+                })
+            ]
+        );
+        assert_eq!(protobuf, back);
+    }
+
+    #[test]
+    fn test_partition_template_empty() {
+        use management::partition_template::part::{ColumnFormat, Part};
+
+        let protobuf = management::PartitionTemplate {
+            parts: vec![management::partition_template::Part {
+                part: Some(Part::Regex(ColumnFormat {
+                    ..Default::default()
+                })),
+            }],
+        };
+
+        let res: Result<PartitionTemplate, _> = protobuf.try_into();
+        let err = res.expect_err("expected failure");
+
+        assert_eq!(&err.field, "parts.0.part.regex.column");
+        assert_eq!(&err.description, "Field is required");
+    }
+
+    #[test]
+    fn test_wal_buffer_config_default() {
+        let protobuf: management::WalBufferConfig = Default::default();
+
+        let res: Result<WalBufferConfig, _> = protobuf.try_into();
+        let err = res.expect_err("expected failure");
+
+        assert_eq!(&err.field, "buffer_rollover");
+        assert_eq!(&err.description, "Field is required");
+    }
+
+    #[test]
+    fn test_wal_buffer_config_rollover() {
+        let protobuf = management::WalBufferConfig {
+            buffer_rollover: management::wal_buffer_config::Rollover::DropIncoming as _,
+            ..Default::default()
+        };
+
+        let config: WalBufferConfig = protobuf.clone().try_into().unwrap();
+        let back: management::WalBufferConfig = config.clone().into();
+
+        assert_eq!(config.buffer_rollover, WalBufferRollover::DropIncoming);
+        assert_eq!(protobuf, back);
+    }
+
+    #[test]
+    fn test_wal_buffer_config_negative_duration() {
+        use generated_types::google::protobuf::Duration;
+
+        let protobuf = management::WalBufferConfig {
+            buffer_rollover: management::wal_buffer_config::Rollover::DropOldSegment as _,
+            close_segment_after: Some(Duration {
+                seconds: -1,
+                nanos: -40,
+            }),
+            ..Default::default()
+        };
+
+        let res: Result<WalBufferConfig, _> = protobuf.try_into();
+        let err = res.expect_err("expected failure");
+
+        assert_eq!(&err.field, "closeSegmentAfter");
+        assert_eq!(&err.description, "Duration must be positive");
+    }
+
+    #[test]
+    fn test_matcher_default() {
+        let protobuf: management::Matcher = Default::default();
+
+        let res: Result<Matcher, _> = protobuf.try_into();
+        let err = res.expect_err("expected failure");
+
+        assert_eq!(&err.field, "table_matcher");
+        assert_eq!(&err.description, "Field is required");
+    }
+
+    #[test]
+    fn test_matcher() {
+        let protobuf = management::Matcher {
+            predicate: Default::default(),
+            table_matcher: Some(management::matcher::TableMatcher::Regex(
+                "regex".to_string(),
+            )),
+        };
+        let matcher: Matcher = protobuf.try_into().unwrap();
+
+        assert_eq!(matcher.tables, MatchTables::Regex("regex".to_string()));
+        assert!(matcher.predicate.is_none());
+    }
+
+    #[test]
+    fn test_subscription_default() {
+        let pb_matcher = Some(management::Matcher {
+            predicate: "predicate1".to_string(),
+            table_matcher: Some(management::matcher::TableMatcher::Table(
+                "table".to_string(),
+            )),
+        });
+
+        let matcher = Matcher {
+            tables: MatchTables::Table("table".to_string()),
+            predicate: Some("predicate1".to_string()),
+        };
+
+        let subscription_config = management::SubscriptionConfig {
+            subscriptions: vec![
+                management::subscription_config::Subscription {
+                    name: "subscription1".to_string(),
+                    host_group_id: "host group".to_string(),
+                    matcher: pb_matcher.clone(),
+                },
+                management::subscription_config::Subscription {
+                    name: "subscription2".to_string(),
+                    host_group_id: "host group".to_string(),
+                    matcher: pb_matcher,
+                },
+            ],
+        };
+
+        let protobuf = management::DatabaseRules {
+            name: "database".to_string(),
+            subscription_config: Some(subscription_config),
+            ..Default::default()
+        };
+
+        let rules: DatabaseRules = protobuf.clone().try_into().unwrap();
+        let back: management::DatabaseRules = rules.clone().into();
+
+        assert_eq!(protobuf.subscription_config, back.subscription_config);
+        assert_eq!(
+            rules.subscriptions,
+            vec![
+                Subscription {
+                    name: "subscription1".to_string(),
+                    host_group_id: "host group".to_string(),
+                    matcher: matcher.clone()
+                },
+                Subscription {
+                    name: "subscription2".to_string(),
+                    host_group_id: "host group".to_string(),
+                    matcher
+                }
+            ]
+        )
+    }
+
+    #[test]
+    fn mutable_buffer_config_default() {
+        let protobuf: management::MutableBufferConfig = Default::default();
+
+        let config: MutableBufferConfig = protobuf.try_into().unwrap();
+        let back: management::MutableBufferConfig = config.clone().into();
+
+        assert_eq!(config.buffer_size, DEFAULT_MUTABLE_BUFFER_SIZE);
+        assert_eq!(config.persist_after_cold_seconds, None);
+        assert_eq!(config.partition_drop_order, PartitionSortRules::default());
+        assert!(!config.reject_if_not_persisted);
+
+        assert_eq!(back.reject_if_not_persisted, config.reject_if_not_persisted);
+        assert_eq!(back.buffer_size as usize, config.buffer_size);
+        assert_eq!(
+            back.partition_drop_order,
+            Some(PartitionSortRules::default().into())
+        );
+        assert_eq!(back.persist_after_cold_seconds, 0);
+    }
+
+    #[test]
+    fn mutable_buffer_config() {
+        let protobuf = management::MutableBufferConfig {
+            buffer_size: 32,
+            reject_if_not_persisted: true,
+            partition_drop_order: Some(management::mutable_buffer_config::PartitionDropOrder {
+                order: management::Order::Desc as _,
+                sort: None,
+            }),
+            persist_after_cold_seconds: 439,
+        };
+
+        let config: MutableBufferConfig = protobuf.clone().try_into().unwrap();
+        let back: management::MutableBufferConfig = config.clone().into();
+
+        assert_eq!(config.buffer_size, protobuf.buffer_size as usize);
+        assert_eq!(
+            config.persist_after_cold_seconds,
+            Some(protobuf.persist_after_cold_seconds)
+        );
+        assert_eq!(config.partition_drop_order.order, Order::Desc);
+        assert!(config.reject_if_not_persisted);
+
+        assert_eq!(back.reject_if_not_persisted, config.reject_if_not_persisted);
+        assert_eq!(back.buffer_size as usize, config.buffer_size);
+        assert_eq!(
+            back.persist_after_cold_seconds,
+            protobuf.persist_after_cold_seconds
+        );
+    }
+
+    #[test]
+    fn partition_drop_order_default() {
+        let protobuf: management::mutable_buffer_config::PartitionDropOrder = Default::default();
+        let config: PartitionSortRules = protobuf.try_into().unwrap();
+
+        assert_eq!(config, PartitionSortRules::default());
+        assert_eq!(config.order, Order::default());
+        assert_eq!(config.sort, PartitionSort::default());
+    }
+
+    #[test]
+    fn partition_drop_order() {
+        use management::mutable_buffer_config::{partition_drop_order::Sort, PartitionDropOrder};
+        let protobuf = PartitionDropOrder {
+            order: management::Order::Asc as _,
+            sort: Some(Sort::CreatedAtTime(Empty {})),
+        };
+        let config: PartitionSortRules = protobuf.clone().try_into().unwrap();
+        let back: PartitionDropOrder = config.clone().into();
+
+        assert_eq!(protobuf, back);
+        assert_eq!(config.order, Order::Asc);
+        assert_eq!(config.sort, PartitionSort::CreatedAtTime);
+    }
+
+    #[test]
+    fn partition_sort() {
+        use management::mutable_buffer_config::partition_drop_order::{ColumnSort, Sort};
+
+        let created_at: PartitionSort = Sort::CreatedAtTime(Empty {}).try_into().unwrap();
+        let last_write: PartitionSort = Sort::LastWriteTime(Empty {}).try_into().unwrap();
+        let column: PartitionSort = Sort::Column(ColumnSort {
+            column_name: "column".to_string(),
+            column_type: management::ColumnType::Bool as _,
+            column_value: management::Aggregate::Min as _,
+        })
+        .try_into()
+        .unwrap();
+
+        assert_eq!(created_at, PartitionSort::CreatedAtTime);
+        assert_eq!(last_write, PartitionSort::LastWriteTime);
+        assert_eq!(
+            column,
+            PartitionSort::Column("column".to_string(), ColumnType::Bool, ColumnValue::Min)
+        );
+    }
+
+    #[test]
+    fn partition_sort_column_sort() {
+        use management::mutable_buffer_config::partition_drop_order::{ColumnSort, Sort};
+
+        let res: Result<PartitionSort, _> = Sort::Column(Default::default()).try_into();
+        let err1 = res.expect_err("expected failure");
+
+        let res: Result<PartitionSort, _> = Sort::Column(ColumnSort {
+            column_type: management::ColumnType::F64 as _,
+            ..Default::default()
+        })
+        .try_into();
+        let err2 = res.expect_err("expected failure");
+
+        let res: Result<PartitionSort, _> = Sort::Column(ColumnSort {
+            column_type: management::ColumnType::F64 as _,
+            column_value: management::Aggregate::Max as _,
+            ..Default::default()
+        })
+        .try_into();
+        let err3 = res.expect_err("expected failure");
+
+        assert_eq!(err1.field, "column.column_type");
+        assert_eq!(err1.description, "Field is required");
+
+        assert_eq!(err2.field, "column.column_value");
+        assert_eq!(err2.description, "Field is required");
+
+        assert_eq!(err3.field, "column.column_name");
+        assert_eq!(err3.description, "Field is required");
+    }
 }
diff --git a/data_types/src/field_validation.rs b/data_types/src/field_validation.rs
new file mode 100644
index 0000000000..49ae635a6d
--- /dev/null
+++ b/data_types/src/field_validation.rs
@@ -0,0 +1,112 @@
+//! A collection of extension traits for types that
+//! implement TryInto<U, Error=FieldViolation>
+//!
+//! Allows associating field context with the generated errors
+//! as they propagate up the struct topology
+
+use generated_types::google::FieldViolation;
+use std::convert::TryInto;
+
+/// An extension trait that adds the method `scope` to any type
+/// implementing `TryInto<U, Error = FieldViolation>`
+pub(crate) trait FromField<T> {
+    fn scope(self, field: impl Into<String>) -> Result<T, FieldViolation>;
+}
+
+impl<T, U> FromField<U> for T
+where
+    T: TryInto<U, Error = FieldViolation>,
+{
+    /// Try to convert type using TryInto calling `FieldViolation::scope`
+    /// on any returned error
+    fn scope(self, field: impl Into<String>) -> Result<U, FieldViolation> {
+        self.try_into().map_err(|e| e.scope(field))
+    }
+}
+
+/// An extension trait that adds the methods `optional` and `required` to any
+/// Option containing a type implementing `TryInto<U, Error = FieldViolation>`
+pub(crate) trait FromFieldOpt<T> {
+    /// Try to convert inner type, if any, using TryInto calling
+    /// `FieldViolation::scope` on any error encountered
+    ///
+    /// Returns None if empty
+    fn optional(self, field: impl Into<String>) -> Result<Option<T>, FieldViolation>;
+
+    /// Try to convert inner type, using TryInto calling `FieldViolation::scope`
+    /// on any error encountered
+    ///
+    /// Returns an error if empty
+    fn required(self, field: impl Into<String>) -> Result<T, FieldViolation>;
+}
+
+impl<T, U> FromFieldOpt<U> for Option<T>
+where
+    T: TryInto<U, Error = FieldViolation>,
+{
+    fn optional(self, field: impl Into<String>) -> Result<Option<U>, FieldViolation> {
+        self.map(|t| t.scope(field)).transpose()
+    }
+
+    fn required(self, field: impl Into<String>) -> Result<U, FieldViolation> {
+        match self {
+            None => Err(FieldViolation::required(field)),
+            Some(t) => t.scope(field),
+        }
+    }
+}
+
+/// An extension trait that adds the methods `optional` and `required` to any
+/// String
+///
+/// Prost will default string fields to empty, whereas IOx sometimes
+/// uses Option<String>, this helper aids mapping between them
+///
+/// TODO: Review mixed use of Option<String> and String in IOX
+pub(crate) trait FromFieldString {
+    /// Returns a Ok if the String is not empty
+    fn required(self, field: impl Into<String>) -> Result<String, FieldViolation>;
+
+    /// Wraps non-empty strings in Some(_), returns None for empty strings
+    fn optional(self) -> Option<String>;
+}
+
+impl FromFieldString for String {
+    fn required(self, field: impl Into<String>) -> Result<String, FieldViolation> {
+        if self.is_empty() {
+            return Err(FieldViolation::required(field));
+        }
+        Ok(self)
+    }
+
+    fn optional(self) -> Option<String> {
+        if self.is_empty() {
+            return None;
+        }
+        Some(self)
+    }
+}
+
+/// An extension trait that adds the method `vec_field` to any Vec of a type
+/// implementing `TryInto<U, Error = FieldViolation>`
+pub(crate) trait FromFieldVec<T> {
+    /// Converts to a `Vec<U>`, short-circuiting on the first error and
+    /// returning a correctly scoped `FieldViolation` for where the error
+    /// was encountered
+    fn vec_field(self, field: impl Into<String>) -> Result<T, FieldViolation>;
+}
+
+impl<T, U> FromFieldVec<Vec<U>> for Vec<T>
+where
+    T: TryInto<U, Error = FieldViolation>,
+{
+    fn vec_field(self, field: impl Into<String>) -> Result<Vec<U>, FieldViolation> {
+        let res: Result<_, _> = self
+            .into_iter()
+            .enumerate()
+            .map(|(i, t)| t.scope(i.to_string()))
+            .collect();
+
+        res.map_err(|e| e.scope(field))
+    }
+}
diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs
index 7cf358c180..d3766053b6 100644
--- a/data_types/src/lib.rs
+++ b/data_types/src/lib.rs
@@ -32,3 +32,5 @@ pub mod wal;
 
 mod database_name;
 pub use database_name::*;
+
+pub(crate) mod field_validation;