feat: Finish wiring up WAL plugin with trigger (#25781)

This updates the WAL so that it can have new file notifiers added that will get updated when the wal flushes. The processing engine now implements the WALNotifier trait.

I've updated the CLI test for creating a trigger to run and end-to-end test that defines a plugin, creates a trigger, writes data into the database, triggering the plugin, which writes summary statistics back into the database in a different table. The test queries the destination table to confirm that the plugin worked.
praveen/telem-custom-domain
Paul Dix 2025-01-10 12:56:49 -05:00 committed by GitHub
parent c71dafc313
commit 0da0785960
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 132 additions and 9 deletions

View File

@ -517,8 +517,11 @@ def process_rows(iterator, output):
assert_contains!(&result, "Plugin test_plugin deleted successfully");
}
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_create_trigger() {
async fn test_create_trigger_and_run() {
// create a plugin and trigger and write data in, verifying that the trigger is activated
// and sent data
let server = TestServer::spawn().await;
let server_addr = server.client_addr();
let db_name = "foo";
@ -530,8 +533,13 @@ async fn test_create_trigger() {
let plugin_file = create_plugin_file(
r#"
def process_rows(iterator, output):
pass
def process_writes(influxdb3_local, table_batches, args=None):
for table_batch in table_batches:
row_count = len(table_batch["rows"])
line = LineBuilder("write_reports")\
.tag("table_name", table_batch["table_name"])\
.int64_field("row_count", row_count)
influxdb3_local.write(line)
"#,
);
@ -547,7 +555,7 @@ def process_rows(iterator, output):
plugin_name,
]);
// Create trigger
// creating the trigger should activate it
let result = run_with_confirmation(&[
"create",
"trigger",
@ -563,6 +571,50 @@ def process_rows(iterator, output):
]);
debug!(result = ?result, "create trigger");
assert_contains!(&result, "Trigger test_trigger created successfully");
// now let's write data and see if it gets processed
server
.write_lp_to_db(
db_name,
"cpu,host=a f1=1.0\ncpu,host=b f1=2.0\nmem,host=a usage=234",
influxdb3_client::Precision::Second,
)
.await
.expect("write to db");
// query to see if the processed data is there
let mut check_count = 0;
let result = loop {
match server
.api_v3_query_sql(&[
("db", db_name),
("q", "SELECT table_name, row_count FROM write_reports"),
("format", "json"),
])
.await
.json::<Value>()
.await
{
Ok(value) => break value,
Err(e) => {
check_count += 1;
if check_count > 10 {
panic!("Failed to query processed data: {}", e);
}
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
};
};
assert_eq!(
result,
json!(
[
{"table_name": "cpu", "row_count": 2},
{"table_name": "mem", "row_count": 1}
]
)
);
}
#[test_log::test(tokio::test)]

View File

@ -10,12 +10,15 @@ use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestRe
use influxdb3_internal_api::query_executor::QueryExecutor;
use influxdb3_wal::{
CatalogBatch, CatalogOp, DeletePluginDefinition, DeleteTriggerDefinition, PluginDefinition,
PluginType, TriggerDefinition, TriggerIdentifier, TriggerSpecificationDefinition, Wal,
WalContents, WalOp,
PluginType, SnapshotDetails, TriggerDefinition, TriggerIdentifier,
TriggerSpecificationDefinition, Wal, WalContents, WalFileNotifier, WalOp,
};
use influxdb3_write::WriteBuffer;
use iox_time::TimeProvider;
use observability_deps::tracing::warn;
use std::any::Any;
use std::sync::Arc;
use tokio::sync::oneshot::Receiver;
use tokio::sync::{mpsc, oneshot, Mutex};
pub mod manager;
@ -63,6 +66,19 @@ impl PluginChannels {
.insert(trigger, tx);
rx
}
async fn send_wal_contents(&self, wal_contents: Arc<WalContents>) {
for (db, trigger_map) in &self.active_triggers {
for (trigger, sender) in trigger_map {
if let Err(e) = sender
.send(PluginEvent::WriteWalContents(Arc::clone(&wal_contents)))
.await
{
warn!(%e, %db, ?trigger, "error sending wal contents to plugin");
}
}
}
}
}
impl ProcessingEngineManagerImpl {
@ -391,6 +407,32 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
}
}
#[async_trait::async_trait]
impl WalFileNotifier for ProcessingEngineManagerImpl {
async fn notify(&self, write: Arc<WalContents>) {
let plugin_channels = self.plugin_event_tx.lock().await;
plugin_channels.send_wal_contents(write).await;
}
async fn notify_and_snapshot(
&self,
write: Arc<WalContents>,
snapshot_details: SnapshotDetails,
) -> Receiver<SnapshotDetails> {
let plugin_channels = self.plugin_event_tx.lock().await;
plugin_channels.send_wal_contents(write).await;
// configure a reciever that we immediately close
let (tx, rx) = oneshot::channel();
tx.send(snapshot_details).ok();
rx
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[allow(unused)]
pub(crate) enum PluginEvent {
WriteWalContents(Arc<WalContents>),

View File

@ -104,13 +104,15 @@ mod python_plugin {
use influxdb3_wal::WalOp;
use influxdb3_write::Precision;
use iox_time::Time;
use observability_deps::tracing::warn;
use observability_deps::tracing::{info, warn};
use std::time::SystemTime;
use tokio::sync::mpsc::Receiver;
#[async_trait::async_trait]
impl RunnablePlugin for TriggerPlugin {
async fn run_plugin(&self, mut receiver: Receiver<PluginEvent>) -> Result<(), Error> {
info!(?self.trigger_definition.trigger_name, ?self.trigger_definition.database_name, ?self.trigger_definition.plugin_name, "starting trigger plugin");
loop {
let event = match receiver.recv().await {
Some(event) => event,

View File

@ -160,6 +160,10 @@ impl<T: TimeProvider>
Arc::clone(&self.time_provider.0) as _,
self.write_buffer.0.wal(),
));
self.write_buffer
.0
.wal()
.add_file_notifier(Arc::clone(&processing_engine) as _);
let http = Arc::new(HttpApi::new(
self.common_state.clone(),
Arc::clone(&self.time_provider.0),

View File

@ -1036,6 +1036,7 @@ where
} else {
self.read_body_json(req).await?
};
debug!(%db, %plugin_name, %trigger_name, %trigger_specification, %disabled, "configure_processing_engine_trigger");
let Ok(trigger_spec) =
TriggerSpecificationDefinition::from_string_rep(&trigger_specification)
else {

View File

@ -113,6 +113,12 @@ pub trait Wal: Debug + Send + Sync + 'static {
/// Stop all writes to the WAL and flush the buffer to a WAL file.
async fn shutdown(&self);
/// Adds a new file notifier listener to the WAL (for use by the processing engine). The WAL
/// will send new file notifications to the listener, but ignore any snapshot receiver.
/// Only the notifier passed in the WAL constructor should be used for snapshots (i.e. the
/// `QueryableBuffer`).
fn add_file_notifier(&self, notifier: Arc<dyn WalFileNotifier>);
}
/// When the WAL persists a file with buffered ops, the contents are sent to this

View File

@ -22,6 +22,7 @@ pub struct WalObjectStore {
object_store: Arc<dyn ObjectStore>,
host_identifier_prefix: String,
file_notifier: Arc<dyn WalFileNotifier>,
added_file_notifiers: parking_lot::Mutex<Vec<Arc<dyn WalFileNotifier>>>,
/// Buffered wal ops go in here along with the state to track when to snapshot
flush_buffer: Mutex<FlushBuffer>,
}
@ -70,6 +71,7 @@ impl WalObjectStore {
object_store,
host_identifier_prefix: host_identifier_prefix.into(),
file_notifier,
added_file_notifiers: Default::default(),
flush_buffer: Mutex::new(FlushBuffer::new(
Arc::clone(&time_provider),
WalBuffer {
@ -293,16 +295,19 @@ impl WalObjectStore {
}
}
let wal_contents = Arc::new(wal_contents);
// now that we've persisted this latest notify and start the snapshot, if set
let snapshot_response = match wal_contents.snapshot {
Some(snapshot_details) => {
info!(?snapshot_details, "snapshotting wal");
let snapshot_done = self
.file_notifier
.notify_and_snapshot(Arc::new(wal_contents), snapshot_details)
.notify_and_snapshot(Arc::clone(&wal_contents), snapshot_details)
.await;
let (snapshot_info, snapshot_permit) =
snapshot.expect("snapshot should be set when snapshot details are set");
Some((snapshot_done, snapshot_info, snapshot_permit))
}
None => {
@ -310,11 +315,18 @@ impl WalObjectStore {
"notify sent to buffer for wal file {}",
wal_contents.wal_file_number.as_u64()
);
self.file_notifier.notify(Arc::new(wal_contents)).await;
self.file_notifier.notify(Arc::clone(&wal_contents)).await;
None
}
};
// now send the contents to the extra notifiers
let notifiers = self.added_file_notifiers.lock().clone();
for notifier in notifiers {
// added notifiers don't do anything with the snapshot, so just notify
notifier.notify(Arc::clone(&wal_contents)).await;
}
// send all the responses back to clients
for response in responses {
let _ = response.send(WriteResult::Success(()));
@ -446,6 +458,10 @@ impl Wal for WalObjectStore {
async fn shutdown(&self) {
self.shutdown().await
}
fn add_file_notifier(&self, notifier: Arc<dyn WalFileNotifier>) {
self.added_file_notifiers.lock().push(notifier);
}
}
#[derive(Debug)]