342 lines
7.4 KiB
Go
342 lines
7.4 KiB
Go
package kapacitor
|
|
|
|
import (
|
|
"fmt"
|
|
"testing"
|
|
|
|
"github.com/sergi/go-diff/diffmatchpatch"
|
|
)
|
|
|
|
func TestPipelineJSON(t *testing.T) {
|
|
script := `var db = 'telegraf'
|
|
|
|
var rp = 'autogen'
|
|
|
|
var measurement = 'cpu'
|
|
|
|
var groupBy = ['host', 'cluster_id']
|
|
|
|
var whereFilter = lambda: ("cpu" == 'cpu_total') AND ("host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod')
|
|
|
|
var period = 10m
|
|
|
|
var every = 30s
|
|
|
|
var name = 'name'
|
|
|
|
var idVar = name + ':{{.Group}}'
|
|
|
|
var message = 'message'
|
|
|
|
var idTag = 'alertID'
|
|
|
|
var levelTag = 'level'
|
|
|
|
var messageField = 'message'
|
|
|
|
var durationField = 'duration'
|
|
|
|
var outputDB = 'chronograf'
|
|
|
|
var outputRP = 'autogen'
|
|
|
|
var outputMeasurement = 'alerts'
|
|
|
|
var triggerType = 'threshold'
|
|
|
|
var crit = 90
|
|
|
|
var data = stream
|
|
|from()
|
|
.database(db)
|
|
.retentionPolicy(rp)
|
|
.measurement(measurement)
|
|
.groupBy(groupBy)
|
|
.where(whereFilter)
|
|
|window()
|
|
.period(period)
|
|
.every(every)
|
|
.align()
|
|
|mean('usage_user')
|
|
.as('value')
|
|
|
|
var trigger = data
|
|
|alert()
|
|
.crit(lambda: "value" > crit)
|
|
.stateChangesOnly()
|
|
.message(message)
|
|
.id(idVar)
|
|
.idTag(idTag)
|
|
.levelTag(levelTag)
|
|
.messageField(messageField)
|
|
.durationField(durationField)
|
|
.slack()
|
|
.victorOps()
|
|
.email()
|
|
|
|
trigger
|
|
|influxDBOut()
|
|
.create()
|
|
.database(outputDB)
|
|
.retentionPolicy(outputRP)
|
|
.measurement(outputMeasurement)
|
|
.tag('alertName', name)
|
|
.tag('triggerType', triggerType)
|
|
|
|
trigger
|
|
|httpOut('output')
|
|
`
|
|
|
|
want := `var alert4 = stream
|
|
|from()
|
|
.database('telegraf')
|
|
.retentionPolicy('autogen')
|
|
.measurement('cpu')
|
|
.where(lambda: "cpu" == 'cpu_total' AND "host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod')
|
|
.groupBy('host', 'cluster_id')
|
|
|window()
|
|
.period(10m)
|
|
.every(30s)
|
|
.align()
|
|
|mean('usage_user')
|
|
.as('value')
|
|
|alert()
|
|
.id('name:{{.Group}}')
|
|
.message('message')
|
|
.details('{{ json . }}')
|
|
.crit(lambda: "value" > 90)
|
|
.history(21)
|
|
.levelTag('level')
|
|
.messageField('message')
|
|
.durationField('duration')
|
|
.idTag('alertID')
|
|
.stateChangesOnly()
|
|
.email()
|
|
.victorOps()
|
|
.slack()
|
|
|
|
alert4
|
|
|httpOut('output')
|
|
|
|
alert4
|
|
|influxDBOut()
|
|
.database('chronograf')
|
|
.retentionPolicy('autogen')
|
|
.measurement('alerts')
|
|
.buffer(1000)
|
|
.flushInterval(10s)
|
|
.create()
|
|
.tag('alertName', 'name')
|
|
.tag('triggerType', 'threshold')
|
|
`
|
|
|
|
octets, err := MarshalTICK(script)
|
|
if err != nil {
|
|
t.Fatalf("MarshalTICK unexpected error %v", err)
|
|
}
|
|
|
|
got, err := UnmarshalTICK(octets)
|
|
if err != nil {
|
|
t.Fatalf("UnmarshalTICK unexpected error %v", err)
|
|
}
|
|
|
|
if got != want {
|
|
fmt.Println(got)
|
|
diff := diffmatchpatch.New()
|
|
delta := diff.DiffMain(want, got, true)
|
|
t.Errorf("%s", diff.DiffPrettyText(delta))
|
|
}
|
|
}
|
|
func TestPipelineJSONDeadman(t *testing.T) {
|
|
script := `var db = 'telegraf'
|
|
|
|
var rp = 'autogen'
|
|
|
|
var measurement = 'cpu'
|
|
|
|
var groupBy = ['host', 'cluster_id']
|
|
|
|
var whereFilter = lambda: ("cpu" == 'cpu_total') AND ("host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod')
|
|
|
|
var period = 10m
|
|
|
|
var name = 'name'
|
|
|
|
var idVar = name + ':{{.Group}}'
|
|
|
|
var message = 'message'
|
|
|
|
var idTag = 'alertID'
|
|
|
|
var levelTag = 'level'
|
|
|
|
var messageField = 'message'
|
|
|
|
var durationField = 'duration'
|
|
|
|
var outputDB = 'chronograf'
|
|
|
|
var outputRP = 'autogen'
|
|
|
|
var outputMeasurement = 'alerts'
|
|
|
|
var triggerType = 'deadman'
|
|
|
|
var threshold = 0.0
|
|
|
|
var data = stream
|
|
|from()
|
|
.database(db)
|
|
.retentionPolicy(rp)
|
|
.measurement(measurement)
|
|
.groupBy(groupBy)
|
|
.where(whereFilter)
|
|
|
|
var trigger = data
|
|
|deadman(threshold, period)
|
|
.stateChangesOnly()
|
|
.message(message)
|
|
.id(idVar)
|
|
.idTag(idTag)
|
|
.levelTag(levelTag)
|
|
.messageField(messageField)
|
|
.durationField(durationField)
|
|
.slack()
|
|
.victorOps()
|
|
.email()
|
|
|
|
trigger
|
|
|eval(lambda: "emitted")
|
|
.as('value')
|
|
.keep('value', messageField, durationField)
|
|
|influxDBOut()
|
|
.create()
|
|
.database(outputDB)
|
|
.retentionPolicy(outputRP)
|
|
.measurement(outputMeasurement)
|
|
.tag('alertName', name)
|
|
.tag('triggerType', triggerType)
|
|
|
|
trigger
|
|
|httpOut('output')
|
|
`
|
|
|
|
wantA := `var from1 = stream
|
|
|from()
|
|
.database('telegraf')
|
|
.retentionPolicy('autogen')
|
|
.measurement('cpu')
|
|
.where(lambda: "cpu" == 'cpu_total' AND "host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod')
|
|
.groupBy('host', 'cluster_id')
|
|
|
|
var alert5 = from1
|
|
|stats(10m)
|
|
.align()
|
|
|derivative('emitted')
|
|
.as('emitted')
|
|
.unit(10m)
|
|
.nonNegative()
|
|
|alert()
|
|
.id('name:{{.Group}}')
|
|
.message('message')
|
|
.details('{{ json . }}')
|
|
.crit(lambda: "emitted" <= 0.0)
|
|
.history(21)
|
|
.levelTag('level')
|
|
.messageField('message')
|
|
.durationField('duration')
|
|
.idTag('alertID')
|
|
.stateChangesOnly()
|
|
.email()
|
|
.victorOps()
|
|
.slack()
|
|
|
|
alert5
|
|
|httpOut('output')
|
|
|
|
alert5
|
|
|eval(lambda: "emitted")
|
|
.as('value')
|
|
.tags()
|
|
.keep('value', 'message', 'duration')
|
|
|influxDBOut()
|
|
.database('chronograf')
|
|
.retentionPolicy('autogen')
|
|
.measurement('alerts')
|
|
.buffer(1000)
|
|
.flushInterval(10s)
|
|
.create()
|
|
.tag('alertName', 'name')
|
|
.tag('triggerType', 'deadman')
|
|
`
|
|
|
|
wantB := `var from1 = stream
|
|
|from()
|
|
.database('telegraf')
|
|
.retentionPolicy('autogen')
|
|
.measurement('cpu')
|
|
.where(lambda: "cpu" == 'cpu_total' AND "host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod')
|
|
.groupBy('host', 'cluster_id')
|
|
|
|
var alert5 = from1
|
|
|stats(10m)
|
|
.align()
|
|
|derivative('emitted')
|
|
.as('emitted')
|
|
.unit(10m)
|
|
.nonNegative()
|
|
|alert()
|
|
.id('name:{{.Group}}')
|
|
.message('message')
|
|
.details('{{ json . }}')
|
|
.crit(lambda: "emitted" <= 0.0)
|
|
.history(21)
|
|
.levelTag('level')
|
|
.messageField('message')
|
|
.durationField('duration')
|
|
.idTag('alertID')
|
|
.stateChangesOnly()
|
|
.email()
|
|
.victorOps()
|
|
.slack()
|
|
|
|
alert5
|
|
|eval(lambda: "emitted")
|
|
.as('value')
|
|
.tags()
|
|
.keep('value', 'message', 'duration')
|
|
|influxDBOut()
|
|
.database('chronograf')
|
|
.retentionPolicy('autogen')
|
|
.measurement('alerts')
|
|
.buffer(1000)
|
|
.flushInterval(10s)
|
|
.create()
|
|
.tag('alertName', 'name')
|
|
.tag('triggerType', 'deadman')
|
|
|
|
alert5
|
|
|httpOut('output')
|
|
`
|
|
|
|
octets, err := MarshalTICK(script)
|
|
if err != nil {
|
|
t.Fatalf("MarshalTICK unexpected error %v", err)
|
|
}
|
|
got, err := UnmarshalTICK(octets)
|
|
if err != nil {
|
|
t.Fatalf("UnmarshalTICK unexpected error %v", err)
|
|
}
|
|
|
|
if got != wantA && got != wantB {
|
|
want := wantA
|
|
fmt.Println("got")
|
|
fmt.Println(got)
|
|
fmt.Println("want")
|
|
fmt.Println(want)
|
|
diff := diffmatchpatch.New()
|
|
delta := diff.DiffMain(want, got, true)
|
|
t.Errorf("%s", diff.DiffPrettyText(delta))
|
|
}
|
|
}
|