feat: Create newtypes for different partition templates
So that the different kinds aren't mixed up. Also extracts the logic having to do with which template takes precedence onto the PartitionTemplate type itself.pull/24376/head
parent
ebceabb608
commit
23c0110b32
|
@ -1417,6 +1417,7 @@ dependencies = [
|
|||
"influxdb-line-protocol",
|
||||
"iox_time",
|
||||
"observability_deps",
|
||||
"once_cell",
|
||||
"ordered-float 3.7.0",
|
||||
"percent-encoding",
|
||||
"proptest",
|
||||
|
|
|
@ -11,6 +11,7 @@ croaring = "0.8.1"
|
|||
influxdb-line-protocol = { path = "../influxdb_line_protocol" }
|
||||
iox_time = { path = "../iox_time" }
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
once_cell = "1"
|
||||
ordered-float = "3"
|
||||
percent-encoding = "2.2.0"
|
||||
schema = { path = "../schema" }
|
||||
|
|
|
@ -317,7 +317,7 @@ pub struct NamespaceSchema {
|
|||
/// None represents infinite duration (i.e. never drop data).
|
||||
pub retention_period_ns: Option<i64>,
|
||||
/// The optionally-specified partition template to use for writes in this namespace.
|
||||
pub partition_template: Option<Arc<PartitionTemplate>>,
|
||||
pub partition_template: Option<Arc<NamespacePartitionTemplateOverride>>,
|
||||
}
|
||||
|
||||
impl NamespaceSchema {
|
||||
|
@ -375,7 +375,7 @@ pub struct TableSchema {
|
|||
pub id: TableId,
|
||||
|
||||
/// the table's partition template
|
||||
pub partition_template: Option<Arc<PartitionTemplate>>,
|
||||
pub partition_template: Option<Arc<TablePartitionTemplateOverride>>,
|
||||
|
||||
/// the table's columns by their name
|
||||
pub columns: ColumnsByName,
|
||||
|
|
|
@ -1,3 +1,43 @@
|
|||
use once_cell::sync::Lazy;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// A partition template specified by a namespace record.
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub struct NamespacePartitionTemplateOverride(PartitionTemplate);
|
||||
|
||||
impl NamespacePartitionTemplateOverride {
|
||||
/// Create a new, immutable override for a namespace's partition template.
|
||||
pub fn new(partition_template: PartitionTemplate) -> Self {
|
||||
Self(partition_template)
|
||||
}
|
||||
}
|
||||
|
||||
/// A partition template specified by a table record.
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub struct TablePartitionTemplateOverride(PartitionTemplate);
|
||||
|
||||
impl TablePartitionTemplateOverride {
|
||||
/// Create a new, immutable override for a table's partition template.
|
||||
pub fn new(partition_template: PartitionTemplate) -> Self {
|
||||
Self(partition_template)
|
||||
}
|
||||
}
|
||||
|
||||
/// A partition template specified as the default to be used in the absence of any overrides.
|
||||
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
|
||||
pub struct DefaultPartitionTemplate(&'static PartitionTemplate);
|
||||
|
||||
impl Default for DefaultPartitionTemplate {
|
||||
fn default() -> Self {
|
||||
Self(&PARTITION_BY_DAY)
|
||||
}
|
||||
}
|
||||
|
||||
/// The default partitioning scheme is by each day according to the "time" column.
|
||||
pub static PARTITION_BY_DAY: Lazy<PartitionTemplate> = Lazy::new(|| PartitionTemplate {
|
||||
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
|
||||
});
|
||||
|
||||
/// `PartitionTemplate` is used to compute the partition key of each row that gets written. It can
|
||||
/// consist of a column name and its value or a formatted time. For columns that do not appear in
|
||||
/// the input row, a blank value is output.
|
||||
|
@ -10,11 +50,19 @@ pub struct PartitionTemplate {
|
|||
pub parts: Vec<TemplatePart>,
|
||||
}
|
||||
|
||||
impl Default for PartitionTemplate {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
|
||||
}
|
||||
impl PartitionTemplate {
|
||||
/// If the table has a partition template, use that. Otherwise, if the namespace has a
|
||||
/// partition template, use that. If neither the table nor the namespace has a template,
|
||||
/// use the default template.
|
||||
pub fn determine_precedence<'a>(
|
||||
table: Option<&'a Arc<TablePartitionTemplateOverride>>,
|
||||
namespace: Option<&'a Arc<NamespacePartitionTemplateOverride>>,
|
||||
default: &'a DefaultPartitionTemplate,
|
||||
) -> &'a PartitionTemplate {
|
||||
table
|
||||
.map(|t| &t.0)
|
||||
.or(namespace.map(|n| &n.0))
|
||||
.unwrap_or(default.0)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -6,7 +6,7 @@ use std::{
|
|||
use async_trait::async_trait;
|
||||
use authz::{Authorizer, IoxAuthorizer};
|
||||
use clap_blocks::router2::Router2Config;
|
||||
use data_types::{NamespaceName, PartitionTemplate};
|
||||
use data_types::{DefaultPartitionTemplate, NamespaceName};
|
||||
use hashbrown::HashMap;
|
||||
use hyper::{Body, Request, Response};
|
||||
use iox_catalog::interface::Catalog;
|
||||
|
@ -258,7 +258,7 @@ pub async fn create_router2_server_type(
|
|||
//
|
||||
// Add a write partitioner into the handler stack that splits by the date
|
||||
// portion of the write's timestamp (the default [`PartitionTemplate`]).
|
||||
let partitioner = Partitioner::new(PartitionTemplate::default());
|
||||
let partitioner = Partitioner::new(DefaultPartitionTemplate::default());
|
||||
let partitioner = InstrumentationDecorator::new("partitioner", &metrics, partitioner);
|
||||
|
||||
// # Namespace resolver
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use arrow_util::assert_batches_eq;
|
||||
use data_types::{PartitionKey, PartitionTemplate};
|
||||
use data_types::{PartitionKey, PARTITION_BY_DAY};
|
||||
use mutable_batch::{writer::Writer, MutableBatch, PartitionWrite, WritePayload};
|
||||
use mutable_batch_pb::{decode::write_table_batch, encode::encode_batch};
|
||||
use schema::Projection;
|
||||
|
@ -120,7 +120,7 @@ fn test_encode_decode_null_columns_issue_4272() {
|
|||
.unwrap();
|
||||
writer.commit();
|
||||
|
||||
let mut partitions = PartitionWrite::partition(&batch, &PartitionTemplate::default());
|
||||
let mut partitions = PartitionWrite::partition(&batch, &PARTITION_BY_DAY);
|
||||
|
||||
// There should be two partitions, one with for the timestamp 160, and
|
||||
// one for the other timestamp.
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
use async_trait::async_trait;
|
||||
use data_types::{NamespaceName, NamespaceSchema, PartitionKey, PartitionTemplate, TableId};
|
||||
use data_types::{
|
||||
DefaultPartitionTemplate, NamespaceName, NamespaceSchema, PartitionKey, PartitionTemplate,
|
||||
TableId, TablePartitionTemplateOverride,
|
||||
};
|
||||
use hashbrown::HashMap;
|
||||
use mutable_batch::{MutableBatch, PartitionWrite, WritePayload};
|
||||
use observability_deps::tracing::*;
|
||||
|
@ -43,19 +46,19 @@ impl<T> Partitioned<T> {
|
|||
|
||||
/// A [`DmlHandler`] implementation that splits per-table [`MutableBatch`] into
|
||||
/// partitioned per-table [`MutableBatch`] instances according to a configured
|
||||
/// [`PartitionTemplate`]. Deletes pass through unmodified.
|
||||
/// [`DefaultPartitionTemplate`]. Deletes pass through unmodified.
|
||||
///
|
||||
/// A vector of partitions are returned to the caller, or the first error that
|
||||
/// occurs during partitioning.
|
||||
#[derive(Debug)]
|
||||
pub struct Partitioner {
|
||||
partition_template: Arc<PartitionTemplate>,
|
||||
partition_template: Arc<DefaultPartitionTemplate>,
|
||||
}
|
||||
|
||||
impl Partitioner {
|
||||
/// Initialise a new [`Partitioner`], splitting writes according to the
|
||||
/// specified [`PartitionTemplate`].
|
||||
pub fn new(partition_template: PartitionTemplate) -> Self {
|
||||
/// specified [`DefaultPartitionTemplate`].
|
||||
pub fn new(partition_template: DefaultPartitionTemplate) -> Self {
|
||||
Self {
|
||||
partition_template: Arc::new(partition_template),
|
||||
}
|
||||
|
@ -66,7 +69,14 @@ impl Partitioner {
|
|||
impl DmlHandler for Partitioner {
|
||||
type WriteError = PartitionError;
|
||||
|
||||
type WriteInput = HashMap<TableId, (String, Option<Arc<PartitionTemplate>>, MutableBatch)>;
|
||||
type WriteInput = HashMap<
|
||||
TableId,
|
||||
(
|
||||
String,
|
||||
Option<Arc<TablePartitionTemplateOverride>>,
|
||||
MutableBatch,
|
||||
),
|
||||
>;
|
||||
type WriteOutput = Vec<Partitioned<HashMap<TableId, (String, MutableBatch)>>>;
|
||||
|
||||
/// Partition the per-table [`MutableBatch`].
|
||||
|
@ -85,14 +95,12 @@ impl DmlHandler for Partitioner {
|
|||
for (table_id, (table_name, table_partition_template, batch)) in batch {
|
||||
// Partition the table batch according to the configured partition
|
||||
// template and write it into the partition-keyed map.
|
||||
// If the table has a partition template, use that. Otherwise, if the namespace has a
|
||||
// partition template, use that. If neither the table nor the namespace has a template,
|
||||
// use the partitioner's template.
|
||||
|
||||
let partition_template = table_partition_template
|
||||
.as_ref()
|
||||
.or(namespace_partition_template.as_ref())
|
||||
.unwrap_or(&self.partition_template);
|
||||
let partition_template = PartitionTemplate::determine_precedence(
|
||||
table_partition_template.as_ref(),
|
||||
namespace_partition_template.as_ref(),
|
||||
&self.partition_template,
|
||||
);
|
||||
|
||||
for (partition_key, partition_payload) in
|
||||
PartitionWrite::partition(&batch, partition_template)
|
||||
|
@ -119,14 +127,21 @@ impl DmlHandler for Partitioner {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use assert_matches::assert_matches;
|
||||
use data_types::{NamespaceId, TemplatePart};
|
||||
use data_types::{NamespaceId, NamespacePartitionTemplateOverride, TemplatePart};
|
||||
|
||||
use super::*;
|
||||
|
||||
// Parse `lp` into a table-keyed MutableBatch map.
|
||||
pub(crate) fn lp_to_writes(
|
||||
lp: &str,
|
||||
) -> HashMap<TableId, (String, Option<Arc<PartitionTemplate>>, MutableBatch)> {
|
||||
) -> HashMap<
|
||||
TableId,
|
||||
(
|
||||
String,
|
||||
Option<Arc<TablePartitionTemplateOverride>>,
|
||||
MutableBatch,
|
||||
),
|
||||
> {
|
||||
let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42)
|
||||
.expect("failed to build test writes from LP");
|
||||
|
||||
|
@ -141,7 +156,7 @@ mod tests {
|
|||
// rest of the fields are arbitrary.
|
||||
fn namespace_schema(
|
||||
id: i64,
|
||||
partition_template: Option<Arc<PartitionTemplate>>,
|
||||
partition_template: Option<Arc<NamespacePartitionTemplateOverride>>,
|
||||
) -> Arc<NamespaceSchema> {
|
||||
Arc::new(NamespaceSchema {
|
||||
id: NamespaceId::new(id),
|
||||
|
@ -167,7 +182,7 @@ mod tests {
|
|||
paste::paste! {
|
||||
#[tokio::test]
|
||||
async fn [<test_write_ $name>]() {
|
||||
let partition_template = PartitionTemplate::default();
|
||||
let partition_template = DefaultPartitionTemplate::default();
|
||||
|
||||
let partitioner = Partitioner::new(partition_template);
|
||||
let ns = NamespaceName::new("bananas").expect("valid db name");
|
||||
|
@ -324,16 +339,18 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_write_namespace_partition_template() {
|
||||
let partitioner = Partitioner::new(PartitionTemplate::default());
|
||||
let partitioner = Partitioner::new(DefaultPartitionTemplate::default());
|
||||
let ns = NamespaceName::new("bananas").expect("valid db name");
|
||||
|
||||
let namespace_partition_template = Some(Arc::new(PartitionTemplate {
|
||||
parts: vec![
|
||||
TemplatePart::TimeFormat("%Y".to_string()),
|
||||
TemplatePart::Column("tag1".to_string()),
|
||||
TemplatePart::Column("nonanas".to_string()),
|
||||
],
|
||||
}));
|
||||
let namespace_partition_template = Some(Arc::new(NamespacePartitionTemplateOverride::new(
|
||||
PartitionTemplate {
|
||||
parts: vec![
|
||||
TemplatePart::TimeFormat("%Y".to_string()),
|
||||
TemplatePart::Column("tag1".to_string()),
|
||||
TemplatePart::Column("nonanas".to_string()),
|
||||
],
|
||||
},
|
||||
)));
|
||||
let namespace_schema = namespace_schema(42, namespace_partition_template);
|
||||
|
||||
let writes = lp_to_writes(
|
||||
|
@ -386,25 +403,29 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_write_namespace_and_table_partition_template() {
|
||||
let partitioner = Partitioner::new(PartitionTemplate::default());
|
||||
let partitioner = Partitioner::new(DefaultPartitionTemplate::default());
|
||||
let ns = NamespaceName::new("bananas").expect("valid db name");
|
||||
|
||||
// Specify this but the table partition will take precedence for bananas.
|
||||
let namespace_partition_template = Some(Arc::new(PartitionTemplate {
|
||||
parts: vec![
|
||||
TemplatePart::TimeFormat("%Y".to_string()),
|
||||
TemplatePart::Column("tag1".to_string()),
|
||||
TemplatePart::Column("nonanas".to_string()),
|
||||
],
|
||||
}));
|
||||
let namespace_partition_template = Some(Arc::new(NamespacePartitionTemplateOverride::new(
|
||||
PartitionTemplate {
|
||||
parts: vec![
|
||||
TemplatePart::TimeFormat("%Y".to_string()),
|
||||
TemplatePart::Column("tag1".to_string()),
|
||||
TemplatePart::Column("nonanas".to_string()),
|
||||
],
|
||||
},
|
||||
)));
|
||||
let namespace_schema = namespace_schema(42, namespace_partition_template);
|
||||
let bananas_table_template = Some(Arc::new(PartitionTemplate {
|
||||
parts: vec![
|
||||
TemplatePart::Column("oranges".to_string()),
|
||||
TemplatePart::TimeFormat("%Y-%m".to_string()),
|
||||
TemplatePart::Column("tag2".to_string()),
|
||||
],
|
||||
}));
|
||||
let bananas_table_template = Some(Arc::new(TablePartitionTemplateOverride::new(
|
||||
PartitionTemplate {
|
||||
parts: vec![
|
||||
TemplatePart::Column("oranges".to_string()),
|
||||
TemplatePart::TimeFormat("%Y-%m".to_string()),
|
||||
TemplatePart::Column("tag2".to_string()),
|
||||
],
|
||||
},
|
||||
)));
|
||||
|
||||
let lp = "
|
||||
bananas,tag1=A,tag2=C val=42i 1\n\
|
||||
|
@ -473,19 +494,21 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_write_only_table_partition_template() {
|
||||
let partitioner = Partitioner::new(PartitionTemplate::default());
|
||||
let partitioner = Partitioner::new(DefaultPartitionTemplate::default());
|
||||
let ns = NamespaceName::new("bananas").expect("valid db name");
|
||||
|
||||
// No namespace partition means the platanos table will fall back to the default
|
||||
let namespace_schema = namespace_schema(42, None);
|
||||
|
||||
let bananas_table_template = Some(Arc::new(PartitionTemplate {
|
||||
parts: vec![
|
||||
TemplatePart::Column("oranges".to_string()),
|
||||
TemplatePart::TimeFormat("%Y-%m".to_string()),
|
||||
TemplatePart::Column("tag2".to_string()),
|
||||
],
|
||||
}));
|
||||
let bananas_table_template = Some(Arc::new(TablePartitionTemplateOverride::new(
|
||||
PartitionTemplate {
|
||||
parts: vec![
|
||||
TemplatePart::Column("oranges".to_string()),
|
||||
TemplatePart::TimeFormat("%Y-%m".to_string()),
|
||||
TemplatePart::Column("tag2".to_string()),
|
||||
],
|
||||
},
|
||||
)));
|
||||
|
||||
let lp = "
|
||||
bananas,tag1=A,tag2=C val=42i 1\n\
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::{ops::DerefMut, sync::Arc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use data_types::{NamespaceName, NamespaceSchema, PartitionTemplate, TableId};
|
||||
use data_types::{NamespaceName, NamespaceSchema, TableId, TablePartitionTemplateOverride};
|
||||
use hashbrown::HashMap;
|
||||
use iox_catalog::{
|
||||
interface::{Catalog, Error as CatalogError},
|
||||
|
@ -145,7 +145,14 @@ where
|
|||
// Accepts a map of TableName -> MutableBatch
|
||||
type WriteInput = HashMap<String, MutableBatch>;
|
||||
// And returns a map of TableId -> (TableName, OptionalTablePartitionTemplate, MutableBatch)
|
||||
type WriteOutput = HashMap<TableId, (String, Option<Arc<PartitionTemplate>>, MutableBatch)>;
|
||||
type WriteOutput = HashMap<
|
||||
TableId,
|
||||
(
|
||||
String,
|
||||
Option<Arc<TablePartitionTemplateOverride>>,
|
||||
MutableBatch,
|
||||
),
|
||||
>;
|
||||
|
||||
/// Validate the schema of all the writes in `batches`.
|
||||
///
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use std::{iter, string::String, sync::Arc, time::Duration};
|
||||
|
||||
use data_types::{PartitionTemplate, TableId};
|
||||
use data_types::{DefaultPartitionTemplate, TableId};
|
||||
use generated_types::influxdata::iox::ingester::v1::WriteRequest;
|
||||
use hashbrown::HashMap;
|
||||
use hyper::{Body, Request, Response};
|
||||
|
@ -149,7 +149,7 @@ impl TestContext {
|
|||
|
||||
let retention_validator = RetentionValidator::new();
|
||||
|
||||
let partitioner = Partitioner::new(PartitionTemplate::default());
|
||||
let partitioner = Partitioner::new(DefaultPartitionTemplate::default());
|
||||
|
||||
let namespace_resolver = NamespaceSchemaResolver::new(Arc::clone(&ns_cache));
|
||||
let namespace_resolver = NamespaceAutocreation::new(
|
||||
|
|
Loading…
Reference in New Issue