The Processing engine includes an API that allows your plugins to interact with your data, build and write data in line protocol format, and maintain state between executions. These features let you build plugins that can transform, analyze, and respond to data.
The plugin API lets you:

- Write data
- Query data
- Log messages for monitoring and debugging
- Maintain state with an in-memory cache

This page also provides guidelines for in-memory caching.
Get started with the shared API
Every plugin has access to the shared API through the `influxdb3_local` object. You don't need to import any libraries to use the API. It's available as soon as your plugin runs.
Write and query data
Write data
To write data into your database, use the `LineBuilder` API to create line protocol data:
```python
# Create a line protocol entry
line = LineBuilder("weather")
line.tag("location", "us-midwest")
line.float64_field("temperature", 82.5)
line.time_ns(1627680000000000000)

# Write the data to the database
influxdb3_local.write(line)
```
Your writes are buffered while the plugin runs and are flushed when the plugin completes.
{{% expand-wrapper %}}
{{% expand "View the LineBuilder Python implementation" %}}
```python
from typing import Optional
from collections import OrderedDict


class InfluxDBError(Exception):
    """Base exception for InfluxDB-related errors"""
    pass


class InvalidMeasurementError(InfluxDBError):
    """Raised when measurement name is invalid"""
    pass


class InvalidKeyError(InfluxDBError):
    """Raised when a tag or field key is invalid"""
    pass


class InvalidLineError(InfluxDBError):
    """Raised when a line protocol string is invalid"""
    pass


class LineBuilder:
    def __init__(self, measurement: str):
        if ' ' in measurement:
            raise InvalidMeasurementError("Measurement name cannot contain spaces")
        self.measurement = measurement
        self.tags: OrderedDict[str, str] = OrderedDict()
        self.fields: OrderedDict[str, str] = OrderedDict()
        self._timestamp_ns: Optional[int] = None

    def _validate_key(self, key: str, key_type: str) -> None:
        """Validate that a key does not contain spaces, commas, or equals signs."""
        if not key:
            raise InvalidKeyError(f"{key_type} key cannot be empty")
        if ' ' in key:
            raise InvalidKeyError(f"{key_type} key '{key}' cannot contain spaces")
        if ',' in key:
            raise InvalidKeyError(f"{key_type} key '{key}' cannot contain commas")
        if '=' in key:
            raise InvalidKeyError(f"{key_type} key '{key}' cannot contain equals signs")

    def tag(self, key: str, value: str) -> 'LineBuilder':
        """Add a tag to the line protocol."""
        self._validate_key(key, "tag")
        self.tags[key] = str(value)
        return self

    def uint64_field(self, key: str, value: int) -> 'LineBuilder':
        """Add an unsigned integer field to the line protocol."""
        self._validate_key(key, "field")
        if value < 0:
            raise ValueError(f"uint64 field '{key}' cannot be negative")
        self.fields[key] = f"{value}u"
        return self

    def int64_field(self, key: str, value: int) -> 'LineBuilder':
        """Add an integer field to the line protocol."""
        self._validate_key(key, "field")
        self.fields[key] = f"{value}i"
        return self

    def float64_field(self, key: str, value: float) -> 'LineBuilder':
        """Add a float field to the line protocol."""
        self._validate_key(key, "field")
        # Check if value has no decimal component
        self.fields[key] = f"{int(value)}.0" if value % 1 == 0 else str(value)
        return self

    def string_field(self, key: str, value: str) -> 'LineBuilder':
        """Add a string field to the line protocol."""
        self._validate_key(key, "field")
        # Escape backslashes first, then quotes, so the inserted
        # escape characters aren't themselves escaped again
        escaped_value = value.replace('\\', '\\\\').replace('"', '\\"')
        self.fields[key] = f'"{escaped_value}"'
        return self

    def bool_field(self, key: str, value: bool) -> 'LineBuilder':
        """Add a boolean field to the line protocol."""
        self._validate_key(key, "field")
        self.fields[key] = 't' if value else 'f'
        return self

    def time_ns(self, timestamp_ns: int) -> 'LineBuilder':
        """Set the timestamp in nanoseconds."""
        self._timestamp_ns = timestamp_ns
        return self

    def build(self) -> str:
        """Build the line protocol string."""
        # Start with measurement name (escape commas only)
        line = self.measurement.replace(',', '\\,')

        # Add tags if present
        if self.tags:
            tags_str = ','.join(
                f"{k}={v}" for k, v in self.tags.items()
            )
            line += f",{tags_str}"

        # Add fields (required)
        if not self.fields:
            raise InvalidLineError(f"At least one field is required: {line}")

        fields_str = ','.join(
            f"{k}={v}" for k, v in self.fields.items()
        )
        line += f" {fields_str}"

        # Add timestamp if present
        if self._timestamp_ns is not None:
            line += f" {self._timestamp_ns}"

        return line
```
{{% /expand %}} {{% /expand-wrapper %}}
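To see the shape of the output `build()` produces, the formatting rules can be checked with a small standalone sketch. `format_line` is a hypothetical helper written for illustration only; inside a plugin you would use the built-in `LineBuilder` and `influxdb3_local.write()`:

```python
# Minimal sketch of the line protocol layout produced by LineBuilder.build():
# measurement[,tags] fields [timestamp]
def format_line(measurement, tags, fields, timestamp_ns=None):
    """Assemble one line of line protocol from pre-rendered tag/field strings."""
    line = measurement
    if tags:
        line += "," + ",".join(f"{k}={v}" for k, v in tags.items())
    line += " " + ",".join(f"{k}={v}" for k, v in fields.items())
    if timestamp_ns is not None:
        line += f" {timestamp_ns}"
    return line

# The "weather" example from earlier serializes to:
print(format_line(
    "weather",
    {"location": "us-midwest"},
    {"temperature": "82.5"},  # float64_field renders 82.5 as-is
    1627680000000000000,
))
# weather,location=us-midwest temperature=82.5 1627680000000000000
```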
Query data
Your plugins can execute SQL queries and process results directly:
```python
# Simple query
results = influxdb3_local.query("SELECT * FROM metrics WHERE time > now() - INTERVAL '1 hour'")

# Parameterized query for safer execution
params = {"table": "metrics", "threshold": 90}
results = influxdb3_local.query("SELECT * FROM $table WHERE value > $threshold", params)
```
Query results are returned as a `List[Dict[str, Any]]`, where each dictionary represents a row, with column names as keys and column values as values.
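Because each row is a plain dictionary, you can process results with ordinary Python. The rows below are a stubbed stand-in for what a query such as the one above might return; the column names are illustrative:

```python
# Stubbed query results; in a plugin these would come from influxdb3_local.query()
results = [
    {"host": "server-a", "usage_percent": 91.2},
    {"host": "server-b", "usage_percent": 48.7},
    {"host": "server-c", "usage_percent": 95.5},
]

# Filter and summarize rows exactly as you would any list of dicts
over_threshold = [row["host"] for row in results if row["usage_percent"] > 90]
print(over_threshold)  # ['server-a', 'server-c']
```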
Log messages for monitoring and debugging
The shared API `info`, `warn`, and `error` functions accept multiple arguments, convert them to strings, and log them as a space-separated message to the database log.
Add logging to track plugin execution:
```python
influxdb3_local.info("Starting data processing")
influxdb3_local.warn("Could not process some records")
influxdb3_local.error("Failed to connect to external API")

# Log structured data
obj_to_log = {"records": 157, "errors": 3}
influxdb3_local.info("Processing complete", obj_to_log)
```
All log messages are written to the server logs and stored in system tables, where you can query them using SQL.
Maintain state with in-memory cache
The Processing engine provides an in-memory cache system that enables plugins to persist and retrieve data between executions.
You can access the cache through the `cache` property of the shared API:
```python
# Basic usage pattern
influxdb3_local.cache.METHOD(PARAMETERS)
```
The `cache` property provides the following methods to retrieve and manage cached values:
| Method | Parameters | Returns | Description |
|---|---|---|---|
| `put` | `key` (str): The key to store the value under<br>`value` (Any): Any Python object to cache<br>`ttl` (Optional[float], default=None): Time in seconds before expiration<br>`use_global` (bool, default=False): If `True`, uses the global namespace | None | Stores a value in the cache with an optional time-to-live |
| `get` | `key` (str): The key to retrieve<br>`default` (Any, default=None): Value to return if key not found<br>`use_global` (bool, default=False): If `True`, uses the global namespace | Any | Retrieves a value from the cache or returns the default if not found |
| `delete` | `key` (str): The key to delete<br>`use_global` (bool, default=False): If `True`, uses the global namespace | bool | Deletes a value from the cache. Returns `True` if deleted, `False` if not found |
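The semantics in the table can be sketched with a small dict-backed cache. `SketchCache` below is illustrative only (it omits the `use_global` parameter); plugins always use the built-in `influxdb3_local.cache`:

```python
import time

class SketchCache:
    """Illustrative dict-backed cache mirroring put/get/delete with TTL."""
    def __init__(self):
        self._store = {}  # key -> (value, expires_at or None)

    def put(self, key, value, ttl=None):
        # A ttl of None means the entry never expires
        expires_at = time.monotonic() + ttl if ttl is not None else None
        self._store[key] = (value, expires_at)

    def get(self, key, default=None):
        entry = self._store.get(key)
        if entry is None:
            return default
        value, expires_at = entry
        if expires_at is not None and time.monotonic() >= expires_at:
            del self._store[key]  # expired entries behave as missing
            return default
        return value

    def delete(self, key):
        # True if the key existed, False otherwise
        return self._store.pop(key, None) is not None

cache = SketchCache()
cache.put("config", {"version": "1.0"})
print(cache.get("config"))               # {'version': '1.0'}
print(cache.get("missing", default=0))   # 0
print(cache.delete("config"))            # True
print(cache.delete("config"))            # False
```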
Understanding cache namespaces
The cache system offers two distinct namespaces:
| Namespace | Scope | Best For |
|---|---|---|
| Trigger-specific (default) | Isolated to a single trigger | Plugin state, counters, timestamps specific to one plugin |
| Global | Shared across all triggers | Configuration, lookup tables, service states that should be available to all plugins |
Common cache operations
Here are some examples of how to use the cache in your plugins:
Store and retrieve cached data
```python
import time

# Store a value
influxdb3_local.cache.put("last_run_time", time.time())

# Retrieve a value with a default if not found
last_time = influxdb3_local.cache.get("last_run_time", default=0)

# Delete a cached value
influxdb3_local.cache.delete("temporary_data")
```
Store cached data with expiration
```python
# Cache with a 5-minute TTL (time-to-live)
influxdb3_local.cache.put("api_response", response_data, ttl=300)
```
Share data across plugins
```python
# Store in the global namespace
influxdb3_local.cache.put("config", {"version": "1.0"}, use_global=True)

# Retrieve from the global namespace
config = influxdb3_local.cache.get("config", use_global=True)
```
Building a counter
You can track how many times a plugin has run:
```python
# Get current counter or default to 0
counter = influxdb3_local.cache.get("execution_count", default=0)

# Increment counter
counter += 1

# Store the updated value
influxdb3_local.cache.put("execution_count", counter)

influxdb3_local.info(f"This plugin has run {counter} times")
```
Guidelines for in-memory caching
To get the most out of the in-memory cache, follow these guidelines:
- Use the trigger-specific namespace
- Use TTL appropriately
- Cache computation results
- Warm the cache
- Consider cache limitations
Use the trigger-specific namespace
The cache is designed to support stateful operations while maintaining isolation between different triggers. Use the trigger-specific namespace for most operations and the global namespace only when data sharing across triggers is necessary.
Use TTL appropriately
Set realistic expiration times based on how frequently data changes:
```python
# Cache external API responses for 5 minutes
influxdb3_local.cache.put("weather_data", api_response, ttl=300)
```
Cache computation results
Store the results of expensive calculations that are used frequently:
```python
# Cache aggregated statistics
influxdb3_local.cache.put("daily_stats", calculate_statistics(data), ttl=3600)
```
Warm the cache
For critical data, prime the cache at startup. This can be especially useful for global namespace data where multiple triggers need the data:
```python
# Check if cache needs to be initialized
if not influxdb3_local.cache.get("lookup_table"):
    influxdb3_local.cache.put("lookup_table", load_lookup_data())
```
Consider cache limitations
- Memory Usage: Since cache contents are stored in memory, monitor your memory usage when caching large datasets.
- Server Restarts: Because the cache is cleared when the server restarts, design your plugins to handle cache initialization (as noted above).
- Concurrency: When multiple trigger instances update the same cache key at the same time, a read can return stale or partially updated data. Design your plugins to tolerate this.
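One way to reduce (though not eliminate) the risk of acting on stale data is to store a write timestamp alongside each value and ignore entries older than your tolerance. The sketch below uses a plain dict in place of `influxdb3_local.cache`, and `put_versioned`/`get_fresh` are hypothetical helpers for illustration:

```python
import time

cache = {}  # stand-in for influxdb3_local.cache in this sketch

def put_versioned(key, value):
    # Record when the value was written alongside the value itself
    cache[key] = {"value": value, "written_at": time.time()}

def get_fresh(key, max_age_seconds, default=None):
    entry = cache.get(key)
    if entry is None or time.time() - entry["written_at"] > max_age_seconds:
        return default  # treat stale or missing entries as absent
    return entry["value"]

put_versioned("service_state", "healthy")
print(get_fresh("service_state", max_age_seconds=60))  # healthy
print(get_fresh("other_key", max_age_seconds=60))      # None (missing)
```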
Next Steps
With an understanding of the InfluxDB 3 shared plugin API, you're ready to build data processing workflows that transform, analyze, and respond to your time series data. To find example plugins you can extend, visit the plugin repo on GitHub.