feat: Add query API to Python plugins (#25766)

This ended up being a couple things rolled into one. In order to add a query API to the Python plugin, I had to pull the QueryExecutor trait out of server into a place so that the python crate could use it.

This implements the query API, but also fixes up the WAL plugin test CLI a bit. I've added a test in the CLI section so that it shows end-to-end operation of the WAL plugin test API and exercise of the entire Plugin API.

Closes #25757
pull/25777/head
Paul Dix 2025-01-09 20:13:20 -05:00 committed by GitHub
parent 63d3b867f1
commit 2d18a61949
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 581 additions and 188 deletions

27
Cargo.lock generated
View File

@ -473,9 +473,9 @@ dependencies = [
[[package]]
name = "async-trait"
version = "0.1.84"
version = "0.1.85"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b1244b10dcd56c92219da4e14caa97e312079e185f04ba3eea25061561dc0a0"
checksum = "3f934833b4b7233644e5848f235df3f57ed8c80f1528a26c3dfa13d2147fa056"
dependencies = [
"proc-macro2",
"quote",
@ -2910,6 +2910,20 @@ dependencies = [
"serde_json",
]
[[package]]
name = "influxdb3_internal_api"
version = "0.1.0"
dependencies = [
"async-trait",
"datafusion",
"iox_query",
"iox_query_params",
"thiserror 1.0.69",
"trace",
"trace_http",
"tracker",
]
[[package]]
name = "influxdb3_load_generator"
version = "0.1.0"
@ -2954,12 +2968,17 @@ dependencies = [
name = "influxdb3_py_api"
version = "0.1.0"
dependencies = [
"async-trait",
"arrow-array",
"arrow-schema",
"futures",
"influxdb3_catalog",
"influxdb3_internal_api",
"influxdb3_wal",
"iox_query_params",
"parking_lot",
"pyo3",
"schema",
"tokio",
]
[[package]]
@ -2993,6 +3012,7 @@ dependencies = [
"influxdb3_catalog",
"influxdb3_client",
"influxdb3_id",
"influxdb3_internal_api",
"influxdb3_process",
"influxdb3_sys_events",
"influxdb3_telemetry",
@ -3149,6 +3169,7 @@ dependencies = [
"influxdb3_catalog",
"influxdb3_client",
"influxdb3_id",
"influxdb3_internal_api",
"influxdb3_py_api",
"influxdb3_telemetry",
"influxdb3_test_helpers",

View File

@ -7,6 +7,7 @@ members = [
"influxdb3_clap_blocks",
"influxdb3_client",
"influxdb3_id",
"influxdb3_internal_api",
"influxdb3_load_generator",
"influxdb3_process",
"influxdb3_py_api",

View File

@ -1,4 +1,5 @@
use crate::commands::common::{InfluxDb3Config, SeparatedKeyValue, SeparatedList};
use anyhow::Context;
use influxdb3_client::plugin_development::WalPluginTestRequest;
use influxdb3_client::Client;
use secrecy::ExposeSecret;
@ -53,26 +54,10 @@ pub struct WalPluginConfig {
/// If given pass this map of string key/value pairs as input arguments
#[clap(long = "input-arguments")]
pub input_arguments: Option<SeparatedList<SeparatedKeyValue<String, String>>>,
/// The name of the plugin, which should match its file name on the server `<plugin-dir>/<name>.py`
/// The file name of the plugin, which should exist on the server in `<plugin-dir>/<filename>`.
/// The plugin-dir is provided on server startup.
#[clap(required = true)]
pub name: String,
}
impl From<WalPluginConfig> for WalPluginTestRequest {
fn from(val: WalPluginConfig) -> Self {
let input_arguments = val.input_arguments.map(|a| {
a.into_iter()
.map(|SeparatedKeyValue((k, v))| (k, v))
.collect::<HashMap<String, String>>()
});
Self {
name: val.name,
input_lp: val.input_lp,
input_file: val.input_file,
input_arguments,
}
}
pub filename: String,
}
pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
@ -80,7 +65,28 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
match config.cmd {
SubCommand::WalPlugin(plugin_config) => {
let wal_plugin_test_request: WalPluginTestRequest = plugin_config.into();
let input_arguments = plugin_config.input_arguments.map(|a| {
a.into_iter()
.map(|SeparatedKeyValue((k, v))| (k, v))
.collect::<HashMap<String, String>>()
});
let input_lp = match plugin_config.input_lp {
Some(lp) => lp,
None => {
let file_path = plugin_config
.input_file
.context("either input_lp or input_file must be provided")?;
std::fs::read_to_string(file_path).context("unable to read input file")?
}
};
let wal_plugin_test_request = WalPluginTestRequest {
filename: plugin_config.filename,
database: plugin_config.influxdb3_config.database_name,
input_lp,
input_arguments,
};
let response = client.wal_plugin_test(wal_plugin_test_request).await?;

View File

@ -849,3 +849,105 @@ async fn meta_cache_create_and_delete() {
insta::assert_yaml_snapshot!(result);
}
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_wal_plugin_test() {
use crate::ConfigProvider;
use influxdb3_client::Precision;
// Create plugin file
let plugin_file = create_plugin_file(
r#"
def process_writes(influxdb3_local, table_batches, args=None):
influxdb3_local.info("arg1: " + args["arg1"])
query_params = {"host": args["host"]}
query_result = influxdb3_local.query_rows("SELECT * FROM cpu where host = $host", query_params)
influxdb3_local.info("query result: " + str(query_result))
for table_batch in table_batches:
influxdb3_local.info("table: " + table_batch["table_name"])
for row in table_batch["rows"]:
influxdb3_local.info("row: " + str(row))
line = LineBuilder("some_table")\
.tag("tag1", "tag1_value")\
.tag("tag2", "tag2_value")\
.int64_field("field1", 1)\
.float64_field("field2", 2.0)\
.string_field("field3", "number three")
influxdb3_local.write(line)
other_line = LineBuilder("other_table")
other_line.int64_field("other_field", 1)
other_line.float64_field("other_field2", 3.14)
other_line.time_ns(1302)
influxdb3_local.write_to_db("mytestdb", other_line)
influxdb3_local.info("done")"#,
);
let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap();
let plugin_name = plugin_file.path().file_name().unwrap().to_str().unwrap();
let server = TestServer::configure()
.with_plugin_dir(plugin_dir)
.spawn()
.await;
let server_addr = server.client_addr();
server
.write_lp_to_db(
"foo",
"cpu,host=s1,region=us-east usage=0.9 1\n\
cpu,host=s2,region=us-east usage=0.89 2\n\
cpu,host=s1,region=us-east usage=0.85 3",
Precision::Nanosecond,
)
.await
.unwrap();
let db_name = "foo";
// Run the test
let result = run_with_confirmation(&[
"test",
"wal_plugin",
"--database",
db_name,
"--host",
&server_addr,
"--lp",
"test_input,tag1=tag1_value,tag2=tag2_value field1=1i 500",
"--input-arguments",
"arg1=arg1_value,host=s2",
plugin_name,
]);
debug!(result = ?result, "test wal plugin");
let res = serde_json::from_str::<serde_json::Value>(&result).unwrap();
let expected_result = r#"{
"log_lines": [
"INFO: arg1: arg1_value",
"INFO: query result: [{'host': 's2', 'region': 'us-east', 'time': 2, 'usage': 0.89}]",
"INFO: table: test_input",
"INFO: row: {'tag1': 'tag1_value', 'tag2': 'tag2_value', 'field1': 1, 'time': 500}",
"INFO: done"
],
"database_writes": {
"mytestdb": [
"other_table other_field=1i,other_field2=3.14 1302"
],
"foo": [
"some_table,tag1=tag1_value,tag2=tag2_value field1=1i,field2=2.0,field3=\"number three\""
]
},
"errors": []
}"#;
let expected_result = serde_json::from_str::<serde_json::Value>(expected_result).unwrap();
assert_eq!(res, expected_result);
}

View File

@ -48,6 +48,7 @@ trait ConfigProvider {
pub struct TestConfig {
auth_token: Option<(String, String)>,
host_id: Option<String>,
plugin_dir: Option<String>,
}
impl TestConfig {
@ -66,6 +67,12 @@ impl TestConfig {
self.host_id = Some(host_id.into());
self
}
/// Set the plugin dir for this [`TestServer`]
pub fn with_plugin_dir<S: Into<String>>(mut self, plugin_dir: S) -> Self {
self.plugin_dir = Some(plugin_dir.into());
self
}
}
impl ConfigProvider for TestConfig {
@ -74,6 +81,9 @@ impl ConfigProvider for TestConfig {
if let Some((token, _)) = &self.auth_token {
args.append(&mut vec!["--bearer-token".to_string(), token.to_owned()]);
}
if let Some(plugin_dir) = &self.plugin_dir {
args.append(&mut vec!["--plugin-dir".to_string(), plugin_dir.to_owned()]);
}
args.push("--host-id".to_string());
if let Some(host) = &self.host_id {
args.push(host.to_owned());

View File

@ -6,9 +6,9 @@ use std::collections::HashMap;
/// Request definition for `POST /api/v3/plugin_test/wal` API
#[derive(Debug, Serialize, Deserialize)]
pub struct WalPluginTestRequest {
pub name: String,
pub input_lp: Option<String>,
pub input_file: Option<String>,
pub filename: String,
pub database: String,
pub input_lp: String,
pub input_arguments: Option<HashMap<String, String>>,
}

View File

@ -0,0 +1,24 @@
[package]
name = "influxdb3_internal_api"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
# Core Crates
iox_query.workspace = true
iox_query_params.workspace = true
trace.workspace = true
trace_http.workspace = true
tracker.workspace = true
# Local Crates
# Crates.io dependencies
async-trait.workspace = true
datafusion.workspace = true
thiserror.workspace = true
[lints]
workspace = true

View File

@ -0,0 +1,4 @@
//! This crate contains the internal API for use across the crates in this code base, mainly
//! to get around circular dependency issues.
pub mod query_executor;

View File

@ -0,0 +1,134 @@
use async_trait::async_trait;
use datafusion::arrow::error::ArrowError;
use datafusion::common::DataFusionError;
use datafusion::execution::SendableRecordBatchStream;
use iox_query::query_log::QueryLogEntries;
use iox_query::{QueryDatabase, QueryNamespace};
use iox_query_params::StatementParams;
use std::fmt::Debug;
use std::sync::Arc;
use trace::ctx::SpanContext;
use trace::span::Span;
use trace_http::ctx::RequestLogContext;
use tracker::InstrumentedAsyncOwnedSemaphorePermit;
#[derive(Debug, thiserror::Error)]
pub enum QueryExecutorError {
#[error("database not found: {db_name}")]
DatabaseNotFound { db_name: String },
#[error("error while planning query: {0}")]
QueryPlanning(#[source] DataFusionError),
#[error("error while executing plan: {0}")]
ExecuteStream(#[source] DataFusionError),
#[error("unable to compose record batches from databases: {0}")]
DatabasesToRecordBatch(#[source] ArrowError),
#[error("unable to compose record batches from retention policies: {0}")]
RetentionPoliciesToRecordBatch(#[source] ArrowError),
}
#[async_trait]
pub trait QueryExecutor: QueryDatabase + Debug + Send + Sync + 'static {
async fn query(
&self,
database: &str,
q: &str,
params: Option<StatementParams>,
kind: QueryKind,
span_ctx: Option<SpanContext>,
external_span_ctx: Option<RequestLogContext>,
) -> Result<SendableRecordBatchStream, QueryExecutorError>;
fn show_databases(
&self,
include_deleted: bool,
) -> Result<SendableRecordBatchStream, QueryExecutorError>;
async fn show_retention_policies(
&self,
database: Option<&str>,
span_ctx: Option<SpanContext>,
) -> Result<SendableRecordBatchStream, QueryExecutorError>;
fn upcast(&self) -> Arc<(dyn QueryDatabase + 'static)>;
}
#[derive(Debug, Clone, Copy)]
pub enum QueryKind {
Sql,
InfluxQl,
}
impl QueryKind {
pub fn query_type(&self) -> &'static str {
match self {
Self::Sql => "sql",
Self::InfluxQl => "influxql",
}
}
}
#[derive(Debug, Copy, Clone)]
pub struct UnimplementedQueryExecutor;
#[async_trait]
impl QueryDatabase for UnimplementedQueryExecutor {
async fn namespace(
&self,
_name: &str,
_span: Option<Span>,
_include_debug_info_tables: bool,
) -> Result<Option<Arc<dyn QueryNamespace>>, DataFusionError> {
unimplemented!()
}
async fn acquire_semaphore(
&self,
_span: Option<Span>,
) -> InstrumentedAsyncOwnedSemaphorePermit {
unimplemented!()
}
fn query_log(&self) -> QueryLogEntries {
unimplemented!()
}
}
#[async_trait]
impl QueryExecutor for UnimplementedQueryExecutor {
async fn query(
&self,
_database: &str,
_q: &str,
_params: Option<StatementParams>,
_kind: QueryKind,
_span_ctx: Option<SpanContext>,
_external_span_ctx: Option<RequestLogContext>,
) -> Result<SendableRecordBatchStream, QueryExecutorError> {
Err(QueryExecutorError::DatabaseNotFound {
db_name: "unimplemented".to_string(),
})
}
fn show_databases(
&self,
_include_deleted: bool,
) -> Result<SendableRecordBatchStream, QueryExecutorError> {
Err(QueryExecutorError::DatabaseNotFound {
db_name: "unimplemented".to_string(),
})
}
async fn show_retention_policies(
&self,
_database: Option<&str>,
_span_ctx: Option<SpanContext>,
) -> Result<SendableRecordBatchStream, QueryExecutorError> {
Err(QueryExecutorError::DatabaseNotFound {
db_name: "unimplemented".to_string(),
})
}
fn upcast(&self) -> Arc<(dyn QueryDatabase + 'static)> {
Arc::new(UnimplementedQueryExecutor) as _
}
}

View File

@ -5,20 +5,25 @@ authors.workspace = true
edition.workspace = true
license.workspace = true
[features]
system-py = ["pyo3"]
[dependencies]
arrow-array.workspace = true
arrow-schema.workspace = true
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_catalog = {path = "../influxdb3_catalog"}
async-trait.workspace = true
influxdb3_internal_api = { path = "../influxdb3_internal_api" }
iox_query_params.workspace = true
schema.workspace = true
parking_lot.workspace = true
futures.workspace = true
tokio.workspace = true
[dependencies.pyo3]
version = "0.23.3"
# this is necessary to automatically initialize the Python interpreter
features = ["auto-initialize"]
features = ["auto-initialize", "experimental-async"]
optional = true

View File

@ -1,11 +1,20 @@
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema, TableDefinition};
use arrow_array::types::Int32Type;
use arrow_array::{
BooleanArray, DictionaryArray, Float64Array, Int64Array, RecordBatch, StringArray,
TimestampNanosecondArray, UInt64Array,
};
use arrow_schema::DataType;
use futures::TryStreamExt;
use influxdb3_catalog::catalog::{DatabaseSchema, TableDefinition};
use influxdb3_internal_api::query_executor::{QueryExecutor, QueryKind};
use influxdb3_wal::{FieldData, Row, WriteBatch};
use iox_query_params::StatementParams;
use parking_lot::Mutex;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::{PyAnyMethods, PyModule, PyModuleMethods};
use pyo3::types::{PyDict, PyList};
use pyo3::{
pyclass, pymethods, pymodule, Bound, IntoPyObject, PyAny, PyErr, PyObject, PyResult, Python,
pyclass, pymethods, pymodule, Bound, IntoPyObject, Py, PyAny, PyErr, PyObject, PyResult, Python,
};
use schema::InfluxColumnType;
use std::collections::HashMap;
@ -190,8 +199,8 @@ impl PyWriteBatch {
#[pyclass]
#[derive(Debug)]
struct PyPluginCallApi {
_schema: Arc<DatabaseSchema>,
_catalog: Arc<Catalog>,
db_schema: Arc<DatabaseSchema>,
query_executor: Arc<dyn QueryExecutor>,
return_state: Arc<Mutex<PluginReturnState>>,
}
@ -280,6 +289,127 @@ impl PyPluginCallApi {
Ok(())
}
#[pyo3(signature = (query, args=None))]
fn query_rows(
&self,
query: String,
args: Option<HashMap<String, String>>,
) -> PyResult<Py<PyList>> {
let query_executor = Arc::clone(&self.query_executor);
let db_schema_name = Arc::clone(&self.db_schema.name);
let params = args.map(|args| {
let mut params = StatementParams::new();
for (key, value) in args {
params.insert(key, value);
}
params
});
// Spawn the async task
let handle = tokio::spawn(async move {
let res = query_executor
.query(
db_schema_name.as_ref(),
&query,
params,
QueryKind::Sql,
None,
None,
)
.await
.map_err(|e| PyValueError::new_err(format!("Error executing query: {}", e)))?;
res.try_collect().await.map_err(|e| {
PyValueError::new_err(format!("Error collecting query results: {}", e))
})
});
// Block the current thread until the async task completes
let res =
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(handle));
let res =
res.map_err(|e| PyValueError::new_err(format!("Error executing query: {}", e)))?;
let batches: Vec<RecordBatch> = res
.map_err(|e| PyValueError::new_err(format!("Error collecting query results: {}", e)))?;
Python::with_gil(|py| {
let mut rows: Vec<PyObject> = Vec::new();
for batch in batches {
let num_rows = batch.num_rows();
let schema = batch.schema();
for row_idx in 0..num_rows {
let row = PyDict::new(py);
for col_idx in 0..schema.fields().len() {
let field = schema.field(col_idx);
let field_name = field.name().as_str();
let array = batch.column(col_idx);
match array.data_type() {
DataType::Int64 => {
let array = array.as_any().downcast_ref::<Int64Array>().unwrap();
row.set_item(field_name, array.value(row_idx))?;
}
DataType::UInt64 => {
let array = array.as_any().downcast_ref::<UInt64Array>().unwrap();
row.set_item(field_name, array.value(row_idx))?;
}
DataType::Float64 => {
let array = array.as_any().downcast_ref::<Float64Array>().unwrap();
row.set_item(field_name, array.value(row_idx))?;
}
DataType::Utf8 => {
let array = array.as_any().downcast_ref::<StringArray>().unwrap();
row.set_item(field_name, array.value(row_idx))?;
}
DataType::Boolean => {
let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
row.set_item(field_name, array.value(row_idx))?;
}
DataType::Timestamp(_, _) => {
let array = array
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap();
row.set_item(field_name, array.value(row_idx))?;
}
DataType::Dictionary(_, _) => {
let col = array
.as_any()
.downcast_ref::<DictionaryArray<Int32Type>>()
.expect("unexpected datatype");
let values = col.values();
let values = values
.as_any()
.downcast_ref::<StringArray>()
.expect("unexpected datatype");
let val = values.value(row_idx).to_string();
row.set_item(field_name, val)?;
}
_ => {
return Err(PyValueError::new_err(format!(
"Unsupported data type: {:?}",
array.data_type()
)))
}
}
}
rows.push(row.into());
}
}
let list = PyList::new(py, rows)?.unbind();
Ok(list)
})
}
}
// constant for the process writes call site string
@ -384,7 +514,7 @@ pub fn execute_python_with_batch(
code: &str,
write_batch: &WriteBatch,
schema: Arc<DatabaseSchema>,
catalog: Arc<Catalog>,
query_executor: Arc<dyn QueryExecutor>,
args: Option<HashMap<String, String>>,
) -> PyResult<PluginReturnState> {
Python::with_gil(|py| {
@ -411,50 +541,36 @@ pub fn execute_python_with_batch(
for chunk in table_chunks.chunk_time_to_chunk.values() {
for row in &chunk.rows {
let py_row = PyDict::new(py);
py_row.set_item("time", row.time).unwrap();
let mut fields = Vec::with_capacity(row.fields.len());
for field in &row.fields {
let field_name = table_def.column_id_to_name(&field.id).unwrap();
if field_name.as_ref() == "time" {
continue;
}
let py_field = PyDict::new(py);
py_field.set_item("name", field_name.as_ref()).unwrap();
match &field.value {
FieldData::String(s) => {
py_field.set_item("value", s.as_str()).unwrap();
py_row.set_item(field_name.as_ref(), s.as_str()).unwrap();
}
FieldData::Integer(i) => {
py_field.set_item("value", i).unwrap();
py_row.set_item(field_name.as_ref(), i).unwrap();
}
FieldData::UInteger(u) => {
py_field.set_item("value", u).unwrap();
py_row.set_item(field_name.as_ref(), u).unwrap();
}
FieldData::Float(f) => {
py_field.set_item("value", f).unwrap();
py_row.set_item(field_name.as_ref(), f).unwrap();
}
FieldData::Boolean(b) => {
py_field.set_item("value", b).unwrap();
py_row.set_item(field_name.as_ref(), b).unwrap();
}
FieldData::Tag(t) => {
py_field.set_item("value", t.as_str()).unwrap();
py_row.set_item(field_name.as_ref(), t.as_str()).unwrap();
}
FieldData::Key(k) => {
py_field.set_item("value", k.as_str()).unwrap();
py_row.set_item(field_name.as_ref(), k.as_str()).unwrap();
}
FieldData::Timestamp(_) => {
// return an error, this shouldn't happen
return Err(PyValueError::new_err(
"Timestamps should be in the time field",
));
FieldData::Timestamp(t) => {
py_row.set_item(field_name.as_ref(), t).unwrap();
}
};
fields.push(py_field.unbind());
}
let fields = PyList::new(py, fields).unwrap();
py_row.set_item("fields", fields.unbind()).unwrap();
rows.push(py_row.into());
}
@ -469,8 +585,8 @@ pub fn execute_python_with_batch(
let py_batches = PyList::new(py, table_batches).unwrap();
let api = PyPluginCallApi {
_schema: schema,
_catalog: catalog,
db_schema: schema,
query_executor,
return_state: Default::default(),
};
let return_state = Arc::clone(&api.return_state);
@ -492,6 +608,7 @@ pub fn execute_python_with_batch(
Some(&globals),
None,
)?;
py_func.call1((local_api, py_batches.unbind(), args))?;
// swap with an empty return state to avoid cloning

View File

@ -34,6 +34,7 @@ influxdb3_cache = { path = "../influxdb3_cache" }
influxdb3_catalog = { path = "../influxdb3_catalog" }
influxdb3_client = { path = "../influxdb3_client" }
influxdb3_id = { path = "../influxdb3_id" }
influxdb3_internal_api = { path = "../influxdb3_internal_api" }
influxdb3_process = { path = "../influxdb3_process", default-features = false }
influxdb3_wal = { path = "../influxdb3_wal"}
influxdb3_write = { path = "../influxdb3_write" }

View File

@ -1,14 +1,11 @@
use std::sync::Arc;
use crate::{auth::DefaultAuthorizer, http::HttpApi, CommonServerState, Server};
use authz::Authorizer;
use influxdb3_internal_api::query_executor::QueryExecutor;
use influxdb3_write::{persister::Persister, WriteBuffer};
use tokio::net::TcpListener;
use crate::{
auth::DefaultAuthorizer, http::HttpApi, query_executor, CommonServerState, QueryExecutor,
Server,
};
#[derive(Debug)]
pub struct ServerBuilder<W, Q, P, T, L> {
common_state: CommonServerState,
@ -55,7 +52,7 @@ pub struct WithWriteBuf(Arc<dyn WriteBuffer>);
#[derive(Debug)]
pub struct NoQueryExec;
#[derive(Debug)]
pub struct WithQueryExec(Arc<dyn QueryExecutor<Error = query_executor::Error>>);
pub struct WithQueryExec(Arc<dyn QueryExecutor>);
#[derive(Debug)]
pub struct NoPersister;
#[derive(Debug)]
@ -87,7 +84,7 @@ impl<Q, P, T, L> ServerBuilder<NoWriteBuf, Q, P, T, L> {
impl<W, P, T, L> ServerBuilder<W, NoQueryExec, P, T, L> {
pub fn query_executor(
self,
qe: Arc<dyn QueryExecutor<Error = query_executor::Error>>,
qe: Arc<dyn QueryExecutor>,
) -> ServerBuilder<W, WithQueryExec, P, T, L> {
ServerBuilder {
common_state: self.common_state,

View File

@ -4,11 +4,10 @@ use arrow_flight::flight_service_server::{
FlightService as Flight, FlightServiceServer as FlightServer,
};
use authz::Authorizer;
use crate::{query_executor, QueryExecutor};
use influxdb3_internal_api::query_executor::QueryExecutor;
pub(crate) fn make_flight_server(
server: Arc<dyn QueryExecutor<Error = query_executor::Error>>,
server: Arc<dyn QueryExecutor>,
authz: Option<Arc<dyn Authorizer>>,
) -> FlightServer<impl Flight> {
let query_db = server.upcast();

View File

@ -1,7 +1,6 @@
//! HTTP API service implementations for `server`
use crate::{query_executor, QueryKind};
use crate::{CommonServerState, QueryExecutor};
use crate::CommonServerState;
use arrow::record_batch::RecordBatch;
use arrow::util::pretty;
use authz::http::AuthorizationHeaderExtension;
@ -23,6 +22,7 @@ use hyper::{Body, Method, Request, Response, StatusCode};
use influxdb3_cache::last_cache;
use influxdb3_cache::meta_cache::{self, CreateMetaCacheArgs, MaxAge, MaxCardinality};
use influxdb3_catalog::catalog::Error as CatalogError;
use influxdb3_internal_api::query_executor::{QueryExecutor, QueryExecutorError, QueryKind};
use influxdb3_process::{INFLUXDB3_GIT_HASH_SHORT, INFLUXDB3_VERSION};
use influxdb3_wal::{PluginType, TriggerSpecificationDefinition};
use influxdb3_write::persister::TrackedMemoryArrowWriter;
@ -181,7 +181,7 @@ pub enum Error {
Io(#[from] std::io::Error),
#[error("query error: {0}")]
Query(#[from] query_executor::Error),
Query(#[from] QueryExecutorError),
#[error(transparent)]
DbName(#[from] ValidateDbNameError),
@ -214,6 +214,9 @@ pub enum Error {
#[error("Python plugins not enabled on this server")]
PythonPluginsNotEnabled,
#[error("Plugin error")]
Plugin(#[from] influxdb3_write::write_buffer::plugins::Error),
}
#[derive(Debug, Error)]
@ -384,7 +387,7 @@ impl Error {
.body(body)
.unwrap()
}
Self::Query(query_executor::Error::DatabaseNotFound { .. }) => {
Self::Query(QueryExecutorError::DatabaseNotFound { .. }) => {
let err: ErrorMessage<()> = ErrorMessage {
error: self.to_string(),
data: None,
@ -437,7 +440,7 @@ pub(crate) struct HttpApi<T> {
common_state: CommonServerState,
write_buffer: Arc<dyn WriteBuffer>,
time_provider: Arc<T>,
pub(crate) query_executor: Arc<dyn QueryExecutor<Error = query_executor::Error>>,
pub(crate) query_executor: Arc<dyn QueryExecutor>,
max_request_bytes: usize,
authorizer: Arc<dyn Authorizer>,
legacy_write_param_unifier: SingleTenantRequestUnifier,
@ -448,7 +451,7 @@ impl<T> HttpApi<T> {
common_state: CommonServerState,
time_provider: Arc<T>,
write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor<Error = query_executor::Error>>,
query_executor: Arc<dyn QueryExecutor>,
max_request_bytes: usize,
authorizer: Arc<dyn Authorizer>,
) -> Self {
@ -1135,7 +1138,10 @@ where
let request: influxdb3_client::plugin_development::WalPluginTestRequest =
self.read_body_json(req).await?;
let output = self.write_buffer.test_wal_plugin(request).await?;
let output = self
.write_buffer
.test_wal_plugin(request, Arc::clone(&self.query_executor))
.await?;
let body = serde_json::to_string(&output)?;
Ok(Response::builder()

View File

@ -23,16 +23,12 @@ mod system_tables;
use crate::grpc::make_flight_server;
use crate::http::route_request;
use crate::http::HttpApi;
use async_trait::async_trait;
use authz::Authorizer;
use datafusion::execution::SendableRecordBatchStream;
use hyper::server::conn::AddrIncoming;
use hyper::server::conn::Http;
use hyper::service::service_fn;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_write::persister::Persister;
use iox_query::QueryDatabase;
use iox_query_params::StatementParams;
use iox_time::TimeProvider;
use observability_deps::tracing::error;
use observability_deps::tracing::info;
@ -45,9 +41,7 @@ use tokio::net::TcpListener;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tower::Layer;
use trace::ctx::SpanContext;
use trace::TraceCollector;
use trace_http::ctx::RequestLogContext;
use trace_http::ctx::TraceHeaderParser;
use trace_http::metrics::MetricFamily;
use trace_http::metrics::RequestMetrics;
@ -130,49 +124,6 @@ pub struct Server<T> {
listener: TcpListener,
}
#[async_trait]
pub trait QueryExecutor: QueryDatabase + Debug + Send + Sync + 'static {
type Error;
async fn query(
&self,
database: &str,
q: &str,
params: Option<StatementParams>,
kind: QueryKind,
span_ctx: Option<SpanContext>,
external_span_ctx: Option<RequestLogContext>,
) -> Result<SendableRecordBatchStream, Self::Error>;
fn show_databases(
&self,
include_deleted: bool,
) -> Result<SendableRecordBatchStream, Self::Error>;
async fn show_retention_policies(
&self,
database: Option<&str>,
span_ctx: Option<SpanContext>,
) -> Result<SendableRecordBatchStream, Self::Error>;
fn upcast(&self) -> Arc<(dyn QueryDatabase + 'static)>;
}
#[derive(Debug, Clone, Copy)]
pub enum QueryKind {
Sql,
InfluxQl,
}
impl QueryKind {
pub(crate) fn query_type(&self) -> &'static str {
match self {
Self::Sql => "sql",
Self::InfluxQl => "influxql",
}
}
}
impl<T> Server<T> {
pub fn authorizer(&self) -> Arc<dyn Authorizer> {
Arc::clone(&self.authorizer)

View File

@ -1,12 +1,10 @@
//! module for query executor
use crate::system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA_NAME};
use crate::{query_planner::Planner, system_tables::AllSystemSchemaTablesProvider};
use crate::{QueryExecutor, QueryKind};
use arrow::array::{ArrayRef, Int64Builder, StringBuilder, StructArray};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow_array::{Array, BooleanArray};
use arrow_schema::ArrowError;
use async_trait::async_trait;
use data_types::NamespaceId;
use datafusion::catalog::{CatalogProvider, SchemaProvider, Session};
@ -23,6 +21,7 @@ use datafusion_util::MemoryStream;
use influxdb3_cache::last_cache::{LastCacheFunction, LAST_CACHE_UDTF_NAME};
use influxdb3_cache::meta_cache::{MetaCacheFunction, META_CACHE_UDTF_NAME};
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
use influxdb3_internal_api::query_executor::{QueryExecutor, QueryExecutorError, QueryKind};
use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_write::WriteBuffer;
@ -114,8 +113,6 @@ impl QueryExecutorImpl {
#[async_trait]
impl QueryExecutor for QueryExecutorImpl {
type Error = Error;
async fn query(
&self,
database: &str,
@ -124,15 +121,15 @@ impl QueryExecutor for QueryExecutorImpl {
kind: QueryKind,
span_ctx: Option<SpanContext>,
external_span_ctx: Option<RequestLogContext>,
) -> Result<SendableRecordBatchStream, Self::Error> {
) -> Result<SendableRecordBatchStream, QueryExecutorError> {
info!(%database, %query, ?params, ?kind, "QueryExecutorImpl as QueryExecutor::query");
let db = self
.namespace(database, span_ctx.child_span("get database"), false)
.await
.map_err(|_| Error::DatabaseNotFound {
.map_err(|_| QueryExecutorError::DatabaseNotFound {
db_name: database.to_string(),
})?
.ok_or_else(|| Error::DatabaseNotFound {
.ok_or_else(|| QueryExecutorError::DatabaseNotFound {
db_name: database.to_string(),
})?;
@ -161,7 +158,7 @@ impl QueryExecutor for QueryExecutorImpl {
})
.await;
let plan = match plan.map_err(Error::QueryPlanning) {
let plan = match plan.map_err(QueryExecutorError::QueryPlanning) {
Ok(plan) => plan,
Err(e) => {
token.fail();
@ -182,7 +179,7 @@ impl QueryExecutor for QueryExecutorImpl {
}
Err(err) => {
token.fail();
Err(Error::ExecuteStream(err))
Err(QueryExecutorError::ExecuteStream(err))
}
}
}
@ -190,7 +187,7 @@ impl QueryExecutor for QueryExecutorImpl {
fn show_databases(
&self,
include_deleted: bool,
) -> Result<SendableRecordBatchStream, Self::Error> {
) -> Result<SendableRecordBatchStream, QueryExecutorError> {
let mut databases = self.catalog.list_db_schema();
// sort them to ensure consistent order, first by deleted, then by name:
databases.sort_unstable_by(|a, b| match a.deleted.cmp(&b.deleted) {
@ -222,7 +219,7 @@ impl QueryExecutor for QueryExecutorImpl {
}
let schema = DatafusionSchema::new(fields);
let batch = RecordBatch::try_new(Arc::new(schema), arrays)
.map_err(Error::DatabasesToRecordBatch)?;
.map_err(QueryExecutorError::DatabasesToRecordBatch)?;
Ok(Box::pin(MemoryStream::new(vec![batch])))
}
@ -230,7 +227,7 @@ impl QueryExecutor for QueryExecutorImpl {
&self,
database: Option<&str>,
span_ctx: Option<SpanContext>,
) -> Result<SendableRecordBatchStream, Self::Error> {
) -> Result<SendableRecordBatchStream, QueryExecutorError> {
let mut databases = if let Some(db) = database {
vec![db.to_owned()]
} else {
@ -244,10 +241,10 @@ impl QueryExecutor for QueryExecutorImpl {
let db = self
.namespace(&database, span_ctx.child_span("get database"), false)
.await
.map_err(|_| Error::DatabaseNotFound {
.map_err(|_| QueryExecutorError::DatabaseNotFound {
db_name: database.to_string(),
})?
.ok_or_else(|| Error::DatabaseNotFound {
.ok_or_else(|| QueryExecutorError::DatabaseNotFound {
db_name: database.to_string(),
})?;
let duration = db.retention_time_ns();
@ -337,20 +334,6 @@ fn split_database_name(db_name: &str) -> (String, String) {
)
}
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("database not found: {db_name}")]
DatabaseNotFound { db_name: String },
#[error("error while planning query: {0}")]
QueryPlanning(#[source] DataFusionError),
#[error("error while executing plan: {0}")]
ExecuteStream(#[source] DataFusionError),
#[error("unable to compose record batches from databases: {0}")]
DatabasesToRecordBatch(#[source] ArrowError),
#[error("unable to compose record batches from retention policies: {0}")]
RetentionPoliciesToRecordBatch(#[source] ArrowError),
}
// This implementation is for the Flight service
#[async_trait]
impl QueryDatabase for QueryExecutorImpl {
@ -364,7 +347,7 @@ impl QueryDatabase for QueryExecutorImpl {
let _span_recorder = SpanRecorder::new(span);
let db_schema = self.catalog.db_schema(name).ok_or_else(|| {
DataFusionError::External(Box::new(Error::DatabaseNotFound {
DataFusionError::External(Box::new(QueryExecutorError::DatabaseNotFound {
db_name: name.into(),
}))
})?;
@ -647,6 +630,7 @@ impl TableProvider for QueryTable {
mod tests {
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
use crate::query_executor::QueryExecutorImpl;
use arrow::array::RecordBatch;
use data_types::NamespaceName;
use datafusion::assert_batches_sorted_eq;
@ -656,6 +640,7 @@ mod tests {
parquet_cache::test_cached_obj_store_and_oracle,
};
use influxdb3_catalog::catalog::Catalog;
use influxdb3_internal_api::query_executor::{QueryExecutor, QueryKind};
use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig};
@ -670,8 +655,6 @@ mod tests {
use object_store::{local::LocalFileSystem, ObjectStore};
use parquet_file::storage::{ParquetStorage, StorageId};
use crate::{query_executor::QueryExecutorImpl, QueryExecutor};
use super::CreateQueryExecutorArgs;
fn make_exec(object_store: Arc<dyn ObjectStore>) -> Arc<Executor> {
@ -860,7 +843,7 @@ mod tests {
for t in test_cases {
let batch_stream = query_executor
.query(db_name, t.query, None, crate::QueryKind::Sql, None, None)
.query(db_name, t.query, None, QueryKind::Sql, None, None)
.await
.unwrap();
let batches: Vec<RecordBatch> = batch_stream.try_collect().await.unwrap();

View File

@ -28,6 +28,7 @@ influxdb3_cache = { path = "../influxdb3_cache" }
influxdb3_catalog = { path = "../influxdb3_catalog" }
influxdb3_client = { path = "../influxdb3_client" }
influxdb3_id = { path = "../influxdb3_id" }
influxdb3_internal_api = { path = "../influxdb3_internal_api" }
influxdb3_test_helpers = { path = "../influxdb3_test_helpers" }
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_telemetry = { path = "../influxdb3_telemetry" }

View File

@ -69,6 +69,7 @@ use tokio::sync::watch::Receiver;
#[cfg(feature = "system-py")]
use crate::write_buffer::plugins::PluginContext;
use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse};
use influxdb3_internal_api::query_executor::QueryExecutor;
#[derive(Debug, Error)]
pub enum Error {
@ -135,9 +136,6 @@ pub enum Error {
#[error("error: {0}")]
AnyhowError(#[from] anyhow::Error),
#[error("reading plugin file: {0}")]
ReadPluginError(#[from] std::io::Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -379,9 +377,9 @@ impl WriteBufferImpl {
}
#[cfg(feature = "system-py")]
fn read_plugin_code(&self, name: &str) -> Result<String> {
fn read_plugin_code(&self, name: &str) -> Result<String, plugins::Error> {
let plugin_dir = self.plugin_dir.clone().context("plugin dir not set")?;
let path = plugin_dir.join(format!("{}.py", name));
let path = plugin_dir.join(name);
Ok(std::fs::read_to_string(path)?)
}
}
@ -1108,20 +1106,28 @@ impl ProcessingEngineManager for WriteBufferImpl {
async fn test_wal_plugin(
&self,
request: WalPluginTestRequest,
) -> crate::Result<WalPluginTestResponse, Error> {
query_executor: Arc<dyn QueryExecutor>,
) -> crate::Result<WalPluginTestResponse, plugins::Error> {
#[cfg(feature = "system-py")]
{
// create a copy of the catalog so we don't modify the original
let catalog = Arc::new(Catalog::from_inner(self.catalog.clone_inner()));
let now = self.time_provider.now();
let code = self.read_plugin_code(&request.name)?;
let code = self.read_plugin_code(&request.filename)?;
return Ok(plugins::run_test_wal_plugin(now, catalog, code, request).unwrap());
let res = plugins::run_test_wal_plugin(now, catalog, query_executor, code, request)
.unwrap_or_else(|e| WalPluginTestResponse {
log_lines: vec![],
database_writes: Default::default(),
errors: vec![e.to_string()],
});
return Ok(res);
}
#[cfg(not(feature = "system-py"))]
Err(Error::AnyhowError(anyhow::anyhow!(
Err(plugins::Error::AnyhowError(anyhow::anyhow!(
"system-py feature not enabled"
)))
}

View File

@ -1,6 +1,7 @@
use crate::write_buffer::PluginEvent;
use crate::write_buffer::{plugins, PluginEvent};
use crate::{write_buffer, WriteBuffer};
use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse};
use influxdb3_internal_api::query_executor::QueryExecutor;
use influxdb3_wal::{PluginType, TriggerDefinition, TriggerSpecificationDefinition};
use std::fmt::Debug;
use std::sync::Arc;
@ -9,6 +10,9 @@ use tokio::sync::mpsc;
#[derive(Debug, Error)]
pub enum Error {
#[error("invalid database {0}")]
InvalidDatabase(String),
#[error("couldn't find db")]
MissingDb,
@ -24,6 +28,9 @@ pub enum Error {
#[error(transparent)]
AnyhowError(#[from] anyhow::Error),
#[error("reading plugin file: {0}")]
ReadPluginError(#[from] std::io::Error),
}
/// `[ProcessingEngineManager]` is used to interact with the processing engine,
@ -87,7 +94,8 @@ pub trait ProcessingEngineManager: Debug + Send + Sync + 'static {
async fn test_wal_plugin(
&self,
request: WalPluginTestRequest,
) -> crate::Result<WalPluginTestResponse, write_buffer::Error>;
query_executor: Arc<dyn QueryExecutor>,
) -> crate::Result<WalPluginTestResponse, plugins::Error>;
}
#[cfg(feature = "system-py")]
@ -231,6 +239,7 @@ mod python_plugin {
pub(crate) fn run_test_wal_plugin(
now_time: iox_time::Time,
catalog: Arc<influxdb3_catalog::catalog::Catalog>,
query_executor: Arc<dyn QueryExecutor>,
code: String,
request: WalPluginTestRequest,
) -> Result<WalPluginTestResponse, Error> {
@ -239,9 +248,9 @@ pub(crate) fn run_test_wal_plugin(
use data_types::NamespaceName;
use influxdb3_wal::Gen1Duration;
const TEST_NAMESPACE: &str = "_testdb";
let namespace = NamespaceName::new(TEST_NAMESPACE).unwrap();
let database = request.database;
let namespace = NamespaceName::new(database.clone())
.map_err(|_e| Error::InvalidDatabase(database.clone()))?;
// parse the lp into a write batch
let validator = WriteValidator::initialize(
namespace.clone(),
@ -249,19 +258,19 @@ pub(crate) fn run_test_wal_plugin(
now_time.timestamp_nanos(),
)?;
let data = validator.v1_parse_lines_and_update_schema(
&request.input_lp.unwrap(),
&request.input_lp,
false,
now_time,
Precision::Nanosecond,
)?;
let data = data.convert_lines_to_buffer(Gen1Duration::new_1m());
let db = catalog.db_schema("_testdb").unwrap();
let db = catalog.db_schema(&database).ok_or(Error::MissingDb)?;
let plugin_return_state = influxdb3_py_api::system_py::execute_python_with_batch(
&code,
&data.valid_data,
db,
Arc::clone(&catalog),
query_executor,
request.input_arguments,
)?;
@ -336,7 +345,7 @@ pub(crate) fn run_test_wal_plugin(
let log_lines = plugin_return_state.log();
let mut database_writes = plugin_return_state.write_db_lines;
database_writes.insert("_testdb".to_string(), plugin_return_state.write_back_lines);
database_writes.insert(database, plugin_return_state.write_back_lines);
Ok(WalPluginTestResponse {
log_lines,
@ -353,6 +362,7 @@ mod tests {
use crate::Precision;
use data_types::NamespaceName;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_internal_api::query_executor::UnimplementedQueryExecutor;
use iox_time::Time;
use std::collections::HashMap;
@ -394,25 +404,32 @@ def process_writes(influxdb3_local, table_batches, args=None):
.join("\n");
let request = WalPluginTestRequest {
name: "test".into(),
input_lp: Some(lp),
input_file: None,
filename: "test".into(),
database: "_testdb".into(),
input_lp: lp,
input_arguments: Some(HashMap::from([(
String::from("arg1"),
String::from("val1"),
)])),
};
let executor: Arc<dyn QueryExecutor> = Arc::new(UnimplementedQueryExecutor);
let response =
run_test_wal_plugin(now, Arc::new(catalog), code.to_string(), request).unwrap();
run_test_wal_plugin(now, Arc::new(catalog), executor, code.to_string(), request)
.unwrap();
let expected_log_lines = vec![
"INFO: arg1: val1",
"INFO: table: cpu",
"INFO: row: {'time': 100, 'fields': [{'name': 'host', 'value': 'A'}, {'name': 'region', 'value': 'west'}, {'name': 'usage', 'value': 1}, {'name': 'system', 'value': 23.2}]}",
"INFO: table: mem", "INFO: row: {'time': 120, 'fields': [{'name': 'host', 'value': 'B'}, {'name': 'user', 'value': 43.1}]}",
"INFO: row: {'host': 'A', 'region': 'west', 'usage': 1, 'system': 23.2, 'time': 100}",
"INFO: table: mem",
"INFO: row: {'host': 'B', 'user': 43.1, 'time': 120}",
"INFO: done",
].into_iter().map(|s| s.to_string()).collect::<Vec<_>>();
]
.into_iter()
.map(|s| s.to_string())
.collect::<Vec<_>>();
assert_eq!(response.log_lines, expected_log_lines);
let expected_testdb_lines = vec![
@ -475,14 +492,22 @@ def process_writes(influxdb3_local, table_batches, args=None):
let lp = ["mem,host=B user=43.1 120"].join("\n");
let request = WalPluginTestRequest {
name: "test".into(),
input_lp: Some(lp),
input_file: None,
filename: "test".into(),
database: "_testdb".into(),
input_lp: lp,
input_arguments: None,
};
let reesponse =
run_test_wal_plugin(now, Arc::clone(&catalog), code.to_string(), request).unwrap();
let executor: Arc<dyn QueryExecutor> = Arc::new(UnimplementedQueryExecutor);
let reesponse = run_test_wal_plugin(
now,
Arc::clone(&catalog),
executor,
code.to_string(),
request,
)
.unwrap();
let expected_testdb_lines = vec![
"some_table,tag1=tag1_value,tag2=tag2_value field1=1i,field2=2.0,field3=\"number three\""