feat: better plugin errors for query and line building (#25850)

bugfix/run_triggers_on_start
Paul Dix 2025-01-16 16:57:30 -05:00 committed by GitHub
parent 1f71750bfe
commit daa5cbd270
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 35 additions and 10 deletions

View File

@ -1086,7 +1086,25 @@ def process_writes(table_batches, args=None):
influxdb3_local.info("done")
"#,
expected_error: "error executing plugin: TypeError: process_writes() takes from 1 to 2 positional arguments but 3 were given",
}];
},
Test {
name: "line_builder_no_field",
plugin_code: r#"
def process_writes(influxdb3_local, table_batches, args=None):
line = LineBuilder("some_table")
influxdb3_local.write(line)
"#,
expected_error: "error executing plugin: InvalidLineError: At least one field is required: some_table",
},
Test {
name: "query_no_table",
plugin_code: r#"
def process_writes(influxdb3_local, table_batches, args=None):
influxdb3_local.query("SELECT foo FROM not_here")
"#,
expected_error: "error executing plugin: QueryError: error: error while planning query: Error during planning: table 'public.iox.not_here' not found executing query: SELECT foo FROM not_here",
}
];
let plugin_dir = TempDir::new().unwrap();

View File

@ -15,15 +15,18 @@ use influxdb3_wal::{FieldData, WriteBatch};
use iox_query_params::StatementParams;
use observability_deps::tracing::{error, info, warn};
use parking_lot::Mutex;
use pyo3::exceptions::PyValueError;
use pyo3::exceptions::{PyException, PyValueError};
use pyo3::prelude::{PyAnyMethods, PyModule};
use pyo3::types::{PyDict, PyList};
use pyo3::{
pyclass, pymethods, pymodule, Bound, IntoPyObject, Py, PyAny, PyObject, PyResult, Python,
create_exception, pyclass, pymethods, pymodule, Bound, IntoPyObject, Py, PyAny, PyObject,
PyResult, Python,
};
use std::ffi::CString;
use std::sync::Arc;
create_exception!(influxdb3_py_api, QueryError, PyException);
#[pyclass]
#[derive(Debug)]
struct PyPluginCallApi {
@ -143,10 +146,12 @@ impl PyPluginCallApi {
let res = query_executor
.query_sql(db_schema_name.as_ref(), &query, params, None, None)
.await
.map_err(|e| PyValueError::new_err(format!("Error executing query: {}", e)))?;
.map_err(|e| {
QueryError::new_err(format!("error: {} executing query: {}", e, query))
})?;
res.try_collect().await.map_err(|e| {
PyValueError::new_err(format!("Error collecting query results: {}", e))
QueryError::new_err(format!("error: {} executing query: {}", e, query))
})
});
@ -154,11 +159,9 @@ impl PyPluginCallApi {
let res =
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(handle));
let res =
res.map_err(|e| PyValueError::new_err(format!("Error executing query: {}", e)))?;
let res = res.map_err(|e| QueryError::new_err(format!("join error: {}", e)))?;
let batches: Vec<RecordBatch> = res
.map_err(|e| PyValueError::new_err(format!("Error collecting query results: {}", e)))?;
let batches: Vec<RecordBatch> = res?;
Python::with_gil(|py| {
let mut rows: Vec<PyObject> = Vec::new();
@ -255,6 +258,10 @@ class InvalidKeyError(InfluxDBError):
"""Raised when a tag or field key is invalid"""
pass
class InvalidLineError(InfluxDBError):
"""Raised when a line protocol string is invalid"""
pass
class LineBuilder:
def __init__(self, measurement: str):
if ' ' in measurement:
@ -335,7 +342,7 @@ class LineBuilder:
# Add fields (required)
if not self.fields:
raise ValueError("At least one field is required")
raise InvalidLineError(f"At least one field is required: {line}")
fields_str = ','.join(
f"{k}={v}" for k, v in self.fields.items()