feat(processing_engine): Add cron plugins and triggers to the processing engine. (#25852)

* feat(processing_engine): Add cron plugins and triggers to the processing engine.

* feat(processing_engine): switch from 'cron plugin' to 'schedule plugin', use TimeProvider.

* feat(processing_engine): add a test for the scheduled plugin test command.
Jackson Newhouse 2025-01-18 04:18:18 -08:00 committed by GitHub
parent d800d8e368
commit 1d8d3d66fc
15 changed files with 716 additions and 94 deletions

Cargo.lock (generated)

@@ -1173,6 +1173,28 @@ dependencies = [
"cc",
]
[[package]]
name = "cron"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbe58904b68d95ad2c69a2e9607ffc70ce196c1910f83a340029c1448e06ed65"
dependencies = [
"chrono",
"once_cell",
"winnow",
]
[[package]]
name = "cron"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5877d3fbf742507b66bc2a1945106bd30dd8504019d596901ddd012a4dd01740"
dependencies = [
"chrono",
"once_cell",
"winnow",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.14"
@@ -2974,6 +2996,8 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"chrono",
"cron 0.15.0",
"data_types",
"datafusion_util",
"hashbrown 0.15.2",
@@ -3004,6 +3028,7 @@ dependencies = [
"anyhow",
"arrow-array",
"arrow-schema",
"chrono",
"futures",
"hashbrown 0.15.2",
"influxdb3_catalog",
@@ -3163,6 +3188,7 @@ dependencies = [
"byteorder",
"bytes",
"crc32fast",
"cron 0.14.0",
"data_types",
"futures-util",
"hashbrown 0.15.2",
@@ -7417,6 +7443,15 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "winnow"
version = "0.6.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8d71a593cc5c42ad7876e2c1fda56f314f3754c084128833e64f1345ff8a03a"
dependencies = [
"memchr",
]
[[package]]
name = "winreg"
version = "0.50.0"


@@ -58,6 +58,7 @@ bitcode = { version = "0.6.3", features = ["serde"] }
byteorder = "1.3.4"
bytes = "1.9"
chrono = "0.4"
cron = "0.15"
clap = { version = "4", features = ["derive", "env", "string"] }
clru = "0.6.2"
crc32fast = "1.2.0"


@@ -1,7 +1,7 @@
use crate::commands::common::{InfluxDb3Config, SeparatedKeyValue, SeparatedList};
use anyhow::Context;
use hashbrown::HashMap;
use influxdb3_client::plugin_development::WalPluginTestRequest;
use influxdb3_client::plugin_development::{SchedulePluginTestRequest, WalPluginTestRequest};
use influxdb3_client::Client;
use secrecy::ExposeSecret;
use std::error::Error;
@@ -23,6 +23,15 @@ impl Config {
..
},
..
})
| SubCommand::SchedulePlugin(SchedulePluginConfig {
influxdb3_config:
InfluxDb3Config {
host_url,
auth_token,
..
},
..
}) => {
let mut client = Client::new(host_url.clone())?;
if let Some(token) = &auth_token {
@@ -39,6 +48,9 @@ pub enum SubCommand {
/// Test a WAL Plugin
#[clap(name = "wal_plugin")]
WalPlugin(WalPluginConfig),
/// Test a Schedule Plugin
#[clap(name = "schedule_plugin")]
SchedulePlugin(SchedulePluginConfig),
}
#[derive(Debug, clap::Parser)]
@@ -60,6 +72,22 @@ pub struct WalPluginConfig {
pub filename: String,
}
#[derive(Debug, clap::Parser)]
pub struct SchedulePluginConfig {
#[clap(flatten)]
influxdb3_config: InfluxDb3Config,
/// If given pass this map of string key/value pairs as input arguments
#[clap(long = "input-arguments")]
pub input_arguments: Option<SeparatedList<SeparatedKeyValue<String, String>>>,
/// The file name of the plugin, which should exist on the server in `<plugin-dir>/<filename>`.
/// The plugin-dir is provided on server startup.
#[clap(required = true)]
pub filename: String,
/// Cron schedule to test against. If not given, defaults to `* * * * * *` (every second)
#[clap(long = "schedule")]
pub schedule: Option<String>,
}
pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
let client = config.get_client()?;
@@ -96,6 +124,28 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
.expect("serialize wal plugin test response as JSON")
);
}
SubCommand::SchedulePlugin(plugin_config) => {
let input_arguments = plugin_config.input_arguments.map(|a| {
a.into_iter()
.map(|SeparatedKeyValue((k, v))| (k, v))
.collect::<HashMap<String, String>>()
});
let cron_plugin_test_request = SchedulePluginTestRequest {
filename: plugin_config.filename,
database: plugin_config.influxdb3_config.database_name,
schedule: plugin_config.schedule,
input_arguments,
};
let response = client
.schedule_plugin_test(cron_plugin_test_request)
.await?;
println!(
"{}",
serde_json::to_string_pretty(&response)
.expect("serialize cron plugin test response as JSON")
);
}
}
Ok(())
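
With this wired up, a scheduled plugin can be exercised from the CLI in the same way as a WAL plugin. Based on the integration test later in this diff, an invocation looks roughly like the following (the binary name, host, database, and plugin file are placeholders):

    influxdb3 test schedule_plugin --database foo --host http://127.0.0.1:8181 --schedule "*/5 * * * * *" --input-arguments region=us-east my_plugin.py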


@@ -15,7 +15,8 @@ use test_helpers::tempfile::NamedTempFile;
use test_helpers::tempfile::TempDir;
use test_helpers::{assert_contains, assert_not_contains};
const WRITE_REPORTS_PLUGIN_CODE: &str = r#"
#[cfg(feature = "system-py")]
pub const WRITE_REPORTS_PLUGIN_CODE: &str = r#"
def process_writes(influxdb3_local, table_batches, args=None):
for table_batch in table_batches:
# Skip if table_name is write_reports
@@ -1279,6 +1280,79 @@ def process_writes(influxdb3_local, table_batches, args=None):
let expected_result = serde_json::from_str::<serde_json::Value>(expected_result).unwrap();
assert_eq!(res, expected_result);
}
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_schedule_plugin_test() {
use crate::ConfigProvider;
use influxdb3_client::Precision;
// Create plugin file with a scheduled task
let plugin_file = create_plugin_file(
r#"
def process_scheduled_call(influxdb3_local, schedule_time, args=None):
influxdb3_local.info(f"args are {args}")
influxdb3_local.info("Successfully called")"#,
);
let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap();
let plugin_name = plugin_file.path().file_name().unwrap().to_str().unwrap();
let server = TestServer::configure()
.with_plugin_dir(plugin_dir)
.spawn()
.await;
let server_addr = server.client_addr();
// Write some test data
server
.write_lp_to_db(
"foo",
"cpu,host=host1,region=us-east usage=0.75\n\
cpu,host=host2,region=us-west usage=0.82\n\
cpu,host=host3,region=us-east usage=0.91",
Precision::Nanosecond,
)
.await
.unwrap();
let db_name = "foo";
// Run the schedule plugin test
let result = run_with_confirmation(&[
"test",
"schedule_plugin",
"--database",
db_name,
"--host",
&server_addr,
"--schedule",
"*/5 * * * * *", // Run every 5 seconds
"--input-arguments",
"region=us-east",
plugin_name,
]);
debug!(result = ?result, "test schedule plugin");
let res = serde_json::from_str::<Value>(&result).unwrap();
// The trigger_time will be dynamic, so we'll just verify it exists and is in the right format
let trigger_time = res["trigger_time"].as_str().unwrap();
assert!(trigger_time.contains('T')); // Basic RFC3339 format check
// Check the rest of the response structure
let expected_result = serde_json::json!({
"log_lines": [
"INFO: args are {'region': 'us-east'}",
"INFO: Successfully called"
],
"database_writes": {
},
"errors": []
});
assert_eq!(res["log_lines"], expected_result["log_lines"]);
assert_eq!(res["database_writes"], expected_result["database_writes"]);
assert_eq!(res["errors"], expected_result["errors"]);
}
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]


@@ -1,6 +1,9 @@
pub mod plugin_development;
use crate::plugin_development::{WalPluginTestRequest, WalPluginTestResponse};
use crate::plugin_development::{
SchedulePluginTestRequest, SchedulePluginTestResponse, WalPluginTestRequest,
WalPluginTestResponse,
};
use bytes::Bytes;
use hashbrown::HashMap;
use iox_query_params::StatementParam;
@@ -736,6 +739,36 @@ impl Client {
}
}
/// Make a request to the `POST /api/v3/plugin_test/schedule` API
pub async fn schedule_plugin_test(
&self,
schedule_plugin_test_request: SchedulePluginTestRequest,
) -> Result<SchedulePluginTestResponse> {
let api_path = "/api/v3/plugin_test/schedule";
let url = self.base_url.join(api_path)?;
let mut req = self
.http_client
.post(url)
.json(&schedule_plugin_test_request);
if let Some(token) = &self.auth_token {
req = req.bearer_auth(token.expose_secret());
}
let resp = req
.send()
.await
.map_err(|src| Error::request_send(Method::POST, api_path, src))?;
if resp.status().is_success() {
resp.json().await.map_err(Error::Json)
} else {
Err(Error::ApiError {
code: resp.status(),
message: resp.text().await.map_err(Error::Text)?,
})
}
}
/// Send a `/ping` request to the target `influxdb3` server to check its
/// status and gather `version` and `revision` information
pub async fn ping(&self) -> Result<PingResponse> {
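
For reference, a hedged sketch of calling the new client method end to end. The URL, database, and file name are placeholders; `Client::new` accepting a URL string and `serde_json` being available are assumptions based on the CLI code above:

    use influxdb3_client::plugin_development::SchedulePluginTestRequest;
    use influxdb3_client::Client;

    async fn demo() -> Result<(), Box<dyn std::error::Error>> {
        // Client construction mirrors the CLI code above.
        let client = Client::new("http://localhost:8181")?;
        let request = SchedulePluginTestRequest {
            filename: "my_plugin.py".to_string(),        // hypothetical plugin file
            database: "foo".to_string(),                 // hypothetical database
            schedule: Some("*/5 * * * * *".to_string()), // every 5 seconds
            input_arguments: None,
        };
        // POSTs to /api/v3/plugin_test/schedule and deserializes the JSON response.
        let response = client.schedule_plugin_test(request).await?;
        println!("{}", serde_json::to_string_pretty(&response)?);
        Ok(())
    }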


@@ -19,3 +19,21 @@ pub struct WalPluginTestResponse {
pub database_writes: HashMap<String, Vec<String>>,
pub errors: Vec<String>,
}
/// Request definition for `POST /api/v3/plugin_test/schedule` API
#[derive(Debug, Serialize, Deserialize)]
pub struct SchedulePluginTestRequest {
pub filename: String,
pub database: String,
pub schedule: Option<String>,
pub input_arguments: Option<HashMap<String, String>>,
}
/// Response definition for `POST /api/v3/plugin_test/schedule` API
#[derive(Debug, Serialize, Deserialize)]
pub struct SchedulePluginTestResponse {
pub trigger_time: Option<String>,
pub log_lines: Vec<String>,
pub database_writes: HashMap<String, Vec<String>>,
pub errors: Vec<String>,
}


@@ -11,6 +11,8 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
async-trait.workspace = true
chrono.workspace = true
cron.workspace = true
data_types.workspace = true
hashbrown.workspace = true
iox_time.workspace = true


@@ -1,4 +1,5 @@
use crate::manager::{ProcessingEngineError, ProcessingEngineManager};
use crate::plugins::Error;
#[cfg(feature = "system-py")]
use crate::plugins::PluginContext;
use anyhow::Context;
@@ -6,7 +7,10 @@ use hashbrown::HashMap;
use influxdb3_catalog::catalog;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_catalog::catalog::Error::ProcessingEngineTriggerNotFound;
use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse};
use influxdb3_client::plugin_development::{
SchedulePluginTestRequest, SchedulePluginTestResponse, WalPluginTestRequest,
WalPluginTestResponse,
};
use influxdb3_internal_api::query_executor::QueryExecutor;
use influxdb3_wal::{
CatalogBatch, CatalogOp, DeletePluginDefinition, DeleteTriggerDefinition, PluginDefinition,
@@ -339,7 +343,31 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
query_executor,
};
let plugin_code = self.read_plugin_code(&trigger.plugin_file_name).await?;
plugins::run_plugin(db_name.to_string(), plugin_code, trigger, plugin_context);
let Some(plugin_definition) = db_schema
.processing_engine_plugins
.get(&trigger.plugin_name)
else {
return Err(catalog::Error::ProcessingEnginePluginNotFound {
plugin_name: trigger.plugin_name,
database_name: db_name.to_string(),
}
.into());
};
match plugin_definition.plugin_type {
PluginType::WalRows => plugins::run_wal_contents_plugin(
db_name.to_string(),
plugin_code,
trigger,
plugin_context,
),
PluginType::Scheduled => plugins::run_schedule_plugin(
db_name.to_string(),
plugin_code,
trigger,
Arc::clone(&self.time_provider),
plugin_context,
),
}
}
Ok(())
@@ -489,6 +517,38 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
"system-py feature not enabled"
)))
}
#[cfg_attr(not(feature = "system-py"), allow(unused))]
async fn test_schedule_plugin(
&self,
request: SchedulePluginTestRequest,
query_executor: Arc<dyn QueryExecutor>,
) -> Result<SchedulePluginTestResponse, Error> {
#[cfg(feature = "system-py")]
{
// create a copy of the catalog so we don't modify the original
let catalog = Arc::new(Catalog::from_inner(self.catalog.clone_inner()));
let now = self.time_provider.now();
let code = self.read_plugin_code(&request.filename).await?;
let res =
plugins::run_test_schedule_plugin(now, catalog, query_executor, code, request)
.unwrap_or_else(|e| SchedulePluginTestResponse {
log_lines: vec![],
database_writes: Default::default(),
errors: vec![e.to_string()],
trigger_time: None,
});
return Ok(res);
}
#[cfg(not(feature = "system-py"))]
Err(plugins::Error::AnyhowError(anyhow::anyhow!(
"system-py feature not enabled"
)))
}
}
#[async_trait::async_trait]


@@ -1,5 +1,8 @@
use hashbrown::HashMap;
use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse};
use influxdb3_client::plugin_development::{
SchedulePluginTestRequest, SchedulePluginTestResponse, WalPluginTestRequest,
WalPluginTestResponse,
};
use influxdb3_internal_api::query_executor::QueryExecutor;
use influxdb3_wal::{PluginType, TriggerSpecificationDefinition};
use influxdb3_write::WriteBuffer;
@@ -101,4 +104,10 @@ pub trait ProcessingEngineManager: Debug + Send + Sync + 'static {
request: WalPluginTestRequest,
query_executor: Arc<dyn QueryExecutor>,
) -> Result<WalPluginTestResponse, crate::plugins::Error>;
async fn test_schedule_plugin(
&self,
request: SchedulePluginTestRequest,
query_executor: Arc<dyn QueryExecutor>,
) -> Result<SchedulePluginTestResponse, crate::plugins::Error>;
}


@@ -1,5 +1,6 @@
#[cfg(feature = "system-py")]
use crate::PluginEvent;
use influxdb3_catalog::catalog::Catalog;
#[cfg(feature = "system-py")]
use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse};
#[cfg(feature = "system-py")]
@@ -14,11 +15,20 @@ use influxdb3_write::WriteBuffer;
use observability_deps::tracing::error;
use std::fmt::Debug;
#[cfg(feature = "system-py")]
use std::str::FromStr;
use std::sync::Arc;
use thiserror::Error;
#[cfg(feature = "system-py")]
use tokio::sync::mpsc;
use data_types::NamespaceName;
use hashbrown::HashMap;
use influxdb3_wal::Gen1Duration;
use influxdb3_write::write_buffer::validator::WriteValidator;
use influxdb3_write::Precision;
#[cfg(feature = "system-py")]
use iox_time::TimeProvider;
#[derive(Debug, Error)]
pub enum Error {
#[error("invalid database {0}")]
@@ -45,10 +55,19 @@ pub enum Error {
#[error("error executing plugin: {0}")]
PluginExecutionError(#[from] influxdb3_py_api::ExecutePluginError),
#[error("invalid cron syntax: {0}")]
InvalidCronSyntax(#[from] cron::error::Error),
#[error("cron schedule never triggers: {0}")]
CronScheduleNeverTriggers(String),
#[error("tried to run a schedule plugin but the schedule iterator is over.")]
ScheduledMissingTime,
}
#[cfg(feature = "system-py")]
pub(crate) fn run_plugin(
pub(crate) fn run_wal_contents_plugin(
db_name: String,
plugin_code: String,
trigger_definition: TriggerDefinition,
@@ -63,12 +82,40 @@ pub(crate) fn run_plugin(
};
tokio::task::spawn(async move {
trigger_plugin
.run_plugin(context.trigger_rx)
.run_wal_contents_plugin(context.trigger_rx)
.await
.expect("trigger plugin failed");
});
}
#[cfg(feature = "system-py")]
pub(crate) fn run_schedule_plugin(
db_name: String,
plugin_code: String,
trigger_definition: TriggerDefinition,
time_provider: Arc<dyn TimeProvider>,
context: PluginContext,
) {
let TriggerSpecificationDefinition::Schedule { schedule } = &trigger_definition.trigger else {
// TODO: these linkages should be guaranteed by code.
unreachable!("this should've been checked");
};
let schedule = schedule.to_string();
let trigger_plugin = TriggerPlugin {
trigger_definition,
db_name,
plugin_code,
write_buffer: context.write_buffer,
query_executor: context.query_executor,
};
tokio::task::spawn(async move {
trigger_plugin
.run_schedule_plugin(context.trigger_rx, schedule, time_provider)
.await
.expect("cron trigger plugin failed");
});
}
#[cfg(feature = "system-py")]
pub(crate) struct PluginContext {
// tokio channel for inputs
@@ -79,17 +126,6 @@ pub(crate) struct PluginContext {
pub(crate) query_executor: Arc<dyn QueryExecutor>,
}
#[cfg(feature = "system-py")]
#[async_trait::async_trait]
trait RunnablePlugin {
// Returns true if it should exit
async fn process_event(&self, event: PluginEvent) -> Result<bool, Error>;
async fn run_plugin(
&self,
receiver: tokio::sync::mpsc::Receiver<PluginEvent>,
) -> Result<(), Error>;
}
#[cfg(feature = "system-py")]
#[derive(Debug)]
struct TriggerPlugin {
@@ -103,20 +139,26 @@ struct TriggerPlugin {
#[cfg(feature = "system-py")]
mod python_plugin {
use super::*;
use anyhow::Context;
use anyhow::{anyhow, Context};
use chrono::{DateTime, Utc};
use cron::{OwnedScheduleIterator, Schedule};
use data_types::NamespaceName;
use hashbrown::HashMap;
use influxdb3_py_api::system_py::execute_python_with_batch;
use influxdb3_catalog::catalog::DatabaseSchema;
use influxdb3_py_api::system_py::{execute_python_with_batch, execute_schedule_trigger};
use influxdb3_wal::WalOp;
use influxdb3_write::Precision;
use iox_time::Time;
use observability_deps::tracing::{info, warn};
use observability_deps::tracing::{debug, info, warn};
use std::str::FromStr;
use std::time::SystemTime;
use tokio::sync::mpsc::Receiver;
#[async_trait::async_trait]
impl RunnablePlugin for TriggerPlugin {
async fn run_plugin(&self, mut receiver: Receiver<PluginEvent>) -> Result<(), Error> {
impl TriggerPlugin {
pub(crate) async fn run_wal_contents_plugin(
&self,
mut receiver: Receiver<PluginEvent>,
) -> Result<(), Error> {
info!(?self.trigger_definition.trigger_name, ?self.trigger_definition.database_name, ?self.trigger_definition.plugin_name, "starting trigger plugin");
loop {
@@ -142,12 +184,54 @@ mod python_plugin {
Ok(())
}
pub(crate) async fn run_schedule_plugin(
&self,
mut receiver: Receiver<PluginEvent>,
schedule: String,
time_provider: Arc<dyn TimeProvider>,
) -> Result<(), Error> {
let schedule = Schedule::from_str(schedule.as_str())?;
let mut runner = ScheduleTriggerRunner::new(schedule, Arc::clone(&time_provider));
loop {
let Some(next_run_instant) = runner.next_run_time() else {
break;
};
tokio::select! {
_ = time_provider.sleep_until(next_run_instant) => {
let Some(schema) = self.write_buffer.catalog().db_schema(self.db_name.as_str()) else {
return Err(Error::MissingDb);
};
runner.run_at_time(self, schema).await?;
}
event = receiver.recv() => {
match event {
None => {
warn!(?self.trigger_definition, "trigger plugin receiver closed");
break;
}
Some(PluginEvent::WriteWalContents(_)) => {
debug!("ignoring wal contents in cron plugin.")
}
Some(PluginEvent::Shutdown(sender)) => {
sender.send(()).map_err(|_| Error::FailedToShutdown)?;
break;
}
}
}
}
}
Ok(())
}
async fn process_event(&self, event: PluginEvent) -> Result<bool, Error> {
let Some(schema) = self.write_buffer.catalog().db_schema(self.db_name.as_str()) else {
return Err(Error::MissingDb);
};
let mut db_writes: HashMap<String, Vec<String>> = HashMap::new();
let mut db_writes = DatabaseWriteBuffer::new();
match event {
PluginEvent::WriteWalContents(wal_contents) => {
@@ -173,6 +257,12 @@ mod python_plugin {
.context("table not found")?;
Some(table_id)
}
// This should not occur
TriggerSpecificationDefinition::Schedule {
schedule
} => {
return Err(anyhow!("unexpectedly found scheduled trigger specification {} for WAL plugin {}", schedule, self.trigger_definition.trigger_name).into())
}
};
let result = execute_python_with_batch(
@@ -186,14 +276,12 @@ mod python_plugin {
// write the output lines to the appropriate database
if !result.write_back_lines.is_empty() {
let lines =
db_writes.entry_ref(schema.name.as_ref()).or_default();
lines.extend(result.write_back_lines);
db_writes
.add_lines(schema.name.as_ref(), result.write_back_lines);
}
for (db_name, add_lines) in result.write_db_lines {
let lines = db_writes.entry(db_name).or_default();
lines.extend(add_lines);
db_writes.add_lines(&db_name, add_lines);
}
}
WalOp::Catalog(_) => {}
@@ -208,25 +296,103 @@
}
if !db_writes.is_empty() {
for (db_name, output_lines) in db_writes {
let ingest_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
self.write_buffer
.write_lp(
NamespaceName::new(db_name).unwrap(),
output_lines.join("\n").as_str(),
Time::from_timestamp_nanos(ingest_time.as_nanos() as i64),
false,
Precision::Nanosecond,
)
.await?;
}
db_writes.execute(&self.write_buffer).await?;
}
Ok(false)
}
}
struct DatabaseWriteBuffer {
writes: HashMap<String, Vec<String>>,
}
impl DatabaseWriteBuffer {
fn new() -> Self {
Self {
writes: HashMap::new(),
}
}
fn add_lines(&mut self, db_name: &str, lines: Vec<String>) {
self.writes.entry_ref(db_name).or_default().extend(lines);
}
fn is_empty(&self) -> bool {
self.writes.is_empty()
}
async fn execute(self, write_buffer: &Arc<dyn WriteBuffer>) -> Result<(), Error> {
for (db_name, output_lines) in self.writes {
let ingest_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
write_buffer
.write_lp(
NamespaceName::new(db_name).unwrap(),
output_lines.join("\n").as_str(),
Time::from_timestamp_nanos(ingest_time.as_nanos() as i64),
false,
Precision::Nanosecond,
)
.await?;
}
Ok(())
}
}
struct ScheduleTriggerRunner {
schedule: OwnedScheduleIterator<Utc>,
next_trigger_time: Option<DateTime<Utc>>,
}
impl ScheduleTriggerRunner {
fn new(cron_schedule: Schedule, time_provider: Arc<dyn TimeProvider>) -> Self {
let mut schedule = cron_schedule.after_owned(time_provider.now().date_time());
let next_trigger_time = schedule.next();
Self {
schedule,
next_trigger_time,
}
}
async fn run_at_time(
&mut self,
plugin: &TriggerPlugin,
db_schema: Arc<DatabaseSchema>,
) -> Result<(), Error> {
let Some(trigger_time) = self.next_trigger_time else {
return Err(anyhow!("running a cron trigger that is finished.").into());
};
let result = execute_schedule_trigger(
&plugin.plugin_code,
trigger_time,
Arc::clone(&db_schema),
Arc::clone(&plugin.query_executor),
&plugin.trigger_definition.trigger_arguments,
)?;
let mut db_writes = DatabaseWriteBuffer::new();
// write the output lines to the appropriate database
if !result.write_back_lines.is_empty() {
db_writes.add_lines(db_schema.name.as_ref(), result.write_back_lines);
}
for (db_name, add_lines) in result.write_db_lines {
db_writes.add_lines(&db_name, add_lines);
}
db_writes.execute(&plugin.write_buffer).await?;
self.advance_time();
Ok(())
}
fn advance_time(&mut self) {
self.next_trigger_time = self.schedule.next();
}
/// A funky little method to get the next trigger time as an iox `Time` that we can pass to `TimeProvider::sleep_until()`.
fn next_run_time(&self) -> Option<Time> {
let next_trigger_time = Time::from_datetime(*self.next_trigger_time.as_ref()?);
Some(next_trigger_time)
}
}
}
#[cfg(feature = "system-py")]
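
The `ScheduleTriggerRunner` above drives everything from the `cron` crate's owned iterator. A minimal standalone sketch of that pattern, assuming only the `cron` 0.15 and `chrono` dependencies added in this change (the six-field expression includes a leading seconds column):

    use std::str::FromStr;
    use chrono::Utc;
    use cron::Schedule;

    fn main() {
        // "Every 5 seconds" in the seconds-resolution syntax used by the tests.
        let schedule = Schedule::from_str("*/5 * * * * *").unwrap();
        // Owned iterator over trigger times strictly after the given instant,
        // mirroring `cron_schedule.after_owned(time_provider.now().date_time())`.
        let upcoming = schedule.after_owned(Utc::now());
        // Each `next()` yields a DateTime<Utc>; `None` means the schedule never
        // fires again (surfaced as CronScheduleNeverTriggers elsewhere in this file).
        for t in upcoming.take(3) {
            println!("next trigger: {t}");
        }
    }
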
@@ -269,52 +435,62 @@ pub(crate) fn run_test_wal_plugin(
&request.input_arguments,
)?;
// validate the generated output lines
let mut errors = Vec::new();
let log_lines = plugin_return_state.log();
// first for the write back database
let validator =
WriteValidator::initialize(namespace, Arc::clone(&catalog), now_time.timestamp_nanos())?;
let lp = plugin_return_state.write_back_lines.join("\n");
match validator.v1_parse_lines_and_update_schema(&lp, false, now_time, Precision::Nanosecond) {
Ok(data) => {
let data = data.convert_lines_to_buffer(Gen1Duration::new_1m());
let mut database_writes = plugin_return_state.write_db_lines;
database_writes.insert(database, plugin_return_state.write_back_lines);
for err in data.errors {
errors.push(format!("{:?}", err));
}
}
Err(write_buffer::Error::ParseError(e)) => {
errors.push(format!("line protocol parse error on write back: {:?}", e));
}
Err(e) => {
errors.push(format!(
"Failed to validate output lines on write back: {}",
e
));
}
let test_write_handler = TestWriteHandler::new(Arc::clone(&catalog), now_time);
let errors = test_write_handler.validate_all_writes(&database_writes);
Ok(WalPluginTestResponse {
log_lines,
database_writes,
errors,
})
}
#[derive(Debug)]
pub struct TestWriteHandler {
catalog: Arc<Catalog>,
now_time: iox_time::Time,
}
impl TestWriteHandler {
pub fn new(catalog: Arc<Catalog>, now_time: iox_time::Time) -> Self {
Self { catalog, now_time }
}
// now for any other dbs that received writes
for (db_name, lines) in &plugin_return_state.write_db_lines {
let namespace = match NamespaceName::new(db_name.to_string()) {
Ok(namespace) => namespace,
/// Validates a vec of lines for a namespace, returning any errors that arise as strings
fn validate_write_lines(
&self,
namespace: NamespaceName<'static>,
lines: &[String],
) -> Vec<String> {
let mut errors = Vec::new();
let db_name = namespace.as_str();
let validator = match WriteValidator::initialize(
namespace.clone(),
Arc::clone(&self.catalog),
self.now_time.timestamp_nanos(),
) {
Ok(v) => v,
Err(e) => {
errors.push(format!("database name {} is invalid: {}", db_name, e));
continue;
errors.push(format!(
"Failed to initialize validator for db {}: {}",
db_name, e
));
return errors;
}
};
let validator = WriteValidator::initialize(
namespace,
Arc::clone(&catalog),
now_time.timestamp_nanos(),
)?;
let lp = lines.join("\n");
match validator.v1_parse_lines_and_update_schema(
&lp,
false,
now_time,
self.now_time,
Precision::Nanosecond,
) {
Ok(data) => {
@@ -336,17 +512,72 @@ pub(crate) fn run_test_wal_plugin(
));
}
}
errors
}
pub fn validate_all_writes(&self, writes: &HashMap<String, Vec<String>>) -> Vec<String> {
let mut all_errors = Vec::new();
for (db_name, lines) in writes {
let namespace = match NamespaceName::new(db_name.to_string()) {
Ok(namespace) => namespace,
Err(e) => {
all_errors.push(format!("database name {} is invalid: {}", db_name, e));
continue;
}
};
let db_errors = self.validate_write_lines(namespace, lines);
all_errors.extend(db_errors);
}
all_errors
}
}
#[cfg(feature = "system-py")]
pub(crate) fn run_test_schedule_plugin(
now_time: iox_time::Time,
catalog: Arc<Catalog>,
query_executor: Arc<dyn QueryExecutor>,
code: String,
request: influxdb3_client::plugin_development::SchedulePluginTestRequest,
) -> Result<influxdb3_client::plugin_development::SchedulePluginTestResponse, Error> {
let database = request.database;
let db = catalog.db_schema(&database).ok_or(Error::MissingDb)?;
let cron_schedule = request.schedule.as_deref().unwrap_or("* * * * * *");
let schedule = cron::Schedule::from_str(cron_schedule)?;
let Some(schedule_time) = schedule.after(&now_time.date_time()).next() else {
return Err(Error::CronScheduleNeverTriggers(cron_schedule.to_string()));
};
let plugin_return_state = influxdb3_py_api::system_py::execute_schedule_trigger(
&code,
schedule_time,
db,
query_executor,
&request.input_arguments,
)?;
let log_lines = plugin_return_state.log();
let mut database_writes = plugin_return_state.write_db_lines;
database_writes.insert(database, plugin_return_state.write_back_lines);
Ok(WalPluginTestResponse {
log_lines,
database_writes,
errors,
})
let mut database_writes = plugin_return_state.write_db_lines;
if !plugin_return_state.write_back_lines.is_empty() {
database_writes.insert(database, plugin_return_state.write_back_lines);
}
let test_write_handler = TestWriteHandler::new(Arc::clone(&catalog), now_time);
let errors = test_write_handler.validate_all_writes(&database_writes);
let trigger_time = schedule_time.to_rfc3339_opts(chrono::SecondsFormat::AutoSi, true);
Ok(
influxdb3_client::plugin_development::SchedulePluginTestResponse {
trigger_time: Some(trigger_time),
log_lines,
database_writes,
errors,
},
)
}
#[cfg(feature = "system-py")]


@@ -12,6 +12,7 @@ system-py = ["pyo3"]
anyhow.workspace = true
arrow-array.workspace = true
arrow-schema.workspace = true
chrono.workspace = true
hashbrown.workspace = true
influxdb3_id = { path = "../influxdb3_id" }
influxdb3_wal = { path = "../influxdb3_wal" }


@@ -6,6 +6,7 @@ use arrow_array::{
TimestampNanosecondArray, UInt64Array,
};
use arrow_schema::DataType;
use chrono::{DateTime, Utc};
use futures::TryStreamExt;
use hashbrown::HashMap;
use influxdb3_catalog::catalog::DatabaseSchema;
@@ -17,7 +18,7 @@ use observability_deps::tracing::{error, info, warn};
use parking_lot::Mutex;
use pyo3::exceptions::{PyException, PyValueError};
use pyo3::prelude::{PyAnyMethods, PyModule};
use pyo3::types::{PyDict, PyList, PyTuple};
use pyo3::types::{PyDateTime, PyDict, PyList, PyTuple};
use pyo3::{
create_exception, pyclass, pymethods, pymodule, Bound, IntoPyObject, Py, PyAny, PyObject,
PyResult, Python,
@@ -257,6 +258,8 @@ impl PyPluginCallApi {
// constant for the process writes call site string
const PROCESS_WRITES_CALL_SITE: &str = "process_writes";
const PROCESS_SCHEDULED_CALL_SITE: &str = "process_scheduled_call";
const LINE_BUILDER_CODE: &str = r#"
from typing import Optional
from collections import OrderedDict
@@ -370,6 +373,18 @@ class LineBuilder:
return line"#;
fn args_to_py_object<'py>(
py: Python<'py>,
args: &Option<HashMap<String, String>>,
) -> Option<Bound<'py, PyDict>> {
args.as_ref().map(|args| {
let dict = PyDict::new(py);
for (key, value) in args {
dict.set_item(key, value).unwrap();
}
dict
})
}
pub fn execute_python_with_batch(
code: &str,
write_batch: &WriteBatch,
@@ -480,13 +495,7 @@ pub fn execute_python_with_batch(
let local_api = api.into_pyobject(py).map_err(anyhow::Error::from)?;
// turn args into an optional dict to pass into python
let args = args.as_ref().map(|args| {
let dict = PyDict::new(py);
for (key, value) in args {
dict.set_item(key, value).unwrap();
}
dict
});
let args = args_to_py_object(py, args);
// run the code and get the python function to call
py.run(&CString::new(code).unwrap(), Some(&globals), None)
@@ -511,6 +520,54 @@ pub fn execute_python_with_batch(
})
}
pub fn execute_schedule_trigger(
code: &str,
schedule_time: DateTime<Utc>,
schema: Arc<DatabaseSchema>,
query_executor: Arc<dyn QueryExecutor>,
args: &Option<HashMap<String, String>>,
) -> PyResult<PluginReturnState> {
Python::with_gil(|py| {
// import the LineBuilder for use in the python code
let globals = PyDict::new(py);
let py_datetime = PyDateTime::from_timestamp(py, schedule_time.timestamp() as f64, None)?;
py.run(
&CString::new(LINE_BUILDER_CODE).unwrap(),
Some(&globals),
None,
)?;
let api = PyPluginCallApi {
db_schema: schema,
query_executor,
return_state: Default::default(),
};
let return_state = Arc::clone(&api.return_state);
let local_api = api.into_pyobject(py)?;
// turn args into an optional dict to pass into python
let args = args_to_py_object(py, args);
// run the code and get the python function to call
py.run(&CString::new(code).unwrap(), Some(&globals), None)?;
let py_func = py.eval(
&CString::new(PROCESS_SCHEDULED_CALL_SITE).unwrap(),
Some(&globals),
None,
)?;
py_func.call1((local_api, py_datetime, args))?;
// swap with an empty return state to avoid cloning
let empty_return_state = PluginReturnState::default();
let ret = std::mem::replace(&mut *return_state.lock(), empty_return_state);
Ok(ret)
})
}
// Module initialization
#[pymodule]
fn influxdb3_py_api(_m: &Bound<'_, PyModule>) -> PyResult<()> {


@@ -1175,6 +1175,34 @@ where
Err(Error::PythonPluginsNotEnabled)
}
/// Endpoint for testing a plugin that will be triggered on some cron schedule.
#[cfg(feature = "system-py")]
async fn test_processing_engine_schedule_plugin(
&self,
req: Request<Body>,
) -> Result<Response<Body>> {
let request: influxdb3_client::plugin_development::SchedulePluginTestRequest =
self.read_body_json(req).await?;
let output = self
.processing_engine
.test_schedule_plugin(request, Arc::clone(&self.query_executor))
.await?;
let body = serde_json::to_string(&output)?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from(body))?)
}
#[cfg(not(feature = "system-py"))]
async fn test_processing_engine_schedule_plugin(
&self,
_req: Request<Body>,
) -> Result<Response<Body>> {
Err(Error::PythonPluginsNotEnabled)
}
async fn delete_database(&self, req: Request<Body>) -> Result<Response<Body>> {
let query = req.uri().query().unwrap_or("");
let delete_req = serde_urlencoded::from_str::<DeleteDatabaseRequest>(query)?;
@@ -1775,6 +1803,11 @@ pub(crate) async fn route_request<T: TimeProvider>(
(Method::POST, "/api/v3/plugin_test/wal") => {
http_server.test_processing_engine_wal_plugin(req).await
}
(Method::POST, "/api/v3/plugin_test/schedule") => {
http_server
.test_processing_engine_schedule_plugin(req)
.await
}
_ => {
let body = Body::from("not found");
Ok(Response::builder()
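
The wire contract for the new route follows from `SchedulePluginTestRequest`'s serde derive, so field names are as declared on the struct; a sketch of a request (values are placeholders):

    POST /api/v3/plugin_test/schedule
    {
        "filename": "my_plugin.py",
        "database": "foo",
        "schedule": "*/5 * * * * *",
        "input_arguments": {"region": "us-east"}
    }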


@@ -21,6 +21,7 @@ async-trait.workspace = true
bitcode.workspace = true
bytes.workspace = true
byteorder.workspace = true
cron = "0.14"
crc32fast.workspace = true
futures-util.workspace = true
hashbrown.workspace = true


@@ -9,6 +9,7 @@ pub mod serialize;
mod snapshot_tracker;
use async_trait::async_trait;
use cron::Schedule;
use data_types::Timestamp;
use hashbrown::HashMap;
use indexmap::IndexMap;
@@ -635,6 +636,7 @@ pub struct DeletePluginDefinition {
#[serde(rename_all = "snake_case")]
pub enum PluginType {
WalRows,
Scheduled,
}
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
@@ -666,6 +668,7 @@ pub struct TriggerIdentifier {
pub enum TriggerSpecificationDefinition {
SingleTableWalWrite { table_name: String },
AllTablesWalWrite,
Schedule { schedule: String },
}
impl TriggerSpecificationDefinition {
@@ -684,6 +687,17 @@ impl TriggerSpecificationDefinition {
})
}
"all_tables" => Ok(TriggerSpecificationDefinition::AllTablesWalWrite),
s if s.starts_with("schedule:") => {
let cron_schedule = s.trim_start_matches("schedule:").trim();
if cron_schedule.is_empty() || Schedule::from_str(cron_schedule).is_err() {
return Err(Error::TriggerSpecificationParseError {
trigger_spec: spec_str.to_string(),
});
}
Ok(TriggerSpecificationDefinition::Schedule {
schedule: cron_schedule.to_string(),
})
}
_ => Err(Error::TriggerSpecificationParseError {
trigger_spec: spec_str.to_string(),
}),
@@ -696,6 +710,9 @@ impl TriggerSpecificationDefinition {
format!("table:{}", table_name)
}
TriggerSpecificationDefinition::AllTablesWalWrite => "all_tables".to_string(),
TriggerSpecificationDefinition::Schedule { schedule } => {
format!("schedule:{}", schedule)
}
}
}
}
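
The parse arm above validates the cron portion of a `schedule:` spec with the `cron` crate before accepting it. A minimal sketch of that round trip, using only the string forms shown in the match arms:

    use std::str::FromStr;
    use cron::Schedule;

    fn main() {
        let spec = "schedule:*/10 * * * * *";
        // Mirror the parse arm: strip the prefix, trim, validate with `cron`.
        let cron_part = spec.trim_start_matches("schedule:").trim();
        assert!(Schedule::from_str(cron_part).is_ok());
        // The format arm serializes the enum back to "schedule:<cron>".
        assert_eq!(format!("schedule:{}", cron_part), spec);
    }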