feat(influxdb3): WIP: Use multi-file code and modules in plugins

jts-multifile-plugins-guide
Jason Stirnaman 2025-11-01 16:38:19 -05:00
parent be87ffc5b7
commit b75bc95494
1 changed file with 696 additions and 0 deletions

---
title: Use multi-file Python code and modules in plugins
description: |
  Organize complex plugin logic across multiple Python files and modules for better code reuse, testing, and maintainability in InfluxDB 3 Processing Engine plugins.
menu:
  influxdb3_core:
    name: Use multi-file plugins
    parent: Processing engine and Python plugins
weight: 101
influxdb3/core/tags: [processing engine, plugins, python, modules]
related:
  - /influxdb3/core/plugins/
  - /influxdb3/core/plugins/extend-plugin/
  - /influxdb3/core/reference/cli/influxdb3/create/trigger/
---
As your plugin logic grows in complexity, organizing code across multiple Python files improves maintainability, enables code reuse, and makes testing easier.
The InfluxDB 3 Processing Engine supports multi-file plugin architectures using standard Python module patterns.
## Before you begin
Ensure you have:
- A working InfluxDB 3 Core instance with the Processing Engine enabled
- Basic understanding of [Python modules and packages](https://docs.python.org/3/tutorial/modules.html)
- Familiarity with [creating InfluxDB 3 plugins](/influxdb3/core/plugins/)
## Multi-file plugin structure
A multi-file plugin is a directory containing Python files organized as a package.
The directory must include an `__init__.py` file that serves as the entry point and contains your trigger function.
### Basic structure
```
my_plugin/
├── __init__.py # Required - entry point with trigger function
├── processors.py # Data processing functions
├── utils.py # Helper utilities
└── config.py # Configuration management
```
### Required: `__init__.py` entry point
The `__init__.py` file must contain the trigger function that InfluxDB calls when the trigger fires.
This file imports and orchestrates code from other modules in your plugin.
```python
# my_plugin/__init__.py
from .processors import process_data
from .config import load_settings
from .utils import format_output


def process_writes(influxdb3_local, table_batches, args=None):
    """Entry point for WAL trigger."""
    settings = load_settings(args)
    for table_batch in table_batches:
        processed_data = process_data(table_batch, settings)
        # format_output returns a list of lines; write each one
        for line in format_output(processed_data):
            influxdb3_local.write(line)
```
## Organizing plugin code
### Separate concerns into modules
Organize your plugin code by functional responsibility to improve maintainability and testing.
#### processors.py - Data transformation logic
```python
# my_plugin/processors.py
"""Data processing and transformation functions."""


def process_data(table_batch, settings):
    """Transform data according to configuration settings."""
    table_name = table_batch["table_name"]
    rows = table_batch["rows"]
    transformed_rows = []
    for row in rows:
        transformed = transform_row(row, settings)
        if transformed:
            transformed_rows.append(transformed)
    return {
        "table": table_name,
        "rows": transformed_rows,
        "count": len(transformed_rows),
    }


def transform_row(row, settings):
    """Apply transformations to a single row."""
    # Apply threshold filtering
    if "value" in row and row["value"] < settings.get("min_value", 0):
        return None
    # Apply unit conversion if configured (skip rows without a value)
    if settings.get("convert_units") and "value" in row:
        row["value"] = row["value"] * settings.get("conversion_factor", 1.0)
    return row
```
#### config.py - Configuration management
```python
# my_plugin/config.py
"""Plugin configuration parsing and validation."""

DEFAULT_SETTINGS = {
    "min_value": 0.0,
    "convert_units": False,
    "conversion_factor": 1.0,
    "output_measurement": "processed_data",
}


def load_settings(args):
    """Load and validate plugin settings from trigger arguments."""
    settings = DEFAULT_SETTINGS.copy()
    if not args:
        return settings
    # Parse numeric arguments
    if "min_value" in args:
        settings["min_value"] = float(args["min_value"])
    if "conversion_factor" in args:
        settings["conversion_factor"] = float(args["conversion_factor"])
    # Parse boolean arguments
    if "convert_units" in args:
        settings["convert_units"] = args["convert_units"].lower() in ("true", "1", "yes")
    # Parse string arguments
    if "output_measurement" in args:
        settings["output_measurement"] = args["output_measurement"]
    return settings


def validate_settings(settings):
    """Validate settings and raise exceptions for invalid configurations."""
    if settings["min_value"] < 0:
        raise ValueError("min_value must be non-negative")
    if settings["conversion_factor"] <= 0:
        raise ValueError("conversion_factor must be positive")
    return True
```
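Trigger arguments arrive as strings, so `load_settings()` converts them to the expected types. A quick sanity check of that behavior (a standalone sketch you can run outside the Processing Engine, from the directory that contains `my_plugin/`):
```python
from my_plugin.config import load_settings, validate_settings

settings = load_settings({"min_value": "10", "convert_units": "true"})
assert settings["min_value"] == 10.0          # parsed to float
assert settings["convert_units"] is True      # parsed to bool
assert settings["conversion_factor"] == 1.0   # default preserved
validate_settings(settings)                   # raises ValueError if invalid
```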
#### utils.py - Helper functions
```python
# my_plugin/utils.py
"""Utility functions for data formatting and logging."""


def format_output(processed_data):
    """Format processed data for writing to InfluxDB."""
    from influxdb3_local import LineBuilder

    lines = []
    measurement = processed_data.get("measurement", "processed_data")
    for row in processed_data["rows"]:
        line = LineBuilder(measurement)
        # Add tags from row
        for key, value in row.items():
            if key.startswith("tag_"):
                line.tag(key.replace("tag_", ""), str(value))
        # Add fields from row
        for key, value in row.items():
            if key.startswith("field_"):
                field_name = key.replace("field_", "")
                if isinstance(value, float):
                    line.float64_field(field_name, value)
                elif isinstance(value, int):
                    line.int64_field(field_name, value)
                elif isinstance(value, str):
                    line.string_field(field_name, value)
        lines.append(line)
    return lines


def log_metrics(influxdb3_local, operation, duration_ms, record_count):
    """Log plugin performance metrics."""
    influxdb3_local.info(
        f"Operation: {operation}, "
        f"Duration: {duration_ms}ms, "
        f"Records: {record_count}"
    )
```
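Note that `format_output()` relies on a `tag_`/`field_` key-naming convention in the row data; this is a convention of this example, not an InfluxDB requirement. For instance, inside the Processing Engine runtime, a row such as `{"tag_location": "lab1", "field_temperature": 21.5}` produces line protocol equivalent to `processed_data,location=lab1 temperature=21.5`.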
## Importing external libraries
Multi-file plugins can use both relative imports (for your own modules) and absolute imports (for external libraries).
### Relative imports for plugin modules
Use relative imports to reference other modules within your plugin:
```python
# my_plugin/__init__.py
from .processors import process_data # Same package
from .config import load_settings # Same package
from .utils import format_output # Same package
# Relative imports from subdirectories
from .transforms.aggregators import calculate_mean
from .integrations.webhook import send_notification
```
### Absolute imports for external libraries
Use absolute imports for standard library and third-party packages:
```python
# my_plugin/processors.py
import json
import time
from datetime import datetime, timedelta
from collections import defaultdict
# Third-party libraries (must be installed with influxdb3 install package)
import pandas as pd
import numpy as np
```
### Installing third-party dependencies
Before using external libraries, install them into the Processing Engine's Python environment:
```bash
# Install packages for your plugin
influxdb3 install package pandas numpy requests
```
For Docker deployments:
```bash
docker exec -it CONTAINER_NAME influxdb3 install package pandas numpy requests
```
## Advanced plugin patterns
### Nested module structure
For complex plugins, organize code into subdirectories:
```
my_advanced_plugin/
├── __init__.py
├── config.py
├── transforms/
│   ├── __init__.py
│   ├── aggregators.py
│   └── filters.py
├── integrations/
│   ├── __init__.py
│   ├── webhook.py
│   └── email.py
└── utils/
    ├── __init__.py
    ├── logging.py
    └── validators.py
```
Import from nested modules:
```python
# my_advanced_plugin/__init__.py
from .transforms.aggregators import calculate_statistics
from .transforms.filters import apply_threshold_filter
from .integrations.webhook import send_alert
from .utils.logging import setup_logger


def process_writes(influxdb3_local, table_batches, args=None):
    logger = setup_logger(influxdb3_local)
    for table_batch in table_batches:
        # Filter data
        filtered = apply_threshold_filter(table_batch, threshold=100)
        # Calculate statistics
        stats = calculate_statistics(filtered)
        # Send alerts if needed
        if stats["max"] > 1000:
            send_alert(stats, logger)
```
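The nested modules themselves are ordinary Python files. For example, a minimal sketch of `transforms/filters.py` (the helper name and batch shape are assumptions carried over from the import example above):
```python
# my_advanced_plugin/transforms/filters.py
"""Row filtering helpers."""


def apply_threshold_filter(table_batch, threshold):
    """Return a copy of the batch keeping only rows whose value meets the threshold."""
    rows = [row for row in table_batch["rows"] if row.get("value", 0) >= threshold]
    return {"table_name": table_batch["table_name"], "rows": rows}
```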
### Shared code across plugins
Share common code across multiple plugins using a shared module directory:
```
plugins/
├── shared/
│   ├── __init__.py
│   ├── formatters.py
│   └── validators.py
├── plugin_a/
│   └── __init__.py
└── plugin_b/
    └── __init__.py
```
Add the shared directory to Python's module search path in your plugin:
```python
# plugin_a/__init__.py
import sys
from pathlib import Path

# Add the parent plugins/ directory to the module search path
plugin_dir = Path(__file__).parent.parent
sys.path.insert(0, str(plugin_dir))

# Now import from shared
from shared.formatters import format_line_protocol
from shared.validators import validate_data


def process_writes(influxdb3_local, table_batches, args=None):
    for table_batch in table_batches:
        if validate_data(table_batch):
            formatted = format_line_protocol(table_batch)
            influxdb3_local.write(formatted)
```
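The shared modules are plain Python files like any other. A minimal sketch of `shared/validators.py` (the checks here are hypothetical; adapt them to your schema):
```python
# plugins/shared/validators.py
"""Validation helpers shared across plugins."""


def validate_data(table_batch):
    """Return True if the batch names a table and contains at least one row."""
    return bool(table_batch.get("table_name")) and bool(table_batch.get("rows"))
```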
## Testing multi-file plugins
### Unit testing individual modules
Test modules independently before integration:
```python
# tests/test_processors.py
import unittest

from my_plugin.processors import transform_row


class TestProcessors(unittest.TestCase):
    def test_transform_row_filtering(self):
        """Test that rows below threshold are filtered."""
        settings = {"min_value": 10.0}
        row = {"value": 5.0}
        result = transform_row(row, settings)
        self.assertIsNone(result)

    def test_transform_row_conversion(self):
        """Test unit conversion."""
        settings = {
            "convert_units": True,
            "conversion_factor": 2.0,
            "min_value": 0.0,
        }
        row = {"value": 10.0}
        result = transform_row(row, settings)
        self.assertEqual(result["value"], 20.0)


if __name__ == "__main__":
    unittest.main()
```
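Because plugin modules are plain Python, you can run these tests with any standard test runner. For example, from the directory that contains both `my_plugin/` and `tests/`:
```bash
python -m unittest discover tests
```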
### Testing with the influxdb3 CLI
Test your complete multi-file plugin before deployment:
```bash
# Test a scheduled plugin
influxdb3 test schedule_plugin \
  --database testdb \
  --schedule "0 0 * * * *" \
  --plugin-dir /path/to/plugins \
  my_plugin

# Test a WAL plugin with sample data
influxdb3 test wal_plugin \
  --database testdb \
  --plugin-dir /path/to/plugins \
  my_plugin
```
For more testing options, see the [influxdb3 test reference](/influxdb3/core/reference/cli/influxdb3/test/).
## Deploying multi-file plugins
### Upload plugin directory
Upload your complete plugin directory when creating a trigger:
```bash
# Upload the entire plugin directory
influxdb3 create trigger \
  --trigger-spec "table:sensor_data" \
  --path "/local/path/to/my_plugin" \
  --upload \
  --database mydb \
  sensor_processor
```
The `--upload` flag transfers all files in the directory to the server's plugin directory.
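For example, if the server was started with `--plugin-dir /var/lib/influxdb3/plugins` (a hypothetical path), the upload recreates the plugin directory on the server:
```
/var/lib/influxdb3/plugins/
└── my_plugin/
    ├── __init__.py
    ├── processors.py
    ├── utils.py
    └── config.py
```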
### Update plugin code
Update all files in a running plugin:
```bash
# Update the plugin with new code
influxdb3 update trigger \
  --database mydb \
  --trigger-name sensor_processor \
  --path "/local/path/to/my_plugin"
```
The update replaces all plugin files while preserving trigger configuration.
## Best practices
### Code organization
- **Single responsibility**: Each module should have one clear purpose
- **Shallow hierarchies**: Avoid deeply nested directory structures (2-3 levels maximum)
- **Descriptive names**: Use clear, descriptive module and function names
- **Module size**: Keep modules under 300-400 lines for maintainability
### Import management
- **Explicit imports**: Use explicit imports rather than `from module import *`
- **Standard library first**: Import standard library, then third-party, then local modules
- **Avoid circular imports**: Design modules to prevent circular dependencies (see the sketch after the following example)
Example import organization:
```python
# Standard library
import json
import time
from datetime import datetime

# Third-party packages
import pandas as pd
import numpy as np

# Local modules
from .config import load_settings
from .processors import process_data
from .utils import format_output
```
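To break a circular dependency, either move the shared code into a third module that both sides can import, or defer one import to call time. A minimal sketch of the latter (the `summarize_batch` helper is hypothetical, and it assumes a scenario where `processors.py` also imports from `utils.py`):
```python
# my_plugin/utils.py


def summarize_batch(table_batch, settings):
    # Imported inside the function rather than at module top level,
    # to avoid a circular import with processors.py
    from .processors import process_data

    result = process_data(table_batch, settings)
    return f"{result['table']}: {result['count']} rows"
```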
### Error handling
Centralize error handling in your entry point:
```python
# my_plugin/__init__.py
from .processors import process_data
from .config import load_settings, validate_settings
from .utils import format_output


def process_writes(influxdb3_local, table_batches, args=None):
    try:
        # Load and validate configuration
        settings = load_settings(args)
        validate_settings(settings)
        # Process data
        for table_batch in table_batches:
            processed_data = process_data(table_batch, settings)
            for line in format_output(processed_data):
                influxdb3_local.write(line)
    except ValueError as e:
        influxdb3_local.error(f"Configuration error: {e}")
    except Exception as e:
        influxdb3_local.error(f"Unexpected error: {e}")
```
### Documentation
Document your modules with docstrings:
```python
"""
my_plugin - Data processing plugin for sensor data.
This plugin processes incoming sensor data by:
1. Filtering values below configured threshold
2. Converting units if requested
3. Writing processed data to output measurement
Modules:
- processors: Core data transformation logic
- config: Configuration parsing and validation
- utils: Helper functions for formatting and logging
"""
def process_writes(influxdb3_local, table_batches, args=None):
"""Process incoming sensor data writes.
Args:
influxdb3_local: InfluxDB API interface
table_batches: List of table batches with written data
args: Optional trigger arguments for configuration
Trigger arguments:
min_value (float): Minimum value threshold
convert_units (bool): Enable unit conversion
conversion_factor (float): Conversion multiplier
output_measurement (str): Target measurement name
"""
pass
```
## Example: Complete multi-file plugin
The following is a complete temperature monitoring plugin organized across multiple files:
### Plugin structure
```
temperature_monitor/
├── __init__.py
├── config.py
├── processors.py
└── alerts.py
```
### `__init__.py`
```python
# temperature_monitor/__init__.py
"""Temperature monitoring plugin with alerting."""
from .config import load_config
from .processors import calculate_statistics
from .alerts import check_thresholds


def process_scheduled_call(influxdb3_local, call_time, args=None):
    """Monitor temperature data and send alerts."""
    try:
        config = load_config(args)
        # Query recent temperature data
        query = f"""
            SELECT temp_value, location
            FROM {config['measurement']}
            WHERE time > now() - INTERVAL '{config['window']}'
        """
        results = influxdb3_local.query(query)
        # Calculate statistics
        stats = calculate_statistics(results)
        # Check thresholds and alert
        check_thresholds(influxdb3_local, stats, config)
        influxdb3_local.info(
            f"Processed {len(results)} readings "
            f"from {len(stats)} locations"
        )
    except Exception as e:
        influxdb3_local.error(f"Plugin error: {e}")
```
### config.py
```python
# temperature_monitor/config.py
"""Configuration management for temperature monitor."""

DEFAULTS = {
    "measurement": "temperature",
    "window": "1 hour",
    "high_threshold": 30.0,
    "low_threshold": 10.0,
    "alert_measurement": "temperature_alerts",
}


def load_config(args):
    """Load configuration from trigger arguments."""
    config = DEFAULTS.copy()
    if args:
        for key in DEFAULTS:
            if key in args:
                if key.endswith("_threshold"):
                    config[key] = float(args[key])
                else:
                    config[key] = args[key]
    return config
```
### processors.py
```python
# temperature_monitor/processors.py
"""Data processing functions."""
from collections import defaultdict


def calculate_statistics(data):
    """Calculate statistics by location."""
    stats = defaultdict(lambda: {
        "count": 0,
        "sum": 0.0,
        "min": float("inf"),
        "max": float("-inf"),
    })
    for row in data:
        location = row.get("location", "unknown")
        value = float(row.get("temp_value", 0))
        s = stats[location]
        s["count"] += 1
        s["sum"] += value
        s["min"] = min(s["min"], value)
        s["max"] = max(s["max"], value)
    # Calculate averages
    for location, s in stats.items():
        if s["count"] > 0:
            s["avg"] = s["sum"] / s["count"]
    return dict(stats)
```
### alerts.py
```python
# temperature_monitor/alerts.py
"""Alert checking and notification."""


def check_thresholds(influxdb3_local, stats, config):
    """Check temperature thresholds and write alerts."""
    from influxdb3_local import LineBuilder

    high_threshold = config["high_threshold"]
    low_threshold = config["low_threshold"]
    alert_measurement = config["alert_measurement"]
    for location, s in stats.items():
        if s["max"] > high_threshold:
            line = LineBuilder(alert_measurement)
            line.tag("location", location)
            line.tag("severity", "high")
            line.float64_field("temperature", s["max"])
            line.string_field(
                "message",
                f"High temperature: {s['max']}°C exceeds {high_threshold}°C",
            )
            influxdb3_local.write(line)
            influxdb3_local.warn(f"High temperature alert for {location}")
        elif s["min"] < low_threshold:
            line = LineBuilder(alert_measurement)
            line.tag("location", location)
            line.tag("severity", "low")
            line.float64_field("temperature", s["min"])
            line.string_field(
                "message",
                f"Low temperature: {s['min']}°C below {low_threshold}°C",
            )
            influxdb3_local.write(line)
            influxdb3_local.warn(f"Low temperature alert for {location}")
```
### Deploy the plugin
```bash
# Create the trigger with configuration
influxdb3 create trigger \
  --trigger-spec "every:5m" \
  --path "/local/path/to/temperature_monitor" \
  --upload \
  --trigger-arguments high_threshold=35,low_threshold=5,window="15 minutes" \
  --database sensors \
  temp_monitor
```
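After the trigger fires, you can confirm that alerts are being written by querying the alert measurement (this assumes the default `temperature_alerts` name from `config.py`):
```bash
influxdb3 query \
  --database sensors \
  "SELECT * FROM temperature_alerts ORDER BY time DESC LIMIT 10"
```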
## Related resources
- [Processing engine and Python plugins](/influxdb3/core/plugins/)
- [Extend plugins with API features](/influxdb3/core/plugins/extend-plugin/)
- [Plugin library](/influxdb3/core/plugins/library/)
- [influxdb3 create trigger](/influxdb3/core/reference/cli/influxdb3/create/trigger/)
- [influxdb3 test](/influxdb3/core/reference/cli/influxdb3/test/)