Merge pull request #7415 from influxdata/savage/6824-update-namespace-service-limit-api-call

feat(namespace): Update support for namespace service protection limits
pull/24376/head
kodiakhq[bot] 2023-04-05 14:34:55 +00:00 committed by GitHub
commit 3cd8eb56e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 784 additions and 52 deletions

View File

@ -14,6 +14,10 @@ service NamespaceService {
// Update retention period
rpc UpdateNamespaceRetention(UpdateNamespaceRetentionRequest) returns (UpdateNamespaceRetentionResponse);
// Update a service protection limit of a namespace. For this change to take
// effect, all routers MUST be restarted
rpc UpdateNamespaceServiceProtectionLimit(UpdateNamespaceServiceProtectionLimitRequest) returns (UpdateNamespaceServiceProtectionLimitResponse);
}
message GetNamespacesRequest {
@ -61,6 +65,23 @@ message UpdateNamespaceRetentionResponse {
Namespace namespace = 1;
}
message UpdateNamespaceServiceProtectionLimitRequest {
// Namespace to have its service protection limits updated.
string name = 1;
// The service protection limit to update.
oneof limit_update {
// Change the maximum number of tables the namespace may have.
int32 max_tables = 2;
// Change the maximum number of columns each table in the namespace may have.
int32 max_columns_per_table = 3;
}
}
message UpdateNamespaceServiceProtectionLimitResponse {
Namespace namespace = 1;
}
message Namespace {
// Namespace ID
int64 id = 1;
@ -72,4 +93,10 @@ message Namespace {
//
// NULL means "infinite retention".
optional int64 retention_period_ns = 3;
// The maximum number of tables which this namespace is allowed to contain.
int32 max_tables = 4;
// The maximum number of columns a table belonging to this namespace may have.
int32 max_columns_per_table = 5;
}

View File

@ -1,15 +1,6 @@
use influxdb_iox_client::connection::Connection;
use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("JSON Serialization error: {0}")]
Serde(#[from] serde_json::Error),
#[error("Client error: {0}")]
ClientError(#[from] influxdb_iox_client::error::Error),
}
use crate::commands::namespace::Result;
/// Write data into the specified database
#[derive(Debug, clap::Parser)]
@ -30,10 +21,7 @@ pub struct Config {
retention_hours: u32,
}
pub async fn command(
connection: Connection,
config: Config,
) -> Result<(), crate::commands::namespace::Error> {
pub async fn command(connection: Connection, config: Config) -> Result<()> {
let Config {
namespace,
retention_hours,

View File

@ -1,15 +1,6 @@
use influxdb_iox_client::connection::Connection;
use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("JSON Serialization error: {0}")]
Serde(#[from] serde_json::Error),
#[error("Client error: {0}")]
ClientError(#[from] influxdb_iox_client::error::Error),
}
use crate::commands::namespace::Result;
#[derive(Debug, clap::Parser)]
pub struct Config {
@ -18,10 +9,7 @@ pub struct Config {
namespace: String,
}
pub async fn command(
connection: Connection,
config: Config,
) -> Result<(), crate::commands::namespace::Error> {
pub async fn command(connection: Connection, config: Config) -> Result<()> {
let Config { namespace } = config;
let mut client = influxdb_iox_client::namespace::Client::new(connection);

View File

@ -6,6 +6,7 @@ use thiserror::Error;
mod create;
mod delete;
mod retention;
mod update_limit;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
@ -17,6 +18,8 @@ pub enum Error {
ClientError(#[from] influxdb_iox_client::error::Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Various commands for namespace inspection
#[derive(Debug, clap::Parser)]
pub struct Config {
@ -36,11 +39,14 @@ enum Command {
/// Update retention of an existing namespace
Retention(retention::Config),
/// Update one of the service protection limits for an existing namespace
UpdateLimit(update_limit::Config),
/// Delete a namespace
Delete(delete::Config),
}
pub async fn command(connection: Connection, config: Config) -> Result<(), Error> {
pub async fn command(connection: Connection, config: Config) -> Result<()> {
match config.command {
Command::Create(config) => {
create::command(connection, config).await?;
@ -53,6 +59,9 @@ pub async fn command(connection: Connection, config: Config) -> Result<(), Error
Command::Retention(config) => {
retention::command(connection, config).await?;
}
Command::UpdateLimit(config) => {
update_limit::command(connection, config).await?;
}
Command::Delete(config) => {
delete::command(connection, config).await?;
} // Deliberately not adding _ => so the compiler will direct people here to impl new

View File

@ -1,15 +1,6 @@
use influxdb_iox_client::connection::Connection;
use thiserror::Error;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum Error {
#[error("JSON Serialization error: {0}")]
Serde(#[from] serde_json::Error),
#[error("Client error: {0}")]
ClientError(#[from] influxdb_iox_client::error::Error),
}
use crate::commands::namespace::Result;
/// Update the specified namespace's data retention period
#[derive(Debug, clap::Parser)]
@ -24,10 +15,7 @@ pub struct Config {
retention_hours: u32,
}
pub async fn command(
connection: Connection,
config: Config,
) -> Result<(), crate::commands::namespace::Error> {
pub async fn command(connection: Connection, config: Config) -> Result<()> {
let Config {
namespace,
retention_hours,

View File

@ -0,0 +1,71 @@
use influxdb_iox_client::connection::Connection;
use influxdb_iox_client::namespace::generated_types::LimitUpdate;
use crate::commands::namespace::Result;
#[derive(Debug, clap::Parser)]
pub struct Config {
/// The namespace to update a service protection limit for
#[clap(action)]
namespace: String,
#[command(flatten)]
args: Args,
}
#[derive(Debug, clap::Args)]
#[clap(group(
// This arg group "limit" links the members of the below struct
// named "max_tables" and "max_columns_per_table" together as
// mutually exclusive flags. As we specify all flags & commands
// using clap-derive rather than the imperative builder, v3 only
// properly supports this kind of behaviour in a macro code block.
// NOTE: It takes the variable names and not the flag long names.
clap::ArgGroup::new("limit")
.required(true)
.args(&["max_tables", "max_columns_per_table"])
))]
struct Args {
/// The maximum number of tables to allow for this namespace
#[clap(action, long = "max-tables", short = 't', group = "limit")]
max_tables: Option<i32>,
/// The maximum number of columns to allow per table for this namespace
#[clap(action, long = "max-columns-per-table", short = 'c', group = "limit")]
max_columns_per_table: Option<i32>,
}
impl From<Args> for LimitUpdate {
fn from(args: Args) -> Self {
let Args {
max_tables,
max_columns_per_table,
} = args;
if let Some(n) = max_tables {
return Self::MaxTables(n);
}
if let Some(n) = max_columns_per_table {
return Self::MaxColumnsPerTable(n);
}
unreachable!();
}
}
pub async fn command(connection: Connection, config: Config) -> Result<()> {
let mut client = influxdb_iox_client::namespace::Client::new(connection);
let namespace = client
.update_namespace_service_protection_limit(
&config.namespace,
LimitUpdate::from(config.args),
)
.await?;
println!("{}", serde_json::to_string_pretty(&namespace)?);
println!(
r"
NOTE: This change will NOT take effect until all router instances have been restarted!"
);
Ok(())
}

View File

@ -1040,3 +1040,113 @@ async fn query_ingester() {
.run()
.await
}
/// Test the namespace update service limit command
#[tokio::test]
async fn namespace_update_service_limit() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let mut cluster = MiniCluster::create_shared2(database_url).await;
StepTest::new(
&mut cluster,
vec![
Step::Custom(Box::new(|state: &mut StepTestState| {
async {
let namespace = "service_limiter_namespace";
let addr = state.cluster().router().router_grpc_base().to_string();
// {
// "id": <foo>,
// "name": "service_limiter_namespace",
// "serviceProtectionLimits": {
// "maxTables": 500,
// "maxColumnsPerTable": 200
// }
// }
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-h")
.arg(&addr)
.arg("namespace")
.arg("create")
.arg(namespace)
.assert()
.success()
.stdout(
predicate::str::contains(namespace)
.and(predicate::str::contains(r#""maxTables": 500"#))
.and(predicate::str::contains(r#""maxColumnsPerTable": 200"#)),
);
}
.boxed()
})),
Step::Custom(Box::new(|state: &mut StepTestState| {
async {
let namespace = "service_limiter_namespace";
let addr = state.cluster().router().router_grpc_base().to_string();
// {
// "id": <foo>,
// "name": "service_limiter_namespace",
// "serviceProtectionLimits": {
// "maxTables": 1337,
// "maxColumnsPerTable": 200
// }
// }
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-h")
.arg(&addr)
.arg("namespace")
.arg("update-limit")
.arg("--max-tables")
.arg("1337")
.arg(namespace)
.assert()
.success()
.stdout(
predicate::str::contains(namespace)
.and(predicate::str::contains(r#""maxTables": 1337"#))
.and(predicate::str::contains(r#""maxColumnsPerTable": 200"#)),
);
}
.boxed()
})),
Step::Custom(Box::new(|state: &mut StepTestState| {
async {
let namespace = "service_limiter_namespace";
let addr = state.cluster().router().router_grpc_base().to_string();
// {
// "id": <foo>,
// "name": "service_limiter_namespace",
// "serviceProtectionLimits": {
// "maxTables": 1337,
// "maxColumnsPerTable": 42
// }
// }
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-h")
.arg(&addr)
.arg("namespace")
.arg("update-limit")
.arg("--max-columns-per-table")
.arg("42")
.arg(namespace)
.assert()
.success()
.stdout(
predicate::str::contains(namespace)
.and(predicate::str::contains(r#""maxTables": 1337"#))
.and(predicate::str::contains(r#""maxColumnsPerTable": 42"#)),
);
}
.boxed()
})),
],
)
.run()
.await
}

View File

@ -7,7 +7,9 @@ use ::generated_types::google::OptionalField;
/// Re-export generated_types
pub mod generated_types {
pub use generated_types::influxdata::iox::namespace::v1::*;
pub use generated_types::influxdata::iox::namespace::v1::{
update_namespace_service_protection_limit_request::LimitUpdate, *,
};
}
/// A basic client for working with Namespaces.
@ -77,6 +79,30 @@ impl Client {
Ok(response.into_inner().namespace.unwrap_field("namespace")?)
}
/// Update one of the service protection limits for a namespace
///
/// `limit_update` is the new service limit protection limit to set
/// on the namespace.
///
/// Zero-valued limits are rejected, returning an error.
pub async fn update_namespace_service_protection_limit(
&mut self,
namespace: &str,
limit_update: LimitUpdate,
) -> Result<Namespace, Error> {
let response = self
.inner
.update_namespace_service_protection_limit(
UpdateNamespaceServiceProtectionLimitRequest {
name: namespace.to_string(),
limit_update: Some(limit_update),
},
)
.await?;
Ok(response.into_inner().namespace.unwrap_field("namespace")?)
}
/// Delete a namespace
pub async fn delete_namespace(&mut self, namespace: &str) -> Result<(), Error> {
self.inner

View File

@ -36,6 +36,8 @@ fn namespace_to_proto(namespace: Namespace) -> proto::Namespace {
id: namespace.id.get(),
name: namespace.name,
retention_period_ns: namespace.retention_period_ns,
max_tables: namespace.max_tables,
max_columns_per_table: namespace.max_columns_per_table,
}
}
@ -82,6 +84,16 @@ impl proto::namespace_service_server::NamespaceService for NamespaceServiceImpl
"use router instances to manage namespaces",
))
}
async fn update_namespace_service_protection_limit(
&self,
_request: tonic::Request<proto::UpdateNamespaceServiceProtectionLimitRequest>,
) -> Result<tonic::Response<proto::UpdateNamespaceServiceProtectionLimitResponse>, tonic::Status>
{
Err(tonic::Status::unimplemented(
"use router instances to manage namespaces",
))
}
}
#[cfg(test)]
@ -92,6 +104,11 @@ mod tests {
use querier::{create_ingester_connection_for_testing, QuerierCatalogCache};
use tokio::runtime::Handle;
use iox_catalog::{
DEFAULT_MAX_COLUMNS_PER_TABLE as TEST_MAX_COLUMNS_PER_TABLE,
DEFAULT_MAX_TABLES as TEST_MAX_TABLES,
};
/// Common retention period value we'll use in tests
const TEST_RETENTION_PERIOD_NS: Option<i64> = Some(3_600 * 1_000_000_000);
@ -171,11 +188,15 @@ mod tests {
id: 1,
name: "namespace2".to_string(),
retention_period_ns: TEST_RETENTION_PERIOD_NS,
max_tables: TEST_MAX_TABLES,
max_columns_per_table: TEST_MAX_COLUMNS_PER_TABLE,
},
proto::Namespace {
id: 2,
name: "namespace1".to_string(),
retention_period_ns: TEST_RETENTION_PERIOD_NS,
max_tables: TEST_MAX_TABLES,
max_columns_per_table: TEST_MAX_COLUMNS_PER_TABLE,
},
]
}

View File

@ -5,10 +5,10 @@ use generated_types::influxdata::iox::namespace::v1::{
namespace_service_server::NamespaceService, *,
};
use hyper::StatusCode;
use iox_catalog::interface::SoftDeletedRows;
use iox_catalog::interface::{Error as CatalogError, SoftDeletedRows};
use iox_time::{SystemProvider, TimeProvider};
use router::{
dml_handlers::{DmlError, RetentionError},
dml_handlers::{CachedServiceProtectionLimit, DmlError, RetentionError, SchemaError},
namespace_resolver::{self, NamespaceCreationError},
server::http::Error,
};
@ -562,3 +562,280 @@ async fn test_update_namespace_negative_retention_period() {
});
}
}
#[tokio::test]
async fn test_update_namespace_limit_max_tables() {
// Initialise a TestContext with namespace autocreation.
let ctx = TestContextBuilder::default()
.with_autocreate_namespace(None)
.build()
.await;
// Writing to two initial tables should succeed
ctx.write_lp("bananas", "test", "ananas,tag1=A,tag2=B val=42i 42424242")
.await
.expect("write should succeed");
ctx.write_lp("bananas", "test", "platanos,tag3=C,tag4=D val=99i 42424243")
.await
.expect("write should succeed");
// Limit the maximum number of tables to prevent a write adding another table
let got = ctx
.grpc_delegate()
.namespace_service()
.update_namespace_service_protection_limit(Request::new(
UpdateNamespaceServiceProtectionLimitRequest {
name: "bananas_test".to_string(),
limit_update: Some(
update_namespace_service_protection_limit_request::LimitUpdate::MaxTables(1),
),
},
))
.await
.expect("failed to update namespace max table limit")
.into_inner()
.namespace
.expect("no namespace in response");
assert_eq!(got.name, "bananas_test");
assert_eq!(got.id, 1);
assert_eq!(got.max_tables, 1);
assert_eq!(
got.max_columns_per_table,
iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE
);
// The list namespace RPC should show the updated namespace
{
let list = ctx
.grpc_delegate()
.namespace_service()
.get_namespaces(Request::new(Default::default()))
.await
.expect("must return namespaces")
.into_inner();
assert_matches!(list.namespaces.as_slice(), [ns] => {
assert_eq!(*ns, got);
});
}
// The catalog should contain the namespace.
{
let db_list = ctx
.catalog()
.repositories()
.await
.namespaces()
.list(SoftDeletedRows::ExcludeDeleted)
.await
.expect("query failure");
assert_matches!(db_list.as_slice(), [ns] => {
assert_eq!(ns.id.get(), got.id);
assert_eq!(ns.name, got.name);
assert_eq!(ns.retention_period_ns, got.retention_period_ns);
assert_eq!(ns.max_tables, got.max_tables);
assert_eq!(ns.max_columns_per_table, got.max_columns_per_table);
assert!(ns.deleted_at.is_none());
});
}
// New table should fail to be created by the catalog.
let err = ctx
.write_lp(
"bananas",
"test",
"arán_banana,tag1=A,tag2=B val=42i 42424244",
)
.await
.expect_err("cached entry should be removed");
assert_matches!(err, router::server::http::Error::DmlHandler(DmlError::Schema(SchemaError::ServiceLimit(e))) => {
let e: CatalogError = *e.downcast::<CatalogError>().expect("error returned should be a table create limit error");
assert_matches!(&e, CatalogError::TableCreateLimitError { table_name, .. } => {
assert_eq!(table_name, "arán_banana");
assert_eq!(e.to_string(), "couldn't create table arán_banana; limit reached on namespace 1")
});
});
}
#[tokio::test]
async fn test_update_namespace_limit_max_columns_per_table() {
// Initialise a TestContext with namespace autocreation.
let ctx = TestContextBuilder::default()
.with_autocreate_namespace(None)
.build()
.await;
// Initial write within limit should succeed
ctx.write_lp("bananas", "test", "ananas,tag1=A,tag2=B val=42i 42424242")
.await
.expect("write should succeed");
// Limit the maximum number of columns per table so an extra column is rejected
let got = ctx
.grpc_delegate()
.namespace_service()
.update_namespace_service_protection_limit(Request::new(
UpdateNamespaceServiceProtectionLimitRequest {
name: "bananas_test".to_string(),
limit_update: Some(
update_namespace_service_protection_limit_request::LimitUpdate::MaxColumnsPerTable(1),
),
},
))
.await
.expect("failed to update namespace max table limit")
.into_inner()
.namespace
.expect("no namespace in response");
assert_eq!(got.name, "bananas_test");
assert_eq!(got.id, 1);
assert_eq!(got.max_tables, iox_catalog::DEFAULT_MAX_TABLES);
assert_eq!(got.max_columns_per_table, 1);
// The list namespace RPC should show the updated namespace
{
let list = ctx
.grpc_delegate()
.namespace_service()
.get_namespaces(Request::new(Default::default()))
.await
.expect("must return namespaces")
.into_inner();
assert_matches!(list.namespaces.as_slice(), [ns] => {
assert_eq!(*ns, got);
});
}
// The catalog should contain the namespace.
{
let db_list = ctx
.catalog()
.repositories()
.await
.namespaces()
.list(SoftDeletedRows::ExcludeDeleted)
.await
.expect("query failure");
assert_matches!(db_list.as_slice(), [ns] => {
assert_eq!(ns.id.get(), got.id);
assert_eq!(ns.name, got.name);
assert_eq!(ns.retention_period_ns, got.retention_period_ns);
assert_eq!(ns.max_tables, got.max_tables);
assert_eq!(ns.max_columns_per_table, got.max_columns_per_table);
assert!(ns.deleted_at.is_none());
});
}
// The cached entry is not affected, and writes continue to be validated
// against the old value.
//
// https://github.com/influxdata/influxdb_iox/issues/6175
// Writing to second table should succeed while using the cached entry
ctx.write_lp(
"bananas",
"test",
"platanos,tag1=A,tag2=B val=1337i 42424243",
)
.await
.expect("write should succeed");
// The router restarts, and writes with too many columns are then rejected.
let ctx = ctx.restart().await;
let err = ctx
.write_lp(
"bananas",
"test",
"arán_banana,tag1=A,tag2=B val=76i 42424243",
)
.await
.expect_err("cached entry should be removed and write should be blocked");
assert_matches!(
err, router::server::http::Error::DmlHandler(DmlError::Schema(
SchemaError::ServiceLimit(e)
)) => {
let e: CachedServiceProtectionLimit = *e.downcast::<CachedServiceProtectionLimit>().expect("error returned should be a cached service protection limit");
assert_matches!(e, CachedServiceProtectionLimit::Column {
table_name,
max_columns_per_table,
..
} => {
assert_eq!(table_name, "arán_banana");
assert_eq!(max_columns_per_table, 1);
});
}
)
}
#[tokio::test]
async fn test_update_namespace_limit_0_max_tables_max_columns() {
// Initialise a TestContext requiring explicit namespace creation.
let ctx = TestContextBuilder::default().build().await;
// Explicitly create the namespace.
let create = ctx
.grpc_delegate()
.namespace_service()
.create_namespace(Request::new(CreateNamespaceRequest {
name: "bananas_test".to_string(),
retention_period_ns: Some(0),
}))
.await
.expect("failed to create namespace")
.into_inner()
.namespace
.expect("no namespace in response");
// Attempt to use an invalid table limit
let err = ctx
.grpc_delegate()
.namespace_service()
.update_namespace_service_protection_limit(Request::new(
UpdateNamespaceServiceProtectionLimitRequest {
name: "bananas_test".to_string(),
limit_update: Some(
update_namespace_service_protection_limit_request::LimitUpdate::MaxTables(0),
),
},
))
.await
.expect_err("should not have been able to update the table limit to 0");
assert_eq!(err.code(), Code::InvalidArgument);
// Attempt to use an invalid column limit
let err = ctx
.grpc_delegate()
.namespace_service()
.update_namespace_service_protection_limit(Request::new(
UpdateNamespaceServiceProtectionLimitRequest {
name: "bananas_test".to_string(),
limit_update: Some(
update_namespace_service_protection_limit_request::LimitUpdate::MaxColumnsPerTable(0),
),
},
))
.await
.expect_err("should not have been able to update the column per table limit to 0");
assert_eq!(err.code(), Code::InvalidArgument);
// The catalog should contain the namespace unchanged.
{
let db_list = ctx
.catalog()
.repositories()
.await
.namespaces()
.list(SoftDeletedRows::ExcludeDeleted)
.await
.expect("query failure");
assert_matches!(db_list.as_slice(), [ns] => {
assert_eq!(ns.id.get(), create.id);
assert_eq!(ns.name, create.name);
assert_eq!(ns.retention_period_ns, create.retention_period_ns);
assert_eq!(ns.max_tables, create.max_tables);
assert_eq!(ns.max_columns_per_table, create.max_columns_per_table);
assert!(ns.deleted_at.is_none());
});
}
}

View File

@ -1,9 +1,10 @@
//! Implementation of the namespace gRPC service
use std::sync::Arc;
use data_types::{Namespace as CatalogNamespace, QueryPoolId, TopicId};
use generated_types::influxdata::iox::namespace::v1::*;
use generated_types::influxdata::iox::namespace::v1::{
update_namespace_service_protection_limit_request::LimitUpdate, *,
};
use iox_catalog::interface::{Catalog, SoftDeletedRows};
use observability_deps::tracing::{debug, info, warn};
use tonic::{Request, Response, Status};
@ -92,7 +93,7 @@ impl namespace_service_server::NamespaceService for NamespaceService {
"created namespace"
);
Ok(Response::new(create_namespace_to_proto(namespace)))
Ok(Response::new(namespace_to_create_response_proto(namespace)))
}
async fn delete_namespace(
@ -130,7 +131,11 @@ impl namespace_service_server::NamespaceService for NamespaceService {
let retention_period_ns = map_retention_period(retention_period_ns)?;
debug!(%namespace_name, ?retention_period_ns, "Updating namespace retention");
debug!(
%namespace_name,
?retention_period_ns,
"Updating namespace retention",
);
let namespace = repos
.namespaces()
@ -142,7 +147,7 @@ impl namespace_service_server::NamespaceService for NamespaceService {
})?;
info!(
namespace_name,
%namespace_name,
retention_period_ns,
namespace_id = %namespace.id,
"updated namespace retention"
@ -152,6 +157,84 @@ impl namespace_service_server::NamespaceService for NamespaceService {
namespace: Some(namespace_to_proto(namespace)),
}))
}
async fn update_namespace_service_protection_limit(
&self,
request: Request<UpdateNamespaceServiceProtectionLimitRequest>,
) -> Result<Response<UpdateNamespaceServiceProtectionLimitResponse>, Status> {
let mut repos = self.catalog.repositories().await;
let UpdateNamespaceServiceProtectionLimitRequest {
name: namespace_name,
limit_update,
} = request.into_inner();
debug!(
%namespace_name,
?limit_update,
"updating namespace service protection limit",
);
let namespace = match limit_update {
Some(LimitUpdate::MaxTables(n)) => {
if n == 0 {
return Err(Status::invalid_argument(
"max table limit for namespace must be greater than 0",
));
}
repos
.namespaces()
.update_table_limit(&namespace_name, n)
.await
.map_err(|e| {
warn!(
error = %e,
%namespace_name,
table_limit = %n,
"failed to update table limit for namespace",
);
status_from_catalog_namespace_error(e)
})
}
Some(LimitUpdate::MaxColumnsPerTable(n)) => {
if n == 0 {
return Err(Status::invalid_argument(
"max columns per table limit for namespace must be greater than 0",
));
}
repos
.namespaces()
.update_column_limit(&namespace_name, n)
.await
.map_err(|e| {
warn!(
error = %e,
%namespace_name,
per_table_column_limit = %n,
"failed to update per table column limit for namespace",
);
status_from_catalog_namespace_error(e)
})
}
None => Err(Status::invalid_argument(
"unsupported service protection limit change requested",
)),
}?;
info!(
%namespace_name,
namespace_id = %namespace.id,
max_tables = %namespace.max_tables,
max_columns_per_table = %namespace.max_columns_per_table,
"updated namespace service protection limits",
);
Ok(Response::new(
UpdateNamespaceServiceProtectionLimitResponse {
namespace: Some(namespace_to_proto(namespace)),
},
))
}
}
fn namespace_to_proto(namespace: CatalogNamespace) -> Namespace {
@ -159,15 +242,19 @@ fn namespace_to_proto(namespace: CatalogNamespace) -> Namespace {
id: namespace.id.get(),
name: namespace.name.clone(),
retention_period_ns: namespace.retention_period_ns,
max_tables: namespace.max_tables,
max_columns_per_table: namespace.max_columns_per_table,
}
}
fn create_namespace_to_proto(namespace: CatalogNamespace) -> CreateNamespaceResponse {
fn namespace_to_create_response_proto(namespace: CatalogNamespace) -> CreateNamespaceResponse {
CreateNamespaceResponse {
namespace: Some(Namespace {
id: namespace.id.get(),
name: namespace.name.clone(),
retention_period_ns: namespace.retention_period_ns,
max_tables: namespace.max_tables,
max_columns_per_table: namespace.max_columns_per_table,
}),
}
}
@ -189,6 +276,15 @@ fn map_retention_period(v: Option<i64>) -> Result<Option<i64>, Status> {
}
}
fn status_from_catalog_namespace_error(err: iox_catalog::interface::Error) -> Status {
match err {
iox_catalog::interface::Error::NamespaceNotFoundByName { .. } => {
Status::not_found(err.to_string())
}
_ => Status::internal(err.to_string()),
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
@ -217,6 +313,32 @@ mod tests {
});
}
#[test]
fn test_namespace_error_mapping() {
let not_found_err = iox_catalog::interface::Error::NamespaceNotFoundByName {
name: String::from("bananas_namespace"),
};
let not_found_msg = not_found_err.to_string();
assert_matches!(
status_from_catalog_namespace_error(not_found_err),
s => {
assert_eq!(s.code(), Code::NotFound);
assert_eq!(s.message(), not_found_msg);
});
let other_err = iox_catalog::interface::Error::ColumnCreateLimitError {
column_name: String::from("quantity"),
table_id: data_types::TableId::new(42),
};
let other_err_msg = other_err.to_string();
assert_matches!(
status_from_catalog_namespace_error(other_err),
s => {
assert_eq!(s.code(), Code::Internal);
assert_eq!(s.message(), other_err_msg);
});
}
#[tokio::test]
async fn test_crud() {
let catalog: Arc<dyn Catalog> =
@ -306,6 +428,47 @@ mod tests {
})
}
// Update the max allowed tables
let want_max_tables = created_ns.max_tables + 42;
let updated_ns = handler
.update_namespace_service_protection_limit(Request::new(
UpdateNamespaceServiceProtectionLimitRequest {
name: NS_NAME.to_string(),
limit_update: Some(LimitUpdate::MaxTables(want_max_tables)),
},
))
.await
.expect("failed to update namespace")
.into_inner()
.namespace
.expect("no namespace in response");
assert_eq!(updated_ns.name, created_ns.name);
assert_eq!(updated_ns.id, created_ns.id);
assert_eq!(updated_ns.max_tables, want_max_tables);
assert_eq!(
updated_ns.max_columns_per_table,
created_ns.max_columns_per_table
);
// Update the max allowed columns per table
let want_max_columns_per_table = created_ns.max_columns_per_table + 7;
let updated_ns = handler
.update_namespace_service_protection_limit(Request::new(
UpdateNamespaceServiceProtectionLimitRequest {
name: NS_NAME.to_string(),
limit_update: Some(LimitUpdate::MaxColumnsPerTable(want_max_columns_per_table)),
},
))
.await
.expect("failed to update namespace")
.into_inner()
.namespace
.expect("no namespace in response");
assert_eq!(updated_ns.name, created_ns.name);
assert_eq!(updated_ns.id, created_ns.id);
assert_eq!(updated_ns.max_tables, want_max_tables);
assert_eq!(updated_ns.max_columns_per_table, want_max_columns_per_table);
// Deleting the namespace should cause it to disappear
handler
.delete_namespace(Request::new(DeleteNamespaceRequest {
@ -325,4 +488,68 @@ mod tests {
assert_matches!(current.as_slice(), []);
}
}
#[tokio::test]
async fn test_reject_invalid_service_protection_limits() {
let catalog: Arc<dyn Catalog> =
Arc::new(MemCatalog::new(Arc::new(metric::Registry::default())));
let topic = catalog
.repositories()
.await
.topics()
.create_or_get("kafka-topic")
.await
.unwrap();
let query_pool = catalog
.repositories()
.await
.query_pools()
.create_or_get("query-pool")
.await
.unwrap();
let handler = NamespaceService::new(catalog, Some(topic.id), Some(query_pool.id));
let req = CreateNamespaceRequest {
name: NS_NAME.to_string(),
retention_period_ns: Some(RETENTION),
};
let created_ns = handler
.create_namespace(Request::new(req))
.await
.expect("failed to create namespace")
.into_inner()
.namespace
.expect("no namespace in response");
assert_eq!(created_ns.name, NS_NAME);
assert_eq!(created_ns.retention_period_ns, Some(RETENTION));
assert_ne!(created_ns.max_tables, 0);
assert_ne!(created_ns.max_columns_per_table, 0);
// The handler should reject any attempt to set the table limit to zero.
let status = handler
.update_namespace_service_protection_limit(Request::new(
UpdateNamespaceServiceProtectionLimitRequest {
name: NS_NAME.to_string(),
limit_update: Some(LimitUpdate::MaxTables(0)),
},
))
.await
.expect_err("invalid namespace update request for max table limit should fail");
assert_eq!(status.code(), Code::InvalidArgument);
// ...and likewise should reject any attempt to set the column per table limit to zero.
let status = handler
.update_namespace_service_protection_limit(Request::new(
UpdateNamespaceServiceProtectionLimitRequest {
name: NS_NAME.to_string(),
limit_update: Some(LimitUpdate::MaxColumnsPerTable(0)),
},
))
.await
.expect_err(
"invalid namespace update request for max columns per table limit should fail",
);
assert_eq!(status.code(), Code::InvalidArgument);
}
}