---
title: Use multi-file Python code and modules in plugins
description: >
  Organize complex plugin logic across multiple Python files and modules for
  better code reuse, testing, and maintainability in InfluxDB 3 Processing
  Engine plugins.
weight: 101
---
As your plugin logic grows in complexity, organizing code across multiple Python files improves maintainability, enables code reuse, and makes testing easier. The InfluxDB 3 Processing Engine supports multi-file plugin architectures using standard Python module patterns.
## Before you begin

Ensure you have:

- A working InfluxDB 3 Core instance with the Processing Engine enabled
- Basic understanding of Python modules and packages
- Familiarity with creating InfluxDB 3 plugins
## Multi-file plugin structure

A multi-file plugin is a directory containing Python files organized as a package.
The directory must include an `__init__.py` file that serves as the entry point and contains your trigger function.
### Basic structure

```
my_plugin/
├── __init__.py       # Required - entry point with trigger function
├── processors.py     # Data processing functions
├── utils.py          # Helper utilities
└── config.py         # Configuration management
```
### Required: `__init__.py` entry point

The `__init__.py` file must contain the trigger function that InfluxDB calls when the trigger fires.
This file imports and orchestrates code from other modules in your plugin.
```python
# my_plugin/__init__.py
from .processors import process_data
from .config import load_settings
from .utils import format_output

def process_writes(influxdb3_local, table_batches, args=None):
    """Entry point for WAL trigger."""
    settings = load_settings(args)

    for table_batch in table_batches:
        processed_data = process_data(table_batch, settings)
        # format_output returns a list of LineBuilder objects,
        # so write each line individually
        for line in format_output(processed_data):
            influxdb3_local.write(line)
```
## Organizing plugin code

### Separate concerns into modules

Organize your plugin code by functional responsibility to improve maintainability and testing.
#### `processors.py` - Data transformation logic

```python
# my_plugin/processors.py
"""Data processing and transformation functions."""

def process_data(table_batch, settings):
    """Transform data according to configuration settings."""
    table_name = table_batch["table_name"]
    rows = table_batch["rows"]

    transformed_rows = []
    for row in rows:
        transformed = transform_row(row, settings)
        if transformed:
            transformed_rows.append(transformed)

    return {
        "table": table_name,
        "rows": transformed_rows,
        "count": len(transformed_rows)
    }

def transform_row(row, settings):
    """Apply transformations to a single row."""
    # Apply threshold filtering
    if "value" in row and row["value"] < settings.get("min_value", 0):
        return None

    # Apply unit conversion if configured
    if settings.get("convert_units"):
        row["value"] = row["value"] * settings.get("conversion_factor", 1.0)

    return row
```
#### `config.py` - Configuration management

```python
# my_plugin/config.py
"""Plugin configuration parsing and validation."""

DEFAULT_SETTINGS = {
    "min_value": 0.0,
    "convert_units": False,
    "conversion_factor": 1.0,
    "output_measurement": "processed_data",
}

def load_settings(args):
    """Load and validate plugin settings from trigger arguments."""
    settings = DEFAULT_SETTINGS.copy()

    if not args:
        return settings

    # Parse numeric arguments
    if "min_value" in args:
        settings["min_value"] = float(args["min_value"])
    if "conversion_factor" in args:
        settings["conversion_factor"] = float(args["conversion_factor"])

    # Parse boolean arguments
    if "convert_units" in args:
        settings["convert_units"] = args["convert_units"].lower() in ("true", "1", "yes")

    # Parse string arguments
    if "output_measurement" in args:
        settings["output_measurement"] = args["output_measurement"]

    return settings

def validate_settings(settings):
    """Validate settings and raise exceptions for invalid configurations."""
    if settings["min_value"] < 0:
        raise ValueError("min_value must be non-negative")
    if settings["conversion_factor"] <= 0:
        raise ValueError("conversion_factor must be positive")
    return True
```
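
Trigger arguments arrive as strings, and `load_settings` coerces them to typed values. For example (an illustrative call against the module above):

```python
settings = load_settings({"min_value": "5", "convert_units": "true"})
# settings["min_value"] == 5.0 and settings["convert_units"] is True;
# unspecified keys keep their defaults, e.g. settings["conversion_factor"] == 1.0
```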
#### `utils.py` - Helper functions

```python
# my_plugin/utils.py
"""Utility functions for data formatting and logging."""

def format_output(processed_data):
    """Format processed data for writing to InfluxDB."""
    from influxdb3_local import LineBuilder

    lines = []
    measurement = processed_data.get("measurement", "processed_data")

    for row in processed_data["rows"]:
        line = LineBuilder(measurement)

        # Add tags from row
        for key, value in row.items():
            if key.startswith("tag_"):
                line.tag(key.replace("tag_", ""), str(value))

        # Add fields from row
        for key, value in row.items():
            if key.startswith("field_"):
                field_name = key.replace("field_", "")
                if isinstance(value, float):
                    line.float64_field(field_name, value)
                elif isinstance(value, int):
                    line.int64_field(field_name, value)
                elif isinstance(value, str):
                    line.string_field(field_name, value)

        lines.append(line)

    return lines

def log_metrics(influxdb3_local, operation, duration_ms, record_count):
    """Log plugin performance metrics."""
    influxdb3_local.info(
        f"Operation: {operation}, "
        f"Duration: {duration_ms}ms, "
        f"Records: {record_count}"
    )
```
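
For example, the entry point might time a processing pass and record it with `log_metrics` (a minimal sketch; the timing logic and counts are illustrative):

```python
# my_plugin/__init__.py (illustrative excerpt)
import time

from .utils import log_metrics

def process_writes(influxdb3_local, table_batches, args=None):
    start = time.monotonic()
    record_count = 0

    for table_batch in table_batches:
        record_count += len(table_batch["rows"])
        # ... process the batch ...

    # Record elapsed time in milliseconds alongside the record count
    duration_ms = round((time.monotonic() - start) * 1000, 2)
    log_metrics(influxdb3_local, "process_writes", duration_ms, record_count)
```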
## Importing external libraries

Multi-file plugins can use both relative imports (for your own modules) and absolute imports (for external libraries).

### Relative imports for plugin modules

Use relative imports to reference other modules within your plugin:
```python
# my_plugin/__init__.py
from .processors import process_data   # Same package
from .config import load_settings      # Same package
from .utils import format_output       # Same package

# Relative imports from subdirectories
from .transforms.aggregators import calculate_mean
from .integrations.webhook import send_notification
```
### Absolute imports for external libraries

Use absolute imports for standard library and third-party packages:
```python
# my_plugin/processors.py
import json
import time
from datetime import datetime, timedelta
from collections import defaultdict

# Third-party libraries (must be installed with influxdb3 install package)
import pandas as pd
import numpy as np
```
### Installing third-party dependencies

Before using external libraries, install them into the Processing Engine's Python environment:
```bash
# Install packages for your plugin
influxdb3 install package pandas numpy requests
```
For Docker deployments:
```bash
docker exec -it CONTAINER_NAME influxdb3 install package pandas numpy requests
```
## Advanced plugin patterns

### Nested module structure

For complex plugins, organize code into subdirectories:
```
my_advanced_plugin/
├── __init__.py
├── config.py
├── transforms/
│   ├── __init__.py
│   ├── aggregators.py
│   └── filters.py
├── integrations/
│   ├── __init__.py
│   ├── webhook.py
│   └── email.py
└── utils/
    ├── __init__.py
    ├── logging.py
    └── validators.py
```
Import from nested modules:
```python
# my_advanced_plugin/__init__.py
from .transforms.aggregators import calculate_statistics
from .transforms.filters import apply_threshold_filter
from .integrations.webhook import send_alert
from .utils.logging import setup_logger

def process_writes(influxdb3_local, table_batches, args=None):
    logger = setup_logger(influxdb3_local)

    for table_batch in table_batches:
        # Filter data
        filtered = apply_threshold_filter(table_batch, threshold=100)

        # Calculate statistics
        stats = calculate_statistics(filtered)

        # Send alerts if needed
        if stats["max"] > 1000:
            send_alert(stats, logger)
```
### Shared code across plugins

Share common code across multiple plugins using a shared module directory:
```
plugins/
├── shared/
│   ├── __init__.py
│   ├── formatters.py
│   └── validators.py
├── plugin_a/
│   └── __init__.py
└── plugin_b/
    └── __init__.py
```
Add the shared directory to Python's module search path in your plugin:
```python
# plugin_a/__init__.py
import sys
from pathlib import Path

# Add the plugins directory (which contains shared/) to the search path;
# guard against inserting it more than once on plugin reloads
plugin_dir = Path(__file__).parent.parent
if str(plugin_dir) not in sys.path:
    sys.path.insert(0, str(plugin_dir))

# Now import from shared
from shared.formatters import format_line_protocol
from shared.validators import validate_data

def process_writes(influxdb3_local, table_batches, args=None):
    for table_batch in table_batches:
        if validate_data(table_batch):
            formatted = format_line_protocol(table_batch)
            influxdb3_local.write(formatted)
```
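
The shared modules themselves are ordinary Python files. For example, `shared/validators.py` might look like this (a minimal sketch; the validation rules are illustrative):

```python
# shared/validators.py
"""Validation helpers shared across plugins."""

def validate_data(table_batch):
    """Return True if the batch has the structure plugins expect."""
    if not isinstance(table_batch, dict):
        return False
    # Illustrative check: WAL table batches carry a name and a list of rows
    return "table_name" in table_batch and "rows" in table_batch
```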
## Testing multi-file plugins

### Unit testing individual modules

Test modules independently before integration:
```python
# tests/test_processors.py
import unittest

from my_plugin.processors import transform_row

class TestProcessors(unittest.TestCase):
    def test_transform_row_filtering(self):
        """Test that rows below the threshold are filtered."""
        settings = {"min_value": 10.0}
        row = {"value": 5.0}
        result = transform_row(row, settings)
        self.assertIsNone(result)

    def test_transform_row_conversion(self):
        """Test unit conversion."""
        settings = {
            "convert_units": True,
            "conversion_factor": 2.0,
            "min_value": 0.0
        }
        row = {"value": 10.0}
        result = transform_row(row, settings)
        self.assertEqual(result["value"], 20.0)

if __name__ == "__main__":
    unittest.main()
```
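
Run the tests from the directory that contains both `my_plugin/` and `tests/` so the plugin package is importable (standard `unittest` invocations; the layout above is assumed):

```bash
# Discover and run all tests in the tests/ directory
python -m unittest discover tests

# Or run a single test module
python -m unittest tests.test_processors
```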
### Testing with the influxdb3 CLI

Test your complete multi-file plugin before deployment:
```bash
# Test a scheduled plugin
influxdb3 test schedule_plugin \
  --database testdb \
  --schedule "0 0 * * * *" \
  --plugin-dir /path/to/plugins \
  my_plugin

# Test a WAL plugin with sample data
influxdb3 test wal_plugin \
  --database testdb \
  --plugin-dir /path/to/plugins \
  my_plugin
```
For more testing options, see the `influxdb3 test` reference.
## Deploying multi-file plugins

### Upload plugin directory

Upload your complete plugin directory when creating a trigger:
```bash
# Upload the entire plugin directory
influxdb3 create trigger \
  --trigger-spec "table:sensor_data" \
  --path "/local/path/to/my_plugin" \
  --upload \
  --database mydb \
  sensor_processor
```
The `--upload` flag transfers all files in the directory to the server's plugin directory.
### Update plugin code

Update all files in a running plugin:
```bash
# Update the plugin with new code
influxdb3 update trigger \
  --database mydb \
  --trigger-name sensor_processor \
  --path "/local/path/to/my_plugin"
```
The update replaces all plugin files while preserving trigger configuration.
## Best practices

### Code organization

- **Single responsibility**: Each module should have one clear purpose
- **Shallow hierarchies**: Avoid deeply nested directory structures (2-3 levels maximum)
- **Descriptive names**: Use clear, descriptive module and function names
- **Module size**: Keep modules under 300-400 lines for maintainability
### Import management

- **Explicit imports**: Use explicit imports rather than `from module import *`
- **Standard library first**: Import standard library modules, then third-party packages, then local modules
- **Avoid circular imports**: Design modules to prevent circular dependencies (see the sketch after the example below)
Example import organization:
```python
# Standard library
import json
import time
from datetime import datetime

# Third-party packages
import pandas as pd
import numpy as np

# Local modules
from .config import load_settings
from .processors import process_data
from .utils import format_output
```
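
A common way to break a circular dependency is to defer one of the imports into the function that needs it, so neither module imports the other at load time (a minimal sketch; the module split is hypothetical):

```python
# my_plugin/reports.py (hypothetical module)
# A top-level "from .utils import format_output" would complete a cycle
# if utils.py also imported this module, so defer the import to call time.

def build_report(processed_data):
    from .utils import format_output  # deferred import breaks the cycle
    return format_output(processed_data)
```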
### Error handling

Centralize error handling in your entry point:
```python
# my_plugin/__init__.py
from .processors import process_data
from .config import load_settings, validate_settings

def process_writes(influxdb3_local, table_batches, args=None):
    try:
        # Load and validate configuration
        settings = load_settings(args)
        validate_settings(settings)

        # Process data
        for table_batch in table_batches:
            process_data(influxdb3_local, table_batch, settings)

    except ValueError as e:
        influxdb3_local.error(f"Configuration error: {e}")
    except Exception as e:
        influxdb3_local.error(f"Unexpected error: {e}")
```
### Documentation

Document your modules with docstrings:
"""
my_plugin - Data processing plugin for sensor data.
This plugin processes incoming sensor data by:
1. Filtering values below configured threshold
2. Converting units if requested
3. Writing processed data to output measurement
Modules:
- processors: Core data transformation logic
- config: Configuration parsing and validation
- utils: Helper functions for formatting and logging
"""
def process_writes(influxdb3_local, table_batches, args=None):
"""Process incoming sensor data writes.
Args:
influxdb3_local: InfluxDB API interface
table_batches: List of table batches with written data
args: Optional trigger arguments for configuration
Trigger arguments:
min_value (float): Minimum value threshold
convert_units (bool): Enable unit conversion
conversion_factor (float): Conversion multiplier
output_measurement (str): Target measurement name
"""
pass
## Example: Complete multi-file plugin

Here's a complete example of a temperature monitoring plugin with multi-file organization:

### Plugin structure

```
temperature_monitor/
├── __init__.py
├── config.py
├── processors.py
└── alerts.py
```
### `__init__.py`

```python
# temperature_monitor/__init__.py
"""Temperature monitoring plugin with alerting."""

from .config import load_config
from .processors import calculate_statistics
from .alerts import check_thresholds

def process_scheduled_call(influxdb3_local, call_time, args=None):
    """Monitor temperature data and send alerts."""
    try:
        config = load_config(args)

        # Query recent temperature data
        query = f"""
            SELECT temp_value, location
            FROM {config['measurement']}
            WHERE time > now() - INTERVAL '{config['window']}'
        """
        results = influxdb3_local.query(query)

        # Calculate statistics
        stats = calculate_statistics(results)

        # Check thresholds and alert
        check_thresholds(influxdb3_local, stats, config)

        influxdb3_local.info(
            f"Processed {len(results)} readings "
            f"from {len(stats)} locations"
        )

    except Exception as e:
        influxdb3_local.error(f"Plugin error: {e}")
```
### `config.py`

```python
# temperature_monitor/config.py
"""Configuration management for temperature monitor."""

DEFAULTS = {
    "measurement": "temperature",
    "window": "1 hour",
    "high_threshold": 30.0,
    "low_threshold": 10.0,
    "alert_measurement": "temperature_alerts"
}

def load_config(args):
    """Load configuration from trigger arguments."""
    config = DEFAULTS.copy()

    if args:
        for key in DEFAULTS:
            if key in args:
                if key.endswith("_threshold"):
                    config[key] = float(args[key])
                else:
                    config[key] = args[key]

    return config
```
### `processors.py`

```python
# temperature_monitor/processors.py
"""Data processing functions."""

from collections import defaultdict

def calculate_statistics(data):
    """Calculate statistics by location."""
    stats = defaultdict(lambda: {
        "count": 0,
        "sum": 0.0,
        "min": float('inf'),
        "max": float('-inf')
    })

    for row in data:
        location = row.get("location", "unknown")
        value = float(row.get("temp_value", 0))

        s = stats[location]
        s["count"] += 1
        s["sum"] += value
        s["min"] = min(s["min"], value)
        s["max"] = max(s["max"], value)

    # Calculate averages
    for location, s in stats.items():
        if s["count"] > 0:
            s["avg"] = s["sum"] / s["count"]

    return dict(stats)
```
### `alerts.py`

```python
# temperature_monitor/alerts.py
"""Alert checking and notification."""

def check_thresholds(influxdb3_local, stats, config):
    """Check temperature thresholds and write alerts."""
    from influxdb3_local import LineBuilder

    high_threshold = config["high_threshold"]
    low_threshold = config["low_threshold"]
    alert_measurement = config["alert_measurement"]

    for location, s in stats.items():
        if s["max"] > high_threshold:
            line = LineBuilder(alert_measurement)
            line.tag("location", location)
            line.tag("severity", "high")
            line.float64_field("temperature", s["max"])
            line.string_field(
                "message",
                f"High temperature: {s['max']}°C exceeds {high_threshold}°C"
            )
            influxdb3_local.write(line)
            influxdb3_local.warn(f"High temperature alert for {location}")

        elif s["min"] < low_threshold:
            line = LineBuilder(alert_measurement)
            line.tag("location", location)
            line.tag("severity", "low")
            line.float64_field("temperature", s["min"])
            line.string_field(
                "message",
                f"Low temperature: {s['min']}°C below {low_threshold}°C"
            )
            influxdb3_local.write(line)
            influxdb3_local.warn(f"Low temperature alert for {location}")
```
### Deploy the plugin

```bash
# Create the trigger with configuration
influxdb3 create trigger \
  --trigger-spec "every:5m" \
  --path "/local/path/to/temperature_monitor" \
  --upload \
  --trigger-arguments high_threshold=35,low_threshold=5,window="15 minutes" \
  --database sensors \
  temp_monitor
```
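
After the trigger fires, you can verify that alerts are being written by querying the alert measurement (assuming the `temperature_alerts` default from `config.py`):

```bash
influxdb3 query \
  --database sensors \
  "SELECT * FROM temperature_alerts ORDER BY time DESC LIMIT 10"
```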