chore(mono): /api/v3 processing engine request parameters:

- Update trigger-spec syntax for schedule triggers
- Specify that cron only supports extended cron format
- Provide examples and pattern validation for trigger-specs
- Update response status
- Cleanup  use
- Misc. updates and cleanup
- Add test plugins
pull/6128/head
Jason Stirnaman 2025-06-06 17:16:20 -05:00
parent 6de5de2edb
commit 687b8343b7
6 changed files with 477 additions and 80 deletions

View File

@ -146,15 +146,15 @@ tags:
description: | description: |
Manage Processing engine triggers, test plugins, and send requests to trigger On Request plugins. Manage Processing engine triggers, test plugins, and send requests to trigger On Request plugins.
InfluxDB 3 Core provides the InfluxDB 3 Processing engine, an embedded Python VM that can dynamically load and trigger Python plugins in response to events in your database. InfluxDB 3 Core provides the InfluxDB 3 processing engine, an embedded Python VM that can dynamically load and trigger Python plugins in response to events in your database.
Use Processing engine plugins and triggers to run code and perform tasks for different database events. Use Processing engine plugins and triggers to run code and perform tasks for different database events.
To get started with the Processing engine, see the [Processing engine and Python plugins](/influxdb3/core/processing-engine/) guide. To get started with the processing engine, see the [Processing engine and Python plugins](/influxdb3/core/processing-engine/) guide.
- name: Query data - name: Query data
description: Query data using SQL or InfluxQL description: Query data using SQL or InfluxQL
- name: Quick start - name: Quick start
description: | description: |
1. [Create an admin token](#section/Authentication) for the InfluxDB 3 Core API. 1. [Create an admin token](#section/Authentication) to authorize API requests.
```bash ```bash
curl -X POST "http://localhost:8181/api/v3/configure/token/admin" curl -X POST "http://localhost:8181/api/v3/configure/token/admin"
@ -385,12 +385,7 @@ paths:
parameters: parameters:
- $ref: '#/components/parameters/dbWriteParam' - $ref: '#/components/parameters/dbWriteParam'
- $ref: '#/components/parameters/accept_partial' - $ref: '#/components/parameters/accept_partial'
- name: precision - $ref: '#/components/parameters/precisionParam'
in: query
required: true
schema:
$ref: '#/components/schemas/PrecisionWrite'
description: Precision of timestamps.
- name: no_sync - name: no_sync
in: query in: query
schema: schema:
@ -440,16 +435,8 @@ paths:
description: Executes an SQL query to retrieve data from the specified database. description: Executes an SQL query to retrieve data from the specified database.
parameters: parameters:
- $ref: '#/components/parameters/db' - $ref: '#/components/parameters/db'
- name: q - $ref: '#/components/parameters/querySqlParam'
in: query - $ref: '#/components/parameters/format'
required: true
schema:
type: string
- name: format
in: query
required: false
schema:
type: string
- $ref: '#/components/parameters/AcceptQueryHeader' - $ref: '#/components/parameters/AcceptQueryHeader'
- $ref: '#/components/parameters/ContentType' - $ref: '#/components/parameters/ContentType'
responses: responses:
@ -1072,15 +1059,104 @@ paths:
post: post:
operationId: PostConfigureProcessingEngineTrigger operationId: PostConfigureProcessingEngineTrigger
summary: Create processing engine trigger summary: Create processing engine trigger
description: Creates a new processing engine trigger. description: |
Creates a processing engine trigger with the specified plugin file and trigger specification.
### Related guides
- [Processing engine and Python plugins](/influxdb3/core/plugins/)
requestBody: requestBody:
required: true required: true
content: content:
application/json: application/json:
schema: schema:
$ref: '#/components/schemas/ProcessingEngineTriggerRequest' $ref: '#/components/schemas/ProcessingEngineTriggerRequest'
examples:
schedule_cron:
summary: Schedule trigger using cron
description: |
In `"cron:CRON_EXPRESSION"`, `CRON_EXPRESSION` uses extended 6-field cron format.
The cron expression `0 0 6 * * 1-5` means the trigger will run at 6:00 AM every weekday (Monday to Friday).
value:
db: DATABASE_NAME
plugin_filename: schedule.py
trigger_name: schedule_cron_trigger
trigger_specification: cron:0 0 6 * * 1-5
schedule_every:
summary: Schedule trigger using interval
description: |
In `"every:DURATION"`, `DURATION` specifies the interval between trigger executions.
The duration `1h` means the trigger will run every hour.
value:
db: mydb
plugin_filename: schedule.py
trigger_name: schedule_every_trigger
trigger_specification: every:1h
schedule_every_seconds:
summary: Schedule trigger using seconds interval
description: |
Example of scheduling a trigger to run every 30 seconds.
value:
db: mydb
plugin_filename: schedule.py
trigger_name: schedule_every_30s_trigger
trigger_specification: every:30s
schedule_every_minutes:
summary: Schedule trigger using minutes interval
description: |
Example of scheduling a trigger to run every 5 minutes.
value:
db: mydb
plugin_filename: schedule.py
trigger_name: schedule_every_5m_trigger
trigger_specification: every:5m
all_tables:
summary: All tables trigger example
description: |
Trigger that fires on write events to any table in the database.
value:
db: mydb
plugin_filename: all_tables.py
trigger_name: all_tables_trigger
trigger_specification: all_tables
table_specific:
summary: Table-specific trigger example
description: |
Trigger that fires on write events to a specific table.
value:
db: mydb
plugin_filename: table.py
trigger_name: table_trigger
trigger_specification: table:sensors
api_request:
summary: On-demand request trigger example
description: |
Creates an HTTP endpoint `/api/v3/engine/hello-world` for manual invocation.
value:
db: mydb
plugin_filename: request.py
trigger_name: hello_world_trigger
trigger_specification: path:hello-world
cron_friday_afternoon:
summary: Cron trigger for Friday afternoons
description: |
Example of a cron trigger that runs every Friday at 2:30 PM.
value:
db: reports
plugin_filename: weekly_report.py
trigger_name: friday_report_trigger
trigger_specification: cron:0 30 14 * * 5
cron_monthly:
summary: Cron trigger for monthly execution
description: |
Example of a cron trigger that runs on the first day of every month at midnight.
value:
db: monthly_data
plugin_filename: monthly_cleanup.py
trigger_name: monthly_cleanup_trigger
trigger_specification: cron:0 0 0 1 * *
responses: responses:
'201': '200':
description: Success. Processing engine trigger created. description: Success. Processing engine trigger created.
'400': '400':
description: Bad request. description: Bad request.
@ -1157,7 +1233,7 @@ paths:
$ref: '#/components/schemas/ProcessingEngineTriggerRequest' $ref: '#/components/schemas/ProcessingEngineTriggerRequest'
responses: responses:
'200': '200':
description: Success. The processing engine trigger has been enabled. description: Success. The processing engine trigger is enabled.
'400': '400':
description: Bad request. description: Bad request.
'401': '401':
@ -1170,7 +1246,14 @@ paths:
post: post:
operationId: PostInstallPluginPackages operationId: PostInstallPluginPackages
summary: Install plugin packages summary: Install plugin packages
description: Installs packages for the plugin environment. description: |
Installs the specified Python packages into the processing engine plugin environment.
This endpoint is synchronous and blocks until the packages are installed.
### Related guides
- [Processing engine and Python plugins](/influxdb3/core/plugins/)
parameters: parameters:
- $ref: '#/components/parameters/ContentType' - $ref: '#/components/parameters/ContentType'
requestBody: requestBody:
@ -1179,10 +1262,30 @@ paths:
application/json: application/json:
schema: schema:
type: object type: object
additionalProperties: true properties:
packages:
type: array
items:
type: string
description: |
A list of Python package names to install.
Can include version specifiers (e.g., "scipy==1.9.0").
example:
- influxdb3-python
- scipy
- pandas==1.5.0
- requests
required:
- packages
example:
packages:
- influxdb3-python
- scipy
- pandas==1.5.0
- requests
responses: responses:
'200': '200':
description: Success. The packages have been installed. description: Success. The packages are installed.
'400': '400':
description: Bad request. description: Bad request.
'401': '401':
@ -1193,7 +1296,15 @@ paths:
post: post:
operationId: PostInstallPluginRequirements operationId: PostInstallPluginRequirements
summary: Install plugin requirements summary: Install plugin requirements
description: Installs requirements for the plugin environment. description: |
Installs requirements from a requirements file (also known as a "pip requirements file") into the processing engine plugin environment.
This endpoint is synchronous and blocks until the requirements are installed.
### Related
- [Processing engine and Python plugins](/influxdb3/core/plugins/)
- [Python requirements file format](https://pip.pypa.io/en/stable/reference/requirements-file-format/)
parameters: parameters:
- $ref: '#/components/parameters/ContentType' - $ref: '#/components/parameters/ContentType'
requestBody: requestBody:
@ -1202,7 +1313,17 @@ paths:
application/json: application/json:
schema: schema:
type: object type: object
additionalProperties: true properties:
requirements_location:
type: string
description: |
The path to the requirements file containing Python packages to install.
Can be a relative path (relative to the plugin directory) or an absolute path.
example: requirements.txt
required:
- requirements_location
example:
requirements_location: requirements.txt
responses: responses:
'200': '200':
description: Success. The requirements have been installed. description: Success. The requirements have been installed.
@ -1248,18 +1369,18 @@ paths:
parameters: parameters:
- name: plugin_path - name: plugin_path
description: | description: |
The path configured in the `trigger-spec` for the plugin. The path configured in the request trigger specification "path:<plugin_path>"` for the plugin.
For example, if you define a trigger with the following: For example, if you define a trigger with the following:
``` ```json
trigger-spec: "request:hello-world" trigger-spec: "path:hello-world"
``` ```
then, the HTTP API exposes the following plugin endpoint: then, the HTTP API exposes the following plugin endpoint:
``` ```
<INFLUXDB_HOST>/api/v3/engine/hello-world <INFLUXDB3_HOST>/api/v3/engine/hello-world
``` ```
in: path in: path
required: true required: true
@ -1269,7 +1390,7 @@ paths:
operationId: GetProcessingEnginePluginRequest operationId: GetProcessingEnginePluginRequest
summary: On Request processing engine plugin request summary: On Request processing engine plugin request
description: | description: |
Sends a request to invoke an _On Request_ processing engine plugin. Executes the On Request processing engine plugin specified in `<plugin_path>`.
The request can include request headers, query string parameters, and a request body, which InfluxDB passes to the plugin. The request can include request headers, query string parameters, and a request body, which InfluxDB passes to the plugin.
An On Request plugin implements the following signature: An On Request plugin implements the following signature:
@ -1296,7 +1417,7 @@ paths:
operationId: PostProcessingEnginePluginRequest operationId: PostProcessingEnginePluginRequest
summary: On Request processing engine plugin request summary: On Request processing engine plugin request
description: | description: |
Sends a request to invoke an _On Request_ processing engine plugin. Executes the On Request processing engine plugin specified in `<plugin_path>`.
The request can include request headers, query string parameters, and a request body, which InfluxDB passes to the plugin. The request can include request headers, query string parameters, and a request body, which InfluxDB passes to the plugin.
An On Request plugin implements the following signature: An On Request plugin implements the following signature:
@ -1335,8 +1456,6 @@ paths:
description: | description: |
Creates an admin token. Creates an admin token.
An admin token is a special type of token that has full access to all resources in the system. An admin token is a special type of token that has full access to all resources in the system.
This endpoint is only available in InfluxDB 3 Enterprise.
responses: responses:
'201': '201':
description: | description: |
@ -1357,8 +1476,6 @@ paths:
summary: Regenerate admin token summary: Regenerate admin token
description: | description: |
Regenerates an admin token and revokes the previous token with the same name. Regenerates an admin token and revokes the previous token with the same name.
This endpoint is only available in InfluxDB 3 Enterprise.
parameters: [] parameters: []
responses: responses:
'201': '201':
@ -1429,7 +1546,6 @@ components:
schema: schema:
type: string type: string
description: | description: |
The name of the database.
The name of the database. The name of the database.
InfluxDB creates the database if it doesn't already exist, and then InfluxDB creates the database if it doesn't already exist, and then
writes all points in the batch to the database. writes all points in the batch to the database.
@ -1747,15 +1863,69 @@ components:
type: string type: string
plugin_filename: plugin_filename:
type: string type: string
description: |
The path and filename of the plugin to execute--for example,
`schedule.py` or `endpoints/report.py`.
The path can be absolute or relative to the `--plugins-dir` directory configured when starting InfluxDB 3.
The plugin file must implement the trigger interface associated with the trigger's specification (`trigger_spec`).
trigger_name: trigger_name:
type: string type: string
trigger_specification: trigger_specification:
type: string type: string
description: |
Specifies when and how the processing engine trigger should be invoked.
## Supported trigger specifications:
### Cron-based scheduling
Format: `cron:CRON_EXPRESSION`
Uses extended (6-field) cron format (second minute hour day_of_month month day_of_week):
```
┌───────────── second (0-59)
│ ┌───────────── minute (0-59)
│ │ ┌───────────── hour (0-23)
│ │ │ ┌───────────── day of month (1-31)
│ │ │ │ ┌───────────── month (1-12)
│ │ │ │ │ ┌───────────── day of week (0-6, Sunday=0)
│ │ │ │ │ │
* * * * * *
```
Examples:
- `cron:0 0 6 * * 1-5` - Every weekday at 6:00 AM
- `cron:0 30 14 * * 5` - Every Friday at 2:30 PM
- `cron:0 0 0 1 * *` - First day of every month at midnight
### Interval-based scheduling
Format: `every:DURATION`
Supported durations: `s` (seconds), `m` (minutes), `h` (hours), `d` (days):
- `every:30s` - Every 30 seconds
- `every:5m` - Every 5 minutes
- `every:1h` - Every hour
- `every:1d` - Every day
### Table-based triggers
- `all_tables` - Triggers on write events to any table in the database
- `table:TABLE_NAME` - Triggers on write events to a specific table
### On-demand triggers
Format: `path:ENDPOINT_NAME`
Creates an HTTP endpoint `/api/v3/engine/ENDPOINT_NAME` for manual invocation:
- `path:hello-world` - Creates endpoint `/api/v3/engine/hello-world`
- `path:data-export` - Creates endpoint `/api/v3/engine/data-export`
pattern: ^(cron:[0-9 *,/-]+|every:[0-9]+[smhd]|all_tables|table:[a-zA-Z_][a-zA-Z0-9_]*|path:[a-zA-Z0-9_-]+)$
example: cron:0 0 6 * * 1-5
trigger_arguments: trigger_arguments:
type: object type: object
additionalProperties: true additionalProperties: true
description: Optional arguments passed to the plugin.
disabled: disabled:
type: boolean type: boolean
default: false
description: Whether the trigger is disabled.
required: required:
- db - db
- plugin_filename - plugin_filename
@ -1879,8 +2049,6 @@ components:
scheme: bearer scheme: bearer
bearerFormat: JWT bearerFormat: JWT
description: | description: |
_During Alpha release, an API token is not required._
A Bearer token for authentication. A Bearer token for authentication.
Provide the scheme and the API token in the `Authorization` header--for example: Provide the scheme and the API token in the `Authorization` header--for example:

View File

@ -146,15 +146,15 @@ tags:
description: | description: |
Manage Processing engine triggers, test plugins, and send requests to trigger On Request plugins. Manage Processing engine triggers, test plugins, and send requests to trigger On Request plugins.
InfluxDB 3 Enterprise provides the InfluxDB 3 Processing engine, an embedded Python VM that can dynamically load and trigger Python plugins in response to events in your database. InfluxDB 3 Enterprise provides the InfluxDB 3 processing engine, an embedded Python VM that can dynamically load and trigger Python plugins in response to events in your database.
Use Processing engine plugins and triggers to run code and perform tasks for different database events. Use Processing engine plugins and triggers to run code and perform tasks for different database events.
To get started with the Processing engine, see the [Processing engine and Python plugins](/influxdb3/enterprise/processing-engine/) guide. To get started with the processing engine, see the [Processing engine and Python plugins](/influxdb3/enterprise/processing-engine/) guide.
- name: Query data - name: Query data
description: Query data using SQL or InfluxQL description: Query data using SQL or InfluxQL
- name: Quick start - name: Quick start
description: | description: |
1. [Create an admin token](#section/Authentication) for the InfluxDB 3 Enterprise API. 1. [Create an admin token](#section/Authentication) to authorize API requests.
```bash ```bash
curl -X POST "http://localhost:8181/api/v3/configure/token/admin" curl -X POST "http://localhost:8181/api/v3/configure/token/admin"
@ -385,12 +385,7 @@ paths:
parameters: parameters:
- $ref: '#/components/parameters/dbWriteParam' - $ref: '#/components/parameters/dbWriteParam'
- $ref: '#/components/parameters/accept_partial' - $ref: '#/components/parameters/accept_partial'
- name: precision - $ref: '#/components/parameters/precisionParam'
in: query
required: true
schema:
$ref: '#/components/schemas/PrecisionWrite'
description: Precision of timestamps.
- name: no_sync - name: no_sync
in: query in: query
schema: schema:
@ -440,16 +435,8 @@ paths:
description: Executes an SQL query to retrieve data from the specified database. description: Executes an SQL query to retrieve data from the specified database.
parameters: parameters:
- $ref: '#/components/parameters/db' - $ref: '#/components/parameters/db'
- name: q - $ref: '#/components/parameters/querySqlParam'
in: query - $ref: '#/components/parameters/format'
required: true
schema:
type: string
- name: format
in: query
required: false
schema:
type: string
- $ref: '#/components/parameters/AcceptQueryHeader' - $ref: '#/components/parameters/AcceptQueryHeader'
- $ref: '#/components/parameters/ContentType' - $ref: '#/components/parameters/ContentType'
responses: responses:
@ -1072,15 +1059,104 @@ paths:
post: post:
operationId: PostConfigureProcessingEngineTrigger operationId: PostConfigureProcessingEngineTrigger
summary: Create processing engine trigger summary: Create processing engine trigger
description: Creates a new processing engine trigger. description: |
Creates a processing engine trigger with the specified plugin file and trigger specification.
### Related guides
- [Processing engine and Python plugins](/influxdb3/enterprise/plugins/)
requestBody: requestBody:
required: true required: true
content: content:
application/json: application/json:
schema: schema:
$ref: '#/components/schemas/ProcessingEngineTriggerRequest' $ref: '#/components/schemas/ProcessingEngineTriggerRequest'
examples:
schedule_cron:
summary: Schedule trigger using cron
description: |
In `"cron:CRON_EXPRESSION"`, `CRON_EXPRESSION` uses extended 6-field cron format.
The cron expression `0 0 6 * * 1-5` means the trigger will run at 6:00 AM every weekday (Monday to Friday).
value:
db: DATABASE_NAME
plugin_filename: schedule.py
trigger_name: schedule_cron_trigger
trigger_specification: cron:0 0 6 * * 1-5
schedule_every:
summary: Schedule trigger using interval
description: |
In `"every:DURATION"`, `DURATION` specifies the interval between trigger executions.
The duration `1h` means the trigger will run every hour.
value:
db: mydb
plugin_filename: schedule.py
trigger_name: schedule_every_trigger
trigger_specification: every:1h
schedule_every_seconds:
summary: Schedule trigger using seconds interval
description: |
Example of scheduling a trigger to run every 30 seconds.
value:
db: mydb
plugin_filename: schedule.py
trigger_name: schedule_every_30s_trigger
trigger_specification: every:30s
schedule_every_minutes:
summary: Schedule trigger using minutes interval
description: |
Example of scheduling a trigger to run every 5 minutes.
value:
db: mydb
plugin_filename: schedule.py
trigger_name: schedule_every_5m_trigger
trigger_specification: every:5m
all_tables:
summary: All tables trigger example
description: |
Trigger that fires on write events to any table in the database.
value:
db: mydb
plugin_filename: all_tables.py
trigger_name: all_tables_trigger
trigger_specification: all_tables
table_specific:
summary: Table-specific trigger example
description: |
Trigger that fires on write events to a specific table.
value:
db: mydb
plugin_filename: table.py
trigger_name: table_trigger
trigger_specification: table:sensors
api_request:
summary: On-demand request trigger example
description: |
Creates an HTTP endpoint `/api/v3/engine/hello-world` for manual invocation.
value:
db: mydb
plugin_filename: request.py
trigger_name: hello_world_trigger
trigger_specification: path:hello-world
cron_friday_afternoon:
summary: Cron trigger for Friday afternoons
description: |
Example of a cron trigger that runs every Friday at 2:30 PM.
value:
db: reports
plugin_filename: weekly_report.py
trigger_name: friday_report_trigger
trigger_specification: cron:0 30 14 * * 5
cron_monthly:
summary: Cron trigger for monthly execution
description: |
Example of a cron trigger that runs on the first day of every month at midnight.
value:
db: monthly_data
plugin_filename: monthly_cleanup.py
trigger_name: monthly_cleanup_trigger
trigger_specification: cron:0 0 0 1 * *
responses: responses:
'201': '200':
description: Success. Processing engine trigger created. description: Success. Processing engine trigger created.
'400': '400':
description: Bad request. description: Bad request.
@ -1157,7 +1233,7 @@ paths:
$ref: '#/components/schemas/ProcessingEngineTriggerRequest' $ref: '#/components/schemas/ProcessingEngineTriggerRequest'
responses: responses:
'200': '200':
description: Success. The processing engine trigger has been enabled. description: Success. The processing engine trigger is enabled.
'400': '400':
description: Bad request. description: Bad request.
'401': '401':
@ -1170,7 +1246,14 @@ paths:
post: post:
operationId: PostInstallPluginPackages operationId: PostInstallPluginPackages
summary: Install plugin packages summary: Install plugin packages
description: Installs packages for the plugin environment. description: |
Installs the specified Python packages into the processing engine plugin environment.
This endpoint is synchronous and blocks until the packages are installed.
### Related guides
- [Processing engine and Python plugins](/influxdb3/enterprise/plugins/)
parameters: parameters:
- $ref: '#/components/parameters/ContentType' - $ref: '#/components/parameters/ContentType'
requestBody: requestBody:
@ -1179,10 +1262,30 @@ paths:
application/json: application/json:
schema: schema:
type: object type: object
additionalProperties: true properties:
packages:
type: array
items:
type: string
description: |
A list of Python package names to install.
Can include version specifiers (e.g., "scipy==1.9.0").
example:
- influxdb3-python
- scipy
- pandas==1.5.0
- requests
required:
- packages
example:
packages:
- influxdb3-python
- scipy
- pandas==1.5.0
- requests
responses: responses:
'200': '200':
description: Success. The packages have been installed. description: Success. The packages are installed.
'400': '400':
description: Bad request. description: Bad request.
'401': '401':
@ -1193,7 +1296,15 @@ paths:
post: post:
operationId: PostInstallPluginRequirements operationId: PostInstallPluginRequirements
summary: Install plugin requirements summary: Install plugin requirements
description: Installs requirements for the plugin environment. description: |
Installs requirements from a requirements file (also known as a "pip requirements file") into the processing engine plugin environment.
This endpoint is synchronous and blocks until the requirements are installed.
### Related
- [Processing engine and Python plugins](/influxdb3/enterprise/plugins/)
- [Python requirements file format](https://pip.pypa.io/en/stable/reference/requirements-file-format/)
parameters: parameters:
- $ref: '#/components/parameters/ContentType' - $ref: '#/components/parameters/ContentType'
requestBody: requestBody:
@ -1202,7 +1313,17 @@ paths:
application/json: application/json:
schema: schema:
type: object type: object
additionalProperties: true properties:
requirements_location:
type: string
description: |
The path to the requirements file containing Python packages to install.
Can be a relative path (relative to the plugin directory) or an absolute path.
example: requirements.txt
required:
- requirements_location
example:
requirements_location: requirements.txt
responses: responses:
'200': '200':
description: Success. The requirements have been installed. description: Success. The requirements have been installed.
@ -1248,18 +1369,18 @@ paths:
parameters: parameters:
- name: plugin_path - name: plugin_path
description: | description: |
The path configured in the `trigger-spec` for the plugin. The path configured in the request trigger specification "path:<plugin_path>"` for the plugin.
For example, if you define a trigger with the following: For example, if you define a trigger with the following:
``` ```json
trigger-spec: "request:hello-world" trigger-spec: "path:hello-world"
``` ```
then, the HTTP API exposes the following plugin endpoint: then, the HTTP API exposes the following plugin endpoint:
``` ```
<INFLUXDB_HOST>/api/v3/engine/hello-world <INFLUXDB3_HOST>/api/v3/engine/hello-world
``` ```
in: path in: path
required: true required: true
@ -1269,7 +1390,7 @@ paths:
operationId: GetProcessingEnginePluginRequest operationId: GetProcessingEnginePluginRequest
summary: On Request processing engine plugin request summary: On Request processing engine plugin request
description: | description: |
Sends a request to invoke an _On Request_ processing engine plugin. Executes the On Request processing engine plugin specified in `<plugin_path>`.
The request can include request headers, query string parameters, and a request body, which InfluxDB passes to the plugin. The request can include request headers, query string parameters, and a request body, which InfluxDB passes to the plugin.
An On Request plugin implements the following signature: An On Request plugin implements the following signature:
@ -1296,7 +1417,7 @@ paths:
operationId: PostProcessingEnginePluginRequest operationId: PostProcessingEnginePluginRequest
summary: On Request processing engine plugin request summary: On Request processing engine plugin request
description: | description: |
Sends a request to invoke an _On Request_ processing engine plugin. Executes the On Request processing engine plugin specified in `<plugin_path>`.
The request can include request headers, query string parameters, and a request body, which InfluxDB passes to the plugin. The request can include request headers, query string parameters, and a request body, which InfluxDB passes to the plugin.
An On Request plugin implements the following signature: An On Request plugin implements the following signature:
@ -1448,7 +1569,6 @@ components:
schema: schema:
type: string type: string
description: | description: |
The name of the database.
The name of the database. The name of the database.
InfluxDB creates the database if it doesn't already exist, and then InfluxDB creates the database if it doesn't already exist, and then
writes all points in the batch to the database. writes all points in the batch to the database.
@ -1804,15 +1924,69 @@ components:
type: string type: string
plugin_filename: plugin_filename:
type: string type: string
description: |
The path and filename of the plugin to execute--for example,
`schedule.py` or `endpoints/report.py`.
The path can be absolute or relative to the `--plugins-dir` directory configured when starting InfluxDB 3.
The plugin file must implement the trigger interface associated with the trigger's specification (`trigger_spec`).
trigger_name: trigger_name:
type: string type: string
trigger_specification: trigger_specification:
type: string type: string
description: |
Specifies when and how the processing engine trigger should be invoked.
## Supported trigger specifications:
### Cron-based scheduling
Format: `cron:CRON_EXPRESSION`
Uses extended (6-field) cron format (second minute hour day_of_month month day_of_week):
```
┌───────────── second (0-59)
│ ┌───────────── minute (0-59)
│ │ ┌───────────── hour (0-23)
│ │ │ ┌───────────── day of month (1-31)
│ │ │ │ ┌───────────── month (1-12)
│ │ │ │ │ ┌───────────── day of week (0-6, Sunday=0)
│ │ │ │ │ │
* * * * * *
```
Examples:
- `cron:0 0 6 * * 1-5` - Every weekday at 6:00 AM
- `cron:0 30 14 * * 5` - Every Friday at 2:30 PM
- `cron:0 0 0 1 * *` - First day of every month at midnight
### Interval-based scheduling
Format: `every:DURATION`
Supported durations: `s` (seconds), `m` (minutes), `h` (hours), `d` (days):
- `every:30s` - Every 30 seconds
- `every:5m` - Every 5 minutes
- `every:1h` - Every hour
- `every:1d` - Every day
### Table-based triggers
- `all_tables` - Triggers on write events to any table in the database
- `table:TABLE_NAME` - Triggers on write events to a specific table
### On-demand triggers
Format: `path:ENDPOINT_NAME`
Creates an HTTP endpoint `/api/v3/engine/ENDPOINT_NAME` for manual invocation:
- `path:hello-world` - Creates endpoint `/api/v3/engine/hello-world`
- `path:data-export` - Creates endpoint `/api/v3/engine/data-export`
pattern: ^(cron:[0-9 *,/-]+|every:[0-9]+[smhd]|all_tables|table:[a-zA-Z_][a-zA-Z0-9_]*|path:[a-zA-Z0-9_-]+)$
example: cron:0 0 6 * * 1-5
trigger_arguments: trigger_arguments:
type: object type: object
additionalProperties: true additionalProperties: true
description: Optional arguments passed to the plugin.
disabled: disabled:
type: boolean type: boolean
default: false
description: Whether the trigger is disabled.
required: required:
- db - db
- plugin_filename - plugin_filename

View File

@ -169,10 +169,10 @@ Before you begin, make sure:
Choose a plugin type based on your automation goals: Choose a plugin type based on your automation goals:
| Plugin Type | Best For | Trigger Type | | Plugin Type | Best For | Trigger Type |
|-------------|----------|-------------| | ---------------- | ------------------------------------------- | ------------------------ |
| **Data write** | Processing data as it arrives | `table:` or `all_tables` | | **Data write** | Processing data as it arrives | `table:` or `all_tables` |
| **Scheduled** | Running code at specific times | `every:` or `cron:` | | **Scheduled** | Running code at specific intervals or times | `every:` or `cron:` |
| **HTTP request** | Creating API endpoints | `path:` | | **HTTP request** | Running code on demand via API endpoints | `path:` |
#### Create your plugin file #### Create your plugin file
@ -336,8 +336,9 @@ influxdb3 create trigger \
regular_check regular_check
# Run on a cron schedule (8am daily) # Run on a cron schedule (8am daily)
# Supports extended cron format with seconds
influxdb3 create trigger \ influxdb3 create trigger \
--trigger-spec "cron:0 8 * * *" \ --trigger-spec "cron:0 0 8 * * *" \
--plugin-filename "daily_report.py" \ --plugin-filename "daily_report.py" \
--database my_database \ --database my_database \
daily_report daily_report

View File

@ -0,0 +1,24 @@
def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
"""
Process an HTTP request to a custom endpoint in the InfluxDB 3 processing engine.
Args:
influxdb3_local: Local InfluxDB API client
query_parameters: Query parameters from the HTTP request
request_headers: Headers from the HTTP request
request_body: Body of the HTTP request
args: Optional arguments passed from the trigger configuration
"""
influxdb3_local.info("Processing HTTP request to custom endpoint")
# Handle HTTP requests to a custom endpoint
# Log the request parameters
influxdb3_local.info(f"Received request with parameters: {query_parameters}")
# Process the request body
if request_body:
import json
data = json.loads(request_body)
influxdb3_local.info(f"Request data: {data}")
# Return a response (automatically converted to JSON)
return {"status": "success", "message": "Request processed"}

View File

@ -0,0 +1,12 @@
def process_scheduled_call(influxdb3_local, call_time, args=None):
"""
Process a scheduled call from the InfluxDB 3 processing engine.
Args:
influxdb3_local: Local InfluxDB API client
call_time: Time when the trigger was called
args: Optional arguments passed from the trigger configuration
"""
influxdb3_local.info(f"Processing scheduled call at {call_time}")
if args:
influxdb3_local.info(f"With arguments: {args}")

View File

@ -0,0 +1,18 @@
def process_writes(influxdb3_local, table_batches, args=None):
"""
Process writes to the InfluxDB 3 processing engine, handling
data persisted to the object store.
"""
# Process data as it's written to the database
for table_batch in table_batches:
table_name = table_batch["table_name"]
rows = table_batch["rows"]
# Log information about the write
influxdb3_local.info(f"Processing {len(rows)} rows from {table_name}")
# Write derived data back to the database
line = LineBuilder("processed_data")
line.tag("source_table", table_name)
line.int64_field("row_count", len(rows))
influxdb3_local.write(line)