---
title: Use Quix Streams to downsample data
description: >
  Use [Quix Streams](https://github.com/quixio/quix-streams) to query time series
  data stored in InfluxDB and written to Kafka at regular intervals, continuously
  downsample it, and then write the downsampled data back to InfluxDB.
menu:
  influxdb3_cloud_serverless:
    name: Use Quix
    parent: Downsample data
    identifier: downsample-quix
weight: 202
related:
  - /influxdb3/cloud-serverless/query-data/sql/aggregate-select/, Aggregate or apply selector functions to data (SQL)
---

Use [Quix Streams](https://github.com/quixio/quix-streams) to query time series
data stored in InfluxDB and written to Kafka at regular intervals, continuously
downsample it, and then write the downsampled data back to InfluxDB.

Quix Streams is an open source Python library for building containerized stream
processing applications with Apache Kafka. It is designed to run as a service
that continuously processes a stream of data while streaming the results to a
Kafka topic. You can try it locally with a local Kafka installation, or run it
in [Quix Cloud](https://quix.io/) with a free trial.

This guide uses [Python](https://www.python.org/) and the
[InfluxDB 3 Python client library](https://github.com/InfluxCommunity/influxdb3-python),
but you can use your runtime of choice and any of the available
[InfluxDB 3 client libraries](/influxdb3/cloud-serverless/reference/client-libraries/v3/).
This guide also assumes you have already
[set up your Python project and virtual environment](/influxdb3/cloud-serverless/query-data/execute-queries/client-libraries/python/#create-a-python-virtual-environment).

## Pipeline architecture

The following diagram illustrates how data is passed between processes as it is downsampled:

{{< html-diagram/quix-downsample-pipeline >}}

> [!Note]
> It is usually more efficient to write raw data directly to Kafka rather than
> writing raw data to InfluxDB first (essentially starting the Quix Streams
> pipeline with the "raw-data" topic). However, this guide assumes that you
> already have raw data in InfluxDB that you want to downsample.

---

1. [Set up prerequisites](#set-up-prerequisites)
2. [Install dependencies](#install-dependencies)
3. [Prepare InfluxDB buckets](#prepare-influxdb-buckets)
4. [Create the downsampling logic](#create-the-downsampling-logic)
5. [Create the producer and consumer clients](#create-the-producer-and-consumer-clients)
    1. [Create the producer](#create-the-producer)
    2. [Create the consumer](#create-the-consumer)
6. [Get the full downsampling code files](#get-the-full-downsampling-code-files)

## Set up prerequisites

The process described in this guide requires the following:

- An InfluxDB Cloud Serverless account with data ready for downsampling.
- A [Quix Cloud](https://portal.platform.quix.io/self-sign-up/) account or a
  local Apache Kafka or Red Panda installation (see the example after this list
  for one way to start a local broker).
- Familiarity with basic Python and Docker concepts.

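If you don't already have a broker available, one quick way to start a local
single-node Kafka broker is the official Apache Kafka Docker image (shown here
as an optional convenience, not part of the Quix template; any Kafka-compatible
broker, including Redpanda, also works):

```sh
# Start a single-node Kafka broker (KRaft mode) listening on localhost:9092
docker run -d --name local-kafka -p 9092:9092 apache/kafka:latest
```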

## Install dependencies

Use `pip` to install the following dependencies:

- `influxdb3-python` (imported as `influxdb_client_3`)
- `quixstreams<2.5`
- `pandas`

```sh
pip install influxdb3-python pandas "quixstreams<2.5"
```

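To confirm the packages are available in your environment, you can run a quick
import check (a trivial sketch, not part of the Quix template):

```py
# Verify that the required modules import correctly
import influxdb_client_3
import pandas
import quixstreams

print('Dependencies OK:', influxdb_client_3.__name__, pandas.__name__, quixstreams.__name__)
```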

## Prepare InfluxDB buckets

The downsampling process involves two InfluxDB buckets.
Each bucket has a [retention period](/influxdb3/cloud-serverless/reference/glossary/#retention-period)
that specifies how long data persists before it expires and is deleted.
By using two buckets, you can store unmodified, high-resolution data in a bucket
with a shorter retention period and downsampled, low-resolution data in a
bucket with a longer retention period.

Ensure you have a bucket for each of the following:

- One to query unmodified data from
- The other to write downsampled data to

For information about creating buckets, see
[Create a bucket](/influxdb3/cloud-serverless/admin/buckets/create-bucket/).

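For example, with the `influx` CLI you could create the two buckets like this
(the bucket names and retention periods below are placeholders; choose values
appropriate for your data):

```sh
# Bucket for raw, high-resolution data with a short retention period
influx bucket create --name RAW_BUCKET_NAME --retention 7d

# Bucket for downsampled, low-resolution data with a longer retention period
influx bucket create --name DOWNSAMPLED_BUCKET_NAME --retention 90d
```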

## Create the downsampling logic

This process reads the raw data from the input Kafka topic that stores data streamed from InfluxDB,
downsamples it, and then sends it to an output topic that is used to write back to InfluxDB.

1.  Use the Quix Streams library's `Application` class to initialize a connection to Apache Kafka.

    ```py
    from quixstreams import Application

    app = Application(consumer_group='downsampling-process', auto_offset_reset='earliest')
    input_topic = app.topic('raw-data')
    output_topic = app.topic('downsampled-data')

    # ...
    ```

2.  Configure the Quix Streams built-in windowing function to create a tumbling
    window that continuously downsamples the data into 1-minute buckets.

    ```py
    from datetime import timedelta

    # ...
    target_field = 'temperature'  # The field that you want to downsample.

    def custom_ts_extractor(value):
        # ...
        # truncated for brevity - custom code that defines the 'time_recorded'
        # field as the timestamp to use for windowing...

    topic = app.topic(input_topic, timestamp_extractor=custom_ts_extractor)
    sdf = app.dataframe(topic)  # Initialize a StreamingDataFrame for the topic

    sdf = (
        sdf.apply(lambda value: value[target_field])  # Extract temperature values
        .tumbling_window(timedelta(minutes=1))        # 1-minute tumbling windows
        .mean()                                       # Calculate average temperature
        .final()                                      # Emit results at window completion
    )

    sdf = sdf.apply(
        lambda value: {
            'time': value['end'],               # End of the window
            'temperature_avg': value['value'],  # Average temperature
        }
    )

    sdf.to_topic(output_topic)  # Output results to the 'downsampled-data' topic
    # ...
    ```

    The results are streamed to the Kafka topic, `downsampled-data`.

    > [!Note]
    > "sdf" stands for "Streaming DataFrame".

You can find the full code for this process in the
[Quix GitHub repository](https://github.com/quixio/template-influxdbv3-downsampling/blob/dev/Downsampler/main.py).

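The snippets above omit the body of the timestamp extractor and the final call
that starts the pipeline. As a rough sketch only (it assumes the raw records
carry a `time_recorded` value already expressed in epoch milliseconds--adjust
the conversion to match your data), they might look like this:

```py
def custom_ts_extractor(value, headers, kafka_timestamp, timestamp_type):
    # Window on the payload's 'time_recorded' field instead of the Kafka
    # message timestamp. Quix Streams expects an integer in milliseconds.
    # Assumption: 'time_recorded' is already epoch milliseconds.
    return int(value['time_recorded'])

# ...

# Start the streaming application. With quixstreams versions before 2.5,
# run() takes the StreamingDataFrame and blocks until the process is stopped.
app.run(sdf)
```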

## Create the producer and consumer clients

Use the `influxdb_client_3` and `quixstreams` modules to instantiate two clients that interact with InfluxDB and Apache Kafka:

- A **producer** client configured to read from your InfluxDB bucket with _unmodified_ data and _produce_ that data to Kafka.
- A **consumer** client configured to _consume_ data from Kafka and write the _downsampled_ data to the corresponding InfluxDB bucket.

### Create the producer

Provide the following credentials for the producer:

- **host**: [{{< product-name >}} region URL](/influxdb3/cloud-serverless/reference/regions)
  _(without the protocol)_
- **org**: InfluxDB organization name
- **token**: InfluxDB API token with read and write permissions on the buckets you
  want to query and write to.
- **database**: InfluxDB bucket name

The producer queries for fresh data from InfluxDB at specific intervals.
It's configured to look for a specific measurement defined in a variable.
It writes the raw data to a Kafka topic called `raw-data`.

{{% code-placeholders "(API|(RAW|DOWNSAMPLED)_BUCKET|ORG)_(NAME|TOKEN)" %}}
```py
import os
import uuid

from influxdb_client_3 import InfluxDBClient3
from quixstreams import Application
import pandas

# Instantiate an InfluxDBClient3 client configured for your unmodified bucket
influxdb_raw = InfluxDBClient3(
    host='{{< influxdb/host >}}',
    token='API_TOKEN',
    database='RAW_BUCKET_NAME'
)

# os.environ['localdev'] = 'true' # Uncomment if you're using local Kafka rather than Quix Cloud

# Create a Quix Streams producer application that connects to a local Kafka installation
app = Application(
    broker_address=os.environ.get('BROKER_ADDRESS', 'localhost:9092'),
    consumer_group=consumer_group_name,
    auto_create_topics=True
)

# Override the app variable if the local development env var is set to false or is not present.
# This causes Quix Streams to use an application configured for Quix Cloud
localdev = os.environ.get('localdev', 'false')

if localdev == 'false':
    # Create a Quix platform-specific application instead (broker address is in-built)
    app = Application(consumer_group=consumer_group_name, auto_create_topics=True)

topic = app.topic(name='raw-data')

## ... remaining code truncated for brevity ...

# Query InfluxDB for the raw data and store it in a DataFrame
def get_data():
    # Run in a loop until the main thread is terminated
    while run:
        try:
            myquery = f'SELECT * FROM "{measurement_name}" WHERE time >= {interval}'
            print(f'sending query {myquery}')
            # Query InfluxDB 3 using InfluxQL or SQL
            table = influxdb_raw.query(
                query=myquery,
                mode='pandas',
                language='influxql')

            #... remaining code truncated for brevity ...

# Send the data to a Kafka topic for the downsampling process to consume
def main():
    """
    Read data from the query and publish it to Kafka
    """
    #... remaining code truncated for brevity ...

    for index, obj in enumerate(records):
        print(obj)  # obj contains each row in the table, including temperature
        # Generate a unique message_key for each row
        message_key = obj['machineId']
        logger.info(f'Produced message with key:{message_key}, value:{obj}')

        serialized = topic.serialize(
            key=message_key, value=obj, headers={'uuid': str(uuid.uuid4())}
        )

        # Publish each row returned in the query to the topic 'raw-data'
        producer.produce(
            topic=topic.name,
            headers=serialized.headers,
            key=serialized.key,
            value=serialized.value,
        )
```
{{% /code-placeholders %}}

You can find the full code for this process in the
[Quix GitHub repository](https://github.com/quixio/template-influxdbv3-downsampling/blob/dev/InfluxDB%20V3%20Data%20Source/main.py).

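Both this producer and the consumer in the next section choose between a local
broker and Quix Cloud based on the `localdev` and `BROKER_ADDRESS` environment
variables shown above. For a local run, you might set them like this
(a hypothetical shell session; in Quix Cloud the broker address is provided for you):

```sh
export localdev=true                  # Use the locally configured broker
export BROKER_ADDRESS=localhost:9092  # Address of your local Kafka installation
python main.py                        # Run the service's main.py
```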

### Create the consumer

As before, provide the following credentials for the consumer:

- **host**: [{{< product-name >}} region URL](/influxdb3/cloud-serverless/reference/regions)
  _(without the protocol)_
- **org**: InfluxDB organization name
- **token**: InfluxDB API token with read and write permissions on the buckets you
  want to query and write to.
- **database**: InfluxDB bucket name

This process reads messages from the Kafka topic `downsampled-data` and writes each message as a point dictionary back to InfluxDB.

{{% code-placeholders "(API|(RAW|DOWNSAMPLED)_BUCKET|ORG)_(NAME|TOKEN)" %}}
```py
# Instantiate an InfluxDBClient3 client configured for your downsampled database.
# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
    host='{{< influxdb/host >}}',
    token='API_TOKEN',
    database='DOWNSAMPLED_BUCKET_NAME',
    org=''
)

# os.environ['localdev'] = 'true' # Uncomment if you're using local Kafka rather than Quix Cloud

# Create a Quix Streams consumer application that connects to a local Kafka installation
app = Application(
    broker_address=os.environ.get('BROKER_ADDRESS', 'localhost:9092'),
    consumer_group=consumer_group_name,
    auto_create_topics=True
)

# Override the app variable if the local development env var is set to false or is not present.
# This causes Quix Streams to use an application configured for Quix Cloud
localdev = os.environ.get('localdev', 'false')

if localdev == 'false':
    # Create a Quix platform-specific application instead (broker address is in-built)
    app = Application(consumer_group=consumer_group_name, auto_create_topics=True)

input_topic = app.topic('downsampled-data')

## ... remaining code truncated for brevity ...

def send_data_to_influx(message):
    logger.info(f'Processing message: {message}')
    try:

        ## ... remaining code truncated for brevity ...

        # Construct the points dictionary
        points = {
            'measurement': measurement_name,
            'tags': tags,
            'fields': fields,
            'time': message['time']
        }

        influxdb_downsampled.write(record=points, write_precision='ms')

sdf = app.dataframe(input_topic)
sdf = sdf.update(send_data_to_influx)  # Continuously apply the 'send_data' function to each message in the incoming stream

## ... remaining code truncated for brevity ...
```
{{% /code-placeholders %}}

You can find the full code for this process in the
[Quix GitHub repository](https://github.com/quixio/template-influxdbv3-downsampling/blob/dev/InfluxDB%20V3%20Data%20Sink/main.py).

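After the pipeline has run for a few minutes, you can confirm that downsampled
data is arriving by querying the downsampled bucket with the same client. A
minimal sketch (the measurement name below is a placeholder; use whatever
`measurement_name` your sink writes):

```py
# Check the most recent downsampled points (placeholder measurement name)
results = influxdb_downsampled.query(
    query='SELECT * FROM "downsampled_measurement" ORDER BY time DESC LIMIT 10',
    mode='pandas',
    language='influxql',
)
print(results)
```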

## Get the full downsampling code files

To get the complete set of files referenced in this tutorial, clone the Quix "downsampling template" repository or use an interactive version of this tutorial saved as a Jupyter Notebook.

### Clone the downsampling template repository

To clone the downsampling template, enter the following command in the command line:

```sh
git clone https://github.com/quixio/template-influxdbv3-downsampling.git
```

This repository contains the following folders which store different parts of the whole pipeline:

- **Machine Data to InfluxDB**: A script that generates synthetic machine data
  and writes it to InfluxDB. This is useful if you don't have your own data yet,
  or just want to work with test data first.

  - It produces a reading every 250 milliseconds.
  - This script originally comes from the
    [InfluxCommunity repository](https://github.com/InfluxCommunity/Arrow-Task-Engine/blob/master/machine_simulator/src/machine_generator.py)
    but has been adapted to write directly to InfluxDB rather than using an MQTT broker.

- **InfluxDB V3 Data Source**: A service that queries for fresh data from
  InfluxDB at specific intervals. It's configured to look for the measurement
  produced by the previously mentioned synthetic machine data generator.
  It writes the raw data to a Kafka topic called `raw-data`.
- **Downsampler**: A service that performs a 1-minute tumbling window operation
  on the data from InfluxDB and emits the mean of the "temperature" reading
  every minute. It writes the output to a `downsampled-data` Kafka topic.
- **InfluxDB V3 Data Sink**: A service that reads from the `downsampled-data`
  topic and writes the downsampled records as points back into InfluxDB.

### Use the downsampling Jupyter Notebook

You can use the interactive notebook ["Continuously downsample data using InfluxDB and Quix Streams"](https://github.com/quixio/tutorial-code/blob/main/notebooks/Downsampling_viaKafka_Using_Quix_Influx.ipynb) to try the downsampling code yourself. It is configured to install Apache Kafka within the runtime environment (such as Google Colab).

Each process is also set up to run in the background so that a running cell does not block the rest of the tutorial.

<a target="_blank" href="https://colab.research.google.com/github/quixio/tutorial-code/blob/main/notebooks/Downsampling_viaKafka_Using_Quix_Influx.ipynb"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>