fix(alerts and notifications): updates latest completed when status goes from inactive->active
parent
799838d327
commit
c91ef8e398
|
@ -3,6 +3,9 @@ package middleware
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/task/backend"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
)
|
||||
|
@ -13,6 +16,7 @@ type CoordinatingCheckService struct {
|
|||
influxdb.CheckService
|
||||
coordinator Coordinator
|
||||
taskService influxdb.TaskService
|
||||
Now func() time.Time
|
||||
}
|
||||
|
||||
// NewCheckService constructs a new coordinating check service
|
||||
|
@ -21,6 +25,9 @@ func NewCheckService(cs influxdb.CheckService, ts influxdb.TaskService, coordina
|
|||
CheckService: cs,
|
||||
taskService: ts,
|
||||
coordinator: coordinator,
|
||||
Now: func() time.Time {
|
||||
return time.Now().UTC()
|
||||
},
|
||||
}
|
||||
|
||||
return c
|
||||
|
@ -71,6 +78,12 @@ func (cs *CoordinatingCheckService) UpdateCheck(ctx context.Context, id influxdb
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// if the update is to activate and the previous task was inactive we should add a "latest completed" update
|
||||
// this allows us to see not run the task for inactive time
|
||||
if fromTask.Status == string(backend.TaskInactive) && toTask.Status == string(backend.TaskActive) {
|
||||
toTask.LatestCompleted = cs.Now().Format(time.RFC3339)
|
||||
}
|
||||
|
||||
return to, cs.coordinator.TaskUpdated(ctx, fromTask, toTask)
|
||||
}
|
||||
|
||||
|
@ -96,6 +109,12 @@ func (cs *CoordinatingCheckService) PatchCheck(ctx context.Context, id influxdb.
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// if the update is to activate and the previous task was inactive we should add a "latest completed" update
|
||||
// this allows us to see not run the task for inactive time
|
||||
if fromTask.Status == string(backend.TaskInactive) && toTask.Status == string(backend.TaskActive) {
|
||||
toTask.LatestCompleted = cs.Now().Format(time.RFC3339)
|
||||
}
|
||||
|
||||
return to, cs.coordinator.TaskUpdated(ctx, fromTask, toTask)
|
||||
|
||||
}
|
||||
|
|
|
@ -4,6 +4,9 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/task/backend"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/mock"
|
||||
|
@ -154,6 +157,85 @@ func TestCheckCreate(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCheckUpdateFromInactive(t *testing.T) {
|
||||
mocks, checkService := newCheckSvcStack()
|
||||
latest := time.Now().UTC()
|
||||
checkService.Now = func() time.Time {
|
||||
return latest
|
||||
}
|
||||
ch := mocks.pipingCoordinator.taskUpdatedChan()
|
||||
|
||||
mocks.checkSvc.UpdateCheckFn = func(_ context.Context, _ influxdb.ID, c influxdb.Check) (influxdb.Check, error) {
|
||||
c.SetTaskID(10)
|
||||
c.SetStatus(influxdb.Active)
|
||||
c.SetUpdatedAt(latest.Add(-20 * time.Hour))
|
||||
return c, nil
|
||||
}
|
||||
|
||||
mocks.checkSvc.PatchCheckFn = func(_ context.Context, _ influxdb.ID, c influxdb.CheckUpdate) (influxdb.Check, error) {
|
||||
ic := &check.Deadman{}
|
||||
ic.SetTaskID(10)
|
||||
ic.SetStatus(influxdb.Active)
|
||||
ic.SetUpdatedAt(latest.Add(-20 * time.Hour))
|
||||
return ic, nil
|
||||
}
|
||||
|
||||
mocks.checkSvc.FindCheckByIDFn = func(_ context.Context, id influxdb.ID) (influxdb.Check, error) {
|
||||
c := &check.Deadman{}
|
||||
c.SetID(id)
|
||||
c.SetTaskID(1)
|
||||
c.SetStatus(influxdb.TaskStatusInactive)
|
||||
return c, nil
|
||||
}
|
||||
|
||||
mocks.taskSvc.FindTaskByIDFn = func(_ context.Context, id influxdb.ID) (*influxdb.Task, error) {
|
||||
if id == 1 {
|
||||
return &influxdb.Task{ID: id, Status: string(backend.TaskInactive)}, nil
|
||||
} else if id == 10 {
|
||||
return &influxdb.Task{ID: id, Status: string(backend.TaskActive)}, nil
|
||||
}
|
||||
return &influxdb.Task{ID: id}, nil
|
||||
}
|
||||
|
||||
deadman := &check.Deadman{}
|
||||
deadman.SetTaskID(10)
|
||||
deadman.SetStatus(influxdb.Active)
|
||||
|
||||
thecheck, err := checkService.UpdateCheck(context.Background(), 1, deadman)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
select {
|
||||
case task := <-ch:
|
||||
if task.ID != thecheck.GetTaskID() {
|
||||
t.Fatalf("task sent to coordinator doesn't match expected")
|
||||
}
|
||||
if task.LatestCompleted != latest.Format(time.RFC3339) {
|
||||
t.Fatalf("update returned incorrect LatestCompleted, expected %s got %s, or ", latest.Format(time.RFC3339), task.LatestCompleted)
|
||||
}
|
||||
default:
|
||||
t.Fatal("didn't receive task")
|
||||
}
|
||||
|
||||
action := influxdb.Active
|
||||
thecheck, err = checkService.PatchCheck(context.Background(), 1, influxdb.CheckUpdate{Status: &action})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
select {
|
||||
case task := <-ch:
|
||||
if task.ID != thecheck.GetTaskID() {
|
||||
t.Fatalf("task sent to coordinator doesn't match expected")
|
||||
}
|
||||
if task.LatestCompleted != latest.Format(time.RFC3339) {
|
||||
t.Fatalf("update returned incorrect LatestCompleted, expected %s got %s, or ", latest.Format(time.RFC3339), task.LatestCompleted)
|
||||
}
|
||||
default:
|
||||
t.Fatal("didn't receive task")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestCheckUpdate(t *testing.T) {
|
||||
mocks, checkService := newCheckSvcStack()
|
||||
ch := mocks.pipingCoordinator.taskUpdatedChan()
|
||||
|
|
|
@ -3,6 +3,9 @@ package middleware
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/task/backend"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
)
|
||||
|
@ -13,6 +16,7 @@ type CoordinatingNotificationRuleStore struct {
|
|||
influxdb.NotificationRuleStore
|
||||
coordinator Coordinator
|
||||
taskService influxdb.TaskService
|
||||
Now func() time.Time
|
||||
}
|
||||
|
||||
// NewNotificationRuleStore constructs a new coordinating notification service
|
||||
|
@ -21,6 +25,9 @@ func NewNotificationRuleStore(ns influxdb.NotificationRuleStore, ts influxdb.Tas
|
|||
NotificationRuleStore: ns,
|
||||
taskService: ts,
|
||||
coordinator: coordinator,
|
||||
Now: func() time.Time {
|
||||
return time.Now().UTC()
|
||||
},
|
||||
}
|
||||
|
||||
return c
|
||||
|
@ -70,6 +77,11 @@ func (ns *CoordinatingNotificationRuleStore) UpdateNotificationRule(ctx context.
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// if the update is to activate and the previous task was inactive we should add a "latest completed" update
|
||||
// this allows us to see not run the task for inactive time
|
||||
if fromTask.Status == string(backend.TaskInactive) && toTask.Status == string(backend.TaskActive) {
|
||||
toTask.LatestCompleted = ns.Now().Format(time.RFC3339)
|
||||
}
|
||||
|
||||
return to, ns.coordinator.TaskUpdated(ctx, fromTask, toTask)
|
||||
}
|
||||
|
@ -96,6 +108,12 @@ func (ns *CoordinatingNotificationRuleStore) PatchNotificationRule(ctx context.C
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// if the update is to activate and the previous task was inactive we should add a "latest completed" update
|
||||
// this allows us to see not run the task for inactive time
|
||||
if fromTask.Status == string(backend.TaskInactive) && toTask.Status == string(backend.TaskActive) {
|
||||
toTask.LatestCompleted = ns.Now().Format(time.RFC3339)
|
||||
}
|
||||
|
||||
return to, ns.coordinator.TaskUpdated(ctx, fromTask, toTask)
|
||||
|
||||
}
|
||||
|
|
|
@ -4,6 +4,9 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/task/backend"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/notification/rule"
|
||||
|
@ -45,6 +48,84 @@ func TestNotificationRuleCreate(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestNotificationRuleUpdateFromInactive(t *testing.T) {
|
||||
mocks, nrService := newNotificationRuleSvcStack()
|
||||
latest := time.Now().UTC()
|
||||
nrService.Now = func() time.Time {
|
||||
return latest
|
||||
}
|
||||
ch := mocks.pipingCoordinator.taskUpdatedChan()
|
||||
|
||||
mocks.notificationSvc.UpdateNotificationRuleF = func(_ context.Context, _ influxdb.ID, c influxdb.NotificationRule, _ influxdb.ID) (influxdb.NotificationRule, error) {
|
||||
c.SetTaskID(10)
|
||||
c.SetStatus(influxdb.Active)
|
||||
c.SetUpdatedAt(latest.Add(-20 * time.Hour))
|
||||
return c, nil
|
||||
}
|
||||
|
||||
mocks.notificationSvc.PatchNotificationRuleF = func(_ context.Context, id influxdb.ID, _ influxdb.NotificationRuleUpdate) (influxdb.NotificationRule, error) {
|
||||
ic := &rule.HTTP{}
|
||||
ic.SetTaskID(10)
|
||||
ic.SetStatus(influxdb.Active)
|
||||
ic.SetUpdatedAt(latest.Add(-20 * time.Hour))
|
||||
return ic, nil
|
||||
}
|
||||
|
||||
mocks.notificationSvc.FindNotificationRuleByIDF = func(_ context.Context, id influxdb.ID) (influxdb.NotificationRule, error) {
|
||||
c := &rule.HTTP{}
|
||||
c.SetID(id)
|
||||
c.SetTaskID(1)
|
||||
c.SetStatus(influxdb.TaskStatusInactive)
|
||||
return c, nil
|
||||
}
|
||||
|
||||
mocks.taskSvc.FindTaskByIDFn = func(_ context.Context, id influxdb.ID) (*influxdb.Task, error) {
|
||||
if id == 1 {
|
||||
return &influxdb.Task{ID: id, Status: string(backend.TaskInactive)}, nil
|
||||
} else if id == 10 {
|
||||
return &influxdb.Task{ID: id, Status: string(backend.TaskActive)}, nil
|
||||
}
|
||||
return &influxdb.Task{ID: id}, nil
|
||||
}
|
||||
|
||||
deadman := &rule.HTTP{}
|
||||
deadman.SetTaskID(10)
|
||||
deadman.SetStatus(influxdb.Active)
|
||||
|
||||
therule, err := nrService.UpdateNotificationRule(context.Background(), 1, deadman, 11)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
select {
|
||||
case task := <-ch:
|
||||
if task.ID != therule.GetTaskID() {
|
||||
t.Fatalf("task sent to coordinator doesn't match expected")
|
||||
}
|
||||
if task.LatestCompleted != latest.Format(time.RFC3339) {
|
||||
t.Fatalf("update returned incorrect LatestCompleted, expected %s got %s, or ", latest.Format(time.RFC3339), task.LatestCompleted)
|
||||
}
|
||||
default:
|
||||
t.Fatal("didn't receive task")
|
||||
}
|
||||
|
||||
action := influxdb.Active
|
||||
therule, err = nrService.PatchNotificationRule(context.Background(), 1, influxdb.NotificationRuleUpdate{Status: &action})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
select {
|
||||
case task := <-ch:
|
||||
if task.ID != therule.GetTaskID() {
|
||||
t.Fatalf("task sent to coordinator doesn't match expected")
|
||||
}
|
||||
if task.LatestCompleted != latest.Format(time.RFC3339) {
|
||||
t.Fatalf("update returned incorrect LatestCompleted, expected %s got %s, or ", latest.Format(time.RFC3339), task.LatestCompleted)
|
||||
}
|
||||
default:
|
||||
t.Fatal("didn't receive task")
|
||||
}
|
||||
|
||||
}
|
||||
func TestNotificationRuleUpdate(t *testing.T) {
|
||||
mocks, nrService := newNotificationRuleSvcStack()
|
||||
ch := mocks.pipingCoordinator.taskUpdatedChan()
|
||||
|
|
Loading…
Reference in New Issue