Merge branch 'master' into restructure-processing-engine-docs

restructure-processing-engine-docs
Jameelah Mercer 2025-05-15 09:13:56 -07:00 committed by GitHub
commit 270cad5bba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 785 additions and 529 deletions

View File

@ -157,7 +157,7 @@ tags:
1. [Create an admin token](#section/Authentication) for the InfluxDB 3 Enterprise API.
```bash
curl -X POST "http://localhost:8181/api/v3/enterprise/configure/token/admin"
curl -X POST "http://localhost:8181/api/v3/configure/token/admin"
```
2. [Check the status](#section/Server-information) of the InfluxDB server.
@ -1351,15 +1351,13 @@ paths:
tags:
- Authentication
- Token
/api/v3/configure/enterprise/token/admin:
/api/v3/configure/token/admin:
post:
operationId: PostCreateAdminToken
summary: Create admin token
description: |
Creates an admin token.
An admin token is a special type of token that has full access to all resources in the system.
This endpoint is only available in InfluxDB 3 Enterprise.
responses:
'201':
description: |
@ -1374,14 +1372,12 @@ paths:
tags:
- Authentication
- Token
/api/v3/configure/enterprise/token/admin/regenerate:
/api/v3/configure/token/admin/regenerate:
post:
operationId: PostRegenerateAdminToken
summary: Regenerate admin token
description: |
Regenerates an admin token and revokes the previous token with the same name.
This endpoint is only available in InfluxDB 3 Enterprise.
parameters: []
responses:
'201':

View File

@ -18,7 +18,7 @@ The InfluxDB time series platform is designed to handle high write and query loa
Learn how to use and leverage InfluxDB Cloud Dedicated for your specific
time series use case.
<a class="btn" href="{{< cta-link >}}">Run an {{% product-name %}} proof of concept (PoC)</a>
<a class="btn" href="/influxdb3/cloud-dedicated/get-started/">Get started with InfluxDB Cloud Dedicated</a>
## InfluxDB 3

View File

@ -71,13 +71,18 @@ and managing tables.
can sort on column headers or use the **Search** field to find a specific cluster.
4. In the database list, find and click the database you want to create a table in. You
can sort on column headers or use the **Search** field to find a specific database.
4. Click the **New Table** button above the table list.
5. Click the **New Table** button above the table list.
The **Create table** dialog displays.
5. In the **Create table** dialog, provide a **Table name**.
6. Toggle **Use default partitioning** to **On**
7. Click the **Create Table** button.
{{% /tab-content %}}
{{< img-hd src="/img/influxdb3/cloud-dedicated-admin-ui-create-table-default.png" alt="Create table dialog" />}}
6. In the **Create table** dialog, provide a **Table name**.
7. Leave **Use custom partitioning** set to **Off**.
By default, the table inherits the database's partition template.
If no custom partition template is applied to the database, the table inherits the default partitioning of `%Y-%m-%d` (daily).
8. Click the **Create Table** button.
{{% /tab-content %}}
{{% tab-content %}}
<!------------------------------- BEGIN INFLUXCTL ----------------------------->
1. If you haven't already, [download and install the `influxctl` CLI](/influxdb3/cloud-dedicated/reference/cli/influxctl/#download-and-install-influxctl).
@ -95,8 +100,8 @@ influxctl table create \
Replace:
- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the database to create the table in
- {{% code-placeholder-key %}}`TABLE_NAME`{{% /code-placeholder-key %}}: the name for your new table
{{% /tab-content %}}
{{% tab-content %}}
<!------------------------------- BEGIN MANAGEMENT API ------------------------------>
_This example uses [cURL](https://curl.se/) to send a Management HTTP API request, but you can use any HTTP client._
@ -123,11 +128,12 @@ curl \
Replace the following:
- {{% code-placeholder-key %}}`ACCOUNT_ID`{{% /code-placeholder-key %}}: the account ID for the cluster
- {{% code-placeholder-key %}}`CLUSTER_ID`{{% /code-placeholder-key %}}: the cluster ID
- {{% code-placeholder-key %}}`MANAGEMENT_TOKEN`{{% /code-placeholder-key %}}: a valid management token
- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the database to create the table in
- {{% code-placeholder-key %}}`ACCOUNT_ID`{{% /code-placeholder-key %}}: the [account](/influxdb3/cloud-dedicated/admin/account/) ID for the cluster _(list details via the [Admin UI](/influxdb3/cloud-dedicated/admin/clusters/list/) or [CLI](/influxdb3/cloud-dedicated/admin/clusters/list/#detailed-output-in-json))_
- {{% code-placeholder-key %}}`CLUSTER_ID`{{% /code-placeholder-key %}}: the [cluster](/influxdb3/cloud-dedicated/admin/clusters/) ID _(list details via the [Admin UI](/influxdb3/cloud-dedicated/admin/clusters/list/) or [CLI](/influxdb3/cloud-dedicated/admin/clusters/list/#detailed-output-in-json))_.
- {{% code-placeholder-key %}}`MANAGEMENT_TOKEN`{{% /code-placeholder-key %}}: a valid [management token](/influxdb3/cloud-dedicated/admin/tokens/management/) for your {{% product-name %}} cluster
- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the name of the [database](/influxdb3/cloud-dedicated/admin/databases/) to create the table in
- {{% code-placeholder-key %}}`TABLE_NAME`{{% /code-placeholder-key %}}: the name for your new table
{{% /tab-content %}}
{{< /tabs-wrapper >}}
@ -161,21 +167,26 @@ If a table doesn't have a custom partition template, it inherits the database's
can sort on column headers or use the **Search** field to find a specific cluster.
4. In the database list, find and click the database you want to create a table in. You
can sort on column headers or use the **Search** field to find a specific database.
4. Click the **New Table** button above the table list.
5. Click the **New Table** button above the table list.
The **Create table** dialog displays.
<img src="/img/influxdb3/cloud-dedicated-admin-ui-create-table.png" alt="Create table dialog" />
5. In the **Create table** dialog, provide a **Table name**.
6. Make sure the **Use default partitioning** toggle is set to **Off**
7. Provide the following:
- **Custom partition template time format**: The time part for partitioning data.
- _Optional_: **Custom partition template tag parts**: The tag parts for partitioning data.
- _Optional_: **Custom partition template tag bucket parts**: The tag bucket parts for partitioning data.
8. _Optional_: To add more parts to the partition template, click the **Add Tag** button.
9. Click the **Create Table** button to create the table.
{{< img-hd src="/img/influxdb3/cloud-dedicated-admin-ui-create-table-default.png" alt="Create table dialog" />}}
6. In the **Create table** dialog, provide a **Table name**.
7. Toggle **Use custom partitioning** to **On**.
The **Custom partition template** section displays.
{{< img-hd src="/img/influxdb3/cloud-dedicated-admin-ui-create-table-custom-partitioning.png" alt="Create table dialog with custom partitioning" />}}
8. Provide the following:
- **Custom partition template time format**: The time part for partitioning data (yearly, monthly, or daily).
- _Optional_: **Custom partition template tag parts**: The [tag parts](/influxdb3/cloud-dedicated/admin/custom-partitions/partition-templates/#tag-part-templates) for partitioning data.
- _Optional_: **Custom partition template tag bucket parts**: The [tag bucket parts](/influxdb3/cloud-dedicated/admin/custom-partitions/partition-templates/#tag-bucket-part-templates) for partitioning data.
9. _Optional_: To add more parts to the partition template, click the **Add Tag** button. For more information, see [Partition template requirements and guidelines](#partition-template-requirements-and-guidelines).
10. Click the **Create Table** button to create the table.
The new table displays in the list of tables for the cluster.
{{% /tab-content %}}
{{% tab-content %}}
<!------------------------------- BEGIN INFLUXCTL CUSTOM ----------------------------->
1. If you haven't already, [download and install the `influxctl` CLI](/influxdb3/cloud-dedicated/get-started/setup/#download-install-and-configure-the-influxctl-cli).
@ -220,7 +231,6 @@ Replace the following:
- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the name of the [database](/influxdb3/cloud-dedicated/admin/databases/) to create the table in
- {{% code-placeholder-key %}}`TABLE_NAME`{{% /code-placeholder-key %}}: the name you want for the new table
{{% /tab-content %}}
{{% tab-content %}}
<!------------------------------- BEGIN MANAGEMENT API CUSTOM ------------------------------>
_This example uses [cURL](https://curl.se/) to send a Management HTTP API request, but you can use any HTTP client._

View File

@ -6,7 +6,7 @@ description: >
menu:
influxdb3_cloud_dedicated:
parent: Reference
weight: 6
weight: 105
---
The Administrative (Admin) UI for {{% product-name %}} is a browser-based, no-code way to manage your {{% product-name %}} environment and perform administrative tasks, such as creating and managing clusters, databases, and tokens.

View File

@ -9,7 +9,7 @@ menu:
influxdb3_cloud_dedicated:
parent: Reference
name: InfluxDB HTTP API
weight: 104
weight: 105
influxdb3/cloud-dedicated/tags: [api]
---

View File

@ -10,11 +10,9 @@ menu:
parent: Reference
name: CLIs
weight: 104
draft: true
# draft: true
---
The following command line interfaces (CLIs) are available:
{{< children >}}

View File

@ -4,7 +4,7 @@ description: >
InfluxDB client libraries are language-specific tools that integrate with InfluxDB APIs.
View the list of available client libraries.
list_title: API client libraries
weight: 105
weight: 106
aliases:
- /influxdb3/cloud-dedicated/reference/api/client-libraries/
- /influxdb3/cloud-dedicated/tools/client-libraries/

View File

@ -5,7 +5,7 @@ description: >
InfluxDB uses an InfluxQL-like predicate syntax to determine what data points to delete.
menu:
influxdb3_cloud_serverless:
parent: Syntax
parent: Other syntaxes
name: Delete predicate
weight: 104
influxdb3/cloud-serverless/tags: [syntax, delete]

View File

@ -18,7 +18,7 @@ The InfluxDB time series platform is designed to handle high write and query loa
Learn how to use and leverage InfluxDB Clustered for your specific
time series use case.
<a class="btn" href="{{< cta-link >}}">Run an {{% product-name %}} proof of concept (PoC)</a>
<a class="btn" href="/influxdb3/clustered/get-started/">Get started with InfluxDB Clustered</a>
## InfluxDB 3

View File

@ -55,7 +55,12 @@ snapshot. When a snapshot is restored to the Catalog store, the Compactor
A _soft delete_ refers to when, on compaction, the Compactor sets a `deleted_at`
timestamp on the Parquet file entry in the Catalog.
The Parquet file is no
longer queryable, but remains intact in the object store.
> [!Note]
> Soft deletes are a mechanism of the {{% product-name %}} Catalog, not of the
> underlying object storage provider. Soft deletes do not modify objects in the
> object store; they only update the Catalog entries that reference those objects.
## Hard delete
@ -219,6 +224,15 @@ written on or around the beginning of the next hour.
Use the following process to restore your InfluxDB cluster to a recovery point
using Catalog store snapshots:
> [!Warning]
>
> #### Use the same InfluxDB Clustered version used to generate the snapshot
>
> When restoring an InfluxDB cluster to a recovery point, use the same version
> of InfluxDB Clustered used to generate the Catalog store snapshot.
> You may need to [downgrade to a previous version](/influxdb3/clustered/admin/upgrade/)
> before restoring.
1. **Install prerequisites:**
- `kubectl` CLI for managing your Kubernetes deployment.
@ -273,7 +287,8 @@ using Catalog store snapshots:
metadata:
  name: influxdb
  namespace: influxdb
spec:
  pause: true
  # ...
```
@ -331,7 +346,8 @@ using Catalog store snapshots:
metadata:
  name: influxdb
  namespace: influxdb
spec:
  pause: false
  # ...
```
@ -349,8 +365,6 @@ Your InfluxDB cluster is now restored to the recovery point.
When the Garbage Collector runs, it identifies what Parquet files are not
associated with the recovery point and [soft deletes](#soft-delete) them.
## Resources
### prep\_pg\_dump.awk

View File

@ -22,9 +22,9 @@ to delete a database from your InfluxDB cluster.
1. If you haven't already, [download and install the `influxctl` CLI](/influxdb3/clustered/reference/cli/influxctl/#download-and-install-influxctl).
2. Run the `influxctl database delete` command and provide the following:
- Name of the database to delete
- The name of the database to delete
3. Confirm that you want to delete the database.
{{% code-placeholders "DATABASE_NAME" %}}
```sh
@ -37,9 +37,12 @@ influxctl database delete DATABASE_NAME
>
> Once a database is deleted, data stored in that database cannot be recovered.
>
> #### Cannot reuse database names
>
> After a database is deleted, you cannot reuse the same name for a new database.
> #### Wait before writing to a new database with the same name
>
> After deleting a database from your {{% product-name omit=" Clustered" %}}
> cluster, you can reuse the name to create a new database, but **wait two to
> three minutes** after deleting the previous database before writing to the new
> database to allow write caches to clear.
>
> #### Never directly modify the Catalog
>

View File

@ -1,55 +0,0 @@
---
title: Delete a database token
description: >
Use the [`influxctl token delete` command](/influxdb3/clustered/reference/cli/influxctl/token/delete/)
to delete a token from your InfluxDB cluster and revoke all
permissions associated with the token.
Provide the ID of the token you want to delete.
menu:
influxdb3_clustered:
parent: Database tokens
weight: 203
list_code_example: |
```sh
influxctl token delete <TOKEN_ID>
```
aliases:
- /influxdb3/clustered/admin/tokens/delete/
---
Use the [`influxctl token delete` command](/influxdb3/clustered/reference/cli/influxctl/token/delete/)
to delete a database token from your InfluxDB cluster and revoke
all permissions associated with the token.
1. If you haven't already, [download and install the `influxctl` CLI](/influxdb3/clustered/reference/cli/influxctl/#download-and-install-influxctl).
2. Run the [`influxctl token list` command](/influxdb3/clustered/reference/cli/influxctl/token/list)
to output tokens with their IDs.
Copy the **token ID** of the token you want to delete.
```sh
influxctl token list
```
3. Run the `influxctl token delete` command and provide the following:
- Token ID to delete
4. Confirm that you want to delete the token.
{{% code-placeholders "TOKEN_ID" %}}
```sh
influxctl token delete TOKEN_ID
```
{{% /code-placeholders %}}
> [!Warning]
> #### Deleting a token is immediate and cannot be undone
>
> Deleting a database token is a destructive action that takes place immediately
> and cannot be undone.
>
> #### Rotate deleted tokens
>
> After deleting a database token, any clients using the deleted token need to be
> updated with a new database token to continue to interact with your InfluxDB
> cluster.

View File

@ -0,0 +1,56 @@
---
title: Revoke a database token
description: >
Use the [`influxctl token revoke` command](/influxdb3/clustered/reference/cli/influxctl/token/revoke/)
to revoke a token from your InfluxDB cluster and disable all
permissions associated with the token.
Provide the ID of the token you want to revoke.
menu:
influxdb3_clustered:
parent: Database tokens
weight: 203
list_code_example: |
```sh
influxctl token revoke <TOKEN_ID>
```
aliases:
- /influxdb3/clustered/admin/tokens/delete/
- /influxdb3/clustered/admin/tokens/database/delete/
---
Use the [`influxctl token revoke` command](/influxdb3/clustered/reference/cli/influxctl/token/revoke/)
to revoke a database token from your InfluxDB cluster and disable
all permissions associated with the token.
1. If you haven't already, [download and install the `influxctl` CLI](/influxdb3/clustered/reference/cli/influxctl/#download-and-install-influxctl).
2. Run the [`influxctl token list` command](/influxdb3/clustered/reference/cli/influxctl/token/list)
to output tokens with their IDs.
Copy the **token ID** of the token you want to revoke.
```sh
influxctl token list
```
3. Run the `influxctl token revoke` command and provide the following:
- Token ID to revoke
4. Confirm that you want to revoke the token.
{{% code-placeholders "TOKEN_ID" %}}
```sh
influxctl token revoke TOKEN_ID
```
{{% /code-placeholders %}}
> [!Warning]
> #### Revoking a token is immediate and cannot be undone
>
> Revoking a database token is a destructive action that takes place immediately
> and cannot be undone.
>
> #### Rotate revoked tokens
>
> After revoking a database token, any clients using the revoked token need to
> be updated with a new database token to continue to interact with your
> {{% product-name omit=" Clustered" %}} cluster.

View File

@ -68,17 +68,12 @@ Be sure to follow [partitioning best practices](/influxdb3/clustered/admin/custo
> Otherwise, InfluxDB omits time from the partition template and won't compact partitions.
> [!Warning]
> #### Cannot reuse deleted database names
>
> You cannot reuse the name of a deleted database when creating a new database.
> If you try to reuse the name, the API response status code
> is `400` and the `message` field contains the following:
>
> ```text
> 'iox_proxy.app.CreateDatabase failed to create database: \
> rpc error: code = AlreadyExists desc = A namespace with the
> name `<DATABASE_NAME>` already exists'
> ```
> #### Wait before writing to a new database with the same name as a deleted database
>
> After deleting a database from your {{% product-name omit=" Clustered" %}}
> cluster, you can reuse the name to create a new database, but **wait two to
> three minutes** after deleting the previous database before writing to the new
> database to allow write caches to clear.
## Usage

View File

@ -1,14 +1,16 @@
---
title: influxctl database delete
description: >
The `influxctl database delete` command deletes a database from an InfluxDB cluster.
The `influxctl database delete` command deletes a database from an
{{% product-name omit=" Clustered" %}} cluster.
menu:
influxdb3_clustered:
parent: influxctl database
weight: 301
---
The `influxctl database delete` command deletes a database from an InfluxDB cluster.
The `influxctl database delete` command deletes a database from an
{{< product-name omit=" Clustered" >}} cluster.
## Usage
@ -24,10 +26,12 @@ influxctl database delete [command options] [--force] <DATABASE_NAME> [<DATABASE
>
> Deleting a database is a destructive action that cannot be undone.
>
> #### Cannot reuse deleted database names
>
> After deleting a database, you cannot reuse the name of the deleted database
> when creating a new database.
> #### Wait before writing to a new database with the same name
>
> After deleting a database from your {{% product-name omit=" Clustered" %}}
> cluster, you can reuse the name to create a new database, but **wait two to
> three minutes** after deleting the previous database before writing to the new
> database to allow write caches to clear.
## Arguments

View File

@ -7,7 +7,7 @@ menu:
influxdb3_clustered:
name: Release notes
parent: Reference
weight: 190
weight: 101
---
View release notes and updates for products and tools related to

View File

@ -25,6 +25,30 @@ weight: 201
---
## 20250508-1719206 {date="2025-05-08"}
### Quickstart
```yaml
spec:
package:
image: us-docker.pkg.dev/influxdb2-artifacts/clustered/influxdb:20250508-1719206
```
### Changes
#### Deployment
- Expose the v0 REST API for the management and authorization service (Granite).
#### Database Engine
- Reuse database names after deletion.
- Create database tokens with expiration dates.
- Revoke database tokens rather than deleting them.
---
## 20250212-1570743 {date="2025-02-12"}
### Quickstart

View File

@ -0,0 +1,17 @@
---
title: Extend plugins with API features and state management
description: |
The Processing engine includes an API that allows your plugins to interact with your data, build and write line protocol, and maintain state between executions.
menu:
influxdb3_core:
name: Extend plugins
parent: Processing engine and Python plugins
weight: 4
influxdb3/core/tags: [processing engine, plugins, API, python]
source: /shared/extended-plugin-api.md
---
<!--
// SOURCE content/shared/extended-plugin-api.md
-->

View File

@ -0,0 +1,16 @@
---
title: Extend plugins with API features and state management
description: |
The Processing engine includes an API that allows your plugins to interact with your data, build and write line protocol, and maintain state between executions.
menu:
influxdb3_enterprise:
name: Extend plugins
parent: Processing engine and Python plugins
weight: 4
influxdb3/enterprise/tags: [processing engine, plugins, API, python]
source: /shared/extended-plugin-api.md
---
<!--
// SOURCE content/shared/extended-plugin-api.md
-->

View File

@ -5,7 +5,7 @@ description: |
code on different events in an {{< product-name >}} instance.
menu:
influxdb3_enterprise:
name: Processing Engine and Python plugins
name: Processing engine and Python plugins
weight: 4
influxdb3/enterprise/tags: [processing engine, python]
related:

View File

@ -0,0 +1,323 @@
The Processing Engine includes a shared API that your plugins can use to interact with data, write new records in line protocol format, and maintain state between executions. These capabilities let you build plugins that transform, analyze, and respond to time series data as it flows through your database.
The plugin API lets you:
- [Write data](#write-data)
- [Query data](#query-data)
- [Log messages for monitoring and debugging](#log-messages-for-monitoring-and-debugging)
- [Maintain state with the in-memory cache](#maintain-state-with-the-in-memory-cache)
- [Store and retrieve cached data](#store-and-retrieve-cached-data)
- [Use TTL appropriately](#use-ttl-appropriately)
- [Share data across plugins](#share-data-across-plugins)
- [Build a counter](#build-a-counter)
- [Guidelines for in-memory caching](#guidelines-for-in-memory-caching)
- [Consider cache limitations](#consider-cache-limitations)
## Get started with the shared API
Each plugin automatically has access to the shared API through the `influxdb3_local` object. You don't need to import any libraries. The API becomes available as soon as your plugin runs.
## Write data
To write data into your database, use the `LineBuilder` API to create line protocol data:
```python
# Create a line protocol entry
line = LineBuilder("weather")
line.tag("location", "us-midwest")
line.float64_field("temperature", 82.5)
line.time_ns(1627680000000000000)
# Write the data to the database
influxdb3_local.write(line)
```
InfluxDB 3 buffers your writes while the plugin runs and flushes them when the plugin completes.
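For example, a plugin can build and write several lines in a loop; nothing is persisted until the plugin finishes (a sketch--the `readings` data and measurement name are illustrative):
```python
# Each write is buffered; buffered lines flush together when the plugin completes
readings = [("sensor1", 21.5), ("sensor2", 19.8)]
for sensor_id, temp in readings:
    line = LineBuilder("temperature")
    line.tag("sensor", sensor_id)
    line.float64_field("value", temp)
    influxdb3_local.write(line)
```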
{{% expand-wrapper %}}
{{% expand "View the `LineBuilder` Python implementation" %}}
```python
from typing import Optional
from collections import OrderedDict

class InfluxDBError(Exception):
    """Base exception for InfluxDB-related errors"""
    pass

class InvalidMeasurementError(InfluxDBError):
    """Raised when measurement name is invalid"""
    pass

class InvalidKeyError(InfluxDBError):
    """Raised when a tag or field key is invalid"""
    pass

class InvalidLineError(InfluxDBError):
    """Raised when a line protocol string is invalid"""
    pass

class LineBuilder:
    def __init__(self, measurement: str):
        if ' ' in measurement:
            raise InvalidMeasurementError("Measurement name cannot contain spaces")
        self.measurement = measurement
        self.tags: OrderedDict[str, str] = OrderedDict()
        self.fields: OrderedDict[str, str] = OrderedDict()
        self._timestamp_ns: Optional[int] = None

    def _validate_key(self, key: str, key_type: str) -> None:
        """Validate that a key does not contain spaces, commas, or equals signs."""
        if not key:
            raise InvalidKeyError(f"{key_type} key cannot be empty")
        if ' ' in key:
            raise InvalidKeyError(f"{key_type} key '{key}' cannot contain spaces")
        if ',' in key:
            raise InvalidKeyError(f"{key_type} key '{key}' cannot contain commas")
        if '=' in key:
            raise InvalidKeyError(f"{key_type} key '{key}' cannot contain equals signs")

    def tag(self, key: str, value: str) -> 'LineBuilder':
        """Add a tag to the line protocol."""
        self._validate_key(key, "tag")
        self.tags[key] = str(value)
        return self

    def uint64_field(self, key: str, value: int) -> 'LineBuilder':
        """Add an unsigned integer field to the line protocol."""
        self._validate_key(key, "field")
        if value < 0:
            raise ValueError(f"uint64 field '{key}' cannot be negative")
        self.fields[key] = f"{value}u"
        return self

    def int64_field(self, key: str, value: int) -> 'LineBuilder':
        """Add an integer field to the line protocol."""
        self._validate_key(key, "field")
        self.fields[key] = f"{value}i"
        return self

    def float64_field(self, key: str, value: float) -> 'LineBuilder':
        """Add a float field to the line protocol."""
        self._validate_key(key, "field")
        # Check if value has no decimal component
        self.fields[key] = f"{int(value)}.0" if value % 1 == 0 else str(value)
        return self

    def string_field(self, key: str, value: str) -> 'LineBuilder':
        """Add a string field to the line protocol."""
        self._validate_key(key, "field")
        # Escape backslashes first, then quotes, so the backslashes added
        # while escaping quotes aren't escaped a second time
        escaped_value = value.replace('\\', '\\\\').replace('"', '\\"')
        self.fields[key] = f'"{escaped_value}"'
        return self

    def bool_field(self, key: str, value: bool) -> 'LineBuilder':
        """Add a boolean field to the line protocol."""
        self._validate_key(key, "field")
        self.fields[key] = 't' if value else 'f'
        return self

    def time_ns(self, timestamp_ns: int) -> 'LineBuilder':
        """Set the timestamp in nanoseconds."""
        self._timestamp_ns = timestamp_ns
        return self

    def build(self) -> str:
        """Build the line protocol string."""
        # Start with measurement name (escape commas only)
        line = self.measurement.replace(',', '\\,')
        # Add tags if present
        if self.tags:
            tags_str = ','.join(
                f"{k}={v}" for k, v in self.tags.items()
            )
            line += f",{tags_str}"
        # Add fields (required)
        if not self.fields:
            raise InvalidLineError(f"At least one field is required: {line}")
        fields_str = ','.join(
            f"{k}={v}" for k, v in self.fields.items()
        )
        line += f" {fields_str}"
        # Add timestamp if present
        if self._timestamp_ns is not None:
            line += f" {self._timestamp_ns}"
        return line
```
{{% /expand %}}
{{% /expand-wrapper %}}
## Query data
Your plugins can execute SQL queries and process results directly:
```python
# Simple query
results = influxdb3_local.query("SELECT * FROM metrics WHERE time > now() - INTERVAL '1 hour'")
# Parameterized query for safer execution
params = {"table": "metrics", "threshold": 90}
results = influxdb3_local.query("SELECT * FROM $table WHERE value > $threshold", params)
```
Query results are a `List` of `Dict[str, Any]`, where each dictionary represents a row. Column names are keys, and column values are the corresponding values.
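For example, a minimal sketch of consuming those rows inside a plugin (the `metrics` table and its `host` and `value` columns are illustrative):
```python
# Each row is a dictionary keyed by column name
results = influxdb3_local.query("SELECT host, value FROM metrics")
for row in results:
    if row["value"] > 90:
        influxdb3_local.warn("High value on host", row["host"])
```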
## Log messages for monitoring and debugging
Use the shared API's `info`, `warn`, and `error` functions to log messages from your plugin. Each function accepts one or more arguments, converts them to strings, and logs them as a space-separated message.
Add logging to monitor plugin execution and assist with debugging:
```python
influxdb3_local.info("Starting data processing")
influxdb3_local.warn("Could not process some records")
influxdb3_local.error("Failed to connect to external API")
# Log structured data
obj_to_log = {"records": 157, "errors": 3}
influxdb3_local.info("Processing complete", obj_to_log)
```
The system writes all log messages to the server logs and stores them in [system tables](/influxdb3/version/reference/cli/influxdb3/show/system/summary/), where you can query them using SQL.
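For example, you can inspect recent plugin output from the command line (a sketch--the `system.processing_engine_logs` table and its column names are assumptions; verify them in the system-tables reference for your version):
```bash
# Query recent Processing Engine log output
# (table and column names are assumptions; check your version's reference)
influxdb3 query \
  --database my_database \
  "SELECT event_time, log_level, log_text FROM system.processing_engine_logs ORDER BY event_time DESC LIMIT 10"
```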
## Maintain state with the in-memory cache
The Processing Engine provides an in-memory cache that enables your plugins to persist and retrieve data between executions.
Access the cache using the `cache` property of the shared API:
```python
# Basic usage pattern
influxdb3_local.cache.METHOD(PARAMETERS)
```
`cache` provides the following methods to retrieve and manage cached values:
| Method | Parameters | Returns | Description |
|--------|------------|---------|-------------|
| `put` | `key` (str): The key to store the value under<br>`value` (Any): Any Python object to cache<br>`ttl` (Optional[float], default=None): Time in seconds before expiration<br>`use_global` (bool, default=False): If True, uses global namespace | None | Stores a value in the cache with an optional time-to-live |
| `get` | `key` (str): The key to retrieve<br>`default` (Any, default=None): Value to return if key not found<br>`use_global` (bool, default=False): If True, uses global namespace | Any | Retrieves a value from the cache or returns default if not found |
| `delete` | `key` (str): The key to delete<br>`use_global` (bool, default=False): If True, uses global namespace | bool | Deletes a value from the cache. Returns True if deleted, False if not found |
### Understanding cache namespaces
The cache system offers two distinct namespaces:
| Namespace | Scope | Best For |
| --- | --- | --- |
| **Trigger-specific** (default) | Isolated to a single trigger | Plugin state, counters, timestamps specific to one plugin |
| **Global** | Shared across all triggers | Configuration, lookup tables, service states that should be available to all plugins |
### Common cache operations
- [Store and retrieve cached data](#store-and-retrieve-cached-data)
- [Store cached data with expiration](#store-cached-data-with-expiration)
- [Share data across plugins](#share-data-across-plugins)
- [Build a counter](#build-a-counter)
### Store and retrieve cached data
```python
# Store a value
influxdb3_local.cache.put("last_run_time", time.time())
# Retrieve a value with a default if not found
last_time = influxdb3_local.cache.get("last_run_time", default=0)
# Delete a cached value
influxdb3_local.cache.delete("temporary_data")
```
### Store cached data with expiration
```python
# Cache with a 5-minute TTL (time-to-live)
influxdb3_local.cache.put("api_response", response_data, ttl=300)
```
### Share data across plugins
```python
# Store in the global namespace
influxdb3_local.cache.put("config", {"version": "1.0"}, use_global=True)
# Retrieve from the global namespace
config = influxdb3_local.cache.get("config", use_global=True)
```
### Build a counter
You can track how many times a plugin has run:
```python
# Get current counter or default to 0
counter = influxdb3_local.cache.get("execution_count", default=0)
# Increment counter
counter += 1
# Store the updated value
influxdb3_local.cache.put("execution_count", counter)
influxdb3_local.info(f"This plugin has run {counter} times")
```
## Guidelines for in-memory caching
To get the most out of the in-memory cache, follow these guidelines:
- [Use the trigger-specific namespace](#use-the-trigger-specific-namespace)
- [Use TTL appropriately](#use-ttl-appropriately)
- [Cache computation results](#cache-computation-results)
- [Warm the cache](#warm-the-cache)
- [Consider cache limitations](#consider-cache-limitations)
### Use the trigger-specific namespace
The Processing Engine provides a cache that supports stateful operations while maintaining isolation between different triggers. For most use cases, use the trigger-specific namespace to keep plugin state isolated. Use the global namespace only when you need to share data across triggers.
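For example, the same key can exist independently in both namespaces (a sketch; the key and values are illustrative):
```python
# Trigger-specific by default: invisible to other triggers
influxdb3_local.cache.put("state", {"phase": "warmup"})
# Global namespace: shared across all triggers
influxdb3_local.cache.put("state", {"phase": "ready"}, use_global=True)
```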
### Use TTL appropriately
Set appropriate expiration times based on how frequently your data changes:
```python
# Cache external API responses for 5 minutes
influxdb3_local.cache.put("weather_data", api_response, ttl=300)
```
### Cache computation results
Store the results of expensive calculations that you use frequently:
```python
# Cache aggregated statistics
influxdb3_local.cache.put("daily_stats", calculate_statistics(data), ttl=3600)
```
### Warm the cache
For critical data, prime the cache at startup. This can be especially useful for global namespace data where multiple triggers need the data:
```python
# Check if cache needs to be initialized
if not influxdb3_local.cache.get("lookup_table"):
    influxdb3_local.cache.put("lookup_table", load_lookup_data())
```
### Consider cache limitations
- **Memory usage**: Since the system stores cache contents in memory, monitor your memory usage when caching large datasets.
- **Server restarts**: Because the server clears the cache on restart, design your plugins to handle cache initialization (as noted above).
- **Concurrency**: Be cautious of reading inaccurate or out-of-date data when multiple trigger instances might update the same cache key simultaneously, as sketched below.
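For example, the common read-modify-write pattern is not atomic, so two overlapping executions can interleave and lose an update (a sketch; `event_count` is an illustrative key):
```python
# Read-modify-write is not atomic: two concurrent executions can both
# read the same value, and one increment can overwrite the other
count = influxdb3_local.cache.get("event_count", default=0)
count += 1
influxdb3_local.cache.put("event_count", count)
```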
## Next steps
With an understanding of the shared plugin API, you can start building data workflows that transform, analyze, and respond to your time series data.
To find example plugins you can extend, visit the [influxdb3_plugins repository](https://github.com/influxdata/influxdb3_plugins) on GitHub.

View File

@ -15,9 +15,6 @@ have multiple DVCs.
{{% show-in "core" %}}
- [Distinct Value Caches are flushed when the server stops](#distinct-value-caches-are-flushed-when-the-server-stops)
{{% /show-in %}}
{{% show-in "enterprise" %}}
- [Distinct Value Caches are rebuilt on restart](#distinct-value-caches-are-rebuilt-on-restart)
{{% /show-in %}}
Consider a dataset with the following schema:
@ -75,9 +72,6 @@ node requires to maintain it. Consider the following:
{{% show-in "core" %}}
- [Distinct Value Caches are flushed when the server stops](#distinct-value-caches-are-flushed-when-the-server-stops)
{{% /show-in %}}
{{% show-in "enterprise" %}}
- [Distinct Value Caches are rebuilt on restart](#distinct-value-caches-are-rebuilt-on-restart)
{{% /show-in %}}
### High cardinality limits
@ -96,11 +90,3 @@ stops. After a server restart, {{% product-name %}} only writes new values to
the DVC when you write data, so there may be a period of time when some values are
unavailable in the DVC.
{{% /show-in %}}
{{% show-in "enterprise" %}}
### Distinct Value Caches are rebuilt on restart
Because the DVC is an in-memory cache, the cache is flushed any time the server
stops. After a server restarts, {{< product-name >}} uses persisted data to
rebuild the DVC.
{{% /show-in %}}

View File

@ -17,9 +17,6 @@ An LVC is associated with a table, which can have multiple LVCs.
{{% show-in "core" %}}
- [Last Value Caches are flushed when the server stops](#last-value-caches-are-flushed-when-the-server-stops)
{{% /show-in %}}
{{% show-in "enterprise" %}}
- [Last Value Caches are rebuilt on restart](#last-value-caches-are-rebuilt-on-restart)
{{% /show-in %}}
- [Defining value columns](#defining-value-columns)
Consider a dataset with the following schema (similar to the
@ -88,11 +85,7 @@ maintain it. Consider the following:
- [Value count](#value-count)
{{% show-in "core" %}}
- [Last Value Caches are flushed when the server stops](#last-value-caches-are-flushed-when-the-server-stops)
{{% /show-in %}}
{{% show-in "enterprise" %}}
- [Last Value Caches are rebuilt on restart](#last-value-caches-are-rebuilt-on-restart)
{{% /show-in %}}
- [Defining value columns](#defining-value-columns)
### High cardinality key columns
@ -141,14 +134,6 @@ you write data, so there may be a period of time when some values are
unavailable in the LVC.
{{% /show-in %}}
{{% show-in "enterprise" %}}
### Last Value Caches are rebuilt on restart
Because the LVC is an in-memory cache, the cache is flushed any time the server
stops. After a server restarts, {{< product-name >}} uses persisted data to
rebuild the LVC.
{{% /show-in %}}
### Defining value columns
When creating an LVC, if you include the `--value-columns` option to specify

View File

@ -1,54 +1,72 @@
Use the InfluxDB 3 Processing engine to run Python code directly in your
{{% product-name %}} database to automatically process data and respond to database events.
Use the Processing Engine in {{% product-name %}} to extend your database with custom Python code. Trigger your code on write, on a schedule, or on demand to automate workflows, transform data, and create API endpoints.
The Processing engine is an embedded Python VM that runs inside your InfluxDB 3 database and lets you:
- Process data as it's written to the database
- Run code on a schedule
- Create API endpoints that execute Python code
- Maintain state between executions with an in-memory cache
Learn how to create, configure, run, and extend Python plugins that execute when specific events occur.
## What is the Processing Engine?
The Processing Engine is an embedded Python virtual machine that runs inside your {{% product-name %}} database. You configure _triggers_ to run your Python _plugin_ code in response to:
- **Data writes** - Process and transform data as it enters the database
- **Scheduled events** - Run code at defined intervals or specific times
- **HTTP requests** - Expose custom API endpoints that execute your code
1. [Set up the Processing engine](#set-up-the-processing-engine)
2. [Add a Processing engine plugin](#add-a-processing-engine-plugin)
- [Get example plugins](#get-example-plugins)
- [Create a plugin](#create-a-plugin)
3. [Create a trigger to run a plugin](#create-a-trigger-to-run-a-plugin)
- [Create a trigger for data writes](#create-a-trigger-for-data-writes)
- [Create a trigger for scheduled events](#create-a-trigger-for-scheduled-events)
- [Create a trigger for HTTP requests](#create-a-trigger-for-http-requests)
- [Use community plugins from GitHub](#use-community-plugins-from-github)
You can use the Processing Engine's in-memory cache to manage state between executions and build stateful applications directly in your database.
This guide walks you through setting up the Processing Engine, creating your first plugin, and configuring triggers that execute your code on specific events.
## Before you begin
Ensure you have:
- A working {{% product-name %}} instance
- Access to command line
- Python installed if you're writing your own plugin
- Basic knowledge of the InfluxDB CLI
Once you have all the prerequisites in place, follow these steps to implement the Processing Engine for your data automation needs.
1. [Set up the Processing Engine](#set-up-the-processing-engine)
2. [Add a Processing Engine plugin](#add-a-processing-engine-plugin)
- [Use example plugins](#use-example-plugins)
- [Create a custom plugin](#create-a-custom-plugin)
3. [Set up a trigger](#set-up-a-trigger)
- [Understand trigger types](#understand-trigger-types)
- [Use the create trigger command](#use-the-create-trigger-command)
- [Trigger specification examples](#trigger-specification-examples)
4. [Advanced trigger configuration](#advanced-trigger-configuration)
- [Access community plugins from GitHub](#access-community-plugins-from-github)
- [Pass arguments to plugins](#pass-arguments-to-plugins)
- [Control trigger execution](#control-trigger-execution)
- [Configure error handling for a trigger](#configure-error-handling-for-a-trigger)
- [Extend plugins with API features and state management](#extend-plugins-with-api-features-and-state-management)
- [Install Python dependencies](#install-python-dependencies)
## Set up the Processing engine
## Set up the Processing Engine
To enable the Processing engine, start your InfluxDB server with the `--plugin-dir` option:
To activate the Processing Engine, start your {{% product-name %}} server with the `--plugin-dir` flag. This flag tells InfluxDB where to load your plugin files.
{{% code-placeholders "NODE_ID|OBJECT_STORE_TYPE|PLUGIN_DIR" %}}
```bash
influxdb3 serve \
--node-id NODE_ID \
--object-store OBJECT_STORE_TYPE \
--plugin-dir PLUGIN_DIR
```
{{% /code-placeholders %}}
In the example above, replace the following:
- {{% code-placeholder-key %}}`NODE_ID`{{% /code-placeholder-key %}}: Unique identifier for your instance
- {{% code-placeholder-key %}}`OBJECT_STORE_TYPE`{{% /code-placeholder-key %}}: Type of object store (for example, file or s3)
- {{% code-placeholder-key %}}`PLUGIN_DIR`{{% /code-placeholder-key %}}: Absolute path to the directory where plugin files are stored. Store all plugin files in this directory or its subdirectories.
### Configure distributed environments
If you're running multiple {{% product-name %}} instances (distributed deployment):
When running {{% product-name %}} in a distributed setup, follow these steps to configure the Processing Engine:
1. Decide where plugins should run
1. Decide where each plugin should run
- Data processing plugins, such as WAL plugins, run on ingester nodes
- HTTP-triggered plugins run on nodes handling API requests
- Scheduled plugins can run on any configured node
2. Enable plugins on selected instances
2. Enable plugins on the correct instance
3. Maintain identical plugin files across all instances where plugins run
- Use shared storage or file synchronization tools to keep plugins consistent
@ -57,43 +75,58 @@ If you're running multiple {{% product-name %}} instances (distributed deploymen
>
> Configure your plugin directory on the same system as the nodes that run the triggers and plugins.
## Add a Processing Engine plugin
## Add a Processing engine plugin
A plugin is a Python script that defines a specific function signature for a trigger (_trigger spec_). When the specified event occurs, InfluxDB runs the plugin.
A plugin is a Python file that contains a specific function signature that corresponds to a trigger type.
Plugins:
- Receive plugin-specific arguments (such as written data, call time, or an HTTP request)
- Can receive keyword arguments (as `args`) from _trigger arguments_
- Can access the `influxdb3_local` shared API for writing, querying, and managing state
Get started using example plugins or create your own:
- [Get example plugins](#get-example-plugins)
- [Create a plugin](#create-a-plugin)
### Choose a plugin strategy
You have two main options for adding plugins to your InfluxDB instance:
- [Use example plugins](#use-example-plugins) - Quickly get started with prebuilt plugins
- [Create a custom plugin](#create-a-custom-plugin) - Build your own for specialized use cases
### Use example plugins
### Get example plugins
InfluxData provides a public repository of example plugins that you can use immediately.
InfluxData maintains a repository of contributed plugins that you can use as-is or as a starting point for your own plugin.
#### Browse plugin examples
#### From local files
Visit the [influxdb3_plugins repository](https://github.com/influxdata/influxdb3_plugins) to find examples for:
You can copy example plugins from the [influxdb3_plugins repository](https://github.com/influxdata/influxdb3_plugins) to your local plugin directory:
- **Data transformation**: Process and transform incoming data
- **Alerting**: Send notifications based on data thresholds
- **Aggregation**: Calculate statistics on time series data
- **Integration**: Connect to external services and APIs
- **System monitoring**: Track resource usage and health metrics
#### Add example plugins
You can either copy a plugin or retrieve it directly from the repository:
{{< code-tabs-wrapper >}}
{{% code-tabs %}}
[Copy locally](#)
[Fetch via gh:](#)
{{% /code-tabs %}}
{{% code-tab-content %}}
```bash
# Clone the repository
git clone https://github.com/influxdata/influxdb3_plugins.git
# Copy example plugins to your plugin directory
cp -r influxdb3_plugins/examples/wal_plugin/* /path/to/plugins/
# Copy a plugin to your configured plugin directory
cp influxdb3_plugins/examples/schedule/system_metrics/system_metrics.py /path/to/plugins/
```
{{% /code-tab-content %}}
#### Directly from GitHub
You can use plugins directly from GitHub without downloading them first by using the `gh:` prefix in the plugin filename:
{{% code-tab-content %}}
```bash
# Use a plugin directly from GitHub
# To retrieve and use a plugin directly from GitHub,
# use the `gh:` prefix in the plugin filename:
influxdb3 create trigger \
--trigger-spec "every:1m" \
--plugin-filename "gh:examples/schedule/system_metrics/system_metrics.py" \
@ -101,26 +134,61 @@ influxdb3 create trigger \
system_metrics
```
> [!Note]
> #### Find and contribute plugins
>
> The plugins repository includes examples for various use cases:
>
> - **Data transformation**: Process and transform incoming data
> - **Alerting**: Send notifications based on data thresholds
> - **Aggregation**: Calculate statistics on time series data
> - **Integration**: Connect to external services and APIs
> - **System monitoring**: Track resource usage and health metrics
>
> Visit [influxdata/influxdb3_plugins](https://github.com/influxdata/influxdb3_plugins)
> to browse available plugins or contribute your own.
{{% /code-tab-content %}}
{{< /code-tabs-wrapper >}}
### Create a plugin
1. Create a `.py` file in your plugins directory
2. Define a function with one of the following signatures:
#### For data write events
Plugins can:
- Receive plugin-specific arguments (such as written data, call time, or an HTTP request)
- Access keyword arguments (as `args`) passed from _trigger arguments_
- Access the `influxdb3_local` shared API to write data, query data, and manage state between executions
For more information about available functions, arguments, and how plugins interact with InfluxDB, see [Extend plugins](/influxdb3/version/extend-plugin/).
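As a quick illustration of those pieces together, here's a minimal sketch of a scheduled plugin that reads a trigger argument, queries data, and logs a message (the `threshold` argument and `metrics` table are illustrative):
```python
def process_scheduled_call(influxdb3_local, call_time, args=None):
    # Keyword arguments arrive via --trigger-arguments on the trigger
    threshold = float(args["threshold"]) if args and "threshold" in args else 90.0
    # The shared API provides querying, writing, and logging
    rows = influxdb3_local.query("SELECT COUNT(*) AS n FROM metrics")
    influxdb3_local.info("Rows:", rows, "threshold:", threshold, "at", call_time)
```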
### Create a custom plugin
To build custom functionality, you can create your own Processing Engine plugin.
#### Prerequisites
Before you begin, make sure:
- The Processing Engine is enabled on your {{% product-name %}} instance.
- You've configured the `--plugin-dir` where plugin files are stored.
- You have access to that plugin directory.
#### Steps to create a plugin
- [Choose your plugin type](#choose-your-plugin-type)
- [Create your plugin file](#create-your-plugin-file)
- [Next steps](#next-steps)
#### Choose your plugin type
Choose a plugin type based on your automation goals:
| Plugin Type | Best For | Trigger Type |
|-------------|----------|-------------|
| **Data write** | Processing data as it arrives | `table:` or `all_tables` |
| **Scheduled** | Running code at specific times | `every:` or `cron:` |
| **HTTP request** | Creating API endpoints | `request:` |
#### Create your plugin file
- Create a `.py` file in your plugins directory
- Add the appropriate function signature based on your chosen plugin type
- Write your processing logic inside the function
After writing your plugin, [create a trigger](#use-the-create-trigger-command) to connect it to a database event and define when it runs.
#### Create a data write plugin
Use a data write plugin to process data as it's written to the database. Ideal use cases include:
- Data transformation and enrichment
- Alerting on incoming values
- Creating derived metrics
```python
def process_writes(influxdb3_local, table_batches, args=None):
@ -139,7 +207,13 @@ def process_writes(influxdb3_local, table_batches, args=None):
influxdb3_local.write(line)
```
#### For scheduled events
#### Create a scheduled plugin
Scheduled plugins run at defined intervals. Use them for:
- Periodic data aggregation
- Report generation
- System health checks
```python
def process_scheduled_call(influxdb3_local, call_time, args=None):
@ -155,7 +229,13 @@ def process_scheduled_call(influxdb3_local, call_time, args=None):
influxdb3_local.warn("No recent metrics found")
```
#### For HTTP requests
#### Create an HTTP request plugin
HTTP request plugins respond to API calls. Use them for:
- Creating custom API endpoints
- Webhooks for external integrations
- User interfaces for data interaction
```python
def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
@ -174,25 +254,55 @@ def process_request(influxdb3_local, query_parameters, request_headers, request_
return {"status": "success", "message": "Request processed"}
```
After adding your plugin, you can [install Python dependencies](#install-python-dependencies) or learn how to [extend plugins with API features and state management](#extend-plugins-with-api-features-and-state-management).
## Create a trigger to run a plugin
A trigger connects your plugin to a specific database event.
The plugin function signature in your plugin file determines which _trigger specification_
you can choose for configuring and activating your plugin.
Create a trigger with the `influxdb3 create trigger` command.
#### Next steps
After writing your plugin:
- [Create a trigger](#use-the-create-trigger-command) to connect your plugin to database events
- [Install any Python dependencies](#install-python-dependencies) your plugin requires
- Learn how to [extend plugins with the API](/influxdb3/version/extend-plugin/)
## Set up a trigger
### Understand trigger types
| Plugin Type | Trigger Specification | When Plugin Runs |
|------------|----------------------|-----------------|
| Data write | `table:<TABLE_NAME>` or `all_tables` | When data is written to tables |
| Scheduled | `every:<DURATION>` or `cron:<EXPRESSION>` | At specified time intervals |
| HTTP request | `request:<ENDPOINT_PATH>` | When HTTP requests are received |
### Use the create trigger command
Use the `influxdb3 create trigger` command with the appropriate trigger specification:
{{% code-placeholders "SPECIFICATION|PLUGIN_FILE|DATABASE_NAME|TRIGGER_NAME" %}}
```bash
influxdb3 create trigger \
--trigger-spec SPECIFICATION \
--plugin-filename PLUGIN_FILE \
--database DATABASE_NAME \
TRIGGER_NAME
```
{{% /code-placeholders %}}
In the example above, replace the following:
- {{% code-placeholder-key %}}`SPECIFICATION`{{% /code-placeholder-key %}}: Trigger specification
- {{% code-placeholder-key %}}`PLUGIN_FILE`{{% /code-placeholder-key %}}: Plugin filename relative to your configured plugin directory
- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: Name of the database
- {{% code-placeholder-key %}}`TRIGGER_NAME`{{% /code-placeholder-key %}}: Name of the new trigger
> [!Note]
> When specifying a local plugin file, the `--plugin-filename` parameter
> _is relative to_ the `--plugin-dir` configured for the server.
> You don't need to provide an absolute path.
### Create a trigger for data writes
### Trigger specification examples
Use the `table:<TABLE_NAME>` or the `all_tables` trigger specification to configure
and run a [plugin for data write events](#for-data-write-events)--for example:
#### Data write example
```bash
# Trigger on writes to a specific table
@ -211,15 +321,11 @@ influxdb3 create trigger \
all_data_processor
```
The trigger runs when the database flushes ingested data for the specified tables to the Write-Ahead Log (WAL) in the Object store (default is every second).
The plugin receives the written data and table information.
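As a sketch of what that looks like in the plugin (the `table_name` and `rows` keys follow the batch structure used in the write-plugin example above; treat the exact shape as an assumption to verify):
```python
def process_writes(influxdb3_local, table_batches, args=None):
    for table_batch in table_batches:
        # Each batch carries the table name and its newly written rows
        table_name = table_batch["table_name"]
        rows = table_batch["rows"]
        influxdb3_local.info("Received", len(rows), "rows for table", table_name)
```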
### Create a trigger for scheduled events
Use the `every:<DURATION>` or the `cron:<CRONTAB_EXPRESSION>` trigger specification
to configure and run a [plugin for scheduled events](#for-scheduled-events)--for example:
#### Scheduled events example
```bash
# Run every 5 minutes
@ -239,9 +345,7 @@ influxdb3 create trigger \
The plugin receives the scheduled call time.
### Create a trigger for HTTP requests
For an [HTTP request plugin](#for-http-requests), use the `request:<ENDPOINT_PATH>` trigger specification to configure and enable the plugin--for example:
#### HTTP requests example
```bash
# Create an endpoint at /api/v3/engine/webhook
@ -252,7 +356,7 @@ influxdb3 create trigger \
webhook_processor
```
The trigger makes your endpoint available at `/api/v3/engine/<ENDPOINT_PATH>`.
Access your endpoint at `/api/v3/engine/<ENDPOINT_PATH>`.
To run the plugin, send a `GET` or `POST` request to the endpoint--for example:
```bash
@ -261,22 +365,10 @@ curl http://{{% influxdb/host %}}/api/v3/engine/webhook
The plugin receives the HTTP request object with methods, headers, and body.
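For example, a minimal handler sketch that uses those request parts (it assumes `query_parameters` behaves like a dict and that the body, when present, is JSON):
```python
import json

def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
    # Parse the JSON body if one was sent
    payload = json.loads(request_body) if request_body else {}
    name = query_parameters.get("name", "unknown")
    influxdb3_local.info("Request for", name, "with payload keys:", list(payload.keys()))
    return {"status": "ok"}
```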
### Use community plugins from GitHub
You can reference plugins directly from the GitHub repository by using the `gh:` prefix:
```bash
# Create a trigger using a plugin from GitHub
influxdb3 create trigger \
--trigger-spec "every:1m" \
--plugin-filename "gh:examples/schedule/system_metrics/system_metrics.py" \
--database my_database \
system_metrics
```
### Pass arguments to plugins
Use trigger arguments to pass configuration from a trigger to the plugin it runs. You can use this for:
- Threshold values for monitoring
- Connection properties for external services
- Configuration settings for plugin behavior
@ -344,300 +436,91 @@ influxdb3 create trigger \
auto_disable_processor
```
## Advanced trigger configuration
After creating basic triggers, you can enhance your plugins with these advanced features:
### Access community plugins from GitHub
Skip downloading plugins by referencing them directly from GitHub:
```bash
# Create a trigger using a plugin from GitHub
influxdb3 create trigger \
--trigger-spec "every:1m" \
--plugin-filename "gh:examples/schedule/system_metrics/system_metrics.py" \
--database my_database \
system_metrics
```
This approach:
- Ensures you're using the latest version
- Simplifies updates and maintenance
- Reduces local storage requirements
### Configure your triggers
#### Pass configuration arguments
Provide runtime configuration to your plugins:
```bash
# Pass threshold and email settings to a plugin
influxdb3 create trigger \
--trigger-spec "every:1h" \
--plugin-filename "threshold_check.py" \
--trigger-arguments threshold=90,notify_email=admin@example.com \
--database my_database \
threshold_monitor
```
Your plugin accesses these values through the `args` parameter:
```python
def process_scheduled_call(influxdb3_local, call_time, args=None):
    if args and "threshold" in args:
        threshold = float(args["threshold"])
        email = args.get("notify_email", "default@example.com")
        # Use the arguments in your logic
        influxdb3_local.info(f"Checking threshold {threshold}, will notify {email}")
```
#### Set execution mode
Choose between synchronous (default) or asynchronous execution:
```bash
# Allow multiple trigger instances to run simultaneously
influxdb3 create trigger \
--trigger-spec "table:metrics" \
--plugin-filename "heavy_process.py" \
--run-asynchronous \
--database my_database \
async_processor
```
Use asynchronous execution when:
- Processing might take longer than the trigger interval
- Multiple events need to be handled simultaneously
- Performance is more important than sequential execution
#### Configure error handling
influxdb3_local.error("Failed to connect to external API")
# Log structured data
obj_to_log = {"records": 157, "errors": 3}
influxdb3_local.info("Processing complete", obj_to_log)
Control how your trigger responds to errors:
```bash
# Automatically retry on error
influxdb3 create trigger \
--trigger-spec "table:important_data" \
--plugin-filename "critical_process.py" \
--error-behavior retry \
--database my_database \
critical_processor
```
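Putting these pieces together, the following is a minimal scheduled-plugin sketch that queries recent data, logs a summary, and writes a derived value back; the `cpu` table and its columns are assumed for illustration:

```python
def process_scheduled_call(influxdb3_local, call_time, args=None):
    # Query recent rows (hypothetical table and column names)
    results = influxdb3_local.query(
        "SELECT host, usage FROM cpu WHERE time > now() - INTERVAL '5 minutes'"
    )

    # Each row is a dictionary keyed by column name
    high_usage = [row for row in results if row["usage"] > 90]
    influxdb3_local.info("High-usage hosts:", len(high_usage))

    # Write a derived point; buffered writes are flushed when the plugin completes
    line = LineBuilder("cpu_alerts")
    line.int64_field("high_usage_hosts", len(high_usage))
    influxdb3_local.write(line)
```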
#### Use the in-memory cache
The Processing engine provides an in-memory cache system that enables plugins to persist and retrieve data between executions.
Use the shared API `cache` property to access the cache API.
```python
# Basic usage pattern
influxdb3_local.cache.METHOD(PARAMETERS)
```
| Method | Parameters | Returns | Description |
|--------|------------|---------|-------------|
| `put` | `key` (str): The key to store the value under<br>`value` (Any): Any Python object to cache<br>`ttl` (Optional[float], default=None): Time in seconds before expiration<br>`use_global` (bool, default=False): If True, uses global namespace | None | Stores a value in the cache with an optional time-to-live |
| `get` | `key` (str): The key to retrieve<br>`default` (Any, default=None): Value to return if key not found<br>`use_global` (bool, default=False): If True, uses global namespace | Any | Retrieves a value from the cache or returns default if not found |
| `delete` | `key` (str): The key to delete<br>`use_global` (bool, default=False): If True, uses global namespace | bool | Deletes a value from the cache. Returns True if deleted, False if not found |
##### Cache namespaces
The cache system offers two distinct namespaces:
| Namespace | Scope | Best For |
| --- | --- | --- |
| **Trigger-specific** (default) | Isolated to a single trigger | Plugin state, counters, timestamps specific to one plugin |
| **Global** | Shared across all triggers | Configuration, lookup tables, service states that should be available to all plugins |
##### Store and retrieve cached data
```python
# Store a value
influxdb3_local.cache.put("last_run_time", time.time())
# Retrieve a value with a default if not found
last_time = influxdb3_local.cache.get("last_run_time", default=0)
# Delete a cached value
influxdb3_local.cache.delete("temporary_data")
```
##### Store cached data with expiration
```python
# Cache with a 5-minute TTL (time-to-live)
influxdb3_local.cache.put("api_response", response_data, ttl=300)
```
##### Share data across plugins
```python
# Store in the global namespace
influxdb3_local.cache.put("config", {"version": "1.0"}, use_global=True)
# Retrieve from the global namespace
config = influxdb3_local.cache.get("config", use_global=True)
```
##### Track state between executions
```python
# Get current counter or default to 0
counter = influxdb3_local.cache.get("execution_count", default=0)
# Increment counter
counter += 1
# Store the updated value
influxdb3_local.cache.put("execution_count", counter)
influxdb3_local.info(f"This plugin has run {counter} times")
```
#### Best practices for in-memory caching
- [Use the trigger-specific namespace](#use-the-trigger-specific-namespace)
- [Use TTL appropriately](#use-ttl-appropriately)
- [Cache computation results](#cache-computation-results)
- [Warm the cache](#warm-the-cache)
- [Consider cache limitations](#consider-cache-limitations)
##### Use the trigger-specific namespace
The cache is designed to support stateful operations while maintaining isolation between different triggers. Use the trigger-specific namespace for most operations and the global namespace only when data sharing across triggers is necessary.
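For example (key names are illustrative), a plugin can keep its own progress marker in the default trigger-specific namespace while publishing configuration that every trigger needs in the global namespace:

```python
import time

# Trigger-specific by default: other triggers don't see this key
influxdb3_local.cache.put("last_processed_time", time.time())

# Global namespace: opt in only for data that all triggers share
influxdb3_local.cache.put("shared_config", {"cpu_threshold": 90}, use_global=True)
```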
##### Use TTL appropriately
Set realistic expiration times based on how frequently data changes.
```python
# Cache external API responses for 5 minutes
influxdb3_local.cache.put("weather_data", api_response, ttl=300)
```
##### Cache computation results
Store the results of expensive calculations that are used frequently.
```python
# Cache aggregated statistics
influxdb3_local.cache.put("daily_stats", calculate_statistics(data), ttl=3600)
```
##### Warm the cache
For critical data, prime the cache at startup. This is especially useful for global namespace data that multiple triggers need.
```python
# Check if cache needs to be initialized
if not influxdb3_local.cache.get("lookup_table"):
influxdb3_local.cache.put("lookup_table", load_lookup_data())
```
##### Consider cache limitations
- **Memory Usage**: Since cache contents are stored in memory, monitor your memory usage when caching large datasets.
- **Server Restarts**: Because the cache is cleared when the server restarts, design your plugins to handle cache initialization (as noted above).
- **Concurrency**: When multiple trigger instances update the same cache key at the same time, reads can return stale data and the last write wins, so design your keys and update patterns accordingly; see the sketch below.
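A minimal sketch that tolerates both limitations, assuming only this trigger writes the (illustrative) `batch_count` key:

```python
# After a restart the key is gone, so fall back to a default instead of failing
count = influxdb3_local.cache.get("batch_count", default=0)

# The trigger-specific namespace means no other trigger writes this key,
# so this read-modify-write cycle has no concurrent writer to race with
influxdb3_local.cache.put("batch_count", count + 1)
```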
## Install Python dependencies
If your plugin needs additional Python packages, use the `influxdb3 install` command:
@ -654,6 +537,7 @@ docker exec -it CONTAINER_NAME influxdb3 install package pandas
This creates a Python virtual environment in your plugins directory with the specified packages installed.
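For example, to add `pandas` to the plugin environment on a local (non-Docker) installation:

```bash
influxdb3 install package pandas
```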
{{% show-in "enterprise" %}}
### Connect Grafana to your InfluxDB instance
When configuring Grafana to connect to an InfluxDB 3 Enterprise instance:

View File

@ -2,6 +2,6 @@
{{- $product := index $productPathData 0 -}}
{{- $version := index $productPathData 1 -}}
{{- $isInfluxDBOSS := and (eq $product "influxdb") (gt (len (findRE `^v[0-9]` $version)) 0)}}
{{- $productKey := cond (and (eq $product "influxdb") (not $isInfluxDBOSS)) (print "influxdb_" (replaceRE "-" "_" $version)) $product -}}
{{- $productKey := cond (and (in $product "influxdb") (not $isInfluxDBOSS)) (print $product "_" (replaceRE "-" "_" $version)) $product -}}
{{- $productData := index $.Site.Data.products $productKey -}}
{{ $productData.link }}
