diff --git a/task/backend/middleware/check_middleware.go b/task/backend/middleware/check_middleware.go index 76a2c4b5fe..ae321636c6 100644 --- a/task/backend/middleware/check_middleware.go +++ b/task/backend/middleware/check_middleware.go @@ -3,6 +3,9 @@ package middleware import ( "context" "fmt" + "time" + + "github.com/influxdata/influxdb/task/backend" "github.com/influxdata/influxdb" ) @@ -13,6 +16,7 @@ type CoordinatingCheckService struct { influxdb.CheckService coordinator Coordinator taskService influxdb.TaskService + Now func() time.Time } // NewCheckService constructs a new coordinating check service @@ -21,6 +25,9 @@ func NewCheckService(cs influxdb.CheckService, ts influxdb.TaskService, coordina CheckService: cs, taskService: ts, coordinator: coordinator, + Now: func() time.Time { + return time.Now().UTC() + }, } return c @@ -71,6 +78,12 @@ func (cs *CoordinatingCheckService) UpdateCheck(ctx context.Context, id influxdb return nil, err } + // if the update is to activate and the previous task was inactive we should add a "latest completed" update + // this allows us to see not run the task for inactive time + if fromTask.Status == string(backend.TaskInactive) && toTask.Status == string(backend.TaskActive) { + toTask.LatestCompleted = cs.Now().Format(time.RFC3339) + } + return to, cs.coordinator.TaskUpdated(ctx, fromTask, toTask) } @@ -96,6 +109,12 @@ func (cs *CoordinatingCheckService) PatchCheck(ctx context.Context, id influxdb. return nil, err } + // if the update is to activate and the previous task was inactive we should add a "latest completed" update + // this allows us to see not run the task for inactive time + if fromTask.Status == string(backend.TaskInactive) && toTask.Status == string(backend.TaskActive) { + toTask.LatestCompleted = cs.Now().Format(time.RFC3339) + } + return to, cs.coordinator.TaskUpdated(ctx, fromTask, toTask) } diff --git a/task/backend/middleware/check_middleware_test.go b/task/backend/middleware/check_middleware_test.go index 61ec925e43..7ee35d856a 100644 --- a/task/backend/middleware/check_middleware_test.go +++ b/task/backend/middleware/check_middleware_test.go @@ -4,6 +4,9 @@ import ( "context" "fmt" "testing" + "time" + + "github.com/influxdata/influxdb/task/backend" "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/mock" @@ -154,6 +157,85 @@ func TestCheckCreate(t *testing.T) { } } +func TestCheckUpdateFromInactive(t *testing.T) { + mocks, checkService := newCheckSvcStack() + latest := time.Now().UTC() + checkService.Now = func() time.Time { + return latest + } + ch := mocks.pipingCoordinator.taskUpdatedChan() + + mocks.checkSvc.UpdateCheckFn = func(_ context.Context, _ influxdb.ID, c influxdb.Check) (influxdb.Check, error) { + c.SetTaskID(10) + c.SetStatus(influxdb.Active) + c.SetUpdatedAt(latest.Add(-20 * time.Hour)) + return c, nil + } + + mocks.checkSvc.PatchCheckFn = func(_ context.Context, _ influxdb.ID, c influxdb.CheckUpdate) (influxdb.Check, error) { + ic := &check.Deadman{} + ic.SetTaskID(10) + ic.SetStatus(influxdb.Active) + ic.SetUpdatedAt(latest.Add(-20 * time.Hour)) + return ic, nil + } + + mocks.checkSvc.FindCheckByIDFn = func(_ context.Context, id influxdb.ID) (influxdb.Check, error) { + c := &check.Deadman{} + c.SetID(id) + c.SetTaskID(1) + c.SetStatus(influxdb.TaskStatusInactive) + return c, nil + } + + mocks.taskSvc.FindTaskByIDFn = func(_ context.Context, id influxdb.ID) (*influxdb.Task, error) { + if id == 1 { + return &influxdb.Task{ID: id, Status: string(backend.TaskInactive)}, nil + } else if id == 10 { + return &influxdb.Task{ID: id, Status: string(backend.TaskActive)}, nil + } + return &influxdb.Task{ID: id}, nil + } + + deadman := &check.Deadman{} + deadman.SetTaskID(10) + deadman.SetStatus(influxdb.Active) + + thecheck, err := checkService.UpdateCheck(context.Background(), 1, deadman) + if err != nil { + t.Fatal(err) + } + select { + case task := <-ch: + if task.ID != thecheck.GetTaskID() { + t.Fatalf("task sent to coordinator doesn't match expected") + } + if task.LatestCompleted != latest.Format(time.RFC3339) { + t.Fatalf("update returned incorrect LatestCompleted, expected %s got %s, or ", latest.Format(time.RFC3339), task.LatestCompleted) + } + default: + t.Fatal("didn't receive task") + } + + action := influxdb.Active + thecheck, err = checkService.PatchCheck(context.Background(), 1, influxdb.CheckUpdate{Status: &action}) + if err != nil { + t.Fatal(err) + } + select { + case task := <-ch: + if task.ID != thecheck.GetTaskID() { + t.Fatalf("task sent to coordinator doesn't match expected") + } + if task.LatestCompleted != latest.Format(time.RFC3339) { + t.Fatalf("update returned incorrect LatestCompleted, expected %s got %s, or ", latest.Format(time.RFC3339), task.LatestCompleted) + } + default: + t.Fatal("didn't receive task") + } + +} + func TestCheckUpdate(t *testing.T) { mocks, checkService := newCheckSvcStack() ch := mocks.pipingCoordinator.taskUpdatedChan() diff --git a/task/backend/middleware/notification_middleware.go b/task/backend/middleware/notification_middleware.go index 2d784eb113..e054fd249f 100644 --- a/task/backend/middleware/notification_middleware.go +++ b/task/backend/middleware/notification_middleware.go @@ -3,6 +3,9 @@ package middleware import ( "context" "fmt" + "time" + + "github.com/influxdata/influxdb/task/backend" "github.com/influxdata/influxdb" ) @@ -13,6 +16,7 @@ type CoordinatingNotificationRuleStore struct { influxdb.NotificationRuleStore coordinator Coordinator taskService influxdb.TaskService + Now func() time.Time } // NewNotificationRuleStore constructs a new coordinating notification service @@ -21,6 +25,9 @@ func NewNotificationRuleStore(ns influxdb.NotificationRuleStore, ts influxdb.Tas NotificationRuleStore: ns, taskService: ts, coordinator: coordinator, + Now: func() time.Time { + return time.Now().UTC() + }, } return c @@ -70,6 +77,11 @@ func (ns *CoordinatingNotificationRuleStore) UpdateNotificationRule(ctx context. if err != nil { return nil, err } + // if the update is to activate and the previous task was inactive we should add a "latest completed" update + // this allows us to see not run the task for inactive time + if fromTask.Status == string(backend.TaskInactive) && toTask.Status == string(backend.TaskActive) { + toTask.LatestCompleted = ns.Now().Format(time.RFC3339) + } return to, ns.coordinator.TaskUpdated(ctx, fromTask, toTask) } @@ -96,6 +108,12 @@ func (ns *CoordinatingNotificationRuleStore) PatchNotificationRule(ctx context.C return nil, err } + // if the update is to activate and the previous task was inactive we should add a "latest completed" update + // this allows us to see not run the task for inactive time + if fromTask.Status == string(backend.TaskInactive) && toTask.Status == string(backend.TaskActive) { + toTask.LatestCompleted = ns.Now().Format(time.RFC3339) + } + return to, ns.coordinator.TaskUpdated(ctx, fromTask, toTask) } diff --git a/task/backend/middleware/notification_middleware_test.go b/task/backend/middleware/notification_middleware_test.go index de2092f6aa..951bd9282c 100644 --- a/task/backend/middleware/notification_middleware_test.go +++ b/task/backend/middleware/notification_middleware_test.go @@ -4,6 +4,9 @@ import ( "context" "fmt" "testing" + "time" + + "github.com/influxdata/influxdb/task/backend" "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/notification/rule" @@ -45,6 +48,84 @@ func TestNotificationRuleCreate(t *testing.T) { } } +func TestNotificationRuleUpdateFromInactive(t *testing.T) { + mocks, nrService := newNotificationRuleSvcStack() + latest := time.Now().UTC() + nrService.Now = func() time.Time { + return latest + } + ch := mocks.pipingCoordinator.taskUpdatedChan() + + mocks.notificationSvc.UpdateNotificationRuleF = func(_ context.Context, _ influxdb.ID, c influxdb.NotificationRule, _ influxdb.ID) (influxdb.NotificationRule, error) { + c.SetTaskID(10) + c.SetStatus(influxdb.Active) + c.SetUpdatedAt(latest.Add(-20 * time.Hour)) + return c, nil + } + + mocks.notificationSvc.PatchNotificationRuleF = func(_ context.Context, id influxdb.ID, _ influxdb.NotificationRuleUpdate) (influxdb.NotificationRule, error) { + ic := &rule.HTTP{} + ic.SetTaskID(10) + ic.SetStatus(influxdb.Active) + ic.SetUpdatedAt(latest.Add(-20 * time.Hour)) + return ic, nil + } + + mocks.notificationSvc.FindNotificationRuleByIDF = func(_ context.Context, id influxdb.ID) (influxdb.NotificationRule, error) { + c := &rule.HTTP{} + c.SetID(id) + c.SetTaskID(1) + c.SetStatus(influxdb.TaskStatusInactive) + return c, nil + } + + mocks.taskSvc.FindTaskByIDFn = func(_ context.Context, id influxdb.ID) (*influxdb.Task, error) { + if id == 1 { + return &influxdb.Task{ID: id, Status: string(backend.TaskInactive)}, nil + } else if id == 10 { + return &influxdb.Task{ID: id, Status: string(backend.TaskActive)}, nil + } + return &influxdb.Task{ID: id}, nil + } + + deadman := &rule.HTTP{} + deadman.SetTaskID(10) + deadman.SetStatus(influxdb.Active) + + therule, err := nrService.UpdateNotificationRule(context.Background(), 1, deadman, 11) + if err != nil { + t.Fatal(err) + } + select { + case task := <-ch: + if task.ID != therule.GetTaskID() { + t.Fatalf("task sent to coordinator doesn't match expected") + } + if task.LatestCompleted != latest.Format(time.RFC3339) { + t.Fatalf("update returned incorrect LatestCompleted, expected %s got %s, or ", latest.Format(time.RFC3339), task.LatestCompleted) + } + default: + t.Fatal("didn't receive task") + } + + action := influxdb.Active + therule, err = nrService.PatchNotificationRule(context.Background(), 1, influxdb.NotificationRuleUpdate{Status: &action}) + if err != nil { + t.Fatal(err) + } + select { + case task := <-ch: + if task.ID != therule.GetTaskID() { + t.Fatalf("task sent to coordinator doesn't match expected") + } + if task.LatestCompleted != latest.Format(time.RFC3339) { + t.Fatalf("update returned incorrect LatestCompleted, expected %s got %s, or ", latest.Format(time.RFC3339), task.LatestCompleted) + } + default: + t.Fatal("didn't receive task") + } + +} func TestNotificationRuleUpdate(t *testing.T) { mocks, nrService := newNotificationRuleSvcStack() ch := mocks.pipingCoordinator.taskUpdatedChan()