feat(sample-plugin-guide): GitHub `process_request` plugin, a separate scheduled-call plugin for running it, and a `process_writes` plugin that calls the Kapa API after PR merge data is written
## Use the processing engine to retrieve and write pull request data from GitHub
1. Install [InfluxDB 3 Core](/influxdb3/core/) or [InfluxDB 3 Enterprise](/influxdb3/enterprise/) for your system.
2. _Optional_: install additional tools that might help you test and explore HTTP API requests:

   - cURL
   - jq

3. Start the server with the processing engine activated.

The `--plugin-dir <PYTHON_PLUGINS_DIRECTORY>` option specifies the location
of your plugin files and stores the `.venv` Python virtual environment for the processing engine.

```bash
# If using InfluxDB 3 Enterprise:
# The --node-id flag specifies the node ID
# The --mode flag specifies the server mode (ingest, query, compact, or all)
# The --cluster-id flag specifies the cluster ID
# The --http-bind flag specifies the host URL where your server listens for HTTP requests
# The --plugin-dir flag specifies the directory where the plugins are located and activates the processing engine
# The --object-store flag specifies the object store type
# The --data-dir flag specifies the data directory
# Optional: The --log-filter flag specifies the log level
influxdb3 serve --node-id rw01 --mode ingest,query --cluster-id docsaichat --http-bind localhost:9191 --plugin-dir ~/influxdb3-plugins/aichat --object-store file --data-dir ~/.influxdb3-data --log-filter debug
```

```bash
# If using InfluxDB 3 Core:
# The --node-id flag specifies the node ID
# The --http-bind flag specifies the host URL where your server listens for HTTP requests
# The --plugin-dir flag specifies the directory where the plugins are located and activates the processing engine
# The --object-store flag specifies the object store type
# The --data-dir flag specifies the data directory
# Optional: The --log-filter flag specifies the log level
influxdb3 serve --node-id rw01 --http-bind localhost:9191 --plugin-dir ~/Documents/github/influxdb3-plugins --object-store file --data-dir ~/.influxdb3-data --log-filter debug
```

When the server starts, it creates the `.venv` directory with `python` and `pip` installed.

4. If you haven't already, create a database to store the data:

```bash
influxdb3 create database aichat
```

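To confirm the server is ready before you create plugins, you can check the server's `/health` endpoint--a minimal Python sketch, assuming the `--http-bind` address from step 3 and the `requests` package installed in your local environment:

```python
import requests

# A 200 response means the server is up and accepting requests
resp = requests.get("http://localhost:9191/health")
print(resp.status_code)
```
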
### Create a plugin
With the processing engine enabled, create plugins and triggers.
#### Example: create a request plugin
This example uses a sample Python plugin for the InfluxDB 3 processing engine that retrieves GitHub pull request merge data and writes it to your database.

1. Save the plugin code as `github_pr_req.py` in your plugin directory.

While you could use a _schedule_ plugin for the code, using a _request_ plugin
gives additional flexibility for running and testing the plugin on-demand.

In later steps, you'll create the `request` trigger to run this endpoint on-demand,
and create a separate plugin and trigger to schedule running the endpoint.
2. Install Python dependencies for your plugin:

```bash
influxdb3 install package requests
```

> [!Note]
> To test a plugin:
> - `process_request` plugin: Create a `request` trigger for your API endpoint and test it using your favorite HTTP client.
> - `process_writes` or `process_scheduled_call` plugin: Use the [`influxdb3 test`](/influxdb3/version/reference/cli/influxdb3/test/) command.

#### Create a request trigger for a request plugin
1. Create a trigger to publish an API endpoint for the request plugin:

```bash
influxdb3 create trigger \
  --host http://localhost:9191 \
  --database aichat \
  --trigger-spec "request:github_prs" \
  --plugin-filename "github_pr_req.py" \
  --trigger-arguments "github_token=your_github_token" \
  github_prs_endpoint
```

In your trigger arguments, replace the following:

- `your_github_token`: an authorization token. If the repository activity is _public_, you don't need to provide a token.

2. To run the plugin, send an HTTP request to the endpoint--for example:
```bash
curl -X POST "http://localhost:9191/api/v3/engine/github_prs" \
  -H "Content-Type: application/json" \
  -d '{"repo": "influxdata/docs-v2", "time_period": "week"}'
```

The plugin writes GitHub pull request data to the `gh` measurement in the `aichat` database.

When the plugin runs, it writes data in the following format:

```text
gh,repo=influxdata/docs-v2,ref=master activity_type="pr_merge",lines=25,author="jstirnaman" 1742198400
```

And it returns a JSON response similar to the following:

```json
{
  "status": "success",
  "message": "Wrote 2 PR merge events to database",
  "repository": "influxdata/docs-v2",
  "time_period": "week",
  "since": "2023-10-01T00:00:00Z",
  "pr_count": 2,
  "pull_requests": [
    {
      "number": 1234,
      "title": "Fix typo in README",
      "author": "jstirnaman",
      "merged_at": "2023-10-01T12:34:56Z",
      "lines": 25
    },
    {
      "number": 5678,
      "title": "Add new feature",
      "author": "jdoe",
      "merged_at": "2023-10-02T14:56:08Z",
      "lines": 50
    }
  ]
}
```

3. Query the pull request data from the database:

```bash
influxdb3 query --host http://localhost:9191 --database aichat "select * from gh where time > 1742198400 and time < 1742216400" --format jsonl
```

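As an alternative to cURL in step 2, you can run the endpoint from Python--a minimal sketch, assuming the trigger from step 1 is active and using the same endpoint and JSON body as the cURL example:

```python
import requests

# Run the github_prs endpoint on-demand and inspect the JSON response
resp = requests.post(
    "http://localhost:9191/api/v3/engine/github_prs",
    json={"repo": "influxdata/docs-v2", "time_period": "week"},
    timeout=30,
)
resp.raise_for_status()
result = resp.json()
print(f"{result['status']}: wrote {result['pr_count']} PR merge events")
```
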
## Example: create a scheduled call plugin

Create a plugin and schedule a trigger that calls the `github_prs` endpoint
every hour and writes the pull request data to the database.

1. Save the following code as `github_pr_sched.py` in your plugin directory--for example: `~/influxdb3-plugins/github_pr_sched.py`:

```python
def process_scheduled_call(influxdb3_local, call_time, args=None):
    """
    Schedule plugin that runs the GitHub PR request plugin every hour.

    This plugin schedules a call to the GitHub PR request plugin every hour
    to collect PR merge data for the last week.

    Args:
        influxdb3_local: Local InfluxDB API client
        call_time: Time when the trigger was called
        args: Optional arguments passed from the trigger configuration

    Returns:
        Dictionary containing response data
    """
    # Get trigger arguments, defaulting to an empty dict if none are set
    args = args or {}
    github_token = args.get("github_token")
    time_period = args.get("time_period") or "week"

    # Log start of scheduled call
    influxdb3_local.info(f"Scheduling PR merge data collection for the last {time_period}")

    # Call the request plugin with the specified parameters
    response = influxdb3_local.call_plugin("github_pr_req.py", github_token=github_token, time_period=time_period)

    # Log response
    if response.get("status") == "success":
        success_msg = f"Successfully scheduled PR merge data collection: {response['message']}"
        influxdb3_local.info(success_msg)
    else:
        error_msg = f"Error scheduling PR merge data collection: {response['message']}"
        influxdb3_local.error(error_msg)

    return response
```

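If you want to desk-check the plugin logic before wiring it to a trigger, you can call the function with a throwaway stand-in for `influxdb3_local`. The stub below is hypothetical--it only mimics the three methods this plugin uses and is not part of the processing engine API:

```python
class StubLocal:
    """Hypothetical stand-in that mimics the methods this plugin calls."""
    def info(self, msg): print("INFO:", msg)
    def error(self, msg): print("ERROR:", msg)
    def call_plugin(self, filename, **kwargs):
        return {"status": "success", "message": f"stub call to {filename} with {kwargs}"}

# Exercise the plugin function directly
print(process_scheduled_call(StubLocal(), call_time=None, args={"time_period": "day"}))
```
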
### Create a trigger for the GitHub PR scheduled call plugin

To schedule the plugin, create a trigger that runs it every hour--for example,
to collect pull request data from the last week:

```bash
influxdb3 create trigger \
  --host http://localhost:9191 \
  --database aichat \
  --trigger-spec "every:1h" \
  --plugin-filename "github_pr_sched.py" \
  --trigger-arguments "github_token=your_github_token,time_period=week" \
  github_pr_sched
```

In your trigger arguments, replace the following:

- `your_github_token`: your actual GitHub token
- `time_period`: set to "day" or "week" to specify the time range for data collection.
## Example: create a request plugin to fetch Kapa.ai data
Create a request plugin that retrieves the last `ingested_at` timestamps for [sources from the Kapa.ai API](https://docs.kapa.ai/api#tag/Sources) and writes them to the database.

1. Save the following code as `kapa_sources_req.py` in your plugin directory--for example: `~/influxdb3-plugins/kapa_sources_req.py`:

```python
import requests
import json
from datetime import datetime


def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
    """
    Request plugin that retrieves Kapa.ai source data and writes it to the database using line protocol.

    This plugin creates an HTTP endpoint that accepts parameters via query string or JSON body
    and returns the processed Kapa.ai data while also writing it to the database.

    Args:
        influxdb3_local: Local InfluxDB API client
        query_parameters: Dictionary of query parameters from the URL
        request_headers: Dictionary of HTTP request headers
        request_body: Request body as a string
        args: Optional arguments passed from the trigger configuration

    Returns:
        Dictionary containing response data (automatically converted to JSON)
    """
    # Parse request body if provided
    request_data = {}
    if request_body:
        try:
            request_data = json.loads(request_body)
        except json.JSONDecodeError:
            influxdb3_local.warn("Invalid JSON in request body")

    # Get project_id from request parameters or return error if not provided
    project_id = request_data.get("project_id") or query_parameters.get("project_id")
    if not project_id:
        error_msg = "Missing required parameter: project_id"
        influxdb3_local.error(error_msg)
        return {"status": "error", "message": error_msg}

    # Get integration_id from request parameters or return error if not provided
    integration_id = request_data.get("integration_id") or query_parameters.get("integration_id")
    if not integration_id:
        error_msg = "Missing required parameter: integration_id"
        influxdb3_local.error(error_msg)
        return {"status": "error", "message": error_msg}

    # Get Kapa API token from request or trigger args
    kapa_token = (
        request_data.get("kapa_token") or
        query_parameters.get("kapa_token") or
        (args and args.get("kapa_token"))
    )

    if not kapa_token:
        error_msg = "Missing required parameter: kapa_token"
        influxdb3_local.error(error_msg)
        return {"status": "error", "message": error_msg}

    # Set up Kapa.ai API request headers with authentication
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "X-API-KEY": kapa_token
    }

    # Set up Kapa.ai request parameters
    payload = {
        "integration_id": integration_id
    }

    try:
        # Make API request to get sources for the project
        url = f"https://api.kapa.ai/ingestion/v1/projects/{project_id}/sources/"
        influxdb3_local.info(f"Requesting data from: {url}")

        # For GET requests, parameters belong in the query string, not the body
        response = requests.get(url, headers=headers, params=payload)

        # Check for API errors
        if response.status_code != 200:
            error_msg = f"Kapa API error: {response.status_code} - {response.text}"
            influxdb3_local.error(error_msg)
            return {"status": "error", "message": error_msg}

        # Parse response data
        data = response.json()
        sources = data.get("results", [])

        influxdb3_local.info(f"Retrieved {len(sources)} sources from Kapa.ai")

        # Process and write each source to the database
        sources_data = []
        for source in sources:
            source_id = source.get("id")
            source_name = source.get("name", "")
            source_type = source.get("type", "")

            # Process ingested_at timestamp
            ingested_at = source.get("ingested_at")
            if ingested_at:
                try:
                    # Parse ISO format timestamp with timezone
                    dt = datetime.fromisoformat(ingested_at.replace("Z", "+00:00"))
                    timestamp_ns = int(dt.timestamp() * 1_000_000_000)

                    # Write ingested_at data point using the actual timestamp.
                    # LineBuilder is provided by the processing engine runtime--no import needed.
                    line = LineBuilder("kapa_source")
                    line.tag("project_id", project_id)
                    line.tag("source_id", source_id)
                    line.tag("source_name", source_name)
                    line.tag("source_type", source_type)
                    line.tag("event_type", "ingested_at")

                    # Add fields with the timestamp components
                    line.int64_field("timestamp_unix", int(dt.timestamp()))
                    line.string_field("timezone", dt.tzinfo.tzname(dt))
                    line.int64_field("occurred", 1)  # Required field

                    # Set the timestamp for the data point
                    line.time_ns(timestamp_ns)

                    influxdb3_local.write(line)

                except Exception as e:
                    influxdb3_local.warn(f"Error processing ingested_at for source {source_id}: {str(e)}")

            # Process next_run_at timestamp
            next_run_at = source.get("next_run_at")
            if next_run_at:
                try:
                    # Parse ISO format timestamp with timezone
                    dt = datetime.fromisoformat(next_run_at.replace("Z", "+00:00"))
                    timestamp_ns = int(dt.timestamp() * 1_000_000_000)

                    # Write next_run_at data point using the actual timestamp
                    line = LineBuilder("kapa_source")
                    line.tag("project_id", project_id)
                    line.tag("source_id", source_id)
                    line.tag("source_name", source_name)
                    line.tag("source_type", source_type)
                    line.tag("event_type", "next_run_at")

                    # Add fields with the timestamp components
                    line.int64_field("timestamp_unix", int(dt.timestamp()))
                    line.string_field("timezone", dt.tzinfo.tzname(dt))
                    line.int64_field("scheduled", 1)  # Required field

                    # Set the timestamp for the data point
                    line.time_ns(timestamp_ns)

                    influxdb3_local.write(line)

                except Exception as e:
                    influxdb3_local.warn(f"Error processing next_run_at for source {source_id}: {str(e)}")

            # Add source info to the returned data
            sources_data.append({
                "id": source_id,
                "name": source_name,
                "type": source_type,
                "ingested_at": ingested_at,
                "next_run_at": next_run_at
            })

        return {
            "status": "success",
            "project_id": project_id,
            "integration_id": integration_id,
            "source_count": len(sources),
            "sources": sources_data
        }

    except Exception as e:
        error_msg = f"Error collecting Kapa data: {str(e)}"
        influxdb3_local.error(error_msg)
        return {"status": "error", "message": error_msg}
```
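
The nanosecond conversion in the plugin is the key detail when writing points with explicit timestamps. A quick worked example of the same logic, using one of the sample timestamps from later in this section:

```python
from datetime import datetime

# Convert an ISO 8601 timestamp (with Z suffix) to a nanosecond epoch for line protocol
ingested_at = "2025-03-19T12:00:35.788963Z"
dt = datetime.fromisoformat(ingested_at.replace("Z", "+00:00"))
print(int(dt.timestamp() * 1_000_000_000))  # 1742385635788963000
```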

### Create a trigger for the Kapa.ai sources request plugin

```bash
influxdb3 create trigger \
  --host http://localhost:9191 \
  --database aichat \
  --trigger-spec "request:kapa_sources" \
  --plugin-filename "kapa_sources_req.py" \
  kapa_sources_endpoint
```

### Run the Kapa.ai sources request plugin

1. Securely store your Kapa.ai API token in a secret store or environment variable.

2. Send an HTTP request to the endpoint--for example:

```bash
API_TOKEN=$(grep API_TOKEN ~/.env.js_influxdb3_aichat_token | cut -d'=' -f2)
PROJECT_ID=$(grep PROJECT_ID ~/.env.js_influxdb3_aichat_projectid | cut -d'=' -f2)
INTEGRATION_ID=$(grep INTEGRATION_ID ~/.env.js_influxdb3_aichat_projectid | cut -d'=' -f2)

curl -X POST "http://localhost:9191/api/v3/engine/kapa_sources" \
  -H "Content-Type: application/json" \
  -d @- << EOF
{
  "project_id": "$PROJECT_ID",
  "integration_id": "$INTEGRATION_ID",
  "kapa_token": "$API_TOKEN"
}
EOF
```

The plugin writes Kapa.ai source data to the `kapa_source` measurement in the `aichat` database.

For example:

```text
kapa_source,project_id=project123,source_id=source456,source_name=MySource,source_type=s3,event_type=ingested_at timestamp_unix=1612345678i,timezone="UTC",occurred=1i 1612345678000000000

kapa_source,project_id=project123,source_id=source456,source_name=MySource,source_type=s3,event_type=next_run_at timestamp_unix=1712345678i,timezone="UTC",scheduled=1i 1712345678000000000
```

And it returns a JSON response similar to the following:

```json
{
  "status": "success",
  "project_id": "PROJECT_ID",
  "integration_id": "INTEGRATION_ID",
  "source_count": 58,
  "sources": [
    {
      "id": "e58aefe2-c16a-42d9-9ddc-b15d38142105",
      "name": "Source 1",
      "type": "scrape",
      "ingested_at": "2025-03-19T12:00:35.788963Z",
      "next_run_at": "2025-03-26T12:00:00.000138Z"
    },
    {
      "id": "0b8bcba6-c698-4e88-b9fa-7fa8ec38b05b",
      "name": "Source 2",
      "type": "scrape",
      "ingested_at": "2025-03-21T02:00:21.279909Z",
      "next_run_at": "2025-03-28T02:00:00.000112Z"
    }
  ]
}
```

To view your data, query the `kapa_source` table:

```bash
influxdb3 query --host http://localhost:9191 --database aichat \
  "select * from kapa_source where event_type='ingested_at' and time > now() - interval '7 days'" \
  --format jsonl
```

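If you'd rather drive the same query from Python, you can shell out to the CLI and parse its JSONL output. A small sketch, assuming the `influxdb3` binary is on your `PATH`:

```python
import json
import subprocess

# Run the same query as above and parse one JSON object per output line
cmd = [
    "influxdb3", "query",
    "--host", "http://localhost:9191",
    "--database", "aichat",
    "--format", "jsonl",
    "select * from kapa_source where event_type='ingested_at' and time > now() - interval '7 days'",
]
out = subprocess.run(cmd, capture_output=True, text=True, check=True).stdout
rows = [json.loads(line) for line in out.splitlines() if line.strip()]
print(f"{len(rows)} ingestion events")
```
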
## Orchestrate data collection with triggers
### Example: schedule PR data collection from GitHub
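
The following scheduled plugin calls the `github_prs` endpoint over HTTP on a fixed interval. Save it as `github_pr_scheduled.py` in your plugin directory: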

```python
import requests
import json
from datetime import datetime


def process_scheduled_call(influxdb3_local, call_time, args=None):
    """
    Scheduled plugin that retrieves GitHub PR data on a regular schedule.

    This plugin calls the github_prs endpoint to fetch PR merge data for a specified repository.
    The data is automatically written to the database by the github_prs endpoint.

    Args:
        influxdb3_local: Local InfluxDB API client
        call_time: Time when the trigger was called
        args: Optional arguments passed from the trigger configuration
            - github_token: GitHub API token
            - repo: GitHub repository (default: "influxdata/docs-v2")
            - time_period: Time period to retrieve PRs for (default: "week")
            - api_host: InfluxDB API host (default: "http://localhost:9191")

    Returns:
        Dictionary containing response data
    """
    # Extract arguments or use defaults
    if not args:
        args = {}

    github_token = args.get("github_token", "")
    repo = args.get("repo", "influxdata/docs-v2")
    time_period = args.get("time_period", "week")
    api_host = args.get("api_host", "http://localhost:9191")

    # Ensure api_host doesn't have trailing slash
    if api_host.endswith("/"):
        api_host = api_host[:-1]

    # Log the start of data collection
    current_time = datetime.now().isoformat()
    influxdb3_local.info(f"Starting GitHub PR data collection at {current_time}")

    # Make HTTP request to the github_prs endpoint
    github_url = f"{api_host}/api/v3/engine/github_prs"
    github_params = {
        "repo": repo,
        "time_period": time_period
    }

    if github_token:
        github_params["github_token"] = github_token

    try:
        influxdb3_local.info(f"Calling github_prs endpoint for {repo} over the last {time_period}...")

        response = requests.post(
            github_url,
            headers={"Content-Type": "application/json"},
            data=json.dumps(github_params),
            timeout=30
        )

        if response.status_code != 200:
            error_msg = f"GitHub API error: {response.status_code} - {response.text}"
            influxdb3_local.error(error_msg)
            return {"status": "error", "message": error_msg}

        # Parse response
        response_data = response.json()
        pr_count = response_data.get("pr_count", 0)

        success_msg = f"Successfully fetched {pr_count} PR merges for {repo}"
        influxdb3_local.info(success_msg)

        return {
            "status": "success",
            "message": success_msg,
            "pr_count": pr_count,
            "repository": repo,
            "time_period": time_period
        }

    except requests.exceptions.RequestException as e:
        error_msg = f"Failed to call github_prs endpoint: {str(e)}"
        influxdb3_local.error(error_msg)
        return {"status": "error", "message": error_msg}
    except Exception as e:
        error_msg = f"Error during GitHub PR data collection: {str(e)}"
        influxdb3_local.error(error_msg)
        return {"status": "error", "message": error_msg}
```

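Then create a trigger that runs the plugin every hour, passing the GitHub token and repository in the trigger arguments:
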
```bash
GITHUB_TOKEN=$(grep GITHUB_TOKEN ~/.env.js_influxdb3_repo_token | cut -d'=' -f2)

influxdb3 create trigger \
  --host http://localhost:9191 \
  --database aichat \
  --trigger-spec "every:1h" \
  --plugin-filename "github_pr_scheduled.py" \
  --trigger-arguments "github_token=$GITHUB_TOKEN,repo=influxdata/docs-v2,time_period=week,api_host=http://localhost:9191" \
  github_pr_monitor
```
### Example: alert on outdated documentation in Kapa sources
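
The following `process_writes` plugin runs whenever rows are written to the `gh` table. If the PR lines accumulated since the last Kapa.ai ingestion of a source exceed a threshold, it writes status and alert records. Save it as `gh_kapa_integration.py` in your plugin directory: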

```python
import requests
import json
from datetime import datetime, timezone


def process_writes(influxdb3_local, table_batches, args=None):
    """
    Writes plugin that monitors GitHub PR activity and checks Kapa.ai source ingestion.

    This plugin triggers when new data is written to the gh table. It checks if accumulated
    PR lines since the last Kapa.ai source ingestion exceed a threshold, and if so,
    generates an alert.

    Args:
        influxdb3_local: Local InfluxDB API client
        table_batches: Batches of data written to tables
        args: Optional arguments passed from the trigger configuration
            - source_name: Kapa.ai source name to monitor (default: "Documentation")
            - repo: GitHub repository (default: "influxdata/docs-v2")
            - severity_threshold: Line count threshold for severity warning (default: 100)
            - kapa_token: Kapa.ai API token
            - project_id: Kapa.ai project ID

    Returns:
        Dictionary containing response data
    """
    # Extract arguments or use defaults
    if not args:
        args = {}

    source_name = args.get("source_name", "Documentation")
    repo = args.get("repo", "influxdata/docs-v2")
    severity_threshold = int(args.get("severity_threshold", 100))
    kapa_token = args.get("kapa_token", "")
    project_id = args.get("project_id", "")

    # Check if any gh table writes match our criteria (activity_type = "pr_merge")
    has_pr_activity = False
    for table_batch in table_batches:
        if table_batch["table_name"] == "gh":
            for row in table_batch["rows"]:
                if row.get("activity_type") == "pr_merge" and row.get("repo") == repo:
                    has_pr_activity = True
                    break
        if has_pr_activity:
            break

    # Skip processing if no relevant PR activity
    if not has_pr_activity:
        return {"status": "skipped", "reason": "No relevant PR merge activity"}

    # Initialize results
    results = {
        "status": "success",
        "data": {},
        "errors": []
    }

    # Validate required parameters for Kapa.ai API
    if not all([kapa_token, project_id]):
        error_msg = "Missing required Kapa.ai parameters (kapa_token, project_id)"
        influxdb3_local.error(error_msg)
        results["status"] = "error"
        results["errors"].append(error_msg)
        return results

    try:
        # Call Kapa.ai API directly to get source information
        headers = {
            "Accept": "application/json",
            "Content-Type": "application/json",
            "X-API-KEY": kapa_token
        }

        # Make API request to get sources
        url = f"https://api.kapa.ai/ingestion/v1/projects/{project_id}/sources/"
        influxdb3_local.info("Requesting source data from Kapa.ai API")

        response = requests.get(url, headers=headers)

        # Check for API errors
        if response.status_code != 200:
            error_msg = f"Kapa API error: {response.status_code} - {response.text}"
            influxdb3_local.error(error_msg)
            results["errors"].append(error_msg)
            return results

        # Parse response to find the specified source
        kapa_data = response.json()
        sources = kapa_data.get("results", [])

        # Find the exact matching source
        target_source = None
        for source in sources:
            if source.get("name") == source_name:
                target_source = source
                break

        if not target_source:
            error_msg = f"Source '{source_name}' not found in Kapa.ai"
            influxdb3_local.warn(error_msg)
            results["errors"].append(error_msg)
            return results

        # Extract ingestion time
        ingestion_time = None
        if target_source.get("ingested_at"):
            ingestion_time = datetime.fromisoformat(target_source["ingested_at"].replace("Z", "+00:00"))
        else:
            error_msg = f"Source '{source_name}' has no ingestion timestamp"
            influxdb3_local.warn(error_msg)
            results["errors"].append(error_msg)
            return results

        # Format for database query
        ingestion_time_str = ingestion_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

        # Query the database for PR data since last ingestion
        pr_query = f"""
        SELECT SUM(lines) as total_lines, COUNT(*) as pr_count
        FROM gh
        WHERE repo = '{repo}' AND ref = 'master' AND activity_type = 'pr_merge' AND time > '{ingestion_time_str}'
        """

        pr_records = influxdb3_local.query(pr_query)

        # Get PR stats
        if not pr_records or len(pr_records) == 0:
            total_lines = 0
            pr_count = 0
        else:
            pr_data = pr_records[0]
            total_lines = int(pr_data.get("total_lines") or 0)
            pr_count = int(pr_data.get("pr_count") or 0)

        # Calculate days since ingestion
        current_time = datetime.now(timezone.utc)
        days_difference = (current_time - ingestion_time).total_seconds() / 86400

        # Create status data
        status = {
            "source_name": source_name,
            "source_id": target_source.get("id"),
            "last_ingestion": ingestion_time.isoformat(),
            "days_since_ingestion": round(days_difference, 1),
            "pr_count": pr_count,
            "total_lines": total_lines,
            "needs_update": total_lines > 0,
            "threshold": severity_threshold
        }

        results["data"] = status

        # Only generate status and alerts if PR lines were found
        if total_lines > 0:
            influxdb3_local.info(f"Found {pr_count} PRs with {total_lines} lines changed since last ingestion of '{source_name}'")

            # Write status data to the database
            line = LineBuilder("integration_status")
            line.tag("repo", repo)
            line.tag("source_name", source_name)
            line.int64_field("days_since_ingestion", int(days_difference))
            line.int64_field("prs_since_ingestion", pr_count)
            line.int64_field("lines_since_ingestion", total_lines)
            line.int64_field("needs_update", 1)
            line.int64_field("severity_threshold", severity_threshold)

            influxdb3_local.write(line)

            # Generate alert based on severity threshold
            severity = "info" if total_lines < severity_threshold else "warning"

            alert_line = LineBuilder("documentation_alert")
            alert_line.tag("repo", repo)
            alert_line.tag("source_name", source_name)
            alert_line.tag("severity", severity)
            alert_line.string_field("message",
                f"{source_name} needs update: {pr_count} PRs with {total_lines} lines changed in the last {days_difference:.1f} days")
            alert_line.int64_field("lines_changed", total_lines)
            alert_line.int64_field("days_since_ingestion", int(days_difference))

            influxdb3_local.write(alert_line)

            influxdb3_local.info(f"Generated {severity} alert for {source_name}")
        else:
            influxdb3_local.info(f"No PR changes since last ingestion of '{source_name}'")

    except requests.exceptions.RequestException as e:
        error_msg = f"Failed to call Kapa.ai API: {str(e)}"
        influxdb3_local.error(error_msg)
        results["errors"].append(error_msg)
    except Exception as e:
        error_msg = f"Error during integration monitoring: {str(e)}"
        influxdb3_local.error(error_msg)
        results["errors"].append(error_msg)

    return results
```

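Create a trigger that fires on writes to the `gh` table and passes your Kapa.ai credentials and alert threshold in the trigger arguments:
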
```bash
KAPA_TOKEN=$(grep API_TOKEN ~/.env.js_influxdb3_aichat_token | cut -d'=' -f2)
PROJECT_ID=$(grep PROJECT_ID ~/.env.js_influxdb3_aichat_projectid | cut -d'=' -f2)

influxdb3 create trigger \
  --host http://localhost:9191 \
  --database aichat \
  --trigger-spec "table:gh" \
  --plugin-filename "gh_kapa_integration.py" \
  --trigger-arguments "kapa_token=$KAPA_TOKEN,project_id=$PROJECT_ID,source_name=Documentation,repo=influxdata/docs-v2,severity_threshold=200" \
  gh_kapa_monitor
```

#### Data model

This plugin writes two measurements to your database:

1. **integration_status**: Summary of the source update status

```text
integration_status,repo=influxdata/docs-v2,source_name=Documentation days_since_ingestion=5i,prs_since_ingestion=3i,lines_since_ingestion=250i,needs_update=1i,severity_threshold=200i
```

2. **documentation_alert**: Alerts when documentation needs updating

```text
documentation_alert,repo=influxdata/docs-v2,source_name=Documentation,severity=warning message="Documentation needs update: 3 PRs with 250 lines changed in the last 5.2 days",lines_changed=250i,days_since_ingestion=5i
```

##### Troubleshoot triggers and plugins
###### Error: Failed to create trigger

Possible causes:

- The specified database doesn't exist. Create the database or check the database name in the `create trigger` command.