From eb78a7fb593793438e13837133849a25f3b83e0e Mon Sep 17 00:00:00 2001 From: Jackson Newhouse Date: Mon, 3 Mar 2025 14:41:08 -0800 Subject: [PATCH] feat(processing_engine): Support Flask semantics for responses from request plugins. --- influxdb3/src/commands/create.rs | 2 +- influxdb3/tests/cli/mod.rs | 515 +++++++++++++++++++++++++++++- influxdb3_py_api/src/system_py.rs | 182 +++++++++-- 3 files changed, 662 insertions(+), 37 deletions(-) diff --git a/influxdb3/src/commands/create.rs b/influxdb3/src/commands/create.rs index f06ea4cf85..e25e44b4e5 100644 --- a/influxdb3/src/commands/create.rs +++ b/influxdb3/src/commands/create.rs @@ -472,7 +472,7 @@ mod tests { let trigger_arguments = trigger_arguments.expect("args must include trigger arguments"); - assert_eq!(2, trigger_arguments.0.len()); + assert_eq!(2, trigger_arguments.len()); let query_path = trigger_arguments .into_iter() diff --git a/influxdb3/tests/cli/mod.rs b/influxdb3/tests/cli/mod.rs index e00bf9432f..17a8926604 100644 --- a/influxdb3/tests/cli/mod.rs +++ b/influxdb3/tests/cli/mod.rs @@ -1701,7 +1701,7 @@ def process_request(influxdb3_local, query_parameters, request_headers, request_ influxdb3_local.write(line) - return 200, {"Content-Type": "application/json"}, json.dumps({"status": "ok", "line": line_str}) + return {"status": "ok", "line": line_str} "#; let plugin_file = create_plugin_file(plugin_code); let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap(); @@ -1776,7 +1776,7 @@ def process_request(influxdb3_local, query_parameters, request_headers, request_ import json def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None): - return 200, {"Content-Type": "application/json"}, json.dumps({"status": "updated"}) + return {"status": "updated"} "#; // clear all bytes from the plugin file plugin_file.reopen().unwrap().set_len(0).unwrap(); @@ -1800,6 +1800,517 @@ def process_request(influxdb3_local, query_parameters, request_headers, request_ let body = serde_json::from_str::(&body).unwrap(); assert_eq!(body, json!({"status": "updated"})); } +#[cfg(feature = "system-py")] +#[test_log::test(tokio::test)] +async fn test_flask_string_response() { + let plugin_code = r#" +def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None): + # Return a simple string (should become HTML with 200 status) + return "Hello, World!" +"#; + let plugin_file = create_plugin_file(plugin_code); + let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap(); + let plugin_filename = plugin_file.path().file_name().unwrap().to_str().unwrap(); + + let server = TestServer::configure() + .with_plugin_dir(plugin_dir) + .spawn() + .await; + let server_addr = server.client_addr(); + let db_name = "flask_test_string"; + + // Setup: create database and plugin + run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]); + + let trigger_path = "string_test"; + run_with_confirmation(&[ + "create", + "trigger", + "--database", + db_name, + "--host", + &server_addr, + "--plugin-filename", + plugin_filename, + "--trigger-spec", + "request:test_route", + trigger_path, + ]); + + // Send request to test string response + let client = reqwest::Client::new(); + let response = client + .get(format!("{}/api/v3/engine/test_route", server_addr)) + .send() + .await + .unwrap(); + + assert_eq!(response.status(), 200); + assert_eq!(response.headers().get("content-type").unwrap(), "text/html"); + let body = response.text().await.unwrap(); + assert_eq!(body, "Hello, World!"); +} + +#[cfg(feature = "system-py")] +#[test_log::test(tokio::test)] +async fn test_flask_dict_json_response() { + let plugin_code = r#" +def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None): + # Return a dictionary (should be converted to JSON) + return {"message": "Hello, World!", "status": "success"} +"#; + let plugin_file = create_plugin_file(plugin_code); + let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap(); + let plugin_filename = plugin_file.path().file_name().unwrap().to_str().unwrap(); + + let server = TestServer::configure() + .with_plugin_dir(plugin_dir) + .spawn() + .await; + let server_addr = server.client_addr(); + let db_name = "flask_test_dict"; + + // Setup: create database and plugin + run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]); + + let trigger_path = "dict_test"; + run_with_confirmation(&[ + "create", + "trigger", + "--database", + db_name, + "--host", + &server_addr, + "--plugin-filename", + plugin_filename, + "--trigger-spec", + "request:test_route", + trigger_path, + ]); + + // Send request to test dict/JSON response + let client = reqwest::Client::new(); + let response = client + .get(format!("{}/api/v3/engine/test_route", server_addr)) + .send() + .await + .unwrap(); + + assert_eq!(response.status(), 200); + assert_eq!( + response.headers().get("content-type").unwrap(), + "application/json" + ); + let body = response.json::().await.unwrap(); + assert_eq!( + body, + json!({"message": "Hello, World!", "status": "success"}) + ); +} + +#[cfg(feature = "system-py")] +#[test_log::test(tokio::test)] +async fn test_flask_tuple_response_with_status() { + let plugin_code = r#" +def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None): + # Return a tuple with content and status code + return "Created successfully", 201 +"#; + let plugin_file = create_plugin_file(plugin_code); + let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap(); + let plugin_filename = plugin_file.path().file_name().unwrap().to_str().unwrap(); + + let server = TestServer::configure() + .with_plugin_dir(plugin_dir) + .spawn() + .await; + let server_addr = server.client_addr(); + let db_name = "flask_test_tuple_status"; + + // Setup: create database and plugin + run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]); + + let trigger_path = "tuple_status_test"; + run_with_confirmation(&[ + "create", + "trigger", + "--database", + db_name, + "--host", + &server_addr, + "--plugin-filename", + plugin_filename, + "--trigger-spec", + "request:test_route", + trigger_path, + ]); + + // Send request to test tuple with status response + let client = reqwest::Client::new(); + let response = client + .get(format!("{}/api/v3/engine/test_route", server_addr)) + .send() + .await + .unwrap(); + + assert_eq!(response.status(), 201); + assert_eq!(response.headers().get("content-type").unwrap(), "text/html"); + let body = response.text().await.unwrap(); + assert_eq!(body, "Created successfully"); +} + +#[cfg(feature = "system-py")] +#[test_log::test(tokio::test)] +async fn test_flask_tuple_response_with_headers() { + let plugin_code = r#" +def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None): + # Return a tuple with content and headers + return "Custom Content-Type", {"Content-Type": "text/plain", "X-Custom-Header": "test-value"} +"#; + let plugin_file = create_plugin_file(plugin_code); + let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap(); + let plugin_filename = plugin_file.path().file_name().unwrap().to_str().unwrap(); + + let server = TestServer::configure() + .with_plugin_dir(plugin_dir) + .spawn() + .await; + let server_addr = server.client_addr(); + let db_name = "flask_test_tuple_headers"; + + // Setup: create database and plugin + run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]); + + let trigger_path = "tuple_headers_test"; + run_with_confirmation(&[ + "create", + "trigger", + "--database", + db_name, + "--host", + &server_addr, + "--plugin-filename", + plugin_filename, + "--trigger-spec", + "request:test_route", + trigger_path, + ]); + + // Send request to test tuple with headers response + let client = reqwest::Client::new(); + let response = client + .get(format!("{}/api/v3/engine/test_route", server_addr)) + .send() + .await + .unwrap(); + + assert_eq!(response.status(), 200); + assert_eq!( + response.headers().get("content-type").unwrap(), + "text/plain" + ); + assert_eq!( + response.headers().get("x-custom-header").unwrap(), + "test-value" + ); + let body = response.text().await.unwrap(); + assert_eq!(body, "Custom Content-Type"); +} + +#[cfg(feature = "system-py")] +#[test_log::test(tokio::test)] +async fn test_flask_tuple_response_with_status_and_headers() { + let plugin_code = r#" +def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None): + # Return a tuple with content, status, and headers + return "Not Found", 404, {"Content-Type": "text/plain", "X-Error-Code": "NOT_FOUND"} +"#; + let plugin_file = create_plugin_file(plugin_code); + let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap(); + let plugin_filename = plugin_file.path().file_name().unwrap().to_str().unwrap(); + + let server = TestServer::configure() + .with_plugin_dir(plugin_dir) + .spawn() + .await; + let server_addr = server.client_addr(); + let db_name = "flask_test_tuple_status_headers"; + + // Setup: create database and plugin + run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]); + + let trigger_path = "tuple_status_headers_test"; + run_with_confirmation(&[ + "create", + "trigger", + "--database", + db_name, + "--host", + &server_addr, + "--plugin-filename", + plugin_filename, + "--trigger-spec", + "request:test_route", + trigger_path, + ]); + + // Send request to test tuple with status and headers response + let client = reqwest::Client::new(); + let response = client + .get(format!("{}/api/v3/engine/test_route", server_addr)) + .send() + .await + .unwrap(); + + assert_eq!(response.status(), 404); + assert_eq!( + response.headers().get("content-type").unwrap(), + "text/plain" + ); + assert_eq!(response.headers().get("x-error-code").unwrap(), "NOT_FOUND"); + let body = response.text().await.unwrap(); + assert_eq!(body, "Not Found"); +} + +#[cfg(feature = "system-py")] +#[test_log::test(tokio::test)] +async fn test_flask_list_json_response() { + let plugin_code = r#" +def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None): + # Return a list (should be converted to JSON array) + return ["item1", "item2", "item3"] +"#; + let plugin_file = create_plugin_file(plugin_code); + let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap(); + let plugin_filename = plugin_file.path().file_name().unwrap().to_str().unwrap(); + + let server = TestServer::configure() + .with_plugin_dir(plugin_dir) + .spawn() + .await; + let server_addr = server.client_addr(); + let db_name = "flask_test_list"; + + // Setup: create database and plugin + run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]); + + let trigger_path = "list_test"; + run_with_confirmation(&[ + "create", + "trigger", + "--database", + db_name, + "--host", + &server_addr, + "--plugin-filename", + plugin_filename, + "--trigger-spec", + "request:test_route", + trigger_path, + ]); + + // Send request to test list/JSON response + let client = reqwest::Client::new(); + let response = client + .get(format!("{}/api/v3/engine/test_route", server_addr)) + .send() + .await + .unwrap(); + + assert_eq!(response.status(), 200); + assert_eq!( + response.headers().get("content-type").unwrap(), + "application/json" + ); + let body = response.json::().await.unwrap(); + assert_eq!(body, json!(["item1", "item2", "item3"])); +} + +#[cfg(feature = "system-py")] +#[test_log::test(tokio::test)] +async fn test_flask_iterator_response() { + let plugin_code = r#" +def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None): + # Return a generator that yields strings + def generate_content(): + yield "Line 1\n" + yield "Line 2\n" + yield "Line 3\n" + + return generate_content() +"#; + let plugin_file = create_plugin_file(plugin_code); + let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap(); + let plugin_filename = plugin_file.path().file_name().unwrap().to_str().unwrap(); + + let server = TestServer::configure() + .with_plugin_dir(plugin_dir) + .spawn() + .await; + let server_addr = server.client_addr(); + let db_name = "flask_test_iterator"; + + // Setup: create database and plugin + run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]); + + let trigger_path = "iterator_test"; + run_with_confirmation(&[ + "create", + "trigger", + "--database", + db_name, + "--host", + &server_addr, + "--plugin-filename", + plugin_filename, + "--trigger-spec", + "request:test_route", + trigger_path, + ]); + + // Send request to test iterator/generator response + let client = reqwest::Client::new(); + let response = client + .get(format!("{}/api/v3/engine/test_route", server_addr)) + .send() + .await + .unwrap(); + + assert_eq!(response.status(), 200); + assert_eq!(response.headers().get("content-type").unwrap(), "text/html"); + let body = response.text().await.unwrap(); + assert_eq!(body, "Line 1\nLine 2\nLine 3\n"); +} + +#[cfg(feature = "system-py")] +#[test_log::test(tokio::test)] +async fn test_flask_response_object() { + let plugin_code = r#" +def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None): + # Creating a mock Flask Response object + class FlaskResponse: + def __init__(self, response, status=200, headers=None): + self.response = response + self.status_code = status + self.headers = headers or {} + + def get_data(self): + return self.response + + def __flask_response__(self): + return True + + # Return a Flask Response object + response = FlaskResponse( + "Custom Flask Response", + status=202, + headers={"Content-Type": "text/custom", "X-Generated-By": "FlaskResponse"} + ) + return response +"#; + let plugin_file = create_plugin_file(plugin_code); + let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap(); + let plugin_filename = plugin_file.path().file_name().unwrap().to_str().unwrap(); + + let server = TestServer::configure() + .with_plugin_dir(plugin_dir) + .spawn() + .await; + let server_addr = server.client_addr(); + let db_name = "flask_test_response_obj"; + + // Setup: create database and plugin + run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]); + + let trigger_path = "response_obj_test"; + run_with_confirmation(&[ + "create", + "trigger", + "--database", + db_name, + "--host", + &server_addr, + "--plugin-filename", + plugin_filename, + "--trigger-spec", + "request:test_route", + trigger_path, + ]); + + // Send request to test Flask Response object + let client = reqwest::Client::new(); + let response = client + .get(format!("{}/api/v3/engine/test_route", server_addr)) + .send() + .await + .unwrap(); + + assert_eq!(response.status(), 202); + assert_eq!( + response.headers().get("content-type").unwrap(), + "text/custom" + ); + assert_eq!( + response.headers().get("x-generated-by").unwrap(), + "FlaskResponse" + ); + let body = response.text().await.unwrap(); + assert_eq!(body, "Custom Flask Response"); +} + +#[cfg(feature = "system-py")] +#[test_log::test(tokio::test)] +async fn test_flask_json_dict_with_status_in_tuple() { + let plugin_code = r#" +def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None): + # Return a tuple with a dictionary and status code + return {"error": "Not Found", "code": 404}, 404 +"#; + let plugin_file = create_plugin_file(plugin_code); + let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap(); + let plugin_filename = plugin_file.path().file_name().unwrap().to_str().unwrap(); + + let server = TestServer::configure() + .with_plugin_dir(plugin_dir) + .spawn() + .await; + let server_addr = server.client_addr(); + let db_name = "flask_test_json_status"; + + // Setup: create database and plugin + run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]); + + let trigger_path = "json_status_test"; + run_with_confirmation(&[ + "create", + "trigger", + "--database", + db_name, + "--host", + &server_addr, + "--plugin-filename", + plugin_filename, + "--trigger-spec", + "request:test_route", + trigger_path, + ]); + + // Send request to test JSON dict with status + let client = reqwest::Client::new(); + let response = client + .get(format!("{}/api/v3/engine/test_route", server_addr)) + .send() + .await + .unwrap(); + + assert_eq!(response.status(), 404); + assert_eq!( + response.headers().get("content-type").unwrap(), + "application/json" + ); + let body = response.json::().await.unwrap(); + assert_eq!(body, json!({"error": "Not Found", "code": 404})); +} #[cfg(feature = "system-py")] #[test_log::test(tokio::test)] diff --git a/influxdb3_py_api/src/system_py.rs b/influxdb3_py_api/src/system_py.rs index ab8d00ded7..9f3065665f 100644 --- a/influxdb3_py_api/src/system_py.rs +++ b/influxdb3_py_api/src/system_py.rs @@ -3,7 +3,7 @@ #![allow(clippy::useless_conversion)] use crate::ExecutePluginError; use crate::logging::{LogLevel, ProcessingEngineLog}; -use anyhow::Context; +use anyhow::{Context, bail}; use arrow_array::types::Int32Type; use arrow_array::{ BooleanArray, DictionaryArray, Float64Array, Int64Array, RecordBatch, StringArray, @@ -729,33 +729,8 @@ pub fn execute_request_trigger( .call1((local_api, query_params, request_params, request_body, args)) .map_err(|e| anyhow::anyhow!("Python function call failed: {}", e))?; - // the return from the process_request function should be a tuple of (status_code, response_headers, body) - let response_code: i64 = result.get_item(0).context("Python request function didn't return a tuple of (status_code, response_headers, body)")?.extract().context("unable to convert first tuple element from Python request function return to integer")?; - let headers_binding = result.get_item(1).context("Python request function didn't return a tuple of (status_code, response_headers, body)")?; - let py_headers_dict = headers_binding - .downcast::() - .map_err(|e| anyhow::anyhow!("Failed to downcast to PyDict: {}", e))?; - let body_binding = result.get_item(2).context("Python request function didn't return a tuple of (status_code, response_headers, body)")?; - let response_body: &str = body_binding.extract().context( - "unable to convert the third tuple element from Python request function to a string", - )?; - - // Then convert the dict to HashMap - let response_headers: std::collections::HashMap = py_headers_dict - .extract() - .map_err(|e| anyhow::anyhow!("error converting response headers into hashmap {}", e))?; - - // convert the returned i64 to a u16 or it's a 500 - let response_code: u16 = response_code.try_into().unwrap_or_else(|_| { - warn!( - "Invalid response code from Python request trigger: {}", - response_code - ); - - 500 - }); - - let response_headers: HashMap = response_headers.into_iter().collect(); + // Process the result according to Flask conventions + let (response_code, response_headers, response_body) = process_flask_response(py, result)?; // swap with an empty return state to avoid cloning let empty_return_state = PluginReturnState::default(); @@ -776,15 +751,154 @@ pub fn execute_request_trigger( ) } - Ok(( - response_code, - response_headers, - response_body.to_string(), - ret, - )) + Ok((response_code, response_headers, response_body, ret)) }) } +fn process_flask_response( + py: Python<'_>, + result: Bound<'_, PyAny>, +) -> Result<(u16, HashMap, String), anyhow::Error> { + let default_status: u16 = 200; + let default_headers: HashMap = HashMap::new(); + let default_mimetype = "text/html"; + + // Check if it's a Flask Response object + if let Ok(true) = result + .call_method0("__flask_response__") + .and_then(|r| r.extract::()) + { + // It's a Flask Response object, extract status, headers, and body + let status: u16 = result.getattr("status_code")?.extract()?; + let body: String = result.call_method0("get_data")?.extract()?; + + // Extract headers + let headers_dict = result.getattr("headers")?; + let headers: std::collections::HashMap = headers_dict.extract()?; + + return Ok((status, headers.into_iter().collect(), body)); + } + + // Check if it's a tuple + if let Ok(tuple) = result.downcast::() { + let tuple_len = tuple.len()?; + if tuple_len > 0 && tuple_len <= 3 { + // Extract response part (first element) + let response = tuple.get_item(0)?; + let (response_body, mut headers) = process_response_part(py, response)?; + + let mut status = default_status; + + // Handle status and/or headers if provided + if tuple_len >= 2 { + let second = tuple.get_item(1)?; + + // Check if the second item is a status code or headers + if let Ok(status_code) = second.extract::() { + status = status_code.try_into().unwrap_or(500); + } else if let Ok(header_dict) = second.downcast::() { + // It's headers + let additional_headers: std::collections::HashMap = + header_dict.extract()?; + headers.extend(additional_headers); + } + + // If there's a third item, it must be headers + if tuple_len == 3 { + let header_item = tuple.get_item(2)?; + let Ok(header_dict) = header_item.downcast::() else { + bail!("expected a dictionary in header item"); + }; + let additional_headers: std::collections::HashMap = + header_dict.extract()?; + headers.extend(additional_headers); + } + } + + return Ok((status, headers, response_body)); + } + } + + // Check if it's a string + if let Ok(string_val) = result.extract::() { + let mut headers = default_headers.clone(); + headers.insert("Content-Type".to_string(), default_mimetype.to_string()); + return Ok((default_status, headers, string_val)); + } + + // Check if it's a dict or list for JSON response + if result.is_instance_of::() || result.is_instance_of::() { + // We need to jsonify this + let json_module = py.import("json")?; + let json_string = json_module.call_method1("dumps", (result,))?; + let response_body: String = json_string.extract()?; + + let mut headers = default_headers.clone(); + headers.insert("Content-Type".to_string(), "application/json".to_string()); + + return Ok((default_status, headers, response_body)); + } + + // Check if it's an iterator or generator returning strings or bytes + if let Ok(true) = result.hasattr("__iter__") { + // Handling streaming response would require adapting the return type + // For now, we'll convert it to a concatenated string + let mut stream_content = String::new(); + + let iter = result.try_iter()?; + for item in iter { + let item = item?; + if let Ok(s) = item.extract::() { + stream_content.push_str(&s); + } else if let Ok(b) = item.extract::<&[u8]>() { + // Convert bytes to string - might need better UTF-8 handling + if let Ok(s) = std::str::from_utf8(b) { + stream_content.push_str(s); + } + } + } + + let mut headers = default_headers.clone(); + headers.insert("Content-Type".to_string(), default_mimetype.to_string()); + + return Ok((default_status, headers, stream_content)); + } + + // If we can't identify the response type, return an error + Err(anyhow::anyhow!( + "Unsupported return type from Python function" + )) +} + +fn process_response_part( + py: Python<'_>, + response: Bound<'_, PyAny>, +) -> Result<(String, HashMap), anyhow::Error> { + let mut headers = HashMap::new(); + + // If it's a string, return it directly + if let Ok(string_val) = response.extract::() { + headers.insert("Content-Type".to_string(), "text/html".to_string()); + return Ok((string_val, headers)); + } + + // If it's a dict or list, convert to JSON + if response.is_instance_of::() || response.is_instance_of::() { + let json_module = py.import("json")?; + let json_string = json_module.call_method1("dumps", (response,))?; + let response_body: String = json_string.extract()?; + + headers.insert("Content-Type".to_string(), "application/json".to_string()); + return Ok((response_body, headers)); + } + + // Default fallback + let response_str = response.str()?.extract::()?; + headers.insert("Content-Type".to_string(), "text/plain".to_string()); + + Ok((response_str, headers)) +} + // Module initialization #[pymodule] fn influxdb3_py_api(_m: &Bound<'_, PyModule>) -> PyResult<()> {