From 31e0b6a02f1a5e2d370b053883e3d3983b89bc3c Mon Sep 17 00:00:00 2001 From: Scott Anderson Date: Wed, 16 Jan 2019 10:41:37 -0700 Subject: [PATCH] finalized write a task content --- content/v2.0/process-data/_index.md | 2 +- .../common-tasks/downsample-data.md | 21 +++ content/v2.0/process-data/task-options.md | 45 ++++++ content/v2.0/process-data/write-a-task.md | 145 +++++++++++++----- 4 files changed, 171 insertions(+), 42 deletions(-) create mode 100644 content/v2.0/process-data/task-options.md diff --git a/content/v2.0/process-data/_index.md b/content/v2.0/process-data/_index.md index 4b4c94483..6aa8a7985 100644 --- a/content/v2.0/process-data/_index.md +++ b/content/v2.0/process-data/_index.md @@ -9,7 +9,7 @@ menu: --- InfluxDB's _**task engine**_ is designed for processing and analyzing data. -Tasks are user defined Flux scripts that take a stream of input data, modify or +Tasks are user-defined Flux scripts that take a stream of input data, modify or analyze it in some way, then perform an action all on a specified schedule. Examples include data downsampling, anomaly detection _(Coming)_, alerting _(Coming)_, etc. 
diff --git a/content/v2.0/process-data/common-tasks/downsample-data.md b/content/v2.0/process-data/common-tasks/downsample-data.md
index 5eafaabc5..9a565fef4 100644
--- a/content/v2.0/process-data/common-tasks/downsample-data.md
+++ b/content/v2.0/process-data/common-tasks/downsample-data.md
@@ -14,3 +14,24 @@ menu:
 - Data source
 - Some type of aggregation
 - and a `to` statement
+
+
+```js
+option task = {
+    name: "cqinterval15m",
+    every: 1w,
+}
+
+data = from(bucket: "telegraf")
+    |> range(start: -task.every * 2)
+    |> filter(fn: (r) => r._measurement == "cpu")
+
+downsampleHourly = (table=<-) =>
+    table
+        |> aggregateWindow(fn: mean, every: 1h)
+        |> set(key: "_measurement", value: "cpu_1h")
+        |> to(bucket: "telegraf_downsampled")
+
+downsampleHourly(data)
+
+```
diff --git a/content/v2.0/process-data/task-options.md b/content/v2.0/process-data/task-options.md
new file mode 100644
index 000000000..740a6ab20
--- /dev/null
+++ b/content/v2.0/process-data/task-options.md
@@ -0,0 +1,45 @@
+---
+title: Task configuration options
+seotitle: InfluxDB task configuration options
+description: placeholder
+menu:
+  v2_0:
+    name: Task options
+    parent: Process data
+    weight: 5
+---
+
+### name
+The name of the task.
+If you don't specify a name, a default name is assigned.
+
+_**Data type:** String_
+
+### every
+Defines the interval at which the task will run.
+
+_**Data type:** Duration_
+
+_Cannot be used with `cron`_
+
+### cron
+The cron schedule on which the task runs, based on system time.
+
+_**Data type:** String_
+
+_Cannot be used with `every`_
+
+### offset
+Delays the task's execution to account for late-arriving data. For example, if a task runs on the hour (`cron: "0 * * * *"`) but data may arrive up to 10 minutes late, use `offset: 15m` to ensure the late data is included.
+
+_**Data type:** Duration_
+
+### concurrency
+The number of runs of the task allowed to execute concurrently. For example, if a task is scheduled with `every: 1s` but each run takes 10 seconds to complete, 
you can set a higher concurrency to let those runs execute in parallel rather than queue up.
+
+_**Data type:** Integer_
+
+### retry
+The number of times to retry a run before it is considered failed.
+
+_**Data type:** Integer_
diff --git a/content/v2.0/process-data/write-a-task.md b/content/v2.0/process-data/write-a-task.md
index 20f48aa11..2f04f8e03 100644
--- a/content/v2.0/process-data/write-a-task.md
+++ b/content/v2.0/process-data/write-a-task.md
@@ -1,6 +1,6 @@
---
-title: Write a task
-seotitle: Write a task that processes data in InfluxDB
+title: Write an InfluxDB task
+seotitle: Write an InfluxDB task that processes data
description: placeholder
menu:
  v2_0:
@@ -9,70 +9,133 @@ menu:
weight: 1
---

-Tasks are essentially Flux scripts with a "destination."
-This destination could be
+InfluxDB tasks are user-defined Flux scripts that take a stream of input data,
+modify or analyze it in some way, then perform an action, all on a specified schedule.
+In their simplest form, tasks are essentially Flux scripts with a "destination."
+This destination could be another bucket, another measurement, an alert endpoint _(Coming)_, etc.
+This article walks through writing a basic InfluxDB task that downsamples
+data and stores it in a new bucket.

-**Requirements:**
+## Components of a task
+Every InfluxDB task needs the following four components.
+Their form and order can vary, but they are all essential parts of a task.

-- Task options / scheduler block
-- Data source
-- Some type of aggregation
-- and a `to` statement
+- [Task options](#define-task-options)
+- [A data source](#define-a-data-source)
+- [Data processing or transformation](#process-or-transform-your-data)
+- [A destination](#define-a-destination)
+
+_[Skip to the full example task script](#full-example-task-script)_
+
+## Define task options
+Task options define specific information about the task. 
+The example below illustrates how task options are defined in your Flux script:

```js
option task = {
    name: "cqinterval15m",
    every: 1h,
-    offset: 15m ,
-    concurrency: 4,
+    offset: 0m,
+    concurrency: 1,
    retry: 5
}
```

+_See [Task configuration options](/v2.0/process-data/task-options) for detailed information
+about each option._
+
+{{% note %}}
+**When creating a task in the InfluxDB user interface (UI)**, task options are not required
+in your Flux script. They are defined in the UI when you create the task.
+{{% /note %}}
+
+## Define a data source
+Define a data source using Flux's [`from()` function](#) or any other [Flux input function](#).
+
+For convenience, consider creating a variable that includes the sourced data with
+the required `range()` and any relevant filters.
+

```js
-option task = {
-    name: "cqinterval15m",
-    cron: "0 0 * * *",
-    offset: 15m ,
-    concurrency: 4,
-    retry: 5
-}
+data = from(bucket: "telegraf/default")
+    |> range(start: -task.every)
+    |> filter(fn: (r) =>
+        r._measurement == "mem" and
+        r.host == "myHost"
+    )
```

-## Task configuration options
+{{% note %}}
+#### Using task options in your Flux script
+Task options are passed as part of a `task` object and can be referenced in your Flux script.
+In the example above, the time range is defined as `-task.every`.

-### name
-I think it might even be optional
-if you dont specify one i think we just put in some default name.
+`task.every` is dot notation that references the `every` property of the `task` object.
+`every` is defined as `1h`, therefore `-task.every` equates to `-1h`.

-_**Data type:** String_
+Using task options to define values in your Flux script can make reusing your task easier.
+{{% /note %}}

-### every
-Defines the inteveral at which the task will run.
+## Process or transform your data
+The purpose of tasks is to process or transform data in some way.
+What exactly happens and what form the output data takes is up to you and your
+specific use case. 
-_**Data type:** Duration_
+The example below illustrates a task that downsamples data by calculating the average over set intervals.
+It uses the `data` variable defined [above](#define-a-data-source) as the data source.
+It then windows the data into 5-minute intervals and calculates the average of each
+window using the [`aggregateWindow()` function](#).

-_Cannot be used with `cron`_
+```js
+data
+  |> aggregateWindow(
+      every: 5m,
+      fn: mean
+    )
+```

-### cron
-- The cron schedule.
-- Based on system time.
-_**Data type:** String_
+_See [Common tasks](/v2.0/process-data/common-tasks) for examples of tasks commonly used with InfluxDB._

-_Cannot be used with `every`_
+## Define a destination
+In the vast majority of task use cases, once data is transformed, it needs to be sent and stored somewhere.
+This could be a separate bucket with a different retention policy, another measurement, or even an alert endpoint _(Coming)_.

-### offset
-is so you can allow for data to come in off scheduler. so if you want a task to run on the hour `cron: "0 * * * *"` but your data might come in 10 min late you could say `offset: 15m`
+The example below uses Flux's [`to()` function](#) to send the transformed data to another bucket:

-_**Data type:** Duration_
+```js
+// ...
+|> to(bucket: "telegraf_downsampled")
+```

-### concurrency
-how many concurrent runs of a task can happen at once.. say your schedule is `every: 1s` but it takes 10 sec to complete. you can set a concurrency that will allow that to happen and not just queue up.
+## Full example task script
+Below is the full example task script that combines all of the components described above:

-_**Data type:** Integer_
-### retry
-is a int, the number of times to retry before we assume failure. 
+```js
+// Task options
+option task = {
+    name: "cqinterval15m",
+    every: 1h,
+    offset: 0m,
+    concurrency: 1,
+    retry: 5
+}

-_**Data type:** Integer_
+// Data source
+data = from(bucket: "telegraf/default")
+    |> range(start: -task.every)
+    |> filter(fn: (r) =>
+        r._measurement == "mem" and
+        r.host == "myHost"
+    )
+
+data
+    // Data transformation
+    |> aggregateWindow(
+        every: 5m,
+        fn: mean
+    )
+    // Data destination
+    |> to(bucket: "telegraf_downsampled")
+
+```
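For reference, the example task options removed elsewhere in this patch can also be scheduled with `cron` instead of `every` (the two options are mutually exclusive); a sketch with illustrative values:

```js
// Task options using a cron schedule instead of `every`
option task = {
    name: "cqinterval15m",
    cron: "0 0 * * *",  // run daily at midnight, based on system time
    offset: 15m,        // wait 15m for late-arriving data
    concurrency: 1,
    retry: 5
}
```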