Apply suggestions from code review
Co-authored-by: Scott Anderson <sanderson@users.noreply.github.com>pull/4386/head
parent
9e95204ca9
commit
843f5aa4dd
|
@ -1,62 +1,82 @@
|
|||
---
|
||||
title: Late arriving data
|
||||
title: Handle late arriving data
|
||||
description: >
|
||||
Learn how to correct downsampling aggregations for data latency or late arriving data.
|
||||
Learn how to account for data latency or late arriving data when aggregating or downsampling data.
|
||||
menu:
|
||||
resources:
|
||||
parent: How-to guides
|
||||
weight: 102
|
||||
---
|
||||
You may want to ensure that your computed aggregations are correct in the face of data latency. Use Flux to correct downsampling aggregations for late arriving data.
|
||||
|
||||
## Setup and Scenario
|
||||
In some cases, due to network latency or other issues, your time series data may arrive to InfluxDB late.
|
||||
To ensure that your computed aggregations are correct, you must account for data latency in your
|
||||
aggregation and downsampling tasks.
|
||||
This guides walks through a method that detects and accounts for late arriving data using InfluxDB
|
||||
tasks and [API-invokable scripts](/influxdb/cloud/api-guide/api-invokable-scripts/).
|
||||
|
||||
## Scenario
|
||||
|
||||
Your are collecting and storing water levels at 100 different locations.
|
||||
Data at each location is reported every 10 seconds.
|
||||
Network connectivity varies at each location, but reported data can confidently be written to InfluxDB
|
||||
at least every hour, but maybe more often.
|
||||
|
||||
## Setup
|
||||
|
||||
In order to follow this guide you’ll need to create the following resources:
|
||||
|
||||
- An [All-Access token](/influxdb/cloud/security/tokens/#all-access-token).
|
||||
- Three InfluxDB buckets:
|
||||
- **water_level_raw**: Stores the raw water level data.
|
||||
- **water_level_mean**: Stores one minute averages of water levels.
|
||||
Averages include late arriving data from the last hour.
|
||||
- **water_level_checksum**: Stores one minute counts of water levels.
|
||||
The count is used as a checksum for each one minute window.
|
||||
- An API-invokable script:
|
||||
- **water_level_process.flux**: <!-- describe what this script does -->
|
||||
|
||||
|
||||
* An all access token.
|
||||
* Three buckets:
|
||||
* water_level_raw: The raw data is written to this bucket
|
||||
* water_level_mean: The 1 minute means are stored here. This bucket is updated after recomputing a window. The means account for data arriving late for the last hour.
|
||||
* water_level_checksum: Store a checksum (count) per 1 minute window for the last hour.
|
||||
* An invokable script:
|
||||
* water_level_process.flux
|
||||
|
||||
Also imagine the following scenario. You’re writing a fake dataset. This fake dataset contains water levels at 100 different locations. You will be computing the mean water level at each location over 1 minute windows. You can handle data arriving up to 1 hour late. Data for each location is being written every 10s. Additionally every 10s a late data point is written somewhere in the last 1 hour for each location.
|
||||
In this process, you compute the average water level at each location over one minute windows.
|
||||
It's designed to handle data arriving up to one hour late.
|
||||
Data from each location is written every 10 seconds.
|
||||
Additionally, every 10 seconds, a late data point is written somewhere in the last one hour for each location.
|
||||
|
||||
|
||||
## Overview
|
||||
|
||||
Before diving into the code, take a high level look at the logic of the Flux scripts.
|
||||
|
||||
{{< img-hd src="/img/resources/late-arriving-data.png" alt="Late arriving data architecture" />}}
|
||||
|
||||

|
||||
The `water_level_checksum.flux` is a task that runs every minute.
|
||||
It counts the number of points that exist in the **water_level_raw** bucket (new count) and compares
|
||||
that count against the count in the **water_level_checksum** bucket (old count).
|
||||
If the new count from the **water_level_raw** bucket isn’t equal to the count from the
|
||||
**water_level_checksum** bucket, then the task invokes `water_level_process.flux` API-invokable script
|
||||
which recalculates the old count and aggregation.
|
||||
|
||||
|
||||
The water_level_checksum.flux is a task. It runs every minute. It counts the number of points that exist in the water_level_raw bucket (new count) and compares that count against the count in the water_level_checksum bucket (old count). If the new count from the water_level_raw bucket isn’t equal to the count from the water_level_checksum bucket then an invokable script is called. Specifically, the water_level_process.flux script is called. This script is responsible for recalculating the old count and aggregation.
|
||||
## Flux scripts in detail
|
||||
|
||||
- [water_level_process.flux](#water_level_processflux)
|
||||
- [water_level_checksum.flux](#water_level_checksumflux)
|
||||
|
||||
### water_level_process.flux
|
||||
|
||||
`water_level_process.flux` is an invokable script that does two things:
|
||||
|
||||
|
||||
## Flux scripts in detail: water_level_process.flux
|
||||
1. Computes the mean of values in the time range defined by the `start` and `stop` script parameters and writes the computed mean to the **water_level_mean** bucket.
|
||||
2. Computes the count or total number of points in the time range defined by the `start` and `stop` script parameters and writes the count to the **water_level_checksum** bucket.
|
||||
|
||||
The water_level_process.flux script is an invokable script. It’s responsible for two things:
|
||||
|
||||
|
||||
1. Computing the mean value over a parameterized start and stop times. This mean value is written to the water_level_mean bucket.
|
||||
2. Calculating the count or total numper of points over parameterized start and stop times. This count is written to the water_level_checksum bucket.
|
||||
|
||||
The water_leve_process.flux script contains the following code: \
|
||||
`// Compute the mean for the window`
|
||||
|
||||
|
||||
```
|
||||
```js
|
||||
// Compute and store the mean for the window
|
||||
from(bucket: "water_level_raw")
|
||||
|> range(start: params.start, stop: params.stop)
|
||||
|> mean()
|
||||
|> to(bucket: "water_level_mean", timeColumn: "_stop")
|
||||
|> yield(name: "means")
|
||||
|
||||
// Compute and store new checksum for this window
|
||||
// Compute and store the new checksum for this window
|
||||
from(bucket: "water_level_raw")
|
||||
|> range(start: params.start, stop: params.stop)
|
||||
|> group(columns: ["_measurement", "_field", "_stop"])
|
||||
|
@ -65,38 +85,34 @@ from(bucket: "water_level_raw")
|
|||
|> yield(name: "checksums")
|
||||
```
|
||||
|
||||
## water_level_checsum.flux
|
||||
|
||||
`water_level_process.flux` is a task that does the following:
|
||||
|
||||
|
||||
## Flux scripts in detail: water_level_checsum.flux
|
||||
|
||||
The water_level_process.flux script is a task. This task is responsible for:
|
||||
|
||||
|
||||
1. Counting the number of points in the water_level_raw bucket (new count) for the last hour across 1minute windows.
|
||||
2. Invoking the water_level_process.flux invokable script to calculate a new mean and a new count across 1 minute windows.
|
||||
3. Gathering the previous count in the water_level_checksum bucket (old count) for the last hour.
|
||||
4. Comparing an old count against a new count with a join.
|
||||
5. Filtering for where the counts do not match.
|
||||
6. Invoking the water_level_process.flux invokable script to recompute the mean and count for every 1 min window where the counts don't match.
|
||||
|
||||
The task runs at 1m intervals specified by the `every` parameter:
|
||||
1. Counts the number of points in the **water_level_raw** bucket (new count) for the last hour across one minute windows.
|
||||
2. Invokes the `water_level_process.flux` invokable script to calculate a new mean and a new count across one minute windows.
|
||||
3. Gathers the previous count in the **water_level_checksum** bucket (old count) for the last hour.
|
||||
4. Joins the old and new streams and compares the old count against a new count.
|
||||
5. Filters for counts that do not match.
|
||||
6. Invokes the `water_level_process.flux` invokable script to recompute the mean and count for every one minute window where the counts do not match.
|
||||
|
||||
#### Task details
|
||||
|
||||
- The `task` option provides configuration settings for the task:
|
||||
- `name`: Provides a name for the task.
|
||||
- `every`: Defines how often the task runs (every one minute) and, in this case, the window interval used to compute means and counts.
|
||||
- `offset`: Defines how much time to wait before executing the task. _The offset does not change the time range queried by the task. _
|
||||
- `invokeScripts()` is a custom function that invokes the `water_level_process.flux` invokable script.
|
||||
- `start` and `stop` parameters are required.
|
||||
- `scriptID` is required.
|
||||
- Store your InfluxDB API token as an InfluxDB secret and use the `secrets` package to retrieve the token.
|
||||
```
|
||||
option task = {name: "water_level_checksum", every: 1m, offset: 10s}
|
||||
```
|
||||
|
||||
|
||||
This `every` parameter also determines the window period for which the mean and counts will be computed over.
|
||||
|
||||
The invokeScripts() function is a custom function that invokes the water_level_process.flux invokable script. \
|
||||
`invokeScript = (start, stop) =>`
|
||||
|
||||
|
||||
```
|
||||
invokeScript = (start, stop, scriptID) =>
|
||||
requests.post(
|
||||
// The script ID here has been hardcoded here
|
||||
url: "https://eastus-1.azure.cloud2.influxdata.com/api/v2/scripts/095fabd404108000/invoke",
|
||||
url: "https://cloud2.influxdata.com/api/v2/scripts/${scriptID}/invoke",
|
||||
|
||||
headers: ["Authorization": "Token ${token}", "Accept": "application/json", "Content-Type": "application/json"],
|
||||
body: json.encode(v: {params: {start: string(v: start), stop: string(v: stop)}}),
|
||||
|
@ -104,7 +120,6 @@ The invokeScripts() function is a custom function that invokes the water_level_p
|
|||
```
|
||||
|
||||
|
||||
Remember this invokable script is responsible for both calculating the mean and count and writing it to the water_level_mean and water_level_checksum buckets respectively. You must pass in values for the start and stop parameters. The token is imported with the secrets manager.
|
||||
|
||||
First the new counts are calculated and stored in the variable `newCounts`. \
|
||||
`newCounts =`
|
||||
|
|
Loading…
Reference in New Issue