feat(processing_engine): Attach a node id to the processing engine and triggers for multi-node support compatibility. (#26136)

pull/26135/head
Jackson Newhouse 2025-03-13 16:10:02 -07:00 committed by GitHub
parent 941e0e0f15
commit 331f88533c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 30 additions and 2 deletions

View File

@ -632,6 +632,7 @@ pub async fn command(config: Config) -> Result<()> {
let processing_engine = ProcessingEngineManagerImpl::new(
setup_processing_engine_env_manager(&config.processing_engine_config),
write_buffer.catalog(),
config.node_identifier_prefix,
Arc::clone(&write_buffer),
Arc::clone(&query_executor) as _,
Arc::clone(&time_provider) as _,

View File

@ -482,6 +482,7 @@ impl Catalog {
&self,
db_name: &str,
trigger_name: &str,
node_id: Arc<str>,
plugin_filename: ValidPluginFilename<'_>,
trigger_specification: &str,
trigger_settings: TriggerSettings,
@ -501,6 +502,7 @@ impl Catalog {
trigger_name: trigger_name.to_string(),
plugin_filename: plugin_filename.to_string(),
database_name: Arc::clone(&db.name),
node_id: Arc::clone(&node_id),
trigger,
trigger_settings,
trigger_arguments: trigger_arguments.clone(),

View File

@ -673,6 +673,7 @@ pub struct CreateTriggerLog {
pub trigger_name: String,
pub plugin_filename: String,
pub database_name: Arc<str>,
pub node_id: Arc<str>,
pub trigger: TriggerSpecificationDefinition,
pub trigger_settings: TriggerSettings,
pub trigger_arguments: Option<HashMap<String, String>>,

View File

@ -216,11 +216,12 @@ impl From<DatabaseSnapshot> for DatabaseSchema {
CreateTriggerLog {
trigger_name: trigger.trigger_name,
plugin_filename: trigger.plugin_filename,
database_name: trigger.database_name,
node_id: trigger.node_id,
trigger: serde_json::from_str(&trigger.trigger_specification).unwrap(),
trigger_settings: trigger.trigger_settings,
trigger_arguments: trigger.trigger_arguments,
disabled: trigger.disabled,
database_name: trigger.database_name,
},
)
})
@ -289,6 +290,7 @@ struct ProcessingEngineTriggerSnapshot {
pub trigger_name: String,
pub plugin_filename: String,
pub database_name: Arc<str>,
pub node_id: Arc<str>,
pub trigger_specification: String,
pub trigger_settings: TriggerSettings,
pub trigger_arguments: Option<HashMap<String, String>>,
@ -541,6 +543,7 @@ impl From<&CreateTriggerLog> for ProcessingEngineTriggerSnapshot {
trigger_name: trigger.trigger_name.to_string(),
plugin_filename: trigger.plugin_filename.to_string(),
database_name: Arc::clone(&trigger.database_name),
node_id: Arc::clone(&trigger.node_id),
trigger_specification: serde_json::to_string(&trigger.trigger)
.expect("should be able to serialize trigger specification"),
trigger_settings: trigger.trigger_settings,

View File

@ -43,6 +43,7 @@ pub mod virtualenv;
pub struct ProcessingEngineManagerImpl {
environment_manager: ProcessingEngineEnvironmentManager,
catalog: Arc<Catalog>,
node_id: Arc<str>,
write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor>,
time_provider: Arc<dyn TimeProvider>,
@ -207,6 +208,7 @@ impl ProcessingEngineManagerImpl {
pub fn new(
environment: ProcessingEngineEnvironmentManager,
catalog: Arc<Catalog>,
node_id: impl Into<Arc<str>>,
write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor>,
time_provider: Arc<dyn TimeProvider>,
@ -233,6 +235,7 @@ impl ProcessingEngineManagerImpl {
let pem = Arc::new(Self {
environment_manager: environment,
catalog,
node_id: node_id.into(),
write_buffer,
query_executor,
sys_event_store,
@ -246,6 +249,10 @@ impl ProcessingEngineManagerImpl {
pem
}
pub fn node_id(&self) -> Arc<str> {
Arc::clone(&self.node_id)
}
pub async fn validate_plugin_filename<'a>(
&self,
name: &'a str,
@ -372,6 +379,14 @@ impl ProcessingEngineManagerImpl {
})?
.clone();
if trigger.node_id != self.node_id {
error!(
"Not running trigger {}, as it is configured for node id {}. Multi-node not supported in core, so this shouldn't happen.",
trigger_name, trigger.node_id
);
return Ok(());
}
let plugin_context = PluginContext {
write_buffer: Arc::clone(&self.write_buffer),
query_executor: Arc::clone(&self.query_executor),
@ -811,6 +826,7 @@ mod tests {
.create_processing_engine_trigger(
"foo",
"test_trigger",
Arc::clone(&pem.node_id),
file_name,
&TriggerSpecificationDefinition::AllTablesWalWrite.string_rep(),
TriggerSettings::default(),
@ -889,6 +905,7 @@ mod tests {
.create_processing_engine_trigger(
"foo",
"test_trigger",
Arc::clone(&pem.node_id),
file_name,
&TriggerSpecificationDefinition::AllTablesWalWrite.string_rep(),
TriggerSettings::default(),
@ -1014,6 +1031,7 @@ def process_writes(influxdb3_local, table_batches, args=None):
ProcessingEngineManagerImpl::new(
environment_manager,
catalog,
"test_node",
wbuf,
qe,
time_provider,

View File

@ -973,6 +973,7 @@ where
.create_processing_engine_trigger(
&db,
&trigger_name,
self.processing_engine.node_id(),
plugin_filename,
&trigger_specification,
trigger_settings,

View File

@ -766,9 +766,10 @@ mod tests {
},
DedicatedExecutor::new_testing(),
));
let node_identifier_prefix = "test_host";
let persister = Arc::new(Persister::new(
Arc::clone(&object_store),
"test_host",
node_identifier_prefix,
Arc::clone(&time_provider) as _,
));
let sample_node_id = Arc::from("sample-host-id");
@ -841,6 +842,7 @@ mod tests {
package_manager: Arc::new(DisabledManager),
},
write_buffer.catalog(),
node_identifier_prefix,
Arc::clone(&write_buffer),
Arc::clone(&query_executor) as _,
Arc::clone(&time_provider) as _,