Add plugin documentation for Core and Enterprise
parent
c0e3f4941f
commit
b78162d24c
File diff suppressed because it is too large
Load Diff
|
@ -4,12 +4,12 @@
|
|||
|
||||
argparse@^2.0.1:
|
||||
version "2.0.1"
|
||||
resolved "https://registry.yarnpkg.com/argparse/-/argparse-2.0.1.tgz#246f50f3ca78a3240f6c997e8a9bd1eac49e4b38"
|
||||
resolved "https://registry.npmjs.org/argparse/-/argparse-2.0.1.tgz"
|
||||
integrity sha512-8+9WqebbFzpX9OR+Wa6O29asIogeRMzcGtAINdpMHHyAg10f05aSFVBbcEqGf/PXw1EjAZ+q2/bEBg3DvurK3Q==
|
||||
|
||||
js-yaml@^4.1.0:
|
||||
version "4.1.0"
|
||||
resolved "https://registry.yarnpkg.com/js-yaml/-/js-yaml-4.1.0.tgz#c1fb65f8f5017901cdd2c951864ba18458a10602"
|
||||
resolved "https://registry.npmjs.org/js-yaml/-/js-yaml-4.1.0.tgz"
|
||||
integrity sha512-wpxZs9NoxZaJESJGIZTyDEaYpl0FKSA+FB9aJiyemKhMwkxQg63h4T1KJgUGHpTqPDNRcmmYLugrRjJlBtWvRA==
|
||||
dependencies:
|
||||
argparse "^2.0.1"
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
---
|
||||
title: Python Plugins and Processing Engine
|
||||
description: Instructions for using the Python processing engine in InfluxDB 3
|
||||
menu:
|
||||
influxdb3_core:
|
||||
name: Processing Engine and Python Plugins
|
||||
weight: 2
|
||||
influxdb3/core/tags: []
|
||||
source: /shared/v3-core-plugins/_index.md
|
||||
---
|
||||
|
||||
<!--
|
||||
The content of this page is at /shared/v3-core-plugins/_index.md
|
||||
-->
|
|
@ -0,0 +1,14 @@
|
|||
---
|
||||
title: Python Plugins and Processing Engine
|
||||
description: Instructions for using the Python processing engine in InfluxDB 3
|
||||
menu:
|
||||
influxdb3_enterprise:
|
||||
name: Processing Engine and Python Plugins
|
||||
weight: 2
|
||||
influxdb3/enterprise/tags: []
|
||||
source: /shared/v3-core-plugins/_index.md
|
||||
---
|
||||
|
||||
<!--
|
||||
The content of this page is at /shared/v3-core-plugins/_index.md
|
||||
-->
|
|
@ -0,0 +1,297 @@
|
|||
> [!Important]
|
||||
> #### Processing engine only works with Docker
|
||||
>
|
||||
> The Processing engine is currently supported only in Docker x86 environments. Non-Docker support is coming soon. The engine, API, and developer experience are actively evolving and may change. Join our [Discord](https://discord.gg/9zaNCW2PRT) for updates and feedback.
|
||||
|
||||
InfluxDB 3 has an embedded Python VM for dynamically loading plugins that can execute code in the database. There are four types of plugins that can be triggered by the following events in the database:
|
||||
|
||||
* **WAL flush**: Triggered when the write-ahead log (WAL) is flushed to object store (once a second by default)
|
||||
* **Parquet persistenc (coming soon)**: Triggered when data is persisted to object store in Parquet format
|
||||
* **Scheduled tasks**: Triggered by a schedule, specified in cron sytnax
|
||||
* **On Request**: Bind to a specific endpoint under `/api/v3/engine` and trigger when GET or POST requests are made
|
||||
|
||||
Each plugin type has a different trigger configuration, which will be described in the section on each plugin type.
|
||||
|
||||
## Installing Plugins from the repo
|
||||
The repository at [https://github.com/influxdata/influxdb3_plugins](https://github.com/influxdata/influxdb3_plugins) contans example plugins and contributions from the community. To install a plugin from the repository, you can using the `influxdb3` CLI.
|
||||
|
||||
Just use `gh:<path>` as the plugin name to install a plugin from the repository. For example, to install the `wal_plugin.py` from the repository, you can use the following command:
|
||||
|
||||
```shell
|
||||
influxdb3 create plugin -d=mydb --filename "gh:examples/shedule/system_metrics" --plugin-type=scheduled system_metrics
|
||||
influxdb3 create plugin -d=mydb --filename "gh:examples/wal_plugin" --plugin-type=wal_rows wal_plugin_example
|
||||
```
|
||||
|
||||
You will then need to create a trigger to activate the plugin. Details on triggers for each type of plugin are provided below.
|
||||
|
||||
## Shared API
|
||||
|
||||
Within any of the plugin types, a shared API is available to interact with the database. The shared API provides access to the following:
|
||||
* `LineBuilder` to create Line Protocol lines that can be written to the database
|
||||
* `query` to query data from any database
|
||||
* `info`, `warn`, and `error` to log messages to the database log, which will be output in the server logs and captured in system tables queryable by SQL
|
||||
|
||||
### Line Builder
|
||||
|
||||
The `LineBuilder` is a simple API for building lines of Line Protocol to write into the database. Writes are buffered while the plugin runs and are flushed when the plugin completes. The `LineBuilder` API is available in all plugin types. Here are some examples of using the `LineBuilder` API:
|
||||
|
||||
```python
|
||||
line = LineBuilder("weather")
|
||||
.tag("location", "us-midwest")
|
||||
.float64_field("temperature", 82.5)
|
||||
.time_ns(1627680000000000000)
|
||||
influxdb3_local.write(line)
|
||||
|
||||
# to output it as a string: "weather,location=us-midwest temperature=82.5 1627680000000000000"
|
||||
line_str = line.build()
|
||||
|
||||
# or build incrementally
|
||||
line = LineBuilder("weather")
|
||||
line.tag("location", "us-midwest")
|
||||
line.float64_field("temperature", 82.5)
|
||||
line.time_ns(1627680000000000000)
|
||||
influxdb3_local.write(line)
|
||||
```
|
||||
|
||||
Here is the Python implementation of the `LineBuilder` API:
|
||||
|
||||
```python
|
||||
from typing import Optional
|
||||
from collections import OrderedDict
|
||||
|
||||
class InfluxDBError(Exception):
|
||||
"""Base exception for InfluxDB-related errors"""
|
||||
pass
|
||||
|
||||
class InvalidMeasurementError(InfluxDBError):
|
||||
"""Raised when measurement name is invalid"""
|
||||
pass
|
||||
|
||||
class InvalidKeyError(InfluxDBError):
|
||||
"""Raised when a tag or field key is invalid"""
|
||||
pass
|
||||
|
||||
class InvalidLineError(InfluxDBError):
|
||||
"""Raised when a line protocol string is invalid"""
|
||||
pass
|
||||
|
||||
class LineBuilder:
|
||||
def __init__(self, measurement: str):
|
||||
if ' ' in measurement:
|
||||
raise InvalidMeasurementError("Measurement name cannot contain spaces")
|
||||
self.measurement = measurement
|
||||
self.tags: OrderedDict[str, str] = OrderedDict()
|
||||
self.fields: OrderedDict[str, str] = OrderedDict()
|
||||
self._timestamp_ns: Optional[int] = None
|
||||
|
||||
def _validate_key(self, key: str, key_type: str) -> None:
|
||||
"""Validate that a key does not contain spaces, commas, or equals signs."""
|
||||
if not key:
|
||||
raise InvalidKeyError(f"{key_type} key cannot be empty")
|
||||
if ' ' in key:
|
||||
raise InvalidKeyError(f"{key_type} key '{key}' cannot contain spaces")
|
||||
if ',' in key:
|
||||
raise InvalidKeyError(f"{key_type} key '{key}' cannot contain commas")
|
||||
if '=' in key:
|
||||
raise InvalidKeyError(f"{key_type} key '{key}' cannot contain equals signs")
|
||||
|
||||
def tag(self, key: str, value: str) -> 'LineBuilder':
|
||||
"""Add a tag to the line protocol."""
|
||||
self._validate_key(key, "tag")
|
||||
self.tags[key] = str(value)
|
||||
return self
|
||||
|
||||
def uint64_field(self, key: str, value: int) -> 'LineBuilder':
|
||||
"""Add an unsigned integer field to the line protocol."""
|
||||
self._validate_key(key, "field")
|
||||
if value < 0:
|
||||
raise ValueError(f"uint64 field '{key}' cannot be negative")
|
||||
self.fields[key] = f"{value}u"
|
||||
return self
|
||||
|
||||
def int64_field(self, key: str, value: int) -> 'LineBuilder':
|
||||
"""Add an integer field to the line protocol."""
|
||||
self._validate_key(key, "field")
|
||||
self.fields[key] = f"{value}i"
|
||||
return self
|
||||
|
||||
def float64_field(self, key: str, value: float) -> 'LineBuilder':
|
||||
"""Add a float field to the line protocol."""
|
||||
self._validate_key(key, "field")
|
||||
# Check if value has no decimal component
|
||||
self.fields[key] = f"{int(value)}.0" if value % 1 == 0 else str(value)
|
||||
return self
|
||||
|
||||
def string_field(self, key: str, value: str) -> 'LineBuilder':
|
||||
"""Add a string field to the line protocol."""
|
||||
self._validate_key(key, "field")
|
||||
# Escape quotes and backslashes in string values
|
||||
escaped_value = value.replace('"', '\\"').replace('\\', '\\\\')
|
||||
self.fields[key] = f'"{escaped_value}"'
|
||||
return self
|
||||
|
||||
def bool_field(self, key: str, value: bool) -> 'LineBuilder':
|
||||
"""Add a boolean field to the line protocol."""
|
||||
self._validate_key(key, "field")
|
||||
self.fields[key] = 't' if value else 'f'
|
||||
return self
|
||||
|
||||
def time_ns(self, timestamp_ns: int) -> 'LineBuilder':
|
||||
"""Set the timestamp in nanoseconds."""
|
||||
self._timestamp_ns = timestamp_ns
|
||||
return self
|
||||
|
||||
def build(self) -> str:
|
||||
"""Build the line protocol string."""
|
||||
# Start with measurement name (escape commas only)
|
||||
line = self.measurement.replace(',', '\\,')
|
||||
|
||||
# Add tags if present
|
||||
if self.tags:
|
||||
tags_str = ','.join(
|
||||
f"{k}={v}" for k, v in self.tags.items()
|
||||
)
|
||||
line += f",{tags_str}"
|
||||
|
||||
# Add fields (required)
|
||||
if not self.fields:
|
||||
raise InvalidLineError(f"At least one field is required: {line}")
|
||||
|
||||
fields_str = ','.join(
|
||||
f"{k}={v}" for k, v in self.fields.items()
|
||||
)
|
||||
line += f" {fields_str}"
|
||||
|
||||
# Add timestamp if present
|
||||
if self._timestamp_ns is not None:
|
||||
line += f" {self._timestamp_ns}"
|
||||
|
||||
return line
|
||||
```
|
||||
|
||||
### Query
|
||||
The `query` function on the API will execute a SQL query with optional parameters (through a parameterized query) and return the results as a `List` of `Dict[String, Any]` where the key is the column name and the value is the value for that column. The `query` function is available in all plugin types.
|
||||
|
||||
Some examples:
|
||||
|
||||
```python
|
||||
influxdb3_local.query("SELECT * from foo where bar = 'baz' and time > now() - 'interval 1 hour'")
|
||||
|
||||
# or using parameterized queries
|
||||
args = {"bar": "baz"}
|
||||
influxdb3_local.query("SELECT * from foo where bar = $bar and time > now() - 'interval 1 hour'", args)
|
||||
```
|
||||
|
||||
### Logging
|
||||
The `info`, `warn`, and `error` functions on the API will log messages to the database log, which will be output in the server logs and captured in system tables queryable by SQL. The `info`, `warn`, and `error` functions are available in all plugin types. The functions take an arbitrary number of arguments and will convert them to strings and join them into a single message separated by a space. Examples:
|
||||
|
||||
```python
|
||||
ifluxdb3_local.info("This is an info message")
|
||||
influxdb3_local.warn("This is a warning message")
|
||||
influxdb3_local.error("This is an error message")
|
||||
|
||||
obj_to_log = {"hello": "world"}
|
||||
influxdb3_local.info("This is an info message with an object", obj_to_log)
|
||||
```
|
||||
|
||||
### Trigger arguments
|
||||
Every plugin type can receive arguments from the configuration of the trigger. This is useful for passing configuration to the plugin. This can drive behavior like things to monitor for or it could be connection information to third party services that the plugin will interact with. The arguments are passed as a `Dict` of `String` to `String` where the key is the argument name and the value is the argument value. Here's an example of how to use arguments in a plugin:
|
||||
|
||||
```python
|
||||
def process_writes(influxdb3_local, table_batches, args=None):
|
||||
if args and "threshold" in args:
|
||||
threshold = int(args["threshold"])
|
||||
influxdb3_local.info(f"Threshold is {threshold}")
|
||||
else:
|
||||
influxdb3_local.warn("No threshold provided")
|
||||
```
|
||||
|
||||
The `args` parameter is optional and can be omitted from the trigger definitions if the plugin does not need to use arguments.
|
||||
|
||||
## Imports
|
||||
The Python plugins run using the system Python in the Docker container. Pip is installed in the container and can be used to install any dependencies.
|
||||
You will need to start up the server with the `PYTHONPATH` set to the location of your site packages for your virtual environment. For example: `PYTHONPATH=myenvl/lib/python3.13/site-packages`
|
||||
|
||||
## WAL Flush Plugin
|
||||
When a WAL flush plugin is triggered, the plugin will receive a list of `table_batches` that have matched the plugin trigger (either all tables in the database or a specific table). Here's an example of a simple WAL flush plugin
|
||||
|
||||
```python
|
||||
def process_writes(influxdb3_local, table_batches, args=None):
|
||||
for table_batch in table_batches:
|
||||
# Skip if table_name is write_reports
|
||||
if table_batch["table_name"] == "write_reports":
|
||||
continue
|
||||
|
||||
row_count = len(table_batch["rows"])
|
||||
|
||||
# Double row count if table name matches args table_name
|
||||
if args and "double_count_table" in args and table_batch["table_name"] == args["double_count_table"]:
|
||||
row_count *= 2
|
||||
|
||||
line = LineBuilder("write_reports")\
|
||||
.tag("table_name", table_batch["table_name"])\
|
||||
.int64_field("row_count", row_count)
|
||||
influxdb3_local.write(line)
|
||||
|
||||
influxdb3_local.info("wal_plugin.py done")
|
||||
```
|
||||
|
||||
### WAL Flush Trigger Configuration
|
||||
|
||||
Every trigger is associated with a specific database. The best reference for the arguments for trigger definition can be accessed through the CLI help:
|
||||
|
||||
```shell
|
||||
influxdb3 create trigger help
|
||||
```
|
||||
|
||||
For the WAL plugin, the `trigger-spec` can be either `all-tables` which will trigger on any write to the assoicated database or `table:<table_name>` which will call the `process_writes` function only with the writes for the given table. The `args` parameter can be used to pass configuration to the plugin.
|
||||
|
||||
## Schedule Plugin
|
||||
Schedule plugins run on a schedule specified in cron syntax. The plugin will receive the local API, the time of the trigger, and any arguments passed in the trigger definition. Here's an example of a simple schedule plugin:
|
||||
|
||||
```python
|
||||
# see if a table has been written to in the last 5 minutes
|
||||
def process_scheduled_call(influxdb3_local, time, args=None):
|
||||
if args and "table_name" in args:
|
||||
table_name = args["table_name"]
|
||||
result = influxdb3_local.query(f"SELECT * FROM {table_name} WHERE time > now() - 'interval 5m'")
|
||||
# write an error log if the result is empty
|
||||
if not result:
|
||||
influxdb3_local.error(f"No data in {table_name} in the last 5 minutes")
|
||||
else:
|
||||
influxdb3_local.error("No table_name provided for schedule plugin")
|
||||
```
|
||||
|
||||
### Schedule Trigger Configuration
|
||||
Schedule plugins are set with a `trigger-spec` of `schedule:<cron_expression>`. The `args` parameter can be used to pass configuration to the plugin. For example, if we wanted the above plugin to run the check every minute, we would use `schedule:*/5 * * * *` as the `trigger-spec`.
|
||||
|
||||
## On Request Plugin
|
||||
On Request plugins are triggered by a request to a specific endpoint under `/api/v3/engine`. The plugin will receive the local API, query parameters `Dict[str, str]`, request headers `Dict[str, str]`, request body (as bytes), and any arguments passed in the trigger definition. Here's an example of a simple On Request plugin:
|
||||
|
||||
```python
|
||||
import json
|
||||
|
||||
def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
|
||||
for k, v in query_parameters.items():
|
||||
influxdb3_local.info(f"query_parameters: {k}={v}")
|
||||
for k, v in request_headers.items():
|
||||
influxdb3_local.info(f"request_headers: {k}={v}")
|
||||
|
||||
request_data = json.loads(request_body)
|
||||
|
||||
influxdb3_local.info("parsed JSON request body:", request_data)
|
||||
|
||||
# write the data to the database
|
||||
line = LineBuilder("request_data").tag("tag1", "tag1_value").int64_field("field1", 1)
|
||||
# get a string of the line to return as the body
|
||||
line_str = line.build()
|
||||
|
||||
influxdb3_local.write(line)
|
||||
|
||||
return 200, {"Content-Type": "application/json"}, json.dumps({"status": "ok", "line": line_str})
|
||||
```
|
||||
|
||||
### On Request Trigger Configuration
|
||||
On Request plugins are set with a `trigger-spec` of `request:<endpoint>`. The `args` parameter can be used to pass configuration to the plugin. For example, if we wanted the above plugin to run on the endpoint `/api/v3/engine/my_plugin`, we would use `request:my_plugin` as the `trigger-spec`.
|
||||
|
||||
Trigger specs must be unique across all configured plugins, regardless of which database they are tied to, given the path is the same.
|
Loading…
Reference in New Issue