196 lines
5.0 KiB
Markdown
196 lines
5.0 KiB
Markdown
---
|
|
title: StreamNode
|
|
description: >
|
|
StreamNode represents the source of data being streamed to Kapacitor through any of its inputs.
|
|
note: Auto generated by tickdoc
|
|
|
|
menu:
|
|
kapacitor_1_6_ref:
|
|
name: StreamNode
|
|
identifier: stream_node
|
|
weight: 100
|
|
parent: nodes
|
|
---
|
|
|
|
The `stream` node represents the source of data being
|
|
streamed to Kapacitor via any of its inputs.
|
|
The `stream` variable in stream tasks is an instance of
|
|
a [StreamNode.](/kapacitor/v1.6/nodes/stream_node/)
|
|
[StreamNode.From](/kapacitor/v1.6/nodes/stream_node/#from) is the method/property of this node.
|
|
|
|
|
|
### Constructor
|
|
|
|
| Chaining Method | Description |
|
|
|:---------|:---------|
|
|
| **stream** | Has no constructor signature. |
|
|
|
|
### Property Methods
|
|
|
|
| Setters | Description |
|
|
|:---|:---|
|
|
| **[quiet](#quiet) ( )** | Suppress all error logging events from this node. |
|
|
|
|
|
|
### Chaining Methods
|
|
[Deadman](#deadman),
|
|
[From](#from),
|
|
[Stats](#stats)
|
|
|
|
---
|
|
|
|
## Properties
|
|
|
|
Property methods modify state on the calling node.
|
|
They do not add another node to the pipeline, and always return a reference to the calling node.
|
|
Property methods are marked using the `.` operator.
|
|
|
|
### Quiet
|
|
|
|
Suppress all error logging events from this node.
|
|
|
|
```js
|
|
stream.quiet()
|
|
```
|
|
|
|
|
|
## Chaining Methods
|
|
|
|
Chaining methods create a new node in the pipeline as a child of the calling node.
|
|
They do not modify the calling node.
|
|
Chaining methods are marked using the `|` operator.
|
|
|
|
|
|
### Deadman
|
|
|
|
Helper function for creating an alert on low throughput, a.k.a. deadman's switch.
|
|
|
|
- Threshold: trigger alert if throughput drops below threshold in points/interval.
|
|
- Interval: how often to check the throughput.
|
|
- Expressions: optional list of expressions to also evaluate. Useful for time of day alerting.
|
|
|
|
Example:
|
|
|
|
|
|
```js
|
|
var data = stream
|
|
|from()...
|
|
// Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
|
|
data
|
|
|deadman(100.0, 10s)
|
|
//Do normal processing of data
|
|
data...
|
|
```
|
|
|
|
The above is equivalent to this example:
|
|
|
|
|
|
```js
|
|
var data = stream
|
|
|from()...
|
|
// Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
|
|
data
|
|
|stats(10s)
|
|
.align()
|
|
|derivative('emitted')
|
|
.unit(10s)
|
|
.nonNegative()
|
|
|alert()
|
|
.id('node \'stream0\' in task \'{{ .TaskName }}\'')
|
|
.message('{{ .ID }} is {{ if eq .Level "OK" }}alive{{ else }}dead{{ end }}: {{ index .Fields "emitted" | printf "%0.3f" }} points/10s.')
|
|
.crit(lambda: "emitted" <= 100.0)
|
|
//Do normal processing of data
|
|
data...
|
|
```
|
|
|
|
The `id` and `message` alert properties can be configured globally via the 'deadman' configuration section.
|
|
|
|
Since the [AlertNode](/kapacitor/v1.6/nodes/alert_node/) is the last piece it can be further modified as usual.
|
|
Example:
|
|
|
|
|
|
```js
|
|
var data = stream
|
|
|from()...
|
|
// Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
|
|
data
|
|
|deadman(100.0, 10s)
|
|
.slack()
|
|
.channel('#dead_tasks')
|
|
//Do normal processing of data
|
|
data...
|
|
```
|
|
|
|
You can specify additional lambda expressions to further constrain when the deadman's switch is triggered.
|
|
Example:
|
|
|
|
|
|
```js
|
|
var data = stream
|
|
|from()...
|
|
// Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
|
|
// Only trigger the alert if the time of day is between 8am-5pm.
|
|
data
|
|
|deadman(100.0, 10s, lambda: hour("time") >= 8 AND hour("time") <= 17)
|
|
//Do normal processing of data
|
|
data...
|
|
```
|
|
|
|
|
|
|
|
```js
|
|
stream|deadman(threshold float64, interval time.Duration, expr ...ast.LambdaNode)
|
|
```
|
|
|
|
Returns: [AlertNode](/kapacitor/v1.6/nodes/alert_node/)
|
|
|
|
### From
|
|
|
|
Creates a new [FromNode](/kapacitor/v1.6/nodes/from_node/) that can be further
|
|
filtered using the Database, RetentionPolicy, Measurement and Where properties.
|
|
From can be called multiple times to create multiple
|
|
independent forks of the data stream.
|
|
|
|
Example:
|
|
|
|
|
|
```js
|
|
// Select the 'cpu' measurement from just the database 'mydb'
|
|
// and retention policy 'myrp'.
|
|
var cpu = stream
|
|
|from()
|
|
.database('mydb')
|
|
.retentionPolicy('myrp')
|
|
.measurement('cpu')
|
|
// Select the 'load' measurement from any database and retention policy.
|
|
var load = stream
|
|
|from()
|
|
.measurement('load')
|
|
// Join cpu and load streams and do further processing.
|
|
cpu
|
|
|join(load)
|
|
.as('cpu', 'load')
|
|
...
|
|
```
|
|
|
|
|
|
|
|
```js
|
|
stream|from()
|
|
```
|
|
|
|
Returns: [FromNode](/kapacitor/v1.6/nodes/from_node/)
|
|
|
|
### Stats
|
|
|
|
Create a new stream of data that contains the internal statistics of the node.
|
|
The interval represents how often to emit the statistics based on real time.
|
|
This means the interval time is independent of the times of the data points the source node is receiving.
|
|
|
|
|
|
```js
|
|
stream|stats(interval time.Duration)
|
|
```
|
|
|
|
Returns: [StatsNode](/kapacitor/v1.6/nodes/stats_node/)
|