feat: Add retention period to cli commands
This commit touches quite a few things, but the main changes that need to be taken into account are: - An update command has been added to the CLI. This could be further extended in the future to update more than just Database retention periods. The API call for that has been written in such a way as to allow other qualities of the database to be updated at runtime from one API call. For now it only allows the retention period to be updated, but it could in theory allow us to rename a database without needing to wipe things, especially with a stable ID underlying everything. - The create database command has been extended to allow its creation with a retention period. In tandem with the update command users can now assign or delete retention periods at will - The ability to query catalog data about both databases and tables has been added as well. This has been used in tests added in this commit, but is also a fairly useful query when wanting to look at things such as the series key. This could be extended to a CLI command as well if we want to allow users to look at this data, but for now it's in the _internal table. With these changes a nice UX has been created to allow our customers to work with retention periods.pull/26520/head
parent
bc41c04656
commit
ead1fb01a9
|
@ -2446,6 +2446,16 @@ version = "2.2.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9b112acc8b3adf4b107a8ec20977da0273a8c386765a3ec0229bd500a1443f9f"
|
||||
|
||||
[[package]]
|
||||
name = "humantime-serde"
|
||||
version = "1.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c"
|
||||
dependencies = [
|
||||
"humantime",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper"
|
||||
version = "0.14.32"
|
||||
|
@ -3285,6 +3295,7 @@ dependencies = [
|
|||
"chrono",
|
||||
"hashbrown 0.15.3",
|
||||
"hex",
|
||||
"humantime-serde",
|
||||
"hyper 0.14.32",
|
||||
"influxdb3_authz",
|
||||
"influxdb3_cache",
|
||||
|
|
|
@ -92,6 +92,7 @@ hex = "0.4.3"
|
|||
http = "0.2.9"
|
||||
http-body = "0.4.6"
|
||||
humantime = "2.1.0"
|
||||
humantime-serde = "1.1.1"
|
||||
hyper = "0.14"
|
||||
hyper-rustls = { version = "0.25", features = ["http1", "http2", "ring", "rustls-native-certs"] }
|
||||
insta = { version = "1.39", features = ["json", "redactions", "yaml"] }
|
||||
|
|
|
@ -132,6 +132,10 @@ pub struct DatabaseConfig {
|
|||
#[clap(env = "INFLUXDB3_DATABASE_NAME", required = true)]
|
||||
pub database_name: String,
|
||||
|
||||
#[clap(long = "retention-period")]
|
||||
/// The retention period for the database as a human-readable duration, e.g., "30d", "24h"
|
||||
pub retention_period: Option<Duration>,
|
||||
|
||||
/// An optional arg to use a custom ca for useful for testing with self signed certs
|
||||
#[clap(long = "tls-ca", env = "INFLUXDB3_TLS_CA")]
|
||||
ca_cert: Option<PathBuf>,
|
||||
|
@ -277,8 +281,14 @@ pub struct TriggerConfig {
|
|||
pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
|
||||
let client = config.get_client()?;
|
||||
match config.cmd {
|
||||
SubCommand::Database(DatabaseConfig { database_name, .. }) => {
|
||||
client.api_v3_configure_db_create(&database_name).await?;
|
||||
SubCommand::Database(DatabaseConfig {
|
||||
database_name,
|
||||
retention_period,
|
||||
..
|
||||
}) => {
|
||||
client
|
||||
.api_v3_configure_db_create(&database_name, retention_period.map(Into::into))
|
||||
.await?;
|
||||
|
||||
println!("Database {:?} created successfully", &database_name);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
use super::common::InfluxDb3Config;
|
||||
use humantime::Duration;
|
||||
use influxdb3_client::Client;
|
||||
use secrecy::ExposeSecret;
|
||||
use std::error::Error;
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[derive(Debug, clap::Parser)]
|
||||
pub struct Config {
|
||||
#[clap(subcommand)]
|
||||
cmd: SubCommand,
|
||||
}
|
||||
|
||||
#[derive(Debug, clap::Subcommand)]
|
||||
pub enum SubCommand {
|
||||
/// Update a database
|
||||
Database(UpdateDatabase),
|
||||
}
|
||||
|
||||
#[derive(Debug, clap::Args)]
|
||||
pub struct UpdateDatabase {
|
||||
#[clap(flatten)]
|
||||
influxdb3_config: InfluxDb3Config,
|
||||
|
||||
/// The retention period as a human-readable duration (e.g., "30d", "24h") or "none" to clear
|
||||
#[clap(long, short = 'r')]
|
||||
retention_period: Option<String>,
|
||||
|
||||
/// An optional arg to use a custom ca for useful for testing with self signed certs
|
||||
#[clap(long = "tls-ca", env = "INFLUXDB3_TLS_CA")]
|
||||
ca_cert: Option<PathBuf>,
|
||||
}
|
||||
|
||||
pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
|
||||
match config.cmd {
|
||||
SubCommand::Database(UpdateDatabase {
|
||||
influxdb3_config:
|
||||
InfluxDb3Config {
|
||||
host_url,
|
||||
auth_token,
|
||||
database_name,
|
||||
..
|
||||
},
|
||||
retention_period,
|
||||
ca_cert,
|
||||
}) => {
|
||||
let mut client = Client::new(host_url, ca_cert)?;
|
||||
if let Some(token) = &auth_token {
|
||||
client = client.with_auth_token(token.expose_secret());
|
||||
}
|
||||
|
||||
if let Some(retention_str) = retention_period {
|
||||
let retention = if retention_str.to_lowercase() == "none" {
|
||||
None
|
||||
} else {
|
||||
Some(retention_str.parse::<Duration>()?.into())
|
||||
};
|
||||
client
|
||||
.api_v3_configure_db_update(&database_name, retention)
|
||||
.await?;
|
||||
|
||||
println!("Database \"{}\" updated successfully", database_name);
|
||||
} else {
|
||||
return Err("--retention-period is required for update database".into());
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
|
@ -35,6 +35,7 @@ pub mod commands {
|
|||
pub mod serve;
|
||||
pub mod show;
|
||||
pub mod test;
|
||||
pub mod update;
|
||||
pub mod write;
|
||||
}
|
||||
|
||||
|
@ -105,6 +106,8 @@ enum Command {
|
|||
/// Test things, such as plugins, work the way you expect
|
||||
Test(commands::test::Config),
|
||||
|
||||
/// Update resources on the InfluxDB 3 Core server
|
||||
Update(commands::update::Config),
|
||||
/// Perform a set of writes to a running InfluxDB 3 Core server
|
||||
Write(commands::write::Config),
|
||||
}
|
||||
|
@ -200,6 +203,12 @@ fn main() -> Result<(), std::io::Error> {
|
|||
std::process::exit(ReturnCode::Failure as _)
|
||||
}
|
||||
}
|
||||
Some(Command::Update(config)) => {
|
||||
if let Err(e) = commands::update::command(config).await {
|
||||
eprintln!("Update command failed: {e}");
|
||||
std::process::exit(ReturnCode::Failure as _)
|
||||
}
|
||||
}
|
||||
Some(Command::Query(config)) => {
|
||||
if let Err(e) = commands::query::command(config).await {
|
||||
eprintln!("Query command failed: {e}");
|
||||
|
|
|
@ -0,0 +1,230 @@
|
|||
use crate::server::{ConfigProvider, TestServer};
|
||||
use test_helpers::assert_contains;
|
||||
|
||||
#[test_log::test(tokio::test)]
|
||||
async fn test_create_db_with_retention_period() {
|
||||
let server = TestServer::configure().with_no_admin_token().spawn().await;
|
||||
let args = &["--tls-ca", "../testing-certs/rootCA.pem"];
|
||||
let db_name = "test_db";
|
||||
|
||||
// Create database with retention period
|
||||
let retention_period = "30d";
|
||||
let result = server
|
||||
.run(
|
||||
vec![
|
||||
"create",
|
||||
"database",
|
||||
db_name,
|
||||
"--retention-period",
|
||||
retention_period,
|
||||
],
|
||||
args,
|
||||
)
|
||||
.expect("create database should succeed");
|
||||
|
||||
assert_contains!(&result, "Database \"test_db\" created successfully");
|
||||
|
||||
let args = &[
|
||||
"--tls-ca",
|
||||
"../testing-certs/rootCA.pem",
|
||||
"--format",
|
||||
"json",
|
||||
];
|
||||
|
||||
let result = server
|
||||
.run(
|
||||
vec![
|
||||
"query",
|
||||
"-d",
|
||||
"_internal",
|
||||
"SELECT retention_period_ns FROM system.databases WHERE system.databases.database_name='test_db'",
|
||||
],
|
||||
args,
|
||||
)
|
||||
.expect("create database with retention period should succeed");
|
||||
|
||||
assert_eq!(&result, "[{\"retention_period_ns\":2592000000000000}]");
|
||||
}
|
||||
|
||||
#[test_log::test(tokio::test)]
|
||||
async fn test_create_db_without_retention_period() {
|
||||
let server = TestServer::configure().with_no_admin_token().spawn().await;
|
||||
let db_name = "test_db2";
|
||||
let args = &["--tls-ca", "../testing-certs/rootCA.pem"];
|
||||
|
||||
// Create database without retention period
|
||||
let result = server
|
||||
.run(vec!["create", "database", db_name], args)
|
||||
.expect("create database without retention period should succeed");
|
||||
|
||||
assert_contains!(&result, "Database \"test_db2\" created successfully");
|
||||
|
||||
let args = &[
|
||||
"--tls-ca",
|
||||
"../testing-certs/rootCA.pem",
|
||||
"--format",
|
||||
"json",
|
||||
];
|
||||
|
||||
let result = server
|
||||
.run(
|
||||
vec![
|
||||
"query",
|
||||
"-d",
|
||||
"_internal",
|
||||
"SELECT retention_period_ns FROM system.databases WHERE system.databases.database_name='test_db2'",
|
||||
],
|
||||
args,
|
||||
)
|
||||
.expect("create database without retention period should succeed");
|
||||
|
||||
assert_eq!(&result, "[{}]");
|
||||
}
|
||||
|
||||
#[test_log::test(tokio::test)]
|
||||
async fn test_create_db_with_invalid_retention_period() {
|
||||
let server = TestServer::configure().with_no_admin_token().spawn().await;
|
||||
let db_name = "test_db3";
|
||||
let args = &["--tls-ca", "../testing-certs/rootCA.pem"];
|
||||
|
||||
// Try to create database with invalid retention period
|
||||
let result = server.run(
|
||||
vec![
|
||||
"create",
|
||||
"database",
|
||||
db_name,
|
||||
"--retention-period",
|
||||
"invalid",
|
||||
],
|
||||
args,
|
||||
);
|
||||
|
||||
assert!(
|
||||
result.is_err(),
|
||||
"Creating table with invalid retention period should fail"
|
||||
);
|
||||
}
|
||||
|
||||
#[test_log::test(tokio::test)]
|
||||
async fn test_update_db_retention_period() {
|
||||
let server = TestServer::configure().with_no_admin_token().spawn().await;
|
||||
let args = &["--tls-ca", "../testing-certs/rootCA.pem"];
|
||||
let db_name = "test_db_update";
|
||||
|
||||
// Create database with retention period
|
||||
let result = server
|
||||
.run(
|
||||
vec!["create", "database", db_name, "--retention-period", "30d"],
|
||||
args,
|
||||
)
|
||||
.expect("create database should succeed");
|
||||
|
||||
assert_contains!(
|
||||
&result,
|
||||
format!("Database \"{}\" created successfully", db_name)
|
||||
);
|
||||
|
||||
// Update database retention period
|
||||
let result = server
|
||||
.run(
|
||||
vec![
|
||||
"update",
|
||||
"database",
|
||||
"--database",
|
||||
db_name,
|
||||
"--retention-period",
|
||||
"60d",
|
||||
],
|
||||
args,
|
||||
)
|
||||
.expect("update database retention period should succeed");
|
||||
|
||||
assert_contains!(
|
||||
&result,
|
||||
format!("Database \"{}\" updated successfully", db_name)
|
||||
);
|
||||
|
||||
// Verify the updated retention period
|
||||
let args = &[
|
||||
"--tls-ca",
|
||||
"../testing-certs/rootCA.pem",
|
||||
"--format",
|
||||
"json",
|
||||
];
|
||||
|
||||
let result = server
|
||||
.run(
|
||||
vec![
|
||||
"query",
|
||||
"-d",
|
||||
"_internal",
|
||||
&format!("SELECT retention_period_ns FROM system.databases WHERE system.databases.database_name='{}'", db_name),
|
||||
],
|
||||
args,
|
||||
)
|
||||
.expect("query should succeed");
|
||||
|
||||
assert_eq!(&result, "[{\"retention_period_ns\":5184000000000000}]"); // 60 days in nanoseconds
|
||||
}
|
||||
|
||||
#[test_log::test(tokio::test)]
|
||||
async fn test_clear_db_retention_period() {
|
||||
let server = TestServer::configure().with_no_admin_token().spawn().await;
|
||||
let args = &["--tls-ca", "../testing-certs/rootCA.pem"];
|
||||
let db_name = "test_db_clear";
|
||||
|
||||
// Create database with retention period
|
||||
let result = server
|
||||
.run(
|
||||
vec!["create", "database", db_name, "--retention-period", "30d"],
|
||||
args,
|
||||
)
|
||||
.expect("create database should succeed");
|
||||
|
||||
assert_contains!(
|
||||
&result,
|
||||
format!("Database \"{}\" created successfully", db_name)
|
||||
);
|
||||
|
||||
// Clear database retention period (set to none)
|
||||
let result = server
|
||||
.run(
|
||||
vec![
|
||||
"update",
|
||||
"database",
|
||||
"--database",
|
||||
db_name,
|
||||
"--retention-period",
|
||||
"none",
|
||||
],
|
||||
args,
|
||||
)
|
||||
.expect("clear database retention period should succeed");
|
||||
|
||||
assert_contains!(
|
||||
&result,
|
||||
format!("Database \"{}\" updated successfully", db_name)
|
||||
);
|
||||
|
||||
// Verify the retention period is now none (cleared)
|
||||
let args = &[
|
||||
"--tls-ca",
|
||||
"../testing-certs/rootCA.pem",
|
||||
"--format",
|
||||
"json",
|
||||
];
|
||||
|
||||
let result = server
|
||||
.run(
|
||||
vec![
|
||||
"query",
|
||||
"-d",
|
||||
"_internal",
|
||||
&format!("SELECT retention_period_ns FROM system.databases WHERE system.databases.database_name='{}'", db_name),
|
||||
],
|
||||
args,
|
||||
)
|
||||
.expect("query should succeed");
|
||||
|
||||
assert_eq!(&result, "[{}]"); // Empty object for none/cleared retention
|
||||
}
|
|
@ -1,5 +1,6 @@
|
|||
mod admin_token;
|
||||
mod api;
|
||||
mod db_retention;
|
||||
|
||||
use crate::server::{ConfigProvider, TestServer};
|
||||
use assert_cmd::Command as AssertCmd;
|
||||
|
|
|
@ -43,7 +43,7 @@ mod update;
|
|||
use schema::sort::SortKey;
|
||||
pub use schema::{InfluxColumnType, InfluxFieldType};
|
||||
pub use update::HardDeletionTime;
|
||||
pub use update::{CatalogUpdate, DatabaseCatalogTransaction, Prompt};
|
||||
pub use update::{CatalogUpdate, CreateDatabaseOptions, DatabaseCatalogTransaction, Prompt};
|
||||
|
||||
use crate::channel::{CatalogSubscriptions, CatalogUpdateReceiver};
|
||||
use crate::log::GenerationBatch;
|
||||
|
@ -403,6 +403,7 @@ impl Catalog {
|
|||
pub(crate) fn db_or_create(
|
||||
&self,
|
||||
db_name: &str,
|
||||
retention_period: Option<Duration>,
|
||||
now_time_ns: i64,
|
||||
) -> Result<(Arc<DatabaseSchema>, Option<CatalogBatch>)> {
|
||||
match self.db_schema(db_name) {
|
||||
|
@ -425,6 +426,7 @@ impl Catalog {
|
|||
vec![DatabaseCatalogOp::CreateDatabase(CreateDatabaseLog {
|
||||
database_id: db.id,
|
||||
database_name: Arc::clone(&db.name),
|
||||
retention_period,
|
||||
})],
|
||||
);
|
||||
Ok((db, Some(batch)))
|
||||
|
@ -1516,13 +1518,15 @@ impl UpdateDatabaseSchema for DatabaseCatalogOp {
|
|||
if create_database.database_id != schema.id
|
||||
|| create_database.database_name != schema.name
|
||||
{
|
||||
warn!(
|
||||
panic!(
|
||||
"Create database call received by a mismatched DatabaseSchema. This should not be possible."
|
||||
)
|
||||
}
|
||||
// NOTE(trevor/catalog-refactor): if the database is already there, shouldn't this
|
||||
// be a no-op, and not to_mut the cow?
|
||||
schema.to_mut();
|
||||
schema.to_mut().retention_period = match create_database.retention_period {
|
||||
Some(duration) => RetentionPeriod::Duration(duration),
|
||||
None => RetentionPeriod::Indefinite,
|
||||
};
|
||||
|
||||
Ok(schema)
|
||||
}
|
||||
DatabaseCatalogOp::CreateTable(create_table) => create_table.update_schema(schema),
|
||||
|
@ -2312,6 +2316,7 @@ impl TokenRepository {
|
|||
.get_by_id(&token_id)
|
||||
.ok_or_else(|| CatalogError::MissingAdminTokenToUpdate)?;
|
||||
let updatable = Arc::make_mut(&mut token_info);
|
||||
|
||||
updatable.hash = hash.clone();
|
||||
updatable.updated_at = Some(updated_at);
|
||||
updatable.updated_by = Some(token_id);
|
||||
|
|
|
@ -55,6 +55,11 @@ impl HardDeletionTime {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Clone, Copy)]
|
||||
pub struct CreateDatabaseOptions {
|
||||
pub retention_period: Option<Duration>,
|
||||
}
|
||||
|
||||
impl Catalog {
|
||||
pub fn begin(&self, db_name: &str) -> Result<DatabaseCatalogTransaction> {
|
||||
debug!(db_name, "starting catalog transaction");
|
||||
|
@ -79,10 +84,15 @@ impl Catalog {
|
|||
let database_name = Arc::from(db_name);
|
||||
let database_schema =
|
||||
Arc::new(DatabaseSchema::new(database_id, Arc::clone(&database_name)));
|
||||
let retention_period = match database_schema.retention_period {
|
||||
RetentionPeriod::Duration(duration) => Some(duration),
|
||||
RetentionPeriod::Indefinite => None,
|
||||
};
|
||||
let time_ns = self.time_provider.now().timestamp_nanos();
|
||||
let ops = vec![DatabaseCatalogOp::CreateDatabase(CreateDatabaseLog {
|
||||
database_id,
|
||||
database_name,
|
||||
retention_period,
|
||||
})];
|
||||
Ok(DatabaseCatalogTransaction {
|
||||
catalog_sequence: inner.sequence_number(),
|
||||
|
@ -240,10 +250,22 @@ impl Catalog {
|
|||
}
|
||||
|
||||
pub async fn create_database(&self, name: &str) -> Result<OrderedCatalogBatch> {
|
||||
self.create_database_opts(name, CreateDatabaseOptions::default())
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn create_database_opts(
|
||||
&self,
|
||||
name: &str,
|
||||
options: CreateDatabaseOptions,
|
||||
) -> Result<OrderedCatalogBatch> {
|
||||
info!(name, "create database");
|
||||
self.catalog_update_with_retry(|| {
|
||||
let (_, Some(batch)) =
|
||||
self.db_or_create(name, self.time_provider.now().timestamp_nanos())?
|
||||
let (_, Some(batch)) = self.db_or_create(
|
||||
name,
|
||||
options.retention_period,
|
||||
self.time_provider.now().timestamp_nanos(),
|
||||
)?
|
||||
else {
|
||||
return Err(CatalogError::AlreadyExists);
|
||||
};
|
||||
|
|
|
@ -194,6 +194,7 @@ impl From<v2::CreateDatabaseLog> for v3::CreateDatabaseLog {
|
|||
Self {
|
||||
database_id: value.database_id,
|
||||
database_name: value.database_name,
|
||||
retention_period: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -269,6 +269,7 @@ impl std::fmt::Display for NodeMode {
|
|||
pub struct CreateDatabaseLog {
|
||||
pub database_id: DbId,
|
||||
pub database_name: Arc<str>,
|
||||
pub retention_period: Option<Duration>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
|
||||
|
|
|
@ -842,6 +842,7 @@ mod v1_tests {
|
|||
crate::log::DatabaseCatalogOp::CreateDatabase(crate::log::CreateDatabaseLog {
|
||||
database_id: DbId::new(0),
|
||||
database_name: "test-db".into(),
|
||||
retention_period: None,
|
||||
}),
|
||||
crate::log::DatabaseCatalogOp::SoftDeleteDatabase(
|
||||
crate::log::SoftDeleteDatabaseLog {
|
||||
|
|
|
@ -355,12 +355,40 @@ impl Client {
|
|||
}
|
||||
|
||||
/// Make a request to the `POST /api/v3/configure/database` API
|
||||
pub async fn api_v3_configure_db_create(&self, db: impl Into<String> + Send) -> Result<()> {
|
||||
pub async fn api_v3_configure_db_create(
|
||||
&self,
|
||||
db: impl Into<String> + Send,
|
||||
retention_period: Option<Duration>,
|
||||
) -> Result<()> {
|
||||
let _bytes = self
|
||||
.send_json_get_bytes(
|
||||
Method::POST,
|
||||
"/api/v3/configure/database",
|
||||
Some(CreateDatabaseRequest { db: db.into() }),
|
||||
Some(CreateDatabaseRequest {
|
||||
db: db.into(),
|
||||
retention_period,
|
||||
}),
|
||||
None::<()>,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Make a request to the `PUT /api/v3/configure/database` API
|
||||
pub async fn api_v3_configure_db_update(
|
||||
&self,
|
||||
db: impl Into<String> + Send,
|
||||
retention_period: Option<Duration>,
|
||||
) -> Result<()> {
|
||||
let _bytes = self
|
||||
.send_json_get_bytes(
|
||||
Method::PUT,
|
||||
"/api/v3/configure/database",
|
||||
Some(UpdateDatabaseRequest {
|
||||
db: db.into(),
|
||||
retention_period,
|
||||
}),
|
||||
None::<()>,
|
||||
None,
|
||||
)
|
||||
|
|
|
@ -1198,13 +1198,19 @@ impl HttpApi {
|
|||
}
|
||||
|
||||
async fn create_database(&self, req: Request) -> Result<Response> {
|
||||
let CreateDatabaseRequest { db } = self.read_body_json(req).await?;
|
||||
let CreateDatabaseRequest {
|
||||
db,
|
||||
retention_period,
|
||||
} = self.read_body_json(req).await?;
|
||||
validate_db_name(&db, false)?;
|
||||
self.write_buffer.catalog().create_database(&db).await?;
|
||||
Ok(ResponseBuilder::new()
|
||||
.status(StatusCode::OK)
|
||||
.body(empty_response_body())
|
||||
.unwrap())
|
||||
self.write_buffer
|
||||
.catalog()
|
||||
.create_database_opts(
|
||||
&db,
|
||||
influxdb3_catalog::catalog::CreateDatabaseOptions { retention_period },
|
||||
)
|
||||
.await?;
|
||||
Ok(Response::new(empty_response_body()))
|
||||
}
|
||||
|
||||
/// Endpoint for testing a plugin that will be trigger on WAL writes.
|
||||
|
@ -1282,6 +1288,27 @@ impl HttpApi {
|
|||
}
|
||||
}
|
||||
|
||||
async fn update_database(&self, req: Request) -> Result<Response> {
|
||||
let update_req = self.read_body_json::<UpdateDatabaseRequest>(req).await?;
|
||||
|
||||
match update_req.retention_period {
|
||||
Some(duration) => {
|
||||
self.write_buffer
|
||||
.catalog()
|
||||
.set_retention_period_for_database(&update_req.db, duration)
|
||||
.await?;
|
||||
}
|
||||
None => {
|
||||
self.write_buffer
|
||||
.catalog()
|
||||
.clear_retention_period_for_database(&update_req.db)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Response::new(empty_response_body()))
|
||||
}
|
||||
|
||||
async fn delete_database(&self, req: Request) -> Result<Response> {
|
||||
let query = req.uri().query().unwrap_or("");
|
||||
let delete_req = serde_urlencoded::from_str::<DeleteDatabaseRequest>(query)?;
|
||||
|
@ -1289,10 +1316,7 @@ impl HttpApi {
|
|||
.catalog()
|
||||
.soft_delete_database(&delete_req.db)
|
||||
.await?;
|
||||
Ok(ResponseBuilder::new()
|
||||
.status(StatusCode::OK)
|
||||
.body(empty_response_body())
|
||||
.unwrap())
|
||||
Ok(Response::new(empty_response_body()))
|
||||
}
|
||||
|
||||
async fn create_table(&self, req: Request) -> Result<Response> {
|
||||
|
@ -1315,10 +1339,7 @@ impl HttpApi {
|
|||
.collect::<Vec<(String, FieldDataType)>>(),
|
||||
)
|
||||
.await?;
|
||||
Ok(ResponseBuilder::new()
|
||||
.status(StatusCode::OK)
|
||||
.body(empty_response_body())
|
||||
.unwrap())
|
||||
Ok(Response::new(empty_response_body()))
|
||||
}
|
||||
|
||||
async fn delete_table(&self, req: Request) -> Result<Response> {
|
||||
|
@ -1924,6 +1945,9 @@ pub(crate) async fn route_request(
|
|||
(Method::POST, all_paths::API_V3_CONFIGURE_DATABASE) => {
|
||||
http_server.create_database(req).await
|
||||
}
|
||||
(Method::PUT, all_paths::API_V3_CONFIGURE_DATABASE) => {
|
||||
http_server.update_database(req).await
|
||||
}
|
||||
(Method::DELETE, all_paths::API_V3_CONFIGURE_DATABASE) => {
|
||||
http_server.delete_database(req).await
|
||||
}
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use arrow::array::{StringViewBuilder, UInt64Builder};
|
||||
use arrow_array::{ArrayRef, RecordBatch};
|
||||
use arrow_schema::{DataType, Field, Schema, SchemaRef};
|
||||
use datafusion::{error::DataFusionError, logical_expr::Expr};
|
||||
use influxdb3_catalog::catalog::{Catalog, RetentionPeriod};
|
||||
use iox_system_tables::IoxSystemTable;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(super) struct DatabasesTable {
|
||||
catalog: Arc<Catalog>,
|
||||
schema: SchemaRef,
|
||||
}
|
||||
|
||||
impl DatabasesTable {
|
||||
pub(super) fn new(catalog: Arc<Catalog>) -> Self {
|
||||
Self {
|
||||
catalog,
|
||||
schema: databases_schema(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn databases_schema() -> SchemaRef {
|
||||
let columns = vec![
|
||||
Field::new("database_name", DataType::Utf8View, false),
|
||||
Field::new("retention_period_ns", DataType::UInt64, true),
|
||||
Field::new("deleted", DataType::Boolean, false),
|
||||
];
|
||||
Arc::new(Schema::new(columns))
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl IoxSystemTable for DatabasesTable {
|
||||
fn schema(&self) -> SchemaRef {
|
||||
Arc::clone(&self.schema)
|
||||
}
|
||||
|
||||
async fn scan(
|
||||
&self,
|
||||
_filters: Option<Vec<Expr>>,
|
||||
_limit: Option<usize>,
|
||||
) -> Result<RecordBatch, DataFusionError> {
|
||||
let databases = self.catalog.list_db_schema();
|
||||
|
||||
let mut database_name_arr = StringViewBuilder::with_capacity(databases.len());
|
||||
let mut retention_period_arr = UInt64Builder::with_capacity(databases.len());
|
||||
let mut deleted_arr = arrow::array::BooleanBuilder::with_capacity(databases.len());
|
||||
|
||||
for db in databases {
|
||||
database_name_arr.append_value(&db.name);
|
||||
|
||||
match db.retention_period {
|
||||
RetentionPeriod::Indefinite => retention_period_arr.append_null(),
|
||||
RetentionPeriod::Duration(duration) => {
|
||||
retention_period_arr.append_value(duration.as_nanos() as u64);
|
||||
}
|
||||
}
|
||||
|
||||
deleted_arr.append_value(db.deleted);
|
||||
}
|
||||
|
||||
let columns: Vec<ArrayRef> = vec![
|
||||
Arc::new(database_name_arr.finish()),
|
||||
Arc::new(retention_period_arr.finish()),
|
||||
Arc::new(deleted_arr.finish()),
|
||||
];
|
||||
|
||||
RecordBatch::try_new(self.schema(), columns).map_err(DataFusionError::from)
|
||||
}
|
||||
}
|
|
@ -18,16 +18,20 @@ use parquet_files::ParquetFilesTable;
|
|||
use tokens::TokenSystemTable;
|
||||
use tonic::async_trait;
|
||||
|
||||
use self::{last_caches::LastCachesTable, queries::QueriesTable};
|
||||
use self::{
|
||||
databases::DatabasesTable, last_caches::LastCachesTable, queries::QueriesTable,
|
||||
tables::TablesTable,
|
||||
};
|
||||
|
||||
mod databases;
|
||||
mod distinct_caches;
|
||||
mod generations;
|
||||
mod last_caches;
|
||||
mod parquet_files;
|
||||
use crate::system_tables::python_call::{ProcessingEngineLogsTable, ProcessingEngineTriggerTable};
|
||||
|
||||
mod python_call;
|
||||
mod queries;
|
||||
mod tables;
|
||||
mod tokens;
|
||||
|
||||
pub(crate) const SYSTEM_SCHEMA_NAME: &str = "system";
|
||||
|
@ -38,6 +42,8 @@ pub(crate) const LAST_CACHES_TABLE_NAME: &str = "last_caches";
|
|||
pub(crate) const DISTINCT_CACHES_TABLE_NAME: &str = "distinct_caches";
|
||||
pub(crate) const PARQUET_FILES_TABLE_NAME: &str = "parquet_files";
|
||||
pub(crate) const TOKENS_TABLE_NAME: &str = "tokens";
|
||||
pub(crate) const DATABASES_TABLE_NAME: &str = "databases";
|
||||
pub(crate) const TABLES_TABLE_NAME: &str = "tables";
|
||||
pub(crate) const GENERATION_DURATIONS_TABLE_NAME: &str = "generation_durations";
|
||||
|
||||
const PROCESSING_ENGINE_TRIGGERS_TABLE_NAME: &str = "processing_engine_triggers";
|
||||
|
@ -140,6 +146,18 @@ impl AllSystemSchemaTablesProvider {
|
|||
started_with_auth,
|
||||
)))),
|
||||
);
|
||||
tables.insert(
|
||||
DATABASES_TABLE_NAME,
|
||||
Arc::new(SystemTableProvider::new(Arc::new(DatabasesTable::new(
|
||||
Arc::clone(&catalog),
|
||||
)))),
|
||||
);
|
||||
tables.insert(
|
||||
TABLES_TABLE_NAME,
|
||||
Arc::new(SystemTableProvider::new(Arc::new(TablesTable::new(
|
||||
Arc::clone(&catalog),
|
||||
)))),
|
||||
);
|
||||
tables.insert(
|
||||
GENERATION_DURATIONS_TABLE_NAME,
|
||||
Arc::new(SystemTableProvider::new(Arc::new(
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use arrow::array::{StringViewBuilder, UInt64Builder};
|
||||
use arrow_array::{ArrayRef, RecordBatch};
|
||||
use arrow_schema::{DataType, Field, Schema, SchemaRef};
|
||||
use datafusion::{error::DataFusionError, logical_expr::Expr};
|
||||
use influxdb3_catalog::catalog::Catalog;
|
||||
use iox_system_tables::IoxSystemTable;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(super) struct TablesTable {
|
||||
catalog: Arc<Catalog>,
|
||||
schema: SchemaRef,
|
||||
}
|
||||
|
||||
impl TablesTable {
|
||||
pub(super) fn new(catalog: Arc<Catalog>) -> Self {
|
||||
Self {
|
||||
catalog,
|
||||
schema: tables_schema(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn tables_schema() -> SchemaRef {
|
||||
let columns = vec![
|
||||
Field::new("database_name", DataType::Utf8View, false),
|
||||
Field::new("table_name", DataType::Utf8View, false),
|
||||
Field::new("column_count", DataType::UInt64, false),
|
||||
Field::new("series_key_columns", DataType::Utf8View, false),
|
||||
Field::new("last_cache_count", DataType::UInt64, false),
|
||||
Field::new("distinct_cache_count", DataType::UInt64, false),
|
||||
Field::new("deleted", DataType::Boolean, false),
|
||||
];
|
||||
Arc::new(Schema::new(columns))
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl IoxSystemTable for TablesTable {
|
||||
fn schema(&self) -> SchemaRef {
|
||||
Arc::clone(&self.schema)
|
||||
}
|
||||
|
||||
async fn scan(
|
||||
&self,
|
||||
_filters: Option<Vec<Expr>>,
|
||||
_limit: Option<usize>,
|
||||
) -> Result<RecordBatch, DataFusionError> {
|
||||
let databases = self.catalog.list_db_schema();
|
||||
|
||||
// Count total tables across all databases
|
||||
let total_tables: usize = databases
|
||||
.iter()
|
||||
.map(|db| db.tables.resource_iter().count())
|
||||
.sum();
|
||||
|
||||
let mut database_name_arr = StringViewBuilder::with_capacity(total_tables);
|
||||
let mut table_name_arr = StringViewBuilder::with_capacity(total_tables);
|
||||
let mut column_count_arr = UInt64Builder::with_capacity(total_tables);
|
||||
let mut series_key_arr = StringViewBuilder::with_capacity(total_tables);
|
||||
let mut last_cache_count_arr = UInt64Builder::with_capacity(total_tables);
|
||||
let mut distinct_cache_count_arr = UInt64Builder::with_capacity(total_tables);
|
||||
let mut deleted_arr = arrow::array::BooleanBuilder::with_capacity(total_tables);
|
||||
|
||||
for db in databases {
|
||||
for table in db.tables.resource_iter() {
|
||||
database_name_arr.append_value(&db.name);
|
||||
table_name_arr.append_value(&table.table_name);
|
||||
column_count_arr.append_value(table.columns.resource_iter().count() as u64);
|
||||
|
||||
// Build series key string
|
||||
let series_key_str = table.series_key_names.join(", ");
|
||||
series_key_arr.append_value(&series_key_str);
|
||||
|
||||
last_cache_count_arr.append_value(table.last_caches.resource_iter().count() as u64);
|
||||
distinct_cache_count_arr
|
||||
.append_value(table.distinct_caches.resource_iter().count() as u64);
|
||||
deleted_arr.append_value(table.deleted);
|
||||
}
|
||||
}
|
||||
|
||||
let columns: Vec<ArrayRef> = vec![
|
||||
Arc::new(database_name_arr.finish()),
|
||||
Arc::new(table_name_arr.finish()),
|
||||
Arc::new(column_count_arr.finish()),
|
||||
Arc::new(series_key_arr.finish()),
|
||||
Arc::new(last_cache_count_arr.finish()),
|
||||
Arc::new(distinct_cache_count_arr.finish()),
|
||||
Arc::new(deleted_arr.finish()),
|
||||
];
|
||||
|
||||
RecordBatch::try_new(self.schema(), columns).map_err(DataFusionError::from)
|
||||
}
|
||||
}
|
|
@ -21,6 +21,7 @@ chrono.workspace = true
|
|||
serde.workspace = true
|
||||
hashbrown.workspace = true
|
||||
hex.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
hyper.workspace = true
|
||||
thiserror.workspace = true
|
||||
uuid.workspace = true
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::sync::Arc;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use influxdb3_authz::TokenInfo;
|
||||
|
@ -230,6 +230,16 @@ pub struct ShowDatabasesRequest {
|
|||
#[derive(Debug, Deserialize, Serialize)]
|
||||
pub struct CreateDatabaseRequest {
|
||||
pub db: String,
|
||||
#[serde(with = "humantime_serde", default)]
|
||||
pub retention_period: Option<Duration>,
|
||||
}
|
||||
|
||||
/// Request definition for the `PUT /api/v3/configure/database` API
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
pub struct UpdateDatabaseRequest {
|
||||
pub db: String,
|
||||
#[serde(with = "humantime_serde", default)]
|
||||
pub retention_period: Option<Duration>,
|
||||
}
|
||||
|
||||
/// Request definition for the `DELETE /api/v3/configure/database` API
|
||||
|
|
Loading…
Reference in New Issue