feat: Allow passing service protection limits in create db gRPC call (#7941)
* feat: Allow passing service protection limits in create db gRPC call * fix: Move the impl into the catalog namespace trait --------- Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
4ab407ef1b
commit
d26ad8e079
|
@ -308,6 +308,30 @@ pub struct Namespace {
|
|||
pub partition_template: NamespacePartitionTemplateOverride,
|
||||
}
|
||||
|
||||
use generated_types::influxdata::iox::namespace::v1 as namespace_proto;
|
||||
|
||||
/// Overrides for service protection limits.
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct NamespaceServiceProtectionLimitsOverride {
|
||||
/// The maximum number of tables that can exist in this namespace
|
||||
pub max_tables: Option<i32>,
|
||||
/// The maximum number of columns per table in this namespace
|
||||
pub max_columns_per_table: Option<i32>,
|
||||
}
|
||||
|
||||
impl From<namespace_proto::ServiceProtectionLimits> for NamespaceServiceProtectionLimitsOverride {
|
||||
fn from(value: namespace_proto::ServiceProtectionLimits) -> Self {
|
||||
let namespace_proto::ServiceProtectionLimits {
|
||||
max_tables,
|
||||
max_columns_per_table,
|
||||
} = value;
|
||||
Self {
|
||||
max_tables,
|
||||
max_columns_per_table,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Schema collection for a namespace. This is an in-memory object useful for a schema
|
||||
/// cache.
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
|
|
|
@ -15,19 +15,19 @@ service NamespaceService {
|
|||
rpc DeleteNamespace(DeleteNamespaceRequest) returns (DeleteNamespaceResponse);
|
||||
|
||||
// Update retention period
|
||||
rpc UpdateNamespaceRetention(UpdateNamespaceRetentionRequest) returns (UpdateNamespaceRetentionResponse);
|
||||
rpc UpdateNamespaceRetention(UpdateNamespaceRetentionRequest)
|
||||
returns (UpdateNamespaceRetentionResponse);
|
||||
|
||||
// Update a service protection limit of a namespace. For this change to take
|
||||
// effect, all routers MUST be restarted
|
||||
rpc UpdateNamespaceServiceProtectionLimit(UpdateNamespaceServiceProtectionLimitRequest) returns (UpdateNamespaceServiceProtectionLimitResponse);
|
||||
rpc UpdateNamespaceServiceProtectionLimit(
|
||||
UpdateNamespaceServiceProtectionLimitRequest)
|
||||
returns (UpdateNamespaceServiceProtectionLimitResponse);
|
||||
}
|
||||
|
||||
message GetNamespacesRequest {
|
||||
}
|
||||
message GetNamespacesRequest {}
|
||||
|
||||
message GetNamespacesResponse {
|
||||
repeated Namespace namespaces = 1;
|
||||
}
|
||||
message GetNamespacesResponse { repeated Namespace namespaces = 1; }
|
||||
|
||||
message CreateNamespaceRequest {
|
||||
// Name of the namespace to be created
|
||||
|
@ -40,20 +40,20 @@ message CreateNamespaceRequest {
|
|||
optional int64 retention_period_ns = 2;
|
||||
|
||||
// Partitioning scheme to use for tables created in this namespace
|
||||
optional influxdata.iox.partition_template.v1.PartitionTemplate partition_template = 3;
|
||||
optional influxdata.iox.partition_template.v1.PartitionTemplate
|
||||
partition_template = 3;
|
||||
|
||||
ServiceProtectionLimits service_protection_limits = 4;
|
||||
}
|
||||
|
||||
message CreateNamespaceResponse {
|
||||
Namespace namespace = 1;
|
||||
}
|
||||
message CreateNamespaceResponse { Namespace namespace = 1; }
|
||||
|
||||
message DeleteNamespaceRequest {
|
||||
// Name of the namespace to be deleted
|
||||
string name = 1;
|
||||
}
|
||||
|
||||
message DeleteNamespaceResponse {
|
||||
}
|
||||
message DeleteNamespaceResponse {}
|
||||
|
||||
message UpdateNamespaceRetentionRequest {
|
||||
// Name of the namespace to be set
|
||||
|
@ -66,9 +66,7 @@ message UpdateNamespaceRetentionRequest {
|
|||
optional int64 retention_period_ns = 2;
|
||||
}
|
||||
|
||||
message UpdateNamespaceRetentionResponse {
|
||||
Namespace namespace = 1;
|
||||
}
|
||||
message UpdateNamespaceRetentionResponse { Namespace namespace = 1; }
|
||||
|
||||
message UpdateNamespaceServiceProtectionLimitRequest {
|
||||
// Namespace to have its service protection limits updated.
|
||||
|
@ -78,7 +76,8 @@ message UpdateNamespaceServiceProtectionLimitRequest {
|
|||
oneof limit_update {
|
||||
// Change the maximum number of tables the namespace may have.
|
||||
int32 max_tables = 2;
|
||||
// Change the maximum number of columns each table in the namespace may have.
|
||||
// Change the maximum number of columns each table in the namespace may
|
||||
// have.
|
||||
int32 max_columns_per_table = 3;
|
||||
}
|
||||
}
|
||||
|
@ -87,6 +86,14 @@ message UpdateNamespaceServiceProtectionLimitResponse {
|
|||
Namespace namespace = 1;
|
||||
}
|
||||
|
||||
message ServiceProtectionLimits {
|
||||
// Change the maximum number of tables the namespace may have.
|
||||
optional int32 max_tables = 2;
|
||||
// Change the maximum number of columns each table in the namespace may
|
||||
// have.
|
||||
optional int32 max_columns_per_table = 3;
|
||||
}
|
||||
|
||||
message Namespace {
|
||||
// Namespace ID
|
||||
int64 id = 1;
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
use influxdb_iox_client::connection::Connection;
|
||||
|
||||
use crate::commands::namespace::Result;
|
||||
use influxdb_iox_client::namespace::generated_types::ServiceProtectionLimits;
|
||||
|
||||
/// Write data into the specified database
|
||||
#[derive(Debug, clap::Parser)]
|
||||
|
@ -19,12 +20,43 @@ pub struct Config {
|
|||
default_value = "0"
|
||||
)]
|
||||
retention_hours: u32,
|
||||
|
||||
#[clap(flatten)]
|
||||
service_protection_limits: ServiceProtectionLimitsArgs,
|
||||
}
|
||||
|
||||
#[derive(Debug, clap::Args)]
|
||||
pub struct ServiceProtectionLimitsArgs {
|
||||
/// The maximum number of tables to allow for this namespace
|
||||
#[clap(action, long = "max-tables", short = 't')]
|
||||
max_tables: Option<i32>,
|
||||
|
||||
/// The maximum number of columns to allow per table for this namespace
|
||||
#[clap(action, long = "max-columns-per-table", short = 'c')]
|
||||
max_columns_per_table: Option<i32>,
|
||||
}
|
||||
|
||||
impl From<ServiceProtectionLimitsArgs> for Option<ServiceProtectionLimits> {
|
||||
fn from(value: ServiceProtectionLimitsArgs) -> Self {
|
||||
let ServiceProtectionLimitsArgs {
|
||||
max_tables,
|
||||
max_columns_per_table,
|
||||
} = value;
|
||||
if max_tables.is_none() && max_columns_per_table.is_none() {
|
||||
return None;
|
||||
}
|
||||
Some(ServiceProtectionLimits {
|
||||
max_tables,
|
||||
max_columns_per_table,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn command(connection: Connection, config: Config) -> Result<()> {
|
||||
let Config {
|
||||
namespace,
|
||||
retention_hours,
|
||||
service_protection_limits,
|
||||
} = config;
|
||||
|
||||
let mut client = influxdb_iox_client::namespace::Client::new(connection);
|
||||
|
@ -37,7 +69,9 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
|
|||
// internally
|
||||
Some(retention_hours as i64 * 60 * 60 * 1_000_000_000)
|
||||
};
|
||||
let namespace = client.create_namespace(&namespace, retention).await?;
|
||||
let namespace = client
|
||||
.create_namespace(&namespace, retention, service_protection_limits.into())
|
||||
.await?;
|
||||
println!("{}", serde_json::to_string_pretty(&namespace)?);
|
||||
|
||||
Ok(())
|
||||
|
|
|
@ -25,7 +25,7 @@ pub struct Config {
|
|||
.required(true)
|
||||
.args(&["max_tables", "max_columns_per_table"])
|
||||
))]
|
||||
struct Args {
|
||||
pub struct Args {
|
||||
/// The maximum number of tables to allow for this namespace
|
||||
#[clap(action, long = "max-tables", short = 't', group = "limit")]
|
||||
max_tables: Option<i32>,
|
||||
|
|
|
@ -905,6 +905,120 @@ async fn query_ingester() {
|
|||
.await
|
||||
}
|
||||
|
||||
/// Test setting service limits while creating namespaces
|
||||
#[tokio::test]
|
||||
async fn namespace_create_service_limits() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let database_url = maybe_skip_integration!();
|
||||
let mut cluster = MiniCluster::create_shared(database_url).await;
|
||||
|
||||
StepTest::new(
|
||||
&mut cluster,
|
||||
vec![
|
||||
Step::Custom(Box::new(|state: &mut StepTestState| {
|
||||
async {
|
||||
let addr = state.cluster().router().router_grpc_base().to_string();
|
||||
let namespace = "ns1";
|
||||
|
||||
// {
|
||||
// "id": <foo>,
|
||||
// "name": "ns1",
|
||||
// "serviceProtectionLimits": {
|
||||
// "maxTables": 123,
|
||||
// "maxColumnsPerTable": 200
|
||||
// }
|
||||
// }
|
||||
Command::cargo_bin("influxdb_iox")
|
||||
.unwrap()
|
||||
.arg("-h")
|
||||
.arg(&addr)
|
||||
.arg("namespace")
|
||||
.arg("create")
|
||||
.arg(namespace)
|
||||
.arg("--max-tables")
|
||||
.arg("123")
|
||||
.assert()
|
||||
.success()
|
||||
.stdout(
|
||||
predicate::str::contains(namespace)
|
||||
.and(predicate::str::contains(r#""maxTables": 123"#))
|
||||
.and(predicate::str::contains(r#""maxColumnsPerTable": 200"#)),
|
||||
);
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
Step::Custom(Box::new(|state: &mut StepTestState| {
|
||||
async {
|
||||
let addr = state.cluster().router().router_grpc_base().to_string();
|
||||
let namespace = "ns2";
|
||||
|
||||
// {
|
||||
// "id": <foo>,
|
||||
// "name": "ns2",
|
||||
// "serviceProtectionLimits": {
|
||||
// "maxTables": 500,
|
||||
// "maxColumnsPerTable": 321
|
||||
// }
|
||||
// }
|
||||
Command::cargo_bin("influxdb_iox")
|
||||
.unwrap()
|
||||
.arg("-h")
|
||||
.arg(&addr)
|
||||
.arg("namespace")
|
||||
.arg("create")
|
||||
.arg(namespace)
|
||||
.arg("--max-columns-per-table")
|
||||
.arg("321")
|
||||
.assert()
|
||||
.success()
|
||||
.stdout(
|
||||
predicate::str::contains(namespace)
|
||||
.and(predicate::str::contains(r#""maxTables": 500"#))
|
||||
.and(predicate::str::contains(r#""maxColumnsPerTable": 321"#)),
|
||||
);
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
Step::Custom(Box::new(|state: &mut StepTestState| {
|
||||
async {
|
||||
let addr = state.cluster().router().router_grpc_base().to_string();
|
||||
let namespace = "ns3";
|
||||
|
||||
// {
|
||||
// "id": <foo>,
|
||||
// "name": "ns3",
|
||||
// "serviceProtectionLimits": {
|
||||
// "maxTables": 123,
|
||||
// "maxColumnsPerTable": 321
|
||||
// }
|
||||
// }
|
||||
Command::cargo_bin("influxdb_iox")
|
||||
.unwrap()
|
||||
.arg("-h")
|
||||
.arg(&addr)
|
||||
.arg("namespace")
|
||||
.arg("create")
|
||||
.arg(namespace)
|
||||
.arg("--max-tables")
|
||||
.arg("123")
|
||||
.arg("--max-columns-per-table")
|
||||
.arg("321")
|
||||
.assert()
|
||||
.success()
|
||||
.stdout(
|
||||
predicate::str::contains(namespace)
|
||||
.and(predicate::str::contains(r#""maxTables": 123"#))
|
||||
.and(predicate::str::contains(r#""maxColumnsPerTable": 321"#)),
|
||||
);
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
],
|
||||
)
|
||||
.run()
|
||||
.await
|
||||
}
|
||||
|
||||
/// Test the namespace update service limit command
|
||||
#[tokio::test]
|
||||
async fn namespace_update_service_limit() {
|
||||
|
|
|
@ -64,7 +64,10 @@ async fn soft_deletion() {
|
|||
state.cluster().router().router_grpc_connection(),
|
||||
);
|
||||
let namespace_name = state.cluster().namespace();
|
||||
client.create_namespace(namespace_name, None).await.unwrap();
|
||||
client
|
||||
.create_namespace(namespace_name, None, None)
|
||||
.await
|
||||
.unwrap();
|
||||
let namespaces = client.get_namespaces().await.unwrap();
|
||||
let created_namespace = namespaces
|
||||
.iter()
|
||||
|
@ -192,7 +195,7 @@ async fn soft_deletion() {
|
|||
let namespace_name = state.cluster().namespace();
|
||||
|
||||
let error = client
|
||||
.create_namespace(namespace_name, None)
|
||||
.create_namespace(namespace_name, None, None)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_eq!(
|
||||
|
|
|
@ -44,6 +44,7 @@ impl Client {
|
|||
&mut self,
|
||||
namespace: &str,
|
||||
retention_period_ns: Option<i64>,
|
||||
service_protection_limits: Option<ServiceProtectionLimits>,
|
||||
) -> Result<Namespace, Error> {
|
||||
let response = self
|
||||
.inner
|
||||
|
@ -51,6 +52,7 @@ impl Client {
|
|||
name: namespace.to_string(),
|
||||
retention_period_ns,
|
||||
partition_template: None,
|
||||
service_protection_limits,
|
||||
})
|
||||
.await?;
|
||||
|
||||
|
|
|
@ -4,8 +4,9 @@ use async_trait::async_trait;
|
|||
use data_types::{
|
||||
partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride},
|
||||
Column, ColumnType, ColumnsByName, CompactionLevel, Namespace, NamespaceId, NamespaceName,
|
||||
NamespaceSchema, ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId,
|
||||
PartitionKey, SkippedCompaction, Table, TableId, TableSchema, Timestamp,
|
||||
NamespaceSchema, NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId,
|
||||
ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
|
||||
TableSchema, Timestamp,
|
||||
};
|
||||
use iox_time::TimeProvider;
|
||||
use snafu::{OptionExt, Snafu};
|
||||
|
@ -256,6 +257,7 @@ pub trait NamespaceRepo: Send + Sync {
|
|||
name: &NamespaceName<'_>,
|
||||
partition_template: Option<NamespacePartitionTemplateOverride>,
|
||||
retention_period_ns: Option<i64>,
|
||||
service_protection_limits: Option<NamespaceServiceProtectionLimitsOverride>,
|
||||
) -> Result<Namespace>;
|
||||
|
||||
/// Update retention period for a namespace
|
||||
|
@ -746,7 +748,7 @@ pub(crate) mod test_helpers {
|
|||
let namespace_name = NamespaceName::new("test_namespace").unwrap();
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create(&namespace_name, None, None)
|
||||
.create(&namespace_name, None, None, None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(namespace.id > NamespaceId::new(0));
|
||||
|
@ -770,7 +772,10 @@ pub(crate) mod test_helpers {
|
|||
DEFAULT_MAX_COLUMNS_PER_TABLE
|
||||
);
|
||||
|
||||
let conflict = repos.namespaces().create(&namespace_name, None, None).await;
|
||||
let conflict = repos
|
||||
.namespaces()
|
||||
.create(&namespace_name, None, None, None)
|
||||
.await;
|
||||
assert!(matches!(
|
||||
conflict.unwrap_err(),
|
||||
Error::NameExists { name: _ }
|
||||
|
@ -857,7 +862,7 @@ pub(crate) mod test_helpers {
|
|||
let namespace4_name = NamespaceName::new("test_namespace4").unwrap();
|
||||
let namespace4 = repos
|
||||
.namespaces()
|
||||
.create(&namespace4_name, None, Some(NEW_RETENTION_PERIOD_NS))
|
||||
.create(&namespace4_name, None, Some(NEW_RETENTION_PERIOD_NS), None)
|
||||
.await
|
||||
.expect("namespace with 5-hour retention should be created");
|
||||
assert_eq!(
|
||||
|
@ -882,7 +887,12 @@ pub(crate) mod test_helpers {
|
|||
let namespace5_name = NamespaceName::new("test_namespace5").unwrap();
|
||||
let namespace5 = repos
|
||||
.namespaces()
|
||||
.create(&namespace5_name, Some(tag_partition_template.clone()), None)
|
||||
.create(
|
||||
&namespace5_name,
|
||||
Some(tag_partition_template.clone()),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(namespace5.partition_template, tag_partition_template);
|
||||
|
@ -1282,6 +1292,7 @@ pub(crate) mod test_helpers {
|
|||
&custom_namespace_name,
|
||||
Some(custom_namespace_template.clone()),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -2165,6 +2176,7 @@ pub(crate) mod test_helpers {
|
|||
&NamespaceName::new("retention_broken_2").unwrap(),
|
||||
None,
|
||||
Some(1),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -2988,7 +3000,12 @@ pub(crate) mod test_helpers {
|
|||
{
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create(&NamespaceName::new(namespace_name).unwrap(), None, None)
|
||||
.create(
|
||||
&NamespaceName::new(namespace_name).unwrap(),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
let namespace = match namespace {
|
||||
|
|
|
@ -289,7 +289,7 @@ pub mod test_helpers {
|
|||
let namespace_name = NamespaceName::new(name).unwrap();
|
||||
repos
|
||||
.namespaces()
|
||||
.create(&namespace_name, None, None)
|
||||
.create(&namespace_name, None, None, None)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
|
|
@ -16,8 +16,8 @@ use data_types::{
|
|||
NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, TemplatePart,
|
||||
},
|
||||
Column, ColumnId, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName,
|
||||
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey,
|
||||
SkippedCompaction, Table, TableId, Timestamp,
|
||||
NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, ParquetFileParams,
|
||||
Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId, Timestamp,
|
||||
};
|
||||
use iox_time::{SystemProvider, TimeProvider};
|
||||
use snafu::ensure;
|
||||
|
@ -147,6 +147,7 @@ impl NamespaceRepo for MemTxn {
|
|||
name: &NamespaceName<'_>,
|
||||
partition_template: Option<NamespacePartitionTemplateOverride>,
|
||||
retention_period_ns: Option<i64>,
|
||||
service_protection_limits: Option<NamespaceServiceProtectionLimitsOverride>,
|
||||
) -> Result<Namespace> {
|
||||
let stage = self.stage();
|
||||
|
||||
|
@ -156,11 +157,14 @@ impl NamespaceRepo for MemTxn {
|
|||
});
|
||||
}
|
||||
|
||||
let max_tables = service_protection_limits.and_then(|l| l.max_tables);
|
||||
let max_columns_per_table = service_protection_limits.and_then(|l| l.max_columns_per_table);
|
||||
|
||||
let namespace = Namespace {
|
||||
id: NamespaceId::new(stage.namespaces.len() as i64 + 1),
|
||||
name: name.to_string(),
|
||||
max_tables: DEFAULT_MAX_TABLES,
|
||||
max_columns_per_table: DEFAULT_MAX_COLUMNS_PER_TABLE,
|
||||
max_tables: max_tables.unwrap_or(DEFAULT_MAX_TABLES),
|
||||
max_columns_per_table: max_columns_per_table.unwrap_or(DEFAULT_MAX_COLUMNS_PER_TABLE),
|
||||
retention_period_ns,
|
||||
deleted_at: None,
|
||||
partition_template: partition_template.unwrap_or_default(),
|
||||
|
|
|
@ -7,9 +7,9 @@ use crate::interface::{
|
|||
use async_trait::async_trait;
|
||||
use data_types::{
|
||||
partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride},
|
||||
Column, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName, ParquetFile,
|
||||
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction,
|
||||
Table, TableId, Timestamp,
|
||||
Column, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName,
|
||||
NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, ParquetFileParams,
|
||||
Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId, Timestamp,
|
||||
};
|
||||
use iox_time::{SystemProvider, TimeProvider};
|
||||
use metric::{DurationHistogram, Metric};
|
||||
|
@ -132,7 +132,7 @@ macro_rules! decorate {
|
|||
decorate!(
|
||||
impl_trait = NamespaceRepo,
|
||||
methods = [
|
||||
"namespace_create" = create(&mut self, name: &NamespaceName<'_>, partition_template: Option<NamespacePartitionTemplateOverride>, retention_period_ns: Option<i64>) -> Result<Namespace>;
|
||||
"namespace_create" = create(&mut self, name: &NamespaceName<'_>, partition_template: Option<NamespacePartitionTemplateOverride>, retention_period_ns: Option<i64>, service_protection_limits: Option<NamespaceServiceProtectionLimitsOverride>) -> Result<Namespace>;
|
||||
"namespace_update_retention_period" = update_retention_period(&mut self, name: &str, retention_period_ns: Option<i64>) -> Result<Namespace>;
|
||||
"namespace_list" = list(&mut self, deleted: SoftDeletedRows) -> Result<Vec<Namespace>>;
|
||||
"namespace_get_by_id" = get_by_id(&mut self, id: NamespaceId, deleted: SoftDeletedRows) -> Result<Option<Namespace>>;
|
||||
|
|
|
@ -19,9 +19,10 @@ use data_types::{
|
|||
partition_template::{
|
||||
NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, TemplatePart,
|
||||
},
|
||||
Column, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName, ParquetFile,
|
||||
ParquetFileExists, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey,
|
||||
SkippedCompaction, Table, TableId, Timestamp,
|
||||
Column, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName,
|
||||
NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileExists, ParquetFileId,
|
||||
ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
|
||||
Timestamp,
|
||||
};
|
||||
use iox_time::{SystemProvider, TimeProvider};
|
||||
use observability_deps::tracing::{debug, info, warn};
|
||||
|
@ -564,13 +565,17 @@ impl NamespaceRepo for PostgresTxn {
|
|||
name: &NamespaceName<'_>,
|
||||
partition_template: Option<NamespacePartitionTemplateOverride>,
|
||||
retention_period_ns: Option<i64>,
|
||||
service_protection_limits: Option<NamespaceServiceProtectionLimitsOverride>,
|
||||
) -> Result<Namespace> {
|
||||
let max_tables = service_protection_limits.and_then(|l| l.max_tables);
|
||||
let max_columns_per_table = service_protection_limits.and_then(|l| l.max_columns_per_table);
|
||||
|
||||
let rec = sqlx::query_as::<_, Namespace>(
|
||||
r#"
|
||||
INSERT INTO namespace (
|
||||
name, topic_id, query_pool_id, retention_period_ns, max_tables, partition_template
|
||||
name, topic_id, query_pool_id, retention_period_ns, max_tables, max_columns_per_table, partition_template
|
||||
)
|
||||
VALUES ( $1, $2, $3, $4, $5, $6 )
|
||||
VALUES ( $1, $2, $3, $4, $5, $6, $7 )
|
||||
RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at,
|
||||
partition_template;
|
||||
"#,
|
||||
|
@ -579,8 +584,9 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele
|
|||
.bind(SHARED_TOPIC_ID) // $2
|
||||
.bind(SHARED_QUERY_POOL_ID) // $3
|
||||
.bind(retention_period_ns) // $4
|
||||
.bind(DEFAULT_MAX_TABLES) // $5
|
||||
.bind(partition_template); // $6
|
||||
.bind(max_tables.unwrap_or(DEFAULT_MAX_TABLES)) // $5
|
||||
.bind(max_columns_per_table.unwrap_or(DEFAULT_MAX_COLUMNS_PER_TABLE)) // $6
|
||||
.bind(partition_template); // $7
|
||||
|
||||
let rec = rec.fetch_one(&mut self.inner).await.map_err(|e| {
|
||||
if is_unique_violation(&e) {
|
||||
|
@ -594,10 +600,6 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele
|
|||
}
|
||||
})?;
|
||||
|
||||
// Ensure the column default values match the code values.
|
||||
debug_assert_eq!(rec.max_tables, DEFAULT_MAX_TABLES);
|
||||
debug_assert_eq!(rec.max_columns_per_table, DEFAULT_MAX_COLUMNS_PER_TABLE);
|
||||
|
||||
Ok(rec)
|
||||
}
|
||||
|
||||
|
@ -2307,6 +2309,7 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele
|
|||
&"lemons".try_into().unwrap(),
|
||||
None, // no partition template
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -2347,6 +2350,7 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele
|
|||
&namespace_custom_template_name.try_into().unwrap(),
|
||||
Some(custom_partition_template_equal_to_default.clone()),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -2385,6 +2389,7 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele
|
|||
&namespace_default_template_name.try_into().unwrap(),
|
||||
None, // no partition template
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -2403,6 +2408,7 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele
|
|||
.unwrap(),
|
||||
),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
|
|
@ -19,8 +19,9 @@ use data_types::{
|
|||
NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, TemplatePart,
|
||||
},
|
||||
Column, ColumnId, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId,
|
||||
NamespaceName, ParquetFile, ParquetFileExists, ParquetFileId, ParquetFileParams, Partition,
|
||||
PartitionId, PartitionKey, SkippedCompaction, Table, TableId, Timestamp,
|
||||
NamespaceName, NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileExists,
|
||||
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction,
|
||||
Table, TableId, Timestamp,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashSet;
|
||||
|
@ -266,11 +267,15 @@ impl NamespaceRepo for SqliteTxn {
|
|||
name: &NamespaceName<'_>,
|
||||
partition_template: Option<NamespacePartitionTemplateOverride>,
|
||||
retention_period_ns: Option<i64>,
|
||||
service_protection_limits: Option<NamespaceServiceProtectionLimitsOverride>,
|
||||
) -> Result<Namespace> {
|
||||
let max_tables = service_protection_limits.and_then(|l| l.max_tables);
|
||||
let max_columns_per_table = service_protection_limits.and_then(|l| l.max_columns_per_table);
|
||||
|
||||
let rec = sqlx::query_as::<_, Namespace>(
|
||||
r#"
|
||||
INSERT INTO namespace ( name, topic_id, query_pool_id, retention_period_ns, max_tables, partition_template )
|
||||
VALUES ( $1, $2, $3, $4, $5, $6 )
|
||||
INSERT INTO namespace ( name, topic_id, query_pool_id, retention_period_ns, max_tables, max_columns_per_table, partition_template )
|
||||
VALUES ( $1, $2, $3, $4, $5, $6, $7 )
|
||||
RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, deleted_at,
|
||||
partition_template;
|
||||
"#,
|
||||
|
@ -279,8 +284,9 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele
|
|||
.bind(SHARED_TOPIC_ID) // $2
|
||||
.bind(SHARED_QUERY_POOL_ID) // $3
|
||||
.bind(retention_period_ns) // $4
|
||||
.bind(DEFAULT_MAX_TABLES) // $5
|
||||
.bind(partition_template); // $6
|
||||
.bind(max_tables.unwrap_or(DEFAULT_MAX_TABLES)) // $5
|
||||
.bind(max_columns_per_table.unwrap_or(DEFAULT_MAX_COLUMNS_PER_TABLE)) // $6
|
||||
.bind(partition_template); // $7
|
||||
|
||||
let rec = rec.fetch_one(self.inner.get_mut()).await.map_err(|e| {
|
||||
if is_unique_violation(&e) {
|
||||
|
@ -294,10 +300,6 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele
|
|||
}
|
||||
})?;
|
||||
|
||||
// Ensure the column default values match the code values.
|
||||
debug_assert_eq!(rec.max_tables, DEFAULT_MAX_TABLES);
|
||||
debug_assert_eq!(rec.max_columns_per_table, DEFAULT_MAX_COLUMNS_PER_TABLE);
|
||||
|
||||
Ok(rec)
|
||||
}
|
||||
|
||||
|
@ -1910,6 +1912,7 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele
|
|||
&"lemons".try_into().unwrap(),
|
||||
None, // no partition template
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -1950,6 +1953,7 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele
|
|||
&namespace_custom_template_name.try_into().unwrap(),
|
||||
Some(custom_partition_template_equal_to_default.clone()),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -1986,6 +1990,7 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele
|
|||
&namespace_default_template_name.try_into().unwrap(),
|
||||
None, // no partition template
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -2004,6 +2009,7 @@ RETURNING id, name, retention_period_ns, max_tables, max_columns_per_table, dele
|
|||
.unwrap(),
|
||||
),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
|
|
@ -148,7 +148,7 @@ impl TestCatalog {
|
|||
let namespace_name = NamespaceName::new(name).unwrap();
|
||||
let namespace = repos
|
||||
.namespaces()
|
||||
.create(&namespace_name, None, retention_period_ns)
|
||||
.create(&namespace_name, None, retention_period_ns, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
|
|
@ -168,7 +168,7 @@ mod tests {
|
|||
.repositories()
|
||||
.await
|
||||
.namespaces()
|
||||
.create(&ns, None, iox_catalog::DEFAULT_RETENTION_PERIOD,)
|
||||
.create(&ns, None, iox_catalog::DEFAULT_RETENTION_PERIOD, None)
|
||||
.await,
|
||||
Ok(_)
|
||||
);
|
||||
|
|
|
@ -144,7 +144,7 @@ mod tests {
|
|||
let mut repos = catalog.repositories().await;
|
||||
repos
|
||||
.namespaces()
|
||||
.create(&ns, None, None)
|
||||
.create(&ns, None, None, None)
|
||||
.await
|
||||
.expect("failed to setup catalog state");
|
||||
}
|
||||
|
@ -176,7 +176,7 @@ mod tests {
|
|||
let mut repos = catalog.repositories().await;
|
||||
repos
|
||||
.namespaces()
|
||||
.create(&ns, None, None)
|
||||
.create(&ns, None, None, None)
|
||||
.await
|
||||
.expect("failed to setup catalog state");
|
||||
repos
|
||||
|
|
|
@ -107,7 +107,7 @@ where
|
|||
.repositories()
|
||||
.await
|
||||
.namespaces()
|
||||
.create(namespace, None, retention_period_ns)
|
||||
.create(namespace, None, retention_period_ns, None)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
|
|
|
@ -88,6 +88,7 @@ async fn test_namespace_create() {
|
|||
name: "bananas_test".to_string(),
|
||||
retention_period_ns: Some(RETENTION),
|
||||
partition_template: None,
|
||||
service_protection_limits: None,
|
||||
};
|
||||
let got = ctx
|
||||
.grpc_delegate()
|
||||
|
@ -161,6 +162,7 @@ async fn test_namespace_delete() {
|
|||
name: "bananas_test".to_string(),
|
||||
retention_period_ns: Some(RETENTION),
|
||||
partition_template: None,
|
||||
service_protection_limits: None,
|
||||
};
|
||||
let got = ctx
|
||||
.grpc_delegate()
|
||||
|
@ -291,6 +293,7 @@ async fn test_create_namespace_0_retention_period() {
|
|||
name: "bananas_test".to_string(),
|
||||
retention_period_ns: Some(0), // A zero!
|
||||
partition_template: None,
|
||||
service_protection_limits: None,
|
||||
};
|
||||
let got = ctx
|
||||
.grpc_delegate()
|
||||
|
@ -356,6 +359,7 @@ async fn test_create_namespace_negative_retention_period() {
|
|||
name: "bananas_test".to_string(),
|
||||
retention_period_ns: Some(-42),
|
||||
partition_template: None,
|
||||
service_protection_limits: None,
|
||||
};
|
||||
let err = ctx
|
||||
.grpc_delegate()
|
||||
|
@ -420,6 +424,7 @@ async fn test_update_namespace_0_retention_period() {
|
|||
name: "bananas_test".to_string(),
|
||||
retention_period_ns: Some(42),
|
||||
partition_template: None,
|
||||
service_protection_limits: None,
|
||||
}))
|
||||
.await
|
||||
.expect("failed to create namespace")
|
||||
|
@ -526,6 +531,7 @@ async fn test_update_namespace_negative_retention_period() {
|
|||
name: "bananas_test".to_string(),
|
||||
retention_period_ns: Some(42),
|
||||
partition_template: None,
|
||||
service_protection_limits: None,
|
||||
}))
|
||||
.await
|
||||
.expect("failed to create namespace")
|
||||
|
@ -796,6 +802,7 @@ async fn test_update_namespace_limit_0_max_tables_max_columns() {
|
|||
name: "bananas_test".to_string(),
|
||||
retention_period_ns: Some(0),
|
||||
partition_template: None,
|
||||
service_protection_limits: None,
|
||||
}))
|
||||
.await
|
||||
.expect("failed to create namespace")
|
||||
|
@ -868,6 +875,7 @@ async fn test_table_create() {
|
|||
name: "bananas_test".to_string(),
|
||||
retention_period_ns: None,
|
||||
partition_template: None,
|
||||
service_protection_limits: None,
|
||||
};
|
||||
let namespace = ctx
|
||||
.grpc_delegate()
|
||||
|
@ -955,6 +963,7 @@ async fn test_invalid_strftime_partition_template() {
|
|||
part: Some(template_part::Part::TimeFormat("%3F".into())),
|
||||
}],
|
||||
}),
|
||||
service_protection_limits: None,
|
||||
};
|
||||
|
||||
// Check namespace creation returned an error
|
||||
|
@ -986,6 +995,7 @@ async fn test_namespace_partition_template_implicit_table_creation() {
|
|||
part: Some(template_part::Part::TagValue("tag1".into())),
|
||||
}],
|
||||
}),
|
||||
service_protection_limits: None,
|
||||
};
|
||||
ctx.grpc_delegate()
|
||||
.namespace_service()
|
||||
|
@ -1036,6 +1046,7 @@ async fn test_namespace_partition_template_explicit_table_creation_without_parti
|
|||
part: Some(template_part::Part::TagValue("tag1".into())),
|
||||
}],
|
||||
}),
|
||||
service_protection_limits: None,
|
||||
};
|
||||
ctx.grpc_delegate()
|
||||
.namespace_service()
|
||||
|
@ -1101,6 +1112,7 @@ async fn test_namespace_partition_template_explicit_table_creation_with_partitio
|
|||
part: Some(template_part::Part::TagValue("tag1".into())),
|
||||
}],
|
||||
}),
|
||||
service_protection_limits: None,
|
||||
};
|
||||
ctx.grpc_delegate()
|
||||
.namespace_service()
|
||||
|
@ -1166,6 +1178,7 @@ async fn test_namespace_without_partition_template_table_with_partition_template
|
|||
name: "bananas_test".to_string(),
|
||||
retention_period_ns: None,
|
||||
partition_template: None,
|
||||
service_protection_limits: None,
|
||||
};
|
||||
ctx.grpc_delegate()
|
||||
.namespace_service()
|
||||
|
|
|
@ -350,6 +350,7 @@ async fn test_delete_unsupported() {
|
|||
&data_types::NamespaceName::new("bananas_test").unwrap(),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("failed to update table limit");
|
||||
|
|
|
@ -20,7 +20,7 @@ use std::sync::Arc;
|
|||
|
||||
use data_types::{
|
||||
partition_template::NamespacePartitionTemplateOverride, Namespace as CatalogNamespace,
|
||||
NamespaceName,
|
||||
NamespaceName, NamespaceServiceProtectionLimitsOverride,
|
||||
};
|
||||
use generated_types::influxdata::iox::namespace::v1::{
|
||||
update_namespace_service_protection_limit_request::LimitUpdate, *,
|
||||
|
@ -74,6 +74,7 @@ impl namespace_service_server::NamespaceService for NamespaceService {
|
|||
name: namespace_name,
|
||||
retention_period_ns,
|
||||
partition_template,
|
||||
service_protection_limits,
|
||||
} = request.into_inner();
|
||||
|
||||
// Ensure the namespace name is consistently processed within IOx - this
|
||||
|
@ -94,6 +95,7 @@ impl namespace_service_server::NamespaceService for NamespaceService {
|
|||
.transpose()
|
||||
.map_err(|v| Status::invalid_argument(v.to_string()))?,
|
||||
retention_period_ns,
|
||||
service_protection_limits.map(NamespaceServiceProtectionLimitsOverride::from),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
|
@ -383,6 +385,7 @@ mod tests {
|
|||
name: NS_NAME.to_string(),
|
||||
retention_period_ns: Some(RETENTION),
|
||||
partition_template: None,
|
||||
service_protection_limits: None,
|
||||
};
|
||||
let created_ns = handler
|
||||
.create_namespace(Request::new(req))
|
||||
|
@ -507,6 +510,7 @@ mod tests {
|
|||
name: NS_NAME.to_string(),
|
||||
retention_period_ns: Some(RETENTION),
|
||||
partition_template: None,
|
||||
service_protection_limits: None,
|
||||
};
|
||||
|
||||
let created_ns = handler
|
||||
|
@ -553,6 +557,7 @@ mod tests {
|
|||
name: NS_NAME.to_string(),
|
||||
retention_period_ns: None,
|
||||
partition_template: Some(PartitionTemplate { parts: vec![] }),
|
||||
service_protection_limits: None,
|
||||
};
|
||||
|
||||
let error = handler
|
||||
|
@ -586,6 +591,7 @@ mod tests {
|
|||
name: NS_NAME.to_string(),
|
||||
retention_period_ns: Some(RETENTION),
|
||||
partition_template: None,
|
||||
service_protection_limits: None,
|
||||
};
|
||||
let created_ns = handler
|
||||
.create_namespace(Request::new(req))
|
||||
|
@ -626,6 +632,37 @@ mod tests {
|
|||
assert_eq!(status.code(), Code::InvalidArgument);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_with_service_protection_limits() {
|
||||
let catalog: Arc<dyn Catalog> =
|
||||
Arc::new(MemCatalog::new(Arc::new(metric::Registry::default())));
|
||||
|
||||
let max_tables = 123;
|
||||
let max_columns_per_table = 321;
|
||||
|
||||
let handler = NamespaceService::new(catalog);
|
||||
let req = CreateNamespaceRequest {
|
||||
name: NS_NAME.to_string(),
|
||||
retention_period_ns: Some(RETENTION),
|
||||
partition_template: None,
|
||||
service_protection_limits: Some(ServiceProtectionLimits {
|
||||
max_tables: Some(max_tables),
|
||||
max_columns_per_table: Some(max_columns_per_table),
|
||||
}),
|
||||
};
|
||||
let created_ns = handler
|
||||
.create_namespace(Request::new(req))
|
||||
.await
|
||||
.expect("failed to create namespace")
|
||||
.into_inner()
|
||||
.namespace
|
||||
.expect("no namespace in response");
|
||||
assert_eq!(created_ns.name, NS_NAME);
|
||||
assert_eq!(created_ns.retention_period_ns, Some(RETENTION));
|
||||
assert_eq!(created_ns.max_tables, max_tables);
|
||||
assert_eq!(created_ns.max_columns_per_table, max_columns_per_table);
|
||||
}
|
||||
|
||||
macro_rules! test_create_namespace_name {
|
||||
(
|
||||
$test_name:ident,
|
||||
|
@ -646,6 +683,7 @@ mod tests {
|
|||
name: String::from($name),
|
||||
retention_period_ns: Some(RETENTION),
|
||||
partition_template: None,
|
||||
service_protection_limits: None,
|
||||
};
|
||||
|
||||
let got = handler.create_namespace(Request::new(req)).await;
|
||||
|
|
|
@ -330,6 +330,7 @@ mod tests {
|
|||
.unwrap(),
|
||||
),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
|
Loading…
Reference in New Issue