feat(processing_engine): Support Flask semantics for responses from request plugins.

processing_engine/request_plugin_flask_semantics
Jackson Newhouse 2025-03-03 14:41:08 -08:00
parent e00e0f1a86
commit eb78a7fb59
3 changed files with 662 additions and 37 deletions

View File

@ -472,7 +472,7 @@ mod tests {
let trigger_arguments = trigger_arguments.expect("args must include trigger arguments");
assert_eq!(2, trigger_arguments.0.len());
assert_eq!(2, trigger_arguments.len());
let query_path = trigger_arguments
.into_iter()

View File

@ -1701,7 +1701,7 @@ def process_request(influxdb3_local, query_parameters, request_headers, request_
influxdb3_local.write(line)
return 200, {"Content-Type": "application/json"}, json.dumps({"status": "ok", "line": line_str})
return {"status": "ok", "line": line_str}
"#;
let plugin_file = create_plugin_file(plugin_code);
let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap();
@ -1776,7 +1776,7 @@ def process_request(influxdb3_local, query_parameters, request_headers, request_
import json
def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
return 200, {"Content-Type": "application/json"}, json.dumps({"status": "updated"})
return {"status": "updated"}
"#;
// clear all bytes from the plugin file
plugin_file.reopen().unwrap().set_len(0).unwrap();
@ -1800,6 +1800,517 @@ def process_request(influxdb3_local, query_parameters, request_headers, request_
let body = serde_json::from_str::<serde_json::Value>(&body).unwrap();
assert_eq!(body, json!({"status": "updated"}));
}
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_flask_string_response() {
let plugin_code = r#"
def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
# Return a simple string (should become HTML with 200 status)
return "Hello, World!"
"#;
let plugin_file = create_plugin_file(plugin_code);
let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap();
let plugin_filename = plugin_file.path().file_name().unwrap().to_str().unwrap();
let server = TestServer::configure()
.with_plugin_dir(plugin_dir)
.spawn()
.await;
let server_addr = server.client_addr();
let db_name = "flask_test_string";
// Setup: create database and plugin
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
let trigger_path = "string_test";
run_with_confirmation(&[
"create",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-filename",
plugin_filename,
"--trigger-spec",
"request:test_route",
trigger_path,
]);
// Send request to test string response
let client = reqwest::Client::new();
let response = client
.get(format!("{}/api/v3/engine/test_route", server_addr))
.send()
.await
.unwrap();
assert_eq!(response.status(), 200);
assert_eq!(response.headers().get("content-type").unwrap(), "text/html");
let body = response.text().await.unwrap();
assert_eq!(body, "Hello, World!");
}
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_flask_dict_json_response() {
let plugin_code = r#"
def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
# Return a dictionary (should be converted to JSON)
return {"message": "Hello, World!", "status": "success"}
"#;
let plugin_file = create_plugin_file(plugin_code);
let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap();
let plugin_filename = plugin_file.path().file_name().unwrap().to_str().unwrap();
let server = TestServer::configure()
.with_plugin_dir(plugin_dir)
.spawn()
.await;
let server_addr = server.client_addr();
let db_name = "flask_test_dict";
// Setup: create database and plugin
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
let trigger_path = "dict_test";
run_with_confirmation(&[
"create",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-filename",
plugin_filename,
"--trigger-spec",
"request:test_route",
trigger_path,
]);
// Send request to test dict/JSON response
let client = reqwest::Client::new();
let response = client
.get(format!("{}/api/v3/engine/test_route", server_addr))
.send()
.await
.unwrap();
assert_eq!(response.status(), 200);
assert_eq!(
response.headers().get("content-type").unwrap(),
"application/json"
);
let body = response.json::<serde_json::Value>().await.unwrap();
assert_eq!(
body,
json!({"message": "Hello, World!", "status": "success"})
);
}
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_flask_tuple_response_with_status() {
let plugin_code = r#"
def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
# Return a tuple with content and status code
return "Created successfully", 201
"#;
let plugin_file = create_plugin_file(plugin_code);
let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap();
let plugin_filename = plugin_file.path().file_name().unwrap().to_str().unwrap();
let server = TestServer::configure()
.with_plugin_dir(plugin_dir)
.spawn()
.await;
let server_addr = server.client_addr();
let db_name = "flask_test_tuple_status";
// Setup: create database and plugin
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
let trigger_path = "tuple_status_test";
run_with_confirmation(&[
"create",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-filename",
plugin_filename,
"--trigger-spec",
"request:test_route",
trigger_path,
]);
// Send request to test tuple with status response
let client = reqwest::Client::new();
let response = client
.get(format!("{}/api/v3/engine/test_route", server_addr))
.send()
.await
.unwrap();
assert_eq!(response.status(), 201);
assert_eq!(response.headers().get("content-type").unwrap(), "text/html");
let body = response.text().await.unwrap();
assert_eq!(body, "Created successfully");
}
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_flask_tuple_response_with_headers() {
let plugin_code = r#"
def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
# Return a tuple with content and headers
return "Custom Content-Type", {"Content-Type": "text/plain", "X-Custom-Header": "test-value"}
"#;
let plugin_file = create_plugin_file(plugin_code);
let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap();
let plugin_filename = plugin_file.path().file_name().unwrap().to_str().unwrap();
let server = TestServer::configure()
.with_plugin_dir(plugin_dir)
.spawn()
.await;
let server_addr = server.client_addr();
let db_name = "flask_test_tuple_headers";
// Setup: create database and plugin
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
let trigger_path = "tuple_headers_test";
run_with_confirmation(&[
"create",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-filename",
plugin_filename,
"--trigger-spec",
"request:test_route",
trigger_path,
]);
// Send request to test tuple with headers response
let client = reqwest::Client::new();
let response = client
.get(format!("{}/api/v3/engine/test_route", server_addr))
.send()
.await
.unwrap();
assert_eq!(response.status(), 200);
assert_eq!(
response.headers().get("content-type").unwrap(),
"text/plain"
);
assert_eq!(
response.headers().get("x-custom-header").unwrap(),
"test-value"
);
let body = response.text().await.unwrap();
assert_eq!(body, "Custom Content-Type");
}
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_flask_tuple_response_with_status_and_headers() {
let plugin_code = r#"
def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
# Return a tuple with content, status, and headers
return "Not Found", 404, {"Content-Type": "text/plain", "X-Error-Code": "NOT_FOUND"}
"#;
let plugin_file = create_plugin_file(plugin_code);
let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap();
let plugin_filename = plugin_file.path().file_name().unwrap().to_str().unwrap();
let server = TestServer::configure()
.with_plugin_dir(plugin_dir)
.spawn()
.await;
let server_addr = server.client_addr();
let db_name = "flask_test_tuple_status_headers";
// Setup: create database and plugin
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
let trigger_path = "tuple_status_headers_test";
run_with_confirmation(&[
"create",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-filename",
plugin_filename,
"--trigger-spec",
"request:test_route",
trigger_path,
]);
// Send request to test tuple with status and headers response
let client = reqwest::Client::new();
let response = client
.get(format!("{}/api/v3/engine/test_route", server_addr))
.send()
.await
.unwrap();
assert_eq!(response.status(), 404);
assert_eq!(
response.headers().get("content-type").unwrap(),
"text/plain"
);
assert_eq!(response.headers().get("x-error-code").unwrap(), "NOT_FOUND");
let body = response.text().await.unwrap();
assert_eq!(body, "Not Found");
}
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_flask_list_json_response() {
let plugin_code = r#"
def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
# Return a list (should be converted to JSON array)
return ["item1", "item2", "item3"]
"#;
let plugin_file = create_plugin_file(plugin_code);
let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap();
let plugin_filename = plugin_file.path().file_name().unwrap().to_str().unwrap();
let server = TestServer::configure()
.with_plugin_dir(plugin_dir)
.spawn()
.await;
let server_addr = server.client_addr();
let db_name = "flask_test_list";
// Setup: create database and plugin
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
let trigger_path = "list_test";
run_with_confirmation(&[
"create",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-filename",
plugin_filename,
"--trigger-spec",
"request:test_route",
trigger_path,
]);
// Send request to test list/JSON response
let client = reqwest::Client::new();
let response = client
.get(format!("{}/api/v3/engine/test_route", server_addr))
.send()
.await
.unwrap();
assert_eq!(response.status(), 200);
assert_eq!(
response.headers().get("content-type").unwrap(),
"application/json"
);
let body = response.json::<serde_json::Value>().await.unwrap();
assert_eq!(body, json!(["item1", "item2", "item3"]));
}
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_flask_iterator_response() {
let plugin_code = r#"
def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
# Return a generator that yields strings
def generate_content():
yield "Line 1\n"
yield "Line 2\n"
yield "Line 3\n"
return generate_content()
"#;
let plugin_file = create_plugin_file(plugin_code);
let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap();
let plugin_filename = plugin_file.path().file_name().unwrap().to_str().unwrap();
let server = TestServer::configure()
.with_plugin_dir(plugin_dir)
.spawn()
.await;
let server_addr = server.client_addr();
let db_name = "flask_test_iterator";
// Setup: create database and plugin
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
let trigger_path = "iterator_test";
run_with_confirmation(&[
"create",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-filename",
plugin_filename,
"--trigger-spec",
"request:test_route",
trigger_path,
]);
// Send request to test iterator/generator response
let client = reqwest::Client::new();
let response = client
.get(format!("{}/api/v3/engine/test_route", server_addr))
.send()
.await
.unwrap();
assert_eq!(response.status(), 200);
assert_eq!(response.headers().get("content-type").unwrap(), "text/html");
let body = response.text().await.unwrap();
assert_eq!(body, "Line 1\nLine 2\nLine 3\n");
}
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_flask_response_object() {
let plugin_code = r#"
def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
# Creating a mock Flask Response object
class FlaskResponse:
def __init__(self, response, status=200, headers=None):
self.response = response
self.status_code = status
self.headers = headers or {}
def get_data(self):
return self.response
def __flask_response__(self):
return True
# Return a Flask Response object
response = FlaskResponse(
"Custom Flask Response",
status=202,
headers={"Content-Type": "text/custom", "X-Generated-By": "FlaskResponse"}
)
return response
"#;
let plugin_file = create_plugin_file(plugin_code);
let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap();
let plugin_filename = plugin_file.path().file_name().unwrap().to_str().unwrap();
let server = TestServer::configure()
.with_plugin_dir(plugin_dir)
.spawn()
.await;
let server_addr = server.client_addr();
let db_name = "flask_test_response_obj";
// Setup: create database and plugin
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
let trigger_path = "response_obj_test";
run_with_confirmation(&[
"create",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-filename",
plugin_filename,
"--trigger-spec",
"request:test_route",
trigger_path,
]);
// Send request to test Flask Response object
let client = reqwest::Client::new();
let response = client
.get(format!("{}/api/v3/engine/test_route", server_addr))
.send()
.await
.unwrap();
assert_eq!(response.status(), 202);
assert_eq!(
response.headers().get("content-type").unwrap(),
"text/custom"
);
assert_eq!(
response.headers().get("x-generated-by").unwrap(),
"FlaskResponse"
);
let body = response.text().await.unwrap();
assert_eq!(body, "Custom Flask Response");
}
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_flask_json_dict_with_status_in_tuple() {
let plugin_code = r#"
def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
# Return a tuple with a dictionary and status code
return {"error": "Not Found", "code": 404}, 404
"#;
let plugin_file = create_plugin_file(plugin_code);
let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap();
let plugin_filename = plugin_file.path().file_name().unwrap().to_str().unwrap();
let server = TestServer::configure()
.with_plugin_dir(plugin_dir)
.spawn()
.await;
let server_addr = server.client_addr();
let db_name = "flask_test_json_status";
// Setup: create database and plugin
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);
let trigger_path = "json_status_test";
run_with_confirmation(&[
"create",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
"--plugin-filename",
plugin_filename,
"--trigger-spec",
"request:test_route",
trigger_path,
]);
// Send request to test JSON dict with status
let client = reqwest::Client::new();
let response = client
.get(format!("{}/api/v3/engine/test_route", server_addr))
.send()
.await
.unwrap();
assert_eq!(response.status(), 404);
assert_eq!(
response.headers().get("content-type").unwrap(),
"application/json"
);
let body = response.json::<serde_json::Value>().await.unwrap();
assert_eq!(body, json!({"error": "Not Found", "code": 404}));
}
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]

View File

@ -3,7 +3,7 @@
#![allow(clippy::useless_conversion)]
use crate::ExecutePluginError;
use crate::logging::{LogLevel, ProcessingEngineLog};
use anyhow::Context;
use anyhow::{Context, bail};
use arrow_array::types::Int32Type;
use arrow_array::{
BooleanArray, DictionaryArray, Float64Array, Int64Array, RecordBatch, StringArray,
@ -729,33 +729,8 @@ pub fn execute_request_trigger(
.call1((local_api, query_params, request_params, request_body, args))
.map_err(|e| anyhow::anyhow!("Python function call failed: {}", e))?;
// the return from the process_request function should be a tuple of (status_code, response_headers, body)
let response_code: i64 = result.get_item(0).context("Python request function didn't return a tuple of (status_code, response_headers, body)")?.extract().context("unable to convert first tuple element from Python request function return to integer")?;
let headers_binding = result.get_item(1).context("Python request function didn't return a tuple of (status_code, response_headers, body)")?;
let py_headers_dict = headers_binding
.downcast::<PyDict>()
.map_err(|e| anyhow::anyhow!("Failed to downcast to PyDict: {}", e))?;
let body_binding = result.get_item(2).context("Python request function didn't return a tuple of (status_code, response_headers, body)")?;
let response_body: &str = body_binding.extract().context(
"unable to convert the third tuple element from Python request function to a string",
)?;
// Then convert the dict to HashMap
let response_headers: std::collections::HashMap<String, String> = py_headers_dict
.extract()
.map_err(|e| anyhow::anyhow!("error converting response headers into hashmap {}", e))?;
// convert the returned i64 to a u16 or it's a 500
let response_code: u16 = response_code.try_into().unwrap_or_else(|_| {
warn!(
"Invalid response code from Python request trigger: {}",
response_code
);
500
});
let response_headers: HashMap<String, String> = response_headers.into_iter().collect();
// Process the result according to Flask conventions
let (response_code, response_headers, response_body) = process_flask_response(py, result)?;
// swap with an empty return state to avoid cloning
let empty_return_state = PluginReturnState::default();
@ -776,15 +751,154 @@ pub fn execute_request_trigger(
)
}
Ok((
response_code,
response_headers,
response_body.to_string(),
ret,
))
Ok((response_code, response_headers, response_body, ret))
})
}
fn process_flask_response(
py: Python<'_>,
result: Bound<'_, PyAny>,
) -> Result<(u16, HashMap<String, String>, String), anyhow::Error> {
let default_status: u16 = 200;
let default_headers: HashMap<String, String> = HashMap::new();
let default_mimetype = "text/html";
// Check if it's a Flask Response object
if let Ok(true) = result
.call_method0("__flask_response__")
.and_then(|r| r.extract::<bool>())
{
// It's a Flask Response object, extract status, headers, and body
let status: u16 = result.getattr("status_code")?.extract()?;
let body: String = result.call_method0("get_data")?.extract()?;
// Extract headers
let headers_dict = result.getattr("headers")?;
let headers: std::collections::HashMap<String, String> = headers_dict.extract()?;
return Ok((status, headers.into_iter().collect(), body));
}
// Check if it's a tuple
if let Ok(tuple) = result.downcast::<PyTuple>() {
let tuple_len = tuple.len()?;
if tuple_len > 0 && tuple_len <= 3 {
// Extract response part (first element)
let response = tuple.get_item(0)?;
let (response_body, mut headers) = process_response_part(py, response)?;
let mut status = default_status;
// Handle status and/or headers if provided
if tuple_len >= 2 {
let second = tuple.get_item(1)?;
// Check if the second item is a status code or headers
if let Ok(status_code) = second.extract::<i64>() {
status = status_code.try_into().unwrap_or(500);
} else if let Ok(header_dict) = second.downcast::<PyDict>() {
// It's headers
let additional_headers: std::collections::HashMap<String, String> =
header_dict.extract()?;
headers.extend(additional_headers);
}
// If there's a third item, it must be headers
if tuple_len == 3 {
let header_item = tuple.get_item(2)?;
let Ok(header_dict) = header_item.downcast::<PyDict>() else {
bail!("expected a dictionary in header item");
};
let additional_headers: std::collections::HashMap<String, String> =
header_dict.extract()?;
headers.extend(additional_headers);
}
}
return Ok((status, headers, response_body));
}
}
// Check if it's a string
if let Ok(string_val) = result.extract::<String>() {
let mut headers = default_headers.clone();
headers.insert("Content-Type".to_string(), default_mimetype.to_string());
return Ok((default_status, headers, string_val));
}
// Check if it's a dict or list for JSON response
if result.is_instance_of::<PyDict>() || result.is_instance_of::<PyList>() {
// We need to jsonify this
let json_module = py.import("json")?;
let json_string = json_module.call_method1("dumps", (result,))?;
let response_body: String = json_string.extract()?;
let mut headers = default_headers.clone();
headers.insert("Content-Type".to_string(), "application/json".to_string());
return Ok((default_status, headers, response_body));
}
// Check if it's an iterator or generator returning strings or bytes
if let Ok(true) = result.hasattr("__iter__") {
// Handling streaming response would require adapting the return type
// For now, we'll convert it to a concatenated string
let mut stream_content = String::new();
let iter = result.try_iter()?;
for item in iter {
let item = item?;
if let Ok(s) = item.extract::<String>() {
stream_content.push_str(&s);
} else if let Ok(b) = item.extract::<&[u8]>() {
// Convert bytes to string - might need better UTF-8 handling
if let Ok(s) = std::str::from_utf8(b) {
stream_content.push_str(s);
}
}
}
let mut headers = default_headers.clone();
headers.insert("Content-Type".to_string(), default_mimetype.to_string());
return Ok((default_status, headers, stream_content));
}
// If we can't identify the response type, return an error
Err(anyhow::anyhow!(
"Unsupported return type from Python function"
))
}
fn process_response_part(
py: Python<'_>,
response: Bound<'_, PyAny>,
) -> Result<(String, HashMap<String, String>), anyhow::Error> {
let mut headers = HashMap::new();
// If it's a string, return it directly
if let Ok(string_val) = response.extract::<String>() {
headers.insert("Content-Type".to_string(), "text/html".to_string());
return Ok((string_val, headers));
}
// If it's a dict or list, convert to JSON
if response.is_instance_of::<PyDict>() || response.is_instance_of::<PyList>() {
let json_module = py.import("json")?;
let json_string = json_module.call_method1("dumps", (response,))?;
let response_body: String = json_string.extract()?;
headers.insert("Content-Type".to_string(), "application/json".to_string());
return Ok((response_body, headers));
}
// Default fallback
let response_str = response.str()?.extract::<String>()?;
headers.insert("Content-Type".to_string(), "text/plain".to_string());
Ok((response_str, headers))
}
// Module initialization
#[pymodule]
fn influxdb3_py_api(_m: &Bound<'_, PyModule>) -> PyResult<()> {