feat: Wrap max tables and max columns per table values in newtypes

pull/24376/head
Carol (Nichols || Goulding) 2023-09-11 16:06:25 -04:00
parent ab7282795a
commit c32a04388c
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
18 changed files with 141 additions and 79 deletions

View File

@ -49,7 +49,10 @@ impl NamespacesSource for MockNamespacesSource {
mod tests {
use std::collections::BTreeMap;
use data_types::{Column, ColumnId, ColumnType, ColumnsByName, TableId, TableSchema};
use data_types::{
Column, ColumnId, ColumnType, ColumnsByName, MaxColumnsPerTable, MaxTables, TableId,
TableSchema,
};
use super::*;
@ -182,8 +185,8 @@ mod tests {
ns: Namespace {
id,
name: "ns".to_string(),
max_tables: 10,
max_columns_per_table: 10,
max_tables: MaxTables::new(10),
max_columns_per_table: MaxColumnsPerTable::new(10),
retention_period_ns: None,
deleted_at: None,
partition_template: Default::default(),
@ -191,8 +194,8 @@ mod tests {
schema: NamespaceSchema {
id,
tables,
max_columns_per_table: 10,
max_tables: 42,
max_columns_per_table: MaxColumnsPerTable::new(10),
max_tables: MaxTables::new(42),
retention_period_ns: None,
partition_template: Default::default(),
},

View File

@ -273,6 +273,50 @@ impl std::fmt::Display for ParquetFileId {
}
}
/// Max tables allowed in a namespace.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
pub struct MaxTables(i32);
#[allow(missing_docs)]
impl MaxTables {
pub const fn new(v: i32) -> Self {
Self(v)
}
pub fn get(&self) -> i32 {
self.0
}
}
impl std::fmt::Display for MaxTables {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
/// Max columns per table allowed in a namespace.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
pub struct MaxColumnsPerTable(i32);
#[allow(missing_docs)]
impl MaxColumnsPerTable {
pub const fn new(v: i32) -> Self {
Self(v)
}
pub fn get(&self) -> i32 {
self.0
}
}
impl std::fmt::Display for MaxColumnsPerTable {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
/// Data object for a namespace
#[derive(Debug, Clone, PartialEq, sqlx::FromRow)]
pub struct Namespace {
@ -283,9 +327,9 @@ pub struct Namespace {
/// The retention period in ns. None represents infinite duration (i.e. never drop data).
pub retention_period_ns: Option<i64>,
/// The maximum number of tables that can exist in this namespace
pub max_tables: i32,
pub max_tables: MaxTables,
/// The maximum number of columns per table in this namespace
pub max_columns_per_table: i32,
pub max_columns_per_table: MaxColumnsPerTable,
/// When this file was marked for deletion.
pub deleted_at: Option<Timestamp>,
/// The partition template to use for new tables in this namespace either created implicitly or
@ -326,9 +370,9 @@ pub struct NamespaceSchema {
/// the tables in the namespace by name
pub tables: BTreeMap<String, TableSchema>,
/// the number of columns per table this namespace allows
pub max_columns_per_table: usize,
pub max_columns_per_table: MaxColumnsPerTable,
/// The maximum number of tables permitted in this namespace.
pub max_tables: usize,
pub max_tables: MaxTables,
/// The retention period in ns.
/// None represents infinite duration (i.e. never drop data).
pub retention_period_ns: Option<i64>,
@ -353,8 +397,8 @@ impl NamespaceSchema {
Self {
id,
tables: BTreeMap::new(),
max_columns_per_table: max_columns_per_table as usize,
max_tables: max_tables as usize,
max_columns_per_table,
max_tables,
retention_period_ns,
partition_template: partition_template.clone(),
}
@ -2670,8 +2714,8 @@ mod tests {
let schema1 = NamespaceSchema {
id: NamespaceId::new(1),
tables: BTreeMap::from([]),
max_columns_per_table: 4,
max_tables: 42,
max_columns_per_table: MaxColumnsPerTable::new(4),
max_tables: MaxTables::new(42),
retention_period_ns: None,
partition_template: Default::default(),
};
@ -2685,8 +2729,8 @@ mod tests {
partition_template: Default::default(),
},
)]),
max_columns_per_table: 4,
max_tables: 42,
max_columns_per_table: MaxColumnsPerTable::new(4),
max_tables: MaxTables::new(42),
retention_period_ns: None,
partition_template: Default::default(),
};

View File

@ -23,7 +23,8 @@ use arrow::record_batch::RecordBatch;
use arrow_flight::{decode::FlightRecordBatchStream, flight_service_server::FlightService, Ticket};
use data_types::{
partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride},
Namespace, NamespaceId, NamespaceSchema, ParquetFile, PartitionKey, SequenceNumber, TableId,
MaxColumnsPerTable, MaxTables, Namespace, NamespaceId, NamespaceSchema, ParquetFile,
PartitionKey, SequenceNumber, TableId,
};
use dml::{DmlMeta, DmlWrite};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt};
@ -236,8 +237,10 @@ where
NamespaceSchema {
id: ns.id,
tables: Default::default(),
max_columns_per_table: iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE as usize,
max_tables: iox_catalog::DEFAULT_MAX_TABLES as usize,
max_columns_per_table: MaxColumnsPerTable::new(
iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE
),
max_tables: MaxTables::new(iox_catalog::DEFAULT_MAX_TABLES),
retention_period_ns,
partition_template: partition_template.unwrap_or_default(),
},

View File

@ -843,9 +843,9 @@ pub(crate) mod test_helpers {
assert_eq!(namespace, lookup_namespace);
// Assert default values for service protection limits.
assert_eq!(namespace.max_tables, DEFAULT_MAX_TABLES);
assert_eq!(namespace.max_tables.get(), DEFAULT_MAX_TABLES);
assert_eq!(
namespace.max_columns_per_table,
namespace.max_columns_per_table.get(),
DEFAULT_MAX_COLUMNS_PER_TABLE
);
@ -903,7 +903,7 @@ pub(crate) mod test_helpers {
.update_table_limit(namespace_name.as_str(), NEW_TABLE_LIMIT)
.await
.expect("namespace should be updateable");
assert_eq!(NEW_TABLE_LIMIT, modified.max_tables);
assert_eq!(NEW_TABLE_LIMIT, modified.max_tables.get());
const NEW_COLUMN_LIMIT: i32 = 1500;
let modified = repos
@ -911,7 +911,7 @@ pub(crate) mod test_helpers {
.update_column_limit(namespace_name.as_str(), NEW_COLUMN_LIMIT)
.await
.expect("namespace should be updateable");
assert_eq!(NEW_COLUMN_LIMIT, modified.max_columns_per_table);
assert_eq!(NEW_COLUMN_LIMIT, modified.max_columns_per_table.get());
const NEW_RETENTION_PERIOD_NS: i64 = 5 * 60 * 60 * 1000 * 1000 * 1000;
let modified = repos

View File

@ -17,10 +17,10 @@ use data_types::{
partition_template::{
NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, TemplatePart,
},
Column, ColumnId, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName,
NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, ParquetFileParams,
Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
Timestamp, TransitionPartitionId,
Column, ColumnId, ColumnType, CompactionLevel, MaxColumnsPerTable, MaxTables, Namespace,
NamespaceId, NamespaceName, NamespaceServiceProtectionLimitsOverride, ParquetFile,
ParquetFileId, ParquetFileParams, Partition, PartitionHashId, PartitionId, PartitionKey,
SkippedCompaction, Table, TableId, Timestamp, TransitionPartitionId,
};
use iox_time::{SystemProvider, TimeProvider};
use snafu::ensure;
@ -166,8 +166,10 @@ impl NamespaceRepo for MemTxn {
let namespace = Namespace {
id: NamespaceId::new(stage.namespaces.len() as i64 + 1),
name: name.to_string(),
max_tables: max_tables.unwrap_or(DEFAULT_MAX_TABLES),
max_columns_per_table: max_columns_per_table.unwrap_or(DEFAULT_MAX_COLUMNS_PER_TABLE),
max_tables: MaxTables::new(max_tables.unwrap_or(DEFAULT_MAX_TABLES)),
max_columns_per_table: MaxColumnsPerTable::new(
max_columns_per_table.unwrap_or(DEFAULT_MAX_COLUMNS_PER_TABLE),
),
retention_period_ns,
deleted_at: None,
partition_template: partition_template.unwrap_or_default(),
@ -229,7 +231,7 @@ impl NamespaceRepo for MemTxn {
let stage = self.stage();
match stage.namespaces.iter_mut().find(|n| n.name == name) {
Some(n) => {
n.max_tables = new_max;
n.max_tables = MaxTables::new(new_max);
Ok(n.clone())
}
None => Err(Error::NamespaceNotFoundByName {
@ -242,7 +244,7 @@ impl NamespaceRepo for MemTxn {
let stage = self.stage();
match stage.namespaces.iter_mut().find(|n| n.name == name) {
Some(n) => {
n.max_columns_per_table = new_max;
n.max_columns_per_table = MaxColumnsPerTable::new(new_max);
Ok(n.clone())
}
None => Err(Error::NamespaceNotFoundByName {
@ -299,7 +301,7 @@ impl TableRepo for MemTxn {
.iter()
.filter(|t| t.namespace_id == namespace_id)
.count();
if tables_count >= max_tables.try_into().unwrap() {
if tables_count >= max_tables.get().try_into().unwrap() {
return Err(Error::TableCreateLimitError {
table_name: name.to_string(),
namespace_id,
@ -423,7 +425,7 @@ impl ColumnRepo for MemTxn {
.iter()
.filter(|t| t.table_id == table_id)
.count();
if columns_count >= max_columns_per_table.try_into().unwrap() {
if columns_count >= max_columns_per_table.get().try_into().unwrap() {
return Err(Error::ColumnCreateLimitError {
column_name: name.to_string(),
table_id,

View File

@ -36,8 +36,8 @@ fn namespace_to_proto(namespace: Namespace) -> proto::Namespace {
id: namespace.id.get(),
name: namespace.name,
retention_period_ns: namespace.retention_period_ns,
max_tables: namespace.max_tables,
max_columns_per_table: namespace.max_columns_per_table,
max_tables: namespace.max_tables.get(),
max_columns_per_table: namespace.max_columns_per_table.get(),
partition_template: namespace.partition_template.as_proto().cloned(),
}
}

View File

@ -6,7 +6,8 @@ use criterion::{
};
use data_types::{
partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride},
ColumnId, ColumnSchema, NamespaceId, NamespaceName, NamespaceSchema, TableId, TableSchema,
ColumnId, ColumnSchema, MaxColumnsPerTable, MaxTables, NamespaceId, NamespaceName,
NamespaceSchema, TableId, TableSchema,
};
use iox_catalog::{interface::Catalog, mem::MemCatalog};
use once_cell::sync::Lazy;
@ -155,8 +156,8 @@ fn generate_namespace_schema(tables: usize, columns_per_table: usize) -> Namespa
(format!("table{i}"), schema)
})
.collect::<BTreeMap<_, _>>(),
max_columns_per_table: usize::MAX,
max_tables: usize::MAX,
max_columns_per_table: MaxColumnsPerTable::new(i32::MAX),
max_tables: MaxTables::new(i32::MAX),
retention_period_ns: None,
partition_template,
}

View File

@ -6,7 +6,7 @@ use criterion::{
};
use data_types::{
partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride},
NamespaceId, NamespaceName, NamespaceSchema, TableId,
MaxColumnsPerTable, MaxTables, NamespaceId, NamespaceName, NamespaceSchema, TableId,
};
use generated_types::influxdata::iox::partition_template::v1 as proto;
use hashbrown::HashMap;
@ -166,8 +166,8 @@ fn bench(
let schema = Arc::new(NamespaceSchema {
id: NamespaceId::new(42),
tables: Default::default(),
max_columns_per_table: 1000,
max_tables: 1000,
max_columns_per_table: MaxColumnsPerTable::new(1000),
max_tables: MaxTables::new(1000),
retention_period_ns: None,
partition_template: partition_template.clone(),
});

View File

@ -4,7 +4,7 @@ use criterion::{
criterion_group, criterion_main, measurement::WallTime, BatchSize, BenchmarkGroup, Criterion,
Throughput,
};
use data_types::{NamespaceId, NamespaceName, NamespaceSchema};
use data_types::{MaxColumnsPerTable, MaxTables, NamespaceId, NamespaceName, NamespaceSchema};
use hashbrown::HashMap;
use iox_catalog::{interface::Catalog, mem::MemCatalog};
use mutable_batch::MutableBatch;
@ -51,8 +51,8 @@ fn bench(group: &mut BenchmarkGroup<WallTime>, tables: usize, columns_per_table:
let namespace_schema = NamespaceSchema {
id: NamespaceId::new(42),
tables: Default::default(),
max_columns_per_table: 42,
max_tables: 42,
max_columns_per_table: MaxColumnsPerTable::new(42),
max_tables: MaxTables::new(42),
retention_period_ns: None,
partition_template: Default::default(),
};

View File

@ -362,7 +362,7 @@ fn validate_schema_limits(
// Get the column set for this table from the schema.
let mut existing_columns = match schema.tables.get(table_name) {
Some(v) => v.column_names(),
None if batch.columns().len() > schema.max_columns_per_table => {
None if batch.columns().len() > schema.max_columns_per_table.get() as usize => {
// The table does not exist, therefore all the columns in this
// write must be created - there's no need to perform a set
// union to discover the distinct column count.
@ -370,7 +370,7 @@ fn validate_schema_limits(
table_name: table_name.into(),
merged_column_count: batch.columns().len(),
existing_column_count: 0,
max_columns_per_table: schema.max_columns_per_table,
max_columns_per_table: schema.max_columns_per_table.get() as usize,
});
}
None => {
@ -387,11 +387,11 @@ fn validate_schema_limits(
// submitted to multiple router instances, exceeding the schema
// limit by some degree (eventual enforcement).
let merged_table_count = schema.tables.len() + new_tables;
if merged_table_count > schema.max_tables {
if merged_table_count > schema.max_tables.get() as usize {
return Err(CachedServiceProtectionLimit::Table {
existing_table_count: schema.tables.len(),
merged_table_count,
table_count_limit: schema.max_tables,
table_count_limit: schema.max_tables.get() as usize,
});
}
@ -415,14 +415,15 @@ fn validate_schema_limits(
// includes existing columns and doesn't exceed the limit more, this is
// allowed.
let columns_were_added_in_this_batch = merged_column_count > existing_column_count;
let column_limit_exceeded = merged_column_count > schema.max_columns_per_table;
let column_limit_exceeded =
merged_column_count > schema.max_columns_per_table.get() as usize;
if columns_were_added_in_this_batch && column_limit_exceeded {
return Err(CachedServiceProtectionLimit::Column {
table_name: table_name.into(),
merged_column_count,
existing_column_count,
max_columns_per_table: schema.max_columns_per_table,
max_columns_per_table: schema.max_columns_per_table.get() as usize,
});
}
}
@ -692,7 +693,7 @@ mod tests {
{
let schema = namespace.schema().await;
assert_eq!(schema.tables.len(), 2);
assert_eq!(schema.max_tables, 1);
assert_eq!(schema.max_tables.get(), 1);
let batches = lp_to_writes("bananas val=2 42\nplatanos val=2 42");
assert_matches!(validate_schema_limits(&batches, &schema), Ok(()));

View File

@ -16,8 +16,8 @@ mod tests {
};
use data_types::{
ColumnId, ColumnSchema, ColumnType, ColumnsByName, NamespaceId, NamespaceName,
NamespaceSchema, TableId, TableSchema,
ColumnId, ColumnSchema, ColumnType, ColumnsByName, MaxColumnsPerTable, MaxTables,
NamespaceId, NamespaceName, NamespaceSchema, TableId, TableSchema,
};
use proptest::prelude::*;
@ -91,8 +91,8 @@ mod tests {
NamespaceSchema {
id: NamespaceId::new(namespace_id),
tables,
max_columns_per_table,
max_tables,
max_columns_per_table: MaxColumnsPerTable::new(max_columns_per_table as i32),
max_tables: MaxTables::new(max_tables as i32),
retention_period_ns,
partition_template: Default::default(),
}

View File

@ -64,8 +64,8 @@ pub(crate) fn namespace_created(
namespace_name: namespace_name.into(),
namespace_id: schema.id.get(),
partition_template: schema.partition_template.as_proto().cloned(),
max_columns_per_table: schema.max_columns_per_table as u64,
max_tables: schema.max_tables as u64,
max_columns_per_table: schema.max_columns_per_table.get() as u64,
max_tables: schema.max_tables.get() as u64,
retention_period_ns: schema.retention_period_ns,
}
}

View File

@ -5,8 +5,8 @@ use std::{borrow::Cow, collections::BTreeMap, fmt::Debug};
use async_trait::async_trait;
use data_types::{
partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride},
ColumnSchema, ColumnsByName, NamespaceId, NamespaceName, NamespaceNameError, NamespaceSchema,
TableId, TableSchema,
ColumnSchema, ColumnsByName, MaxColumnsPerTable, MaxTables, NamespaceId, NamespaceName,
NamespaceNameError, NamespaceSchema, TableId, TableSchema,
};
use generated_types::influxdata::iox::gossip::v1::{
schema_message::Event, NamespaceCreated, TableCreated, TableUpdated,
@ -180,8 +180,10 @@ where
NamespaceSchema {
id: NamespaceId::new(note.namespace_id),
tables: Default::default(),
max_columns_per_table: note.max_columns_per_table as _,
max_tables: note.max_tables as _,
max_columns_per_table: MaxColumnsPerTable::new(
note.max_columns_per_table as i32,
),
max_tables: MaxTables::new(note.max_tables as i32),
retention_period_ns: note.retention_period_ns,
partition_template,
},

View File

@ -142,7 +142,8 @@ pub mod server;
#[cfg(test)]
pub(crate) mod test_helpers {
use data_types::{
partition_template::NamespacePartitionTemplateOverride, NamespaceId, NamespaceSchema,
partition_template::NamespacePartitionTemplateOverride, MaxColumnsPerTable, MaxTables,
NamespaceId, NamespaceSchema,
};
use std::collections::BTreeMap;
@ -159,8 +160,10 @@ pub(crate) mod test_helpers {
NamespaceSchema {
id: NamespaceId::new(id),
tables: BTreeMap::new(),
max_columns_per_table: iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE as usize,
max_tables: iox_catalog::DEFAULT_MAX_TABLES as usize,
max_columns_per_table: MaxColumnsPerTable::new(
iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE,
),
max_tables: MaxTables::new(iox_catalog::DEFAULT_MAX_TABLES),
retention_period_ns: None,
partition_template: DEFAULT_NAMESPACE_PARTITION_TEMPLATE,
}

View File

@ -172,8 +172,8 @@ mod tests {
use assert_matches::assert_matches;
use data_types::{
Column, ColumnId, ColumnSchema, ColumnType, ColumnsByName, NamespaceId, TableId,
TableSchema,
Column, ColumnId, ColumnSchema, ColumnType, ColumnsByName, MaxColumnsPerTable, MaxTables,
NamespaceId, TableId, TableSchema,
};
use proptest::{prelude::*, prop_compose, proptest};
@ -186,8 +186,8 @@ mod tests {
NamespaceSchema {
id: TEST_NAMESPACE_ID,
tables: Default::default(),
max_columns_per_table: 50,
max_tables: 24,
max_columns_per_table: MaxColumnsPerTable::new(50),
max_tables: MaxTables::new(24),
retention_period_ns: Some(876),
partition_template: Default::default(),
}
@ -198,8 +198,8 @@ mod tests {
NamespaceSchema {
id: TEST_NAMESPACE_ID,
tables: Default::default(),
max_columns_per_table: 10,
max_tables: 42,
max_columns_per_table: MaxColumnsPerTable::new(10),
max_tables: MaxTables::new(42),
retention_period_ns: Some(876),
partition_template: Default::default(),
}
@ -492,8 +492,8 @@ mod tests {
NamespaceSchema {
id: TEST_NAMESPACE_ID,
tables,
max_columns_per_table,
max_tables,
max_columns_per_table: MaxColumnsPerTable::new(max_columns_per_table as i32),
max_tables: MaxTables::new(max_tables as i32),
retention_period_ns,
partition_template: Default::default(),
}

View File

@ -125,7 +125,8 @@ where
mod tests {
use assert_matches::assert_matches;
use data_types::{
Column, ColumnId, ColumnType, ColumnsByName, NamespaceId, TableId, TableSchema,
Column, ColumnId, ColumnType, ColumnsByName, MaxColumnsPerTable, MaxTables, NamespaceId,
TableId, TableSchema,
};
use metric::{Attributes, MetricObserver, Observation};
@ -163,8 +164,8 @@ mod tests {
NamespaceSchema {
id: NamespaceId::new(42),
tables,
max_columns_per_table: 100,
max_tables: 42,
max_columns_per_table: MaxColumnsPerTable::new(100),
max_tables: MaxTables::new(42),
retention_period_ns: None,
partition_template: Default::default(),
}

View File

@ -137,7 +137,7 @@ mod tests {
use std::sync::Arc;
use assert_matches::assert_matches;
use data_types::{Namespace, NamespaceId};
use data_types::{MaxColumnsPerTable, MaxTables, Namespace, NamespaceId};
use iox_catalog::{interface::SoftDeletedRows, mem::MemCatalog};
use super::*;
@ -228,8 +228,10 @@ mod tests {
Namespace {
id: NamespaceId::new(1),
name: ns.to_string(),
max_tables: iox_catalog::DEFAULT_MAX_TABLES,
max_columns_per_table: iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE,
max_tables: MaxTables::new(iox_catalog::DEFAULT_MAX_TABLES),
max_columns_per_table: MaxColumnsPerTable::new(
iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE
),
retention_period_ns: TEST_RETENTION_PERIOD_NS,
deleted_at: None,
partition_template: Default::default(),

View File

@ -266,8 +266,8 @@ pub fn namespace_to_proto(namespace: &CatalogNamespace) -> Namespace {
id: namespace.id.get(),
name: namespace.name.clone(),
retention_period_ns: namespace.retention_period_ns,
max_tables: namespace.max_tables,
max_columns_per_table: namespace.max_columns_per_table,
max_tables: namespace.max_tables.get(),
max_columns_per_table: namespace.max_columns_per_table.get(),
partition_template: namespace.partition_template.as_proto().cloned(),
}
}