feat: each plugin type uses its own channel (#25908)

pull/25909/head
Paul Dix 2025-01-23 12:35:50 -08:00 committed by GitHub
parent f1ea2d8747
commit c672ee8d1f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 181 additions and 113 deletions

View File

@ -45,10 +45,12 @@ pub struct ProcessingEngineManagerImpl {
#[derive(Debug, Default)]
struct PluginChannels {
/// Map of database to trigger name to sender
active_triggers: HashMap<String, HashMap<String, mpsc::Sender<PluginEvent>>>,
/// Map of request path to the sender
request_triggers: HashMap<String, mpsc::Sender<PluginEvent>>,
/// Map of database to wal trigger name to handler
wal_triggers: HashMap<String, HashMap<String, mpsc::Sender<WalEvent>>>,
/// Map of database to schedule trigger name to handler
schedule_triggers: HashMap<String, HashMap<String, mpsc::Sender<ScheduleEvent>>>,
/// Map of request path to the request trigger handler
request_triggers: HashMap<String, mpsc::Sender<RequestEvent>>,
}
#[cfg(feature = "system-py")]
@ -60,77 +62,115 @@ impl PluginChannels {
&self,
db: String,
trigger: String,
trigger_spec: &TriggerSpecificationDefinition,
) -> Result<Option<Receiver<()>>, ProcessingEngineError> {
if let Some(trigger_map) = self.active_triggers.get(&db) {
if let Some(sender) = trigger_map.get(&trigger) {
// create a one shot to wait for the shutdown to complete
let (tx, rx) = oneshot::channel();
if sender.send(PluginEvent::Shutdown(tx)).await.is_err() {
return Err(ProcessingEngineError::TriggerShutdownError {
database: db,
trigger_name: trigger,
});
match trigger_spec {
TriggerSpecificationDefinition::SingleTableWalWrite { .. }
| TriggerSpecificationDefinition::AllTablesWalWrite => {
if let Some(trigger_map) = self.wal_triggers.get(&db) {
if let Some(sender) = trigger_map.get(&trigger) {
// create a one shot to wait for the shutdown to complete
let (tx, rx) = oneshot::channel();
if sender.send(WalEvent::Shutdown(tx)).await.is_err() {
return Err(ProcessingEngineError::TriggerShutdownError {
database: db,
trigger_name: trigger,
});
}
return Ok(Some(rx));
}
}
}
TriggerSpecificationDefinition::Schedule { .. }
| TriggerSpecificationDefinition::Every { .. } => {
if let Some(trigger_map) = self.schedule_triggers.get(&db) {
if let Some(sender) = trigger_map.get(&trigger) {
// create a one shot to wait for the shutdown to complete
let (tx, rx) = oneshot::channel();
if sender.send(ScheduleEvent::Shutdown(tx)).await.is_err() {
return Err(ProcessingEngineError::TriggerShutdownError {
database: db,
trigger_name: trigger,
});
}
return Ok(Some(rx));
}
}
}
TriggerSpecificationDefinition::RequestPath { .. } => {
if let Some(sender) = self.request_triggers.get(&trigger) {
// create a one shot to wait for the shutdown to complete
let (tx, rx) = oneshot::channel();
if sender.send(RequestEvent::Shutdown(tx)).await.is_err() {
return Err(ProcessingEngineError::TriggerShutdownError {
database: db,
trigger_name: trigger,
});
}
return Ok(Some(rx));
}
return Ok(Some(rx));
}
}
Ok(None)
}
fn remove_trigger(&mut self, db: String, trigger: String) {
if let Some(trigger_map) = self.active_triggers.get_mut(&db) {
trigger_map.remove(&trigger);
fn remove_trigger(
&mut self,
db: String,
trigger: String,
trigger_spec: &TriggerSpecificationDefinition,
) {
match trigger_spec {
TriggerSpecificationDefinition::SingleTableWalWrite { .. }
| TriggerSpecificationDefinition::AllTablesWalWrite => {
if let Some(trigger_map) = self.wal_triggers.get_mut(&db) {
trigger_map.remove(&trigger);
}
}
TriggerSpecificationDefinition::Schedule { .. }
| TriggerSpecificationDefinition::Every { .. } => {
if let Some(trigger_map) = self.schedule_triggers.get_mut(&db) {
trigger_map.remove(&trigger);
}
}
TriggerSpecificationDefinition::RequestPath { .. } => {
self.request_triggers.remove(&trigger);
}
}
}
#[cfg(feature = "system-py")]
fn add_trigger(
fn add_wal_trigger(&mut self, db: String, trigger: String) -> mpsc::Receiver<WalEvent> {
let (tx, rx) = mpsc::channel(PLUGIN_EVENT_BUFFER_SIZE);
self.wal_triggers.entry(db).or_default().insert(trigger, tx);
rx
}
fn add_schedule_trigger(
&mut self,
trigger_spec: &TriggerSpecificationDefinition,
db: String,
trigger: String,
) -> mpsc::Receiver<PluginEvent> {
observability_deps::tracing::info!(%db, ?trigger, ?trigger_spec, "adding trigger to plugin event channel");
) -> mpsc::Receiver<ScheduleEvent> {
let (tx, rx) = mpsc::channel(PLUGIN_EVENT_BUFFER_SIZE);
self.schedule_triggers
.entry(db)
.or_default()
.insert(trigger, tx);
rx
}
match trigger_spec {
TriggerSpecificationDefinition::SingleTableWalWrite { .. } => {
self.active_triggers
.entry(db)
.or_default()
.insert(trigger, tx);
}
TriggerSpecificationDefinition::AllTablesWalWrite => {
self.active_triggers
.entry(db)
.or_default()
.insert(trigger, tx);
}
TriggerSpecificationDefinition::Schedule { .. } => {
self.active_triggers
.entry(db)
.or_default()
.insert(trigger, tx);
}
TriggerSpecificationDefinition::Every { .. } => {
self.active_triggers
.entry(db)
.or_default()
.insert(trigger, tx);
}
TriggerSpecificationDefinition::RequestPath { path } => {
self.request_triggers.insert(path.to_string(), tx);
}
}
fn add_request_trigger(&mut self, path: String) -> mpsc::Receiver<RequestEvent> {
let (tx, rx) = mpsc::channel(PLUGIN_EVENT_BUFFER_SIZE);
self.request_triggers.insert(path, tx);
rx
}
async fn send_wal_contents(&self, wal_contents: Arc<WalContents>) {
for (db, trigger_map) in &self.active_triggers {
for (db, trigger_map) in &self.wal_triggers {
for (trigger, sender) in trigger_map {
if let Err(e) = sender
.send(PluginEvent::WriteWalContents(Arc::clone(&wal_contents)))
.send(WalEvent::WriteWalContents(Arc::clone(&wal_contents)))
.await
{
warn!(%e, %db, ?trigger, "error sending wal contents to plugin");
@ -144,7 +184,7 @@ impl PluginChannels {
trigger_path: &str,
request: Request,
) -> Result<(), ProcessingEngineError> {
let event = PluginEvent::Request(request);
let event = RequestEvent::Request(request);
if let Some(sender) = self.request_triggers.get(trigger_path) {
if sender.send(event).await.is_err() {
return Err(ProcessingEngineError::RequestTriggerNotFound);
@ -450,14 +490,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
})?
.clone();
let trigger_rx = self.plugin_event_tx.write().await.add_trigger(
&trigger.trigger,
db_name.to_string(),
trigger_name.to_string(),
);
let plugin_context = PluginContext {
trigger_rx,
write_buffer,
query_executor,
};
@ -473,25 +506,52 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
.into());
};
match plugin_definition.plugin_type {
PluginType::WalRows => plugins::run_wal_contents_plugin(
db_name.to_string(),
plugin_code,
trigger,
plugin_context,
),
PluginType::Scheduled => plugins::run_schedule_plugin(
db_name.to_string(),
plugin_code,
trigger,
Arc::clone(&self.time_provider),
plugin_context,
)?,
PluginType::Request => plugins::run_request_plugin(
db_name.to_string(),
plugin_code,
trigger,
plugin_context,
),
PluginType::WalRows => {
let rec = self
.plugin_event_tx
.write()
.await
.add_wal_trigger(db_name.to_string(), trigger_name.to_string());
plugins::run_wal_contents_plugin(
db_name.to_string(),
plugin_code,
trigger,
plugin_context,
rec,
)
}
PluginType::Scheduled => {
let rec = self
.plugin_event_tx
.write()
.await
.add_schedule_trigger(db_name.to_string(), trigger_name.to_string());
plugins::run_schedule_plugin(
db_name.to_string(),
plugin_code,
trigger,
Arc::clone(&self.time_provider),
plugin_context,
rec,
)?
}
PluginType::Request => {
let rec = self
.plugin_event_tx
.write()
.await
.add_request_trigger(trigger_name.to_string());
plugins::run_request_plugin(
db_name.to_string(),
plugin_code,
trigger,
plugin_context,
rec,
)
}
}
}
@ -537,7 +597,11 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
.plugin_event_tx
.write()
.await
.send_shutdown(db_name.to_string(), trigger_name.to_string())
.send_shutdown(
db_name.to_string(),
trigger_name.to_string(),
&trigger.trigger,
)
.await?
else {
return Ok(());
@ -548,10 +612,11 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
"shutdown trigger receiver dropped, may have received multiple shutdown requests"
);
} else {
self.plugin_event_tx
.write()
.await
.remove_trigger(db_name.to_string(), trigger_name.to_string());
self.plugin_event_tx.write().await.remove_trigger(
db_name.to_string(),
trigger_name.to_string(),
&trigger.trigger,
);
}
Ok(())
@ -738,9 +803,16 @@ impl WalFileNotifier for ProcessingEngineManagerImpl {
}
}
#[allow(unused)]
pub(crate) enum PluginEvent {
pub(crate) enum WalEvent {
WriteWalContents(Arc<WalContents>),
Shutdown(oneshot::Sender<()>),
}
pub(crate) enum ScheduleEvent {
Shutdown(oneshot::Sender<()>),
}
pub(crate) enum RequestEvent {
Request(Request),
Shutdown(oneshot::Sender<()>),
}

View File

@ -1,7 +1,7 @@
#[cfg(feature = "system-py")]
use crate::PluginCode;
use crate::{RequestEvent, ScheduleEvent, WalEvent};
#[cfg(feature = "system-py")]
use crate::PluginEvent;
use data_types::NamespaceName;
use hashbrown::HashMap;
use influxdb3_catalog::catalog::Catalog;
@ -73,6 +73,7 @@ pub(crate) fn run_wal_contents_plugin(
plugin_code: PluginCode,
trigger_definition: TriggerDefinition,
context: PluginContext,
plugin_receiver: mpsc::Receiver<WalEvent>,
) {
let trigger_plugin = TriggerPlugin {
trigger_definition,
@ -83,7 +84,7 @@ pub(crate) fn run_wal_contents_plugin(
};
tokio::task::spawn(async move {
trigger_plugin
.run_wal_contents_plugin(context.trigger_rx)
.run_wal_contents_plugin(plugin_receiver)
.await
.expect("trigger plugin failed");
});
@ -96,7 +97,13 @@ pub(crate) fn run_schedule_plugin(
trigger_definition: TriggerDefinition,
time_provider: Arc<dyn TimeProvider>,
context: PluginContext,
plugin_receiver: mpsc::Receiver<ScheduleEvent>,
) -> Result<(), Error> {
let TriggerSpecificationDefinition::Schedule { .. } = &trigger_definition.trigger else {
// TODO: these linkages should be guaranteed by code.
unreachable!("this should've been checked");
};
let trigger_plugin = TriggerPlugin {
trigger_definition,
db_name,
@ -104,16 +111,18 @@ pub(crate) fn run_schedule_plugin(
write_buffer: context.write_buffer,
query_executor: context.query_executor,
};
let runner = python_plugin::ScheduleTriggerRunner::try_new(
&trigger_plugin.trigger_definition.trigger,
Arc::clone(&time_provider),
)?;
tokio::task::spawn(async move {
trigger_plugin
.run_schedule_plugin(context.trigger_rx, runner, time_provider)
.run_schedule_plugin(plugin_receiver, runner, time_provider)
.await
.expect("cron trigger plugin failed");
});
Ok(())
}
@ -123,6 +132,7 @@ pub(crate) fn run_request_plugin(
plugin_code: PluginCode,
trigger_definition: TriggerDefinition,
context: PluginContext,
plugin_receiver: mpsc::Receiver<RequestEvent>,
) {
let trigger_plugin = TriggerPlugin {
trigger_definition,
@ -133,7 +143,7 @@ pub(crate) fn run_request_plugin(
};
tokio::task::spawn(async move {
trigger_plugin
.run_request_plugin(context.trigger_rx)
.run_request_plugin(plugin_receiver)
.await
.expect("trigger plugin failed");
});
@ -141,8 +151,6 @@ pub(crate) fn run_request_plugin(
#[cfg(feature = "system-py")]
pub(crate) struct PluginContext {
// tokio channel for inputs
pub(crate) trigger_rx: mpsc::Receiver<PluginEvent>,
// handler to write data back to the DB.
pub(crate) write_buffer: Arc<dyn WriteBuffer>,
// query executor to hand off to the plugin
@ -185,7 +193,7 @@ mod python_plugin {
impl TriggerPlugin {
pub(crate) async fn run_wal_contents_plugin(
&self,
mut receiver: Receiver<PluginEvent>,
mut receiver: Receiver<WalEvent>,
) -> Result<(), Error> {
info!(?self.trigger_definition.trigger_name, ?self.trigger_definition.database_name, ?self.trigger_definition.plugin_name, "starting wal contents plugin");
@ -199,15 +207,12 @@ mod python_plugin {
};
match event {
PluginEvent::WriteWalContents(wal_contents) => {
WalEvent::WriteWalContents(wal_contents) => {
if let Err(e) = self.process_wal_contents(wal_contents).await {
error!(?self.trigger_definition, "error processing wal contents: {}", e);
}
}
PluginEvent::Request(_) => {
warn!("ignoring request in wal contents plugin.")
}
PluginEvent::Shutdown(sender) => {
WalEvent::Shutdown(sender) => {
sender.send(()).map_err(|_| Error::FailedToShutdown)?;
break;
}
@ -219,7 +224,7 @@ mod python_plugin {
pub(crate) async fn run_schedule_plugin(
&self,
mut receiver: Receiver<PluginEvent>,
mut receiver: Receiver<ScheduleEvent>,
mut runner: ScheduleTriggerRunner,
time_provider: Arc<dyn TimeProvider>,
) -> Result<(), Error> {
@ -241,13 +246,7 @@ mod python_plugin {
warn!(?self.trigger_definition, "trigger plugin receiver closed");
break;
}
Some(PluginEvent::WriteWalContents(_)) => {
warn!("ignoring wal contents in cron plugin.")
}
Some(PluginEvent::Request(_)) => {
warn!("ignoring request in cron plugin.")
}
Some(PluginEvent::Shutdown(sender)) => {
Some(ScheduleEvent::Shutdown(sender)) => {
sender.send(()).map_err(|_| Error::FailedToShutdown)?;
break;
}
@ -261,7 +260,7 @@ mod python_plugin {
pub(crate) async fn run_request_plugin(
&self,
mut receiver: Receiver<PluginEvent>,
mut receiver: Receiver<RequestEvent>,
) -> Result<(), Error> {
info!(?self.trigger_definition.trigger_name, ?self.trigger_definition.database_name, ?self.trigger_definition.plugin_name, "starting request plugin");
@ -271,10 +270,7 @@ mod python_plugin {
warn!(?self.trigger_definition, "trigger plugin receiver closed");
break;
}
Some(PluginEvent::WriteWalContents(_)) => {
warn!("ignoring wal contents in request plugin.")
}
Some(PluginEvent::Request(request)) => {
Some(RequestEvent::Request(request)) => {
let Some(schema) =
self.write_buffer.catalog().db_schema(self.db_name.as_str())
else {
@ -336,7 +332,7 @@ mod python_plugin {
error!(?self.trigger_definition, "error sending response");
}
}
Some(PluginEvent::Shutdown(sender)) => {
Some(RequestEvent::Shutdown(sender)) => {
sender.send(()).map_err(|_| Error::FailedToShutdown)?;
break;
}