feat(processing_engine): Attach a node id to the processing engine and triggers for multi-node compatibility.

processing_engine/node_id
Jackson Newhouse 2025-03-13 13:32:44 -07:00
parent 941e0e0f15
commit 7a22abf788
7 changed files with 30 additions and 2 deletions

View File

@ -632,6 +632,7 @@ pub async fn command(config: Config) -> Result<()> {
let processing_engine = ProcessingEngineManagerImpl::new( let processing_engine = ProcessingEngineManagerImpl::new(
setup_processing_engine_env_manager(&config.processing_engine_config), setup_processing_engine_env_manager(&config.processing_engine_config),
write_buffer.catalog(), write_buffer.catalog(),
config.node_identifier_prefix,
Arc::clone(&write_buffer), Arc::clone(&write_buffer),
Arc::clone(&query_executor) as _, Arc::clone(&query_executor) as _,
Arc::clone(&time_provider) as _, Arc::clone(&time_provider) as _,

View File

@ -482,6 +482,7 @@ impl Catalog {
&self, &self,
db_name: &str, db_name: &str,
trigger_name: &str, trigger_name: &str,
node_id: Arc<str>,
plugin_filename: ValidPluginFilename<'_>, plugin_filename: ValidPluginFilename<'_>,
trigger_specification: &str, trigger_specification: &str,
trigger_settings: TriggerSettings, trigger_settings: TriggerSettings,
@ -501,6 +502,7 @@ impl Catalog {
trigger_name: trigger_name.to_string(), trigger_name: trigger_name.to_string(),
plugin_filename: plugin_filename.to_string(), plugin_filename: plugin_filename.to_string(),
database_name: Arc::clone(&db.name), database_name: Arc::clone(&db.name),
node_id: Arc::clone(&node_id),
trigger, trigger,
trigger_settings, trigger_settings,
trigger_arguments: trigger_arguments.clone(), trigger_arguments: trigger_arguments.clone(),

View File

@ -673,6 +673,7 @@ pub struct CreateTriggerLog {
pub trigger_name: String, pub trigger_name: String,
pub plugin_filename: String, pub plugin_filename: String,
pub database_name: Arc<str>, pub database_name: Arc<str>,
pub node_id: Arc<str>,
pub trigger: TriggerSpecificationDefinition, pub trigger: TriggerSpecificationDefinition,
pub trigger_settings: TriggerSettings, pub trigger_settings: TriggerSettings,
pub trigger_arguments: Option<HashMap<String, String>>, pub trigger_arguments: Option<HashMap<String, String>>,

View File

@ -216,11 +216,12 @@ impl From<DatabaseSnapshot> for DatabaseSchema {
CreateTriggerLog { CreateTriggerLog {
trigger_name: trigger.trigger_name, trigger_name: trigger.trigger_name,
plugin_filename: trigger.plugin_filename, plugin_filename: trigger.plugin_filename,
database_name: trigger.database_name,
node_id: trigger.node_id,
trigger: serde_json::from_str(&trigger.trigger_specification).unwrap(), trigger: serde_json::from_str(&trigger.trigger_specification).unwrap(),
trigger_settings: trigger.trigger_settings, trigger_settings: trigger.trigger_settings,
trigger_arguments: trigger.trigger_arguments, trigger_arguments: trigger.trigger_arguments,
disabled: trigger.disabled, disabled: trigger.disabled,
database_name: trigger.database_name,
}, },
) )
}) })
@ -289,6 +290,7 @@ struct ProcessingEngineTriggerSnapshot {
pub trigger_name: String, pub trigger_name: String,
pub plugin_filename: String, pub plugin_filename: String,
pub database_name: Arc<str>, pub database_name: Arc<str>,
pub node_id: Arc<str>,
pub trigger_specification: String, pub trigger_specification: String,
pub trigger_settings: TriggerSettings, pub trigger_settings: TriggerSettings,
pub trigger_arguments: Option<HashMap<String, String>>, pub trigger_arguments: Option<HashMap<String, String>>,
@ -541,6 +543,7 @@ impl From<&CreateTriggerLog> for ProcessingEngineTriggerSnapshot {
trigger_name: trigger.trigger_name.to_string(), trigger_name: trigger.trigger_name.to_string(),
plugin_filename: trigger.plugin_filename.to_string(), plugin_filename: trigger.plugin_filename.to_string(),
database_name: Arc::clone(&trigger.database_name), database_name: Arc::clone(&trigger.database_name),
node_id: Arc::clone(&trigger.node_id),
trigger_specification: serde_json::to_string(&trigger.trigger) trigger_specification: serde_json::to_string(&trigger.trigger)
.expect("should be able to serialize trigger specification"), .expect("should be able to serialize trigger specification"),
trigger_settings: trigger.trigger_settings, trigger_settings: trigger.trigger_settings,

View File

@ -43,6 +43,7 @@ pub mod virtualenv;
pub struct ProcessingEngineManagerImpl { pub struct ProcessingEngineManagerImpl {
environment_manager: ProcessingEngineEnvironmentManager, environment_manager: ProcessingEngineEnvironmentManager,
catalog: Arc<Catalog>, catalog: Arc<Catalog>,
node_id: Arc<str>,
write_buffer: Arc<dyn WriteBuffer>, write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor>, query_executor: Arc<dyn QueryExecutor>,
time_provider: Arc<dyn TimeProvider>, time_provider: Arc<dyn TimeProvider>,
@ -207,6 +208,7 @@ impl ProcessingEngineManagerImpl {
pub fn new( pub fn new(
environment: ProcessingEngineEnvironmentManager, environment: ProcessingEngineEnvironmentManager,
catalog: Arc<Catalog>, catalog: Arc<Catalog>,
node_id: impl Into<Arc<str>>,
write_buffer: Arc<dyn WriteBuffer>, write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor>, query_executor: Arc<dyn QueryExecutor>,
time_provider: Arc<dyn TimeProvider>, time_provider: Arc<dyn TimeProvider>,
@ -233,6 +235,7 @@ impl ProcessingEngineManagerImpl {
let pem = Arc::new(Self { let pem = Arc::new(Self {
environment_manager: environment, environment_manager: environment,
catalog, catalog,
node_id: node_id.into(),
write_buffer, write_buffer,
query_executor, query_executor,
sys_event_store, sys_event_store,
@ -246,6 +249,10 @@ impl ProcessingEngineManagerImpl {
pem pem
} }
pub fn node_id(&self) -> Arc<str> {
Arc::clone(&self.node_id)
}
pub async fn validate_plugin_filename<'a>( pub async fn validate_plugin_filename<'a>(
&self, &self,
name: &'a str, name: &'a str,
@ -372,6 +379,14 @@ impl ProcessingEngineManagerImpl {
})? })?
.clone(); .clone();
if trigger.node_id != self.node_id {
error!(
"Not running trigger {}, as it is configured for node id {}. Multi-node not supported in core, so this shouldn't happen.",
trigger_name, trigger.node_id
);
return Ok(());
}
let plugin_context = PluginContext { let plugin_context = PluginContext {
write_buffer: Arc::clone(&self.write_buffer), write_buffer: Arc::clone(&self.write_buffer),
query_executor: Arc::clone(&self.query_executor), query_executor: Arc::clone(&self.query_executor),
@ -811,6 +826,7 @@ mod tests {
.create_processing_engine_trigger( .create_processing_engine_trigger(
"foo", "foo",
"test_trigger", "test_trigger",
Arc::clone(&pem.node_id),
file_name, file_name,
&TriggerSpecificationDefinition::AllTablesWalWrite.string_rep(), &TriggerSpecificationDefinition::AllTablesWalWrite.string_rep(),
TriggerSettings::default(), TriggerSettings::default(),
@ -889,6 +905,7 @@ mod tests {
.create_processing_engine_trigger( .create_processing_engine_trigger(
"foo", "foo",
"test_trigger", "test_trigger",
Arc::clone(&pem.node_id),
file_name, file_name,
&TriggerSpecificationDefinition::AllTablesWalWrite.string_rep(), &TriggerSpecificationDefinition::AllTablesWalWrite.string_rep(),
TriggerSettings::default(), TriggerSettings::default(),
@ -1014,6 +1031,7 @@ def process_writes(influxdb3_local, table_batches, args=None):
ProcessingEngineManagerImpl::new( ProcessingEngineManagerImpl::new(
environment_manager, environment_manager,
catalog, catalog,
"test_node",
wbuf, wbuf,
qe, qe,
time_provider, time_provider,

View File

@ -973,6 +973,7 @@ where
.create_processing_engine_trigger( .create_processing_engine_trigger(
&db, &db,
&trigger_name, &trigger_name,
self.processing_engine.node_id(),
plugin_filename, plugin_filename,
&trigger_specification, &trigger_specification,
trigger_settings, trigger_settings,

View File

@ -766,9 +766,10 @@ mod tests {
}, },
DedicatedExecutor::new_testing(), DedicatedExecutor::new_testing(),
)); ));
let node_identifier_prefix = "test_host";
let persister = Arc::new(Persister::new( let persister = Arc::new(Persister::new(
Arc::clone(&object_store), Arc::clone(&object_store),
"test_host", node_identifier_prefix,
Arc::clone(&time_provider) as _, Arc::clone(&time_provider) as _,
)); ));
let sample_node_id = Arc::from("sample-host-id"); let sample_node_id = Arc::from("sample-host-id");
@ -841,6 +842,7 @@ mod tests {
package_manager: Arc::new(DisabledManager), package_manager: Arc::new(DisabledManager),
}, },
write_buffer.catalog(), write_buffer.catalog(),
node_identifier_prefix,
Arc::clone(&write_buffer), Arc::clone(&write_buffer),
Arc::clone(&query_executor) as _, Arc::clone(&query_executor) as _,
Arc::clone(&time_provider) as _, Arc::clone(&time_provider) as _,