influxdb/notification/check/threshold.go

476 lines
13 KiB
Go

package check
import (
"encoding/json"
"fmt"
"strings"
"github.com/influxdata/flux/ast"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/notification"
"github.com/influxdata/influxdb/v2/notification/flux"
"github.com/influxdata/influxdb/v2/query"
)
var _ influxdb.Check = (*Threshold)(nil)
// Threshold is the threshold check.
type Threshold struct {
Base
Thresholds []ThresholdConfig `json:"thresholds"`
}
// Type returns the type of the check.
func (t Threshold) Type() string {
return "threshold"
}
// Valid returns error if something is invalid.
func (t Threshold) Valid(lang influxdb.FluxLanguageService) error {
if err := t.Base.Valid(lang); err != nil {
return err
}
for _, cc := range t.Thresholds {
if err := cc.Valid(); err != nil {
return err
}
}
return nil
}
type thresholdDecode struct {
Base
Thresholds []thresholdConfigDecode `json:"thresholds"`
}
type thresholdConfigDecode struct {
ThresholdConfigBase
Type string `json:"type"`
Value float64 `json:"value"`
Min float64 `json:"min"`
Max float64 `json:"max"`
Within bool `json:"within"`
}
// UnmarshalJSON implement json.Unmarshaler interface.
func (t *Threshold) UnmarshalJSON(b []byte) error {
tdRaws := new(thresholdDecode)
if err := json.Unmarshal(b, tdRaws); err != nil {
return err
}
t.Base = tdRaws.Base
for _, tdRaw := range tdRaws.Thresholds {
switch tdRaw.Type {
case "lesser":
td := &Lesser{
ThresholdConfigBase: tdRaw.ThresholdConfigBase,
Value: tdRaw.Value,
}
t.Thresholds = append(t.Thresholds, td)
case "greater":
td := &Greater{
ThresholdConfigBase: tdRaw.ThresholdConfigBase,
Value: tdRaw.Value,
}
t.Thresholds = append(t.Thresholds, td)
case "range":
td := &Range{
ThresholdConfigBase: tdRaw.ThresholdConfigBase,
Min: tdRaw.Min,
Max: tdRaw.Max,
Within: tdRaw.Within,
}
t.Thresholds = append(t.Thresholds, td)
default:
return &influxdb.Error{
Msg: fmt.Sprintf("invalid threshold type %s", tdRaw.Type),
}
}
}
return nil
}
func multiError(errs []error) error {
var b strings.Builder
for _, err := range errs {
b.WriteString(err.Error() + "\n")
}
return fmt.Errorf(b.String())
}
// GenerateFlux returns a flux script for the threshold provided. If there
// are any errors in the flux that the user provided the function will return
// an error for each error found when the script is parsed.
func (t Threshold) GenerateFlux(lang influxdb.FluxLanguageService) (string, error) {
p, err := t.GenerateFluxAST(lang)
if err != nil {
return "", err
}
return ast.Format(p), nil
}
// GenerateFluxAST returns a flux AST for the threshold provided. If there
// are any errors in the flux that the user provided the function will return
// an error for each error found when the script is parsed.
func (t Threshold) GenerateFluxAST(lang influxdb.FluxLanguageService) (*ast.Package, error) {
p, err := query.Parse(lang, t.Query.Text)
if p == nil {
return nil, err
}
replaceDurationsWithEvery(p, t.Every)
removeStopFromRange(p)
addCreateEmptyFalseToAggregateWindow(p)
if errs := ast.GetErrors(p); len(errs) != 0 {
return nil, multiError(errs)
}
// TODO(desa): this is a hack that we had to do as a result of https://github.com/influxdata/flux/issues/1701
// when it is fixed we should use a separate file and not manipulate the existing one.
if len(p.Files) != 1 {
return nil, fmt.Errorf("expect a single file to be returned from query parsing got %d", len(p.Files))
}
fields := getFields(p)
if len(fields) != 1 {
return nil, fmt.Errorf("expected a single field but got: %s", fields)
}
f := p.Files[0]
assignPipelineToData(f)
f.Imports = append(f.Imports, flux.Imports("influxdata/influxdb/monitor", "influxdata/influxdb/v1")...)
f.Body = append(f.Body, t.generateFluxASTBody(fields[0])...)
return p, nil
}
// TODO(desa): we'll likely want something slightly more sophisitcated long term, but this should work for now.
func addCreateEmptyFalseToAggregateWindow(pkg *ast.Package) {
ast.Visit(pkg, func(n ast.Node) {
if call, ok := n.(*ast.CallExpression); ok {
if id, ok := call.Callee.(*ast.Identifier); ok && id.Name == "aggregateWindow" {
for _, args := range call.Arguments {
if obj, ok := args.(*ast.ObjectExpression); ok {
foundCreateEmpty := false
for _, props := range obj.Properties {
if props.Key.Key() == "createEmpty" {
foundCreateEmpty = true
break
}
}
if !foundCreateEmpty {
obj.Properties = append(obj.Properties, flux.Property("createEmpty", flux.Bool(false)))
}
}
}
}
}
})
}
// TODO(desa): we'll likely want something slightly more sophisitcated long term, but this should work for now.
func replaceDurationsWithEvery(pkg *ast.Package, every *notification.Duration) {
ast.Visit(pkg, func(n ast.Node) {
switch e := n.(type) {
case *ast.Property:
key := e.Key.Key()
newEvery := (ast.DurationLiteral)(*every)
switch key {
case "start":
e.Value = flux.Negative(&newEvery)
case "every":
e.Value = &newEvery
}
}
})
}
// TODO(desa): we'll likely want to remove all other arguments to range that are provided, but for now this should work.
// When we decide to implement the full feature we'll have to do something more sophisticated.
func removeStopFromRange(pkg *ast.Package) {
ast.Visit(pkg, func(n ast.Node) {
if call, ok := n.(*ast.CallExpression); ok {
if id, ok := call.Callee.(*ast.Identifier); ok && id.Name == "range" {
for _, args := range call.Arguments {
if obj, ok := args.(*ast.ObjectExpression); ok {
props := obj.Properties[:0]
for _, prop := range obj.Properties {
if prop.Key.Key() == "start" {
props = append(props, prop)
}
}
obj.Properties = props
}
}
}
}
})
}
// TODO(desa): we'll likely want to remove all other arguments to range that are provided, but for now this should work.
// When we decide to implement the full feature we'll have to do something more sophisticated.
func removeAggregateWindow(pkg *ast.Package) {
ast.Visit(pkg, func(n ast.Node) {
if pipe, ok := n.(*ast.PipeExpression); ok {
if id, ok := pipe.Call.Callee.(*ast.Identifier); ok && id.Name == "aggregateWindow" {
if subPipe, ok := pipe.Argument.(*ast.PipeExpression); ok {
*pipe = *subPipe
}
}
}
})
}
func getFields(pkg *ast.Package) []string {
var fields []string
ast.Visit(pkg, func(n ast.Node) {
if fn, ok := n.(*ast.BinaryExpression); ok {
if me, ok := fn.Left.(*ast.MemberExpression); ok {
if me.Property.Key() == "_field" {
if str, ok := fn.Right.(*ast.StringLiteral); ok {
fields = append(fields, str.Value)
}
}
}
}
})
return fields
}
func assignPipelineToData(f *ast.File) error {
if len(f.Body) != 1 {
return fmt.Errorf("expected there to be a single statement in the flux script body, recieved %d", len(f.Body))
}
stmt := f.Body[0]
e, ok := stmt.(*ast.ExpressionStatement)
if !ok {
return fmt.Errorf("statement is not an *ast.Expression statement, recieved %T", stmt)
}
exp := e.Expression
pipe, ok := exp.(*ast.PipeExpression)
if !ok {
return fmt.Errorf("expression is not an *ast.PipeExpression statement, recieved %T", exp)
}
if id, ok := pipe.Call.Callee.(*ast.Identifier); ok && id.Name == "yield" {
exp = pipe.Argument
}
f.Body[0] = flux.DefineVariable("data", exp)
return nil
}
func (t Threshold) generateFluxASTBody(field string) []ast.Statement {
var statements []ast.Statement
statements = append(statements, t.generateTaskOption())
statements = append(statements, t.generateFluxASTCheckDefinition("threshold"))
statements = append(statements, t.generateFluxASTThresholdFunctions(field)...)
statements = append(statements, t.generateFluxASTMessageFunction())
statements = append(statements, t.generateFluxASTChecksFunction())
return statements
}
func (t Threshold) generateFluxASTChecksFunction() ast.Statement {
return flux.ExpressionStatement(flux.Pipe(
flux.Identifier("data"),
flux.Call(flux.Member("v1", "fieldsAsCols"), flux.Object()),
t.generateFluxASTChecksCall(),
))
}
func (t Threshold) generateFluxASTChecksCall() *ast.CallExpression {
objectProps := append(([]*ast.Property)(nil), flux.Property("data", flux.Identifier("check")))
objectProps = append(objectProps, flux.Property("messageFn", flux.Identifier("messageFn")))
// This assumes that the ThresholdConfigs we've been provided do not have duplicates.
for _, c := range t.Thresholds {
lvl := strings.ToLower(c.GetLevel().String())
objectProps = append(objectProps, flux.Property(lvl, flux.Identifier(lvl)))
}
return flux.Call(flux.Member("monitor", "check"), flux.Object(objectProps...))
}
func (t Threshold) generateFluxASTThresholdFunctions(field string) []ast.Statement {
thresholdStatements := make([]ast.Statement, len(t.Thresholds))
// This assumes that the ThresholdConfigs we've been provided do not have duplicates.
for k, v := range t.Thresholds {
thresholdStatements[k] = v.generateFluxASTThresholdFunction(field)
}
return thresholdStatements
}
func (td Greater) generateFluxASTThresholdFunction(field string) ast.Statement {
fnBody := flux.GreaterThan(flux.Member("r", field), flux.Float(td.Value))
fn := flux.Function(flux.FunctionParams("r"), fnBody)
lvl := strings.ToLower(td.Level.String())
return flux.DefineVariable(lvl, fn)
}
func (td Lesser) generateFluxASTThresholdFunction(field string) ast.Statement {
fnBody := flux.LessThan(flux.Member("r", field), flux.Float(td.Value))
fn := flux.Function(flux.FunctionParams("r"), fnBody)
lvl := strings.ToLower(td.Level.String())
return flux.DefineVariable(lvl, fn)
}
func (td Range) generateFluxASTThresholdFunction(field string) ast.Statement {
var fnBody *ast.LogicalExpression
if !td.Within {
fnBody = flux.Or(
flux.LessThan(flux.Member("r", field), flux.Float(td.Min)),
flux.GreaterThan(flux.Member("r", field), flux.Float(td.Max)),
)
} else {
fnBody = flux.And(
flux.LessThan(flux.Member("r", field), flux.Float(td.Max)),
flux.GreaterThan(flux.Member("r", field), flux.Float(td.Min)),
)
}
fn := flux.Function(flux.FunctionParams("r"), fnBody)
lvl := strings.ToLower(td.Level.String())
return flux.DefineVariable(lvl, fn)
}
type thresholdAlias Threshold
// MarshalJSON implement json.Marshaler interface.
func (t Threshold) MarshalJSON() ([]byte, error) {
return json.Marshal(
struct {
thresholdAlias
Type string `json:"type"`
}{
thresholdAlias: thresholdAlias(t),
Type: t.Type(),
})
}
// ThresholdConfig is the base of all threshold config.
type ThresholdConfig interface {
MarshalJSON() ([]byte, error)
Valid() error
Type() string
generateFluxASTThresholdFunction(string) ast.Statement
GetLevel() notification.CheckLevel
}
// Valid returns error if something is invalid.
func (b ThresholdConfigBase) Valid() error {
return nil
}
// ThresholdConfigBase is the base of all threshold config.
type ThresholdConfigBase struct {
// If true, only alert if all values meet threshold.
AllValues bool `json:"allValues"`
Level notification.CheckLevel `json:"level"`
}
// GetLevel return the check level.
func (b ThresholdConfigBase) GetLevel() notification.CheckLevel {
return b.Level
}
// Lesser threshold type.
type Lesser struct {
ThresholdConfigBase
Value float64 `json:"value"`
}
// Type of the threshold config.
func (td Lesser) Type() string {
return "lesser"
}
// MarshalJSON implement json.Marshaler interface.
func (td Lesser) MarshalJSON() ([]byte, error) {
type lesserAlias Lesser
return json.Marshal(
struct {
lesserAlias
Type string `json:"type"`
}{
lesserAlias: lesserAlias(td),
Type: "lesser",
})
}
// Greater threshold type.
type Greater struct {
ThresholdConfigBase
Value float64 `json:"value"`
}
// Type of the threshold config.
func (td Greater) Type() string {
return "greater"
}
// MarshalJSON implement json.Marshaler interface.
func (td Greater) MarshalJSON() ([]byte, error) {
type greaterAlias Greater
return json.Marshal(
struct {
greaterAlias
Type string `json:"type"`
}{
greaterAlias: greaterAlias(td),
Type: "greater",
})
}
// Range threshold type.
type Range struct {
ThresholdConfigBase
Min float64 `json:"min,omitempty"`
Max float64 `json:"max,omitempty"`
Within bool `json:"within"`
}
// Type of the threshold config.
func (td Range) Type() string {
return "range"
}
// MarshalJSON implement json.Marshaler interface.
func (td Range) MarshalJSON() ([]byte, error) {
type rangeAlias Range
return json.Marshal(
struct {
rangeAlias
Type string `json:"type"`
}{
rangeAlias: rangeAlias(td),
Type: "range",
})
}
// Valid overwrite the base threshold.
func (td Range) Valid() error {
if td.Min > td.Max {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "range threshold min can't be larger than max",
}
}
return nil
}