Update kapacitor tickscript generation to use alert handlers

pull/10616/head
Chris Goller 2017-11-30 11:40:36 -06:00
parent 86d5ba5013
commit 731d8ddf3e
7 changed files with 199 additions and 139 deletions

View File

@ -1,5 +1,7 @@
package chronograf
import "encoding/json"
// AlertHandlers defines all possible kapacitor interactions with an alert.
type AlertHandlers struct {
IsStateChangesOnly bool `json:"stateChangesOnly"` // IsStateChangesOnly will only send alerts on state changes.
@ -11,27 +13,27 @@ type AlertHandlers struct {
Email []*Email `json:"email"` // Email will send alert data to the specified emails.
Exec []*Exec `json:"exec"` // Exec will run shell commandss when an alert triggers
Log []*Log `json:"log"` // Log will log JSON alert data to files in JSON lines format.
VictorOps []*VictorOps `json:"victorOps"` // VictorOps will send alert to all VictorOps .
PagerDuty []*PagerDuty `json:"pagerDuty"` // PagerDuty will send alert to all PagerDuty .
Pushover []*Pushover `json:"pushover"` // Pushover will send alert to all Pushover .
Sensu []*Sensu `json:"sensu"` // Sensu will send alert to all Sensu .
Slack []*Slack `json:"slack"` // Slack will send alert to Slack .
Telegram []*Telegram `json:"telegram"` // Telegram will send alert to all Telegram .
HipChat []*HipChat `json:"hipChat"` // HipChat will send alert to all HipChat .
Alerta []*Alerta `json:"alerta"` // Alerta will send alert to all Alerta .
OpsGenie []*OpsGenie `json:"opsGenie"` // OpsGenie will send alert to all OpsGenie .
Talk []*Talk `json:"talk"` // Talk will send alert to all Talk .
VictorOps []*VictorOps `json:"victorOps"` // VictorOps will send alert to all VictorOps
PagerDuty []*PagerDuty `json:"pagerDuty"` // PagerDuty will send alert to all PagerDuty
Pushover []*Pushover `json:"pushover"` // Pushover will send alert to all Pushover
Sensu []*Sensu `json:"sensu"` // Sensu will send alert to all Sensu
Slack []*Slack `json:"slack"` // Slack will send alert to Slack
Telegram []*Telegram `json:"telegram"` // Telegram will send alert to all Telegram
HipChat []*HipChat `json:"hipChat"` // HipChat will send alert to all HipChat
Alerta []*Alerta `json:"alerta"` // Alerta will send alert to all Alerta
OpsGenie []*OpsGenie `json:"opsGenie"` // OpsGenie will send alert to all OpsGenie
Talk []*Talk `json:"talk"` // Talk will send alert to all Talk
}
// Post will POST alerts to a destination URL
type Post struct {
URL string `json:"url"` // URL is the destination of the POST.
Headers map[string]string `json:"headers"`
URL string `json:"url"` // URL is the destination of the POST.
Headers map[string]string `json:"headers"` // Headers are added to the output POST
}
// Log sends the output of the alert to a file
type Log struct {
FilePath string `json:"filePath"` // Absolute path the the log file. // It will be created if it does not exist.
FilePath string `json:"filePath"` // Absolute path the the log file; it will be created if it does not exist.
}
// Alerta sends the output of the alert to an alerta service
@ -58,7 +60,7 @@ type TCP struct {
// Email sends the alert to a list of email addresses
type Email struct {
ToList []string `json:"to"` // ToList is the list of email recipients.
To []string `json:"to"` // ToList is the list of email recipients.
}
// VictorOps sends alerts to the victorops.com service
@ -132,3 +134,17 @@ type OpsGenie struct {
// Talk sends alerts to Jane Talk (https://jianliao.com/site)
type Talk struct{}
// MarshalJSON converts AlertHandlers to JSON
func (n *AlertHandlers) MarshalJSON() ([]byte, error) {
type Alias AlertHandlers
var raw = &struct {
Type string `json:"typeOf"`
*Alias
}{
Type: "alert",
Alias: (*Alias)(n),
}
return json.Marshal(raw)
}

View File

@ -1,12 +1,17 @@
package kapacitor
import (
"fmt"
"bytes"
"encoding/json"
"regexp"
"strings"
"github.com/influxdata/chronograf"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/pipeline/tick"
)
/*
func kapaHandler(handler string) (string, error) {
switch handler {
case "hipchat":
@ -80,11 +85,11 @@ func addAlertNodes(rule chronograf.AlertRule) (string, error) {
}
}
return alert, nil
}
}*/
// AlertServices generates alert chaining methods to be attached to an alert from all rule Services
func AlertServices(rule chronograf.AlertRule) (string, error) {
node, err := addAlertNodes(rule)
node, err := addAlertNodes(rule.AlertHandlers)
if err != nil {
return "", err
}
@ -94,3 +99,45 @@ func AlertServices(rule chronograf.AlertRule) (string, error) {
}
return node, nil
}
func addAlertNodes(handlers chronograf.AlertHandlers) (string, error) {
octets, err := json.Marshal(&handlers)
if err != nil {
return "", err
}
stream := &pipeline.StreamNode{}
pipe := pipeline.CreatePipelineSources(stream)
from := stream.From()
node := from.Alert()
if err = json.Unmarshal(octets, node); err != nil {
return "", err
}
aster := tick.AST{}
err = aster.Build(pipe)
if err != nil {
return "", err
}
var buf bytes.Buffer
aster.Program.Format(&buf, "", false)
rawTick := buf.String()
return toOldSchema(rawTick), nil
}
var (
removeID = regexp.MustCompile(`(?m)\s*\.id\(.*\)$`) // Remove to use ID variable
removeMessage = regexp.MustCompile(`(?m)\s*\.message\(.*\)$`) // Remove to use message variable
removeDetails = regexp.MustCompile(`(?m)\s*\.details\(.*\)$`) // Remove to use details variable
removeHistory = regexp.MustCompile(`(?m)\s*\.history\(21\)$`) // Remove default history
)
func toOldSchema(rawTick string) string {
rawTick = strings.Replace(rawTick, "stream\n |from()\n |alert()", "", -1)
rawTick = removeID.ReplaceAllString(rawTick, "")
rawTick = removeMessage.ReplaceAllString(rawTick, "")
rawTick = removeDetails.ReplaceAllString(rawTick, "")
rawTick = removeHistory.ReplaceAllString(rawTick, "")
return rawTick
}

View File

@ -1,6 +1,7 @@
package kapacitor
import (
"log"
"testing"
"github.com/influxdata/chronograf"
@ -16,34 +17,24 @@ func TestAlertServices(t *testing.T) {
{
name: "Test several valid services",
rule: chronograf.AlertRule{
Alerts: []string{"slack", "victorops", "email"},
AlertHandlers: chronograf.AlertHandlers{
Slack: []*chronograf.Slack{{}},
VictorOps: []*chronograf.VictorOps{{}},
Email: []*chronograf.Email{{}},
},
},
want: `alert()
.slack()
.victorOps()
.email()
.victorOps()
.slack()
`,
},
{
name: "Test single invalid services amongst several valid",
rule: chronograf.AlertRule{
Alerts: []string{"slack", "invalid", "email"},
},
want: ``,
wantErr: true,
},
{
name: "Test single invalid service",
rule: chronograf.AlertRule{
Alerts: []string{"invalid"},
},
want: ``,
wantErr: true,
},
{
name: "Test single valid service",
rule: chronograf.AlertRule{
Alerts: []string{"slack"},
AlertHandlers: chronograf.AlertHandlers{
Slack: []*chronograf.Slack{{}},
},
},
want: `alert()
.slack()
@ -52,15 +43,10 @@ func TestAlertServices(t *testing.T) {
{
name: "Test single valid service and property",
rule: chronograf.AlertRule{
Alerts: []string{"slack"},
AlertNodes: []chronograf.KapacitorNode{
{
Name: "slack",
Properties: []chronograf.KapacitorProperty{
{
Name: "channel",
Args: []string{"#general"},
},
AlertHandlers: chronograf.AlertHandlers{
Slack: []*chronograf.Slack{
{
Channel: "#general",
},
},
},
@ -73,10 +59,11 @@ func TestAlertServices(t *testing.T) {
{
name: "Test tcp",
rule: chronograf.AlertRule{
AlertNodes: []chronograf.KapacitorNode{
{
Name: "tcp",
Args: []string{"myaddress:22"},
AlertHandlers: chronograf.AlertHandlers{
TCPs: []*chronograf.TCP{
{
Address: "myaddress:22",
},
},
},
},
@ -87,10 +74,8 @@ func TestAlertServices(t *testing.T) {
{
name: "Test tcp no argument",
rule: chronograf.AlertRule{
AlertNodes: []chronograf.KapacitorNode{
{
Name: "tcp",
},
AlertHandlers: chronograf.AlertHandlers{
TCPs: []*chronograf.TCP{{}},
},
},
wantErr: true,
@ -98,10 +83,11 @@ func TestAlertServices(t *testing.T) {
{
name: "Test log",
rule: chronograf.AlertRule{
AlertNodes: []chronograf.KapacitorNode{
{
Name: "log",
Args: []string{"/tmp/alerts.log"},
AlertHandlers: chronograf.AlertHandlers{
Log: []*chronograf.Log{
{
FilePath: "/tmp/alerts.log",
},
},
},
},
@ -112,10 +98,8 @@ func TestAlertServices(t *testing.T) {
{
name: "Test log no argument",
rule: chronograf.AlertRule{
AlertNodes: []chronograf.KapacitorNode{
{
Name: "log",
},
AlertHandlers: chronograf.AlertHandlers{
Log: []*chronograf.Log{{}},
},
},
wantErr: true,
@ -123,11 +107,10 @@ func TestAlertServices(t *testing.T) {
{
name: "Test tcp no argument with other services",
rule: chronograf.AlertRule{
Alerts: []string{"slack", "tcp", "email"},
AlertNodes: []chronograf.KapacitorNode{
{
Name: "tcp",
},
AlertHandlers: chronograf.AlertHandlers{
Slack: []*chronograf.Slack{{}},
TCPs: []*chronograf.TCP{{}},
Email: []*chronograf.Email{{}},
},
},
wantErr: true,
@ -135,24 +118,11 @@ func TestAlertServices(t *testing.T) {
{
name: "Test http as post",
rule: chronograf.AlertRule{
AlertNodes: []chronograf.KapacitorNode{
{
Name: "http",
Args: []string{"http://myaddress"},
},
},
},
want: `alert()
.post('http://myaddress')
`,
},
{
name: "Test post",
rule: chronograf.AlertRule{
AlertNodes: []chronograf.KapacitorNode{
{
Name: "post",
Args: []string{"http://myaddress"},
AlertHandlers: chronograf.AlertHandlers{
Posts: []*chronograf.Post{
{
URL: "http://myaddress",
},
},
},
},
@ -163,9 +133,11 @@ func TestAlertServices(t *testing.T) {
{
name: "Test http no arguments",
rule: chronograf.AlertRule{
AlertNodes: []chronograf.KapacitorNode{
{
Name: "http",
AlertHandlers: chronograf.AlertHandlers{
Posts: []*chronograf.Post{
{
URL: "",
},
},
},
},
@ -176,15 +148,11 @@ func TestAlertServices(t *testing.T) {
{
name: "Test post with headers",
rule: chronograf.AlertRule{
AlertNodes: []chronograf.KapacitorNode{
{
Name: "post",
Args: []string{"http://myaddress"},
Properties: []chronograf.KapacitorProperty{
{
Name: "header",
Args: []string{"key", "value"},
},
AlertHandlers: chronograf.AlertHandlers{
Posts: []*chronograf.Post{
{
URL: "http://myaddress",
Headers: map[string]string{"key": "value"},
},
},
},
@ -192,33 +160,13 @@ func TestAlertServices(t *testing.T) {
want: `alert()
.post('http://myaddress')
.header('key', 'value')
`,
},
{
name: "Test post with headers",
rule: chronograf.AlertRule{
AlertNodes: []chronograf.KapacitorNode{
{
Name: "post",
Args: []string{"http://myaddress"},
Properties: []chronograf.KapacitorProperty{
{
Name: "endpoint",
Args: []string{"myendpoint"},
},
},
},
},
},
want: `alert()
.post('http://myaddress')
.endpoint('myendpoint')
`,
},
}
for _, tt := range tests {
got, err := AlertServices(tt.rule)
if (err != nil) != tt.wantErr {
log.Printf("GOT %s", got)
t.Errorf("%q. AlertServices() error = %v, wantErr %v", tt.name, err, tt.wantErr)
continue
}
@ -235,3 +183,46 @@ func TestAlertServices(t *testing.T) {
}
}
}
func Test_addAlertNodes(t *testing.T) {
tests := []struct {
name string
handlers chronograf.AlertHandlers
want string
wantErr bool
}{
{
name: "foo",
handlers: chronograf.AlertHandlers{
IsStateChangesOnly: true,
Message: "mymessage",
Details: "mydetails",
Email: []*chronograf.Email{
{
To: []string{
"me@me.com", "you@you.com",
},
},
},
},
want: `
.stateChangesOnly()
.email()
.to('me@me.com')
.to('you@you.com')
`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := addAlertNodes(tt.handlers)
if (err != nil) != tt.wantErr {
t.Errorf("addAlertNodes() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("addAlertNodes() =\n%v\n, want\n%v", got, tt.want)
}
})
}
}

View File

@ -1112,7 +1112,7 @@ func TestClient_Update(t *testing.T) {
},
Trigger: Relative,
TriggerValues: chronograf.TriggerValues{
Operator: InsideRange,
Operator: insideRange,
},
},
},

View File

@ -40,7 +40,10 @@ func benchmark_PaginatingKapaClient(taskCount int, b *testing.B) {
},
}
pkap := kapacitor.PaginatingKapaClient{mockClient, 50}
pkap := kapacitor.PaginatingKapaClient{
KapaClient: mockClient,
FetchRate: 50,
}
opts := &client.ListTasksOptions{}

View File

@ -34,7 +34,10 @@ func Test_Kapacitor_PaginatingKapaClient(t *testing.T) {
},
}
pkap := kapacitor.PaginatingKapaClient{mockClient, 50}
pkap := kapacitor.PaginatingKapaClient{
KapaClient: mockClient,
FetchRate: 50,
}
opts := &client.ListTasksOptions{
Limit: 100,

View File

@ -7,12 +7,12 @@ import (
const (
greaterThan = "greater than"
lessThan = "less than"
LessThanEqual = "equal to or less than"
GreaterThanEqual = "equal to or greater"
Equal = "equal to"
NotEqual = "not equal to"
InsideRange = "inside range"
OutsideRange = "outside range"
lessThanEqual = "equal to or less than"
greaterThanEqual = "equal to or greater"
equal = "equal to"
notEqual = "not equal to"
insideRange = "inside range"
outsideRange = "outside range"
)
// kapaOperator converts UI strings to kapacitor operators
@ -22,13 +22,13 @@ func kapaOperator(operator string) (string, error) {
return ">", nil
case lessThan:
return "<", nil
case LessThanEqual:
case lessThanEqual:
return "<=", nil
case GreaterThanEqual:
case greaterThanEqual:
return ">=", nil
case Equal:
case equal:
return "==", nil
case NotEqual:
case notEqual:
return "!=", nil
default:
return "", fmt.Errorf("invalid operator: %s is unknown", operator)
@ -42,13 +42,13 @@ func chronoOperator(operator string) (string, error) {
case "<":
return lessThan, nil
case "<=":
return LessThanEqual, nil
return lessThanEqual, nil
case ">=":
return GreaterThanEqual, nil
return greaterThanEqual, nil
case "==":
return Equal, nil
return equal, nil
case "!=":
return NotEqual, nil
return notEqual, nil
default:
return "", fmt.Errorf("invalid operator: %s is unknown", operator)
}
@ -56,9 +56,9 @@ func chronoOperator(operator string) (string, error) {
func rangeOperators(operator string) ([]string, error) {
switch operator {
case InsideRange:
case insideRange:
return []string{">=", "AND", "<="}, nil
case OutsideRange:
case outsideRange:
return []string{"<", "OR", ">"}, nil
default:
return nil, fmt.Errorf("invalid operator: %s is unknown", operator)
@ -70,9 +70,9 @@ func chronoRangeOperators(ops []string) (string, error) {
return "", fmt.Errorf("Unknown operators")
}
if ops[0] == ">=" && ops[1] == "AND" && ops[2] == "<=" {
return InsideRange, nil
return insideRange, nil
} else if ops[0] == "<" && ops[1] == "OR" && ops[2] == ">" {
return OutsideRange, nil
return outsideRange, nil
}
return "", fmt.Errorf("Unknown operators")
}