From 2d18a61949370b1136c41af03e819a77e33a158b Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Thu, 9 Jan 2025 20:13:20 -0500 Subject: [PATCH] feat: Add query API to Python plugins (#25766) This ended up being a couple things rolled into one. In order to add a query API to the Python plugin, I had to pull the QueryExecutor trait out of server into a place so that the python crate could use it. This implements the query API, but also fixes up the WAL plugin test CLI a bit. I've added a test in the CLI section so that it shows end-to-end operation of the WAL plugin test API and exercise of the entire Plugin API. Closes #25757 --- Cargo.lock | 27 ++- Cargo.toml | 1 + influxdb3/src/commands/test.rs | 46 ++--- influxdb3/tests/server/cli.rs | 102 +++++++++++ influxdb3/tests/server/main.rs | 10 ++ influxdb3_client/src/plugin_development.rs | 6 +- influxdb3_internal_api/Cargo.toml | 24 +++ influxdb3_internal_api/src/lib.rs | 4 + influxdb3_internal_api/src/query_executor.rs | 134 ++++++++++++++ influxdb3_py_api/Cargo.toml | 11 +- influxdb3_py_api/src/system_py.rs | 179 +++++++++++++++---- influxdb3_server/Cargo.toml | 1 + influxdb3_server/src/builder.rs | 11 +- influxdb3_server/src/grpc.rs | 5 +- influxdb3_server/src/http.rs | 20 ++- influxdb3_server/src/lib.rs | 49 ----- influxdb3_server/src/query_executor/mod.rs | 47 ++--- influxdb3_write/Cargo.toml | 1 + influxdb3_write/src/write_buffer/mod.rs | 24 ++- influxdb3_write/src/write_buffer/plugins.rs | 67 ++++--- 20 files changed, 581 insertions(+), 188 deletions(-) create mode 100644 influxdb3_internal_api/Cargo.toml create mode 100644 influxdb3_internal_api/src/lib.rs create mode 100644 influxdb3_internal_api/src/query_executor.rs diff --git a/Cargo.lock b/Cargo.lock index 6db154b723..96cc211e0c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -473,9 +473,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.84" +version = "0.1.85" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b1244b10dcd56c92219da4e14caa97e312079e185f04ba3eea25061561dc0a0" +checksum = "3f934833b4b7233644e5848f235df3f57ed8c80f1528a26c3dfa13d2147fa056" dependencies = [ "proc-macro2", "quote", @@ -2910,6 +2910,20 @@ dependencies = [ "serde_json", ] +[[package]] +name = "influxdb3_internal_api" +version = "0.1.0" +dependencies = [ + "async-trait", + "datafusion", + "iox_query", + "iox_query_params", + "thiserror 1.0.69", + "trace", + "trace_http", + "tracker", +] + [[package]] name = "influxdb3_load_generator" version = "0.1.0" @@ -2954,12 +2968,17 @@ dependencies = [ name = "influxdb3_py_api" version = "0.1.0" dependencies = [ - "async-trait", + "arrow-array", + "arrow-schema", + "futures", "influxdb3_catalog", + "influxdb3_internal_api", "influxdb3_wal", + "iox_query_params", "parking_lot", "pyo3", "schema", + "tokio", ] [[package]] @@ -2993,6 +3012,7 @@ dependencies = [ "influxdb3_catalog", "influxdb3_client", "influxdb3_id", + "influxdb3_internal_api", "influxdb3_process", "influxdb3_sys_events", "influxdb3_telemetry", @@ -3149,6 +3169,7 @@ dependencies = [ "influxdb3_catalog", "influxdb3_client", "influxdb3_id", + "influxdb3_internal_api", "influxdb3_py_api", "influxdb3_telemetry", "influxdb3_test_helpers", diff --git a/Cargo.toml b/Cargo.toml index 2b2f5817bb..0f6d697ff4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "influxdb3_clap_blocks", "influxdb3_client", "influxdb3_id", + "influxdb3_internal_api", "influxdb3_load_generator", "influxdb3_process", "influxdb3_py_api", diff --git a/influxdb3/src/commands/test.rs b/influxdb3/src/commands/test.rs index 137dcc0c78..117268f4e6 100644 --- a/influxdb3/src/commands/test.rs +++ b/influxdb3/src/commands/test.rs @@ -1,4 +1,5 @@ use crate::commands::common::{InfluxDb3Config, SeparatedKeyValue, SeparatedList}; +use anyhow::Context; use influxdb3_client::plugin_development::WalPluginTestRequest; use influxdb3_client::Client; use secrecy::ExposeSecret; @@ -53,26 +54,10 @@ pub struct WalPluginConfig { /// If given pass this map of string key/value pairs as input arguments #[clap(long = "input-arguments")] pub input_arguments: Option>>, - /// The name of the plugin, which should match its file name on the server `/.py` + /// The file name of the plugin, which should exist on the server in `/`. + /// The plugin-dir is provided on server startup. #[clap(required = true)] - pub name: String, -} - -impl From for WalPluginTestRequest { - fn from(val: WalPluginConfig) -> Self { - let input_arguments = val.input_arguments.map(|a| { - a.into_iter() - .map(|SeparatedKeyValue((k, v))| (k, v)) - .collect::>() - }); - - Self { - name: val.name, - input_lp: val.input_lp, - input_file: val.input_file, - input_arguments, - } - } + pub filename: String, } pub async fn command(config: Config) -> Result<(), Box> { @@ -80,7 +65,28 @@ pub async fn command(config: Config) -> Result<(), Box> { match config.cmd { SubCommand::WalPlugin(plugin_config) => { - let wal_plugin_test_request: WalPluginTestRequest = plugin_config.into(); + let input_arguments = plugin_config.input_arguments.map(|a| { + a.into_iter() + .map(|SeparatedKeyValue((k, v))| (k, v)) + .collect::>() + }); + + let input_lp = match plugin_config.input_lp { + Some(lp) => lp, + None => { + let file_path = plugin_config + .input_file + .context("either input_lp or input_file must be provided")?; + std::fs::read_to_string(file_path).context("unable to read input file")? + } + }; + + let wal_plugin_test_request = WalPluginTestRequest { + filename: plugin_config.filename, + database: plugin_config.influxdb3_config.database_name, + input_lp, + input_arguments, + }; let response = client.wal_plugin_test(wal_plugin_test_request).await?; diff --git a/influxdb3/tests/server/cli.rs b/influxdb3/tests/server/cli.rs index fa77139289..5baeec728c 100644 --- a/influxdb3/tests/server/cli.rs +++ b/influxdb3/tests/server/cli.rs @@ -849,3 +849,105 @@ async fn meta_cache_create_and_delete() { insta::assert_yaml_snapshot!(result); } + +#[cfg(feature = "system-py")] +#[test_log::test(tokio::test)] +async fn test_wal_plugin_test() { + use crate::ConfigProvider; + use influxdb3_client::Precision; + + // Create plugin file + let plugin_file = create_plugin_file( + r#" +def process_writes(influxdb3_local, table_batches, args=None): + influxdb3_local.info("arg1: " + args["arg1"]) + + query_params = {"host": args["host"]} + query_result = influxdb3_local.query_rows("SELECT * FROM cpu where host = $host", query_params) + influxdb3_local.info("query result: " + str(query_result)) + + for table_batch in table_batches: + influxdb3_local.info("table: " + table_batch["table_name"]) + + for row in table_batch["rows"]: + influxdb3_local.info("row: " + str(row)) + + line = LineBuilder("some_table")\ + .tag("tag1", "tag1_value")\ + .tag("tag2", "tag2_value")\ + .int64_field("field1", 1)\ + .float64_field("field2", 2.0)\ + .string_field("field3", "number three") + influxdb3_local.write(line) + + other_line = LineBuilder("other_table") + other_line.int64_field("other_field", 1) + other_line.float64_field("other_field2", 3.14) + other_line.time_ns(1302) + + influxdb3_local.write_to_db("mytestdb", other_line) + + influxdb3_local.info("done")"#, + ); + + let plugin_dir = plugin_file.path().parent().unwrap().to_str().unwrap(); + let plugin_name = plugin_file.path().file_name().unwrap().to_str().unwrap(); + + let server = TestServer::configure() + .with_plugin_dir(plugin_dir) + .spawn() + .await; + let server_addr = server.client_addr(); + + server + .write_lp_to_db( + "foo", + "cpu,host=s1,region=us-east usage=0.9 1\n\ + cpu,host=s2,region=us-east usage=0.89 2\n\ + cpu,host=s1,region=us-east usage=0.85 3", + Precision::Nanosecond, + ) + .await + .unwrap(); + + let db_name = "foo"; + + // Run the test + let result = run_with_confirmation(&[ + "test", + "wal_plugin", + "--database", + db_name, + "--host", + &server_addr, + "--lp", + "test_input,tag1=tag1_value,tag2=tag2_value field1=1i 500", + "--input-arguments", + "arg1=arg1_value,host=s2", + plugin_name, + ]); + debug!(result = ?result, "test wal plugin"); + + let res = serde_json::from_str::(&result).unwrap(); + + let expected_result = r#"{ + "log_lines": [ + "INFO: arg1: arg1_value", + "INFO: query result: [{'host': 's2', 'region': 'us-east', 'time': 2, 'usage': 0.89}]", + "INFO: table: test_input", + "INFO: row: {'tag1': 'tag1_value', 'tag2': 'tag2_value', 'field1': 1, 'time': 500}", + "INFO: done" + ], + "database_writes": { + "mytestdb": [ + "other_table other_field=1i,other_field2=3.14 1302" + ], + "foo": [ + "some_table,tag1=tag1_value,tag2=tag2_value field1=1i,field2=2.0,field3=\"number three\"" + ] + }, + "errors": [] +}"#; + let expected_result = serde_json::from_str::(expected_result).unwrap(); + assert_eq!(res, expected_result); +} diff --git a/influxdb3/tests/server/main.rs b/influxdb3/tests/server/main.rs index f2aac07bcf..1180cc38f2 100644 --- a/influxdb3/tests/server/main.rs +++ b/influxdb3/tests/server/main.rs @@ -48,6 +48,7 @@ trait ConfigProvider { pub struct TestConfig { auth_token: Option<(String, String)>, host_id: Option, + plugin_dir: Option, } impl TestConfig { @@ -66,6 +67,12 @@ impl TestConfig { self.host_id = Some(host_id.into()); self } + + /// Set the plugin dir for this [`TestServer`] + pub fn with_plugin_dir>(mut self, plugin_dir: S) -> Self { + self.plugin_dir = Some(plugin_dir.into()); + self + } } impl ConfigProvider for TestConfig { @@ -74,6 +81,9 @@ impl ConfigProvider for TestConfig { if let Some((token, _)) = &self.auth_token { args.append(&mut vec!["--bearer-token".to_string(), token.to_owned()]); } + if let Some(plugin_dir) = &self.plugin_dir { + args.append(&mut vec!["--plugin-dir".to_string(), plugin_dir.to_owned()]); + } args.push("--host-id".to_string()); if let Some(host) = &self.host_id { args.push(host.to_owned()); diff --git a/influxdb3_client/src/plugin_development.rs b/influxdb3_client/src/plugin_development.rs index afe05ed360..4da1711b1d 100644 --- a/influxdb3_client/src/plugin_development.rs +++ b/influxdb3_client/src/plugin_development.rs @@ -6,9 +6,9 @@ use std::collections::HashMap; /// Request definition for `POST /api/v3/plugin_test/wal` API #[derive(Debug, Serialize, Deserialize)] pub struct WalPluginTestRequest { - pub name: String, - pub input_lp: Option, - pub input_file: Option, + pub filename: String, + pub database: String, + pub input_lp: String, pub input_arguments: Option>, } diff --git a/influxdb3_internal_api/Cargo.toml b/influxdb3_internal_api/Cargo.toml new file mode 100644 index 0000000000..e2259cf5c1 --- /dev/null +++ b/influxdb3_internal_api/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "influxdb3_internal_api" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +# Core Crates +iox_query.workspace = true +iox_query_params.workspace = true +trace.workspace = true +trace_http.workspace = true +tracker.workspace = true + +# Local Crates + +# Crates.io dependencies +async-trait.workspace = true +datafusion.workspace = true +thiserror.workspace = true + +[lints] +workspace = true diff --git a/influxdb3_internal_api/src/lib.rs b/influxdb3_internal_api/src/lib.rs new file mode 100644 index 0000000000..61ccc6254d --- /dev/null +++ b/influxdb3_internal_api/src/lib.rs @@ -0,0 +1,4 @@ +//! This crate contains the internal API for use across the crates in this code base, mainly +//! to get around circular dependency issues. + +pub mod query_executor; diff --git a/influxdb3_internal_api/src/query_executor.rs b/influxdb3_internal_api/src/query_executor.rs new file mode 100644 index 0000000000..05266d45f5 --- /dev/null +++ b/influxdb3_internal_api/src/query_executor.rs @@ -0,0 +1,134 @@ +use async_trait::async_trait; +use datafusion::arrow::error::ArrowError; +use datafusion::common::DataFusionError; +use datafusion::execution::SendableRecordBatchStream; +use iox_query::query_log::QueryLogEntries; +use iox_query::{QueryDatabase, QueryNamespace}; +use iox_query_params::StatementParams; +use std::fmt::Debug; +use std::sync::Arc; +use trace::ctx::SpanContext; +use trace::span::Span; +use trace_http::ctx::RequestLogContext; +use tracker::InstrumentedAsyncOwnedSemaphorePermit; + +#[derive(Debug, thiserror::Error)] +pub enum QueryExecutorError { + #[error("database not found: {db_name}")] + DatabaseNotFound { db_name: String }, + #[error("error while planning query: {0}")] + QueryPlanning(#[source] DataFusionError), + #[error("error while executing plan: {0}")] + ExecuteStream(#[source] DataFusionError), + #[error("unable to compose record batches from databases: {0}")] + DatabasesToRecordBatch(#[source] ArrowError), + #[error("unable to compose record batches from retention policies: {0}")] + RetentionPoliciesToRecordBatch(#[source] ArrowError), +} + +#[async_trait] +pub trait QueryExecutor: QueryDatabase + Debug + Send + Sync + 'static { + async fn query( + &self, + database: &str, + q: &str, + params: Option, + kind: QueryKind, + span_ctx: Option, + external_span_ctx: Option, + ) -> Result; + + fn show_databases( + &self, + include_deleted: bool, + ) -> Result; + + async fn show_retention_policies( + &self, + database: Option<&str>, + span_ctx: Option, + ) -> Result; + + fn upcast(&self) -> Arc<(dyn QueryDatabase + 'static)>; +} + +#[derive(Debug, Clone, Copy)] +pub enum QueryKind { + Sql, + InfluxQl, +} + +impl QueryKind { + pub fn query_type(&self) -> &'static str { + match self { + Self::Sql => "sql", + Self::InfluxQl => "influxql", + } + } +} + +#[derive(Debug, Copy, Clone)] +pub struct UnimplementedQueryExecutor; + +#[async_trait] +impl QueryDatabase for UnimplementedQueryExecutor { + async fn namespace( + &self, + _name: &str, + _span: Option, + _include_debug_info_tables: bool, + ) -> Result>, DataFusionError> { + unimplemented!() + } + + async fn acquire_semaphore( + &self, + _span: Option, + ) -> InstrumentedAsyncOwnedSemaphorePermit { + unimplemented!() + } + + fn query_log(&self) -> QueryLogEntries { + unimplemented!() + } +} + +#[async_trait] +impl QueryExecutor for UnimplementedQueryExecutor { + async fn query( + &self, + _database: &str, + _q: &str, + _params: Option, + _kind: QueryKind, + _span_ctx: Option, + _external_span_ctx: Option, + ) -> Result { + Err(QueryExecutorError::DatabaseNotFound { + db_name: "unimplemented".to_string(), + }) + } + + fn show_databases( + &self, + _include_deleted: bool, + ) -> Result { + Err(QueryExecutorError::DatabaseNotFound { + db_name: "unimplemented".to_string(), + }) + } + + async fn show_retention_policies( + &self, + _database: Option<&str>, + _span_ctx: Option, + ) -> Result { + Err(QueryExecutorError::DatabaseNotFound { + db_name: "unimplemented".to_string(), + }) + } + + fn upcast(&self) -> Arc<(dyn QueryDatabase + 'static)> { + Arc::new(UnimplementedQueryExecutor) as _ + } +} diff --git a/influxdb3_py_api/Cargo.toml b/influxdb3_py_api/Cargo.toml index a76817489e..aab94b4c36 100644 --- a/influxdb3_py_api/Cargo.toml +++ b/influxdb3_py_api/Cargo.toml @@ -5,20 +5,25 @@ authors.workspace = true edition.workspace = true license.workspace = true - [features] system-py = ["pyo3"] + [dependencies] +arrow-array.workspace = true +arrow-schema.workspace = true influxdb3_wal = { path = "../influxdb3_wal" } influxdb3_catalog = {path = "../influxdb3_catalog"} -async-trait.workspace = true +influxdb3_internal_api = { path = "../influxdb3_internal_api" } +iox_query_params.workspace = true schema.workspace = true parking_lot.workspace = true +futures.workspace = true +tokio.workspace = true [dependencies.pyo3] version = "0.23.3" # this is necessary to automatically initialize the Python interpreter -features = ["auto-initialize"] +features = ["auto-initialize", "experimental-async"] optional = true diff --git a/influxdb3_py_api/src/system_py.rs b/influxdb3_py_api/src/system_py.rs index 2a4dcef6be..1bc87c2e88 100644 --- a/influxdb3_py_api/src/system_py.rs +++ b/influxdb3_py_api/src/system_py.rs @@ -1,11 +1,20 @@ -use influxdb3_catalog::catalog::{Catalog, DatabaseSchema, TableDefinition}; +use arrow_array::types::Int32Type; +use arrow_array::{ + BooleanArray, DictionaryArray, Float64Array, Int64Array, RecordBatch, StringArray, + TimestampNanosecondArray, UInt64Array, +}; +use arrow_schema::DataType; +use futures::TryStreamExt; +use influxdb3_catalog::catalog::{DatabaseSchema, TableDefinition}; +use influxdb3_internal_api::query_executor::{QueryExecutor, QueryKind}; use influxdb3_wal::{FieldData, Row, WriteBatch}; +use iox_query_params::StatementParams; use parking_lot::Mutex; use pyo3::exceptions::PyValueError; use pyo3::prelude::{PyAnyMethods, PyModule, PyModuleMethods}; use pyo3::types::{PyDict, PyList}; use pyo3::{ - pyclass, pymethods, pymodule, Bound, IntoPyObject, PyAny, PyErr, PyObject, PyResult, Python, + pyclass, pymethods, pymodule, Bound, IntoPyObject, Py, PyAny, PyErr, PyObject, PyResult, Python, }; use schema::InfluxColumnType; use std::collections::HashMap; @@ -190,8 +199,8 @@ impl PyWriteBatch { #[pyclass] #[derive(Debug)] struct PyPluginCallApi { - _schema: Arc, - _catalog: Arc, + db_schema: Arc, + query_executor: Arc, return_state: Arc>, } @@ -280,6 +289,127 @@ impl PyPluginCallApi { Ok(()) } + + #[pyo3(signature = (query, args=None))] + fn query_rows( + &self, + query: String, + args: Option>, + ) -> PyResult> { + let query_executor = Arc::clone(&self.query_executor); + let db_schema_name = Arc::clone(&self.db_schema.name); + + let params = args.map(|args| { + let mut params = StatementParams::new(); + for (key, value) in args { + params.insert(key, value); + } + params + }); + + // Spawn the async task + let handle = tokio::spawn(async move { + let res = query_executor + .query( + db_schema_name.as_ref(), + &query, + params, + QueryKind::Sql, + None, + None, + ) + .await + .map_err(|e| PyValueError::new_err(format!("Error executing query: {}", e)))?; + + res.try_collect().await.map_err(|e| { + PyValueError::new_err(format!("Error collecting query results: {}", e)) + }) + }); + + // Block the current thread until the async task completes + let res = + tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(handle)); + + let res = + res.map_err(|e| PyValueError::new_err(format!("Error executing query: {}", e)))?; + + let batches: Vec = res + .map_err(|e| PyValueError::new_err(format!("Error collecting query results: {}", e)))?; + + Python::with_gil(|py| { + let mut rows: Vec = Vec::new(); + + for batch in batches { + let num_rows = batch.num_rows(); + let schema = batch.schema(); + + for row_idx in 0..num_rows { + let row = PyDict::new(py); + for col_idx in 0..schema.fields().len() { + let field = schema.field(col_idx); + let field_name = field.name().as_str(); + + let array = batch.column(col_idx); + + match array.data_type() { + DataType::Int64 => { + let array = array.as_any().downcast_ref::().unwrap(); + row.set_item(field_name, array.value(row_idx))?; + } + DataType::UInt64 => { + let array = array.as_any().downcast_ref::().unwrap(); + row.set_item(field_name, array.value(row_idx))?; + } + DataType::Float64 => { + let array = array.as_any().downcast_ref::().unwrap(); + row.set_item(field_name, array.value(row_idx))?; + } + DataType::Utf8 => { + let array = array.as_any().downcast_ref::().unwrap(); + row.set_item(field_name, array.value(row_idx))?; + } + DataType::Boolean => { + let array = array.as_any().downcast_ref::().unwrap(); + row.set_item(field_name, array.value(row_idx))?; + } + DataType::Timestamp(_, _) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + row.set_item(field_name, array.value(row_idx))?; + } + DataType::Dictionary(_, _) => { + let col = array + .as_any() + .downcast_ref::>() + .expect("unexpected datatype"); + + let values = col.values(); + let values = values + .as_any() + .downcast_ref::() + .expect("unexpected datatype"); + + let val = values.value(row_idx).to_string(); + row.set_item(field_name, val)?; + } + _ => { + return Err(PyValueError::new_err(format!( + "Unsupported data type: {:?}", + array.data_type() + ))) + } + } + } + rows.push(row.into()); + } + } + + let list = PyList::new(py, rows)?.unbind(); + Ok(list) + }) + } } // constant for the process writes call site string @@ -384,7 +514,7 @@ pub fn execute_python_with_batch( code: &str, write_batch: &WriteBatch, schema: Arc, - catalog: Arc, + query_executor: Arc, args: Option>, ) -> PyResult { Python::with_gil(|py| { @@ -411,50 +541,36 @@ pub fn execute_python_with_batch( for chunk in table_chunks.chunk_time_to_chunk.values() { for row in &chunk.rows { let py_row = PyDict::new(py); - py_row.set_item("time", row.time).unwrap(); - let mut fields = Vec::with_capacity(row.fields.len()); + for field in &row.fields { let field_name = table_def.column_id_to_name(&field.id).unwrap(); - if field_name.as_ref() == "time" { - continue; - } - let py_field = PyDict::new(py); - py_field.set_item("name", field_name.as_ref()).unwrap(); - match &field.value { FieldData::String(s) => { - py_field.set_item("value", s.as_str()).unwrap(); + py_row.set_item(field_name.as_ref(), s.as_str()).unwrap(); } FieldData::Integer(i) => { - py_field.set_item("value", i).unwrap(); + py_row.set_item(field_name.as_ref(), i).unwrap(); } FieldData::UInteger(u) => { - py_field.set_item("value", u).unwrap(); + py_row.set_item(field_name.as_ref(), u).unwrap(); } FieldData::Float(f) => { - py_field.set_item("value", f).unwrap(); + py_row.set_item(field_name.as_ref(), f).unwrap(); } FieldData::Boolean(b) => { - py_field.set_item("value", b).unwrap(); + py_row.set_item(field_name.as_ref(), b).unwrap(); } FieldData::Tag(t) => { - py_field.set_item("value", t.as_str()).unwrap(); + py_row.set_item(field_name.as_ref(), t.as_str()).unwrap(); } FieldData::Key(k) => { - py_field.set_item("value", k.as_str()).unwrap(); + py_row.set_item(field_name.as_ref(), k.as_str()).unwrap(); } - FieldData::Timestamp(_) => { - // return an error, this shouldn't happen - return Err(PyValueError::new_err( - "Timestamps should be in the time field", - )); + FieldData::Timestamp(t) => { + py_row.set_item(field_name.as_ref(), t).unwrap(); } }; - - fields.push(py_field.unbind()); } - let fields = PyList::new(py, fields).unwrap(); - py_row.set_item("fields", fields.unbind()).unwrap(); rows.push(py_row.into()); } @@ -469,8 +585,8 @@ pub fn execute_python_with_batch( let py_batches = PyList::new(py, table_batches).unwrap(); let api = PyPluginCallApi { - _schema: schema, - _catalog: catalog, + db_schema: schema, + query_executor, return_state: Default::default(), }; let return_state = Arc::clone(&api.return_state); @@ -492,6 +608,7 @@ pub fn execute_python_with_batch( Some(&globals), None, )?; + py_func.call1((local_api, py_batches.unbind(), args))?; // swap with an empty return state to avoid cloning diff --git a/influxdb3_server/Cargo.toml b/influxdb3_server/Cargo.toml index c1aa58fb96..c5aca26053 100644 --- a/influxdb3_server/Cargo.toml +++ b/influxdb3_server/Cargo.toml @@ -34,6 +34,7 @@ influxdb3_cache = { path = "../influxdb3_cache" } influxdb3_catalog = { path = "../influxdb3_catalog" } influxdb3_client = { path = "../influxdb3_client" } influxdb3_id = { path = "../influxdb3_id" } +influxdb3_internal_api = { path = "../influxdb3_internal_api" } influxdb3_process = { path = "../influxdb3_process", default-features = false } influxdb3_wal = { path = "../influxdb3_wal"} influxdb3_write = { path = "../influxdb3_write" } diff --git a/influxdb3_server/src/builder.rs b/influxdb3_server/src/builder.rs index c7d9b4dd4e..56edfa14e1 100644 --- a/influxdb3_server/src/builder.rs +++ b/influxdb3_server/src/builder.rs @@ -1,14 +1,11 @@ use std::sync::Arc; +use crate::{auth::DefaultAuthorizer, http::HttpApi, CommonServerState, Server}; use authz::Authorizer; +use influxdb3_internal_api::query_executor::QueryExecutor; use influxdb3_write::{persister::Persister, WriteBuffer}; use tokio::net::TcpListener; -use crate::{ - auth::DefaultAuthorizer, http::HttpApi, query_executor, CommonServerState, QueryExecutor, - Server, -}; - #[derive(Debug)] pub struct ServerBuilder { common_state: CommonServerState, @@ -55,7 +52,7 @@ pub struct WithWriteBuf(Arc); #[derive(Debug)] pub struct NoQueryExec; #[derive(Debug)] -pub struct WithQueryExec(Arc>); +pub struct WithQueryExec(Arc); #[derive(Debug)] pub struct NoPersister; #[derive(Debug)] @@ -87,7 +84,7 @@ impl ServerBuilder { impl ServerBuilder { pub fn query_executor( self, - qe: Arc>, + qe: Arc, ) -> ServerBuilder { ServerBuilder { common_state: self.common_state, diff --git a/influxdb3_server/src/grpc.rs b/influxdb3_server/src/grpc.rs index 5793416117..075fb254de 100644 --- a/influxdb3_server/src/grpc.rs +++ b/influxdb3_server/src/grpc.rs @@ -4,11 +4,10 @@ use arrow_flight::flight_service_server::{ FlightService as Flight, FlightServiceServer as FlightServer, }; use authz::Authorizer; - -use crate::{query_executor, QueryExecutor}; +use influxdb3_internal_api::query_executor::QueryExecutor; pub(crate) fn make_flight_server( - server: Arc>, + server: Arc, authz: Option>, ) -> FlightServer { let query_db = server.upcast(); diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index aabb510e75..175025992b 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -1,7 +1,6 @@ //! HTTP API service implementations for `server` -use crate::{query_executor, QueryKind}; -use crate::{CommonServerState, QueryExecutor}; +use crate::CommonServerState; use arrow::record_batch::RecordBatch; use arrow::util::pretty; use authz::http::AuthorizationHeaderExtension; @@ -23,6 +22,7 @@ use hyper::{Body, Method, Request, Response, StatusCode}; use influxdb3_cache::last_cache; use influxdb3_cache::meta_cache::{self, CreateMetaCacheArgs, MaxAge, MaxCardinality}; use influxdb3_catalog::catalog::Error as CatalogError; +use influxdb3_internal_api::query_executor::{QueryExecutor, QueryExecutorError, QueryKind}; use influxdb3_process::{INFLUXDB3_GIT_HASH_SHORT, INFLUXDB3_VERSION}; use influxdb3_wal::{PluginType, TriggerSpecificationDefinition}; use influxdb3_write::persister::TrackedMemoryArrowWriter; @@ -181,7 +181,7 @@ pub enum Error { Io(#[from] std::io::Error), #[error("query error: {0}")] - Query(#[from] query_executor::Error), + Query(#[from] QueryExecutorError), #[error(transparent)] DbName(#[from] ValidateDbNameError), @@ -214,6 +214,9 @@ pub enum Error { #[error("Python plugins not enabled on this server")] PythonPluginsNotEnabled, + + #[error("Plugin error")] + Plugin(#[from] influxdb3_write::write_buffer::plugins::Error), } #[derive(Debug, Error)] @@ -384,7 +387,7 @@ impl Error { .body(body) .unwrap() } - Self::Query(query_executor::Error::DatabaseNotFound { .. }) => { + Self::Query(QueryExecutorError::DatabaseNotFound { .. }) => { let err: ErrorMessage<()> = ErrorMessage { error: self.to_string(), data: None, @@ -437,7 +440,7 @@ pub(crate) struct HttpApi { common_state: CommonServerState, write_buffer: Arc, time_provider: Arc, - pub(crate) query_executor: Arc>, + pub(crate) query_executor: Arc, max_request_bytes: usize, authorizer: Arc, legacy_write_param_unifier: SingleTenantRequestUnifier, @@ -448,7 +451,7 @@ impl HttpApi { common_state: CommonServerState, time_provider: Arc, write_buffer: Arc, - query_executor: Arc>, + query_executor: Arc, max_request_bytes: usize, authorizer: Arc, ) -> Self { @@ -1135,7 +1138,10 @@ where let request: influxdb3_client::plugin_development::WalPluginTestRequest = self.read_body_json(req).await?; - let output = self.write_buffer.test_wal_plugin(request).await?; + let output = self + .write_buffer + .test_wal_plugin(request, Arc::clone(&self.query_executor)) + .await?; let body = serde_json::to_string(&output)?; Ok(Response::builder() diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index 44915e562a..60b5879623 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -23,16 +23,12 @@ mod system_tables; use crate::grpc::make_flight_server; use crate::http::route_request; use crate::http::HttpApi; -use async_trait::async_trait; use authz::Authorizer; -use datafusion::execution::SendableRecordBatchStream; use hyper::server::conn::AddrIncoming; use hyper::server::conn::Http; use hyper::service::service_fn; use influxdb3_telemetry::store::TelemetryStore; use influxdb3_write::persister::Persister; -use iox_query::QueryDatabase; -use iox_query_params::StatementParams; use iox_time::TimeProvider; use observability_deps::tracing::error; use observability_deps::tracing::info; @@ -45,9 +41,7 @@ use tokio::net::TcpListener; use tokio::time::Instant; use tokio_util::sync::CancellationToken; use tower::Layer; -use trace::ctx::SpanContext; use trace::TraceCollector; -use trace_http::ctx::RequestLogContext; use trace_http::ctx::TraceHeaderParser; use trace_http::metrics::MetricFamily; use trace_http::metrics::RequestMetrics; @@ -130,49 +124,6 @@ pub struct Server { listener: TcpListener, } -#[async_trait] -pub trait QueryExecutor: QueryDatabase + Debug + Send + Sync + 'static { - type Error; - - async fn query( - &self, - database: &str, - q: &str, - params: Option, - kind: QueryKind, - span_ctx: Option, - external_span_ctx: Option, - ) -> Result; - - fn show_databases( - &self, - include_deleted: bool, - ) -> Result; - - async fn show_retention_policies( - &self, - database: Option<&str>, - span_ctx: Option, - ) -> Result; - - fn upcast(&self) -> Arc<(dyn QueryDatabase + 'static)>; -} - -#[derive(Debug, Clone, Copy)] -pub enum QueryKind { - Sql, - InfluxQl, -} - -impl QueryKind { - pub(crate) fn query_type(&self) -> &'static str { - match self { - Self::Sql => "sql", - Self::InfluxQl => "influxql", - } - } -} - impl Server { pub fn authorizer(&self) -> Arc { Arc::clone(&self.authorizer) diff --git a/influxdb3_server/src/query_executor/mod.rs b/influxdb3_server/src/query_executor/mod.rs index 0a39a08e33..05e3f7fb5a 100644 --- a/influxdb3_server/src/query_executor/mod.rs +++ b/influxdb3_server/src/query_executor/mod.rs @@ -1,12 +1,10 @@ //! module for query executor use crate::system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA_NAME}; use crate::{query_planner::Planner, system_tables::AllSystemSchemaTablesProvider}; -use crate::{QueryExecutor, QueryKind}; use arrow::array::{ArrayRef, Int64Builder, StringBuilder, StructArray}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use arrow_array::{Array, BooleanArray}; -use arrow_schema::ArrowError; use async_trait::async_trait; use data_types::NamespaceId; use datafusion::catalog::{CatalogProvider, SchemaProvider, Session}; @@ -23,6 +21,7 @@ use datafusion_util::MemoryStream; use influxdb3_cache::last_cache::{LastCacheFunction, LAST_CACHE_UDTF_NAME}; use influxdb3_cache::meta_cache::{MetaCacheFunction, META_CACHE_UDTF_NAME}; use influxdb3_catalog::catalog::{Catalog, DatabaseSchema}; +use influxdb3_internal_api::query_executor::{QueryExecutor, QueryExecutorError, QueryKind}; use influxdb3_sys_events::SysEventStore; use influxdb3_telemetry::store::TelemetryStore; use influxdb3_write::WriteBuffer; @@ -114,8 +113,6 @@ impl QueryExecutorImpl { #[async_trait] impl QueryExecutor for QueryExecutorImpl { - type Error = Error; - async fn query( &self, database: &str, @@ -124,15 +121,15 @@ impl QueryExecutor for QueryExecutorImpl { kind: QueryKind, span_ctx: Option, external_span_ctx: Option, - ) -> Result { + ) -> Result { info!(%database, %query, ?params, ?kind, "QueryExecutorImpl as QueryExecutor::query"); let db = self .namespace(database, span_ctx.child_span("get database"), false) .await - .map_err(|_| Error::DatabaseNotFound { + .map_err(|_| QueryExecutorError::DatabaseNotFound { db_name: database.to_string(), })? - .ok_or_else(|| Error::DatabaseNotFound { + .ok_or_else(|| QueryExecutorError::DatabaseNotFound { db_name: database.to_string(), })?; @@ -161,7 +158,7 @@ impl QueryExecutor for QueryExecutorImpl { }) .await; - let plan = match plan.map_err(Error::QueryPlanning) { + let plan = match plan.map_err(QueryExecutorError::QueryPlanning) { Ok(plan) => plan, Err(e) => { token.fail(); @@ -182,7 +179,7 @@ impl QueryExecutor for QueryExecutorImpl { } Err(err) => { token.fail(); - Err(Error::ExecuteStream(err)) + Err(QueryExecutorError::ExecuteStream(err)) } } } @@ -190,7 +187,7 @@ impl QueryExecutor for QueryExecutorImpl { fn show_databases( &self, include_deleted: bool, - ) -> Result { + ) -> Result { let mut databases = self.catalog.list_db_schema(); // sort them to ensure consistent order, first by deleted, then by name: databases.sort_unstable_by(|a, b| match a.deleted.cmp(&b.deleted) { @@ -222,7 +219,7 @@ impl QueryExecutor for QueryExecutorImpl { } let schema = DatafusionSchema::new(fields); let batch = RecordBatch::try_new(Arc::new(schema), arrays) - .map_err(Error::DatabasesToRecordBatch)?; + .map_err(QueryExecutorError::DatabasesToRecordBatch)?; Ok(Box::pin(MemoryStream::new(vec![batch]))) } @@ -230,7 +227,7 @@ impl QueryExecutor for QueryExecutorImpl { &self, database: Option<&str>, span_ctx: Option, - ) -> Result { + ) -> Result { let mut databases = if let Some(db) = database { vec![db.to_owned()] } else { @@ -244,10 +241,10 @@ impl QueryExecutor for QueryExecutorImpl { let db = self .namespace(&database, span_ctx.child_span("get database"), false) .await - .map_err(|_| Error::DatabaseNotFound { + .map_err(|_| QueryExecutorError::DatabaseNotFound { db_name: database.to_string(), })? - .ok_or_else(|| Error::DatabaseNotFound { + .ok_or_else(|| QueryExecutorError::DatabaseNotFound { db_name: database.to_string(), })?; let duration = db.retention_time_ns(); @@ -337,20 +334,6 @@ fn split_database_name(db_name: &str) -> (String, String) { ) } -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("database not found: {db_name}")] - DatabaseNotFound { db_name: String }, - #[error("error while planning query: {0}")] - QueryPlanning(#[source] DataFusionError), - #[error("error while executing plan: {0}")] - ExecuteStream(#[source] DataFusionError), - #[error("unable to compose record batches from databases: {0}")] - DatabasesToRecordBatch(#[source] ArrowError), - #[error("unable to compose record batches from retention policies: {0}")] - RetentionPoliciesToRecordBatch(#[source] ArrowError), -} - // This implementation is for the Flight service #[async_trait] impl QueryDatabase for QueryExecutorImpl { @@ -364,7 +347,7 @@ impl QueryDatabase for QueryExecutorImpl { let _span_recorder = SpanRecorder::new(span); let db_schema = self.catalog.db_schema(name).ok_or_else(|| { - DataFusionError::External(Box::new(Error::DatabaseNotFound { + DataFusionError::External(Box::new(QueryExecutorError::DatabaseNotFound { db_name: name.into(), })) })?; @@ -647,6 +630,7 @@ impl TableProvider for QueryTable { mod tests { use std::{num::NonZeroUsize, sync::Arc, time::Duration}; + use crate::query_executor::QueryExecutorImpl; use arrow::array::RecordBatch; use data_types::NamespaceName; use datafusion::assert_batches_sorted_eq; @@ -656,6 +640,7 @@ mod tests { parquet_cache::test_cached_obj_store_and_oracle, }; use influxdb3_catalog::catalog::Catalog; + use influxdb3_internal_api::query_executor::{QueryExecutor, QueryKind}; use influxdb3_sys_events::SysEventStore; use influxdb3_telemetry::store::TelemetryStore; use influxdb3_wal::{Gen1Duration, WalConfig}; @@ -670,8 +655,6 @@ mod tests { use object_store::{local::LocalFileSystem, ObjectStore}; use parquet_file::storage::{ParquetStorage, StorageId}; - use crate::{query_executor::QueryExecutorImpl, QueryExecutor}; - use super::CreateQueryExecutorArgs; fn make_exec(object_store: Arc) -> Arc { @@ -860,7 +843,7 @@ mod tests { for t in test_cases { let batch_stream = query_executor - .query(db_name, t.query, None, crate::QueryKind::Sql, None, None) + .query(db_name, t.query, None, QueryKind::Sql, None, None) .await .unwrap(); let batches: Vec = batch_stream.try_collect().await.unwrap(); diff --git a/influxdb3_write/Cargo.toml b/influxdb3_write/Cargo.toml index 22a7f6ee17..5606d72c5a 100644 --- a/influxdb3_write/Cargo.toml +++ b/influxdb3_write/Cargo.toml @@ -28,6 +28,7 @@ influxdb3_cache = { path = "../influxdb3_cache" } influxdb3_catalog = { path = "../influxdb3_catalog" } influxdb3_client = { path = "../influxdb3_client" } influxdb3_id = { path = "../influxdb3_id" } +influxdb3_internal_api = { path = "../influxdb3_internal_api" } influxdb3_test_helpers = { path = "../influxdb3_test_helpers" } influxdb3_wal = { path = "../influxdb3_wal" } influxdb3_telemetry = { path = "../influxdb3_telemetry" } diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index d7df44bd1b..26928d15fd 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -69,6 +69,7 @@ use tokio::sync::watch::Receiver; #[cfg(feature = "system-py")] use crate::write_buffer::plugins::PluginContext; use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse}; +use influxdb3_internal_api::query_executor::QueryExecutor; #[derive(Debug, Error)] pub enum Error { @@ -135,9 +136,6 @@ pub enum Error { #[error("error: {0}")] AnyhowError(#[from] anyhow::Error), - - #[error("reading plugin file: {0}")] - ReadPluginError(#[from] std::io::Error), } pub type Result = std::result::Result; @@ -379,9 +377,9 @@ impl WriteBufferImpl { } #[cfg(feature = "system-py")] - fn read_plugin_code(&self, name: &str) -> Result { + fn read_plugin_code(&self, name: &str) -> Result { let plugin_dir = self.plugin_dir.clone().context("plugin dir not set")?; - let path = plugin_dir.join(format!("{}.py", name)); + let path = plugin_dir.join(name); Ok(std::fs::read_to_string(path)?) } } @@ -1108,20 +1106,28 @@ impl ProcessingEngineManager for WriteBufferImpl { async fn test_wal_plugin( &self, request: WalPluginTestRequest, - ) -> crate::Result { + query_executor: Arc, + ) -> crate::Result { #[cfg(feature = "system-py")] { // create a copy of the catalog so we don't modify the original let catalog = Arc::new(Catalog::from_inner(self.catalog.clone_inner())); let now = self.time_provider.now(); - let code = self.read_plugin_code(&request.name)?; + let code = self.read_plugin_code(&request.filename)?; - return Ok(plugins::run_test_wal_plugin(now, catalog, code, request).unwrap()); + let res = plugins::run_test_wal_plugin(now, catalog, query_executor, code, request) + .unwrap_or_else(|e| WalPluginTestResponse { + log_lines: vec![], + database_writes: Default::default(), + errors: vec![e.to_string()], + }); + + return Ok(res); } #[cfg(not(feature = "system-py"))] - Err(Error::AnyhowError(anyhow::anyhow!( + Err(plugins::Error::AnyhowError(anyhow::anyhow!( "system-py feature not enabled" ))) } diff --git a/influxdb3_write/src/write_buffer/plugins.rs b/influxdb3_write/src/write_buffer/plugins.rs index 0855dc2c8f..ed9c571d75 100644 --- a/influxdb3_write/src/write_buffer/plugins.rs +++ b/influxdb3_write/src/write_buffer/plugins.rs @@ -1,6 +1,7 @@ -use crate::write_buffer::PluginEvent; +use crate::write_buffer::{plugins, PluginEvent}; use crate::{write_buffer, WriteBuffer}; use influxdb3_client::plugin_development::{WalPluginTestRequest, WalPluginTestResponse}; +use influxdb3_internal_api::query_executor::QueryExecutor; use influxdb3_wal::{PluginType, TriggerDefinition, TriggerSpecificationDefinition}; use std::fmt::Debug; use std::sync::Arc; @@ -9,6 +10,9 @@ use tokio::sync::mpsc; #[derive(Debug, Error)] pub enum Error { + #[error("invalid database {0}")] + InvalidDatabase(String), + #[error("couldn't find db")] MissingDb, @@ -24,6 +28,9 @@ pub enum Error { #[error(transparent)] AnyhowError(#[from] anyhow::Error), + + #[error("reading plugin file: {0}")] + ReadPluginError(#[from] std::io::Error), } /// `[ProcessingEngineManager]` is used to interact with the processing engine, @@ -87,7 +94,8 @@ pub trait ProcessingEngineManager: Debug + Send + Sync + 'static { async fn test_wal_plugin( &self, request: WalPluginTestRequest, - ) -> crate::Result; + query_executor: Arc, + ) -> crate::Result; } #[cfg(feature = "system-py")] @@ -231,6 +239,7 @@ mod python_plugin { pub(crate) fn run_test_wal_plugin( now_time: iox_time::Time, catalog: Arc, + query_executor: Arc, code: String, request: WalPluginTestRequest, ) -> Result { @@ -239,9 +248,9 @@ pub(crate) fn run_test_wal_plugin( use data_types::NamespaceName; use influxdb3_wal::Gen1Duration; - const TEST_NAMESPACE: &str = "_testdb"; - - let namespace = NamespaceName::new(TEST_NAMESPACE).unwrap(); + let database = request.database; + let namespace = NamespaceName::new(database.clone()) + .map_err(|_e| Error::InvalidDatabase(database.clone()))?; // parse the lp into a write batch let validator = WriteValidator::initialize( namespace.clone(), @@ -249,19 +258,19 @@ pub(crate) fn run_test_wal_plugin( now_time.timestamp_nanos(), )?; let data = validator.v1_parse_lines_and_update_schema( - &request.input_lp.unwrap(), + &request.input_lp, false, now_time, Precision::Nanosecond, )?; let data = data.convert_lines_to_buffer(Gen1Duration::new_1m()); - let db = catalog.db_schema("_testdb").unwrap(); + let db = catalog.db_schema(&database).ok_or(Error::MissingDb)?; let plugin_return_state = influxdb3_py_api::system_py::execute_python_with_batch( &code, &data.valid_data, db, - Arc::clone(&catalog), + query_executor, request.input_arguments, )?; @@ -336,7 +345,7 @@ pub(crate) fn run_test_wal_plugin( let log_lines = plugin_return_state.log(); let mut database_writes = plugin_return_state.write_db_lines; - database_writes.insert("_testdb".to_string(), plugin_return_state.write_back_lines); + database_writes.insert(database, plugin_return_state.write_back_lines); Ok(WalPluginTestResponse { log_lines, @@ -353,6 +362,7 @@ mod tests { use crate::Precision; use data_types::NamespaceName; use influxdb3_catalog::catalog::Catalog; + use influxdb3_internal_api::query_executor::UnimplementedQueryExecutor; use iox_time::Time; use std::collections::HashMap; @@ -394,25 +404,32 @@ def process_writes(influxdb3_local, table_batches, args=None): .join("\n"); let request = WalPluginTestRequest { - name: "test".into(), - input_lp: Some(lp), - input_file: None, + filename: "test".into(), + database: "_testdb".into(), + input_lp: lp, input_arguments: Some(HashMap::from([( String::from("arg1"), String::from("val1"), )])), }; + let executor: Arc = Arc::new(UnimplementedQueryExecutor); + let response = - run_test_wal_plugin(now, Arc::new(catalog), code.to_string(), request).unwrap(); + run_test_wal_plugin(now, Arc::new(catalog), executor, code.to_string(), request) + .unwrap(); let expected_log_lines = vec![ "INFO: arg1: val1", "INFO: table: cpu", - "INFO: row: {'time': 100, 'fields': [{'name': 'host', 'value': 'A'}, {'name': 'region', 'value': 'west'}, {'name': 'usage', 'value': 1}, {'name': 'system', 'value': 23.2}]}", - "INFO: table: mem", "INFO: row: {'time': 120, 'fields': [{'name': 'host', 'value': 'B'}, {'name': 'user', 'value': 43.1}]}", + "INFO: row: {'host': 'A', 'region': 'west', 'usage': 1, 'system': 23.2, 'time': 100}", + "INFO: table: mem", + "INFO: row: {'host': 'B', 'user': 43.1, 'time': 120}", "INFO: done", - ].into_iter().map(|s| s.to_string()).collect::>(); + ] + .into_iter() + .map(|s| s.to_string()) + .collect::>(); assert_eq!(response.log_lines, expected_log_lines); let expected_testdb_lines = vec![ @@ -475,14 +492,22 @@ def process_writes(influxdb3_local, table_batches, args=None): let lp = ["mem,host=B user=43.1 120"].join("\n"); let request = WalPluginTestRequest { - name: "test".into(), - input_lp: Some(lp), - input_file: None, + filename: "test".into(), + database: "_testdb".into(), + input_lp: lp, input_arguments: None, }; - let reesponse = - run_test_wal_plugin(now, Arc::clone(&catalog), code.to_string(), request).unwrap(); + let executor: Arc = Arc::new(UnimplementedQueryExecutor); + + let reesponse = run_test_wal_plugin( + now, + Arc::clone(&catalog), + executor, + code.to_string(), + request, + ) + .unwrap(); let expected_testdb_lines = vec![ "some_table,tag1=tag1_value,tag2=tag2_value field1=1i,field2=2.0,field3=\"number three\""