
The Processing Engine includes a shared API that your plugins can use to interact with data, write new records in line protocol format, and maintain state between executions. These capabilities let you build plugins that transform, analyze, and respond to time series data as it flows through your database.

The plugin API lets you:

- Write data in line protocol format
- Query data using SQL
- Log messages for monitoring and debugging
- Maintain state with an in-memory cache

## Get started with the shared API

Each plugin automatically has access to the shared API through the `influxdb3_local` object. You don't need to import any libraries; the API is available as soon as your plugin runs.
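For example, a minimal scheduled plugin can call the API as soon as its entry point runs. The `process_scheduled_call` signature below is a sketch based on the scheduled trigger type; entry-point signatures vary by trigger type, so check the trigger documentation for the one your plugin uses:

```python
def process_scheduled_call(influxdb3_local, call_time, args=None):
    # influxdb3_local is injected by the Processing Engine; no imports required
    influxdb3_local.info("Plugin triggered at", call_time)
```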

## Write data

To write data into your database, use the `LineBuilder` API to create line protocol data:

```python
# Create a line protocol entry
line = LineBuilder("weather")
line.tag("location", "us-midwest")
line.float64_field("temperature", 82.5)
line.time_ns(1627680000000000000)

# Write the data to the database
influxdb3_local.write(line)
```

InfluxDB 3 buffers your writes while the plugin runs and flushes them when the plugin completes.
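Because writes are buffered until the plugin finishes, you can build and write many points in one run. In this sketch, the `readings` list is hypothetical input data; only `LineBuilder` and `influxdb3_local.write()` come from the API described above:

```python
# Hypothetical input data for illustration
readings = [("us-midwest", 82.5), ("us-east", 78.1), ("us-west", 91.3)]

for location, temperature in readings:
    line = LineBuilder("weather")
    line.tag("location", location)
    line.float64_field("temperature", temperature)
    influxdb3_local.write(line)  # buffered; flushed when the plugin completes
```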

{{% expand-wrapper %}} {{% expand "View the LineBuilder Python implementation" %}}

```python
from typing import Optional
from collections import OrderedDict

class InfluxDBError(Exception):
    """Base exception for InfluxDB-related errors"""
    pass

class InvalidMeasurementError(InfluxDBError):
    """Raised when measurement name is invalid"""
    pass

class InvalidKeyError(InfluxDBError):
    """Raised when a tag or field key is invalid"""
    pass

class InvalidLineError(InfluxDBError):
    """Raised when a line protocol string is invalid"""
    pass

class LineBuilder:
    def __init__(self, measurement: str):
        if ' ' in measurement:
            raise InvalidMeasurementError("Measurement name cannot contain spaces")
        self.measurement = measurement
        self.tags: OrderedDict[str, str] = OrderedDict()
        self.fields: OrderedDict[str, str] = OrderedDict()
        self._timestamp_ns: Optional[int] = None

    def _validate_key(self, key: str, key_type: str) -> None:
        """Validate that a key does not contain spaces, commas, or equals signs."""
        if not key:
            raise InvalidKeyError(f"{key_type} key cannot be empty")
        if ' ' in key:
            raise InvalidKeyError(f"{key_type} key '{key}' cannot contain spaces")
        if ',' in key:
            raise InvalidKeyError(f"{key_type} key '{key}' cannot contain commas")
        if '=' in key:
            raise InvalidKeyError(f"{key_type} key '{key}' cannot contain equals signs")

    def tag(self, key: str, value: str) -> 'LineBuilder':
        """Add a tag to the line protocol."""
        self._validate_key(key, "tag")
        self.tags[key] = str(value)
        return self

    def uint64_field(self, key: str, value: int) -> 'LineBuilder':
        """Add an unsigned integer field to the line protocol."""
        self._validate_key(key, "field")
        if value < 0:
            raise ValueError(f"uint64 field '{key}' cannot be negative")
        self.fields[key] = f"{value}u"
        return self

    def int64_field(self, key: str, value: int) -> 'LineBuilder':
        """Add an integer field to the line protocol."""
        self._validate_key(key, "field")
        self.fields[key] = f"{value}i"
        return self

    def float64_field(self, key: str, value: float) -> 'LineBuilder':
        """Add a float field to the line protocol."""
        self._validate_key(key, "field")
        # Check if value has no decimal component
        self.fields[key] = f"{int(value)}.0" if value % 1 == 0 else str(value)
        return self

    def string_field(self, key: str, value: str) -> 'LineBuilder':
        """Add a string field to the line protocol."""
        self._validate_key(key, "field")
        # Escape backslashes first, then quotes, so the backslash added by
        # the quote escape isn't itself double-escaped
        escaped_value = value.replace('\\', '\\\\').replace('"', '\\"')
        self.fields[key] = f'"{escaped_value}"'
        return self

    def bool_field(self, key: str, value: bool) -> 'LineBuilder':
        """Add a boolean field to the line protocol."""
        self._validate_key(key, "field")
        self.fields[key] = 't' if value else 'f'
        return self

    def time_ns(self, timestamp_ns: int) -> 'LineBuilder':
        """Set the timestamp in nanoseconds."""
        self._timestamp_ns = timestamp_ns
        return self

    def build(self) -> str:
        """Build the line protocol string."""
        # Start with measurement name (escape commas only)
        line = self.measurement.replace(',', '\\,')

        # Add tags if present
        if self.tags:
            tags_str = ','.join(
                f"{k}={v}" for k, v in self.tags.items()
            )
            line += f",{tags_str}"

        # Add fields (required)
        if not self.fields:
            raise InvalidLineError(f"At least one field is required: {line}")

        fields_str = ','.join(
            f"{k}={v}" for k, v in self.fields.items()
        )
        line += f" {fields_str}"

        # Add timestamp if present
        if self._timestamp_ns is not None:
            line += f" {self._timestamp_ns}"

        return line
```

{{% /expand %}} {{% /expand-wrapper %}}
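As a quick check of the implementation above, chaining the builder methods and calling `build()` produces a standard line protocol string (the `cpu` measurement and its tag and field names here are illustrative):

```python
line = (
    LineBuilder("cpu")
    .tag("host", "server01")
    .float64_field("usage", 42.5)
    .int64_field("cores", 8)
    .time_ns(1627680000000000000)
)
print(line.build())
# cpu,host=server01 usage=42.5,cores=8i 1627680000000000000
```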

## Query data

Your plugins can execute SQL queries and process results directly:

```python
# Simple query
results = influxdb3_local.query("SELECT * FROM metrics WHERE time > now() - INTERVAL '1 hour'")

# Parameterized query for safer execution
params = {"table": "metrics", "threshold": 90}
results = influxdb3_local.query("SELECT * FROM $table WHERE value > $threshold", params)
```

Query results are returned as a `List[Dict[str, Any]]`, where each dictionary represents a row: column names are keys, and column values are the corresponding values.
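For example, you can iterate over the rows and act on individual column values. The `cpu` table and its `host` and `usage` columns are assumptions for illustration:

```python
results = influxdb3_local.query("SELECT host, usage FROM cpu ORDER BY time DESC LIMIT 10")

for row in results:
    # Each row is a dict keyed by column name
    if row["usage"] > 90:
        influxdb3_local.warn(f"High CPU usage on {row['host']}: {row['usage']}%")
```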

## Log messages for monitoring and debugging

Use the shared API's info, warn, and error functions to log messages from your plugin. Each function accepts one or more arguments, converts them to strings, and logs them as a space-separated message.

Add logging to monitor plugin execution and assist with debugging:

influxdb3_local.info("Starting data processing")
influxdb3_local.warn("Could not process some records")
influxdb3_local.error("Failed to connect to external API")

# Log structured data
obj_to_log = {"records": 157, "errors": 3}
influxdb3_local.info("Processing complete", obj_to_log)

The system writes all log messages to the server logs and stores them in system tables, where you can query them using SQL.
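A common pattern is to wrap risky work in a `try`/`except` block and log at the appropriate level. The `fetch_external_data()` helper below is hypothetical, not part of the API:

```python
try:
    data = fetch_external_data()  # hypothetical helper that may raise
    influxdb3_local.info("Fetched records:", len(data))
except Exception as e:
    # Arguments are converted to strings and joined with spaces
    influxdb3_local.error("External fetch failed:", str(e))
```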

## Maintain state with the in-memory cache

The Processing Engine provides an in-memory cache that enables your plugins to persist and retrieve data between executions.

Access the cache using the `cache` property of the shared API:

```python
# Basic usage pattern
influxdb3_local.cache.METHOD(PARAMETERS)
```

`cache` provides the following methods to retrieve and manage cached values:

| Method | Parameters | Returns | Description |
|--------|------------|---------|-------------|
| `put` | `key` (str): The key to store the value under<br>`value` (Any): Any Python object to cache<br>`ttl` (Optional[float], default=None): Time in seconds before expiration<br>`use_global` (bool, default=False): If True, uses the global namespace | None | Stores a value in the cache with an optional time-to-live |
| `get` | `key` (str): The key to retrieve<br>`default` (Any, default=None): Value to return if the key is not found<br>`use_global` (bool, default=False): If True, uses the global namespace | Any | Retrieves a value from the cache, or `default` if the key is not found |
| `delete` | `key` (str): The key to delete<br>`use_global` (bool, default=False): If True, uses the global namespace | bool | Deletes a value from the cache. Returns `True` if deleted, `False` if not found |

### Understanding cache namespaces

The cache system offers two distinct namespaces:

| Namespace | Scope | Best for |
|-----------|-------|----------|
| Trigger-specific (default) | Isolated to a single trigger | Plugin state, counters, and timestamps specific to one plugin |
| Global | Shared across all triggers | Configuration, lookup tables, and service states that should be available to all plugins |
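Because the two namespaces are separate, the same key can hold a different value in each. This sketch uses only the `put` and `get` methods documented above:

```python
# Same key, different namespaces; the values don't collide
influxdb3_local.cache.put("state", {"scope": "trigger"})
influxdb3_local.cache.put("state", {"scope": "global"}, use_global=True)

influxdb3_local.cache.get("state")                   # {"scope": "trigger"}
influxdb3_local.cache.get("state", use_global=True)  # {"scope": "global"}
```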

### Common cache operations

#### Store and retrieve cached data

```python
import time

# Store a value
influxdb3_local.cache.put("last_run_time", time.time())

# Retrieve a value with a default if not found
last_time = influxdb3_local.cache.get("last_run_time", default=0)

# Delete a cached value
influxdb3_local.cache.delete("temporary_data")
```

#### Store cached data with expiration

```python
# Cache with a 5-minute TTL (time-to-live)
influxdb3_local.cache.put("api_response", response_data, ttl=300)
```

#### Share data across plugins

```python
# Store in the global namespace
influxdb3_local.cache.put("config", {"version": "1.0"}, use_global=True)

# Retrieve from the global namespace
config = influxdb3_local.cache.get("config", use_global=True)
```

#### Building a counter

You can track how many times a plugin has run:

```python
# Get the current counter value, or default to 0
counter = influxdb3_local.cache.get("execution_count", default=0)

# Increment the counter
counter += 1

# Store the updated value
influxdb3_local.cache.put("execution_count", counter)

influxdb3_local.info(f"This plugin has run {counter} times")
```

### Guidelines for in-memory caching

To get the most out of the in-memory cache, follow these guidelines:

#### Use the trigger-specific namespace

The Processing Engine provides a cache that supports stateful operations while maintaining isolation between different triggers. For most use cases, use the trigger-specific namespace to keep plugin state isolated. Use the global namespace only when you need to share data across triggers.

#### Use TTL appropriately

Set appropriate expiration times based on how frequently your data changes:

```python
# Cache external API responses for 5 minutes
influxdb3_local.cache.put("weather_data", api_response, ttl=300)
```

#### Cache computation results

Store the results of expensive calculations that you use frequently:

```python
# Cache aggregated statistics
influxdb3_local.cache.put("daily_stats", calculate_statistics(data), ttl=3600)
```

#### Warm the cache

For critical data, prime the cache at startup. This can be especially useful for global namespace data where multiple triggers need the data:

```python
# Check whether the cache needs to be initialized
if not influxdb3_local.cache.get("lookup_table"):
    influxdb3_local.cache.put("lookup_table", load_lookup_data())
```

#### Consider cache limitations

- **Memory usage**: The cache is held in memory, so monitor memory consumption when caching large datasets.
- **Server restarts**: The server clears the cache on restart, so design your plugins to initialize the cache when it's empty (as shown in the cache-warming example above).
- **Concurrency**: Multiple trigger instances can update the same cache key at the same time, so a value you read may be stale or already overwritten; see the sketch after this list.
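One way to make staleness visible is to store an update timestamp alongside the cached value and recompute when the entry is too old. The `calculate_statistics(data)` helper and `data` are reused from the earlier example and remain hypothetical:

```python
import time

MAX_AGE_SECONDS = 300

entry = influxdb3_local.cache.get("daily_stats")
if entry is None or time.time() - entry["updated"] > MAX_AGE_SECONDS:
    # Recompute rather than trust a possibly stale value
    entry = {"value": calculate_statistics(data), "updated": time.time()}
    influxdb3_local.cache.put("daily_stats", entry, ttl=3600)

stats = entry["value"]
```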

## Next steps

With an understanding of the InfluxDB 3 shared plugin API, you can start building data workflows that transform, analyze, and respond to your time series data.

To find example plugins you can extend, visit the [influxdb3_plugins](https://github.com/influxdata/influxdb3_plugins) repository on GitHub.