From 1d8d3d66fca733d9b26b46d609a2149d4b75d68b Mon Sep 17 00:00:00 2001
From: Jackson Newhouse
Date: Sat, 18 Jan 2025 04:18:18 -0800
Subject: [PATCH] feat(processing_engine): Add cron plugins and triggers to the
 processing engine. (#25852)

* feat(processing_engine): Add cron plugins and triggers to the processing engine.

* feat(processing_engine): switch from 'cron plugin' to 'schedule plugin', use TimeProvider.

* feat(processing_engine): add test for test scheduled plugin.
---
 Cargo.lock                                 |  35 ++
 Cargo.toml                                 |   1 +
 influxdb3/src/commands/test.rs             |  52 ++-
 influxdb3/tests/server/cli.rs              |  76 +++-
 influxdb3_client/src/lib.rs                |  35 +-
 influxdb3_client/src/plugin_development.rs |  18 +
 influxdb3_processing_engine/Cargo.toml     |   2 +
 influxdb3_processing_engine/src/lib.rs     |  64 +++-
 influxdb3_processing_engine/src/manager.rs |  11 +-
 influxdb3_processing_engine/src/plugins.rs | 391 ++++++++++++++++-----
 influxdb3_py_api/Cargo.toml                |   1 +
 influxdb3_py_api/src/system_py.rs          |  73 +++-
 influxdb3_server/src/http.rs               |  33 ++
 influxdb3_wal/Cargo.toml                   |   1 +
 influxdb3_wal/src/lib.rs                   |  17 +
 15 files changed, 716 insertions(+), 94 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 122396a595..a32146a6e6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1173,6 +1173,28 @@ dependencies = [
  "cc",
 ]
 
+[[package]]
+name = "cron"
+version = "0.14.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dbe58904b68d95ad2c69a2e9607ffc70ce196c1910f83a340029c1448e06ed65"
+dependencies = [
+ "chrono",
+ "once_cell",
+ "winnow",
+]
+
+[[package]]
+name = "cron"
+version = "0.15.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5877d3fbf742507b66bc2a1945106bd30dd8504019d596901ddd012a4dd01740"
+dependencies = [
+ "chrono",
+ "once_cell",
+ "winnow",
+]
+
 [[package]]
 name = "crossbeam-channel"
 version = "0.5.14"
@@ -2974,6 +2996,8 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "async-trait",
+ "chrono",
+ "cron 0.15.0",
  "data_types",
  "datafusion_util",
  "hashbrown 0.15.2",
@@ -3004,6 +3028,7 @@ dependencies = [
  "anyhow",
  "arrow-array",
  "arrow-schema",
+ "chrono",
  "futures",
  "hashbrown 0.15.2",
  "influxdb3_catalog",
@@ -3163,6 +3188,7 @@ dependencies = [
  "byteorder",
  "bytes",
  "crc32fast",
+ "cron 0.14.0",
  "data_types",
  "futures-util",
  "hashbrown 0.15.2",
@@ -7417,6 +7443,15 @@ version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
 
+[[package]]
+name = "winnow"
+version = "0.6.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c8d71a593cc5c42ad7876e2c1fda56f314f3754c084128833e64f1345ff8a03a"
+dependencies = [
+ "memchr",
+]
+
 [[package]]
 name = "winreg"
 version = "0.50.0"
diff --git a/Cargo.toml b/Cargo.toml
index 7bb9bda703..5d7e1325e1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -58,6 +58,7 @@ bitcode = { version = "0.6.3", features = ["serde"] }
 byteorder = "1.3.4"
 bytes = "1.9"
 chrono = "0.4"
+cron = "0.15"
 clap = { version = "4", features = ["derive", "env", "string"] }
 clru = "0.6.2"
 crc32fast = "1.2.0"
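Workspace-level note: `cron = "0.15"` is what the processing engine compiles against (the WAL crate transitively pulls in 0.14, hence both lock entries above). The crate parses seconds-first, six- or seven-field expressions, which is why the tests further down use `*/5 * * * * *` rather than five-field POSIX cron. A minimal standalone sketch of the primitive, assuming only the `cron` and `chrono` crates:

```rust
use std::str::FromStr;

use chrono::Utc;
use cron::Schedule;

fn main() {
    // Seconds-first cron expression: fire every 5 seconds.
    let schedule = Schedule::from_str("*/5 * * * * *").expect("valid cron expression");

    // `upcoming` yields the next fire times after "now" in the given time zone.
    for fire_time in schedule.upcoming(Utc).take(3) {
        println!("would trigger at {fire_time}");
    }
}
```

The plugin runner added in `plugins.rs` below uses the owned variant of this iterator (`after_owned`) so it can hold the iterator across await points.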
diff --git a/influxdb3/src/commands/test.rs b/influxdb3/src/commands/test.rs
index fd8c09d030..39f3b9f3e7 100644
--- a/influxdb3/src/commands/test.rs
+++ b/influxdb3/src/commands/test.rs
@@ -1,7 +1,7 @@
 use crate::commands::common::{InfluxDb3Config, SeparatedKeyValue, SeparatedList};
 use anyhow::Context;
 use hashbrown::HashMap;
-use influxdb3_client::plugin_development::WalPluginTestRequest;
+use influxdb3_client::plugin_development::{SchedulePluginTestRequest, WalPluginTestRequest};
 use influxdb3_client::Client;
 use secrecy::ExposeSecret;
 use std::error::Error;
@@ -23,6 +23,15 @@ impl Config {
                         ..
                     },
                 ..
+            })
+            | SubCommand::SchedulePlugin(SchedulePluginConfig {
+                influxdb3_config:
+                    InfluxDb3Config {
+                        host_url,
+                        auth_token,
+                        ..
+                    },
+                ..
             }) => {
                 let mut client = Client::new(host_url.clone())?;
                 if let Some(token) = &auth_token {
@@ -39,6 +48,9 @@ pub enum SubCommand {
     /// Test a WAL Plugin
     #[clap(name = "wal_plugin")]
     WalPlugin(WalPluginConfig),
+    /// Test a Schedule Plugin
+    #[clap(name = "schedule_plugin")]
+    SchedulePlugin(SchedulePluginConfig),
 }
 
 #[derive(Debug, clap::Parser)]
@@ -60,6 +72,22 @@ pub struct WalPluginConfig {
     pub filename: String,
 }
 
+#[derive(Debug, clap::Parser)]
+pub struct SchedulePluginConfig {
+    #[clap(flatten)]
+    influxdb3_config: InfluxDb3Config,
+    /// If given, pass this map of string key/value pairs as input arguments
+    #[clap(long = "input-arguments")]
+    pub input_arguments: Option<SeparatedList<SeparatedKeyValue<String, String>>>,
+    /// The file name of the plugin, which should exist on the server in `<plugin-dir>/<filename>`.
+    /// The plugin-dir is provided on server startup.
+    #[clap(required = true)]
+    pub filename: String,
+    /// Cron schedule to test against. If not given, `* * * * *` will be used.
+    #[clap(long = "schedule")]
+    pub schedule: Option<String>,
+}
+
 pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
     let client = config.get_client()?;
 
@@ -96,6 +124,28 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
                     .expect("serialize wal plugin test response as JSON")
             );
         }
+        SubCommand::SchedulePlugin(plugin_config) => {
+            let input_arguments = plugin_config.input_arguments.map(|a| {
+                a.into_iter()
+                    .map(|SeparatedKeyValue((k, v))| (k, v))
+                    .collect::<HashMap<String, String>>()
+            });
+            let schedule_plugin_test_request = SchedulePluginTestRequest {
+                filename: plugin_config.filename,
+                database: plugin_config.influxdb3_config.database_name,
+                schedule: plugin_config.schedule,
+                input_arguments,
+            };
+            let response = client
+                .schedule_plugin_test(schedule_plugin_test_request)
+                .await?;
+
+            println!(
+                "{}",
+                serde_json::to_string_pretty(&response)
+                    .expect("serialize schedule plugin test response as JSON")
+            );
+        }
     }
 
     Ok(())
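For reference, the new subcommand as it would be invoked against a server started with `--plugin-dir`; the plugin file name is illustrative, and the flags mirror the integration test below:

```console
$ influxdb3 test schedule_plugin \
    --database foo \
    --host http://127.0.0.1:8181 \
    --schedule "*/5 * * * * *" \
    --input-arguments region=us-east \
    my_plugin.py
```

The command prints the pretty-printed `SchedulePluginTestResponse` JSON on success.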
diff --git a/influxdb3/tests/server/cli.rs b/influxdb3/tests/server/cli.rs
index ef21e916da..05327212d0 100644
--- a/influxdb3/tests/server/cli.rs
+++ b/influxdb3/tests/server/cli.rs
@@ -15,7 +15,8 @@ use test_helpers::tempfile::NamedTempFile;
 use test_helpers::tempfile::TempDir;
 use test_helpers::{assert_contains, assert_not_contains};
 
-const WRITE_REPORTS_PLUGIN_CODE: &str = r#"
+#[cfg(feature = "system-py")]
+pub const WRITE_REPORTS_PLUGIN_CODE: &str = r#"
 def process_writes(influxdb3_local, table_batches, args=None):
     for table_batch in table_batches:
         # Skip if table_name is write_reports
@@ -1279,6 +1280,79 @@ def process_writes(influxdb3_local, table_batches, args=None):
     let expected_result = serde_json::from_str::<serde_json::Value>(expected_result).unwrap();
     assert_eq!(res, expected_result);
 }
+
+#[cfg(feature = "system-py")]
+#[test_log::test(tokio::test)]
+async fn test_schedule_plugin_test() {
+    use crate::ConfigProvider;
+    use influxdb3_client::Precision;
+
+    // Create plugin file with a scheduled task
+    let plugin_file = create_plugin_file(
+        r#"
+def process_scheduled_call(influxdb3_local, schedule_time, args=None):
+    influxdb3_local.info(f"args are {args}")
+    influxdb3_local.info("Successfully called")"#,
+    );
+
+    let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap();
+    let plugin_name = plugin_file.path().file_name().unwrap().to_str().unwrap();
+
+    let server = TestServer::configure()
+        .with_plugin_dir(plugin_dir)
+        .spawn()
+        .await;
+    let server_addr = server.client_addr();
+
+    // Write some test data
+    server
+        .write_lp_to_db(
+            "foo",
+            "cpu,host=host1,region=us-east usage=0.75\n\
+             cpu,host=host2,region=us-west usage=0.82\n\
+             cpu,host=host3,region=us-east usage=0.91",
+            Precision::Nanosecond,
+        )
+        .await
+        .unwrap();
+
+    let db_name = "foo";
+
+    // Run the schedule plugin test
+    let result = run_with_confirmation(&[
+        "test",
+        "schedule_plugin",
+        "--database",
+        db_name,
+        "--host",
+        &server_addr,
+        "--schedule",
+        "*/5 * * * * *", // Run every 5 seconds
+        "--input-arguments",
+        "region=us-east",
+        plugin_name,
+    ]);
+    debug!(result = ?result, "test schedule plugin");
+
+    let res = serde_json::from_str::<serde_json::Value>(&result).unwrap();
+
+    // The trigger_time will be dynamic, so we'll just verify it exists and is in the right format
+    let trigger_time = res["trigger_time"].as_str().unwrap();
+    assert!(trigger_time.contains('T')); // Basic RFC3339 format check
+
+    // Check the rest of the response structure
+    let expected_result = serde_json::json!({
+        "log_lines": [
+            "INFO: args are {'region': 'us-east'}",
+            "INFO: Successfully called"
+        ],
+        "database_writes": {},
+        "errors": []
+    });
+    assert_eq!(res["log_lines"], expected_result["log_lines"]);
+    assert_eq!(res["database_writes"], expected_result["database_writes"]);
+    assert_eq!(res["errors"], expected_result["errors"]);
+}
 
 #[cfg(feature = "system-py")]
 #[test_log::test(tokio::test)]
diff --git a/influxdb3_client/src/lib.rs b/influxdb3_client/src/lib.rs
index 8510e1a856..6aa843727b 100644
--- a/influxdb3_client/src/lib.rs
+++ b/influxdb3_client/src/lib.rs
@@ -1,6 +1,9 @@
 pub mod plugin_development;
 
-use crate::plugin_development::{WalPluginTestRequest, WalPluginTestResponse};
+use crate::plugin_development::{
+    SchedulePluginTestRequest, SchedulePluginTestResponse, WalPluginTestRequest,
+    WalPluginTestResponse,
+};
 use bytes::Bytes;
 use hashbrown::HashMap;
 use iox_query_params::StatementParam;
@@ -736,6 +739,36 @@ impl Client {
         }
     }
 
+    /// Make a request to the `POST /api/v3/plugin_test/schedule` API
+    pub async fn schedule_plugin_test(
+        &self,
+        schedule_plugin_test_request: SchedulePluginTestRequest,
+    ) -> Result<SchedulePluginTestResponse> {
+        let api_path = "/api/v3/plugin_test/schedule";
+        let url = self.base_url.join(api_path)?;
+
+        let mut req = self
+            .http_client
+            .post(url)
+            .json(&schedule_plugin_test_request);
+        if let Some(token) = &self.auth_token {
+            req = req.bearer_auth(token.expose_secret());
+        }
+        let resp = req
+            .send()
+            .await
+            .map_err(|src| Error::request_send(Method::POST, api_path, src))?;
+
+        if resp.status().is_success() {
+            resp.json().await.map_err(Error::Json)
+        } else {
+            Err(Error::ApiError {
+                code: resp.status(),
+                message: resp.text().await.map_err(Error::Text)?,
+            })
+        }
+    }
+
     /// Send a `/ping` request to the target `influxdb3` server to check its
     /// status and gather `version` and `revision` information
     pub async fn ping(&self) -> Result<PingResponse> {
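A hedged sketch of driving the new client method directly, assuming a local server and a plugin file already present in its plugin-dir (file, host, and database names are illustrative):

```rust
use influxdb3_client::plugin_development::SchedulePluginTestRequest;
use influxdb3_client::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new("http://127.0.0.1:8181")?;

    let request = SchedulePluginTestRequest {
        filename: "my_plugin.py".to_string(), // hypothetical file in the server's plugin-dir
        database: "foo".to_string(),
        schedule: Some("*/5 * * * * *".to_string()), // every 5 seconds; None falls back to the default
        input_arguments: None,
    };

    // Runs the plugin once against a copy of the catalog; would-be writes
    // come back in `database_writes` rather than being persisted.
    let response = client.schedule_plugin_test(request).await?;
    println!("{}", serde_json::to_string_pretty(&response)?);
    Ok(())
}
```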
diff --git a/influxdb3_client/src/plugin_development.rs b/influxdb3_client/src/plugin_development.rs
index 6bd394fb1b..dcf80f2488 100644
--- a/influxdb3_client/src/plugin_development.rs
+++ b/influxdb3_client/src/plugin_development.rs
@@ -19,3 +19,21 @@ pub struct WalPluginTestResponse {
     pub database_writes: HashMap<String, Vec<String>>,
     pub errors: Vec<String>,
 }
+
+/// Request definition for `POST /api/v3/plugin_test/schedule` API
+#[derive(Debug, Serialize, Deserialize)]
+pub struct SchedulePluginTestRequest {
+    pub filename: String,
+    pub database: String,
+    pub schedule: Option<String>,
+    pub input_arguments: Option<HashMap<String, String>>,
+}
+
+/// Response definition for `POST /api/v3/plugin_test/schedule` API
+#[derive(Debug, Serialize, Deserialize)]
+pub struct SchedulePluginTestResponse {
+    pub trigger_time: Option<String>,
+    pub log_lines: Vec<String>,
+    pub database_writes: HashMap<String, Vec<String>>,
+    pub errors: Vec<String>,
+}
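Both structs derive plain `Serialize`/`Deserialize` with no field renames, so the wire format is exactly the field names. A sketch of a request body for the endpoint (values illustrative):

```rust
use serde_json::json;

fn main() {
    // Body for POST /api/v3/plugin_test/schedule
    let request_body = json!({
        "filename": "my_plugin.py",          // hypothetical file in the server's plugin-dir
        "database": "foo",
        "schedule": "*/5 * * * * *",         // omit or null to fall back to the default
        "input_arguments": { "region": "us-east" }
    });
    println!("{}", serde_json::to_string_pretty(&request_body).unwrap());
}
```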
diff --git a/influxdb3_processing_engine/Cargo.toml b/influxdb3_processing_engine/Cargo.toml
index 9b2b9f51e5..dbc7ecb3f3 100644
--- a/influxdb3_processing_engine/Cargo.toml
+++ b/influxdb3_processing_engine/Cargo.toml
@@ -11,6 +11,8 @@ license.workspace = true
 [dependencies]
 anyhow.workspace = true
 async-trait.workspace = true
+chrono.workspace = true
+cron.workspace = true
 data_types.workspace = true
 hashbrown.workspace = true
 iox_time.workspace = true
diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs
index 7a683b6841..0b1ae5de1f 100644
--- a/influxdb3_processing_engine/src/lib.rs
+++ b/influxdb3_processing_engine/src/lib.rs
@@ -1,4 +1,5 @@
 use crate::manager::{ProcessingEngineError, ProcessingEngineManager};
+use crate::plugins::Error;
 #[cfg(feature = "system-py")]
 use crate::plugins::PluginContext;
 use anyhow::Context;
@@ -6,7 +7,10 @@ use hashbrown::HashMap;
 use influxdb3_catalog::catalog;
 use influxdb3_catalog::catalog::Catalog;
 use influxdb3_catalog::catalog::Error::ProcessingEngineTriggerNotFound;
-use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse};
+use influxdb3_client::plugin_development::{
+    SchedulePluginTestRequest, SchedulePluginTestResponse, WalPluginTestRequest,
+    WalPluginTestResponse,
+};
 use influxdb3_internal_api::query_executor::QueryExecutor;
 use influxdb3_wal::{
     CatalogBatch, CatalogOp, DeletePluginDefinition, DeleteTriggerDefinition, PluginDefinition,
@@ -339,7 +343,31 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
                 query_executor,
             };
             let plugin_code = self.read_plugin_code(&trigger.plugin_file_name).await?;
-            plugins::run_plugin(db_name.to_string(), plugin_code, trigger, plugin_context);
+            let Some(plugin_definition) = db_schema
+                .processing_engine_plugins
+                .get(&trigger.plugin_name)
+            else {
+                return Err(catalog::Error::ProcessingEnginePluginNotFound {
+                    plugin_name: trigger.plugin_name,
+                    database_name: db_name.to_string(),
+                }
+                .into());
+            };
+            match plugin_definition.plugin_type {
+                PluginType::WalRows => plugins::run_wal_contents_plugin(
+                    db_name.to_string(),
+                    plugin_code,
+                    trigger,
+                    plugin_context,
+                ),
+                PluginType::Scheduled => plugins::run_schedule_plugin(
+                    db_name.to_string(),
+                    plugin_code,
+                    trigger,
+                    Arc::clone(&self.time_provider),
+                    plugin_context,
+                ),
+            }
         }
 
         Ok(())
@@ -489,6 +517,38 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
             "system-py feature not enabled"
         )))
     }
+
+    #[cfg_attr(not(feature = "system-py"), allow(unused))]
+    async fn test_schedule_plugin(
+        &self,
+        request: SchedulePluginTestRequest,
+        query_executor: Arc<dyn QueryExecutor>,
+    ) -> Result<SchedulePluginTestResponse, Error> {
+        #[cfg(feature = "system-py")]
+        {
+            // create a copy of the catalog so we don't modify the original
+            let catalog = Arc::new(Catalog::from_inner(self.catalog.clone_inner()));
+            let now = self.time_provider.now();
+
+            let code = self.read_plugin_code(&request.filename).await?;
+
+            let res =
+                plugins::run_test_schedule_plugin(now, catalog, query_executor, code, request)
+                    .unwrap_or_else(|e| SchedulePluginTestResponse {
+                        log_lines: vec![],
+                        database_writes: Default::default(),
+                        errors: vec![e.to_string()],
+                        trigger_time: None,
+                    });
+
+            return Ok(res);
+        }
+
+        #[cfg(not(feature = "system-py"))]
+        Err(plugins::Error::AnyhowError(anyhow::anyhow!(
+            "system-py feature not enabled"
+        )))
+    }
 }
 
 #[async_trait::async_trait]
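The diffstat lists `influxdb3_server/src/http.rs` changes that are not shown in this excerpt; a hedged sketch of how a server handler might wire a request through the trait below (the handler name and error plumbing are assumptions, not the actual http.rs code):

```rust
use std::sync::Arc;

use influxdb3_client::plugin_development::SchedulePluginTestRequest;
use influxdb3_internal_api::query_executor::QueryExecutor;
use influxdb3_processing_engine::manager::ProcessingEngineManager;

// Hypothetical handler body: take the deserialized request, delegate to the
// manager, and serialize the response back to the caller.
async fn handle_test_schedule_plugin(
    manager: Arc<dyn ProcessingEngineManager>,
    query_executor: Arc<dyn QueryExecutor>,
    request: SchedulePluginTestRequest,
) -> Result<String, Box<dyn std::error::Error>> {
    let response = manager.test_schedule_plugin(request, query_executor).await?;
    Ok(serde_json::to_string(&response)?)
}
```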
diff --git a/influxdb3_processing_engine/src/manager.rs b/influxdb3_processing_engine/src/manager.rs
index e58a353ca0..6f132392ca 100644
--- a/influxdb3_processing_engine/src/manager.rs
+++ b/influxdb3_processing_engine/src/manager.rs
@@ -1,5 +1,8 @@
 use hashbrown::HashMap;
-use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse};
+use influxdb3_client::plugin_development::{
+    SchedulePluginTestRequest, SchedulePluginTestResponse, WalPluginTestRequest,
+    WalPluginTestResponse,
+};
 use influxdb3_internal_api::query_executor::QueryExecutor;
 use influxdb3_wal::{PluginType, TriggerSpecificationDefinition};
 use influxdb3_write::WriteBuffer;
@@ -101,4 +104,10 @@ pub trait ProcessingEngineManager: Debug + Send + Sync + 'static {
         request: WalPluginTestRequest,
         query_executor: Arc<dyn QueryExecutor>,
     ) -> Result<WalPluginTestResponse, crate::plugins::Error>;
+
+    async fn test_schedule_plugin(
+        &self,
+        request: SchedulePluginTestRequest,
+        query_executor: Arc<dyn QueryExecutor>,
+    ) -> Result<SchedulePluginTestResponse, crate::plugins::Error>;
 }
diff --git a/influxdb3_processing_engine/src/plugins.rs b/influxdb3_processing_engine/src/plugins.rs
index bc2d059164..b64484714a 100644
--- a/influxdb3_processing_engine/src/plugins.rs
+++ b/influxdb3_processing_engine/src/plugins.rs
@@ -1,5 +1,6 @@
 #[cfg(feature = "system-py")]
 use crate::PluginEvent;
+use influxdb3_catalog::catalog::Catalog;
 #[cfg(feature = "system-py")]
 use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse};
 #[cfg(feature = "system-py")]
@@ -14,11 +15,20 @@ use influxdb3_write::WriteBuffer;
 use observability_deps::tracing::error;
 use std::fmt::Debug;
 #[cfg(feature = "system-py")]
+use std::str::FromStr;
 use std::sync::Arc;
 use thiserror::Error;
 #[cfg(feature = "system-py")]
 use tokio::sync::mpsc;
 
+use data_types::NamespaceName;
+use hashbrown::HashMap;
+use influxdb3_wal::Gen1Duration;
+use influxdb3_write::write_buffer::validator::WriteValidator;
+use influxdb3_write::Precision;
+#[cfg(feature = "system-py")]
+use iox_time::TimeProvider;
+
 #[derive(Debug, Error)]
 pub enum Error {
     #[error("invalid database {0}")]
@@ -45,10 +55,19 @@ pub enum Error {
 
     #[error("error executing plugin: {0}")]
     PluginExecutionError(#[from] influxdb3_py_api::ExecutePluginError),
+
+    #[error("invalid cron syntax: {0}")]
+    InvalidCronSyntax(#[from] cron::error::Error),
+
+    #[error("cron schedule never triggers: {0}")]
+    CronScheduleNeverTriggers(String),
+
+    #[error("tried to run a schedule plugin but the schedule iterator is over.")]
+    ScheduledMissingTime,
 }
 
 #[cfg(feature = "system-py")]
-pub(crate) fn run_plugin(
+pub(crate) fn run_wal_contents_plugin(
     db_name: String,
     plugin_code: String,
     trigger_definition: TriggerDefinition,
@@ -63,12 +82,40 @@ pub(crate) fn run_wal_contents_plugin(
     };
     tokio::task::spawn(async move {
         trigger_plugin
-            .run_plugin(context.trigger_rx)
+            .run_wal_contents_plugin(context.trigger_rx)
             .await
            .expect("trigger plugin failed");
     });
 }
 
+#[cfg(feature = "system-py")]
+pub(crate) fn run_schedule_plugin(
+    db_name: String,
+    plugin_code: String,
+    trigger_definition: TriggerDefinition,
+    time_provider: Arc<dyn TimeProvider>,
+    context: PluginContext,
+) {
+    let TriggerSpecificationDefinition::Schedule { schedule } = &trigger_definition.trigger else {
+        // TODO: these linkages should be guaranteed by code.
+        unreachable!("this should've been checked");
+    };
+    let schedule = schedule.to_string();
+    let trigger_plugin = TriggerPlugin {
+        trigger_definition,
+        db_name,
+        plugin_code,
+        write_buffer: context.write_buffer,
+        query_executor: context.query_executor,
+    };
+    tokio::task::spawn(async move {
+        trigger_plugin
+            .run_schedule_plugin(context.trigger_rx, schedule, time_provider)
+            .await
+            .expect("schedule trigger plugin failed");
+    });
+}
+
 #[cfg(feature = "system-py")]
 pub(crate) struct PluginContext {
     // tokio channel for inputs
@@ -79,17 +126,6 @@ pub(crate) struct PluginContext {
     pub(crate) query_executor: Arc<dyn QueryExecutor>,
 }
 
-#[cfg(feature = "system-py")]
-#[async_trait::async_trait]
-trait RunnablePlugin {
-    // Returns true if it should exit
-    async fn process_event(&self, event: PluginEvent) -> Result<bool, Error>;
-    async fn run_plugin(
-        &self,
-        receiver: tokio::sync::mpsc::Receiver<PluginEvent>,
-    ) -> Result<(), Error>;
-}
-
 #[cfg(feature = "system-py")]
 #[derive(Debug)]
 struct TriggerPlugin {
@@ -103,20 +139,26 @@ struct TriggerPlugin {
 #[cfg(feature = "system-py")]
 mod python_plugin {
     use super::*;
-    use anyhow::Context;
+    use anyhow::{anyhow, Context};
+    use chrono::{DateTime, Utc};
+    use cron::{OwnedScheduleIterator, Schedule};
     use data_types::NamespaceName;
     use hashbrown::HashMap;
-    use influxdb3_py_api::system_py::execute_python_with_batch;
+    use influxdb3_catalog::catalog::DatabaseSchema;
+    use influxdb3_py_api::system_py::{execute_python_with_batch, execute_schedule_trigger};
     use influxdb3_wal::WalOp;
     use influxdb3_write::Precision;
     use iox_time::Time;
-    use observability_deps::tracing::{info, warn};
+    use observability_deps::tracing::{debug, info, warn};
+    use std::str::FromStr;
     use std::time::SystemTime;
     use tokio::sync::mpsc::Receiver;
 
-    #[async_trait::async_trait]
-    impl RunnablePlugin for TriggerPlugin {
-        async fn run_plugin(&self, mut receiver: Receiver<PluginEvent>) -> Result<(), Error> {
+    impl TriggerPlugin {
+        pub(crate) async fn run_wal_contents_plugin(
+            &self,
+            mut receiver: Receiver<PluginEvent>,
+        ) -> Result<(), Error> {
             info!(?self.trigger_definition.trigger_name, ?self.trigger_definition.database_name, ?self.trigger_definition.plugin_name, "starting trigger plugin");
 
             loop {
@@ -142,12 +184,54 @@ mod python_plugin {
 
             Ok(())
         }
+
+        pub(crate) async fn run_schedule_plugin(
+            &self,
+            mut receiver: Receiver<PluginEvent>,
+            schedule: String,
+            time_provider: Arc<dyn TimeProvider>,
+        ) -> Result<(), Error> {
+            let schedule = Schedule::from_str(schedule.as_str())?;
+            let mut runner = ScheduleTriggerRunner::new(schedule, Arc::clone(&time_provider));
+
+            loop {
+                let Some(next_run_instant) = runner.next_run_time() else {
+                    break;
+                };
+
+                tokio::select! {
+                    _ = time_provider.sleep_until(next_run_instant) => {
+                        let Some(schema) = self.write_buffer.catalog().db_schema(self.db_name.as_str()) else {
+                            return Err(Error::MissingDb);
+                        };
+                        runner.run_at_time(self, schema).await?;
+                    }
+                    event = receiver.recv() => {
+                        match event {
+                            None => {
+                                warn!(?self.trigger_definition, "trigger plugin receiver closed");
+                                break;
+                            }
+                            Some(PluginEvent::WriteWalContents(_)) => {
+                                debug!("ignoring wal contents in schedule plugin.")
+                            }
+                            Some(PluginEvent::Shutdown(sender)) => {
+                                sender.send(()).map_err(|_| Error::FailedToShutdown)?;
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+
+            Ok(())
+        }
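The scheduling loop above races the next fire time against the trigger's control channel. A self-contained sketch of the same `tokio::select!` pattern, reduced to a plain `tokio::time::sleep` and an mpsc shutdown channel (all names hypothetical):

```rust
use std::time::Duration;

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);

    // Pretend a shutdown request arrives after ~3 ticks.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(350)).await;
        let _ = shutdown_tx.send(()).await;
    });

    loop {
        tokio::select! {
            // Stands in for `time_provider.sleep_until(next_run_instant)`.
            _ = tokio::time::sleep(Duration::from_millis(100)) => {
                println!("tick: would run the scheduled plugin here");
            }
            // Stands in for the trigger's control channel.
            msg = shutdown_rx.recv() => {
                println!("control message {msg:?}: exiting loop");
                break;
            }
        }
    }
}
```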
 
         async fn process_event(&self, event: PluginEvent) -> Result<bool, Error> {
             let Some(schema) = self.write_buffer.catalog().db_schema(self.db_name.as_str()) else {
                 return Err(Error::MissingDb);
             };
 
-            let mut db_writes: HashMap<String, Vec<String>> = HashMap::new();
+            let mut db_writes = DatabaseWriteBuffer::new();
 
             match event {
                 PluginEvent::WriteWalContents(wal_contents) => {
@@ -173,6 +257,12 @@ mod python_plugin {
                                     .context("table not found")?;
                                 Some(table_id)
                             }
+                            // This should not occur
+                            TriggerSpecificationDefinition::Schedule {
+                                schedule
+                            } => {
+                                return Err(anyhow!("unexpectedly found scheduled trigger specification {} for WAL plugin {}", schedule, self.trigger_definition.trigger_name).into())
+                            }
                         };
 
                         let result = execute_python_with_batch(
@@ -186,14 +276,12 @@ mod python_plugin {
 
                         // write the output lines to the appropriate database
                         if !result.write_back_lines.is_empty() {
-                            let lines =
-                                db_writes.entry_ref(schema.name.as_ref()).or_default();
-                            lines.extend(result.write_back_lines);
+                            db_writes
+                                .add_lines(schema.name.as_ref(), result.write_back_lines);
                         }
 
                         for (db_name, add_lines) in result.write_db_lines {
-                            let lines = db_writes.entry(db_name).or_default();
-                            lines.extend(add_lines);
+                            db_writes.add_lines(&db_name, add_lines);
                         }
                     }
                     WalOp::Catalog(_) => {}
@@ -208,25 +296,103 @@ mod python_plugin {
             }
 
             if !db_writes.is_empty() {
-                for (db_name, output_lines) in db_writes {
-                    let ingest_time = SystemTime::now()
-                        .duration_since(SystemTime::UNIX_EPOCH)
-                        .unwrap();
-                    self.write_buffer
-                        .write_lp(
-                            NamespaceName::new(db_name).unwrap(),
-                            output_lines.join("\n").as_str(),
-                            Time::from_timestamp_nanos(ingest_time.as_nanos() as i64),
-                            false,
-                            Precision::Nanosecond,
-                        )
-                        .await?;
-                }
+                db_writes.execute(&self.write_buffer).await?;
             }
 
             Ok(false)
         }
     }
+
+    struct DatabaseWriteBuffer {
+        writes: HashMap<String, Vec<String>>,
+    }
+
+    impl DatabaseWriteBuffer {
+        fn new() -> Self {
+            Self {
+                writes: HashMap::new(),
+            }
+        }
+
+        fn add_lines(&mut self, db_name: &str, lines: Vec<String>) {
+            self.writes.entry_ref(db_name).or_default().extend(lines);
+        }
+
+        fn is_empty(&self) -> bool {
+            self.writes.is_empty()
+        }
+
+        async fn execute(self, write_buffer: &Arc<dyn WriteBuffer>) -> Result<(), Error> {
+            for (db_name, output_lines) in self.writes {
+                let ingest_time = SystemTime::now()
+                    .duration_since(SystemTime::UNIX_EPOCH)
+                    .unwrap();
+                write_buffer
+                    .write_lp(
+                        NamespaceName::new(db_name).unwrap(),
+                        output_lines.join("\n").as_str(),
+                        Time::from_timestamp_nanos(ingest_time.as_nanos() as i64),
+                        false,
+                        Precision::Nanosecond,
+                    )
+                    .await?;
+            }
+            Ok(())
+        }
+    }
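`ScheduleTriggerRunner` below is a thin cursor over `cron`'s owned iterator, fed from the injected `TimeProvider`. A standalone sketch of the same advance logic, with a hardcoded start time standing in for `TimeProvider::now()`:

```rust
use std::str::FromStr;

use chrono::{DateTime, TimeZone, Utc};
use cron::Schedule;

fn main() {
    // Fixed "now", standing in for TimeProvider::now().
    let now: DateTime<Utc> = Utc.with_ymd_and_hms(2025, 1, 18, 4, 18, 18).unwrap();

    // `after_owned` consumes the Schedule and yields fire times strictly after
    // `now`; owning the iterator lets a long-lived runner hold it across awaits.
    let schedule = Schedule::from_str("*/5 * * * * *").expect("valid cron expression");
    let mut fire_times = schedule.after_owned(now);

    // Equivalent of next_trigger_time / advance_time() below:
    for _ in 0..3 {
        match fire_times.next() {
            Some(t) => println!("next trigger at {t}"),
            None => break, // a schedule can be exhausted (e.g. one with a fixed year)
        }
    }
}
```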
 
+    struct ScheduleTriggerRunner {
+        schedule: OwnedScheduleIterator<Utc>,
+        next_trigger_time: Option<DateTime<Utc>>,
+    }
+
+    impl ScheduleTriggerRunner {
+        fn new(cron_schedule: Schedule, time_provider: Arc<dyn TimeProvider>) -> Self {
+            let mut schedule = cron_schedule.after_owned(time_provider.now().date_time());
+            let next_trigger_time = schedule.next();
+            Self {
+                schedule,
+                next_trigger_time,
+            }
+        }
+
+        async fn run_at_time(
+            &mut self,
+            plugin: &TriggerPlugin,
+            db_schema: Arc<DatabaseSchema>,
+        ) -> Result<(), Error> {
+            let Some(trigger_time) = self.next_trigger_time else {
+                return Err(anyhow!("running a cron trigger that is finished.").into());
+            };
+            let result = execute_schedule_trigger(
+                &plugin.plugin_code,
+                trigger_time,
+                Arc::clone(&db_schema),
+                Arc::clone(&plugin.query_executor),
+                &plugin.trigger_definition.trigger_arguments,
+            )?;
+
+            let mut db_writes = DatabaseWriteBuffer::new();
+            // write the output lines to the appropriate database
+            if !result.write_back_lines.is_empty() {
+                db_writes.add_lines(db_schema.name.as_ref(), result.write_back_lines);
+            }
+
+            for (db_name, add_lines) in result.write_db_lines {
+                db_writes.add_lines(&db_name, add_lines);
+            }
+            db_writes.execute(&plugin.write_buffer).await?;
+            self.advance_time();
+            Ok(())
+        }
+
+        fn advance_time(&mut self) {
+            self.next_trigger_time = self.schedule.next();
+        }
+
+        /// A funky little method to get the next trigger time as a `Time` we can pass to `TimeProvider::sleep_until()`.
+        fn next_run_time(&self) -> Option<Time>