Merge branch 'master' into jstirnaman/issue5463
commit
9a8ae35c6e
|
@ -0,0 +1,16 @@
|
|||
---
|
||||
title: Downsample data stored in InfluxDB
|
||||
description: >
|
||||
Learn about different methods for querying and downsampling time series data
|
||||
stored in InfluxDB.
|
||||
weight: 101
|
||||
menu:
|
||||
influxdb_cloud_dedicated:
|
||||
name: Downsample data
|
||||
parent: Process & visualize data
|
||||
---
|
||||
|
||||
Learn about different methods for querying and downsampling time series data
|
||||
stored in InfluxDB.
|
||||
|
||||
{{< children >}}
|
|
@ -1,13 +1,13 @@
|
|||
---
|
||||
title: Downsample data stored in InfluxDB
|
||||
title: Use client libraries to downsample data
|
||||
description: >
|
||||
Query and downsample time series data stored in InfluxDB and write the
|
||||
downsampled data back to InfluxDB.
|
||||
Use InfluxDB client libraries to query and downsample time series data stored in InfluxDB and write the downsampled data back to InfluxDB.
|
||||
menu:
|
||||
influxdb_cloud_dedicated:
|
||||
name: Downsample data
|
||||
parent: Process & visualize data
|
||||
weight: 101
|
||||
name: Use client libraries
|
||||
parent: Downsample data
|
||||
identifier: influxdb-dedicated-downsample-client-libraries
|
||||
weight: 201
|
||||
related:
|
||||
- /influxdb/cloud-dedicated/query-data/sql/aggregate-select/, Aggregate or apply selector functions to data (SQL)
|
||||
---
|
|
@ -5,12 +5,11 @@ description: >
|
|||
data stored in InfluxDB and written to Kafka at regular intervals, continuously
|
||||
downsample it, and then write the downsampled data back to InfluxDB.
|
||||
menu:
|
||||
influxdb_cloud_serverless:
|
||||
influxdb_cloud_dedicated:
|
||||
name: Use Quix
|
||||
parent: Downsample data
|
||||
weight: 202
|
||||
related:
|
||||
- /influxdb/cloud-serverless/query-data/sql/aggregate-select/, Aggregate or apply selector functions to data (SQL)
|
||||
identifier: influxdb-dedicated-downsample-quix
|
||||
weight: 102
|
||||
---
|
||||
|
||||
Use [Quix Streams](https://github.com/quixio/quix-streams) to query time series
|
||||
|
@ -100,11 +99,10 @@ downsamples it, and then sends it to an output topic that is used to write back
|
|||
|
||||
```py
|
||||
from quixstreams import Application
|
||||
from quixstreams.models.serializers.quix import JSONDeserializer, JSONSerializer
|
||||
|
||||
app = Application(consumer_group='downsampling-process', auto_offset_reset='earliest')
|
||||
input_topic = app.topic('raw-data', value_deserializer=JSONDeserializer())
|
||||
output_topic = app.topic('downsampled-data', value_serializer=JSONSerializer())
|
||||
input_topic = app.topic('raw-data')
|
||||
output_topic = app.topic('downsampled-data')
|
||||
|
||||
# ...
|
||||
```
|
||||
|
@ -174,7 +172,6 @@ The producer queries for fresh data from InfluxDB at specific intervals. It's co
|
|||
```py
|
||||
from influxdb_client_3 import InfluxDBClient3
|
||||
from quixstreams import Application
|
||||
from quixstreams.models.serializers.quix import JSONSerializer, SerializationContext
|
||||
import pandas
|
||||
|
||||
# Instantiate an InfluxDBClient3 client configured for your unmodified bucket
|
||||
|
@ -201,8 +198,7 @@ if localdev == 'false':
|
|||
# Create a Quix platform-specific application instead (broker address is in-built)
|
||||
app = Application(consumer_group=consumer_group_name, auto_create_topics=True)
|
||||
|
||||
serializer = JSONSerializer()
|
||||
topic = app.topic(name='raw-data', value_serializer='json')
|
||||
topic = app.topic(name='raw-data')
|
||||
|
||||
## ... remaining code truncated for brevity ...
|
||||
|
||||
|
@ -293,7 +289,7 @@ if localdev == 'false':
|
|||
# Create a Quix platform-specific application instead (broker address is in-built)
|
||||
app = Application(consumer_group=consumer_group_name, auto_create_topics=True)
|
||||
|
||||
input_topic = app.topic('downsampled-data', value_deserializer=JSONDeserializer())
|
||||
input_topic = app.topic('downsampled-data')
|
||||
|
||||
## ... remaining code truncated for brevity ...
|
||||
|
|
@ -0,0 +1,363 @@
|
|||
---
|
||||
title: Use Quix Streams to downsample data
|
||||
description: >
|
||||
Use [Quix Streams](https://github.com/quixio/quix-streams) to query time series
|
||||
data stored in InfluxDB and written to Kafka at regular intervals, continuously
|
||||
downsample it, and then write the downsampled data back to InfluxDB.
|
||||
menu:
|
||||
influxdb_cloud_serverless:
|
||||
name: Use Quix
|
||||
parent: Downsample data
|
||||
identifier: downsample-quix
|
||||
weight: 202
|
||||
related:
|
||||
- /influxdb/cloud-serverless/query-data/sql/aggregate-select/, Aggregate or apply selector functions to data (SQL)
|
||||
---
|
||||
|
||||
Use [Quix Streams](https://github.com/quixio/quix-streams) to query time series
|
||||
data stored in InfluxDB and written to Kafka at regular intervals, continuously
|
||||
downsample it, and then write the downsampled data back to InfluxDB.
|
||||
Quix Streams is an open source Python library for building containerized stream
|
||||
processing applications with Apache Kafka. It is designed to run as a service
|
||||
that continuously processes a stream of data while streaming the results to a
|
||||
Kafka topic. You can try it locally, with a local Kafka installation, or run it
|
||||
in [Quix Cloud](https://quix.io/) with a free trial.
|
||||
|
||||
This guide uses [Python](https://www.python.org/) and the
|
||||
[InfluxDB v3 Python client library](https://github.com/InfluxCommunity/influxdb3-python),
|
||||
but you can use your runtime of choice and any of the available
|
||||
[InfluxDB v3 client libraries](/influxdb/cloud-serverless/reference/client-libraries/v3/).
|
||||
This guide also assumes you have already
|
||||
[set up your Python project and virtual environment](/influxdb/cloud-serverless/query-data/execute-queries/client-libraries/python/#create-a-python-virtual-environment).
|
||||
|
||||
## Pipeline architecture
|
||||
|
||||
The following diagram illustrates how data is passed between processes as it is downsampled:
|
||||
|
||||
{{< html-diagram/quix-downsample-pipeline >}}
|
||||
|
||||
{{% note %}}
|
||||
It is usually more efficient to write raw data directly to Kafka rather than
|
||||
writing raw data to InfluxDB first (essentially starting the Quix Streams
|
||||
pipeline with the "raw-data" topic). However, this guide assumes that you
|
||||
already have raw data in InfluxDB that you want to downsample.
|
||||
{{% /note %}}
|
||||
|
||||
---
|
||||
|
||||
1. [Set up prerequisites](#set-up-prerequisites)
|
||||
2. [Install dependencies](#install-dependencies)
|
||||
3. [Prepare InfluxDB buckets](#prepare-influxdb-buckets)
|
||||
4. [Create the downsampling logic](#create-the-downsampling-logic)
|
||||
5. [Create the producer and consumer clients](#create-the-producer-and-consumer-clients)
|
||||
1. [Create the producer](#create-the-producer)
|
||||
2. [Create the consumer](#create-the-consumer)
|
||||
6. [Get the full downsampling code files](#get-the-full-downsampling-code-files)
|
||||
|
||||
## Set up prerequisites
|
||||
|
||||
The process described in this guide requires the following:
|
||||
|
||||
- An InfluxDB Cloud Serverless account with data ready for downsampling.
|
||||
- A [Quix Cloud](https://portal.platform.quix.io/self-sign-up/) account or a
|
||||
local Apache Kafka or Redpanda installation.
|
||||
- Familiarity with basic Python and Docker concepts.
|
||||
|
||||
## Install dependencies
|
||||
|
||||
Use `pip` to install the following dependencies:
|
||||
|
||||
- `influxdb_client_3`
|
||||
- `quixstreams<2.5`
|
||||
- `pandas`
|
||||
|
||||
```sh
|
||||
pip install influxdb3-python pandas "quixstreams<2.5"
|
||||
```
|
||||
|
||||
## Prepare InfluxDB buckets
|
||||
|
||||
The downsampling process involves two InfluxDB buckets.
|
||||
Each bucket has a [retention period](/influxdb/cloud-serverless/reference/glossary/#retention-period)
|
||||
that specifies how long data persists before it expires and is deleted.
|
||||
By using two buckets, you can store unmodified, high-resolution data in a bucket
|
||||
with a shorter retention period and then downsampled, low-resolution data in a
|
||||
bucket with a longer retention period.
|
||||
|
||||
Ensure you have a bucket for each of the following:
|
||||
|
||||
- One to query unmodified data from
|
||||
- The other to write downsampled data to
|
||||
|
||||
For information about creating buckets, see
|
||||
[Create a bucket](/influxdb/cloud-serverless/admin/buckets/create-bucket/).
|
||||
|
||||
## Create the downsampling logic
|
||||
|
||||
This process reads the raw data from the input Kafka topic that stores data streamed from InfluxDB,
|
||||
downsamples it, and then sends it to an output topic that is used to write back to InfluxDB.
|
||||
|
||||
1. Use the Quix Streams library's `Application` class to initialize a connection to Apache Kafka.
|
||||
|
||||
```py
|
||||
from quixstreams import Application
|
||||
|
||||
app = Application(consumer_group='downsampling-process', auto_offset_reset='earliest')
|
||||
input_topic = app.topic('raw-data')
|
||||
output_topic = app.topic('downsampled-data')
|
||||
|
||||
# ...
|
||||
```
|
||||
|
||||
2. Configure the Quix Streams built-in windowing function to create a tumbling
|
||||
window that continuously downsamples the data into 1-minute buckets.
|
||||
|
||||
```py
|
||||
# ...
|
||||
target_field = 'temperature' # The field that you want to downsample.
|
||||
|
||||
def custom_ts_extractor(value):
|
||||
# ...
|
||||
# truncated for brevity - custom code that defines the 'time_recorded'
|
||||
# field as the timestamp to use for windowing...
|
||||
|
||||
topic = app.topic(input_topic, timestamp_extractor=custom_ts_extractor)
|
||||
|
||||
sdf = (
|
||||
sdf.apply(lambda value: value[target_field]) # Extract temperature values
|
||||
.tumbling_window(timedelta(minutes=1)) # 1-minute tumbling windows
|
||||
.mean() # Calculate average temperature
|
||||
.final() # Emit results at window completion
|
||||
)
|
||||
|
||||
sdf = sdf.apply(
|
||||
lambda value: {
|
||||
'time': value['end'], # End of the window
|
||||
'temperature_avg': value['value'], # Average temperature
|
||||
}
|
||||
)
|
||||
|
||||
sdf.to_topic(output_topic) # Output results to the 'downsampled-data' topic
|
||||
# ...
|
||||
```
|
||||
|
||||
The results are streamed to the Kafka topic, `downsampled-data`.
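
   The `custom_ts_extractor` in the snippet above is truncated.
   As a purely illustrative sketch (not the template's actual implementation), an
   extractor that windows on the payload's `time_recorded` field might look like the
   following, assuming the field holds an epoch timestamp in milliseconds and that
   your Quix Streams version may pass extra positional metadata arguments:

   ```py
   # Hypothetical example only -- see the linked repository for the real implementation.
   def custom_ts_extractor(value, *_kafka_metadata):
       # Window on the sensor's own 'time_recorded' value (epoch milliseconds)
       # instead of the Kafka message timestamp.
       return int(value['time_recorded'])
   ```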
|
||||
|
||||
{{% note %}}
|
||||
Note: "sdf" stands for "Streaming Dataframe".
|
||||
{{% /note %}}
|
||||
|
||||
You can find the full code for this process in the
|
||||
[Quix GitHub repository](https://github.com/quixio/template-influxdbv3-downsampling/blob/dev/Downsampler/main.py).
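
If it helps to see how the truncated pieces above typically fit together before
opening the full file, the following is a minimal, self-contained sketch. It
assumes the topic names from step 1 and a `temperature` field to average, and it
omits the custom timestamp extractor for brevity:

```py
from datetime import timedelta

from quixstreams import Application

app = Application(consumer_group='downsampling-process', auto_offset_reset='earliest')
input_topic = app.topic('raw-data')
output_topic = app.topic('downsampled-data')

sdf = app.dataframe(input_topic)                   # Consume from the 'raw-data' topic

sdf = (
    sdf.apply(lambda value: value['temperature'])  # Extract the field to downsample
    .tumbling_window(timedelta(minutes=1))         # 1-minute tumbling windows
    .mean()                                        # Average temperature per window
    .final()                                       # Emit when each window closes
)

sdf = sdf.apply(
    lambda value: {
        'time': value['end'],                      # End of the window
        'temperature_avg': value['value'],         # Average temperature
    }
)

sdf.to_topic(output_topic)                         # Publish to the 'downsampled-data' topic

if __name__ == '__main__':
    app.run(sdf)                                   # Run until the process is stopped
```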
|
||||
|
||||
## Create the producer and consumer clients
|
||||
|
||||
Use the `influxdb_client_3` and `quixstreams` modules to instantiate two clients that interact with InfluxDB and Apache Kafka:
|
||||
|
||||
- A **producer** client configured to read from your InfluxDB bucket with _unmodified_ data and _produce_ that data to Kafka.
|
||||
- A **consumer** client configured to _consume_ data from Kafka and write the _downsampled_ data to the corresponding InfluxDB bucket.
|
||||
|
||||
### Create the producer client
|
||||
|
||||
Provide the following credentials for the producer:
|
||||
|
||||
- **host**: [{{< product-name >}} region URL](/influxdb/cloud-serverless/reference/regions)
|
||||
_(without the protocol)_
|
||||
- **org**: InfluxDB organization name
|
||||
- **token**: InfluxDB API token with read and write permissions on the buckets you
|
||||
want to query and write to.
|
||||
- **database**: InfluxDB bucket name
|
||||
|
||||
The producer queries for fresh data from InfluxDB at specific intervals. It's configured to look for a specific measurement defined in a variable. It writes the raw data to a Kafka topic called 'raw-data'.
|
||||
|
||||
{{% code-placeholders "(API|(RAW|DOWNSAMPLED)_BUCKET|ORG)_(NAME|TOKEN)" %}}
|
||||
```py
|
||||
from influxdb_client_3 import InfluxDBClient3
|
||||
from quixstreams import Application
|
||||
import os
import uuid

import pandas
|
||||
|
||||
# Instantiate an InfluxDBClient3 client configured for your unmodified bucket
|
||||
influxdb_raw = InfluxDBClient3(
|
||||
host='{{< influxdb/host >}}',
|
||||
token='API_TOKEN',
|
||||
database='RAW_BUCKET_NAME'
|
||||
)
|
||||
|
||||
# os.environ['localdev'] = 'true' # Uncomment if you're using local Kafka rather than Quix Cloud
|
||||
|
||||
# Create a Quix Streams producer application that connects to a local Kafka installation
|
||||
app = Application(
|
||||
broker_address=os.environ.get('BROKER_ADDRESS','localhost:9092'),
|
||||
consumer_group=consumer_group_name,
|
||||
auto_create_topics=True
|
||||
)
|
||||
|
||||
# Override the app variable if the local development env var is set to false or is not present.
|
||||
# This causes Quix Streams to use an application configured for Quix Cloud
|
||||
localdev = os.environ.get('localdev', 'false')
|
||||
|
||||
if localdev == 'false':
|
||||
# Create a Quix platform-specific application instead (broker address is in-built)
|
||||
app = Application(consumer_group=consumer_group_name, auto_create_topics=True)
|
||||
|
||||
topic = app.topic(name='raw-data')
|
||||
|
||||
## ... remaining code truncated for brevity ...
|
||||
|
||||
# Query InfluxDB for the raw data and store it in a Dataframe
|
||||
def get_data():
|
||||
# Run in a loop until the main thread is terminated
|
||||
while run:
|
||||
try:
|
||||
myquery = f'SELECT * FROM "{measurement_name}" WHERE time >= {interval}'
|
||||
print(f'sending query {myquery}')
|
||||
# Query InfluxDB 3.0 using influxql or sql
|
||||
table = influxdb_raw.query(
|
||||
query=myquery,
|
||||
mode='pandas',
|
||||
language='influxql')
|
||||
|
||||
#... remaining code truncated for brevity ...
|
||||
|
||||
# Send the data to a Kafka topic for the downsampling process to consume
|
||||
def main():
|
||||
"""
|
||||
Read data from the Query and publish it to Kafka
|
||||
"""
|
||||
#... remaining code truncated for brevity ...
|
||||
|
||||
for index, obj in enumerate(records):
|
||||
print(obj) # obj contains each row in the table, including temperature
|
||||
# Generate a unique message_key for each row
|
||||
message_key = obj['machineId']
|
||||
logger.info(f'Produced message with key:{message_key}, value:{obj}')
|
||||
|
||||
serialized = topic.serialize(
|
||||
key=message_key, value=obj, headers={'uuid': str(uuid.uuid4())}
|
||||
)
|
||||
|
||||
# publish each row returned in the query to the topic 'raw-data'
|
||||
producer.produce(
|
||||
topic=topic.name,
|
||||
headers=serialized.headers,
|
||||
key=serialized.key,
|
||||
value=serialized.value,
|
||||
)
|
||||
|
||||
```
|
||||
{{% /code-placeholders %}}
|
||||
|
||||
You can find the full code for this process in the
|
||||
[Quix GitHub repository](https://github.com/quixio/template-influxdbv3-downsampling/blob/dev/InfluxDB%20V3%20Data%20Source/main.py).
|
||||
|
||||
### Create the consumer
|
||||
|
||||
As before, provide the following credentials for the consumer:
|
||||
|
||||
- **host**: [{{< product-name >}} region URL](/influxdb/cloud-serverless/reference/regions)
|
||||
_(without the protocol)_
|
||||
- **org**: InfluxDB organization name
|
||||
- **token**: InfluxDB API token with read and write permissions on the buckets you
|
||||
want to query and write to.
|
||||
- **database**: InfluxDB bucket name
|
||||
|
||||
This process reads messages from the Kafka topic `downsampled-data` and writes each message as a point dictionary back to InfluxDB.
|
||||
|
||||
{{% code-placeholders "(API|(RAW|DOWNSAMPLED)_BUCKET|ORG)_(NAME|TOKEN)" %}}
|
||||
```py
|
||||
# Instantiate an InfluxDBClient3 client configured for your downsampled database.
|
||||
# When writing, the org= argument is required by the client (but ignored by InfluxDB).
|
||||
influxdb_downsampled = InfluxDBClient3(
|
||||
host='{{< influxdb/host >}}',
|
||||
token='API_TOKEN',
|
||||
database='DOWNSAMPLED_BUCKET_NAME',
|
||||
org=''
|
||||
)
|
||||
|
||||
# os.environ['localdev'] = 'true' # Uncomment if you're using local Kafka rather than Quix Cloud
|
||||
|
||||
# Create a Quix Streams consumer application that connects to a local Kafka installation
|
||||
app = Application(
|
||||
broker_address=os.environ.get('BROKER_ADDRESS','localhost:9092'),
|
||||
consumer_group=consumer_group_name,
|
||||
auto_create_topics=True
|
||||
)
|
||||
|
||||
# Override the app variable if the local development env var is set to false or is not present.
|
||||
# This causes Quix Streams to use an application configured for Quix Cloud
|
||||
localdev = os.environ.get('localdev', 'false')
|
||||
|
||||
if localdev == 'false':
|
||||
# Create a Quix platform-specific application instead (broker address is in-built)
|
||||
app = Application(consumer_group=consumer_group_name, auto_create_topics=True)
|
||||
|
||||
input_topic = app.topic('downsampled-data')
|
||||
|
||||
## ... remaining code truncated for brevity ...
|
||||
|
||||
def send_data_to_influx(message):
|
||||
logger.info(f'Processing message: {message}')
|
||||
try:
|
||||
|
||||
## ... remaining code truncated for brevity ...
|
||||
|
||||
# Construct the points dictionary
|
||||
points = {
|
||||
'measurement': measurement_name,
|
||||
'tags': tags,
|
||||
'fields': fields,
|
||||
'time': message['time']
|
||||
}
|
||||
|
||||
influxdb_downsampled.write(record=points, write_precision='ms')
|
||||
|
||||
sdf = app.dataframe(input_topic)
|
||||
sdf = sdf.update(send_data_to_influx) # Continuously apply the 'send_data' function to each message in the incoming stream
|
||||
|
||||
## ... remaining code truncated for brevity ...
|
||||
```
|
||||
{{% /code-placeholders %}}
|
||||
|
||||
You can find the full code for this process in the
|
||||
[Quix GitHub repository](https://github.com/quixio/template-influxdbv3-downsampling/blob/dev/InfluxDB%20V3%20Data%20Sink/main.py).
|
||||
|
||||
## Get the full downsampling code files
|
||||
|
||||
To get the complete set of files referenced in this tutorial, clone the Quix "downsampling template" repository or use an interactive version of this tutorial saved as a Jupyter Notebook.
|
||||
|
||||
### Clone the downsampling template repository
|
||||
|
||||
To clone the downsampling template, enter the following command in the command line:
|
||||
|
||||
```sh
|
||||
git clone https://github.com/quixio/template-influxdbv3-downsampling.git
|
||||
```
|
||||
|
||||
This repository contains the following folders which store different parts of the whole pipeline:
|
||||
|
||||
- **Machine Data to InfluxDB**: A script that generates synthetic machine data
|
||||
and writes it to InfluxDB. This is useful if you don't have your own data yet,
|
||||
or just want to work with test data first.
|
||||
|
||||
- It produces a reading every 250 milliseconds.
|
||||
- This script originally comes from the
|
||||
[InfluxCommunity repository](https://github.com/InfluxCommunity/Arrow-Task-Engine/blob/master/machine_simulator/src/machine_generator.py)
|
||||
but has been adapted to write directly to InfluxDB rather than using an MQTT broker.
|
||||
|
||||
- **InfluxDB V3 Data Source**: A service that queries for fresh data from
|
||||
InfluxDB at specific intervals. It's configured to look for the measurement
|
||||
produced by the previously-mentioned synthetic machine data generator.
|
||||
It writes the raw data to a Kafka topic called "raw-data".
|
||||
- **Downsampler**: A service that performs a 1-minute tumbling window operation
|
||||
on the data from InfluxDB and emits the mean of the "temperature" reading
|
||||
every minute. It writes the output to a "downsampled-data" Kafka topic.
|
||||
- **InfluxDB V3 Data Sink**: A service that reads from the "downsampled-data"
|
||||
topic and writes the downsampled records as points back into InfluxDB.
|
||||
|
||||
### Use the downsampling Jupyter Notebook
|
||||
|
||||
You can use the interactive notebook ["Continuously downsample data using InfluxDB and Quix Streams"](https://github.com/quixio/tutorial-code/blob/main/notebooks/Downsampling_viaKafka_Using_Quix_Influx.ipynb) to try the downsampling code yourself. It is configured to install Apache Kafka within the runtime environment (such as Google Colab).
|
||||
|
||||
Each process is also set up to run in the background so that a running cell does not block the rest of the tutorial.
|
||||
|
||||
<a target="_blank" href="https://colab.research.google.com/github/quixio/tutorial-code/blob/main/notebooks/Downsampling_viaKafka_Using_Quix_Influx.ipynb"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>
|
|
@ -0,0 +1,332 @@
|
|||
---
|
||||
title: Downsample data with Python and Quix Streams
|
||||
description: >
|
||||
Use Quix Streams to create a Python service that downsamples data stored in InfluxDB.
|
||||
menu:
|
||||
influxdb_cloud:
|
||||
name: Downsample with Quix
|
||||
parent: Common tasks
|
||||
identifier: influxdb_cloud-downsample-quix
|
||||
weight: 202
|
||||
---
|
||||
|
||||
Use [Quix Streams](https://github.com/quixio/quix-streams) to query time series
|
||||
data stored in InfluxDB and written to Kafka at regular intervals, continuously
|
||||
downsample it, and then write the downsampled data back to InfluxDB.
|
||||
Quix Streams is an open source Python library for building containerized stream
|
||||
processing applications with Apache Kafka. It is designed to run as a service
|
||||
that continuously processes a stream of data while streaming the results to a
|
||||
Kafka topic. You can try it locally, with a local Kafka installation, or run it
|
||||
in [Quix Cloud](https://quix.io/) with a free trial.
|
||||
A common practice when processing high-volume data is to downsample it before committing
|
||||
it to InfluxDB to reduce the overall disk usage as data collects over time.
|
||||
|
||||
This guide walks through the process of creating a series of Python services that ingest
|
||||
from an InfluxDB v2 bucket and then downsample and publish the data to another InfluxDB v2 bucket.
|
||||
By aggregating data within windows of time and then storing the aggregate values back to InfluxDB, you can reduce
|
||||
disk usage and costs over time.
|
||||
|
||||
The guide uses the InfluxDB v2 and Quix Streams Python client libraries and can be run locally or deployed within Quix Cloud with a free trial. It assumes you have set up a Python project and virtual environment.
|
||||
|
||||
## Pipeline architecture
|
||||
The following diagram illustrates how data is passed between processes as it is downsampled:
|
||||
|
||||
{{< html-diagram/quix-downsample-pipeline "v2" >}}
|
||||
|
||||
{{% note %}}
|
||||
It is usually more efficient to write raw data directly to Kafka rather than
|
||||
writing raw data to InfluxDB first (essentially starting the Quix Streams
|
||||
pipeline with the "influxv2-data" topic). However, this guide assumes that you
|
||||
already have raw data in InfluxDB that you want to downsample.
|
||||
{{% /note %}}
|
||||
|
||||
---
|
||||
|
||||
1. [Set up prerequisites](#set-up-prerequisites)
|
||||
2. [Install dependencies](#install-dependencies)
|
||||
3. [Prepare InfluxDB buckets](#prepare-influxdb-buckets)
|
||||
4. [Create the downsampling logic](#create-the-downsampling-logic)
|
||||
5. [Create the producer and consumer clients](#create-the-producer-and-consumer-clients)
|
||||
1. [Create the producer](#create-the-producer)
|
||||
2. [Create the consumer](#create-the-consumer)
|
||||
6. [Run the machine data generator](#run-the-machine-data-generator)
|
||||
7. [Get the full downsampling code files](#get-the-full-downsampling-code-files)
|
||||
|
||||
|
||||
## Set up prerequisites
|
||||
|
||||
The process described in this guide requires the following:
|
||||
|
||||
- InfluxDB v2 with data ready for downsampling. [Use the machine data generator code](#run-the-machine-data-generator) below.
|
||||
- A [Quix Cloud](https://portal.platform.quix.io/self-sign-up/) account or a
|
||||
local Apache Kafka or Redpanda installation.
|
||||
- Familiarity with basic Python and Docker concepts.
|
||||
|
||||
## Install dependencies
|
||||
|
||||
Use `pip` to install the following dependencies:
|
||||
|
||||
- `influxdb-client` (InfluxDB v2 client library)
|
||||
- `quixstreams<2.5` (Quix Streams client library)
|
||||
- `pandas` (data analysis and manipulation tool)
|
||||
|
||||
|
||||
```sh
|
||||
pip install influxdb-client pandas "quixstreams<2.5"
|
||||
```
|
||||
|
||||
## Prepare InfluxDB buckets
|
||||
|
||||
The downsampling process involves two InfluxDB buckets.
|
||||
Each bucket has a [retention period](/influxdb/cloud/reference/glossary/#retention-period)
|
||||
that specifies how long data persists before it expires and is deleted.
|
||||
By using two buckets, you can store unmodified, high-resolution data in a bucket
|
||||
with a shorter retention period and then downsampled, low-resolution data in a
|
||||
bucket with a longer retention period.
|
||||
|
||||
Ensure you have a bucket for each of the following:
|
||||
|
||||
- One to query unmodified data from your InfluxDB v2 cluster
|
||||
- The other to write downsampled data into
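
If you prefer to script this setup rather than create the buckets in the InfluxDB
UI or CLI, the following is a minimal sketch that uses the `influxdb-client`
dependency installed earlier. The bucket names and retention periods are
placeholders for illustration, and the connection details are assumed to be in the
`INFLUXDB_HOST`, `INFLUXDB_ORG`, and `INFLUXDB_TOKEN` environment variables:

```py
import os

from influxdb_client import BucketRetentionRules, InfluxDBClient

with InfluxDBClient(url=os.environ["INFLUXDB_HOST"],
                    token=os.environ["INFLUXDB_TOKEN"],
                    org=os.environ["INFLUXDB_ORG"]) as client:
    buckets_api = client.buckets_api()

    # Shorter retention for the unmodified, high-resolution data (7 days)
    buckets_api.create_bucket(
        bucket_name="machine-data-raw",
        retention_rules=BucketRetentionRules(type="expire", every_seconds=7 * 24 * 3600),
        org=os.environ["INFLUXDB_ORG"],
    )

    # Longer retention for the downsampled, low-resolution data (30 days)
    buckets_api.create_bucket(
        bucket_name="machine-data-downsampled",
        retention_rules=BucketRetentionRules(type="expire", every_seconds=30 * 24 * 3600),
        org=os.environ["INFLUXDB_ORG"],
    )
```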
|
||||
|
||||
## Create the downsampling logic
|
||||
|
||||
This process reads the raw data from the input Kafka topic that stores data streamed from the InfluxDB v2 bucket,
|
||||
downsamples it, and then sends it to an output topic which is later written back to another bucket.
|
||||
|
||||
1. Use the Quix Streams library's `Application` class to initialize a connection to the Kafka topics.
|
||||
|
||||
```py
|
||||
from quixstreams import Application
|
||||
|
||||
app = Application(consumer_group="downsampling-process", auto_offset_reset="earliest")
|
||||
input_topic = app.topic("input")
|
||||
output_topic = app.topic("output")
|
||||
# ...
|
||||
```
|
||||
|
||||
2. Configure the Quix Streams built-in windowing function to create a tumbling
|
||||
window that continuously downsamples the data into 1-minute buckets.
|
||||
|
||||
```py
|
||||
# ...
|
||||
target_field = "temperature" # The field that you want to downsample.
|
||||
|
||||
def custom_ts_extractor(value):
|
||||
# ...
|
||||
# truncated for brevity - custom code that defines the "time_recorded"
|
||||
# field as the timestamp to use for windowing...
|
||||
|
||||
topic = app.topic(input_topic, timestamp_extractor=custom_ts_extractor)
|
||||
|
||||
sdf = (
|
||||
sdf.apply(lambda value: value[target_field]) # Extract temperature values
|
||||
.tumbling_window(timedelta(minutes=1)) # 1-minute tumbling windows
|
||||
.mean() # Calculate average temperature
|
||||
.final() # Emit results at window completion
|
||||
)
|
||||
|
||||
sdf = sdf.apply(
|
||||
lambda value: {
|
||||
"time": value["end"], # End of the window
|
||||
"temperature_avg": value["value"], # Average temperature
|
||||
}
|
||||
)
|
||||
|
||||
sdf.to_topic(output_topic) # Output results to the "downsampled" topic
|
||||
# ...
|
||||
```
|
||||
|
||||
The results are streamed to the Kafka topic, `downsampled`.
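
   The `custom_ts_extractor` in the snippet above is truncated.
   As a purely illustrative sketch (not the template's actual implementation), an
   extractor that windows on the payload's `time_recorded` field might look like the
   following, assuming the field holds an epoch timestamp in milliseconds and that
   your Quix Streams version may pass extra positional metadata arguments:

   ```py
   # Hypothetical example only -- see the linked repository for the real implementation.
   def custom_ts_extractor(value, *_kafka_metadata):
       # Window on the sensor's own "time_recorded" value (epoch milliseconds)
       # instead of the Kafka message timestamp.
       return int(value["time_recorded"])
   ```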
|
||||
|
||||
{{% note %}}
|
||||
Note: "sdf" stands for "Streaming Dataframe".
|
||||
{{% /note %}}
|
||||
|
||||
You can find the full code for this process in the
|
||||
[Quix GitHub repository](https://github.com/quixio/template-invluxdbv2-tsm-downsampling/blob/tutorial/Downsampler/main.py).
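
If it helps to see how the truncated pieces above typically fit together before
opening the full file, the following is a minimal, self-contained sketch. It
assumes the topic names from step 1 and a `temperature` field to average, and it
omits the custom timestamp extractor for brevity:

```py
from datetime import timedelta

from quixstreams import Application

app = Application(consumer_group="downsampling-process", auto_offset_reset="earliest")
input_topic = app.topic("input")
output_topic = app.topic("output")

sdf = app.dataframe(input_topic)                   # Consume from the input topic

sdf = (
    sdf.apply(lambda value: value["temperature"])  # Extract the field to downsample
    .tumbling_window(timedelta(minutes=1))         # 1-minute tumbling windows
    .mean()                                        # Average temperature per window
    .final()                                       # Emit when each window closes
)

sdf = sdf.apply(
    lambda value: {
        "time": value["end"],                      # End of the window
        "temperature_avg": value["value"],         # Average temperature
    }
)

sdf.to_topic(output_topic)                         # Publish to the output topic

if __name__ == "__main__":
    app.run(sdf)                                   # Run until the process is stopped
```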
|
||||
|
||||
## Create the producer and consumer clients
|
||||
|
||||
Use the `influxdb_client` and `quixstreams` modules to instantiate two clients that interact with InfluxDB and Kafka:
|
||||
|
||||
- A **producer** client configured to read from your InfluxDB bucket with _unmodified_ data and _produce_ that data to Kafka.
|
||||
- A **consumer** client configured to _consume_ data from Kafka and write the _downsampled_ data to the corresponding InfluxDB bucket.
|
||||
|
||||
### Create the producer
|
||||
|
||||
Provide the following credentials for the producer:
|
||||
|
||||
- **INFLUXDB_HOST**: [{{< product-name >}} region URL](/influxdb/cloud/reference/regions)
|
||||
_(without the protocol)_
|
||||
- **INFLUXDB_ORG**: InfluxDB organization name
|
||||
- **INFLUXDB_TOKEN**: InfluxDB API token with read and write permissions on the buckets you
|
||||
want to query and write to.
|
||||
- **INFLUXDB_BUCKET**: InfluxDB bucket name
|
||||
|
||||
The producer queries for fresh data from InfluxDB at specific intervals. It writes the raw data to a Kafka topic called `influxv2-data`.
|
||||
|
||||
{{% code-placeholders "(API|(RAW|DOWNSAMPLED)_BUCKET|ORG)_(NAME|TOKEN)" %}}
|
||||
```py
|
||||
from quixstreams import Application
|
||||
import influxdb_client
import json
import os
|
||||
# Create a Quix Application
|
||||
app = Application(consumer_group="influxdbv2_migrate", auto_create_topics=True)
|
||||
# Define the topic using the "output" environment variable
|
||||
topic = app.topic(os.getenv("output", "influxv2-data"))
|
||||
# Create an InfluxDB v2 client
|
||||
influxdb2_client = influxdb_client.InfluxDBClient(token=os.environ["INFLUXDB_TOKEN"],
|
||||
org=os.environ["INFLUXDB_ORG"],
|
||||
url=os.environ["INFLUXDB_HOST"])
|
||||
|
||||
## ... remaining code truncated for brevity ...
|
||||
|
||||
# Function to fetch data from InfluxDB
|
||||
# It runs in a continuous loop, periodically fetching data based on the interval.
|
||||
def get_data():
|
||||
# Run in a loop until the main thread is terminated
|
||||
while run:
|
||||
try:
|
||||
# Query InfluxDB 2.0 using flux
|
||||
flux_query = f'''
|
||||
from(bucket: "{bucket}")
|
||||
|> range(start: -{interval})
|
||||
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|
||||
'''
|
||||
logger.info(f"Sending query: {flux_query}")
|
||||
|
||||
## ... remaining code truncated for brevity ...
|
||||
|
||||
# Create a pre-configured Producer object.
|
||||
with app.get_producer() as producer:
|
||||
for res in get_data():
|
||||
# Get the data from InfluxDB
|
||||
records = json.loads(res)
|
||||
for index, obj in enumerate(records):
|
||||
logger.info(f"Produced message with key:{message_key}, value:{obj}")
|
||||
# Publish the data to the Kafka topic
|
||||
producer.produce(
|
||||
topic=topic.name,
|
||||
key=message_key,
|
||||
value=obj,
|
||||
)
|
||||
```
|
||||
{{% /code-placeholders %}}
|
||||
|
||||
You can find the full code for this process in the
|
||||
[Quix GitHub repository](https://github.com/quixio/template-invluxdbv2-tsm-downsampling/blob/tutorial/InfluxDB%20V2%20Data%20Source/main.py).
|
||||
|
||||
### Create the consumer
|
||||
|
||||
As before, provide the following credentials for the consumer:
|
||||
|
||||
- **INFLUXDB_HOST**: [{{< product-name >}} region URL](/influxdb/cloud/reference/regions)
|
||||
_(without the protocol)_
|
||||
- **INFLUXDB_ORG**: InfluxDB organization name
|
||||
- **INFLUXDB_TOKEN**: InfluxDB API token with read and write permissions on the buckets you
|
||||
want to query and write to.
|
||||
- **INFLUXDB_BUCKET**: InfluxDB bucket name
|
||||
|
||||
{{% note %}}
|
||||
Note: These will be your InfluxDB v2 credentials.
|
||||
{{% /note %}}
|
||||
|
||||
This process reads messages from the Kafka topic `downsampled` and writes each message as a point dictionary back to InfluxDB.
|
||||
|
||||
{{% code-placeholders "(API|(RAW|DOWNSAMPLED)_BUCKET|ORG)_(NAME|TOKEN)" %}}
|
||||
```py
|
||||
from quixstreams import Application, State
|
||||
from influxdb_client import InfluxDBClient, Point
import os
from datetime import datetime
from time import time
|
||||
|
||||
# Create a Quix platform-specific application instead
|
||||
app = Application(consumer_group=consumer_group_name, auto_offset_reset="earliest", use_changelog_topics=False)
|
||||
|
||||
input_topic = app.topic(os.getenv("input", "input-data"))
|
||||
|
||||
# Initialize InfluxDB v2 client
|
||||
influx2_client = InfluxDBClient(url=influx_host,
|
||||
token=influx_token,
|
||||
org=influx_org)
|
||||
|
||||
## ... remaining code truncated for brevity ...
|
||||
|
||||
def send_data_to_influx(message: dict, state: State):
|
||||
global last_write_time_ns, points_buffer, service_start_state
|
||||
|
||||
try:
|
||||
## ... code truncated for brevity ...
|
||||
|
||||
# Check if it's time to write the batch
|
||||
# 10k records have accumulated or 15 seconds have passed
|
||||
if len(points_buffer) >= 10000 or int(time() * 1e9) - last_write_time_ns >= 15e9:
|
||||
with influx2_client.write_api() as write_api:
|
||||
logger.info(f"Writing batch of {len(points_buffer)} points written to InfluxDB.")
|
||||
write_api.write(influx_bucket, influx_org, points_buffer)
|
||||
|
||||
# Clear the buffer and update the last write time
|
||||
points_buffer = []
|
||||
last_write_time_ns = int(time() * 1e9)
|
||||
|
||||
## ... code truncated for brevity ...
|
||||
|
||||
except Exception as e:
|
||||
logger.info(f"{str(datetime.utcnow())}: Write failed")
|
||||
logger.info(e)
|
||||
|
||||
## ... code truncated for brevity ...
|
||||
|
||||
# We use Quix Streams StreamingDataframe (SDF) to handle every message
|
||||
# in the Kafka topic by writing it to InfluxDB
|
||||
sdf = app.dataframe(input_topic)
|
||||
sdf = sdf.update(send_data_to_influx, stateful=True)
|
||||
|
||||
if __name__ == "__main__":
|
||||
logger.info("Starting application")
|
||||
app.run(sdf)
|
||||
|
||||
```
|
||||
{{% /code-placeholders %}}
|
||||
|
||||
You can find the full code for this process in the
|
||||
[Quix GitHub repository](https://github.com/quixio/quix-samples/tree/develop/python/destinations/influxdb_2).
|
||||
|
||||
## Run the machine data generator
|
||||
|
||||
Now it's time to run the machine data generator code, which will populate your source
bucket with data that will be read by the [producer](#create-the-producer).
|
||||
|
||||
Run `main.py` from the `Machine data to InfluxDB` folder in the GitHub repository.
|
||||
|
||||
## Get the full downsampling code files
|
||||
|
||||
To get the complete set of files referenced in this tutorial, clone the Quix "downsampling" repository.
|
||||
|
||||
### Clone the downsampling template repository
|
||||
|
||||
To clone the downsampling template, enter the following command in the command line:
|
||||
|
||||
```sh
|
||||
git clone https://github.com/quixio/template-invluxdbv2-tsm-downsampling.git
|
||||
```
|
||||
|
||||
This repository contains the following folders which store different parts of the whole pipeline:
|
||||
|
||||
- **Machine Data to InfluxDB**: A script that generates synthetic machine data
|
||||
and writes it to InfluxDB. This is useful if you don't have your own data yet,
|
||||
or just want to work with test data first.
|
||||
|
||||
- It produces a reading every 250 milliseconds.
|
||||
- This script originally comes from the
|
||||
[InfluxCommunity repository](https://github.com/InfluxCommunity/Arrow-Task-Engine/blob/master/machine_simulator/src/machine_generator.py)
|
||||
but has been adapted to write directly to InfluxDB rather than using an MQTT broker.
|
||||
|
||||
- **InfluxDB v2 Data Source**: A service that queries for fresh data from
|
||||
InfluxDB at specific intervals. It's configured to look for the measurement
|
||||
produced by the previously-mentioned synthetic machine data generator.
|
||||
It writes the raw data to a Kafka topic called "influxv2-data".
|
||||
- **Downsampler**: A service that performs a 1-minute tumbling window operation
|
||||
on the data from InfluxDB and emits the mean of the "temperature" reading
|
||||
every minute. It writes the output to a "downsampled" Kafka topic.
|
||||
- **InfluxDB v2 Data Sink**: A service that reads from the "downsampled"
|
||||
topic and writes the downsampled records as points back into InfluxDB.
|
||||
|
|
@ -6,7 +6,7 @@ description: >
|
|||
in previous versions of InfluxDB.
|
||||
menu:
|
||||
influxdb_cloud:
|
||||
name: Downsample data
|
||||
name: Downsample with InfluxDB
|
||||
parent: Common tasks
|
||||
weight: 201
|
||||
influxdb/cloud/tags: [tasks]
|
||||
|
@ -32,7 +32,7 @@ A separate bucket where aggregated, downsampled data is stored.
|
|||
To downsample data, it must be aggregated in some way.
|
||||
What specific method of aggregation you use depends on your specific use case,
|
||||
but examples include mean, median, top, bottom, etc.
|
||||
View [Flux's aggregate functions](/flux/v0/stdlib/universe/)
|
||||
View [Flux's aggregate functions](/flux/v0/function-types/#aggregates)
|
||||
for more information and ideas.
|
||||
|
||||
## Example downsampling task script
|
||||
|
@ -48,7 +48,6 @@ The example task script below is a very basic form of data downsampling that doe
|
|||
|
||||
```js
|
||||
// Task Options
|
||||
// Task Options
|
||||
option task = {name: "cq-mem-data-1w", every: 1w}
|
||||
|
||||
// Defines a data source
|
||||
|
@ -72,7 +71,6 @@ Once your task is ready, see [Create a task](/influxdb/cloud/process-data/manage
|
|||
## Things to consider
|
||||
- If there is a chance that data may arrive late, specify an `offset` in your
|
||||
task options long enough to account for late-data.
|
||||
- If running a task against a bucket with a finite retention period, do not schedule
|
||||
tasks to run too closely to the end of the retention period.
|
||||
Always provide a "cushion" for downsampling tasks to complete before the data
|
||||
is dropped by the retention period.
|
||||
- If running a task against a bucket with a finite retention period,
|
||||
schedule tasks to run prior to the end of the retention period to let
|
||||
downsampling tasks complete before data outside of the retention period is dropped.
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
---
|
||||
title: Downsample data stored in InfluxDB
|
||||
description: >
|
||||
Learn about different methods for querying and downsampling time series data
|
||||
stored in InfluxDB.
|
||||
menu:
|
||||
influxdb_clustered:
|
||||
name: Downsample data
|
||||
parent: Process & visualize data
|
||||
weight: 101
|
||||
related:
|
||||
- /influxdb/clustered/query-data/sql/aggregate-select/, Aggregate or apply selector functions to data (SQL)
|
||||
---
|
||||
|
||||
Learn about different methods for querying and downsampling time series data
|
||||
stored in InfluxDB.
|
||||
|
||||
{{< children >}}
|
|
@ -1,13 +1,13 @@
|
|||
---
|
||||
title: Downsample data stored in InfluxDB
|
||||
title: Use client libraries to downsample data
|
||||
description: >
|
||||
Query and downsample time series data stored in InfluxDB and write the
|
||||
downsampled data back to InfluxDB.
|
||||
Use InfluxDB client libraries to query and downsample time series data stored in InfluxDB and write the downsampled data back to InfluxDB.
|
||||
menu:
|
||||
influxdb_clustered:
|
||||
name: Downsample data
|
||||
parent: Process & visualize data
|
||||
weight: 101
|
||||
name: Use client libraries
|
||||
parent: Downsample data
|
||||
identifier: downsample-influx-client-libraries
|
||||
weight: 201
|
||||
related:
|
||||
- /influxdb/clustered/query-data/sql/aggregate-select/, Aggregate or apply selector functions to data (SQL)
|
||||
---
|
|
@ -0,0 +1,363 @@
|
|||
---
|
||||
title: Use Quix Streams to downsample data
|
||||
description: >
|
||||
Use [Quix Streams](https://github.com/quixio/quix-streams) to query time series
|
||||
data stored in InfluxDB and written to Kafka at regular intervals, continuously
|
||||
downsample it, and then write the downsampled data back to InfluxDB.
|
||||
menu:
|
||||
influxdb_clustered:
|
||||
name: Use Quix Streams
|
||||
parent: Downsample data
|
||||
identifier: downsample-quix
|
||||
weight: 202
|
||||
related:
|
||||
- /influxdb/clustered/query-data/sql/aggregate-select/, Aggregate or apply selector functions to data (SQL)
|
||||
---
|
||||
|
||||
Use [Quix Streams](https://github.com/quixio/quix-streams) to query time series
|
||||
data stored in InfluxDB and written to Kafka at regular intervals, continuously
|
||||
downsample it, and then write the downsampled data back to InfluxDB.
|
||||
Quix Streams is an open source Python library for building containerized stream
|
||||
processing applications with Apache Kafka. It is designed to run as a service
|
||||
that continuously processes a stream of data while streaming the results to a
|
||||
Kafka topic. You can try it locally, with a local Kafka installation, or run it
|
||||
in [Quix Cloud](https://quix.io/) with a free trial.
|
||||
|
||||
This guide uses [Python](https://www.python.org/) and the
|
||||
[InfluxDB v3 Python client library](https://github.com/InfluxCommunity/influxdb3-python),
|
||||
but you can use your runtime of choice and any of the available
|
||||
[InfluxDB v3 client libraries](/influxdb/clustered/reference/client-libraries/v3/).
|
||||
This guide also assumes you have already
|
||||
[set up your Python project and virtual environment](/influxdb/cloud-serverless/query-data/execute-queries/client-libraries/python/#create-a-python-virtual-environment).
|
||||
|
||||
## Pipeline architecture
|
||||
|
||||
The following diagram illustrates how data is passed between processes as it is downsampled:
|
||||
|
||||
{{< html-diagram/quix-downsample-pipeline >}}
|
||||
|
||||
{{% note %}}
|
||||
It is usually more efficient to write raw data directly to Kafka rather than
|
||||
writing raw data to InfluxDB first (essentially starting the Quix Streams
|
||||
pipeline with the "raw-data" topic). However, this guide assumes that you
|
||||
already have raw data in InfluxDB that you want to downsample.
|
||||
{{% /note %}}
|
||||
|
||||
---
|
||||
|
||||
1. [Set up prerequisites](#set-up-prerequisites)
|
||||
2. [Install dependencies](#install-dependencies)
|
||||
3. [Prepare InfluxDB buckets](#prepare-influxdb-buckets)
|
||||
4. [Create the downsampling logic](#create-the-downsampling-logic)
|
||||
5. [Create the producer and consumer clients](#create-the-producer-and-consumer-clients)
|
||||
1. [Create the producer](#create-the-producer)
|
||||
2. [Create the consumer](#create-the-consumer)
|
||||
6. [Get the full downsampling code files](#get-the-full-downsampling-code-files)
|
||||
|
||||
## Set up prerequisites
|
||||
|
||||
The process described in this guide requires the following:
|
||||
|
||||
- An InfluxDB cluster with data ready for downsampling.
|
||||
- A [Quix Cloud](https://portal.platform.quix.io/self-sign-up/) account or a
|
||||
local Apache Kafka or Redpanda installation.
|
||||
- Familiarity with basic Python and Docker concepts.
|
||||
|
||||
## Install dependencies
|
||||
|
||||
Use `pip` to install the following dependencies:
|
||||
|
||||
- `influxdb_client_3`
|
||||
- `quixstreams<2.5`
|
||||
- `pandas`
|
||||
|
||||
```sh
|
||||
pip install influxdb3-python pandas "quixstreams<2.5"
|
||||
```
|
||||
|
||||
## Prepare InfluxDB buckets
|
||||
|
||||
The downsampling process involves two InfluxDB databases.
|
||||
Each database has a [retention period](/influxdb/clustered/reference/glossary/#retention-period)
|
||||
that specifies how long data persists before it expires and is deleted.
|
||||
By using two databases, you can store unmodified, high-resolution data in a database
|
||||
with a shorter retention period and then downsampled, low-resolution data in a
|
||||
database with a longer retention period.
|
||||
|
||||
Ensure you have a database for each of the following:
|
||||
|
||||
- One to query unmodified data from
|
||||
- The other to write downsampled data to
|
||||
|
||||
For information about creating databases, see
|
||||
[Create a database](/influxdb/clustered/admin/databases/create/).
|
||||
|
||||
## Create the downsampling logic
|
||||
|
||||
This process reads the raw data from the input Kafka topic that stores data streamed from InfluxDB,
|
||||
downsamples it, and then sends it to an output topic that is used to write back to InfluxDB.
|
||||
|
||||
1. Use the Quix Streams library's `Application` class to initialize a connection to Apache Kafka.
|
||||
|
||||
```py
|
||||
from quixstreams import Application
|
||||
|
||||
app = Application(consumer_group='downsampling-process', auto_offset_reset='earliest')
|
||||
input_topic = app.topic('raw-data')
|
||||
output_topic = app.topic('downsampled-data')
|
||||
|
||||
# ...
|
||||
```
|
||||
|
||||
2. Configure the Quix Streams built-in windowing function to create a tumbling
|
||||
window that continuously downsamples the data into 1-minute buckets.
|
||||
|
||||
```py
|
||||
# ...
|
||||
target_field = 'temperature' # The field that you want to downsample.
|
||||
|
||||
def custom_ts_extractor(value):
|
||||
# ...
|
||||
# truncated for brevity - custom code that defines the 'time_recorded'
|
||||
# field as the timestamp to use for windowing...
|
||||
|
||||
topic = app.topic(input_topic, timestamp_extractor=custom_ts_extractor)
|
||||
|
||||
sdf = (
|
||||
sdf.apply(lambda value: value[target_field]) # Extract temperature values
|
||||
.tumbling_window(timedelta(minutes=1)) # 1-minute tumbling windows
|
||||
.mean() # Calculate average temperature
|
||||
.final() # Emit results at window completion
|
||||
)
|
||||
|
||||
sdf = sdf.apply(
|
||||
lambda value: {
|
||||
'time': value['end'], # End of the window
|
||||
'temperature_avg': value['value'], # Average temperature
|
||||
}
|
||||
)
|
||||
|
||||
sdf.to_topic(output_topic) # Output results to the 'downsampled-data' topic
|
||||
# ...
|
||||
```
|
||||
|
||||
The results are streamed to the Kafka topic, `downsampled-data`.
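
   The `custom_ts_extractor` in the snippet above is truncated.
   As a purely illustrative sketch (not the template's actual implementation), an
   extractor that windows on the payload's `time_recorded` field might look like the
   following, assuming the field holds an epoch timestamp in milliseconds and that
   your Quix Streams version may pass extra positional metadata arguments:

   ```py
   # Hypothetical example only -- see the linked repository for the real implementation.
   def custom_ts_extractor(value, *_kafka_metadata):
       # Window on the sensor's own 'time_recorded' value (epoch milliseconds)
       # instead of the Kafka message timestamp.
       return int(value['time_recorded'])
   ```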
|
||||
|
||||
{{% note %}}
|
||||
Note: "sdf" stands for "Streaming Dataframe".
|
||||
{{% /note %}}
|
||||
|
||||
You can find the full code for this process in the
|
||||
[Quix GitHub repository](https://github.com/quixio/template-influxdbv3-downsampling/blob/dev/Downsampler/main.py).
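
If it helps to see how the truncated pieces above typically fit together before
opening the full file, the following is a minimal, self-contained sketch. It
assumes the topic names from step 1 and a `temperature` field to average, and it
omits the custom timestamp extractor for brevity:

```py
from datetime import timedelta

from quixstreams import Application

app = Application(consumer_group='downsampling-process', auto_offset_reset='earliest')
input_topic = app.topic('raw-data')
output_topic = app.topic('downsampled-data')

sdf = app.dataframe(input_topic)                   # Consume from the 'raw-data' topic

sdf = (
    sdf.apply(lambda value: value['temperature'])  # Extract the field to downsample
    .tumbling_window(timedelta(minutes=1))         # 1-minute tumbling windows
    .mean()                                        # Average temperature per window
    .final()                                       # Emit when each window closes
)

sdf = sdf.apply(
    lambda value: {
        'time': value['end'],                      # End of the window
        'temperature_avg': value['value'],         # Average temperature
    }
)

sdf.to_topic(output_topic)                         # Publish to the 'downsampled-data' topic

if __name__ == '__main__':
    app.run(sdf)                                   # Run until the process is stopped
```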
|
||||
|
||||
## Create the producer and consumer clients
|
||||
|
||||
Use the `influxdb_client_3` and `quixstreams` modules to instantiate two clients that interact with InfluxDB and Apache Kafka:
|
||||
|
||||
- A **producer** client configured to read from your InfluxDB database with _unmodified_ data and _produce_ that data to Kafka.
|
||||
- A **consumer** client configured to _consume_ data from Kafka and write the _downsampled_ data to the corresponding InfluxDB database.
|
||||
|
||||
### Create the producer client
|
||||
|
||||
Provide the following credentials for the producer:
|
||||
|
||||
- **host**: {{< product-name omit=" Clustered">}} cluster URL
|
||||
_(without the protocol)_
|
||||
- **org**: An arbitrary string. {{< product-name >}} ignores the organization.
|
||||
- **token**: InfluxDB database token with read and write permissions on the databases you
|
||||
want to query and write to.
|
||||
- **database**: InfluxDB database name
|
||||
|
||||
The producer queries for fresh data from InfluxDB at specific intervals. It's configured to look for a specific measurement defined in a variable. It writes the raw data to a Kafka topic called 'raw-data'.
|
||||
|
||||
{{% code-placeholders "(RAW|DOWNSAMPLED)_DATABASE_(NAME|TOKEN)" %}}
|
||||
```py
|
||||
from influxdb_client_3 import InfluxDBClient3
|
||||
from quixstreams import Application
|
||||
import os
import uuid

import pandas
|
||||
|
||||
# Instantiate an InfluxDBClient3 client configured for your unmodified database
|
||||
influxdb_raw = InfluxDBClient3(
|
||||
host='{{< influxdb/host >}}',
|
||||
token='DATABASE_TOKEN',
|
||||
database='RAW_DATABASE_NAME'
|
||||
)
|
||||
|
||||
# os.environ['localdev'] = 'true' # Uncomment if you're using local Kafka rather than Quix Cloud
|
||||
|
||||
# Create a Quix Streams producer application that connects to a local Kafka installation
|
||||
app = Application(
|
||||
broker_address=os.environ.get('BROKER_ADDRESS','localhost:9092'),
|
||||
consumer_group=consumer_group_name,
|
||||
auto_create_topics=True
|
||||
)
|
||||
|
||||
# Override the app variable if the local development env var is set to false or is not present.
|
||||
# This causes Quix Streams to use an application configured for Quix Cloud
|
||||
localdev = os.environ.get('localdev', 'false')
|
||||
|
||||
if localdev == 'false':
|
||||
# Create a Quix platform-specific application instead (broker address is in-built)
|
||||
app = Application(consumer_group=consumer_group_name, auto_create_topics=True)
|
||||
|
||||
topic = app.topic(name='raw-data')
|
||||
|
||||
## ... remaining code truncated for brevity ...
|
||||
|
||||
# Query InfluxDB for the raw data and store it in a Dataframe
|
||||
def get_data():
|
||||
# Run in a loop until the main thread is terminated
|
||||
while run:
|
||||
try:
|
||||
myquery = f'SELECT * FROM "{measurement_name}" WHERE time >= {interval}'
|
||||
print(f'sending query {myquery}')
|
||||
# Query InfluxDB 3.0 using influxql or sql
|
||||
table = influxdb_raw.query(
|
||||
query=myquery,
|
||||
mode='pandas',
|
||||
language='influxql')
|
||||
|
||||
#... remaining code truncated for brevity ...
|
||||
|
||||
# Send the data to a Kafka topic for the downsampling process to consume
|
||||
def main():
|
||||
"""
|
||||
Read data from the Query and publish it to Kafka
|
||||
"""
|
||||
#... remaining code truncated for brevity ...
|
||||
|
||||
for index, obj in enumerate(records):
|
||||
print(obj) # obj contains each row in the table, including temperature
|
||||
# Generate a unique message_key for each row
|
||||
message_key = obj['machineId']
|
||||
logger.info(f'Produced message with key:{message_key}, value:{obj}')
|
||||
|
||||
serialized = topic.serialize(
|
||||
key=message_key, value=obj, headers={'uuid': str(uuid.uuid4())}
|
||||
)
|
||||
|
||||
# publish each row returned in the query to the topic 'raw-data'
|
||||
producer.produce(
|
||||
topic=topic.name,
|
||||
headers=serialized.headers,
|
||||
key=serialized.key,
|
||||
value=serialized.value,
|
||||
)
|
||||
|
||||
```
|
||||
{{% /code-placeholders %}}
|
||||
|
||||
You can find the full code for this process in the
|
||||
[Quix GitHub repository](https://github.com/quixio/template-influxdbv3-downsampling/blob/dev/InfluxDB%20V3%20Data%20Source/main.py).
|
||||
|
||||
### Create the consumer
|
||||
|
||||
As before, provide the following credentials for the consumer:
|
||||
|
||||
- **host**: {{< product-name omit=" Clustered">}} cluster URL
|
||||
_(without the protocol)_
|
||||
- **org**: An arbitrary string. {{< product-name >}} ignores the organization.
|
||||
- **token**: InfluxDB database token with read and write permissions on the databases you
|
||||
want to query and write to.
|
||||
- **database**: InfluxDB database name
|
||||
|
||||
This process reads messages from the Kafka topic `downsampled-data` and writes each message as a point dictionary back to InfluxDB.
|
||||
|
||||
{{% code-placeholders "(RAW|DOWNSAMPLED)_DATABASE_(NAME|TOKEN)" %}}
|
||||
```py
|
||||
# Instantiate an InfluxDBClient3 client configured for your downsampled database.
|
||||
# When writing, the org= argument is required by the client (but ignored by InfluxDB).
|
||||
influxdb_downsampled = InfluxDBClient3(
|
||||
host='{{< influxdb/host >}}',
|
||||
token='DATABASE_TOKEN',
|
||||
database='DOWNSAMPLED_DATABASE_NAME',
|
||||
org=''
|
||||
)
|
||||
|
||||
# os.environ['localdev'] = 'true' # Uncomment if you're using local Kafka rather than Quix Cloud
|
||||
|
||||
# Create a Quix Streams consumer application that connects to a local Kafka installation
|
||||
app = Application(
|
||||
broker_address=os.environ.get('BROKER_ADDRESS','localhost:9092'),
|
||||
consumer_group=consumer_group_name,
|
||||
auto_create_topics=True
|
||||
)
|
||||
|
||||
# Override the app variable if the local development env var is set to false or is not present.
|
||||
# This causes Quix Streams to use an application configured for Quix Cloud
|
||||
localdev = os.environ.get('localdev', 'false')
|
||||
|
||||
if localdev == 'false':
|
||||
# Create a Quix platform-specific application instead (broker address is in-built)
|
||||
app = Application(consumer_group=consumer_group_name, auto_create_topics=True)
|
||||
|
||||
input_topic = app.topic('downsampled-data')
|
||||
|
||||
## ... remaining code truncated for brevity ...
|
||||
|
||||
def send_data_to_influx(message):
|
||||
logger.info(f'Processing message: {message}')
|
||||
try:
|
||||
|
||||
## ... remaining code truncated for brevity ...
|
||||
|
||||
# Construct the points dictionary
|
||||
points = {
|
||||
'measurement': measurement_name,
|
||||
'tags': tags,
|
||||
'fields': fields,
|
||||
'time': message['time']
|
||||
}
|
||||
|
||||
influxdb_downsampled.write(record=points, write_precision='ms')
|
||||
|
||||
sdf = app.dataframe(input_topic)
|
||||
sdf = sdf.update(send_data_to_influx) # Continuously apply the 'send_data' function to each message in the incoming stream
|
||||
|
||||
## ... remaining code truncated for brevity ...
|
||||
```
|
||||
{{% /code-placeholders %}}
|
||||
|
||||
You can find the full code for this process in the
|
||||
[Quix GitHub repository](https://github.com/quixio/template-influxdbv3-downsampling/blob/dev/InfluxDB%20V3%20Data%20Sink/main.py).
|
||||
|
||||
## Get the full downsampling code files
|
||||
|
||||
To get the complete set of files referenced in this tutorial, clone the Quix "downsampling template" repository or use an interactive version of this tutorial saved as a Jupyter Notebook.
|
||||
|
||||
### Clone the downsampling template repository
|
||||
|
||||
To clone the downsampling template, enter the following command in the command line:
|
||||
|
||||
```sh
|
||||
git clone https://github.com/quixio/template-influxdbv3-downsampling.git
|
||||
```
|
||||
|
||||
This repository contains the following folders which store different parts of the whole pipeline:
|
||||
|
||||
- **Machine Data to InfluxDB**: A script that generates synthetic machine data
|
||||
and writes it to InfluxDB. This is useful if you don't have your own data yet,
|
||||
or just want to work with test data first.
|
||||
|
||||
- It produces a reading every 250 milliseconds.
|
||||
- This script originally comes from the
|
||||
[InfluxCommunity repository](https://github.com/InfluxCommunity/Arrow-Task-Engine/blob/master/machine_simulator/src/machine_generator.py)
|
||||
but has been adapted to write directly to InfluxDB rather than using an MQTT broker.
|
||||
|
||||
- **InfluxDB V3 Data Source**: A service that queries for fresh data from
|
||||
InfluxDB at specific intervals. It's configured to look for the measurement
|
||||
produced by the previously-mentioned synthetic machine data generator.
|
||||
It writes the raw data to a Kafka topic called "raw-data".
|
||||
- **Downsampler**: A service that performs a 1-minute tumbling window operation
|
||||
on the data from InfluxDB and emits the mean of the "temperature" reading
|
||||
every minute. It writes the output to a "downsampled-data" Kafka topic.
|
||||
- **InfluxDB V3 Data Sink**: A service that reads from the "downsampled-data"
|
||||
topic and writes the downsampled records as points back into InfluxDB.
|
||||
|
||||
### Use the downsampling Jupyter Notebook
|
||||
|
||||
You can use the interactive notebook ["Continuously downsample data using InfluxDB and Quix Streams"](https://github.com/quixio/tutorial-code/blob/main/notebooks/Downsampling_viaKafka_Using_Quix_Influx.ipynb) to try the downsampling code yourself. It is configured to install Apache Kafka within the runtime environment (such as Google Colab).
|
||||
|
||||
Each process is also set up to run in the background so that a running cell does not block the rest of the tutorial.
|
||||
|
||||
<a target="_blank" href="https://colab.research.google.com/github/quixio/tutorial-code/blob/main/notebooks/Downsampling_viaKafka_Using_Quix_Influx.ipynb"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>
|
|
@ -14,4 +14,4 @@ weight: 104
|
|||
|
||||
The following articles walk through common task use cases.
|
||||
|
||||
{{< children >}}
|
||||
{{< children >}}
|
|
@ -0,0 +1,324 @@
|
|||
---
|
||||
title: Downsample data with Quix Streams
|
||||
seotitle: Downsample data with Python and Quix Streams
|
||||
description: >
|
||||
Use Quix Streams to create a Python service that downsamples data stored in InfluxDB.
|
||||
menu:
|
||||
influxdb_v2:
|
||||
name: Quix
|
||||
parent: Tools & integrations
|
||||
identifier: influxdb_v2-downsample-quix
|
||||
weight: 122
|
||||
---
|
||||
|
||||
A common practice when processing high-volume data is to downsample it before committing
|
||||
it to InfluxDB to reduce the overall disk usage as data collects over time.
|
||||
|
||||
This guide walks through the process of creating a series of Python services that ingest data from an InfluxDB v2 bucket, downsample it, and publish it to another InfluxDB v2 bucket.
|
||||
By aggregating data within windows of time, then storing the aggregate values back to InfluxDB, you can reduce
|
||||
disk usage and costs over time.
|
||||
|
||||
The guide uses the InfluxDB v2 and Quix Streams Python client libraries and can be run locally or deployed within Quix Cloud with a free trial. It assumes you have setup a Python project and virtual environment.
|
||||
|
||||
## Pipeline architecture
|
||||
The following diagram illustrates how data is passed between processes as it is downsampled:
|
||||
|
||||
{{< html-diagram/quix-downsample-pipeline "v2" >}}
|
||||
|
||||
{{% note %}}
|
||||
It is usually more efficient to write raw data directly to Kafka rather than
|
||||
writing raw data to InfluxDB first (essentially starting the Quix Streams
|
||||
pipeline with the "influxv2-data" topic). However, this guide assumes that you
|
||||
already have raw data in InfluxDB that you want to downsample.
|
||||
{{% /note %}}
|
||||
|
||||
---

1. [Set up prerequisites](#set-up-prerequisites)
2. [Install dependencies](#install-dependencies)
3. [Prepare InfluxDB buckets](#prepare-influxdb-buckets)
4. [Create the downsampling logic](#create-the-downsampling-logic)
5. [Create the producer and consumer clients](#create-the-producer-and-consumer-clients)
    1. [Create the producer](#create-the-producer)
    2. [Create the consumer](#create-the-consumer)
6. [Run the machine data generator](#run-the-machine-data-generator)
7. [Get the full downsampling code files](#get-the-full-downsampling-code-files)

## Set up prerequisites

The process described in this guide requires the following:

- InfluxDB v2 with data ready for downsampling. [Use the machine data generator code](#run-the-machine-data-generator) below.
- A [Quix Cloud](https://portal.platform.quix.io/self-sign-up/) account or a
  local Apache Kafka or Red Panda installation.
- Familiarity with basic Python and Docker concepts.

## Install dependencies

Use `pip` to install the following dependencies:

- `influxdb-client` (InfluxDB v2 client library)
- `quixstreams<2.5` (Quix Streams client library)
- `pandas` (data analysis and manipulation tool)

```sh
pip install influxdb-client pandas "quixstreams<2.5"
```

## Prepare InfluxDB buckets

The downsampling process involves two InfluxDB buckets.
Each bucket has a [retention period](/influxdb/v2/reference/glossary/#retention-period)
that specifies how long data persists before it expires and is deleted.
By using two buckets, you can store unmodified, high-resolution data in a bucket
with a shorter retention period and downsampled, low-resolution data in a
bucket with a longer retention period.

Ensure you have a bucket for each of the following (a minimal sketch for creating
them programmatically follows this list):

- One to query unmodified data from your InfluxDB v2 cluster
- The other to write downsampled data into

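If you need to create the buckets, the following is a minimal sketch that uses the InfluxDB v2 Python client. The bucket names are placeholders, the retention periods are illustrative only, and it assumes `INFLUXDB_HOST` includes the protocol (for example, `http://localhost:8086`):

{{% code-placeholders "(RAW|DOWNSAMPLED)_BUCKET_NAME" %}}
```py
import os

from influxdb_client import InfluxDBClient, BucketRetentionRules

with InfluxDBClient(url=os.environ["INFLUXDB_HOST"],
                    token=os.environ["INFLUXDB_TOKEN"],
                    org=os.environ["INFLUXDB_ORG"]) as client:
    buckets_api = client.buckets_api()

    # Shorter retention (30 days) for unmodified, high-resolution data
    buckets_api.create_bucket(
        bucket_name="RAW_BUCKET_NAME",
        retention_rules=BucketRetentionRules(type="expire", every_seconds=30 * 24 * 3600),
    )

    # Longer retention (1 year) for downsampled, low-resolution data
    buckets_api.create_bucket(
        bucket_name="DOWNSAMPLED_BUCKET_NAME",
        retention_rules=BucketRetentionRules(type="expire", every_seconds=365 * 24 * 3600),
    )
```
{{% /code-placeholders %}}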
## Create the downsampling logic

This process reads the raw data from the input Kafka topic that stores data streamed from the InfluxDB v2 bucket,
downsamples it, and then sends it to an output topic, from which the data is later written back to another bucket.

1. Use the Quix Streams library's `Application` class to initialize a connection to the Kafka topics.

    ```py
    from quixstreams import Application

    app = Application(consumer_group="downsampling-process", auto_offset_reset="earliest")
    input_topic = app.topic("input")
    output_topic = app.topic("output")
    # ...
    ```

2. Configure the Quix Streams built-in windowing function to create a tumbling
    window that continuously downsamples the data into 1-minute buckets.

    ```py
    # ...
    from datetime import timedelta

    target_field = "temperature"  # The field that you want to downsample.

    def custom_ts_extractor(value):
        # ...
        # truncated for brevity - custom code that defines the "time_recorded"
        # field as the timestamp to use for windowing...

    # Redefine the input topic so that it uses the custom timestamp extractor
    input_topic = app.topic("input", timestamp_extractor=custom_ts_extractor)

    # Create a StreamingDataFrame that consumes from the input topic
    sdf = app.dataframe(input_topic)

    sdf = (
        sdf.apply(lambda value: value[target_field])  # Extract temperature values
        .tumbling_window(timedelta(minutes=1))        # 1-minute tumbling windows
        .mean()                                       # Calculate average temperature
        .final()                                      # Emit results at window completion
    )

    sdf = sdf.apply(
        lambda value: {
            "time": value["end"],               # End of the window
            "temperature_avg": value["value"],  # Average temperature
        }
    )

    sdf.to_topic(output_topic)  # Output results to the "downsampled" topic
    # ...
    ```

    The results are streamed to the Kafka topic, `downsampled`.

{{% note %}}
Note: "sdf" stands for "Streaming DataFrame".
{{% /note %}}

You can find the full code for this process in the
[Quix GitHub repository](https://github.com/quixio/template-invluxdbv2-tsm-downsampling/blob/tutorial/Downsampler/main.py).

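For reference, the following is a minimal, self-contained sketch of the downsampling service with the truncated parts filled in. It is not the template's exact code: the broker address, topic names, and the assumption that each message carries a `time_recorded` timestamp in milliseconds are illustrative only.

```py
from datetime import timedelta

from quixstreams import Application

# Assumption: a local Kafka broker; in Quix Cloud the broker address is built in
app = Application(
    broker_address="localhost:9092",
    consumer_group="downsampling-process",
    auto_offset_reset="earliest",
)

def custom_ts_extractor(value, headers, timestamp, timestamp_type):
    # Assumption: each message carries a "time_recorded" field in milliseconds
    # that should be used as the event time for windowing
    return value["time_recorded"]

input_topic = app.topic("input", timestamp_extractor=custom_ts_extractor)
output_topic = app.topic("output")

sdf = app.dataframe(input_topic)

sdf = (
    sdf.apply(lambda value: value["temperature"])  # Extract temperature values
    .tumbling_window(timedelta(minutes=1))         # 1-minute tumbling windows
    .mean()                                        # Calculate average temperature
    .final()                                       # Emit results at window completion
)

# Reshape each window result into the message written to the output topic
sdf = sdf.apply(lambda value: {"time": value["end"], "temperature_avg": value["value"]})

sdf = sdf.to_topic(output_topic)

if __name__ == "__main__":
    app.run(sdf)
```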
## Create the producer and consumer clients

Use the `influxdb_client` and `quixstreams` modules to instantiate two clients that interact with InfluxDB and Kafka:

- A **producer** client configured to read from your InfluxDB bucket with _unmodified_ data and _produce_ that data to Kafka.
- A **consumer** client configured to _consume_ data from Kafka and write the _downsampled_ data to the corresponding InfluxDB bucket.

### Create the producer

Provide the following credentials for the producer:

- **INFLUXDB_HOST**: [InfluxDB URL](/influxdb/v2/reference/urls/)
  _(without the protocol)_
- **INFLUXDB_ORG**: InfluxDB organization name
- **INFLUXDB_TOKEN**: InfluxDB API token with read and write permissions on the buckets you
  want to query and write to.
- **INFLUXDB_BUCKET**: InfluxDB bucket name

The producer queries for fresh data from InfluxDB at specific intervals. It writes the raw data to a Kafka topic called `influxv2-data`.

{{% code-placeholders "(API|(RAW|DOWNSAMPLED)_BUCKET|ORG)_(NAME|TOKEN)" %}}
```py
import json
import os

import influxdb_client
from quixstreams import Application

# Create a Quix Application
app = Application(consumer_group="influxdbv2_migrate", auto_create_topics=True)
# Define the topic using the "output" environment variable
topic = app.topic(os.getenv("output", "influxv2-data"))

# Create an InfluxDB v2 client
influxdb2_client = influxdb_client.InfluxDBClient(token=os.environ["INFLUXDB_TOKEN"],
                                                  org=os.environ["INFLUXDB_ORG"],
                                                  url=os.environ["INFLUXDB_HOST"])

## ... remaining code truncated for brevity ...

# Function to fetch data from InfluxDB
# It runs in a continuous loop, periodically fetching data based on the interval.
def get_data():
    # Run in a loop until the main thread is terminated
    while run:
        try:
            # Query InfluxDB 2.0 using Flux
            flux_query = f'''
            from(bucket: "{bucket}")
                |> range(start: -{interval})
                |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
            '''
            logger.info(f"Sending query: {flux_query}")

## ... remaining code truncated for brevity ...

# Create a pre-configured Producer object.
with app.get_producer() as producer:
    for res in get_data():
        # Get the data from InfluxDB
        records = json.loads(res)
        for index, obj in enumerate(records):
            logger.info(f"Produced message with key:{message_key}, value:{obj}")

            # Publish the data to the Kafka topic
            producer.produce(
                topic=topic.name,
                key=message_key,
                value=obj,
            )
```
{{% /code-placeholders %}}

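The elided portion of `get_data()` must turn the Flux query result into the JSON strings that the producer loop expects. One possible approach (an assumption, not the template's exact code) uses the client's query API and pandas:

```py
import influxdb_client
import pandas as pd

def fetch_records_as_json(client: influxdb_client.InfluxDBClient, flux_query: str) -> str:
    """Run a Flux query and return the result as a JSON array of records."""
    df = client.query_api().query_data_frame(query=flux_query)
    if isinstance(df, list):
        # The query API may return a list of DataFrames; combine them
        df = pd.concat(df, ignore_index=True)
    # Serialize the rows so that json.loads() in the producer loop can parse them
    return df.to_json(orient="records", date_format="iso")
```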
You can find the full code for this process in the
[Quix GitHub repository](https://github.com/quixio/template-invluxdbv2-tsm-downsampling/blob/tutorial/InfluxDB%20V2%20Data%20Source/main.py).

### Create the consumer

As before, provide the following credentials for the consumer:

- **INFLUXDB_HOST**: [InfluxDB URL](/influxdb/v2/reference/urls/)
  _(without the protocol)_
- **INFLUXDB_ORG**: InfluxDB organization name
- **INFLUXDB_TOKEN**: InfluxDB API token with read and write permissions on the buckets you
  want to query and write to.
- **INFLUXDB_BUCKET**: InfluxDB bucket name

{{% note %}}
Note: These are your InfluxDB v2 credentials.
{{% /note %}}

This process reads messages from the Kafka topic `downsampled` and writes each message as a point dictionary back to InfluxDB.

{{% code-placeholders "(API|(RAW|DOWNSAMPLED)_BUCKET|ORG)_(NAME|TOKEN)" %}}
```py
import os
from datetime import datetime
from time import time

from quixstreams import Application, State
from influxdb_client import InfluxDBClient, Point

# Create a Quix platform-specific application (the broker address is built in)
app = Application(consumer_group=consumer_group_name, auto_offset_reset="earliest", use_changelog_topics=False)

input_topic = app.topic(os.getenv("input", "input-data"))

# Initialize InfluxDB v2 client
influx2_client = InfluxDBClient(url="{{< influxdb/host >}}",
                                token=API_TOKEN,
                                org=ORG_NAME)

## ... remaining code truncated for brevity ...

def send_data_to_influx(message: dict, state: State):
    global last_write_time_ns, points_buffer, service_start_state

    try:
        ## ... code truncated for brevity ...

        # Check if it's time to write the batch:
        # 10k records have accumulated or 15 seconds have passed
        if len(points_buffer) >= 10000 or int(time() * 1e9) - last_write_time_ns >= 15e9:
            with influx2_client.write_api() as write_api:
                logger.info(f"Writing batch of {len(points_buffer)} points to InfluxDB.")
                write_api.write(influx_bucket, influx_org, points_buffer)

                # Clear the buffer and update the last write time
                points_buffer = []
                last_write_time_ns = int(time() * 1e9)

        ## ... code truncated for brevity ...

    except Exception as e:
        logger.info(f"{str(datetime.utcnow())}: Write failed")
        logger.info(e)

## ... code truncated for brevity ...

# We use a Quix Streams StreamingDataFrame (SDF) to handle every message
# in the Kafka topic by writing it to InfluxDB
sdf = app.dataframe(input_topic)
sdf = sdf.update(send_data_to_influx, stateful=True)

if __name__ == "__main__":
    logger.info("Starting application")
    app.run(sdf)
```
{{% /code-placeholders %}}

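The elided part of `send_data_to_influx` converts each downsampled message into a point before appending it to the batch buffer. A minimal sketch, assuming the message shape produced by the downsampler (`time` and `temperature_avg` keys) and a hypothetical `machine_data_downsampled` measurement name:

```py
from influxdb_client import Point, WritePrecision

def message_to_point(message: dict) -> Point:
    """Convert one downsampled message into an InfluxDB point."""
    return (
        Point("machine_data_downsampled")  # hypothetical measurement name
        .field("temperature_avg", message["temperature_avg"])
        .time(message["time"], write_precision=WritePrecision.MS)
    )

# Inside send_data_to_influx, each converted point is appended to the buffer:
# points_buffer.append(message_to_point(message))
```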
You can find the full code for this process in the
[Quix GitHub repository](https://github.com/quixio/quix-samples/tree/develop/python/destinations/influxdb_2).

## Run the machine data generator

Now it's time to run the machine data generator code, which populates your source
bucket with data that the [producer](#create-the-producer) reads.

Run `main.py` from the `Machine data to InfluxDB` folder in the GitHub repository.

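To confirm that the generator is writing data, you can run a quick query against the source bucket before starting the rest of the pipeline. This is a minimal sketch; the bucket name is a placeholder, and it assumes `INFLUXDB_HOST` includes the protocol:

{{% code-placeholders "RAW_BUCKET_NAME" %}}
```py
import os

from influxdb_client import InfluxDBClient

with InfluxDBClient(url=os.environ["INFLUXDB_HOST"],
                    token=os.environ["INFLUXDB_TOKEN"],
                    org=os.environ["INFLUXDB_ORG"]) as client:
    # Count the points written to the raw-data bucket in the last 10 minutes
    tables = client.query_api().query(
        'from(bucket: "RAW_BUCKET_NAME") |> range(start: -10m) |> count()'
    )
    for table in tables:
        for record in table.records:
            print(record.get_field(), record.get_value())
```
{{% /code-placeholders %}}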
## Get the full downsampling code files

To get the complete set of files referenced in this tutorial, clone the Quix "downsampling" repository.

### Clone the downsampling template repository

To clone the downsampling template, enter the following command in the command line:

```sh
git clone https://github.com/quixio/template-invluxdbv2-tsm-downsampling.git
```

This repository contains the following folders which store different parts of the whole pipeline:

- **Machine Data to InfluxDB**: A script that generates synthetic machine data
  and writes it to InfluxDB. This is useful if you don't have your own data yet,
  or just want to work with test data first.

  - It produces a reading every 250 milliseconds.
  - This script originally comes from the
    [InfluxCommunity repository](https://github.com/InfluxCommunity/Arrow-Task-Engine/blob/master/machine_simulator/src/machine_generator.py)
    but has been adapted to write directly to InfluxDB rather than using an MQTT broker.

- **InfluxDB v2 Data Source**: A service that queries for fresh data from
  InfluxDB at specific intervals. It's configured to look for the measurement
  produced by the previously mentioned synthetic machine data generator.
  It writes the raw data to a Kafka topic called "influxv2-data".
- **Downsampler**: A service that performs a 1-minute tumbling window operation
  on the data from InfluxDB and emits the mean of the "temperature" reading
  every minute. It writes the output to a "downsampled" Kafka topic.
- **InfluxDB v2 Data Sink**: A service that reads from the "downsampled"
  topic and writes the downsampled records as points back into InfluxDB.

@ -1,3 +1,4 @@
{{- $version := .Get 0 | default "v3" -}}
<div id="quix-downsample-pipeline">
  <div class="logo-row top">
    <div class="influxdb-connector">

@ -7,7 +8,7 @@
    </div>
    <div class="quix-stream-container">
      <div id="source-producer" class="quix-stream-component">
        <p>InfluxDB v2 Source Producer</p>
        <p>InfluxDB {{ $version }} Source Producer</p>
      </div>
      <div class="arrow"></div>
      <div class="kafka-toggle" topic="raw-data">

@ -23,7 +24,7 @@
      </div>
      <div class="arrow"></div>
      <div id="sink-consumer" class="quix-stream-component">
        <p>InfluxDB v3 Sink Consumer</p>
        <p>InfluxDB {{ $version }} Sink Consumer</p>
      </div>
    </div>
    <div class="logo-row bottom">