diff --git a/generated_types/protos/influxdata/iox/namespace/v1/service.proto b/generated_types/protos/influxdata/iox/namespace/v1/service.proto index 83885eede1..e586bece07 100644 --- a/generated_types/protos/influxdata/iox/namespace/v1/service.proto +++ b/generated_types/protos/influxdata/iox/namespace/v1/service.proto @@ -14,6 +14,10 @@ service NamespaceService { // Update retention period rpc UpdateNamespaceRetention(UpdateNamespaceRetentionRequest) returns (UpdateNamespaceRetentionResponse); + + // Update a service protection limit of a namespace. For this change to take + // effect, all routers MUST be restarted + rpc UpdateNamespaceServiceProtectionLimit(UpdateNamespaceServiceProtectionLimitRequest) returns (UpdateNamespaceServiceProtectionLimitResponse); } message GetNamespacesRequest { @@ -61,6 +65,23 @@ message UpdateNamespaceRetentionResponse { Namespace namespace = 1; } +message UpdateNamespaceServiceProtectionLimitRequest { + // Namespace to have its service protection limits updated. + string name = 1; + + // The service protection limit to update. + oneof limit_update { + // Change the maximum number of tables the namespace may have. + int32 max_tables = 2; + // Change the maximum number of columns each table in the namespace may have. + int32 max_columns_per_table = 3; + } +} + +message UpdateNamespaceServiceProtectionLimitResponse { + Namespace namespace = 1; +} + message Namespace { // Namespace ID int64 id = 1; @@ -72,4 +93,10 @@ message Namespace { // // NULL means "infinite retention". optional int64 retention_period_ns = 3; + + // The maximum number of tables which this namespace is allowed to contain. + int32 max_tables = 4; + + // The maximum number of columns a table belonging to this namespace may have. + int32 max_columns_per_table = 5; } diff --git a/influxdb_iox/src/commands/namespace/create.rs b/influxdb_iox/src/commands/namespace/create.rs index 0046dde448..1e2111e082 100644 --- a/influxdb_iox/src/commands/namespace/create.rs +++ b/influxdb_iox/src/commands/namespace/create.rs @@ -1,15 +1,6 @@ use influxdb_iox_client::connection::Connection; -use thiserror::Error; -#[allow(clippy::enum_variant_names)] -#[derive(Debug, Error)] -pub enum Error { - #[error("JSON Serialization error: {0}")] - Serde(#[from] serde_json::Error), - - #[error("Client error: {0}")] - ClientError(#[from] influxdb_iox_client::error::Error), -} +use crate::commands::namespace::Result; /// Write data into the specified database #[derive(Debug, clap::Parser)] @@ -30,10 +21,7 @@ pub struct Config { retention_hours: u32, } -pub async fn command( - connection: Connection, - config: Config, -) -> Result<(), crate::commands::namespace::Error> { +pub async fn command(connection: Connection, config: Config) -> Result<()> { let Config { namespace, retention_hours, diff --git a/influxdb_iox/src/commands/namespace/delete.rs b/influxdb_iox/src/commands/namespace/delete.rs index ff1c015f05..93246beec2 100644 --- a/influxdb_iox/src/commands/namespace/delete.rs +++ b/influxdb_iox/src/commands/namespace/delete.rs @@ -1,15 +1,6 @@ use influxdb_iox_client::connection::Connection; -use thiserror::Error; -#[allow(clippy::enum_variant_names)] -#[derive(Debug, Error)] -pub enum Error { - #[error("JSON Serialization error: {0}")] - Serde(#[from] serde_json::Error), - - #[error("Client error: {0}")] - ClientError(#[from] influxdb_iox_client::error::Error), -} +use crate::commands::namespace::Result; #[derive(Debug, clap::Parser)] pub struct Config { @@ -18,10 +9,7 @@ pub struct Config { namespace: String, } -pub async fn command( - connection: Connection, - config: Config, -) -> Result<(), crate::commands::namespace::Error> { +pub async fn command(connection: Connection, config: Config) -> Result<()> { let Config { namespace } = config; let mut client = influxdb_iox_client::namespace::Client::new(connection); diff --git a/influxdb_iox/src/commands/namespace/mod.rs b/influxdb_iox/src/commands/namespace/mod.rs index c823847d7f..47b191f5f2 100644 --- a/influxdb_iox/src/commands/namespace/mod.rs +++ b/influxdb_iox/src/commands/namespace/mod.rs @@ -6,6 +6,7 @@ use thiserror::Error; mod create; mod delete; mod retention; +mod update_limit; #[allow(clippy::enum_variant_names)] #[derive(Debug, Error)] @@ -17,6 +18,8 @@ pub enum Error { ClientError(#[from] influxdb_iox_client::error::Error), } +pub type Result = std::result::Result; + /// Various commands for namespace inspection #[derive(Debug, clap::Parser)] pub struct Config { @@ -36,11 +39,14 @@ enum Command { /// Update retention of an existing namespace Retention(retention::Config), + /// Update one of the service protection limits for an existing namespace + UpdateLimit(update_limit::Config), + /// Delete a namespace Delete(delete::Config), } -pub async fn command(connection: Connection, config: Config) -> Result<(), Error> { +pub async fn command(connection: Connection, config: Config) -> Result<()> { match config.command { Command::Create(config) => { create::command(connection, config).await?; @@ -53,6 +59,9 @@ pub async fn command(connection: Connection, config: Config) -> Result<(), Error Command::Retention(config) => { retention::command(connection, config).await?; } + Command::UpdateLimit(config) => { + update_limit::command(connection, config).await?; + } Command::Delete(config) => { delete::command(connection, config).await?; } // Deliberately not adding _ => so the compiler will direct people here to impl new diff --git a/influxdb_iox/src/commands/namespace/retention.rs b/influxdb_iox/src/commands/namespace/retention.rs index a4a10ee2c2..d896d79bc6 100644 --- a/influxdb_iox/src/commands/namespace/retention.rs +++ b/influxdb_iox/src/commands/namespace/retention.rs @@ -1,15 +1,6 @@ use influxdb_iox_client::connection::Connection; -use thiserror::Error; -#[allow(clippy::enum_variant_names)] -#[derive(Debug, Error)] -pub enum Error { - #[error("JSON Serialization error: {0}")] - Serde(#[from] serde_json::Error), - - #[error("Client error: {0}")] - ClientError(#[from] influxdb_iox_client::error::Error), -} +use crate::commands::namespace::Result; /// Update the specified namespace's data retention period #[derive(Debug, clap::Parser)] @@ -24,10 +15,7 @@ pub struct Config { retention_hours: u32, } -pub async fn command( - connection: Connection, - config: Config, -) -> Result<(), crate::commands::namespace::Error> { +pub async fn command(connection: Connection, config: Config) -> Result<()> { let Config { namespace, retention_hours, diff --git a/influxdb_iox/src/commands/namespace/update_limit.rs b/influxdb_iox/src/commands/namespace/update_limit.rs new file mode 100644 index 0000000000..7c2734ee0a --- /dev/null +++ b/influxdb_iox/src/commands/namespace/update_limit.rs @@ -0,0 +1,71 @@ +use influxdb_iox_client::connection::Connection; +use influxdb_iox_client::namespace::generated_types::LimitUpdate; + +use crate::commands::namespace::Result; + +#[derive(Debug, clap::Parser)] +pub struct Config { + /// The namespace to update a service protection limit for + #[clap(action)] + namespace: String, + + #[command(flatten)] + args: Args, +} + +#[derive(Debug, clap::Args)] +#[clap(group( + // This arg group "limit" links the members of the below struct + // named "max_tables" and "max_columns_per_table" together as + // mutually exclusive flags. As we specify all flags & commands + // using clap-derive rather than the imperative builder, v3 only + // properly supports this kind of behaviour in a macro code block. + // NOTE: It takes the variable names and not the flag long names. + clap::ArgGroup::new("limit") + .required(true) + .args(&["max_tables", "max_columns_per_table"]) + ))] +struct Args { + /// The maximum number of tables to allow for this namespace + #[clap(action, long = "max-tables", short = 't', group = "limit")] + max_tables: Option, + + /// The maximum number of columns to allow per table for this namespace + #[clap(action, long = "max-columns-per-table", short = 'c', group = "limit")] + max_columns_per_table: Option, +} + +impl From for LimitUpdate { + fn from(args: Args) -> Self { + let Args { + max_tables, + max_columns_per_table, + } = args; + + if let Some(n) = max_tables { + return Self::MaxTables(n); + } + if let Some(n) = max_columns_per_table { + return Self::MaxColumnsPerTable(n); + } + unreachable!(); + } +} + +pub async fn command(connection: Connection, config: Config) -> Result<()> { + let mut client = influxdb_iox_client::namespace::Client::new(connection); + + let namespace = client + .update_namespace_service_protection_limit( + &config.namespace, + LimitUpdate::from(config.args), + ) + .await?; + println!("{}", serde_json::to_string_pretty(&namespace)?); + + println!( + r" +NOTE: This change will NOT take effect until all router instances have been restarted!" + ); + Ok(()) +} diff --git a/influxdb_iox/tests/end_to_end_cases/cli.rs b/influxdb_iox/tests/end_to_end_cases/cli.rs index d9996dae03..71173d109a 100644 --- a/influxdb_iox/tests/end_to_end_cases/cli.rs +++ b/influxdb_iox/tests/end_to_end_cases/cli.rs @@ -1040,3 +1040,113 @@ async fn query_ingester() { .run() .await } + +/// Test the namespace update service limit command +#[tokio::test] +async fn namespace_update_service_limit() { + test_helpers::maybe_start_logging(); + let database_url = maybe_skip_integration!(); + let mut cluster = MiniCluster::create_shared2(database_url).await; + + StepTest::new( + &mut cluster, + vec![ + Step::Custom(Box::new(|state: &mut StepTestState| { + async { + let namespace = "service_limiter_namespace"; + let addr = state.cluster().router().router_grpc_base().to_string(); + + // { + // "id": , + // "name": "service_limiter_namespace", + // "serviceProtectionLimits": { + // "maxTables": 500, + // "maxColumnsPerTable": 200 + // } + // } + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("-h") + .arg(&addr) + .arg("namespace") + .arg("create") + .arg(namespace) + .assert() + .success() + .stdout( + predicate::str::contains(namespace) + .and(predicate::str::contains(r#""maxTables": 500"#)) + .and(predicate::str::contains(r#""maxColumnsPerTable": 200"#)), + ); + } + .boxed() + })), + Step::Custom(Box::new(|state: &mut StepTestState| { + async { + let namespace = "service_limiter_namespace"; + let addr = state.cluster().router().router_grpc_base().to_string(); + + // { + // "id": , + // "name": "service_limiter_namespace", + // "serviceProtectionLimits": { + // "maxTables": 1337, + // "maxColumnsPerTable": 200 + // } + // } + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("-h") + .arg(&addr) + .arg("namespace") + .arg("update-limit") + .arg("--max-tables") + .arg("1337") + .arg(namespace) + .assert() + .success() + .stdout( + predicate::str::contains(namespace) + .and(predicate::str::contains(r#""maxTables": 1337"#)) + .and(predicate::str::contains(r#""maxColumnsPerTable": 200"#)), + ); + } + .boxed() + })), + Step::Custom(Box::new(|state: &mut StepTestState| { + async { + let namespace = "service_limiter_namespace"; + let addr = state.cluster().router().router_grpc_base().to_string(); + + // { + // "id": , + // "name": "service_limiter_namespace", + // "serviceProtectionLimits": { + // "maxTables": 1337, + // "maxColumnsPerTable": 42 + // } + // } + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("-h") + .arg(&addr) + .arg("namespace") + .arg("update-limit") + .arg("--max-columns-per-table") + .arg("42") + .arg(namespace) + .assert() + .success() + .stdout( + predicate::str::contains(namespace) + .and(predicate::str::contains(r#""maxTables": 1337"#)) + .and(predicate::str::contains(r#""maxColumnsPerTable": 42"#)), + ); + } + .boxed() + })), + ], + ) + .run() + .await +} diff --git a/influxdb_iox_client/src/client/namespace.rs b/influxdb_iox_client/src/client/namespace.rs index c94ea8389c..fb88dca097 100644 --- a/influxdb_iox_client/src/client/namespace.rs +++ b/influxdb_iox_client/src/client/namespace.rs @@ -7,7 +7,9 @@ use ::generated_types::google::OptionalField; /// Re-export generated_types pub mod generated_types { - pub use generated_types::influxdata::iox::namespace::v1::*; + pub use generated_types::influxdata::iox::namespace::v1::{ + update_namespace_service_protection_limit_request::LimitUpdate, *, + }; } /// A basic client for working with Namespaces. @@ -77,6 +79,30 @@ impl Client { Ok(response.into_inner().namespace.unwrap_field("namespace")?) } + /// Update one of the service protection limits for a namespace + /// + /// `limit_update` is the new service limit protection limit to set + /// on the namespace. + /// + /// Zero-valued limits are rejected, returning an error. + pub async fn update_namespace_service_protection_limit( + &mut self, + namespace: &str, + limit_update: LimitUpdate, + ) -> Result { + let response = self + .inner + .update_namespace_service_protection_limit( + UpdateNamespaceServiceProtectionLimitRequest { + name: namespace.to_string(), + limit_update: Some(limit_update), + }, + ) + .await?; + + Ok(response.into_inner().namespace.unwrap_field("namespace")?) + } + /// Delete a namespace pub async fn delete_namespace(&mut self, namespace: &str) -> Result<(), Error> { self.inner diff --git a/ioxd_querier/src/rpc/namespace.rs b/ioxd_querier/src/rpc/namespace.rs index a6a027de65..313868df2d 100644 --- a/ioxd_querier/src/rpc/namespace.rs +++ b/ioxd_querier/src/rpc/namespace.rs @@ -36,6 +36,8 @@ fn namespace_to_proto(namespace: Namespace) -> proto::Namespace { id: namespace.id.get(), name: namespace.name, retention_period_ns: namespace.retention_period_ns, + max_tables: namespace.max_tables, + max_columns_per_table: namespace.max_columns_per_table, } } @@ -82,6 +84,16 @@ impl proto::namespace_service_server::NamespaceService for NamespaceServiceImpl "use router instances to manage namespaces", )) } + + async fn update_namespace_service_protection_limit( + &self, + _request: tonic::Request, + ) -> Result, tonic::Status> + { + Err(tonic::Status::unimplemented( + "use router instances to manage namespaces", + )) + } } #[cfg(test)] @@ -92,6 +104,11 @@ mod tests { use querier::{create_ingester_connection_for_testing, QuerierCatalogCache}; use tokio::runtime::Handle; + use iox_catalog::{ + DEFAULT_MAX_COLUMNS_PER_TABLE as TEST_MAX_COLUMNS_PER_TABLE, + DEFAULT_MAX_TABLES as TEST_MAX_TABLES, + }; + /// Common retention period value we'll use in tests const TEST_RETENTION_PERIOD_NS: Option = Some(3_600 * 1_000_000_000); @@ -171,11 +188,15 @@ mod tests { id: 1, name: "namespace2".to_string(), retention_period_ns: TEST_RETENTION_PERIOD_NS, + max_tables: TEST_MAX_TABLES, + max_columns_per_table: TEST_MAX_COLUMNS_PER_TABLE, }, proto::Namespace { id: 2, name: "namespace1".to_string(), retention_period_ns: TEST_RETENTION_PERIOD_NS, + max_tables: TEST_MAX_TABLES, + max_columns_per_table: TEST_MAX_COLUMNS_PER_TABLE, }, ] } diff --git a/router/tests/grpc.rs b/router/tests/grpc.rs index 2aa48d3a08..fd940bae2d 100644 --- a/router/tests/grpc.rs +++ b/router/tests/grpc.rs @@ -5,10 +5,10 @@ use generated_types::influxdata::iox::namespace::v1::{ namespace_service_server::NamespaceService, *, }; use hyper::StatusCode; -use iox_catalog::interface::SoftDeletedRows; +use iox_catalog::interface::{Error as CatalogError, SoftDeletedRows}; use iox_time::{SystemProvider, TimeProvider}; use router::{ - dml_handlers::{DmlError, RetentionError}, + dml_handlers::{CachedServiceProtectionLimit, DmlError, RetentionError, SchemaError}, namespace_resolver::{self, NamespaceCreationError}, server::http::Error, }; @@ -562,3 +562,280 @@ async fn test_update_namespace_negative_retention_period() { }); } } + +#[tokio::test] +async fn test_update_namespace_limit_max_tables() { + // Initialise a TestContext with namespace autocreation. + let ctx = TestContextBuilder::default() + .with_autocreate_namespace(None) + .build() + .await; + + // Writing to two initial tables should succeed + ctx.write_lp("bananas", "test", "ananas,tag1=A,tag2=B val=42i 42424242") + .await + .expect("write should succeed"); + ctx.write_lp("bananas", "test", "platanos,tag3=C,tag4=D val=99i 42424243") + .await + .expect("write should succeed"); + + // Limit the maximum number of tables to prevent a write adding another table + let got = ctx + .grpc_delegate() + .namespace_service() + .update_namespace_service_protection_limit(Request::new( + UpdateNamespaceServiceProtectionLimitRequest { + name: "bananas_test".to_string(), + limit_update: Some( + update_namespace_service_protection_limit_request::LimitUpdate::MaxTables(1), + ), + }, + )) + .await + .expect("failed to update namespace max table limit") + .into_inner() + .namespace + .expect("no namespace in response"); + + assert_eq!(got.name, "bananas_test"); + assert_eq!(got.id, 1); + assert_eq!(got.max_tables, 1); + assert_eq!( + got.max_columns_per_table, + iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE + ); + + // The list namespace RPC should show the updated namespace + { + let list = ctx + .grpc_delegate() + .namespace_service() + .get_namespaces(Request::new(Default::default())) + .await + .expect("must return namespaces") + .into_inner(); + assert_matches!(list.namespaces.as_slice(), [ns] => { + assert_eq!(*ns, got); + }); + } + + // The catalog should contain the namespace. + { + let db_list = ctx + .catalog() + .repositories() + .await + .namespaces() + .list(SoftDeletedRows::ExcludeDeleted) + .await + .expect("query failure"); + assert_matches!(db_list.as_slice(), [ns] => { + assert_eq!(ns.id.get(), got.id); + assert_eq!(ns.name, got.name); + assert_eq!(ns.retention_period_ns, got.retention_period_ns); + assert_eq!(ns.max_tables, got.max_tables); + assert_eq!(ns.max_columns_per_table, got.max_columns_per_table); + assert!(ns.deleted_at.is_none()); + }); + } + + // New table should fail to be created by the catalog. + let err = ctx + .write_lp( + "bananas", + "test", + "arán_banana,tag1=A,tag2=B val=42i 42424244", + ) + .await + .expect_err("cached entry should be removed"); + assert_matches!(err, router::server::http::Error::DmlHandler(DmlError::Schema(SchemaError::ServiceLimit(e))) => { + let e: CatalogError = *e.downcast::().expect("error returned should be a table create limit error"); + assert_matches!(&e, CatalogError::TableCreateLimitError { table_name, .. } => { + assert_eq!(table_name, "arán_banana"); + assert_eq!(e.to_string(), "couldn't create table arán_banana; limit reached on namespace 1") + }); + }); +} +#[tokio::test] +async fn test_update_namespace_limit_max_columns_per_table() { + // Initialise a TestContext with namespace autocreation. + let ctx = TestContextBuilder::default() + .with_autocreate_namespace(None) + .build() + .await; + + // Initial write within limit should succeed + ctx.write_lp("bananas", "test", "ananas,tag1=A,tag2=B val=42i 42424242") + .await + .expect("write should succeed"); + + // Limit the maximum number of columns per table so an extra column is rejected + let got = ctx + .grpc_delegate() + .namespace_service() + .update_namespace_service_protection_limit(Request::new( + UpdateNamespaceServiceProtectionLimitRequest { + name: "bananas_test".to_string(), + limit_update: Some( + update_namespace_service_protection_limit_request::LimitUpdate::MaxColumnsPerTable(1), + ), + }, + )) + .await + .expect("failed to update namespace max table limit") + .into_inner() + .namespace + .expect("no namespace in response"); + + assert_eq!(got.name, "bananas_test"); + assert_eq!(got.id, 1); + assert_eq!(got.max_tables, iox_catalog::DEFAULT_MAX_TABLES); + assert_eq!(got.max_columns_per_table, 1); + + // The list namespace RPC should show the updated namespace + { + let list = ctx + .grpc_delegate() + .namespace_service() + .get_namespaces(Request::new(Default::default())) + .await + .expect("must return namespaces") + .into_inner(); + assert_matches!(list.namespaces.as_slice(), [ns] => { + assert_eq!(*ns, got); + }); + } + + // The catalog should contain the namespace. + { + let db_list = ctx + .catalog() + .repositories() + .await + .namespaces() + .list(SoftDeletedRows::ExcludeDeleted) + .await + .expect("query failure"); + assert_matches!(db_list.as_slice(), [ns] => { + assert_eq!(ns.id.get(), got.id); + assert_eq!(ns.name, got.name); + assert_eq!(ns.retention_period_ns, got.retention_period_ns); + assert_eq!(ns.max_tables, got.max_tables); + assert_eq!(ns.max_columns_per_table, got.max_columns_per_table); + assert!(ns.deleted_at.is_none()); + }); + } + + // The cached entry is not affected, and writes continue to be validated + // against the old value. + // + // https://github.com/influxdata/influxdb_iox/issues/6175 + + // Writing to second table should succeed while using the cached entry + ctx.write_lp( + "bananas", + "test", + "platanos,tag1=A,tag2=B val=1337i 42424243", + ) + .await + .expect("write should succeed"); + + // The router restarts, and writes with too many columns are then rejected. + let ctx = ctx.restart().await; + + let err = ctx + .write_lp( + "bananas", + "test", + "arán_banana,tag1=A,tag2=B val=76i 42424243", + ) + .await + .expect_err("cached entry should be removed and write should be blocked"); + assert_matches!( + err, router::server::http::Error::DmlHandler(DmlError::Schema( + SchemaError::ServiceLimit(e) + )) => { + let e: CachedServiceProtectionLimit = *e.downcast::().expect("error returned should be a cached service protection limit"); + assert_matches!(e, CachedServiceProtectionLimit::Column { + table_name, + max_columns_per_table, + .. + } => { + assert_eq!(table_name, "arán_banana"); + assert_eq!(max_columns_per_table, 1); + }); + } + ) +} + +#[tokio::test] +async fn test_update_namespace_limit_0_max_tables_max_columns() { + // Initialise a TestContext requiring explicit namespace creation. + let ctx = TestContextBuilder::default().build().await; + + // Explicitly create the namespace. + let create = ctx + .grpc_delegate() + .namespace_service() + .create_namespace(Request::new(CreateNamespaceRequest { + name: "bananas_test".to_string(), + retention_period_ns: Some(0), + })) + .await + .expect("failed to create namespace") + .into_inner() + .namespace + .expect("no namespace in response"); + + // Attempt to use an invalid table limit + let err = ctx + .grpc_delegate() + .namespace_service() + .update_namespace_service_protection_limit(Request::new( + UpdateNamespaceServiceProtectionLimitRequest { + name: "bananas_test".to_string(), + limit_update: Some( + update_namespace_service_protection_limit_request::LimitUpdate::MaxTables(0), + ), + }, + )) + .await + .expect_err("should not have been able to update the table limit to 0"); + assert_eq!(err.code(), Code::InvalidArgument); + + // Attempt to use an invalid column limit + let err = ctx + .grpc_delegate() + .namespace_service() + .update_namespace_service_protection_limit(Request::new( + UpdateNamespaceServiceProtectionLimitRequest { + name: "bananas_test".to_string(), + limit_update: Some( + update_namespace_service_protection_limit_request::LimitUpdate::MaxColumnsPerTable(0), + ), + }, + )) + .await + .expect_err("should not have been able to update the column per table limit to 0"); + assert_eq!(err.code(), Code::InvalidArgument); + + // The catalog should contain the namespace unchanged. + { + let db_list = ctx + .catalog() + .repositories() + .await + .namespaces() + .list(SoftDeletedRows::ExcludeDeleted) + .await + .expect("query failure"); + assert_matches!(db_list.as_slice(), [ns] => { + assert_eq!(ns.id.get(), create.id); + assert_eq!(ns.name, create.name); + assert_eq!(ns.retention_period_ns, create.retention_period_ns); + assert_eq!(ns.max_tables, create.max_tables); + assert_eq!(ns.max_columns_per_table, create.max_columns_per_table); + assert!(ns.deleted_at.is_none()); + }); + } +} diff --git a/service_grpc_namespace/src/lib.rs b/service_grpc_namespace/src/lib.rs index 79fc7603bc..26754f77b2 100644 --- a/service_grpc_namespace/src/lib.rs +++ b/service_grpc_namespace/src/lib.rs @@ -1,9 +1,10 @@ //! Implementation of the namespace gRPC service - use std::sync::Arc; use data_types::{Namespace as CatalogNamespace, QueryPoolId, TopicId}; -use generated_types::influxdata::iox::namespace::v1::*; +use generated_types::influxdata::iox::namespace::v1::{ + update_namespace_service_protection_limit_request::LimitUpdate, *, +}; use iox_catalog::interface::{Catalog, SoftDeletedRows}; use observability_deps::tracing::{debug, info, warn}; use tonic::{Request, Response, Status}; @@ -92,7 +93,7 @@ impl namespace_service_server::NamespaceService for NamespaceService { "created namespace" ); - Ok(Response::new(create_namespace_to_proto(namespace))) + Ok(Response::new(namespace_to_create_response_proto(namespace))) } async fn delete_namespace( @@ -130,7 +131,11 @@ impl namespace_service_server::NamespaceService for NamespaceService { let retention_period_ns = map_retention_period(retention_period_ns)?; - debug!(%namespace_name, ?retention_period_ns, "Updating namespace retention"); + debug!( + %namespace_name, + ?retention_period_ns, + "Updating namespace retention", + ); let namespace = repos .namespaces() @@ -142,7 +147,7 @@ impl namespace_service_server::NamespaceService for NamespaceService { })?; info!( - namespace_name, + %namespace_name, retention_period_ns, namespace_id = %namespace.id, "updated namespace retention" @@ -152,6 +157,84 @@ impl namespace_service_server::NamespaceService for NamespaceService { namespace: Some(namespace_to_proto(namespace)), })) } + + async fn update_namespace_service_protection_limit( + &self, + request: Request, + ) -> Result, Status> { + let mut repos = self.catalog.repositories().await; + + let UpdateNamespaceServiceProtectionLimitRequest { + name: namespace_name, + limit_update, + } = request.into_inner(); + + debug!( + %namespace_name, + ?limit_update, + "updating namespace service protection limit", + ); + + let namespace = match limit_update { + Some(LimitUpdate::MaxTables(n)) => { + if n == 0 { + return Err(Status::invalid_argument( + "max table limit for namespace must be greater than 0", + )); + } + repos + .namespaces() + .update_table_limit(&namespace_name, n) + .await + .map_err(|e| { + warn!( + error = %e, + %namespace_name, + table_limit = %n, + "failed to update table limit for namespace", + ); + status_from_catalog_namespace_error(e) + }) + } + Some(LimitUpdate::MaxColumnsPerTable(n)) => { + if n == 0 { + return Err(Status::invalid_argument( + "max columns per table limit for namespace must be greater than 0", + )); + } + repos + .namespaces() + .update_column_limit(&namespace_name, n) + .await + .map_err(|e| { + warn!( + error = %e, + %namespace_name, + per_table_column_limit = %n, + "failed to update per table column limit for namespace", + ); + status_from_catalog_namespace_error(e) + }) + } + None => Err(Status::invalid_argument( + "unsupported service protection limit change requested", + )), + }?; + + info!( + %namespace_name, + namespace_id = %namespace.id, + max_tables = %namespace.max_tables, + max_columns_per_table = %namespace.max_columns_per_table, + "updated namespace service protection limits", + ); + + Ok(Response::new( + UpdateNamespaceServiceProtectionLimitResponse { + namespace: Some(namespace_to_proto(namespace)), + }, + )) + } } fn namespace_to_proto(namespace: CatalogNamespace) -> Namespace { @@ -159,15 +242,19 @@ fn namespace_to_proto(namespace: CatalogNamespace) -> Namespace { id: namespace.id.get(), name: namespace.name.clone(), retention_period_ns: namespace.retention_period_ns, + max_tables: namespace.max_tables, + max_columns_per_table: namespace.max_columns_per_table, } } -fn create_namespace_to_proto(namespace: CatalogNamespace) -> CreateNamespaceResponse { +fn namespace_to_create_response_proto(namespace: CatalogNamespace) -> CreateNamespaceResponse { CreateNamespaceResponse { namespace: Some(Namespace { id: namespace.id.get(), name: namespace.name.clone(), retention_period_ns: namespace.retention_period_ns, + max_tables: namespace.max_tables, + max_columns_per_table: namespace.max_columns_per_table, }), } } @@ -189,6 +276,15 @@ fn map_retention_period(v: Option) -> Result, Status> { } } +fn status_from_catalog_namespace_error(err: iox_catalog::interface::Error) -> Status { + match err { + iox_catalog::interface::Error::NamespaceNotFoundByName { .. } => { + Status::not_found(err.to_string()) + } + _ => Status::internal(err.to_string()), + } +} + #[cfg(test)] mod tests { use std::time::Duration; @@ -217,6 +313,32 @@ mod tests { }); } + #[test] + fn test_namespace_error_mapping() { + let not_found_err = iox_catalog::interface::Error::NamespaceNotFoundByName { + name: String::from("bananas_namespace"), + }; + let not_found_msg = not_found_err.to_string(); + assert_matches!( + status_from_catalog_namespace_error(not_found_err), + s => { + assert_eq!(s.code(), Code::NotFound); + assert_eq!(s.message(), not_found_msg); + }); + + let other_err = iox_catalog::interface::Error::ColumnCreateLimitError { + column_name: String::from("quantity"), + table_id: data_types::TableId::new(42), + }; + let other_err_msg = other_err.to_string(); + assert_matches!( + status_from_catalog_namespace_error(other_err), + s => { + assert_eq!(s.code(), Code::Internal); + assert_eq!(s.message(), other_err_msg); + }); + } + #[tokio::test] async fn test_crud() { let catalog: Arc = @@ -306,6 +428,47 @@ mod tests { }) } + // Update the max allowed tables + let want_max_tables = created_ns.max_tables + 42; + let updated_ns = handler + .update_namespace_service_protection_limit(Request::new( + UpdateNamespaceServiceProtectionLimitRequest { + name: NS_NAME.to_string(), + limit_update: Some(LimitUpdate::MaxTables(want_max_tables)), + }, + )) + .await + .expect("failed to update namespace") + .into_inner() + .namespace + .expect("no namespace in response"); + assert_eq!(updated_ns.name, created_ns.name); + assert_eq!(updated_ns.id, created_ns.id); + assert_eq!(updated_ns.max_tables, want_max_tables); + assert_eq!( + updated_ns.max_columns_per_table, + created_ns.max_columns_per_table + ); + + // Update the max allowed columns per table + let want_max_columns_per_table = created_ns.max_columns_per_table + 7; + let updated_ns = handler + .update_namespace_service_protection_limit(Request::new( + UpdateNamespaceServiceProtectionLimitRequest { + name: NS_NAME.to_string(), + limit_update: Some(LimitUpdate::MaxColumnsPerTable(want_max_columns_per_table)), + }, + )) + .await + .expect("failed to update namespace") + .into_inner() + .namespace + .expect("no namespace in response"); + assert_eq!(updated_ns.name, created_ns.name); + assert_eq!(updated_ns.id, created_ns.id); + assert_eq!(updated_ns.max_tables, want_max_tables); + assert_eq!(updated_ns.max_columns_per_table, want_max_columns_per_table); + // Deleting the namespace should cause it to disappear handler .delete_namespace(Request::new(DeleteNamespaceRequest { @@ -325,4 +488,68 @@ mod tests { assert_matches!(current.as_slice(), []); } } + + #[tokio::test] + async fn test_reject_invalid_service_protection_limits() { + let catalog: Arc = + Arc::new(MemCatalog::new(Arc::new(metric::Registry::default()))); + + let topic = catalog + .repositories() + .await + .topics() + .create_or_get("kafka-topic") + .await + .unwrap(); + let query_pool = catalog + .repositories() + .await + .query_pools() + .create_or_get("query-pool") + .await + .unwrap(); + + let handler = NamespaceService::new(catalog, Some(topic.id), Some(query_pool.id)); + let req = CreateNamespaceRequest { + name: NS_NAME.to_string(), + retention_period_ns: Some(RETENTION), + }; + let created_ns = handler + .create_namespace(Request::new(req)) + .await + .expect("failed to create namespace") + .into_inner() + .namespace + .expect("no namespace in response"); + assert_eq!(created_ns.name, NS_NAME); + assert_eq!(created_ns.retention_period_ns, Some(RETENTION)); + assert_ne!(created_ns.max_tables, 0); + assert_ne!(created_ns.max_columns_per_table, 0); + + // The handler should reject any attempt to set the table limit to zero. + let status = handler + .update_namespace_service_protection_limit(Request::new( + UpdateNamespaceServiceProtectionLimitRequest { + name: NS_NAME.to_string(), + limit_update: Some(LimitUpdate::MaxTables(0)), + }, + )) + .await + .expect_err("invalid namespace update request for max table limit should fail"); + assert_eq!(status.code(), Code::InvalidArgument); + + // ...and likewise should reject any attempt to set the column per table limit to zero. + let status = handler + .update_namespace_service_protection_limit(Request::new( + UpdateNamespaceServiceProtectionLimitRequest { + name: NS_NAME.to_string(), + limit_update: Some(LimitUpdate::MaxColumnsPerTable(0)), + }, + )) + .await + .expect_err( + "invalid namespace update request for max columns per table limit should fail", + ); + assert_eq!(status.code(), Code::InvalidArgument); + } }