feat: Refactor plugins to only require creating a trigger (#25914)

This refactors plugins and triggers so that plugins no longer need to be "created". Since plugins already exist either in the configured local plugin directory or in the GitHub repo, a user now only needs to create a trigger that references the plugin filename.

Closes #25876
pull/25918/head
Paul Dix 2025-01-27 11:26:46 -05:00 committed by GitHub
parent 43e186d761
commit d49276a7fb
22 changed files with 89 additions and 912 deletions
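
For orientation, a minimal sketch of the workflow after this change (binary name, host, database, and file names are placeholders; the flag spellings come from the CLI diff below): instead of first creating a plugin and then a trigger, a single trigger is created that points at the plugin file by name, either in the server's configured plugin directory or, with the `gh:` prefix, in the influxdb3_plugins repo.

# my_plugin.py lives in the server's configured plugin directory
influxdb3 create trigger \
  --database mydb \
  --host http://localhost:8181 \
  --plugin-filename my_plugin.py \
  --trigger-spec all_tables \
  my_trigger

# with the gh: prefix the filename is resolved against the influxdb3_plugins repo instead
# (the path inside the repo here is illustrative)
influxdb3 create trigger \
  --database mydb \
  --host http://localhost:8181 \
  --plugin-filename gh:examples/schedule/my_plugin.py \
  --trigger-spec every:10m \
  my_scheduled_trigger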

View File

@ -50,15 +50,6 @@ impl Config {
},
..
})
| SubCommand::Plugin(PluginConfig {
influxdb3_config:
InfluxDb3Config {
host_url,
auth_token,
..
},
..
})
| SubCommand::Table(TableConfig {
influxdb3_config:
InfluxDb3Config {
@ -100,13 +91,11 @@ pub enum SubCommand {
/// Create a new distinct value cache
#[clap(name = "distinct_cache")]
DistinctCache(DistinctCacheConfig),
/// Create a new processing engine plugin
Plugin(PluginConfig),
/// Create a new table in a database
Table(TableConfig),
/// Create a new auth token
Token,
/// Create a new trigger for the processing engine
/// Create a new trigger for the processing engine that executes a plugin on either WAL rows, scheduled tasks, or requests to the server at `/api/v3/engine/<path>`
Trigger(TriggerConfig),
}
@ -201,20 +190,6 @@ pub struct DistinctCacheConfig {
cache_name: Option<String>,
}
#[derive(Debug, clap::Parser)]
pub struct PluginConfig {
#[clap(flatten)]
influxdb3_config: InfluxDb3Config,
/// Python file name of the file on the server's plugin-dir containing the plugin code
#[clap(long = "filename")]
file_name: String,
/// Type of trigger the plugin processes. Options: wal_rows, scheduled, request
#[clap(long = "plugin-type")]
plugin_type: String,
/// Name of the plugin to create
plugin_name: String,
}
#[derive(Debug, clap::Args)]
pub struct TableConfig {
#[clap(long = "tags", required = true, num_args=0..)]
@ -238,14 +213,15 @@ pub struct TableConfig {
pub struct TriggerConfig {
#[clap(flatten)]
influxdb3_config: InfluxDb3Config,
/// Plugin to execute when trigger fires
#[clap(long = "plugin")]
plugin_name: String,
/// Python file name of the file on the server's plugin-dir containing the plugin code. Or
/// on the [influxdb3_plugins](https://github.com/influxdata/influxdb3_plugins) repo if `gh:` is specified as
/// the prefix.
#[clap(long = "plugin-filename")]
plugin_filename: String,
/// When the trigger should fire
#[clap(long = "trigger-spec",
value_parser = TriggerSpecificationDefinition::from_string_rep,
help = "Trigger specification format: 'table:<TABLE_NAME>' or 'all_tables'")]
help = "The plugin file must be for the given trigger type of wal, schedule, or request. Trigger specification format:\nFor wal_rows use: 'table:<TABLE_NAME>' or 'all_tables'\nFor scheduled use: 'cron:<CRON_EXPRESSION>' or 'every:<duration e.g. 10m>'\nFor request use: 'path:<PATH>' e.g. path:foo will be at /api/v3/engine/foo")]
trigger_specification: TriggerSpecificationDefinition,
/// Comma separated list of key/value pairs to use as trigger arguments. Example: key1=val1,key2=val2
#[clap(long = "trigger-arguments")]
@ -334,22 +310,6 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
None => println!("a cache already exists for the provided parameters"),
}
}
SubCommand::Plugin(PluginConfig {
influxdb3_config: InfluxDb3Config { database_name, .. },
plugin_name,
file_name,
plugin_type,
}) => {
client
.api_v3_configure_processing_engine_plugin_create(
database_name,
&plugin_name,
file_name,
plugin_type,
)
.await?;
println!("Plugin {} created successfully", plugin_name);
}
SubCommand::Table(TableConfig {
influxdb3_config: InfluxDb3Config { database_name, .. },
table_name,
@ -387,7 +347,7 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
SubCommand::Trigger(TriggerConfig {
influxdb3_config: InfluxDb3Config { database_name, .. },
trigger_name,
plugin_name,
plugin_filename,
trigger_specification,
trigger_arguments,
disabled,
@ -402,7 +362,7 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
.api_v3_configure_processing_engine_trigger_create(
database_name,
&trigger_name,
plugin_name,
plugin_filename,
trigger_specification.string_rep(),
trigger_arguments,
disabled,
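
A quick reference distilled from the new --trigger-spec help text above (a sketch only; the table name, cron expression, interval, and path values are placeholders):

# wal_rows plugins
--trigger-spec "table:<TABLE_NAME>"
--trigger-spec all_tables
# schedule plugins
--trigger-spec "cron:<CRON_EXPRESSION>"
--trigger-spec every:10m
# request plugins, served under /api/v3/engine/<PATH>
--trigger-spec "path:<PATH>"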

View File

@ -38,15 +38,6 @@ impl Config {
},
..
})
| SubCommand::Plugin(PluginConfig {
influxdb3_config:
InfluxDb3Config {
host_url,
auth_token,
..
},
..
})
| SubCommand::Table(TableConfig {
influxdb3_config:
InfluxDb3Config {
@ -85,8 +76,6 @@ pub enum SubCommand {
/// Delete a distinct value cache
#[clap(name = "distinct_cache")]
DistinctCache(DistinctCacheConfig),
/// Delete an existing processing engine plugin
Plugin(PluginConfig),
/// Delete a table in a database
Table(TableConfig),
/// Delete a trigger
@ -141,16 +130,6 @@ pub struct DistinctCacheConfig {
cache_name: String,
}
#[derive(Debug, clap::Parser)]
pub struct PluginConfig {
#[clap(flatten)]
influxdb3_config: InfluxDb3Config,
/// Name of the plugin to delete
#[clap(required = true)]
plugin_name: String,
}
#[derive(Debug, clap::Args)]
pub struct TableConfig {
#[clap(flatten)]
@ -214,15 +193,6 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
println!("distinct cache deleted successfully");
}
SubCommand::Plugin(PluginConfig {
influxdb3_config: InfluxDb3Config { database_name, .. },
plugin_name,
}) => {
client
.api_v3_configure_processing_engine_plugin_delete(database_name, &plugin_name)
.await?;
println!("Plugin {} deleted successfully", plugin_name);
}
SubCommand::Table(TableConfig {
influxdb3_config: InfluxDb3Config { database_name, .. },
table_name,

View File

@ -600,97 +600,6 @@ async fn test_create_delete_distinct_cache() {
assert_contains!(&result, "[404 Not Found]: cache not found");
}
#[test_log::test(tokio::test)]
async fn test_create_plugin() {
let plugin_file = create_plugin_file(
r#"
def process_writes(influxdb3_local, table_batches, args=None):
influxdb3_local.info("done")
"#,
);
let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap();
let plugin_filename = plugin_file.path().file_name().unwrap().to_str().unwrap();
let server = TestServer::configure()
.with_plugin_dir(plugin_dir)
.spawn()
.await;
let server_addr = server.client_addr();
let db_name = "foo";
// Create database first
let result = run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
assert_contains!(&result, "Database \"foo\" created successfully");
// Create plugin
let plugin_name = "test_plugin";
let result = run_with_confirmation(&[
"create",
"plugin",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-type",
"wal_rows",
"--filename",
plugin_filename,
plugin_name,
]);
debug!(result = ?result, "create plugin");
assert_contains!(&result, "Plugin test_plugin created successfully");
}
#[test_log::test(tokio::test)]
async fn test_delete_plugin() {
let plugin_file = create_plugin_file(
r#"
def process_writes(influxdb3_local, table_batches, args=None):
influxdb3_local.info("done")
"#,
);
let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap();
let plugin_filename = plugin_file.path().file_name().unwrap().to_str().unwrap();
let server = TestServer::configure()
.with_plugin_dir(plugin_dir)
.spawn()
.await;
let server_addr = server.client_addr();
let db_name = "foo";
// Setup: create database and plugin
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
let plugin_name = "test_plugin";
run_with_confirmation(&[
"create",
"plugin",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-type",
"wal_rows",
"--filename",
plugin_filename,
plugin_name,
]);
// Delete plugin
let result = run_with_confirmation(&[
"delete",
"plugin",
"--database",
db_name,
"--host",
&server_addr,
plugin_name,
]);
debug!(result = ?result, "delete plugin");
assert_contains!(&result, "Plugin test_plugin deleted successfully");
}
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_create_trigger_and_run() {
@ -711,22 +620,6 @@ async fn test_create_trigger_and_run() {
// Setup: create database and plugin
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
let plugin_name = "test_plugin";
run_with_confirmation(&[
"create",
"plugin",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-type",
"wal_rows",
"--filename",
plugin_filename,
plugin_name,
]);
// creating the trigger should enable it
let result = run_with_confirmation(&[
"create",
@ -735,8 +628,8 @@ async fn test_create_trigger_and_run() {
db_name,
"--host",
&server_addr,
"--plugin",
plugin_name,
"--plugin-filename",
plugin_filename,
"--trigger-spec",
"all_tables",
"--trigger-arguments",
@ -826,22 +719,6 @@ async fn test_triggers_are_started() {
// Setup: create database and plugin
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
let plugin_name = "test_plugin";
run_with_confirmation(&[
"create",
"plugin",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-type",
"wal_rows",
"--filename",
plugin_filename,
plugin_name,
]);
// creating the trigger should enable it
let result = run_with_confirmation(&[
"create",
@ -850,8 +727,8 @@ async fn test_triggers_are_started() {
db_name,
"--host",
&server_addr,
"--plugin",
plugin_name,
"--plugin-filename",
plugin_filename,
"--trigger-spec",
"all_tables",
"--trigger-arguments",
@ -947,21 +824,6 @@ def process_writes(influxdb3_local, table_batches, args=None):
// Setup: create database, plugin, and trigger
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
let plugin_name = "test_plugin";
run_with_confirmation(&[
"create",
"plugin",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-type",
"wal_rows",
"--filename",
plugin_filename,
plugin_name,
]);
run_with_confirmation(&[
"create",
"trigger",
@ -969,8 +831,8 @@ def process_writes(influxdb3_local, table_batches, args=None):
db_name,
"--host",
&server_addr,
"--plugin",
plugin_name,
"--plugin-filename",
plugin_filename,
"--trigger-spec",
"all_tables",
trigger_name,
@ -1025,21 +887,6 @@ def process_writes(influxdb3_local, table_batches, args=None):
// Setup: create database, plugin, and enable trigger
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
let plugin_name = "test_plugin";
run_with_confirmation(&[
"create",
"plugin",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-type",
"wal_rows",
"--filename",
plugin_filename,
plugin_name,
]);
run_with_confirmation(&[
"create",
"trigger",
@ -1047,8 +894,8 @@ def process_writes(influxdb3_local, table_batches, args=None):
db_name,
"--host",
&server_addr,
"--plugin",
plugin_name,
"--plugin-filename",
plugin_filename,
"--trigger-spec",
"all_tables",
trigger_name,
@ -1119,21 +966,6 @@ def process_writes(influxdb3_local, table_batches, args=None):
table_name,
]);
let plugin_name = "test_plugin";
run_with_confirmation(&[
"create",
"plugin",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-type",
"wal_rows",
"--filename",
plugin_filename,
plugin_name,
]);
// Create table-specific trigger
let result = run_with_confirmation(&[
"create",
@ -1142,8 +974,8 @@ def process_writes(influxdb3_local, table_batches, args=None):
db_name,
"--host",
&server_addr,
"--plugin",
plugin_name,
"--plugin-filename",
plugin_filename,
"--trigger-spec",
&format!("table:{}", table_name),
trigger_name,
@ -1614,22 +1446,6 @@ def process_request(influxdb3_local, query_parameters, request_headers, request_
// Setup: create database and plugin
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
let plugin_name = "test_plugin";
run_with_confirmation(&[
"create",
"plugin",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-type",
"request",
"--filename",
plugin_filename,
plugin_name,
]);
let trigger_path = "foo";
// creating the trigger should enable it
let result = run_with_confirmation(&[
@ -1639,8 +1455,8 @@ def process_request(influxdb3_local, query_parameters, request_headers, request_
db_name,
"--host",
&server_addr,
"--plugin",
plugin_name,
"--plugin-filename",
plugin_filename,
"--trigger-spec",
"request:foo",
"--trigger-arguments",

View File

@ -120,7 +120,6 @@ async fn flight() -> Result<(), influxdb3_client::Error> {
"| public | system | distinct_caches | BASE TABLE |",
"| public | system | last_caches | BASE TABLE |",
"| public | system | parquet_files | BASE TABLE |",
"| public | system | processing_engine_plugins | BASE TABLE |",
"| public | system | processing_engine_triggers | BASE TABLE |",
"| public | system | queries | BASE TABLE |",
"+--------------+--------------------+----------------------------+------------+",

View File

@ -1267,7 +1267,6 @@ mod tests {
map.insert(TableId::from(1), "test_table_2".into());
map
},
processing_engine_plugins: Default::default(),
processing_engine_triggers: Default::default(),
deleted: false,
};

View File

@ -1,18 +1,18 @@
//! Implementation of the Catalog that sits entirely in memory.
use crate::catalog::Error::{
CatalogUpdatedElsewhere, ProcessingEngineCallExists, ProcessingEngineTriggerExists,
ProcessingEngineTriggerRunning, TableNotFound,
CatalogUpdatedElsewhere, ProcessingEngineTriggerExists, ProcessingEngineTriggerRunning,
TableNotFound,
};
use bimap::{BiHashMap, Overwritten};
use hashbrown::HashMap;
use indexmap::IndexMap;
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb3_wal::{
CatalogBatch, CatalogOp, DeleteDatabaseDefinition, DeletePluginDefinition,
DeleteTableDefinition, DeleteTriggerDefinition, DistinctCacheDefinition, DistinctCacheDelete,
FieldAdditions, FieldDefinition, LastCacheDefinition, LastCacheDelete, OrderedCatalogBatch,
PluginDefinition, TriggerDefinition, TriggerIdentifier,
CatalogBatch, CatalogOp, DeleteDatabaseDefinition, DeleteTableDefinition,
DeleteTriggerDefinition, DistinctCacheDefinition, DistinctCacheDelete, FieldAdditions,
FieldDefinition, LastCacheDefinition, LastCacheDelete, OrderedCatalogBatch, TriggerDefinition,
TriggerIdentifier,
};
use influxdb_line_protocol::FieldValue;
use iox_time::Time;
@ -81,15 +81,6 @@ pub enum Error {
existing: String,
},
#[error(
"Cannot overwrite Processing Engine Call {} in Database {}",
call_name,
database_name
)]
ProcessingEngineCallExists {
database_name: String,
call_name: String,
},
#[error(
"Cannot overwrite Processing Engine Trigger {} in Database {}",
trigger_name,
@ -541,8 +532,6 @@ pub struct DatabaseSchema {
/// The database is a map of tables
pub tables: SerdeVecMap<TableId, Arc<TableDefinition>>,
pub table_map: BiHashMap<TableId, Arc<str>>,
pub processing_engine_plugins: HashMap<String, PluginDefinition>,
// TODO: care about performance of triggers
pub processing_engine_triggers: HashMap<String, TriggerDefinition>,
pub deleted: bool,
}
@ -554,7 +543,6 @@ impl DatabaseSchema {
name,
tables: Default::default(),
table_map: BiHashMap::new(),
processing_engine_plugins: HashMap::new(),
processing_engine_triggers: HashMap::new(),
deleted: false,
}
@ -732,8 +720,6 @@ impl UpdateDatabaseSchema for CatalogOp {
}
CatalogOp::DeleteDatabase(delete_database) => delete_database.update_schema(schema),
CatalogOp::DeleteTable(delete_table) => delete_table.update_schema(schema),
CatalogOp::DeletePlugin(delete_plugin) => delete_plugin.update_schema(schema),
CatalogOp::CreatePlugin(create_plugin) => create_plugin.update_schema(schema),
CatalogOp::CreateTrigger(create_trigger) => create_trigger.update_schema(schema),
CatalogOp::DeleteTrigger(delete_trigger) => delete_trigger.update_schema(schema),
CatalogOp::EnableTrigger(trigger_identifier) => {
@ -808,54 +794,6 @@ impl UpdateDatabaseSchema for DeleteTableDefinition {
}
}
impl UpdateDatabaseSchema for DeletePluginDefinition {
fn update_schema<'a>(
&self,
mut schema: Cow<'a, DatabaseSchema>,
) -> Result<Cow<'a, DatabaseSchema>> {
// check that there aren't any triggers with this name.
for (trigger_name, trigger) in &schema.processing_engine_triggers {
if trigger.plugin_name == self.plugin_name {
return Err(Error::ProcessingEnginePluginInUse {
database_name: schema.name.to_string(),
plugin_name: self.plugin_name.to_string(),
trigger_name: trigger_name.to_string(),
});
}
}
schema
.to_mut()
.processing_engine_plugins
.remove(&self.plugin_name);
Ok(schema)
}
}
impl UpdateDatabaseSchema for PluginDefinition {
fn update_schema<'a>(
&self,
mut schema: Cow<'a, DatabaseSchema>,
) -> Result<Cow<'a, DatabaseSchema>> {
match schema.processing_engine_plugins.get(&self.plugin_name) {
Some(current) if self.eq(current) => {}
Some(_) => {
return Err(ProcessingEngineCallExists {
database_name: schema.name.to_string(),
call_name: self.plugin_name.to_string(),
})
}
None => {
schema
.to_mut()
.processing_engine_plugins
.insert(self.plugin_name.to_string(), self.clone());
}
}
Ok(schema)
}
}
struct EnableTrigger(TriggerIdentifier);
struct DisableTrigger(TriggerIdentifier);
@ -1444,7 +1382,6 @@ mod tests {
map.insert(TableId::from(2), "test_table_2".into());
map
},
processing_engine_plugins: Default::default(),
processing_engine_triggers: Default::default(),
deleted: false,
};
@ -1655,7 +1592,6 @@ mod tests {
name: "test".into(),
tables: SerdeVecMap::new(),
table_map: BiHashMap::new(),
processing_engine_plugins: Default::default(),
processing_engine_triggers: Default::default(),
deleted: false,
};
@ -1714,7 +1650,6 @@ mod tests {
map.insert(TableId::from(1), "test_table_1".into());
map
},
processing_engine_plugins: Default::default(),
processing_engine_triggers: Default::default(),
deleted: false,
};
@ -1771,7 +1706,6 @@ mod tests {
map.insert(TableId::from(0), "test".into());
map
},
processing_engine_plugins: Default::default(),
processing_engine_triggers: Default::default(),
deleted: false,
};
@ -1872,7 +1806,6 @@ mod tests {
name: "test".into(),
tables: SerdeVecMap::new(),
table_map: BiHashMap::new(),
processing_engine_plugins: Default::default(),
processing_engine_triggers: Default::default(),
deleted: false,
};

View File

@ -8,9 +8,7 @@ use influxdb3_id::ColumnId;
use influxdb3_id::DbId;
use influxdb3_id::SerdeVecMap;
use influxdb3_id::TableId;
use influxdb3_wal::{
LastCacheDefinition, LastCacheValueColumnsDef, PluginDefinition, PluginType, TriggerDefinition,
};
use influxdb3_wal::{LastCacheDefinition, LastCacheValueColumnsDef, PluginType, TriggerDefinition};
use schema::InfluxColumnType;
use schema::InfluxFieldType;
use schema::TIME_DATA_TIMEZONE;
@ -42,8 +40,6 @@ struct DatabaseSnapshot {
name: Arc<str>,
tables: SerdeVecMap<TableId, TableSnapshot>,
#[serde(default)]
processing_engine_plugins: SerdeVecMap<String, ProcessingEnginePluginSnapshot>,
#[serde(default)]
processing_engine_triggers: SerdeVecMap<String, ProcessingEngineTriggerSnapshot>,
deleted: bool,
}
@ -58,11 +54,6 @@ impl From<&DatabaseSchema> for DatabaseSnapshot {
.iter()
.map(|(table_id, table_def)| (*table_id, table_def.as_ref().into()))
.collect(),
processing_engine_plugins: db
.processing_engine_plugins
.iter()
.map(|(name, call)| (name.clone(), call.into()))
.collect(),
processing_engine_triggers: db
.processing_engine_triggers
.iter()
@ -84,26 +75,15 @@ impl From<DatabaseSnapshot> for DatabaseSchema {
(id, Arc::new(table.into()))
})
.collect();
let processing_engine_plugins: HashMap<_, _> = snap
.processing_engine_plugins
.into_iter()
.map(|(name, call)| (name, call.into()))
.collect();
let processing_engine_triggers = snap
.processing_engine_triggers
.into_iter()
.map(|(name, trigger)| {
// TODO: Decide whether to handle errors
let plugin: PluginDefinition = processing_engine_plugins
.get(&trigger.plugin_name)
.cloned()
.expect("should have plugin");
(
name,
TriggerDefinition {
trigger_name: trigger.trigger_name,
plugin_name: plugin.plugin_name.to_string(),
plugin_file_name: plugin.file_name.clone(),
plugin_filename: trigger.plugin_filename,
trigger: serde_json::from_str(&trigger.trigger_specification).unwrap(),
trigger_arguments: trigger.trigger_arguments,
disabled: trigger.disabled,
@ -118,7 +98,6 @@ impl From<DatabaseSnapshot> for DatabaseSchema {
name: snap.name,
tables,
table_map,
processing_engine_plugins,
processing_engine_triggers,
deleted: snap.deleted,
}
@ -171,7 +150,7 @@ struct ProcessingEnginePluginSnapshot {
#[derive(Debug, Serialize, Deserialize)]
struct ProcessingEngineTriggerSnapshot {
pub trigger_name: String,
pub plugin_name: String,
pub plugin_filename: String,
pub database_name: String,
pub trigger_specification: String,
pub trigger_arguments: Option<HashMap<String, String>>,
@ -410,31 +389,11 @@ impl From<TableSnapshot> for TableDefinition {
}
}
impl From<&PluginDefinition> for ProcessingEnginePluginSnapshot {
fn from(plugin: &PluginDefinition) -> Self {
Self {
plugin_name: plugin.plugin_name.to_string(),
file_name: plugin.file_name.to_string(),
plugin_type: plugin.plugin_type,
}
}
}
impl From<ProcessingEnginePluginSnapshot> for PluginDefinition {
fn from(plugin: ProcessingEnginePluginSnapshot) -> Self {
Self {
plugin_name: plugin.plugin_name.to_string(),
file_name: plugin.file_name.to_string(),
plugin_type: plugin.plugin_type,
}
}
}
impl From<&TriggerDefinition> for ProcessingEngineTriggerSnapshot {
fn from(trigger: &TriggerDefinition) -> Self {
ProcessingEngineTriggerSnapshot {
trigger_name: trigger.trigger_name.to_string(),
plugin_name: trigger.plugin_name.to_string(),
plugin_filename: trigger.plugin_filename.to_string(),
database_name: trigger.database_name.to_string(),
trigger_specification: serde_json::to_string(&trigger.trigger)
.expect("should be able to serialize trigger specification"),

View File

@ -262,7 +262,6 @@ expression: catalog
}
]
],
"processing_engine_plugins": [],
"processing_engine_triggers": [],
"deleted": false
}

View File

@ -113,7 +113,6 @@ expression: catalog
}
]
],
"processing_engine_plugins": [],
"processing_engine_triggers": [],
"deleted": false
}

View File

@ -97,7 +97,6 @@ expression: catalog
}
]
],
"processing_engine_plugins": [],
"processing_engine_triggers": [],
"deleted": false
}

View File

@ -564,7 +564,7 @@ impl Client {
&self,
db: impl Into<String> + Send,
trigger_name: impl Into<String> + Send,
plugin_name: impl Into<String> + Send,
plugin_filename: impl Into<String> + Send,
trigger_spec: impl Into<String> + Send,
trigger_arguments: Option<HashMap<String, String>>,
disabled: bool,
@ -577,7 +577,7 @@ impl Client {
struct Req {
db: String,
trigger_name: String,
plugin_name: String,
plugin_filename: String,
trigger_specification: String,
trigger_arguments: Option<HashMap<String, String>>,
disabled: bool,
@ -585,7 +585,7 @@ impl Client {
let mut req = self.http_client.post(url).json(&Req {
db: db.into(),
trigger_name: trigger_name.into(),
plugin_name: plugin_name.into(),
plugin_filename: plugin_filename.into(),
trigger_specification: trigger_spec.into(),
trigger_arguments,
disabled,
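
For reference, the HTTP request this client method issues maps roughly to the following, assuming the POST /api/v3/configure/processing_engine_trigger endpoint handled later in this diff (host and values are placeholders; authentication and the optional trigger_arguments field are omitted):

curl -X POST "http://localhost:8181/api/v3/configure/processing_engine_trigger" \
  -H "Content-Type: application/json" \
  -d '{
        "db": "mydb",
        "trigger_name": "my_trigger",
        "plugin_filename": "my_plugin.py",
        "trigger_specification": "all_tables",
        "disabled": false
      }'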

View File

@ -6,7 +6,6 @@ use anyhow::Context;
use bytes::Bytes;
use hashbrown::HashMap;
use hyper::{Body, Response};
use influxdb3_catalog::catalog;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_catalog::catalog::Error::ProcessingEngineTriggerNotFound;
use influxdb3_client::plugin_development::{
@ -14,10 +13,11 @@ use influxdb3_client::plugin_development::{
WalPluginTestResponse,
};
use influxdb3_internal_api::query_executor::QueryExecutor;
#[cfg(feature = "system-py")]
use influxdb3_wal::PluginType;
use influxdb3_wal::{
CatalogBatch, CatalogOp, DeletePluginDefinition, DeleteTriggerDefinition, PluginDefinition,
PluginType, SnapshotDetails, TriggerDefinition, TriggerIdentifier,
TriggerSpecificationDefinition, Wal, WalContents, WalFileNotifier, WalOp,
CatalogBatch, CatalogOp, DeleteTriggerDefinition, SnapshotDetails, TriggerDefinition,
TriggerIdentifier, TriggerSpecificationDefinition, Wal, WalContents, WalFileNotifier, WalOp,
};
use influxdb3_write::WriteBuffer;
use iox_time::TimeProvider;
@ -314,81 +314,11 @@ impl LocalPlugin {
#[async_trait::async_trait]
impl ProcessingEngineManager for ProcessingEngineManagerImpl {
async fn insert_plugin(
&self,
db: &str,
plugin_name: String,
file_name: String,
plugin_type: PluginType,
) -> Result<(), ProcessingEngineError> {
// first verify that we can read the file
match &self.plugin_dir {
Some(plugin_dir) => {
let path = plugin_dir.join(&file_name);
if !path.exists() {
return Err(ProcessingEngineError::PluginNotFound(file_name));
}
}
None => return Err(ProcessingEngineError::PluginDirNotSet),
}
let (db_id, db_schema) = self
.catalog
.db_id_and_schema(db)
.ok_or_else(|| ProcessingEngineError::DatabaseNotFound(db.to_string()))?;
let catalog_op = CatalogOp::CreatePlugin(PluginDefinition {
plugin_name,
file_name,
plugin_type,
});
let creation_time = self.time_provider.now();
let catalog_batch = CatalogBatch {
time_ns: creation_time.timestamp_nanos(),
database_id: db_id,
database_name: Arc::clone(&db_schema.name),
ops: vec![catalog_op],
};
if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? {
let wal_op = WalOp::Catalog(catalog_batch);
self.wal.write_ops(vec![wal_op]).await?;
}
Ok(())
}
async fn delete_plugin(
&self,
db: &str,
plugin_name: &str,
) -> Result<(), ProcessingEngineError> {
let (db_id, db_schema) = self
.catalog
.db_id_and_schema(db)
.ok_or_else(|| ProcessingEngineError::DatabaseNotFound(db.to_string()))?;
let catalog_op = CatalogOp::DeletePlugin(DeletePluginDefinition {
plugin_name: plugin_name.to_string(),
});
let catalog_batch = CatalogBatch {
time_ns: self.time_provider.now().timestamp_nanos(),
database_id: db_id,
database_name: Arc::clone(&db_schema.name),
ops: vec![catalog_op],
};
if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? {
self.wal
.write_ops(vec![WalOp::Catalog(catalog_batch)])
.await?;
}
Ok(())
}
async fn insert_trigger(
&self,
db_name: &str,
trigger_name: String,
plugin_name: String,
plugin_filename: String,
trigger_specification: TriggerSpecificationDefinition,
trigger_arguments: Option<HashMap<String, String>>,
disabled: bool,
@ -396,17 +326,9 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
let Some((db_id, db_schema)) = self.catalog.db_id_and_schema(db_name) else {
return Err(ProcessingEngineError::DatabaseNotFound(db_name.to_string()));
};
let plugin = db_schema
.processing_engine_plugins
.get(&plugin_name)
.ok_or_else(|| catalog::Error::ProcessingEnginePluginNotFound {
plugin_name: plugin_name.to_string(),
database_name: db_schema.name.to_string(),
})?;
let catalog_op = CatalogOp::CreateTrigger(TriggerDefinition {
trigger_name,
plugin_name,
plugin_file_name: plugin.file_name.clone(),
plugin_filename,
trigger: trigger_specification,
trigger_arguments,
disabled,
@ -496,18 +418,8 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
write_buffer,
query_executor,
};
let plugin_code = self.read_plugin_code(&trigger.plugin_file_name).await?;
let Some(plugin_definition) = db_schema
.processing_engine_plugins
.get(&trigger.plugin_name)
else {
return Err(catalog::Error::ProcessingEnginePluginNotFound {
plugin_name: trigger.plugin_name,
database_name: db_name.to_string(),
}
.into());
};
match plugin_definition.plugin_type {
let plugin_code = self.read_plugin_code(&trigger.plugin_filename).await?;
match trigger.trigger.plugin_type() {
PluginType::WalRows => {
let rec = self
.plugin_event_tx
@ -523,7 +435,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
rec,
)
}
PluginType::Scheduled => {
PluginType::Schedule => {
let rec = self
.plugin_event_tx
.write()
@ -841,9 +753,7 @@ mod tests {
use influxdb3_cache::last_cache::LastCacheProvider;
use influxdb3_catalog::catalog;
use influxdb3_internal_api::query_executor::UnimplementedQueryExecutor;
use influxdb3_wal::{
Gen1Duration, PluginDefinition, PluginType, TriggerSpecificationDefinition, WalConfig,
};
use influxdb3_wal::{Gen1Duration, TriggerSpecificationDefinition, WalConfig};
use influxdb3_write::persister::Persister;
use influxdb3_write::write_buffer::{WriteBufferImpl, WriteBufferImplArgs};
use influxdb3_write::{Precision, WriteBuffer};
@ -859,208 +769,6 @@ mod tests {
use std::time::Duration;
use tempfile::NamedTempFile;
#[tokio::test]
async fn test_create_plugin() -> influxdb3_write::write_buffer::Result<()> {
let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap();
let test_store = Arc::new(InMemory::new());
let wal_config = WalConfig {
gen1_duration: Gen1Duration::new_1m(),
max_write_buffer_size: 100,
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
};
let (pem, file) = setup(start_time, test_store, wal_config).await;
let file_name = file
.path()
.file_name()
.unwrap()
.to_str()
.unwrap()
.to_string();
pem.write_buffer
.write_lp(
NamespaceName::new("foo").unwrap(),
"cpu,warehouse=us-east,room=01a,device=10001 reading=37\n",
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;
pem.insert_plugin(
"foo",
"my_plugin".to_string(),
file_name.clone(),
PluginType::WalRows,
)
.await
.unwrap();
let plugin = pem
.catalog
.db_schema("foo")
.expect("should have db named foo")
.processing_engine_plugins
.get("my_plugin")
.unwrap()
.clone();
let expected = PluginDefinition {
plugin_name: "my_plugin".to_string(),
file_name: file_name.to_string(),
plugin_type: PluginType::WalRows,
};
assert_eq!(expected, plugin);
// confirm that creating it again is a no-op.
pem.insert_plugin(
"foo",
"my_plugin".to_string(),
file_name.clone(),
PluginType::WalRows,
)
.await
.unwrap();
// Confirm the same contents can be added to a new name.
pem.insert_plugin(
"foo",
"my_second_plugin".to_string(),
file_name,
PluginType::WalRows,
)
.await
.unwrap();
Ok(())
}
#[tokio::test]
async fn test_delete_plugin() -> influxdb3_write::write_buffer::Result<()> {
let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap();
let test_store = Arc::new(InMemory::new());
let wal_config = WalConfig {
gen1_duration: Gen1Duration::new_1m(),
max_write_buffer_size: 100,
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
};
let (pem, file) = setup(start_time, test_store, wal_config).await;
let file_name = file
.path()
.file_name()
.unwrap()
.to_str()
.unwrap()
.to_string();
// Create the DB by inserting a line.
pem.write_buffer
.write_lp(
NamespaceName::new("foo").unwrap(),
"cpu,warehouse=us-east,room=01a,device=10001 reading=37\n",
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;
// First create a plugin
pem.insert_plugin(
"foo",
"test_plugin".to_string(),
file_name.clone(),
PluginType::WalRows,
)
.await
.unwrap();
// Then delete it
pem.delete_plugin("foo", "test_plugin").await.unwrap();
// Verify plugin is gone from schema
let schema = pem.catalog.db_schema("foo").unwrap();
assert!(!schema.processing_engine_plugins.contains_key("test_plugin"));
// Verify we can add a newly named plugin
pem.insert_plugin(
"foo",
"test_plugin".to_string(),
file_name.clone(),
PluginType::WalRows,
)
.await
.unwrap();
Ok(())
}
#[tokio::test]
async fn test_delete_plugin_with_active_trigger() -> influxdb3_write::write_buffer::Result<()> {
let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap();
let test_store = Arc::new(InMemory::new());
let wal_config = WalConfig {
gen1_duration: Gen1Duration::new_1m(),
max_write_buffer_size: 100,
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
};
let (pem, file) = setup(start_time, test_store, wal_config).await;
let file_name = file
.path()
.file_name()
.unwrap()
.to_str()
.unwrap()
.to_string();
// Create the DB by inserting a line.
pem.write_buffer
.write_lp(
NamespaceName::new("foo").unwrap(),
"cpu,warehouse=us-east,room=01a,device=10001 reading=37\n",
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;
// Create a plugin
pem.insert_plugin(
"foo",
"test_plugin".to_string(),
file_name.clone(),
PluginType::WalRows,
)
.await
.unwrap();
// Create a trigger using the plugin
pem.insert_trigger(
"foo",
"test_trigger".to_string(),
"test_plugin".to_string(),
TriggerSpecificationDefinition::AllTablesWalWrite,
None,
false,
)
.await
.unwrap();
// Try to delete the plugin - should fail because trigger exists
let result = pem.delete_plugin("foo", "test_plugin").await;
assert!(matches!(
result,
Err(ProcessingEngineError::CatalogUpdateError(catalog::Error::ProcessingEnginePluginInUse {
database_name,
plugin_name,
trigger_name,
})) if database_name == "foo" && plugin_name == "test_plugin" && trigger_name == "test_trigger"
));
Ok(())
}
#[tokio::test]
async fn test_trigger_lifecycle() -> influxdb3_write::write_buffer::Result<()> {
let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap();
@ -1095,21 +803,11 @@ mod tests {
)
.await?;
// Create a plugin
pem.insert_plugin(
"foo",
"test_plugin".to_string(),
file_name.clone(),
PluginType::WalRows,
)
.await
.unwrap();
// Create an enabled trigger
pem.insert_trigger(
"foo",
"test_trigger".to_string(),
"test_plugin".to_string(),
file_name,
TriggerSpecificationDefinition::AllTablesWalWrite,
None,
false,
@ -1190,21 +888,11 @@ mod tests {
)
.await?;
// Create a plugin
pem.insert_plugin(
"foo",
"test_plugin".to_string(),
file_name.clone(),
PluginType::WalRows,
)
.await
.unwrap();
// Create a disabled trigger
pem.insert_trigger(
"foo",
"test_trigger".to_string(),
"test_plugin".to_string(),
file_name,
TriggerSpecificationDefinition::AllTablesWalWrite,
None,
true,

View File

@ -6,7 +6,7 @@ use influxdb3_client::plugin_development::{
WalPluginTestResponse,
};
use influxdb3_internal_api::query_executor::QueryExecutor;
use influxdb3_wal::{PluginType, TriggerSpecificationDefinition};
use influxdb3_wal::TriggerSpecificationDefinition;
use influxdb3_write::WriteBuffer;
use std::fmt::Debug;
use std::sync::Arc;
@ -53,23 +53,11 @@ pub enum ProcessingEngineError {
///
#[async_trait::async_trait]
pub trait ProcessingEngineManager: Debug + Send + Sync + 'static {
/// Inserts a plugin
async fn insert_plugin(
&self,
db: &str,
plugin_name: String,
file_name: String,
plugin_type: PluginType,
) -> Result<(), ProcessingEngineError>;
async fn delete_plugin(&self, db: &str, plugin_name: &str)
-> Result<(), ProcessingEngineError>;
async fn insert_trigger(
&self,
db_name: &str,
trigger_name: String,
plugin_name: String,
plugin_filename: String,
trigger_specification: TriggerSpecificationDefinition,
trigger_arguments: Option<HashMap<String, String>>,
disabled: bool,

View File

@ -65,6 +65,9 @@ pub enum Error {
#[error("tried to run a schedule plugin but the schedule iterator is over.")]
ScheduledMissingTime,
#[error("non-schedule plugin with schedule trigger: {0}")]
NonSchedulePluginWithScheduleTrigger(String),
}
#[cfg(feature = "system-py")]
@ -99,10 +102,14 @@ pub(crate) fn run_schedule_plugin(
context: PluginContext,
plugin_receiver: mpsc::Receiver<ScheduleEvent>,
) -> Result<(), Error> {
let TriggerSpecificationDefinition::Schedule { .. } = &trigger_definition.trigger else {
// TODO: these linkages should be guaranteed by code.
unreachable!("this should've been checked");
};
// Ensure that the plugin is a schedule plugin
let plugin_type = trigger_definition.trigger.plugin_type();
if !matches!(plugin_type, influxdb3_wal::PluginType::Schedule) {
return Err(Error::NonSchedulePluginWithScheduleTrigger(format!(
"{:?}",
trigger_definition
)));
}
let trigger_plugin = TriggerPlugin {
trigger_definition,
@ -195,7 +202,7 @@ mod python_plugin {
&self,
mut receiver: Receiver<WalEvent>,
) -> Result<(), Error> {
info!(?self.trigger_definition.trigger_name, ?self.trigger_definition.database_name, ?self.trigger_definition.plugin_name, "starting wal contents plugin");
info!(?self.trigger_definition.trigger_name, ?self.trigger_definition.database_name, ?self.trigger_definition.plugin_filename, "starting wal contents plugin");
loop {
let event = match receiver.recv().await {
@ -262,7 +269,7 @@ mod python_plugin {
&self,
mut receiver: Receiver<RequestEvent>,
) -> Result<(), Error> {
info!(?self.trigger_definition.trigger_name, ?self.trigger_definition.database_name, ?self.trigger_definition.plugin_name, "starting request plugin");
info!(?self.trigger_definition.trigger_name, ?self.trigger_definition.database_name, ?self.trigger_definition.plugin_filename, "starting request plugin");
loop {
match receiver.recv().await {

View File

@ -26,7 +26,7 @@ use influxdb3_catalog::catalog::Error as CatalogError;
use influxdb3_internal_api::query_executor::{QueryExecutor, QueryExecutorError};
use influxdb3_process::{INFLUXDB3_GIT_HASH_SHORT, INFLUXDB3_VERSION};
use influxdb3_processing_engine::manager::ProcessingEngineManager;
use influxdb3_wal::{PluginType, TriggerSpecificationDefinition};
use influxdb3_wal::TriggerSpecificationDefinition;
use influxdb3_write::persister::TrackedMemoryArrowWriter;
use influxdb3_write::write_buffer::Error as WriteBufferError;
use influxdb3_write::BufferedWriteRequest;
@ -989,51 +989,13 @@ where
.body(Body::empty())?)
}
async fn configure_processing_engine_plugin(
&self,
req: Request<Body>,
) -> Result<Response<Body>> {
let ProcessingEnginePluginCreateRequest {
db,
plugin_name,
file_name,
plugin_type,
} = if let Some(query) = req.uri().query() {
serde_urlencoded::from_str(query)?
} else {
self.read_body_json(req).await?
};
self.processing_engine
.insert_plugin(&db, plugin_name, file_name, plugin_type)
.await?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::empty())?)
}
async fn delete_processing_engine_plugin(&self, req: Request<Body>) -> Result<Response<Body>> {
let ProcessingEnginePluginDeleteRequest { db, plugin_name } =
if let Some(query) = req.uri().query() {
serde_urlencoded::from_str(query)?
} else {
self.read_body_json(req).await?
};
self.processing_engine
.delete_plugin(&db, &plugin_name)
.await?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::empty())?)
}
async fn configure_processing_engine_trigger(
&self,
req: Request<Body>,
) -> Result<Response<Body>> {
let ProcessEngineTriggerCreateRequest {
db,
plugin_name,
plugin_filename,
trigger_name,
trigger_specification,
trigger_arguments,
@ -1043,7 +1005,7 @@ where
} else {
self.read_body_json(req).await?
};
debug!(%db, %plugin_name, %trigger_name, %trigger_specification, %disabled, "configure_processing_engine_trigger");
debug!(%db, %plugin_filename, %trigger_name, %trigger_specification, %disabled, "configure_processing_engine_trigger");
let Ok(trigger_spec) =
TriggerSpecificationDefinition::from_string_rep(&trigger_specification)
else {
@ -1057,7 +1019,7 @@ where
.insert_trigger(
db.as_str(),
trigger_name.clone(),
plugin_name,
plugin_filename,
trigger_spec,
trigger_arguments,
disabled,
@ -1681,26 +1643,11 @@ struct LastCacheDeleteRequest {
name: String,
}
/// Request definition for `POST /api/v3/configure/processing_engine_plugin` API
#[derive(Debug, Deserialize)]
struct ProcessingEnginePluginCreateRequest {
db: String,
plugin_name: String,
file_name: String,
plugin_type: PluginType,
}
#[derive(Debug, Deserialize)]
struct ProcessingEnginePluginDeleteRequest {
db: String,
plugin_name: String,
}
/// Request definition for `POST /api/v3/configure/processing_engine_trigger` API
#[derive(Debug, Deserialize)]
struct ProcessEngineTriggerCreateRequest {
db: String,
plugin_name: String,
plugin_filename: String,
trigger_name: String,
trigger_specification: String,
trigger_arguments: Option<HashMap<String, String>>,
@ -1844,12 +1791,6 @@ pub(crate) async fn route_request<T: TimeProvider>(
(Method::DELETE, "/api/v3/configure/last_cache") => {
http_server.configure_last_cache_delete(req).await
}
(Method::POST, "/api/v3/configure/processing_engine_plugin") => {
http_server.configure_processing_engine_plugin(req).await
}
(Method::DELETE, "/api/v3/configure/processing_engine_plugin") => {
http_server.delete_processing_engine_plugin(req).await
}
(Method::POST, "/api/v3/configure/processing_engine_trigger/disable") => {
http_server.disable_processing_engine_trigger(req).await
}

View File

@ -21,9 +21,7 @@ use self::{last_caches::LastCachesTable, queries::QueriesTable};
mod distinct_caches;
mod last_caches;
mod parquet_files;
use crate::system_tables::python_call::{
ProcessingEnginePluginTable, ProcessingEngineTriggerTable,
};
use crate::system_tables::python_call::ProcessingEngineTriggerTable;
mod python_call;
mod queries;
@ -36,8 +34,6 @@ pub(crate) const LAST_CACHES_TABLE_NAME: &str = "last_caches";
pub(crate) const DISTINCT_CACHES_TABLE_NAME: &str = "distinct_caches";
pub(crate) const PARQUET_FILES_TABLE_NAME: &str = "parquet_files";
const PROCESSING_ENGINE_PLUGINS_TABLE_NAME: &str = "processing_engine_plugins";
const PROCESSING_ENGINE_TRIGGERS_TABLE_NAME: &str = "processing_engine_triggers";
#[derive(Debug)]
@ -114,18 +110,6 @@ impl AllSystemSchemaTablesProvider {
db_schema.id,
Arc::clone(&buffer),
))));
tables.insert(
PROCESSING_ENGINE_PLUGINS_TABLE_NAME,
Arc::new(SystemTableProvider::new(Arc::new(
ProcessingEnginePluginTable::new(
db_schema
.processing_engine_plugins
.iter()
.map(|(_name, call)| call.clone())
.collect(),
),
))),
);
tables.insert(
PROCESSING_ENGINE_TRIGGERS_TABLE_NAME,
Arc::new(SystemTableProvider::new(Arc::new(

View File

@ -2,71 +2,11 @@ use arrow_array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::common::Result;
use datafusion::error::DataFusionError;
use datafusion::logical_expr::Expr;
use influxdb3_wal::{PluginDefinition, TriggerDefinition};
use influxdb3_wal::TriggerDefinition;
use iox_system_tables::IoxSystemTable;
use std::sync::Arc;
#[derive(Debug)]
pub(super) struct ProcessingEnginePluginTable {
schema: SchemaRef,
plugins: Vec<PluginDefinition>,
}
fn plugin_schema() -> SchemaRef {
let columns = vec![
Field::new("plugin_name", DataType::Utf8, false),
Field::new("file_name", DataType::Utf8, false),
Field::new("plugin_type", DataType::Utf8, false),
];
Schema::new(columns).into()
}
impl ProcessingEnginePluginTable {
pub fn new(python_calls: Vec<PluginDefinition>) -> Self {
Self {
schema: plugin_schema(),
plugins: python_calls,
}
}
}
#[async_trait]
impl IoxSystemTable for ProcessingEnginePluginTable {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
async fn scan(
&self,
_filters: Option<Vec<Expr>>,
_limit: Option<usize>,
) -> Result<RecordBatch, DataFusionError> {
let schema = self.schema();
let columns: Vec<ArrayRef> = vec![
Arc::new(
self.plugins
.iter()
.map(|call| Some(call.plugin_name.clone()))
.collect::<StringArray>(),
),
Arc::new(
self.plugins
.iter()
.map(|p| Some(p.file_name.clone()))
.collect::<StringArray>(),
),
Arc::new(
self.plugins
.iter()
.map(|p| serde_json::to_string(&p.plugin_type).ok())
.collect::<StringArray>(),
),
];
Ok(RecordBatch::try_new(schema, columns)?)
}
}
#[derive(Debug)]
pub(super) struct ProcessingEngineTriggerTable {
schema: SchemaRef,
@ -85,7 +25,7 @@ impl ProcessingEngineTriggerTable {
fn trigger_schema() -> SchemaRef {
let columns = vec![
Field::new("trigger_name", DataType::Utf8, false),
Field::new("plugin_name", DataType::Utf8, false),
Field::new("plugin_filename", DataType::Utf8, false),
Field::new("trigger_specification", DataType::Utf8, false),
Field::new("disabled", DataType::Boolean, false),
];
@ -111,7 +51,7 @@ impl IoxSystemTable for ProcessingEngineTriggerTable {
let plugin_column = self
.triggers
.iter()
.map(|trigger| Some(trigger.plugin_name.clone()))
.map(|trigger| Some(trigger.plugin_filename.clone()))
.collect::<StringArray>();
let specification_column = self
.triggers

View File

@ -22,7 +22,7 @@ use schema::{InfluxColumnType, InfluxFieldType};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::cmp::Ordering;
use std::fmt::Debug;
use std::fmt::{Debug, Display};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
@ -341,8 +341,6 @@ pub enum CatalogOp {
DeleteLastCache(LastCacheDelete),
DeleteDatabase(DeleteDatabaseDefinition),
DeleteTable(DeleteTableDefinition),
CreatePlugin(PluginDefinition),
DeletePlugin(DeletePluginDefinition),
CreateTrigger(TriggerDefinition),
DeleteTrigger(DeleteTriggerDefinition),
EnableTrigger(TriggerIdentifier),
@ -624,31 +622,24 @@ pub struct DistinctCacheDelete {
pub cache_name: Arc<str>,
}
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct PluginDefinition {
pub plugin_name: String,
pub file_name: String,
pub plugin_type: PluginType,
}
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct DeletePluginDefinition {
pub plugin_name: String,
}
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize, Copy)]
#[serde(rename_all = "snake_case")]
pub enum PluginType {
WalRows,
Scheduled,
Schedule,
Request,
}
impl Display for PluginType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Debug::fmt(self, f)
}
}
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct TriggerDefinition {
pub trigger_name: String,
pub plugin_name: String,
pub plugin_file_name: String,
pub plugin_filename: String,
pub database_name: String,
pub trigger: TriggerSpecificationDefinition,
pub trigger_arguments: Option<HashMap<String, String>>,
@ -759,6 +750,16 @@ impl TriggerSpecificationDefinition {
}
}
}
pub fn plugin_type(&self) -> PluginType {
match self {
TriggerSpecificationDefinition::SingleTableWalWrite { .. }
| TriggerSpecificationDefinition::AllTablesWalWrite => PluginType::WalRows,
TriggerSpecificationDefinition::Schedule { .. }
| TriggerSpecificationDefinition::Every { .. } => PluginType::Schedule,
TriggerSpecificationDefinition::RequestPath { .. } => PluginType::Request,
}
}
}
#[serde_as]

View File

@ -546,8 +546,6 @@ impl BufferState {
table_buffer_map.remove(&table_definition.table_id);
}
}
CatalogOp::CreatePlugin(_) => {}
CatalogOp::DeletePlugin(_) => {}
CatalogOp::CreateTrigger(_) => {}
CatalogOp::DeleteTrigger(_) => {}
CatalogOp::EnableTrigger(_) => {}

View File

@ -10,7 +10,6 @@ expression: catalog_json
"deleted": false,
"id": 0,
"name": "db",
"processing_engine_plugins": [],
"processing_engine_triggers": [],
"tables": [
[

View File

@ -10,7 +10,6 @@ expression: catalog_json
"deleted": false,
"id": 0,
"name": "db",
"processing_engine_plugins": [],
"processing_engine_triggers": [],
"tables": [
[

View File

@ -10,7 +10,6 @@ expression: catalog_json
"deleted": false,
"id": 0,
"name": "db",
"processing_engine_plugins": [],
"processing_engine_triggers": [],
"tables": [
[