diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index c507e8a50b..3ca8138221 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -632,6 +632,7 @@ pub async fn command(config: Config) -> Result<()> { let processing_engine = ProcessingEngineManagerImpl::new( setup_processing_engine_env_manager(&config.processing_engine_config), write_buffer.catalog(), + config.node_identifier_prefix, Arc::clone(&write_buffer), Arc::clone(&query_executor) as _, Arc::clone(&time_provider) as _, diff --git a/influxdb3_catalog/src/catalog/update.rs b/influxdb3_catalog/src/catalog/update.rs index a990f197c4..969a0eccfc 100644 --- a/influxdb3_catalog/src/catalog/update.rs +++ b/influxdb3_catalog/src/catalog/update.rs @@ -482,6 +482,7 @@ impl Catalog { &self, db_name: &str, trigger_name: &str, + node_id: Arc, plugin_filename: ValidPluginFilename<'_>, trigger_specification: &str, trigger_settings: TriggerSettings, @@ -501,6 +502,7 @@ impl Catalog { trigger_name: trigger_name.to_string(), plugin_filename: plugin_filename.to_string(), database_name: Arc::clone(&db.name), + node_id: Arc::clone(&node_id), trigger, trigger_settings, trigger_arguments: trigger_arguments.clone(), diff --git a/influxdb3_catalog/src/log.rs b/influxdb3_catalog/src/log.rs index c8fd462be3..f4745cd15d 100644 --- a/influxdb3_catalog/src/log.rs +++ b/influxdb3_catalog/src/log.rs @@ -673,6 +673,7 @@ pub struct CreateTriggerLog { pub trigger_name: String, pub plugin_filename: String, pub database_name: Arc, + pub node_id: Arc, pub trigger: TriggerSpecificationDefinition, pub trigger_settings: TriggerSettings, pub trigger_arguments: Option>, diff --git a/influxdb3_catalog/src/snapshot.rs b/influxdb3_catalog/src/snapshot.rs index dd8a2238ed..4eada614ee 100644 --- a/influxdb3_catalog/src/snapshot.rs +++ b/influxdb3_catalog/src/snapshot.rs @@ -216,11 +216,12 @@ impl From for DatabaseSchema { CreateTriggerLog { trigger_name: trigger.trigger_name, plugin_filename: trigger.plugin_filename, + database_name: trigger.database_name, + node_id: trigger.node_id, trigger: serde_json::from_str(&trigger.trigger_specification).unwrap(), trigger_settings: trigger.trigger_settings, trigger_arguments: trigger.trigger_arguments, disabled: trigger.disabled, - database_name: trigger.database_name, }, ) }) @@ -289,6 +290,7 @@ struct ProcessingEngineTriggerSnapshot { pub trigger_name: String, pub plugin_filename: String, pub database_name: Arc, + pub node_id: Arc, pub trigger_specification: String, pub trigger_settings: TriggerSettings, pub trigger_arguments: Option>, @@ -541,6 +543,7 @@ impl From<&CreateTriggerLog> for ProcessingEngineTriggerSnapshot { trigger_name: trigger.trigger_name.to_string(), plugin_filename: trigger.plugin_filename.to_string(), database_name: Arc::clone(&trigger.database_name), + node_id: Arc::clone(&trigger.node_id), trigger_specification: serde_json::to_string(&trigger.trigger) .expect("should be able to serialize trigger specification"), trigger_settings: trigger.trigger_settings, diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs index 21f0902c97..01661bf45e 100644 --- a/influxdb3_processing_engine/src/lib.rs +++ b/influxdb3_processing_engine/src/lib.rs @@ -43,6 +43,7 @@ pub mod virtualenv; pub struct ProcessingEngineManagerImpl { environment_manager: ProcessingEngineEnvironmentManager, catalog: Arc, + node_id: Arc, write_buffer: Arc, query_executor: Arc, time_provider: Arc, @@ -207,6 +208,7 @@ impl ProcessingEngineManagerImpl { pub fn new( environment: ProcessingEngineEnvironmentManager, catalog: Arc, + node_id: impl Into>, write_buffer: Arc, query_executor: Arc, time_provider: Arc, @@ -233,6 +235,7 @@ impl ProcessingEngineManagerImpl { let pem = Arc::new(Self { environment_manager: environment, catalog, + node_id: node_id.into(), write_buffer, query_executor, sys_event_store, @@ -246,6 +249,10 @@ impl ProcessingEngineManagerImpl { pem } + pub fn node_id(&self) -> Arc { + Arc::clone(&self.node_id) + } + pub async fn validate_plugin_filename<'a>( &self, name: &'a str, @@ -372,6 +379,14 @@ impl ProcessingEngineManagerImpl { })? .clone(); + if trigger.node_id != self.node_id { + error!( + "Not running trigger {}, as it is configured for node id {}. Multi-node not supported in core, so this shouldn't happen.", + trigger_name, trigger.node_id + ); + return Ok(()); + } + let plugin_context = PluginContext { write_buffer: Arc::clone(&self.write_buffer), query_executor: Arc::clone(&self.query_executor), @@ -811,6 +826,7 @@ mod tests { .create_processing_engine_trigger( "foo", "test_trigger", + Arc::clone(&pem.node_id), file_name, &TriggerSpecificationDefinition::AllTablesWalWrite.string_rep(), TriggerSettings::default(), @@ -889,6 +905,7 @@ mod tests { .create_processing_engine_trigger( "foo", "test_trigger", + Arc::clone(&pem.node_id), file_name, &TriggerSpecificationDefinition::AllTablesWalWrite.string_rep(), TriggerSettings::default(), @@ -1014,6 +1031,7 @@ def process_writes(influxdb3_local, table_batches, args=None): ProcessingEngineManagerImpl::new( environment_manager, catalog, + "test_node", wbuf, qe, time_provider, diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index 2e2ede6515..8bc93938ff 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -973,6 +973,7 @@ where .create_processing_engine_trigger( &db, &trigger_name, + self.processing_engine.node_id(), plugin_filename, &trigger_specification, trigger_settings, diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index 15e6a7ce9c..a51c7202ef 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -766,9 +766,10 @@ mod tests { }, DedicatedExecutor::new_testing(), )); + let node_identifier_prefix = "test_host"; let persister = Arc::new(Persister::new( Arc::clone(&object_store), - "test_host", + node_identifier_prefix, Arc::clone(&time_provider) as _, )); let sample_node_id = Arc::from("sample-host-id"); @@ -841,6 +842,7 @@ mod tests { package_manager: Arc::new(DisabledManager), }, write_buffer.catalog(), + node_identifier_prefix, Arc::clone(&write_buffer), Arc::clone(&query_executor) as _, Arc::clone(&time_provider) as _,