
Use the {{% product-name %}} Processing Engine to run code and perform tasks for different database events.

{{% product-name %}} provides the InfluxDB 3 Processing Engine, an embedded Python VM that can dynamically load and trigger Python plugins in response to events in your database.

## Key Concepts

### Plugins

A Processing Engine plugin is Python code you provide to run tasks, such as downsampling data, monitoring, creating alerts, or calling external services.

> [!Note]
> #### Contribute and use community plugins
>
> [influxdata/influxdb3_plugins](https://github.com/influxdata/influxdb3_plugins) is a public repository on GitHub where you can find and contribute example plugins. You can reference plugins from the repository directly within a trigger configuration.

### Triggers

A trigger is an InfluxDB 3 resource you create to associate a database event (for example, a WAL flush) with the plugin that should run. When an event occurs, the trigger passes configuration details, optional arguments, and event data to the plugin.

The Processing Engine provides three types of triggers; each type corresponds to an event type with event-specific configuration that lets you handle events with targeted logic.

- WAL Flush: Triggered when the write-ahead log (WAL) is flushed to the object store (default is every second).
- Scheduled Tasks: Triggered on a schedule you specify using cron syntax.
- On-request: Triggered on a GET or POST request to the bound HTTP API endpoint at `/api/v3/engine/<CUSTOM_PATH>`.
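Each trigger type invokes a specific entry-point function in your plugin. The following sketch collects, for quick reference, the signatures used by the examples later in this guide:

```python
# Plugin entry points by trigger type (signatures as used by the examples
# later in this guide)

# WAL flush: receives batches of written data after each WAL flush
def process_writes(influxdb3_local, table_batches, args=None):
    ...

# Scheduled: receives the time of the trigger for each scheduled run
def process_scheduled_call(influxdb3_local, time, args=None):
    ...

# On-request: receives the HTTP request made to the bound endpoint
def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
    ...
```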

## Activate the Processing Engine

To enable the Processing Engine, start the {{% product-name %}} server with the `--plugin-dir` option and a path to your plugins directory. If the directory doesn't exist, the server creates it.

```bash
influxdb3 serve --node-id node0 --object-store [OBJECT STORE TYPE] --plugin-dir /path/to/plugins
```

## Shared API

All plugin types have access to the InfluxDB 3 shared API for interacting with the database. The shared API provides access to the following:

- `LineBuilder` to create lines of line protocol for writing to the database
- `query` to query data from any database
- `info`, `warn`, and `error` to log messages to the database log, which is output in the server logs and captured in system tables queryable by SQL
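For illustration, a single WAL flush plugin might combine all three. A minimal sketch; the `home` table, its columns, and the `home_summary` measurement are hypothetical names:

```python
def process_writes(influxdb3_local, table_batches, args=None):
    # Query existing data (hypothetical table and column names)
    rows = influxdb3_local.query("SELECT room, temp FROM home WHERE time > now() - INTERVAL '1 hour'")

    # Log how many rows the query returned
    influxdb3_local.info("found", len(rows), "recent rows in home")

    # Write a summary point back to the database
    line = LineBuilder("home_summary").tag("source", "wal_plugin").int64_field("recent_rows", len(rows))
    influxdb3_local.write(line)
```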

### LineBuilder

The LineBuilder is a simple API for building lines of Line Protocol to write into the database. Writes are buffered while the plugin runs and are flushed when the plugin completes. The LineBuilder API is available in all plugin types.

The following example shows how to use the LineBuilder API:

```python
# Build line protocol incrementally
line = LineBuilder("weather")
line.tag("location", "us-midwest")
line.float64_field("temperature", 82.5)
line.time_ns(1627680000000000000)
influxdb3_local.write(line)

# Output line protocol as a string ("weather,location=us-midwest temperature=82.5 1627680000000000000")
line_str = line.build()
```

Here is the Python implementation of the LineBuilder API:

```python
from typing import Optional
from collections import OrderedDict

class InfluxDBError(Exception):
    """Base exception for InfluxDB-related errors"""
    pass

class InvalidMeasurementError(InfluxDBError):
    """Raised when measurement name is invalid"""
    pass

class InvalidKeyError(InfluxDBError):
    """Raised when a tag or field key is invalid"""
    pass

class InvalidLineError(InfluxDBError):
    """Raised when a line protocol string is invalid"""
    pass

class LineBuilder:
    def __init__(self, measurement: str):
        if ' ' in measurement:
            raise InvalidMeasurementError("Measurement name cannot contain spaces")
        self.measurement = measurement
        self.tags: OrderedDict[str, str] = OrderedDict()
        self.fields: OrderedDict[str, str] = OrderedDict()
        self._timestamp_ns: Optional[int] = None

    def _validate_key(self, key: str, key_type: str) -> None:
        """Validate that a key does not contain spaces, commas, or equals signs."""
        if not key:
            raise InvalidKeyError(f"{key_type} key cannot be empty")
        if ' ' in key:
            raise InvalidKeyError(f"{key_type} key '{key}' cannot contain spaces")
        if ',' in key:
            raise InvalidKeyError(f"{key_type} key '{key}' cannot contain commas")
        if '=' in key:
            raise InvalidKeyError(f"{key_type} key '{key}' cannot contain equals signs")

    def tag(self, key: str, value: str) -> 'LineBuilder':
        """Add a tag to the line protocol."""
        self._validate_key(key, "tag")
        self.tags[key] = str(value)
        return self

    def uint64_field(self, key: str, value: int) -> 'LineBuilder':
        """Add an unsigned integer field to the line protocol."""
        self._validate_key(key, "field")
        if value < 0:
            raise ValueError(f"uint64 field '{key}' cannot be negative")
        self.fields[key] = f"{value}u"
        return self

    def int64_field(self, key: str, value: int) -> 'LineBuilder':
        """Add an integer field to the line protocol."""
        self._validate_key(key, "field")
        self.fields[key] = f"{value}i"
        return self

    def float64_field(self, key: str, value: float) -> 'LineBuilder':
        """Add a float field to the line protocol."""
        self._validate_key(key, "field")
        # Check if value has no decimal component
        self.fields[key] = f"{int(value)}.0" if value % 1 == 0 else str(value)
        return self

    def string_field(self, key: str, value: str) -> 'LineBuilder':
        """Add a string field to the line protocol."""
        self._validate_key(key, "field")
        # Escape backslashes first, then quotes, so the backslash added
        # for a quote is not escaped a second time
        escaped_value = value.replace('\\', '\\\\').replace('"', '\\"')
        self.fields[key] = f'"{escaped_value}"'
        return self

    def bool_field(self, key: str, value: bool) -> 'LineBuilder':
        """Add a boolean field to the line protocol."""
        self._validate_key(key, "field")
        self.fields[key] = 't' if value else 'f'
        return self

    def time_ns(self, timestamp_ns: int) -> 'LineBuilder':
        """Set the timestamp in nanoseconds."""
        self._timestamp_ns = timestamp_ns
        return self

    def build(self) -> str:
        """Build the line protocol string."""
        # Start with measurement name (escape commas only)
        line = self.measurement.replace(',', '\\,')

        # Add tags if present
        if self.tags:
            tags_str = ','.join(
                f"{k}={v}" for k, v in self.tags.items()
            )
            line += f",{tags_str}"

        # Add fields (required)
        if not self.fields:
            raise InvalidLineError(f"At least one field is required: {line}")

        fields_str = ','.join(
            f"{k}={v}" for k, v in self.fields.items()
        )
        line += f" {fields_str}"

        # Add timestamp if present
        if self._timestamp_ns is not None:
            line += f" {self._timestamp_ns}"

        return line
```

### Query

The shared API `query` function executes an SQL query with optional parameters (a parameterized query) and returns results as a `List[Dict[str, Any]]`, where the key is the column name and the value is the column value. The `query` function is available in all plugin types.

The following examples show how to use the query function:

influxdb3_local.query("SELECT * from foo where bar = 'baz' and time > now() - INTERVAL '1 hour'")

# Or using parameterized queries
args = {"bar": "baz"}
influxdb3_local.query("SELECT * from foo where bar = $bar and time > now() - INTERVAL '1 hour'", args)
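Each returned row is a dictionary keyed by column name. A brief sketch of consuming the results, assuming `foo` has a `bar` column as in the queries above:

```python
results = influxdb3_local.query("SELECT bar, time FROM foo WHERE time > now() - INTERVAL '1 hour'")
for row in results:
    # Access column values by name
    influxdb3_local.info(f"bar={row['bar']} at time={row['time']}")
```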

### Logging

The shared API `info`, `warn`, and `error` functions log messages to the database log, which is output in the server logs and captured in system tables queryable by SQL. These functions are available in all plugin types. Each function accepts multiple arguments, converts them to strings, and logs them as a single, space-separated message.

The following examples show how to use the info, warn, and error logging functions:

```python
influxdb3_local.info("This is an info message")
influxdb3_local.warn("This is a warning message")
influxdb3_local.error("This is an error message")

# Log a message that contains a data object
obj_to_log = {"hello": "world"}
influxdb3_local.info("This is an info message with an object", obj_to_log)
```

## Trigger Settings

### Control trigger execution

By default, triggers run synchronously—each instance waits for previous instances to complete before executing.

To allow multiple instances of the same trigger to run simultaneously, configure triggers to run asynchronously:

```bash
# Create an asynchronous trigger
influxdb3 create trigger --run-asynchronously
```

### Configure error behavior for plugins

The Processing Engine logs all plugin errors to stdout and the `system.processing_engine_logs` system table.

To configure additional error handling for a trigger, use the `--error-behavior` flag. Apart from the default `log` behavior, you can set one of the following:

- `--error-behavior retry`: Attempt to run the plugin again immediately after an error
- `--error-behavior disable`: Automatically disable the plugin when an error occurs (you can re-enable it later using the CLI)

```bash
# Create a trigger that retries on error
influxdb3 create trigger --error-behavior retry

# Create a trigger that disables the plugin on error
influxdb3 create trigger --error-behavior disable
```

### Trigger arguments

A plugin can receive arguments from the trigger that runs it.
You can use this to provide runtime configuration and drive behavior of a plugin—for example:

- threshold values for monitoring
- connection properties for connecting to third-party services

To pass arguments to a plugin, specify trigger arguments in a comma-separated list
of key-value pairs--for example, using the CLI:

```bash
influxdb3 create trigger \
  --trigger-arguments key1=val1,key2=val2
```

The arguments are passed to the plugin as a Dict[str, str] where the key is the argument name and the value is the argument value--for example:

```python
args = {
    "key1": "val1",
    "key2": "val2",
}
```

The following example shows how to access and use an argument in a WAL plugin:

```python
def process_writes(influxdb3_local, table_batches, args=None):
    if args and "threshold" in args:
        threshold = int(args["threshold"])
        influxdb3_local.info(f"Threshold is {threshold}")
    else:
        influxdb3_local.warn("No threshold provided")
```

The `args` parameter is optional. If a plugin doesn't require arguments, you can omit it from the trigger definition.

## Import plugin dependencies

Use the `influxdb3 install` command to download and install Python packages that your plugin depends on.

```bash
influxdb3 install package <PACKAGE_NAME>
```

### Use `influxdb3 install` with Docker

1. Start the server with the `--plugin-dir` option:

   ```bash
   docker run \
     --name CONTAINER_NAME \
     -v /path/to/.influxdb3/data:/data \
     -v /path/to/.influxdb3/plugins:/plugins \
     quay.io/influxdb/influxdb3-{{< product-key >}}:latest \
     serve --node-id=node0 \
     --object-store=file \
     --data-dir=/data \
     --http-bind=localhost:8183 \
     --plugin-dir=/plugins
   ```

2. Use `docker exec` to run the `influxdb3 install` command:

   ```bash
   docker exec -it CONTAINER_NAME influxdb3 install package pandas
   ```

The result is an active Python virtual environment with the package installed in `<PLUGINS_DIR>/.venv`. You can specify additional options to install dependencies from a `requirements.txt` file or a custom virtual environment path. For more information, see the `influxdb3` CLI help:

```bash
influxdb3 install package --help
```
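Once installed, your plugin can import the package like any other module. A minimal sketch, assuming `pandas` was installed as in the Docker example above and querying a hypothetical `home` table:

```python
import pandas as pd

def process_scheduled_call(influxdb3_local, time, args=None):
    # Query recent rows and aggregate them with the installed package
    rows = influxdb3_local.query("SELECT * FROM home WHERE time > now() - INTERVAL '1 hour'")
    if rows:
        df = pd.DataFrame(rows)
        influxdb3_local.info("column means:", df.mean(numeric_only=True).to_dict())
```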

## Configure plugin triggers

Triggers define when and how plugins execute in response to database events. Each trigger type corresponds to a specific event, allowing precise control over automation within {{% product-name %}}.

### WAL flush trigger

When a WAL flush plugin is triggered, the plugin receives a list of table_batches filtered by the trigger configuration (either all tables in the database or a specific table).

The following example shows a simple WAL flush plugin:

```python
def process_writes(influxdb3_local, table_batches, args=None):
    for table_batch in table_batches:
        # Skip the batch if table_name is write_reports
        if table_batch["table_name"] == "write_reports":
            continue

        row_count = len(table_batch["rows"])

        # Double the row count if table name matches args table_name
        if args and "double_count_table" in args and table_batch["table_name"] == args["double_count_table"]:
            row_count *= 2

        # Use the LineBuilder API to write data
        line = LineBuilder("write_reports")\
            .tag("table_name", table_batch["table_name"])\
            .int64_field("row_count", row_count)
        influxdb3_local.write(line)

    influxdb3_local.info("wal_plugin.py done")
```

#### WAL flush trigger configuration

When you create a trigger, you associate it with a database and provide configuration specific to the trigger type.

For a WAL flush trigger you specify a `trigger-spec`, which determines when the plugin is triggered (and what table data it receives):

- `all_tables`: triggers the plugin on every write to the associated database
- `table:<table_name>`: triggers the plugin function only for writes to the specified table

The following example creates a WAL flush trigger for the `gh:examples/wal_plugin/wal_plugin.py` plugin:

```bash
influxdb3 create trigger \
  --trigger-spec "table:TABLE_NAME" \
  --plugin-filename "gh:examples/wal_plugin/wal_plugin.py" \
  --database DATABASE_NAME TRIGGER_NAME
```

The `gh:` prefix lets you fetch a plugin file directly from the [influxdata/influxdb3_plugins](https://github.com/influxdata/influxdb3_plugins) repository on GitHub. Without the prefix, the server looks for the file inside the plugins directory.

To provide additional configuration to your plugin, pass a list of key-value pairs in the `--trigger-arguments` option and, in your plugin, use the `args` parameter to receive the arguments. For more information about trigger arguments, see the CLI help:

```bash
influxdb3 create trigger --help
```

### Schedule trigger

Schedule plugins run on a schedule specified in cron syntax. The plugin receives the local API, the time of the trigger, and any arguments passed in the trigger definition. Here's an example of a simple schedule plugin:

```python
# See if a table has been written to in the last 5 minutes
def process_scheduled_call(influxdb3_local, time, args=None):
    if args and "table_name" in args:
        table_name = args["table_name"]
        result = influxdb3_local.query(f"SELECT * FROM {table_name} WHERE time > now() - INTERVAL '5 minutes'")
        # Write an error log if the result is empty
        if not result:
            influxdb3_local.error(f"No data in {table_name} in the last 5 minutes")
    else:
        influxdb3_local.error("No table_name provided for schedule plugin")
```

#### Schedule trigger configuration

Schedule plugins are set with a `trigger-spec` of `schedule:<cron_expression>` or `every:<duration>`. The `args` parameter can be used to pass configuration to the plugin. For example, to run the system-metrics example from the GitHub repository and have it collect every 10 seconds, use the following trigger definition:

```bash
influxdb3 create trigger \
  --trigger-spec "every:10s" \
  --plugin-filename "gh:examples/schedule/system_metrics/system_metrics.py" \
  --database mydb system-metrics
```

### On Request trigger

On Request plugins are triggered by a request to a custom HTTP API endpoint. The plugin receives the shared API, query parameters (`Dict[str, str]`), request headers (`Dict[str, str]`), the request body (as bytes), and any arguments passed in the trigger definition. On Request plugin responses follow conventions for Flask responses.

#### Example: On Request plugin

```python
import json

def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
    for k, v in query_parameters.items():
        influxdb3_local.info(f"query_parameters: {k}={v}")
    for k, v in request_headers.items():
        influxdb3_local.info(f"request_headers: {k}={v}")

    request_data = json.loads(request_body)

    influxdb3_local.info("parsed JSON request body:", request_data)

    # Write the data to the database
    line = LineBuilder("request_data").tag("tag1", "tag1_value").int64_field("field1", 1)
    # Get a string of the line to return as the body
    line_str = line.build()

    influxdb3_local.write(line)

    return {"status": "ok", "line": line_str}
```

#### On Request trigger configuration

To create a trigger for an On Request plugin, specify the `request:<ENDPOINT>` trigger-spec.

For example, the following command creates an HTTP API /api/v3/engine/my-plugin endpoint for the plugin file:

```bash
influxdb3 create trigger \
  --trigger-spec "request:my-plugin" \
  --plugin-filename "examples/my-on-request.py" \
  --database mydb my-plugin
```

To run the plugin, you send an HTTP request to `<HOST>/api/v3/engine/my-plugin`.
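For example, a request with query parameters and a JSON body. A sketch using Python's standard library; the host, port, and token are assumptions for illustration:

```python
import json
import urllib.request

# Hypothetical host, port, and token; adjust for your deployment
url = "http://localhost:8181/api/v3/engine/my-plugin?param1=value1"
req = urllib.request.Request(
    url,
    data=json.dumps({"hello": "world"}).encode(),
    headers={
        "Content-Type": "application/json",
        "Authorization": "Bearer AUTH_TOKEN",
    },
    method="POST",
)
with urllib.request.urlopen(req) as resp:
    # Print the response body returned by the plugin
    print(resp.read().decode())
```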

Because all On Request plugins for a server share the same `<HOST>/api/v3/engine/` base URL, the trigger-spec you define must be unique across all plugins configured for a server, regardless of which database they are associated with.

## In-memory cache

The Processing Engine provides a powerful in-memory cache system that enables plugins to persist and retrieve data between executions. This cache system is essential for maintaining state, tracking metrics over time, and optimizing performance when working with external data sources.

### Key Benefits

- State persistence: Maintain counters, timestamps, and other state variables across plugin executions.
- Performance and cost optimization: Store frequently used data to avoid expensive recalculations. Minimize external API calls by caching responses and avoiding rate limits.
- Data enrichment: Cache lookup tables, API responses, or reference data to enrich data efficiently.

### Cache API

The cache API is accessible via the `cache` property on the `influxdb3_local` object provided to all plugin types:

```python
# Basic usage pattern
influxdb3_local.cache.METHOD(PARAMETERS)
```

| Method | Parameters | Returns | Description |
|--------|------------|---------|-------------|
| `put` | `key` (str): The key to store the value under<br>`value` (Any): Any Python object to cache<br>`ttl` (Optional[float], default=None): Time in seconds before expiration<br>`use_global` (bool, default=False): If True, uses the global namespace | None | Stores a value in the cache with an optional time-to-live |
| `get` | `key` (str): The key to retrieve<br>`default` (Any, default=None): Value to return if the key is not found<br>`use_global` (bool, default=False): If True, uses the global namespace | Any | Retrieves a value from the cache, or returns `default` if not found |
| `delete` | `key` (str): The key to delete<br>`use_global` (bool, default=False): If True, uses the global namespace | bool | Deletes a value from the cache. Returns True if deleted, False if not found |

### Cache Namespaces

The cache system offers two distinct namespaces, providing flexibility for different use cases:

| Namespace | Scope | Best For |
|-----------|-------|----------|
| Trigger-specific (default) | Isolated to a single trigger | Plugin state, counters, timestamps specific to one plugin |
| Global | Shared across all triggers | Configuration, lookup tables, service states that should be available to all plugins |

### Using the In-Memory Cache

The following examples show how to use the cache API in plugins:

```python
import time

# Store values in the trigger-specific namespace
influxdb3_local.cache.put("last_processed_time", time.time())
influxdb3_local.cache.put("error_count", 0)
influxdb3_local.cache.put("processed_records", {"total": 0, "errors": 0})

# Store values with expiration
influxdb3_local.cache.put("temp_data", {"value": 42}, ttl=300)  # Expires in 5 minutes
influxdb3_local.cache.put("auth_token", "t0k3n", ttl=3600)      # Expires in 1 hour

# Store values in the global namespace
influxdb3_local.cache.put("app_config", {"version": "1.0.2"}, use_global=True)
influxdb3_local.cache.put("global_counter", 0, use_global=True)

# Retrieve values
last_time = influxdb3_local.cache.get("last_processed_time")
auth = influxdb3_local.cache.get("auth_token")
config = influxdb3_local.cache.get("app_config", use_global=True)

# Provide defaults for missing keys
missing = influxdb3_local.cache.get("missing_key", default="Not found")
count = influxdb3_local.cache.get("visit_count", default=0)

# Delete cached values
influxdb3_local.cache.delete("temp_data")
influxdb3_local.cache.delete("app_config", use_global=True)
```

### Example: Maintaining State Between Executions

This example shows a WAL plugin that uses the cache to maintain a counter across executions:


```python
def process_writes(influxdb3_local, table_batches, args=None):
    # Get the current counter value or default to 0
    counter = influxdb3_local.cache.get("execution_counter", default=0)

    # Increment the counter
    counter += 1

    # Store the updated counter back in the cache
    influxdb3_local.cache.put("execution_counter", counter)

    influxdb3_local.info(f"This plugin has been executed {counter} times")

    # Process writes normally...
```

### Example: Sharing Configuration Across Triggers

One benefit of using a global namespace is being more responsive to changing conditions. The following example uses the global namespace to share configuration, so a scheduled call can check thresholds set by prior trigger calls without querying the database each time:

```python
def process_scheduled_call(influxdb3_local, time, args=None):
    # Check if we have cached configuration
    config = influxdb3_local.cache.get("alert_config", use_global=True)

    if not config:
        # Load configuration from the database
        results = influxdb3_local.query("SELECT * FROM system.alert_config")

        # Transform query results into a config object
        config = {row["name"]: row["value"] for row in results}

        # Cache the configuration with a 5-minute TTL
        influxdb3_local.cache.put("alert_config", config, ttl=300, use_global=True)
        influxdb3_local.info("Loaded fresh configuration from database")
    else:
        influxdb3_local.info("Using cached configuration")

    # Use the configuration
    threshold = float(config.get("cpu_threshold", "90.0"))
    # ...
```

The cache is designed to support stateful operations while maintaining isolation between different triggers. Use the trigger-specific namespace for most operations and the global namespace only when data sharing across triggers is necessary.

### Best Practices

#### Use TTL Appropriately

Set realistic expiration times based on how frequently data changes.

```python
# Cache external API responses for 5 minutes
influxdb3_local.cache.put("weather_data", api_response, ttl=300)
```

#### Cache Computation Results

Store the results of expensive calculations that are used frequently.

```python
# Cache aggregated statistics
influxdb3_local.cache.put("daily_stats", calculate_statistics(data), ttl=3600)
```

#### Implement Cache Warm-Up

Prime the cache at startup for critical data. This is especially useful for global namespace data that multiple triggers need.

```python
# Check if the cache needs to be initialized
if not influxdb3_local.cache.get("lookup_table"):
    influxdb3_local.cache.put("lookup_table", load_lookup_data())
```

### Cache Limitations

- Memory usage: Because cache contents are stored in memory, monitor your memory usage when caching large datasets.
- Server restarts: The cache is cleared when the server restarts, so design your plugins to handle cache initialization (as noted above).
- Concurrency: Be cautious when multiple trigger instances might update the same cache key simultaneously, to prevent reading inaccurate or stale data.
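For example, a read-modify-write sequence like the counter pattern above is not atomic. A sketch of the hazard, using only the documented `get`/`put` calls:

```python
# Two asynchronous trigger instances can both read the same value...
count = influxdb3_local.cache.get("shared_counter", default=0)
# ...and both write back count + 1, losing one of the increments
influxdb3_local.cache.put("shared_counter", count + 1)
```

Keeping per-trigger state in the default (trigger-specific) namespace, or running the trigger synchronously (the default), avoids this interleaving.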