feat(processing_engine): error handling for triggers. (#26086)

bugfix/add_matching_pattern
Jackson Newhouse 2025-03-04 09:32:58 -08:00 committed by GitHub
parent 357c05f3d0
commit c930d9eef8
12 changed files with 369 additions and 153 deletions

Cargo.lock (generated)

@@ -3223,6 +3223,7 @@ dependencies = [
"bitcode",
"byteorder",
"bytes",
"clap",
"crc32fast",
"cron 0.14.0",
"data_types",


@@ -3,7 +3,7 @@ use base64::Engine as _;
use base64::engine::general_purpose::URL_SAFE_NO_PAD as B64;
use hashbrown::HashMap;
use influxdb3_client::Client;
use influxdb3_wal::TriggerSpecificationDefinition;
use influxdb3_wal::{ErrorBehavior, TriggerSettings, TriggerSpecificationDefinition};
use rand::RngCore;
use rand::rngs::OsRng;
use secrecy::ExposeSecret;
@@ -228,8 +228,12 @@ pub struct TriggerConfig {
/// Create trigger in disabled state
#[clap(long)]
disabled: bool,
/// Run each instance of the trigger asynchronously, allowing multiple instances to run simultaneously.
#[clap(long)]
run_asynchronous: bool,
/// How the system should respond if the plugin returns an error
#[clap(long, value_enum, default_value_t = ErrorBehavior::Log)]
error_behavior: ErrorBehavior,
/// Name for the new trigger
trigger_name: String,
}
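
For context on how the new flag surfaces on the command line: with clap's ValueEnum derive (added to ErrorBehavior later in this diff), the accepted values for --error-behavior are log, retry, and disable, defaulting to log. A minimal standalone sketch, assuming clap 4 with the derive feature; the types here are illustrative stand-ins, not the actual modules:

    use clap::{Parser, ValueEnum};

    // Stand-in for influxdb3_wal::ErrorBehavior, which derives clap::ValueEnum.
    #[derive(Debug, Clone, Copy, Default, ValueEnum)]
    enum ErrorBehavior {
        #[default]
        Log,     // `--error-behavior log` (the default)
        Retry,   // `--error-behavior retry`
        Disable, // `--error-behavior disable`
    }

    #[derive(Parser)]
    struct TriggerConfig {
        /// How the system should respond if the plugin returns an error
        #[clap(long, value_enum, default_value_t = ErrorBehavior::Log)]
        error_behavior: ErrorBehavior,
    }

    fn main() {
        let config = TriggerConfig::parse();
        println!("{:?}", config.error_behavior);
    }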
@@ -353,6 +357,7 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
trigger_arguments,
disabled,
run_asynchronous,
error_behavior,
}) => {
let trigger_arguments: Option<HashMap<String, String>> = trigger_arguments.map(|a| {
a.into_iter()
@@ -360,6 +365,11 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
.collect::<HashMap<String, String>>()
});
let trigger_settings = TriggerSettings {
run_async: run_asynchronous,
error_behavior,
};
match client
.api_v3_configure_processing_engine_trigger_create(
database_name,
@@ -368,7 +378,7 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
trigger_specification.string_rep(),
trigger_arguments,
disabled,
run_asynchronous,
trigger_settings,
)
.await
{


@@ -12,7 +12,7 @@ use influxdb3_id::SerdeVecMap;
use influxdb3_id::TableId;
use influxdb3_wal::{
DistinctCacheDefinition, LastCacheDefinition, LastCacheValueColumnsDef, PluginType,
TriggerDefinition, TriggerFlag,
TriggerDefinition, TriggerSettings,
};
use schema::InfluxColumnType;
use schema::InfluxFieldType;
@@ -155,7 +155,7 @@ impl From<DatabaseSnapshot> for DatabaseSchema {
trigger_name: trigger.trigger_name,
plugin_filename: trigger.plugin_filename,
trigger: serde_json::from_str(&trigger.trigger_specification).unwrap(),
flags: trigger.flags,
trigger_settings: trigger.trigger_settings,
trigger_arguments: trigger.trigger_arguments,
disabled: trigger.disabled,
database_name: trigger.database_name,
@@ -226,7 +226,7 @@ struct ProcessingEngineTriggerSnapshot {
pub plugin_filename: String,
pub database_name: String,
pub trigger_specification: String,
pub flags: Vec<TriggerFlag>,
pub trigger_settings: TriggerSettings,
pub trigger_arguments: Option<HashMap<String, String>>,
pub disabled: bool,
}
@@ -475,7 +475,7 @@ impl From<&TriggerDefinition> for ProcessingEngineTriggerSnapshot {
trigger_name: trigger.trigger_name.to_string(),
plugin_filename: trigger.plugin_filename.to_string(),
database_name: trigger.database_name.to_string(),
flags: trigger.flags.clone(),
trigger_settings: trigger.trigger_settings,
trigger_specification: serde_json::to_string(&trigger.trigger)
.expect("should be able to serialize trigger specification"),
trigger_arguments: trigger.trigger_arguments.clone(),


@@ -12,7 +12,7 @@ use url::Url;
use influxdb3_types::http::*;
pub use influxdb3_types::write::Precision;
use influxdb3_wal::TriggerFlag;
use influxdb3_wal::TriggerSettings;
/// Primary error type for the [`Client`]
#[derive(Debug, thiserror::Error)]
@@ -466,13 +466,8 @@ impl Client {
trigger_spec: impl Into<String> + Send,
trigger_arguments: Option<HashMap<String, String>>,
disabled: bool,
execute_async: bool,
trigger_settings: TriggerSettings,
) -> Result<()> {
let flags = if execute_async {
vec![TriggerFlag::ExecuteAsynchronously]
} else {
vec![]
};
let _bytes = self
.send_json_get_bytes(
Method::POST,
@@ -482,7 +477,7 @@ impl Client {
trigger_name: trigger_name.into(),
plugin_filename: plugin_filename.into(),
trigger_specification: trigger_spec.into(),
flags,
trigger_settings,
trigger_arguments,
disabled,
}),
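
Call sites change accordingly: the former execute_async bool is replaced by a TriggerSettings value carrying both knobs. A hedged sketch of the new call shape in a hypothetical helper (the diff elides the leading identifier arguments, so their order here is an assumption; all literal values are illustrative):

    use influxdb3_client::Client;
    use influxdb3_wal::{ErrorBehavior, TriggerSettings};

    async fn create_retrying_trigger(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
        let settings = TriggerSettings {
            run_async: false,
            error_behavior: ErrorBehavior::Retry,
        };
        client
            .api_v3_configure_processing_engine_trigger_create(
                "mydb",       // database name (illustrative)
                "my_trigger", // trigger name (illustrative)
                "plugin.py",  // plugin filename (illustrative)
                "all_tables", // trigger specification string (illustrative)
                None,         // trigger_arguments
                false,        // disabled
                settings,
            )
            .await?;
        Ok(())
    }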


@@ -19,7 +19,7 @@ use influxdb3_types::http::{
use influxdb3_wal::PluginType;
use influxdb3_wal::{
CatalogBatch, CatalogOp, DeleteTriggerDefinition, SnapshotDetails, TriggerDefinition,
TriggerFlag, TriggerIdentifier, TriggerSpecificationDefinition, Wal, WalContents,
TriggerIdentifier, TriggerSettings, TriggerSpecificationDefinition, Wal, WalContents,
WalFileNotifier, WalOp,
};
use influxdb3_write::WriteBuffer;
@@ -346,7 +346,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
db_name: &str,
trigger_name: String,
plugin_filename: String,
flags: Vec<TriggerFlag>,
trigger_settings: TriggerSettings,
trigger_specification: TriggerSpecificationDefinition,
trigger_arguments: Option<HashMap<String, String>>,
disabled: bool,
@@ -362,7 +362,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
trigger_name,
plugin_filename,
trigger: trigger_specification,
flags,
trigger_settings,
trigger_arguments,
disabled,
database_name: db_name.to_string(),
@@ -428,6 +428,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
&self,
write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor>,
processing_engine_manager: Arc<dyn ProcessingEngineManager>,
db_name: &str,
trigger_name: &str,
) -> Result<(), ProcessingEngineError> {
@@ -451,6 +452,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
write_buffer,
query_executor,
sys_event_store: Arc::clone(&self.sys_event_store),
manager: processing_engine_manager,
};
let plugin_code = Arc::new(self.read_plugin_code(&trigger.plugin_filename).await?);
match trigger.trigger.plugin_type() {
@@ -578,6 +580,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
&self,
write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor>,
manager: Arc<dyn ProcessingEngineManager>,
db_name: &str,
trigger_name: &str,
) -> Result<(), ProcessingEngineError> {
@@ -611,17 +614,21 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
self.wal.write_ops(vec![wal_op]).await?;
}
self.run_trigger(write_buffer, query_executor, db_name, trigger_name)
self.run_trigger(write_buffer, query_executor, manager, db_name, trigger_name)
.await?;
Ok(())
}
async fn start_triggers(&self) -> Result<(), ProcessingEngineError> {
async fn start_triggers(
&self,
manager: Arc<dyn ProcessingEngineManager>,
) -> Result<(), ProcessingEngineError> {
let triggers = self.catalog.active_triggers();
for (db_name, trigger_name) in triggers {
self.run_trigger(
Arc::clone(&self.write_buffer),
Arc::clone(&self.query_executor),
Arc::clone(&manager),
&db_name,
&trigger_name,
)
@@ -803,7 +810,7 @@ mod tests {
use influxdb3_catalog::catalog;
use influxdb3_internal_api::query_executor::UnimplementedQueryExecutor;
use influxdb3_sys_events::SysEventStore;
use influxdb3_wal::{Gen1Duration, TriggerSpecificationDefinition, WalConfig};
use influxdb3_wal::{Gen1Duration, TriggerSettings, TriggerSpecificationDefinition, WalConfig};
use influxdb3_write::persister::Persister;
use influxdb3_write::write_buffer::{WriteBufferImpl, WriteBufferImplArgs};
use influxdb3_write::{Precision, WriteBuffer};
@@ -840,6 +847,8 @@ mod tests {
// convert to Arc<WriteBuffer>
let write_buffer: Arc<dyn WriteBuffer> = Arc::clone(&pem.write_buffer);
let query_executor = Arc::clone(&pem.query_executor);
let pem: Arc<dyn ProcessingEngineManager> = Arc::new(pem);
// Create the DB by inserting a line.
write_buffer
@@ -858,7 +867,7 @@ mod tests {
"foo",
"test_trigger".to_string(),
file_name,
vec![],
TriggerSettings::default(),
TriggerSpecificationDefinition::AllTablesWalWrite,
None,
false,
@@ -868,7 +877,8 @@ mod tests {
// Run the trigger
pem.run_trigger(
Arc::clone(&write_buffer),
Arc::clone(&pem.query_executor),
Arc::clone(&query_executor),
Arc::clone(&pem),
"foo",
"test_trigger",
)
@@ -891,7 +901,8 @@ mod tests {
let result = pem
.enable_trigger(
Arc::clone(&write_buffer),
Arc::clone(&pem.query_executor),
Arc::clone(&query_executor),
Arc::clone(&pem),
"foo",
"test_trigger",
)
@@ -944,7 +955,7 @@ mod tests {
"foo",
"test_trigger".to_string(),
file_name,
vec![],
TriggerSettings::default(),
TriggerSpecificationDefinition::AllTablesWalWrite,
None,
true,
@@ -978,6 +989,8 @@ mod tests {
let (pem, _file_name) = setup(start_time, test_store, wal_config).await;
let write_buffer: Arc<dyn WriteBuffer> = Arc::clone(&pem.write_buffer);
let query_executor = Arc::clone(&pem.query_executor);
let pem: Arc<dyn ProcessingEngineManager> = Arc::new(pem);
// Create the DB by inserting a line.
write_buffer
@@ -994,7 +1007,8 @@ mod tests {
let result = pem
.enable_trigger(
Arc::clone(&write_buffer),
Arc::clone(&pem.query_executor),
Arc::clone(&query_executor),
Arc::clone(&pem),
"foo",
"nonexistent_trigger",
)


@@ -7,7 +7,7 @@ use influxdb3_types::http::{
SchedulePluginTestRequest, SchedulePluginTestResponse, WalPluginTestRequest,
WalPluginTestResponse,
};
use influxdb3_wal::{TriggerFlag, TriggerSpecificationDefinition};
use influxdb3_wal::{TriggerSettings, TriggerSpecificationDefinition};
use influxdb3_write::WriteBuffer;
use std::fmt::Debug;
use std::sync::Arc;
@@ -60,7 +60,7 @@ pub trait ProcessingEngineManager: Debug + Send + Sync + 'static {
db_name: &str,
trigger_name: String,
plugin_filename: String,
flags: Vec<TriggerFlag>,
trigger_settings: TriggerSettings,
trigger_specification: TriggerSpecificationDefinition,
trigger_arguments: Option<HashMap<String, String>>,
disabled: bool,
@@ -78,6 +78,7 @@ pub trait ProcessingEngineManager: Debug + Send + Sync + 'static {
&self,
write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor>,
processing_engine_manager: Arc<dyn ProcessingEngineManager>,
db_name: &str,
trigger_name: &str,
) -> Result<(), ProcessingEngineError>;
@@ -92,11 +93,15 @@ pub trait ProcessingEngineManager: Debug + Send + Sync + 'static {
&self,
write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor>,
manager: Arc<dyn ProcessingEngineManager>,
db_name: &str,
trigger_name: &str,
) -> Result<(), ProcessingEngineError>;
async fn start_triggers(&self) -> Result<(), ProcessingEngineError>;
async fn start_triggers(
&self,
manager: Arc<dyn ProcessingEngineManager>,
) -> Result<(), ProcessingEngineError>;
async fn test_wal_plugin(
&self,


@@ -1,6 +1,7 @@
#[cfg(feature = "system-py")]
use crate::PluginCode;
use crate::environment::PythonEnvironmentManager;
use crate::manager::ProcessingEngineManager;
#[cfg(feature = "system-py")]
use crate::{RequestEvent, ScheduleEvent, WalEvent};
use data_types::NamespaceName;
@@ -172,6 +173,8 @@ pub(crate) struct PluginContext {
pub(crate) write_buffer: Arc<dyn WriteBuffer>,
// query executor to hand off to the plugin
pub(crate) query_executor: Arc<dyn QueryExecutor>,
// processing engine manager for disabling plugins if they fail.
pub(crate) manager: Arc<dyn ProcessingEngineManager>,
// sys events for writing logs to ring buffers
pub(crate) sys_event_store: Arc<SysEventStore>,
}
@@ -184,6 +187,7 @@ struct TriggerPlugin {
db_name: String,
write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor>,
manager: Arc<dyn ProcessingEngineManager>,
logger: ProcessingEngineLogger,
}
@@ -205,7 +209,7 @@ mod python_plugin {
PluginReturnState, ProcessingEngineLogger, execute_python_with_batch,
execute_request_trigger, execute_schedule_trigger,
};
use influxdb3_wal::{TriggerFlag, WalContents, WalOp};
use influxdb3_wal::{ErrorBehavior, WalContents, WalOp};
use influxdb3_write::Precision;
use iox_time::Time;
use observability_deps::tracing::{info, warn};
@@ -230,6 +234,7 @@ mod python_plugin {
db_name,
write_buffer: Arc::clone(&context.write_buffer),
query_executor: Arc::clone(&context.query_executor),
manager: Arc::clone(&context.manager),
logger,
}
}
@@ -239,21 +244,41 @@ mod python_plugin {
mut receiver: Receiver<WalEvent>,
) -> Result<(), PluginError> {
info!(?self.trigger_definition.trigger_name, ?self.trigger_definition.database_name, ?self.trigger_definition.plugin_filename, "starting wal contents plugin");
let run_in_dedicated_task = self
.trigger_definition
.flags
.contains(&TriggerFlag::ExecuteAsynchronously);
let mut futures = FuturesUnordered::new();
loop {
tokio::select! {
event = receiver.recv() => {
match event {
Some(WalEvent::WriteWalContents(wal_contents)) => {
if run_in_dedicated_task {
if self.trigger_definition.trigger_settings.run_async {
let clone = self.clone();
futures.push(async move { clone.process_wal_contents(wal_contents).await });
} else if let Err(e) = self.process_wal_contents(wal_contents).await {
error!(?self.trigger_definition, "error processing wal contents: {}", e);
} else {
match self.process_wal_contents(wal_contents).await? {
PluginNextState::SuccessfulRun => {}
PluginNextState::LogError(error_log) => {
self.logger.log(LogLevel::Error, error_log);
}
PluginNextState::Disable(trigger_definition) => {
warn!("disabling trigger {}", trigger_definition.trigger_name);
self.send_disable_trigger();
while let Some(event) = receiver.recv().await {
match event {
WalEvent::WriteWalContents(_) => {
warn!("skipping wal contents because trigger is being disabled")
}
WalEvent::Shutdown(shutdown) => {
if shutdown.send(()).is_err() {
error!("failed to send back shutdown for trigger {}", trigger_definition.trigger_name);
}
break;
}
}
}
}
}
}
}
Some(WalEvent::Shutdown(sender)) => {
@@ -264,8 +289,35 @@ mod python_plugin {
}
}
Some(result) = futures.next() => {
if let Err(e) = result {
error!(?self.trigger_definition, "error processing wal contents: {}", e);
match result {
Ok(result) => {
match result {
PluginNextState::SuccessfulRun => {}
PluginNextState::LogError(error_log) => {
error!("trigger failed with error {}", error_log);
self.logger.log(LogLevel::Error, error_log);
},
PluginNextState::Disable(_) => {
self.send_disable_trigger();
while let Some(event) = receiver.recv().await {
match event {
WalEvent::WriteWalContents(_) => {
warn!("skipping wal contents because trigger is being disabled")
}
WalEvent::Shutdown(shutdown) => {
if shutdown.send(()).is_err() {
error!("failed to send back shutdown for trigger {}", self.trigger_definition.trigger_name);
}
break;
}
}
}
}
}
}
Err(err) => {
error!(?self.trigger_definition, "error processing wal contents: {}", err);
}
}
}
}
@@ -274,16 +326,26 @@ mod python_plugin {
Ok(())
}
/// Sends the disable-trigger command to the processing engine manager.
/// This runs in a separate task so that the caller can still send back the shutdown acknowledgment.
pub(crate) fn send_disable_trigger(&self) {
let manager = Arc::clone(&self.manager);
let db_name = self.trigger_definition.database_name.clone();
let trigger_name = self.trigger_definition.trigger_name.clone();
let fut = async move {
manager
.disable_trigger(db_name.as_str(), trigger_name.as_str())
.await
};
// start the disable call, then look for the shutdown message
tokio::spawn(fut);
}
pub(crate) async fn run_schedule_plugin(
&self,
mut receiver: Receiver<ScheduleEvent>,
mut runner: ScheduleTriggerRunner,
time_provider: Arc<dyn TimeProvider>,
) -> Result<(), PluginError> {
let run_in_dedicated_task = self
.trigger_definition
.flags
.contains(&TriggerFlag::ExecuteAsynchronously);
let mut futures = FuturesUnordered::new();
loop {
let Some(next_run_instant) = runner.next_run_time() else {
@@ -301,13 +363,40 @@ mod python_plugin {
};
runner.advance_time();
if run_in_dedicated_task {
if self.trigger_definition.trigger_settings.run_async {
let trigger = self.clone();
let fut = async move { ScheduleTriggerRunner::run_at_time(trigger, trigger_time, schema).await };
futures.push(fut);
} else if let Err(err) = ScheduleTriggerRunner::run_at_time(self.clone(), trigger_time, schema).await {
self.logger.log(LogLevel::Error, format!("error running scheduled plugin: {}", err));
error!(?self.trigger_definition, "error running scheduled plugin: {}", err);
} else {
match ScheduleTriggerRunner::run_at_time(self.clone(), trigger_time, schema).await {
Ok(plugin_state) => {
match plugin_state {
PluginNextState::SuccessfulRun => {}
PluginNextState::LogError(err) => {
self.logger.log(LogLevel::Error, format!("error running scheduled plugin: {}", err));
error!(?self.trigger_definition, "error running scheduled plugin: {}", err);
}
PluginNextState::Disable(trigger_definition) => {
warn!("disabling trigger {} due to error", trigger_definition.trigger_name);
self.send_disable_trigger();
let Some(ScheduleEvent::Shutdown(sender)) = receiver.recv().await else {
warn!("didn't receive shutdown notification from receiver");
break;
};
if sender.send(()).is_err() {
error!("failed to send shutdown message back");
}
break;
}
}
}
Err(err) => {
self.logger.log(LogLevel::Error, format!("error running scheduled plugin: {}", err));
error!(?self.trigger_definition, "error running scheduled plugin: {}", err);
}
}
}
}
event = receiver.recv() => {
@@ -323,9 +412,34 @@ mod python_plugin {
}
}
Some(result) = futures.next() => {
if let Err(e) = result {
self.logger.log(LogLevel::Error, format!("error running async scheduled plugin: {}", e));
error!(?self.trigger_definition, "error running async scheduled plugin: {}", e);
match result {
Err(e) => {
self.logger.log(LogLevel::Error, format!("error running async scheduled plugin: {}", e));
error!(?self.trigger_definition, "error running async scheduled plugin: {}", e);
}
Ok(result) => {
match result {
PluginNextState::SuccessfulRun => {}
PluginNextState::LogError(err) => {
self.logger.log(LogLevel::Error, format!("error running async scheduled plugin: {}", err));
error!(?self.trigger_definition, "error running async scheduled plugin: {}", err);
}
PluginNextState::Disable(trigger_definition) => {
warn!("disabling trigger {} due to error", trigger_definition.trigger_name);
self.send_disable_trigger();
let Some(ScheduleEvent::Shutdown(sender)) = receiver.recv().await else {
warn!("didn't receive shutdown notification from receiver");
break;
};
if sender.send(()).is_err() {
error!("failed to send shutdown message back");
}
break;
}
}
}
}
}
}
@@ -438,88 +552,118 @@ mod python_plugin {
async fn process_wal_contents(
&self,
wal_contents: Arc<WalContents>,
) -> Result<(), PluginError> {
let Some(schema) = self.write_buffer.catalog().db_schema(self.db_name.as_str()) else {
return Err(PluginError::MissingDb);
};
) -> Result<PluginNextState, PluginError> {
// loop for retry case
loop {
let Some(schema) = self.write_buffer.catalog().db_schema(self.db_name.as_str())
else {
return Err(PluginError::MissingDb);
};
for (op_index, wal_op) in wal_contents.ops.iter().enumerate() {
match wal_op {
WalOp::Write(write_batch) => {
// determine if this write batch is for this database
if write_batch.database_name.as_ref()
!= self.trigger_definition.database_name
{
continue;
}
let table_filter = match &self.trigger_definition.trigger {
TriggerSpecificationDefinition::AllTablesWalWrite => {
// no filter
None
for (op_index, wal_op) in wal_contents.ops.iter().enumerate() {
match wal_op {
WalOp::Write(write_batch) => {
// determine if this write batch is for this database
if write_batch.database_name.as_ref()
!= self.trigger_definition.database_name
{
continue;
}
TriggerSpecificationDefinition::SingleTableWalWrite {
table_name,
} => {
let table_id = schema
.table_name_to_id(table_name.as_ref())
.context("table not found")?;
Some(table_id)
}
// This should not occur
TriggerSpecificationDefinition::Schedule {
schedule
} => {
return Err(anyhow!("unexpectedly found scheduled trigger specification cron:{} for WAL plugin {}", schedule, self.trigger_definition.trigger_name).into())
}
TriggerSpecificationDefinition::Every {
duration,
} => {
return Err(anyhow!("unexpectedly found every trigger specification every:{} WAL plugin {}", format_duration(*duration), self.trigger_definition.trigger_name).into())
}
TriggerSpecificationDefinition::RequestPath { path } => {
return Err(anyhow!("unexpectedly found request path trigger specification {} for WAL plugin {}", path, self.trigger_definition.trigger_name).into())
}
};
let logger = Some(self.logger.clone());
let plugin_code = Arc::clone(&self.plugin_code.code());
let query_executor = Arc::clone(&self.query_executor);
let schema_clone = Arc::clone(&schema);
let trigger_arguments = self.trigger_definition.trigger_arguments.clone();
let wal_contents_clone = Arc::clone(&wal_contents);
let result = tokio::task::spawn_blocking(move || {
let write_batch = match &wal_contents_clone.ops[op_index] {
WalOp::Write(wb) => wb,
_ => unreachable!("Index was checked."),
let table_filter = match &self.trigger_definition.trigger {
TriggerSpecificationDefinition::AllTablesWalWrite => {
// no filter
None
}
TriggerSpecificationDefinition::SingleTableWalWrite {
table_name,
} => {
let table_id = schema
.table_name_to_id(table_name.as_ref())
.context("table not found")?;
Some(table_id)
}
// This should not occur
TriggerSpecificationDefinition::Schedule {
schedule
} => {
return Err(anyhow!("unexpectedly found scheduled trigger specification cron:{} for WAL plugin {}", schedule, self.trigger_definition.trigger_name).into())
}
TriggerSpecificationDefinition::Every {
duration,
} => {
return Err(anyhow!("unexpectedly found every trigger specification every:{} WAL plugin {}", format_duration(*duration), self.trigger_definition.trigger_name).into())
}
TriggerSpecificationDefinition::RequestPath { path } => {
return Err(anyhow!("unexpectedly found request path trigger specification {} for WAL plugin {}", path, self.trigger_definition.trigger_name).into())
}
};
execute_python_with_batch(
plugin_code.as_ref(),
write_batch,
schema_clone,
query_executor,
logger,
table_filter,
&trigger_arguments,
)
})
.await??;
let errors = self.handle_return_state(result).await;
for error in errors {
self.logger.log(
LogLevel::Error,
format!("error running wal plugin: {}", error),
);
error!(?self.trigger_definition, "error running wal plugin: {}", error);
let logger = Some(self.logger.clone());
let plugin_code = Arc::clone(&self.plugin_code.code());
let query_executor = Arc::clone(&self.query_executor);
let schema_clone = Arc::clone(&schema);
let trigger_arguments =
self.trigger_definition.trigger_arguments.clone();
let wal_contents_clone = Arc::clone(&wal_contents);
let result = tokio::task::spawn_blocking(move || {
let write_batch = match &wal_contents_clone.ops[op_index] {
WalOp::Write(wb) => wb,
_ => unreachable!("Index was checked."),
};
execute_python_with_batch(
plugin_code.as_ref(),
write_batch,
schema_clone,
query_executor,
logger,
table_filter,
&trigger_arguments,
)
})
.await?;
match result {
Ok(result) => {
let errors = self.handle_return_state(result).await;
for error in errors {
self.logger.log(
LogLevel::Error,
format!("error running wal plugin: {}", error),
);
error!(?self.trigger_definition, "error running wal plugin: {}", error);
}
}
Err(err) => {
match self.trigger_definition.trigger_settings.error_behavior {
ErrorBehavior::Log => {
self.logger.log(
LogLevel::Error,
format!("error executing against batch {}", err),
);
error!(?self.trigger_definition, "error running against batch: {}", err);
}
ErrorBehavior::Retry => {
info!(
"error running against batch {}, will retry",
err
);
break;
}
ErrorBehavior::Disable => {
return Ok(PluginNextState::Disable(
self.trigger_definition.clone(),
));
}
}
}
}
}
WalOp::Catalog(_) => {}
WalOp::Noop(_) => {}
}
WalOp::Catalog(_) => {}
WalOp::Noop(_) => {}
}
}
Ok(())
}
/// Handles the return state from the plugin, writing back lines and handling any errors.
@@ -579,6 +723,12 @@ mod python_plugin {
Every(Duration),
}
enum PluginNextState {
SuccessfulRun,
LogError(String),
Disable(TriggerDefinition),
}
pub(crate) struct ScheduleTriggerRunner {
schedule: Schedule,
next_trigger_time: Option<DateTime<Utc>>,
@@ -645,29 +795,52 @@ mod python_plugin {
plugin: TriggerPlugin,
trigger_time: DateTime<Utc>,
db_schema: Arc<DatabaseSchema>,
) -> Result<(), PluginError> {
let plugin_code = plugin.plugin_code.code();
let query_executor = Arc::clone(&plugin.query_executor);
let logger = Some(plugin.logger.clone());
let trigger_arguments = plugin.trigger_definition.trigger_arguments.clone();
let result = tokio::task::spawn_blocking(move || {
execute_schedule_trigger(
plugin_code.as_ref(),
trigger_time,
db_schema,
query_executor,
logger,
&trigger_arguments,
)
})
.await??;
) -> Result<PluginNextState, PluginError> {
// This loop is here just for the retry case.
loop {
let plugin_code = plugin.plugin_code.code();
let query_executor = Arc::clone(&plugin.query_executor);
let logger = Some(plugin.logger.clone());
let trigger_arguments = plugin.trigger_definition.trigger_arguments.clone();
let schema = Arc::clone(&db_schema);
let errors = plugin.handle_return_state(result).await;
// TODO: here is one spot we'll pick up errors to put into the plugin system table
for error in errors {
error!(?plugin.trigger_definition, "error running schedule plugin: {}", error);
let result = tokio::task::spawn_blocking(move || {
execute_schedule_trigger(
plugin_code.as_ref(),
trigger_time,
schema,
query_executor,
logger,
&trigger_arguments,
)
})
.await?;
match result {
Ok(result) => {
let errors = plugin.handle_return_state(result).await;
// TODO: here is one spot we'll pick up errors to put into the plugin system table
for error in errors {
error!(?plugin.trigger_definition, "error running schedule plugin: {}", error);
}
return Ok(PluginNextState::SuccessfulRun);
}
Err(err) => match &plugin.trigger_definition.trigger_settings.error_behavior {
ErrorBehavior::Log => {
return Ok(PluginNextState::LogError(err.to_string()));
}
ErrorBehavior::Retry => {
warn!(
"retrying trigger {} on error",
plugin.trigger_definition.trigger_name
);
continue;
}
ErrorBehavior::Disable => {
return Ok(PluginNextState::Disable(plugin.trigger_definition));
}
},
}
}
Ok(())
}
fn advance_time(&mut self) {


@@ -206,7 +206,7 @@ impl<T: TimeProvider>
let processing_engine = Arc::new(self.processing_engine.0);
processing_engine
.start_triggers()
.start_triggers(Arc::clone(&processing_engine) as _)
.await
.expect("failed to start processing engine triggers");
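
The Arc::clone(&processing_engine) as _ here is the key wiring for the new Disable behavior: the manager receives an Arc of itself, so tasks it spawns can later call disable_trigger on a failing plugin. A self-contained toy sketch of the pattern, with hypothetical stand-in types (not the actual trait):

    use std::sync::Arc;

    // Hypothetical stand-ins for ProcessingEngineManager and its impl.
    trait Manager: Send + Sync + 'static {
        fn disable_trigger(&self, db: &str, trigger: &str);
    }

    struct ManagerImpl;
    impl Manager for ManagerImpl {
        fn disable_trigger(&self, db: &str, trigger: &str) {
            println!("disabling {db}/{trigger}");
        }
    }

    #[tokio::main]
    async fn main() {
        let pem: Arc<dyn Manager> = Arc::new(ManagerImpl);
        // The trait object is handed a clone of itself (as start_triggers is
        // above); spawned trigger tasks can then disable their own trigger.
        let for_task = Arc::clone(&pem);
        tokio::spawn(async move {
            // ...a plugin configured with ErrorBehavior::Disable just failed...
            for_task.disable_trigger("foo", "test_trigger");
        })
        .await
        .unwrap();
    }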


@@ -1011,7 +1011,7 @@ where
db,
plugin_filename,
trigger_name,
flags,
trigger_settings,
trigger_specification,
trigger_arguments,
disabled,
@@ -1035,7 +1035,7 @@ where
db.as_str(),
trigger_name.clone(),
plugin_filename,
flags,
trigger_settings,
trigger_spec,
trigger_arguments,
disabled,
@@ -1046,6 +1046,7 @@ where
.run_trigger(
Arc::clone(&self.write_buffer),
Arc::clone(&self.query_executor),
Arc::clone(&self.processing_engine),
db.as_str(),
trigger_name.as_str(),
)
@@ -1139,6 +1140,7 @@ where
.enable_trigger(
Arc::clone(&self.write_buffer),
Arc::clone(&self.query_executor),
Arc::clone(&self.processing_engine),
delete_req.db.as_str(),
delete_req.trigger_name.as_str(),
)


@@ -4,7 +4,7 @@ use hyper::HeaderMap;
use hyper::header::ACCEPT;
use hyper::http::HeaderValue;
use influxdb3_cache::distinct_cache::MaxCardinality;
use influxdb3_wal::TriggerFlag;
use influxdb3_wal::TriggerSettings;
use iox_query_params::StatementParams;
use serde::{Deserialize, Serialize};
@@ -158,7 +158,7 @@ pub struct ProcessingEngineTriggerCreateRequest {
pub db: String,
pub plugin_filename: String,
pub trigger_name: String,
pub flags: Vec<TriggerFlag>,
pub trigger_settings: TriggerSettings,
pub trigger_specification: String,
pub trigger_arguments: Option<HashMap<String, String>>,
pub disabled: bool,
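
Given these fields, the create-trigger request body now carries a nested trigger_settings object in place of the old flags array. A sketch of the implied JSON built with serde_json (field names come from the struct above; values are illustrative, and the exact trigger_specification string depends on TriggerSpecificationDefinition::string_rep()):

    fn main() {
        let body = serde_json::json!({
            "db": "mydb",
            "plugin_filename": "plugin.py",
            "trigger_name": "my_trigger",
            "trigger_settings": {
                "run_async": false,
                // snake_case, per the serde rename on ErrorBehavior
                "error_behavior": "retry"
            },
            "trigger_specification": "all_tables",
            "trigger_arguments": null,
            "disabled": false
        });
        println!("{body}");
    }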


@@ -33,6 +33,7 @@ serde.workspace = true
serde_with.workspace = true
thiserror.workspace = true
tokio.workspace = true
clap = "4.5.23"
[lints]
workspace = true


@@ -64,6 +64,8 @@ pub enum Error {
trigger_spec: String,
context: Option<String>,
},
#[error("invalid error behavior {0}")]
InvalidErrorBehavior(String),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -642,15 +644,28 @@ pub struct TriggerDefinition {
pub plugin_filename: String,
pub database_name: String,
pub trigger: TriggerSpecificationDefinition,
pub flags: Vec<TriggerFlag>,
pub trigger_settings: TriggerSettings,
pub trigger_arguments: Option<HashMap<String, String>>,
pub disabled: bool,
}
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Copy)]
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Copy, Default)]
#[serde(rename_all = "snake_case")]
pub enum TriggerFlag {
ExecuteAsynchronously,
pub struct TriggerSettings {
pub run_async: bool,
pub error_behavior: ErrorBehavior,
}
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Copy, Default, clap::ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum ErrorBehavior {
#[default]
/// Log the error to the service output and system.processing_engine_logs table.
Log,
/// Rerun the trigger on error.
Retry,
/// Turn off the plugin until it is manually re-enabled.
Disable,
}
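
Because both types derive Serialize/Deserialize with snake_case renames, the wire form is compact, and TriggerSettings::default() yields run_async = false with ErrorBehavior::Log. A quick sketch, assuming serde_json is available:

    use influxdb3_wal::{ErrorBehavior, TriggerSettings};

    fn main() {
        // The derived Default: run_async = false, error_behavior = Log.
        let settings = TriggerSettings::default();
        assert_eq!(
            serde_json::to_string(&settings).unwrap(),
            r#"{"run_async":false,"error_behavior":"log"}"#
        );

        let retrying = TriggerSettings {
            run_async: true,
            error_behavior: ErrorBehavior::Retry,
        };
        assert_eq!(
            serde_json::to_string(&retrying).unwrap(),
            r#"{"run_async":true,"error_behavior":"retry"}"#
        );
    }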
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]