diff --git a/content/influxdb3/core/plugins/use-multifile-plugins.md b/content/influxdb3/core/plugins/use-multifile-plugins.md new file mode 100644 index 000000000..551663711 --- /dev/null +++ b/content/influxdb3/core/plugins/use-multifile-plugins.md @@ -0,0 +1,696 @@ +--- +title: Use multi-file Python code and modules in plugins +description: | + Organize complex plugin logic across multiple Python files and modules for better code reuse, testing, and maintainability in InfluxDB 3 Processing Engine plugins. +menu: + influxdb3_core: + name: Use multi-file plugins + parent: Processing engine and Python plugins +weight: 101 +influxdb3/core/tags: [processing engine, plugins, python, modules] +related: +- /influxdb3/core/plugins/ +- /influxdb3/core/plugins/extend-plugin/ +- /influxdb3/core/reference/cli/influxdb3/create/trigger/ +--- + +As your plugin logic grows in complexity, organizing code across multiple Python files improves maintainability, enables code reuse, and makes testing easier. +The InfluxDB 3 Processing Engine supports multi-file plugin architectures using standard Python module patterns. + +## Before you begin + +Ensure you have: + +- A working InfluxDB 3 Core instance with the Processing Engine enabled +- Basic understanding of [Python modules and packages](https://docs.python.org/3/tutorial/modules.html) +- Familiarity with [creating InfluxDB 3 plugins](/influxdb3/core/plugins/) + +## Multi-file plugin structure + +A multi-file plugin is a directory containing Python files organized as a package. +The directory must include an `__init__.py` file that serves as the entry point and contains your trigger function. + +### Basic structure + +``` +my_plugin/ +├── __init__.py # Required - entry point with trigger function +├── processors.py # Data processing functions +├── utils.py # Helper utilities +└── config.py # Configuration management +``` + +### Required: **init**.py entry point + +The `__init__.py` file must contain the trigger function that InfluxDB calls when the trigger fires. +This file imports and orchestrates code from other modules in your plugin. + +```python +# my_plugin/__init__.py +from .processors import process_data +from .config import load_settings +from .utils import format_output + +def process_writes(influxdb3_local, table_batches, args=None): + """Entry point for WAL trigger.""" + settings = load_settings(args) + + for table_batch in table_batches: + processed_data = process_data(table_batch, settings) + output = format_output(processed_data) + influxdb3_local.write(output) +``` + +## Organizing plugin code + +### Separate concerns into modules + +Organize your plugin code by functional responsibility to improve maintainability and testing. + +#### processors.py - Data transformation logic + +```python +# my_plugin/processors.py +"""Data processing and transformation functions.""" + +def process_data(table_batch, settings): + """Transform data according to configuration settings.""" + table_name = table_batch["table_name"] + rows = table_batch["rows"] + + transformed_rows = [] + for row in rows: + transformed = transform_row(row, settings) + if transformed: + transformed_rows.append(transformed) + + return { + "table": table_name, + "rows": transformed_rows, + "count": len(transformed_rows) + } + +def transform_row(row, settings): + """Apply transformations to a single row.""" + # Apply threshold filtering + if "value" in row and row["value"] < settings.get("min_value", 0): + return None + + # Apply unit conversion if configured + if settings.get("convert_units"): + row["value"] = row["value"] * settings.get("conversion_factor", 1.0) + + return row +``` + +#### config.py - Configuration management + +```python +# my_plugin/config.py +"""Plugin configuration parsing and validation.""" + +DEFAULT_SETTINGS = { + "min_value": 0.0, + "convert_units": False, + "conversion_factor": 1.0, + "output_measurement": "processed_data", +} + +def load_settings(args): + """Load and validate plugin settings from trigger arguments.""" + settings = DEFAULT_SETTINGS.copy() + + if not args: + return settings + + # Parse numeric arguments + if "min_value" in args: + settings["min_value"] = float(args["min_value"]) + + if "conversion_factor" in args: + settings["conversion_factor"] = float(args["conversion_factor"]) + + # Parse boolean arguments + if "convert_units" in args: + settings["convert_units"] = args["convert_units"].lower() in ("true", "1", "yes") + + # Parse string arguments + if "output_measurement" in args: + settings["output_measurement"] = args["output_measurement"] + + return settings + +def validate_settings(settings): + """Validate settings and raise exceptions for invalid configurations.""" + if settings["min_value"] < 0: + raise ValueError("min_value must be non-negative") + + if settings["conversion_factor"] <= 0: + raise ValueError("conversion_factor must be positive") + + return True +``` + +#### utils.py - Helper functions + +```python +# my_plugin/utils.py +"""Utility functions for data formatting and logging.""" + +from datetime import datetime + +def format_output(processed_data): + """Format processed data for writing to InfluxDB.""" + from influxdb3_local import LineBuilder + + lines = [] + measurement = processed_data.get("measurement", "processed_data") + + for row in processed_data["rows"]: + line = LineBuilder(measurement) + + # Add tags from row + for key, value in row.items(): + if key.startswith("tag_"): + line.tag(key.replace("tag_", ""), str(value)) + + # Add fields from row + for key, value in row.items(): + if key.startswith("field_"): + field_name = key.replace("field_", "") + if isinstance(value, float): + line.float64_field(field_name, value) + elif isinstance(value, int): + line.int64_field(field_name, value) + elif isinstance(value, str): + line.string_field(field_name, value) + + lines.append(line) + + return lines + +def log_metrics(influxdb3_local, operation, duration_ms, record_count): + """Log plugin performance metrics.""" + influxdb3_local.info( + f"Operation: {operation}, " + f"Duration: {duration_ms}ms, " + f"Records: {record_count}" + ) +``` + +## Importing external libraries + +Multi-file plugins can use both relative imports (for your own modules) and absolute imports (for external libraries). + +### Relative imports for plugin modules + +Use relative imports to reference other modules within your plugin: + +```python +# my_plugin/__init__.py +from .processors import process_data # Same package +from .config import load_settings # Same package +from .utils import format_output # Same package + +# Relative imports from subdirectories +from .transforms.aggregators import calculate_mean +from .integrations.webhook import send_notification +``` + +### Absolute imports for external libraries + +Use absolute imports for standard library and third-party packages: + +```python +# my_plugin/processors.py +import json +import time +from datetime import datetime, timedelta +from collections import defaultdict + +# Third-party libraries (must be installed with influxdb3 install package) +import pandas as pd +import numpy as np +``` + +### Installing third-party dependencies + +Before using external libraries, install them into the Processing Engine's Python environment: + +```bash +# Install packages for your plugin +influxdb3 install package pandas numpy requests +``` + +For Docker deployments: + +```bash +docker exec -it CONTAINER_NAME influxdb3 install package pandas numpy requests +``` + +## Advanced plugin patterns + +### Nested module structure + +For complex plugins, organize code into subdirectories: + +``` +my_advanced_plugin/ +├── __init__.py +├── config.py +├── transforms/ +│ ├── __init__.py +│ ├── aggregators.py +│ └── filters.py +├── integrations/ +│ ├── __init__.py +│ ├── webhook.py +│ └── email.py +└── utils/ + ├── __init__.py + ├── logging.py + └── validators.py +``` + +Import from nested modules: + +```python +# my_advanced_plugin/__init__.py +from .transforms.aggregators import calculate_statistics +from .transforms.filters import apply_threshold_filter +from .integrations.webhook import send_alert +from .utils.logging import setup_logger + +def process_writes(influxdb3_local, table_batches, args=None): + logger = setup_logger(influxdb3_local) + + for table_batch in table_batches: + # Filter data + filtered = apply_threshold_filter(table_batch, threshold=100) + + # Calculate statistics + stats = calculate_statistics(filtered) + + # Send alerts if needed + if stats["max"] > 1000: + send_alert(stats, logger) +``` + +### Shared code across plugins + +Share common code across multiple plugins using a shared module directory: + +``` +plugins/ +├── shared/ +│ ├── __init__.py +│ ├── formatters.py +│ └── validators.py +├── plugin_a/ +│ └── __init__.py +└── plugin_b/ + └── __init__.py +``` + +Add the shared directory to Python's module search path in your plugin: + +```python +# plugin_a/__init__.py +import sys +from pathlib import Path + +# Add shared directory to path +plugin_dir = Path(__file__).parent.parent +sys.path.insert(0, str(plugin_dir)) + +# Now import from shared +from shared.formatters import format_line_protocol +from shared.validators import validate_data + +def process_writes(influxdb3_local, table_batches, args=None): + for table_batch in table_batches: + if validate_data(table_batch): + formatted = format_line_protocol(table_batch) + influxdb3_local.write(formatted) +``` + +## Testing multi-file plugins + +### Unit testing individual modules + +Test modules independently before integration: + +```python +# tests/test_processors.py +import unittest +from my_plugin.processors import transform_row +from my_plugin.config import load_settings + +class TestProcessors(unittest.TestCase): + def test_transform_row_filtering(self): + """Test that rows below threshold are filtered.""" + settings = {"min_value": 10.0} + row = {"value": 5.0} + + result = transform_row(row, settings) + + self.assertIsNone(result) + + def test_transform_row_conversion(self): + """Test unit conversion.""" + settings = { + "convert_units": True, + "conversion_factor": 2.0, + "min_value": 0.0 + } + row = {"value": 10.0} + + result = transform_row(row, settings) + + self.assertEqual(result["value"], 20.0) + +if __name__ == "__main__": + unittest.main() +``` + +### Testing with the influxdb3 CLI + +Test your complete multi-file plugin before deployment: + +```bash +# Test scheduled plugin +influxdb3 test schedule_plugin \ + --database testdb \ + --schedule "0 0 * * * *" \ + --plugin-dir /path/to/plugins \ + my_plugin + +# Test WAL plugin with sample data +influxdb3 test wal_plugin \ + --database testdb \ + --plugin-dir /path/to/plugins \ + my_plugin +``` + +For more testing options, see the [influxdb3 test reference](/influxdb3/core/reference/cli/influxdb3/test/). + +## Deploying multi-file plugins + +### Upload plugin directory + +Upload your complete plugin directory when creating a trigger: + +```bash +# Upload the entire plugin directory +influxdb3 create trigger \ + --trigger-spec "table:sensor_data" \ + --path "/local/path/to/my_plugin" \ + --upload \ + --database mydb \ + sensor_processor +``` + +The `--upload` flag transfers all files in the directory to the server's plugin directory. + +### Update plugin code + +Update all files in a running plugin: + +```bash +# Update the plugin with new code +influxdb3 update trigger \ + --database mydb \ + --trigger-name sensor_processor \ + --path "/local/path/to/my_plugin" +``` + +The update replaces all plugin files while preserving trigger configuration. + +## Best practices + +### Code organization + +- **Single responsibility**: Each module should have one clear purpose +- **Shallow hierarchies**: Avoid deeply nested directory structures (2-3 levels maximum) +- **Descriptive names**: Use clear, descriptive module and function names +- **Module size**: Keep modules under 300-400 lines for maintainability + +### Import management + +- **Explicit imports**: Use explicit imports rather than `from module import *` +- **Standard library first**: Import standard library, then third-party, then local modules +- **Avoid circular imports**: Design modules to prevent circular dependencies + +Example import organization: + +```python +# Standard library +import json +import time +from datetime import datetime + +# Third-party packages +import pandas as pd +import numpy as np + +# Local modules +from .config import load_settings +from .processors import process_data +from .utils import format_output +``` + +### Error handling + +Centralize error handling in your entry point: + +```python +# my_plugin/__init__.py +from .processors import process_data +from .config import load_settings, validate_settings + +def process_writes(influxdb3_local, table_batches, args=None): + try: + # Load and validate configuration + settings = load_settings(args) + validate_settings(settings) + + # Process data + for table_batch in table_batches: + process_data(influxdb3_local, table_batch, settings) + + except ValueError as e: + influxdb3_local.error(f"Configuration error: {e}") + except Exception as e: + influxdb3_local.error(f"Unexpected error: {e}") +``` + +### Documentation + +Document your modules with docstrings: + +```python +""" +my_plugin - Data processing plugin for sensor data. + +This plugin processes incoming sensor data by: +1. Filtering values below configured threshold +2. Converting units if requested +3. Writing processed data to output measurement + +Modules: +- processors: Core data transformation logic +- config: Configuration parsing and validation +- utils: Helper functions for formatting and logging +""" + +def process_writes(influxdb3_local, table_batches, args=None): + """Process incoming sensor data writes. + + Args: + influxdb3_local: InfluxDB API interface + table_batches: List of table batches with written data + args: Optional trigger arguments for configuration + + Trigger arguments: + min_value (float): Minimum value threshold + convert_units (bool): Enable unit conversion + conversion_factor (float): Conversion multiplier + output_measurement (str): Target measurement name + """ + pass +``` + +## Example: Complete multi-file plugin + +Here's a complete example of a temperature monitoring plugin with multi-file organization: + +### Plugin structure + +``` +temperature_monitor/ +├── __init__.py +├── config.py +├── processors.py +└── alerts.py +``` + +### **init**.py + +```python +# temperature_monitor/__init__.py +"""Temperature monitoring plugin with alerting.""" + +from .config import load_config +from .processors import calculate_statistics +from .alerts import check_thresholds + +def process_scheduled_call(influxdb3_local, call_time, args=None): + """Monitor temperature data and send alerts.""" + try: + config = load_config(args) + + # Query recent temperature data + query = f""" + SELECT temp_value, location + FROM {config['measurement']} + WHERE time > now() - INTERVAL '{config['window']}' + """ + results = influxdb3_local.query(query) + + # Calculate statistics + stats = calculate_statistics(results) + + # Check thresholds and alert + check_thresholds(influxdb3_local, stats, config) + + influxdb3_local.info( + f"Processed {len(results)} readings " + f"from {len(stats)} locations" + ) + + except Exception as e: + influxdb3_local.error(f"Plugin error: {e}") +``` + +### config.py + +```python +# temperature_monitor/config.py +"""Configuration management for temperature monitor.""" + +DEFAULTS = { + "measurement": "temperature", + "window": "1 hour", + "high_threshold": 30.0, + "low_threshold": 10.0, + "alert_measurement": "temperature_alerts" +} + +def load_config(args): + """Load configuration from trigger arguments.""" + config = DEFAULTS.copy() + + if args: + for key in DEFAULTS: + if key in args: + if key.endswith("_threshold"): + config[key] = float(args[key]) + else: + config[key] = args[key] + + return config +``` + +### processors.py + +```python +# temperature_monitor/processors.py +"""Data processing functions.""" + +from collections import defaultdict + +def calculate_statistics(data): + """Calculate statistics by location.""" + stats = defaultdict(lambda: { + "count": 0, + "sum": 0.0, + "min": float('inf'), + "max": float('-inf') + }) + + for row in data: + location = row.get("location", "unknown") + value = float(row.get("temp_value", 0)) + + s = stats[location] + s["count"] += 1 + s["sum"] += value + s["min"] = min(s["min"], value) + s["max"] = max(s["max"], value) + + # Calculate averages + for location, s in stats.items(): + if s["count"] > 0: + s["avg"] = s["sum"] / s["count"] + + return dict(stats) +``` + +### alerts.py + +```python +# temperature_monitor/alerts.py +"""Alert checking and notification.""" + +def check_thresholds(influxdb3_local, stats, config): + """Check temperature thresholds and write alerts.""" + from influxdb3_local import LineBuilder + + high_threshold = config["high_threshold"] + low_threshold = config["low_threshold"] + alert_measurement = config["alert_measurement"] + + for location, s in stats.items(): + if s["max"] > high_threshold: + line = LineBuilder(alert_measurement) + line.tag("location", location) + line.tag("severity", "high") + line.float64_field("temperature", s["max"]) + line.string_field("message", + f"High temperature: {s['max']}°C exceeds {high_threshold}°C") + + influxdb3_local.write(line) + influxdb3_local.warn(f"High temperature alert for {location}") + + elif s["min"] < low_threshold: + line = LineBuilder(alert_measurement) + line.tag("location", location) + line.tag("severity", "low") + line.float64_field("temperature", s["min"]) + line.string_field("message", + f"Low temperature: {s['min']}°C below {low_threshold}°C") + + influxdb3_local.write(line) + influxdb3_local.warn(f"Low temperature alert for {location}") +``` + +### Deploy the plugin + +```bash +# Create trigger with configuration +influxdb3 create trigger \ + --trigger-spec "every:5m" \ + --path "/local/path/to/temperature_monitor" \ + --upload \ + --trigger-arguments high_threshold=35,low_threshold=5,window="15 minutes" \ + --database sensors \ + temp_monitor +``` + +## Related resources + +- [Processing engine and Python plugins](/influxdb3/core/plugins/) +- [Extend plugins with API features](/influxdb3/core/plugins/extend-plugin/) +- [Plugin library](/influxdb3/core/plugins/library/) +- [influxdb3 create trigger](/influxdb3/core/reference/cli/influxdb3/create/trigger/) +- [influxdb3 test](/influxdb3/core/reference/cli/influxdb3/test/)