refactor: rename plugin activate/deactivate to enable/disable (#25793)
Closes #25789pull/25794/head
parent
1ff4f76896
commit
bdcdb4f296
|
@ -32,7 +32,7 @@ impl Config {
|
|||
|
||||
#[derive(Debug, clap::Subcommand)]
|
||||
enum SubCommand {
|
||||
/// Deactivate a plugin trigger
|
||||
/// Disable a plugin trigger
|
||||
Trigger(TriggerConfig),
|
||||
}
|
||||
|
||||
|
@ -41,7 +41,7 @@ struct TriggerConfig {
|
|||
#[clap(flatten)]
|
||||
influxdb3_config: InfluxDb3Config,
|
||||
|
||||
/// Name of trigger to deactivate
|
||||
/// Name of trigger to disable
|
||||
#[clap(required = true)]
|
||||
trigger_name: String,
|
||||
}
|
||||
|
@ -54,9 +54,9 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
|
|||
trigger_name,
|
||||
}) => {
|
||||
client
|
||||
.api_v3_configure_processing_engine_trigger_deactivate(database_name, &trigger_name)
|
||||
.api_v3_configure_processing_engine_trigger_disable(database_name, &trigger_name)
|
||||
.await?;
|
||||
println!("Trigger {} deactivated successfully", trigger_name);
|
||||
println!("Trigger {} disabled successfully", trigger_name);
|
||||
}
|
||||
}
|
||||
Ok(())
|
|
@ -32,7 +32,7 @@ impl Config {
|
|||
|
||||
#[derive(Debug, clap::Subcommand)]
|
||||
enum SubCommand {
|
||||
/// Activate a trigger to enable plugin execution
|
||||
/// Enable a trigger to enable plugin execution
|
||||
Trigger(TriggerConfig),
|
||||
}
|
||||
|
||||
|
@ -41,7 +41,7 @@ struct TriggerConfig {
|
|||
#[clap(flatten)]
|
||||
influxdb3_config: InfluxDb3Config,
|
||||
|
||||
/// Name of trigger to manage
|
||||
/// Name of trigger to enable
|
||||
#[clap(required = true)]
|
||||
trigger_name: String,
|
||||
}
|
||||
|
@ -54,9 +54,9 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
|
|||
trigger_name,
|
||||
}) => {
|
||||
client
|
||||
.api_v3_configure_processing_engine_trigger_activate(database_name, &trigger_name)
|
||||
.api_v3_configure_processing_engine_trigger_enable(database_name, &trigger_name)
|
||||
.await?;
|
||||
println!("Trigger {} activated successfully", trigger_name);
|
||||
println!("Trigger {} enabled successfully", trigger_name);
|
||||
}
|
||||
}
|
||||
Ok(())
|
|
@ -21,11 +21,11 @@ use trogging::{
|
|||
};
|
||||
|
||||
mod commands {
|
||||
pub mod activate;
|
||||
pub(crate) mod common;
|
||||
pub mod create;
|
||||
pub mod deactivate;
|
||||
pub mod delete;
|
||||
pub mod disable;
|
||||
pub mod enable;
|
||||
pub mod query;
|
||||
pub mod serve;
|
||||
pub mod show;
|
||||
|
@ -84,14 +84,14 @@ struct Config {
|
|||
#[derive(Debug, clap::Parser)]
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
enum Command {
|
||||
/// Activate a resource such as a trigger
|
||||
Activate(commands::activate::Config),
|
||||
/// Enable a resource such as a trigger
|
||||
Enable(commands::enable::Config),
|
||||
|
||||
/// Create a resource such as a database or auth token
|
||||
Create(commands::create::Config),
|
||||
|
||||
/// Deactivate a resource such as a trigger
|
||||
Deactivate(commands::deactivate::Config),
|
||||
/// Disable a resource such as a trigger
|
||||
Disable(commands::disable::Config),
|
||||
|
||||
/// Delete a resource such as a database or table
|
||||
Delete(commands::delete::Config),
|
||||
|
@ -136,9 +136,9 @@ fn main() -> Result<(), std::io::Error> {
|
|||
|
||||
match config.command {
|
||||
None => println!("command required, -h/--help for help"),
|
||||
Some(Command::Activate(config)) => {
|
||||
if let Err(e) = commands::activate::command(config).await {
|
||||
eprintln!("Activate command failed: {e}");
|
||||
Some(Command::Enable(config)) => {
|
||||
if let Err(e) = commands::enable::command(config).await {
|
||||
eprintln!("Enable command failed: {e}");
|
||||
std::process::exit(ReturnCode::Failure as _)
|
||||
}
|
||||
}
|
||||
|
@ -148,9 +148,9 @@ fn main() -> Result<(), std::io::Error> {
|
|||
std::process::exit(ReturnCode::Failure as _)
|
||||
}
|
||||
}
|
||||
Some(Command::Deactivate(config)) => {
|
||||
if let Err(e) = commands::deactivate::command(config).await {
|
||||
eprintln!("Deactivate command failed: {e}");
|
||||
Some(Command::Disable(config)) => {
|
||||
if let Err(e) = commands::disable::command(config).await {
|
||||
eprintln!("Disable command failed: {e}");
|
||||
std::process::exit(ReturnCode::Failure as _)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -520,7 +520,7 @@ def process_rows(iterator, output):
|
|||
#[cfg(feature = "system-py")]
|
||||
#[test_log::test(tokio::test)]
|
||||
async fn test_create_trigger_and_run() {
|
||||
// create a plugin and trigger and write data in, verifying that the trigger is activated
|
||||
// create a plugin and trigger and write data in, verifying that the trigger is enabled
|
||||
// and sent data
|
||||
let server = TestServer::spawn().await;
|
||||
let server_addr = server.client_addr();
|
||||
|
@ -560,7 +560,7 @@ def process_writes(influxdb3_local, table_batches, args=None):
|
|||
plugin_name,
|
||||
]);
|
||||
|
||||
// creating the trigger should activate it
|
||||
// creating the trigger should enable it
|
||||
let result = run_with_confirmation(&[
|
||||
"create",
|
||||
"trigger",
|
||||
|
@ -589,9 +589,18 @@ def process_writes(influxdb3_local, table_batches, args=None):
|
|||
.await
|
||||
.expect("write to db");
|
||||
|
||||
// query to see if the processed data is there
|
||||
let expected = json!(
|
||||
[
|
||||
{"table_name": "cpu", "row_count": 4},
|
||||
{"table_name": "mem", "row_count": 1}
|
||||
]
|
||||
);
|
||||
|
||||
// query to see if the processed data is there. we loop because it could take a bit to write
|
||||
// back the data. There's also a condition where the table may have been created, but the
|
||||
// write hasn't happend yet, which returns empty results. This ensures we don't hit that race.
|
||||
let mut check_count = 0;
|
||||
let result = loop {
|
||||
loop {
|
||||
match server
|
||||
.api_v3_query_sql(&[
|
||||
("db", db_name),
|
||||
|
@ -602,7 +611,18 @@ def process_writes(influxdb3_local, table_batches, args=None):
|
|||
.json::<Value>()
|
||||
.await
|
||||
{
|
||||
Ok(value) => break value,
|
||||
Ok(value) => {
|
||||
if value == expected {
|
||||
return;
|
||||
}
|
||||
check_count += 1;
|
||||
if check_count > 10 {
|
||||
panic!(
|
||||
"Unexpected query result, got: {:#?}, expected {:#?}",
|
||||
value, expected
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
check_count += 1;
|
||||
if check_count > 10 {
|
||||
|
@ -611,21 +631,11 @@ def process_writes(influxdb3_local, table_batches, args=None):
|
|||
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
result,
|
||||
json!(
|
||||
[
|
||||
{"table_name": "cpu", "row_count": 4},
|
||||
{"table_name": "mem", "row_count": 1}
|
||||
]
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test_log::test(tokio::test)]
|
||||
async fn test_trigger_activation() {
|
||||
async fn test_trigger_enable() {
|
||||
let server = TestServer::spawn().await;
|
||||
let server_addr = server.client_addr();
|
||||
let db_name = "foo";
|
||||
|
@ -668,9 +678,9 @@ def process_rows(iterator, output):
|
|||
trigger_name,
|
||||
]);
|
||||
|
||||
// Test activation
|
||||
// Test enabling
|
||||
let result = run_with_confirmation(&[
|
||||
"activate",
|
||||
"enable",
|
||||
"trigger",
|
||||
"--database",
|
||||
db_name,
|
||||
|
@ -678,12 +688,12 @@ def process_rows(iterator, output):
|
|||
&server_addr,
|
||||
trigger_name,
|
||||
]);
|
||||
debug!(result = ?result, "activate trigger");
|
||||
assert_contains!(&result, "Trigger test_trigger activated successfully");
|
||||
debug!(result = ?result, "enable trigger");
|
||||
assert_contains!(&result, "Trigger test_trigger enabled successfully");
|
||||
|
||||
// Test deactivation
|
||||
// Test disable
|
||||
let result = run_with_confirmation(&[
|
||||
"deactivate",
|
||||
"disable",
|
||||
"trigger",
|
||||
"--database",
|
||||
db_name,
|
||||
|
@ -691,19 +701,19 @@ def process_rows(iterator, output):
|
|||
&server_addr,
|
||||
trigger_name,
|
||||
]);
|
||||
debug!(result = ?result, "deactivate trigger");
|
||||
assert_contains!(&result, "Trigger test_trigger deactivated successfully");
|
||||
debug!(result = ?result, "disable trigger");
|
||||
assert_contains!(&result, "Trigger test_trigger disabled successfully");
|
||||
}
|
||||
|
||||
#[test_log::test(tokio::test)]
|
||||
async fn test_delete_active_trigger() {
|
||||
async fn test_delete_enabled_trigger() {
|
||||
let server = TestServer::spawn().await;
|
||||
let server_addr = server.client_addr();
|
||||
let db_name = "foo";
|
||||
let plugin_name = "test_plugin";
|
||||
let trigger_name = "test_trigger";
|
||||
|
||||
// Setup: create database, plugin, and active trigger
|
||||
// Setup: create database, plugin, and enable trigger
|
||||
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
|
||||
|
||||
let plugin_file = create_plugin_file(
|
||||
|
@ -739,17 +749,7 @@ def process_rows(iterator, output):
|
|||
trigger_name,
|
||||
]);
|
||||
|
||||
run_with_confirmation(&[
|
||||
"activate",
|
||||
"trigger",
|
||||
"--database",
|
||||
db_name,
|
||||
"--host",
|
||||
&server_addr,
|
||||
trigger_name,
|
||||
]);
|
||||
|
||||
// Try to delete active trigger without force flag
|
||||
// Try to delete the enabled trigger without force flag
|
||||
let result = run_with_confirmation_and_err(&[
|
||||
"delete",
|
||||
"trigger",
|
||||
|
@ -759,7 +759,7 @@ def process_rows(iterator, output):
|
|||
&server_addr,
|
||||
trigger_name,
|
||||
]);
|
||||
debug!(result = ?result, "delete active trigger without force");
|
||||
debug!(result = ?result, "delete enabled trigger without force");
|
||||
assert_contains!(&result, "command failed");
|
||||
|
||||
// Delete active trigger with force flag
|
||||
|
@ -773,7 +773,7 @@ def process_rows(iterator, output):
|
|||
trigger_name,
|
||||
"--force",
|
||||
]);
|
||||
debug!(result = ?result, "delete active trigger with force");
|
||||
debug!(result = ?result, "delete enabled trigger with force");
|
||||
assert_contains!(&result, "Trigger test_trigger deleted successfully");
|
||||
}
|
||||
|
||||
|
|
|
@ -645,13 +645,13 @@ impl Client {
|
|||
}
|
||||
}
|
||||
|
||||
/// Make a request to `POST /api/v3/configure/processing_engine_trigger/activate`
|
||||
pub async fn api_v3_configure_processing_engine_trigger_activate(
|
||||
/// Make a request to `POST /api/v3/configure/processing_engine_trigger/enable`
|
||||
pub async fn api_v3_configure_processing_engine_trigger_enable(
|
||||
&self,
|
||||
db: impl Into<String> + Send,
|
||||
trigger_name: impl Into<String> + Send,
|
||||
) -> Result<()> {
|
||||
let api_path = "/api/v3/configure/processing_engine_trigger/activate";
|
||||
let api_path = "/api/v3/configure/processing_engine_trigger/enable";
|
||||
let url = self.base_url.join(api_path)?;
|
||||
|
||||
let mut req = self
|
||||
|
@ -676,13 +676,13 @@ impl Client {
|
|||
}
|
||||
}
|
||||
|
||||
/// Make a request to `POST /api/v3/configure/processing_engine_trigger/deactivate`
|
||||
pub async fn api_v3_configure_processing_engine_trigger_deactivate(
|
||||
/// Make a request to `POST /api/v3/configure/processing_engine_trigger/disable`
|
||||
pub async fn api_v3_configure_processing_engine_trigger_disable(
|
||||
&self,
|
||||
db: impl Into<String> + Send,
|
||||
trigger_name: impl Into<String> + Send,
|
||||
) -> Result<()> {
|
||||
let api_path = "/api/v3/configure/processing_engine_trigger/deactivate";
|
||||
let api_path = "/api/v3/configure/processing_engine_trigger/disable";
|
||||
let url = self.base_url.join(api_path)?;
|
||||
|
||||
let mut req = self
|
||||
|
|
|
@ -235,14 +235,14 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
|
|||
// Do this first to avoid a dangling running plugin.
|
||||
// Potential edge-case of a plugin being stopped but not deleted,
|
||||
// but should be okay given desire to force delete.
|
||||
let needs_deactivate = force
|
||||
let needs_disable = force
|
||||
&& db_schema
|
||||
.processing_engine_triggers
|
||||
.get(trigger_name)
|
||||
.is_some_and(|trigger| !trigger.disabled);
|
||||
|
||||
if needs_deactivate {
|
||||
self.deactivate_trigger(db, trigger_name).await?;
|
||||
if needs_disable {
|
||||
self.disable_trigger(db, trigger_name).await?;
|
||||
}
|
||||
|
||||
if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? {
|
||||
|
@ -293,7 +293,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn deactivate_trigger(
|
||||
async fn disable_trigger(
|
||||
&self,
|
||||
db_name: &str,
|
||||
trigger_name: &str,
|
||||
|
@ -336,7 +336,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn activate_trigger(
|
||||
async fn enable_trigger(
|
||||
&self,
|
||||
write_buffer: Arc<dyn WriteBuffer>,
|
||||
query_executor: Arc<dyn QueryExecutor>,
|
||||
|
@ -705,8 +705,8 @@ mod tests {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
// Deactivate the trigger
|
||||
let result = pem.deactivate_trigger("foo", "test_trigger").await;
|
||||
// Disable the trigger
|
||||
let result = pem.disable_trigger("foo", "test_trigger").await;
|
||||
assert!(result.is_ok());
|
||||
|
||||
// Verify trigger is disabled in schema
|
||||
|
@ -717,9 +717,9 @@ mod tests {
|
|||
.unwrap();
|
||||
assert!(trigger.disabled);
|
||||
|
||||
// Activate the trigger
|
||||
// Enable the trigger
|
||||
let result = pem
|
||||
.activate_trigger(
|
||||
.enable_trigger(
|
||||
Arc::clone(&write_buffer),
|
||||
Arc::clone(&pem._query_executor),
|
||||
"foo",
|
||||
|
@ -797,7 +797,7 @@ mod tests {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_activate_nonexistent_trigger() -> influxdb3_write::write_buffer::Result<()> {
|
||||
async fn test_enable_nonexistent_trigger() -> influxdb3_write::write_buffer::Result<()> {
|
||||
let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap();
|
||||
let test_store = Arc::new(InMemory::new());
|
||||
let wal_config = WalConfig {
|
||||
|
@ -822,7 +822,7 @@ mod tests {
|
|||
.await?;
|
||||
|
||||
let result = pem
|
||||
.activate_trigger(
|
||||
.enable_trigger(
|
||||
Arc::clone(&write_buffer),
|
||||
Arc::clone(&pem._query_executor),
|
||||
"foo",
|
||||
|
|
|
@ -65,13 +65,13 @@ pub trait ProcessingEngineManager: Debug + Send + Sync + 'static {
|
|||
trigger_name: &str,
|
||||
) -> Result<(), ProcessingEngineError>;
|
||||
|
||||
async fn deactivate_trigger(
|
||||
async fn disable_trigger(
|
||||
&self,
|
||||
db_name: &str,
|
||||
trigger_name: &str,
|
||||
) -> Result<(), ProcessingEngineError>;
|
||||
|
||||
async fn activate_trigger(
|
||||
async fn enable_trigger(
|
||||
&self,
|
||||
write_buffer: Arc<dyn WriteBuffer>,
|
||||
query_executor: Arc<dyn QueryExecutor>,
|
||||
|
|
|
@ -1091,27 +1091,24 @@ where
|
|||
.body(Body::empty())?)
|
||||
}
|
||||
|
||||
async fn deactivate_processing_engine_trigger(
|
||||
async fn disable_processing_engine_trigger(
|
||||
&self,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>> {
|
||||
let query = req.uri().query().unwrap_or("");
|
||||
let delete_req = serde_urlencoded::from_str::<ProcessingEngineTriggerIdentifier>(query)?;
|
||||
self.processing_engine
|
||||
.deactivate_trigger(delete_req.db.as_str(), delete_req.trigger_name.as_str())
|
||||
.disable_trigger(delete_req.db.as_str(), delete_req.trigger_name.as_str())
|
||||
.await?;
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.body(Body::empty())?)
|
||||
}
|
||||
async fn activate_processing_engine_trigger(
|
||||
&self,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>> {
|
||||
async fn enable_processing_engine_trigger(&self, req: Request<Body>) -> Result<Response<Body>> {
|
||||
let query = req.uri().query().unwrap_or("");
|
||||
let delete_req = serde_urlencoded::from_str::<ProcessingEngineTriggerIdentifier>(query)?;
|
||||
self.processing_engine
|
||||
.activate_trigger(
|
||||
.enable_trigger(
|
||||
Arc::clone(&self.write_buffer),
|
||||
Arc::clone(&self.query_executor),
|
||||
delete_req.db.as_str(),
|
||||
|
@ -1753,11 +1750,11 @@ pub(crate) async fn route_request<T: TimeProvider>(
|
|||
(Method::DELETE, "/api/v3/configure/processing_engine_plugin") => {
|
||||
http_server.delete_processing_engine_plugin(req).await
|
||||
}
|
||||
(Method::POST, "/api/v3/configure/processing_engine_trigger/deactivate") => {
|
||||
http_server.deactivate_processing_engine_trigger(req).await
|
||||
(Method::POST, "/api/v3/configure/processing_engine_trigger/disable") => {
|
||||
http_server.disable_processing_engine_trigger(req).await
|
||||
}
|
||||
(Method::POST, "/api/v3/configure/processing_engine_trigger/activate") => {
|
||||
http_server.activate_processing_engine_trigger(req).await
|
||||
(Method::POST, "/api/v3/configure/processing_engine_trigger/enable") => {
|
||||
http_server.enable_processing_engine_trigger(req).await
|
||||
}
|
||||
(Method::POST, "/api/v3/configure/processing_engine_trigger") => {
|
||||
http_server.configure_processing_engine_trigger(req).await
|
||||
|
|
Loading…
Reference in New Issue