diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 3831ac32fc..8227fdaed4 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -308,6 +308,30 @@ pub struct Namespace { pub partition_template: NamespacePartitionTemplateOverride, } +use generated_types::influxdata::iox::namespace::v1 as namespace_proto; + +/// Overrides for service protection limits. +#[derive(Debug, Copy, Clone)] +pub struct NamespaceServiceProtectionLimitsOverride { + /// The maximum number of tables that can exist in this namespace + pub max_tables: Option, + /// The maximum number of columns per table in this namespace + pub max_columns_per_table: Option, +} + +impl From for NamespaceServiceProtectionLimitsOverride { + fn from(value: namespace_proto::ServiceProtectionLimits) -> Self { + let namespace_proto::ServiceProtectionLimits { + max_tables, + max_columns_per_table, + } = value; + Self { + max_tables, + max_columns_per_table, + } + } +} + /// Schema collection for a namespace. This is an in-memory object useful for a schema /// cache. #[derive(Debug, Clone, PartialEq)] diff --git a/generated_types/protos/influxdata/iox/namespace/v1/service.proto b/generated_types/protos/influxdata/iox/namespace/v1/service.proto index ce75731da9..f629ac4995 100644 --- a/generated_types/protos/influxdata/iox/namespace/v1/service.proto +++ b/generated_types/protos/influxdata/iox/namespace/v1/service.proto @@ -15,19 +15,19 @@ service NamespaceService { rpc DeleteNamespace(DeleteNamespaceRequest) returns (DeleteNamespaceResponse); // Update retention period - rpc UpdateNamespaceRetention(UpdateNamespaceRetentionRequest) returns (UpdateNamespaceRetentionResponse); + rpc UpdateNamespaceRetention(UpdateNamespaceRetentionRequest) + returns (UpdateNamespaceRetentionResponse); // Update a service protection limit of a namespace. For this change to take // effect, all routers MUST be restarted - rpc UpdateNamespaceServiceProtectionLimit(UpdateNamespaceServiceProtectionLimitRequest) returns (UpdateNamespaceServiceProtectionLimitResponse); + rpc UpdateNamespaceServiceProtectionLimit( + UpdateNamespaceServiceProtectionLimitRequest) + returns (UpdateNamespaceServiceProtectionLimitResponse); } -message GetNamespacesRequest { -} +message GetNamespacesRequest {} -message GetNamespacesResponse { - repeated Namespace namespaces = 1; -} +message GetNamespacesResponse { repeated Namespace namespaces = 1; } message CreateNamespaceRequest { // Name of the namespace to be created @@ -40,20 +40,20 @@ message CreateNamespaceRequest { optional int64 retention_period_ns = 2; // Partitioning scheme to use for tables created in this namespace - optional influxdata.iox.partition_template.v1.PartitionTemplate partition_template = 3; + optional influxdata.iox.partition_template.v1.PartitionTemplate + partition_template = 3; + + ServiceProtectionLimits service_protection_limits = 4; } -message CreateNamespaceResponse { - Namespace namespace = 1; -} +message CreateNamespaceResponse { Namespace namespace = 1; } message DeleteNamespaceRequest { // Name of the namespace to be deleted string name = 1; } -message DeleteNamespaceResponse { -} +message DeleteNamespaceResponse {} message UpdateNamespaceRetentionRequest { // Name of the namespace to be set @@ -66,9 +66,7 @@ message UpdateNamespaceRetentionRequest { optional int64 retention_period_ns = 2; } -message UpdateNamespaceRetentionResponse { - Namespace namespace = 1; -} +message UpdateNamespaceRetentionResponse { Namespace namespace = 1; } message UpdateNamespaceServiceProtectionLimitRequest { // Namespace to have its service protection limits updated. @@ -78,7 +76,8 @@ message UpdateNamespaceServiceProtectionLimitRequest { oneof limit_update { // Change the maximum number of tables the namespace may have. int32 max_tables = 2; - // Change the maximum number of columns each table in the namespace may have. + // Change the maximum number of columns each table in the namespace may + // have. int32 max_columns_per_table = 3; } } @@ -87,6 +86,14 @@ message UpdateNamespaceServiceProtectionLimitResponse { Namespace namespace = 1; } +message ServiceProtectionLimits { + // Change the maximum number of tables the namespace may have. + optional int32 max_tables = 2; + // Change the maximum number of columns each table in the namespace may + // have. + optional int32 max_columns_per_table = 3; +} + message Namespace { // Namespace ID int64 id = 1; diff --git a/influxdb_iox/src/commands/namespace/create.rs b/influxdb_iox/src/commands/namespace/create.rs index 1e2111e082..fe3cbe661a 100644 --- a/influxdb_iox/src/commands/namespace/create.rs +++ b/influxdb_iox/src/commands/namespace/create.rs @@ -1,6 +1,7 @@ use influxdb_iox_client::connection::Connection; use crate::commands::namespace::Result; +use influxdb_iox_client::namespace::generated_types::ServiceProtectionLimits; /// Write data into the specified database #[derive(Debug, clap::Parser)] @@ -19,12 +20,43 @@ pub struct Config { default_value = "0" )] retention_hours: u32, + + #[clap(flatten)] + service_protection_limits: ServiceProtectionLimitsArgs, +} + +#[derive(Debug, clap::Args)] +pub struct ServiceProtectionLimitsArgs { + /// The maximum number of tables to allow for this namespace + #[clap(action, long = "max-tables", short = 't')] + max_tables: Option, + + /// The maximum number of columns to allow per table for this namespace + #[clap(action, long = "max-columns-per-table", short = 'c')] + max_columns_per_table: Option, +} + +impl From for Option { + fn from(value: ServiceProtectionLimitsArgs) -> Self { + let ServiceProtectionLimitsArgs { + max_tables, + max_columns_per_table, + } = value; + if max_tables.is_none() && max_columns_per_table.is_none() { + return None; + } + Some(ServiceProtectionLimits { + max_tables, + max_columns_per_table, + }) + } } pub async fn command(connection: Connection, config: Config) -> Result<()> { let Config { namespace, retention_hours, + service_protection_limits, } = config; let mut client = influxdb_iox_client::namespace::Client::new(connection); @@ -37,7 +69,9 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { // internally Some(retention_hours as i64 * 60 * 60 * 1_000_000_000) }; - let namespace = client.create_namespace(&namespace, retention).await?; + let namespace = client + .create_namespace(&namespace, retention, service_protection_limits.into()) + .await?; println!("{}", serde_json::to_string_pretty(&namespace)?); Ok(()) diff --git a/influxdb_iox/src/commands/namespace/update_limit.rs b/influxdb_iox/src/commands/namespace/update_limit.rs index 7c2734ee0a..d1d6ba2466 100644 --- a/influxdb_iox/src/commands/namespace/update_limit.rs +++ b/influxdb_iox/src/commands/namespace/update_limit.rs @@ -25,7 +25,7 @@ pub struct Config { .required(true) .args(&["max_tables", "max_columns_per_table"]) ))] -struct Args { +pub struct Args { /// The maximum number of tables to allow for this namespace #[clap(action, long = "max-tables", short = 't', group = "limit")] max_tables: Option, diff --git a/influxdb_iox/tests/end_to_end_cases/cli.rs b/influxdb_iox/tests/end_to_end_cases/cli.rs index a26d27f264..8cb0d184cc 100644 --- a/influxdb_iox/tests/end_to_end_cases/cli.rs +++ b/influxdb_iox/tests/end_to_end_cases/cli.rs @@ -905,6 +905,120 @@ async fn query_ingester() { .await } +/// Test setting service limits while creating namespaces +#[tokio::test] +async fn namespace_create_service_limits() { + test_helpers::maybe_start_logging(); + let database_url = maybe_skip_integration!(); + let mut cluster = MiniCluster::create_shared(database_url).await; + + StepTest::new( + &mut cluster, + vec![ + Step::Custom(Box::new(|state: &mut StepTestState| { + async { + let addr = state.cluster().router().router_grpc_base().to_string(); + let namespace = "ns1"; + + // { + // "id": , + // "name": "ns1", + // "serviceProtectionLimits": { + // "maxTables": 123, + // "maxColumnsPerTable": 200 + // } + // } + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("-h") + .arg(&addr) + .arg("namespace") + .arg("create") + .arg(namespace) + .arg("--max-tables") + .arg("123") + .assert() + .success() + .stdout( + predicate::str::contains(namespace) + .and(predicate::str::contains(r#""maxTables": 123"#)) + .and(predicate::str::contains(r#""maxColumnsPerTable": 200"#)), + ); + } + .boxed() + })), + Step::Custom(Box::new(|state: &mut StepTestState| { + async { + let addr = state.cluster().router().router_grpc_base().to_string(); + let namespace = "ns2"; + + // { + // "id": , + // "name": "ns2", + // "serviceProtectionLimits": { + // "maxTables": 500, + // "maxColumnsPerTable": 321 + // } + // } + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("-h") + .arg(&addr) + .arg("namespace") + .arg("create") + .arg(namespace) + .arg("--max-columns-per-table") + .arg("321") + .assert() + .success() + .stdout( + predicate::str::contains(namespace) + .and(predicate::str::contains(r#""maxTables": 500"#)) + .and(predicate::str::contains(r#""maxColumnsPerTable": 321"#)), + ); + } + .boxed() + })), + Step::Custom(Box::new(|state: &mut StepTestState| { + async { + let addr = state.cluster().router().router_grpc_base().to_string(); + let namespace = "ns3"; + + // { + // "id": , + // "name": "ns3", + // "serviceProtectionLimits": { + // "maxTables": 123, + // "maxColumnsPerTable": 321 + // } + // } + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("-h") + .arg(&addr) + .arg("namespace") + .arg("create") + .arg(namespace) + .arg("--max-tables") + .arg("123") + .arg("--max-columns-per-table") + .arg("321") + .assert() + .success() + .stdout( + predicate::str::contains(namespace) + .and(predicate::str::contains(r#""maxTables": 123"#)) + .and(predicate::str::contains(r#""maxColumnsPerTable": 321"#)), + ); + } + .boxed() + })), + ], + ) + .run() + .await +} + /// Test the namespace update service limit command #[tokio::test] async fn namespace_update_service_limit() { diff --git a/influxdb_iox/tests/end_to_end_cases/namespace.rs b/influxdb_iox/tests/end_to_end_cases/namespace.rs index d52eb85325..73d684550b 100644 --- a/influxdb_iox/tests/end_to_end_cases/namespace.rs +++ b/influxdb_iox/tests/end_to_end_cases/namespace.rs @@ -64,7 +64,10 @@ async fn soft_deletion() { state.cluster().router().router_grpc_connection(), ); let namespace_name = state.cluster().namespace(); - client.create_namespace(namespace_name, None).await.unwrap(); + client + .create_namespace(namespace_name, None, None) + .await + .unwrap(); let namespaces = client.get_namespaces().await.unwrap(); let created_namespace = namespaces .iter() @@ -192,7 +195,7 @@ async fn soft_deletion() { let namespace_name = state.cluster().namespace(); let error = client - .create_namespace(namespace_name, None) + .create_namespace(namespace_name, None, None) .await .unwrap_err(); assert_eq!( diff --git a/influxdb_iox_client/src/client/namespace.rs b/influxdb_iox_client/src/client/namespace.rs index 92102a89de..5db10b2046 100644 --- a/influxdb_iox_client/src/client/namespace.rs +++ b/influxdb_iox_client/src/client/namespace.rs @@ -44,6 +44,7 @@ impl Client { &mut self, namespace: &str, retention_period_ns: Option, + service_protection_limits: Option, ) -> Result { let response = self .inner @@ -51,6 +52,7 @@ impl Client { name: namespace.to_string(), retention_period_ns, partition_template: None, + service_protection_limits, }) .await?; diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index 464df18f06..b35de5441b 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -4,8 +4,9 @@ use async_trait::async_trait; use data_types::{ partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride}, Column, ColumnType, ColumnsByName, CompactionLevel, Namespace, NamespaceId, NamespaceName, - NamespaceSchema, ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, - PartitionKey, SkippedCompaction, Table, TableId, TableSchema, Timestamp, + NamespaceSchema, NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, + ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId, + TableSchema, Timestamp, }; use iox_time::TimeProvider; use snafu::{OptionExt, Snafu}; @@ -256,6 +257,7 @@ pub trait NamespaceRepo: Send + Sync { name: &NamespaceName<'_>, partition_template: Option, retention_period_ns: Option, + service_protection_limits: Option, ) -> Result; /// Update retention period for a namespace @@ -746,7 +748,7 @@ pub(crate) mod test_helpers { let namespace_name = NamespaceName::new("test_namespace").unwrap(); let namespace = repos .namespaces() - .create(&namespace_name, None, None) + .create(&namespace_name, None, None, None) .await .unwrap(); assert!(namespace.id > NamespaceId::new(0)); @@ -770,7 +772,10 @@ pub(crate) mod test_helpers { DEFAULT_MAX_COLUMNS_PER_TABLE ); - let conflict = repos.namespaces().create(&namespace_name, None, None).await; + let conflict = repos + .namespaces() + .create(&namespace_name, None, None, None) + .await; assert!(matches!( conflict.unwrap_err(), Error::NameExists { name: _ } @@ -857,7 +862,7 @@ pub(crate) mod test_helpers { let namespace4_name = NamespaceName::new("test_namespace4").unwrap(); let namespace4 = repos .namespaces() - .create(&namespace4_name, None, Some(NEW_RETENTION_PERIOD_NS)) + .create(&namespace4_name, None, Some(NEW_RETENTION_PERIOD_NS), None) .await .expect("namespace with 5-hour retention should be created"); assert_eq!( @@ -882,7 +887,12 @@ pub(crate) mod test_helpers { let namespace5_name = NamespaceName::new("test_namespace5").unwrap(); let namespace5 = repos .namespaces() - .create(&namespace5_name, Some(tag_partition_template.clone()), None) + .create( + &namespace5_name, + Some(tag_partition_template.clone()), + None, + None, + ) .await .unwrap(); assert_eq!(namespace5.partition_template, tag_partition_template); @@ -1282,6 +1292,7 @@ pub(crate) mod test_helpers { &custom_namespace_name, Some(custom_namespace_template.clone()), None, + None, ) .await .unwrap(); @@ -2165,6 +2176,7 @@ pub(crate) mod test_helpers { &NamespaceName::new("retention_broken_2").unwrap(), None, Some(1), + None, ) .await .unwrap(); @@ -2988,7 +3000,12 @@ pub(crate) mod test_helpers { { let namespace = repos .namespaces() - .create(&NamespaceName::new(namespace_name).unwrap(), None, None) + .create( + &NamespaceName::new(namespace_name).unwrap(), + None, + None, + None, + ) .await; let namespace = match namespace { diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index d58d367e88..0277ec098b 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -289,7 +289,7 @@ pub mod test_helpers { let namespace_name = NamespaceName::new(name).unwrap(); repos .namespaces() - .create(&namespace_name, None, None) + .create(&namespace_name, None, None, None) .await .unwrap() } diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index fc434ea7bc..a4563d7429 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -16,8 +16,8 @@ use data_types::{ NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, TemplatePart, }, Column, ColumnId, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName, - ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, - SkippedCompaction, Table, TableId, Timestamp, + NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, ParquetFileParams, + Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId, Timestamp, }; use iox_time::{SystemProvider, TimeProvider}; use snafu::ensure; @@ -147,6 +147,7 @@ impl NamespaceRepo for MemTxn { name: &NamespaceName<'_>, partition_template: Option, retention_period_ns: Option, + service_protection_limits: Option, ) -> Result { let stage = self.stage(); @@ -156,11 +157,14 @@ impl NamespaceRepo for MemTxn { }); } + let max_tables = service_protection_limits.and_then(|l| l.max_tables); + let max_columns_per_table = service_protection_limits.and_then(|l| l.max_columns_per_table); + let namespace = Namespace { id: NamespaceId::new(stage.namespaces.len() as i64 + 1), name: name.to_string(), - max_tables: DEFAULT_MAX_TABLES, - max_columns_per_table: DEFAULT_MAX_COLUMNS_PER_TABLE, + max_tables: max_tables.unwrap_or(DEFAULT_MAX_TABLES), + max_columns_per_table: max_columns_per_table.unwrap_or(DEFAULT_MAX_COLUMNS_PER_TABLE), retention_period_ns, deleted_at: None, partition_template: partition_template.unwrap_or_default(), diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs index 7fdaa6fcab..1e643a6b16 100644 --- a/iox_catalog/src/metrics.rs +++ b/iox_catalog/src/metrics.rs @@ -7,9 +7,9 @@ use crate::interface::{ use async_trait::async_trait; use data_types::{ partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride}, - Column, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName, ParquetFile, - ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction, - Table, TableId, Timestamp, + Column, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName, + NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, ParquetFileParams, + Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId, Timestamp, }; use iox_time::{SystemProvider, TimeProvider}; use metric::{DurationHistogram, Metric}; @@ -132,7 +132,7 @@ macro_rules! decorate { decorate!( impl_trait = NamespaceRepo, methods = [ - "namespace_create" = create(&mut self, name: &NamespaceName<'_>, partition_template: Option, retention_period_ns: Option) -> Result; + "namespace_create" = create(&mut self, name: &NamespaceName<'_>, partition_template: Option, retention_period_ns: Option, service_protection_limits: Option) -> Result; "namespace_update_retention_period" = update_retention_period(&mut self, name: &str, retention_period_ns: Option) -> Result; "namespace_list" = list(&mut self, deleted: SoftDeletedRows) -> Result>; "namespace_get_by_id" = get_by_id(&mut self, id: NamespaceId, deleted: SoftDeletedRows) -> Result>; diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 7b6a4a1f4c..75e0abac18 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -19,9 +19,10 @@ use data_types::{ partition_template::{ NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, TemplatePart, }, - Column, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName, ParquetFile, - ParquetFileExists, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, - SkippedCompaction, Table, TableId, Timestamp, + Column, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName, + NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileExists, ParquetFileId, + ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId, + Timestamp, }; use iox_time::{SystemProvider, TimeProvider}; use observability_deps::tracing::{debug, info, warn}; @@ -564,13 +565,17 @@ impl NamespaceRepo for PostgresTxn { name: &NamespaceName<'_>, partition_template: Option, retention_period_ns: Option, + service_protection_limits: Option, ) -> Result { + let max_tables = service_protection_limits.and_then(|l| l.max_tables); + let max_columns_per_table = service_protection_limits.and_then(|l| l.max_columns_per_table); + let rec = sqlx::query_as::<_, Namespace>( r#" INSERT INTO namespace ( - name, topic_id, query_pool_id, retention_period_ns, max_tables, partition_template + name, topic_id, query_pool_id, retention_period_ns, max_tables, max_columns_per_table, partition_template ) -VALUES ( $1, $2, $3, $4, $5, $6 ) +VALUES ( $1, $2, $3, $4, $5, $6, $7 ) RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at, partition_template; "#, @@ -579,8 +584,9 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele .bind(SHARED_TOPIC_ID) // $2 .bind(SHARED_QUERY_POOL_ID) // $3 .bind(retention_period_ns) // $4 - .bind(DEFAULT_MAX_TABLES) // $5 - .bind(partition_template); // $6 + .bind(max_tables.unwrap_or(DEFAULT_MAX_TABLES)) // $5 + .bind(max_columns_per_table.unwrap_or(DEFAULT_MAX_COLUMNS_PER_TABLE)) // $6 + .bind(partition_template); // $7 let rec = rec.fetch_one(&mut self.inner).await.map_err(|e| { if is_unique_violation(&e) { @@ -594,10 +600,6 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele } })?; - // Ensure the column default values match the code values. - debug_assert_eq!(rec.max_tables, DEFAULT_MAX_TABLES); - debug_assert_eq!(rec.max_columns_per_table, DEFAULT_MAX_COLUMNS_PER_TABLE); - Ok(rec) } @@ -2307,6 +2309,7 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele &"lemons".try_into().unwrap(), None, // no partition template None, + None, ) .await .unwrap(); @@ -2347,6 +2350,7 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele &namespace_custom_template_name.try_into().unwrap(), Some(custom_partition_template_equal_to_default.clone()), None, + None, ) .await .unwrap(); @@ -2385,6 +2389,7 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele &namespace_default_template_name.try_into().unwrap(), None, // no partition template None, + None, ) .await .unwrap(); @@ -2403,6 +2408,7 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele .unwrap(), ), None, + None, ) .await .unwrap(); diff --git a/iox_catalog/src/sqlite.rs b/iox_catalog/src/sqlite.rs index d76f786bee..26e5617d55 100644 --- a/iox_catalog/src/sqlite.rs +++ b/iox_catalog/src/sqlite.rs @@ -19,8 +19,9 @@ use data_types::{ NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, TemplatePart, }, Column, ColumnId, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId, - NamespaceName, ParquetFile, ParquetFileExists, ParquetFileId, ParquetFileParams, Partition, - PartitionId, PartitionKey, SkippedCompaction, Table, TableId, Timestamp, + NamespaceName, NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileExists, + ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction, + Table, TableId, Timestamp, }; use serde::{Deserialize, Serialize}; use std::collections::HashSet; @@ -266,11 +267,15 @@ impl NamespaceRepo for SqliteTxn { name: &NamespaceName<'_>, partition_template: Option, retention_period_ns: Option, + service_protection_limits: Option, ) -> Result { + let max_tables = service_protection_limits.and_then(|l| l.max_tables); + let max_columns_per_table = service_protection_limits.and_then(|l| l.max_columns_per_table); + let rec = sqlx::query_as::<_, Namespace>( r#" -INSERT INTO namespace ( name, topic_id, query_pool_id, retention_period_ns, max_tables, partition_template ) -VALUES ( $1, $2, $3, $4, $5, $6 ) +INSERT INTO namespace ( name, topic_id, query_pool_id, retention_period_ns, max_tables, max_columns_per_table, partition_template ) +VALUES ( $1, $2, $3, $4, $5, $6, $7 ) RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at, partition_template; "#, @@ -279,8 +284,9 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele .bind(SHARED_TOPIC_ID) // $2 .bind(SHARED_QUERY_POOL_ID) // $3 .bind(retention_period_ns) // $4 - .bind(DEFAULT_MAX_TABLES) // $5 - .bind(partition_template); // $6 + .bind(max_tables.unwrap_or(DEFAULT_MAX_TABLES)) // $5 + .bind(max_columns_per_table.unwrap_or(DEFAULT_MAX_COLUMNS_PER_TABLE)) // $6 + .bind(partition_template); // $7 let rec = rec.fetch_one(self.inner.get_mut()).await.map_err(|e| { if is_unique_violation(&e) { @@ -294,10 +300,6 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele } })?; - // Ensure the column default values match the code values. - debug_assert_eq!(rec.max_tables, DEFAULT_MAX_TABLES); - debug_assert_eq!(rec.max_columns_per_table, DEFAULT_MAX_COLUMNS_PER_TABLE); - Ok(rec) } @@ -1910,6 +1912,7 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele &"lemons".try_into().unwrap(), None, // no partition template None, + None, ) .await .unwrap(); @@ -1950,6 +1953,7 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele &namespace_custom_template_name.try_into().unwrap(), Some(custom_partition_template_equal_to_default.clone()), None, + None, ) .await .unwrap(); @@ -1986,6 +1990,7 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele &namespace_default_template_name.try_into().unwrap(), None, // no partition template None, + None, ) .await .unwrap(); @@ -2004,6 +2009,7 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele .unwrap(), ), None, + None, ) .await .unwrap(); diff --git a/iox_tests/src/catalog.rs b/iox_tests/src/catalog.rs index 17d2d4582c..e5f2638433 100644 --- a/iox_tests/src/catalog.rs +++ b/iox_tests/src/catalog.rs @@ -148,7 +148,7 @@ impl TestCatalog { let namespace_name = NamespaceName::new(name).unwrap(); let namespace = repos .namespaces() - .create(&namespace_name, None, retention_period_ns) + .create(&namespace_name, None, retention_period_ns, None) .await .unwrap(); diff --git a/router/src/namespace_cache/read_through_cache.rs b/router/src/namespace_cache/read_through_cache.rs index 40ad1cc3e6..9d41efa4c1 100644 --- a/router/src/namespace_cache/read_through_cache.rs +++ b/router/src/namespace_cache/read_through_cache.rs @@ -168,7 +168,7 @@ mod tests { .repositories() .await .namespaces() - .create(&ns, None, iox_catalog::DEFAULT_RETENTION_PERIOD,) + .create(&ns, None, iox_catalog::DEFAULT_RETENTION_PERIOD, None) .await, Ok(_) ); diff --git a/router/src/namespace_resolver.rs b/router/src/namespace_resolver.rs index c9d2aa3170..2e8184c390 100644 --- a/router/src/namespace_resolver.rs +++ b/router/src/namespace_resolver.rs @@ -144,7 +144,7 @@ mod tests { let mut repos = catalog.repositories().await; repos .namespaces() - .create(&ns, None, None) + .create(&ns, None, None, None) .await .expect("failed to setup catalog state"); } @@ -176,7 +176,7 @@ mod tests { let mut repos = catalog.repositories().await; repos .namespaces() - .create(&ns, None, None) + .create(&ns, None, None, None) .await .expect("failed to setup catalog state"); repos diff --git a/router/src/namespace_resolver/ns_autocreation.rs b/router/src/namespace_resolver/ns_autocreation.rs index e852591f0c..11e0d5bf00 100644 --- a/router/src/namespace_resolver/ns_autocreation.rs +++ b/router/src/namespace_resolver/ns_autocreation.rs @@ -107,7 +107,7 @@ where .repositories() .await .namespaces() - .create(namespace, None, retention_period_ns) + .create(namespace, None, retention_period_ns, None) .await { Ok(_) => { diff --git a/router/tests/grpc.rs b/router/tests/grpc.rs index c913e19041..e7057282ca 100644 --- a/router/tests/grpc.rs +++ b/router/tests/grpc.rs @@ -88,6 +88,7 @@ async fn test_namespace_create() { name: "bananas_test".to_string(), retention_period_ns: Some(RETENTION), partition_template: None, + service_protection_limits: None, }; let got = ctx .grpc_delegate() @@ -161,6 +162,7 @@ async fn test_namespace_delete() { name: "bananas_test".to_string(), retention_period_ns: Some(RETENTION), partition_template: None, + service_protection_limits: None, }; let got = ctx .grpc_delegate() @@ -291,6 +293,7 @@ async fn test_create_namespace_0_retention_period() { name: "bananas_test".to_string(), retention_period_ns: Some(0), // A zero! partition_template: None, + service_protection_limits: None, }; let got = ctx .grpc_delegate() @@ -356,6 +359,7 @@ async fn test_create_namespace_negative_retention_period() { name: "bananas_test".to_string(), retention_period_ns: Some(-42), partition_template: None, + service_protection_limits: None, }; let err = ctx .grpc_delegate() @@ -420,6 +424,7 @@ async fn test_update_namespace_0_retention_period() { name: "bananas_test".to_string(), retention_period_ns: Some(42), partition_template: None, + service_protection_limits: None, })) .await .expect("failed to create namespace") @@ -526,6 +531,7 @@ async fn test_update_namespace_negative_retention_period() { name: "bananas_test".to_string(), retention_period_ns: Some(42), partition_template: None, + service_protection_limits: None, })) .await .expect("failed to create namespace") @@ -796,6 +802,7 @@ async fn test_update_namespace_limit_0_max_tables_max_columns() { name: "bananas_test".to_string(), retention_period_ns: Some(0), partition_template: None, + service_protection_limits: None, })) .await .expect("failed to create namespace") @@ -868,6 +875,7 @@ async fn test_table_create() { name: "bananas_test".to_string(), retention_period_ns: None, partition_template: None, + service_protection_limits: None, }; let namespace = ctx .grpc_delegate() @@ -955,6 +963,7 @@ async fn test_invalid_strftime_partition_template() { part: Some(template_part::Part::TimeFormat("%3F".into())), }], }), + service_protection_limits: None, }; // Check namespace creation returned an error @@ -986,6 +995,7 @@ async fn test_namespace_partition_template_implicit_table_creation() { part: Some(template_part::Part::TagValue("tag1".into())), }], }), + service_protection_limits: None, }; ctx.grpc_delegate() .namespace_service() @@ -1036,6 +1046,7 @@ async fn test_namespace_partition_template_explicit_table_creation_without_parti part: Some(template_part::Part::TagValue("tag1".into())), }], }), + service_protection_limits: None, }; ctx.grpc_delegate() .namespace_service() @@ -1101,6 +1112,7 @@ async fn test_namespace_partition_template_explicit_table_creation_with_partitio part: Some(template_part::Part::TagValue("tag1".into())), }], }), + service_protection_limits: None, }; ctx.grpc_delegate() .namespace_service() @@ -1166,6 +1178,7 @@ async fn test_namespace_without_partition_template_table_with_partition_template name: "bananas_test".to_string(), retention_period_ns: None, partition_template: None, + service_protection_limits: None, }; ctx.grpc_delegate() .namespace_service() diff --git a/router/tests/http.rs b/router/tests/http.rs index 5963f904f5..13581905cd 100644 --- a/router/tests/http.rs +++ b/router/tests/http.rs @@ -350,6 +350,7 @@ async fn test_delete_unsupported() { &data_types::NamespaceName::new("bananas_test").unwrap(), None, None, + None, ) .await .expect("failed to update table limit"); diff --git a/service_grpc_namespace/src/lib.rs b/service_grpc_namespace/src/lib.rs index 681f588569..5a57987a42 100644 --- a/service_grpc_namespace/src/lib.rs +++ b/service_grpc_namespace/src/lib.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use data_types::{ partition_template::NamespacePartitionTemplateOverride, Namespace as CatalogNamespace, - NamespaceName, + NamespaceName, NamespaceServiceProtectionLimitsOverride, }; use generated_types::influxdata::iox::namespace::v1::{ update_namespace_service_protection_limit_request::LimitUpdate, *, @@ -74,6 +74,7 @@ impl namespace_service_server::NamespaceService for NamespaceService { name: namespace_name, retention_period_ns, partition_template, + service_protection_limits, } = request.into_inner(); // Ensure the namespace name is consistently processed within IOx - this @@ -94,6 +95,7 @@ impl namespace_service_server::NamespaceService for NamespaceService { .transpose() .map_err(|v| Status::invalid_argument(v.to_string()))?, retention_period_ns, + service_protection_limits.map(NamespaceServiceProtectionLimitsOverride::from), ) .await .map_err(|e| { @@ -383,6 +385,7 @@ mod tests { name: NS_NAME.to_string(), retention_period_ns: Some(RETENTION), partition_template: None, + service_protection_limits: None, }; let created_ns = handler .create_namespace(Request::new(req)) @@ -507,6 +510,7 @@ mod tests { name: NS_NAME.to_string(), retention_period_ns: Some(RETENTION), partition_template: None, + service_protection_limits: None, }; let created_ns = handler @@ -553,6 +557,7 @@ mod tests { name: NS_NAME.to_string(), retention_period_ns: None, partition_template: Some(PartitionTemplate { parts: vec![] }), + service_protection_limits: None, }; let error = handler @@ -586,6 +591,7 @@ mod tests { name: NS_NAME.to_string(), retention_period_ns: Some(RETENTION), partition_template: None, + service_protection_limits: None, }; let created_ns = handler .create_namespace(Request::new(req)) @@ -626,6 +632,37 @@ mod tests { assert_eq!(status.code(), Code::InvalidArgument); } + #[tokio::test] + async fn test_create_with_service_protection_limits() { + let catalog: Arc = + Arc::new(MemCatalog::new(Arc::new(metric::Registry::default()))); + + let max_tables = 123; + let max_columns_per_table = 321; + + let handler = NamespaceService::new(catalog); + let req = CreateNamespaceRequest { + name: NS_NAME.to_string(), + retention_period_ns: Some(RETENTION), + partition_template: None, + service_protection_limits: Some(ServiceProtectionLimits { + max_tables: Some(max_tables), + max_columns_per_table: Some(max_columns_per_table), + }), + }; + let created_ns = handler + .create_namespace(Request::new(req)) + .await + .expect("failed to create namespace") + .into_inner() + .namespace + .expect("no namespace in response"); + assert_eq!(created_ns.name, NS_NAME); + assert_eq!(created_ns.retention_period_ns, Some(RETENTION)); + assert_eq!(created_ns.max_tables, max_tables); + assert_eq!(created_ns.max_columns_per_table, max_columns_per_table); + } + macro_rules! test_create_namespace_name { ( $test_name:ident, @@ -646,6 +683,7 @@ mod tests { name: String::from($name), retention_period_ns: Some(RETENTION), partition_template: None, + service_protection_limits: None, }; let got = handler.create_namespace(Request::new(req)).await; diff --git a/service_grpc_table/src/lib.rs b/service_grpc_table/src/lib.rs index 0fbb252097..ccc56a0def 100644 --- a/service_grpc_table/src/lib.rs +++ b/service_grpc_table/src/lib.rs @@ -330,6 +330,7 @@ mod tests { .unwrap(), ), None, + None, ) .await .unwrap();