From 134967cddb445a1cef86772caf62af206b80c482 Mon Sep 17 00:00:00 2001 From: Fraser Savage Date: Wed, 29 Mar 2023 17:38:45 +0100 Subject: [PATCH 1/6] feat(namespace): Enable update of service protection limits over gRPC This adds a message type to encapsulate service protection limits for a namespace, an RPC to update any single limit and exposes the limits on a namespace as part of the pre-existing Namespace message. --- .../influxdata/iox/namespace/v1/service.proto | 31 +++ ioxd_querier/src/rpc/namespace.rs | 27 +++ service_grpc_namespace/src/lib.rs | 225 +++++++++++++++++- 3 files changed, 279 insertions(+), 4 deletions(-) diff --git a/generated_types/protos/influxdata/iox/namespace/v1/service.proto b/generated_types/protos/influxdata/iox/namespace/v1/service.proto index 83885eede1..2f8026a4e2 100644 --- a/generated_types/protos/influxdata/iox/namespace/v1/service.proto +++ b/generated_types/protos/influxdata/iox/namespace/v1/service.proto @@ -14,6 +14,10 @@ service NamespaceService { // Update retention period rpc UpdateNamespaceRetention(UpdateNamespaceRetentionRequest) returns (UpdateNamespaceRetentionResponse); + + // Update a service protection limit of a namespace. For this change to take + // effect, all routers MUST be restarted + rpc UpdateNamespaceServiceProtectionLimit(UpdateNamespaceServiceProtectionLimitRequest) returns (UpdateNamespaceServiceProtectionLimitResponse); } message GetNamespacesRequest { @@ -61,6 +65,23 @@ message UpdateNamespaceRetentionResponse { Namespace namespace = 1; } +message UpdateNamespaceServiceProtectionLimitRequest { + // Namespace to have its service protection limits updated. + string name = 1; + + // The service protection limit to update. + oneof limit_update { + // Change the maximum number of tables the namespace may have. + int32 max_tables = 2; + // Change the maximum number of columns each table in the namespace may have. + int32 max_columns_per_table = 3; + } +} + +message UpdateNamespaceServiceProtectionLimitResponse { + Namespace namespace = 1; +} + message Namespace { // Namespace ID int64 id = 1; @@ -72,4 +93,14 @@ message Namespace { // // NULL means "infinite retention". optional int64 retention_period_ns = 3; + + // The service protection limits which the namespace operates within. + ServiceProtectionLimits service_protection_limits = 4; +} + +message ServiceProtectionLimits { + // The maximum number of tables which this namespace is allowed to contain. + int32 max_tables = 1; + // The maximum number of columns a table belonging to this namespace may have. + int32 max_columns_per_table = 2; } diff --git a/ioxd_querier/src/rpc/namespace.rs b/ioxd_querier/src/rpc/namespace.rs index a6a027de65..e65429bdec 100644 --- a/ioxd_querier/src/rpc/namespace.rs +++ b/ioxd_querier/src/rpc/namespace.rs @@ -36,6 +36,10 @@ fn namespace_to_proto(namespace: Namespace) -> proto::Namespace { id: namespace.id.get(), name: namespace.name, retention_period_ns: namespace.retention_period_ns, + service_protection_limits: Some(proto::ServiceProtectionLimits { + max_tables: namespace.max_tables, + max_columns_per_table: namespace.max_columns_per_table, + }), } } @@ -82,6 +86,16 @@ impl proto::namespace_service_server::NamespaceService for NamespaceServiceImpl "use router instances to manage namespaces", )) } + + async fn update_namespace_service_protection_limit( + &self, + _request: tonic::Request, + ) -> Result, tonic::Status> + { + Err(tonic::Status::unimplemented( + "use router instances to manage namespaces", + )) + } } #[cfg(test)] @@ -92,6 +106,11 @@ mod tests { use querier::{create_ingester_connection_for_testing, QuerierCatalogCache}; use tokio::runtime::Handle; + use iox_catalog::{ + DEFAULT_MAX_COLUMNS_PER_TABLE as TEST_MAX_COLUMNS_PER_TABLE, + DEFAULT_MAX_TABLES as TEST_MAX_TABLES, + }; + /// Common retention period value we'll use in tests const TEST_RETENTION_PERIOD_NS: Option = Some(3_600 * 1_000_000_000); @@ -171,11 +190,19 @@ mod tests { id: 1, name: "namespace2".to_string(), retention_period_ns: TEST_RETENTION_PERIOD_NS, + service_protection_limits: Some(proto::ServiceProtectionLimits { + max_tables: TEST_MAX_TABLES, + max_columns_per_table: TEST_MAX_COLUMNS_PER_TABLE, + }), }, proto::Namespace { id: 2, name: "namespace1".to_string(), retention_period_ns: TEST_RETENTION_PERIOD_NS, + service_protection_limits: Some(proto::ServiceProtectionLimits { + max_tables: TEST_MAX_TABLES, + max_columns_per_table: TEST_MAX_COLUMNS_PER_TABLE, + }), }, ] } diff --git a/service_grpc_namespace/src/lib.rs b/service_grpc_namespace/src/lib.rs index 79fc7603bc..4d63ec5404 100644 --- a/service_grpc_namespace/src/lib.rs +++ b/service_grpc_namespace/src/lib.rs @@ -1,9 +1,10 @@ //! Implementation of the namespace gRPC service - use std::sync::Arc; use data_types::{Namespace as CatalogNamespace, QueryPoolId, TopicId}; -use generated_types::influxdata::iox::namespace::v1::*; +use generated_types::influxdata::iox::namespace::v1::{ + update_namespace_service_protection_limit_request::LimitUpdate, *, +}; use iox_catalog::interface::{Catalog, SoftDeletedRows}; use observability_deps::tracing::{debug, info, warn}; use tonic::{Request, Response, Status}; @@ -92,7 +93,7 @@ impl namespace_service_server::NamespaceService for NamespaceService { "created namespace" ); - Ok(Response::new(create_namespace_to_proto(namespace))) + Ok(Response::new(namespace_to_create_response_proto(namespace))) } async fn delete_namespace( @@ -152,6 +153,64 @@ impl namespace_service_server::NamespaceService for NamespaceService { namespace: Some(namespace_to_proto(namespace)), })) } + + async fn update_namespace_service_protection_limit( + &self, + request: Request, + ) -> Result, Status> { + let mut repos = self.catalog.repositories().await; + + let UpdateNamespaceServiceProtectionLimitRequest { + name: namespace_name, + limit_update, + } = request.into_inner(); + + debug!(%namespace_name, ?limit_update, "updating namespace service protection limit"); + + let namespace = match limit_update { + Some(LimitUpdate::MaxTables(n)) => { + if n == 0 { + return Err(Status::invalid_argument( + "max table limit for namespace must be greater than 0", + )); + } + repos + .namespaces() + .update_table_limit(&namespace_name, n) + .await + .map_err(|e| { + warn!(error=%e, %namespace_name, table_limit=%n, "failed to update table limit for namespace"); + status_from_catalog_namespace_error(e) + }) + } + Some(LimitUpdate::MaxColumnsPerTable(n)) => { + if n == 0 { + return Err(Status::invalid_argument( + "max columns per table limit for namespace must be greater than 0", + )); + } + repos + .namespaces() + .update_column_limit(&namespace_name, n) + .await + .map_err(|e| { + warn!(error=%e, %namespace_name, per_table_column_limit=%n, "failed to update per table column limit for namespace"); + status_from_catalog_namespace_error(e) + }) + } + None => Err(Status::invalid_argument( + "unsupported service protection limit change requested", + )), + }?; + + info!(namespace_name, namespace_id = %namespace.id, "updated namespace service protection limits"); + + Ok(Response::new( + UpdateNamespaceServiceProtectionLimitResponse { + namespace: Some(namespace_to_proto(namespace)), + }, + )) + } } fn namespace_to_proto(namespace: CatalogNamespace) -> Namespace { @@ -159,15 +218,23 @@ fn namespace_to_proto(namespace: CatalogNamespace) -> Namespace { id: namespace.id.get(), name: namespace.name.clone(), retention_period_ns: namespace.retention_period_ns, + service_protection_limits: Some(ServiceProtectionLimits { + max_tables: namespace.max_tables, + max_columns_per_table: namespace.max_columns_per_table, + }), } } -fn create_namespace_to_proto(namespace: CatalogNamespace) -> CreateNamespaceResponse { +fn namespace_to_create_response_proto(namespace: CatalogNamespace) -> CreateNamespaceResponse { CreateNamespaceResponse { namespace: Some(Namespace { id: namespace.id.get(), name: namespace.name.clone(), retention_period_ns: namespace.retention_period_ns, + service_protection_limits: Some(ServiceProtectionLimits { + max_tables: namespace.max_tables, + max_columns_per_table: namespace.max_columns_per_table, + }), }), } } @@ -189,6 +256,15 @@ fn map_retention_period(v: Option) -> Result, Status> { } } +fn status_from_catalog_namespace_error(err: iox_catalog::interface::Error) -> Status { + match err { + iox_catalog::interface::Error::NamespaceNotFoundByName { .. } => { + Status::not_found(err.to_string()) + } + _ => Status::internal(err.to_string()), + } +} + #[cfg(test)] mod tests { use std::time::Duration; @@ -217,6 +293,32 @@ mod tests { }); } + #[test] + fn test_namespace_error_mapping() { + let not_found_err = iox_catalog::interface::Error::NamespaceNotFoundByName { + name: String::from("bananas_namespace"), + }; + let not_found_msg = not_found_err.to_string(); + assert_matches!( + status_from_catalog_namespace_error(not_found_err), + s => { + assert_eq!(s.code(), Code::NotFound); + assert_eq!(s.message(), not_found_msg); + }); + + let other_err = iox_catalog::interface::Error::ColumnCreateLimitError { + column_name: String::from("quantity"), + table_id: data_types::TableId::new(42), + }; + let other_err_msg = other_err.to_string(); + assert_matches!( + status_from_catalog_namespace_error(other_err), + s => { + assert_eq!(s.code(), Code::Internal); + assert_eq!(s.message(), other_err_msg); + }); + } + #[tokio::test] async fn test_crud() { let catalog: Arc = @@ -263,6 +365,13 @@ mod tests { .expect("no namespace in response"); assert_eq!(created_ns.name, NS_NAME); assert_eq!(created_ns.retention_period_ns, Some(RETENTION)); + let ServiceProtectionLimits { + max_tables: original_max_tables, + max_columns_per_table: original_max_columns_per_table, + } = created_ns + .service_protection_limits + .clone() + .expect("created namespace must have service protection limits"); // There should now be one namespace { @@ -306,6 +415,48 @@ mod tests { }) } + // Update the max allowed tables + let want_max_tables = original_max_tables + 42; + let updated_ns = handler + .update_namespace_service_protection_limit(Request::new( + UpdateNamespaceServiceProtectionLimitRequest { + name: NS_NAME.to_string(), + limit_update: Some(LimitUpdate::MaxTables(want_max_tables)), + }, + )) + .await + .expect("failed to update namespace") + .into_inner() + .namespace + .expect("no namespace in response"); + assert_eq!(updated_ns.name, created_ns.name); + assert_eq!(updated_ns.id, created_ns.id); + assert_matches!(updated_ns.service_protection_limits, Some(got_limits) => { + assert_eq!(got_limits.max_tables, want_max_tables); + assert_eq!(got_limits.max_columns_per_table, original_max_columns_per_table); + }); + + // Update the max allowed columns per table + let want_max_columns_per_table = original_max_columns_per_table + 7; + let updated_ns = handler + .update_namespace_service_protection_limit(Request::new( + UpdateNamespaceServiceProtectionLimitRequest { + name: NS_NAME.to_string(), + limit_update: Some(LimitUpdate::MaxColumnsPerTable(want_max_columns_per_table)), + }, + )) + .await + .expect("failed to update namespace") + .into_inner() + .namespace + .expect("no namespace in response"); + assert_eq!(updated_ns.name, created_ns.name); + assert_eq!(updated_ns.id, created_ns.id); + assert_matches!(updated_ns.service_protection_limits, Some(got_limits) => { + assert_eq!(got_limits.max_tables, want_max_tables); + assert_eq!(got_limits.max_columns_per_table, want_max_columns_per_table); + }); + // Deleting the namespace should cause it to disappear handler .delete_namespace(Request::new(DeleteNamespaceRequest { @@ -325,4 +476,70 @@ mod tests { assert_matches!(current.as_slice(), []); } } + + #[tokio::test] + async fn test_reject_invalid_service_protection_limits() { + let catalog: Arc = + Arc::new(MemCatalog::new(Arc::new(metric::Registry::default()))); + + let topic = catalog + .repositories() + .await + .topics() + .create_or_get("kafka-topic") + .await + .unwrap(); + let query_pool = catalog + .repositories() + .await + .query_pools() + .create_or_get("query-pool") + .await + .unwrap(); + + let handler = NamespaceService::new(catalog, Some(topic.id), Some(query_pool.id)); + let req = CreateNamespaceRequest { + name: NS_NAME.to_string(), + retention_period_ns: Some(RETENTION), + }; + let created_ns = handler + .create_namespace(Request::new(req)) + .await + .expect("failed to create namespace") + .into_inner() + .namespace + .expect("no namespace in response"); + assert_eq!(created_ns.name, NS_NAME); + assert_eq!(created_ns.retention_period_ns, Some(RETENTION)); + assert_matches!(created_ns.service_protection_limits, Some(limits) => { + assert_ne!(limits.max_tables, 0); + assert_ne!(limits.max_columns_per_table, 0); + }); + + // The handler should reject any attempt to set the table limit to zero. + let status = handler + .update_namespace_service_protection_limit(Request::new( + UpdateNamespaceServiceProtectionLimitRequest { + name: NS_NAME.to_string(), + limit_update: Some(LimitUpdate::MaxTables(0)), + }, + )) + .await + .expect_err("invalid namespace update request for max table limit should fail"); + assert_eq!(status.code(), Code::InvalidArgument); + + // ...and likewise should reject any attempt to set the column per table limit to zero. + let status = handler + .update_namespace_service_protection_limit(Request::new( + UpdateNamespaceServiceProtectionLimitRequest { + name: NS_NAME.to_string(), + limit_update: Some(LimitUpdate::MaxColumnsPerTable(0)), + }, + )) + .await + .expect_err( + "invalid namespace update request for max columns per table limit should fail", + ); + assert_eq!(status.code(), Code::InvalidArgument); + } } From 30b292f3df54ed596ad970ea73bd706a5648d5a9 Mon Sep 17 00:00:00 2001 From: Fraser Savage Date: Thu, 30 Mar 2023 15:33:33 +0100 Subject: [PATCH 2/6] feat(cli): Update namespace service protection limits This commit adds a client method to invoke the UpdateNamespaceServiceProtectionLimits RPC API, providing a user friendly way to do this through the IOx command line. --- influxdb_iox/src/commands/namespace/mod.rs | 14 ++- .../src/commands/namespace/update_limit.rs | 67 +++++++++++ influxdb_iox/tests/end_to_end_cases/cli.rs | 110 ++++++++++++++++++ influxdb_iox_client/src/client/namespace.rs | 28 ++++- 4 files changed, 217 insertions(+), 2 deletions(-) create mode 100644 influxdb_iox/src/commands/namespace/update_limit.rs diff --git a/influxdb_iox/src/commands/namespace/mod.rs b/influxdb_iox/src/commands/namespace/mod.rs index c823847d7f..9edb414fbb 100644 --- a/influxdb_iox/src/commands/namespace/mod.rs +++ b/influxdb_iox/src/commands/namespace/mod.rs @@ -6,6 +6,7 @@ use thiserror::Error; mod create; mod delete; mod retention; +mod update_limit; #[allow(clippy::enum_variant_names)] #[derive(Debug, Error)] @@ -15,8 +16,13 @@ pub enum Error { #[error("Client error: {0}")] ClientError(#[from] influxdb_iox_client::error::Error), + + #[error("No valid limit was provided")] + InvalidLimit, } +pub type Result = std::result::Result; + /// Various commands for namespace inspection #[derive(Debug, clap::Parser)] pub struct Config { @@ -36,11 +42,14 @@ enum Command { /// Update retention of an existing namespace Retention(retention::Config), + /// Update one of the service protection limits for an existing namespace + UpdateLimit(update_limit::Config), + /// Delete a namespace Delete(delete::Config), } -pub async fn command(connection: Connection, config: Config) -> Result<(), Error> { +pub async fn command(connection: Connection, config: Config) -> Result<()> { match config.command { Command::Create(config) => { create::command(connection, config).await?; @@ -53,6 +62,9 @@ pub async fn command(connection: Connection, config: Config) -> Result<(), Error Command::Retention(config) => { retention::command(connection, config).await?; } + Command::UpdateLimit(config) => { + update_limit::command(connection, config).await?; + } Command::Delete(config) => { delete::command(connection, config).await?; } // Deliberately not adding _ => so the compiler will direct people here to impl new diff --git a/influxdb_iox/src/commands/namespace/update_limit.rs b/influxdb_iox/src/commands/namespace/update_limit.rs new file mode 100644 index 0000000000..2aefa70305 --- /dev/null +++ b/influxdb_iox/src/commands/namespace/update_limit.rs @@ -0,0 +1,67 @@ +use influxdb_iox_client::connection::Connection; +use influxdb_iox_client::namespace::generated_types::LimitUpdate; + +use crate::commands::namespace::{Error, Result}; + +#[derive(Debug, clap::Parser)] +pub struct Config { + /// The namespace to update a service protection limit for + #[clap(action)] + namespace: String, + + #[command(flatten)] + args: Args, +} + +#[derive(Debug, clap::Args)] +#[clap(group( + clap::ArgGroup::new("limit") + .required(true) + .args(&["max_tables", "max_columns_per_table"]) + ))] +struct Args { + /// The maximum number of tables to allow for this namespace + #[clap(action, long = "max-tables", short = 't', group = "limit")] + max_tables: Option, + + /// The maximum number of columns to allow per table for this namespace + #[clap(action, long = "max-columns-per-table", short = 'c', group = "limit")] + max_columns_per_table: Option, +} + +impl TryFrom for LimitUpdate { + type Error = Error; + fn try_from(args: Args) -> Result { + let Args { + max_tables, + max_columns_per_table, + } = args; + + if let Some(n) = max_tables { + return Ok(Self::MaxTables(n)); + } + if let Some(n) = max_columns_per_table { + return Ok(Self::MaxColumnsPerTable(n)); + } + + Err(Error::InvalidLimit) + } +} + +pub async fn command(connection: Connection, config: Config) -> Result<()> { + let mut client = influxdb_iox_client::namespace::Client::new(connection); + + let namespace = client + .update_namespace_service_protection_limit( + &config.namespace, + LimitUpdate::try_from(config.args)?, + ) + .await?; + println!("{}", serde_json::to_string_pretty(&namespace)?); + + println!( + r" +NOTE: This change will NOT take effect until all router instances have been restarted!" + ); + Ok(()) +} diff --git a/influxdb_iox/tests/end_to_end_cases/cli.rs b/influxdb_iox/tests/end_to_end_cases/cli.rs index d9996dae03..71173d109a 100644 --- a/influxdb_iox/tests/end_to_end_cases/cli.rs +++ b/influxdb_iox/tests/end_to_end_cases/cli.rs @@ -1040,3 +1040,113 @@ async fn query_ingester() { .run() .await } + +/// Test the namespace update service limit command +#[tokio::test] +async fn namespace_update_service_limit() { + test_helpers::maybe_start_logging(); + let database_url = maybe_skip_integration!(); + let mut cluster = MiniCluster::create_shared2(database_url).await; + + StepTest::new( + &mut cluster, + vec![ + Step::Custom(Box::new(|state: &mut StepTestState| { + async { + let namespace = "service_limiter_namespace"; + let addr = state.cluster().router().router_grpc_base().to_string(); + + // { + // "id": , + // "name": "service_limiter_namespace", + // "serviceProtectionLimits": { + // "maxTables": 500, + // "maxColumnsPerTable": 200 + // } + // } + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("-h") + .arg(&addr) + .arg("namespace") + .arg("create") + .arg(namespace) + .assert() + .success() + .stdout( + predicate::str::contains(namespace) + .and(predicate::str::contains(r#""maxTables": 500"#)) + .and(predicate::str::contains(r#""maxColumnsPerTable": 200"#)), + ); + } + .boxed() + })), + Step::Custom(Box::new(|state: &mut StepTestState| { + async { + let namespace = "service_limiter_namespace"; + let addr = state.cluster().router().router_grpc_base().to_string(); + + // { + // "id": , + // "name": "service_limiter_namespace", + // "serviceProtectionLimits": { + // "maxTables": 1337, + // "maxColumnsPerTable": 200 + // } + // } + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("-h") + .arg(&addr) + .arg("namespace") + .arg("update-limit") + .arg("--max-tables") + .arg("1337") + .arg(namespace) + .assert() + .success() + .stdout( + predicate::str::contains(namespace) + .and(predicate::str::contains(r#""maxTables": 1337"#)) + .and(predicate::str::contains(r#""maxColumnsPerTable": 200"#)), + ); + } + .boxed() + })), + Step::Custom(Box::new(|state: &mut StepTestState| { + async { + let namespace = "service_limiter_namespace"; + let addr = state.cluster().router().router_grpc_base().to_string(); + + // { + // "id": , + // "name": "service_limiter_namespace", + // "serviceProtectionLimits": { + // "maxTables": 1337, + // "maxColumnsPerTable": 42 + // } + // } + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("-h") + .arg(&addr) + .arg("namespace") + .arg("update-limit") + .arg("--max-columns-per-table") + .arg("42") + .arg(namespace) + .assert() + .success() + .stdout( + predicate::str::contains(namespace) + .and(predicate::str::contains(r#""maxTables": 1337"#)) + .and(predicate::str::contains(r#""maxColumnsPerTable": 42"#)), + ); + } + .boxed() + })), + ], + ) + .run() + .await +} diff --git a/influxdb_iox_client/src/client/namespace.rs b/influxdb_iox_client/src/client/namespace.rs index c94ea8389c..fb88dca097 100644 --- a/influxdb_iox_client/src/client/namespace.rs +++ b/influxdb_iox_client/src/client/namespace.rs @@ -7,7 +7,9 @@ use ::generated_types::google::OptionalField; /// Re-export generated_types pub mod generated_types { - pub use generated_types::influxdata::iox::namespace::v1::*; + pub use generated_types::influxdata::iox::namespace::v1::{ + update_namespace_service_protection_limit_request::LimitUpdate, *, + }; } /// A basic client for working with Namespaces. @@ -77,6 +79,30 @@ impl Client { Ok(response.into_inner().namespace.unwrap_field("namespace")?) } + /// Update one of the service protection limits for a namespace + /// + /// `limit_update` is the new service limit protection limit to set + /// on the namespace. + /// + /// Zero-valued limits are rejected, returning an error. + pub async fn update_namespace_service_protection_limit( + &mut self, + namespace: &str, + limit_update: LimitUpdate, + ) -> Result { + let response = self + .inner + .update_namespace_service_protection_limit( + UpdateNamespaceServiceProtectionLimitRequest { + name: namespace.to_string(), + limit_update: Some(limit_update), + }, + ) + .await?; + + Ok(response.into_inner().namespace.unwrap_field("namespace")?) + } + /// Delete a namespace pub async fn delete_namespace(&mut self, namespace: &str) -> Result<(), Error> { self.inner From eba7eb748667333249e78ddca5119bf604621fba Mon Sep 17 00:00:00 2001 From: Fraser Savage Date: Thu, 30 Mar 2023 15:38:35 +0100 Subject: [PATCH 3/6] refactor(cli): DRY error definitions for namespace commands Unused redefinitions of Error were made in namespace commands. This commit removes those and consolidates error definition to the main namespace CLI module. --- influxdb_iox/src/commands/namespace/create.rs | 16 ++-------------- influxdb_iox/src/commands/namespace/delete.rs | 16 ++-------------- influxdb_iox/src/commands/namespace/retention.rs | 16 ++-------------- 3 files changed, 6 insertions(+), 42 deletions(-) diff --git a/influxdb_iox/src/commands/namespace/create.rs b/influxdb_iox/src/commands/namespace/create.rs index 0046dde448..1e2111e082 100644 --- a/influxdb_iox/src/commands/namespace/create.rs +++ b/influxdb_iox/src/commands/namespace/create.rs @@ -1,15 +1,6 @@ use influxdb_iox_client::connection::Connection; -use thiserror::Error; -#[allow(clippy::enum_variant_names)] -#[derive(Debug, Error)] -pub enum Error { - #[error("JSON Serialization error: {0}")] - Serde(#[from] serde_json::Error), - - #[error("Client error: {0}")] - ClientError(#[from] influxdb_iox_client::error::Error), -} +use crate::commands::namespace::Result; /// Write data into the specified database #[derive(Debug, clap::Parser)] @@ -30,10 +21,7 @@ pub struct Config { retention_hours: u32, } -pub async fn command( - connection: Connection, - config: Config, -) -> Result<(), crate::commands::namespace::Error> { +pub async fn command(connection: Connection, config: Config) -> Result<()> { let Config { namespace, retention_hours, diff --git a/influxdb_iox/src/commands/namespace/delete.rs b/influxdb_iox/src/commands/namespace/delete.rs index ff1c015f05..93246beec2 100644 --- a/influxdb_iox/src/commands/namespace/delete.rs +++ b/influxdb_iox/src/commands/namespace/delete.rs @@ -1,15 +1,6 @@ use influxdb_iox_client::connection::Connection; -use thiserror::Error; -#[allow(clippy::enum_variant_names)] -#[derive(Debug, Error)] -pub enum Error { - #[error("JSON Serialization error: {0}")] - Serde(#[from] serde_json::Error), - - #[error("Client error: {0}")] - ClientError(#[from] influxdb_iox_client::error::Error), -} +use crate::commands::namespace::Result; #[derive(Debug, clap::Parser)] pub struct Config { @@ -18,10 +9,7 @@ pub struct Config { namespace: String, } -pub async fn command( - connection: Connection, - config: Config, -) -> Result<(), crate::commands::namespace::Error> { +pub async fn command(connection: Connection, config: Config) -> Result<()> { let Config { namespace } = config; let mut client = influxdb_iox_client::namespace::Client::new(connection); diff --git a/influxdb_iox/src/commands/namespace/retention.rs b/influxdb_iox/src/commands/namespace/retention.rs index a4a10ee2c2..d896d79bc6 100644 --- a/influxdb_iox/src/commands/namespace/retention.rs +++ b/influxdb_iox/src/commands/namespace/retention.rs @@ -1,15 +1,6 @@ use influxdb_iox_client::connection::Connection; -use thiserror::Error; -#[allow(clippy::enum_variant_names)] -#[derive(Debug, Error)] -pub enum Error { - #[error("JSON Serialization error: {0}")] - Serde(#[from] serde_json::Error), - - #[error("Client error: {0}")] - ClientError(#[from] influxdb_iox_client::error::Error), -} +use crate::commands::namespace::Result; /// Update the specified namespace's data retention period #[derive(Debug, clap::Parser)] @@ -24,10 +15,7 @@ pub struct Config { retention_hours: u32, } -pub async fn command( - connection: Connection, - config: Config, -) -> Result<(), crate::commands::namespace::Error> { +pub async fn command(connection: Connection, config: Config) -> Result<()> { let Config { namespace, retention_hours, From 3ad4cbe7a960e26aa7b565262b3c74371c68cc1a Mon Sep 17 00:00:00 2001 From: Fraser Savage Date: Fri, 31 Mar 2023 17:34:37 +0100 Subject: [PATCH 4/6] feat(router): Add grpc integration tests for namespace limit update This adds additional testing coverage for updates to service protection limits to a namespace, and how they affect subsequent writes that exceed the limits. --- router/tests/grpc.rs | 308 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 306 insertions(+), 2 deletions(-) diff --git a/router/tests/grpc.rs b/router/tests/grpc.rs index 2aa48d3a08..034928d9f7 100644 --- a/router/tests/grpc.rs +++ b/router/tests/grpc.rs @@ -5,10 +5,10 @@ use generated_types::influxdata::iox::namespace::v1::{ namespace_service_server::NamespaceService, *, }; use hyper::StatusCode; -use iox_catalog::interface::SoftDeletedRows; +use iox_catalog::interface::{Error as CatalogError, SoftDeletedRows}; use iox_time::{SystemProvider, TimeProvider}; use router::{ - dml_handlers::{DmlError, RetentionError}, + dml_handlers::{CachedServiceProtectionLimit, DmlError, RetentionError, SchemaError}, namespace_resolver::{self, NamespaceCreationError}, server::http::Error, }; @@ -562,3 +562,307 @@ async fn test_update_namespace_negative_retention_period() { }); } } + +#[tokio::test] +async fn test_update_namespace_limit_max_tables() { + // Initialise a TestContext requiring explicit namespace creation. + let ctx = TestContextBuilder::default().build().await; + + // Explicitly create the namespace. + let create = ctx + .grpc_delegate() + .namespace_service() + .create_namespace(Request::new(CreateNamespaceRequest { + name: "bananas_test".to_string(), + retention_period_ns: Some(0), + })) + .await + .expect("failed to create namespace") + .into_inner() + .namespace + .expect("no namespace in response"); + + // Writing to two initial tables should succeed + ctx.write_lp("bananas", "test", "ananas,tag1=A,tag2=B val=42i 42424242") + .await + .expect("write should succeed"); + ctx.write_lp("bananas", "test", "platanos,tag3=C,tag4=D val=99i 42424243") + .await + .expect("write should succeed"); + + // Limit the maximum number of tables to prevent a write adding another table + let got = ctx + .grpc_delegate() + .namespace_service() + .update_namespace_service_protection_limit(Request::new( + UpdateNamespaceServiceProtectionLimitRequest { + name: "bananas_test".to_string(), + limit_update: Some( + update_namespace_service_protection_limit_request::LimitUpdate::MaxTables(1), + ), + }, + )) + .await + .expect("failed to update namespace max table limit") + .into_inner() + .namespace + .expect("no namespace in response"); + + assert_eq!(got.name, create.name); + assert_eq!(got.id, create.id); + assert_eq!(got.retention_period_ns, create.retention_period_ns); + assert_matches!(&got.service_protection_limits, Some(got_limits) => { + assert_eq!(got_limits.max_tables, 1); + assert_eq!(got_limits.max_columns_per_table, create.service_protection_limits.expect("created namespace should have limits").max_columns_per_table); + }); + + // The list namespace RPC should show the updated namespace + { + let list = ctx + .grpc_delegate() + .namespace_service() + .get_namespaces(Request::new(Default::default())) + .await + .expect("must return namespaces") + .into_inner(); + assert_matches!(list.namespaces.as_slice(), [ns] => { + assert_eq!(*ns, got); + }); + } + + // The catalog should contain the namespace. + { + let db_list = ctx + .catalog() + .repositories() + .await + .namespaces() + .list(SoftDeletedRows::ExcludeDeleted) + .await + .expect("query failure"); + assert_matches!(db_list.as_slice(), [ns] => { + assert_eq!(ns.id.get(), got.id); + assert_eq!(ns.name, got.name); + assert_eq!(ns.retention_period_ns, got.retention_period_ns); + let got_limits = got.service_protection_limits.expect("created namespace should have limits"); + assert_eq!(ns.max_tables, got_limits.max_tables); + assert_eq!(ns.max_columns_per_table, got_limits.max_columns_per_table); + assert!(ns.deleted_at.is_none()); + }); + } + + // New table should fail to be created by the catalog. + let err = ctx + .write_lp( + "bananas", + "test", + "arán_banana,tag1=A,tag2=B val=42i 42424244", + ) + .await + .expect_err("cached entry should be removed"); + assert_matches!(err, router::server::http::Error::DmlHandler(DmlError::Schema(SchemaError::ServiceLimit(e))) => { + let e: CatalogError = *e.downcast::().expect("error returned should be a table create limit error"); + assert_matches!(e,CatalogError::TableCreateLimitError { table_name, .. } => { + assert_eq!(table_name, "arán_banana"); + }); + }); +} +#[tokio::test] +async fn test_update_namespace_limit_max_columns_per_table() { + // Initialise a TestContext requiring explicit namespace creation. + let ctx = TestContextBuilder::default().build().await; + + // Explicitly create the namespace. + let create = ctx + .grpc_delegate() + .namespace_service() + .create_namespace(Request::new(CreateNamespaceRequest { + name: "bananas_test".to_string(), + retention_period_ns: Some(0), + })) + .await + .expect("failed to create namespace") + .into_inner() + .namespace + .expect("no namespace in response"); + + // Initial write within limit should succeed + ctx.write_lp("bananas", "test", "ananas,tag1=A,tag2=B val=42i 42424242") + .await + .expect("write should succeed"); + + // Limit the maximum number of columns per table so an extra column is rejected + let got = ctx + .grpc_delegate() + .namespace_service() + .update_namespace_service_protection_limit(Request::new( + UpdateNamespaceServiceProtectionLimitRequest { + name: "bananas_test".to_string(), + limit_update: Some( + update_namespace_service_protection_limit_request::LimitUpdate::MaxColumnsPerTable(1), + ), + }, + )) + .await + .expect("failed to update namespace max table limit") + .into_inner() + .namespace + .expect("no namespace in response"); + + assert_eq!(got.name, create.name); + assert_eq!(got.id, create.id); + assert_eq!(got.retention_period_ns, create.retention_period_ns); + assert_matches!(&got.service_protection_limits, Some(got_limits) => { + assert_eq!(got_limits.max_tables, create.service_protection_limits.expect("namespace should have limits").max_tables); + assert_eq!(got_limits.max_columns_per_table, 1); + }); + + // The list namespace RPC should show the updated namespace + { + let list = ctx + .grpc_delegate() + .namespace_service() + .get_namespaces(Request::new(Default::default())) + .await + .expect("must return namespaces") + .into_inner(); + assert_matches!(list.namespaces.as_slice(), [ns] => { + assert_eq!(*ns, got); + }); + } + + // The catalog should contain the namespace. + { + let db_list = ctx + .catalog() + .repositories() + .await + .namespaces() + .list(SoftDeletedRows::ExcludeDeleted) + .await + .expect("query failure"); + assert_matches!(db_list.as_slice(), [ns] => { + assert_eq!(ns.id.get(), got.id); + assert_eq!(ns.name, got.name); + assert_eq!(ns.retention_period_ns, got.retention_period_ns); + let got_limits = got.service_protection_limits.expect("created namespace should have limits"); + assert_eq!(ns.max_tables, got_limits.max_tables); + assert_eq!(ns.max_columns_per_table, got_limits.max_columns_per_table); + assert!(ns.deleted_at.is_none()); + }); + } + + // The cached entry is not affected, and writes continue to be validated + // against the old value. + // + // https://github.com/influxdata/influxdb_iox/issues/6175 + + // Writing to second table should succeed while using the cached entry + ctx.write_lp( + "bananas", + "test", + "platanos,tag1=A,tag2=B val=1337i 42424243", + ) + .await + .expect("write should succeed"); + + // The router restarts, and writes with too many columns are then rejected. + let ctx = ctx.restart().await; + + let err = ctx + .write_lp( + "bananas", + "test", + "arán_banana,tag1=A,tag2=B val=76i 42424243", + ) + .await + .expect_err("cached entry should be removed and write should be blocked"); + assert_matches!( + err, router::server::http::Error::DmlHandler(DmlError::Schema( + SchemaError::ServiceLimit(e) + )) => { + let e: CachedServiceProtectionLimit = *e.downcast::().expect("error returned should be a cached service protection limit"); + assert_matches!(e, CachedServiceProtectionLimit::Column { + table_name, + max_columns_per_table, + .. + } => { + assert_eq!(table_name, "arán_banana"); + assert_eq!(max_columns_per_table, 1); + }); + } + ) +} + +#[tokio::test] +async fn test_update_namespace_limit_0_max_tables_max_columns() { + // Initialise a TestContext requiring explicit namespace creation. + let ctx = TestContextBuilder::default().build().await; + + // Explicitly create the namespace. + let create = ctx + .grpc_delegate() + .namespace_service() + .create_namespace(Request::new(CreateNamespaceRequest { + name: "bananas_test".to_string(), + retention_period_ns: Some(0), + })) + .await + .expect("failed to create namespace") + .into_inner() + .namespace + .expect("no namespace in response"); + + // Attempt to use an invalid table limit + let err = ctx + .grpc_delegate() + .namespace_service() + .update_namespace_service_protection_limit(Request::new( + UpdateNamespaceServiceProtectionLimitRequest { + name: "bananas_test".to_string(), + limit_update: Some( + update_namespace_service_protection_limit_request::LimitUpdate::MaxTables(0), + ), + }, + )) + .await + .expect_err("should not have been able to update the table limit to 0"); + assert_eq!(err.code(), Code::InvalidArgument); + + // Attempt to use an invalid column limit + let err = ctx + .grpc_delegate() + .namespace_service() + .update_namespace_service_protection_limit(Request::new( + UpdateNamespaceServiceProtectionLimitRequest { + name: "bananas_test".to_string(), + limit_update: Some( + update_namespace_service_protection_limit_request::LimitUpdate::MaxColumnsPerTable(0), + ), + }, + )) + .await + .expect_err("should not have been able to update the column per table limit to 0"); + assert_eq!(err.code(), Code::InvalidArgument); + + // The catalog should contain the namespace unchanged. + { + let db_list = ctx + .catalog() + .repositories() + .await + .namespaces() + .list(SoftDeletedRows::ExcludeDeleted) + .await + .expect("query failure"); + assert_matches!(db_list.as_slice(), [ns] => { + assert_eq!(ns.id.get(), create.id); + assert_eq!(ns.name, create.name); + assert_eq!(ns.retention_period_ns, create.retention_period_ns); + let create_limits = create.service_protection_limits.expect("created namespace should have limits"); + assert_eq!(ns.max_tables, create_limits.max_tables); + assert_eq!(ns.max_columns_per_table, create_limits.max_columns_per_table); + assert!(ns.deleted_at.is_none()); + }); + } +} From b53b8c7d76c355327358e022c0667150a796630a Mon Sep 17 00:00:00 2001 From: Fraser Savage Date: Wed, 5 Apr 2023 14:46:14 +0100 Subject: [PATCH 5/6] refactor(namespace): Flatten service protection limits in Namespace proto definition This commit also cleans up the code formatting for the gRPC handler and simplifies some of the gRPC handler tests for the new update service limit API. --- .../influxdata/iox/namespace/v1/service.proto | 10 +- ioxd_querier/src/rpc/namespace.rs | 18 +-- router/tests/grpc.rs | 85 +++++--------- service_grpc_namespace/src/lib.rs | 104 ++++++++++-------- 4 files changed, 95 insertions(+), 122 deletions(-) diff --git a/generated_types/protos/influxdata/iox/namespace/v1/service.proto b/generated_types/protos/influxdata/iox/namespace/v1/service.proto index 2f8026a4e2..e586bece07 100644 --- a/generated_types/protos/influxdata/iox/namespace/v1/service.proto +++ b/generated_types/protos/influxdata/iox/namespace/v1/service.proto @@ -94,13 +94,9 @@ message Namespace { // NULL means "infinite retention". optional int64 retention_period_ns = 3; - // The service protection limits which the namespace operates within. - ServiceProtectionLimits service_protection_limits = 4; -} - -message ServiceProtectionLimits { // The maximum number of tables which this namespace is allowed to contain. - int32 max_tables = 1; + int32 max_tables = 4; + // The maximum number of columns a table belonging to this namespace may have. - int32 max_columns_per_table = 2; + int32 max_columns_per_table = 5; } diff --git a/ioxd_querier/src/rpc/namespace.rs b/ioxd_querier/src/rpc/namespace.rs index e65429bdec..313868df2d 100644 --- a/ioxd_querier/src/rpc/namespace.rs +++ b/ioxd_querier/src/rpc/namespace.rs @@ -36,10 +36,8 @@ fn namespace_to_proto(namespace: Namespace) -> proto::Namespace { id: namespace.id.get(), name: namespace.name, retention_period_ns: namespace.retention_period_ns, - service_protection_limits: Some(proto::ServiceProtectionLimits { - max_tables: namespace.max_tables, - max_columns_per_table: namespace.max_columns_per_table, - }), + max_tables: namespace.max_tables, + max_columns_per_table: namespace.max_columns_per_table, } } @@ -190,19 +188,15 @@ mod tests { id: 1, name: "namespace2".to_string(), retention_period_ns: TEST_RETENTION_PERIOD_NS, - service_protection_limits: Some(proto::ServiceProtectionLimits { - max_tables: TEST_MAX_TABLES, - max_columns_per_table: TEST_MAX_COLUMNS_PER_TABLE, - }), + max_tables: TEST_MAX_TABLES, + max_columns_per_table: TEST_MAX_COLUMNS_PER_TABLE, }, proto::Namespace { id: 2, name: "namespace1".to_string(), retention_period_ns: TEST_RETENTION_PERIOD_NS, - service_protection_limits: Some(proto::ServiceProtectionLimits { - max_tables: TEST_MAX_TABLES, - max_columns_per_table: TEST_MAX_COLUMNS_PER_TABLE, - }), + max_tables: TEST_MAX_TABLES, + max_columns_per_table: TEST_MAX_COLUMNS_PER_TABLE, }, ] } diff --git a/router/tests/grpc.rs b/router/tests/grpc.rs index 034928d9f7..fd940bae2d 100644 --- a/router/tests/grpc.rs +++ b/router/tests/grpc.rs @@ -565,22 +565,11 @@ async fn test_update_namespace_negative_retention_period() { #[tokio::test] async fn test_update_namespace_limit_max_tables() { - // Initialise a TestContext requiring explicit namespace creation. - let ctx = TestContextBuilder::default().build().await; - - // Explicitly create the namespace. - let create = ctx - .grpc_delegate() - .namespace_service() - .create_namespace(Request::new(CreateNamespaceRequest { - name: "bananas_test".to_string(), - retention_period_ns: Some(0), - })) - .await - .expect("failed to create namespace") - .into_inner() - .namespace - .expect("no namespace in response"); + // Initialise a TestContext with namespace autocreation. + let ctx = TestContextBuilder::default() + .with_autocreate_namespace(None) + .build() + .await; // Writing to two initial tables should succeed ctx.write_lp("bananas", "test", "ananas,tag1=A,tag2=B val=42i 42424242") @@ -608,13 +597,13 @@ async fn test_update_namespace_limit_max_tables() { .namespace .expect("no namespace in response"); - assert_eq!(got.name, create.name); - assert_eq!(got.id, create.id); - assert_eq!(got.retention_period_ns, create.retention_period_ns); - assert_matches!(&got.service_protection_limits, Some(got_limits) => { - assert_eq!(got_limits.max_tables, 1); - assert_eq!(got_limits.max_columns_per_table, create.service_protection_limits.expect("created namespace should have limits").max_columns_per_table); - }); + assert_eq!(got.name, "bananas_test"); + assert_eq!(got.id, 1); + assert_eq!(got.max_tables, 1); + assert_eq!( + got.max_columns_per_table, + iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE + ); // The list namespace RPC should show the updated namespace { @@ -644,9 +633,8 @@ async fn test_update_namespace_limit_max_tables() { assert_eq!(ns.id.get(), got.id); assert_eq!(ns.name, got.name); assert_eq!(ns.retention_period_ns, got.retention_period_ns); - let got_limits = got.service_protection_limits.expect("created namespace should have limits"); - assert_eq!(ns.max_tables, got_limits.max_tables); - assert_eq!(ns.max_columns_per_table, got_limits.max_columns_per_table); + assert_eq!(ns.max_tables, got.max_tables); + assert_eq!(ns.max_columns_per_table, got.max_columns_per_table); assert!(ns.deleted_at.is_none()); }); } @@ -662,29 +650,19 @@ async fn test_update_namespace_limit_max_tables() { .expect_err("cached entry should be removed"); assert_matches!(err, router::server::http::Error::DmlHandler(DmlError::Schema(SchemaError::ServiceLimit(e))) => { let e: CatalogError = *e.downcast::().expect("error returned should be a table create limit error"); - assert_matches!(e,CatalogError::TableCreateLimitError { table_name, .. } => { + assert_matches!(&e, CatalogError::TableCreateLimitError { table_name, .. } => { assert_eq!(table_name, "arán_banana"); + assert_eq!(e.to_string(), "couldn't create table arán_banana; limit reached on namespace 1") }); }); } #[tokio::test] async fn test_update_namespace_limit_max_columns_per_table() { - // Initialise a TestContext requiring explicit namespace creation. - let ctx = TestContextBuilder::default().build().await; - - // Explicitly create the namespace. - let create = ctx - .grpc_delegate() - .namespace_service() - .create_namespace(Request::new(CreateNamespaceRequest { - name: "bananas_test".to_string(), - retention_period_ns: Some(0), - })) - .await - .expect("failed to create namespace") - .into_inner() - .namespace - .expect("no namespace in response"); + // Initialise a TestContext with namespace autocreation. + let ctx = TestContextBuilder::default() + .with_autocreate_namespace(None) + .build() + .await; // Initial write within limit should succeed ctx.write_lp("bananas", "test", "ananas,tag1=A,tag2=B val=42i 42424242") @@ -709,13 +687,10 @@ async fn test_update_namespace_limit_max_columns_per_table() { .namespace .expect("no namespace in response"); - assert_eq!(got.name, create.name); - assert_eq!(got.id, create.id); - assert_eq!(got.retention_period_ns, create.retention_period_ns); - assert_matches!(&got.service_protection_limits, Some(got_limits) => { - assert_eq!(got_limits.max_tables, create.service_protection_limits.expect("namespace should have limits").max_tables); - assert_eq!(got_limits.max_columns_per_table, 1); - }); + assert_eq!(got.name, "bananas_test"); + assert_eq!(got.id, 1); + assert_eq!(got.max_tables, iox_catalog::DEFAULT_MAX_TABLES); + assert_eq!(got.max_columns_per_table, 1); // The list namespace RPC should show the updated namespace { @@ -745,9 +720,8 @@ async fn test_update_namespace_limit_max_columns_per_table() { assert_eq!(ns.id.get(), got.id); assert_eq!(ns.name, got.name); assert_eq!(ns.retention_period_ns, got.retention_period_ns); - let got_limits = got.service_protection_limits.expect("created namespace should have limits"); - assert_eq!(ns.max_tables, got_limits.max_tables); - assert_eq!(ns.max_columns_per_table, got_limits.max_columns_per_table); + assert_eq!(ns.max_tables, got.max_tables); + assert_eq!(ns.max_columns_per_table, got.max_columns_per_table); assert!(ns.deleted_at.is_none()); }); } @@ -859,9 +833,8 @@ async fn test_update_namespace_limit_0_max_tables_max_columns() { assert_eq!(ns.id.get(), create.id); assert_eq!(ns.name, create.name); assert_eq!(ns.retention_period_ns, create.retention_period_ns); - let create_limits = create.service_protection_limits.expect("created namespace should have limits"); - assert_eq!(ns.max_tables, create_limits.max_tables); - assert_eq!(ns.max_columns_per_table, create_limits.max_columns_per_table); + assert_eq!(ns.max_tables, create.max_tables); + assert_eq!(ns.max_columns_per_table, create.max_columns_per_table); assert!(ns.deleted_at.is_none()); }); } diff --git a/service_grpc_namespace/src/lib.rs b/service_grpc_namespace/src/lib.rs index 4d63ec5404..26754f77b2 100644 --- a/service_grpc_namespace/src/lib.rs +++ b/service_grpc_namespace/src/lib.rs @@ -131,7 +131,11 @@ impl namespace_service_server::NamespaceService for NamespaceService { let retention_period_ns = map_retention_period(retention_period_ns)?; - debug!(%namespace_name, ?retention_period_ns, "Updating namespace retention"); + debug!( + %namespace_name, + ?retention_period_ns, + "Updating namespace retention", + ); let namespace = repos .namespaces() @@ -143,7 +147,7 @@ impl namespace_service_server::NamespaceService for NamespaceService { })?; info!( - namespace_name, + %namespace_name, retention_period_ns, namespace_id = %namespace.id, "updated namespace retention" @@ -165,7 +169,11 @@ impl namespace_service_server::NamespaceService for NamespaceService { limit_update, } = request.into_inner(); - debug!(%namespace_name, ?limit_update, "updating namespace service protection limit"); + debug!( + %namespace_name, + ?limit_update, + "updating namespace service protection limit", + ); let namespace = match limit_update { Some(LimitUpdate::MaxTables(n)) => { @@ -175,13 +183,18 @@ impl namespace_service_server::NamespaceService for NamespaceService { )); } repos - .namespaces() - .update_table_limit(&namespace_name, n) - .await - .map_err(|e| { - warn!(error=%e, %namespace_name, table_limit=%n, "failed to update table limit for namespace"); - status_from_catalog_namespace_error(e) - }) + .namespaces() + .update_table_limit(&namespace_name, n) + .await + .map_err(|e| { + warn!( + error = %e, + %namespace_name, + table_limit = %n, + "failed to update table limit for namespace", + ); + status_from_catalog_namespace_error(e) + }) } Some(LimitUpdate::MaxColumnsPerTable(n)) => { if n == 0 { @@ -190,20 +203,31 @@ impl namespace_service_server::NamespaceService for NamespaceService { )); } repos - .namespaces() - .update_column_limit(&namespace_name, n) - .await - .map_err(|e| { - warn!(error=%e, %namespace_name, per_table_column_limit=%n, "failed to update per table column limit for namespace"); - status_from_catalog_namespace_error(e) - }) + .namespaces() + .update_column_limit(&namespace_name, n) + .await + .map_err(|e| { + warn!( + error = %e, + %namespace_name, + per_table_column_limit = %n, + "failed to update per table column limit for namespace", + ); + status_from_catalog_namespace_error(e) + }) } None => Err(Status::invalid_argument( "unsupported service protection limit change requested", )), }?; - info!(namespace_name, namespace_id = %namespace.id, "updated namespace service protection limits"); + info!( + %namespace_name, + namespace_id = %namespace.id, + max_tables = %namespace.max_tables, + max_columns_per_table = %namespace.max_columns_per_table, + "updated namespace service protection limits", + ); Ok(Response::new( UpdateNamespaceServiceProtectionLimitResponse { @@ -218,10 +242,8 @@ fn namespace_to_proto(namespace: CatalogNamespace) -> Namespace { id: namespace.id.get(), name: namespace.name.clone(), retention_period_ns: namespace.retention_period_ns, - service_protection_limits: Some(ServiceProtectionLimits { - max_tables: namespace.max_tables, - max_columns_per_table: namespace.max_columns_per_table, - }), + max_tables: namespace.max_tables, + max_columns_per_table: namespace.max_columns_per_table, } } @@ -231,10 +253,8 @@ fn namespace_to_create_response_proto(namespace: CatalogNamespace) -> CreateName id: namespace.id.get(), name: namespace.name.clone(), retention_period_ns: namespace.retention_period_ns, - service_protection_limits: Some(ServiceProtectionLimits { - max_tables: namespace.max_tables, - max_columns_per_table: namespace.max_columns_per_table, - }), + max_tables: namespace.max_tables, + max_columns_per_table: namespace.max_columns_per_table, }), } } @@ -365,13 +385,6 @@ mod tests { .expect("no namespace in response"); assert_eq!(created_ns.name, NS_NAME); assert_eq!(created_ns.retention_period_ns, Some(RETENTION)); - let ServiceProtectionLimits { - max_tables: original_max_tables, - max_columns_per_table: original_max_columns_per_table, - } = created_ns - .service_protection_limits - .clone() - .expect("created namespace must have service protection limits"); // There should now be one namespace { @@ -416,7 +429,7 @@ mod tests { } // Update the max allowed tables - let want_max_tables = original_max_tables + 42; + let want_max_tables = created_ns.max_tables + 42; let updated_ns = handler .update_namespace_service_protection_limit(Request::new( UpdateNamespaceServiceProtectionLimitRequest { @@ -431,13 +444,14 @@ mod tests { .expect("no namespace in response"); assert_eq!(updated_ns.name, created_ns.name); assert_eq!(updated_ns.id, created_ns.id); - assert_matches!(updated_ns.service_protection_limits, Some(got_limits) => { - assert_eq!(got_limits.max_tables, want_max_tables); - assert_eq!(got_limits.max_columns_per_table, original_max_columns_per_table); - }); + assert_eq!(updated_ns.max_tables, want_max_tables); + assert_eq!( + updated_ns.max_columns_per_table, + created_ns.max_columns_per_table + ); // Update the max allowed columns per table - let want_max_columns_per_table = original_max_columns_per_table + 7; + let want_max_columns_per_table = created_ns.max_columns_per_table + 7; let updated_ns = handler .update_namespace_service_protection_limit(Request::new( UpdateNamespaceServiceProtectionLimitRequest { @@ -452,10 +466,8 @@ mod tests { .expect("no namespace in response"); assert_eq!(updated_ns.name, created_ns.name); assert_eq!(updated_ns.id, created_ns.id); - assert_matches!(updated_ns.service_protection_limits, Some(got_limits) => { - assert_eq!(got_limits.max_tables, want_max_tables); - assert_eq!(got_limits.max_columns_per_table, want_max_columns_per_table); - }); + assert_eq!(updated_ns.max_tables, want_max_tables); + assert_eq!(updated_ns.max_columns_per_table, want_max_columns_per_table); // Deleting the namespace should cause it to disappear handler @@ -511,10 +523,8 @@ mod tests { .expect("no namespace in response"); assert_eq!(created_ns.name, NS_NAME); assert_eq!(created_ns.retention_period_ns, Some(RETENTION)); - assert_matches!(created_ns.service_protection_limits, Some(limits) => { - assert_ne!(limits.max_tables, 0); - assert_ne!(limits.max_columns_per_table, 0); - }); + assert_ne!(created_ns.max_tables, 0); + assert_ne!(created_ns.max_columns_per_table, 0); // The handler should reject any attempt to set the table limit to zero. let status = handler From 44478348c73676dfc2a6d8b4495f0dc0034d8ed4 Mon Sep 17 00:00:00 2001 From: Fraser Savage Date: Wed, 5 Apr 2023 15:25:37 +0100 Subject: [PATCH 6/6] refactor(cli): Simplify namespace update-limit command code --- influxdb_iox/src/commands/namespace/mod.rs | 3 --- .../src/commands/namespace/update_limit.rs | 22 +++++++++++-------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/influxdb_iox/src/commands/namespace/mod.rs b/influxdb_iox/src/commands/namespace/mod.rs index 9edb414fbb..47b191f5f2 100644 --- a/influxdb_iox/src/commands/namespace/mod.rs +++ b/influxdb_iox/src/commands/namespace/mod.rs @@ -16,9 +16,6 @@ pub enum Error { #[error("Client error: {0}")] ClientError(#[from] influxdb_iox_client::error::Error), - - #[error("No valid limit was provided")] - InvalidLimit, } pub type Result = std::result::Result; diff --git a/influxdb_iox/src/commands/namespace/update_limit.rs b/influxdb_iox/src/commands/namespace/update_limit.rs index 2aefa70305..7c2734ee0a 100644 --- a/influxdb_iox/src/commands/namespace/update_limit.rs +++ b/influxdb_iox/src/commands/namespace/update_limit.rs @@ -1,7 +1,7 @@ use influxdb_iox_client::connection::Connection; use influxdb_iox_client::namespace::generated_types::LimitUpdate; -use crate::commands::namespace::{Error, Result}; +use crate::commands::namespace::Result; #[derive(Debug, clap::Parser)] pub struct Config { @@ -15,6 +15,12 @@ pub struct Config { #[derive(Debug, clap::Args)] #[clap(group( + // This arg group "limit" links the members of the below struct + // named "max_tables" and "max_columns_per_table" together as + // mutually exclusive flags. As we specify all flags & commands + // using clap-derive rather than the imperative builder, v3 only + // properly supports this kind of behaviour in a macro code block. + // NOTE: It takes the variable names and not the flag long names. clap::ArgGroup::new("limit") .required(true) .args(&["max_tables", "max_columns_per_table"]) @@ -29,22 +35,20 @@ struct Args { max_columns_per_table: Option, } -impl TryFrom for LimitUpdate { - type Error = Error; - fn try_from(args: Args) -> Result { +impl From for LimitUpdate { + fn from(args: Args) -> Self { let Args { max_tables, max_columns_per_table, } = args; if let Some(n) = max_tables { - return Ok(Self::MaxTables(n)); + return Self::MaxTables(n); } if let Some(n) = max_columns_per_table { - return Ok(Self::MaxColumnsPerTable(n)); + return Self::MaxColumnsPerTable(n); } - - Err(Error::InvalidLimit) + unreachable!(); } } @@ -54,7 +58,7 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { let namespace = client .update_namespace_service_protection_limit( &config.namespace, - LimitUpdate::try_from(config.args)?, + LimitUpdate::from(config.args), ) .await?; println!("{}", serde_json::to_string_pretty(&namespace)?);