feat(processing_engine): Add REST API endpoints for activating and deactivating triggers. (#25711)

Jackson Newhouse 2025-01-02 09:23:18 -08:00 committed by GitHub
parent de227b95d9
commit 29dacc318a
9 changed files with 814 additions and 63 deletions
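
The commit wires up three new HTTP surfaces: POST /api/v3/configure/processing_engine_trigger/activate, POST /api/v3/configure/processing_engine_trigger/deactivate, and DELETE /api/v3/configure/processing_engine_plugin. A minimal client sketch of how they might be called, assuming the reqwest crate, a server on localhost:8181, and example db/trigger/plugin names (none of this is part of the commit itself):

    // Hypothetical client; the endpoint paths and query parameters match the
    // routes added below, everything else is assumed for illustration.
    use reqwest::Client;

    #[tokio::main]
    async fn main() -> Result<(), reqwest::Error> {
        let client = Client::new();
        let base = "http://localhost:8181";

        // Deactivate a running trigger, identified by query parameters.
        let resp = client
            .post(format!("{base}/api/v3/configure/processing_engine_trigger/deactivate"))
            .query(&[("db", "foo"), ("trigger_name", "my_trigger")])
            .send()
            .await?;
        assert!(resp.status().is_success());

        // Activate it again; the server restarts the trigger in the background.
        let resp = client
            .post(format!("{base}/api/v3/configure/processing_engine_trigger/activate"))
            .query(&[("db", "foo"), ("trigger_name", "my_trigger")])
            .send()
            .await?;
        assert!(resp.status().is_success());

        // Delete a plugin; the server rejects this while a trigger still uses it.
        let resp = client
            .delete(format!("{base}/api/v3/configure/processing_engine_plugin"))
            .query(&[("db", "foo"), ("plugin_name", "my_plugin")])
            .send()
            .await?;
        assert!(resp.status().is_success());
        Ok(())
    }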

View File

@ -9,9 +9,10 @@ use hashbrown::HashMap;
use indexmap::IndexMap;
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb3_wal::{
CatalogBatch, CatalogOp, DeleteDatabaseDefinition, DeletePluginDefinition,
DeleteTableDefinition, FieldAdditions, FieldDefinition, LastCacheDefinition, LastCacheDelete,
MetaCacheDefinition, MetaCacheDelete, OrderedCatalogBatch, PluginDefinition, TriggerDefinition,
TriggerIdentifier,
};
use influxdb_line_protocol::FieldValue;
use iox_time::Time;
@ -99,6 +100,18 @@ pub enum Error {
trigger_name: String,
},
#[error(
"Cannot delete plugin {} in database {} because it is used by trigger {}",
plugin_name,
database_name,
trigger_name
)]
ProcessingEnginePluginInUse {
database_name: String,
plugin_name: String,
trigger_name: String,
},
#[error(
"Processing Engine Plugin {} not in DB schema for {}",
plugin_name,
@ -317,8 +330,14 @@ impl Catalog {
.flat_map(|schema| {
schema
.processing_engine_triggers
.iter()
.filter_map(move |(key, trigger)| {
if trigger.disabled {
None
} else {
Some((schema.name.to_string(), key.to_string()))
}
})
})
.collect();
result
@ -692,8 +711,15 @@ impl UpdateDatabaseSchema for CatalogOp {
}
CatalogOp::DeleteDatabase(delete_database) => delete_database.update_schema(schema),
CatalogOp::DeleteTable(delete_table) => delete_table.update_schema(schema),
CatalogOp::DeletePlugin(delete_plugin) => delete_plugin.update_schema(schema),
CatalogOp::CreatePlugin(create_plugin) => create_plugin.update_schema(schema),
CatalogOp::CreateTrigger(create_trigger) => create_trigger.update_schema(schema),
CatalogOp::EnableTrigger(trigger_identifier) => {
EnableTrigger(trigger_identifier.clone()).update_schema(schema)
}
CatalogOp::DisableTrigger(trigger_identifier) => {
DisableTrigger(trigger_identifier.clone()).update_schema(schema)
}
}
}
}
@ -760,6 +786,29 @@ impl UpdateDatabaseSchema for DeleteTableDefinition {
}
}
impl UpdateDatabaseSchema for DeletePluginDefinition {
fn update_schema<'a>(
&self,
mut schema: Cow<'a, DatabaseSchema>,
) -> Result<Cow<'a, DatabaseSchema>> {
// Check that no trigger still references this plugin.
for (trigger_name, trigger) in &schema.processing_engine_triggers {
if trigger.plugin_name == self.plugin_name {
return Err(Error::ProcessingEnginePluginInUse {
database_name: schema.name.to_string(),
plugin_name: self.plugin_name.to_string(),
trigger_name: trigger_name.to_string(),
});
}
}
schema
.to_mut()
.processing_engine_plugins
.remove(&self.plugin_name);
Ok(schema)
}
}
impl UpdateDatabaseSchema for PluginDefinition {
fn update_schema<'a>(
&self,
@ -785,6 +834,57 @@ impl UpdateDatabaseSchema for PluginDefinition {
}
}
struct EnableTrigger(TriggerIdentifier);
struct DisableTrigger(TriggerIdentifier);
impl UpdateDatabaseSchema for EnableTrigger {
fn update_schema<'a>(
&self,
mut schema: Cow<'a, DatabaseSchema>,
) -> Result<Cow<'a, DatabaseSchema>> {
let Some(trigger) = schema.processing_engine_triggers.get(&self.0.trigger_name) else {
return Err(Error::ProcessingEngineTriggerNotFound {
database_name: self.0.db_name.to_string(),
trigger_name: self.0.trigger_name.to_string(),
});
};
if !trigger.disabled {
return Ok(schema);
}
let mut_trigger = schema
.to_mut()
.processing_engine_triggers
.get_mut(&self.0.trigger_name)
.expect("already checked containment");
mut_trigger.disabled = false;
Ok(schema)
}
}
impl UpdateDatabaseSchema for DisableTrigger {
fn update_schema<'a>(
&self,
mut schema: Cow<'a, DatabaseSchema>,
) -> Result<Cow<'a, DatabaseSchema>> {
let Some(trigger) = schema.processing_engine_triggers.get(&self.0.trigger_name) else {
return Err(Error::ProcessingEngineTriggerNotFound {
database_name: self.0.db_name.to_string(),
trigger_name: self.0.trigger_name.to_string(),
});
};
if trigger.disabled {
return Ok(schema);
}
let mut_trigger = schema
.to_mut()
.processing_engine_triggers
.get_mut(&self.0.trigger_name)
.expect("already checked containment");
mut_trigger.disabled = true;
Ok(schema)
}
}
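Note the copy-on-write convention in the two impls above: the trigger is looked up through the immutable borrow first, a no-op returns the Cow untouched, and Cow::to_mut (which clones the schema) runs only when the flag actually flips. A standalone sketch of the same pattern, using a simplified stand-in for DatabaseSchema:

    use std::borrow::Cow;
    use std::collections::HashMap;

    #[derive(Clone)]
    struct Schema {
        // trigger name -> disabled flag; stand-in for processing_engine_triggers
        triggers: HashMap<String, bool>,
    }

    fn set_disabled<'a>(
        mut schema: Cow<'a, Schema>,
        name: &str,
        disabled: bool,
    ) -> Result<Cow<'a, Schema>, String> {
        let Some(&current) = schema.triggers.get(name) else {
            return Err(format!("trigger '{name}' not found"));
        };
        if current == disabled {
            return Ok(schema); // no-op: a borrowed schema passes through, no clone
        }
        *schema
            .to_mut() // first mutation clones the borrowed schema into Cow::Owned
            .triggers
            .get_mut(name)
            .expect("already checked containment") = disabled;
        Ok(schema)
    }

Because a redundant enable or disable leaves the Cow borrowed, apply_catalog_batch can recognize no-op batches and skip writing them to the WAL, which is what the write-buffer changes below rely on.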
impl UpdateDatabaseSchema for TriggerDefinition {
fn update_schema<'a>(
&self,

View File

@ -105,6 +105,7 @@ impl From<DatabaseSnapshot> for DatabaseSchema {
plugin_name: plugin.plugin_name.to_string(),
plugin,
trigger: serde_json::from_str(&trigger.trigger_specification).unwrap(),
disabled: trigger.disabled,
},
)
})
@ -171,6 +172,7 @@ struct ProcessingEngineTriggerSnapshot {
pub trigger_name: String,
pub plugin_name: String,
pub trigger_specification: String,
pub disabled: bool,
}
/// Representation of Arrow's `DataType` for table snapshots.
@ -434,6 +436,7 @@ impl From<&TriggerDefinition> for ProcessingEngineTriggerSnapshot {
plugin_name: trigger.plugin_name.to_string(),
trigger_specification: serde_json::to_string(&trigger.trigger)
.expect("should be able to serialize trigger specification"),
disabled: trigger.disabled,
}
}
}

View File

@ -973,6 +973,19 @@ where
.body(Body::empty())?)
}
async fn delete_processing_engine_plugin(&self, req: Request<Body>) -> Result<Response<Body>> {
let ProcessingEnginePluginDeleteRequest { db, plugin_name } =
if let Some(query) = req.uri().query() {
serde_urlencoded::from_str(query)?
} else {
self.read_body_json(req).await?
};
self.write_buffer.delete_plugin(&db, &plugin_name).await?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::empty())?)
}
async fn configure_processing_engine_trigger(
&self,
req: Request<Body>,
@ -982,6 +995,7 @@ where
plugin_name,
trigger_name,
trigger_specification,
disabled,
} = if let Some(query) = req.uri().query() {
serde_urlencoded::from_str(query)?
} else {
@ -993,13 +1007,47 @@ where
trigger_name.clone(),
plugin_name,
trigger_specification,
disabled,
)
.await?;
if !disabled {
self.write_buffer
.run_trigger(
Arc::clone(&self.write_buffer),
db.as_str(),
trigger_name.as_str(),
)
.await?;
}
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::empty())?)
}
async fn deactivate_processing_engine_trigger(
&self,
req: Request<Body>,
) -> Result<Response<Body>> {
let query = req.uri().query().unwrap_or("");
let delete_req = serde_urlencoded::from_str::<ProcessingEngineTriggerIdentifier>(query)?;
self.write_buffer
.deactivate_trigger(delete_req.db.as_str(), delete_req.trigger_name.as_str())
.await?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::empty())?)
}
async fn activate_processing_engine_trigger(
&self,
req: Request<Body>,
) -> Result<Response<Body>> {
let query = req.uri().query().unwrap_or("");
let delete_req = serde_urlencoded::from_str::<ProcessingEngineTriggerIdentifier>(query)?;
self.write_buffer
.activate_trigger(
Arc::clone(&self.write_buffer),
delete_req.db.as_str(),
delete_req.trigger_name.as_str(),
)
.await?;
Ok(Response::builder()
@ -1441,6 +1489,12 @@ struct ProcessingEnginePluginCreateRequest {
plugin_type: PluginType,
}
#[derive(Debug, Deserialize)]
struct ProcessingEnginePluginDeleteRequest {
db: String,
plugin_name: String,
}
/// Request definition for `POST /api/v3/configure/processing_engine_trigger` API
#[derive(Debug, Deserialize)]
struct ProcessEngineTriggerCreateRequest {
@ -1448,6 +1502,13 @@ struct ProcessEngineTriggerCreateRequest {
plugin_name: String,
trigger_name: String,
trigger_specification: TriggerSpecificationDefinition,
disabled: bool,
}
#[derive(Debug, Deserialize)]
struct ProcessingEngineTriggerIdentifier {
db: String,
trigger_name: String,
}
#[derive(Debug, Deserialize)]
@ -1563,6 +1624,15 @@ pub(crate) async fn route_request<T: TimeProvider>(
(Method::POST, "/api/v3/configure/processing_engine_plugin") => {
http_server.configure_processing_engine_plugin(req).await
}
(Method::DELETE, "/api/v3/configure/processing_engine_plugin") => {
http_server.delete_processing_engine_plugin(req).await
}
(Method::POST, "/api/v3/configure/processing_engine_trigger/deactivate") => {
http_server.deactivate_processing_engine_trigger(req).await
}
(Method::POST, "/api/v3/configure/processing_engine_trigger/activate") => {
http_server.activate_processing_engine_trigger(req).await
}
(Method::POST, "/api/v3/configure/processing_engine_trigger") => {
http_server.configure_processing_engine_trigger(req).await
}

View File

@ -1,4 +1,4 @@
use arrow_array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::common::Result;
@ -94,6 +94,7 @@ fn trigger_schema() -> SchemaRef {
Field::new("trigger_name", DataType::Utf8, false),
Field::new("plugin_name", DataType::Utf8, false),
Field::new("trigger_specification", DataType::Utf8, false),
Field::new("disabled", DataType::Boolean, false),
];
Schema::new(columns).into()
}
@ -124,10 +125,16 @@ impl IoxSystemTable for ProcessingEngineTriggerTable {
.iter()
.map(|trigger| serde_json::to_string(&trigger.trigger).ok())
.collect::<StringArray>();
let disabled = self
.triggers
.iter()
.map(|trigger| Some(trigger.disabled))
.collect::<BooleanArray>();
let columns: Vec<ArrayRef> = vec![
Arc::new(trigger_column),
Arc::new(plugin_column),
Arc::new(specification_column),
Arc::new(disabled),
];
Ok(RecordBatch::try_new(Arc::clone(&self.schema), columns)?)
}
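For illustration, the disabled column added above is populated the same way as the string columns: collecting Option<bool> values into a BooleanArray. A self-contained sketch with made-up sample data:

    use std::sync::Arc;

    use arrow_array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    fn main() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("trigger_name", DataType::Utf8, false),
            Field::new("disabled", DataType::Boolean, false),
        ]));
        // Collecting Some(..) values yields columns without nulls, matching the
        // non-nullable fields declared above.
        let names: StringArray = ["t1", "t2"].iter().map(|s| Some(*s)).collect();
        let disabled: BooleanArray = [false, true].iter().map(|b| Some(*b)).collect();
        let columns: Vec<ArrayRef> = vec![Arc::new(names), Arc::new(disabled)];
        let batch = RecordBatch::try_new(schema, columns).unwrap();
        assert_eq!(batch.num_rows(), 2);
    }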

View File

@ -106,7 +106,7 @@ pub trait Wal: Debug + Send + Sync + 'static {
#[async_trait]
pub trait WalFileNotifier: Debug + Send + Sync + 'static {
/// Notify the handler that a new WAL file has been persisted with the given contents.
async fn notify(&self, write: WalContents);
/// Notify the handler that a new WAL file has been persisted with the given contents and tell
/// it to snapshot the data. The returned receiver will be signalled when the snapshot is complete.
@ -301,7 +301,10 @@ pub enum CatalogOp {
DeleteDatabase(DeleteDatabaseDefinition),
DeleteTable(DeleteTableDefinition),
CreatePlugin(PluginDefinition),
DeletePlugin(DeletePluginDefinition),
CreateTrigger(TriggerDefinition),
EnableTrigger(TriggerIdentifier),
DisableTrigger(TriggerIdentifier),
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
@ -587,6 +590,11 @@ pub struct PluginDefinition {
pub plugin_type: PluginType,
}
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct DeletePluginDefinition {
pub plugin_name: String,
}
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize, Copy)]
#[serde(rename_all = "snake_case")]
pub enum PluginType {
@ -600,6 +608,13 @@ pub struct TriggerDefinition {
pub trigger: TriggerSpecificationDefinition,
// TODO: decide whether this should be populated from a reference rather than stored on its own.
pub plugin: PluginDefinition,
pub disabled: bool,
}
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct TriggerIdentifier {
pub db_name: String,
pub trigger_name: String,
}
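Both new ops carry only a TriggerIdentifier, so they persist to the WAL like any other catalog op. A quick serde round-trip sketch, with simplified stand-ins for the types above rather than the crate's actual definitions:

    use serde::{Deserialize, Serialize};

    #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
    struct TriggerIdentifier {
        db_name: String,
        trigger_name: String,
    }

    #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
    enum CatalogOp {
        EnableTrigger(TriggerIdentifier),
        DisableTrigger(TriggerIdentifier),
    }

    fn main() {
        let op = CatalogOp::DisableTrigger(TriggerIdentifier {
            db_name: "foo".into(),
            trigger_name: "my_trigger".into(),
        });
        // WAL replay depends on ops surviving a serialize/deserialize cycle.
        let json = serde_json::to_string(&op).unwrap();
        let back: CatalogOp = serde_json::from_str(&json).unwrap();
        assert_eq!(op, back);
    }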
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]

View File

@ -134,7 +134,7 @@ impl WalObjectStore {
match wal_contents.snapshot {
None => self.file_notifier.notify(wal_contents).await,
Some(snapshot_details) => {
let snapshot_info = {
let mut buffer = self.flush_buffer.lock().await;
@ -151,7 +151,7 @@ impl WalObjectStore {
if snapshot_details.snapshot_sequence_number <= last_snapshot_sequence_number {
// Instead just notify about the WAL, as this snapshot has already been taken
// and WAL files may have been cleared.
self.file_notifier.notify(wal_contents).await;
} else {
let snapshot_done = self
.file_notifier
@ -297,7 +297,7 @@ impl WalObjectStore {
"notify sent to buffer for wal file {}",
wal_contents.wal_file_number.as_u64()
);
self.file_notifier.notify(wal_contents).await;
None
}
};
@ -1100,7 +1100,7 @@ mod tests {
#[async_trait]
impl WalFileNotifier for TestNotifier {
async fn notify(&self, write: WalContents) {
self.notified_writes.lock().push(write);
}

View File

@ -30,10 +30,9 @@ use influxdb3_cache::last_cache::{self, LastCacheProvider};
use influxdb3_cache::meta_cache::{self, CreateMetaCacheArgs, MetaCacheProvider};
use influxdb3_cache::parquet_cache::ParquetCacheOracle;
use influxdb3_catalog::catalog;
use influxdb3_catalog::catalog::Error::ProcessingEngineTriggerNotFound;
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
use influxdb3_id::{ColumnId, DbId, TableId};
use influxdb3_wal::{
object_store::WalObjectStore, DeleteDatabaseDefinition, PluginDefinition, PluginType,
TriggerDefinition, TriggerSpecificationDefinition, WalContents,
@ -44,6 +43,8 @@ use influxdb3_wal::{
};
use influxdb3_wal::{CatalogOp::CreateLastCache, DeleteTableDefinition};
use influxdb3_wal::{DatabaseDefinition, FieldDefinition};
use influxdb3_wal::{DeletePluginDefinition, TableDefinition};
use influxdb3_wal::{FieldDataType, TriggerIdentifier};
use iox_query::chunk_statistics::{create_chunk_statistics, NoColumnRanges};
use iox_query::QueryChunk;
use iox_time::{Time, TimeProvider};
@ -59,13 +60,11 @@ use schema::Schema;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::oneshot;
use tokio::sync::watch::Receiver;
#[cfg(feature = "system-py")]
use crate::write_buffer::plugins::PluginContext;
#[derive(Debug, Error)]
pub enum Error {
@ -848,12 +847,38 @@ impl ProcessingEngineManager for WriteBufferImpl {
Ok(())
}
async fn delete_plugin(&self, db: &str, plugin_name: &str) -> crate::Result<(), Error> {
let (db_id, db_schema) =
self.catalog
.db_id_and_schema(db)
.ok_or_else(|| Error::DatabaseNotFound {
db_name: db.to_string(),
})?;
let catalog_op = CatalogOp::DeletePlugin(DeletePluginDefinition {
plugin_name: plugin_name.to_string(),
});
let catalog_batch = CatalogBatch {
time_ns: self.time_provider.now().timestamp_nanos(),
database_id: db_id,
database_name: Arc::clone(&db_schema.name),
ops: vec![catalog_op],
};
if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? {
self.wal
.write_ops(vec![WalOp::Catalog(catalog_batch)])
.await?;
}
Ok(())
}
async fn insert_trigger(
&self,
db_name: &str,
trigger_name: String,
plugin_name: String,
trigger_specification: TriggerSpecificationDefinition,
disabled: bool,
) -> crate::Result<(), Error> {
let Some((db_id, db_schema)) = self.catalog.db_id_and_schema(db_name) else {
return Err(Error::DatabaseNotFound {
@ -872,6 +897,7 @@ impl ProcessingEngineManager for WriteBufferImpl {
plugin_name,
plugin: plugin.clone(),
trigger: trigger_specification,
disabled,
});
let creation_time = self.time_provider.now();
let catalog_batch = CatalogBatch {
@ -910,7 +936,10 @@ impl ProcessingEngineManager for WriteBufferImpl {
trigger_name: trigger_name.to_string(),
})?
.clone();
let trigger_rx = self
.buffer
.subscribe_to_plugin_events(trigger_name.to_string())
.await;
let plugin_context = PluginContext {
trigger_rx,
write_buffer,
@ -920,12 +949,103 @@ impl ProcessingEngineManager for WriteBufferImpl {
Ok(())
}
async fn deactivate_trigger(
&self,
db_name: &str,
trigger_name: &str,
) -> std::result::Result<(), Error> {
let (db_id, db_schema) =
self.catalog
.db_id_and_schema(db_name)
.ok_or_else(|| Error::DatabaseNotFound {
db_name: db_name.to_string(),
})?;
let trigger = db_schema
.processing_engine_triggers
.get(trigger_name)
.ok_or_else(|| ProcessingEngineTriggerNotFound {
database_name: db_name.to_string(),
trigger_name: trigger_name.to_string(),
})?;
// Already disabled, so this is a no-op
if trigger.disabled {
return Ok(());
};
let mut deactivated = trigger.clone();
deactivated.disabled = true;
let catalog_op = CatalogOp::DisableTrigger(TriggerIdentifier {
db_name: db_name.to_string(),
trigger_name: trigger_name.to_string(),
});
if let Some(catalog_batch) = self.catalog.apply_catalog_batch(CatalogBatch {
database_id: db_id,
database_name: Arc::clone(&db_schema.name),
time_ns: self.time_provider.now().timestamp_nanos(),
ops: vec![catalog_op],
})? {
let wal_op = WalOp::Catalog(catalog_batch);
self.wal.write_ops(vec![wal_op]).await?;
}
// TODO: handle processing engine errors
self.buffer
.deactivate_trigger(trigger_name.to_string())
.await
.unwrap();
Ok(())
}
async fn activate_trigger(
&self,
write_buffer: Arc<dyn WriteBuffer>,
db_name: &str,
trigger_name: &str,
) -> std::result::Result<(), Error> {
let (db_id, db_schema) =
self.catalog
.db_id_and_schema(db_name)
.ok_or_else(|| Error::DatabaseNotFound {
db_name: db_name.to_string(),
})?;
let trigger = db_schema
.processing_engine_triggers
.get(trigger_name)
.ok_or_else(|| ProcessingEngineTriggerNotFound {
database_name: db_name.to_string(),
trigger_name: trigger_name.to_string(),
})?;
// Already enabled, so this is a no-op
if !trigger.disabled {
return Ok(());
};
let mut activated = trigger.clone();
activated.disabled = false;
let catalog_op = CatalogOp::EnableTrigger(TriggerIdentifier {
db_name: db_name.to_string(),
trigger_name: trigger_name.to_string(),
});
if let Some(catalog_batch) = self.catalog.apply_catalog_batch(CatalogBatch {
database_id: db_id,
database_name: Arc::clone(&db_schema.name),
time_ns: self.time_provider.now().timestamp_nanos(),
ops: vec![catalog_op],
})? {
let wal_op = WalOp::Catalog(catalog_batch);
self.wal.write_ops(vec![wal_op]).await?;
}
self.run_trigger(write_buffer, db_name, trigger_name)
.await?;
Ok(())
}
}
#[allow(unused)]
pub(crate) enum PluginEvent {
WriteWalContents(Arc<WalContents>),
Shutdown(oneshot::Sender<()>),
}
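The Shutdown variant carries a oneshot sender so deactivation can block until the plugin task has actually exited: the manager pushes the event down the trigger's mpsc channel, and the plugin acks on the oneshot before breaking out of its receive loop. A self-contained sketch of that handshake (the names are illustrative, not the crate's):

    use tokio::sync::{mpsc, oneshot};

    enum Event {
        Work(u64),
        Shutdown(oneshot::Sender<()>),
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel::<Event>(4);
        let worker = tokio::spawn(async move {
            while let Some(event) = rx.recv().await {
                match event {
                    Event::Work(n) => println!("processing {n}"),
                    Event::Shutdown(ack) => {
                        // Acknowledge, then exit the loop; this mirrors
                        // process_event returning Ok(true).
                        let _ = ack.send(());
                        break;
                    }
                }
            }
        });
        tx.send(Event::Work(1)).await.unwrap();

        let (ack_tx, ack_rx) = oneshot::channel();
        tx.send(Event::Shutdown(ack_tx)).await.unwrap();
        ack_rx.await.unwrap(); // resolves only after the worker acknowledged
        worker.await.unwrap();
    }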
impl WriteBuffer for WriteBufferImpl {}
@ -2681,4 +2801,391 @@ mod tests {
}
batches
}
#[tokio::test]
async fn test_create_plugin() -> Result<()> {
let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap();
let test_store = Arc::new(InMemory::new());
let wal_config = WalConfig {
gen1_duration: Gen1Duration::new_1m(),
max_write_buffer_size: 100,
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
};
let (write_buffer, _, _) =
setup_cache_optional(start_time, test_store, wal_config, false).await;
write_buffer
.write_lp(
NamespaceName::new("foo").unwrap(),
"cpu,warehouse=us-east,room=01a,device=10001 reading=37\n",
start_time,
false,
Precision::Nanosecond,
)
.await?;
let empty_udf = r#"def example(iterator, output):
    return"#;
write_buffer
.insert_plugin(
"foo",
"my_plugin".to_string(),
empty_udf.to_string(),
"example".to_string(),
PluginType::WalRows,
)
.await?;
let plugin = write_buffer
.catalog
.db_schema("foo")
.expect("should have db named foo")
.processing_engine_plugins
.get("my_plugin")
.unwrap()
.clone();
let expected = PluginDefinition {
plugin_name: "my_plugin".to_string(),
code: empty_udf.to_string(),
function_name: "example".to_string(),
plugin_type: PluginType::WalRows,
};
assert_eq!(expected, plugin);
// confirm that creating it again is a no-op.
write_buffer
.insert_plugin(
"foo",
"my_plugin".to_string(),
empty_udf.to_string(),
"example".to_string(),
PluginType::WalRows,
)
.await?;
// confirm that re-creating it with a different function name is an error
let Err(Error::CatalogUpdateError(catalog::Error::ProcessingEngineCallExists { .. })) =
write_buffer
.insert_plugin(
"foo",
"my_plugin".to_string(),
empty_udf.to_string(),
"bad_example".to_string(),
PluginType::WalRows,
)
.await
else {
panic!("expected ProcessingEngineCallExists error");
};
// Confirm the same contents can be added to a new name.
write_buffer
.insert_plugin(
"foo",
"my_second_plugin".to_string(),
empty_udf.to_string(),
"example".to_string(),
PluginType::WalRows,
)
.await?;
Ok(())
}
#[tokio::test]
async fn test_delete_plugin() -> Result<()> {
let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap();
let test_store = Arc::new(InMemory::new());
let wal_config = WalConfig {
gen1_duration: Gen1Duration::new_1m(),
max_write_buffer_size: 100,
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
};
let (write_buffer, _, _) =
setup_cache_optional(start_time, test_store, wal_config, false).await;
// Create the DB by inserting a line.
write_buffer
.write_lp(
NamespaceName::new("foo").unwrap(),
"cpu,warehouse=us-east,room=01a,device=10001 reading=37\n",
start_time,
false,
Precision::Nanosecond,
)
.await?;
// First create a plugin
write_buffer
.insert_plugin(
"foo",
"test_plugin".to_string(),
"def process(iterator, output): pass".to_string(),
"process".to_string(),
PluginType::WalRows,
)
.await?;
// Then delete it
write_buffer.delete_plugin("foo", "test_plugin").await?;
// Verify plugin is gone from schema
let schema = write_buffer.catalog().db_schema("foo").unwrap();
assert!(!schema.processing_engine_plugins.contains_key("test_plugin"));
// Verify we can add a newly named plugin
write_buffer
.insert_plugin(
"foo",
"test_plugin".to_string(),
"def new_process(iterator, output): pass".to_string(),
"new_process".to_string(),
PluginType::WalRows,
)
.await?;
Ok(())
}
#[tokio::test]
async fn test_delete_plugin_with_active_trigger() -> Result<()> {
let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap();
let test_store = Arc::new(InMemory::new());
let wal_config = WalConfig {
gen1_duration: Gen1Duration::new_1m(),
max_write_buffer_size: 100,
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
};
let (write_buffer, _, _) =
setup_cache_optional(start_time, test_store, wal_config, false).await;
// Create the DB by inserting a line.
write_buffer
.write_lp(
NamespaceName::new("foo").unwrap(),
"cpu,warehouse=us-east,room=01a,device=10001 reading=37\n",
start_time,
false,
Precision::Nanosecond,
)
.await?;
// Create a plugin
write_buffer
.insert_plugin(
"foo",
"test_plugin".to_string(),
"def process(iterator, output): pass".to_string(),
"process".to_string(),
PluginType::WalRows,
)
.await
.unwrap();
// Create a trigger using the plugin
write_buffer
.insert_trigger(
"foo",
"test_trigger".to_string(),
"test_plugin".to_string(),
TriggerSpecificationDefinition::AllTablesWalWrite,
false,
)
.await
.unwrap();
// Try to delete the plugin - should fail because trigger exists
let result = write_buffer.delete_plugin("foo", "test_plugin").await;
assert!(matches!(
result,
Err(Error::CatalogUpdateError(catalog::Error::ProcessingEnginePluginInUse {
database_name,
plugin_name,
trigger_name,
})) if database_name == "foo" && plugin_name == "test_plugin" && trigger_name == "test_trigger"
));
Ok(())
}
#[tokio::test]
async fn test_trigger_lifecycle() -> Result<()> {
let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap();
let test_store = Arc::new(InMemory::new());
let wal_config = WalConfig {
gen1_duration: Gen1Duration::new_1m(),
max_write_buffer_size: 100,
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
};
let (write_buffer, _, _) =
setup_cache_optional(start_time, test_store, wal_config, false).await;
// coerce to Arc<dyn WriteBuffer>
let write_buffer: Arc<dyn WriteBuffer> = write_buffer.clone();
// Create the DB by inserting a line.
write_buffer
.write_lp(
NamespaceName::new("foo").unwrap(),
"cpu,warehouse=us-east,room=01a,device=10001 reading=37\n",
start_time,
false,
Precision::Nanosecond,
)
.await?;
// Create a plugin
write_buffer
.insert_plugin(
"foo",
"test_plugin".to_string(),
"def process(iterator, output): pass".to_string(),
"process".to_string(),
PluginType::WalRows,
)
.await?;
// Create an enabled trigger
write_buffer
.insert_trigger(
"foo",
"test_trigger".to_string(),
"test_plugin".to_string(),
TriggerSpecificationDefinition::AllTablesWalWrite,
false,
)
.await?;
// Run the trigger
write_buffer
.run_trigger(Arc::clone(&write_buffer), "foo", "test_trigger")
.await?;
// Deactivate the trigger
let result = write_buffer.deactivate_trigger("foo", "test_trigger").await;
assert!(result.is_ok());
// Verify trigger is disabled in schema
let schema = write_buffer.catalog().db_schema("foo").unwrap();
let trigger = schema
.processing_engine_triggers
.get("test_trigger")
.unwrap();
assert!(trigger.disabled);
// Activate the trigger
let result = write_buffer
.activate_trigger(Arc::clone(&write_buffer), "foo", "test_trigger")
.await;
assert!(result.is_ok());
// Verify trigger is enabled and running
let schema = write_buffer.catalog().db_schema("foo").unwrap();
let trigger = schema
.processing_engine_triggers
.get("test_trigger")
.unwrap();
assert!(!trigger.disabled);
Ok(())
}
#[tokio::test]
async fn test_create_disabled_trigger() -> Result<()> {
let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap();
let test_store = Arc::new(InMemory::new());
let wal_config = WalConfig {
gen1_duration: Gen1Duration::new_1m(),
max_write_buffer_size: 100,
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
};
let (write_buffer, _, _) =
setup_cache_optional(start_time, test_store, wal_config, false).await;
// Create the DB by inserting a line.
write_buffer
.write_lp(
NamespaceName::new("foo").unwrap(),
"cpu,warehouse=us-east,room=01a,device=10001 reading=37\n",
start_time,
false,
Precision::Nanosecond,
)
.await?;
// Create a plugin
write_buffer
.insert_plugin(
"foo",
"test_plugin".to_string(),
"def process(iterator, output): pass".to_string(),
"process".to_string(),
PluginType::WalRows,
)
.await?;
// Create a disabled trigger
write_buffer
.insert_trigger(
"foo",
"test_trigger".to_string(),
"test_plugin".to_string(),
TriggerSpecificationDefinition::AllTablesWalWrite,
true,
)
.await?;
// Verify trigger is created but disabled
let schema = write_buffer.catalog().db_schema("foo").unwrap();
let trigger = schema
.processing_engine_triggers
.get("test_trigger")
.unwrap();
assert!(trigger.disabled);
// Verify trigger is not in active triggers list
assert!(write_buffer.catalog().triggers().is_empty());
Ok(())
}
#[tokio::test]
async fn test_activate_nonexistent_trigger() -> Result<()> {
let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap();
let test_store = Arc::new(InMemory::new());
let wal_config = WalConfig {
gen1_duration: Gen1Duration::new_1m(),
max_write_buffer_size: 100,
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
};
let (write_buffer, _, _) =
setup_cache_optional(start_time, test_store, wal_config, false).await;
let write_buffer: Arc<dyn WriteBuffer> = write_buffer.clone();
// Create the DB by inserting a line.
write_buffer
.write_lp(
NamespaceName::new("foo").unwrap(),
"cpu,warehouse=us-east,room=01a,device=10001 reading=37\n",
start_time,
false,
Precision::Nanosecond,
)
.await?;
let result = write_buffer
.activate_trigger(Arc::clone(&write_buffer), "foo", "nonexistent_trigger")
.await;
assert!(matches!(
result,
Err(Error::CatalogUpdateError(catalog::Error::ProcessingEngineTriggerNotFound {
database_name,
trigger_name,
})) if database_name == "foo" && trigger_name == "nonexistent_trigger"
));
Ok(())
}
}

View File

@ -1,11 +1,10 @@
use crate::write_buffer::PluginEvent;
use crate::{write_buffer, WriteBuffer};
use influxdb3_wal::{PluginType, TriggerDefinition, TriggerSpecificationDefinition};
use std::fmt::Debug;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::mpsc;
#[derive(Debug, Error)]
pub enum Error {
@ -18,6 +17,9 @@ pub enum Error {
#[error(transparent)]
WriteBufferError(#[from] write_buffer::Error),
#[error("failed to send shutdown message back")]
FailedToShutdown,
}
/// `[ProcessingEngineManager]` is used to interact with the processing engine,
@ -35,12 +37,19 @@ pub trait ProcessingEngineManager: Debug + Send + Sync + 'static {
plugin_type: PluginType,
) -> crate::Result<(), write_buffer::Error>;
async fn delete_plugin(
&self,
db: &str,
plugin_name: &str,
) -> crate::Result<(), write_buffer::Error>;
async fn insert_trigger(
&self,
db_name: &str,
trigger_name: String,
plugin_name: String,
trigger_specification: TriggerSpecificationDefinition,
disabled: bool,
) -> crate::Result<(), write_buffer::Error>;
/// Starts running the trigger, which will run in the background.
@ -50,6 +59,19 @@ pub trait ProcessingEngineManager: Debug + Send + Sync + 'static {
db_name: &str,
trigger_name: &str,
) -> crate::Result<(), write_buffer::Error>;
async fn deactivate_trigger(
&self,
db_name: &str,
trigger_name: &str,
) -> Result<(), write_buffer::Error>;
async fn activate_trigger(
&self,
write_buffer: Arc<dyn WriteBuffer>,
db_name: &str,
trigger_name: &str,
) -> Result<(), write_buffer::Error>;
}
#[cfg(feature = "system-py")]
@ -72,31 +94,26 @@ pub(crate) fn run_plugin(
pub(crate) struct PluginContext {
// tokio channel for inputs
pub(crate) trigger_rx: mpsc::Receiver<PluginEvent>,
// handler to write data back to the DB.
pub(crate) write_buffer: Arc<dyn WriteBuffer>,
}
#[async_trait::async_trait]
trait RunnablePlugin {
// Returns true if it should exit
async fn process_event(
&self,
event: PluginEvent,
write_buffer: Arc<dyn WriteBuffer>,
) -> Result<bool, Error>;
async fn run_plugin(&self, context: &mut PluginContext) -> Result<(), Error> {
while let Some(event) = context.trigger_rx.recv().await {
if self
.process_event(event, context.write_buffer.clone())
.await?
{
break;
}
}
Ok(())
@ -124,7 +141,7 @@ mod python_plugin {
&self,
event: PluginEvent,
write_buffer: Arc<dyn WriteBuffer>,
) -> Result<bool, Error> {
let Some(schema) = write_buffer.catalog().db_schema(self.db_name.as_str()) else {
return Err(Error::MissingDb);
};
@ -168,6 +185,10 @@ mod python_plugin {
}
}
}
PluginEvent::Shutdown(sender) => {
sender.send(()).map_err(|_| Error::FailedToShutdown)?;
return Ok(true);
}
}
if !output_lines.is_empty() {
let ingest_time = SystemTime::now()
@ -184,7 +205,7 @@ mod python_plugin {
.await?;
}
Ok(false)
}
}
}

View File

@ -29,7 +29,7 @@ use iox_query::frontend::reorg::ReorgPlanner;
use iox_query::QueryChunk;
use object_store::path::Path;
use observability_deps::tracing::{error, info};
use parking_lot::RwLock;
use parquet::format::FileMetaData;
use schema::sort::SortKey;
use schema::Schema;
@ -37,7 +37,7 @@ use std::any::Any;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::oneshot::Receiver;
use tokio::sync::{mpsc, oneshot, Mutex};
#[derive(Debug)]
pub struct QueryableBuffer {
@ -52,7 +52,7 @@ pub struct QueryableBuffer {
/// Sends a notification to this watch channel whenever a snapshot info is persisted
persisted_snapshot_notify_rx: tokio::sync::watch::Receiver<Option<PersistedSnapshot>>,
persisted_snapshot_notify_tx: tokio::sync::watch::Sender<Option<PersistedSnapshot>>,
plugin_event_tx: Mutex<HashMap<String, mpsc::Sender<PluginEvent>>>,
}
pub struct QueryableBufferArgs {
@ -91,7 +91,7 @@ impl QueryableBuffer {
parquet_cache,
persisted_snapshot_notify_rx,
persisted_snapshot_notify_tx,
plugin_event_tx: Mutex::new(HashMap::new()),
}
}
@ -388,26 +388,55 @@ impl QueryableBuffer {
}
#[cfg(feature = "system-py")]
pub(crate) async fn subscribe_to_plugin_events(
&self,
trigger_name: String,
) -> mpsc::Receiver<PluginEvent> {
let mut senders = self.plugin_event_tx.lock().await;
// TODO: should we be checking for replacements?
let (plugin_tx, plugin_rx) = mpsc::channel(4);
senders.insert(trigger_name, plugin_tx);
plugin_rx
}
/// Deactivates a running trigger by sending it a oneshot sender; the trigger acknowledges on that channel and then shuts down immediately.
pub(crate) async fn deactivate_trigger(
&self,
#[allow(unused)] trigger_name: String,
) -> Result<(), anyhow::Error> {
#[cfg(feature = "system-py")]
{
let Some(sender) = self.plugin_event_tx.lock().await.remove(&trigger_name) else {
anyhow::bail!("no trigger named '{}' found", trigger_name);
};
let (oneshot_tx, oneshot_rx) = oneshot::channel();
sender.send(PluginEvent::Shutdown(oneshot_tx)).await?;
oneshot_rx.await?;
}
Ok(())
}
async fn send_to_plugins(&self, wal_contents: &WalContents) {
let senders = self.plugin_event_tx.lock().await;
if !senders.is_empty() {
let wal_contents = Arc::new(wal_contents.clone());
for (plugin, sender) in senders.iter() {
if let Err(err) = sender
.send(PluginEvent::WriteWalContents(Arc::clone(&wal_contents)))
.await
{
error!("failed to send plugin event to plugin {}: {}", plugin, err);
}
}
}
}
}
#[async_trait]
impl WalFileNotifier for QueryableBuffer {
async fn notify(&self, write: WalContents) {
self.send_to_plugins(&write).await;
self.buffer_contents(write)
}
@ -416,11 +445,7 @@ impl WalFileNotifier for QueryableBuffer {
write: WalContents,
snapshot_details: SnapshotDetails,
) -> Receiver<SnapshotDetails> {
self.send_to_plugins(&write).await;
self.buffer_contents_and_persist_snapshotted_data(write, snapshot_details)
.await
}
@ -537,6 +562,9 @@ impl BufferState {
}
CatalogOp::CreatePlugin(_) => {}
CatalogOp::CreateTrigger(_) => {}
CatalogOp::EnableTrigger(_) => {}
CatalogOp::DisableTrigger(_) => {}
CatalogOp::DeletePlugin(_) => {}
}
}
}
@ -786,7 +814,7 @@ mod tests {
wal_contents.max_timestamp_ns + Gen1Duration::new_1m().as_duration().as_nanos() as i64;
// write the lp into the buffer
queryable_buffer.notify(wal_contents).await;
// now force a snapshot, persisting the data to parquet file. Also, buffer up a new write
let snapshot_sequence_number = SnapshotSequenceNumber::new(1);