fix(processing_engine): Use default globals so that builtins are automatically included.

bugfix/include_builtins_in_pyo3
Jackson Newhouse 2025-02-26 15:18:37 -08:00
parent d8efdb4024
commit ba773cfce8
2 changed files with 100 additions and 32 deletions

View File

@ -1417,6 +1417,88 @@ def process_scheduled_call(influxdb3_local, schedule_time, args=None):
assert_eq!(res["errors"], expected_result["errors"]);
}
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_schedule_plugin_test_with_strftime() {
use crate::server::ConfigProvider;
use influxdb3_client::Precision;
// Create plugin file with a scheduled task
let plugin_file = create_plugin_file(
r#"
import datetime
def process_scheduled_call(influxdb3_local, schedule_time, args=None):
timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
influxdb3_local.info(f"Current timestamp: {timestamp}")
influxdb3_local.info(f"args are {args}")
influxdb3_local.info("Successfully called")"#,
);
let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap();
let plugin_name = plugin_file.path().file_name().unwrap().to_str().unwrap();
let server = TestServer::configure()
.with_plugin_dir(plugin_dir)
.spawn()
.await;
let server_addr = server.client_addr();
// Write some test data
server
.write_lp_to_db(
"foo",
"cpu,host=host1,region=us-east usage=0.75\n\
cpu,host=host2,region=us-west usage=0.82\n\
cpu,host=host3,region=us-east usage=0.91",
Precision::Nanosecond,
)
.await
.unwrap();
let db_name = "foo";
// Run the schedule plugin test
let result = run_with_confirmation(&[
"test",
"schedule_plugin",
"--database",
db_name,
"--host",
&server_addr,
"--schedule",
"*/5 * * * * *", // Run every 5 seconds
"--input-arguments",
"region=us-east",
plugin_name,
]);
debug!(result = ?result, "test schedule plugin");
let res = serde_json::from_str::<Value>(&result).unwrap();
// The trigger_time will be dynamic, so we'll just verify it exists and is in the right format
let trigger_time = res["trigger_time"].as_str().unwrap();
assert!(trigger_time.contains('T')); // Basic RFC3339 format check
// Check the rest of the response structure
// Modified expectations to include the timestamp message
let log_lines = &res["log_lines"];
assert_eq!(log_lines.as_array().unwrap().len(), 3);
assert!(
log_lines[0]
.as_str()
.unwrap()
.starts_with("INFO: Current timestamp:")
);
assert_eq!(
log_lines[1].as_str().unwrap(),
"INFO: args are {'region': 'us-east'}"
);
assert_eq!(log_lines[2].as_str().unwrap(), "INFO: Successfully called");
assert_eq!(res["database_writes"], serde_json::json!({}));
assert_eq!(res["errors"], serde_json::json!([]));
}
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_wal_plugin_errors() {

View File

@ -454,14 +454,11 @@ pub fn execute_python_with_batch(
};
Python::with_gil(|py| {
// import the LineBuilder for use in the python code
let globals = PyDict::new(py);
py.run(
&CString::new(LINE_BUILDER_CODE).unwrap(),
Some(&globals),
None,
)
.map_err(|e| anyhow::Error::new(e).context("failed to eval the LineBuilder API code"))?;
py.run(&CString::new(LINE_BUILDER_CODE).unwrap(), None, None)
.map_err(|e| {
anyhow::Error::new(e).context("failed to eval the LineBuilder API code")
})?;
// convert the write batch into a python object
let mut table_batches = Vec::with_capacity(write_batch.table_chunks.len());
@ -558,14 +555,10 @@ pub fn execute_python_with_batch(
let args = args_to_py_object(py, args);
// run the code and get the python function to call
py.run(&CString::new(code).unwrap(), Some(&globals), None)
py.run(&CString::new(code).unwrap(), None, None)
.map_err(anyhow::Error::from)?;
let py_func = py
.eval(
&CString::new(PROCESS_WRITES_CALL_SITE).unwrap(),
Some(&globals),
None,
)
.eval(&CString::new(PROCESS_WRITES_CALL_SITE).unwrap(), None, None)
.map_err(|_| ExecutePluginError::MissingProcessWritesFunction)?;
py_func
@ -614,19 +607,16 @@ pub fn execute_schedule_trigger(
};
Python::with_gil(|py| {
// import the LineBuilder for use in the python code
let globals = PyDict::new(py);
let py_datetime = PyDateTime::from_timestamp(py, schedule_time.timestamp() as f64, None)
.map_err(|e| {
anyhow::Error::new(e).context("error converting the schedule time to Python time")
})?;
py.run(
&CString::new(LINE_BUILDER_CODE).unwrap(),
Some(&globals),
None,
)
.map_err(|e| anyhow::Error::new(e).context("failed to eval the LineBuilder API code"))?;
py.run(&CString::new(LINE_BUILDER_CODE).unwrap(), None, None)
.map_err(|e| {
anyhow::Error::new(e).context("failed to eval the LineBuilder API code")
})?;
let api = PyPluginCallApi {
db_schema: schema,
@ -641,13 +631,13 @@ pub fn execute_schedule_trigger(
let args = args_to_py_object(py, args);
// run the code and get the python function to call
py.run(&CString::new(code).unwrap(), Some(&globals), None)
py.run(&CString::new(code).unwrap(), None, None)
.map_err(anyhow::Error::from)?;
let py_func = py
.eval(
&CString::new(PROCESS_SCHEDULED_CALL_SITE).unwrap(),
Some(&globals),
None,
None,
)
.map_err(|_| ExecutePluginError::MissingProcessScheduledCallFunction)?;
@ -699,14 +689,10 @@ pub fn execute_request_trigger(
};
Python::with_gil(|py| {
// import the LineBuilder for use in the python code
let globals = PyDict::new(py);
py.run(
&CString::new(LINE_BUILDER_CODE).unwrap(),
Some(&globals),
None,
)
.map_err(|e| anyhow::Error::new(e).context("failed to eval the LineBuilder API code"))?;
py.run(&CString::new(LINE_BUILDER_CODE).unwrap(), None, None)
.map_err(|e| {
anyhow::Error::new(e).context("failed to eval the LineBuilder API code")
})?;
let api = PyPluginCallApi {
db_schema,
@ -724,13 +710,13 @@ pub fn execute_request_trigger(
let request_params = map_to_py_object(py, &request_headers);
// run the code and get the python function to call
py.run(&CString::new(code).unwrap(), Some(&globals), None)
py.run(&CString::new(code).unwrap(), None, None)
.map_err(anyhow::Error::from)?;
let py_func = py
.eval(
&CString::new(PROCESS_REQUEST_CALL_SITE).unwrap(),
Some(&globals),
None,
None,
)
.map_err(|_| ExecutePluginError::MissingProcessRequestFunction)?;