chore(influxdb3): Overhaul processing engine guide:

- Explain the relationship between trigger specs and plugin function signatures
- Use more examples
- Add clear steps
pull/5923/head
Jason Stirnaman 2025-03-21 17:47:42 -05:00
parent 46c5061669
commit 4d0dbbd81b
3 changed files with 398 additions and 436 deletions

menu:
  influxdb3_core:
    name: Processing engine and Python plugins
weight: 4
influxdb3/core/tags: [processing engine, python]
related:
  - /influxdb3/core/reference/cli/influxdb3/test/wal_plugin/
  - /influxdb3/core/reference/cli/influxdb3/create/plugin/
  - /influxdb3/core/reference/cli/influxdb3/create/trigger/
source: /shared/v3-core-plugins/_index.md
---
<!--
//SOURCE - content/shared/v3-core-plugins/_index.md
-->

menu:
  influxdb3_enterprise:
    name: Processing engine and Python plugins
weight: 4
influxdb3/enterprise/tags: [processing engine, python]
related:
  - /influxdb3/enterprise/reference/cli/influxdb3/test/wal_plugin/
  - /influxdb3/enterprise/reference/cli/influxdb3/create/plugin/
  - /influxdb3/enterprise/reference/cli/influxdb3/create/trigger/
source: /shared/v3-core-plugins/_index.md
---
<!--
//SOURCE - content/shared/v3-core-plugins/_index.md
-->

Use the InfluxDB 3 Processing engine to run Python code directly in your
{{% product-name %}} database to automatically process data and respond to database events.

The Processing engine is an embedded Python VM that runs inside your InfluxDB 3 database and lets you:

- Process data as it's written to the database
- Run code on a schedule
- Create API endpoints that execute Python code
- Maintain state between executions with an in-memory cache

Learn how to create, configure, run, and extend Python plugins that execute when specific events occur.

1. [Set up the Processing engine](#set-up-the-processing-engine)
2. [Add a Processing engine plugin](#add-a-processing-engine-plugin)
   - [Get example plugins](#get-example-plugins)
   - [Create a plugin](#create-a-plugin)
3. [Create a trigger to run a plugin](#create-a-trigger-to-run-a-plugin)
   - [Create a trigger for data writes](#create-a-trigger-for-data-writes)
   - [Create a trigger for scheduled events](#create-a-trigger-for-scheduled-events)
   - [Create a trigger for HTTP requests](#create-a-trigger-for-http-requests)
   - [Use community plugins from GitHub](#use-community-plugins-from-github)
   - [Pass arguments to plugins](#pass-arguments-to-plugins)
   - [Control trigger execution](#control-trigger-execution)
   - [Configure error handling for a trigger](#configure-error-handling-for-a-trigger)
4. [Extend plugins with API features and state management](#extend-plugins-with-api-features-and-state-management)
5. [Install Python dependencies](#install-python-dependencies)

## Set up the Processing engine

To enable the Processing engine, start the {{% product-name %}} server with the
`--plugin-dir` option and a path to your plugins directory.
If the directory doesn't exist, the server creates it.

```bash
influxdb3 serve \
  --node-id node0 \
  --object-store [OBJECT_STORE_TYPE] \
  --plugin-dir /path/to/plugins
```

## Add a Processing engine plugin

A plugin is a Python file that contains a specific function signature that corresponds to a trigger type.
Plugins:

- Receive plugin-specific arguments (such as written data, call time, or an HTTP request)
- Can receive keyword arguments (as `args`) from _trigger arguments_
- Can access the `influxdb3_local` shared API for writing, querying, and managing state
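
For example, a minimal write-event plugin that touches all three--the event data, trigger arguments, and the shared API--might look like the following sketch (the argument key and messages are placeholders):

```python
def process_writes(influxdb3_local, table_batches, args=None):
    # The trigger passes the written data and any configured trigger arguments
    label = args.get("label", "unlabeled") if args else "unlabeled"

    # Use the shared API to log what the trigger delivered
    influxdb3_local.info("Plugin", label, "received", len(table_batches), "table batches")
```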

Get started using example plugins or create your own:

- [Get example plugins](#get-example-plugins)
- [Create a plugin](#create-a-plugin)

### Get example plugins

InfluxData maintains a repository of contributed plugins that you can use as-is or as a starting point for your own plugin.
#### From local files
You can copy example plugins from the [influxdb3_plugins repository](https://github.com/influxdata/influxdb3_plugins) to your local plugin directory:
```bash
# Clone the repository
git clone https://github.com/influxdata/influxdb3_plugins.git
# Copy example plugins to your plugin directory
cp -r influxdb3_plugins/examples/write/* /path/to/plugins/
```
#### Directly from GitHub
You can use plugins directly from GitHub without downloading them first by using the `gh:` prefix in the plugin filename:
```bash
# Use a plugin directly from GitHub
influxdb3 create trigger \
  --trigger-spec "every:1m" \
  --plugin-filename "gh:examples/schedule/system_metrics/system_metrics.py" \
  --database my_database \
  system_metrics
```
> [!Note]
> #### Find and contribute plugins
>
> The plugins repository includes examples for various use cases:
>
> - **Data transformation**: Process and transform incoming data
> - **Alerting**: Send notifications based on data thresholds
> - **Aggregation**: Calculate statistics on time series data
> - **Integration**: Connect to external services and APIs
> - **System monitoring**: Track resource usage and health metrics
>
> Visit [influxdata/influxdb3_plugins](https://github.com/influxdata/influxdb3_plugins)
> to browse available plugins or contribute your own.
### Create a plugin
1. Create a `.py` file in your plugins directory
2. Define a function with one of the following signatures:
#### For data write events
```python
def process_writes(influxdb3_local, table_batches, args=None):
    # Process data as it's written to the database
    for table_batch in table_batches:
        table_name = table_batch["table_name"]
        rows = table_batch["rows"]

        # Log information about the write
        influxdb3_local.info(f"Processing {len(rows)} rows from {table_name}")

        # Write derived data back to the database
        line = LineBuilder("processed_data")
        line.tag("source_table", table_name)
        line.int64_field("row_count", len(rows))
        influxdb3_local.write(line)
```
#### For scheduled events
```python
def process_scheduled_call(influxdb3_local, call_time, args=None):
    # Run code on a schedule

    # Query recent data
    results = influxdb3_local.query("SELECT * FROM metrics WHERE time > now() - INTERVAL '1 hour'")

    # Process the results
    if results:
        influxdb3_local.info(f"Found {len(results)} recent metrics")
    else:
        influxdb3_local.warn("No recent metrics found")
```
#### For HTTP requests
```python
def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
    # Handle HTTP requests to a custom endpoint

    # Log the request parameters
    influxdb3_local.info(f"Received request with parameters: {query_parameters}")

    # Process the request body
    if request_body:
        import json
        data = json.loads(request_body)
        influxdb3_local.info(f"Request data: {data}")

    # Return a response (automatically converted to JSON)
    return {"status": "success", "message": "Request processed"}
```
After adding your plugin, you can [install Python dependencies](#install-python-dependencies) or learn how to [extend plugins with API features and state management](#extend-plugins-with-api-features-and-state-management).
## Create a trigger to run a plugin
A trigger connects your plugin to a specific database event.
The plugin function signature in your plugin file determines which _trigger specification_
you can choose for configuring and activating your plugin.
Create a trigger with the `influxdb3 create trigger` command.
> [!Note]
> When specifying a local plugin file, the `--plugin-filename` parameter
> _is relative to_ the `--plugin-dir` configured for the server.
> You don't need to provide an absolute path.
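
The following table summarizes which trigger specification to pair with each plugin signature described in [Create a plugin](#create-a-plugin):

| Plugin function signature | Compatible `--trigger-spec` values |
|---------------------------|------------------------------------|
| `process_writes(influxdb3_local, table_batches, args=None)` | `table:<TABLE_NAME>` or `all_tables` |
| `process_scheduled_call(influxdb3_local, call_time, args=None)` | `every:<DURATION>` or `cron:<CRONTAB_EXPRESSION>` |
| `process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None)` | `request:<ENDPOINT_PATH>` |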
### Create a trigger for data writes
Use the `table:<TABLE_NAME>` or the `all_tables` trigger specification to configure
and run a [plugin for data write events](#for-data-write-events)--for example:
```bash
# Trigger on writes to a specific table
# The plugin file must be in your configured plugin directory
influxdb3 create trigger \
  --trigger-spec "table:sensor_data" \
  --plugin-filename "process_sensors.py" \
  --database my_database \
  sensor_processor

# Trigger on writes to all tables
influxdb3 create trigger \
  --trigger-spec "all_tables" \
  --plugin-filename "process_all_data.py" \
  --database my_database \
  all_data_processor
```
The trigger runs when the database flushes ingested data for the specified tables
to the Write-Ahead Log (WAL) in the object store (default is every second).
The plugin receives the written data and table information.
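
Each element of `table_batches` contains a table name and the rows written to it. The following sketch shows the general shape the plugin receives (the row columns and values are illustrative and depend on your schema):

```python
# Illustrative structure of the table_batches argument
table_batches = [
    {
        "table_name": "sensor_data",
        "rows": [
            {"time": 1742580000000000000, "room": "kitchen", "temp": 21.5},
            {"time": 1742580060000000000, "room": "hallway", "temp": 20.1},
        ],
    },
]
```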
### Create a trigger for scheduled events
Use the `every:<DURATION>` or the `cron:<CRONTAB_EXPRESSION>` trigger specification
to configure and run a [plugin for scheduled events](#for-scheduled-events)--for example:
```bash
# Run every 5 minutes
influxdb3 create trigger \
  --trigger-spec "every:5m" \
  --plugin-filename "hourly_check.py" \
  --database my_database \
  regular_check

# Run on a cron schedule (8am daily)
influxdb3 create trigger \
  --trigger-spec "cron:0 8 * * *" \
  --plugin-filename "daily_report.py" \
  --database my_database \
  daily_report
```
The plugin receives the scheduled call time.
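
For example, a scheduled plugin can log the call time it receives and query a recent window of data (a sketch; the `metrics` table is a placeholder):

```python
def process_scheduled_call(influxdb3_local, call_time, args=None):
    # call_time is the time the scheduler fired this trigger
    influxdb3_local.info("Scheduled run at", call_time)

    results = influxdb3_local.query(
        "SELECT COUNT(*) AS row_count FROM metrics WHERE time > now() - INTERVAL '5 minutes'"
    )
    if results:
        influxdb3_local.info("Rows written in the last 5 minutes:", results[0])
```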
### Create a trigger for HTTP requests

Use the `request:<ENDPOINT_PATH>` trigger specification to configure and enable a [plugin for HTTP requests](#for-http-requests)--for example:
```bash
# Create an endpoint at /api/v3/engine/webhook
influxdb3 create trigger \
  --trigger-spec "request:webhook" \
  --plugin-filename "webhook_handler.py" \
  --database my_database \
  webhook_processor
```
The trigger makes your endpoint available at `/api/v3/engine/<ENDPOINT_PATH>`.
To run the plugin, send a `GET` or `POST` request to the endpoint--for example:
```bash
curl http://{{% influxdb/host %}}/api/v3/engine/webhook
```
The plugin receives the HTTP request's query parameters, headers, and body.
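
To send a request from a script instead of curl, you can use the Python standard library (a sketch; the host, port, and JSON payload are assumptions for a default local deployment):

```python
import json
import urllib.request

# POST a JSON body to the custom endpoint created by the trigger
req = urllib.request.Request(
    "http://localhost:8181/api/v3/engine/webhook",
    data=json.dumps({"event": "deploy", "status": "ok"}).encode(),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as response:
    print(response.read().decode())
```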
### Use community plugins from GitHub
You can reference plugins directly from the GitHub repository by using the `gh:` prefix:
```bash
# Create a trigger using a plugin from GitHub
influxdb3 create trigger \
  --trigger-spec "every:1m" \
  --plugin-filename "gh:examples/schedule/system_metrics/system_metrics.py" \
  --database my_database \
  system_metrics
```
### Pass arguments to plugins
Use trigger arguments to pass configuration from a trigger to the plugin it runs. You can use this for:
- Threshold values for monitoring
- Connection properties for external services
- Configuration settings for plugin behavior
```bash
influxdb3 create trigger \
  --trigger-spec "every:1h" \
  --plugin-filename "threshold_check.py" \
  --trigger-arguments threshold=90,notify_email=admin@example.com \
  --database my_database \
  threshold_monitor
```
The arguments are passed to the plugin as a `Dict[str, str]` where the key is the argument name and the value is the argument value:
```python
def process_scheduled_call(influxdb3_local, call_time, args=None):
    if args and "threshold" in args:
        threshold = float(args["threshold"])
        email = args.get("notify_email", "default@example.com")

        # Use the arguments in your logic
        influxdb3_local.info(f"Checking threshold {threshold}, will notify {email}")
```
### Control trigger execution
By default, triggers run synchronously—each instance waits for previous instances to complete before executing.
To allow multiple instances of the same trigger to run simultaneously, configure triggers to run asynchronously:
```bash
# Allow multiple trigger instances to run simultaneously
influxdb3 create trigger \
  --trigger-spec "table:metrics" \
  --plugin-filename "heavy_process.py" \
  --run-asynchronous \
  --database my_database \
  async_processor
```
### Configure error handling for a trigger
To configure error handling behavior for a trigger, use the `--error-behavior <ERROR_BEHAVIOR>` CLI option with one of the following values:
- `log` (default): Log all plugin errors to stdout and the `system.processing_engine_logs` system table.
- `retry`: Attempt to run the plugin again immediately after an error.
- `disable`: Automatically disable the plugin when an error occurs (can be re-enabled later via CLI).
```bash
# Automatically retry on error
influxdb3 create trigger \
  --trigger-spec "table:important_data" \
  --plugin-filename "critical_process.py" \
  --error-behavior retry \
  --database my_database \
  critical_processor

# Disable the trigger on error
influxdb3 create trigger \
  --trigger-spec "request:webhook" \
  --plugin-filename "webhook_handler.py" \
  --error-behavior disable \
  --database my_database \
  auto_disable_processor
```
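
Logged errors (and messages from the `info`, `warn`, and `error` functions) are queryable with SQL. For example, a plugin can read recent entries from the system table (a sketch; it selects all columns because the exact column set depends on your server version):

```python
# Inspect recent Processing engine log entries
results = influxdb3_local.query(
    "SELECT * FROM system.processing_engine_logs LIMIT 10"
)
for row in results:
    influxdb3_local.info("Engine log entry:", row)
```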
## Extend plugins with API features and state management
The Processing engine includes API capabilities that allow your plugins to
interact with InfluxDB data and maintain state between executions.
These features let you build more sophisticated plugins that can transform, analyze, and respond to data.
### Use the shared API
All plugins have access to the shared API to interact with the database.
#### Write data
Use the `LineBuilder` API to create line protocol data:
```python
# Create a line protocol entry
line = LineBuilder("weather")
line.tag("location", "us-midwest")
line.float64_field("temperature", 82.5)
line.time_ns(1627680000000000000)

# Output line protocol as a string ("weather,location=us-midwest temperature=82.5 1627680000000000000")
line_str = line.build()

# Write the data to the database
influxdb3_local.write(line)
```
Writes are buffered while the plugin runs and are flushed when the plugin completes.
{{% expand-wrapper %}}
{{% expand "View the `LineBuilder` Python implementation" %}}
```python
from typing import Optional

class LineBuilder:
    # ...implementation elided in this excerpt...
```
{{% /expand %}}
{{% /expand-wrapper %}}

#### Query data

Execute SQL queries and get results:

```python
# Simple query
results = influxdb3_local.query("SELECT * FROM metrics WHERE time > now() - INTERVAL '1 hour'")

# Parameterized query for safer execution
params = {"threshold": 90}
results = influxdb3_local.query("SELECT * FROM metrics WHERE value > $threshold", params)
```

The shared API `query` function executes an SQL query with optional parameters (a [parameterized query](/influxdb3/version/query-data/sql/parameterized-queries/)) and returns results as a `List` of `Dict[String, Any]`, where the key is the column name and the value is the column value.
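
For example, you can iterate over the result rows and access columns by name (the `sensor_data` table and its columns are placeholders):

```python
results = influxdb3_local.query("SELECT room, temp FROM sensor_data LIMIT 10")

# Each row is a dictionary keyed by column name
for row in results:
    influxdb3_local.info("room:", row["room"], "temp:", row["temp"])
```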

#### Log information

The shared API `info`, `warn`, and `error` functions accept multiple arguments,
convert them to strings, and log them as a space-separated message to the database log,
which is output in the server logs and captured in system tables that you can
query using SQL.

Add logging to track plugin execution:

```python
influxdb3_local.info("Starting data processing")
influxdb3_local.warn("Could not process some records")
influxdb3_local.error("Failed to connect to external API")

# Log structured data
obj_to_log = {"records": 157, "errors": 3}
influxdb3_local.info("Processing complete", obj_to_log)
```

#### Use the in-memory cache

The Processing engine provides an in-memory cache system that enables plugins to persist and retrieve data between executions.

Use the shared API `cache` property to access the cache API.
```python
# Basic usage pattern
influxdb3_local.cache.METHOD(PARAMETERS)
```
| Method | Parameters | Returns | Description |
|--------|------------|---------|-------------|
| `put` | `key` (str): The key to store the value under<br>`value` (Any): Any Python object to cache<br>`ttl` (Optional[float], default=None): Time in seconds before expiration<br>`use_global` (bool, default=False): If True, uses global namespace | None | Stores a value in the cache with an optional time-to-live |
| `get` | `key` (str): The key to retrieve<br>`default` (Any, default=None): Value to return if key not found<br>`use_global` (bool, default=False): If True, uses global namespace | Any | Retrieves a value from the cache or returns default if not found |
| `delete` | `key` (str): The key to delete<br>`use_global` (bool, default=False): If True, uses global namespace | bool | Deletes a value from the cache. Returns True if deleted, False if not found |

##### Cache namespaces

The cache system offers two distinct namespaces:

| Namespace | Scope | Best For |
| --- | --- | --- |
| **Trigger-specific** (default) | Isolated to a single trigger | Plugin state, counters, timestamps specific to one plugin |
| **Global** | Shared across all triggers | Configuration, lookup tables, service states that should be available to all plugins |

##### Store and retrieve cached data

```python
import time

# Store a value
influxdb3_local.cache.put("last_run_time", time.time())

# Retrieve a value with a default if not found
last_time = influxdb3_local.cache.get("last_run_time", default=0)

# Delete a cached value
influxdb3_local.cache.delete("temporary_data")
```

##### Store cached data with expiration

```python
# Cache with a 5-minute TTL (time-to-live)
influxdb3_local.cache.put("api_response", response_data, ttl=300)
```

##### Share data across plugins

```python
# Store in the global namespace
influxdb3_local.cache.put("config", {"version": "1.0"}, use_global=True)

# Retrieve from the global namespace
config = influxdb3_local.cache.get("config", use_global=True)
```

##### Track state between executions

```python
# Get current counter or default to 0
counter = influxdb3_local.cache.get("execution_count", default=0)

# Increment counter
counter += 1

# Store the updated value
influxdb3_local.cache.put("execution_count", counter)

influxdb3_local.info(f"This plugin has run {counter} times")
```

## Install Python dependencies
If your plugin needs additional Python packages, use the `influxdb3 install` command:
```bash
# Install a package directly
influxdb3 install package pandas
```
```bash
# With Docker
docker exec -it CONTAINER_NAME influxdb3 install package pandas
```
This creates a Python virtual environment in your plugins directory with the specified packages installed.
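
After installing a package, plugins can import it like any other Python module. For example, a scheduled plugin could use the `pandas` package installed above to summarize recent data (a sketch; the `sensor_data` table is a placeholder):

```python
import pandas as pd

def process_scheduled_call(influxdb3_local, call_time, args=None):
    rows = influxdb3_local.query(
        "SELECT * FROM sensor_data WHERE time > now() - INTERVAL '1 hour'"
    )
    if not rows:
        influxdb3_local.warn("No rows to summarize")
        return

    # Query results are dictionaries, so they load directly into a DataFrame
    df = pd.DataFrame(rows)
    influxdb3_local.info("Hourly summary:", df.describe().to_string())
```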