finalized write a task content
parent
a1c24793a8
commit
31e0b6a02f
@ -9,7 +9,7 @@ menu:
---

InfluxDB's _**task engine**_ is designed for processing and analyzing data.
Tasks are user-defined Flux scripts that take a stream of input data, modify or
analyze it in some way, then perform an action all on a specified schedule.
Examples include data downsampling, anomaly detection _(Coming)_, alerting _(Coming)_, etc.

@ -14,3 +14,24 @@ menu:

- Data source
- Some type of aggregation
- and a `to` statement

```js
option task = {
    name: "cqinterval15m",
    every: 1w,
}

data = from(bucket: "telegraf")
    |> range(start: -task.every * 2)
    |> filter(fn: (r) => r._measurement == "cpu")

downsampleHourly = (table=<-) =>
    table
        |> aggregateWindow(fn: mean, every: 1h)
        |> set(key: "_measurement", value: "cpu_1h")
        |> to(bucket: "telegraf_downsampled")

downsampleHourly(data)
```

@ -0,0 +1,45 @@
---
title: Task configuration options
seotitle: InfluxDB task configuration options
description: placeholder
menu:
  v2_0:
    name: Task options
    parent: Process data
weight: 5
---

### name
The name of the task. If you don't specify a name, a default name is assigned.

_**Data type:** String_

### every
Defines the interval at which the task will run.

_**Data type:** Duration_

_Cannot be used with `cron`_
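For example, a sketch of a task options block that uses `every` (the task name here is illustrative):

```js
option task = {
    name: "downsample-hourly", // hypothetical task name
    // Run the task once every hour
    every: 1h,
}
```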
### cron
The cron schedule on which to run the task. Cron schedules are based on system time.

_**Data type:** String_

_Cannot be used with `every`_
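For example, a sketch of a task scheduled with `cron` instead of `every` (the task name is illustrative):

```js
option task = {
    name: "daily-rollup", // hypothetical task name
    // Run at midnight, system time, every day
    cron: "0 0 * * *",
}
```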
### offset
Delays the task's execution to account for late-arriving data. For example, if you want a task to run on the hour (`cron: "0 * * * *"`) but your data may arrive up to 10 minutes late, setting `offset: 15m` delays each run by 15 minutes while preserving the original schedule.

_**Data type:** Duration_
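The late-data scenario described above might be sketched as follows (the task name is illustrative):

```js
option task = {
    name: "hourly-rollup", // hypothetical task name
    // Schedule the run at the top of every hour...
    cron: "0 * * * *",
    // ...but delay each run by 15m so late data is included
    offset: 15m,
}
```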
### concurrency
The number of concurrent runs of the task allowed. For example, if a task is scheduled to run `every: 1s` but each run takes 10 seconds to complete, setting an appropriate concurrency allows runs to execute simultaneously rather than queue up.

_**Data type:** Integer_
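A sketch of the scenario above, where overlapping runs execute rather than queue (the task name and values are illustrative):

```js
option task = {
    name: "fast-task", // hypothetical task name
    // A new run starts every second...
    every: 1s,
    // ...and up to 10 runs may execute at once instead of queueing
    concurrency: 10,
}
```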
### retry
The number of times to retry a run before it is considered failed.

_**Data type:** Integer_
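Putting several of these options together, a complete options block might look like the following sketch (values are illustrative):

```js
option task = {
    name: "cqinterval15m",
    every: 1h,
    offset: 15m,
    concurrency: 1,
    // Retry a failed run up to 5 times before treating it as failed
    retry: 5
}
```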
@ -1,6 +1,6 @@
---
title: Write an InfluxDB task
seotitle: Write an InfluxDB task that processes data
description: placeholder
menu:
  v2_0:
@ -9,70 +9,133 @@ menu:
weight: 1
---

InfluxDB tasks are user-defined Flux scripts that take a stream of input data,
modify or analyze it in some way, then perform an action all on a specified schedule.
In their simplest form, tasks are essentially Flux scripts with a "destination."
This destination could be another bucket, another measurement, an alert endpoint _(Coming)_, etc.

This article walks through writing a basic InfluxDB task that downsamples
data and stores it in a new bucket.

## Components of a Task
Every InfluxDB task needs the following four components.
Their form and order can vary, but they are all essential parts of a task.

- [Task options](#define-task-options)
- [A data source](#define-a-data-source)
- [Data processing or transformation](#process-or-transform-your-data)
- [A destination](#define-a-destination)

_[Skip to the full example task script](#full-example-task-script)_

## Define task options
Task options define specific information about the task.
The example below illustrates how task options are defined in your Flux script:

```js
option task = {
    name: "cqinterval15m",
    every: 1h,
    offset: 0m,
    concurrency: 1,
    retry: 5
}
```

_See [Task configuration options](/v2.0/process-data/task-options) for detailed information
about each option._

{{% note %}}
**When creating a task in the InfluxDB user interface (UI)**, task options are not required
in your Flux script. They are defined in the UI while creating the task.
{{% /note %}}

## Define a data source
Define a data source using Flux's [`from()` function](#) or any other [Flux input functions](#).

For convenience, consider creating a variable that includes the sourced data with
the required `range()` and any relevant filters.

```js
data = from(bucket: "telegraf/default")
    |> range(start: -task.every)
    |> filter(fn: (r) =>
        r._measurement == "mem" AND
        r.host == "myHost"
    )
```

{{% note %}}
#### Using task options in your Flux script
Task options are passed as part of a `task` object and can be referenced in your Flux script.
In the example above, the time range is defined as `-task.every`.

`task.every` is dot notation that references the `every` property of the `task` object.
`every` is defined as `1h`, therefore `-task.every` equates to `-1h`.

Using task options to define values in your Flux script can make reusing your task easier.
{{% /note %}}

## Process or transform your data
The purpose of tasks is to process or transform data in some way.
What exactly happens and what form the output data takes is up to you and your
specific use case.

The example below illustrates a task that downsamples data by calculating the average over set intervals.
It uses the `data` variable defined [above](#define-a-data-source) as the data source.
It then windows the data into 5-minute intervals and calculates the average of each
window using the [`aggregateWindow()` function](#).

```js
data
    |> aggregateWindow(
        every: 5m,
        fn: mean
    )
```

_See [Common tasks](/v2.0/process-data/common-tasks) for examples of tasks commonly used with InfluxDB._

## Define a destination
In the vast majority of task use cases, once data is transformed, it needs to be sent and stored somewhere.
This could be a separate bucket with a different retention policy, another measurement, or even an alert endpoint _(Coming)_.

The example below uses Flux's [`to()` function](#) to send the transformed data to another bucket:

```js
// ...
|> to(bucket: "telegraf_downsampled")
```

## Full example task script
Below is the full example task script that combines all of the components described above:

```js
// Task options
option task = {
    name: "cqinterval15m",
    every: 1h,
    offset: 0m,
    concurrency: 1,
    retry: 5
}

// Data source
data = from(bucket: "telegraf/default")
    |> range(start: -task.every)
    |> filter(fn: (r) =>
        r._measurement == "mem" AND
        r.host == "myHost"
    )

data
    // Data transformation
    |> aggregateWindow(
        every: 5m,
        fn: mean
    )
    // Data destination
    |> to(bucket: "telegraf_downsampled")
```