feat: CLI to create and delete metadata caches (#25595)
This adds two new CLI commands to the `influxdb3` binary: * `influxdb3 meta-cache create` * `influxdb3 meta-cache delete` To create and delete metadata caches, respectively. A basic integration test was added to check that this works E2E. The `influxdb3_client` was updated with methods to create and delete metadata caches, and which is what the CLI commands use under the hood.pull/25598/head
parent
9ead1dfe4b
commit
13ab41fa1f
|
@ -0,0 +1,86 @@
|
|||
use std::{error::Error, num::NonZeroUsize};
|
||||
|
||||
use secrecy::ExposeSecret;
|
||||
|
||||
use crate::commands::common::{InfluxDb3Config, SeparatedList};
|
||||
|
||||
#[derive(Debug, clap::Parser)]
|
||||
pub struct Config {
|
||||
#[clap(flatten)]
|
||||
influxdb3_config: InfluxDb3Config,
|
||||
|
||||
#[clap(flatten)]
|
||||
meta_cache_config: MetaCacheConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, clap::Parser)]
|
||||
pub struct MetaCacheConfig {
|
||||
/// The table name for which the cache is being created
|
||||
#[clap(short = 't', long = "table")]
|
||||
table: String,
|
||||
|
||||
/// Give the name of the cache.
|
||||
///
|
||||
/// This will be automatically generated if not provided
|
||||
#[clap(long = "cache-name")]
|
||||
cache_name: Option<String>,
|
||||
|
||||
/// Which columns in the table to cache distinct values for, as a comma-separated list of the
|
||||
/// column names.
|
||||
///
|
||||
/// The cache is a hieararchical structure, with a level for each column specified; the order
|
||||
/// specified here will determine the order of the levels from top-to-bottom of the cache
|
||||
/// hierarchy.
|
||||
#[clap(long = "columns")]
|
||||
columns: SeparatedList<String>,
|
||||
|
||||
/// The maximum number of distinct value combinations to hold in the cache
|
||||
#[clap(long = "max-cardinality")]
|
||||
max_cardinality: Option<NonZeroUsize>,
|
||||
|
||||
/// The maximum age of an entry in the cache entered as a human-readable duration, e.g., "30d", "24h"
|
||||
#[clap(long = "max-age")]
|
||||
max_age: Option<humantime::Duration>,
|
||||
}
|
||||
|
||||
pub(super) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
|
||||
let InfluxDb3Config {
|
||||
host_url,
|
||||
database_name,
|
||||
auth_token,
|
||||
} = config.influxdb3_config;
|
||||
let MetaCacheConfig {
|
||||
table,
|
||||
cache_name,
|
||||
columns,
|
||||
max_cardinality,
|
||||
max_age,
|
||||
} = config.meta_cache_config;
|
||||
|
||||
let mut client = influxdb3_client::Client::new(host_url)?;
|
||||
if let Some(t) = auth_token {
|
||||
client = client.with_auth_token(t.expose_secret());
|
||||
}
|
||||
let mut b = client.api_v3_configure_meta_cache_create(database_name, table, columns);
|
||||
|
||||
// Add the optional stuff:
|
||||
if let Some(name) = cache_name {
|
||||
b = b.name(name);
|
||||
}
|
||||
if let Some(max_cardinality) = max_cardinality {
|
||||
b = b.max_cardinality(max_cardinality);
|
||||
}
|
||||
if let Some(max_age) = max_age {
|
||||
b = b.max_age(max_age.into());
|
||||
}
|
||||
|
||||
match b.send().await? {
|
||||
Some(def) => println!(
|
||||
"new cache created: {}",
|
||||
serde_json::to_string_pretty(&def).expect("serialize meta cache definition as JSON")
|
||||
),
|
||||
None => println!("a cache already exists for the provided parameters"),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
use std::error::Error;
|
||||
|
||||
use secrecy::ExposeSecret;
|
||||
|
||||
use crate::commands::common::InfluxDb3Config;
|
||||
|
||||
#[derive(Debug, clap::Parser)]
|
||||
pub struct Config {
|
||||
#[clap(flatten)]
|
||||
influxdb3_config: InfluxDb3Config,
|
||||
|
||||
/// The table under which the cache is being deleted
|
||||
#[clap(short = 't', long = "table")]
|
||||
table: String,
|
||||
|
||||
/// The name of the cache being deleted
|
||||
#[clap(short = 'n', long = "cache-name")]
|
||||
cache_name: String,
|
||||
}
|
||||
|
||||
pub(super) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
|
||||
let InfluxDb3Config {
|
||||
host_url,
|
||||
database_name,
|
||||
auth_token,
|
||||
} = config.influxdb3_config;
|
||||
let mut client = influxdb3_client::Client::new(host_url)?;
|
||||
if let Some(t) = auth_token {
|
||||
client = client.with_auth_token(t.expose_secret());
|
||||
}
|
||||
client
|
||||
.api_v3_configure_meta_cache_delete(database_name, config.table, config.cache_name)
|
||||
.await?;
|
||||
|
||||
println!("meta cache deleted successfully");
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
use std::error::Error;
|
||||
|
||||
pub mod create;
|
||||
pub mod delete;
|
||||
|
||||
#[derive(Debug, clap::Parser)]
|
||||
pub(crate) struct Config {
|
||||
#[clap(subcommand)]
|
||||
command: Command,
|
||||
}
|
||||
|
||||
#[derive(Debug, clap::Parser)]
|
||||
enum Command {
|
||||
/// Create a new metadata cache
|
||||
Create(create::Config),
|
||||
|
||||
/// Delete a metadata cache
|
||||
Delete(delete::Config),
|
||||
}
|
||||
|
||||
pub(crate) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
|
||||
match config.command {
|
||||
Command::Create(config) => create::command(config).await,
|
||||
Command::Delete(config) => delete::command(config).await,
|
||||
}
|
||||
}
|
|
@ -28,6 +28,7 @@ mod commands {
|
|||
pub(crate) mod common;
|
||||
pub mod last_cache;
|
||||
pub mod manage;
|
||||
pub mod meta_cache;
|
||||
pub mod query;
|
||||
pub mod serve;
|
||||
pub mod token;
|
||||
|
@ -93,6 +94,9 @@ enum Command {
|
|||
/// Manage last-n-value caches
|
||||
LastCache(commands::last_cache::Config),
|
||||
|
||||
/// Manage metadata caches
|
||||
MetaCache(commands::meta_cache::Config),
|
||||
|
||||
/// Manage database (delete only for the moment)
|
||||
Database(commands::manage::database::ManageDatabaseConfig),
|
||||
|
||||
|
@ -155,6 +159,12 @@ fn main() -> Result<(), std::io::Error> {
|
|||
std::process::exit(ReturnCode::Failure as _)
|
||||
}
|
||||
}
|
||||
Some(Command::MetaCache(config)) => {
|
||||
if let Err(e) = commands::meta_cache::command(config).await {
|
||||
eprintln!("Metadata Cache command faild: {e}");
|
||||
std::process::exit(ReturnCode::Failure as _)
|
||||
}
|
||||
}
|
||||
Some(Command::Database(config)) => {
|
||||
if let Err(e) = commands::manage::database::delete_database(config).await {
|
||||
eprintln!("Database delete command failed: {e}");
|
||||
|
|
|
@ -10,6 +10,28 @@ use test_helpers::assert_contains;
|
|||
|
||||
use crate::TestServer;
|
||||
|
||||
pub fn run(args: &[&str]) -> String {
|
||||
let process = Command::cargo_bin("influxdb3")
|
||||
.unwrap()
|
||||
.args(args)
|
||||
.stdout(Stdio::piped())
|
||||
.output()
|
||||
.unwrap();
|
||||
|
||||
String::from_utf8_lossy(&process.stdout).trim().into()
|
||||
}
|
||||
|
||||
pub fn run_and_err(args: &[&str]) -> String {
|
||||
let process = Command::cargo_bin("influxdb3")
|
||||
.unwrap()
|
||||
.args(args)
|
||||
.stderr(Stdio::piped())
|
||||
.output()
|
||||
.unwrap();
|
||||
|
||||
String::from_utf8_lossy(&process.stderr).trim().into()
|
||||
}
|
||||
|
||||
pub fn run_with_confirmation(args: &[&str]) -> String {
|
||||
let mut child_process = Command::cargo_bin("influxdb3")
|
||||
.unwrap()
|
||||
|
@ -153,3 +175,83 @@ async fn test_delete_missing_table() {
|
|||
debug!(result = ?result, "delete missing table");
|
||||
assert_contains!(&result, "404");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_delete_meta_cache() {
|
||||
let server = TestServer::spawn().await;
|
||||
let server_addr = server.client_addr();
|
||||
let db_name = "foo";
|
||||
let table_name = "bar";
|
||||
server
|
||||
.write_lp_to_db(
|
||||
db_name,
|
||||
format!("{table_name},t1=a,t2=aa,t3=aaa f1=true,f2=\"hello\",f3=42"),
|
||||
influxdb3_client::Precision::Second,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let cache_name = "baz";
|
||||
// first create the cache:
|
||||
let result = run(&[
|
||||
"meta-cache",
|
||||
"create",
|
||||
"--host",
|
||||
&server_addr,
|
||||
"--dbname",
|
||||
db_name,
|
||||
"--table",
|
||||
table_name,
|
||||
"--cache-name",
|
||||
cache_name,
|
||||
"--columns",
|
||||
"t1,t2",
|
||||
]);
|
||||
assert_contains!(&result, "new cache created");
|
||||
// doing the same thing over again will be a no-op
|
||||
let result = run(&[
|
||||
"meta-cache",
|
||||
"create",
|
||||
"--host",
|
||||
&server_addr,
|
||||
"--dbname",
|
||||
db_name,
|
||||
"--table",
|
||||
table_name,
|
||||
"--cache-name",
|
||||
cache_name,
|
||||
"--columns",
|
||||
"t1,t2",
|
||||
]);
|
||||
assert_contains!(
|
||||
&result,
|
||||
"a cache already exists for the provided parameters"
|
||||
);
|
||||
// now delete it:
|
||||
let result = run(&[
|
||||
"meta-cache",
|
||||
"delete",
|
||||
"--host",
|
||||
&server_addr,
|
||||
"--dbname",
|
||||
db_name,
|
||||
"--table",
|
||||
table_name,
|
||||
"--cache-name",
|
||||
cache_name,
|
||||
]);
|
||||
assert_contains!(&result, "meta cache deleted successfully");
|
||||
// trying to delete again should result in an error as the cache no longer exists:
|
||||
let result = run_and_err(&[
|
||||
"meta-cache",
|
||||
"delete",
|
||||
"--host",
|
||||
&server_addr,
|
||||
"--dbname",
|
||||
db_name,
|
||||
"--table",
|
||||
table_name,
|
||||
"--cache-name",
|
||||
cache_name,
|
||||
]);
|
||||
assert_contains!(&result, "[404 Not Found]: cache not found");
|
||||
}
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
use std::{collections::HashMap, fmt::Display, string::FromUtf8Error};
|
||||
use std::{
|
||||
collections::HashMap, fmt::Display, num::NonZeroUsize, string::FromUtf8Error, time::Duration,
|
||||
};
|
||||
|
||||
use bytes::Bytes;
|
||||
use iox_query_params::StatementParam;
|
||||
|
@ -259,6 +261,71 @@ impl Client {
|
|||
}
|
||||
}
|
||||
|
||||
/// Compose a request to the `POST /api/v3/configure/meta_cache` API
|
||||
///
|
||||
/// # Example
|
||||
/// ```no_run
|
||||
/// # use influxdb3_client::Client;
|
||||
/// # use std::num::NonZeroUsize;
|
||||
/// # use std::time::Duration;
|
||||
/// # #[tokio::main]
|
||||
/// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
/// let client = Client::new("http://localhost:8181")?;
|
||||
/// let resp = client
|
||||
/// .api_v3_configure_meta_cache_create("db_name", "table_name", ["col1", "col2"])
|
||||
/// .name("cache_name")
|
||||
/// .max_cardinality(NonZeroUsize::new(1_000).unwrap())
|
||||
/// .max_age(Duration::from_secs(3_600))
|
||||
/// .send()
|
||||
/// .await
|
||||
/// .expect("send create meta cache request");
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn api_v3_configure_meta_cache_create(
|
||||
&self,
|
||||
db: impl Into<String>,
|
||||
table: impl Into<String>,
|
||||
columns: impl IntoIterator<Item: Into<String>>,
|
||||
) -> CreateMetaCacheRequestBuilder<'_> {
|
||||
CreateMetaCacheRequestBuilder::new(self, db, table, columns)
|
||||
}
|
||||
|
||||
/// Make a request to the `DELETE /api/v3/configure/meta_cache` API
|
||||
pub async fn api_v3_configure_meta_cache_delete(
|
||||
&self,
|
||||
db: impl Into<String> + Send,
|
||||
table: impl Into<String> + Send,
|
||||
name: impl Into<String> + Send,
|
||||
) -> Result<()> {
|
||||
let url = self.base_url.join("/api/v3/configure/meta_cache")?;
|
||||
#[derive(Serialize)]
|
||||
struct Req {
|
||||
db: String,
|
||||
table: String,
|
||||
name: String,
|
||||
}
|
||||
let mut req = self.http_client.delete(url).json(&Req {
|
||||
db: db.into(),
|
||||
table: table.into(),
|
||||
name: name.into(),
|
||||
});
|
||||
if let Some(token) = &self.auth_token {
|
||||
req = req.bearer_auth(token.expose_secret());
|
||||
}
|
||||
let resp = req.send().await.map_err(|src| {
|
||||
Error::request_send(Method::DELETE, "/api/v3/configure/meta_cache", src)
|
||||
})?;
|
||||
let status = resp.status();
|
||||
match status {
|
||||
StatusCode::OK => Ok(()),
|
||||
code => Err(Error::ApiError {
|
||||
code,
|
||||
message: resp.text().await.map_err(Error::Text)?,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Make a request to the `DELETE /api/v3/configure/database?db=foo` API
|
||||
pub async fn api_v3_configure_db_delete(&self, db: impl AsRef<str> + Send) -> Result<()> {
|
||||
let api_path = "/api/v3/configure/database";
|
||||
|
@ -786,6 +853,103 @@ pub enum LastCacheValueColumnsDef {
|
|||
AllNonKeyColumns,
|
||||
}
|
||||
|
||||
/// Type for composing requests to the `POST /api/v3/configure/meta_cache` API created by the
|
||||
/// [`Client::api_v3_configure_meta_cache_create`] method
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct CreateMetaCacheRequestBuilder<'c> {
|
||||
#[serde(skip_serializing)]
|
||||
client: &'c Client,
|
||||
db: String,
|
||||
table: String,
|
||||
columns: Vec<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
name: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
max_cardinality: Option<NonZeroUsize>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
max_age: Option<Duration>,
|
||||
}
|
||||
|
||||
impl<'c> CreateMetaCacheRequestBuilder<'c> {
|
||||
fn new(
|
||||
client: &'c Client,
|
||||
db: impl Into<String>,
|
||||
table: impl Into<String>,
|
||||
columns: impl IntoIterator<Item: Into<String>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
client,
|
||||
db: db.into(),
|
||||
table: table.into(),
|
||||
columns: columns.into_iter().map(Into::into).collect(),
|
||||
name: None,
|
||||
max_cardinality: None,
|
||||
max_age: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Specify the name of the cache to be created, `snake_case` names are encouraged
|
||||
pub fn name(mut self, name: impl Into<String>) -> Self {
|
||||
self.name = Some(name.into());
|
||||
self
|
||||
}
|
||||
|
||||
/// Specify the maximum cardinality for the cache as a non-zero unsigned integer
|
||||
pub fn max_cardinality(mut self, max_cardinality: NonZeroUsize) -> Self {
|
||||
self.max_cardinality = Some(max_cardinality);
|
||||
self
|
||||
}
|
||||
|
||||
/// Specify the maximum age for entries in the cache
|
||||
pub fn max_age(mut self, max_age: Duration) -> Self {
|
||||
self.max_age = Some(max_age);
|
||||
self
|
||||
}
|
||||
|
||||
/// Send the create cache request
|
||||
pub async fn send(self) -> Result<Option<MetaCacheCreatedResponse>> {
|
||||
let url = self.client.base_url.join("/api/v3/configure/meta_cache")?;
|
||||
let mut req = self.client.http_client.post(url).json(&self);
|
||||
if let Some(token) = &self.client.auth_token {
|
||||
req = req.bearer_auth(token.expose_secret());
|
||||
}
|
||||
let resp = req.send().await.map_err(|src| {
|
||||
Error::request_send(Method::POST, "/api/v3/configure/meta_cache", src)
|
||||
})?;
|
||||
let status = resp.status();
|
||||
match status {
|
||||
StatusCode::CREATED => {
|
||||
let content = resp
|
||||
.json::<MetaCacheCreatedResponse>()
|
||||
.await
|
||||
.map_err(Error::Json)?;
|
||||
Ok(Some(content))
|
||||
}
|
||||
StatusCode::NO_CONTENT => Ok(None),
|
||||
code => Err(Error::ApiError {
|
||||
code,
|
||||
message: resp.text().await.map_err(Error::Text)?,
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct MetaCacheCreatedResponse {
|
||||
/// The id of the table the cache was created on
|
||||
pub table_id: u32,
|
||||
/// The name of the table the cache was created on
|
||||
pub table_name: String,
|
||||
/// The name of the created cache
|
||||
pub cache_name: String,
|
||||
/// The columns in the cache
|
||||
pub column_ids: Vec<u32>,
|
||||
/// The maximum number of unique value combinations the cache will hold
|
||||
pub max_cardinality: usize,
|
||||
/// The maximum age for entries in the cache
|
||||
pub max_age_seconds: u64,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use mockito::{Matcher, Server};
|
||||
|
|
Loading…
Reference in New Issue