fix(processing_engine): fix retry behavior on batches to try individual batch. (#26105)

pull/26108/head
Jackson Newhouse 2025-03-05 15:23:25 -08:00 committed by GitHub
parent a1f0f2b691
commit 85023f075b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 19 additions and 20 deletions

View File

@ -553,23 +553,20 @@ mod python_plugin {
&self,
wal_contents: Arc<WalContents>,
) -> Result<PluginNextState, PluginError> {
// loop for retry case
loop {
let Some(schema) = self.write_buffer.catalog().db_schema(self.db_name.as_str())
else {
return Err(PluginError::MissingDb);
};
let Some(schema) = self.write_buffer.catalog().db_schema(self.db_name.as_str()) else {
return Err(PluginError::MissingDb);
};
for (op_index, wal_op) in wal_contents.ops.iter().enumerate() {
match wal_op {
WalOp::Write(write_batch) => {
// determine if this write batch is for this database
if write_batch.database_name.as_ref()
!= self.trigger_definition.database_name
{
continue;
}
let table_filter = match &self.trigger_definition.trigger {
for (op_index, wal_op) in wal_contents.ops.iter().enumerate() {
match wal_op {
WalOp::Write(write_batch) => {
// determine if this write batch is for this database
if write_batch.database_name.as_ref()
!= self.trigger_definition.database_name
{
continue;
}
let table_filter = match &self.trigger_definition.trigger {
TriggerSpecificationDefinition::AllTablesWalWrite => {
// no filter
None
@ -598,6 +595,8 @@ mod python_plugin {
}
};
// loop for retries, in general it will only run once.
loop {
let logger = Some(self.logger.clone());
let plugin_code = Arc::clone(&self.plugin_code.code());
let query_executor = Arc::clone(&self.query_executor);
@ -633,6 +632,7 @@ mod python_plugin {
);
error!(?self.trigger_definition, "error running wal plugin: {}", error);
}
break;
}
Err(err) => {
match self.trigger_definition.trigger_settings.error_behavior {
@ -648,7 +648,6 @@ mod python_plugin {
"error running against batch {}, will retry",
err
);
break;
}
ErrorBehavior::Disable => {
return Ok(PluginNextState::Disable(
@ -659,11 +658,12 @@ mod python_plugin {
}
}
}
WalOp::Catalog(_) => {}
WalOp::Noop(_) => {}
}
WalOp::Catalog(_) => {}
WalOp::Noop(_) => {}
}
}
Ok(PluginNextState::SuccessfulRun)
}
/// Handles the return state from the plugin, writing back lines and handling any errors.
@ -833,7 +833,6 @@ mod python_plugin {
"retrying trigger {} on error",
plugin.trigger_definition.trigger_name
);
continue;
}
ErrorBehavior::Disable => {
return Ok(PluginNextState::Disable(plugin.trigger_definition));