Merge pull request #342 from influxdata/feature/tickscripts

Feature/tickscripts
pull/10616/head
Chris Goller 2016-11-03 20:48:54 -05:00 committed by GitHub
commit 780e84760f
28 changed files with 2654 additions and 498 deletions

1
.gitignore vendored
View File

@ -9,3 +9,4 @@ node_modules/
build/ build/
chronograf.db chronograf.db
npm-debug.log npm-debug.log
.vscode

1
Godeps
View File

@ -6,6 +6,7 @@ github.com/elazarl/go-bindata-assetfs 9a6736ed45b44bf3835afeebb3034b57ed329f3e
github.com/gogo/protobuf 6abcf94fd4c97dcb423fdafd42fe9f96ca7e421b github.com/gogo/protobuf 6abcf94fd4c97dcb423fdafd42fe9f96ca7e421b
github.com/google/go-github 1bc362c7737e51014af7299e016444b654095ad9 github.com/google/go-github 1bc362c7737e51014af7299e016444b654095ad9
github.com/google/go-querystring 9235644dd9e52eeae6fa48efd539fdc351a0af53 github.com/google/go-querystring 9235644dd9e52eeae6fa48efd539fdc351a0af53
github.com/influxdata/kapacitor 0eb8c348b210dd3d32cb240a417f9e6ded1b691d
github.com/influxdata/usage-client 6d3895376368aa52a3a81d2a16e90f0f52371967 github.com/influxdata/usage-client 6d3895376368aa52a3a81d2a16e90f0f52371967
github.com/jessevdk/go-flags 4cc2832a6e6d1d3b815e2b9d544b2a4dfb3ce8fa github.com/jessevdk/go-flags 4cc2832a6e6d1d3b815e2b9d544b2a4dfb3ce8fa
github.com/satori/go.uuid b061729afc07e77a8aa4fad0a2fd840958f1942a github.com/satori/go.uuid b061729afc07e77a8aa4fad0a2fd840958f1942a

115
bolt/alerts.go Normal file
View File

@ -0,0 +1,115 @@
package bolt
import (
"context"
"github.com/boltdb/bolt"
"github.com/influxdata/chronograf"
"github.com/influxdata/chronograf/bolt/internal"
)
// Ensure AlertsStore implements chronograf.AlertsStore.
var _ chronograf.AlertRulesStore = &AlertsStore{}
var AlertsBucket = []byte("Alerts")
type AlertsStore struct {
client *Client
}
// All returns all known alerts
func (s *AlertsStore) All(ctx context.Context) ([]chronograf.AlertRule, error) {
var srcs []chronograf.AlertRule
if err := s.client.db.View(func(tx *bolt.Tx) error {
if err := tx.Bucket(AlertsBucket).ForEach(func(k, v []byte) error {
var src chronograf.AlertRule
if err := internal.UnmarshalAlertRule(v, &src); err != nil {
return err
}
srcs = append(srcs, src)
return nil
}); err != nil {
return err
}
return nil
}); err != nil {
return nil, err
}
return srcs, nil
}
// Add creates a new Alerts in the AlertsStore.
func (s *AlertsStore) Add(ctx context.Context, src chronograf.AlertRule) (chronograf.AlertRule, error) {
if err := s.client.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(AlertsBucket)
if v, err := internal.MarshalAlertRule(&src); err != nil {
return err
} else if err := b.Put([]byte(src.ID), v); err != nil {
return err
}
return nil
}); err != nil {
return chronograf.AlertRule{}, err
}
return src, nil
}
// Delete removes the Alerts from the AlertsStore
func (s *AlertsStore) Delete(ctx context.Context, src chronograf.AlertRule) error {
_, err := s.Get(ctx, src.ID)
if err != nil {
return err
}
if err := s.client.db.Update(func(tx *bolt.Tx) error {
if err := tx.Bucket(AlertsBucket).Delete([]byte(src.ID)); err != nil {
return err
}
return nil
}); err != nil {
return err
}
return nil
}
// Get returns a Alerts if the id exists.
func (s *AlertsStore) Get(ctx context.Context, id string) (chronograf.AlertRule, error) {
var src chronograf.AlertRule
if err := s.client.db.View(func(tx *bolt.Tx) error {
if v := tx.Bucket(AlertsBucket).Get([]byte(id)); v == nil {
return chronograf.ErrAlertNotFound
} else if err := internal.UnmarshalAlertRule(v, &src); err != nil {
return err
}
return nil
}); err != nil {
return chronograf.AlertRule{}, err
}
return src, nil
}
// Update a Alerts
func (s *AlertsStore) Update(ctx context.Context, src chronograf.AlertRule) error {
if err := s.client.db.Update(func(tx *bolt.Tx) error {
// Get an existing alerts with the same ID.
b := tx.Bucket(AlertsBucket)
if v := b.Get([]byte(src.ID)); v == nil {
return chronograf.ErrAlertNotFound
}
if v, err := internal.MarshalAlertRule(&src); err != nil {
return err
} else if err := b.Put([]byte(src.ID), v); err != nil {
return err
}
return nil
}); err != nil {
return err
}
return nil
}

View File

@ -19,6 +19,7 @@ type Client struct {
SourcesStore *SourcesStore SourcesStore *SourcesStore
ServersStore *ServersStore ServersStore *ServersStore
LayoutStore *LayoutStore LayoutStore *LayoutStore
AlertsStore *AlertsStore
} }
func NewClient() *Client { func NewClient() *Client {
@ -26,6 +27,7 @@ func NewClient() *Client {
c.ExplorationStore = &ExplorationStore{client: c} c.ExplorationStore = &ExplorationStore{client: c}
c.SourcesStore = &SourcesStore{client: c} c.SourcesStore = &SourcesStore{client: c}
c.ServersStore = &ServersStore{client: c} c.ServersStore = &ServersStore{client: c}
c.AlertsStore = &AlertsStore{client: c}
c.LayoutStore = &LayoutStore{ c.LayoutStore = &LayoutStore{
client: c, client: c,
IDs: &uuid.V4{}, IDs: &uuid.V4{},
@ -59,20 +61,15 @@ func (c *Client) Open() error {
if _, err := tx.CreateBucketIfNotExists(LayoutBucket); err != nil { if _, err := tx.CreateBucketIfNotExists(LayoutBucket); err != nil {
return err return err
} }
// Always create Alerts bucket.
if _, err := tx.CreateBucketIfNotExists(AlertsBucket); err != nil {
return err
}
return nil return nil
}); err != nil { }); err != nil {
return err return err
} }
// TODO: Ask @gunnar about these
/*
c.ExplorationStore = &ExplorationStore{client: c}
c.SourcesStore = &SourcesStore{client: c}
c.ServersStore = &ServersStore{client: c}
c.LayoutStore = &LayoutStore{client: c}
*/
return nil return nil
} }

View File

@ -1,6 +1,7 @@
package internal package internal
import ( import (
"encoding/json"
"time" "time"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
@ -161,3 +162,29 @@ func UnmarshalLayout(data []byte, l *chronograf.Layout) error {
l.Cells = cells l.Cells = cells
return nil return nil
} }
// MarshalAlertRule encodes an alert rule to binary protobuf format.
func MarshalAlertRule(r *chronograf.AlertRule) ([]byte, error) {
j, err := json.Marshal(r)
if err != nil {
return nil, err
}
return proto.Marshal(&AlertRule{
ID: r.ID,
JSON: string(j),
})
}
// UnmarshalAlertRule decodes an alert rule from binary protobuf data.
func UnmarshalAlertRule(data []byte, r *chronograf.AlertRule) error {
var pb AlertRule
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
err := json.Unmarshal([]byte(pb.JSON), r)
if err != nil {
return err
}
return nil
}

View File

@ -15,6 +15,7 @@ It has these top-level messages:
Layout Layout
Cell Cell
Query Query
AlertRule
*/ */
package internal package internal
@ -34,13 +35,13 @@ var _ = math.Inf
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type Exploration struct { type Exploration struct {
ID int64 `protobuf:"varint,1,opt,name=ID,proto3" json:"ID,omitempty"` ID int64 `protobuf:"varint,1,opt,name=ID,json=iD,proto3" json:"ID,omitempty"`
Name string `protobuf:"bytes,2,opt,name=Name,proto3" json:"Name,omitempty"` Name string `protobuf:"bytes,2,opt,name=Name,json=name,proto3" json:"Name,omitempty"`
UserID int64 `protobuf:"varint,3,opt,name=UserID,proto3" json:"UserID,omitempty"` UserID int64 `protobuf:"varint,3,opt,name=UserID,json=userID,proto3" json:"UserID,omitempty"`
Data string `protobuf:"bytes,4,opt,name=Data,proto3" json:"Data,omitempty"` Data string `protobuf:"bytes,4,opt,name=Data,json=data,proto3" json:"Data,omitempty"`
CreatedAt int64 `protobuf:"varint,5,opt,name=CreatedAt,proto3" json:"CreatedAt,omitempty"` CreatedAt int64 `protobuf:"varint,5,opt,name=CreatedAt,json=createdAt,proto3" json:"CreatedAt,omitempty"`
UpdatedAt int64 `protobuf:"varint,6,opt,name=UpdatedAt,proto3" json:"UpdatedAt,omitempty"` UpdatedAt int64 `protobuf:"varint,6,opt,name=UpdatedAt,json=updatedAt,proto3" json:"UpdatedAt,omitempty"`
Default bool `protobuf:"varint,7,opt,name=Default,proto3" json:"Default,omitempty"` Default bool `protobuf:"varint,7,opt,name=Default,json=default,proto3" json:"Default,omitempty"`
} }
func (m *Exploration) Reset() { *m = Exploration{} } func (m *Exploration) Reset() { *m = Exploration{} }
@ -49,13 +50,13 @@ func (*Exploration) ProtoMessage() {}
func (*Exploration) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{0} } func (*Exploration) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{0} }
type Source struct { type Source struct {
ID int64 `protobuf:"varint,1,opt,name=ID,proto3" json:"ID,omitempty"` ID int64 `protobuf:"varint,1,opt,name=ID,json=iD,proto3" json:"ID,omitempty"`
Name string `protobuf:"bytes,2,opt,name=Name,proto3" json:"Name,omitempty"` Name string `protobuf:"bytes,2,opt,name=Name,json=name,proto3" json:"Name,omitempty"`
Type string `protobuf:"bytes,3,opt,name=Type,proto3" json:"Type,omitempty"` Type string `protobuf:"bytes,3,opt,name=Type,json=type,proto3" json:"Type,omitempty"`
Username string `protobuf:"bytes,4,opt,name=Username,proto3" json:"Username,omitempty"` Username string `protobuf:"bytes,4,opt,name=Username,json=username,proto3" json:"Username,omitempty"`
Password string `protobuf:"bytes,5,opt,name=Password,proto3" json:"Password,omitempty"` Password string `protobuf:"bytes,5,opt,name=Password,json=password,proto3" json:"Password,omitempty"`
URL string `protobuf:"bytes,6,opt,name=URL,proto3" json:"URL,omitempty"` URL string `protobuf:"bytes,6,opt,name=URL,json=uRL,proto3" json:"URL,omitempty"`
Default bool `protobuf:"varint,7,opt,name=Default,proto3" json:"Default,omitempty"` Default bool `protobuf:"varint,7,opt,name=Default,json=default,proto3" json:"Default,omitempty"`
} }
func (m *Source) Reset() { *m = Source{} } func (m *Source) Reset() { *m = Source{} }
@ -64,12 +65,12 @@ func (*Source) ProtoMessage() {}
func (*Source) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{1} } func (*Source) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{1} }
type Server struct { type Server struct {
ID int64 `protobuf:"varint,1,opt,name=ID,proto3" json:"ID,omitempty"` ID int64 `protobuf:"varint,1,opt,name=ID,json=iD,proto3" json:"ID,omitempty"`
Name string `protobuf:"bytes,2,opt,name=Name,proto3" json:"Name,omitempty"` Name string `protobuf:"bytes,2,opt,name=Name,json=name,proto3" json:"Name,omitempty"`
Username string `protobuf:"bytes,3,opt,name=Username,proto3" json:"Username,omitempty"` Username string `protobuf:"bytes,3,opt,name=Username,json=username,proto3" json:"Username,omitempty"`
Password string `protobuf:"bytes,4,opt,name=Password,proto3" json:"Password,omitempty"` Password string `protobuf:"bytes,4,opt,name=Password,json=password,proto3" json:"Password,omitempty"`
URL string `protobuf:"bytes,5,opt,name=URL,proto3" json:"URL,omitempty"` URL string `protobuf:"bytes,5,opt,name=URL,json=uRL,proto3" json:"URL,omitempty"`
SrcID int64 `protobuf:"varint,6,opt,name=SrcID,proto3" json:"SrcID,omitempty"` SrcID int64 `protobuf:"varint,6,opt,name=SrcID,json=srcID,proto3" json:"SrcID,omitempty"`
} }
func (m *Server) Reset() { *m = Server{} } func (m *Server) Reset() { *m = Server{} }
@ -78,10 +79,10 @@ func (*Server) ProtoMessage() {}
func (*Server) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{2} } func (*Server) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{2} }
type Layout struct { type Layout struct {
ID string `protobuf:"bytes,1,opt,name=ID,proto3" json:"ID,omitempty"` ID string `protobuf:"bytes,1,opt,name=ID,json=iD,proto3" json:"ID,omitempty"`
Application string `protobuf:"bytes,2,opt,name=Application,proto3" json:"Application,omitempty"` Application string `protobuf:"bytes,2,opt,name=Application,json=application,proto3" json:"Application,omitempty"`
Measurement string `protobuf:"bytes,3,opt,name=Measurement,proto3" json:"Measurement,omitempty"` Measurement string `protobuf:"bytes,3,opt,name=Measurement,json=measurement,proto3" json:"Measurement,omitempty"`
Cells []*Cell `protobuf:"bytes,4,rep,name=Cells" json:"Cells,omitempty"` Cells []*Cell `protobuf:"bytes,4,rep,name=Cells,json=cells" json:"Cells,omitempty"`
} }
func (m *Layout) Reset() { *m = Layout{} } func (m *Layout) Reset() { *m = Layout{} }
@ -119,9 +120,9 @@ func (m *Cell) GetQueries() []*Query {
} }
type Query struct { type Query struct {
Command string `protobuf:"bytes,1,opt,name=Command,proto3" json:"Command,omitempty"` Command string `protobuf:"bytes,1,opt,name=Command,json=command,proto3" json:"Command,omitempty"`
DB string `protobuf:"bytes,2,opt,name=DB,proto3" json:"DB,omitempty"` DB string `protobuf:"bytes,2,opt,name=DB,json=dB,proto3" json:"DB,omitempty"`
RP string `protobuf:"bytes,3,opt,name=RP,proto3" json:"RP,omitempty"` RP string `protobuf:"bytes,3,opt,name=RP,json=rP,proto3" json:"RP,omitempty"`
} }
func (m *Query) Reset() { *m = Query{} } func (m *Query) Reset() { *m = Query{} }
@ -129,6 +130,16 @@ func (m *Query) String() string { return proto.CompactTextString(m) }
func (*Query) ProtoMessage() {} func (*Query) ProtoMessage() {}
func (*Query) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{5} } func (*Query) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{5} }
type AlertRule struct {
ID string `protobuf:"bytes,1,opt,name=ID,json=iD,proto3" json:"ID,omitempty"`
JSON string `protobuf:"bytes,2,opt,name=JSON,json=jSON,proto3" json:"JSON,omitempty"`
}
func (m *AlertRule) Reset() { *m = AlertRule{} }
func (m *AlertRule) String() string { return proto.CompactTextString(m) }
func (*AlertRule) ProtoMessage() {}
func (*AlertRule) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{6} }
func init() { func init() {
proto.RegisterType((*Exploration)(nil), "internal.Exploration") proto.RegisterType((*Exploration)(nil), "internal.Exploration")
proto.RegisterType((*Source)(nil), "internal.Source") proto.RegisterType((*Source)(nil), "internal.Source")
@ -136,38 +147,42 @@ func init() {
proto.RegisterType((*Layout)(nil), "internal.Layout") proto.RegisterType((*Layout)(nil), "internal.Layout")
proto.RegisterType((*Cell)(nil), "internal.Cell") proto.RegisterType((*Cell)(nil), "internal.Cell")
proto.RegisterType((*Query)(nil), "internal.Query") proto.RegisterType((*Query)(nil), "internal.Query")
proto.RegisterType((*AlertRule)(nil), "internal.AlertRule")
} }
func init() { proto.RegisterFile("internal.proto", fileDescriptorInternal) } func init() { proto.RegisterFile("internal.proto", fileDescriptorInternal) }
var fileDescriptorInternal = []byte{ var fileDescriptorInternal = []byte{
// 442 bytes of a gzipped FileDescriptorProto // 486 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x93, 0xcf, 0x8e, 0xd3, 0x30, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x93, 0xcf, 0x8e, 0xd3, 0x3c,
0x10, 0xc6, 0xe5, 0x24, 0x4e, 0x9b, 0x29, 0x2a, 0xc8, 0x42, 0xc8, 0x42, 0x1c, 0xa2, 0x88, 0x43, 0x14, 0xc5, 0xe5, 0x26, 0x4e, 0x9a, 0xdb, 0x4f, 0xfd, 0x90, 0x85, 0x50, 0x84, 0x58, 0x54, 0x11,
0xb9, 0xec, 0x01, 0x9e, 0xa0, 0xdb, 0x70, 0xa8, 0xb4, 0xa0, 0xe2, 0xa5, 0x0f, 0x60, 0x5a, 0xa3, 0x8b, 0xb2, 0x19, 0x24, 0x78, 0x82, 0x4e, 0xc3, 0xa2, 0xa8, 0x74, 0x8a, 0x4b, 0x1f, 0xc0, 0x24,
0x8d, 0x94, 0x26, 0xc1, 0x71, 0x68, 0x73, 0xe5, 0x0a, 0x8f, 0xc1, 0x1b, 0xf0, 0x82, 0x68, 0x26, 0x17, 0x4d, 0x50, 0xfe, 0xe1, 0xd8, 0xb4, 0xd9, 0xb2, 0x85, 0xc7, 0xe0, 0x0d, 0x78, 0x41, 0x64,
0xee, 0x9f, 0xc3, 0x6a, 0xd5, 0xdb, 0x7c, 0x33, 0x5f, 0x34, 0x3f, 0x7f, 0x76, 0x60, 0x5a, 0x54, 0xd7, 0x21, 0x23, 0x81, 0x46, 0xb3, 0x3c, 0xf7, 0xdc, 0xe8, 0xfe, 0xee, 0xb9, 0x0e, 0xcc, 0x8b,
0xce, 0xd8, 0x4a, 0x97, 0x37, 0x8d, 0xad, 0x5d, 0x2d, 0xc6, 0x47, 0x9d, 0xfd, 0x63, 0x30, 0xf9, 0x5a, 0xa1, 0xac, 0x45, 0x79, 0xd5, 0xca, 0x46, 0x35, 0x6c, 0x3a, 0xe8, 0xe4, 0x17, 0x81, 0xd9,
0x78, 0x68, 0xca, 0xda, 0x6a, 0x57, 0xd4, 0x95, 0x98, 0x42, 0xb0, 0xcc, 0x25, 0x4b, 0xd9, 0x2c, 0x9b, 0x73, 0x5b, 0x36, 0x52, 0xa8, 0xa2, 0xa9, 0xd9, 0x1c, 0x26, 0x9b, 0x34, 0x26, 0x0b, 0xb2,
0x54, 0xc1, 0x32, 0x17, 0x02, 0xa2, 0xcf, 0x7a, 0x67, 0x64, 0x90, 0xb2, 0x59, 0xa2, 0xa8, 0x16, 0xf4, 0xf8, 0xa4, 0x48, 0x19, 0x03, 0x7f, 0x27, 0x2a, 0x8c, 0x27, 0x0b, 0xb2, 0x8c, 0xb8, 0x5f,
0xaf, 0x20, 0x5e, 0xb7, 0xc6, 0x2e, 0x73, 0x19, 0x92, 0xcf, 0x2b, 0xf4, 0xe6, 0xda, 0x69, 0x19, 0x8b, 0x0a, 0xd9, 0x13, 0x08, 0x8e, 0x1d, 0xca, 0x4d, 0x1a, 0x7b, 0xb6, 0x2f, 0xd0, 0x56, 0x99,
0x0d, 0x5e, 0xac, 0xc5, 0x1b, 0x48, 0x16, 0xd6, 0x68, 0x67, 0xb6, 0x73, 0x27, 0x39, 0xd9, 0xcf, 0xde, 0x54, 0x28, 0x11, 0xfb, 0x97, 0xde, 0x5c, 0x28, 0xc1, 0x9e, 0x41, 0xb4, 0x96, 0x28, 0x14,
0x0d, 0x9c, 0xae, 0x9b, 0xad, 0x9f, 0xc6, 0xc3, 0xf4, 0xd4, 0x10, 0x12, 0x46, 0xb9, 0xf9, 0xae, 0xe6, 0x2b, 0x15, 0x53, 0xdb, 0x1e, 0x65, 0x43, 0xc1, 0xb8, 0xc7, 0x36, 0x77, 0x6e, 0x70, 0x71,
0xbb, 0xd2, 0xc9, 0x51, 0xca, 0x66, 0x63, 0x75, 0x94, 0xd9, 0x5f, 0x06, 0xf1, 0x7d, 0xdd, 0xd9, 0xf5, 0x50, 0x60, 0x31, 0x84, 0x29, 0x7e, 0x12, 0xba, 0x54, 0x71, 0xb8, 0x20, 0xcb, 0x29, 0x0f,
0x8d, 0xb9, 0x0a, 0x58, 0x40, 0xf4, 0xb5, 0x6f, 0x0c, 0xe1, 0x26, 0x8a, 0x6a, 0xf1, 0x1a, 0xc6, 0xf3, 0x8b, 0x4c, 0x7e, 0x12, 0x08, 0x0e, 0x8d, 0x96, 0x19, 0x3e, 0x08, 0x98, 0x81, 0xff, 0xa1,
0x88, 0x5d, 0xa1, 0x77, 0x00, 0x3e, 0x69, 0x9c, 0xad, 0x74, 0xdb, 0xee, 0x6b, 0xbb, 0x25, 0xe6, 0x6f, 0xd1, 0xe2, 0x46, 0xdc, 0x57, 0x7d, 0x8b, 0xec, 0x29, 0x4c, 0xcd, 0x12, 0xc6, 0x77, 0xc0,
0x44, 0x9d, 0xb4, 0x78, 0x01, 0xe1, 0x5a, 0xdd, 0x11, 0x6c, 0xa2, 0xb0, 0x7c, 0x02, 0xf3, 0x0f, 0x53, 0xed, 0xb4, 0xf1, 0xf6, 0xa2, 0xeb, 0x4e, 0x8d, 0xcc, 0x2d, 0x73, 0xc4, 0xa7, 0xad, 0xd3,
0x62, 0x1a, 0xfb, 0xd3, 0xd8, 0xab, 0x30, 0x2f, 0x91, 0xc2, 0x27, 0x90, 0xa2, 0xc7, 0x91, 0xf8, 0xec, 0x11, 0x78, 0x47, 0xbe, 0xb5, 0xb0, 0x11, 0xf7, 0x34, 0xdf, 0xde, 0x83, 0xf9, 0xc3, 0x60,
0x19, 0xe9, 0x25, 0xf0, 0x7b, 0xbb, 0x59, 0xe6, 0x3e, 0xd3, 0x41, 0x64, 0xbf, 0x18, 0xc4, 0x77, 0xa2, 0xfc, 0x8a, 0xf2, 0x41, 0x98, 0x77, 0x91, 0xbc, 0x7b, 0x90, 0xfc, 0x7f, 0x23, 0xd1, 0x11,
0xba, 0xaf, 0x3b, 0x77, 0x81, 0x93, 0x10, 0x4e, 0x0a, 0x93, 0x79, 0xd3, 0x94, 0xc5, 0x86, 0x5e, 0xe9, 0x31, 0xd0, 0x83, 0xcc, 0x36, 0xa9, 0xcb, 0x94, 0x76, 0x46, 0x24, 0xdf, 0x08, 0x04, 0x5b,
0x81, 0xa7, 0xba, 0x6c, 0xa1, 0xe3, 0x93, 0xd1, 0x6d, 0x67, 0xcd, 0xce, 0x54, 0xce, 0xf3, 0x5d, 0xd1, 0x37, 0x5a, 0xdd, 0xc1, 0x89, 0x2c, 0xce, 0x02, 0x66, 0xab, 0xb6, 0x2d, 0x8b, 0xcc, 0xbe,
0xb6, 0xc4, 0x5b, 0xe0, 0x0b, 0x53, 0x96, 0xad, 0x8c, 0xd2, 0x70, 0x36, 0x79, 0x3f, 0xbd, 0x39, 0x02, 0x47, 0x35, 0x13, 0x63, 0xc9, 0x74, 0xbc, 0x43, 0xd1, 0x69, 0x89, 0x15, 0xd6, 0xca, 0xf1,
0x3d, 0x3a, 0x6c, 0xab, 0x61, 0x98, 0xfd, 0x66, 0x10, 0x61, 0x25, 0x9e, 0x01, 0x3b, 0x10, 0x01, 0xcd, 0xaa, 0xb1, 0xc4, 0x9e, 0x03, 0x5d, 0x63, 0x59, 0x76, 0xb1, 0xbf, 0xf0, 0x96, 0xb3, 0x57,
0x57, 0xec, 0x80, 0xaa, 0xa7, 0xb5, 0x5c, 0xb1, 0x1e, 0xd5, 0x9e, 0x56, 0x70, 0xc5, 0xf6, 0xa8, 0xf3, 0xab, 0x3f, 0x8f, 0xce, 0x94, 0x39, 0xcd, 0x8c, 0x99, 0x7c, 0x27, 0xe0, 0x1b, 0xcd, 0xfe,
0x1e, 0xe8, 0xd0, 0x5c, 0xb1, 0x07, 0xf1, 0x0e, 0x46, 0x3f, 0x3a, 0x63, 0x0b, 0xd3, 0x4a, 0x4e, 0x03, 0x72, 0xb6, 0x04, 0x94, 0x93, 0xb3, 0x51, 0xbd, 0x1d, 0x4b, 0x39, 0xe9, 0x8d, 0x3a, 0xd9,
0x8b, 0x9e, 0x9f, 0x17, 0x7d, 0xe9, 0x8c, 0xed, 0xd5, 0x71, 0x8e, 0x1f, 0x16, 0xfe, 0xa6, 0x58, 0x11, 0x94, 0x93, 0x93, 0x51, 0xb7, 0x76, 0x69, 0xca, 0xc9, 0x2d, 0x7b, 0x01, 0xe1, 0x17, 0x8d,
0x81, 0x91, 0x53, 0xb4, 0xa3, 0x21, 0x72, 0xac, 0xb3, 0x39, 0x70, 0xfa, 0x06, 0x2f, 0x71, 0x51, 0xb2, 0xc0, 0x2e, 0xa6, 0x76, 0xd0, 0xff, 0xe3, 0xa0, 0xf7, 0x1a, 0x65, 0xcf, 0x07, 0xdf, 0x7c,
0xef, 0x76, 0xba, 0xda, 0xfa, 0x54, 0x8e, 0x12, 0xa3, 0xca, 0x6f, 0x7d, 0x22, 0x41, 0x7e, 0x8b, 0x58, 0xb8, 0x4b, 0x91, 0xc2, 0x44, 0x6e, 0xa3, 0x0d, 0xc7, 0xc8, 0x93, 0x15, 0x50, 0xfb, 0x8d,
0x5a, 0xad, 0xfc, 0xf9, 0x03, 0xb5, 0xfa, 0x16, 0xd3, 0x2f, 0xf5, 0xe1, 0x7f, 0x00, 0x00, 0x00, 0x39, 0xe2, 0xba, 0xa9, 0x2a, 0x51, 0xe7, 0x2e, 0x95, 0x30, 0xbb, 0x48, 0x13, 0x55, 0x7a, 0xed,
0xff, 0xff, 0x85, 0xa7, 0xa7, 0xb1, 0x64, 0x03, 0x00, 0x00, 0x12, 0x99, 0xe4, 0xd7, 0x46, 0xf3, 0xbd, 0xdb, 0x7f, 0x22, 0xf7, 0xc9, 0x4b, 0x88, 0x56, 0x25,
0x4a, 0xc5, 0x75, 0x89, 0x7f, 0xe5, 0xca, 0xc0, 0x7f, 0x7b, 0xb8, 0xd9, 0x0d, 0x67, 0xfe, 0x7c,
0xb8, 0xd9, 0x7d, 0x0c, 0xec, 0x3f, 0xf8, 0xfa, 0x77, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe0, 0xb6,
0x3a, 0xf1, 0x95, 0x03, 0x00, 0x00,
} }

View File

@ -52,3 +52,8 @@ message Query {
string DB = 2; // DB the database for the query (optional) string DB = 2; // DB the database for the query (optional)
string RP = 3; // RP is a retention policy and optional; string RP = 3; // RP is a retention policy and optional;
} }
message AlertRule {
string ID = 1; // ID is the unique ID of this alert rule
string JSON = 2; // JSON byte representation of the alert
}

View File

@ -13,6 +13,7 @@ const (
ErrSourceNotFound = Error("source not found") ErrSourceNotFound = Error("source not found")
ErrServerNotFound = Error("server not found") ErrServerNotFound = Error("server not found")
ErrLayoutNotFound = Error("layout not found") ErrLayoutNotFound = Error("layout not found")
ErrAlertNotFound = Error("alert not found")
ErrAuthentication = Error("user not authenticated") ErrAuthentication = Error("user not authenticated")
) )
@ -68,7 +69,7 @@ type Source struct {
Name string `json:"name"` // Name is the user-defined name for the source Name string `json:"name"` // Name is the user-defined name for the source
Type string `json:"type,omitempty"` // Type specifies which kinds of source (enterprise vs oss) Type string `json:"type,omitempty"` // Type specifies which kinds of source (enterprise vs oss)
Username string `json:"username,omitempty"` // Username is the username to connect to the source Username string `json:"username,omitempty"` // Username is the username to connect to the source
Password string `json:"password,omitempty"` // Password is in CLEARTEXT FIXME Password string `json:"password,omitempty"` // Password is in CLEARTEXT // TODO: fixme
URL string `json:"url"` // URL are the connections to the source URL string `json:"url"` // URL are the connections to the source
Default bool `json:"default"` // Default specifies the default source for the application Default bool `json:"default"` // Default specifies the default source for the application
} }
@ -87,13 +88,78 @@ type SourcesStore interface {
Update(context.Context, Source) error Update(context.Context, Source) error
} }
// AlertRule represents rules for building a tickscript alerting task
type AlertRule struct {
ID string `json:"id,omitempty"` // ID is the unique ID of the alert
Query QueryConfig `json:"query"` // Query is the filter of data for the alert.
Every string `json:"every"` // Every how often to check for the alerting criteria
Alerts []string `json:"alerts"` // AlertServices name all the services to notify (e.g. pagerduty)
Message string `json:"message"` // Message included with alert
Trigger string `json:"trigger"` // Trigger is a type that defines when to trigger the alert
TriggerValues TriggerValues `json:"values"` // Defines the values that cause the alert to trigger
Name string `json:"name"` // Name is the user-defined name for the alert
}
// AlertRulesStore stores rules for building tickscript alerting tasks
type AlertRulesStore interface {
// All returns all rules in the store
All(context.Context) ([]AlertRule, error)
// Add creates a new rule in the AlertRulesStore and returns AlertRule with ID
Add(context.Context, AlertRule) (AlertRule, error)
// Delete the AlertRule from the store
Delete(context.Context, AlertRule) error
// Get retrieves AlertRule if `ID` exists
Get(ctx context.Context, ID string) (AlertRule, error)
// Update the AlertRule in the store.
Update(context.Context, AlertRule) error
}
// TICKScript task to be used by kapacitor
type TICKScript string
// Ticker generates tickscript tasks for kapacitor
type Ticker interface {
// Generate will create the tickscript to be used as a kapacitor task
Generate(AlertRule) (TICKScript, error)
}
// TriggerValues specifies the alerting logic for a specific trigger type
type TriggerValues struct {
Change string `json:"change,omitempty"` // Change specifies if the change is a percent or absolute
Period string `json:"period,omitempty"` // Period is the window to search for alerting criteria
Shift string `json:"shift,omitempty"` // Shift is the amount of time to look into the past for the alert to compare to the present
Operator string `json:"operator,omitempty"` // Operator for alert comparison
Value string `json:"value,omitempty"` // Value is the boundary value when alert goes critical
Percentile string `json:"percentile,omitempty"` // Percentile is defined only when Relation is not "Once"
Relation string `json:"relation,omitempty"` // Relation defines the logic about how often the threshold is met to be an alert.
}
// QueryConfig represents UI query from the data explorer
type QueryConfig struct {
ID string `json:"id,omitempty"`
Database string `json:"database"`
Measurement string `json:"measurement"`
RetentionPolicy string `json:"retentionPolicy"`
Fields []struct {
Field string `json:"field"`
Funcs []string `json:"funcs"`
} `json:"fields"`
Tags map[string][]string `json:"tags"`
GroupBy struct {
Time string `json:"time"`
Tags []string `json:"tags"`
} `json:"groupBy"`
AreTagsAccepted bool `json:"areTagsAccepted"`
RawText string `json:"rawText,omitempty"`
}
// Server represents a proxy connection to an HTTP server // Server represents a proxy connection to an HTTP server
type Server struct { type Server struct {
ID int // ID is the unique ID of the server ID int // ID is the unique ID of the server
SrcID int // SrcID of the data source SrcID int // SrcID of the data source
Name string // Name is the user-defined name for the server Name string // Name is the user-defined name for the server
Username string // Username is the username to connect to the server Username string // Username is the username to connect to the server
Password string // Password is in CLEARTEXT FIXME Password string // Password is in CLEARTEXT // TODO: FIXME
URL string // URL are the connections to the server URL string // URL are the connections to the server
} }

42
kapacitor/alerts.go Normal file
View File

@ -0,0 +1,42 @@
package kapacitor
import (
"fmt"
"github.com/influxdata/chronograf"
)
func kapaService(alert string) (string, error) {
switch alert {
case "hipchat":
return "hipChat", nil
case "opsgenie":
return "opsGenie", nil
case "pagerduty":
return "pagerDuty", nil
case "victorops":
return "victorOps", nil
case "smtp":
return "email", nil
case "sensu", "slack", "email", "talk", "telegram":
return alert, nil
default:
return "", fmt.Errorf("Unsupport alert %s", alert)
}
}
// AlertServices generates alert chaining methods to be attached to an alert from all rule Services
func AlertServices(rule chronograf.AlertRule) (string, error) {
alert := ""
for _, service := range rule.Alerts {
srv, err := kapaService(service)
if err != nil {
return "", err
}
if err := ValidateAlert(srv); err != nil {
return "", err
}
alert = alert + fmt.Sprintf(".%s()", srv)
}
return alert, nil
}

71
kapacitor/alerts_test.go Normal file
View File

@ -0,0 +1,71 @@
package kapacitor
import (
"testing"
"github.com/influxdata/chronograf"
)
func TestAlertServices(t *testing.T) {
tests := []struct {
name string
rule chronograf.AlertRule
want chronograf.TICKScript
wantErr bool
}{
{
name: "Test several valid services",
rule: chronograf.AlertRule{
Alerts: []string{"slack", "victorops", "email"},
},
want: `alert()
.slack()
.victorOps()
.email()
`,
},
{
name: "Test single invalid services amongst several valid",
rule: chronograf.AlertRule{
Alerts: []string{"slack", "invalid", "email"},
},
want: ``,
wantErr: true,
},
{
name: "Test single invalid service",
rule: chronograf.AlertRule{
Alerts: []string{"invalid"},
},
want: ``,
wantErr: true,
},
{
name: "Test single valid service",
rule: chronograf.AlertRule{
Alerts: []string{"slack"},
},
want: `alert()
.slack()
`,
},
}
for _, tt := range tests {
got, err := AlertServices(tt.rule)
if (err != nil) != tt.wantErr {
t.Errorf("%q. AlertServices() error = %v, wantErr %v", tt.name, err, tt.wantErr)
continue
}
if tt.wantErr {
continue
}
formatted, err := formatTick("alert()" + got)
if err != nil {
t.Errorf("%q. formatTick() error = %v", tt.name, err)
continue
}
if formatted != tt.want {
t.Errorf("%q. AlertServices() = %v, want %v", tt.name, formatted, tt.want)
}
}
}

139
kapacitor/client.go Normal file
View File

@ -0,0 +1,139 @@
package kapacitor
import (
"context"
"fmt"
"github.com/influxdata/chronograf"
client "github.com/influxdata/kapacitor/client/v1"
)
// Client communicates to kapacitor
type Client struct {
URL string
Username string
Password string
ID chronograf.ID
Ticker chronograf.Ticker
}
const (
// Prefix is prepended to the ID of all alerts
Prefix = "chronograf-v1-"
)
// Task represents a running kapacitor task
type Task struct {
ID string // Kapacitor ID
Href string // Kapacitor relative URI
TICKScript chronograf.TICKScript // TICKScript is the running script
}
// Href returns the link to a kapacitor task given an id
func (c *Client) Href(ID string) string {
return fmt.Sprintf("/kapacitor/v1/tasks/%s", ID)
}
// Create builds and POSTs a tickscript to kapacitor
func (c *Client) Create(ctx context.Context, rule chronograf.AlertRule) (*Task, error) {
kapa, err := c.kapaClient(ctx)
if err != nil {
return nil, err
}
id, err := c.ID.Generate()
if err != nil {
return nil, err
}
script, err := c.Ticker.Generate(rule)
if err != nil {
return nil, err
}
kapaID := Prefix + id
task, err := kapa.CreateTask(client.CreateTaskOptions{
ID: kapaID,
Type: toTask(rule.Query),
DBRPs: []client.DBRP{{Database: rule.Query.Database, RetentionPolicy: rule.Query.RetentionPolicy}},
TICKscript: string(script),
Status: client.Enabled,
})
if err != nil {
return nil, err
}
return &Task{
ID: kapaID,
Href: task.Link.Href,
TICKScript: script,
}, nil
}
// Delete removes tickscript task from kapacitor
func (c *Client) Delete(ctx context.Context, href string) error {
kapa, err := c.kapaClient(ctx)
if err != nil {
return err
}
return kapa.DeleteTask(client.Link{Href: href})
}
// Update changes the tickscript of a given id.
func (c *Client) Update(ctx context.Context, href string, rule chronograf.AlertRule) (*Task, error) {
kapa, err := c.kapaClient(ctx)
if err != nil {
return nil, err
}
script, err := c.Ticker.Generate(rule)
if err != nil {
return nil, err
}
opts := client.UpdateTaskOptions{
TICKscript: string(script),
Status: client.Enabled,
Type: toTask(rule.Query),
DBRPs: []client.DBRP{
{
Database: rule.Query.Database,
RetentionPolicy: rule.Query.RetentionPolicy,
},
},
}
task, err := kapa.UpdateTask(client.Link{Href: href}, opts)
if err != nil {
return nil, err
}
return &Task{
ID: task.ID,
Href: task.Link.Href,
TICKScript: script,
}, nil
}
func (c *Client) kapaClient(ctx context.Context) (*client.Client, error) {
var creds *client.Credentials
if c.Username != "" {
creds = &client.Credentials{
Method: client.UserAuthentication,
Username: c.Username,
Password: c.Password,
}
}
return client.New(client.Config{
URL: c.URL,
Credentials: creds,
})
}
func toTask(q chronograf.QueryConfig) client.TaskType {
if q.RawText == "" {
return client.StreamTask
}
return client.BatchTask
}

53
kapacitor/data.go Normal file
View File

@ -0,0 +1,53 @@
package kapacitor
import (
"fmt"
"github.com/influxdata/chronograf"
)
// Data returns the tickscript data section for querying
func Data(rule chronograf.AlertRule) (string, error) {
if rule.Query.RawText != "" {
batch := `
var data = batch
|query('''
%s
''')
.period(period)
.every(every)
.align()`
batch = fmt.Sprintf(batch, rule.Query.RawText)
if rule.Query.GroupBy.Time != "" {
batch = batch + fmt.Sprintf(".groupBy(%s)", rule.Query.GroupBy.Time)
}
return batch, nil
}
stream := `var data = stream
|from()
.database(db)
.retentionPolicy(rp)
.measurement(measurement)
`
stream = fmt.Sprintf("%s\n.groupBy(groupby)\n", stream)
stream = stream + ".where(where_filter)\n"
// Only need aggregate functions for threshold and relative
value := ""
if rule.Trigger != "deadman" {
for _, field := range rule.Query.Fields {
for _, fnc := range field.Funcs {
// Only need a window if we have an aggregate function
value = value + "|window().period(period).every(every).align()\n"
value = value + fmt.Sprintf(`|%s(field).as(value)`, fnc)
break // only support a single field
}
break // only support a single field
}
}
if value == "" {
value = `|eval(lambda: field).as(value)`
}
return stream + value, nil
}

60
kapacitor/data_test.go Normal file
View File

@ -0,0 +1,60 @@
package kapacitor
import (
"encoding/json"
"fmt"
"testing"
"github.com/influxdata/chronograf"
)
var config = `{
"id": "93e17825-2fb0-4507-87bd-a0c136947f7e",
"database": "telegraf",
"measurement": "cpu",
"retentionPolicy": "default",
"fields": [{
"field": "usage_user",
"funcs": ["mean"]
}],
"tags": {
"host": [
"acc-0eabc309-eu-west-1-data-3",
"prod"
],
"cpu": [
"cpu_total"
]
},
"groupBy": {
"time": null,
"tags": [
"host",
"cluster_id"
]
},
"areTagsAccepted": true,
"rawText": null
}`
func TestData(t *testing.T) {
q := chronograf.QueryConfig{}
err := json.Unmarshal([]byte(config), &q)
if err != nil {
t.Errorf("Error unmarshaling %v", err)
}
alert := chronograf.AlertRule{
Trigger: "deadman",
Query: q,
}
if tick, err := Data(alert); err != nil {
t.Errorf("Error creating tick %v", err)
} else {
_, err := formatTick(tick)
if err != nil {
fmt.Printf(tick)
t.Errorf("Error formatting tick %v", err)
}
}
}

17
kapacitor/influxout.go Normal file
View File

@ -0,0 +1,17 @@
package kapacitor
import "github.com/influxdata/chronograf"
// InfluxOut creates a kapacitor influxDBOut node to write alert data to Database, RP, Measurement.
func InfluxOut(rule chronograf.AlertRule) string {
return `
trigger
|influxDBOut()
.create()
.database(output_db)
.retentionPolicy(output_rp)
.measurement(output_mt)
.tag('alertName', name)
.tag('triggerType', triggerType)
`
}

View File

@ -0,0 +1,38 @@
package kapacitor
import "testing"
import "github.com/influxdata/chronograf"
func TestInfluxOut(t *testing.T) {
tests := []struct {
name string
want chronograf.TICKScript
}{
{
name: "Test influxDBOut kapacitor node",
want: `trigger
|influxDBOut()
.create()
.database(output_db)
.retentionPolicy(output_rp)
.measurement(output_mt)
.tag('alertName', name)
.tag('triggerType', triggerType)
`,
},
}
for _, tt := range tests {
got := InfluxOut(chronograf.AlertRule{
Name: "name",
Trigger: "deadman",
})
formatted, err := formatTick(got)
if err != nil {
t.Errorf("%q. formatTick() error = %v", tt.name, err)
continue
}
if formatted != tt.want {
t.Errorf("%q. InfluxOut() = %v, want %v", tt.name, formatted, tt.want)
}
}
}

1
kapacitor/templates.go Normal file
View File

@ -0,0 +1 @@
package kapacitor

42
kapacitor/tickscripts.go Normal file
View File

@ -0,0 +1,42 @@
package kapacitor
import (
"fmt"
"github.com/influxdata/chronograf"
)
var _ chronograf.Ticker = &Alert{}
// Alert defines alerting strings in template rendering
type Alert struct{}
// Generate creates a Tickscript from the alertrule
func (a *Alert) Generate(rule chronograf.AlertRule) (chronograf.TICKScript, error) {
vars, err := Vars(rule)
if err != nil {
return "", nil
}
data, err := Data(rule)
if err != nil {
return "", nil
}
trigger, err := Trigger(rule)
if err != nil {
return "", err
}
services, err := AlertServices(rule)
if err != nil {
return "", err
}
output := InfluxOut(rule)
raw := fmt.Sprintf("%s\n%s\n%s%s\n%s", vars, data, trigger, services, output)
tick, err := formatTick(raw)
if err != nil {
return "", err
}
if err := validateTick(tick); err != nil {
return tick, err
}
return tick, nil
}

View File

@ -0,0 +1,527 @@
package kapacitor
import (
"encoding/json"
"fmt"
"testing"
"github.com/influxdata/chronograf"
)
func TestGenerate(t *testing.T) {
alert := chronograf.AlertRule{
Name: "name",
Trigger: "relative",
Alerts: []string{"slack", "victorops", "email"},
TriggerValues: chronograf.TriggerValues{
Change: "change",
Period: "10m",
Shift: "1m",
Operator: "greater than",
Value: "90",
},
Every: "30s",
Query: chronograf.QueryConfig{
Database: "telegraf",
Measurement: "cpu",
RetentionPolicy: "autogen",
Fields: []struct {
Field string `json:"field"`
Funcs []string `json:"funcs"`
}{
{
Field: "usage_user",
Funcs: []string{"mean"},
},
},
Tags: map[string][]string{
"host": []string{
"acc-0eabc309-eu-west-1-data-3",
"prod",
},
"cpu": []string{
"cpu_total",
},
},
GroupBy: struct {
Time string `json:"time"`
Tags []string `json:"tags"`
}{
Time: "",
Tags: []string{"host", "cluster_id"},
},
AreTagsAccepted: true,
RawText: "",
},
}
gen := Alert{}
tick, err := gen.Generate(alert)
if err != nil {
fmt.Printf("%s", tick)
t.Errorf("Error generating alert: %v", err)
}
}
func TestThreshold(t *testing.T) {
alert := chronograf.AlertRule{
Name: "name",
Trigger: "threshold",
Alerts: []string{"slack", "victorops", "email"},
TriggerValues: chronograf.TriggerValues{
Relation: "once",
Period: "10m",
Percentile: "", // TODO: if relation is not once then this will have a number
Operator: "greater than",
Value: "90",
},
Every: "30s",
Message: "message",
Query: chronograf.QueryConfig{
Database: "telegraf",
Measurement: "cpu",
RetentionPolicy: "autogen",
Fields: []struct {
Field string `json:"field"`
Funcs []string `json:"funcs"`
}{
{
Field: "usage_user",
Funcs: []string{"mean"},
},
},
Tags: map[string][]string{
"host": []string{
"acc-0eabc309-eu-west-1-data-3",
"prod",
},
"cpu": []string{
"cpu_total",
},
},
GroupBy: struct {
Time string `json:"time"`
Tags []string `json:"tags"`
}{
Time: "",
Tags: []string{"host", "cluster_id"},
},
AreTagsAccepted: true,
RawText: "",
},
}
tests := []struct {
name string
alert chronograf.AlertRule
want chronograf.TICKScript
wantErr bool
}{
{
name: "Test valid template alert",
alert: alert,
want: `var db = 'telegraf'
var rp = 'autogen'
var measurement = 'cpu'
var field = 'usage_user'
var groupby = ['host', 'cluster_id']
var where_filter = lambda: ("cpu" == 'cpu_total') AND ("host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod')
var every = 30s
var name = 'name'
var idVar = name + ':{{.Group}}'
var message = 'message'
var idtag = 'alertID'
var leveltag = 'level'
var messagefield = 'message'
var durationfield = 'duration'
var value = 'value'
var output_db = 'chronograf'
var output_rp = 'autogen'
var output_mt = 'alerts'
var triggerType = 'threshold'
var period = 10m
var crit = 90
var data = stream
|from()
.database(db)
.retentionPolicy(rp)
.measurement(measurement)
.groupBy(groupby)
.where(where_filter)
|window()
.period(period)
.every(every)
.align()
|mean(field)
.as(value)
var trigger = data
|alert()
.stateChangesOnly()
.crit(lambda: "value" > crit)
.message(message)
.id(idVar)
.idTag(idtag)
.levelTag(leveltag)
.messageField(messagefield)
.durationField(durationfield)
.slack()
.victorOps()
.email()
trigger
|influxDBOut()
.create()
.database(output_db)
.retentionPolicy(output_rp)
.measurement(output_mt)
.tag('alertName', name)
.tag('triggerType', triggerType)
`,
wantErr: false,
},
}
for _, tt := range tests {
gen := Alert{}
got, err := gen.Generate(tt.alert)
if (err != nil) != tt.wantErr {
t.Errorf("%q. Threshold() error = %v, wantErr %v", tt.name, err, tt.wantErr)
continue
}
if got != tt.want {
fmt.Printf("%s", got)
t.Errorf("%q. Threshold() = %v, want %v", tt.name, got, tt.want)
}
}
}
func TestRelative(t *testing.T) {
alert := chronograf.AlertRule{
Name: "name",
Trigger: "relative",
Alerts: []string{"slack", "victorops", "email"},
TriggerValues: chronograf.TriggerValues{
Change: "change",
Period: "10m",
Shift: "1m",
Operator: "greater than",
Value: "90",
},
Every: "30s",
Message: "message",
Query: chronograf.QueryConfig{
Database: "telegraf",
Measurement: "cpu",
RetentionPolicy: "autogen",
Fields: []struct {
Field string `json:"field"`
Funcs []string `json:"funcs"`
}{
{
Field: "usage_user",
Funcs: []string{"mean"},
},
},
Tags: map[string][]string{
"host": []string{
"acc-0eabc309-eu-west-1-data-3",
"prod",
},
"cpu": []string{
"cpu_total",
},
},
GroupBy: struct {
Time string `json:"time"`
Tags []string `json:"tags"`
}{
Time: "",
Tags: []string{"host", "cluster_id"},
},
AreTagsAccepted: true,
RawText: "",
},
}
tests := []struct {
name string
alert chronograf.AlertRule
want chronograf.TICKScript
wantErr bool
}{
{
name: "Test valid template alert",
alert: alert,
want: `var db = 'telegraf'
var rp = 'autogen'
var measurement = 'cpu'
var field = 'usage_user'
var groupby = ['host', 'cluster_id']
var where_filter = lambda: ("cpu" == 'cpu_total') AND ("host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod')
var every = 30s
var name = 'name'
var idVar = name + ':{{.Group}}'
var message = 'message'
var idtag = 'alertID'
var leveltag = 'level'
var messagefield = 'message'
var durationfield = 'duration'
var value = 'value'
var output_db = 'chronograf'
var output_rp = 'autogen'
var output_mt = 'alerts'
var triggerType = 'relative'
var period = 10m
var shift = -1m
var crit = 90
var data = stream
|from()
.database(db)
.retentionPolicy(rp)
.measurement(measurement)
.groupBy(groupby)
.where(where_filter)
|window()
.period(period)
.every(every)
.align()
|mean(field)
.as(value)
var past = data
|shift(shift)
var current = data
var trigger = past
|join(current)
.as('past', 'current')
|eval(lambda: abs(float("current.value" - "past.value")) / float("past.value"))
.keep()
.as('value')
|alert()
.stateChangesOnly()
.crit(lambda: "value" > crit)
.message(message)
.id(idVar)
.idTag(idtag)
.levelTag(leveltag)
.messageField(messagefield)
.durationField(durationfield)
.slack()
.victorOps()
.email()
trigger
|influxDBOut()
.create()
.database(output_db)
.retentionPolicy(output_rp)
.measurement(output_mt)
.tag('alertName', name)
.tag('triggerType', triggerType)
`,
wantErr: false,
},
}
for _, tt := range tests {
gen := Alert{}
got, err := gen.Generate(tt.alert)
if (err != nil) != tt.wantErr {
t.Errorf("%q. Relative() error = %v, wantErr %v", tt.name, err, tt.wantErr)
continue
}
if got != tt.want {
fmt.Printf("%s", got)
t.Errorf("%q. Relative() = %v, want %v", tt.name, got, tt.want)
}
b, _ := json.Marshal(tt.alert)
fmt.Printf("%s", string(b))
}
}
func TestDeadman(t *testing.T) {
alert := chronograf.AlertRule{
Name: "name",
Trigger: "deadman",
Alerts: []string{"slack", "victorops", "email"},
TriggerValues: chronograf.TriggerValues{
Period: "10m",
},
Every: "30s",
Message: "message",
Query: chronograf.QueryConfig{
Database: "telegraf",
Measurement: "cpu",
RetentionPolicy: "autogen",
Fields: []struct {
Field string `json:"field"`
Funcs []string `json:"funcs"`
}{
{
Field: "usage_user",
Funcs: []string{"mean"},
},
},
Tags: map[string][]string{
"host": []string{
"acc-0eabc309-eu-west-1-data-3",
"prod",
},
"cpu": []string{
"cpu_total",
},
},
GroupBy: struct {
Time string `json:"time"`
Tags []string `json:"tags"`
}{
Time: "",
Tags: []string{"host", "cluster_id"},
},
AreTagsAccepted: true,
RawText: "",
},
}
tests := []struct {
name string
alert chronograf.AlertRule
want chronograf.TICKScript
wantErr bool
}{
{
name: "Test valid template alert",
alert: alert,
want: `var db = 'telegraf'
var rp = 'autogen'
var measurement = 'cpu'
var field = 'usage_user'
var groupby = ['host', 'cluster_id']
var where_filter = lambda: ("cpu" == 'cpu_total') AND ("host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod')
var every = 30s
var name = 'name'
var idVar = name + ':{{.Group}}'
var message = 'message'
var idtag = 'alertID'
var leveltag = 'level'
var messagefield = 'message'
var durationfield = 'duration'
var value = 'value'
var output_db = 'chronograf'
var output_rp = 'autogen'
var output_mt = 'alerts'
var triggerType = 'deadman'
var threshold = 0.0
var period = 10m
var data = stream
|from()
.database(db)
.retentionPolicy(rp)
.measurement(measurement)
.groupBy(groupby)
.where(where_filter)
|eval(lambda: field)
.as(value)
var trigger = data
|deadman(threshold, every)
.stateChangesOnly()
.message(message)
.id(idVar)
.idTag(idtag)
.levelTag(leveltag)
.messageField(messagefield)
.durationField(durationfield)
.slack()
.victorOps()
.email()
trigger
|influxDBOut()
.create()
.database(output_db)
.retentionPolicy(output_rp)
.measurement(output_mt)
.tag('alertName', name)
.tag('triggerType', triggerType)
`,
wantErr: false,
},
}
for _, tt := range tests {
gen := Alert{}
got, err := gen.Generate(tt.alert)
if (err != nil) != tt.wantErr {
t.Errorf("%q. Deadman() error = %v, wantErr %v", tt.name, err, tt.wantErr)
continue
}
if got != tt.want {
fmt.Printf("%s", got)
t.Errorf("%q. Deadman() = %v, want %v", tt.name, got, tt.want)
}
}
}

96
kapacitor/triggers.go Normal file
View File

@ -0,0 +1,96 @@
package kapacitor
import "github.com/influxdata/chronograf"
import "fmt"
// ThresholdTrigger is the trickscript trigger for alerts that exceed a value
var ThresholdTrigger = `
var trigger = data
|alert()
.stateChangesOnly()
.crit(lambda: "value" %s crit)
.message(message)
.id(idVar)
.idTag(idtag)
.levelTag(leveltag)
.messageField(messagefield)
.durationField(durationfield)
`
// RelativeTrigger compares one window of data versus another.
var RelativeTrigger = `
var past = data
|shift(shift)
var current = data
var trigger = past
|join(current)
.as('past', 'current')
|eval(lambda: abs(float("current.value" - "past.value"))/float("past.value"))
.keep()
.as('value')
|alert()
.stateChangesOnly()
.crit(lambda: "value" %s crit)
.message(message)
.id(idVar)
.idTag(idtag)
.levelTag(leveltag)
.messageField(messagefield)
.durationField(durationfield)
`
// DeadmanTrigger checks if any data has been streamed in the last period of time
var DeadmanTrigger = `
var trigger = data|deadman(threshold, every)
.stateChangesOnly()
.message(message)
.id(idVar)
.idTag(idtag)
.levelTag(leveltag)
.messageField(messagefield)
.durationField(durationfield)
`
// Trigger returns the trigger mechanism for a tickscript
func Trigger(rule chronograf.AlertRule) (string, error) {
switch rule.Trigger {
case "deadman":
return DeadmanTrigger, nil
case "relative":
op, err := kapaOperator(rule.TriggerValues.Operator)
if err != nil {
return "", err
}
return fmt.Sprintf(RelativeTrigger, op), nil
case "threshold":
op, err := kapaOperator(rule.TriggerValues.Operator)
if err != nil {
return "", err
}
return fmt.Sprintf(ThresholdTrigger, op), nil
default:
return "", fmt.Errorf("Unknown trigger type: %s", rule.Trigger)
}
}
// kapaOperator converts UI strings to kapacitor operators
func kapaOperator(operator string) (string, error) {
switch operator {
case "greater than":
return ">", nil
case "less than":
return "<", nil
case "equal to or less than":
return "<=", nil
case "equal to or greater than":
return ">=", nil
case "equal":
return "==", nil
case "not equal":
return "!=", nil
default:
return "", fmt.Errorf("invalid operator: %s is unknown", operator)
}
}

109
kapacitor/triggers_test.go Normal file
View File

@ -0,0 +1,109 @@
package kapacitor
import (
"testing"
"github.com/influxdata/chronograf"
)
func TestTrigger(t *testing.T) {
tests := []struct {
name string
rule chronograf.AlertRule
want string
wantErr bool
}{
{
name: "Test Deadman",
rule: chronograf.AlertRule{
Trigger: "deadman",
},
want: `var trigger = data
|deadman(threshold, every)
.stateChangesOnly()
.message(message)
.id(idVar)
.idTag(idtag)
.levelTag(leveltag)
.messageField(messagefield)
.durationField(durationfield)
`,
wantErr: false,
},
{
name: "Test Relative",
rule: chronograf.AlertRule{
Trigger: "relative",
TriggerValues: chronograf.TriggerValues{
Operator: "greater than",
},
},
want: `var past = data
|shift(shift)
var current = data
var trigger = past
|join(current)
.as('past', 'current')
|eval(lambda: abs(float("current.value" - "past.value")) / float("past.value"))
.keep()
.as('value')
|alert()
.stateChangesOnly()
.crit(lambda: "value" > crit)
.message(message)
.id(idVar)
.idTag(idtag)
.levelTag(leveltag)
.messageField(messagefield)
.durationField(durationfield)
`,
wantErr: false,
},
{
name: "Test Threshold",
rule: chronograf.AlertRule{
Trigger: "threshold",
TriggerValues: chronograf.TriggerValues{
Operator: "greater than",
},
},
want: `var trigger = data
|alert()
.stateChangesOnly()
.crit(lambda: "value" > crit)
.message(message)
.id(idVar)
.idTag(idtag)
.levelTag(leveltag)
.messageField(messagefield)
.durationField(durationfield)
`,
wantErr: false,
},
{
name: "Test Invalid",
rule: chronograf.AlertRule{
Trigger: "invalid",
},
want: ``,
wantErr: true,
},
}
for _, tt := range tests {
got, err := Trigger(tt.rule)
if (err != nil) != tt.wantErr {
t.Errorf("%q. Trigger() error = %v, wantErr %v", tt.name, err, tt.wantErr)
continue
}
formatted, err := formatTick(got)
if err != nil {
t.Errorf("%q. formatTick() error = %v", tt.name, err)
continue
}
if string(formatted) != tt.want {
t.Errorf("%q. Trigger() = \n%v\n want \n%v\n", tt.name, string(formatted), tt.want)
}
}
}

56
kapacitor/validate.go Normal file
View File

@ -0,0 +1,56 @@
package kapacitor
import (
"bytes"
"fmt"
"log"
"time"
"github.com/influxdata/chronograf"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick"
"github.com/influxdata/kapacitor/tick/ast"
"github.com/influxdata/kapacitor/tick/stateful"
)
// ValidateAlert checks if the alert is a valid kapacitor alert service.
func ValidateAlert(service string) error {
// Simple tick script to check alert service.
// If a pipeline cannot be created then we know this is an invalid
// service. At least with this version of kapacitor!
script := fmt.Sprintf("stream|from()|alert().%s()", service)
return validateTick(chronograf.TICKScript(script))
}
func formatTick(tickscript string) (chronograf.TICKScript, error) {
node, err := ast.Parse(tickscript)
if err != nil {
log.Fatalf("parse execution: %s", err)
return "", err
}
output := new(bytes.Buffer)
node.Format(output, "", true)
return chronograf.TICKScript(output.String()), nil
}
func validateTick(script chronograf.TICKScript) error {
scope := stateful.NewScope()
predefinedVars := map[string]tick.Var{}
_, err := pipeline.CreatePipeline(string(script), pipeline.StreamEdge, scope, &deadman{}, predefinedVars)
return err
}
type deadman struct {
interval time.Duration
threshold float64
id string
message string
global bool
}
func (d deadman) Interval() time.Duration { return d.interval }
func (d deadman) Threshold() float64 { return d.threshold }
func (d deadman) Id() string { return d.id }
func (d deadman) Message() string { return d.message }
func (d deadman) Global() bool { return d.global }

View File

@ -0,0 +1,52 @@
package kapacitor
import "testing"
import "github.com/influxdata/chronograf"
func TestValidateAlert(t *testing.T) {
tests := []struct {
name string
service string
wantErr bool
}{
{
name: "Test valid template alert",
service: "slack",
wantErr: false,
},
{
name: "Test invalid template alert",
service: "invalid",
wantErr: true,
},
}
for _, tt := range tests {
if err := ValidateAlert(tt.service); (err != nil) != tt.wantErr {
t.Errorf("%q. ValidateAlert() error = %v, wantErr %v", tt.name, err, tt.wantErr)
}
}
}
func Test_validateTick(t *testing.T) {
tests := []struct {
name string
script chronograf.TICKScript
wantErr bool
}{
{
name: "Valid Script",
script: "stream|from()",
wantErr: false,
},
{
name: "Invalid Script",
script: "stream|nothing",
wantErr: true,
},
}
for _, tt := range tests {
if err := validateTick(tt.script); (err != nil) != tt.wantErr {
t.Errorf("%q. validateTick() error = %v, wantErr %v", tt.name, err, tt.wantErr)
}
}
}

175
kapacitor/vars.go Normal file
View File

@ -0,0 +1,175 @@
package kapacitor
import (
"fmt"
"sort"
"strings"
"github.com/influxdata/chronograf"
)
var (
// Database is the output database for alerts.
Database = "chronograf"
// RP will be autogen for alerts because it is default.
RP = "autogen"
// Measurement will be alerts so that the app knows where to get this data.
Measurement = "alerts"
// IDTag is the output tag key for the ID of the alert
IDTag = "alertID"
//LevelTag is the output tag key for the alert level information
LevelTag = "level"
// MessageField is the output field key for the message in the alert
MessageField = "message"
// DurationField is the output field key for the duration of the alert
DurationField = "duration"
)
// Vars builds the top level vars for a kapacitor alert script
func Vars(rule chronograf.AlertRule) (string, error) {
common, err := commonVars(rule)
if err != nil {
return "", err
}
switch rule.Trigger {
case "threshold":
vars := `
%s
var period = %s
var crit = %s
`
return fmt.Sprintf(vars,
common,
rule.TriggerValues.Period,
rule.TriggerValues.Value), nil
case "relative":
vars := `
%s
var period = %s
var shift = -%s
var crit = %s
`
return fmt.Sprintf(vars,
common,
rule.TriggerValues.Period,
rule.TriggerValues.Shift,
rule.TriggerValues.Value,
), nil
case "deadman":
vars := `
%s
var threshold = %s
var period = %s
`
return fmt.Sprintf(vars,
common,
"0.0", // deadman threshold hardcoded to zero
rule.TriggerValues.Period,
), nil
default:
return "", fmt.Errorf("Unknown trigger mechanism")
}
}
func commonVars(rule chronograf.AlertRule) (string, error) {
fld, err := field(rule.Query)
if err != nil {
return "", err
}
common := `
var db = '%s'
var rp = '%s'
var measurement = '%s'
var field = '%s'
var groupby = %s
var where_filter = %s
var every = %s
var name = '%s'
var idVar = name + ':{{.Group}}'
var message = '%s'
var idtag = '%s'
var leveltag = '%s'
var messagefield = '%s'
var durationfield = '%s'
var value = 'value'
var output_db = '%s'
var output_rp = '%s'
var output_mt = '%s'
var triggerType = '%s'
`
return fmt.Sprintf(common,
rule.Query.Database,
rule.Query.RetentionPolicy,
rule.Query.Measurement,
fld,
groupBy(rule.Query),
whereFilter(rule.Query),
rule.Every,
rule.Name,
rule.Message,
IDTag,
LevelTag,
MessageField,
DurationField,
Database,
RP,
Measurement,
rule.Trigger,
), nil
}
func groupBy(q chronograf.QueryConfig) string {
groups := []string{}
for _, tag := range q.GroupBy.Tags {
groups = append(groups, fmt.Sprintf("'%s'", tag))
}
return "[" + strings.Join(groups, ",") + "]"
}
func field(q chronograf.QueryConfig) (string, error) {
for _, field := range q.Fields {
return field.Field, nil
}
return "", fmt.Errorf("No fields set in query")
}
// value will be "value"" unless there are no field aggregates. If no aggregates, then it is the field name.
func value(rule chronograf.AlertRule) string {
for _, field := range rule.Query.Fields {
// Deadman triggers do not need any aggregate functions
if field.Field != "" && rule.Trigger == "deadman" {
return field.Field
} else if field.Field != "" && len(field.Funcs) == 0 {
return field.Field
}
}
return "value"
}
func whereFilter(q chronograf.QueryConfig) string {
operator := "=="
if !q.AreTagsAccepted {
operator = "!="
}
outer := []string{}
for tag, values := range q.Tags {
inner := []string{}
for _, value := range values {
inner = append(inner, fmt.Sprintf(`"%s" %s '%s'`, tag, operator, value))
}
outer = append(outer, "("+strings.Join(inner, " OR ")+")")
}
if len(outer) > 0 {
sort.Strings(outer)
return "lambda: " + strings.Join(outer, " AND ")
}
return "lambda: TRUE"
}

View File

@ -7,7 +7,10 @@ import (
"net/url" "net/url"
"strconv" "strconv"
"github.com/bouk/httprouter"
"github.com/influxdata/chronograf" "github.com/influxdata/chronograf"
kapa "github.com/influxdata/chronograf/kapacitor"
"github.com/influxdata/chronograf/uuid"
) )
type postKapacitorRequest struct { type postKapacitorRequest struct {
@ -36,6 +39,7 @@ func (p *postKapacitorRequest) Valid() error {
type kapaLinks struct { type kapaLinks struct {
Proxy string `json:"proxy"` // URL location of proxy endpoint for this source Proxy string `json:"proxy"` // URL location of proxy endpoint for this source
Self string `json:"self"` // Self link mapping to this resource Self string `json:"self"` // Self link mapping to this resource
Tasks string `json:"tasks"` // Tasks link for defining task alerts for kapacitor
} }
type kapacitor struct { type kapacitor struct {
@ -102,6 +106,7 @@ func newKapacitor(srv chronograf.Server) kapacitor {
Links: kapaLinks{ Links: kapaLinks{
Self: fmt.Sprintf("%s/%d/kapacitors/%d", httpAPISrcs, srv.SrcID, srv.ID), Self: fmt.Sprintf("%s/%d/kapacitors/%d", httpAPISrcs, srv.SrcID, srv.ID),
Proxy: fmt.Sprintf("%s/%d/kapacitors/%d/proxy", httpAPISrcs, srv.SrcID, srv.ID), Proxy: fmt.Sprintf("%s/%d/kapacitors/%d/proxy", httpAPISrcs, srv.SrcID, srv.ID),
Tasks: fmt.Sprintf("%s/%d/kapacitors/%d/tasks", httpAPISrcs, srv.SrcID, srv.ID),
}, },
} }
} }
@ -258,3 +263,287 @@ func (h *Service) UpdateKapacitor(w http.ResponseWriter, r *http.Request) {
res := newKapacitor(srv) res := newKapacitor(srv)
encodeJSON(w, http.StatusOK, res, h.Logger) encodeJSON(w, http.StatusOK, res, h.Logger)
} }
// KapacitorTasksPost proxies POST to kapacitor
func (h *Service) KapacitorTasksPost(w http.ResponseWriter, r *http.Request) {
id, err := paramID("kid", r)
if err != nil {
Error(w, http.StatusUnprocessableEntity, err.Error())
return
}
srcID, err := paramID("id", r)
if err != nil {
Error(w, http.StatusUnprocessableEntity, err.Error())
return
}
ctx := r.Context()
srv, err := h.ServersStore.Get(ctx, id)
if err != nil || srv.SrcID != srcID {
notFound(w, id)
return
}
c := kapa.Client{
URL: srv.URL,
Username: srv.Username,
Password: srv.Password,
Ticker: &kapa.Alert{},
ID: &uuid.V4{},
}
var req chronograf.AlertRule
if err = json.NewDecoder(r.Body).Decode(&req); err != nil {
invalidJSON(w)
return
}
// TODO: validate this data
/*
if err := req.Valid(); err != nil {
invalidData(w, err)
return
}
*/
task, err := c.Create(ctx, req)
if err != nil {
Error(w, http.StatusInternalServerError, err.Error())
return
}
req.ID = task.ID
rule, err := h.AlertRulesStore.Add(ctx, req)
if err != nil {
Error(w, http.StatusInternalServerError, err.Error())
return
}
res := alertResponse{
AlertRule: rule,
Links: alertLinks{
Self: fmt.Sprintf("/chronograf/v1/sources/%d/kapacitors/%d/tasks/%s", srv.SrcID, srv.ID, req.ID),
Kapacitor: fmt.Sprintf("/chronograf/v1/sources/%d/kapacitors/%d/proxy?path=%s", srv.SrcID, srv.ID, url.QueryEscape(task.Href)),
},
TICKScript: string(task.TICKScript),
}
w.Header().Add("Location", res.Links.Self)
encodeJSON(w, http.StatusCreated, res, h.Logger)
}
type alertLinks struct {
Self string `json:"self"`
Kapacitor string `json:"kapacitor"`
}
type alertResponse struct {
chronograf.AlertRule
TICKScript string `json:"tickscript"`
Links alertLinks `json:"links"`
}
// KapacitorTasksPut proxies PATCH to kapacitor
func (h *Service) KapacitorTasksPut(w http.ResponseWriter, r *http.Request) {
id, err := paramID("kid", r)
if err != nil {
Error(w, http.StatusUnprocessableEntity, err.Error())
return
}
srcID, err := paramID("id", r)
if err != nil {
Error(w, http.StatusUnprocessableEntity, err.Error())
return
}
ctx := r.Context()
srv, err := h.ServersStore.Get(ctx, id)
if err != nil || srv.SrcID != srcID {
notFound(w, id)
return
}
tid := httprouter.GetParamFromContext(ctx, "tid")
c := kapa.Client{
URL: srv.URL,
Username: srv.Username,
Password: srv.Password,
Ticker: &kapa.Alert{},
}
var req chronograf.AlertRule
if err = json.NewDecoder(r.Body).Decode(&req); err != nil {
invalidJSON(w)
return
}
// TODO: validate this data
/*
if err := req.Valid(); err != nil {
invalidData(w, err)
return
}
*/
req.ID = tid
task, err := c.Update(ctx, c.Href(tid), req)
if err != nil {
Error(w, http.StatusInternalServerError, err.Error())
return
}
if err := h.AlertRulesStore.Update(ctx, req); err != nil {
Error(w, http.StatusInternalServerError, err.Error())
return
}
res := alertResponse{
AlertRule: req,
Links: alertLinks{
Self: fmt.Sprintf("/chronograf/v1/sources/%d/kapacitors/%d/tasks/%s", srv.SrcID, srv.ID, req.ID),
Kapacitor: fmt.Sprintf("/chronograf/v1/sources/%d/kapacitors/%d/proxy?path=%s", srv.SrcID, srv.ID, url.QueryEscape(task.Href)),
},
TICKScript: string(task.TICKScript),
}
encodeJSON(w, http.StatusOK, res, h.Logger)
}
// KapacitorTasksGet retrieves all tasks
func (h *Service) KapacitorTasksGet(w http.ResponseWriter, r *http.Request) {
id, err := paramID("kid", r)
if err != nil {
Error(w, http.StatusUnprocessableEntity, err.Error())
return
}
srcID, err := paramID("id", r)
if err != nil {
Error(w, http.StatusUnprocessableEntity, err.Error())
return
}
ctx := r.Context()
srv, err := h.ServersStore.Get(ctx, id)
if err != nil || srv.SrcID != srcID {
notFound(w, id)
return
}
rules, err := h.AlertRulesStore.All(ctx)
if err != nil {
Error(w, http.StatusInternalServerError, err.Error())
return
}
ticker := &kapa.Alert{}
c := kapa.Client{}
res := allAlertsResponse{
Tasks: []alertResponse{},
}
for _, rule := range rules {
tickscript, err := ticker.Generate(rule)
if err != nil {
Error(w, http.StatusInternalServerError, err.Error())
return
}
ar := alertResponse{
AlertRule: rule,
Links: alertLinks{
Self: fmt.Sprintf("/chronograf/v1/sources/%d/kapacitors/%d/tasks/%s", srv.SrcID, srv.ID, rule.ID),
Kapacitor: fmt.Sprintf("/chronograf/v1/sources/%d/kapacitors/%d/proxy?path=%s", srv.SrcID, srv.ID, url.QueryEscape(c.Href(rule.ID))),
},
TICKScript: string(tickscript),
}
res.Tasks = append(res.Tasks, ar)
}
encodeJSON(w, http.StatusOK, res, h.Logger)
}
type allAlertsResponse struct {
Tasks []alertResponse `json:"tasks"`
}
// KapacitorTasksGet retrieves specific task
func (h *Service) KapacitorTasksID(w http.ResponseWriter, r *http.Request) {
id, err := paramID("kid", r)
if err != nil {
Error(w, http.StatusUnprocessableEntity, err.Error())
return
}
srcID, err := paramID("id", r)
if err != nil {
Error(w, http.StatusUnprocessableEntity, err.Error())
return
}
ctx := r.Context()
srv, err := h.ServersStore.Get(ctx, id)
if err != nil || srv.SrcID != srcID {
notFound(w, id)
return
}
tid := httprouter.GetParamFromContext(ctx, "tid")
rule, err := h.AlertRulesStore.Get(ctx, tid)
if err != nil {
Error(w, http.StatusInternalServerError, err.Error())
return
}
ticker := &kapa.Alert{}
c := kapa.Client{}
tickscript, err := ticker.Generate(rule)
if err != nil {
Error(w, http.StatusInternalServerError, err.Error())
return
}
res := alertResponse{
AlertRule: rule,
Links: alertLinks{
Self: fmt.Sprintf("/chronograf/v1/sources/%d/kapacitors/%d/tasks/%s", srv.SrcID, srv.ID, rule.ID),
Kapacitor: fmt.Sprintf("/chronograf/v1/sources/%d/kapacitors/%d/proxy?path=%s", srv.SrcID, srv.ID, url.QueryEscape(c.Href(rule.ID))),
},
TICKScript: string(tickscript),
}
encodeJSON(w, http.StatusOK, res, h.Logger)
}
// KapacitorTasksDelete proxies DELETE to kapacitor
func (h *Service) KapacitorTasksDelete(w http.ResponseWriter, r *http.Request) {
id, err := paramID("kid", r)
if err != nil {
Error(w, http.StatusUnprocessableEntity, err.Error())
return
}
srcID, err := paramID("id", r)
if err != nil {
Error(w, http.StatusUnprocessableEntity, err.Error())
return
}
ctx := r.Context()
srv, err := h.ServersStore.Get(ctx, id)
if err != nil || srv.SrcID != srcID {
notFound(w, id)
return
}
tid := httprouter.GetParamFromContext(ctx, "tid")
c := kapa.Client{
URL: srv.URL,
Username: srv.Username,
Password: srv.Password,
}
if err := c.Delete(ctx, c.Href(tid)); err != nil {
Error(w, http.StatusInternalServerError, err.Error())
return
}
if err := h.AlertRulesStore.Delete(ctx, chronograf.AlertRule{ID: tid}); err != nil {
Error(w, http.StatusInternalServerError, err.Error())
return
}
w.WriteHeader(http.StatusNoContent)
}

View File

@ -68,6 +68,14 @@ func NewMux(opts MuxOpts, service Service) http.Handler {
router.PATCH("/chronograf/v1/sources/:id/kapacitors/:kid", service.UpdateKapacitor) router.PATCH("/chronograf/v1/sources/:id/kapacitors/:kid", service.UpdateKapacitor)
router.DELETE("/chronograf/v1/sources/:id/kapacitors/:kid", service.RemoveKapacitor) router.DELETE("/chronograf/v1/sources/:id/kapacitors/:kid", service.RemoveKapacitor)
// Kapacitor Tasks
router.GET("/chronograf/v1/sources/:id/kapacitors/:kid/tasks", service.KapacitorTasksGet)
router.POST("/chronograf/v1/sources/:id/kapacitors/:kid/tasks", service.KapacitorTasksPost)
router.GET("/chronograf/v1/sources/:id/kapacitors/:kid/tasks/:tid", service.KapacitorTasksID)
router.PUT("/chronograf/v1/sources/:id/kapacitors/:kid/tasks/:tid", service.KapacitorTasksPut)
router.DELETE("/chronograf/v1/sources/:id/kapacitors/:kid/tasks/:tid", service.KapacitorTasksDelete)
// Kapacitor Proxy // Kapacitor Proxy
router.GET("/chronograf/v1/sources/:id/kapacitors/:kid/proxy", service.KapacitorProxyGet) router.GET("/chronograf/v1/sources/:id/kapacitors/:kid/proxy", service.KapacitorProxyGet)
router.POST("/chronograf/v1/sources/:id/kapacitors/:kid/proxy", service.KapacitorProxyPost) router.POST("/chronograf/v1/sources/:id/kapacitors/:kid/proxy", service.KapacitorProxyPost)

View File

@ -128,6 +128,7 @@ func openService(boltPath, cannedPath string, logger chronograf.Logger) Service
ServersStore: db.ServersStore, ServersStore: db.ServersStore,
TimeSeries: &influx.Client{}, TimeSeries: &influx.Client{},
LayoutStore: layouts, LayoutStore: layouts,
AlertRulesStore: db.AlertsStore,
} }
} }

View File

@ -8,6 +8,7 @@ type Service struct {
SourcesStore chronograf.SourcesStore SourcesStore chronograf.SourcesStore
ServersStore chronograf.ServersStore ServersStore chronograf.ServersStore
LayoutStore chronograf.LayoutStore LayoutStore chronograf.LayoutStore
AlertRulesStore chronograf.AlertRulesStore
TimeSeries chronograf.TimeSeries TimeSeries chronograf.TimeSeries
Logger chronograf.Logger Logger chronograf.Logger
} }

File diff suppressed because it is too large Load Diff