feat: return better plugin execution errors (#25842)
* feat: return better plugin execution errors

This sets up the framework for fleshing out more useful plugin execution errors that get returned to the user during testing. We'll also want to capture these for logging in system tables.

Also fixes a test that was broken in a previous commit on time limits; it didn't show up because of the feature flag.

* fix: compile errors without system-py feature
parent 6ebbf26763
commit 1f71750bfe
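For orientation before the diff: a minimal, self-contained sketch of the error chain this change sets up. `ExecutePluginError` mirrors the new enum added to influxdb3_py_api below; `EngineError`, `run_plugin`, and `main` are illustrative stand-ins for the processing engine's `Error` enum and its call sites (assumes `thiserror` and `anyhow` as dependencies).

```rust
use thiserror::Error;

// Mirrors the new enum in influxdb3_py_api (see the lib.rs hunk below).
#[derive(Debug, Error)]
pub enum ExecutePluginError {
    #[error("the process_writes function is not present in the plugin. Should be defined as: process_writes(influxdb3_local, table_batches, args=None)")]
    MissingProcessWritesFunction,

    // Any other Python or setup failure arrives wrapped as an anyhow::Error.
    #[error("{0}")]
    PluginError(#[from] anyhow::Error),
}

// Stand-in for the processing engine's Error enum, which gains a
// PluginExecutionError(#[from] influxdb3_py_api::ExecutePluginError) variant below.
#[derive(Debug, Error)]
pub enum EngineError {
    #[error("error executing plugin: {0}")]
    PluginExecutionError(#[from] ExecutePluginError),
}

// Illustrative call site: `?` converts ExecutePluginError -> EngineError via #[from].
fn run_plugin() -> Result<(), EngineError> {
    let result: Result<(), ExecutePluginError> =
        Err(ExecutePluginError::MissingProcessWritesFunction);
    result?;
    Ok(())
}

fn main() {
    if let Err(e) = run_plugin() {
        // Prints the message the CLI test asserts on:
        // "error executing plugin: the process_writes function is not present in the plugin. ..."
        println!("{e}");
    }
}
```

In the new test below, these formatted messages surface as strings in the `errors` array of the `test wal_plugin` JSON response.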
@@ -2999,6 +2999,7 @@ dependencies = [
name = "influxdb3_py_api"
version = "0.1.0"
dependencies = [
 "anyhow",
 "arrow-array",
 "arrow-schema",
 "futures",
@@ -3011,6 +3012,7 @@ dependencies = [
 "observability_deps",
 "parking_lot",
 "pyo3",
 "thiserror 1.0.69",
 "tokio",
]

@@ -11,6 +11,8 @@ use pretty_assertions::assert_eq;
use serde_json::{json, Value};
use test_helpers::assert_contains;
use test_helpers::tempfile::NamedTempFile;
#[cfg(feature = "system-py")]
use test_helpers::tempfile::TempDir;

pub fn run(args: &[&str]) -> String {
    let process = Command::cargo_bin("influxdb3")
@@ -945,7 +947,7 @@ def process_writes(influxdb3_local, table_batches, args=None):
    influxdb3_local.info("arg1: " + args["arg1"])

    query_params = {"host": args["host"]}
    query_result = influxdb3_local.query("SELECT * FROM cpu where host = $host", query_params)
    query_result = influxdb3_local.query("SELECT host, region, usage FROM cpu where host = $host", query_params)
    influxdb3_local.info("query result: " + str(query_result))

    for table_batch in table_batches:
@@ -984,9 +986,9 @@ def process_writes(influxdb3_local, table_batches, args=None):
    server
        .write_lp_to_db(
            "foo",
            "cpu,host=s1,region=us-east usage=0.9 1\n\
            cpu,host=s2,region=us-east usage=0.89 2\n\
            cpu,host=s1,region=us-east usage=0.85 3",
            "cpu,host=s1,region=us-east usage=0.9\n\
            cpu,host=s2,region=us-east usage=0.89\n\
            cpu,host=s1,region=us-east usage=0.85",
            Precision::Nanosecond,
        )
        .await
@@ -1015,7 +1017,7 @@ def process_writes(influxdb3_local, table_batches, args=None):
    let expected_result = r#"{
  "log_lines": [
    "INFO: arg1: arg1_value",
    "INFO: query result: [{'host': 's2', 'region': 'us-east', 'time': 2, 'usage': 0.89}]",
    "INFO: query result: [{'host': 's2', 'region': 'us-east', 'usage': 0.89}]",
    "INFO: table: test_input",
    "INFO: row: {'tag1': 'tag1_value', 'tag2': 'tag2_value', 'field1': 1, 'time': 500}",
    "INFO: done"
@@ -1033,3 +1035,108 @@ def process_writes(influxdb3_local, table_batches, args=None):
    let expected_result = serde_json::from_str::<serde_json::Value>(expected_result).unwrap();
    assert_eq!(res, expected_result);
}

#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_wal_plugin_errors() {
    use crate::ConfigProvider;
    use influxdb3_client::Precision;

    struct Test {
        name: &'static str,
        plugin_code: &'static str,
        expected_error: &'static str,
    }

    let tests = vec![
        Test {
            name: "invalid_python",
            plugin_code: r#"
    lkjasdf9823
    jjjjj / sss"#,
            expected_error: "error executing plugin: IndentationError: unexpected indent (<string>, line 2)",
        },
        Test {
            name: "no_process_writes",
            plugin_code: r#"
def not_process_writes(influxdb3_local, table_batches, args=None):
    influxdb3_local.info("done")"#,
            expected_error: "error executing plugin: the process_writes function is not present in the plugin. Should be defined as: process_writes(influxdb3_local, table_batches, args=None)",
        },
        Test {
            name: "no_args",
            plugin_code: r#"
def process_writes(influxdb3_local, table_batches):
    influxdb3_local.info("done")
"#,
            expected_error: "error executing plugin: TypeError: process_writes() takes 2 positional arguments but 3 were given",
        },
        Test {
            name: "no_table_batches",
            plugin_code: r#"
def process_writes(influxdb3_local, args=None):
    influxdb3_local.info("done")
"#,
            expected_error: "error executing plugin: TypeError: process_writes() takes from 1 to 2 positional arguments but 3 were given",
        },
        Test {
            name: "no_influxdb3_local",
            plugin_code: r#"
def process_writes(table_batches, args=None):
    influxdb3_local.info("done")
"#,
            expected_error: "error executing plugin: TypeError: process_writes() takes from 1 to 2 positional arguments but 3 were given",
        }];

    let plugin_dir = TempDir::new().unwrap();

    let server = TestServer::configure()
        .with_plugin_dir(plugin_dir.path().to_str().unwrap())
        .spawn()
        .await;
    let server_addr = server.client_addr();

    server
        .write_lp_to_db(
            "foo",
            "cpu,host=s1,region=us-east usage=0.9\n\
            cpu,host=s2,region=us-east usage=0.89\n\
            cpu,host=s1,region=us-east usage=0.85",
            Precision::Nanosecond,
        )
        .await
        .unwrap();

    let db_name = "foo";

    for test in tests {
        let mut plugin_file = NamedTempFile::new_in(plugin_dir.path()).unwrap();
        writeln!(plugin_file, "{}", test.plugin_code).unwrap();
        let plugin_name = plugin_file.path().file_name().unwrap().to_str().unwrap();

        let result = run_with_confirmation(&[
            "test",
            "wal_plugin",
            "--database",
            db_name,
            "--host",
            &server_addr,
            "--lp",
            "test_input,tag1=tag1_value,tag2=tag2_value field1=1i 500",
            "--input-arguments",
            "arg1=arg1_value,host=s2",
            plugin_name,
        ]);
        debug!(result = ?result, "test wal plugin");

        println!("{}", result);
        let res = serde_json::from_str::<serde_json::Value>(&result).unwrap();
        let errors = res.get("errors").unwrap().as_array().unwrap();
        let error = errors[0].as_str().unwrap();
        assert_eq!(
            error, test.expected_error,
            "test: {}, response was: {}",
            test.name, result
        );
    }
}

@@ -42,6 +42,9 @@ pub enum Error {

    #[error("reading plugin file: {0}")]
    ReadPluginError(#[from] std::io::Error),

    #[error("error executing plugin: {0}")]
    PluginExecutionError(#[from] influxdb3_py_api::ExecutePluginError),
}

#[cfg(feature = "system-py")]

@@ -9,6 +9,8 @@ license.workspace = true
system-py = ["pyo3"]

[dependencies]
anyhow.workspace = true
arrow-array.workspace = true
arrow-schema.workspace = true
hashbrown.workspace = true
@@ -20,6 +21,7 @@ iox_query_params.workspace = true
observability_deps.workspace = true
parking_lot.workspace = true
futures.workspace = true
thiserror.workspace = true
tokio.workspace = true

[dependencies.pyo3]

@@ -1,2 +1,11 @@
#[derive(Debug, thiserror::Error)]
pub enum ExecutePluginError {
    #[error("the process_writes function is not present in the plugin. Should be defined as: process_writes(influxdb3_local, table_batches, args=None)")]
    MissingProcessWritesFunction,

    #[error("{0}")]
    PluginError(#[from] anyhow::Error),
}

#[cfg(feature = "system-py")]
pub mod system_py;

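The system_py.rs changes that follow lean on one pattern worth calling out: `anyhow::Context` turns an `Option` (or a foreign error type) into an `anyhow::Error` carrying a short message, and the `#[from] anyhow::Error` variant on `ExecutePluginError` above lets `?` finish the conversion. A standalone sketch of just that step; the function and values here are illustrative, not from the codebase:

```rust
use anyhow::{Context, Result};

// Option -> Result<_, anyhow::Error>, mirroring calls like
// table_def.column_id_to_name(&field.id).context("field not found")? below.
fn field_name(column: Option<&str>) -> Result<String> {
    let name = column.context("field not found")?;
    Ok(name.to_string())
}

fn main() {
    println!("{}", field_name(Some("usage")).unwrap()); // usage
    println!("{:#}", field_name(None).unwrap_err());    // field not found
}
```

This is why calls that previously ended in `.unwrap()` now end in `.context(...)?` or `.map_err(anyhow::Error::from)?`: the failure becomes a plugin execution error returned to the user instead of a panic inside the server.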
@@ -1,3 +1,5 @@
use crate::ExecutePluginError;
use anyhow::Context;
use arrow_array::types::Int32Type;
use arrow_array::{
    BooleanArray, DictionaryArray, Float64Array, Int64Array, RecordBatch, StringArray,
@@ -353,7 +355,7 @@ pub fn execute_python_with_batch(
    query_executor: Arc<dyn QueryExecutor>,
    table_filter: Option<TableId>,
    args: &Option<HashMap<String, String>>,
) -> PyResult<PluginReturnState> {
) -> Result<PluginReturnState, ExecutePluginError> {
    Python::with_gil(|py| {
        // import the LineBuilder for use in the python code
        let globals = PyDict::new(py);
@@ -362,7 +364,8 @@ pub fn execute_python_with_batch(
            &CString::new(LINE_BUILDER_CODE).unwrap(),
            Some(&globals),
            None,
        )?;
        )
        .map_err(|e| anyhow::Error::new(e).context("failed to eval the LineBuilder API code"))?;

        // convert the write batch into a python object
        let mut table_batches = Vec::with_capacity(write_batch.table_chunks.len());
@@ -373,11 +376,11 @@ pub fn execute_python_with_batch(
                    continue;
                }
            }
            let table_def = schema.tables.get(table_id).unwrap();
            let table_def = schema.tables.get(table_id).context("table not found")?;

            let dict = PyDict::new(py);
            dict.set_item("table_name", table_def.table_name.as_ref())
                .unwrap();
                .context("failed to set table_name")?;

            let mut rows: Vec<PyObject> = Vec::new();
            for chunk in table_chunks.chunk_time_to_chunk.values() {
@@ -385,31 +388,49 @@ pub fn execute_python_with_batch(
                    let py_row = PyDict::new(py);

                    for field in &row.fields {
                        let field_name = table_def.column_id_to_name(&field.id).unwrap();
                        let field_name = table_def
                            .column_id_to_name(&field.id)
                            .context("field not found")?;
                        match &field.value {
                            FieldData::String(s) => {
                                py_row.set_item(field_name.as_ref(), s.as_str()).unwrap();
                                py_row
                                    .set_item(field_name.as_ref(), s.as_str())
                                    .context("failed to set string field")?;
                            }
                            FieldData::Integer(i) => {
                                py_row.set_item(field_name.as_ref(), i).unwrap();
                                py_row
                                    .set_item(field_name.as_ref(), i)
                                    .context("failed to set integer field")?;
                            }
                            FieldData::UInteger(u) => {
                                py_row.set_item(field_name.as_ref(), u).unwrap();
                                py_row
                                    .set_item(field_name.as_ref(), u)
                                    .context("failed to set unsigned integer field")?;
                            }
                            FieldData::Float(f) => {
                                py_row.set_item(field_name.as_ref(), f).unwrap();
                                py_row
                                    .set_item(field_name.as_ref(), f)
                                    .context("failed to set float field")?;
                            }
                            FieldData::Boolean(b) => {
                                py_row.set_item(field_name.as_ref(), b).unwrap();
                                py_row
                                    .set_item(field_name.as_ref(), b)
                                    .context("failed to set boolean field")?;
                            }
                            FieldData::Tag(t) => {
                                py_row.set_item(field_name.as_ref(), t.as_str()).unwrap();
                                py_row
                                    .set_item(field_name.as_ref(), t.as_str())
                                    .context("failed to set tag field")?;
                            }
                            FieldData::Key(k) => {
                                py_row.set_item(field_name.as_ref(), k.as_str()).unwrap();
                                py_row
                                    .set_item(field_name.as_ref(), k.as_str())
                                    .context("failed to set key field")?;
                            }
                            FieldData::Timestamp(t) => {
                                py_row.set_item(field_name.as_ref(), t).unwrap();
                                py_row
                                    .set_item(field_name.as_ref(), t)
                                    .context("failed to set timestamp")?;
                            }
                        };
                    }
@@ -418,13 +439,15 @@ pub fn execute_python_with_batch(
                }
            }

            let rows = PyList::new(py, rows).unwrap();
            let rows = PyList::new(py, rows).context("failed to create rows list")?;

            dict.set_item("rows", rows.unbind()).unwrap();
            dict.set_item("rows", rows.unbind())
                .context("failed to set rows")?;
            table_batches.push(dict);
        }

        let py_batches = PyList::new(py, table_batches).unwrap();
        let py_batches =
            PyList::new(py, table_batches).context("failed to create table_batches list")?;

        let api = PyPluginCallApi {
            db_schema: schema,
@@ -432,7 +455,7 @@ pub fn execute_python_with_batch(
            return_state: Default::default(),
        };
        let return_state = Arc::clone(&api.return_state);
        let local_api = api.into_pyobject(py)?;
        let local_api = api.into_pyobject(py).map_err(anyhow::Error::from)?;

        // turn args into an optional dict to pass into python
        let args = args.as_ref().map(|args| {
@@ -444,14 +467,19 @@ pub fn execute_python_with_batch(
        });

        // run the code and get the python function to call
        py.run(&CString::new(code).unwrap(), Some(&globals), None)?;
        let py_func = py.eval(
            &CString::new(PROCESS_WRITES_CALL_SITE).unwrap(),
            Some(&globals),
            None,
        )?;
        py.run(&CString::new(code).unwrap(), Some(&globals), None)
            .map_err(anyhow::Error::from)?;
        let py_func = py
            .eval(
                &CString::new(PROCESS_WRITES_CALL_SITE).unwrap(),
                Some(&globals),
                None,
            )
            .map_err(|_| ExecutePluginError::MissingProcessWritesFunction)?;

        py_func.call1((local_api, py_batches.unbind(), args))?;
        py_func
            .call1((local_api, py_batches.unbind(), args))
            .map_err(anyhow::Error::from)?;

        // swap with an empty return state to avoid cloning
        let empty_return_state = PluginReturnState::default();