Fix bolt scoping and minor kapacitor tickscript updates

pull/10616/head
Chris Goller 2016-11-03 20:56:42 -05:00
parent 780e84760f
commit 73cbc4abc6
13 changed files with 184 additions and 128 deletions

View File

@ -18,15 +18,18 @@ type AlertsStore struct {
}
// All returns all known alerts
func (s *AlertsStore) All(ctx context.Context) ([]chronograf.AlertRule, error) {
func (s *AlertsStore) All(ctx context.Context, sourceID, kapaID int) ([]chronograf.AlertRule, error) {
var srcs []chronograf.AlertRule
if err := s.client.db.View(func(tx *bolt.Tx) error {
if err := tx.Bucket(AlertsBucket).ForEach(func(k, v []byte) error {
var src chronograf.AlertRule
var src internal.ScopedAlert
if err := internal.UnmarshalAlertRule(v, &src); err != nil {
return err
}
srcs = append(srcs, src)
// filter by only those rules with these ids
if src.KapaID == kapaID && src.SrcID == sourceID {
srcs = append(srcs, src.AlertRule)
}
return nil
}); err != nil {
return err
@ -41,10 +44,15 @@ func (s *AlertsStore) All(ctx context.Context) ([]chronograf.AlertRule, error) {
}
// Add creates a new Alerts in the AlertsStore.
func (s *AlertsStore) Add(ctx context.Context, src chronograf.AlertRule) (chronograf.AlertRule, error) {
func (s *AlertsStore) Add(ctx context.Context, sourceID, kapaID int, src chronograf.AlertRule) (chronograf.AlertRule, error) {
if err := s.client.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(AlertsBucket)
if v, err := internal.MarshalAlertRule(&src); err != nil {
scoped := internal.ScopedAlert{
AlertRule: src,
SrcID: sourceID,
KapaID: kapaID,
}
if v, err := internal.MarshalAlertRule(&scoped); err != nil {
return err
} else if err := b.Put([]byte(src.ID), v); err != nil {
return err
@ -58,11 +66,12 @@ func (s *AlertsStore) Add(ctx context.Context, src chronograf.AlertRule) (chrono
}
// Delete removes the Alerts from the AlertsStore
func (s *AlertsStore) Delete(ctx context.Context, src chronograf.AlertRule) error {
_, err := s.Get(ctx, src.ID)
func (s *AlertsStore) Delete(ctx context.Context, srcID, kapaID int, src chronograf.AlertRule) error {
_, err := s.Get(ctx, srcID, kapaID, src.ID)
if err != nil {
return err
}
if err := s.client.db.Update(func(tx *bolt.Tx) error {
if err := tx.Bucket(AlertsBucket).Delete([]byte(src.ID)); err != nil {
return err
@ -75,9 +84,9 @@ func (s *AlertsStore) Delete(ctx context.Context, src chronograf.AlertRule) erro
return nil
}
// Get returns a Alerts if the id exists.
func (s *AlertsStore) Get(ctx context.Context, id string) (chronograf.AlertRule, error) {
var src chronograf.AlertRule
// scopedGet returns a Alerts if the id exists.
func (s *AlertsStore) scopedGet(ctx context.Context, id string) (internal.ScopedAlert, error) {
var src internal.ScopedAlert
if err := s.client.db.View(func(tx *bolt.Tx) error {
if v := tx.Bucket(AlertsBucket).Get([]byte(id)); v == nil {
return chronograf.ErrAlertNotFound
@ -86,22 +95,42 @@ func (s *AlertsStore) Get(ctx context.Context, id string) (chronograf.AlertRule,
}
return nil
}); err != nil {
return chronograf.AlertRule{}, err
return internal.ScopedAlert{}, err
}
return src, nil
}
// Get returns a Alerts if the id exists.
func (s *AlertsStore) Get(ctx context.Context, srcID, kapaID int, id string) (chronograf.AlertRule, error) {
scoped, err := s.scopedGet(ctx, id)
if err != nil {
return chronograf.AlertRule{}, err
}
if scoped.SrcID != srcID || scoped.KapaID != kapaID {
return chronograf.AlertRule{}, chronograf.ErrAlertNotFound
}
return scoped.AlertRule, nil
}
// Update a Alerts
func (s *AlertsStore) Update(ctx context.Context, src chronograf.AlertRule) error {
func (s *AlertsStore) Update(ctx context.Context, srcID, kapaID int, src chronograf.AlertRule) error {
// Check if we have permissions to get this alert
_, err := s.Get(ctx, srcID, kapaID, src.ID)
if err != nil {
return err
}
if err := s.client.db.Update(func(tx *bolt.Tx) error {
// Get an existing alerts with the same ID.
b := tx.Bucket(AlertsBucket)
if v := b.Get([]byte(src.ID)); v == nil {
return chronograf.ErrAlertNotFound
scoped := internal.ScopedAlert{
AlertRule: src,
SrcID: srcID,
KapaID: kapaID,
}
if v, err := internal.MarshalAlertRule(&src); err != nil {
if v, err := internal.MarshalAlertRule(&scoped); err != nil {
return err
} else if err := b.Put([]byte(src.ID), v); err != nil {
return err

View File

@ -163,28 +163,39 @@ func UnmarshalLayout(data []byte, l *chronograf.Layout) error {
return nil
}
// ScopedAlert contains the source and the kapacitor id
type ScopedAlert struct {
chronograf.AlertRule
SrcID int
KapaID int
}
// MarshalAlertRule encodes an alert rule to binary protobuf format.
func MarshalAlertRule(r *chronograf.AlertRule) ([]byte, error) {
j, err := json.Marshal(r)
func MarshalAlertRule(r *ScopedAlert) ([]byte, error) {
j, err := json.Marshal(r.AlertRule)
if err != nil {
return nil, err
}
return proto.Marshal(&AlertRule{
ID: r.ID,
JSON: string(j),
ID: r.ID,
SrcID: int64(r.SrcID),
KapaID: int64(r.KapaID),
JSON: string(j),
})
}
// UnmarshalAlertRule decodes an alert rule from binary protobuf data.
func UnmarshalAlertRule(data []byte, r *chronograf.AlertRule) error {
func UnmarshalAlertRule(data []byte, r *ScopedAlert) error {
var pb AlertRule
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
err := json.Unmarshal([]byte(pb.JSON), r)
err := json.Unmarshal([]byte(pb.JSON), &r.AlertRule)
if err != nil {
return err
}
r.SrcID = int(pb.SrcID)
r.KapaID = int(pb.KapaID)
return nil
}

View File

@ -131,8 +131,10 @@ func (*Query) ProtoMessage() {}
func (*Query) Descriptor() ([]byte, []int) { return fileDescriptorInternal, []int{5} }
type AlertRule struct {
ID string `protobuf:"bytes,1,opt,name=ID,json=iD,proto3" json:"ID,omitempty"`
JSON string `protobuf:"bytes,2,opt,name=JSON,json=jSON,proto3" json:"JSON,omitempty"`
ID string `protobuf:"bytes,1,opt,name=ID,json=iD,proto3" json:"ID,omitempty"`
JSON string `protobuf:"bytes,2,opt,name=JSON,json=jSON,proto3" json:"JSON,omitempty"`
SrcID int64 `protobuf:"varint,3,opt,name=SrcID,json=srcID,proto3" json:"SrcID,omitempty"`
KapaID int64 `protobuf:"varint,4,opt,name=KapaID,json=kapaID,proto3" json:"KapaID,omitempty"`
}
func (m *AlertRule) Reset() { *m = AlertRule{} }
@ -153,36 +155,37 @@ func init() {
func init() { proto.RegisterFile("internal.proto", fileDescriptorInternal) }
var fileDescriptorInternal = []byte{
// 486 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x93, 0xcf, 0x8e, 0xd3, 0x3c,
0x14, 0xc5, 0xe5, 0x26, 0x4e, 0x9a, 0xdb, 0x4f, 0xfd, 0x90, 0x85, 0x50, 0x84, 0x58, 0x54, 0x11,
0x8b, 0xb2, 0x19, 0x24, 0x78, 0x82, 0x4e, 0xc3, 0xa2, 0xa8, 0x74, 0x8a, 0x4b, 0x1f, 0xc0, 0x24,
0x17, 0x4d, 0x50, 0xfe, 0xe1, 0xd8, 0xb4, 0xd9, 0xb2, 0x85, 0xc7, 0xe0, 0x0d, 0x78, 0x41, 0x64,
0xd7, 0x21, 0x23, 0x81, 0x46, 0xb3, 0x3c, 0xf7, 0xdc, 0xe8, 0xfe, 0xee, 0xb9, 0x0e, 0xcc, 0x8b,
0x5a, 0xa1, 0xac, 0x45, 0x79, 0xd5, 0xca, 0x46, 0x35, 0x6c, 0x3a, 0xe8, 0xe4, 0x17, 0x81, 0xd9,
0x9b, 0x73, 0x5b, 0x36, 0x52, 0xa8, 0xa2, 0xa9, 0xd9, 0x1c, 0x26, 0x9b, 0x34, 0x26, 0x0b, 0xb2,
0xf4, 0xf8, 0xa4, 0x48, 0x19, 0x03, 0x7f, 0x27, 0x2a, 0x8c, 0x27, 0x0b, 0xb2, 0x8c, 0xb8, 0x5f,
0x8b, 0x0a, 0xd9, 0x13, 0x08, 0x8e, 0x1d, 0xca, 0x4d, 0x1a, 0x7b, 0xb6, 0x2f, 0xd0, 0x56, 0x99,
0xde, 0x54, 0x28, 0x11, 0xfb, 0x97, 0xde, 0x5c, 0x28, 0xc1, 0x9e, 0x41, 0xb4, 0x96, 0x28, 0x14,
0xe6, 0x2b, 0x15, 0x53, 0xdb, 0x1e, 0x65, 0x43, 0xc1, 0xb8, 0xc7, 0x36, 0x77, 0x6e, 0x70, 0x71,
0xf5, 0x50, 0x60, 0x31, 0x84, 0x29, 0x7e, 0x12, 0xba, 0x54, 0x71, 0xb8, 0x20, 0xcb, 0x29, 0x0f,
0xf3, 0x8b, 0x4c, 0x7e, 0x12, 0x08, 0x0e, 0x8d, 0x96, 0x19, 0x3e, 0x08, 0x98, 0x81, 0xff, 0xa1,
0x6f, 0xd1, 0xe2, 0x46, 0xdc, 0x57, 0x7d, 0x8b, 0xec, 0x29, 0x4c, 0xcd, 0x12, 0xc6, 0x77, 0xc0,
0x53, 0xed, 0xb4, 0xf1, 0xf6, 0xa2, 0xeb, 0x4e, 0x8d, 0xcc, 0x2d, 0x73, 0xc4, 0xa7, 0xad, 0xd3,
0xec, 0x11, 0x78, 0x47, 0xbe, 0xb5, 0xb0, 0x11, 0xf7, 0x34, 0xdf, 0xde, 0x83, 0xf9, 0xc3, 0x60,
0xa2, 0xfc, 0x8a, 0xf2, 0x41, 0x98, 0x77, 0x91, 0xbc, 0x7b, 0x90, 0xfc, 0x7f, 0x23, 0xd1, 0x11,
0xe9, 0x31, 0xd0, 0x83, 0xcc, 0x36, 0xa9, 0xcb, 0x94, 0x76, 0x46, 0x24, 0xdf, 0x08, 0x04, 0x5b,
0xd1, 0x37, 0x5a, 0xdd, 0xc1, 0x89, 0x2c, 0xce, 0x02, 0x66, 0xab, 0xb6, 0x2d, 0x8b, 0xcc, 0xbe,
0x02, 0x47, 0x35, 0x13, 0x63, 0xc9, 0x74, 0xbc, 0x43, 0xd1, 0x69, 0x89, 0x15, 0xd6, 0xca, 0xf1,
0xcd, 0xaa, 0xb1, 0xc4, 0x9e, 0x03, 0x5d, 0x63, 0x59, 0x76, 0xb1, 0xbf, 0xf0, 0x96, 0xb3, 0x57,
0xf3, 0xab, 0x3f, 0x8f, 0xce, 0x94, 0x39, 0xcd, 0x8c, 0x99, 0x7c, 0x27, 0xe0, 0x1b, 0xcd, 0xfe,
0x03, 0x72, 0xb6, 0x04, 0x94, 0x93, 0xb3, 0x51, 0xbd, 0x1d, 0x4b, 0x39, 0xe9, 0x8d, 0x3a, 0xd9,
0x11, 0x94, 0x93, 0x93, 0x51, 0xb7, 0x76, 0x69, 0xca, 0xc9, 0x2d, 0x7b, 0x01, 0xe1, 0x17, 0x8d,
0xb2, 0xc0, 0x2e, 0xa6, 0x76, 0xd0, 0xff, 0xe3, 0xa0, 0xf7, 0x1a, 0x65, 0xcf, 0x07, 0xdf, 0x7c,
0x58, 0xb8, 0x4b, 0x91, 0xc2, 0x44, 0x6e, 0xa3, 0x0d, 0xc7, 0xc8, 0x93, 0x15, 0x50, 0xfb, 0x8d,
0x39, 0xe2, 0xba, 0xa9, 0x2a, 0x51, 0xe7, 0x2e, 0x95, 0x30, 0xbb, 0x48, 0x13, 0x55, 0x7a, 0xed,
0x12, 0x99, 0xe4, 0xd7, 0x46, 0xf3, 0xbd, 0xdb, 0x7f, 0x22, 0xf7, 0xc9, 0x4b, 0x88, 0x56, 0x25,
0x4a, 0xc5, 0x75, 0x89, 0x7f, 0xe5, 0xca, 0xc0, 0x7f, 0x7b, 0xb8, 0xd9, 0x0d, 0x67, 0xfe, 0x7c,
0xb8, 0xd9, 0x7d, 0x0c, 0xec, 0x3f, 0xf8, 0xfa, 0x77, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe0, 0xb6,
0x3a, 0xf1, 0x95, 0x03, 0x00, 0x00,
// 505 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x93, 0xcf, 0xae, 0xd3, 0x3c,
0x10, 0xc5, 0xe5, 0x26, 0x4e, 0x9a, 0xe9, 0xa7, 0x7e, 0xc8, 0x42, 0x28, 0x42, 0x2c, 0xaa, 0x88,
0x45, 0xd9, 0xdc, 0x05, 0x3c, 0x41, 0x6f, 0xc3, 0xa2, 0x50, 0x7a, 0x8b, 0x4b, 0x97, 0x2c, 0x4c,
0x32, 0xe8, 0x06, 0xf2, 0x0f, 0xc7, 0xa6, 0xcd, 0x96, 0x2d, 0x3c, 0x06, 0x6f, 0xc0, 0x0b, 0x22,
0xbb, 0x2e, 0xa9, 0x04, 0xba, 0xba, 0xcb, 0x33, 0x67, 0xa2, 0xf9, 0xcd, 0x99, 0x18, 0xa6, 0x45,
0xad, 0x50, 0xd6, 0xa2, 0xbc, 0x6a, 0x65, 0xa3, 0x1a, 0x36, 0x3e, 0xeb, 0xe4, 0x17, 0x81, 0xc9,
0xcb, 0x63, 0x5b, 0x36, 0x52, 0xa8, 0xa2, 0xa9, 0xd9, 0x14, 0x46, 0xab, 0x34, 0x26, 0x33, 0x32,
0xf7, 0xf8, 0xa8, 0x48, 0x19, 0x03, 0x7f, 0x23, 0x2a, 0x8c, 0x47, 0x33, 0x32, 0x8f, 0xb8, 0x5f,
0x8b, 0x0a, 0xd9, 0x23, 0x08, 0xf6, 0x1d, 0xca, 0x55, 0x1a, 0x7b, 0xb6, 0x2f, 0xd0, 0x56, 0x99,
0xde, 0x54, 0x28, 0x11, 0xfb, 0xa7, 0xde, 0x5c, 0x28, 0xc1, 0x9e, 0x40, 0xb4, 0x94, 0x28, 0x14,
0xe6, 0x0b, 0x15, 0x53, 0xdb, 0x1e, 0x65, 0xe7, 0x82, 0x71, 0xf7, 0x6d, 0xee, 0xdc, 0xe0, 0xe4,
0xea, 0x73, 0x81, 0xc5, 0x10, 0xa6, 0xf8, 0x51, 0xe8, 0x52, 0xc5, 0xe1, 0x8c, 0xcc, 0xc7, 0x3c,
0xcc, 0x4f, 0x32, 0xf9, 0x49, 0x20, 0xd8, 0x35, 0x5a, 0x66, 0x78, 0x2f, 0x60, 0x06, 0xfe, 0xbb,
0xbe, 0x45, 0x8b, 0x1b, 0x71, 0x5f, 0xf5, 0x2d, 0xb2, 0xc7, 0x30, 0x36, 0x4b, 0x18, 0xdf, 0x01,
0x8f, 0xb5, 0xd3, 0xc6, 0xdb, 0x8a, 0xae, 0x3b, 0x34, 0x32, 0xb7, 0xcc, 0x11, 0x1f, 0xb7, 0x4e,
0xb3, 0x07, 0xe0, 0xed, 0xf9, 0xda, 0xc2, 0x46, 0xdc, 0xd3, 0x7c, 0x7d, 0x07, 0xe6, 0x0f, 0x83,
0x89, 0xf2, 0x2b, 0xca, 0x7b, 0x61, 0x5e, 0x22, 0x79, 0x77, 0x20, 0xf9, 0xff, 0x46, 0xa2, 0x03,
0xd2, 0x43, 0xa0, 0x3b, 0x99, 0xad, 0x52, 0x97, 0x29, 0xed, 0x8c, 0x48, 0xbe, 0x11, 0x08, 0xd6,
0xa2, 0x6f, 0xb4, 0xba, 0xc0, 0x89, 0x2c, 0xce, 0x0c, 0x26, 0x8b, 0xb6, 0x2d, 0x8b, 0xcc, 0xfe,
0x05, 0x8e, 0x6a, 0x22, 0x86, 0x92, 0xe9, 0x78, 0x83, 0xa2, 0xd3, 0x12, 0x2b, 0xac, 0x95, 0xe3,
0x9b, 0x54, 0x43, 0x89, 0x3d, 0x05, 0xba, 0xc4, 0xb2, 0xec, 0x62, 0x7f, 0xe6, 0xcd, 0x27, 0xcf,
0xa7, 0x57, 0x7f, 0x7e, 0x3a, 0x53, 0xe6, 0x34, 0x33, 0x66, 0xf2, 0x9d, 0x80, 0x6f, 0x34, 0xfb,
0x0f, 0xc8, 0xd1, 0x12, 0x50, 0x4e, 0x8e, 0x46, 0xf5, 0x76, 0x2c, 0xe5, 0xa4, 0x37, 0xea, 0x60,
0x47, 0x50, 0x4e, 0x0e, 0x46, 0xdd, 0xda, 0xa5, 0x29, 0x27, 0xb7, 0xec, 0x19, 0x84, 0x5f, 0x34,
0xca, 0x02, 0xbb, 0x98, 0xda, 0x41, 0xff, 0x0f, 0x83, 0xde, 0x6a, 0x94, 0x3d, 0x3f, 0xfb, 0xe6,
0xc3, 0xc2, 0x5d, 0x8a, 0x14, 0x26, 0x72, 0x1b, 0x6d, 0x38, 0x44, 0x9e, 0x2c, 0x80, 0xda, 0x6f,
0xcc, 0x11, 0x97, 0x4d, 0x55, 0x89, 0x3a, 0x77, 0xa9, 0x84, 0xd9, 0x49, 0x9a, 0xa8, 0xd2, 0x6b,
0x97, 0xc8, 0x28, 0xbf, 0x36, 0x9a, 0x6f, 0xdd, 0xfe, 0x23, 0xb9, 0x4d, 0xde, 0x43, 0xb4, 0x28,
0x51, 0x2a, 0xae, 0x4b, 0xfc, 0x2b, 0x57, 0x06, 0xfe, 0xab, 0xdd, 0xcd, 0xe6, 0x7c, 0xe6, 0x4f,
0xbb, 0x9b, 0xcd, 0x70, 0x1c, 0xef, 0xe2, 0x38, 0xe6, 0x51, 0xbd, 0x16, 0xad, 0x58, 0xa5, 0x76,
0x53, 0x8f, 0x07, 0x9f, 0xad, 0xfa, 0x10, 0xd8, 0x17, 0xfb, 0xe2, 0x77, 0x00, 0x00, 0x00, 0xff,
0xff, 0x61, 0x43, 0x41, 0x73, 0xc3, 0x03, 0x00, 0x00,
}

View File

@ -56,4 +56,6 @@ message Query {
message AlertRule {
string ID = 1; // ID is the unique ID of this alert rule
string JSON = 2; // JSON byte representation of the alert
int64 SrcID = 3; // SrcID is the id of the source this alert is associated with
int64 KapaID = 4; // KapaID is the id of the kapacitor this alert is associated with
}

View File

@ -102,16 +102,16 @@ type AlertRule struct {
// AlertRulesStore stores rules for building tickscript alerting tasks
type AlertRulesStore interface {
// All returns all rules in the store
All(context.Context) ([]AlertRule, error)
// Add creates a new rule in the AlertRulesStore and returns AlertRule with ID
Add(context.Context, AlertRule) (AlertRule, error)
// Delete the AlertRule from the store
Delete(context.Context, AlertRule) error
// Get retrieves AlertRule if `ID` exists
Get(ctx context.Context, ID string) (AlertRule, error)
// Update the AlertRule in the store.
Update(context.Context, AlertRule) error
// All returns all rules in the store for the given source and kapacitor id
All(ctx context.Context, sourceID, kapaID int) ([]AlertRule, error)
// Add creates a new rule in the AlertRulesStore and returns AlertRule with ID for a given source and kapacitor id
Add(ctx context.Context, sourceID, kapaID int, rule AlertRule) (AlertRule, error)
// Delete the AlertRule from the store for a given source and kapacitor ID
Delete(ctx context.Context, sourceID, kapaID int, rule AlertRule) error
// Get retrieves AlertRule if `ID` exists within a given source and kapacitor id
Get(ctx context.Context, sourceID, kapaID int, ID string) (AlertRule, error)
// Update the AlertRule in the store within a given source and kapacitor id
Update(ctx context.Context, sourceID, kapaID int, rule AlertRule) error
}
// TICKScript task to be used by kapacitor

View File

@ -34,20 +34,21 @@ func Data(rule chronograf.AlertRule) (string, error) {
stream = stream + ".where(where_filter)\n"
// Only need aggregate functions for threshold and relative
value := ""
if rule.Trigger != "deadman" {
value := ""
for _, field := range rule.Query.Fields {
for _, fnc := range field.Funcs {
// Only need a window if we have an aggregate function
value = value + "|window().period(period).every(every).align()\n"
value = value + fmt.Sprintf(`|%s(field).as(value)`, fnc)
value = value + fmt.Sprintf(`|%s(field).as('value')`, fnc)
break // only support a single field
}
break // only support a single field
}
if value == "" {
value = `|eval(lambda: field).as('value')`
}
stream = stream + value
}
if value == "" {
value = `|eval(lambda: field).as(value)`
}
return stream + value, nil
return stream, nil
}

View File

@ -1,11 +1,22 @@
package kapacitor
import "github.com/influxdata/chronograf"
import (
"fmt"
"github.com/influxdata/chronograf"
)
// InfluxOut creates a kapacitor influxDBOut node to write alert data to Database, RP, Measurement.
func InfluxOut(rule chronograf.AlertRule) string {
return `
// For some of the alert, the data needs to be renamed (normalized)
// before being sent to influxdb.
rename := ""
if rule.Trigger == "deadman" {
rename = `|eval(lambda: field).as('value')`
}
return fmt.Sprintf(`
trigger
%s
|influxDBOut()
.create()
.database(output_db)
@ -13,5 +24,5 @@ func InfluxOut(rule chronograf.AlertRule) string {
.measurement(output_mt)
.tag('alertName', name)
.tag('triggerType', triggerType)
`
`, rename)
}

View File

@ -11,6 +11,8 @@ func TestInfluxOut(t *testing.T) {
{
name: "Test influxDBOut kapacitor node",
want: `trigger
|eval(lambda: field)
.as('value')
|influxDBOut()
.create()
.database(output_db)

View File

@ -1,7 +1,6 @@
package kapacitor
import (
"encoding/json"
"fmt"
"testing"
@ -131,7 +130,7 @@ var groupby = ['host', 'cluster_id']
var where_filter = lambda: ("cpu" == 'cpu_total') AND ("host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod')
var every = 30s
var period = 10m
var name = 'name'
@ -147,8 +146,6 @@ var messagefield = 'message'
var durationfield = 'duration'
var value = 'value'
var output_db = 'chronograf'
var output_rp = 'autogen'
@ -157,7 +154,7 @@ var output_mt = 'alerts'
var triggerType = 'threshold'
var period = 10m
var every = 30s
var crit = 90
@ -173,7 +170,7 @@ var data = stream
.every(every)
.align()
|mean(field)
.as(value)
.as('value')
var trigger = data
|alert()
@ -284,7 +281,7 @@ var groupby = ['host', 'cluster_id']
var where_filter = lambda: ("cpu" == 'cpu_total') AND ("host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod')
var every = 30s
var period = 10m
var name = 'name'
@ -300,8 +297,6 @@ var messagefield = 'message'
var durationfield = 'duration'
var value = 'value'
var output_db = 'chronograf'
var output_rp = 'autogen'
@ -310,7 +305,7 @@ var output_mt = 'alerts'
var triggerType = 'relative'
var period = 10m
var every = 30s
var shift = -1m
@ -328,7 +323,7 @@ var data = stream
.every(every)
.align()
|mean(field)
.as(value)
.as('value')
var past = data
|shift(shift)
@ -377,8 +372,6 @@ trigger
fmt.Printf("%s", got)
t.Errorf("%q. Relative() = %v, want %v", tt.name, got, tt.want)
}
b, _ := json.Marshal(tt.alert)
fmt.Printf("%s", string(b))
}
}
@ -447,7 +440,7 @@ var groupby = ['host', 'cluster_id']
var where_filter = lambda: ("cpu" == 'cpu_total') AND ("host" == 'acc-0eabc309-eu-west-1-data-3' OR "host" == 'prod')
var every = 30s
var period = 10m
var name = 'name'
@ -463,8 +456,6 @@ var messagefield = 'message'
var durationfield = 'duration'
var value = 'value'
var output_db = 'chronograf'
var output_rp = 'autogen'
@ -475,8 +466,6 @@ var triggerType = 'deadman'
var threshold = 0.0
var period = 10m
var data = stream
|from()
.database(db)
@ -484,11 +473,9 @@ var data = stream
.measurement(measurement)
.groupBy(groupby)
.where(where_filter)
|eval(lambda: field)
.as(value)
var trigger = data
|deadman(threshold, every)
|deadman(threshold, period)
.stateChangesOnly()
.message(message)
.id(idVar)
@ -501,6 +488,8 @@ var trigger = data
.email()
trigger
|eval(lambda: field)
.as('value')
|influxDBOut()
.create()
.database(output_db)

View File

@ -43,7 +43,7 @@ var trigger = past
// DeadmanTrigger checks if any data has been streamed in the last period of time
var DeadmanTrigger = `
var trigger = data|deadman(threshold, every)
var trigger = data|deadman(threshold, period)
.stateChangesOnly()
.message(message)
.id(idVar)

View File

@ -19,7 +19,7 @@ func TestTrigger(t *testing.T) {
Trigger: "deadman",
},
want: `var trigger = data
|deadman(threshold, every)
|deadman(threshold, period)
.stateChangesOnly()
.message(message)
.id(idVar)

View File

@ -36,23 +36,23 @@ func Vars(rule chronograf.AlertRule) (string, error) {
case "threshold":
vars := `
%s
var period = %s
var every = %s
var crit = %s
`
return fmt.Sprintf(vars,
common,
rule.TriggerValues.Period,
rule.Every,
rule.TriggerValues.Value), nil
case "relative":
vars := `
%s
var period = %s
var every = %s
var shift = -%s
var crit = %s
`
return fmt.Sprintf(vars,
common,
rule.TriggerValues.Period,
rule.Every,
rule.TriggerValues.Shift,
rule.TriggerValues.Value,
), nil
@ -60,12 +60,10 @@ func Vars(rule chronograf.AlertRule) (string, error) {
vars := `
%s
var threshold = %s
var period = %s
`
return fmt.Sprintf(vars,
common,
"0.0", // deadman threshold hardcoded to zero
rule.TriggerValues.Period,
), nil
default:
return "", fmt.Errorf("Unknown trigger mechanism")
@ -85,8 +83,7 @@ func commonVars(rule chronograf.AlertRule) (string, error) {
var field = '%s'
var groupby = %s
var where_filter = %s
var every = %s
var period = %s
var name = '%s'
var idVar = name + ':{{.Group}}'
@ -96,8 +93,6 @@ func commonVars(rule chronograf.AlertRule) (string, error) {
var messagefield = '%s'
var durationfield = '%s'
var value = 'value'
var output_db = '%s'
var output_rp = '%s'
var output_mt = '%s'
@ -110,7 +105,7 @@ func commonVars(rule chronograf.AlertRule) (string, error) {
fld,
groupBy(rule.Query),
whereFilter(rule.Query),
rule.Every,
rule.TriggerValues.Period,
rule.Name,
rule.Message,
IDTag,
@ -139,19 +134,6 @@ func field(q chronograf.QueryConfig) (string, error) {
return "", fmt.Errorf("No fields set in query")
}
// value will be "value"" unless there are no field aggregates. If no aggregates, then it is the field name.
func value(rule chronograf.AlertRule) string {
for _, field := range rule.Query.Fields {
// Deadman triggers do not need any aggregate functions
if field.Field != "" && rule.Trigger == "deadman" {
return field.Field
} else if field.Field != "" && len(field.Funcs) == 0 {
return field.Field
}
}
return "value"
}
func whereFilter(q chronograf.QueryConfig) string {
operator := "=="
if !q.AreTagsAccepted {

View File

@ -312,12 +312,12 @@ func (h *Service) KapacitorTasksPost(w http.ResponseWriter, r *http.Request) {
return
}
req.ID = task.ID
rule, err := h.AlertRulesStore.Add(ctx, req)
rule, err := h.AlertRulesStore.Add(ctx, srcID, id, req)
if err != nil {
Error(w, http.StatusInternalServerError, err.Error())
return
}
res := alertResponse{
AlertRule: rule,
Links: alertLinks{
@ -383,6 +383,16 @@ func (h *Service) KapacitorTasksPut(w http.ResponseWriter, r *http.Request) {
}
*/
// Check if the task exists and is scoped correctly
if _, err := h.AlertRulesStore.Get(ctx, srcID, id, tid); err != nil {
if err == chronograf.ErrAlertNotFound {
notFound(w, id)
return
}
Error(w, http.StatusInternalServerError, err.Error())
return
}
req.ID = tid
task, err := c.Update(ctx, c.Href(tid), req)
if err != nil {
@ -390,7 +400,7 @@ func (h *Service) KapacitorTasksPut(w http.ResponseWriter, r *http.Request) {
return
}
if err := h.AlertRulesStore.Update(ctx, req); err != nil {
if err := h.AlertRulesStore.Update(ctx, srcID, id, req); err != nil {
Error(w, http.StatusInternalServerError, err.Error())
return
}
@ -427,7 +437,7 @@ func (h *Service) KapacitorTasksGet(w http.ResponseWriter, r *http.Request) {
return
}
rules, err := h.AlertRulesStore.All(ctx)
rules, err := h.AlertRulesStore.All(ctx, srcID, id)
if err != nil {
Error(w, http.StatusInternalServerError, err.Error())
return
@ -483,8 +493,13 @@ func (h *Service) KapacitorTasksID(w http.ResponseWriter, r *http.Request) {
return
}
tid := httprouter.GetParamFromContext(ctx, "tid")
rule, err := h.AlertRulesStore.Get(ctx, tid)
// Check if the rule exists within scope
rule, err := h.AlertRulesStore.Get(ctx, srcID, id, tid)
if err != nil {
if err == chronograf.ErrAlertNotFound {
notFound(w, id)
return
}
Error(w, http.StatusInternalServerError, err.Error())
return
}
@ -530,6 +545,17 @@ func (h *Service) KapacitorTasksDelete(w http.ResponseWriter, r *http.Request) {
}
tid := httprouter.GetParamFromContext(ctx, "tid")
// Check if the rule is linked to this server and kapacitor
if _, err := h.AlertRulesStore.Get(ctx, srcID, id, tid); err != nil {
if err == chronograf.ErrAlertNotFound {
notFound(w, id)
return
}
Error(w, http.StatusInternalServerError, err.Error())
return
}
c := kapa.Client{
URL: srv.URL,
Username: srv.Username,
@ -540,7 +566,7 @@ func (h *Service) KapacitorTasksDelete(w http.ResponseWriter, r *http.Request) {
return
}
if err := h.AlertRulesStore.Delete(ctx, chronograf.AlertRule{ID: tid}); err != nil {
if err := h.AlertRulesStore.Delete(ctx, srcID, id, chronograf.AlertRule{ID: tid}); err != nil {
Error(w, http.StatusInternalServerError, err.Error())
return
}