Merge pull request #5923 from influxdata/jts/5917-processing-engine-trigger-settings-formatting

Jts/5917 revise processing engine guide, misc fixes
pull/5881/head^2
Jason Stirnaman 2025-03-24 11:21:18 -05:00 committed by GitHub
commit 253c16ab75
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 622 additions and 622 deletions

View File

@ -59,7 +59,7 @@ GitHub Copilot should help document InfluxData products by creating clear, accur
- Follow Google Developer Documentation style guidelines
- For API references, follow YouTube Data API style
- Use semantic line feeds (one sentence per line)
- Use only h2-h6 headings in content (h1 comes from frontmatter)
- Use only h2-h6 headings in content (h1 comes from frontmatter title properties)
- Use sentence case for headings
- Use GitHub callout syntax
- Image naming: `project/version-context-description.png`

View File

@ -7,14 +7,13 @@ menu:
influxdb3_core:
name: Processing engine and Python plugins
weight: 4
influxdb3/core/tags: []
influxdb3/core/tags: [processing engine, python]
related:
- /influxdb3/core/reference/cli/influxdb3/test/wal_plugin/
- /influxdb3/core/reference/cli/influxdb3/create/plugin/
- /influxdb3/core/reference/cli/influxdb3/create/trigger/
source: /shared/v3-core-plugins/_index.md
---
<!--
The content of this page is at /shared/v3-core-plugins/_index.md
//SOURCE - content/shared/v3-core-plugins/_index.md
-->

View File

@ -8,9 +8,10 @@ menu:
parent: influxdb3 create
name: influxdb3 create trigger
weight: 400
alias: /influxdb3/core/cli/influxdb3/create/plugin/
source: /shared/influxdb3-cli/create/trigger.md
---
<!--
The content of this file is at content/shared/influxdb3-cli/create/trigger.md
//SOURCE - content/shared/influxdb3-cli/create/trigger.md
-->

View File

@ -7,14 +7,13 @@ menu:
influxdb3_enterprise:
name: Processing Engine and Python plugins
weight: 4
influxdb3/core/tags: []
influxdb3/enterprise/tags: [processing engine, python]
related:
- /influxdb3/enterprise/reference/cli/influxdb3/test/wal_plugin/
- /influxdb3/enterprise/reference/cli/influxdb3/create/plugin/
- /influxdb3/enterprise/reference/cli/influxdb3/create/trigger/
source: /shared/v3-core-plugins/_index.md
---
<!--
The content of this page is at /shared/v3-core-plugins/_index.md
//SOURCE - content/shared/v3-core-plugins/_index.md
-->

View File

@ -8,9 +8,10 @@ menu:
parent: influxdb3 create
name: influxdb3 create trigger
weight: 400
alias: /influxdb3/enterprise/cli/influxdb3/create/plugin/
source: /shared/influxdb3-cli/create/trigger.md
---
<!--
The content of this file is at content/shared/influxdb3-cli/create/trigger.md
//SOURCE - content/shared/influxdb3-cli/create/trigger.md
-->

View File

@ -156,15 +156,17 @@ The following examples show how to start InfluxDB 3 with different object store
```bash
# Memory object store
# Stores data in RAM; doesn't persist data
influxdb3 serve --node-id=host01 --object-store=memory
influxdb3 serve \
--node-id host01 \
--object-store memory
```
```bash
# Filesystem object store
# Provide the filesystem directory
influxdb3 serve \
--node-id=host01 \
--object-store=file \
--node-id host01 \
--object-store file \
--data-dir ~/.influxdb3
```
@ -196,11 +198,11 @@ docker run -it \
# S3 object store (default is the us-east-1 region)
# Specify the Object store type and associated options
influxdb3 serve \
--node-id=host01 \
--object-store=s3 \
--bucket=BUCKET \
--aws-access-key=AWS_ACCESS_KEY \
--aws-secret-access-key=AWS_SECRET_ACCESS_KEY
--node-id host01 \
--object-store s3 \
--bucket BUCKET \
--aws-access-key-id AWS_ACCESS_KEY_ID \
--aws-secret-access-key AWS_SECRET_ACCESS_KEY
```
```bash
@ -208,12 +210,12 @@ influxdb3 serve \
# (using the AWS S3 API with additional parameters)
# Specify the object store type and associated options
influxdb3 serve \
--node-id=host01 \
--object-store=s3 \
--bucket=BUCKET \
--aws-access-key=AWS_ACCESS_KEY \
--aws-secret-access-key=AWS_SECRET_ACCESS_KEY \
--aws-endpoint=ENDPOINT \
--node-id host01 \
--object-store s3 \
--bucket BUCKET \
--aws-access-key-id AWS_ACCESS_KEY_ID \
--aws-secret-access-key AWS_SECRET_ACCESS_KEY \
--aws-endpoint ENDPOINT \
--aws-allow-http
```
@ -314,7 +316,7 @@ cpu,host=Alpha,region=us-west,application=webserver val=6i,usage_percent=25.3,st
If you save the preceding line protocol to a file (for example, `server_data`), then you can use the `influxdb3` CLI to write the data--for example:
```bash
influxdb3 write --database=mydb --file=server_data
influxdb3 write --database mydb --file server_data
```
##### Example: write data using the /api/v3 HTTP API
@ -420,7 +422,11 @@ curl "http://localhost:8181/api/v3/write_lp?db=sensors&precision=auto&no_sync=tr
The `no_sync` CLI option controls when writes are acknowledged--for example:
```bash
influxdb3 write --bucket=mydb --org=my_org --token=my-token --no-sync
influxdb3 write \
--bucket mydb \
--org my_org \
--token my-token \
--no-sync
```
### Create a database or table
@ -466,7 +472,7 @@ The `query` subcommand includes options to help ensure that the right database i
#### Example: query `"SHOW TABLES"` on the `servers` database:
```console
$ influxdb3 query --database=servers "SHOW TABLES"
$ influxdb3 query --database servers "SHOW TABLES"
+---------------+--------------------+--------------+------------+
| table_catalog | table_schema | table_name | table_type |
+---------------+--------------------+--------------+------------+
@ -482,7 +488,7 @@ $ influxdb3 query --database=servers "SHOW TABLES"
#### Example: query the `cpu` table, limiting to 10 rows:
```console
$ influxdb3 query --database=servers "SELECT DISTINCT usage_percent, time FROM cpu LIMIT 10"
$ influxdb3 query --database servers "SELECT DISTINCT usage_percent, time FROM cpu LIMIT 10"
+---------------+---------------------+
| usage_percent | time |
+---------------+---------------------+
@ -506,7 +512,10 @@ $ influxdb3 query --database=servers "SELECT DISTINCT usage_percent, time FROM c
To query using InfluxQL, enter the `influxdb3 query` subcommand and specify `influxql` in the language option--for example:
```bash
influxdb3 query --database=servers --language=influxql "SELECT DISTINCT usage_percent FROM cpu WHERE time >= now() - 1d"
influxdb3 query \
--database servers \
--language influxql \
"SELECT DISTINCT usage_percent FROM cpu WHERE time >= now() - 1d"
```
### Query using the API
@ -617,11 +626,11 @@ The following command creates a last value cache named `cpuCache`:
```bash
influxdb3 create last_cache \
--database=servers \
--table=cpu \
--key-columns=host,application \
--value-columns=usage_percent,status \
--count=5 cpuCache
--database servers \
--table cpu \
--key-columns host,application \
--value-columns usage_percent,status \
--count 5 cpuCache
```
_You can create a last values cache per time series, but be mindful of high cardinality tables that could take excessive memory._
@ -632,7 +641,7 @@ To use the LVC, call it using the `last_cache()` function in your query--for exa
```bash
influxdb3 query \
--database=servers \
--database servers \
"SELECT * FROM last_cache('cpu', 'cpuCache') WHERE host = 'Bravo';"
```
@ -647,8 +656,8 @@ Use the `influxdb3` CLI to [delete a last values cache](/influxdb3/version/refer
```bash
influxdb3 delete last_cache \
-d <DATABASE_NAME> \
-t <TABLE> \
--database <DATABASE_NAME> \
--table <TABLE> \
--cache-name <CACHE_NAME>
```
@ -660,8 +669,8 @@ You can use the `influxdb3` CLI to [create a distinct values cache](/influxdb3/v
```bash
influxdb3 create distinct_cache \
-d <DATABASE_NAME> \
-t <TABLE> \
--database <DATABASE_NAME> \
--table <TABLE> \
--columns <COLUMNS> \
[CACHE_NAME]
```
@ -680,9 +689,9 @@ The following command creates a distinct values cache named `cpuDistinctCache`:
```bash
influxdb3 create distinct_cache \
--database=servers \
--table=cpu \
--columns=host,application \
--database servers \
--table cpu \
--columns host,application \
cpuDistinctCache
```
@ -692,16 +701,14 @@ To use the distinct values cache, call it using the `distinct_cache()` function
```bash
influxdb3 query \
--database=servers \
--database servers \
"SELECT * FROM distinct_cache('cpu', 'cpuDistinctCache')"
```
> [!Note]
> #### Only works with SQL
>
> The Distinct cache only works with SQL, not InfluxQL; SQL is the default language.
> The distinct cache only works with SQL, not InfluxQL; SQL is the default language.
#### Delete a distinct values cache
@ -709,8 +716,8 @@ Use the `influxdb3` CLI to [delete a distinct values cache](/influxdb3/version/r
```bash
influxdb3 delete distinct_cache \
-d <DATABASE_NAME> \
-t <TABLE> \
--database <DATABASE_NAME> \
--table <TABLE> \
--cache-name <CACHE_NAME>
```
@ -736,12 +743,12 @@ InfluxDB 3 provides the following types of triggers, each with specific trigger-
- **On WAL flush**: Sends a batch of written data (for a specific table or all tables) to a plugin (by default, every second).
- **On Schedule**: Executes a plugin on a user-configured schedule (using a crontab or a duration); useful for data collection and deadman monitoring.
- **On Request**: Binds a plugin to a custom HTTP API endpoint at `/api/v3/engine/<ENDPOINT>`.
- **On Request**: Binds a plugin to a custom HTTP API endpoint at `/api/v3/engine/<ENDPOINT_PATH>`.
The plugin receives the HTTP request headers and content, and can then parse, process, and send the data into the database or to third-party services.
### Test, create, and trigger plugin code
##### Example: Python plugin for WAL flush
##### Example: Python plugin for WAL rows
```python
# This is the basic structure for Python plugin code that runs in the
@ -827,7 +834,7 @@ To test a plugin, do the following:
1. Create a _plugin directory_--for example, `/path/to/.influxdb/plugins`
2. [Start the InfluxDB server](#start-influxdb) and include the `--plugin-dir <PATH>` option.
3. Save the [preceding example code](#example-python-plugin) to a plugin file inside of the plugin directory. If you haven't yet written data to the table in the example, comment out the lines where it queries.
3. Save the [example plugin code](#example-python-plugin-for-wal-rows) to a plugin file inside of the plugin directory. If you haven't yet written data to the table in the example, comment out the lines where it queries.
4. To run the test, enter the following command with the following options:
- `--lp` or `--file`: The line protocol to test
@ -863,9 +870,9 @@ trigger:
# - A Python plugin file named `test.py`
# Test a plugin
influxdb3 test wal_plugin \
--lp="my_measure,tag1=asdf f1=1.0 123" \
-d mydb \
--input-arguments="arg1=hello,arg2=world" \
--lp "my_measure,tag1=asdf f1=1.0 123" \
--database mydb \
--input-arguments "arg1=hello,arg2=world" \
test.py
```
@ -873,9 +880,9 @@ influxdb3 test wal_plugin \
# Create a trigger that runs the plugin
influxdb3 create trigger \
-d mydb \
--plugin=test_plugin \
--trigger-spec="table:foo" \
--trigger-arguments="arg1=hello,arg2=world" \
--plugin test_plugin \
--trigger-spec "table:foo" \
--trigger-arguments "arg1=hello,arg2=world" \
trigger1
```
@ -885,3 +892,5 @@ enable the trigger and have it run the plugin as you write data:
```bash
influxdb3 enable trigger --database mydb trigger1
```
For more information, see [Python plugins and the Processing engine](/influxdb3/version/plugins/).

View File

@ -1,81 +1,356 @@
Use the InfluxDB 3 Processing engine to run Python code directly in your
{{% product-name %}} database to automatically process data and respond to database events.
Use the {{% product-name %}} Processing engine to run code and perform tasks
for different database events.
The Processing engine is an embedded Python VM that runs inside your InfluxDB 3 database and lets you:
{{% product-name %}} provides the InfluxDB 3 Processing engine, an embedded Python VM that can dynamically load and trigger Python plugins
in response to events in your database.
- Process data as it's written to the database
- Run code on a schedule
- Create API endpoints that execute Python code
- Maintain state between executions with an in-memory cache
## Key Concepts
Learn how to create, configure, run, and extend Python plugins that execute when specific events occur.
### Plugins
1. [Set up the Processing engine](#set-up-the-processing-engine)
2. [Add a Processing engine plugin](#add-a-processing-engine-plugin)
- [Get example plugins](#get-example-plugins)
- [Create a plugin](#create-a-plugin)
3. [Create a trigger to run a plugin](#create-a-trigger-to-run-a-plugin)
- [Create a trigger for data writes](#create-a-trigger-for-data-writes)
- [Create a trigger for scheduled events](#create-a-trigger-for-scheduled-events)
- [Create a trigger for HTTP requests](#create-a-trigger-for-http-requests)
- [Use community plugins from GitHub](#use-community-plugins-from-github)
- [Pass arguments to plugins](#pass-arguments-to-plugins)
- [Control trigger execution](#control-trigger-execution)
- [Configure error handling for a trigger](#configure-error-handling-for-a-trigger)
- [Extend plugins with API features and state management](#extend-plugins-with-api-features-and-state-management)
- [Install Python dependencies](#install-python-dependencies)
A Processing engine _plugin_ is Python code you provide to run tasks, such as
downsampling data, monitoring, creating alerts, or calling external services.
## Set up the Processing engine
> [!Note]
> #### Contribute and use community plugins
>
> [influxdata/influxdb3_plugins](https://github.com/influxdata/influxdb3_plugins) is a public repository on GitHub where you can find
> and contribute example plugins.
> You can reference plugins from the repository directly within a trigger configuration.
### Triggers
A _trigger_ is an InfluxDB 3 resource you create to associate a database
event (for example, a WAL flush) with the plugin that should run.
When an event occurs, the trigger passes configuration details, optional arguments, and event data to the plugin.
The Processing engine provides four types of triggers--each type corresponds to
an event type with event-specific configuration to let you handle events with targeted logic.
- **WAL Flush**: Triggered when the write-ahead log (WAL) is flushed to the object store (default is every second).
- **Scheduled Tasks**: Triggered on a schedule you specify using cron syntax.
- **On-request**: Triggered on a GET or POST request to the bound HTTP API endpoint at `/api/v3/engine/<CUSTOM_PATH>`.
<!--
- **Parquet Persistence (coming soon)**: Triggered when InfluxDB 3 persists data to object storage Parquet files.
-->
### Activate the Processing engine
To enable the Processing engine, start the {{% product-name %}} server with the
`--plugin-dir` option and a path to your plugins directory.
If the directory doesn't exist, the server creates it.
To enable the Processing engine, start your InfluxDB server with the `--plugin-dir` option:
```bash
influxdb3 serve \
--node-id node0 \
--object-store [OBJECT_STORE_TYPE] \
--plugin-dir /path/to/plugins
```
## Shared API
## Add a Processing engine plugin
All plugin types provide the InfluxDB 3 _shared API_ for interacting with the database.
The shared API provides access to the following:
A plugin is a Python file that contains a specific function signature that corresponds to a trigger type.
Plugins:
- `LineBuilder` to create Line Protocol lines for writing to the database
- `query` to query data from any database
- `info`, `warn`, and `error` to log messages to the database log, which is output in the server logs and captured in system tables queryable by SQL
- Receive plugin-specific arguments (such as written data, call time, or an HTTP request)
- Can receive keyword arguments (as `args`) from _trigger arguments_
- Can access the `influxdb3_local` shared API for writing, querying, and managing state
### LineBuilder
Get started using example plugins or create your own:
The `LineBuilder` is a simple API for building lines of Line Protocol to write into the database. Writes are buffered while the plugin runs and are flushed when the plugin completes. The `LineBuilder` API is available in all plugin types.
- [Get example plugins](#get-example-plugins)
- [Create a plugin](#create-a-plugin)
The following example shows how to use the `LineBuilder` API:
### Get example plugins
InfluxData maintains a repository of contributed plugins that you can use as-is or as a starting point for your own plugin.
#### From local files
You can copy example plugins from the [influxdb3_plugins repository](https://github.com/influxdata/influxdb3_plugins) to your local plugin directory:
```bash
# Clone the repository
git clone https://github.com/influxdata/influxdb3_plugins.git
# Copy example plugins to your plugin directory
cp -r influxdb3_plugins/examples/write/* /path/to/plugins/
```
#### Directly from GitHub
You can use plugins directly from GitHub without downloading them first by using the `gh:` prefix in the plugin filename:
```bash
# Use a plugin directly from GitHub
influxdb3 create trigger \
--trigger-spec "every:1m" \
--plugin-filename "gh:examples/schedule/system_metrics/system_metrics.py" \
--database my_database \
system_metrics
```
> [!Note]
> #### Find and contribute plugins
>
> The plugins repository includes examples for various use cases:
>
> - **Data transformation**: Process and transform incoming data
> - **Alerting**: Send notifications based on data thresholds
> - **Aggregation**: Calculate statistics on time series data
> - **Integration**: Connect to external services and APIs
> - **System monitoring**: Track resource usage and health metrics
>
> Visit [influxdata/influxdb3_plugins](https://github.com/influxdata/influxdb3_plugins)
> to browse available plugins or contribute your own.
### Create a plugin
1. Create a `.py` file in your plugins directory
2. Define a function with one of the following signatures:
#### For data write events
```python
def process_writes(influxdb3_local, table_batches, args=None):
    # Process data as it's written to the database
    for table_batch in table_batches:
        table_name = table_batch["table_name"]
        rows = table_batch["rows"]

        # Log information about the write
        influxdb3_local.info(f"Processing {len(rows)} rows from {table_name}")

        # Write derived data back to the database
        line = LineBuilder("processed_data")
        line.tag("source_table", table_name)
        line.int64_field("row_count", len(rows))
        influxdb3_local.write(line)
```
#### For scheduled events
```python
def process_scheduled_call(influxdb3_local, call_time, args=None):
    # Run code on a schedule

    # Query recent data
    results = influxdb3_local.query("SELECT * FROM metrics WHERE time > now() - INTERVAL '1 hour'")

    # Process the results
    if results:
        influxdb3_local.info(f"Found {len(results)} recent metrics")
    else:
        influxdb3_local.warn("No recent metrics found")
```
#### For HTTP requests
```python
def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
    # Handle HTTP requests to a custom endpoint

    # Log the request parameters
    influxdb3_local.info(f"Received request with parameters: {query_parameters}")

    # Process the request body
    if request_body:
        import json
        data = json.loads(request_body)
        influxdb3_local.info(f"Request data: {data}")

    # Return a response (automatically converted to JSON)
    return {"status": "success", "message": "Request processed"}
```
After adding your plugin, you can [install Python dependencies](#install-python-dependencies) or learn how to [extend plugins with API features and state management](#extend-plugins-with-api-features-and-state-management).
## Create a trigger to run a plugin
A trigger connects your plugin to a specific database event.
The plugin function signature in your plugin file determines which _trigger specification_
you can choose for configuring and activating your plugin.
Create a trigger with the `influxdb3 create trigger` command.
> [!Note]
> When specifying a local plugin file, the `--plugin-filename` parameter
> _is relative to_ the `--plugin-dir` configured for the server.
> You don't need to provide an absolute path.
### Create a trigger for data writes
Use the `table:<TABLE_NAME>` or the `all_tables` trigger specification to configure
and run a [plugin for data write events](#for-data-write-events)--for example:
```bash
# Trigger on writes to a specific table
# The plugin file must be in your configured plugin directory
influxdb3 create trigger \
--trigger-spec "table:sensor_data" \
--plugin-filename "process_sensors.py" \
--database my_database \
sensor_processor
# Trigger on writes to all tables
influxdb3 create trigger \
--trigger-spec "all_tables" \
--plugin-filename "process_all_data.py" \
--database my_database \
all_data_processor
```
The trigger runs when the database flushes ingested data for the specified tables
to the Write-Ahead Log (WAL) in the Object store (default is every second).
The plugin receives the written data and table information.
### Create a trigger for scheduled events
Use the `every:<DURATION>` or the `cron:<CRONTAB_EXPRESSION>` trigger specification
to configure and run a [plugin for scheduled events](#for-scheduled-events)--for example:
```bash
# Run every 5 minutes
influxdb3 create trigger \
--trigger-spec "every:5m" \
--plugin-filename "hourly_check.py" \
--database my_database \
regular_check
# Run on a cron schedule (8am daily)
influxdb3 create trigger \
--trigger-spec "cron:0 8 * * *" \
--plugin-filename "daily_report.py" \
--database my_database \
daily_report
```
The plugin receives the scheduled call time.
### Create a trigger for HTTP requests
Use the `path:<ENDPOINT_PATH>` trigger specification to configure and enable a [plugin for HTTP requests](#for-http-requests)--for example:
```bash
# Create an endpoint at /api/v3/engine/webhook
influxdb3 create trigger \
--trigger-spec "path:webhook" \
--plugin-filename "webhook_handler.py" \
--database my_database \
webhook_processor
```
The trigger makes your endpoint available at `/api/v3/engine/<ENDPOINT_PATH>`.
To run the plugin, send a `GET` or `POST` request to the endpoint--for example:
```bash
curl http://{{% influxdb/host %}}/api/v3/engine/webhook
```
The plugin receives the HTTP request object with methods, headers, and body.
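To exercise the JSON handling shown in the [HTTP request plugin](#for-http-requests), a client can POST a body to the endpoint. The following is a minimal sketch using Python's standard library; the host, port, and payload are illustrative assumptions:

```python
import json
import urllib.request

# POST a hypothetical JSON payload to the custom endpoint
payload = json.dumps({"sensor": "room1", "value": 42}).encode("utf-8")
req = urllib.request.Request(
    "http://localhost:8181/api/v3/engine/webhook",
    data=payload,
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(req) as resp:
    # The plugin's return value is delivered as the JSON response body
    print(resp.read().decode("utf-8"))
```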
### Use community plugins from GitHub
You can reference plugins directly from the GitHub repository by using the `gh:` prefix:
```bash
# Create a trigger using a plugin from GitHub
influxdb3 create trigger \
--trigger-spec "every:1m" \
--plugin-filename "gh:examples/schedule/system_metrics/system_metrics.py" \
--database my_database \
system_metrics
```
### Pass arguments to plugins
Use trigger arguments to pass configuration from a trigger to the plugin it runs. You can use this for:
- Threshold values for monitoring
- Connection properties for external services
- Configuration settings for plugin behavior
```bash
influxdb3 create trigger \
--trigger-spec "every:1h" \
--plugin-filename "threshold_check.py" \
--trigger-arguments threshold=90,notify_email=admin@example.com \
--database my_database \
threshold_monitor
```
The arguments are passed to the plugin as a `Dict[str, str]` where the key is the argument name and the value is the argument value:
```python
def process_scheduled_call(influxdb3_local, call_time, args=None):
    if args and "threshold" in args:
        threshold = float(args["threshold"])
        email = args.get("notify_email", "default@example.com")

        # Use the arguments in your logic
        influxdb3_local.info(f"Checking threshold {threshold}, will notify {email}")
```
### Control trigger execution
By default, triggers run synchronously--each instance waits for previous instances to complete before executing.
To allow multiple instances of the same trigger to run simultaneously, configure triggers to run asynchronously:
```bash
# Allow multiple trigger instances to run simultaneously
influxdb3 create trigger \
--trigger-spec "table:metrics" \
--plugin-filename "heavy_process.py" \
--run-asynchronous \
--database my_database \
async_processor
```
### Configure error handling for a trigger
To configure error handling behavior for a trigger, use the `--error-behavior <ERROR_BEHAVIOR>` CLI option with one of the following values:
- `log` (default): Log all plugin errors to stdout and the `system.processing_engine_logs` system table.
- `retry`: Attempt to run the plugin again immediately after an error.
- `disable`: Automatically disable the plugin when an error occurs (can be re-enabled later via CLI).
```bash
# Automatically retry on error
influxdb3 create trigger \
--trigger-spec "table:important_data" \
--plugin-filename "critical_process.py" \
--error-behavior retry \
--database my_database \
critical_processor
# Disable the trigger on error
influxdb3 create trigger \
--trigger-spec "path:webhook" \
--plugin-filename "webhook_handler.py" \
--error-behavior disable \
--database my_database \
auto_disable_processor
```
## Extend plugins with API features and state management
The Processing engine includes API capabilities that allow your plugins to
interact with InfluxDB data and maintain state between executions.
These features let you build more sophisticated plugins that can transform, analyze, and respond to data.
### Use the shared API
All plugins have access to the shared API to interact with the database.
#### Write data
Use the `LineBuilder` API to create line protocol data:
```python
# Create a line protocol entry
line = LineBuilder("weather")
line.tag("location", "us-midwest")
line.float64_field("temperature", 82.5)
line.time_ns(1627680000000000000)
# Output line protocol as a string ("weather,location=us-midwest temperature=82.5 1627680000000000000")
line_str = line.build()
# Write the data to the database
influxdb3_local.write(line)
```
Here is the Python implementation of the `LineBuilder` API:
Writes are buffered while the plugin runs and are flushed when the plugin completes.
{{% expand-wrapper %}}
{{% expand "View the `LineBuilder` Python implementation" %}}
```python
from typing import Optional
@ -190,412 +465,127 @@ class LineBuilder:
return line
```
{{% /expand %}}
{{% /expand-wrapper %}}
### Query
#### Query data
The shared API `query` function executes an SQL query with optional parameters (a [parameterized query](/influxdb3/version/query-data/sql/parameterized-queries/)) and returns results as a `List` of `Dict[str, Any]` where the key is the column name and the value is the column value. The `query` function is available in all plugin types.
The following examples show how to use the `query` function:
Execute SQL queries and get results:
```python
influxdb3_local.query("SELECT * from foo where bar = 'baz' and time > now() - INTERVAL '1 hour'")
# Simple query
results = influxdb3_local.query("SELECT * FROM metrics WHERE time > now() - INTERVAL '1 hour'")
# Or using parameterized queries
args = {"bar": "baz"}
influxdb3_local.query("SELECT * from foo where bar = $bar and time > now() - INTERVAL '1 hour'", args)
# Parameterized query for safer execution
params = {"threshold": 90}
results = influxdb3_local.query("SELECT * FROM metrics WHERE value > $threshold", params)
```
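Because each row in the result is a dictionary keyed by column name, you can process query output with ordinary Python. The following sketch averages a `value` column; the table and column names are assumptions for illustration:

```python
# Assumes a `metrics` table with a `value` column; adjust to your schema
results = influxdb3_local.query("SELECT value FROM metrics WHERE time > now() - INTERVAL '1 hour'")

values = [row["value"] for row in results if row["value"] is not None]
if values:
    average = sum(values) / len(values)
    influxdb3_local.info(f"Average value over the last hour: {average:.2f}")
else:
    influxdb3_local.warn("No rows returned for the last hour")
```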
### Logging
The shared API `query` function returns results as a `List` of `Dict[str, Any]`, where the key is the column name and the value is the column value.
The shared API `info`, `warn`, and `error` functions log messages to the database log, which is output in the server logs and captured in system tables queryable by SQL.
The `info`, `warn`, and `error` functions are available in all plugin types. Each function accepts multiple arguments, converts them to strings, and logs them as a single, space-separated message.
#### Log information
The following examples show how to use the `info`, `warn`, and `error` logging functions:
The shared API `info`, `warn`, and `error` functions accept multiple arguments,
convert them to strings, and log them as a space-separated message to the database log,
which is output in the server logs and captured in system tables that you can
query using SQL.
Add logging to track plugin execution:
```python
influxdb3_local.info("This is an info message")
influxdb3_local.warn("This is a warning message")
influxdb3_local.error("This is an error message")
influxdb3_local.info("Starting data processing")
influxdb3_local.warn("Could not process some records")
influxdb3_local.error("Failed to connect to external API")
# Log a message that contains a data object
obj_to_log = {"hello": "world"}
influxdb3_local.info("This is an info message with an object", obj_to_log)
# Log structured data
obj_to_log = {"records": 157, "errors": 3}
influxdb3_local.info("Processing complete", obj_to_log)
```
### Trigger Settings
#### Use the in-memory cache
#### Control trigger execution
The Processing engine provides an in-memory cache system that enables plugins to persist and retrieve data between executions.
By default, triggers run synchronously--each instance waits for previous instances to complete before executing.
To allow multiple instances of the same trigger to run simultaneously, configure triggers to run asynchronously:

```bash
# Create an asynchronous trigger
influxdb3 create trigger --run-asynchronous
```

#### Configure error handling

#### Configure error behavior for plugins

The Processing engine logs all plugin errors to stdout and the `system.processing_engine_logs` system table.
To configure additional error handling for a trigger, use the `--error-behavior` flag:

- `--error-behavior retry`: Attempt to run the plugin again immediately after an error
- `--error-behavior disable`: Automatically disable the plugin when an error occurs (can be re-enabled later via CLI)

```bash
# Create a trigger that retries on error
influxdb3 create trigger --error-behavior retry

# Create a trigger that disables the plugin on error
influxdb3 create trigger --error-behavior disable
```

This behavior can be changed by specifying the error behavior via the `--error-behavior` flag. Apart from the default `log`, you can set:

* `--error-behavior retry`: immediately retry the plugin trigger in the event of an error.
* `--error-behavior disable`: turn off the plugin as soon as an error occurs. You can enable it again using the CLI.
### Trigger arguments
A plugin can receive arguments from the trigger that runs it.
You can use this to provide runtime configuration and drive behavior of a plugin--for example:
- threshold values for monitoring
- connection properties for connecting to third-party services
To pass arguments to a plugin, specify trigger arguments in a comma-separated list
of key-value pairs--for example, using the CLI:
```bash
influxdb3 create trigger \
  --trigger-arguments key1=val1,key2=val2
```
The arguments are passed to the plugin as a `Dict[str, str]` where the key is
the argument name and the value is the argument value--for example:
```python
args = {
    "key1": "val1",
    "key2": "val2",
}
```
The following example shows how to access and use an argument in a WAL plugin:
```python
def process_writes(influxdb3_local, table_batches, args=None):
    if args and "threshold" in args:
        threshold = int(args["threshold"])
        influxdb3_local.info(f"Threshold is {threshold}")
    else:
        influxdb3_local.warn("No threshold provided")
```
The `args` parameter is optional. If a plugin doesn't require arguments, you can omit it from the trigger definition.
## Import plugin dependencies
Use the `influxdb3 install` command to download and install Python packages that your plugin depends on.
```bash
influxdb3 install package <PACKAGE_NAME>
```
### Use `influxdb3 install` with Docker
1. Start the server
```bash
docker run \
--name CONTAINER_NAME \
-v /path/to/.influxdb3/data:/data \
-v /path/to/.influxdb3/plugins:/plugins \
quay.io/influxdb/influxdb3-{{< product-key >}}:latest \
serve --node-id=node0 \
--object-store=file \
--data-dir=/data \
--http-bind=localhost:8183 \
--plugin-dir=/plugins
```
2. Use `docker exec` to run the `influxdb3 install` command:
```bash
docker exec -it CONTAINER_NAME influxdb3 install package pandas
```
The result is an active Python virtual environment with the package installed in `<PLUGINS_DIR>/.venv`.
You can specify additional options to install dependencies from a `requirements.txt` file or a custom virtual environment path.
For more information, see the `influxdb3` CLI help:
```bash
influxdb3 install package --help
```
## Configure plugin triggers
Triggers define when and how plugins execute in response to database events. Each trigger type corresponds to a specific event, allowing precise control over automation within {{% product-name %}}.
### WAL flush trigger
When a WAL flush plugin is triggered, the plugin receives a list of `table_batches` filtered by the trigger configuration (either _all tables_ in the database or a specific table).
The following example shows a simple WAL flush plugin:
```python
def process_writes(influxdb3_local, table_batches, args=None):
for table_batch in table_batches:
# Skip the batch if table_name is write_reports
if table_batch["table_name"] == "write_reports":
continue
row_count = len(table_batch["rows"])
# Double the row count if table name matches args table_name
if args and "double_count_table" in args and table_batch["table_name"] == args["double_count_table"]:
row_count *= 2
# Use the LineBuilder API to write data
line = LineBuilder("write_reports")\
.tag("table_name", table_batch["table_name"])\
.int64_field("row_count", row_count)
influxdb3_local.write(line)
influxdb3_local.info("wal_plugin.py done")
```
#### WAL flush trigger configuration
When you create a trigger, you associate it with a database and provide configuration specific to the trigger type.
For a WAL flush trigger you specify a `trigger-spec`, which determines when the plugin is triggered (and what table data it receives):
- `all-tables`: triggers the plugin on every write to the associated database
- `table:<table_name>` triggers the plugin function only for writes to the specified table.
The following example creates a WAL flush trigger for the `gh:examples/wal_plugin/wal_plugin.py` plugin.
```bash
influxdb3 create trigger \
--trigger-spec "table:TABLE_NAME" \
--plugin-filename "gh:examples/wal_plugin/wal_plugin.py" \
--database DATABASE_NAME TRIGGER_NAME
```
The `gh:` prefix lets you fetch a plugin file directly from the [influxdata/influxdb3_plugins](https://github.com/influxdata/influxdb3_plugins) repository in GitHub.
Without the prefix, the server looks for the file inside of the plugins directory.
To provide additional configuration to your plugin, pass a list of key-value pairs in the `--trigger-arguments` option and, in your plugin, use the `args` parameter to receive the arguments.
For more information about trigger arguments, see the CLI help:
```bash
influxdb3 create trigger --help
```
### Schedule trigger
Schedule plugins run on a schedule specified in cron syntax. The plugin receives the local API, the time of the trigger, and any arguments passed in the trigger definition. Here's an example of a simple schedule plugin:
```python
# See if a table has been written to in the last 5 minutes
def process_scheduled_call(influxdb3_local, time, args=None):
    if args and "table_name" in args:
        table_name = args["table_name"]
        result = influxdb3_local.query(f"SELECT * FROM {table_name} WHERE time > now() - INTERVAL '5 minutes'")
        # Write an error log if the result is empty
        if not result:
            influxdb3_local.error(f"No data in {table_name} in the last 5 minutes")
    else:
        influxdb3_local.error("No table_name provided for schedule plugin")
```
#### Schedule trigger configuration
Schedule plugins are set with a `trigger-spec` of `schedule:<cron_expression>` or `every:<duration>`. The `args` parameter can be used to pass configuration to the plugin. For example, to use the system-metrics example from the GitHub repo and have it collect every 10 seconds, use the following trigger definition:
```bash
influxdb3 create trigger \
--trigger-spec "every:10s" \
--plugin-filename "gh:examples/schedule/system_metrics/system_metrics.py" \
--database mydb system-metrics
```
### On Request trigger
On Request plugins are triggered by a request to a custom HTTP API endpoint.
The plugin receives the shared API, query parameters `Dict[str, str]`, request headers `Dict[str, str]`, the request body (as bytes), and any arguments passed in the trigger definition.
On Request plugin responses follow conventions for [Flask responses](https://flask.palletsprojects.com/en/stable/quickstart/#about-responses).
#### Example: On Request plugin
```python
import json

def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
    for k, v in query_parameters.items():
        influxdb3_local.info(f"query_parameters: {k}={v}")
    for k, v in request_headers.items():
        influxdb3_local.info(f"request_headers: {k}={v}")

    request_data = json.loads(request_body)
    influxdb3_local.info("parsed JSON request body:", request_data)

    # Write the data to the database
    line = LineBuilder("request_data").tag("tag1", "tag1_value").int64_field("field1", 1)
    # Get a string of the line to return as the body
    line_str = line.build()
    influxdb3_local.write(line)

    return {"status": "ok", "line": line_str}
```
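Following the Flask conventions noted above, a plugin can also return a status code along with the body. The following is a sketch that assumes the engine accepts Flask-style `(body, status_code)` tuples:

```python
import json

def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
    # Reject requests without a body (the tuple response form is an assumption)
    if not request_body:
        return json.dumps({"error": "missing request body"}), 400

    request_data = json.loads(request_body)
    influxdb3_local.info("parsed JSON request body:", request_data)
    return {"status": "ok"}
```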
#### On Request trigger configuration
To create a trigger for an On Request plugin, specify the `request:<ENDPOINT>` trigger-spec.
For example, the following command creates an HTTP API `/api/v3/engine/my-plugin` endpoint for the plugin file:
```bash
influxdb3 create trigger \
--trigger-spec "request:my-plugin" \
--plugin-filename "examples/my-on-request.py" \
--database mydb my-plugin
```
To run the plugin, you send an HTTP request to `<HOST>/api/v3/engine/my-plugin`.
Because all On Request plugins for a server share the same `<HOST>/api/v3/engine/` base URL,
the trigger-spec you define must be unique across all plugins configured for a server,
regardless of which database they are associated with.
## In-memory cache
The Processing engine provides a powerful in-memory cache system that enables plugins to persist and retrieve data between executions. This cache system is essential for maintaining state, tracking metrics over time, and optimizing performance when working with external data sources.
### Key Benefits
- **State persistence**: Maintain counters, timestamps, and other state variables across plugin executions.
- **Performance and cost optimization**: Store frequently used data to avoid expensive recalculations. Minimize external API calls by caching responses and avoiding rate limits.
- **Data enrichment**: Cache lookup tables, API responses, or reference data to enrich data efficiently.
### Cache API
The cache API is accessible via the `cache` property on the `influxdb3_local` object provided to all plugin types:
Use the shared API `cache` property to access the cache API.
```python
# Basic usage pattern
influxdb3_local.cache.METHOD(PARAMETERS)
```
| Method | Parameters | Returns | Description |
|--------|------------|---------|-------------|
| `put` | `key` (str): The key to store the value under<br>`value` (Any): Any Python object to cache<br>`ttl` (Optional[float], default=None): Time in seconds before expiration<br>`use_global` (bool, default=False): If True, uses global namespace | None | Stores a value in the cache with an optional time-to-live |
| `get` | `key` (str): The key to retrieve<br>`default` (Any, default=None): Value to return if key not found<br>`use_global` (bool, default=False): If True, uses global namespace | Any | Retrieves a value from the cache or returns default if not found |
| `delete` | `key` (str): The key to delete<br>`use_global` (bool, default=False): If True, uses global namespace | bool | Deletes a value from the cache. Returns True if deleted, False if not found |
### Cache Namespaces
##### Cache namespaces
The cache system offers two distinct namespaces, providing flexibility for different use cases:
The cache system offers two distinct namespaces:
| Namespace | Scope | Best For |
| --- | --- | --- |
| **Trigger-specific** (default) | Isolated to a single trigger | Plugin state, counters, timestamps specific to one plugin |
| **Global** | Shared across all triggers | Configuration, lookup tables, service states that should be available to all plugins |
### Using the In-memory cache
The following examples show how to use the cache API in plugins:
##### Store and retrieve cached data
```python
# Store values in the trigger-specific namespace
influxdb3_local.cache.put("last_processed_time", time.time())
influxdb3_local.cache.put("error_count", 0)
influxdb3_local.cache.put("processed_records", {"total": 0, "errors": 0})
# Store a value
influxdb3_local.cache.put("last_run_time", time.time())
# Store values with expiration
influxdb3_local.cache.put("temp_data", {"value": 42}, ttl=300) # Expires in 5 minutes
influxdb3_local.cache.put("auth_token", "t0k3n", ttl=3600) # Expires in 1 hour
# Retrieve a value with a default if not found
last_time = influxdb3_local.cache.get("last_run_time", default=0)
# Store values in the global namespace
influxdb3_local.cache.put("app_config", {"version": "1.0.2"}, use_global=True)
influxdb3_local.cache.put("global_counter", 0, use_global=True)
# Retrieve values
last_time = influxdb3_local.cache.get("last_processed_time")
auth = influxdb3_local.cache.get("auth_token")
config = influxdb3_local.cache.get("app_config", use_global=True)
# Provide defaults for missing keys
missing = influxdb3_local.cache.get("missing_key", default="Not found")
count = influxdb3_local.cache.get("visit_count", default=0)
# Delete cached values
influxdb3_local.cache.delete("temp_data")
influxdb3_local.cache.delete("app_config", use_global=True)
# Delete a cached value
influxdb3_local.cache.delete("temporary_data")
```
#### Example: maintaining state between executions
The following example shows a WAL plugin that uses the cache to maintain a counter across executions:
##### Store cached data with expiration
```python
def process_writes(influxdb3_local, table_batches, args=None):
    # Get the current counter value or default to 0
    counter = influxdb3_local.cache.get("execution_counter", default=0)

    # Increment the counter
    counter += 1

    # Store the updated counter back in the cache
    influxdb3_local.cache.put("execution_counter", counter)

    influxdb3_local.info(f"This plugin has been executed {counter} times")

    # Process writes normally...

# Cache with a 5-minute TTL (time-to-live)
influxdb3_local.cache.put("api_response", response_data, ttl=300)
```
#### Example: sharing configuration across triggers
One benefit of the global namespace is faster response to changing conditions. The following example uses the global namespace to share configuration so that a scheduled call can check thresholds set by prior trigger calls without querying the database itself:
##### Share data across plugins
```python
def process_scheduled_call(influxdb3_local, time, args=None):
    # Check if we have cached configuration
    config = influxdb3_local.cache.get("alert_config", use_global=True)

    if not config:
        # Load configuration from database
        results = influxdb3_local.query("SELECT * FROM system.alert_config")

        # Transform query results into config object
        config = {row["name"]: row["value"] for row in results}

        # Cache the configuration with a 5-minute TTL
        influxdb3_local.cache.put("alert_config", config, ttl=300, use_global=True)
        influxdb3_local.info("Loaded fresh configuration from database")
    else:
        influxdb3_local.info("Using cached configuration")

    # Use the configuration
    threshold = float(config.get("cpu_threshold", "90.0"))
    # ...

# Store in the global namespace
influxdb3_local.cache.put("config", {"version": "1.0"}, use_global=True)

# Retrieve from the global namespace
config = influxdb3_local.cache.get("config", use_global=True)
```
The cache is designed to support stateful operations while maintaining isolation between different triggers. Use the trigger-specific namespace for most operations and the global namespace only when data sharing across triggers is necessary.
##### Track state between executions
### Best practices
```python
# Get current counter or default to 0
counter = influxdb3_local.cache.get("execution_count", default=0)
# Increment counter
counter += 1
# Store the updated value
influxdb3_local.cache.put("execution_count", counter)
influxdb3_local.info(f"This plugin has run {counter} times")
```
#### Best practices for in-memory caching
- [Use the trigger-specific namespace](#use-the-trigger-specific-namespace)
- [Use TTL appropriately](#use-ttl-appropriately)
- [Cache computation results](#cache-computation-results)
- [Warm the cache](#warm-the-cache)
- [Consider cache limitations](#consider-cache-limitations)
#### Use TTL appropriately
##### Use the trigger-specific namespace
The cache is designed to support stateful operations while maintaining isolation between different triggers. Use the trigger-specific namespace for most operations and the global namespace only when data sharing across triggers is necessary.
##### Use TTL appropriately
Set realistic expiration times based on how frequently data changes.
```python
@ -603,14 +593,14 @@ Set realistic expiration times based on how frequently data changes.
influxdb3_local.cache.put("weather_data", api_response, ttl=300)
```
#### Cache computation results
##### Cache computation results
Store the results of expensive calculations that need to be utilized frequently.
```python
# Cache aggregated statistics
influxdb3_local.cache.put("daily_stats", calculate_statistics(data), ttl=3600)
```
#### Warm the cache
##### Warm the cache
For critical data, prime the cache at startup. This can be especially useful for global namespace data where multiple triggers need the data.
```python
@ -619,8 +609,24 @@ if not influxdb3_local.cache.get("lookup_table"):
influxdb3_local.cache.put("lookup_table", load_lookup_data())
```
## Consider cache limitations
##### Consider cache limitations
- **Memory Usage**: Since cache contents are stored in memory, monitor your memory usage when caching large datasets.
- **Server Restarts**: Because the cache is cleared when the server restarts, design your plugins to handle cache initialization (as noted above; see the sketch after this list).
- **Concurrency**: Be cautious of accessing inaccurate or out-of-date data when multiple trigger instances might simultaneously update the same cache key.
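For example, to tolerate a cache cleared by a restart, re-initialize missing keys before using them--a minimal sketch (the key name and state shape are illustrative):

```python
import time

# Re-create state on a cold cache (for example, after a server restart)
state = influxdb3_local.cache.get("plugin_state")
if state is None:
    state = {"initialized_at": time.time(), "runs": 0}

state["runs"] += 1
influxdb3_local.cache.put("plugin_state", state)
```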
## Install Python dependencies
If your plugin needs additional Python packages, use the `influxdb3 install` command:
```bash
# Install a package directly
influxdb3 install package pandas
```
```bash
# With Docker
docker exec -it CONTAINER_NAME influxdb3 install package pandas
```
This creates a Python virtual environment in your plugins directory with the specified packages installed.
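After installation, plugins in the plugin directory can import the package as usual. The following sketch assumes pandas was installed as shown and that a `metrics` table exists:

```python
import pandas as pd

def process_scheduled_call(influxdb3_local, call_time, args=None):
    # Load query results into a DataFrame for analysis
    results = influxdb3_local.query("SELECT * FROM metrics WHERE time > now() - INTERVAL '1 hour'")
    df = pd.DataFrame(results)

    if not df.empty:
        influxdb3_local.info(f"Fetched {len(df)} rows; columns: {list(df.columns)}")
```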

View File

@ -148,18 +148,18 @@ The following examples show how to start InfluxDB 3 with different object store
# Memory object store
# Stores data in RAM; doesn't persist data
influxdb3 serve \
--node-id=host01 \
--cluster-id=cluster01 \
--object-store=memory
--node-id host01 \
--cluster-id cluster01 \
--object-store memory
```
```bash
# Filesystem object store
# Provide the filesystem directory
influxdb3 serve \
--node-id=host01 \
--cluster-id=cluster01 \
--object-store=file \
--node-id host01 \
--cluster-id cluster01 \
--object-store file \
--data-dir ~/.influxdb3
```
@ -192,12 +192,12 @@ docker run -it \
# Specify the Object store type and associated options
influxdb3 serve \
--node-id=host01 \
--cluster-id=cluster01 \
--object-store=s3 \
--bucket=BUCKET \
--aws-access-key-id=AWS_ACCESS_KEY_ID \
--aws-secret-access-key=AWS_SECRET_ACCESS_KEY
--node-id host01 \
--cluster-id cluster01 \
--object-store s3 \
--bucket BUCKET \
--aws-access-key-id AWS_ACCESS_KEY_ID \
--aws-secret-access-key AWS_SECRET_ACCESS_KEY
```
```bash
@ -206,13 +206,13 @@ influxdb3 serve \
# Specify the object store type and associated options
influxdb3 serve \
--node-id=host01 \
--cluster-id=cluster01 \
--object-store=s3 \
--bucket=BUCKET \
--aws-access-key-id=AWS_ACCESS_KEY_ID \
--aws-secret-access-key=AWS_SECRET_ACCESS_KEY \
--aws-endpoint=ENDPOINT \
--node-id host01 \
--cluster-id cluster01 \
--object-store s3 \
--bucket BUCKET \
--aws-access-key-id AWS_ACCESS_KEY_ID \
--aws-secret-access-key AWS_SECRET_ACCESS_KEY \
--aws-endpoint ENDPOINT \
--aws-allow-http
```
@ -313,7 +313,7 @@ cpu,host=Alpha,region=us-west,application=webserver val=6i,usage_percent=25.3,st
If you save the preceding line protocol to a file (for example, `server_data`), then you can use the `influxdb3` CLI to write the data--for example:
```bash
influxdb3 write --database=mydb --file=server_data
influxdb3 write --database mydb --file server_data
```
##### Example: write data using the /api/v3 HTTP API
@ -419,7 +419,11 @@ curl "http://localhost:8181/api/v3/write_lp?db=sensors&precision=auto&no_sync=tr
The `no_sync` CLI option controls when writes are acknowledged--for example:
```bash
influxdb3 write --bucket=mydb --org=my_org --token=my-token --no-sync
influxdb3 write \
--bucket mydb \
--org my_org \
--token my-token \
--no-sync
```
### Create a database or table
@ -459,7 +463,7 @@ The `query` subcommand includes options to help ensure that the right database i
#### Example: query `"SHOW TABLES"` on the `servers` database:
```console
$ influxdb3 query --database=servers "SHOW TABLES"
$ influxdb3 query --database servers "SHOW TABLES"
+---------------+--------------------+--------------+------------+
| table_catalog | table_schema | table_name | table_type |
+---------------+--------------------+--------------+------------+
@ -475,7 +479,7 @@ $ influxdb3 query --database=servers "SHOW TABLES"
#### Example: query the `cpu` table, limiting to 10 rows:
```console
$ influxdb3 query --database=servers "SELECT DISTINCT usage_percent, time FROM cpu LIMIT 10"
$ influxdb3 query --database servers "SELECT DISTINCT usage_percent, time FROM cpu LIMIT 10"
+---------------+---------------------+
| usage_percent | time |
+---------------+---------------------+
@ -499,7 +503,10 @@ $ influxdb3 query --database=servers "SELECT DISTINCT usage_percent, time FROM c
To query using InfluxQL, enter the `influxdb3 query` subcommand and specify `influxql` in the language option--for example:
```bash
influxdb3 query --database=servers --language=influxql "SELECT DISTINCT usage_percent FROM cpu WHERE time >= now() - 1d"
influxdb3 query \
--database servers \
--language influxql \
"SELECT DISTINCT usage_percent FROM cpu WHERE time >= now() - 1d"
```
### Query using the API
@ -610,11 +617,11 @@ The following command creates a last value cache named `cpuCache`:
```bash
influxdb3 create last_cache \
--database=servers \
--table=cpu \
--key-columns=host,application \
--value-columns=usage_percent,status \
--count=5 cpuCache
--database servers \
--table cpu \
--key-columns host,application \
--value-columns usage_percent,status \
--count 5 cpuCache
```
_You can create a last values cache per time series, but be mindful of high cardinality tables that could take excessive memory._
@ -625,7 +632,7 @@ To use the LVC, call it using the `last_cache()` function in your query--for exa
```bash
influxdb3 query \
--database=servers \
--database servers \
"SELECT * FROM last_cache('cpu', 'cpuCache') WHERE host = 'Bravo';"
```
@ -640,8 +647,8 @@ Use the `influxdb3` CLI to [delete a last values cache](/influxdb3/version/refer
```bash
influxdb3 delete last_cache \
-d <DATABASE_NAME> \
-t <TABLE> \
--database <DATABASE_NAME> \
--table <TABLE> \
--cache-name <CACHE_NAME>
```
@ -653,8 +660,8 @@ You can use the `influxdb3` CLI to [create a distinct values cache](/influxdb3/v
```bash
influxdb3 create distinct_cache \
-d <DATABASE_NAME> \
-t <TABLE> \
--database <DATABASE_NAME> \
--table <TABLE> \
--columns <COLUMNS> \
[CACHE_NAME]
```
@ -673,9 +680,9 @@ The following command creates a distinct values cache named `cpuDistinctCache`:
```bash
influxdb3 create distinct_cache \
--database=servers \
--table=cpu \
--columns=host,application \
--database servers \
--table cpu \
--columns host,application \
cpuDistinctCache
```
@ -685,16 +692,14 @@ To use the distinct values cache, call it using the `distinct_cache()` function
```bash
influxdb3 query \
--database=servers \
--database servers \
"SELECT * FROM distinct_cache('cpu', 'cpuDistinctCache')"
```
> [!Note]
> #### Only works with SQL
>
> The Distinct values cache only works with SQL, not InfluxQL; SQL is the default language.
> The distinct cache only works with SQL, not InfluxQL; SQL is the default language.
#### Delete a distinct values cache
@ -702,12 +707,11 @@ Use the `influxdb3` CLI to [delete a distinct values cache](/influxdb3/version/r
```bash
influxdb3 delete distinct_cache \
-d <DATABASE_NAME> \
-t <TABLE> \
--database <DATABASE_NAME> \
--table <TABLE> \
--cache-name <CACHE_NAME>
```
### Python plugins and the Processing engine
The InfluxDB 3 Processing engine is an embedded Python VM for running code inside the database to process and transform data.
@ -735,7 +739,7 @@ InfluxDB 3 provides the following types of triggers, each with specific trigger-
### Test, create, and trigger plugin code
##### Example: Python plugin for WAL flush
##### Example: Python plugin for WAL rows
```python
# This is the basic structure for Python plugin code that runs in the
@ -821,7 +825,7 @@ To test a plugin, do the following:
1. Create a _plugin directory_--for example, `/path/to/.influxdb/plugins`
2. [Start the InfluxDB server](#start-influxdb) and include the `--plugin-dir <PATH>` option.
3. Save the [preceding example code](#example-python-plugin) to a plugin file inside of the plugin directory. If you haven't yet written data to the table in the example, comment out the lines where it queries.
3. Save the [example plugin code](#example-python-plugin-for-wal-rows) to a plugin file inside of the plugin directory. If you haven't yet written data to the table in the example, comment out the lines where it queries.
4. To run the test, enter the following command with the following options:
- `--lp` or `--file`: The line protocol to test
@ -857,9 +861,9 @@ trigger:
# - A Python plugin file named `test.py`
# Test a plugin
influxdb3 test wal_plugin \
--lp="my_measure,tag1=asdf f1=1.0 123" \
-d mydb \
--input-arguments="arg1=hello,arg2=world" \
--lp "my_measure,tag1=asdf f1=1.0 123" \
--database mydb \
--input-arguments "arg1=hello,arg2=world" \
test.py
```
@ -867,9 +871,9 @@ influxdb3 test wal_plugin \
# Create a trigger that runs the plugin
influxdb3 create trigger \
-d mydb \
--plugin=test_plugin \
--trigger-spec="table:foo" \
--trigger-arguments="arg1=hello,arg2=world" \
--plugin test_plugin \
--trigger-spec "table:foo" \
--trigger-arguments "arg1=hello,arg2=world" \
trigger1
```
@ -919,16 +923,15 @@ for a basic HA setup.
# cluster-id: 'cluster01'
# bucket: 'influxdb-3-enterprise-storage'
influxdb3 serve \
--node-id=host01 \
--cluster-id=cluster01 \
--mode=ingest,query,compact \
--object-store=s3 \
--bucket=influxdb-3-enterprise-storage \
--http-bind=http://{{< influxdb/host >}} \
--aws-access-key-id=<AWS_ACCESS_KEY_ID> \
--aws-secret-access-key=<AWS_SECRET_ACCESS_KEY>
--node-id host01 \
--cluster-id cluster01 \
--mode ingest,query,compact \
--object-store s3 \
--bucket influxdb-3-enterprise-storage \
--http-bind {{< influxdb/host >}} \
--aws-access-key-id <AWS_ACCESS_KEY_ID> \
--aws-secret-access-key <AWS_SECRET_ACCESS_KEY>
```

```bash
## NODE 2
@ -938,37 +941,19 @@ influxdb3 serve \
# cluster-id: 'cluster01'
# bucket: 'influxdb-3-enterprise-storage'
influxdb3 serve \
--node-id=host02 \
--cluster-id=cluster01 \
--mode=ingest,query \
--object-store=s3 \
--bucket=influxdb-3-enterprise-storage \
--http-bind=http://localhost:8282 \
--aws-access-key-id=<AWS_ACCESS_KEY_ID> \
--aws-secret-access-key=<AWS_SECRET_ACCESS_KEY>
--node-id host02 \
--cluster-id cluster01 \
--mode ingest,query \
--object-store s3 \
--bucket influxdb-3-enterprise-storage \
--http-bind localhost:8282 \
--aws-access-key-id <AWS_ACCESS_KEY_ID> \
--aws-secret-access-key <AWS_SECRET_ACCESS_KEY>
```
After the nodes have started, querying either node returns data for both nodes, and _NODE 1_ runs compaction.
To add nodes to this setup, start more read replicas with the same cluster ID.
> [!Note]
> To run this setup for testing, you can start nodes in separate terminals and pass a different `--http-bind` value for each--for example:
>
> ```bash
> # In terminal 1
> influxdb3 serve --node-id=host01 \
> --cluster-id=cluster01 \
> --http-bind=http://{{< influxdb/host >}} [...OPTIONS]
> ```
>
> ```bash
> # In terminal 2
> influxdb3 serve \
> --node-id=host02 \
> --cluster-id=cluster01 \
> --http-bind=http://localhost:8282 [...OPTIONS]
> ```
### High availability with a dedicated Compactor
@ -977,7 +962,7 @@ To ensure that your read-write nodes don't slow down due to compaction work, set
{{< img-hd src="/img/influxdb/influxdb-3-enterprise-dedicated-compactor.png" alt="Dedicated Compactor setup" />}}
The following examples show how to set up high availability with a dedicated Compactor node:
1. Start two read-write nodes as read replicas, similar to the previous example.
@ -990,14 +975,14 @@ The following examples show how to set up HA with a dedicated Compactor node:
```bash
# bucket: 'influxdb-3-enterprise-storage'
influxdb3 serve \
--node-id host01 \
--cluster-id cluster01 \
--mode ingest,query \
--object-store s3 \
--bucket influxdb-3-enterprise-storage \
--http-bind {{< influxdb/host >}} \
--aws-access-key-id <AWS_ACCESS_KEY_ID> \
--aws-secret-access-key <AWS_SECRET_ACCESS_KEY>
```
```bash
@ -1009,14 +994,14 @@ The following examples show how to set up HA with a dedicated Compactor node:
# bucket: 'influxdb-3-enterprise-storage'
influxdb3 serve \
--node-id host02 \
--cluster-id cluster01 \
--mode ingest,query \
--object-store s3 \
--bucket influxdb-3-enterprise-storage \
--http-bind localhost:8282 \
--aws-access-key-id <AWS_ACCESS_KEY_ID> \
--aws-secret-access-key <AWS_SECRET_ACCESS_KEY>
```
2. Start the dedicated Compactor node with the `--mode compact` option to ensure the node **only** runs compaction.
@ -1030,13 +1015,13 @@ The following examples show how to set up HA with a dedicated Compactor node:
```bash
# bucket: 'influxdb-3-enterprise-storage'
influxdb3 serve \
--node-id host03 \
--cluster-id cluster01 \
--mode compact \
--object-store s3 \
--bucket influxdb-3-enterprise-storage \
--aws-access-key-id <AWS_ACCESS_KEY_ID> \
--aws-secret-access-key <AWS_SECRET_ACCESS_KEY>
```
### High availability with read replicas and a dedicated Compactor
@ -1057,14 +1042,14 @@ For a robust and effective setup for managing time-series data, you can run inge
```bash
# bucket: 'influxdb-3-enterprise-storage'
influxdb3 serve \
--node-id host01 \
--cluster-id cluster01 \
--mode ingest \
--object-store s3 \
--bucket influxdb-3-enterprise-storage \
  --http-bind {{< influxdb/host >}} \
--aws-access-key-id <AWS_ACCESS_KEY_ID> \
--aws-secret-access-key <AWS_SECRET_ACCESS_KEY>
```
<!-- The following examples use different ports for different nodes. Don't use the influxdb/host shortcode below. -->
@ -1078,17 +1063,17 @@ For a robust and effective setup for managing time-series data, you can run inge
```bash
# bucket: 'influxdb-3-enterprise-storage'
influxdb3 serve \
--node-id host02 \
--cluster-id cluster01 \
--mode ingest \
--object-store s3 \
--bucket influxdb-3-enterprise-storage \
--http-bind localhost:8282 \
--aws-access-key-id <AWS_ACCESS_KEY_ID> \
--aws-secret-access-key <AWS_SECRET_ACCESS_KEY>
```
2. Start the dedicated Compactor node with `--mode compact`.
```bash
## NODE 3 — Compactor Node
@ -1099,16 +1084,16 @@ For a robust and effective setup for managing time-series data, you can run inge
# bucket: 'influxdb-3-enterprise-storage'
influxdb3 serve \
--node-id host03 \
--cluster-id cluster01 \
--mode compact \
--object-store s3 \
--bucket influxdb-3-enterprise-storage \
--aws-access-key-id <AWS_ACCESS_KEY_ID> \
  --aws-secret-access-key <AWS_SECRET_ACCESS_KEY>
```
3. Finally, start the query nodes as _read-only_ with `--mode query`.
```bash
## NODE 4 — Read Node #1
@ -1119,14 +1104,14 @@ For a robust and effective setup for managing time-series data, you can run inge
# bucket: 'influxdb-3-enterprise-storage'
influxdb3 serve \
--node-id host04 \
--cluster-id cluster01 \
--mode query \
--object-store s3 \
--bucket influxdb-3-enterprise-storage \
  --http-bind localhost:8383 \
--aws-access-key-id <AWS_ACCESS_KEY_ID> \
--aws-secret-access-key <AWS_SECRET_ACCESS_KEY>
```
```bash
@ -1138,14 +1123,14 @@ For a robust and effective setup for managing time-series data, you can run inge
# bucket: 'influxdb-3-enterprise-storage'
influxdb3 serve \
--node-id host05 \
--cluster-id cluster01 \
--mode query \
--object-store s3 \
--bucket influxdb-3-enterprise-storage \
  --http-bind localhost:8484 \
--aws-access-key-id <AWS_ACCESS_KEY_ID> \
  --aws-secret-access-key <AWS_SECRET_ACCESS_KEY>
```
Congratulations! You now have a robust setup for workload isolation using {{% product-name %}}.
@ -1167,7 +1152,7 @@ You can use the default port `8181` for any write or query, without changing any
```bash
# Example variables on a query
# HTTP-bound Port: 8585
influxdb3 query --host http://localhost:8585 --database <DATABASE> "<QUERY>"
```
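
A corresponding write against the same node might look like the following sketch, assuming the `write` subcommand's `--file` option and a `data.lp` file of line protocol:

```bash
# Example variables on a write
# HTTP-bound Port: 8585
influxdb3 write --host http://localhost:8585 --database <DATABASE> --file data.lp
```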
### File index settings
@ -1182,9 +1167,9 @@ This feature is only available in Enterprise and is not available in Core.
```bash
# HTTP-bound Port: 8585
influxdb3 create file_index \
--host http://localhost:8585 \
--database <DATABASE> \
--table <TABLE> \
<COLUMNS>
```
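
For example, a concrete invocation might look like the following sketch (the database, table, and column names are illustrative, and this assumes the columns are passed as a comma-separated positional argument):

```bash
# Index the host and region columns of the home table
influxdb3 create file_index \
  --host http://localhost:8585 \
  --database my_sensor_db \
  --table home \
  host,region
```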
@ -1192,7 +1177,7 @@ influxdb3 create file_index \
```bash
influxdb3 delete file_index \
--host http://localhost:8585 \
--database <DATABASE> \
  --table <TABLE>
```