Add kapacitor tickscript template generator for threshold, relative, deadman
parent
1b7292e3db
commit
b7437da989
|
@ -0,0 +1,134 @@
|
|||
package tickscripts
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"log"
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/kapacitor/pipeline"
|
||||
"github.com/influxdata/kapacitor/tick/ast"
|
||||
"github.com/influxdata/kapacitor/tick/stateful"
|
||||
)
|
||||
|
||||
// Alert defines the alerting strings substituted into the tickscript
// templates when a script is rendered.
type Alert struct {
	Service   string // Alerting service; rendered as the `.{{ .Service }}()` call that ends each template
	Operator  string // Operator for alert comparison; rendered between the computed value and `crit` in the crit lambda
	Aggregate string // Statistic aggregate over window of data; rendered as the `|{{ .Aggregate }}(metric)` node in RelativeTemplate
}
|
||||
|
||||
// TickTemplate is the text of a tickscript template task to be used by
// kapacitor. The generators in this package return it after the rendered
// script has been run through formatTick.
type TickTemplate string
|
||||
|
||||
// Threshold generates a tickscript template with an alert
|
||||
func (a *Alert) Threshold() (TickTemplate, error) {
|
||||
if err := ValidateAlert(a); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
tickscript, err := execTemplate(ThresholdTemplate, a)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if err := validateTick(tickscript); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return formatTick(tickscript)
|
||||
}
|
||||
|
||||
// Relative creates a tickscript that alerts on relative changes over windows of data
|
||||
func (a *Alert) Relative() (TickTemplate, error) {
|
||||
if err := ValidateAlert(a); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
tickscript, err := execTemplate(RelativeTemplate, a)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if err := validateTick(tickscript); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return formatTick(tickscript)
|
||||
}
|
||||
|
||||
// Deadman creates a tickscript that alerts when no data has been received for a time.
|
||||
func (a *Alert) Deadman() (TickTemplate, error) {
|
||||
if err := ValidateAlert(a); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
tickscript, err := execTemplate(DeadmanTemplate, a)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if err := validateTick(tickscript); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return formatTick(tickscript)
|
||||
}
|
||||
|
||||
// ValidateAlert checks if the alert is a valid kapacitor alert service.
|
||||
func ValidateAlert(alert *Alert) error {
|
||||
// Simple tick script to check alert service.
|
||||
// If a pipeline cannot be created then we know this is an invalid
|
||||
// service. At least with this version of kapacitor!
|
||||
script := fmt.Sprintf("stream|from()|alert().%s()", alert.Service)
|
||||
return validateTick(script)
|
||||
}
|
||||
|
||||
func formatTick(tickscript string) (TickTemplate, error) {
|
||||
node, err := ast.Parse(tickscript)
|
||||
if err != nil {
|
||||
log.Fatalf("parse execution: %s", err)
|
||||
return "", err
|
||||
}
|
||||
|
||||
output := new(bytes.Buffer)
|
||||
node.Format(output, "", true)
|
||||
return TickTemplate(output.String()), nil
|
||||
}
|
||||
|
||||
func validateTick(script string) error {
|
||||
scope := stateful.NewScope()
|
||||
_, err := pipeline.CreateTemplatePipeline(script, pipeline.StreamEdge, scope, &deadman{})
|
||||
return err
|
||||
}
|
||||
|
||||
func execTemplate(tick string, alert *Alert) (string, error) {
|
||||
p := template.New("template")
|
||||
t, err := p.Parse(tick)
|
||||
if err != nil {
|
||||
log.Fatalf("template parse: %s", err)
|
||||
return "", err
|
||||
}
|
||||
buf := new(bytes.Buffer)
|
||||
err = t.Execute(buf, &alert)
|
||||
if err != nil {
|
||||
log.Fatalf("template execution: %s", err)
|
||||
return "", err
|
||||
}
|
||||
return buf.String(), nil
|
||||
}
|
||||
|
||||
// deadman is a plain value holder exposing its fields through accessor
// methods. validateTick passes a zero-valued *deadman to
// pipeline.CreateTemplatePipeline.
type deadman struct {
	id, message string
	interval    time.Duration
	threshold   float64
	global      bool
}

// Interval returns the stored interval.
func (d deadman) Interval() time.Duration {
	return d.interval
}

// Threshold returns the stored threshold.
func (d deadman) Threshold() float64 {
	return d.threshold
}

// Id returns the stored id.
func (d deadman) Id() string {
	return d.id
}

// Message returns the stored message.
func (d deadman) Message() string {
	return d.message
}

// Global returns the stored global flag.
func (d deadman) Global() bool {
	return d.global
}
|
|
@ -0,0 +1,315 @@
|
|||
package tickscripts
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestValidateAlert(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
alert Alert
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "Test valid template alert",
|
||||
alert: Alert{
|
||||
Service: "slack",
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "Test invalid template alert",
|
||||
alert: Alert{
|
||||
Service: "invalid",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
if err := ValidateAlert(&tt.alert); (err != nil) != tt.wantErr {
|
||||
t.Errorf("%q. ValidateAlert() error = %v, wantErr %v", tt.name, err, tt.wantErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func Test_validateTick(t *testing.T) {
|
||||
type args struct {
|
||||
script string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "Valid Script",
|
||||
args: args{
|
||||
script: "stream|from()",
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "Invalid Script",
|
||||
args: args{
|
||||
script: "stream|nothing",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
if err := validateTick(tt.args.script); (err != nil) != tt.wantErr {
|
||||
t.Errorf("%q. validateTick() error = %v, wantErr %v", tt.name, err, tt.wantErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestThreshold(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
alert Alert
|
||||
want TickTemplate
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "Test valid template alert",
|
||||
alert: Alert{
|
||||
Service: "slack",
|
||||
Operator: ">",
|
||||
},
|
||||
want: `var database = 'telegraf'
|
||||
|
||||
var rp = 'autogen'
|
||||
|
||||
var measurement string
|
||||
|
||||
var metric string
|
||||
|
||||
var groupby = ['host']
|
||||
|
||||
var crit int
|
||||
|
||||
var period duration
|
||||
|
||||
var every duration
|
||||
|
||||
var message string
|
||||
|
||||
var id string
|
||||
|
||||
stream
|
||||
|from()
|
||||
.database(database)
|
||||
.retentionPolicy(rp)
|
||||
.measurement(measurement)
|
||||
.groupBy(groupby)
|
||||
|window()
|
||||
.period(period)
|
||||
.every(every)
|
||||
|mean(metric)
|
||||
.as('stat')
|
||||
|alert()
|
||||
.id(id)
|
||||
.message(message)
|
||||
.crit(lambda: "stat" > crit)
|
||||
.slack()
|
||||
`,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "Test valid template alert",
|
||||
alert: Alert{
|
||||
Service: "invalid",
|
||||
Operator: ">",
|
||||
},
|
||||
want: "",
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
got, err := tt.alert.Threshold()
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("%q. Threshold() error = %v, wantErr %v", tt.name, err, tt.wantErr)
|
||||
continue
|
||||
}
|
||||
if got != tt.want {
|
||||
t.Errorf("%q. Threshold() = %v, want %v", tt.name, got, tt.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRelative(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
alert Alert
|
||||
want TickTemplate
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "Test valid template alert",
|
||||
alert: Alert{
|
||||
Service: "slack",
|
||||
Operator: ">",
|
||||
Aggregate: "mean",
|
||||
},
|
||||
want: `var database = 'telegraf'
|
||||
|
||||
var rp = 'autogen'
|
||||
|
||||
var measurement string
|
||||
|
||||
var metric string
|
||||
|
||||
var groupby = ['host']
|
||||
|
||||
var crit int
|
||||
|
||||
var period duration
|
||||
|
||||
var every duration
|
||||
|
||||
var shift duration
|
||||
|
||||
var message string
|
||||
|
||||
var id string
|
||||
|
||||
var data = stream
|
||||
|from()
|
||||
.database(database)
|
||||
.retentionPolicy(rp)
|
||||
.measurement(measurement)
|
||||
.groupBy(groupby)
|
||||
|
||||
var past = data
|
||||
|window()
|
||||
.period(period)
|
||||
.every(every)
|
||||
.align()
|
||||
|mean(metric)
|
||||
.as('stat')
|
||||
|shift(shift)
|
||||
|
||||
var current = data
|
||||
|window()
|
||||
.period(period)
|
||||
.every(every)
|
||||
.align()
|
||||
|mean(metric)
|
||||
.as('stat')
|
||||
|
||||
past
|
||||
|join(current)
|
||||
.as('past', 'current')
|
||||
|eval(lambda: abs(float("current.stat" - "past.stat")) / float("past.stat"))
|
||||
.keep()
|
||||
.as('perc')
|
||||
|alert()
|
||||
.id(id)
|
||||
.message(message)
|
||||
.crit(lambda: "perc" > crit)
|
||||
.slack()
|
||||
`,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "Test invalid service template",
|
||||
alert: Alert{
|
||||
Service: "invalid",
|
||||
Operator: ">",
|
||||
Aggregate: "mean",
|
||||
},
|
||||
want: "",
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "Test invalid aggregate template",
|
||||
alert: Alert{
|
||||
Service: "slack",
|
||||
Operator: ">",
|
||||
Aggregate: "invalid",
|
||||
},
|
||||
want: "",
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "Test invalid operator template",
|
||||
alert: Alert{
|
||||
Service: "slack",
|
||||
Operator: "invalid",
|
||||
Aggregate: "mean",
|
||||
},
|
||||
want: "",
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
got, err := tt.alert.Relative()
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("%q. Relative() error = %v, wantErr %v", tt.name, err, tt.wantErr)
|
||||
continue
|
||||
}
|
||||
if got != tt.want {
|
||||
t.Errorf("%q. Relative() = %v, want %v", tt.name, got, tt.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeadman(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
alert Alert
|
||||
want TickTemplate
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "Test valid template alert",
|
||||
alert: Alert{
|
||||
Service: "slack",
|
||||
},
|
||||
want: `var database = 'telegraf'
|
||||
|
||||
var rp = 'autogen'
|
||||
|
||||
var measurement string
|
||||
|
||||
var groupby = ['host']
|
||||
|
||||
var threshold float
|
||||
|
||||
var period duration
|
||||
|
||||
var id string
|
||||
|
||||
var message string
|
||||
|
||||
stream
|
||||
|from()
|
||||
.database(database)
|
||||
.retentionPolicy(rp)
|
||||
.measurement(measurement)
|
||||
.groupBy(groupby)
|
||||
|deadman(threshold, period)
|
||||
.id(id)
|
||||
.message(message)
|
||||
.slack()
|
||||
`,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "Test valid template alert",
|
||||
alert: Alert{
|
||||
Service: "invalid",
|
||||
},
|
||||
want: "",
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
got, err := tt.alert.Deadman()
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("%q. Deadman() error = %v, wantErr %v", tt.name, err, tt.wantErr)
|
||||
continue
|
||||
}
|
||||
if got != tt.want {
|
||||
t.Errorf("%q. Deadman() = %v, want %v", tt.name, got, tt.want)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,105 @@
|
|||
package tickscripts
|
||||
|
||||
// TODO: I don't think mean is correct here. It's probably any value.
|
||||
// TODO: seems like we should only have statechanges
|
||||
|
||||
// ThresholdTemplate is a tickscript template for threshold alerts. It windows
// the stream, takes the mean of the metric as 'stat', and raises a critical
// alert when `"stat" {{ .Operator }} crit` holds. {{ .Operator }} and
// {{ .Service }} are filled in when the template is executed.
var ThresholdTemplate = `var database = 'telegraf'
var rp = 'autogen'
var measurement string
var metric string
var groupby = ['host']
var crit int
var period duration
var every duration
var message string
var id string

stream
    |from()
        .database(database)
        .retentionPolicy(rp)
        .measurement(measurement)
        .groupBy(groupby)
    |window()
        .period(period)
        .every(every)
    |mean(metric)
        .as('stat')
    |alert()
        .id(id)
        .message(message)
        .crit(lambda: "stat" {{ .Operator }} crit)
        .{{ .Service }}()`
|
||||
|
||||
// RelativeTemplate compares one window of data versus another. The same
// {{ .Aggregate }} is applied to a shifted ("past") and an unshifted
// ("current") window, the two are joined, and the absolute relative change is
// stored as 'perc'. A critical alert is raised when
// `"perc" {{ .Operator }} crit` holds, and {{ .Service }} names the alert
// handler.
var RelativeTemplate = `var database = 'telegraf'
var rp = 'autogen'
var measurement string
var metric string
var groupby = ['host']
var crit int
var period duration
var every duration
var shift duration
var message string
var id string

var data = stream
    |from()
        .database(database)
        .retentionPolicy(rp)
        .measurement(measurement)
        .groupBy(groupby)

var past = data
    |window()
        .period(period)
        .every(every)
        .align()
    |{{ .Aggregate }}(metric)
        .as('stat')
    |shift(shift)

var current = data
    |window()
        .period(period)
        .every(every)
        .align()
    |{{ .Aggregate }}(metric)
        .as('stat')

past
    |join(current)
        .as('past', 'current')
    |eval(lambda: abs(float("current.stat" - "past.stat"))/float("past.stat"))
        .keep()
        .as('perc')
    |alert()
        .id(id)
        .message(message)
        .crit(lambda: "perc" {{ .Operator }} crit)
        .{{ .Service }}()`
|
||||
|
||||
// DeadmanTemplate checks if any data has been streamed in the last period of
// time. It attaches a deadman(threshold, period) node to the stream, and
// {{ .Service }} names the alert handler called on it.
var DeadmanTemplate = `var database = 'telegraf'
var rp = 'autogen'
var measurement string
var groupby = ['host']
var threshold float
var period duration

var id string
var message string

stream
    |from()
        .database(database)
        .retentionPolicy(rp)
        .measurement(measurement)
        .groupBy(groupby)
    |deadman(threshold, period)
        .id(id)
        .message(message)
        .{{ .Service }}()
`
|
|
@ -258,3 +258,19 @@ func (h *Service) UpdateKapacitor(w http.ResponseWriter, r *http.Request) {
|
|||
res := newKapacitor(srv)
|
||||
encodeJSON(w, http.StatusOK, res, h.Logger)
|
||||
}
|
||||
|
||||
// KapacitorTasksPost proxies POST to kapacitor
|
||||
func (h *Service) KapacitorTasksPost(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// KapacitorTasksPatch proxies PATCH to kapacitor
|
||||
func (h *Service) KapacitorTasksPatch(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// KapacitorTasksGet proxies GET to kapacitor
|
||||
func (h *Service) KapacitorTasksGet(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// KapacitorTasksDelete proxies DELETE to kapacitor
|
||||
func (h *Service) KapacitorTasksDelete(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
|
|
@ -68,6 +68,12 @@ func NewMux(opts MuxOpts, service Service) http.Handler {
|
|||
router.PATCH("/chronograf/v1/sources/:id/kapacitors/:kid", service.UpdateKapacitor)
|
||||
router.DELETE("/chronograf/v1/sources/:id/kapacitors/:kid", service.RemoveKapacitor)
|
||||
|
||||
// Kapacitor Tasks
|
||||
router.GET("/chronograf/v1/sources/:id/kapacitors/:kid/tasks", service.KapacitorTasksGet)
|
||||
router.POST("/chronograf/v1/sources/:id/kapacitors/:kid/tasks", service.KapacitorTasksPost)
|
||||
router.PATCH("/chronograf/v1/sources/:id/kapacitors/:kid/tasks", service.KapacitorTasksPatch)
|
||||
router.DELETE("/chronograf/v1/sources/:id/kapacitors/:kid/tasks", service.KapacitorTasksDelete)
|
||||
|
||||
// Kapacitor Proxy
|
||||
router.GET("/chronograf/v1/sources/:id/kapacitors/:kid/proxy", service.KapacitorProxyGet)
|
||||
router.POST("/chronograf/v1/sources/:id/kapacitors/:kid/proxy", service.KapacitorProxyPost)
|
||||
|
|
Loading…
Reference in New Issue