diff --git a/Cargo.lock b/Cargo.lock index f4760966cb..8251334da3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2446,6 +2446,16 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b112acc8b3adf4b107a8ec20977da0273a8c386765a3ec0229bd500a1443f9f" +[[package]] +name = "humantime-serde" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c" +dependencies = [ + "humantime", + "serde", +] + [[package]] name = "hyper" version = "0.14.32" @@ -3285,6 +3295,7 @@ dependencies = [ "chrono", "hashbrown 0.15.3", "hex", + "humantime-serde", "hyper 0.14.32", "influxdb3_authz", "influxdb3_cache", diff --git a/Cargo.toml b/Cargo.toml index 47f009e8b3..b3a1ed9bb2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -92,6 +92,7 @@ hex = "0.4.3" http = "0.2.9" http-body = "0.4.6" humantime = "2.1.0" +humantime-serde = "1.1.1" hyper = "0.14" hyper-rustls = { version = "0.25", features = ["http1", "http2", "ring", "rustls-native-certs"] } insta = { version = "1.39", features = ["json", "redactions", "yaml"] } diff --git a/influxdb3/src/commands/create.rs b/influxdb3/src/commands/create.rs index 88ea4c3764..c3fecdfeaf 100644 --- a/influxdb3/src/commands/create.rs +++ b/influxdb3/src/commands/create.rs @@ -132,6 +132,10 @@ pub struct DatabaseConfig { #[clap(env = "INFLUXDB3_DATABASE_NAME", required = true)] pub database_name: String, + #[clap(long = "retention-period")] + /// The retention period for the database as a human-readable duration, e.g., "30d", "24h" + pub retention_period: Option, + /// An optional arg to use a custom ca for useful for testing with self signed certs #[clap(long = "tls-ca", env = "INFLUXDB3_TLS_CA")] ca_cert: Option, @@ -277,8 +281,14 @@ pub struct TriggerConfig { pub async fn command(config: Config) -> Result<(), Box> { let client = config.get_client()?; match config.cmd { - SubCommand::Database(DatabaseConfig { database_name, .. }) => { - client.api_v3_configure_db_create(&database_name).await?; + SubCommand::Database(DatabaseConfig { + database_name, + retention_period, + .. + }) => { + client + .api_v3_configure_db_create(&database_name, retention_period.map(Into::into)) + .await?; println!("Database {:?} created successfully", &database_name); } diff --git a/influxdb3/src/commands/update.rs b/influxdb3/src/commands/update.rs new file mode 100644 index 0000000000..9184138166 --- /dev/null +++ b/influxdb3/src/commands/update.rs @@ -0,0 +1,69 @@ +use super::common::InfluxDb3Config; +use humantime::Duration; +use influxdb3_client::Client; +use secrecy::ExposeSecret; +use std::error::Error; +use std::path::PathBuf; + +#[derive(Debug, clap::Parser)] +pub struct Config { + #[clap(subcommand)] + cmd: SubCommand, +} + +#[derive(Debug, clap::Subcommand)] +pub enum SubCommand { + /// Update a database + Database(UpdateDatabase), +} + +#[derive(Debug, clap::Args)] +pub struct UpdateDatabase { + #[clap(flatten)] + influxdb3_config: InfluxDb3Config, + + /// The retention period as a human-readable duration (e.g., "30d", "24h") or "none" to clear + #[clap(long, short = 'r')] + retention_period: Option, + + /// An optional arg to use a custom ca for useful for testing with self signed certs + #[clap(long = "tls-ca", env = "INFLUXDB3_TLS_CA")] + ca_cert: Option, +} + +pub async fn command(config: Config) -> Result<(), Box> { + match config.cmd { + SubCommand::Database(UpdateDatabase { + influxdb3_config: + InfluxDb3Config { + host_url, + auth_token, + database_name, + .. + }, + retention_period, + ca_cert, + }) => { + let mut client = Client::new(host_url, ca_cert)?; + if let Some(token) = &auth_token { + client = client.with_auth_token(token.expose_secret()); + } + + if let Some(retention_str) = retention_period { + let retention = if retention_str.to_lowercase() == "none" { + None + } else { + Some(retention_str.parse::()?.into()) + }; + client + .api_v3_configure_db_update(&database_name, retention) + .await?; + + println!("Database \"{}\" updated successfully", database_name); + } else { + return Err("--retention-period is required for update database".into()); + } + } + } + Ok(()) +} diff --git a/influxdb3/src/main.rs b/influxdb3/src/main.rs index 8843d23158..412e85cd0e 100644 --- a/influxdb3/src/main.rs +++ b/influxdb3/src/main.rs @@ -35,6 +35,7 @@ pub mod commands { pub mod serve; pub mod show; pub mod test; + pub mod update; pub mod write; } @@ -105,6 +106,8 @@ enum Command { /// Test things, such as plugins, work the way you expect Test(commands::test::Config), + /// Update resources on the InfluxDB 3 Core server + Update(commands::update::Config), /// Perform a set of writes to a running InfluxDB 3 Core server Write(commands::write::Config), } @@ -200,6 +203,12 @@ fn main() -> Result<(), std::io::Error> { std::process::exit(ReturnCode::Failure as _) } } + Some(Command::Update(config)) => { + if let Err(e) = commands::update::command(config).await { + eprintln!("Update command failed: {e}"); + std::process::exit(ReturnCode::Failure as _) + } + } Some(Command::Query(config)) => { if let Err(e) = commands::query::command(config).await { eprintln!("Query command failed: {e}"); diff --git a/influxdb3/tests/cli/db_retention.rs b/influxdb3/tests/cli/db_retention.rs new file mode 100644 index 0000000000..7e6fa82642 --- /dev/null +++ b/influxdb3/tests/cli/db_retention.rs @@ -0,0 +1,230 @@ +use crate::server::{ConfigProvider, TestServer}; +use test_helpers::assert_contains; + +#[test_log::test(tokio::test)] +async fn test_create_db_with_retention_period() { + let server = TestServer::configure().with_no_admin_token().spawn().await; + let args = &["--tls-ca", "../testing-certs/rootCA.pem"]; + let db_name = "test_db"; + + // Create database with retention period + let retention_period = "30d"; + let result = server + .run( + vec![ + "create", + "database", + db_name, + "--retention-period", + retention_period, + ], + args, + ) + .expect("create database should succeed"); + + assert_contains!(&result, "Database \"test_db\" created successfully"); + + let args = &[ + "--tls-ca", + "../testing-certs/rootCA.pem", + "--format", + "json", + ]; + + let result = server + .run( + vec![ + "query", + "-d", + "_internal", + "SELECT retention_period_ns FROM system.databases WHERE system.databases.database_name='test_db'", + ], + args, + ) + .expect("create database with retention period should succeed"); + + assert_eq!(&result, "[{\"retention_period_ns\":2592000000000000}]"); +} + +#[test_log::test(tokio::test)] +async fn test_create_db_without_retention_period() { + let server = TestServer::configure().with_no_admin_token().spawn().await; + let db_name = "test_db2"; + let args = &["--tls-ca", "../testing-certs/rootCA.pem"]; + + // Create database without retention period + let result = server + .run(vec!["create", "database", db_name], args) + .expect("create database without retention period should succeed"); + + assert_contains!(&result, "Database \"test_db2\" created successfully"); + + let args = &[ + "--tls-ca", + "../testing-certs/rootCA.pem", + "--format", + "json", + ]; + + let result = server + .run( + vec![ + "query", + "-d", + "_internal", + "SELECT retention_period_ns FROM system.databases WHERE system.databases.database_name='test_db2'", + ], + args, + ) + .expect("create database without retention period should succeed"); + + assert_eq!(&result, "[{}]"); +} + +#[test_log::test(tokio::test)] +async fn test_create_db_with_invalid_retention_period() { + let server = TestServer::configure().with_no_admin_token().spawn().await; + let db_name = "test_db3"; + let args = &["--tls-ca", "../testing-certs/rootCA.pem"]; + + // Try to create database with invalid retention period + let result = server.run( + vec![ + "create", + "database", + db_name, + "--retention-period", + "invalid", + ], + args, + ); + + assert!( + result.is_err(), + "Creating table with invalid retention period should fail" + ); +} + +#[test_log::test(tokio::test)] +async fn test_update_db_retention_period() { + let server = TestServer::configure().with_no_admin_token().spawn().await; + let args = &["--tls-ca", "../testing-certs/rootCA.pem"]; + let db_name = "test_db_update"; + + // Create database with retention period + let result = server + .run( + vec!["create", "database", db_name, "--retention-period", "30d"], + args, + ) + .expect("create database should succeed"); + + assert_contains!( + &result, + format!("Database \"{}\" created successfully", db_name) + ); + + // Update database retention period + let result = server + .run( + vec![ + "update", + "database", + "--database", + db_name, + "--retention-period", + "60d", + ], + args, + ) + .expect("update database retention period should succeed"); + + assert_contains!( + &result, + format!("Database \"{}\" updated successfully", db_name) + ); + + // Verify the updated retention period + let args = &[ + "--tls-ca", + "../testing-certs/rootCA.pem", + "--format", + "json", + ]; + + let result = server + .run( + vec![ + "query", + "-d", + "_internal", + &format!("SELECT retention_period_ns FROM system.databases WHERE system.databases.database_name='{}'", db_name), + ], + args, + ) + .expect("query should succeed"); + + assert_eq!(&result, "[{\"retention_period_ns\":5184000000000000}]"); // 60 days in nanoseconds +} + +#[test_log::test(tokio::test)] +async fn test_clear_db_retention_period() { + let server = TestServer::configure().with_no_admin_token().spawn().await; + let args = &["--tls-ca", "../testing-certs/rootCA.pem"]; + let db_name = "test_db_clear"; + + // Create database with retention period + let result = server + .run( + vec!["create", "database", db_name, "--retention-period", "30d"], + args, + ) + .expect("create database should succeed"); + + assert_contains!( + &result, + format!("Database \"{}\" created successfully", db_name) + ); + + // Clear database retention period (set to none) + let result = server + .run( + vec![ + "update", + "database", + "--database", + db_name, + "--retention-period", + "none", + ], + args, + ) + .expect("clear database retention period should succeed"); + + assert_contains!( + &result, + format!("Database \"{}\" updated successfully", db_name) + ); + + // Verify the retention period is now none (cleared) + let args = &[ + "--tls-ca", + "../testing-certs/rootCA.pem", + "--format", + "json", + ]; + + let result = server + .run( + vec![ + "query", + "-d", + "_internal", + &format!("SELECT retention_period_ns FROM system.databases WHERE system.databases.database_name='{}'", db_name), + ], + args, + ) + .expect("query should succeed"); + + assert_eq!(&result, "[{}]"); // Empty object for none/cleared retention +} diff --git a/influxdb3/tests/cli/mod.rs b/influxdb3/tests/cli/mod.rs index e3e0c0b19b..b1a4db4e38 100644 --- a/influxdb3/tests/cli/mod.rs +++ b/influxdb3/tests/cli/mod.rs @@ -1,5 +1,6 @@ mod admin_token; mod api; +mod db_retention; use crate::server::{ConfigProvider, TestServer}; use assert_cmd::Command as AssertCmd; diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index 880d04405e..4d089519a8 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -43,7 +43,7 @@ mod update; use schema::sort::SortKey; pub use schema::{InfluxColumnType, InfluxFieldType}; pub use update::HardDeletionTime; -pub use update::{CatalogUpdate, DatabaseCatalogTransaction, Prompt}; +pub use update::{CatalogUpdate, CreateDatabaseOptions, DatabaseCatalogTransaction, Prompt}; use crate::channel::{CatalogSubscriptions, CatalogUpdateReceiver}; use crate::log::GenerationBatch; @@ -403,6 +403,7 @@ impl Catalog { pub(crate) fn db_or_create( &self, db_name: &str, + retention_period: Option, now_time_ns: i64, ) -> Result<(Arc, Option)> { match self.db_schema(db_name) { @@ -425,6 +426,7 @@ impl Catalog { vec![DatabaseCatalogOp::CreateDatabase(CreateDatabaseLog { database_id: db.id, database_name: Arc::clone(&db.name), + retention_period, })], ); Ok((db, Some(batch))) @@ -1516,13 +1518,15 @@ impl UpdateDatabaseSchema for DatabaseCatalogOp { if create_database.database_id != schema.id || create_database.database_name != schema.name { - warn!( + panic!( "Create database call received by a mismatched DatabaseSchema. This should not be possible." ) } - // NOTE(trevor/catalog-refactor): if the database is already there, shouldn't this - // be a no-op, and not to_mut the cow? - schema.to_mut(); + schema.to_mut().retention_period = match create_database.retention_period { + Some(duration) => RetentionPeriod::Duration(duration), + None => RetentionPeriod::Indefinite, + }; + Ok(schema) } DatabaseCatalogOp::CreateTable(create_table) => create_table.update_schema(schema), @@ -2312,6 +2316,7 @@ impl TokenRepository { .get_by_id(&token_id) .ok_or_else(|| CatalogError::MissingAdminTokenToUpdate)?; let updatable = Arc::make_mut(&mut token_info); + updatable.hash = hash.clone(); updatable.updated_at = Some(updated_at); updatable.updated_by = Some(token_id); diff --git a/influxdb3_catalog/src/catalog/update.rs b/influxdb3_catalog/src/catalog/update.rs index eb4172e4e2..9848d2f5ea 100644 --- a/influxdb3_catalog/src/catalog/update.rs +++ b/influxdb3_catalog/src/catalog/update.rs @@ -55,6 +55,11 @@ impl HardDeletionTime { } } +#[derive(Default, Debug, Clone, Copy)] +pub struct CreateDatabaseOptions { + pub retention_period: Option, +} + impl Catalog { pub fn begin(&self, db_name: &str) -> Result { debug!(db_name, "starting catalog transaction"); @@ -79,10 +84,15 @@ impl Catalog { let database_name = Arc::from(db_name); let database_schema = Arc::new(DatabaseSchema::new(database_id, Arc::clone(&database_name))); + let retention_period = match database_schema.retention_period { + RetentionPeriod::Duration(duration) => Some(duration), + RetentionPeriod::Indefinite => None, + }; let time_ns = self.time_provider.now().timestamp_nanos(); let ops = vec![DatabaseCatalogOp::CreateDatabase(CreateDatabaseLog { database_id, database_name, + retention_period, })]; Ok(DatabaseCatalogTransaction { catalog_sequence: inner.sequence_number(), @@ -240,10 +250,22 @@ impl Catalog { } pub async fn create_database(&self, name: &str) -> Result { + self.create_database_opts(name, CreateDatabaseOptions::default()) + .await + } + + pub async fn create_database_opts( + &self, + name: &str, + options: CreateDatabaseOptions, + ) -> Result { info!(name, "create database"); self.catalog_update_with_retry(|| { - let (_, Some(batch)) = - self.db_or_create(name, self.time_provider.now().timestamp_nanos())? + let (_, Some(batch)) = self.db_or_create( + name, + options.retention_period, + self.time_provider.now().timestamp_nanos(), + )? else { return Err(CatalogError::AlreadyExists); }; diff --git a/influxdb3_catalog/src/log/versions/v2/conversion.rs b/influxdb3_catalog/src/log/versions/v2/conversion.rs index 01c8dc0204..c5191f9ec8 100644 --- a/influxdb3_catalog/src/log/versions/v2/conversion.rs +++ b/influxdb3_catalog/src/log/versions/v2/conversion.rs @@ -194,6 +194,7 @@ impl From for v3::CreateDatabaseLog { Self { database_id: value.database_id, database_name: value.database_name, + retention_period: None, } } } diff --git a/influxdb3_catalog/src/log/versions/v3.rs b/influxdb3_catalog/src/log/versions/v3.rs index 96c302751a..db49f7333a 100644 --- a/influxdb3_catalog/src/log/versions/v3.rs +++ b/influxdb3_catalog/src/log/versions/v3.rs @@ -269,6 +269,7 @@ impl std::fmt::Display for NodeMode { pub struct CreateDatabaseLog { pub database_id: DbId, pub database_name: Arc, + pub retention_period: Option, } #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] diff --git a/influxdb3_catalog/src/serialize.rs b/influxdb3_catalog/src/serialize.rs index 572244a00d..56c13eee9f 100644 --- a/influxdb3_catalog/src/serialize.rs +++ b/influxdb3_catalog/src/serialize.rs @@ -842,6 +842,7 @@ mod v1_tests { crate::log::DatabaseCatalogOp::CreateDatabase(crate::log::CreateDatabaseLog { database_id: DbId::new(0), database_name: "test-db".into(), + retention_period: None, }), crate::log::DatabaseCatalogOp::SoftDeleteDatabase( crate::log::SoftDeleteDatabaseLog { diff --git a/influxdb3_client/src/lib.rs b/influxdb3_client/src/lib.rs index d2101a60ee..4fecf6fa8f 100644 --- a/influxdb3_client/src/lib.rs +++ b/influxdb3_client/src/lib.rs @@ -355,12 +355,40 @@ impl Client { } /// Make a request to the `POST /api/v3/configure/database` API - pub async fn api_v3_configure_db_create(&self, db: impl Into + Send) -> Result<()> { + pub async fn api_v3_configure_db_create( + &self, + db: impl Into + Send, + retention_period: Option, + ) -> Result<()> { let _bytes = self .send_json_get_bytes( Method::POST, "/api/v3/configure/database", - Some(CreateDatabaseRequest { db: db.into() }), + Some(CreateDatabaseRequest { + db: db.into(), + retention_period, + }), + None::<()>, + None, + ) + .await?; + Ok(()) + } + + /// Make a request to the `PUT /api/v3/configure/database` API + pub async fn api_v3_configure_db_update( + &self, + db: impl Into + Send, + retention_period: Option, + ) -> Result<()> { + let _bytes = self + .send_json_get_bytes( + Method::PUT, + "/api/v3/configure/database", + Some(UpdateDatabaseRequest { + db: db.into(), + retention_period, + }), None::<()>, None, ) diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index 25371962e7..bfa6197b5a 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -1198,13 +1198,19 @@ impl HttpApi { } async fn create_database(&self, req: Request) -> Result { - let CreateDatabaseRequest { db } = self.read_body_json(req).await?; + let CreateDatabaseRequest { + db, + retention_period, + } = self.read_body_json(req).await?; validate_db_name(&db, false)?; - self.write_buffer.catalog().create_database(&db).await?; - Ok(ResponseBuilder::new() - .status(StatusCode::OK) - .body(empty_response_body()) - .unwrap()) + self.write_buffer + .catalog() + .create_database_opts( + &db, + influxdb3_catalog::catalog::CreateDatabaseOptions { retention_period }, + ) + .await?; + Ok(Response::new(empty_response_body())) } /// Endpoint for testing a plugin that will be trigger on WAL writes. @@ -1282,6 +1288,27 @@ impl HttpApi { } } + async fn update_database(&self, req: Request) -> Result { + let update_req = self.read_body_json::(req).await?; + + match update_req.retention_period { + Some(duration) => { + self.write_buffer + .catalog() + .set_retention_period_for_database(&update_req.db, duration) + .await?; + } + None => { + self.write_buffer + .catalog() + .clear_retention_period_for_database(&update_req.db) + .await?; + } + } + + Ok(Response::new(empty_response_body())) + } + async fn delete_database(&self, req: Request) -> Result { let query = req.uri().query().unwrap_or(""); let delete_req = serde_urlencoded::from_str::(query)?; @@ -1289,10 +1316,7 @@ impl HttpApi { .catalog() .soft_delete_database(&delete_req.db) .await?; - Ok(ResponseBuilder::new() - .status(StatusCode::OK) - .body(empty_response_body()) - .unwrap()) + Ok(Response::new(empty_response_body())) } async fn create_table(&self, req: Request) -> Result { @@ -1315,10 +1339,7 @@ impl HttpApi { .collect::>(), ) .await?; - Ok(ResponseBuilder::new() - .status(StatusCode::OK) - .body(empty_response_body()) - .unwrap()) + Ok(Response::new(empty_response_body())) } async fn delete_table(&self, req: Request) -> Result { @@ -1924,6 +1945,9 @@ pub(crate) async fn route_request( (Method::POST, all_paths::API_V3_CONFIGURE_DATABASE) => { http_server.create_database(req).await } + (Method::PUT, all_paths::API_V3_CONFIGURE_DATABASE) => { + http_server.update_database(req).await + } (Method::DELETE, all_paths::API_V3_CONFIGURE_DATABASE) => { http_server.delete_database(req).await } diff --git a/influxdb3_server/src/system_tables/databases.rs b/influxdb3_server/src/system_tables/databases.rs new file mode 100644 index 0000000000..78a325899e --- /dev/null +++ b/influxdb3_server/src/system_tables/databases.rs @@ -0,0 +1,72 @@ +use std::sync::Arc; + +use arrow::array::{StringViewBuilder, UInt64Builder}; +use arrow_array::{ArrayRef, RecordBatch}; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use datafusion::{error::DataFusionError, logical_expr::Expr}; +use influxdb3_catalog::catalog::{Catalog, RetentionPeriod}; +use iox_system_tables::IoxSystemTable; + +#[derive(Debug)] +pub(super) struct DatabasesTable { + catalog: Arc, + schema: SchemaRef, +} + +impl DatabasesTable { + pub(super) fn new(catalog: Arc) -> Self { + Self { + catalog, + schema: databases_schema(), + } + } +} + +fn databases_schema() -> SchemaRef { + let columns = vec![ + Field::new("database_name", DataType::Utf8View, false), + Field::new("retention_period_ns", DataType::UInt64, true), + Field::new("deleted", DataType::Boolean, false), + ]; + Arc::new(Schema::new(columns)) +} + +#[async_trait::async_trait] +impl IoxSystemTable for DatabasesTable { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + async fn scan( + &self, + _filters: Option>, + _limit: Option, + ) -> Result { + let databases = self.catalog.list_db_schema(); + + let mut database_name_arr = StringViewBuilder::with_capacity(databases.len()); + let mut retention_period_arr = UInt64Builder::with_capacity(databases.len()); + let mut deleted_arr = arrow::array::BooleanBuilder::with_capacity(databases.len()); + + for db in databases { + database_name_arr.append_value(&db.name); + + match db.retention_period { + RetentionPeriod::Indefinite => retention_period_arr.append_null(), + RetentionPeriod::Duration(duration) => { + retention_period_arr.append_value(duration.as_nanos() as u64); + } + } + + deleted_arr.append_value(db.deleted); + } + + let columns: Vec = vec![ + Arc::new(database_name_arr.finish()), + Arc::new(retention_period_arr.finish()), + Arc::new(deleted_arr.finish()), + ]; + + RecordBatch::try_new(self.schema(), columns).map_err(DataFusionError::from) + } +} diff --git a/influxdb3_server/src/system_tables/mod.rs b/influxdb3_server/src/system_tables/mod.rs index 42bc2257a9..1c2f8dbf62 100644 --- a/influxdb3_server/src/system_tables/mod.rs +++ b/influxdb3_server/src/system_tables/mod.rs @@ -18,16 +18,20 @@ use parquet_files::ParquetFilesTable; use tokens::TokenSystemTable; use tonic::async_trait; -use self::{last_caches::LastCachesTable, queries::QueriesTable}; +use self::{ + databases::DatabasesTable, last_caches::LastCachesTable, queries::QueriesTable, + tables::TablesTable, +}; +mod databases; mod distinct_caches; mod generations; mod last_caches; mod parquet_files; use crate::system_tables::python_call::{ProcessingEngineLogsTable, ProcessingEngineTriggerTable}; - mod python_call; mod queries; +mod tables; mod tokens; pub(crate) const SYSTEM_SCHEMA_NAME: &str = "system"; @@ -38,6 +42,8 @@ pub(crate) const LAST_CACHES_TABLE_NAME: &str = "last_caches"; pub(crate) const DISTINCT_CACHES_TABLE_NAME: &str = "distinct_caches"; pub(crate) const PARQUET_FILES_TABLE_NAME: &str = "parquet_files"; pub(crate) const TOKENS_TABLE_NAME: &str = "tokens"; +pub(crate) const DATABASES_TABLE_NAME: &str = "databases"; +pub(crate) const TABLES_TABLE_NAME: &str = "tables"; pub(crate) const GENERATION_DURATIONS_TABLE_NAME: &str = "generation_durations"; const PROCESSING_ENGINE_TRIGGERS_TABLE_NAME: &str = "processing_engine_triggers"; @@ -140,6 +146,18 @@ impl AllSystemSchemaTablesProvider { started_with_auth, )))), ); + tables.insert( + DATABASES_TABLE_NAME, + Arc::new(SystemTableProvider::new(Arc::new(DatabasesTable::new( + Arc::clone(&catalog), + )))), + ); + tables.insert( + TABLES_TABLE_NAME, + Arc::new(SystemTableProvider::new(Arc::new(TablesTable::new( + Arc::clone(&catalog), + )))), + ); tables.insert( GENERATION_DURATIONS_TABLE_NAME, Arc::new(SystemTableProvider::new(Arc::new( diff --git a/influxdb3_server/src/system_tables/tables.rs b/influxdb3_server/src/system_tables/tables.rs new file mode 100644 index 0000000000..a7dcea1ddc --- /dev/null +++ b/influxdb3_server/src/system_tables/tables.rs @@ -0,0 +1,94 @@ +use std::sync::Arc; + +use arrow::array::{StringViewBuilder, UInt64Builder}; +use arrow_array::{ArrayRef, RecordBatch}; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use datafusion::{error::DataFusionError, logical_expr::Expr}; +use influxdb3_catalog::catalog::Catalog; +use iox_system_tables::IoxSystemTable; + +#[derive(Debug)] +pub(super) struct TablesTable { + catalog: Arc, + schema: SchemaRef, +} + +impl TablesTable { + pub(super) fn new(catalog: Arc) -> Self { + Self { + catalog, + schema: tables_schema(), + } + } +} + +fn tables_schema() -> SchemaRef { + let columns = vec![ + Field::new("database_name", DataType::Utf8View, false), + Field::new("table_name", DataType::Utf8View, false), + Field::new("column_count", DataType::UInt64, false), + Field::new("series_key_columns", DataType::Utf8View, false), + Field::new("last_cache_count", DataType::UInt64, false), + Field::new("distinct_cache_count", DataType::UInt64, false), + Field::new("deleted", DataType::Boolean, false), + ]; + Arc::new(Schema::new(columns)) +} + +#[async_trait::async_trait] +impl IoxSystemTable for TablesTable { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + async fn scan( + &self, + _filters: Option>, + _limit: Option, + ) -> Result { + let databases = self.catalog.list_db_schema(); + + // Count total tables across all databases + let total_tables: usize = databases + .iter() + .map(|db| db.tables.resource_iter().count()) + .sum(); + + let mut database_name_arr = StringViewBuilder::with_capacity(total_tables); + let mut table_name_arr = StringViewBuilder::with_capacity(total_tables); + let mut column_count_arr = UInt64Builder::with_capacity(total_tables); + let mut series_key_arr = StringViewBuilder::with_capacity(total_tables); + let mut last_cache_count_arr = UInt64Builder::with_capacity(total_tables); + let mut distinct_cache_count_arr = UInt64Builder::with_capacity(total_tables); + let mut deleted_arr = arrow::array::BooleanBuilder::with_capacity(total_tables); + + for db in databases { + for table in db.tables.resource_iter() { + database_name_arr.append_value(&db.name); + table_name_arr.append_value(&table.table_name); + column_count_arr.append_value(table.columns.resource_iter().count() as u64); + + // Build series key string + let series_key_str = table.series_key_names.join(", "); + series_key_arr.append_value(&series_key_str); + + last_cache_count_arr.append_value(table.last_caches.resource_iter().count() as u64); + distinct_cache_count_arr + .append_value(table.distinct_caches.resource_iter().count() as u64); + deleted_arr.append_value(table.deleted); + } + } + + let columns: Vec = vec![ + Arc::new(database_name_arr.finish()), + Arc::new(table_name_arr.finish()), + Arc::new(column_count_arr.finish()), + Arc::new(series_key_arr.finish()), + Arc::new(last_cache_count_arr.finish()), + Arc::new(distinct_cache_count_arr.finish()), + Arc::new(deleted_arr.finish()), + ]; + + RecordBatch::try_new(self.schema(), columns).map_err(DataFusionError::from) + } +} diff --git a/influxdb3_types/Cargo.toml b/influxdb3_types/Cargo.toml index 8b27d060c5..fb3e3372ec 100644 --- a/influxdb3_types/Cargo.toml +++ b/influxdb3_types/Cargo.toml @@ -21,6 +21,7 @@ chrono.workspace = true serde.workspace = true hashbrown.workspace = true hex.workspace = true +humantime-serde.workspace = true hyper.workspace = true thiserror.workspace = true uuid.workspace = true diff --git a/influxdb3_types/src/http.rs b/influxdb3_types/src/http.rs index 1d79830f57..e59b5a69cf 100644 --- a/influxdb3_types/src/http.rs +++ b/influxdb3_types/src/http.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use chrono::{DateTime, Utc}; use influxdb3_authz::TokenInfo; @@ -230,6 +230,16 @@ pub struct ShowDatabasesRequest { #[derive(Debug, Deserialize, Serialize)] pub struct CreateDatabaseRequest { pub db: String, + #[serde(with = "humantime_serde", default)] + pub retention_period: Option, +} + +/// Request definition for the `PUT /api/v3/configure/database` API +#[derive(Debug, Deserialize, Serialize)] +pub struct UpdateDatabaseRequest { + pub db: String, + #[serde(with = "humantime_serde", default)] + pub retention_period: Option, } /// Request definition for the `DELETE /api/v3/configure/database` API