diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index dd047bb999..8a37e4cb20 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -442,6 +442,8 @@ pub struct Namespace { /// never drop data). #[sqlx(default)] pub retention_duration: Option, + /// The retention period in ns. None represents infinite duration (i.e. never drop data). + pub retention_period_ns: Option, /// The topic that writes to this namespace will land in pub topic_id: TopicId, /// The query pool assigned to answer queries for this namespace diff --git a/generated_types/protos/influxdata/iox/schema/v1/service.proto b/generated_types/protos/influxdata/iox/schema/v1/service.proto index a82c72c7b1..f981f969a0 100644 --- a/generated_types/protos/influxdata/iox/schema/v1/service.proto +++ b/generated_types/protos/influxdata/iox/schema/v1/service.proto @@ -5,6 +5,9 @@ option go_package = "github.com/influxdata/iox/schema/v1"; service SchemaService { // Get the schema for a namespace rpc GetSchema(GetSchemaRequest) returns (GetSchemaResponse); + + // update retention period + rpc UpdateNamespaceRetention(UpdateNamespaceRetentionRequest) returns (UpdateNamespaceRetentionResponse); } message GetSchemaRequest { @@ -61,3 +64,25 @@ message ColumnSchema { } } +message UpdateNamespaceRetentionRequest { + // Name of the namespace to be set + string name = 1; + + // Number of hours of the retention period + int64 retention_hours = 2; +} + +message UpdateNamespaceRetentionResponse { + Namespace namespace = 1; +} + +message Namespace { + // Namespace ID + int64 id = 1; + + // Name of the Namespace + string name = 2; + + // Retention period ns + optional int64 retention_period_ns = 3; +} \ No newline at end of file diff --git a/influxdb_iox/src/commands/namespace.rs b/influxdb_iox/src/commands/namespace/mod.rs similarity index 79% rename from influxdb_iox/src/commands/namespace.rs rename to influxdb_iox/src/commands/namespace/mod.rs index e5a679edc6..37438a8ce0 100644 --- a/influxdb_iox/src/commands/namespace.rs +++ b/influxdb_iox/src/commands/namespace/mod.rs @@ -3,6 +3,8 @@ use influxdb_iox_client::{connection::Connection, namespace}; use thiserror::Error; +mod retention; + #[allow(clippy::enum_variant_names)] #[derive(Debug, Error)] pub enum Error { @@ -25,17 +27,22 @@ pub struct Config { enum Command { /// Fetch namespaces List, + + /// Update retention of an existing namespace + Retention(retention::Config), } pub async fn command(connection: Connection, config: Config) -> Result<(), Error> { - let mut client = namespace::Client::new(connection); match config.command { Command::List => { + let mut client = namespace::Client::new(connection); let namespaces = client.get_namespaces().await?; println!("{}", serde_json::to_string_pretty(&namespaces)?); + } + Command::Retention(config) => { + retention::command(connection, config).await?; } // Deliberately not adding _ => so the compiler will direct people here to impl new // commands } - Ok(()) } diff --git a/influxdb_iox/src/commands/namespace/retention.rs b/influxdb_iox/src/commands/namespace/retention.rs new file mode 100644 index 0000000000..e7b97aa141 --- /dev/null +++ b/influxdb_iox/src/commands/namespace/retention.rs @@ -0,0 +1,42 @@ +use influxdb_iox_client::connection::Connection; +use thiserror::Error; + +#[allow(clippy::enum_variant_names)] +#[derive(Debug, Error)] +pub enum Error { + #[error("JSON Serialization error: {0}")] + Serde(#[from] serde_json::Error), + + #[error("Client error: {0}")] + ClientError(#[from] influxdb_iox_client::error::Error), +} + +/// Write data into the specified database +#[derive(Debug, clap::Parser)] +pub struct Config { + /// The namespace to update the retention period for + #[clap(action)] + namespace: String, + + /// Num of hours of the retention period of this namespace. Default is 0 representing infinite retention + #[clap(action, long, short = 'c', default_value = "0")] + retention_hours: u32, +} + +pub async fn command( + connection: Connection, + config: Config, +) -> Result<(), crate::commands::namespace::Error> { + let Config { + namespace, + retention_hours, + } = config; + + let mut client = influxdb_iox_client::schema::Client::new(connection); + let namespace = client + .update_namespace_retention(&namespace, retention_hours.try_into().unwrap()) + .await?; + println!("{}", serde_json::to_string_pretty(&namespace)?); + + Ok(()) +} diff --git a/influxdb_iox/tests/end_to_end_cases/cli.rs b/influxdb_iox/tests/end_to_end_cases/cli.rs index 95e9a28694..8e69eb6d46 100644 --- a/influxdb_iox/tests/end_to_end_cases/cli.rs +++ b/influxdb_iox/tests/end_to_end_cases/cli.rs @@ -570,6 +570,91 @@ async fn namespaces_cli() { .await } +/// Test the namespace retention command +#[tokio::test] +async fn namespace_retention() { + test_helpers::maybe_start_logging(); + let database_url = maybe_skip_integration!(); + let mut cluster = MiniCluster::create_shared(database_url).await; + + StepTest::new( + &mut cluster, + vec![ + Step::WriteLineProtocol(String::from( + "my_awesome_table2,tag1=A,tag2=B val=42i 123456", + )), + // Set the retention period to 2 hours + Step::Custom(Box::new(|state: &mut StepTestState| { + async { + let addr = state.cluster().router().router_grpc_base().to_string(); + let namespace = state.cluster().namespace(); + let retention_period_hours = 2; + let retention_period_ns = + retention_period_hours as i64 * 60 * 60 * 1_000_000_000; + + // Validate the output of the namespace retention command + // + // { + // "id": "1", + // "name": "0911430016317810_8303971312605107", + // "retentionPeriodNs": "7200000000000" + // } + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("-h") + .arg(&addr) + .arg("namespace") + .arg("retention") + .arg("--retention-hours") + .arg(retention_period_hours.to_string()) + .arg(&namespace) + .assert() + .success() + .stdout( + predicate::str::contains(namespace) + .and(predicate::str::contains(&retention_period_ns.to_string())), + ); + } + .boxed() + })), + // set the retention period to null + Step::Custom(Box::new(|state: &mut StepTestState| { + async { + let addr = state.cluster().router().router_grpc_base().to_string(); + let namespace = state.cluster().namespace(); + let retention_period_hours = 0; // will be updated to null + + // Validate the output of the namespace retention command + // + // { + // "id": "1", + // "name": "6699752880299094_1206270074309156" + // } + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("-h") + .arg(&addr) + .arg("namespace") + .arg("retention") + .arg("--retention-hours") + .arg(retention_period_hours.to_string()) + .arg(&namespace) + .assert() + .success() + .stdout( + predicate::str::contains(namespace) + .and(predicate::str::contains("retentionPeriodNs".to_string())) + .not(), + ); + } + .boxed() + })), + ], + ) + .run() + .await +} + /// Test the query_ingester CLI command #[tokio::test] async fn query_ingester() { diff --git a/influxdb_iox_client/src/client/schema.rs b/influxdb_iox_client/src/client/schema.rs index 8f9e7bcd8b..e73677a604 100644 --- a/influxdb_iox_client/src/client/schema.rs +++ b/influxdb_iox_client/src/client/schema.rs @@ -35,4 +35,21 @@ impl Client { Ok(response.into_inner().schema.unwrap_field("schema")?) } + + /// Update retention for a namespace + pub async fn update_namespace_retention( + &mut self, + namespace: &str, + retention_hours: i64, + ) -> Result { + let response = self + .inner + .update_namespace_retention(UpdateNamespaceRetentionRequest { + name: namespace.to_string(), + retention_hours, + }) + .await?; + + Ok(response.into_inner().namespace.unwrap_field("namespace")?) + } } diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index e9196b87d4..446e06d298 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -295,6 +295,13 @@ pub trait NamespaceRepo: Send + Sync { query_pool_id: QueryPoolId, ) -> Result; + /// Update retention period for a namespace + async fn update_retention_period( + &mut self, + name: &str, + retention_hours: i64, + ) -> Result; + /// List all namespaces. async fn list(&mut self) -> Result>; @@ -718,6 +725,23 @@ where get_schema_internal(namespace, repos).await } +/// Update retention for a namespace +pub async fn update_namespace_retention( + name: &str, + retention_hours: i64, + repos: &mut R, +) -> Result +where + R: RepoCollection + ?Sized, +{ + let namespace = repos + .namespaces() + .update_retention_period(name, retention_hours) + .await?; + + Ok(namespace) +} + async fn get_schema_internal(namespace: Namespace, repos: &mut R) -> Result where R: RepoCollection + ?Sized, @@ -1036,6 +1060,25 @@ pub(crate) mod test_helpers { .await .expect("namespace should be updateable"); assert_eq!(NEW_COLUMN_LIMIT, modified.max_columns_per_table); + + const NEW_RETENTION_PERIOD: i64 = 5; + let modified = repos + .namespaces() + .update_retention_period(namespace_name, NEW_RETENTION_PERIOD) + .await + .expect("namespace should be updateable"); + assert_eq!( + NEW_RETENTION_PERIOD as i64 * 60 * 60 * 1_000_000_000, + modified.retention_period_ns.unwrap() + ); + + const NEW_RETENTION_PERIOD_NULL: i64 = 0; + let modified = repos + .namespaces() + .update_retention_period(namespace_name, NEW_RETENTION_PERIOD_NULL) + .await + .expect("namespace should be updateable"); + assert!(modified.retention_period_ns.is_none()); } async fn test_table(catalog: Arc) { diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index 471ef23314..a24036a89a 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -31,6 +31,8 @@ const TIME_COLUMN: &str = "time"; pub const DEFAULT_MAX_TABLES: i32 = 10_000; /// Default per-table column count service protection limit. pub const DEFAULT_MAX_COLUMNS_PER_TABLE: i32 = 200; +/// Default retention period for data in the catalog. +pub const DEFAULT_RETENTION_PERIOD: Option = None; /// A string value representing an infinite retention policy. pub const INFINITE_RETENTION_POLICY: &str = "inf"; diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index 2ebfd368b7..c86419473c 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -9,7 +9,7 @@ use crate::{ TombstoneRepo, TopicMetadataRepo, Transaction, }, metrics::MetricDecorator, - DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, + DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, DEFAULT_RETENTION_PERIOD, }; use async_trait::async_trait; use data_types::{ @@ -305,6 +305,7 @@ impl NamespaceRepo for MemTxn { retention_duration: Some(retention_duration.to_string()), max_tables: DEFAULT_MAX_TABLES, max_columns_per_table: DEFAULT_MAX_COLUMNS_PER_TABLE, + retention_period_ns: DEFAULT_RETENTION_PERIOD, }; stage.namespaces.push(namespace); Ok(stage.namespaces.last().unwrap().clone()) @@ -353,6 +354,30 @@ impl NamespaceRepo for MemTxn { }), } } + + async fn update_retention_period( + &mut self, + name: &str, + retention_hours: i64, + ) -> Result { + let rentenion_period_ns = retention_hours * 60 * 60 * 1_000_000_000; + let retention = if rentenion_period_ns == 0 { + None + } else { + Some(rentenion_period_ns) + }; + + let stage = self.stage(); + match stage.namespaces.iter_mut().find(|n| n.name == name) { + Some(n) => { + n.retention_period_ns = retention; + Ok(n.clone()) + } + None => Err(Error::NamespaceNotFoundByName { + name: name.to_string(), + }), + } + } } #[async_trait] diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs index 8be8f907aa..fe15bf10f7 100644 --- a/iox_catalog/src/metrics.rs +++ b/iox_catalog/src/metrics.rs @@ -194,6 +194,7 @@ decorate!( impl_trait = NamespaceRepo, methods = [ "namespace_create" = create(&mut self, name: &str, retention_duration: &str, topic_id: TopicId, query_pool_id: QueryPoolId) -> Result; + "namespace_update_retention_period" = update_retention_period(&mut self, name: &str, retention_hours: i64) -> Result; "namespace_list" = list(&mut self) -> Result>; "namespace_get_by_id" = get_by_id(&mut self, id: NamespaceId) -> Result>; "namespace_get_by_name" = get_by_name(&mut self, name: &str) -> Result>; diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 93b1a44933..d63393977e 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -8,7 +8,7 @@ use crate::{ TombstoneRepo, TopicMetadataRepo, Transaction, }, metrics::MetricDecorator, - DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, + DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, DEFAULT_RETENTION_PERIOD, }; use async_trait::async_trait; use data_types::{ @@ -622,6 +622,7 @@ RETURNING *; // Ensure the column default values match the code values. debug_assert_eq!(rec.max_tables, DEFAULT_MAX_TABLES); debug_assert_eq!(rec.max_columns_per_table, DEFAULT_MAX_COLUMNS_PER_TABLE); + debug_assert_eq!(rec.retention_period_ns, DEFAULT_RETENTION_PERIOD); Ok(rec) } @@ -729,6 +730,40 @@ RETURNING *; Ok(namespace) } + + async fn update_retention_period( + &mut self, + name: &str, + retention_hours: i64, + ) -> Result { + let rentenion_period_ns = retention_hours * 60 * 60 * 1_000_000_000; + + let rec = if rentenion_period_ns == 0 { + sqlx::query_as::<_, Namespace>( + r#"UPDATE namespace SET retention_period_ns = NULL WHERE name = $1 RETURNING *;"#, + ) + .bind(&name) // $1 + .fetch_one(&mut self.inner) + .await + } else { + sqlx::query_as::<_, Namespace>( + r#"UPDATE namespace SET retention_period_ns = $1 WHERE name = $2 RETURNING *;"#, + ) + .bind(&rentenion_period_ns) // $1 + .bind(&name) // $2 + .fetch_one(&mut self.inner) + .await + }; + + let namespace = rec.map_err(|e| match e { + sqlx::Error::RowNotFound => Error::NamespaceNotFoundByName { + name: name.to_string(), + }, + _ => Error::SqlxError { source: e }, + })?; + + Ok(namespace) + } } #[async_trait] diff --git a/router/src/namespace_resolver/ns_autocreation.rs b/router/src/namespace_resolver/ns_autocreation.rs index 180f4d1a49..3fbdd9d491 100644 --- a/router/src/namespace_resolver/ns_autocreation.rs +++ b/router/src/namespace_resolver/ns_autocreation.rs @@ -215,6 +215,7 @@ mod tests { query_pool_id: QueryPoolId::new(42), max_tables: iox_catalog::DEFAULT_MAX_TABLES, max_columns_per_table: iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE, + retention_period_ns: iox_catalog::DEFAULT_RETENTION_PERIOD, } ); } diff --git a/service_grpc_schema/src/lib.rs b/service_grpc_schema/src/lib.rs index f446a73388..23ef242e03 100644 --- a/service_grpc_schema/src/lib.rs +++ b/service_grpc_schema/src/lib.rs @@ -3,7 +3,7 @@ use std::{ops::DerefMut, sync::Arc}; use generated_types::influxdata::iox::schema::v1::*; -use iox_catalog::interface::{get_schema_by_name, Catalog}; +use iox_catalog::interface::{get_schema_by_name, update_namespace_retention, Catalog}; use observability_deps::tracing::warn; use tonic::{Request, Response, Status}; @@ -38,6 +38,24 @@ impl schema_service_server::SchemaService for SchemaService { .map(Arc::new)?; Ok(Response::new(schema_to_proto(schema))) } + + async fn update_namespace_retention( + &self, + request: Request, + ) -> Result, Status> { + let mut repos = self.catalog.repositories().await; + + let req = request.into_inner(); + let namespace = + update_namespace_retention(&req.name, req.retention_hours, repos.deref_mut()) + .await + .map_err(|e| { + warn!(error=%e, %req.name, "failed to update namespace retention"); + Status::not_found(e.to_string()) + }) + .map(Arc::new)?; + Ok(Response::new(namespace_to_proto(namespace))) + } } fn schema_to_proto(schema: Arc) -> GetSchemaResponse { @@ -76,6 +94,16 @@ fn schema_to_proto(schema: Arc) -> GetSchemaRespons response } +fn namespace_to_proto(namespace: Arc) -> UpdateNamespaceRetentionResponse { + UpdateNamespaceRetentionResponse { + namespace: Some(Namespace { + id: namespace.id.get(), + name: namespace.name.clone(), + retention_period_ns: namespace.retention_period_ns, + }), + } +} + #[cfg(test)] mod tests { use super::*;