feat(processing_engine): Runtime and write-back improvements (#25672)

* Move processing engine invocation to a separate tokio task (see the sketch below).
* Support writing back line protocol from Python via insert_line_protocol().
* Update structs to work with bincode.
Jackson Newhouse 2024-12-17 16:38:12 -08:00 committed by GitHub
parent 31b9209dd6
commit 8bfccb74ab
15 changed files with 445 additions and 183 deletions
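The first bullet is the runtime change: rather than calling plugins inline on the WAL flush path, each trigger now runs on its own tokio task that consumes WAL events from a broadcast channel (see run_plugin and PluginEvent in the diffs below). A minimal, self-contained sketch of that pattern follows; spawn_trigger_task and the string payload are illustrative, the real PluginEvent carries an Arc<WalContents>.

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::{self, error::RecvError};

// Illustrative event type; the real PluginEvent carries Arc<WalContents>.
#[derive(Clone, Debug)]
enum PluginEvent {
    WriteWalContents(Arc<String>),
}

// Each trigger gets its own task, so a slow or failing plugin never blocks
// the WAL notifier that publishes the events.
fn spawn_trigger_task(mut rx: broadcast::Receiver<PluginEvent>) {
    tokio::task::spawn(async move {
        loop {
            match rx.recv().await {
                Err(RecvError::Closed) => break,
                Err(RecvError::Lagged(n)) => eprintln!("plugin lagged by {n} events"),
                Ok(PluginEvent::WriteWalContents(contents)) => {
                    // hand the WAL contents to the plugin here
                    println!("plugin received: {contents}");
                }
            }
        }
    });
}

#[tokio::main]
async fn main() {
    let (tx, rx) = broadcast::channel(1024);
    spawn_trigger_task(rx);
    tx.send(PluginEvent::WriteWalContents(Arc::new("cpu bar=1".into())))
        .unwrap();
    // give the background task a moment to drain the event in this toy example
    tokio::time::sleep(Duration::from_millis(10)).await;
}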

Cargo.lock (generated)

@ -3267,8 +3267,10 @@ dependencies = [
name = "influxdb3_py_api"
version = "0.1.0"
dependencies = [
"async-trait",
"influxdb3_catalog",
"influxdb3_wal",
"parking_lot",
"pyo3",
"schema",
]
@ -3472,6 +3474,7 @@ dependencies = [
"parquet",
"parquet_file",
"pretty_assertions",
"pyo3",
"schema",
"serde",
"serde_json",


@ -475,20 +475,18 @@ pub async fn command(config: Config) -> Result<()> {
)
.map_err(Error::InitializeMetaCache)?;
let write_buffer_impl = Arc::new(
WriteBufferImpl::new(WriteBufferImplArgs {
persister: Arc::clone(&persister),
catalog: Arc::clone(&catalog),
last_cache,
meta_cache,
time_provider: Arc::<SystemProvider>::clone(&time_provider),
executor: Arc::clone(&exec),
wal_config,
parquet_cache,
})
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?,
);
let write_buffer_impl = WriteBufferImpl::new(WriteBufferImplArgs {
persister: Arc::clone(&persister),
catalog: Arc::clone(&catalog),
last_cache,
meta_cache,
time_provider: Arc::<SystemProvider>::clone(&time_provider),
executor: Arc::clone(&exec),
wal_config,
parquet_cache,
})
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?;
let telemetry_store = setup_telemetry_store(
&config.object_store_config,


@ -108,6 +108,16 @@ pub enum Error {
},
#[error("Processing Engine Unimplemented: {}", feature_description)]
ProcessingEngineUnimplemented { feature_description: String },
#[error(
"Processing Engine Trigger {} not in DB {}",
trigger_name,
database_name
)]
ProcessingEngineTriggerNotFound {
database_name: String,
trigger_name: String,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -356,6 +366,21 @@ impl Catalog {
}
}
pub fn triggers(&self) -> Vec<(String, String)> {
let inner = self.inner.read();
let result = inner
.databases
.values()
.flat_map(|schema| {
schema
.processing_engine_triggers
.keys()
.map(move |key| (schema.name.to_string(), key.to_string()))
})
.collect();
result
}
pub fn inner(&self) -> &RwLock<InnerCatalog> {
&self.inner
}
@ -891,10 +916,10 @@ impl TableDefinition {
.expect("tables defined from ops should not exceed column limits")
}
pub(crate) fn check_and_add_new_fields<'a>(
&'a self,
pub(crate) fn check_and_add_new_fields(
&self,
table_definition: &influxdb3_wal::TableDefinition,
) -> Result<Cow<'a, Self>> {
) -> Result<Cow<'_, Self>> {
// validate the series key is the same
if table_definition.key != self.series_key {
return Err(Error::SeriesKeyMismatch {


@ -9,7 +9,7 @@ use influxdb3_id::DbId;
use influxdb3_id::SerdeVecMap;
use influxdb3_id::TableId;
use influxdb3_wal::{
LastCacheDefinition, LastCacheValueColumnsDef, PluginDefinition, TriggerDefinition,
LastCacheDefinition, LastCacheValueColumnsDef, PluginDefinition, PluginType, TriggerDefinition,
};
use schema::InfluxColumnType;
use schema::InfluxFieldType;
@ -163,7 +163,7 @@ struct ProcessingEnginePluginSnapshot {
pub plugin_name: String,
pub code: String,
pub function_name: String,
pub plugin_type: String,
pub plugin_type: PluginType,
}
#[derive(Debug, Serialize, Deserialize)]
@ -411,7 +411,7 @@ impl From<&PluginDefinition> for ProcessingEnginePluginSnapshot {
plugin_name: plugin.plugin_name.to_string(),
code: plugin.code.to_string(),
function_name: plugin.function_name.to_string(),
plugin_type: serde_json::to_string(&plugin.plugin_type).unwrap(),
plugin_type: plugin.plugin_type,
}
}
}
@ -419,10 +419,10 @@ impl From<&PluginDefinition> for ProcessingEnginePluginSnapshot {
impl From<ProcessingEnginePluginSnapshot> for PluginDefinition {
fn from(plugin: ProcessingEnginePluginSnapshot) -> Self {
Self {
plugin_name: plugin.plugin_type.to_string(),
plugin_name: plugin.plugin_name.to_string(),
code: plugin.code.to_string(),
function_name: plugin.function_name.to_string(),
plugin_type: serde_json::from_str(&plugin.plugin_type).expect("serialized plugin type"),
plugin_type: plugin.plugin_type,
}
}
}


@ -11,7 +11,9 @@ system-py = ["pyo3"]
[dependencies]
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_catalog = {path = "../influxdb3_catalog"}
schema = { workspace = true }
async-trait.workspace = true
schema.workspace = true
parking_lot.workspace = true
[dependencies.pyo3]
version = "0.23.3"


@ -1,5 +1,6 @@
use influxdb3_catalog::catalog::{DatabaseSchema, TableDefinition};
use influxdb3_wal::{FieldData, Row, WriteBatch};
use parking_lot::Mutex;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::{PyAnyMethods, PyModule, PyModuleMethods};
use pyo3::{pyclass, pymethods, pymodule, Bound, IntoPyObject, PyErr, PyObject, PyResult, Python};
@ -101,7 +102,7 @@ pub struct PyWriteBatch {
#[pymethods]
impl PyWriteBatch {
fn get_iterator_for_table(&self, table_name: &str) -> PyResult<PyWriteBatchIterator> {
fn get_iterator_for_table(&self, table_name: &str) -> PyResult<Option<PyWriteBatchIterator>> {
// Find table ID from name
let table_id = self
.schema
@ -112,9 +113,9 @@ impl PyWriteBatch {
})?;
// Get table chunks
let chunks = self.write_batch.table_chunks.get(table_id).ok_or_else(|| {
PyErr::new::<PyValueError, _>(format!("No data for table '{}'", table_name))
})?;
let Some(chunks) = self.write_batch.table_chunks.get(table_id) else {
return Ok(None);
};
// Get table definition
let table_def = self.schema.tables.get(table_id).ok_or_else(|| {
@ -124,7 +125,7 @@ impl PyWriteBatch {
))
})?;
Ok(PyWriteBatchIterator {
Ok(Some(PyWriteBatchIterator {
table_definition: Arc::clone(table_def),
// TODO: avoid copying all the data at once.
rows: chunks
@ -133,7 +134,22 @@ impl PyWriteBatch {
.flat_map(|chunk| chunk.rows.clone())
.collect(),
current_index: 0,
})
}))
}
}
#[derive(Debug)]
#[pyclass]
pub struct PyLineProtocolOutput {
lines: Arc<Mutex<Vec<String>>>,
}
#[pymethods]
impl PyLineProtocolOutput {
fn insert_line_protocol(&mut self, line: &str) -> PyResult<()> {
let mut lines = self.lines.lock();
lines.push(line.to_string());
Ok(())
}
}
@ -143,13 +159,26 @@ impl PyWriteBatch {
table_name: &str,
setup_code: &str,
call_site: &str,
) -> PyResult<()> {
let iterator = self.get_iterator_for_table(table_name)?;
) -> PyResult<Vec<String>> {
let Some(iterator) = self.get_iterator_for_table(table_name)? else {
return Ok(Vec::new());
};
Python::with_gil(|py| {
py.run(&CString::new(setup_code)?, None, None)?;
let py_func = py.eval(&CString::new(call_site)?, None, None)?;
py_func.call1((iterator,))?;
Ok::<(), PyErr>(())
// Create the output collector with shared state
let lines = Arc::new(Mutex::new(Vec::new()));
let output = PyLineProtocolOutput {
lines: Arc::clone(&lines),
};
// Pass both iterator and output collector to the Python function
py_func.call1((iterator, output.into_pyobject(py)?))?;
let output_lines = lines.lock().clone();
Ok(output_lines)
})
}
}
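
This is the write-back half of the change: call_against_table now returns whatever lines the Python code handed to insert_line_protocol(), collected through a #[pyclass] that wraps an Arc<Mutex<Vec<String>>> shared between Rust and the Python call. Below is a minimal sketch of that pattern outside the WriteBatch machinery; LineCollector and run_plugin_code are illustrative names, and it assumes pyo3 0.23 with the auto-initialize feature (the setup the Cargo.toml changes further down enable).

use std::ffi::CString;
use std::sync::Arc;

use parking_lot::Mutex;
use pyo3::prelude::*;
use pyo3::IntoPyObject;

// Handed to the Python function; every insert_line_protocol() call lands in
// shared state that the Rust caller reads back afterwards.
#[pyclass]
struct LineCollector {
    lines: Arc<Mutex<Vec<String>>>,
}

#[pymethods]
impl LineCollector {
    fn insert_line_protocol(&self, line: &str) {
        self.lines.lock().push(line.to_string());
    }
}

fn run_plugin_code(setup_code: &str, call_site: &str) -> PyResult<Vec<String>> {
    Python::with_gil(|py| {
        // Define the plugin function, then look it up by name.
        py.run(&CString::new(setup_code)?, None, None)?;
        let py_func = py.eval(&CString::new(call_site)?, None, None)?;

        let lines = Arc::new(Mutex::new(Vec::new()));
        let collector = LineCollector { lines: Arc::clone(&lines) };
        py_func.call1((collector.into_pyobject(py)?,))?;

        // The Python side is done; clone out whatever it collected.
        Ok(lines.lock().clone())
    })
}

fn main() -> PyResult<()> {
    let lines = run_plugin_code(
        "def emit(output):\n    output.insert_line_protocol('cpu,host=a usage=0.5')",
        "emit",
    )?;
    assert_eq!(lines, vec!["cpu,host=a usage=0.5".to_string()]);
    Ok(())
}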


@ -984,11 +984,18 @@ where
self.write_buffer
.insert_trigger(
db.as_str(),
trigger_name,
trigger_name.clone(),
plugin_name,
trigger_specification,
)
.await?;
self.write_buffer
.run_trigger(
Arc::clone(&self.write_buffer),
db.as_str(),
trigger_name.as_str(),
)
.await?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::empty())?)


@ -801,26 +801,24 @@ mod tests {
let sample_host_id = Arc::from("sample-host-id");
let instance_id = Arc::from("sample-instance-id");
let catalog = Arc::new(Catalog::new(sample_host_id, instance_id));
let write_buffer_impl = Arc::new(
influxdb3_write::write_buffer::WriteBufferImpl::new(
influxdb3_write::write_buffer::WriteBufferImplArgs {
persister: Arc::clone(&persister),
catalog: Arc::clone(&catalog),
last_cache: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(),
meta_cache: MetaCacheProvider::new_from_catalog(
Arc::clone(&time_provider) as _,
Arc::clone(&catalog),
)
.unwrap(),
time_provider: Arc::clone(&time_provider) as _,
executor: Arc::clone(&exec),
wal_config: WalConfig::test_config(),
parquet_cache: Some(parquet_cache),
},
)
.await
.unwrap(),
);
let write_buffer_impl = influxdb3_write::write_buffer::WriteBufferImpl::new(
influxdb3_write::write_buffer::WriteBufferImplArgs {
persister: Arc::clone(&persister),
catalog: Arc::clone(&catalog),
last_cache: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(),
meta_cache: MetaCacheProvider::new_from_catalog(
Arc::clone(&time_provider) as _,
Arc::clone(&catalog),
)
.unwrap(),
time_provider: Arc::clone(&time_provider) as _,
executor: Arc::clone(&exec),
wal_config: WalConfig::test_config(),
parquet_cache: Some(parquet_cache),
},
)
.await
.unwrap();
let sys_events_store = Arc::new(SysEventStore::new(Arc::<MockProvider>::clone(
&time_provider,


@ -682,29 +682,27 @@ mod tests {
let host_id = Arc::from("sample-host-id");
let instance_id = Arc::from("instance-id");
let catalog = Arc::new(Catalog::new(host_id, instance_id));
let write_buffer_impl = Arc::new(
WriteBufferImpl::new(WriteBufferImplArgs {
persister,
catalog: Arc::clone(&catalog),
last_cache: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(),
meta_cache: MetaCacheProvider::new_from_catalog(
Arc::<MockProvider>::clone(&time_provider),
Arc::clone(&catalog),
)
.unwrap(),
time_provider: Arc::<MockProvider>::clone(&time_provider),
executor: Arc::clone(&exec),
wal_config: WalConfig {
gen1_duration: Gen1Duration::new_1m(),
max_write_buffer_size: 100,
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
},
parquet_cache: Some(parquet_cache),
})
.await
let write_buffer_impl = WriteBufferImpl::new(WriteBufferImplArgs {
persister,
catalog: Arc::clone(&catalog),
last_cache: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(),
meta_cache: MetaCacheProvider::new_from_catalog(
Arc::<MockProvider>::clone(&time_provider),
Arc::clone(&catalog),
)
.unwrap(),
);
time_provider: Arc::<MockProvider>::clone(&time_provider),
executor: Arc::clone(&exec),
wal_config: WalConfig {
gen1_duration: Gen1Duration::new_1m(),
max_write_buffer_size: 100,
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
},
parquet_cache: Some(parquet_cache),
})
.await
.unwrap();
let persisted_files: Arc<PersistedFiles> = Arc::clone(&write_buffer_impl.persisted_files());
let telemetry_store = TelemetryStore::new_without_background_runners(persisted_files);


@ -554,7 +554,7 @@ pub struct TriggerDefinition {
}
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
#[serde(rename_all = "snake_case")]
pub enum TriggerSpecificationDefinition {
SingleTableWalWrite { table_name: String },
AllTablesWalWrite,
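
Dropping #[serde(tag = "type")] here lines up with the "work with bincode" bullet: serde's internally tagged enums need a self-describing format on the read side (the content is buffered and the tag field looked up), which bincode is not, so round-trips fail, while the default externally tagged form (a variant index followed by the variant's fields) round-trips cleanly. A small sketch of the difference, assuming bincode 1.x's serde API:

use serde::{Deserialize, Serialize};

// Internally tagged: deserializing relies on serde buffering the content and
// finding the "type" field, which needs a self-describing format.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
enum InternallyTagged {
    SingleTableWalWrite { table_name: String },
    AllTablesWalWrite,
}

// Default (externally tagged): a variant index followed by the variant's
// fields, which bincode encodes and decodes directly.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
enum ExternallyTagged {
    SingleTableWalWrite { table_name: String },
    AllTablesWalWrite,
}

fn main() {
    let spec = ExternallyTagged::SingleTableWalWrite { table_name: "cpu".into() };
    let bytes = bincode::serialize(&spec).unwrap();
    assert_eq!(bincode::deserialize::<ExternallyTagged>(&bytes).unwrap(), spec);

    // The internally tagged form may serialize, but bincode cannot drive
    // serde's internal-tag machinery when reading it back.
    let tagged = InternallyTagged::AllTablesWalWrite;
    if let Ok(bytes) = bincode::serialize(&tagged) {
        assert!(bincode::deserialize::<InternallyTagged>(&bytes).is_err());
    }
}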


@ -6,7 +6,7 @@ edition.workspace = true
license.workspace = true
[features]
"system-py" = ["influxdb3_py_api/system-py"]
"system-py" = ["influxdb3_py_api/system-py", "pyo3"]
[dependencies]
# Core Crates
@ -61,6 +61,12 @@ tokio.workspace = true
url.workspace = true
uuid.workspace = true
[dependencies.pyo3]
version = "0.23.3"
# this is necessary to automatically initialize the Python interpreter
features = ["auto-initialize"]
optional = true
[dev-dependencies]
# Core Crates
arrow_util.workspace = true


@ -24,8 +24,8 @@ use influxdb3_id::ParquetFileId;
use influxdb3_id::SerdeVecMap;
use influxdb3_id::TableId;
use influxdb3_id::{ColumnId, DbId};
use influxdb3_wal::MetaCacheDefinition;
use influxdb3_wal::{LastCacheDefinition, SnapshotSequenceNumber, WalFileSequenceNumber};
use influxdb3_wal::{MetaCacheDefinition, PluginType, TriggerSpecificationDefinition};
use iox_query::QueryChunk;
use iox_time::Time;
use serde::{Deserialize, Serialize};
@ -33,6 +33,7 @@ use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use write_buffer::plugins::ProcessingEngineManager;
#[derive(Debug, Error)]
pub enum Error {
@ -171,30 +172,6 @@ pub trait LastCacheManager: Debug + Send + Sync + 'static {
) -> Result<(), write_buffer::Error>;
}
/// `[ProcessingEngineManager]` is used to interact with the processing engine,
/// in particular plugins and triggers.
///
#[async_trait::async_trait]
pub trait ProcessingEngineManager: Debug + Send + Sync + 'static {
/// Inserts a plugin
async fn insert_plugin(
&self,
db: &str,
plugin_name: String,
code: String,
function_name: String,
plugin_type: PluginType,
) -> Result<(), write_buffer::Error>;
async fn insert_trigger(
&self,
db_name: &str,
trigger_name: String,
plugin_name: String,
trigger_specification: TriggerSpecificationDefinition,
) -> Result<(), write_buffer::Error>;
}
/// A single write request can have many lines in it. A writer can request to accept all lines that are valid, while
/// returning an error for any invalid lines. This is the error information for a single invalid line.
#[derive(Debug, Serialize)]


@ -1,6 +1,8 @@
//! Implementation of an in-memory buffer for writes that persists data into a wal if it is configured.
pub mod persisted_files;
#[allow(dead_code)]
pub mod plugins;
pub mod queryable_buffer;
mod table_buffer;
pub mod validator;
@ -9,7 +11,7 @@ use crate::persister::Persister;
use crate::write_buffer::persisted_files::PersistedFiles;
use crate::write_buffer::queryable_buffer::QueryableBuffer;
use crate::write_buffer::validator::WriteValidator;
use crate::{chunk::ParquetChunk, DatabaseManager, ProcessingEngineManager};
use crate::{chunk::ParquetChunk, write_buffer, DatabaseManager};
use crate::{
BufferedWriteRequest, Bufferer, ChunkContainer, LastCacheManager, MetaCacheManager,
ParquetFile, PersistedSnapshot, Precision, WriteBuffer, WriteLineError,
@ -31,7 +33,7 @@ use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
use influxdb3_id::{ColumnId, DbId, TableId};
use influxdb3_wal::{
object_store::WalObjectStore, DeleteDatabaseDefinition, PluginDefinition, PluginType,
TriggerDefinition, TriggerSpecificationDefinition,
TriggerDefinition, TriggerSpecificationDefinition, WalContents,
};
use influxdb3_wal::{
CatalogBatch, CatalogOp, LastCacheDefinition, LastCacheDelete, LastCacheSize,
@ -45,6 +47,7 @@ use object_store::path::Path as ObjPath;
use object_store::{ObjectMeta, ObjectStore};
use observability_deps::tracing::{debug, error};
use parquet_file::storage::ParquetExecInput;
use plugins::ProcessingEngineManager;
use queryable_buffer::QueryableBufferArgs;
use schema::Schema;
use std::sync::Arc;
@ -52,6 +55,12 @@ use std::time::Duration;
use thiserror::Error;
use tokio::sync::watch::Receiver;
#[cfg(feature = "system-py")]
use {
crate::write_buffer::plugins::PluginContext,
influxdb3_catalog::catalog::Error::ProcessingEngineTriggerNotFound,
};
#[derive(Debug, Error)]
pub enum Error {
#[error("parsing for line protocol failed")]
@ -166,7 +175,7 @@ impl WriteBufferImpl {
wal_config,
parquet_cache,
}: WriteBufferImplArgs,
) -> Result<Self> {
) -> Result<Arc<Self>> {
// load snapshots and replay the wal into the in memory buffer
let persisted_snapshots = persister
.load_snapshots(N_SNAPSHOTS_TO_LOAD_ON_START)
@ -211,7 +220,7 @@ impl WriteBufferImpl {
)
.await?;
Ok(Self {
let result = Arc::new(Self {
catalog,
parquet_cache,
persister,
@ -222,7 +231,15 @@ impl WriteBufferImpl {
last_cache,
persisted_files,
buffer: queryable_buffer,
})
});
let write_buffer: Arc<dyn WriteBuffer> = result.clone();
let triggers = result.catalog().triggers();
for (db_name, trigger_name) in triggers {
result
.run_trigger(Arc::clone(&write_buffer), &db_name, &trigger_name)
.await?;
}
Ok(result)
}
pub fn catalog(&self) -> Arc<Catalog> {
@ -771,6 +788,46 @@ impl ProcessingEngineManager for WriteBufferImpl {
self.wal.write_ops(vec![wal_op]).await?;
Ok(())
}
#[cfg_attr(not(feature = "system-py"), allow(unused))]
async fn run_trigger(
&self,
write_buffer: Arc<dyn WriteBuffer>,
db_name: &str,
trigger_name: &str,
) -> crate::Result<(), write_buffer::Error> {
#[cfg(feature = "system-py")]
{
let (_db_id, db_schema) =
self.catalog
.db_id_and_schema(db_name)
.ok_or_else(|| Error::DatabaseNotFound {
db_name: db_name.to_string(),
})?;
let trigger = db_schema
.processing_engine_triggers
.get(trigger_name)
.ok_or_else(|| ProcessingEngineTriggerNotFound {
database_name: db_name.to_string(),
trigger_name: trigger_name.to_string(),
})?
.clone();
let trigger_rx = self.buffer.subscribe_to_plugin_events();
let plugin_context = PluginContext {
trigger_rx,
write_buffer,
};
plugins::run_plugin(db_name.to_string(), trigger, plugin_context);
}
Ok(())
}
}
#[derive(Clone)]
#[allow(unused)]
pub(crate) enum PluginEvent {
WriteWalContents(Arc<WalContents>),
}
impl WriteBuffer for WriteBufferImpl {}
@ -1299,7 +1356,7 @@ mod tests {
// do three writes to force a snapshot
do_writes(
db_name,
&write_buffer,
write_buffer.as_ref(),
&[
TestWrite {
lp: "cpu bar=1",
@ -1324,7 +1381,7 @@ mod tests {
// WAL period left from before:
do_writes(
db_name,
&write_buffer,
write_buffer.as_ref(),
&[
TestWrite {
lp: "cpu bar=4",
@ -1345,7 +1402,7 @@ mod tests {
// and finally, do two more, with a catalog update, forcing persistence
do_writes(
db_name,
&write_buffer,
write_buffer.as_ref(),
&[
TestWrite {
lp: "cpu bar=6,asdf=true",
@ -1578,7 +1635,7 @@ mod tests {
// do some writes to get a snapshot:
do_writes(
db_name,
&wbuf,
wbuf.as_ref(),
&[
TestWrite {
lp: format!("{tbl_name},name=espresso price=2.50"),
@ -1645,7 +1702,7 @@ mod tests {
// Do six writes to trigger a snapshot
do_writes(
db_name,
&wbuf,
wbuf.as_ref(),
&[
TestWrite {
lp: format!("{tbl_name},name=espresso,type=drink price=2.50"),
@ -1731,7 +1788,7 @@ mod tests {
// do some writes to get a snapshot:
do_writes(
db_name,
&wbuf,
wbuf.as_ref(),
&[
TestWrite {
lp: format!("{tbl_name},name=espresso price=2.50"),
@ -1809,7 +1866,7 @@ mod tests {
// do some writes to get a snapshot:
do_writes(
db_name,
&wbuf,
wbuf.as_ref(),
&[
TestWrite {
lp: format!("{tbl_name},name=espresso price=2.50"),
@ -1854,7 +1911,7 @@ mod tests {
// do some writes to get a snapshot:
do_writes(
db_name,
&wbuf,
wbuf.as_ref(),
&[
TestWrite {
lp: format!("{tbl_name},name=espresso price=2.50"),
@ -1929,7 +1986,7 @@ mod tests {
// make some writes to generate a snapshot:
do_writes(
db_name,
&wbuf,
wbuf.as_ref(),
&[
TestWrite {
lp: format!(
@ -2035,7 +2092,7 @@ mod tests {
// make some writes to generate a snapshot:
do_writes(
db_name,
&wbuf,
wbuf.as_ref(),
&[
TestWrite {
lp: format!(
@ -2254,7 +2311,11 @@ mod tests {
start: Time,
object_store: Arc<dyn ObjectStore>,
wal_config: WalConfig,
) -> (WriteBufferImpl, IOxSessionContext, Arc<dyn TimeProvider>) {
) -> (
Arc<WriteBufferImpl>,
IOxSessionContext,
Arc<dyn TimeProvider>,
) {
setup_cache_optional(start, object_store, wal_config, true).await
}
@ -2263,7 +2324,11 @@ mod tests {
object_store: Arc<dyn ObjectStore>,
wal_config: WalConfig,
use_cache: bool,
) -> (WriteBufferImpl, IOxSessionContext, Arc<dyn TimeProvider>) {
) -> (
Arc<WriteBufferImpl>,
IOxSessionContext,
Arc<dyn TimeProvider>,
) {
let time_provider: Arc<dyn TimeProvider> = Arc::new(MockProvider::new(start));
let (object_store, parquet_cache) = if use_cache {
let (object_store, parquet_cache) = test_cached_obj_store_and_oracle(


@ -0,0 +1,190 @@
use crate::write_buffer::PluginEvent;
use crate::{write_buffer, WriteBuffer};
use influxdb3_wal::{PluginType, TriggerDefinition, TriggerSpecificationDefinition};
use observability_deps::tracing::warn;
use std::fmt::Debug;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::broadcast::error::RecvError;
#[derive(Debug, Error)]
pub enum Error {
#[error("couldn't find db")]
MissingDb,
#[cfg(feature = "system-py")]
#[error(transparent)]
PyError(#[from] pyo3::PyErr),
#[error(transparent)]
WriteBufferError(#[from] write_buffer::Error),
}
/// `[ProcessingEngineManager]` is used to interact with the processing engine,
/// in particular plugins and triggers.
///
#[async_trait::async_trait]
pub trait ProcessingEngineManager: Debug + Send + Sync + 'static {
/// Inserts a plugin
async fn insert_plugin(
&self,
db: &str,
plugin_name: String,
code: String,
function_name: String,
plugin_type: PluginType,
) -> crate::Result<(), write_buffer::Error>;
async fn insert_trigger(
&self,
db_name: &str,
trigger_name: String,
plugin_name: String,
trigger_specification: TriggerSpecificationDefinition,
) -> crate::Result<(), write_buffer::Error>;
/// Starts running the trigger, which will run in the background.
async fn run_trigger(
&self,
write_buffer: Arc<dyn WriteBuffer>,
db_name: &str,
trigger_name: &str,
) -> crate::Result<(), write_buffer::Error>;
}
#[cfg(feature = "system-py")]
pub(crate) fn run_plugin(
db_name: String,
trigger_definition: TriggerDefinition,
mut context: PluginContext,
) {
let trigger_plugin = TriggerPlugin {
trigger_definition,
db_name,
};
tokio::task::spawn(async move {
trigger_plugin
.run_plugin(&mut context)
.await
.expect("trigger plugin failed");
});
}
pub(crate) struct PluginContext {
// tokio channel for inputs
pub(crate) trigger_rx: tokio::sync::broadcast::Receiver<PluginEvent>,
// handler to write data back to the DB.
pub(crate) write_buffer: Arc<dyn WriteBuffer>,
}
#[async_trait::async_trait]
trait RunnablePlugin {
async fn process_event(
&self,
event: PluginEvent,
write_buffer: Arc<dyn WriteBuffer>,
) -> Result<(), Error>;
async fn run_plugin(&self, context: &mut PluginContext) -> Result<(), Error> {
loop {
match context.trigger_rx.recv().await {
Err(RecvError::Closed) => {
break;
}
Err(RecvError::Lagged(_)) => {
warn!("plugin lagged");
}
Ok(event) => {
self.process_event(event, context.write_buffer.clone())
.await?;
}
}
}
Ok(())
}
}
#[derive(Debug)]
struct TriggerPlugin {
trigger_definition: TriggerDefinition,
db_name: String,
}
#[cfg(feature = "system-py")]
mod python_plugin {
use super::*;
use crate::Precision;
use data_types::NamespaceName;
use influxdb3_py_api::system_py::PyWriteBatch;
use influxdb3_wal::WalOp;
use iox_time::Time;
use std::time::SystemTime;
#[async_trait::async_trait]
impl RunnablePlugin for TriggerPlugin {
async fn process_event(
&self,
event: PluginEvent,
write_buffer: Arc<dyn WriteBuffer>,
) -> Result<(), Error> {
let Some(schema) = write_buffer.catalog().db_schema(self.db_name.as_str()) else {
return Err(Error::MissingDb);
};
let mut output_lines = Vec::new();
match event {
PluginEvent::WriteWalContents(wal_contents) => {
for wal_op in &wal_contents.ops {
match wal_op {
WalOp::Write(write_batch) => {
let py_write_batch = PyWriteBatch {
// TODO: don't clone the write batch
write_batch: write_batch.clone(),
schema: Arc::clone(&schema),
};
match &self.trigger_definition.trigger {
TriggerSpecificationDefinition::SingleTableWalWrite {
table_name,
} => {
output_lines.extend(py_write_batch.call_against_table(
table_name,
&self.trigger_definition.plugin.code,
&self.trigger_definition.plugin.function_name,
)?);
}
TriggerSpecificationDefinition::AllTablesWalWrite => {
for table in schema.table_map.right_values() {
output_lines.extend(
py_write_batch.call_against_table(
table.as_ref(),
&self.trigger_definition.plugin.code,
&self.trigger_definition.plugin.function_name,
)?,
);
}
}
}
}
WalOp::Catalog(_) => {}
}
}
}
}
if !output_lines.is_empty() {
let ingest_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
write_buffer
.write_lp(
NamespaceName::new(self.db_name.to_string()).unwrap(),
output_lines.join("\n").as_str(),
Time::from_timestamp_nanos(ingest_time.as_nanos() as i64),
false,
Precision::Nanosecond,
)
.await?;
}
Ok(())
}
}
}


@ -3,6 +3,7 @@ use crate::paths::ParquetFilePath;
use crate::persister::Persister;
use crate::write_buffer::persisted_files::PersistedFiles;
use crate::write_buffer::table_buffer::TableBuffer;
use crate::write_buffer::PluginEvent;
use crate::{ParquetFile, ParquetFileId, PersistedSnapshot};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
@ -20,8 +21,6 @@ use influxdb3_cache::meta_cache::MetaCacheProvider;
use influxdb3_cache::parquet_cache::{CacheRequest, ParquetCacheOracle};
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
use influxdb3_id::{DbId, TableId};
#[cfg(feature = "system-py")]
use influxdb3_py_api::system_py::PyWriteBatch;
use influxdb3_wal::{CatalogOp, SnapshotDetails, WalContents, WalFileNotifier, WalOp, WriteBatch};
use iox_query::chunk_statistics::{create_chunk_statistics, NoColumnRanges};
use iox_query::exec::Executor;
@ -29,15 +28,15 @@ use iox_query::frontend::reorg::ReorgPlanner;
use iox_query::QueryChunk;
use object_store::path::Path;
use observability_deps::tracing::{error, info};
use parking_lot::RwLock;
use parking_lot::{Mutex, RwLock};
use parquet::format::FileMetaData;
use schema::sort::SortKey;
use schema::Schema;
use std::any::Any;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::sync::oneshot::Receiver;
use tokio::sync::{broadcast, oneshot};
#[derive(Debug)]
pub struct QueryableBuffer {
@ -52,6 +51,7 @@ pub struct QueryableBuffer {
/// Sends a notification to this watch channel whenever a snapshot info is persisted
persisted_snapshot_notify_rx: tokio::sync::watch::Receiver<Option<PersistedSnapshot>>,
persisted_snapshot_notify_tx: tokio::sync::watch::Sender<Option<PersistedSnapshot>>,
plugin_event_tx: Mutex<Option<broadcast::Sender<PluginEvent>>>,
}
pub struct QueryableBufferArgs {
@ -90,6 +90,7 @@ impl QueryableBuffer {
parquet_cache,
persisted_snapshot_notify_rx,
persisted_snapshot_notify_tx,
plugin_event_tx: Mutex::new(None),
}
}
@ -368,11 +369,28 @@ impl QueryableBuffer {
let mut buffer = self.buffer.write();
buffer.db_to_table.remove(db_id);
}
#[cfg(feature = "system-py")]
pub(crate) fn subscribe_to_plugin_events(&self) -> broadcast::Receiver<PluginEvent> {
let mut sender = self.plugin_event_tx.lock();
if sender.is_none() {
let (tx, rx) = broadcast::channel(1024);
*sender = Some(tx);
return rx;
}
sender.as_ref().unwrap().subscribe()
}
}
#[async_trait]
impl WalFileNotifier for QueryableBuffer {
fn notify(&self, write: WalContents) {
if let Some(sender) = self.plugin_event_tx.lock().as_ref() {
if let Err(err) = sender.send(PluginEvent::WriteWalContents(Arc::new(write.clone()))) {
error!(%err, "Error sending WAL content to plugins");
}
}
self.buffer_contents(write)
}
@ -381,6 +399,11 @@ impl WalFileNotifier for QueryableBuffer {
write: WalContents,
snapshot_details: SnapshotDetails,
) -> Receiver<SnapshotDetails> {
if let Some(sender) = self.plugin_event_tx.lock().as_ref() {
if let Err(err) = sender.send(PluginEvent::WriteWalContents(Arc::new(write.clone()))) {
error!(%err, "Error sending WAL content to plugins");
}
}
self.buffer_contents_and_persist_snapshotted_data(write, snapshot_details)
.await
}
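
The notifier above only clones and broadcasts the WalContents when a sender exists, and subscribe_to_plugin_events() creates the channel lazily behind a Mutex<Option<Sender>>, so buffers with no triggers pay nothing for the feature. A stripped-down sketch of that producer side; EventHub and the Ping event are illustrative stand-ins for QueryableBuffer and PluginEvent::WriteWalContents.

use parking_lot::Mutex;
use tokio::sync::broadcast;

#[derive(Clone, Debug, PartialEq)]
enum PluginEvent {
    Ping,
}

#[derive(Default)]
struct EventHub {
    tx: Mutex<Option<broadcast::Sender<PluginEvent>>>,
}

impl EventHub {
    // The channel is created on first subscription; later subscribers attach
    // to the same sender.
    fn subscribe(&self) -> broadcast::Receiver<PluginEvent> {
        self.tx
            .lock()
            .get_or_insert_with(|| broadcast::channel(1024).0)
            .subscribe()
    }

    // No channel means no subscribers yet; a send Err means every receiver is
    // gone (the real notify() logs that case). Either way the write path moves on.
    fn publish(&self, event: PluginEvent) {
        if let Some(sender) = self.tx.lock().as_ref() {
            let _ = sender.send(event);
        }
    }
}

#[tokio::main]
async fn main() {
    let hub = EventHub::default();
    hub.publish(PluginEvent::Ping); // dropped: nobody is listening yet
    let mut rx = hub.subscribe();
    hub.publish(PluginEvent::Ping);
    assert_eq!(rx.recv().await.unwrap(), PluginEvent::Ping);
}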
@ -508,65 +531,6 @@ impl BufferState {
.db_schema_by_id(&write_batch.database_id)
.expect("database should exist");
// TODO: factor this out
#[cfg(feature = "system-py")]
{
use influxdb3_wal::TriggerSpecificationDefinition;
use influxdb3_wal::TriggerSpecificationDefinition::SingleTableWalWrite;
let write_tables: hashbrown::HashSet<_> = write_batch
.table_chunks
.keys()
.map(|key| {
let table_name = db_schema.table_map.get_by_left(key).unwrap();
table_name.to_string()
})
.collect();
let triggers: Vec<_> = db_schema
.processing_engine_triggers
.values()
.filter_map(|trigger| match &trigger.trigger {
SingleTableWalWrite { table_name } => {
if write_tables.contains(table_name.as_str()) {
Some((trigger, vec![table_name.clone()]))
} else {
None
}
}
TriggerSpecificationDefinition::AllTablesWalWrite => {
if !write_tables.is_empty() {
Some((
trigger,
write_tables.iter().map(ToString::to_string).collect(),
))
} else {
None
}
}
})
.collect();
if !triggers.is_empty() {
// Create PyWriteBatch instance
let py_write_batch = PyWriteBatch {
write_batch: write_batch.clone(),
schema: db_schema.clone(),
};
for (trigger, write_tables) in triggers {
for table in &write_tables {
if let Err(err) = py_write_batch.call_against_table(
table,
trigger.plugin.code.as_str(),
trigger.plugin.function_name.as_str(),
) {
error!(
"failed to call trigger {} with error {}",
trigger.trigger_name, err
)
}
}
}
}
}
let database_buffer = self.db_to_table.entry(write_batch.database_id).or_default();
for (table_id, table_chunks) in write_batch.table_chunks {