feat(processing_engine): Allow async plugin execution.

processing_engine/async_execution
Jackson Newhouse 2025-02-12 15:28:55 -08:00
parent 80de52f15f
commit dc154302b5
14 changed files with 221 additions and 114 deletions

Cargo.lock generated
View File

@ -2919,6 +2919,7 @@ dependencies = [
"bytes",
"hashbrown 0.15.2",
"influxdb3_types",
"influxdb3_wal",
"iox_query_params",
"mockito",
"reqwest 0.11.27",
@ -3007,6 +3008,7 @@ dependencies = [
"cron 0.15.0",
"data_types",
"datafusion_util",
"futures-util",
"hashbrown 0.15.2",
"humantime",
"hyper 0.14.32",
@ -3205,6 +3207,7 @@ dependencies = [
"hashbrown 0.15.2",
"hyper 0.14.32",
"influxdb3_cache",
"influxdb3_wal",
"iox_http",
"iox_query_params",
"serde",

View File

@ -229,6 +229,8 @@ pub struct TriggerConfig {
/// Create trigger in disabled state
#[clap(long)]
disabled: bool,
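/// Run the trigger's plugin asynchronously rather than serially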
#[clap(long)]
run_asynchronous: bool,
/// Name for the new trigger
trigger_name: String,
}
@ -351,6 +353,7 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
trigger_specification,
trigger_arguments,
disabled,
run_asynchronous,
}) => {
let trigger_arguments: Option<HashMap<String, String>> = trigger_arguments.map(|a| {
a.into_iter()
@ -358,7 +361,6 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
.collect::<HashMap<String, String>>()
});
//println!("does this work?");
match client
.api_v3_configure_processing_engine_trigger_create(
database_name,
@ -367,6 +369,7 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
trigger_specification.string_rep(),
trigger_arguments,
disabled,
run_asynchronous,
)
.await
{
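
A note on the new CLI surface: clap's derive turns the snake_case field into a kebab-case long flag, so the trigger is created with --run-asynchronous. A minimal, runnable sketch of just the flags from the hunk above (the real TriggerConfig carries more fields):

    use clap::Parser;

    #[derive(Parser)]
    struct TriggerConfig {
        /// Create trigger in disabled state
        #[clap(long)]
        disabled: bool,
        /// Run the trigger's plugin asynchronously rather than serially
        #[clap(long)]
        run_asynchronous: bool,
        /// Name for the new trigger
        trigger_name: String,
    }

    fn main() {
        // clap derives `--run-asynchronous` from the field name.
        let cfg = TriggerConfig::parse_from(["trigger", "--run-asynchronous", "my_trigger"]);
        assert!(cfg.run_asynchronous);
    }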

View File

@ -10,8 +10,10 @@ use influxdb3_id::ColumnId;
use influxdb3_id::DbId;
use influxdb3_id::SerdeVecMap;
use influxdb3_id::TableId;
use influxdb3_wal::DistinctCacheDefinition;
use influxdb3_wal::{LastCacheDefinition, LastCacheValueColumnsDef, PluginType, TriggerDefinition};
use influxdb3_wal::{
DistinctCacheDefinition, LastCacheDefinition, LastCacheValueColumnsDef, PluginType,
TriggerDefinition, TriggerFlag,
};
use schema::InfluxColumnType;
use schema::InfluxFieldType;
use schema::TIME_DATA_TIMEZONE;
@ -153,6 +155,7 @@ impl From<DatabaseSnapshot> for DatabaseSchema {
trigger_name: trigger.trigger_name,
plugin_filename: trigger.plugin_filename,
trigger: serde_json::from_str(&trigger.trigger_specification).unwrap(),
flags: trigger.flags,
trigger_arguments: trigger.trigger_arguments,
disabled: trigger.disabled,
database_name: trigger.database_name,
@ -223,6 +226,7 @@ struct ProcessingEngineTriggerSnapshot {
pub plugin_filename: String,
pub database_name: String,
pub trigger_specification: String,
pub flags: Vec<TriggerFlag>,
pub trigger_arguments: Option<HashMap<String, String>>,
pub disabled: bool,
}
@ -471,6 +475,7 @@ impl From<&TriggerDefinition> for ProcessingEngineTriggerSnapshot {
trigger_name: trigger.trigger_name.to_string(),
plugin_filename: trigger.plugin_filename.to_string(),
database_name: trigger.database_name.to_string(),
flags: trigger.flags.clone(),
trigger_specification: serde_json::to_string(&trigger.trigger)
.expect("should be able to serialize trigger specification"),
trigger_arguments: trigger.trigger_arguments.clone(),

View File

@ -11,6 +11,7 @@ iox_query_params.workspace = true
# Local deps
influxdb3_types = { path = "../influxdb3_types" }
influxdb3_wal = { path = "../influxdb3_wal" }
# crates.io dependencies
bytes.workspace = true

View File

@ -12,6 +12,7 @@ use url::Url;
use influxdb3_types::http::*;
pub use influxdb3_types::write::Precision;
use influxdb3_wal::TriggerFlag;
/// Primary error type for the [`Client`]
#[derive(Debug, thiserror::Error)]
@ -456,6 +457,7 @@ impl Client {
}
/// Make a request to `POST /api/v3/configure/processing_engine_trigger`
#[allow(clippy::too_many_arguments)]
pub async fn api_v3_configure_processing_engine_trigger_create(
&self,
db: impl Into<String> + Send,
@ -464,7 +466,13 @@ impl Client {
trigger_spec: impl Into<String> + Send,
trigger_arguments: Option<HashMap<String, String>>,
disabled: bool,
execute_async: bool,
) -> Result<()> {
let flags = if execute_async {
vec![TriggerFlag::ExecuteAsynchronously]
} else {
vec![]
};
let _bytes = self
.send_json_get_bytes(
Method::POST,
@ -474,6 +482,7 @@ impl Client {
trigger_name: trigger_name.into(),
plugin_filename: plugin_filename.into(),
trigger_specification: trigger_spec.into(),
flags,
trigger_arguments,
disabled,
}),
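
For reference, a sketch of what the bool-to-flags mapping puts on the wire, using local stand-ins for the request types (assumes serde and serde_json; the real ProcessingEngineTriggerCreateRequest carries more fields):

    use serde::Serialize;

    #[derive(Serialize)]
    #[serde(rename_all = "snake_case")]
    enum TriggerFlag {
        ExecuteAsynchronously,
    }

    // Abbreviated stand-in for ProcessingEngineTriggerCreateRequest.
    #[derive(Serialize)]
    struct CreateRequest {
        trigger_name: String,
        flags: Vec<TriggerFlag>,
        disabled: bool,
    }

    fn main() {
        let execute_async = true;
        let flags = if execute_async {
            vec![TriggerFlag::ExecuteAsynchronously]
        } else {
            vec![]
        };
        let req = CreateRequest { trigger_name: "t1".into(), flags, disabled: false };
        println!("{}", serde_json::to_string(&req).unwrap());
        // {"trigger_name":"t1","flags":["execute_asynchronously"],"disabled":false}
    }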

View File

@ -15,6 +15,7 @@ bytes.workspace = true
chrono.workspace = true
cron.workspace = true
data_types.workspace = true
futures-util.workspace = true
humantime.workspace = true
hashbrown.workspace = true
hyper.workspace = true

View File

@ -19,7 +19,8 @@ use influxdb3_types::http::{
use influxdb3_wal::PluginType;
use influxdb3_wal::{
CatalogBatch, CatalogOp, DeleteTriggerDefinition, SnapshotDetails, TriggerDefinition,
TriggerIdentifier, TriggerSpecificationDefinition, Wal, WalContents, WalFileNotifier, WalOp,
TriggerFlag, TriggerIdentifier, TriggerSpecificationDefinition, Wal, WalContents,
WalFileNotifier, WalOp,
};
use influxdb3_write::WriteBuffer;
use iox_time::TimeProvider;
@ -345,6 +346,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
db_name: &str,
trigger_name: String,
plugin_filename: String,
flags: Vec<TriggerFlag>,
trigger_specification: TriggerSpecificationDefinition,
trigger_arguments: Option<HashMap<String, String>>,
disabled: bool,
@ -360,6 +362,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
trigger_name,
plugin_filename,
trigger: trigger_specification,
flags,
trigger_arguments,
disabled,
database_name: db_name.to_string(),
@ -449,7 +452,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
query_executor,
sys_event_store: Arc::clone(&self.sys_event_store),
};
let plugin_code = self.read_plugin_code(&trigger.plugin_filename).await?;
let plugin_code = Arc::new(self.read_plugin_code(&trigger.plugin_filename).await?);
match trigger.trigger.plugin_type() {
PluginType::WalRows => {
let rec = self
@ -642,13 +645,15 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
let code = self.read_plugin_code(&request.filename).await?;
let code_string = code.code().to_string();
let res =
let res = tokio::task::spawn_blocking(move || {
plugins::run_test_wal_plugin(now, catalog, query_executor, code_string, request)
.unwrap_or_else(|e| WalPluginTestResponse {
log_lines: vec![],
database_writes: Default::default(),
errors: vec![e.to_string()],
});
})
})
.await?;
return Ok(res);
}
@ -674,13 +679,16 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
let code = self.read_plugin_code(&request.filename).await?;
let code_string = code.code().to_string();
let res = plugins::run_test_schedule_plugin(
now,
catalog,
query_executor,
code_string,
request,
)
let res = tokio::task::spawn_blocking(move || {
plugins::run_test_schedule_plugin(
now,
catalog,
query_executor,
code_string,
request,
)
})
.await?
.unwrap_or_else(|e| SchedulePluginTestResponse {
log_lines: vec![],
database_writes: Default::default(),
@ -850,6 +858,7 @@ mod tests {
"foo",
"test_trigger".to_string(),
file_name,
vec![],
TriggerSpecificationDefinition::AllTablesWalWrite,
None,
false,
@ -935,6 +944,7 @@ mod tests {
"foo",
"test_trigger".to_string(),
file_name,
vec![],
TriggerSpecificationDefinition::AllTablesWalWrite,
None,
true,
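
The test-run changes above share one pattern: the CPU-bound Python execution moves off the async runtime via tokio::task::spawn_blocking, and the returned JoinHandle is awaited, with a JoinError conversion on the error type absorbing the ?. A self-contained sketch of that shape, with a placeholder standing in for plugins::run_test_wal_plugin:

    // Placeholder for the blocking plugin-test call.
    fn run_blocking_plugin_test(input: u64) -> Result<u64, String> {
        Ok(input * 2)
    }

    #[tokio::main]
    async fn main() -> Result<(), tokio::task::JoinError> {
        let res = tokio::task::spawn_blocking(move || {
            // unwrap_or_else folds plugin errors into the response, as the diff does.
            run_blocking_plugin_test(21).unwrap_or_else(|e| {
                eprintln!("plugin test failed: {e}");
                0
            })
        })
        .await?; // a panic in the blocking task surfaces here as a JoinError
        assert_eq!(res, 42);
        Ok(())
    }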

View File

@ -7,7 +7,7 @@ use influxdb3_types::http::{
SchedulePluginTestRequest, SchedulePluginTestResponse, WalPluginTestRequest,
WalPluginTestResponse,
};
use influxdb3_wal::TriggerSpecificationDefinition;
use influxdb3_wal::{TriggerFlag, TriggerSpecificationDefinition};
use influxdb3_write::WriteBuffer;
use std::fmt::Debug;
use std::sync::Arc;
@ -57,11 +57,13 @@ pub enum ProcessingEngineError {
///
#[async_trait::async_trait]
pub trait ProcessingEngineManager: Debug + Send + Sync + 'static {
#[allow(clippy::too_many_arguments)]
async fn insert_trigger(
&self,
db_name: &str,
trigger_name: String,
plugin_filename: String,
flags: Vec<TriggerFlag>,
trigger_specification: TriggerSpecificationDefinition,
trigger_arguments: Option<HashMap<String, String>>,
disabled: bool,

View File

@ -81,12 +81,15 @@ pub enum PluginError {
#[error("error reading file from Github: {0} {1}")]
FetchingFromGithub(reqwest::StatusCode, String),
#[error("Join error, please report: {0}")]
JoinError(#[from] tokio::task::JoinError),
}
#[cfg(feature = "system-py")]
pub(crate) fn run_wal_contents_plugin(
db_name: String,
plugin_code: PluginCode,
plugin_code: Arc<PluginCode>,
trigger_definition: TriggerDefinition,
context: PluginContext,
plugin_receiver: mpsc::Receiver<WalEvent>,
@ -117,7 +120,7 @@ pub struct ProcessingEngineEnvironmentManager {
#[cfg(feature = "system-py")]
pub(crate) fn run_schedule_plugin(
db_name: String,
plugin_code: PluginCode,
plugin_code: Arc<PluginCode>,
trigger_definition: TriggerDefinition,
time_provider: Arc<dyn TimeProvider>,
context: PluginContext,
@ -158,7 +161,7 @@ pub(crate) fn run_schedule_plugin(
#[cfg(feature = "system-py")]
pub(crate) fn run_request_plugin(
db_name: String,
plugin_code: PluginCode,
plugin_code: Arc<PluginCode>,
trigger_definition: TriggerDefinition,
context: PluginContext,
plugin_receiver: mpsc::Receiver<RequestEvent>,
@ -190,10 +193,10 @@ pub(crate) struct PluginContext {
}
#[cfg(feature = "system-py")]
#[derive(Debug)]
#[derive(Debug, Clone)]
struct TriggerPlugin {
trigger_definition: TriggerDefinition,
plugin_code: PluginCode,
plugin_code: Arc<PluginCode>,
db_name: String,
write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor>,
@ -207,6 +210,8 @@ mod python_plugin {
use chrono::{DateTime, Duration, Utc};
use cron::{OwnedScheduleIterator, Schedule as CronSchedule};
use data_types::NamespaceName;
use futures_util::stream::FuturesUnordered;
use futures_util::StreamExt;
use humantime::{format_duration, parse_duration};
use hyper::http::HeaderValue;
use hyper::{Body, Response, StatusCode};
@ -215,7 +220,7 @@ mod python_plugin {
execute_python_with_batch, execute_request_trigger, execute_schedule_trigger,
PluginReturnState, ProcessingEngineLogger,
};
use influxdb3_wal::{WalContents, WalOp};
use influxdb3_wal::{TriggerFlag, WalContents, WalOp};
use influxdb3_write::Precision;
use iox_time::Time;
use observability_deps::tracing::{info, warn};
@ -229,25 +234,36 @@ mod python_plugin {
mut receiver: Receiver<WalEvent>,
) -> Result<(), PluginError> {
info!(?self.trigger_definition.trigger_name, ?self.trigger_definition.database_name, ?self.trigger_definition.plugin_filename, "starting wal contents plugin");
let run_in_dedicated_task = self
.trigger_definition
.flags
.contains(&TriggerFlag::ExecuteAsynchronously);
let mut futures = FuturesUnordered::new();
loop {
let event = match receiver.recv().await {
Some(event) => event,
None => {
warn!(?self.trigger_definition, "trigger plugin receiver closed");
break;
}
};
match event {
WalEvent::WriteWalContents(wal_contents) => {
if let Err(e) = self.process_wal_contents(wal_contents).await {
error!(?self.trigger_definition, "error processing wal contents: {}", e);
tokio::select! {
event = receiver.recv() => {
match event {
Some(WalEvent::WriteWalContents(wal_contents)) => {
if run_in_dedicated_task {
let clone = self.clone();
futures.push(tokio::task::spawn(async move {
clone.process_wal_contents(wal_contents).await
}));
} else if let Err(e) = self.process_wal_contents(wal_contents).await {
error!(?self.trigger_definition, "error processing wal contents: {}", e);
}
}
Some(WalEvent::Shutdown(sender)) => {
sender.send(()).map_err(|_| PluginError::FailedToShutdown)?;
break;
}
None => {break;}
}
}
WalEvent::Shutdown(sender) => {
sender.send(()).map_err(|_| PluginError::FailedToShutdown)?;
break;
Some(result) = futures.next() => {
if let Err(e) = result {
error!(?self.trigger_definition, "error processing wal contents: {}", e);
}
}
}
}
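
The loop above is the heart of the change: with ExecuteAsynchronously set, each WAL event is handed to a spawned task whose JoinHandle goes into a FuturesUnordered, and tokio::select! drains completions alongside new events. A stripped-down, runnable sketch of the same shape, with an integer channel standing in for WalEvent:

    use futures_util::stream::FuturesUnordered;
    use futures_util::StreamExt;
    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel::<u32>(8);
        tokio::spawn(async move {
            for i in 0..4 {
                tx.send(i).await.unwrap();
            }
        });

        let run_in_dedicated_task = true; // stands in for the ExecuteAsynchronously flag
        let mut futures = FuturesUnordered::new();
        loop {
            tokio::select! {
                event = rx.recv() => {
                    match event {
                        Some(n) if run_in_dedicated_task => {
                            // keep the JoinHandle so panics are still observed below
                            futures.push(tokio::task::spawn(async move { n * 2 }));
                        }
                        Some(n) => println!("processed inline: {}", n * 2),
                        // channel closed; like the real loop, in-flight tasks are
                        // dropped on break rather than drained
                        None => break,
                    }
                }
                Some(result) = futures.next() => {
                    match result {
                        Ok(v) => println!("task finished: {v}"),
                        Err(e) => eprintln!("task panicked: {e}"),
                    }
                }
            }
        }
    }
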
@ -261,6 +277,11 @@ mod python_plugin {
mut runner: ScheduleTriggerRunner,
time_provider: Arc<dyn TimeProvider>,
) -> Result<(), PluginError> {
let run_in_dedicated_task = self
.trigger_definition
.flags
.contains(&TriggerFlag::ExecuteAsynchronously);
let mut futures = FuturesUnordered::new();
loop {
let Some(next_run_instant) = runner.next_run_time() else {
break;
@ -271,7 +292,21 @@ mod python_plugin {
let Some(schema) = self.write_buffer.catalog().db_schema(self.db_name.as_str()) else {
return Err(PluginError::MissingDb);
};
runner.run_at_time(self, schema).await?;
let Some(trigger_time) = runner.next_trigger_time else {
return Err(anyhow!("running a cron trigger that is finished.").into());
};
runner.advance_time();
if run_in_dedicated_task {
    let trigger = self.clone();
    // run_at_time is async (the blocking Python call is offloaded inside it),
    // so the dedicated task is a plain spawn; spawn_blocking with a closure
    // that merely returns the future would never poll it.
    futures.push(tokio::task::spawn(async move {
        ScheduleTriggerRunner::run_at_time(trigger, trigger_time, schema).await
    }));
} else {
    ScheduleTriggerRunner::run_at_time(self.clone(), trigger_time, schema).await?;
}
}
event = receiver.recv() => {
match event {
@ -285,6 +320,11 @@ mod python_plugin {
}
}
}
Some(result) = futures.next() => {
if let Err(e) = result {
error!(?self.trigger_definition, "error running async scheduled plugin: {}", e);
}
}
}
}
@ -310,19 +350,27 @@ mod python_plugin {
error!(?self.trigger_definition, "missing db schema");
return Err(PluginError::MissingDb);
};
let result = execute_request_trigger(
self.plugin_code.code().as_ref(),
Arc::clone(&schema),
Arc::clone(&self.query_executor),
Some(ProcessingEngineLogger::new(
Arc::clone(&self.sys_event_store),
self.trigger_definition.trigger_name.as_str(),
)),
&self.trigger_definition.trigger_arguments,
request.query_params,
request.headers,
request.body,
);
let plugin_code = self.plugin_code.code();
let query_executor = Arc::clone(&self.query_executor);
let logger = Some(ProcessingEngineLogger::new(
Arc::clone(&self.sys_event_store),
self.trigger_definition.trigger_name.as_str(),
));
let trigger_arguments = self.trigger_definition.trigger_arguments.clone();
let result = tokio::task::spawn_blocking(move || {
execute_request_trigger(
plugin_code.as_ref(),
schema,
query_executor,
logger,
&trigger_arguments,
request.query_params,
request.headers,
request.body,
)
})
.await?;
// produce the HTTP response
let response = match result {
@ -387,7 +435,7 @@ mod python_plugin {
return Err(PluginError::MissingDb);
};
for wal_op in &wal_contents.ops {
for (op_index, wal_op) in wal_contents.ops.iter().enumerate() {
match wal_op {
WalOp::Write(write_batch) => {
// determine if this write batch is for this database
@ -397,46 +445,60 @@ mod python_plugin {
continue;
}
let table_filter = match &self.trigger_definition.trigger {
TriggerSpecificationDefinition::AllTablesWalWrite => {
// no filter
None
}
TriggerSpecificationDefinition::SingleTableWalWrite {
table_name,
} => {
let table_id = schema
.table_name_to_id(table_name.as_ref())
.context("table not found")?;
Some(table_id)
}
// This should not occur
TriggerSpecificationDefinition::Schedule {
schedule
} => {
return Err(anyhow!("unexpectedly found scheduled trigger specification cron:{} for WAL plugin {}", schedule, self.trigger_definition.trigger_name).into())
}
TriggerSpecificationDefinition::Every {
duration,
} => {
return Err(anyhow!("unexpectedly found every trigger specification every:{} WAL plugin {}", format_duration(*duration), self.trigger_definition.trigger_name).into())
}
TriggerSpecificationDefinition::RequestPath { path } => {
return Err(anyhow!("unexpectedly found request path trigger specification {} for WAL plugin {}", path, self.trigger_definition.trigger_name).into())
}
};
TriggerSpecificationDefinition::AllTablesWalWrite => {
// no filter
None
}
TriggerSpecificationDefinition::SingleTableWalWrite {
table_name,
} => {
let table_id = schema
.table_name_to_id(table_name.as_ref())
.context("table not found")?;
Some(table_id)
}
// This should not occur
TriggerSpecificationDefinition::Schedule {
schedule
} => {
return Err(anyhow!("unexpectedly found scheduled trigger specification cron:{} for WAL plugin {}", schedule, self.trigger_definition.trigger_name).into())
}
TriggerSpecificationDefinition::Every {
duration,
} => {
return Err(anyhow!("unexpectedly found every trigger specification every:{} WAL plugin {}", format_duration(*duration), self.trigger_definition.trigger_name).into())
}
TriggerSpecificationDefinition::RequestPath { path } => {
return Err(anyhow!("unexpectedly found request path trigger specification {} for WAL plugin {}", path, self.trigger_definition.trigger_name).into())
}
};
let result = execute_python_with_batch(
self.plugin_code.code().as_ref(),
write_batch,
Arc::clone(&schema),
Arc::clone(&self.query_executor),
Some(ProcessingEngineLogger::new(
Arc::clone(&self.sys_event_store),
self.trigger_definition.trigger_name.as_str(),
)),
table_filter,
&self.trigger_definition.trigger_arguments,
)?;
let logger = Some(ProcessingEngineLogger::new(
Arc::clone(&self.sys_event_store),
self.trigger_definition.trigger_name.as_str(),
));
let plugin_code = self.plugin_code.code();
let query_executor = Arc::clone(&self.query_executor);
let schema_clone = Arc::clone(&schema);
let trigger_arguments = self.trigger_definition.trigger_arguments.clone();
let wal_contents_clone = Arc::clone(&wal_contents);
let result = tokio::task::spawn_blocking(move || {
let write_batch = match &wal_contents_clone.ops[op_index] {
WalOp::Write(wb) => wb,
_ => unreachable!("Index was checked."),
};
execute_python_with_batch(
plugin_code.as_ref(),
write_batch,
schema_clone,
query_executor,
logger,
table_filter,
&trigger_arguments,
)
})
.await??;
let errors = self.handle_return_state(result).await;
// TODO: here is one spot we'll pick up errors to put into the plugin system table
@ -572,34 +634,34 @@ mod python_plugin {
}
async fn run_at_time(
&mut self,
plugin: &TriggerPlugin,
plugin: TriggerPlugin,
trigger_time: DateTime<Utc>,
db_schema: Arc<DatabaseSchema>,
) -> Result<(), PluginError> {
let Some(trigger_time) = self.next_trigger_time else {
return Err(anyhow!("running a cron trigger that is finished.").into());
};
let result = execute_schedule_trigger(
plugin.plugin_code.code().as_ref(),
trigger_time,
Arc::clone(&db_schema),
Arc::clone(&plugin.query_executor),
Some(ProcessingEngineLogger::new(
Arc::clone(&plugin.sys_event_store),
plugin.trigger_definition.trigger_name.as_str(),
)),
&plugin.trigger_definition.trigger_arguments,
)?;
let plugin_code = plugin.plugin_code.code();
let query_executor = Arc::clone(&plugin.query_executor);
let logger = Some(ProcessingEngineLogger::new(
Arc::clone(&plugin.sys_event_store),
plugin.trigger_definition.trigger_name.as_str(),
));
let trigger_arguments = plugin.trigger_definition.trigger_arguments.clone();
let result = tokio::task::spawn_blocking(move || {
execute_schedule_trigger(
plugin_code.as_ref(),
trigger_time,
db_schema,
query_executor,
logger,
&trigger_arguments,
)
})
.await??;
let errors = plugin.handle_return_state(result).await;
// TODO: here is one spot we'll pick up errors to put into the plugin system table
for error in errors {
error!(?plugin.trigger_definition, "error running schedule plugin: {}", error);
}
self.advance_time();
Ok(())
}
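
One detail worth calling out from run_at_time and process_wal_contents: spawn_blocking wraps the closure's return value in Result<T, JoinError>, so when the closure itself returns a Result, the call site needs the double ?? — the first for the join, the second for the plugin's own error. In miniature, with a local stand-in for PluginError:

    #[derive(Debug)]
    struct PluginError(String);

    impl From<tokio::task::JoinError> for PluginError {
        fn from(e: tokio::task::JoinError) -> Self {
            PluginError(format!("join error: {e}"))
        }
    }

    async fn run() -> Result<u64, PluginError> {
        let result = tokio::task::spawn_blocking(|| -> Result<u64, PluginError> {
            Ok(7) // stands in for execute_schedule_trigger(...)
        })
        .await??; // first ?: JoinError -> PluginError; second ?: the plugin's error
        Ok(result)
    }

    #[tokio::main]
    async fn main() {
        assert_eq!(run().await.unwrap(), 7);
    }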

View File

@ -665,7 +665,7 @@ pub fn execute_schedule_trigger(
logger.log(
LogLevel::Info,
format!(
"finished execution in {:?}",
"finished execution in {}",
format_duration(runtime.unwrap_or_default())
),
);
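
The one-line change above swaps Debug formatting for Display: humantime::format_duration returns a wrapper whose Display impl prints a human-readable duration, while {:?} prints the wrapper's Debug form. A quick illustration, assuming the humantime crate:

    use std::time::Duration;

    fn main() {
        let d = humantime::format_duration(Duration::from_secs(90));
        println!("finished execution in {}", d);   // finished execution in 1m 30s
        println!("finished execution in {:?}", d); // Debug form of the wrapper
    }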

View File

@ -1002,6 +1002,7 @@ where
db,
plugin_filename,
trigger_name,
flags,
trigger_specification,
trigger_arguments,
disabled,
@ -1025,6 +1026,7 @@ where
db.as_str(),
trigger_name.clone(),
plugin_filename,
flags,
trigger_spec,
trigger_arguments,
disabled,

View File

@ -12,6 +12,7 @@ iox_query_params.workspace = true
# Local deps
influxdb3_cache = { path = "../influxdb3_cache" }
influxdb3_wal = { path = "../influxdb3_wal" }
# crates.io dependencies
serde.workspace = true

View File

@ -1,13 +1,13 @@
use crate::write::Precision;
use hashbrown::HashMap;
use hyper::header::ACCEPT;
use hyper::http::HeaderValue;
use hyper::HeaderMap;
use influxdb3_cache::distinct_cache::MaxCardinality;
use influxdb3_wal::TriggerFlag;
use iox_query_params::StatementParams;
use serde::{Deserialize, Serialize};
use crate::write::Precision;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("invalid mime type ({0})")]
@ -158,6 +158,7 @@ pub struct ProcessingEngineTriggerCreateRequest {
pub db: String,
pub plugin_filename: String,
pub trigger_name: String,
pub flags: Vec<TriggerFlag>,
pub trigger_specification: String,
pub trigger_arguments: Option<HashMap<String, String>>,
pub disabled: bool,

View File

@ -642,10 +642,17 @@ pub struct TriggerDefinition {
pub plugin_filename: String,
pub database_name: String,
pub trigger: TriggerSpecificationDefinition,
pub flags: Vec<TriggerFlag>,
pub trigger_arguments: Option<HashMap<String, String>>,
pub disabled: bool,
}
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Copy)]
#[serde(rename_all = "snake_case")]
pub enum TriggerFlag {
ExecuteAsynchronously,
}
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct DeleteTriggerDefinition {
pub trigger_name: String,
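
Since TriggerDefinition is persisted through both the catalog snapshot and the WAL, the flag's wire format matters: with rename_all = "snake_case", the variant round-trips as "execute_asynchronously". A minimal check, assuming serde_json:

    use serde::{Deserialize, Serialize};

    #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Copy)]
    #[serde(rename_all = "snake_case")]
    pub enum TriggerFlag {
        ExecuteAsynchronously,
    }

    fn main() {
        let flags = vec![TriggerFlag::ExecuteAsynchronously];
        let json = serde_json::to_string(&flags).unwrap();
        assert_eq!(json, r#"["execute_asynchronously"]"#);
        let back: Vec<TriggerFlag> = serde_json::from_str(&json).unwrap();
        assert_eq!(back, flags);
    }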