From b7437da9890e7af612c8971e114577d78d9a0c93 Mon Sep 17 00:00:00 2001 From: Chris Goller Date: Mon, 31 Oct 2016 18:11:05 -0500 Subject: [PATCH] Add kapacitor tickscript template generator for threshold, relative, deadman --- kapacitor/alerts.go | 134 +++++++++++++++++ kapacitor/alerts_test.go | 315 +++++++++++++++++++++++++++++++++++++++ kapacitor/templates.go | 105 +++++++++++++ server/kapacitors.go | 16 ++ server/mux.go | 6 + 5 files changed, 576 insertions(+) create mode 100644 kapacitor/alerts.go create mode 100644 kapacitor/alerts_test.go create mode 100644 kapacitor/templates.go diff --git a/kapacitor/alerts.go b/kapacitor/alerts.go new file mode 100644 index 000000000..16fc18e8d --- /dev/null +++ b/kapacitor/alerts.go @@ -0,0 +1,134 @@ +package tickscripts + +import ( + "bytes" + "fmt" + "log" + "text/template" + "time" + + "github.com/influxdata/kapacitor/pipeline" + "github.com/influxdata/kapacitor/tick/ast" + "github.com/influxdata/kapacitor/tick/stateful" +) + +// Alert defines alerting strings in template rendering +type Alert struct { + Service string // Alerting service + Operator string // Operator for alert comparison + Aggregate string // Statistic aggregate over window of data +} + +// TickTemplate task to be used by kapacitor +type TickTemplate string + +// Threshold generates a tickscript template with an alert +func (a *Alert) Threshold() (TickTemplate, error) { + if err := ValidateAlert(a); err != nil { + return "", err + } + + tickscript, err := execTemplate(ThresholdTemplate, a) + if err != nil { + return "", err + } + + if err := validateTick(tickscript); err != nil { + return "", err + } + + return formatTick(tickscript) +} + +// Relative creates a tickscript that alerts on relative changes over windows of data +func (a *Alert) Relative() (TickTemplate, error) { + if err := ValidateAlert(a); err != nil { + return "", err + } + + tickscript, err := execTemplate(RelativeTemplate, a) + if err != nil { + return "", err + } + + if err := validateTick(tickscript); err != nil { + return "", err + } + + return formatTick(tickscript) +} + +// Deadman creates a tickscript that alerts when no data has been received for a time. +func (a *Alert) Deadman() (TickTemplate, error) { + if err := ValidateAlert(a); err != nil { + return "", err + } + + tickscript, err := execTemplate(DeadmanTemplate, a) + if err != nil { + return "", err + } + + if err := validateTick(tickscript); err != nil { + return "", err + } + + return formatTick(tickscript) +} + +// ValidateAlert checks if the alert is a valid kapacitor alert service. +func ValidateAlert(alert *Alert) error { + // Simple tick script to check alert service. + // If a pipeline cannot be created then we know this is an invalid + // service. At least with this version of kapacitor! + script := fmt.Sprintf("stream|from()|alert().%s()", alert.Service) + return validateTick(script) +} + +func formatTick(tickscript string) (TickTemplate, error) { + node, err := ast.Parse(tickscript) + if err != nil { + log.Fatalf("parse execution: %s", err) + return "", err + } + + output := new(bytes.Buffer) + node.Format(output, "", true) + return TickTemplate(output.String()), nil +} + +func validateTick(script string) error { + scope := stateful.NewScope() + _, err := pipeline.CreateTemplatePipeline(script, pipeline.StreamEdge, scope, &deadman{}) + return err +} + +func execTemplate(tick string, alert *Alert) (string, error) { + p := template.New("template") + t, err := p.Parse(tick) + if err != nil { + log.Fatalf("template parse: %s", err) + return "", err + } + buf := new(bytes.Buffer) + err = t.Execute(buf, &alert) + if err != nil { + log.Fatalf("template execution: %s", err) + return "", err + } + return buf.String(), nil +} + +type deadman struct { + interval time.Duration + threshold float64 + id string + message string + global bool +} + +func (d deadman) Interval() time.Duration { return d.interval } +func (d deadman) Threshold() float64 { return d.threshold } +func (d deadman) Id() string { return d.id } +func (d deadman) Message() string { return d.message } +func (d deadman) Global() bool { return d.global } diff --git a/kapacitor/alerts_test.go b/kapacitor/alerts_test.go new file mode 100644 index 000000000..32ca6fffe --- /dev/null +++ b/kapacitor/alerts_test.go @@ -0,0 +1,315 @@ +package tickscripts + +import "testing" + +func TestValidateAlert(t *testing.T) { + tests := []struct { + name string + alert Alert + wantErr bool + }{ + { + name: "Test valid template alert", + alert: Alert{ + Service: "slack", + }, + wantErr: false, + }, + { + name: "Test invalid template alert", + alert: Alert{ + Service: "invalid", + }, + wantErr: true, + }, + } + for _, tt := range tests { + if err := ValidateAlert(&tt.alert); (err != nil) != tt.wantErr { + t.Errorf("%q. ValidateAlert() error = %v, wantErr %v", tt.name, err, tt.wantErr) + } + } +} + +func Test_validateTick(t *testing.T) { + type args struct { + script string + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "Valid Script", + args: args{ + script: "stream|from()", + }, + wantErr: false, + }, + { + name: "Invalid Script", + args: args{ + script: "stream|nothing", + }, + wantErr: true, + }, + } + for _, tt := range tests { + if err := validateTick(tt.args.script); (err != nil) != tt.wantErr { + t.Errorf("%q. validateTick() error = %v, wantErr %v", tt.name, err, tt.wantErr) + } + } +} + +func TestThreshold(t *testing.T) { + tests := []struct { + name string + alert Alert + want TickTemplate + wantErr bool + }{ + { + name: "Test valid template alert", + alert: Alert{ + Service: "slack", + Operator: ">", + }, + want: `var database = 'telegraf' + +var rp = 'autogen' + +var measurement string + +var metric string + +var groupby = ['host'] + +var crit int + +var period duration + +var every duration + +var message string + +var id string + +stream + |from() + .database(database) + .retentionPolicy(rp) + .measurement(measurement) + .groupBy(groupby) + |window() + .period(period) + .every(every) + |mean(metric) + .as('stat') + |alert() + .id(id) + .message(message) + .crit(lambda: "stat" > crit) + .slack() +`, + wantErr: false, + }, + { + name: "Test valid template alert", + alert: Alert{ + Service: "invalid", + Operator: ">", + }, + want: "", + wantErr: true, + }, + } + for _, tt := range tests { + got, err := tt.alert.Threshold() + if (err != nil) != tt.wantErr { + t.Errorf("%q. Threshold() error = %v, wantErr %v", tt.name, err, tt.wantErr) + continue + } + if got != tt.want { + t.Errorf("%q. Threshold() = %v, want %v", tt.name, got, tt.want) + } + } +} + +func TestRelative(t *testing.T) { + tests := []struct { + name string + alert Alert + want TickTemplate + wantErr bool + }{ + { + name: "Test valid template alert", + alert: Alert{ + Service: "slack", + Operator: ">", + Aggregate: "mean", + }, + want: `var database = 'telegraf' + +var rp = 'autogen' + +var measurement string + +var metric string + +var groupby = ['host'] + +var crit int + +var period duration + +var every duration + +var shift duration + +var message string + +var id string + +var data = stream + |from() + .database(database) + .retentionPolicy(rp) + .measurement(measurement) + .groupBy(groupby) + +var past = data + |window() + .period(period) + .every(every) + .align() + |mean(metric) + .as('stat') + |shift(shift) + +var current = data + |window() + .period(period) + .every(every) + .align() + |mean(metric) + .as('stat') + +past + |join(current) + .as('past', 'current') + |eval(lambda: abs(float("current.stat" - "past.stat")) / float("past.stat")) + .keep() + .as('perc') + |alert() + .id(id) + .message(message) + .crit(lambda: "perc" > crit) + .slack() +`, + wantErr: false, + }, + { + name: "Test invalid service template", + alert: Alert{ + Service: "invalid", + Operator: ">", + Aggregate: "mean", + }, + want: "", + wantErr: true, + }, + { + name: "Test invalid aggregate template", + alert: Alert{ + Service: "slack", + Operator: ">", + Aggregate: "invalid", + }, + want: "", + wantErr: true, + }, + { + name: "Test invalid operator template", + alert: Alert{ + Service: "slack", + Operator: "invalid", + Aggregate: "mean", + }, + want: "", + wantErr: true, + }, + } + for _, tt := range tests { + got, err := tt.alert.Relative() + if (err != nil) != tt.wantErr { + t.Errorf("%q. Relative() error = %v, wantErr %v", tt.name, err, tt.wantErr) + continue + } + if got != tt.want { + t.Errorf("%q. Relative() = %v, want %v", tt.name, got, tt.want) + } + } +} + +func TestDeadman(t *testing.T) { + tests := []struct { + name string + alert Alert + want TickTemplate + wantErr bool + }{ + { + name: "Test valid template alert", + alert: Alert{ + Service: "slack", + }, + want: `var database = 'telegraf' + +var rp = 'autogen' + +var measurement string + +var groupby = ['host'] + +var threshold float + +var period duration + +var id string + +var message string + +stream + |from() + .database(database) + .retentionPolicy(rp) + .measurement(measurement) + .groupBy(groupby) + |deadman(threshold, period) + .id(id) + .message(message) + .slack() +`, + wantErr: false, + }, + { + name: "Test valid template alert", + alert: Alert{ + Service: "invalid", + }, + want: "", + wantErr: true, + }, + } + for _, tt := range tests { + got, err := tt.alert.Deadman() + if (err != nil) != tt.wantErr { + t.Errorf("%q. Deadman() error = %v, wantErr %v", tt.name, err, tt.wantErr) + continue + } + if got != tt.want { + t.Errorf("%q. Deadman() = %v, want %v", tt.name, got, tt.want) + } + } +} diff --git a/kapacitor/templates.go b/kapacitor/templates.go new file mode 100644 index 000000000..1891b1758 --- /dev/null +++ b/kapacitor/templates.go @@ -0,0 +1,105 @@ +package tickscripts + +// TODO: I don't think mean is correct here. It's probably any value. +// TODO: seems like we should only have statechanges + +// ThresholdTemplate is a tickscript template template for threshold alerts +var ThresholdTemplate = `var database = 'telegraf' +var rp = 'autogen' +var measurement string +var metric string +var groupby = ['host'] +var crit int +var period duration +var every duration +var message string +var id string + +stream + |from() + .database(database) + .retentionPolicy(rp) + .measurement(measurement) + .groupBy(groupby) + |window() + .period(period) + .every(every) + |mean(metric) + .as('stat') + |alert() + .id(id) + .message(message) + .crit(lambda: "stat" {{ .Operator }} crit) + .{{ .Service }}()` + +// RelativeTemplate compares one window of data versus another. +var RelativeTemplate = `var database = 'telegraf' +var rp = 'autogen' +var measurement string +var metric string +var groupby = ['host'] +var crit int +var period duration +var every duration +var shift duration +var message string +var id string + +var data = stream + |from() + .database(database) + .retentionPolicy(rp) + .measurement(measurement) + .groupBy(groupby) + +var past = data + |window() + .period(period) + .every(every) + .align() + |{{ .Aggregate }}(metric) + .as('stat') + |shift(shift) + +var current = data + |window() + .period(period) + .every(every) + .align() + |{{ .Aggregate }}(metric) + .as('stat') + +past + |join(current) + .as('past', 'current') + |eval(lambda: abs(float("current.stat" - "past.stat"))/float("past.stat")) + .keep() + .as('perc') + |alert() + .id(id) + .message(message) + .crit(lambda: "perc" {{ .Operator }} crit) + .{{ .Service }}()` + +// DeadmanTemplate checks if any data has been streamed in the last period of time +var DeadmanTemplate = `var database = 'telegraf' +var rp = 'autogen' +var measurement string +var groupby = ['host'] +var threshold float +var period duration + +var id string +var message string + +stream + |from() + .database(database) + .retentionPolicy(rp) + .measurement(measurement) + .groupBy(groupby) + |deadman(threshold, period) + .id(id) + .message(message) + .{{ .Service }}() +` diff --git a/server/kapacitors.go b/server/kapacitors.go index edb175c37..3de3784b1 100644 --- a/server/kapacitors.go +++ b/server/kapacitors.go @@ -258,3 +258,19 @@ func (h *Service) UpdateKapacitor(w http.ResponseWriter, r *http.Request) { res := newKapacitor(srv) encodeJSON(w, http.StatusOK, res, h.Logger) } + +// KapacitorTasksPost proxies POST to kapacitor +func (h *Service) KapacitorTasksPost(w http.ResponseWriter, r *http.Request) { +} + +// KapacitorTasksPatch proxies PATCH to kapacitor +func (h *Service) KapacitorTasksPatch(w http.ResponseWriter, r *http.Request) { +} + +// KapacitorTasksGet proxies GET to kapacitor +func (h *Service) KapacitorTasksGet(w http.ResponseWriter, r *http.Request) { +} + +// KapacitorTasksDelete proxies DELETE to kapacitor +func (h *Service) KapacitorTasksDelete(w http.ResponseWriter, r *http.Request) { +} diff --git a/server/mux.go b/server/mux.go index 4c9c0a62a..6966c6562 100644 --- a/server/mux.go +++ b/server/mux.go @@ -68,6 +68,12 @@ func NewMux(opts MuxOpts, service Service) http.Handler { router.PATCH("/chronograf/v1/sources/:id/kapacitors/:kid", service.UpdateKapacitor) router.DELETE("/chronograf/v1/sources/:id/kapacitors/:kid", service.RemoveKapacitor) + // Kapacitor Tasks + router.GET("/chronograf/v1/sources/:id/kapacitors/:kid/tasks", service.KapacitorTasksGet) + router.POST("/chronograf/v1/sources/:id/kapacitors/:kid/tasks", service.KapacitorTasksPost) + router.PATCH("/chronograf/v1/sources/:id/kapacitors/:kid/tasks", service.KapacitorTasksPatch) + router.DELETE("/chronograf/v1/sources/:id/kapacitors/:kid/tasks", service.KapacitorTasksDelete) + // Kapacitor Proxy router.GET("/chronograf/v1/sources/:id/kapacitors/:kid/proxy", service.KapacitorProxyGet) router.POST("/chronograf/v1/sources/:id/kapacitors/:kid/proxy", service.KapacitorProxyPost)