Merge pull request #14912 from influxdata/fix/rule-state-changes
fix(notification/rule): add support for status rulespull/14919/head
commit
9f84379255
|
@ -47,6 +47,15 @@ func Subtract(lhs, rhs ast.Expression) *ast.BinaryExpression {
|
|||
}
|
||||
}
|
||||
|
||||
// Add returns a addition *ast.BinaryExpression.
|
||||
func Add(lhs, rhs ast.Expression) *ast.BinaryExpression {
|
||||
return &ast.BinaryExpression{
|
||||
Operator: ast.AdditionOperator,
|
||||
Left: lhs,
|
||||
Right: rhs,
|
||||
}
|
||||
}
|
||||
|
||||
// Member returns an *ast.MemberExpression where the key is p and the values is c.
|
||||
func Member(p, c string) *ast.MemberExpression {
|
||||
return &ast.MemberExpression{
|
||||
|
@ -188,6 +197,13 @@ func Object(ps ...*ast.Property) *ast.ObjectExpression {
|
|||
}
|
||||
}
|
||||
|
||||
// Array returns *ast.ArrayExpression with elements es.
|
||||
func Array(es ...ast.Expression) *ast.ArrayExpression {
|
||||
return &ast.ArrayExpression{
|
||||
Elements: es,
|
||||
}
|
||||
}
|
||||
|
||||
// FunctionParams returns a slice of *ast.Property for the parameters of a function.
|
||||
func FunctionParams(args ...string) []*ast.Property {
|
||||
var params []*ast.Property
|
||||
|
|
|
@ -32,7 +32,7 @@ func (s *HTTP) GenerateFlux(e influxdb.NotificationEndpoint) (string, error) {
|
|||
func (s *HTTP) GenerateFluxAST(e *endpoint.HTTP) (*ast.Package, error) {
|
||||
f := flux.File(
|
||||
s.Name,
|
||||
flux.Imports("influxdata/influxdb/monitor", "http", "json"),
|
||||
flux.Imports("influxdata/influxdb/monitor", "http", "json", "experimental"),
|
||||
s.generateFluxASTBody(e),
|
||||
)
|
||||
return &ast.Package{Package: "main", Files: []*ast.File{f}}, nil
|
||||
|
@ -44,6 +44,7 @@ func (s *HTTP) generateFluxASTBody(e *endpoint.HTTP) []ast.Statement {
|
|||
statements = append(statements, s.generateFluxASTEndpoint(e))
|
||||
statements = append(statements, s.generateFluxASTNotificationDefinition(e))
|
||||
statements = append(statements, s.generateFluxASTStatuses())
|
||||
statements = append(statements, s.generateAllStateChanges()...)
|
||||
statements = append(statements, s.generateFluxASTNotifyPipe())
|
||||
|
||||
return statements
|
||||
|
@ -69,7 +70,7 @@ func (s *HTTP) generateFluxASTNotifyPipe() ast.Statement {
|
|||
|
||||
call := flux.Call(flux.Member("monitor", "notify"), flux.Object(props...))
|
||||
|
||||
return flux.ExpressionStatement(flux.Pipe(flux.Identifier("statuses"), call))
|
||||
return flux.ExpressionStatement(flux.Pipe(flux.Identifier("all_statuses"), call))
|
||||
}
|
||||
|
||||
type httpAlias HTTP
|
||||
|
|
|
@ -15,8 +15,9 @@ func TestHTTP_GenerateFlux(t *testing.T) {
|
|||
import "influxdata/influxdb/monitor"
|
||||
import "http"
|
||||
import "json"
|
||||
import "experimental"
|
||||
|
||||
option task = {name: "foo", every: 1h, offset: 1s}
|
||||
option task = {name: "foo", every: 2h, offset: 1s}
|
||||
|
||||
endpoint = http.endpoint(url: "http://localhost:7777")
|
||||
notification = {
|
||||
|
@ -25,10 +26,15 @@ notification = {
|
|||
_notification_endpoint_id: "0000000000000002",
|
||||
_notification_endpoint_name: "foo",
|
||||
}
|
||||
statuses = monitor.from(start: -1h, fn: (r) =>
|
||||
statuses = monitor.from(start: -2h, fn: (r) =>
|
||||
(r.foo == "bar" and r.baz == "bang"))
|
||||
any_to_crit = statuses
|
||||
|> monitor.stateChanges(fromLevel: "any", toLevel: "crit")
|
||||
all_statuses = any_to_crit
|
||||
|> filter(fn: (r) =>
|
||||
(r._time > experimental.subDuration(from: now(), d: 1h)))
|
||||
|
||||
statuses
|
||||
all_statuses
|
||||
|> monitor.notify(data: notification, endpoint: endpoint(mapFn: (r) =>
|
||||
({data: json.encode(v: r)})))`
|
||||
|
||||
|
@ -55,6 +61,11 @@ statuses
|
|||
Operator: notification.Equal,
|
||||
},
|
||||
},
|
||||
StatusRules: []notification.StatusRule{
|
||||
{
|
||||
CurrentLevel: notification.Critical,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ package rule
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/flux/ast"
|
||||
|
@ -126,6 +127,104 @@ func (b *Base) generateFluxASTNotificationDefinition(e influxdb.NotificationEndp
|
|||
return flux.DefineVariable("notification", flux.Object(ruleID, ruleName, endpointID, endpointName))
|
||||
}
|
||||
|
||||
func (b *Base) generateAllStateChanges() []ast.Statement {
|
||||
stmts := []ast.Statement{}
|
||||
tables := []ast.Expression{}
|
||||
for _, r := range b.StatusRules {
|
||||
stmt, table := b.generateStateChanges(r)
|
||||
tables = append(tables, table)
|
||||
stmts = append(stmts, stmt)
|
||||
}
|
||||
|
||||
now := flux.Call(flux.Identifier("now"), flux.Object())
|
||||
timeFilter := flux.Function(
|
||||
flux.FunctionParams("r"),
|
||||
flux.GreaterThan(
|
||||
flux.Member("r", "_time"),
|
||||
flux.Call(
|
||||
flux.Member("experimental", "subDuration"),
|
||||
flux.Object(
|
||||
flux.Property("from", now),
|
||||
flux.Property("d", (*ast.DurationLiteral)(b.Every)),
|
||||
),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
var pipe *ast.PipeExpression
|
||||
if len(tables) == 1 {
|
||||
pipe = flux.Pipe(
|
||||
tables[0],
|
||||
flux.Call(
|
||||
flux.Identifier("filter"),
|
||||
flux.Object(
|
||||
flux.Property("fn", timeFilter),
|
||||
),
|
||||
),
|
||||
)
|
||||
} else {
|
||||
pipe = flux.Pipe(
|
||||
flux.Call(
|
||||
flux.Identifier("union"),
|
||||
flux.Object(
|
||||
flux.Property("tables", flux.Array(tables...)),
|
||||
),
|
||||
),
|
||||
flux.Call(
|
||||
flux.Identifier("sort"),
|
||||
flux.Object(
|
||||
flux.Property("columns", flux.Array(flux.String("_time"))),
|
||||
),
|
||||
),
|
||||
flux.Call(
|
||||
flux.Identifier("filter"),
|
||||
flux.Object(
|
||||
flux.Property("fn", timeFilter),
|
||||
),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
stmts = append(stmts, flux.DefineVariable("all_statuses", pipe))
|
||||
|
||||
return stmts
|
||||
}
|
||||
|
||||
func (b *Base) generateStateChanges(r notification.StatusRule) (ast.Statement, *ast.Identifier) {
|
||||
fromLevel := "any"
|
||||
if r.PreviousLevel != nil {
|
||||
fromLevel = strings.ToLower(r.PreviousLevel.String())
|
||||
}
|
||||
toLevel := strings.ToLower(r.CurrentLevel.String())
|
||||
|
||||
pipe := flux.Pipe(
|
||||
flux.Identifier("statuses"),
|
||||
flux.Call(
|
||||
flux.Member("monitor", "stateChanges"),
|
||||
flux.Object(
|
||||
flux.Property("fromLevel", flux.String(fromLevel)),
|
||||
flux.Property("toLevel", flux.String(toLevel)),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
name := fmt.Sprintf("%s_to_%s", fromLevel, toLevel)
|
||||
return flux.DefineVariable(name, pipe), flux.Identifier(name)
|
||||
}
|
||||
|
||||
func increaseDur(d *ast.DurationLiteral) *ast.DurationLiteral {
|
||||
dur := &ast.DurationLiteral{}
|
||||
for i, v := range d.Values {
|
||||
value := v
|
||||
if i == 0 {
|
||||
value.Magnitude += 1
|
||||
}
|
||||
dur.Values = append(dur.Values, value)
|
||||
}
|
||||
|
||||
return dur
|
||||
}
|
||||
|
||||
func (b *Base) generateTaskOption() ast.Statement {
|
||||
props := []*ast.Property{}
|
||||
|
||||
|
@ -136,7 +235,10 @@ func (b *Base) generateTaskOption() ast.Statement {
|
|||
}
|
||||
|
||||
if b.Every != nil {
|
||||
props = append(props, flux.Property("every", (*ast.DurationLiteral)(b.Every)))
|
||||
dur := increaseDur((*ast.DurationLiteral)(b.Every))
|
||||
// Make the windows overlap and filter records from previous queries.
|
||||
// This is so that we wont miss the first points possible state change.
|
||||
props = append(props, flux.Property("every", dur))
|
||||
}
|
||||
|
||||
if b.Offset != nil {
|
||||
|
@ -149,7 +251,8 @@ func (b *Base) generateTaskOption() ast.Statement {
|
|||
func (b *Base) generateFluxASTStatuses() ast.Statement {
|
||||
props := []*ast.Property{}
|
||||
|
||||
props = append(props, flux.Property("start", flux.Negative((*ast.DurationLiteral)(b.Every))))
|
||||
dur := (*ast.DurationLiteral)(b.Every)
|
||||
props = append(props, flux.Property("start", flux.Negative(increaseDur(dur))))
|
||||
|
||||
if len(b.TagRules) > 0 {
|
||||
r := b.TagRules[0]
|
||||
|
|
|
@ -34,7 +34,7 @@ func (s *Slack) GenerateFlux(e influxdb.NotificationEndpoint) (string, error) {
|
|||
func (s *Slack) GenerateFluxAST(e *endpoint.Slack) (*ast.Package, error) {
|
||||
f := flux.File(
|
||||
s.Name,
|
||||
flux.Imports("influxdata/influxdb/monitor", "slack", "influxdata/influxdb/secrets"),
|
||||
flux.Imports("influxdata/influxdb/monitor", "slack", "influxdata/influxdb/secrets", "experimental"),
|
||||
s.generateFluxASTBody(e),
|
||||
)
|
||||
return &ast.Package{Package: "main", Files: []*ast.File{f}}, nil
|
||||
|
@ -47,6 +47,7 @@ func (s *Slack) generateFluxASTBody(e *endpoint.Slack) []ast.Statement {
|
|||
statements = append(statements, s.generateFluxASTEndpoint(e))
|
||||
statements = append(statements, s.generateFluxASTNotificationDefinition(e))
|
||||
statements = append(statements, s.generateFluxASTStatuses())
|
||||
statements = append(statements, s.generateAllStateChanges()...)
|
||||
statements = append(statements, s.generateFluxASTNotifyPipe())
|
||||
|
||||
return statements
|
||||
|
@ -83,7 +84,7 @@ func (s *Slack) generateFluxASTNotifyPipe() ast.Statement {
|
|||
|
||||
call := flux.Call(flux.Member("monitor", "notify"), flux.Object(props...))
|
||||
|
||||
return flux.ExpressionStatement(flux.Pipe(flux.Identifier("statuses"), call))
|
||||
return flux.ExpressionStatement(flux.Pipe(flux.Identifier("all_statuses"), call))
|
||||
}
|
||||
|
||||
type slackAlias Slack
|
||||
|
|
|
@ -19,14 +19,19 @@ func mustDuration(d string) *notification.Duration {
|
|||
return (*notification.Duration)(dur)
|
||||
}
|
||||
|
||||
func statusRulePtr(r notification.CheckLevel) *notification.CheckLevel {
|
||||
return &r
|
||||
}
|
||||
|
||||
func TestSlack_GenerateFlux(t *testing.T) {
|
||||
want := `package main
|
||||
// foo
|
||||
import "influxdata/influxdb/monitor"
|
||||
import "slack"
|
||||
import "influxdata/influxdb/secrets"
|
||||
import "experimental"
|
||||
|
||||
option task = {name: "foo", every: 1h}
|
||||
option task = {name: "foo", every: 2h}
|
||||
|
||||
slack_secret = secrets.get(key: "slack_token")
|
||||
slack_endpoint = slack.endpoint(token: slack_secret, url: "http://localhost:7777")
|
||||
|
@ -36,10 +41,18 @@ notification = {
|
|||
_notification_endpoint_id: "0000000000000002",
|
||||
_notification_endpoint_name: "foo",
|
||||
}
|
||||
statuses = monitor.from(start: -1h, fn: (r) =>
|
||||
statuses = monitor.from(start: -2h, fn: (r) =>
|
||||
(r.foo == "bar" and r.baz == "bang"))
|
||||
any_to_crit = statuses
|
||||
|> monitor.stateChanges(fromLevel: "any", toLevel: "crit")
|
||||
info_to_warn = statuses
|
||||
|> monitor.stateChanges(fromLevel: "info", toLevel: "warn")
|
||||
all_statuses = union(tables: [any_to_crit, info_to_warn])
|
||||
|> sort(columns: ["_time"])
|
||||
|> filter(fn: (r) =>
|
||||
(r._time > experimental.subDuration(from: now(), d: 1h)))
|
||||
|
||||
statuses
|
||||
all_statuses
|
||||
|> monitor.notify(data: notification, endpoint: slack_endpoint(mapFn: (r) =>
|
||||
({channel: "bar", text: "blah"})))`
|
||||
|
||||
|
@ -67,6 +80,15 @@ statuses
|
|||
Operator: notification.Equal,
|
||||
},
|
||||
},
|
||||
StatusRules: []notification.StatusRule{
|
||||
{
|
||||
CurrentLevel: notification.Critical,
|
||||
},
|
||||
{
|
||||
CurrentLevel: notification.Warn,
|
||||
PreviousLevel: statusRulePtr(notification.Info),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
e := &endpoint.Slack{
|
||||
|
|
|
@ -136,6 +136,11 @@ func CreateNotificationRule(
|
|||
RunbookLink: "runbooklink1",
|
||||
SleepUntil: &time3,
|
||||
Every: mustDuration("1h"),
|
||||
StatusRules: []notification.StatusRule{
|
||||
{
|
||||
CurrentLevel: notification.Critical,
|
||||
},
|
||||
},
|
||||
TagRules: []notification.TagRule{
|
||||
{
|
||||
Tag: influxdb.Tag{
|
||||
|
@ -182,6 +187,11 @@ func CreateNotificationRule(
|
|||
RunbookLink: "runbooklink1",
|
||||
SleepUntil: &time3,
|
||||
Every: mustDuration("1h"),
|
||||
StatusRules: []notification.StatusRule{
|
||||
{
|
||||
CurrentLevel: notification.Critical,
|
||||
},
|
||||
},
|
||||
TagRules: []notification.TagRule{
|
||||
{
|
||||
Tag: influxdb.Tag{
|
||||
|
@ -215,6 +225,11 @@ func CreateNotificationRule(
|
|||
RunbookLink: "runbooklink1",
|
||||
SleepUntil: &time3,
|
||||
Every: mustDuration("1h"),
|
||||
StatusRules: []notification.StatusRule{
|
||||
{
|
||||
CurrentLevel: notification.Critical,
|
||||
},
|
||||
},
|
||||
TagRules: []notification.TagRule{
|
||||
{
|
||||
Tag: influxdb.Tag{
|
||||
|
@ -250,6 +265,11 @@ func CreateNotificationRule(
|
|||
RunbookLink: "runbooklink1",
|
||||
SleepUntil: &time3,
|
||||
Every: mustDuration("1h"),
|
||||
StatusRules: []notification.StatusRule{
|
||||
{
|
||||
CurrentLevel: notification.Critical,
|
||||
},
|
||||
},
|
||||
TagRules: []notification.TagRule{
|
||||
{
|
||||
Tag: influxdb.Tag{
|
||||
|
@ -1605,10 +1625,15 @@ func UpdateNotificationRule(
|
|||
NotificationRules: []influxdb.NotificationRule{
|
||||
&rule.Slack{
|
||||
Base: rule.Base{
|
||||
ID: MustIDBase16(oneID),
|
||||
Name: "name1",
|
||||
OwnerID: MustIDBase16(sixID),
|
||||
OrgID: MustIDBase16(fourID),
|
||||
ID: MustIDBase16(oneID),
|
||||
Name: "name1",
|
||||
OwnerID: MustIDBase16(sixID),
|
||||
OrgID: MustIDBase16(fourID),
|
||||
StatusRules: []notification.StatusRule{
|
||||
{
|
||||
CurrentLevel: notification.Info,
|
||||
},
|
||||
},
|
||||
EndpointID: MustIDBase16(twoID),
|
||||
Status: influxdb.Active,
|
||||
TaskID: MustIDBase16(twoID),
|
||||
|
@ -1625,10 +1650,15 @@ func UpdateNotificationRule(
|
|||
},
|
||||
&rule.Slack{
|
||||
Base: rule.Base{
|
||||
ID: MustIDBase16(twoID),
|
||||
Name: "name2",
|
||||
OwnerID: MustIDBase16(sixID),
|
||||
OrgID: MustIDBase16(fourID),
|
||||
ID: MustIDBase16(twoID),
|
||||
Name: "name2",
|
||||
OwnerID: MustIDBase16(sixID),
|
||||
OrgID: MustIDBase16(fourID),
|
||||
StatusRules: []notification.StatusRule{
|
||||
{
|
||||
CurrentLevel: notification.Info,
|
||||
},
|
||||
},
|
||||
TaskID: MustIDBase16(twoID),
|
||||
EndpointID: MustIDBase16(twoID),
|
||||
Status: influxdb.Active,
|
||||
|
@ -1649,11 +1679,16 @@ func UpdateNotificationRule(
|
|||
id: MustIDBase16(twoID),
|
||||
notificationRule: &rule.Slack{
|
||||
Base: rule.Base{
|
||||
OwnerID: MustIDBase16(sixID),
|
||||
Name: "name3",
|
||||
OrgID: MustIDBase16(fourID),
|
||||
EndpointID: MustIDBase16(twoID),
|
||||
Status: influxdb.Inactive,
|
||||
OwnerID: MustIDBase16(sixID),
|
||||
Name: "name3",
|
||||
OrgID: MustIDBase16(fourID),
|
||||
EndpointID: MustIDBase16(twoID),
|
||||
Status: influxdb.Inactive,
|
||||
StatusRules: []notification.StatusRule{
|
||||
{
|
||||
CurrentLevel: notification.Info,
|
||||
},
|
||||
},
|
||||
RunbookLink: "runbooklink3",
|
||||
SleepUntil: &time3,
|
||||
Every: mustDuration("2h"),
|
||||
|
@ -1664,11 +1699,16 @@ func UpdateNotificationRule(
|
|||
wants: wants{
|
||||
notificationRule: &rule.Slack{
|
||||
Base: rule.Base{
|
||||
ID: MustIDBase16(twoID),
|
||||
Name: "name3",
|
||||
OwnerID: MustIDBase16(sixID),
|
||||
OrgID: MustIDBase16(fourID),
|
||||
TaskID: MustIDBase16(twoID),
|
||||
ID: MustIDBase16(twoID),
|
||||
Name: "name3",
|
||||
OwnerID: MustIDBase16(sixID),
|
||||
OrgID: MustIDBase16(fourID),
|
||||
TaskID: MustIDBase16(twoID),
|
||||
StatusRules: []notification.StatusRule{
|
||||
{
|
||||
CurrentLevel: notification.Info,
|
||||
},
|
||||
},
|
||||
EndpointID: MustIDBase16(twoID),
|
||||
Status: influxdb.Inactive,
|
||||
RunbookLink: "runbooklink3",
|
||||
|
@ -1798,7 +1838,7 @@ func PatchNotificationRule(
|
|||
},
|
||||
},
|
||||
{
|
||||
name: "regular update",
|
||||
name: "regular patch",
|
||||
fields: NotificationRuleFields{
|
||||
TimeGenerator: fakeGenerator,
|
||||
IDGenerator: mock.NewIDGenerator(twoID, t),
|
||||
|
@ -1852,6 +1892,14 @@ func PatchNotificationRule(
|
|||
RunbookLink: "runbooklink1",
|
||||
SleepUntil: &time3,
|
||||
Every: mustDuration("1h"),
|
||||
StatusRules: []notification.StatusRule{
|
||||
{
|
||||
CurrentLevel: notification.Critical,
|
||||
},
|
||||
{
|
||||
CurrentLevel: notification.Info,
|
||||
},
|
||||
},
|
||||
CRUDLog: influxdb.CRUDLog{
|
||||
CreatedAt: timeGen1.Now(),
|
||||
UpdatedAt: timeGen2.Now(),
|
||||
|
@ -1862,12 +1910,20 @@ func PatchNotificationRule(
|
|||
},
|
||||
&rule.Slack{
|
||||
Base: rule.Base{
|
||||
ID: MustIDBase16(twoID),
|
||||
Name: "name2",
|
||||
Status: influxdb.Active,
|
||||
OwnerID: MustIDBase16(sixID),
|
||||
EndpointID: MustIDBase16(twoID),
|
||||
TaskID: MustIDBase16(twoID),
|
||||
ID: MustIDBase16(twoID),
|
||||
Name: "name2",
|
||||
Status: influxdb.Active,
|
||||
OwnerID: MustIDBase16(sixID),
|
||||
EndpointID: MustIDBase16(twoID),
|
||||
TaskID: MustIDBase16(twoID),
|
||||
StatusRules: []notification.StatusRule{
|
||||
{
|
||||
CurrentLevel: notification.Critical,
|
||||
},
|
||||
{
|
||||
CurrentLevel: notification.Info,
|
||||
},
|
||||
},
|
||||
OrgID: MustIDBase16(fourID),
|
||||
RunbookLink: "runbooklink2",
|
||||
SleepUntil: &time3,
|
||||
|
@ -1897,13 +1953,21 @@ func PatchNotificationRule(
|
|||
wants: wants{
|
||||
notificationRule: &rule.Slack{
|
||||
Base: rule.Base{
|
||||
ID: MustIDBase16(twoID),
|
||||
Name: name3,
|
||||
Status: status3,
|
||||
OwnerID: MustIDBase16(sixID),
|
||||
OrgID: MustIDBase16(fourID),
|
||||
EndpointID: MustIDBase16(twoID),
|
||||
TaskID: MustIDBase16(twoID),
|
||||
ID: MustIDBase16(twoID),
|
||||
Name: name3,
|
||||
Status: status3,
|
||||
OwnerID: MustIDBase16(sixID),
|
||||
OrgID: MustIDBase16(fourID),
|
||||
EndpointID: MustIDBase16(twoID),
|
||||
TaskID: MustIDBase16(twoID),
|
||||
StatusRules: []notification.StatusRule{
|
||||
{
|
||||
CurrentLevel: notification.Critical,
|
||||
},
|
||||
{
|
||||
CurrentLevel: notification.Info,
|
||||
},
|
||||
},
|
||||
RunbookLink: "runbooklink2",
|
||||
SleepUntil: &time3,
|
||||
Every: mustDuration("1h"),
|
||||
|
|
Loading…
Reference in New Issue