diff --git a/content/influxdb/cloud-dedicated/process-data/downsample/_index.md b/content/influxdb/cloud-dedicated/process-data/downsample/_index.md new file mode 100644 index 000000000..6656a34c4 --- /dev/null +++ b/content/influxdb/cloud-dedicated/process-data/downsample/_index.md @@ -0,0 +1,16 @@ +--- +title: Downsample data stored in InfluxDB +description: > + Learn about different methods for querying and downsampling time series data + stored in InfluxDB. +weight: 101 +menu: + influxdb_cloud_dedicated: + name: Downsample data + parent: Process & visualize data +--- + +Learn about different methods for querying and downsampling time series data +stored in InfluxDB. + +{{< children >}} \ No newline at end of file diff --git a/content/influxdb/cloud-dedicated/process-data/downsample.md b/content/influxdb/cloud-dedicated/process-data/downsample/downsample-client-libraries.md similarity index 97% rename from content/influxdb/cloud-dedicated/process-data/downsample.md rename to content/influxdb/cloud-dedicated/process-data/downsample/downsample-client-libraries.md index 213438cfa..f710ae06f 100644 --- a/content/influxdb/cloud-dedicated/process-data/downsample.md +++ b/content/influxdb/cloud-dedicated/process-data/downsample/downsample-client-libraries.md @@ -1,13 +1,13 @@ --- -title: Downsample data stored in InfluxDB +title: Use client libraries to downsample data description: > - Query and downsample time series data stored in InfluxDB and write the - downsampled data back to InfluxDB. + Use InfluxDB client libraries to query and downsample time series data stored in InfluxDB and write the downsampled data back to InfluxDB. menu: influxdb_cloud_dedicated: - name: Downsample data - parent: Process & visualize data -weight: 101 + name: Use client libraries + parent: Downsample data + identifier: influxdb-dedicated-downsample-client-libraries +weight: 201 related: - /influxdb/cloud-dedicated/query-data/sql/aggregate-select/, Aggregate or apply selector functions to data (SQL) --- diff --git a/content/influxdb/cloud-serverless/process-data/downsample/downsample-quix.md b/content/influxdb/cloud-dedicated/process-data/downsample/downsample-quix.md similarity index 95% rename from content/influxdb/cloud-serverless/process-data/downsample/downsample-quix.md rename to content/influxdb/cloud-dedicated/process-data/downsample/downsample-quix.md index a84d53d63..f4ec25261 100644 --- a/content/influxdb/cloud-serverless/process-data/downsample/downsample-quix.md +++ b/content/influxdb/cloud-dedicated/process-data/downsample/downsample-quix.md @@ -5,12 +5,11 @@ description: > data stored in InfluxDB and written to Kafka at regular intervals, continuously downsample it, and then write the downsampled data back to InfluxDB. 
menu: - influxdb_cloud_serverless: + influxdb_cloud_dedicated: name: Use Quix parent: Downsample data -weight: 202 -related: - - /influxdb/cloud-serverless/query-data/sql/aggregate-select/, Aggregate or apply selector functions to data (SQL) + identifier: influxdb-dedicated-downsample-quix +weight: 102 --- Use [Quix Streams](https://github.com/quixio/quix-streams) to query time series @@ -100,11 +99,10 @@ downsamples it, and then sends it to an output topic that is used to write back ```py from quixstreams import Application - from quixstreams.models.serializers.quix import JSONDeserializer, JSONSerializer app = Application(consumer_group='downsampling-process', auto_offset_reset='earliest') - input_topic = app.topic('raw-data', value_deserializer=JSONDeserializer()) - output_topic = app.topic('downsampled-data', value_serializer=JSONSerializer()) + input_topic = app.topic('raw-data') + output_topic = app.topic('downsampled-data') # ... ``` @@ -174,7 +172,6 @@ The producer queries for fresh data from InfluxDB at specific intervals. It's co ```py from influxdb_client_3 import InfluxDBClient3 from quixstreams import Application -from quixstreams.models.serializers.quix import JSONSerializer, SerializationContext import pandas # Instantiate an InfluxDBClient3 client configured for your unmodified bucket @@ -201,8 +198,7 @@ if localdev == 'false': # Create a Quix platform-specific application instead (broker address is in-built) app = Application(consumer_group=consumer_group_name, auto_create_topics=True) -serializer = JSONSerializer() -topic = app.topic(name='raw-data', value_serializer='json') +topic = app.topic(name='raw-data') ## ... remaining code trunctated for brevity ... @@ -293,7 +289,7 @@ if localdev == 'false': # Create a Quix platform-specific application instead (broker address is in-built) app = Application(consumer_group=consumer_group_name, auto_create_topics=True) -input_topic = app.topic('downsampled-data', value_deserializer=JSONDeserializer()) +input_topic = app.topic('downsampled-data') ## ... remaining code trunctated for brevity ... diff --git a/content/influxdb/cloud-serverless/process-data/downsample/quix.md b/content/influxdb/cloud-serverless/process-data/downsample/quix.md new file mode 100644 index 000000000..00e3e28fe --- /dev/null +++ b/content/influxdb/cloud-serverless/process-data/downsample/quix.md @@ -0,0 +1,363 @@ +--- +title: Use Quix Streams to downsample data +description: > + Use [Quix Streams](https://github.com/quixio/quix-streams) to query time series + data stored in InfluxDB and written to Kafka at regular intervals, continuously + downsample it, and then write the downsampled data back to InfluxDB. +menu: + influxdb_cloud_serverless: + name: Use Quix + parent: Downsample data + identifier: downsample-quix +weight: 202 +related: + - /influxdb/cloud-serverless/query-data/sql/aggregate-select/, Aggregate or apply selector functions to data (SQL) +--- + +Use [Quix Streams](https://github.com/quixio/quix-streams) to query time series +data stored in InfluxDB and written to Kafka at regular intervals, continuously +downsample it, and then write the downsampled data back to InfluxDB. +Quix Streams is an open source Python library for building containerized stream +processing applications with Apache Kafka. It is designed to run as a service +that continuously processes a stream of data while streaming the results to a +Kafka topic. You can try it locally, with a local Kafka installation, or run it +in [Quix Cloud](https://quix.io/) with a free trial. 
+ +This guide uses [Python](https://www.python.org/) and the +[InfluxDB v3 Python client library](https://github.com/InfluxCommunity/influxdb3-python), +but you can use your runtime of choice and any of the available +[InfluxDB v3 client libraries](/influxdb/cloud-serverless/reference/client-libraries/v3/). +This guide also assumes you have already +[setup your Python project and virtual environment](/influxdb/cloud-serverless/query-data/execute-queries/client-libraries/python/#create-a-python-virtual-environment). + +## Pipeline architecture + +The following diagram illustrates how data is passed between processes as it is downsampled: + +{{< html-diagram/quix-downsample-pipeline >}} + +{{% note %}} +It is usually more efficient to write raw data directly to Kafka rather than +writing raw data to InfluxDB first (essentially starting the Quix Streams +pipeline with the "raw-data" topic). However, this guide assumes that you +already have raw data in InfluxDB that you want to downsample. +{{% /note %}} + +--- + +1. [Set up prerequisites](#set-up-prerequisites) +2. [Install dependencies](#install-dependencies) +3. [Prepare InfluxDB buckets](#prepare-influxdb-buckets) +4. [Create the downsampling logic](#create-the-downsampling-logic) +5. [Create the producer and consumer clients](#create-the-producer-and-consumer-clients) + 1. [Create the producer](#create-the-producer) + 2. [Create the consumer](#create-the-consumer) +6. [Get the full downsampling code files](#get-the-full-downsampling-code-files) + +## Set up prerequisites + +The process described in this guide requires the following: + +- An InfluxDB Cloud Serverless account with data ready for downsampling. +- A [Quix Cloud](https://portal.platform.quix.io/self-sign-up/) account or a + local Apache Kafka or Red Panda installation. +- Familiarity with basic Python and Docker concepts. + +## Install dependencies + +Use `pip` to install the following dependencies: + +- `influxdb_client_3` +- `quixstreams<2.5` +- `pandas` + +```sh +pip install influxdb3-python pandas quixstreams<2.5 +``` + +## Prepare InfluxDB buckets + +The downsampling process involves two InfluxDB buckets. +Each bucket has a [retention period](/influxdb/cloud-serverless/reference/glossary/#retention-period) +that specifies how long data persists before it expires and is deleted. +By using two buckets, you can store unmodified, high-resolution data in a bucket +with a shorter retention period and then downsampled, low-resolution data in a +bucket with a longer retention period. + +Ensure you have a bucket for each of the following: + +- One to query unmodified data from +- The other to write downsampled data to + +For information about creating buckets, see +[Create a bucket](/influxdb/cloud-serverless/admin/buckets/create-bucket/). + +## Create the downsampling logic + +This process reads the raw data from the input Kafka topic that stores data streamed from InfluxDB, +downsamples it, and then sends it to an output topic that is used to write back to InfluxDB. + +1. Use the Quix Streams library's `Application` class to initialize a connection to Apache Kafka. + + ```py + from quixstreams import Application + + app = Application(consumer_group='downsampling-process', auto_offset_reset='earliest') + input_topic = app.topic('raw-data') + output_topic = app.topic('downsampled-data') + + # ... + ``` + +2. Configure the Quix Streams built-in windowing function to create a tumbling + window that continously downsamples the data into 1-minute buckets. + + ```py + # ... 
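+   # Assumption: `sdf` used below is the Quix Streams StreamingDataFrame created
+   # from the input topic in the code truncated here (for example,
+   # `sdf = app.dataframe(input_topic)`), and `timedelta` comes from the standard
+   # `datetime` module.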
+ target_field = 'temperature' # The field that you want to downsample. + + def custom_ts_extractor(value): + # ... + # truncated for brevity - custom code that defines the 'time_recorded' + # field as the timestamp to use for windowing... + + topic = app.topic(input_topic, timestamp_extractor=custom_ts_extractor) + + sdf = ( + sdf.apply(lambda value: value[target_field]) # Extract temperature values + .tumbling_window(timedelta(minutes=1)) # 1-minute tumbling windows + .mean() # Calculate average temperature + .final() # Emit results at window completion + ) + + sdf = sdf.apply( + lambda value: { + 'time': value['end'], # End of the window + 'temperature_avg': value['value'], # Average temperature + } + ) + + sdf.to_topic(output_topic) # Output results to the 'downsampled-data' topic + # ... + ``` + +The results are streamed to the Kafka topic, `downsampled-data`. + +{{% note %}} +Note: "sdf" stands for "Streaming Dataframe". +{{% /note %}} + +You can find the full code for this process in the +[Quix GitHub repository](https://github.com/quixio/template-influxdbv3-downsampling/blob/dev/Downsampler/main.py). + +## Create the producer and consumer clients + +Use the `influxdb_client_3` and `quixstreams` modules to instantiate two clients that interact with InfluxDB and Apache Kafka: + +- A **producer** client configured to read from your InfluxDB bucket with _unmodified_ data and _produce_ that data to Kafka. +- A **consumer** client configured to _consume_ data from Kafka and write the _downsampled_ data to the corresponding InfluxDB bucket. + +### Create the producer client + +Provide the following credentials for the producer: + +- **host**: [{{< product-name >}} region URL](/influxdb/cloud-serverless/reference/regions) + _(without the protocol)_ +- **org**: InfluxDB organization name +- **token**: InfluxDB API token with read and write permissions on the buckets you + want to query and write to. +- **database**: InfluxDB bucket name + +The producer queries for fresh data from InfluxDB at specific intervals. It's configured to look for a specific measurement defined in a variable. It writes the raw data to a Kafka topic called 'raw-data' + +{{% code-placeholders "(API|(RAW|DOWNSAMPLED)_BUCKET|ORG)_(NAME|TOKEN)" %}} +```py +from influxdb_client_3 import InfluxDBClient3 +from quixstreams import Application +import pandas + +# Instantiate an InfluxDBClient3 client configured for your unmodified bucket +influxdb_raw = InfluxDBClient3( + host='{{< influxdb/host >}}', + token='API_TOKEN', + database='RAW_BUCKET_NAME' +) + +# os.environ['localdev'] = 'true' # Uncomment if you're using local Kafka rather than Quix Cloud + +# Create a Quix Streams producer application that connects to a local Kafka installation +app = Application( + broker_address=os.environ.get('BROKER_ADDRESS','localhost:9092'), + consumer_group=consumer_group_name, + auto_create_topics=True +) + +# Override the app variable if the local development env var is set to false or is not present. +# This causes Quix Streams to use an application configured for Quix Cloud +localdev = os.environ.get('localdev', 'false') + +if localdev == 'false': + # Create a Quix platform-specific application instead (broker address is in-built) + app = Application(consumer_group=consumer_group_name, auto_create_topics=True) + +topic = app.topic(name='raw-data') + +## ... remaining code trunctated for brevity ... 
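+# Assumption: the code truncated above defines `run`, `measurement_name`, and
+# `interval` (for example, from environment variables), which the query loop
+# below relies on.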
+
+# Query InfluxDB for the raw data and store it in a Dataframe
+def get_data():
+    # Run in a loop until the main thread is terminated
+    while run:
+        try:
+            myquery = f'SELECT * FROM "{measurement_name}" WHERE time >= {interval}'
+            print(f'sending query {myquery}')
+            # Query InfluxDB 3.0 using influxql or sql
+            table = influxdb_raw.query(
+                query=myquery,
+                mode='pandas',
+                language='influxql')
+
+#... remaining code truncated for brevity ...
+
+# Send the data to a Kafka topic for the downsampling process to consume
+def main():
+    """
+    Read data from the Query and publish it to Kafka
+    """
+    #... remaining code truncated for brevity ...
+
+    for index, obj in enumerate(records):
+        print(obj) # Obj contains each row in the table including temperature
+        # Generate a unique message_key for each row
+        message_key = obj['machineId']
+        logger.info(f'Produced message with key:{message_key}, value:{obj}')
+
+        serialized = topic.serialize(
+            key=message_key, value=obj, headers={'uuid': str(uuid.uuid4())}
+        )
+
+        # publish each row returned in the query to the topic 'raw-data'
+        producer.produce(
+            topic=topic.name,
+            headers=serialized.headers,
+            key=serialized.key,
+            value=serialized.value,
+        )
+
+```
+{{% /code-placeholders %}}
+
+You can find the full code for this process in the
+[Quix GitHub repository](https://github.com/quixio/template-influxdbv3-downsampling/blob/dev/InfluxDB%20V3%20Data%20Source/main.py).
+
+### Create the consumer
+
+As before, provide the following credentials for the consumer:
+
+- **host**: [{{< product-name >}} region URL](/influxdb/cloud-serverless/reference/regions)
+  _(without the protocol)_
+- **org**: InfluxDB organization name
+- **token**: InfluxDB API token with read and write permissions on the buckets you
+  want to query and write to.
+- **database**: InfluxDB bucket name
+
+This process reads messages from the Kafka topic `downsampled-data` and writes each message as a point dictionary back to InfluxDB.
+
+{{% code-placeholders "(API|(RAW|DOWNSAMPLED)_BUCKET|ORG)_(NAME|TOKEN)" %}}
+```py
+# Instantiate an InfluxDBClient3 client configured for your downsampled database.
+# When writing, the org= argument is required by the client (but ignored by InfluxDB).
+influxdb_downsampled = InfluxDBClient3(
+    host='{{< influxdb/host >}}',
+    token='API_TOKEN',
+    database='DOWNSAMPLED_BUCKET_NAME',
+    org=''
+)
+
+# os.environ['localdev'] = 'true' # Uncomment if you're using local Kafka rather than Quix Cloud
+
+# Create a Quix Streams consumer application that connects to a local Kafka installation
+app = Application(
+    broker_address=os.environ.get('BROKER_ADDRESS','localhost:9092'),
+    consumer_group=consumer_group_name,
+    auto_create_topics=True
+)
+
+# Override the app variable if the local development env var is set to false or is not present.
+# This causes Quix Streams to use an application configured for Quix Cloud
+localdev = os.environ.get('localdev', 'false')
+
+if localdev == 'false':
+    # Create a Quix platform-specific application instead (broker address is in-built)
+    app = Application(consumer_group=consumer_group_name, auto_create_topics=True)
+
+input_topic = app.topic('downsampled-data')
+
+## ... remaining code truncated for brevity ...
+
+def send_data_to_influx(message):
+    logger.info(f'Processing message: {message}')
+    try:
+
+        ## ... remaining code truncated for brevity ...
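+        # Assumption: the truncated code above extracts `measurement_name`,
+        # `tags`, and `fields` from the incoming message before the point
+        # dictionary is built below.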
+ + # Construct the points dictionary + points = { + 'measurement': measurement_name, + 'tags': tags, + 'fields': fields, + 'time': message['time'] + } + + influxdb_downsampled.write(record=points, write_precision='ms') + +sdf = app.dataframe(input_topic) +sdf = sdf.update(send_data_to_influx) # Continuously apply the 'send_data' function to each message in the incoming stream + +## ... remaining code trunctated for brevity ... +``` +{{% /code-placeholders %}} + +You can find the full code for this process in the +[Quix GitHub repository](https://github.com/quixio/template-influxdbv3-downsampling/blob/dev/InfluxDB%20V3%20Data%20Sink/main.py). + +## Get the full downsampling code files + +To get the complete set of files referenced in this tutorial, clone the Quix "downsampling template" repository or use an interactive version of this tutorial saved as a Jupyter Notebook. + +### Clone the downsampling template repository + +To clone the downsampling template, enter the following command in the command line: + +```sh +git clone https://github.com/quixio/template-influxdbv3-downsampling.git +``` + +This repository contains the following folders which store different parts of the whole pipeline: + +- **Machine Data to InfluxDB**: A script that generates synthetic machine data + and writes it to InfluxDB. This is useful if you dont have your own data yet, + or just want to work with test data first. + + - It produces a reading every 250 milliseconds. + - This script originally comes from the + [InfluxCommunity repository](https://github.com/InfluxCommunity/Arrow-Task-Engine/blob/master/machine_simulator/src/machine_generator.py) + but has been adapted to write directly to InfluxDB rather than using an MQTT broker. + +- **InfluxDB V3 Data Source**: A service that queries for fresh data from + InfluxDB at specific intervals. It's configured to look for the measurement + produced by the previously-mentioned synthetic machine data generator. + It writes the raw data to a Kafka topic called "raw-data". +- **Downsampler**: A service that performs a 1-minute tumbling window operation + on the data from InfluxDB and emits the mean of the "temperature" reading + every minute. It writes the output to a "downsampled-data" Kafka topic. +- **InfluxDB V3 Data Sink**: A service that reads from the "downsampled-data" + topic and writes the downsample records as points back into InfluxDB. + +### Use the downsampling Jupyter Notebook + +You can use the interactive notebook ["Continuously downsample data using InfluxDB and Quix Streams"](https://github.com/quixio/tutorial-code/edit/main/notebooks/Downsampling_viaKafka_Using_Quix_Influx.ipynb) to try downsampling code yourself. It is configured to install Apache Kafka within the runtime environment (such as Google Colab). + +Each process is also set up to run in the background so that a running cell does not block the rest of the tutorial. + +Open In Colab diff --git a/content/influxdb/cloud/process-data/common-tasks/downsample-data-quix.md b/content/influxdb/cloud/process-data/common-tasks/downsample-data-quix.md new file mode 100644 index 000000000..5ed3f47d1 --- /dev/null +++ b/content/influxdb/cloud/process-data/common-tasks/downsample-data-quix.md @@ -0,0 +1,332 @@ +--- +title: Downsample data with Python and Quix Streams +description: > + Use Quix Streams to create a Python service that downsamples data stored in InfluxDB. 
+menu: + influxdb_cloud: + name: Downsample with Quix + parent: Common tasks + identifier: influxdb_cloud-downsample-quix +weight: 202 +--- + +Use [Quix Streams](https://github.com/quixio/quix-streams) to query time series +data stored in InfluxDB and written to Kafka at regular intervals, continuously +downsample it, and then write the downsampled data back to InfluxDB. +Quix Streams is an open source Python library for building containerized stream +processing applications with Apache Kafka. It is designed to run as a service +that continuously processes a stream of data while streaming the results to a +Kafka topic. You can try it locally, with a local Kafka installation, or run it +in [Quix Cloud](https://quix.io/) with a free trial. +A common practice when processing high volume data is to downsample it before comitting +it to InfluxDB to reduce the overall disk usage as data collects over time. + +This guide walks through the process of creating a series of Python services that ingest +from an InfluxDB v2 bucket and then downsample and publish the data to another InfluxDB v2 bucket. +By aggregating data within windows of time and then storing the aggregate values back to InfluxDB, you can reduce +disk usage and costs over time. + +The guide uses the InfluxDB v2 and Quix Streams Python client libraries and can be run locally or deployed within Quix Cloud with a free trial. It assumes you have setup a Python project and virtual environment. + +## Pipeline architecture +The following diagram illustrates how data is passed between processes as it is downsampled: + +{{< html-diagram/quix-downsample-pipeline "v2" >}} + +{{% note %}} +It is usually more efficient to write raw data directly to Kafka rather than +writing raw data to InfluxDB first (essentially starting the Quix Streams +pipeline with the "influxv2-data" topic). However, this guide assumes that you +already have raw data in InfluxDB that you want to downsample. +{{% /note %}} + +--- + +1. [Set up prerequisites](#set-up-prerequisites) +2. [Install dependencies](#install-dependencies) +3. [Prepare InfluxDB buckets](#prepare-influxdb-buckets) +4. [Create the downsampling logic](#create-the-downsampling-logic) +5. [Create the producer and consumer clients](#create-the-producer-and-consumer-clients) + 1. [Create the producer](#create-the-producer) + 2. [Create the consumer](#create-the-consumer) +6. [Run the machine data generator](#run-the-machine-data-generator) +7. [Get the full downsampling code files](#get-the-full-downsampling-code-files) + + +## Set up prerequisites + +The process described in this guide requires the following: + +- InfluxDB v2 with data ready for downsampling. [Use the machine data generator code](#run-the-machine-data-generator) below. +- A [Quix Cloud](https://portal.platform.quix.io/self-sign-up/) account or a + local Apache Kafka or Red Panda installation. +- Familiarity with basic Python and Docker concepts. + +## Install dependencies + +Use `pip` to install the following dependencies: + +- `influxdb-client` (InfluxDB v2 client library) +- `quixstreams<2.5` (Quixstreams client library) +- `pandas` (data analysis and manipulation tool) + + +```sh +pip install influxdb-client pandas quixstreams<2.5 +``` + +## Prepare InfluxDB buckets + +The downsampling process involves two InfluxDB buckets. +Each bucket has a [retention period](/influxdb/cloud/reference/glossary/#retention-period) +that specifies how long data persists before it expires and is deleted. 
+By using two buckets, you can store unmodified, high-resolution data in a bucket +with a shorter retention period and then downsampled, low-resolution data in a +bucket with a longer retention period. + +Ensure you have a bucket for each of the following: + +- One to query unmodified data from your InfluxDB v2 cluster +- The other to write downsampled data into + +## Create the downsampling logic + +This process reads the raw data from the input Kafka topic that stores data streamed from the InfluxDB v2 bucket, +downsamples it, and then sends it to an output topic which is later written back to another bucket. + +1. Use the Quix Streams library's `Application` class to initialize a connection to the Kafka topics. + + ```py + from quixstreams import Application + + app = Application(consumer_group="downsampling-process", auto_offset_reset="earliest") + input_topic = app.topic("input") + output_topic = app.topic("output") + # ... + ``` + +2. Configure the Quix Streams built-in windowing function to create a tumbling + window that continously downsamples the data into 1-minute buckets. + + ```py + # ... + target_field = "temperature" # The field that you want to downsample. + + def custom_ts_extractor(value): + # ... + # truncated for brevity - custom code that defines the "time_recorded" + # field as the timestamp to use for windowing... + + topic = app.topic(input_topic, timestamp_extractor=custom_ts_extractor) + + sdf = ( + sdf.apply(lambda value: value[target_field]) # Extract temperature values + .tumbling_window(timedelta(minutes=1)) # 1-minute tumbling windows + .mean() # Calculate average temperature + .final() # Emit results at window completion + ) + + sdf = sdf.apply( + lambda value: { + "time": value["end"], # End of the window + "temperature_avg": value["value"], # Average temperature + } + ) + + sdf.to_topic(output_topic) # Output results to the "downsampled" topic + # ... + ``` + +The results are streamed to the Kafka topic, `downsampled`. + +{{% note %}} +Note: "sdf" stands for "Streaming Dataframe". +{{% /note %}} + +You can find the full code for this process in the +[Quix GitHub repository](https://github.com/quixio/template-invluxdbv2-tsm-downsampling/blob/tutorial/Downsampler/main.py). + +## Create the producer and consumer clients + +Use the `influxdb_client` and `quixstreams` modules to instantiate two clients that interact with InfluxDB and Kafka: + +- A **producer** client configured to read from your InfluxDB bucket with _unmodified_ data and _produce_ that data to Kafka. +- A **consumer** client configured to _consume_ data from Kafka and write the _downsampled_ data to the corresponding InfluxDB bucket. + +### Create the producer + +Provide the following credentials for the producer: + +- **INFLUXDB_HOST**: [{{< product-name >}} region URL](/influxdb/cloud/reference/regions) + _(without the protocol)_ +- **INFLUXDB_ORG**: InfluxDB organization name +- **INFLUXDB_TOKEN**: InfluxDB API token with read and write permissions on the buckets you + want to query and write to. +- **INFLUXDB_BUCKET**: InfluxDB bucket name + +The producer queries for fresh data from InfluxDB at specific intervals. It writes the raw data to a Kafka topic called `influxv2-data`. 
+ +{{% code-placeholders "(API|(RAW|DOWNSAMPLED)_BUCKET|ORG)_(NAME|TOKEN)" %}} +```py +from quixstreams import Application +import influxdb_client +# Create a Quix Application +app = Application(consumer_group="influxdbv2_migrate", auto_create_topics=True) +# Define the topic using the "output" environment variable +topic = app.topic(os.getenv("output", "influxv2-data")) +# Create an InfluxDB v2 client +influxdb2_client = influxdb_client.InfluxDBClient(token=os.environ["INFLUXDB_TOKEN"], + org=os.environ["INFLUXDB_ORG"], + url=os.environ["INFLUXDB_HOST"]) + +## ... remaining code trunctated for brevity ... + +# Function to fetch data from InfluxDB +# It runs in a continuous loop, periodically fetching data based on the interval. +def get_data(): + # Run in a loop until the main thread is terminated + while run: + try: + # Query InfluxDB 2.0 using flux + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: -{interval}) + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") + ''' + logger.info(f"Sending query: {flux_query}") + +## ... remaining code trunctated for brevity ... + +# Create a pre-configured Producer object. +with app.get_producer() as producer: + for res in get_data(): + # Get the data from InfluxDB + records = json.loads(res) + for index, obj in enumerate(records): + logger.info(f"Produced message with key:{message_key}, value:{obj}") + # Publish the data to the Kafka topic + producer.produce( + topic=topic.name, + key=message_key, + value=obj, + ) +``` +{{% /code-placeholders %}} + +You can find the full code for this process in the +[Quix GitHub repository](https://github.com/quixio/template-invluxdbv2-tsm-downsampling/blob/tutorial/InfluxDB%20V2%20Data%20Source/main.py). + +### Create the consumer + +As before, provide the following credentials for the consumer: + +- **INFLUXDB_HOST**: [{{< product-name >}} region URL](/influxdb/cloud/reference/regions) + _(without the protocol)_ +- **INFLUXDB_ORG**: InfluxDB organization name +- **INFLUXDB_TOKEN**: InfluxDB API token with read and write permissions on the buckets you + want to query and write to. +- **INFLUXDB_BUCKET**: InfluxDB bucket name + +{{% note %}} +Note: These will be your InfluxDB v2 credentials. +{{% /note %}} + +This process reads messages from the Kafka topic `downsampled-data` and writes each message as a point dictionary back to InfluxDB. + +{{% code-placeholders "(API|(RAW|DOWNSAMPLED)_BUCKET|ORG)_(NAME|TOKEN)" %}} +```py +from quixstreams import Application, State +from influxdb_client import InfluxDBClient, Point + +# Create a Quix platform-specific application instead +app = Application(consumer_group=consumer_group_name, auto_offset_reset="earliest", use_changelog_topics=False) + +input_topic = app.topic(os.getenv("input", "input-data")) + +# Initialize InfluxDB v2 client +influx2_client = InfluxDBClient(url=influx_host, + token=influx_token, + org=influx_org) + +## ... remaining code trunctated for brevity ... + +def send_data_to_influx(message: dict, state: State): + global last_write_time_ns, points_buffer, service_start_state + + try: + ## ... code trunctated for brevity ... 
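+        # Assumption: the code truncated above converts each incoming message into
+        # an influxdb_client `Point` and appends it to `points_buffer` before the
+        # batched write below.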
+
+        # Check if it's time to write the batch
+        # 10k records have accumulated or 15 seconds have passed
+        if len(points_buffer) >= 10000 or int(time() * 1e9) - last_write_time_ns >= 15e9:
+            with influx2_client.write_api() as write_api:
+                logger.info(f"Writing batch of {len(points_buffer)} points to InfluxDB.")
+                write_api.write(influx_bucket, influx_org, points_buffer)
+
+            # Clear the buffer and update the last write time
+            points_buffer = []
+            last_write_time_ns = int(time() * 1e9)
+
+        ## ... code truncated for brevity ...
+
+    except Exception as e:
+        logger.info(f"{str(datetime.utcnow())}: Write failed")
+        logger.info(e)
+
+## ... code truncated for brevity ...
+
+# We use Quix Streams StreamingDataframe (SDF) to handle every message
+# in the Kafka topic by writing it to InfluxDB
+sdf = app.dataframe(input_topic)
+sdf = sdf.update(send_data_to_influx, stateful=True)
+
+if __name__ == "__main__":
+    logger.info("Starting application")
+    app.run(sdf)
+
+```
+{{% /code-placeholders %}}
+
+You can find the full code for this process in the
+[Quix GitHub repository](https://github.com/quixio/quix-samples/tree/develop/python/destinations/influxdb_2).
+
+## Run the machine data generator
+
+Now run the machine data generator code to populate your source
+bucket with data that will be read by the [producer](#create-the-producer).
+
+Run `main.py` from the `Machine data to InfluxDB` folder in the GitHub repository.
+
+## Get the full downsampling code files
+
+To get the complete set of files referenced in this tutorial, clone the Quix "downsampling" repository.
+
+### Clone the downsampling template repository
+
+To clone the downsampling template, enter the following command in the command line:
+
+```sh
+git clone https://github.com/quixio/template-invluxdbv2-tsm-downsampling.git
+```
+
+This repository contains the following folders which store different parts of the whole pipeline:
+
+- **Machine Data to InfluxDB**: A script that generates synthetic machine data
+  and writes it to InfluxDB. This is useful if you don't have your own data yet,
+  or just want to work with test data first.
+
+  - It produces a reading every 250 milliseconds.
+  - This script originally comes from the
+    [InfluxCommunity repository](https://github.com/InfluxCommunity/Arrow-Task-Engine/blob/master/machine_simulator/src/machine_generator.py)
+    but has been adapted to write directly to InfluxDB rather than using an MQTT broker.
+
+- **InfluxDB v2 Data Source**: A service that queries for fresh data from
+  InfluxDB at specific intervals. It's configured to look for the measurement
+  produced by the previously-mentioned synthetic machine data generator.
+  It writes the raw data to a Kafka topic called "influxv2-data".
+- **Downsampler**: A service that performs a 1-minute tumbling window operation
+  on the data from InfluxDB and emits the mean of the "temperature" reading
+  every minute. It writes the output to a "downsampled" Kafka topic.
+- **InfluxDB v2 Data Sink**: A service that reads from the "downsampled"
+  topic and writes the downsampled records as points back into InfluxDB.
+
diff --git a/content/influxdb/cloud/process-data/common-tasks/downsample-data.md b/content/influxdb/cloud/process-data/common-tasks/downsample-data.md
index 12958f706..a81bbbe6f 100644
--- a/content/influxdb/cloud/process-data/common-tasks/downsample-data.md
+++ b/content/influxdb/cloud/process-data/common-tasks/downsample-data.md
@@ -6,7 +6,7 @@ description: >
   in previous versions of InfluxDB.
menu: influxdb_cloud: - name: Downsample data + name: Downsample with InfluxDB parent: Common tasks weight: 201 influxdb/cloud/tags: [tasks] @@ -32,7 +32,7 @@ A separate bucket where aggregated, downsampled data is stored. To downsample data, it must be aggregated in some way. What specific method of aggregation you use depends on your specific use case, but examples include mean, median, top, bottom, etc. -View [Flux's aggregate functions](/flux/v0/stdlib/universe/) +View [Flux's aggregate functions](/flux/v0/function-types/#aggregates) for more information and ideas. ## Example downsampling task script @@ -48,7 +48,6 @@ The example task script below is a very basic form of data downsampling that doe ```js // Task Options -// Task Options option task = {name: "cq-mem-data-1w", every: 1w} // Defines a data source @@ -72,7 +71,6 @@ Once your task is ready, see [Create a task](/influxdb/cloud/process-data/manage ## Things to consider - If there is a chance that data may arrive late, specify an `offset` in your task options long enough to account for late-data. -- If running a task against a bucket with a finite retention period, do not schedule - tasks to run too closely to the end of the retention period. - Always provide a "cushion" for downsampling tasks to complete before the data - is dropped by the retention period. +- If running a task against a bucket with a finite retention period, + schedule tasks to run prior to the end of the retention period to let + downsampling tasks complete before data outside of the retention period is dropped. diff --git a/content/influxdb/clustered/process-data/downsample/_index.md b/content/influxdb/clustered/process-data/downsample/_index.md new file mode 100644 index 000000000..2bd68018e --- /dev/null +++ b/content/influxdb/clustered/process-data/downsample/_index.md @@ -0,0 +1,18 @@ +--- +title: Downsample data stored in InfluxDB +description: > + Learn about different methods for querying and downsampling time series data + stored in InfluxDB. +menu: + influxdb_clustered: + name: Downsample data + parent: Process & visualize data +weight: 101 +related: + - /influxdb/clustered/query-data/sql/aggregate-select/, Aggregate or apply selector functions to data (SQL) +--- + +Learn about different methods for querying and downsampling time series data +stored in InfluxDB. + +{{< children >}} \ No newline at end of file diff --git a/content/influxdb/clustered/process-data/downsample.md b/content/influxdb/clustered/process-data/downsample/downsample-client-libraries.md similarity index 97% rename from content/influxdb/clustered/process-data/downsample.md rename to content/influxdb/clustered/process-data/downsample/downsample-client-libraries.md index cf0d723f4..72ded326a 100644 --- a/content/influxdb/clustered/process-data/downsample.md +++ b/content/influxdb/clustered/process-data/downsample/downsample-client-libraries.md @@ -1,13 +1,13 @@ --- -title: Downsample data stored in InfluxDB +title: Use client libraries to downsample data description: > - Query and downsample time series data stored in InfluxDB and write the - downsampled data back to InfluxDB. + Use InfluxDB client libraries to query and downsample time series data stored in InfluxDB and write the downsampled data back to InfluxDB. 
menu: influxdb_clustered: - name: Downsample data - parent: Process & visualize data -weight: 101 + name: Use client libraries + parent: Downsample data + identifier: downsample-influx-client-libraries +weight: 201 related: - /influxdb/clustered/query-data/sql/aggregate-select/, Aggregate or apply selector functions to data (SQL) --- diff --git a/content/influxdb/clustered/process-data/downsample/quix.md b/content/influxdb/clustered/process-data/downsample/quix.md new file mode 100644 index 000000000..20ef5935c --- /dev/null +++ b/content/influxdb/clustered/process-data/downsample/quix.md @@ -0,0 +1,363 @@ +--- +title: Use Quix Streams to downsample data +description: > + Use [Quix Streams](https://github.com/quixio/quix-streams) to query time series + data stored in InfluxDB and written to Kafka at regular intervals, continuously + downsample it, and then write the downsampled data back to InfluxDB. +menu: + influxdb_clustered: + name: Use Quix Streams + parent: Downsample data + identifier: downsample-quix +weight: 202 +related: + - /influxdb/clustered/query-data/sql/aggregate-select/, Aggregate or apply selector functions to data (SQL) +--- + +Use [Quix Streams](https://github.com/quixio/quix-streams) to query time series +data stored in InfluxDB and written to Kafka at regular intervals, continuously +downsample it, and then write the downsampled data back to InfluxDB. +Quix Streams is an open source Python library for building containerized stream +processing applications with Apache Kafka. It is designed to run as a service +that continuously processes a stream of data while streaming the results to a +Kafka topic. You can try it locally, with a local Kafka installation, or run it +in [Quix Cloud](https://quix.io/) with a free trial. + +This guide uses [Python](https://www.python.org/) and the +[InfluxDB v3 Python client library](https://github.com/InfluxCommunity/influxdb3-python), +but you can use your runtime of choice and any of the available +[InfluxDB v3 client libraries](/influxdb/cloud-serverless/reference/client-libraries/v3/). +This guide also assumes you have already +[setup your Python project and virtual environment](/influxdb/cloud-serverless/query-data/execute-queries/client-libraries/python/#create-a-python-virtual-environment). + +## Pipeline architecture + +The following diagram illustrates how data is passed between processes as it is downsampled: + +{{< html-diagram/quix-downsample-pipeline >}} + +{{% note %}} +It is usually more efficient to write raw data directly to Kafka rather than +writing raw data to InfluxDB first (essentially starting the Quix Streams +pipeline with the "raw-data" topic). However, this guide assumes that you +already have raw data in InfluxDB that you want to downsample. +{{% /note %}} + +--- + +1. [Set up prerequisites](#set-up-prerequisites) +2. [Install dependencies](#install-dependencies) +3. [Prepare InfluxDB buckets](#prepare-influxdb-buckets) +4. [Create the downsampling logic](#create-the-downsampling-logic) +5. [Create the producer and consumer clients](#create-the-producer-and-consumer-clients) + 1. [Create the producer](#create-the-producer) + 2. [Create the consumer](#create-the-consumer) +6. [Get the full downsampling code files](#get-the-full-downsampling-code-files) + +## Set up prerequisites + +The process described in this guide requires the following: + +- An InfluxDB cluster with data ready for downsampling. 
+- A [Quix Cloud](https://portal.platform.quix.io/self-sign-up/) account or a + local Apache Kafka or Red Panda installation. +- Familiarity with basic Python and Docker concepts. + +## Install dependencies + +Use `pip` to install the following dependencies: + +- `influxdb_client_3` +- `quixstreams<2.5` +- `pandas` + +```sh +pip install influxdb3-python pandas quixstreams<2.5 +``` + +## Prepare InfluxDB buckets + +The downsampling process involves two InfluxDB databases. +Each database has a [retention period](/influxdb/clustered/reference/glossary/#retention-period) +that specifies how long data persists before it expires and is deleted. +By using two databases, you can store unmodified, high-resolution data in a database +with a shorter retention period and then downsampled, low-resolution data in a +database with a longer retention period. + +Ensure you have a database for each of the following: + +- One to query unmodified data from +- The other to write downsampled data to + +For information about creating databases, see +[Create a bucket](/influxdb/clustered/admin/databases/create/). + +## Create the downsampling logic + +This process reads the raw data from the input Kafka topic that stores data streamed from InfluxDB, +downsamples it, and then sends it to an output topic that is used to write back to InfluxDB. + +1. Use the Quix Streams library's `Application` class to initialize a connection to Apache Kafka. + + ```py + from quixstreams import Application + + app = Application(consumer_group='downsampling-process', auto_offset_reset='earliest') + input_topic = app.topic('raw-data') + output_topic = app.topic('downsampled-data') + + # ... + ``` + +2. Configure the Quix Streams built-in windowing function to create a tumbling + window that continously downsamples the data into 1-minute buckets. + + ```py + # ... + target_field = 'temperature' # The field that you want to downsample. + + def custom_ts_extractor(value): + # ... + # truncated for brevity - custom code that defines the 'time_recorded' + # field as the timestamp to use for windowing... + + topic = app.topic(input_topic, timestamp_extractor=custom_ts_extractor) + + sdf = ( + sdf.apply(lambda value: value[target_field]) # Extract temperature values + .tumbling_window(timedelta(minutes=1)) # 1-minute tumbling windows + .mean() # Calculate average temperature + .final() # Emit results at window completion + ) + + sdf = sdf.apply( + lambda value: { + 'time': value['end'], # End of the window + 'temperature_avg': value['value'], # Average temperature + } + ) + + sdf.to_topic(output_topic) # Output results to the 'downsampled-data' topic + # ... + ``` + +The results are streamed to the Kafka topic, `downsampled-data`. + +{{% note %}} +Note: "sdf" stands for "Streaming Dataframe". +{{% /note %}} + +You can find the full code for this process in the +[Quix GitHub repository](https://github.com/quixio/template-influxdbv3-downsampling/blob/dev/Downsampler/main.py). + +## Create the producer and consumer clients + +Use the `influxdb_client_3` and `quixstreams` modules to instantiate two clients that interact with InfluxDB and Apache Kafka: + +- A **producer** client configured to read from your InfluxDB database with _unmodified_ data and _produce_ that data to Kafka. +- A **consumer** client configured to _consume_ data from Kafka and write the _downsampled_ data to the corresponding InfluxDB database. 
+
+### Create the producer client
+
+Provide the following credentials for the producer:
+
+- **host**: {{< product-name omit=" Clustered">}} cluster URL
+  _(without the protocol)_
+- **org**: An arbitrary string. {{< product-name >}} ignores the organization.
+- **token**: InfluxDB database token with read and write permissions on the databases you
+  want to query and write to.
+- **database**: InfluxDB database name
+
+The producer queries for fresh data from InfluxDB at specific intervals. It's configured to look for a specific measurement defined in a variable. It writes the raw data to a Kafka topic called 'raw-data'.
+
+{{% code-placeholders "(RAW|DOWNSAMPLED)_DATABASE_(NAME|TOKEN)" %}}
+```py
+from influxdb_client_3 import InfluxDBClient3
+from quixstreams import Application
+import pandas
+
+# Instantiate an InfluxDBClient3 client configured for your unmodified database
+influxdb_raw = InfluxDBClient3(
+    host='{{< influxdb/host >}}',
+    token='DATABASE_TOKEN',
+    database='RAW_DATABASE_NAME'
+)
+
+# os.environ['localdev'] = 'true' # Uncomment if you're using local Kafka rather than Quix Cloud
+
+# Create a Quix Streams producer application that connects to a local Kafka installation
+app = Application(
+    broker_address=os.environ.get('BROKER_ADDRESS','localhost:9092'),
+    consumer_group=consumer_group_name,
+    auto_create_topics=True
+)
+
+# Override the app variable if the local development env var is set to false or is not present.
+# This causes Quix Streams to use an application configured for Quix Cloud
+localdev = os.environ.get('localdev', 'false')
+
+if localdev == 'false':
+    # Create a Quix platform-specific application instead (broker address is in-built)
+    app = Application(consumer_group=consumer_group_name, auto_create_topics=True)
+
+topic = app.topic(name='raw-data')
+
+## ... remaining code truncated for brevity ...
+
+# Query InfluxDB for the raw data and store it in a Dataframe
+def get_data():
+    # Run in a loop until the main thread is terminated
+    while run:
+        try:
+            myquery = f'SELECT * FROM "{measurement_name}" WHERE time >= {interval}'
+            print(f'sending query {myquery}')
+            # Query InfluxDB 3.0 using influxql or sql
+            table = influxdb_raw.query(
+                query=myquery,
+                mode='pandas',
+                language='influxql')
+
+#... remaining code truncated for brevity ...
+
+# Send the data to a Kafka topic for the downsampling process to consume
+def main():
+    """
+    Read data from the Query and publish it to Kafka
+    """
+    #... remaining code truncated for brevity ...
+
+    for index, obj in enumerate(records):
+        print(obj) # Obj contains each row in the table including temperature
+        # Generate a unique message_key for each row
+        message_key = obj['machineId']
+        logger.info(f'Produced message with key:{message_key}, value:{obj}')
+
+        serialized = topic.serialize(
+            key=message_key, value=obj, headers={'uuid': str(uuid.uuid4())}
+        )
+
+        # publish each row returned in the query to the topic 'raw-data'
+        producer.produce(
+            topic=topic.name,
+            headers=serialized.headers,
+            key=serialized.key,
+            value=serialized.value,
+        )
+
+```
+{{% /code-placeholders %}}
+
+You can find the full code for this process in the
+[Quix GitHub repository](https://github.com/quixio/template-influxdbv3-downsampling/blob/dev/InfluxDB%20V3%20Data%20Source/main.py).
+
+### Create the consumer
+
+As before, provide the following credentials for the consumer:
+
+- **host**: {{< product-name omit=" Clustered">}} cluster URL
+  _(without the protocol)_
+- **org**: An arbitrary string. 
{{< product-name >}} ignores the organization. +- **token**: InfluxDB database token with read and write permissions on the databases you + want to query and write to. +- **database**: InfluxDB database name + +This process reads messages from the Kafka topic `downsampled-data` and writes each message as a point dictionary back to InfluxDB. + +{{% code-placeholders "(RAW|DOWNSAMPLED)_DATABASE_(NAME|TOKEN)" %}} +```py +# Instantiate an InfluxDBClient3 client configured for your downsampled database. +# When writing, the org= argument is required by the client (but ignored by InfluxDB). +influxdb_downsampled = InfluxDBClient3( + host='{{< influxdb/host >}}', + token='DATABASE_TOKEN', + database='DOWNSAMPLED_DATABASE_NAME', + org='' +) + +# os.environ['localdev'] = 'true' # Uncomment if you're using local Kafka rather than Quix Cloud + +# Create a Quix Streams consumer application that connects to a local Kafka installation +app = Application( + broker_address=os.environ.get('BROKER_ADDRESS','localhost:9092'), + consumer_group=consumer_group_name, + auto_create_topics=True +) + +# Override the app variable if the local development env var is set to false or is not present. +# This causes Quix Streams to use an application configured for Quix Cloud +localdev = os.environ.get('localdev', 'false') + +if localdev == 'false': + # Create a Quix platform-specific application instead (broker address is in-built) + app = Application(consumer_group=consumer_group_name, auto_create_topics=True) + +input_topic = app.topic('downsampled-data') + +## ... remaining code trunctated for brevity ... + +def send_data_to_influx(message): + logger.info(f'Processing message: {message}') + try: + + ## ... remaining code trunctated for brevity ... + + # Construct the points dictionary + points = { + 'measurement': measurement_name, + 'tags': tags, + 'fields': fields, + 'time': message['time'] + } + + influxdb_downsampled.write(record=points, write_precision='ms') + +sdf = app.dataframe(input_topic) +sdf = sdf.update(send_data_to_influx) # Continuously apply the 'send_data' function to each message in the incoming stream + +## ... remaining code trunctated for brevity ... +``` +{{% /code-placeholders %}} + +You can find the full code for this process in the +[Quix GitHub repository](https://github.com/quixio/template-influxdbv3-downsampling/blob/dev/InfluxDB%20V3%20Data%20Sink/main.py). + +## Get the full downsampling code files + +To get the complete set of files referenced in this tutorial, clone the Quix "downsampling template" repository or use an interactive version of this tutorial saved as a Jupyter Notebook. + +### Clone the downsampling template repository + +To clone the downsampling template, enter the following command in the command line: + +```sh +git clone https://github.com/quixio/template-influxdbv3-downsampling.git +``` + +This repository contains the following folders which store different parts of the whole pipeline: + +- **Machine Data to InfluxDB**: A script that generates synthetic machine data + and writes it to InfluxDB. This is useful if you dont have your own data yet, + or just want to work with test data first. + + - It produces a reading every 250 milliseconds. + - This script originally comes from the + [InfluxCommunity repository](https://github.com/InfluxCommunity/Arrow-Task-Engine/blob/master/machine_simulator/src/machine_generator.py) + but has been adapted to write directly to InfluxDB rather than using an MQTT broker. 
+
+- **InfluxDB V3 Data Source**: A service that queries for fresh data from
+  InfluxDB at specific intervals. It's configured to look for the measurement
+  produced by the previously-mentioned synthetic machine data generator.
+  It writes the raw data to a Kafka topic called "raw-data".
+- **Downsampler**: A service that performs a 1-minute tumbling window operation
+  on the data from InfluxDB and emits the mean of the "temperature" reading
+  every minute. It writes the output to a "downsampled-data" Kafka topic.
+- **InfluxDB V3 Data Sink**: A service that reads from the "downsampled-data"
+  topic and writes the downsampled records as points back into InfluxDB.
+
+### Use the downsampling Jupyter Notebook
+
+You can use the interactive notebook ["Continuously downsample data using InfluxDB and Quix Streams"](https://github.com/quixio/tutorial-code/edit/main/notebooks/Downsampling_viaKafka_Using_Quix_Influx.ipynb) to try downsampling code yourself. It is configured to install Apache Kafka within the runtime environment (such as Google Colab).
+
+Each process is also set up to run in the background so that a running cell does not block the rest of the tutorial.
+
+Open In Colab
diff --git a/content/influxdb/v2/process-data/common-tasks/_index.md b/content/influxdb/v2/process-data/common-tasks/_index.md
index 92f0a0ad2..ea1a186c2 100644
--- a/content/influxdb/v2/process-data/common-tasks/_index.md
+++ b/content/influxdb/v2/process-data/common-tasks/_index.md
@@ -14,4 +14,4 @@ weight: 104
 
 The following articles walk through common task use cases.
 
-{{< children >}}
+{{< children >}}
\ No newline at end of file
diff --git a/content/influxdb/v2/tools/downsample-data-quix.md b/content/influxdb/v2/tools/downsample-data-quix.md
new file mode 100644
index 000000000..6704f1527
--- /dev/null
+++ b/content/influxdb/v2/tools/downsample-data-quix.md
@@ -0,0 +1,324 @@
+---
+title: Downsample data with Quix Streams
+seotitle: Downsample data with Python and Quix Streams
+description: >
+  Use Quix Streams to create a Python service that downsamples data stored in InfluxDB.
+menu:
+  influxdb_v2:
+    name: Quix
+    parent: Tools & integrations
+    identifier: influxdb_v2-downsample-quix
+weight: 122
+---
+
+A common practice when processing high-volume data is to downsample it before committing
+it to InfluxDB to reduce the overall disk usage as data collects over time.
+
+This guide walks through the process of creating a series of Python services that ingest from an InfluxDB v2 bucket and then downsample and publish the data to another InfluxDB v2 bucket.
+By aggregating data within windows of time and then storing the aggregate values back to InfluxDB, you can reduce
+disk usage and costs over time.
+
+The guide uses the InfluxDB v2 and Quix Streams Python client libraries and can be run locally or deployed within Quix Cloud with a free trial. It assumes you have set up a Python project and virtual environment.
+
+## Pipeline architecture
+The following diagram illustrates how data is passed between processes as it is downsampled:
+
+{{< html-diagram/quix-downsample-pipeline "v2" >}}
+
+{{% note %}}
+It is usually more efficient to write raw data directly to Kafka rather than
+writing raw data to InfluxDB first (essentially starting the Quix Streams
+pipeline with the "influxv2-data" topic). However, this guide assumes that you
+already have raw data in InfluxDB that you want to downsample.
+{{% /note %}}
+
+---
+
+1. [Set up prerequisites](#set-up-prerequisites)
+2. [Install dependencies](#install-dependencies)
+3. 
[Prepare InfluxDB buckets](#prepare-influxdb-buckets) +4. [Create the downsampling logic](#create-the-downsampling-logic) +5. [Create the producer and consumer clients](#create-the-producer-and-consumer-clients) + 1. [Create the producer](#create-the-producer) + 2. [Create the consumer](#create-the-consumer) +6. [Run the machine data generator](#run-the-machine-data-generator) +7. [Get the full downsampling code files](#get-the-full-downsampling-code-files) + + +## Set up prerequisites + +The process described in this guide requires the following: + +- InfluxDB v2 with data ready for downsampling. [Use the machine data generator code](#run-the-machine-data-generator) below. +- A [Quix Cloud](https://portal.platform.quix.io/self-sign-up/) account or a + local Apache Kafka or Red Panda installation. +- Familiarity with basic Python and Docker concepts. + +## Install dependencies + +Use `pip` to install the following dependencies: + +- `influxdb-client` (InfluxDB v2 client library) +- `quixstreams<2.5` (Quix Streams client library) +- `pandas` (data analysis and manipulation tool) + + +```sh +pip install influxdb-client pandas quixstreams<2.5 +``` + +## Prepare InfluxDB buckets + +The downsampling process involves two InfluxDB buckets. +Each bucket has a [retention period](/influxdb/v2/reference/glossary/#retention-period) +that specifies how long data persists before it expires and is deleted. +By using two buckets, you can store unmodified, high-resolution data in a bucket +with a shorter retention period and then downsampled, low-resolution data in a +bucket with a longer retention period. + +Ensure you have a bucket for each of the following: + +- One to query unmodified data from your InfluxDB v2 cluster +- The other to write downsampled data into + +## Create the downsampling logic + +This process reads the raw data from the input Kafka topic that stores data streamed from the InfluxDB v2 bucket, +downsamples it, and then sends it to an output topic which is later written back to another bucket. + +1. Use the Quix Streams library's `Application` class to initialize a connection to the Kafka topics. + + ```py + from quixstreams import Application + + app = Application(consumer_group="downsampling-process", auto_offset_reset="earliest") + input_topic = app.topic("input") + output_topic = app.topic("output") + # ... + ``` + +2. Configure the Quix Streams built-in windowing function to create a tumbling + window that continously downsamples the data into 1-minute buckets. + + ```py + # ... + target_field = "temperature" # The field that you want to downsample. + + def custom_ts_extractor(value): + # ... + # truncated for brevity - custom code that defines the "time_recorded" + # field as the timestamp to use for windowing... + + topic = app.topic(input_topic, timestamp_extractor=custom_ts_extractor) + + sdf = ( + sdf.apply(lambda value: value[target_field]) # Extract temperature values + .tumbling_window(timedelta(minutes=1)) # 1-minute tumbling windows + .mean() # Calculate average temperature + .final() # Emit results at window completion + ) + + sdf = sdf.apply( + lambda value: { + "time": value["end"], # End of the window + "temperature_avg": value["value"], # Average temperature + } + ) + + sdf.to_topic(output_topic) # Output results to the "downsampled" topic + # ... + ``` + +The results are streamed to the Kafka topic, `downsampled`. + +{{% note %}} +Note: "sdf" stands for "Streaming Dataframe". 
+
+You can find the full code for this process in the
+[Quix GitHub repository](https://github.com/quixio/template-invluxdbv2-tsm-downsampling/blob/tutorial/Downsampler/main.py).
+
+## Create the producer and consumer clients
+
+Use the `influxdb_client` and `quixstreams` modules to instantiate two clients that interact with InfluxDB and Kafka:
+
+- A **producer** client configured to read from your InfluxDB bucket with _unmodified_ data and _produce_ that data to Kafka.
+- A **consumer** client configured to _consume_ data from Kafka and write the _downsampled_ data to the corresponding InfluxDB bucket.
+
+### Create the producer
+
+Provide the following credentials for the producer:
+
+- **INFLUXDB_HOST**: [InfluxDB URL](/influxdb/v2/reference/urls/)
+  _(without the protocol)_
+- **INFLUXDB_ORG**: InfluxDB organization name
+- **INFLUXDB_TOKEN**: InfluxDB API token with read and write permissions on the buckets you
+  want to query and write to.
+- **INFLUXDB_BUCKET**: InfluxDB bucket name
+
+The producer queries for fresh data from InfluxDB at specific intervals. It writes the raw data to a Kafka topic called `influxv2-data`.
+
+{{% code-placeholders "(API|(RAW|DOWNSAMPLED)_BUCKET|ORG)_(NAME|TOKEN)" %}}
+```py
+import json
+import os
+
+from quixstreams import Application
+import influxdb_client
+
+# Create a Quix Application
+app = Application(consumer_group="influxdbv2_migrate", auto_create_topics=True)
+# Define the topic using the "output" environment variable
+topic = app.topic(os.getenv("output", "influxv2-data"))
+# Create an InfluxDB v2 client
+influxdb2_client = influxdb_client.InfluxDBClient(token=os.environ["INFLUXDB_TOKEN"],
+                                                  org=os.environ["INFLUXDB_ORG"],
+                                                  url=os.environ["INFLUXDB_HOST"])
+
+## ... remaining code truncated for brevity ...
+
+# Function to fetch data from InfluxDB
+# It runs in a continuous loop, periodically fetching data based on the interval.
+def get_data():
+    # Run in a loop until the main thread is terminated
+    while run:
+        try:
+            # Query InfluxDB 2.0 using flux
+            flux_query = f'''
+            from(bucket: "{bucket}")
+                |> range(start: -{interval})
+                |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
+            '''
+            logger.info(f"Sending query: {flux_query}")
+
+## ... remaining code truncated for brevity ...
+
+# Create a pre-configured Producer object.
+with app.get_producer() as producer:
+    for res in get_data():
+        # Get the data from InfluxDB
+        records = json.loads(res)
+        for index, obj in enumerate(records):
+            logger.info(f"Produced message with key:{message_key}, value:{obj}")
+            # Publish the data to the Kafka topic
+            producer.produce(
+                topic=topic.name,
+                key=message_key,
+                value=obj,
+            )
+```
+{{% /code-placeholders %}}
+
+You can find the full code for this process in the
+[Quix GitHub repository](https://github.com/quixio/template-invluxdbv2-tsm-downsampling/blob/tutorial/InfluxDB%20V2%20Data%20Source/main.py).
+
+### Create the consumer
+
+As before, provide the following credentials for the consumer:
+
+- **INFLUXDB_HOST**: [InfluxDB URL](/influxdb/v2/reference/urls/)
+  _(without the protocol)_
+- **INFLUXDB_ORG**: InfluxDB organization name
+- **INFLUXDB_TOKEN**: InfluxDB API token with read and write permissions on the buckets you
+  want to query and write to.
+- **INFLUXDB_BUCKET**: InfluxDB bucket name
+
+{{% note %}}
+Note: These will be your InfluxDB v2 credentials.
+{{% /note %}}
+
+This process reads messages from the Kafka topic `downsampled` and writes each message as a point dictionary back to InfluxDB.
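+Each downsampled message can be mapped to a point dictionary before it is buffered
+for writing. The following sketch is illustrative only; the measurement name
+`machine_data` and the `source` tag are assumptions, not values from the template code.
+
+```py
+from datetime import datetime, timezone
+
+# Example message produced by the downsampler above ("time" is the window end
+# in milliseconds; the values are illustrative):
+message = {"time": 1717430460000, "temperature_avg": 23.7}
+
+# Map the message to a point dictionary accepted by the InfluxDB v2 write API.
+# The measurement name and tag below are hypothetical.
+point = {
+    "measurement": "machine_data",
+    "tags": {"source": "quix-downsampler"},
+    "fields": {"temperature_avg": message["temperature_avg"]},
+    "time": datetime.fromtimestamp(message["time"] / 1000, tz=timezone.utc),
+}
+```
+
+In the consumer service below, these point dictionaries accumulate in a buffer that is written to InfluxDB in batches.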
+
+{{% code-placeholders "(API|(RAW|DOWNSAMPLED)_BUCKET|ORG)_(NAME|TOKEN)" %}}
+```py
+import os
+from datetime import datetime
+from time import time
+
+from quixstreams import Application, State
+from influxdb_client import InfluxDBClient, Point
+
+# Create a Quix platform-specific application
+app = Application(consumer_group=consumer_group_name, auto_offset_reset="earliest", use_changelog_topics=False)
+
+input_topic = app.topic(os.getenv("input", "input-data"))
+
+# Initialize InfluxDB v2 client
+influx2_client = InfluxDBClient(url="{{< influxdb/host >}}",
+                                token=API_TOKEN,
+                                org=ORG_NAME)
+
+## ... remaining code truncated for brevity ...
+
+def send_data_to_influx(message: dict, state: State):
+    global last_write_time_ns, points_buffer, service_start_state
+
+    try:
+        ## ... code truncated for brevity ...
+
+        # Check if it's time to write the batch:
+        # 10k records have accumulated or 15 seconds have passed
+        if len(points_buffer) >= 10000 or int(time() * 1e9) - last_write_time_ns >= 15e9:
+            with influx2_client.write_api() as write_api:
+                logger.info(f"Writing batch of {len(points_buffer)} points to InfluxDB.")
+                write_api.write(influx_bucket, influx_org, points_buffer)
+
+            # Clear the buffer and update the last write time
+            points_buffer = []
+            last_write_time_ns = int(time() * 1e9)
+
+        ## ... code truncated for brevity ...
+
+    except Exception as e:
+        logger.info(f"{str(datetime.utcnow())}: Write failed")
+        logger.info(e)
+
+## ... code truncated for brevity ...
+
+# We use the Quix Streams StreamingDataFrame (SDF) to handle every message
+# in the Kafka topic by writing it to InfluxDB
+sdf = app.dataframe(input_topic)
+sdf = sdf.update(send_data_to_influx, stateful=True)
+
+if __name__ == "__main__":
+    logger.info("Starting application")
+    app.run(sdf)
+```
+{{% /code-placeholders %}}
+
+You can find the full code for this process in the
+[Quix GitHub repository](https://github.com/quixio/quix-samples/tree/develop/python/destinations/influxdb_2).
+
+## Run the machine data generator
+
+Now it's time to run the machine data generator code, which populates your source
+bucket with data that the [producer](#create-the-producer) reads.
+
+Run `main.py` from the `Machine data to InfluxDB` folder in the GitHub repository.
+
+## Get the full downsampling code files
+
+To get the complete set of files referenced in this tutorial, clone the Quix "downsampling" repository.
+
+### Clone the downsampling template repository
+
+To clone the downsampling template, enter the following command in the command line:
+
+```sh
+git clone https://github.com/quixio/template-invluxdbv2-tsm-downsampling.git
+```
+
+This repository contains the following folders, which store the different parts of the pipeline:
+
+- **Machine Data to InfluxDB**: A script that generates synthetic machine data
+  and writes it to InfluxDB. This is useful if you don't have your own data yet,
+  or just want to work with test data first.
+
+  - It produces a reading every 250 milliseconds.
+  - This script originally comes from the
+    [InfluxCommunity repository](https://github.com/InfluxCommunity/Arrow-Task-Engine/blob/master/machine_simulator/src/machine_generator.py)
+    but has been adapted to write directly to InfluxDB rather than using an MQTT broker.
+
+- **InfluxDB v2 Data Source**: A service that queries for fresh data from
+  InfluxDB at specific intervals. It's configured to look for the measurement
+  produced by the previously-mentioned synthetic machine data generator.
+  It writes the raw data to a Kafka topic called "influxv2-data".
+- **Downsampler**: A service that performs a 1-minute tumbling window operation + on the data from InfluxDB and emits the mean of the "temperature" reading + every minute. It writes the output to a "downsampled" Kafka topic. +- **InfluxDB v2 Data Sink**: A service that reads from the "downsampled" + topic and writes the downsampled records as points back into InfluxDB. + diff --git a/layouts/shortcodes/html-diagram/quix-downsample-pipeline.html b/layouts/shortcodes/html-diagram/quix-downsample-pipeline.html index c0e538e58..4f5593176 100644 --- a/layouts/shortcodes/html-diagram/quix-downsample-pipeline.html +++ b/layouts/shortcodes/html-diagram/quix-downsample-pipeline.html @@ -1,3 +1,4 @@ +{{- $version := .Get 0 | default "v3" -}}
@@ -7,7 +8,7 @@
-

InfluxDB v2 Source Producer

+

InfluxDB {{ $version }} Source Producer

@@ -23,7 +24,7 @@
-

InfluxDB v3 Sink Consumer

+

InfluxDB {{ $version }} Sink Consumer