From 23c0110b320c36d3550cf8b2008d4d943ea68297 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 8 May 2023 15:33:32 -0400 Subject: [PATCH] feat: Create newtypes for different partition templates So that the different kinds aren't mixed up. Also extracts the logic having to do with which template takes precedence onto the PartitionTemplate type itself. --- Cargo.lock | 1 + data_types/Cargo.toml | 1 + data_types/src/lib.rs | 4 +- data_types/src/partition_template.rs | 58 ++++++++- ioxd_router/src/lib.rs | 4 +- mutable_batch_pb/tests/encode.rs | 4 +- router/src/dml_handlers/partitioner.rs | 119 +++++++++++-------- router/src/dml_handlers/schema_validation.rs | 11 +- router/tests/common/mod.rs | 4 +- 9 files changed, 143 insertions(+), 63 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0f21c689c2..2c14ce1b15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1417,6 +1417,7 @@ dependencies = [ "influxdb-line-protocol", "iox_time", "observability_deps", + "once_cell", "ordered-float 3.7.0", "percent-encoding", "proptest", diff --git a/data_types/Cargo.toml b/data_types/Cargo.toml index 972f509078..7de4c08e1d 100644 --- a/data_types/Cargo.toml +++ b/data_types/Cargo.toml @@ -11,6 +11,7 @@ croaring = "0.8.1" influxdb-line-protocol = { path = "../influxdb_line_protocol" } iox_time = { path = "../iox_time" } observability_deps = { path = "../observability_deps" } +once_cell = "1" ordered-float = "3" percent-encoding = "2.2.0" schema = { path = "../schema" } diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 2318ea5c6a..e2db8152a8 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -317,7 +317,7 @@ pub struct NamespaceSchema { /// None represents infinite duration (i.e. never drop data). pub retention_period_ns: Option, /// The optionally-specified partition template to use for writes in this namespace. 
- pub partition_template: Option<Arc<PartitionTemplate>>, + pub partition_template: Option<Arc<NamespacePartitionTemplateOverride>>, } impl NamespaceSchema { @@ -375,7 +375,7 @@ pub struct TableSchema { pub id: TableId, /// the table's partition template - pub partition_template: Option<Arc<PartitionTemplate>>, + pub partition_template: Option<Arc<TablePartitionTemplateOverride>>, /// the table's columns by their name pub columns: ColumnsByName, diff --git a/data_types/src/partition_template.rs b/data_types/src/partition_template.rs index fd79221b7f..2b50a6277c 100644 --- a/data_types/src/partition_template.rs +++ b/data_types/src/partition_template.rs @@ -1,3 +1,43 @@ +use once_cell::sync::Lazy; +use std::sync::Arc; + +/// A partition template specified by a namespace record. +#[derive(Debug, Eq, PartialEq, Clone)] +pub struct NamespacePartitionTemplateOverride(PartitionTemplate); + +impl NamespacePartitionTemplateOverride { + /// Create a new, immutable override for a namespace's partition template. + pub fn new(partition_template: PartitionTemplate) -> Self { + Self(partition_template) + } +} + +/// A partition template specified by a table record. +#[derive(Debug, Eq, PartialEq, Clone)] +pub struct TablePartitionTemplateOverride(PartitionTemplate); + +impl TablePartitionTemplateOverride { + /// Create a new, immutable override for a table's partition template. + pub fn new(partition_template: PartitionTemplate) -> Self { + Self(partition_template) + } +} + +/// A partition template specified as the default to be used in the absence of any overrides. +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +pub struct DefaultPartitionTemplate(&'static PartitionTemplate); + +impl Default for DefaultPartitionTemplate { + fn default() -> Self { + Self(&PARTITION_BY_DAY) + } +} + +/// The default partitioning scheme is by each day according to the "time" column. +pub static PARTITION_BY_DAY: Lazy<PartitionTemplate> = Lazy::new(|| PartitionTemplate { + parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())], +}); + /// `PartitionTemplate` is used to compute the partition key of each row that gets written. 
It can /// consist of a column name and its value or a formatted time. For columns that do not appear in /// the input row, a blank value is output. @@ -10,11 +50,19 @@ pub struct PartitionTemplate { pub parts: Vec<TemplatePart>, } -impl Default for PartitionTemplate { - fn default() -> Self { - Self { - parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())], - } +impl PartitionTemplate { + /// If the table has a partition template, use that. Otherwise, if the namespace has a + /// partition template, use that. If neither the table nor the namespace has a template, + /// use the default template. + pub fn determine_precedence<'a>( + table: Option<&'a Arc<TablePartitionTemplateOverride>>, + namespace: Option<&'a Arc<NamespacePartitionTemplateOverride>>, + default: &'a DefaultPartitionTemplate, + ) -> &'a PartitionTemplate { + table + .map(|t| &t.0) + .or(namespace.map(|n| &n.0)) + .unwrap_or(default.0) } } diff --git a/ioxd_router/src/lib.rs b/ioxd_router/src/lib.rs index 2b4b6977cc..935ee630a0 100644 --- a/ioxd_router/src/lib.rs +++ b/ioxd_router/src/lib.rs @@ -6,7 +6,7 @@ use std::{ use async_trait::async_trait; use authz::{Authorizer, IoxAuthorizer}; use clap_blocks::router2::Router2Config; -use data_types::{NamespaceName, PartitionTemplate}; +use data_types::{DefaultPartitionTemplate, NamespaceName}; use hashbrown::HashMap; use hyper::{Body, Request, Response}; use iox_catalog::interface::Catalog; @@ -258,7 +258,7 @@ pub async fn create_router2_server_type( // // Add a write partitioner into the handler stack that splits by the date // portion of the write's timestamp (the default [`PartitionTemplate`]). 
- let partitioner = Partitioner::new(PartitionTemplate::default()); + let partitioner = Partitioner::new(DefaultPartitionTemplate::default()); let partitioner = InstrumentationDecorator::new("partitioner", &metrics, partitioner); // # Namespace resolver diff --git a/mutable_batch_pb/tests/encode.rs b/mutable_batch_pb/tests/encode.rs index 8d0f4c5c12..2f980bea69 100644 --- a/mutable_batch_pb/tests/encode.rs +++ b/mutable_batch_pb/tests/encode.rs @@ -1,5 +1,5 @@ use arrow_util::assert_batches_eq; -use data_types::{PartitionKey, PartitionTemplate}; +use data_types::{PartitionKey, PARTITION_BY_DAY}; use mutable_batch::{writer::Writer, MutableBatch, PartitionWrite, WritePayload}; use mutable_batch_pb::{decode::write_table_batch, encode::encode_batch}; use schema::Projection; @@ -120,7 +120,7 @@ fn test_encode_decode_null_columns_issue_4272() { .unwrap(); writer.commit(); - let mut partitions = PartitionWrite::partition(&batch, &PartitionTemplate::default()); + let mut partitions = PartitionWrite::partition(&batch, &PARTITION_BY_DAY); // There should be two partitions, one with for the timestamp 160, and // one for the other timestamp. 
diff --git a/router/src/dml_handlers/partitioner.rs b/router/src/dml_handlers/partitioner.rs index 0ddb4c2d28..4079412d9d 100644 --- a/router/src/dml_handlers/partitioner.rs +++ b/router/src/dml_handlers/partitioner.rs @@ -1,5 +1,8 @@ use async_trait::async_trait; -use data_types::{NamespaceName, NamespaceSchema, PartitionKey, PartitionTemplate, TableId}; +use data_types::{ + DefaultPartitionTemplate, NamespaceName, NamespaceSchema, PartitionKey, PartitionTemplate, + TableId, TablePartitionTemplateOverride, +}; use hashbrown::HashMap; use mutable_batch::{MutableBatch, PartitionWrite, WritePayload}; use observability_deps::tracing::*; @@ -43,19 +46,19 @@ impl Partitioned { /// A [`DmlHandler`] implementation that splits per-table [`MutableBatch`] into /// partitioned per-table [`MutableBatch`] instances according to a configured -/// [`PartitionTemplate`]. Deletes pass through unmodified. +/// [`DefaultPartitionTemplate`]. Deletes pass through unmodified. /// /// A vector of partitions are returned to the caller, or the first error that /// occurs during partitioning. #[derive(Debug)] pub struct Partitioner { - partition_template: Arc, + partition_template: Arc, } impl Partitioner { /// Initialise a new [`Partitioner`], splitting writes according to the - /// specified [`PartitionTemplate`]. - pub fn new(partition_template: PartitionTemplate) -> Self { + /// specified [`DefaultPartitionTemplate`]. + pub fn new(partition_template: DefaultPartitionTemplate) -> Self { Self { partition_template: Arc::new(partition_template), } @@ -66,7 +69,14 @@ impl Partitioner { impl DmlHandler for Partitioner { type WriteError = PartitionError; - type WriteInput = HashMap>, MutableBatch)>; + type WriteInput = HashMap< + TableId, + ( + String, + Option>, + MutableBatch, + ), + >; type WriteOutput = Vec>>; /// Partition the per-table [`MutableBatch`]. 
@@ -85,14 +95,12 @@ impl DmlHandler for Partitioner { for (table_id, (table_name, table_partition_template, batch)) in batch { // Partition the table batch according to the configured partition // template and write it into the partition-keyed map. - // If the table has a partition template, use that. Otherwise, if the namespace has a - // partition template, use that. If neither the table nor the namespace has a template, - // use the partitioner's template. - let partition_template = table_partition_template - .as_ref() - .or(namespace_partition_template.as_ref()) - .unwrap_or(&self.partition_template); + let partition_template = PartitionTemplate::determine_precedence( + table_partition_template.as_ref(), + namespace_partition_template.as_ref(), + &self.partition_template, + ); for (partition_key, partition_payload) in PartitionWrite::partition(&batch, partition_template) @@ -119,14 +127,21 @@ impl DmlHandler for Partitioner { #[cfg(test)] mod tests { use assert_matches::assert_matches; - use data_types::{NamespaceId, TemplatePart}; + use data_types::{NamespaceId, NamespacePartitionTemplateOverride, TemplatePart}; use super::*; // Parse `lp` into a table-keyed MutableBatch map. pub(crate) fn lp_to_writes( lp: &str, - ) -> HashMap>, MutableBatch)> { + ) -> HashMap< + TableId, + ( + String, + Option>, + MutableBatch, + ), + > { let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42) .expect("failed to build test writes from LP"); @@ -141,7 +156,7 @@ mod tests { // rest of the fields are arbitrary. fn namespace_schema( id: i64, - partition_template: Option>, + partition_template: Option>, ) -> Arc { Arc::new(NamespaceSchema { id: NamespaceId::new(id), @@ -167,7 +182,7 @@ mod tests { paste::paste! 
{ #[tokio::test] async fn []() { - let partition_template = PartitionTemplate::default(); + let partition_template = DefaultPartitionTemplate::default(); let partitioner = Partitioner::new(partition_template); let ns = NamespaceName::new("bananas").expect("valid db name"); @@ -324,16 +339,18 @@ mod tests { #[tokio::test] async fn test_write_namespace_partition_template() { - let partitioner = Partitioner::new(PartitionTemplate::default()); + let partitioner = Partitioner::new(DefaultPartitionTemplate::default()); let ns = NamespaceName::new("bananas").expect("valid db name"); - let namespace_partition_template = Some(Arc::new(PartitionTemplate { - parts: vec![ - TemplatePart::TimeFormat("%Y".to_string()), - TemplatePart::Column("tag1".to_string()), - TemplatePart::Column("nonanas".to_string()), - ], - })); + let namespace_partition_template = Some(Arc::new(NamespacePartitionTemplateOverride::new( + PartitionTemplate { + parts: vec![ + TemplatePart::TimeFormat("%Y".to_string()), + TemplatePart::Column("tag1".to_string()), + TemplatePart::Column("nonanas".to_string()), + ], + }, + ))); let namespace_schema = namespace_schema(42, namespace_partition_template); let writes = lp_to_writes( @@ -386,25 +403,29 @@ mod tests { #[tokio::test] async fn test_write_namespace_and_table_partition_template() { - let partitioner = Partitioner::new(PartitionTemplate::default()); + let partitioner = Partitioner::new(DefaultPartitionTemplate::default()); let ns = NamespaceName::new("bananas").expect("valid db name"); // Specify this but the table partition will take precedence for bananas. 
- let namespace_partition_template = Some(Arc::new(PartitionTemplate { - parts: vec![ - TemplatePart::TimeFormat("%Y".to_string()), - TemplatePart::Column("tag1".to_string()), - TemplatePart::Column("nonanas".to_string()), - ], - })); + let namespace_partition_template = Some(Arc::new(NamespacePartitionTemplateOverride::new( + PartitionTemplate { + parts: vec![ + TemplatePart::TimeFormat("%Y".to_string()), + TemplatePart::Column("tag1".to_string()), + TemplatePart::Column("nonanas".to_string()), + ], + }, + ))); let namespace_schema = namespace_schema(42, namespace_partition_template); - let bananas_table_template = Some(Arc::new(PartitionTemplate { - parts: vec![ - TemplatePart::Column("oranges".to_string()), - TemplatePart::TimeFormat("%Y-%m".to_string()), - TemplatePart::Column("tag2".to_string()), - ], - })); + let bananas_table_template = Some(Arc::new(TablePartitionTemplateOverride::new( + PartitionTemplate { + parts: vec![ + TemplatePart::Column("oranges".to_string()), + TemplatePart::TimeFormat("%Y-%m".to_string()), + TemplatePart::Column("tag2".to_string()), + ], + }, + ))); let lp = " bananas,tag1=A,tag2=C val=42i 1\n\ @@ -473,19 +494,21 @@ mod tests { #[tokio::test] async fn test_write_only_table_partition_template() { - let partitioner = Partitioner::new(PartitionTemplate::default()); + let partitioner = Partitioner::new(DefaultPartitionTemplate::default()); let ns = NamespaceName::new("bananas").expect("valid db name"); // No namespace partition means the platanos table will fall back to the default let namespace_schema = namespace_schema(42, None); - let bananas_table_template = Some(Arc::new(PartitionTemplate { - parts: vec![ - TemplatePart::Column("oranges".to_string()), - TemplatePart::TimeFormat("%Y-%m".to_string()), - TemplatePart::Column("tag2".to_string()), - ], - })); + let bananas_table_template = Some(Arc::new(TablePartitionTemplateOverride::new( + PartitionTemplate { + parts: vec![ + TemplatePart::Column("oranges".to_string()), + 
TemplatePart::TimeFormat("%Y-%m".to_string()), + TemplatePart::Column("tag2".to_string()), + ], + }, + ))); let lp = " bananas,tag1=A,tag2=C val=42i 1\n\ diff --git a/router/src/dml_handlers/schema_validation.rs b/router/src/dml_handlers/schema_validation.rs index 6461b09dc0..d61ea43d4c 100644 --- a/router/src/dml_handlers/schema_validation.rs +++ b/router/src/dml_handlers/schema_validation.rs @@ -1,7 +1,7 @@ use std::{ops::DerefMut, sync::Arc}; use async_trait::async_trait; -use data_types::{NamespaceName, NamespaceSchema, PartitionTemplate, TableId}; +use data_types::{NamespaceName, NamespaceSchema, TableId, TablePartitionTemplateOverride}; use hashbrown::HashMap; use iox_catalog::{ interface::{Catalog, Error as CatalogError}, @@ -145,7 +145,14 @@ where // Accepts a map of TableName -> MutableBatch type WriteInput = HashMap; // And returns a map of TableId -> (TableName, OptionalTablePartitionTemplate, MutableBatch) - type WriteOutput = HashMap>, MutableBatch)>; + type WriteOutput = HashMap< + TableId, + ( + String, + Option>, + MutableBatch, + ), + >; /// Validate the schema of all the writes in `batches`. /// diff --git a/router/tests/common/mod.rs b/router/tests/common/mod.rs index 07517a6f1a..a0ac073115 100644 --- a/router/tests/common/mod.rs +++ b/router/tests/common/mod.rs @@ -1,6 +1,6 @@ use std::{iter, string::String, sync::Arc, time::Duration}; -use data_types::{PartitionTemplate, TableId}; +use data_types::{DefaultPartitionTemplate, TableId}; use generated_types::influxdata::iox::ingester::v1::WriteRequest; use hashbrown::HashMap; use hyper::{Body, Request, Response}; @@ -149,7 +149,7 @@ impl TestContext { let retention_validator = RetentionValidator::new(); - let partitioner = Partitioner::new(PartitionTemplate::default()); + let partitioner = Partitioner::new(DefaultPartitionTemplate::default()); let namespace_resolver = NamespaceSchemaResolver::new(Arc::clone(&ns_cache)); let namespace_resolver = NamespaceAutocreation::new(