Merge pull request #6217 from influxdata/jts-triggers
Processing engine examples and cleanuppull/6219/head
commit
2e3ec87501
|
|
@ -15,7 +15,9 @@ The plugin API lets you:
|
|||
|
||||
## Get started with the shared API
|
||||
|
||||
Each plugin automatically has access to the shared API through the `influxdb3_local` object. You don’t need to import any libraries. The API becomes available as soon as your plugin runs.
|
||||
Each plugin automatically has access to the shared API through the `influxdb3_local` object. You don't need to import any libraries. The API becomes available as soon as your plugin runs.
|
||||
|
||||
If your plugin requires third-party Python packages (like `pandas`, `requests`, or `influxdb3-python`), see [Manage plugin dependencies](/influxdb3/version/plugins/#manage-plugin-dependencies) for installation instructions.
|
||||
|
||||
## Write data
|
||||
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ Once you have all the prerequisites in place, follow these steps to implement th
|
|||
- [Set up the Processing Engine](#set-up-the-processing-engine)
|
||||
- [Add a Processing Engine plugin](#add-a-processing-engine-plugin)
|
||||
- [Set up a trigger](#set-up-a-trigger)
|
||||
- [Advanced trigger configuration](#advanced-trigger-configuration)
|
||||
- [Manage plugin dependencies](#manage-plugin-dependencies)
|
||||
{{% show-in "enterprise" %}}
|
||||
- [Distributed cluster considerations](#distributed-cluster-considerations)
|
||||
{{% /show-in %}}
|
||||
|
|
@ -99,16 +99,11 @@ Visit the [influxdb3_plugins repository](https://github.com/influxdata/influxdb3
|
|||
|
||||
#### Add example plugins
|
||||
|
||||
You can either copy a plugin or retrieve it directly from the repository:
|
||||
You have two options for using example plugins:
|
||||
|
||||
{{< code-tabs-wrapper >}}
|
||||
##### Option 1: Copy plugins locally
|
||||
|
||||
{{% code-tabs %}}
|
||||
[Copy locally](#)
|
||||
[Fetch via gh:](#)
|
||||
{{% /code-tabs %}}
|
||||
|
||||
{{% code-tab-content %}}
|
||||
Clone the `influxdata/influxdb3_plugins` repository and copy plugins to your configured plugin directory:
|
||||
|
||||
```bash
|
||||
# Clone the repository
|
||||
|
|
@ -117,23 +112,25 @@ git clone https://github.com/influxdata/influxdb3_plugins.git
|
|||
# Copy a plugin to your configured plugin directory
|
||||
cp influxdb3_plugins/examples/schedule/system_metrics/system_metrics.py /path/to/plugins/
|
||||
```
|
||||
{{% /code-tab-content %}}
|
||||
|
||||
{{% code-tab-content %}}
|
||||
|
||||
##### Option 2: Reference plugins directly from GitHub
|
||||
|
||||
Skip downloading plugins by referencing them directly from GitHub using the `gh:` prefix:
|
||||
|
||||
```bash
|
||||
# To retrieve and use a plugin directly from GitHub,
|
||||
# use the `gh:` prefix in the plugin filename:
|
||||
# Create a trigger using a plugin from GitHub
|
||||
influxdb3 create trigger \
|
||||
--trigger-spec "every:1m" \
|
||||
--plugin-filename "gh:examples/schedule/system_metrics/system_metrics.py" \
|
||||
--database my_database \
|
||||
system_metrics
|
||||
--trigger-spec "every:1m" \
|
||||
--plugin-filename "gh:examples/schedule/system_metrics/system_metrics.py" \
|
||||
--database my_database \
|
||||
system_metrics
|
||||
```
|
||||
|
||||
{{% /code-tab-content %}}
|
||||
This approach:
|
||||
|
||||
{{< /code-tabs-wrapper >}}
|
||||
- Ensures you're using the latest version
|
||||
- Simplifies updates and maintenance
|
||||
- Reduces local storage requirements
|
||||
|
||||
Plugins have various functions such as:
|
||||
|
||||
|
|
@ -322,6 +319,53 @@ The trigger runs when the database flushes ingested data for the specified table
|
|||
|
||||
The plugin receives the written data and table information.
|
||||
|
||||
#### Trigger on data writes with table exclusion
|
||||
|
||||
If you want to use a single trigger for all tables but exclude specific tables,
|
||||
you can use trigger arguments and your plugin code to filter out unwanted tables--for example:
|
||||
|
||||
{{% code-placeholders "DATABASE_NAME|AUTH_TOKEN" %}}
|
||||
```bash
|
||||
influxdb3 create trigger \
|
||||
--database DATABASE_NAME \
|
||||
--token AUTH_TOKEN \
|
||||
--plugin-filename processor.py \
|
||||
--trigger-spec "all_tables" \
|
||||
--trigger-arguments "exclude_tables=temp_data,debug_info,system_logs" \
|
||||
data_processor
|
||||
```
|
||||
{{% /code-placeholders %}}
|
||||
|
||||
Replace the following:
|
||||
|
||||
- {{% code-placeholder-key %}}DATABASE_NAME{{% /code-placeholder-key %}}: the name of the database
|
||||
- {{% code-placeholder-key %}}AUTH_TOKEN{{% /code-placeholder-key %}}: your {{% token-link "database" %}}{{% show-in
|
||||
"enterprise" %}} with write permissions on the specified database{{% /show-in %}}
|
||||
|
||||
Then, in your plugin:
|
||||
|
||||
```python
|
||||
# processor.py
|
||||
def on_write(self, database, table_name, batch):
|
||||
# Get excluded tables from trigger arguments
|
||||
excluded_tables = set(self.args.get('exclude_tables', '').split(','))
|
||||
|
||||
if table_name in excluded_tables:
|
||||
return
|
||||
|
||||
# Process allowed tables
|
||||
self.process_data(database, table_name, batch)
|
||||
```
|
||||
|
||||
##### Recommendations
|
||||
|
||||
- **Early return**: Check exclusions as early as possible in your plugin.
|
||||
- **Efficient lookups**: Use sets for O(1) lookup performance with large exclusion lists.
|
||||
- **Performance**: Log skipped tables for debugging but avoid excessive logging in production.
|
||||
- **Multiple triggers**: For few tables, consider creating separate table-specific
|
||||
triggers instead of filtering within plugin code.
|
||||
See HTTP API [Processing engine endpoints](/influxdb3/version/api/v3/#tag/Processing-engine) for managing triggers.
|
||||
|
||||
#### Trigger on a schedule
|
||||
|
||||
```bash
|
||||
|
|
@ -442,91 +486,9 @@ influxdb3 create trigger \
|
|||
auto_disable_processor
|
||||
```
|
||||
|
||||
## Advanced trigger configuration
|
||||
## Manage plugin dependencies
|
||||
|
||||
After creating basic triggers, you can enhance your plugins with these advanced features:
|
||||
|
||||
### Access community plugins from GitHub
|
||||
|
||||
Skip downloading plugins by referencing them directly from GitHub:
|
||||
|
||||
```bash
|
||||
# Create a trigger using a plugin from GitHub
|
||||
influxdb3 create trigger \
|
||||
--trigger-spec "every:1m" \
|
||||
--plugin-filename "gh:examples/schedule/system_metrics/system_metrics.py" \
|
||||
--database my_database \
|
||||
system_metrics
|
||||
```
|
||||
|
||||
This approach:
|
||||
|
||||
- Ensures you're using the latest version
|
||||
- Simplifies updates and maintenance
|
||||
- Reduces local storage requirements
|
||||
|
||||
### Configure your triggers
|
||||
|
||||
#### Pass configuration arguments
|
||||
|
||||
Provide runtine configuration to your plugins:
|
||||
|
||||
```bash
|
||||
# Pass threshold and email settings to a plugin
|
||||
Provide runtime configuration to your plugins:
|
||||
--trigger-spec "every:1h" \
|
||||
--plugin-filename "threshold_check.py" \
|
||||
--trigger-arguments threshold=90,notify_email=admin@example.com \
|
||||
--database my_database \
|
||||
threshold_monitor
|
||||
```
|
||||
|
||||
Your plugin accesses these values through the `args` parameter:
|
||||
|
||||
```python
|
||||
def process_scheduled_call(influxdb3_local, call_time, args=None):
|
||||
if args and "threshold" in args:
|
||||
threshold = float(args["threshold"])
|
||||
email = args.get("notify_email", "default@example.com")
|
||||
|
||||
# Use the arguments in your logic
|
||||
influxdb3_local.info(f"Checking threshold {threshold}, will notify {email}")
|
||||
```
|
||||
|
||||
#### Set execution mode
|
||||
|
||||
Choose between synchronous (default) or asynchronous execution:
|
||||
|
||||
```bash
|
||||
# Allow multiple trigger instances to run simultaneously
|
||||
influxdb3 create trigger \
|
||||
--trigger-spec "table:metrics" \
|
||||
--plugin-filename "heavy_process.py" \
|
||||
--run-asynchronous \
|
||||
--database my_database \
|
||||
async_processor
|
||||
```
|
||||
|
||||
Use asynchronous execution when:
|
||||
|
||||
- Processing might take longer than the trigger interval
|
||||
- Multiple events need to be handled simultaneously
|
||||
- Performance is more important than sequential execution
|
||||
|
||||
#### Configure error handling
|
||||
|
||||
Control how your trigger responds to errors:
|
||||
```bash
|
||||
# Automatically retry on error
|
||||
influxdb3 create trigger \
|
||||
--trigger-spec "table:important_data" \
|
||||
--plugin-filename "critical_process.py" \
|
||||
--error-behavior retry \
|
||||
--database my_database \
|
||||
critical_processor
|
||||
```
|
||||
|
||||
### Install Python dependencies
|
||||
|
||||
Use the `influxdb3 install package` command to add third-party libraries (like `pandas`, `requests`, or `influxdb3-python`) to your plugin environment.
|
||||
This installs packages into the Processing Engine’s embedded Python environment to ensure compatibility with your InfluxDB instance.
|
||||
|
|
|
|||
Loading…
Reference in New Issue