fix: mention of water_level_checksum.flux, instruct on where to find scriptID, create a script, show task in its entirety

pull/4386/head
Anais Dotis-Georgiou 2022-09-27 13:00:47 -05:00
parent 843f5aa4dd
commit bc162f02b8
1 changed files with 89 additions and 5 deletions

View File

@ -33,7 +33,9 @@ In order to follow this guide youll need to create the following resources:
- **water_level_checksum**: Stores one minute counts of water levels.
The count is used as a checksum for each one minute window.
- An API-invokable script:
- **water_level_process.flux**: <!-- describe what this script does -->
- `water_level_process.flux`: This script computes the minute water level averages and counts the number of points that were used in water level average calculation. The average and count is written to the **water_level_mean** and **water_level_checksum** buckets respectively.
- A Task:
- `water_level_checksum.flux`: This task triggers the `water_level_process.flux` script. This task also recomputes a count of the number of points used to calculagte the most recent water level average value. It compares the most recent count from **water_level_checksum** bucket against this new count and triggers a recaclulation of the water level average to accomodate an increase in the count from late arriving data.
In this process, you compute the average water level at each location over one minute windows.
It's designed to handle data arriving up to one hour late.
@ -57,8 +59,13 @@ which recalculates the old count and aggregation.
## Flux scripts in detail
- [water_level_process.flux](#water_level_processflux)
- [water_level_checksum.flux](#water_level_checksumflux)
- [Scenario](#scenario)
- [Setup](#setup)
- [Overview](#overview)
- [Flux scripts in detail](#flux-scripts-in-detail)
- [water_level_process.flux](#water_level_processflux)
- [water_level_checsum.flux](#water_level_checsumflux)
- [Task details](#task-details)
### water_level_process.flux
@ -85,6 +92,8 @@ from(bucket: "water_level_raw")
|> yield(name: "checksums")
```
Use the [API](/cloud/api-guide/api-invokable-scripts/) or [CLI](/influxdb/cloud/reference/cli/influx/scripts/create/) to create an invokable script.
## water_level_checsum.flux
`water_level_process.flux` is a task that does the following:
@ -105,7 +114,7 @@ from(bucket: "water_level_raw")
- `offset`: Defines how much time to wait before executing the task. _The offset does not change the time range queried by the task. _
- `invokeScripts()` is a custom function that invokes the `water_level_process.flux` invokable script.
- `start` and `stop` parameters are required.
- `scriptID` is required.
- `scriptID` is required. Find the scriptID with the [API](/influxdb/cloud/api-guide/api-invokable-scripts/#list-invokable-scripts) or [CLI](/influxdb/cloud/reference/cli/influx/scripts/list/)
- Store your InfluxDB API token as an InfluxDB secret and use the `secrets` package to retrieve the token.
```
option task = {name: "water_level_checksum", every: 1m, offset: 10s}
@ -190,4 +199,79 @@ experimental.join(
},
)
|> yield(name: "diffs")
```
```
The complete `water_level_checsum.flux` is shown below:
```
import "influxdata/influxdb/secrets"
import "experimental/http/requests"
import "json"
import "date"
import "experimental"
option task = {name: "water_level_checksum", every: 1m, offset: 10s}
// Size of the window to aggregate
every = task.every
// Longest we are willing to wait for late data
late_window = 1h
token = secrets.get(key: "SELF_TOKEN")
// invokeScript calls a Flux script with the given start stop
// parameters to recompute the window.
invokeScript = (start, stop) =>
requests.post(
// We have hardcoded the script ID here
url: "https://eastus-1.azure.cloud2.influxdata.com/api/v2/scripts/095fabd404108000/invoke",
headers: ["Authorization": "Token ${token}", "Accept": "application/json", "Content-Type": "application/json"],
body: json.encode(v: {params: {start: string(v: start), stop: string(v: stop)}}),
)
// Only query windows that span a full minute
start = date.truncate(t: -late_window, unit: every)
stop = date.truncate(t: now(), unit: every)
newCounts =
from(bucket: "water_level_raw")
|> range(start: start, stop: stop)
|> group(columns: ["_measurement", "_field"])
|> aggregateWindow(every: every, fn: count)
// Always compute the most recent interval
newCounts
|> filter(fn: (r) => r._time == stop)
|> map(
fn: (r) => {
response = invokeScript(start: date.sub(d: every, from: r._time), stop: r._time)
return {r with code: response.statusCode}
},
)
|> yield(name: "current")
oldCounts =
from(bucket: "water_level_checksum")
|> range(start: start, stop: stop)
|> group(columns: ["_measurement", "_field"])
// Compare old and new checksum
experimental.join(
left: oldCounts,
right: newCounts,
fn: (left, right) => ({left with old_count: left._value, new_count: right._value}),
)
// Recompute any windows where the checksum is different
|> filter(fn: (r) => r.old_count != r.new_count)
|> map(
fn: (r) => {
response = invokeScript(start: date.sub(d: every, from: r._time), stop: r._time)
return {r with code: response.statusCode}
},
)
|> yield(name: "diffs")
```