From 5d1c4d814b18103235f05c9e3556595dd4436d6b Mon Sep 17 00:00:00 2001 From: Lyon Hill Date: Thu, 29 Aug 2019 14:56:55 -0600 Subject: [PATCH] fix(task): Remove allowance for duplicate run's in run list (#14875) If we are caching run's in the kv storage system it is possible to get the the cached version from the kv store and the recently completed run from the analytical store. We just need to only show analytical results if we find a duplicate. --- mock/task_service.go | 91 ++++++++++++++++++------- task/backend/analytical_storage.go | 24 ++++++- task/backend/analytical_storage_test.go | 47 +++++++++++++ 3 files changed, 136 insertions(+), 26 deletions(-) diff --git a/mock/task_service.go b/mock/task_service.go index 7062d6dd25..5586e60f6b 100644 --- a/mock/task_service.go +++ b/mock/task_service.go @@ -2,66 +2,109 @@ package mock import ( "context" + "time" - platform "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/task/backend" ) -var _ platform.TaskService = (*TaskService)(nil) +var _ influxdb.TaskService = (*TaskService)(nil) +var _ backend.TaskControlService = (*TaskControlService)(nil) type TaskService struct { - FindTaskByIDFn func(context.Context, platform.ID) (*platform.Task, error) - FindTasksFn func(context.Context, platform.TaskFilter) ([]*platform.Task, int, error) - CreateTaskFn func(context.Context, platform.TaskCreate) (*platform.Task, error) - UpdateTaskFn func(context.Context, platform.ID, platform.TaskUpdate) (*platform.Task, error) - DeleteTaskFn func(context.Context, platform.ID) error - FindLogsFn func(context.Context, platform.LogFilter) ([]*platform.Log, int, error) - FindRunsFn func(context.Context, platform.RunFilter) ([]*platform.Run, int, error) - FindRunByIDFn func(context.Context, platform.ID, platform.ID) (*platform.Run, error) - CancelRunFn func(context.Context, platform.ID, platform.ID) error - RetryRunFn func(context.Context, platform.ID, platform.ID) (*platform.Run, error) - ForceRunFn func(context.Context, platform.ID, int64) (*platform.Run, error) + FindTaskByIDFn func(context.Context, influxdb.ID) (*influxdb.Task, error) + FindTasksFn func(context.Context, influxdb.TaskFilter) ([]*influxdb.Task, int, error) + CreateTaskFn func(context.Context, influxdb.TaskCreate) (*influxdb.Task, error) + UpdateTaskFn func(context.Context, influxdb.ID, influxdb.TaskUpdate) (*influxdb.Task, error) + DeleteTaskFn func(context.Context, influxdb.ID) error + FindLogsFn func(context.Context, influxdb.LogFilter) ([]*influxdb.Log, int, error) + FindRunsFn func(context.Context, influxdb.RunFilter) ([]*influxdb.Run, int, error) + FindRunByIDFn func(context.Context, influxdb.ID, influxdb.ID) (*influxdb.Run, error) + CancelRunFn func(context.Context, influxdb.ID, influxdb.ID) error + RetryRunFn func(context.Context, influxdb.ID, influxdb.ID) (*influxdb.Run, error) + ForceRunFn func(context.Context, influxdb.ID, int64) (*influxdb.Run, error) } -func (s *TaskService) FindTaskByID(ctx context.Context, id platform.ID) (*platform.Task, error) { +func (s *TaskService) FindTaskByID(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) { return s.FindTaskByIDFn(ctx, id) } -func (s *TaskService) FindTasks(ctx context.Context, filter platform.TaskFilter) ([]*platform.Task, int, error) { +func (s *TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { return s.FindTasksFn(ctx, filter) } -func (s *TaskService) CreateTask(ctx context.Context, t platform.TaskCreate) (*platform.Task, error) { +func (s *TaskService) CreateTask(ctx context.Context, t influxdb.TaskCreate) (*influxdb.Task, error) { return s.CreateTaskFn(ctx, t) } -func (s *TaskService) UpdateTask(ctx context.Context, id platform.ID, upd platform.TaskUpdate) (*platform.Task, error) { +func (s *TaskService) UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error) { return s.UpdateTaskFn(ctx, id, upd) } -func (s *TaskService) DeleteTask(ctx context.Context, id platform.ID) error { +func (s *TaskService) DeleteTask(ctx context.Context, id influxdb.ID) error { return s.DeleteTaskFn(ctx, id) } -func (s *TaskService) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) { +func (s *TaskService) FindLogs(ctx context.Context, filter influxdb.LogFilter) ([]*influxdb.Log, int, error) { return s.FindLogsFn(ctx, filter) } -func (s *TaskService) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) { +func (s *TaskService) FindRuns(ctx context.Context, filter influxdb.RunFilter) ([]*influxdb.Run, int, error) { return s.FindRunsFn(ctx, filter) } -func (s *TaskService) FindRunByID(ctx context.Context, taskID, runID platform.ID) (*platform.Run, error) { +func (s *TaskService) FindRunByID(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) { return s.FindRunByIDFn(ctx, taskID, runID) } -func (s *TaskService) CancelRun(ctx context.Context, taskID, runID platform.ID) error { +func (s *TaskService) CancelRun(ctx context.Context, taskID, runID influxdb.ID) error { return s.CancelRunFn(ctx, taskID, runID) } -func (s *TaskService) RetryRun(ctx context.Context, taskID, runID platform.ID) (*platform.Run, error) { +func (s *TaskService) RetryRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) { return s.RetryRunFn(ctx, taskID, runID) } -func (s *TaskService) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*platform.Run, error) { +func (s *TaskService) ForceRun(ctx context.Context, taskID influxdb.ID, scheduledFor int64) (*influxdb.Run, error) { return s.ForceRunFn(ctx, taskID, scheduledFor) } + +type TaskControlService struct { + CreateNextRunFn func(ctx context.Context, taskID influxdb.ID, now int64) (backend.RunCreation, error) + NextDueRunFn func(ctx context.Context, taskID influxdb.ID) (int64, error) + CreateRunFn func(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) + CurrentlyRunningFn func(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) + ManualRunsFn func(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) + StartManualRunFn func(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) + FinishRunFn func(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) + UpdateRunStateFn func(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state backend.RunStatus) error + AddRunLogFn func(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error +} + +func (tcs *TaskControlService) CreateNextRun(ctx context.Context, taskID influxdb.ID, now int64) (backend.RunCreation, error) { + return tcs.CreateNextRunFn(ctx, taskID, now) +} +func (tcs *TaskControlService) NextDueRun(ctx context.Context, taskID influxdb.ID) (int64, error) { + return tcs.NextDueRunFn(ctx, taskID) +} +func (tcs *TaskControlService) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) { + return tcs.CreateRunFn(ctx, taskID, scheduledFor) +} +func (tcs *TaskControlService) CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) { + return tcs.CurrentlyRunningFn(ctx, taskID) +} +func (tcs *TaskControlService) ManualRuns(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) { + return tcs.ManualRunsFn(ctx, taskID) +} +func (tcs *TaskControlService) StartManualRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) { + return tcs.StartManualRunFn(ctx, taskID, runID) +} +func (tcs *TaskControlService) FinishRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) { + return tcs.FinishRunFn(ctx, taskID, runID) +} +func (tcs *TaskControlService) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state backend.RunStatus) error { + return tcs.UpdateRunStateFn(ctx, taskID, runID, when, state) +} +func (tcs *TaskControlService) AddRunLog(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error { + return tcs.AddRunLogFn(ctx, taskID, runID, when, log) +} diff --git a/task/backend/analytical_storage.go b/task/backend/analytical_storage.go index bd88248bca..25d48b9aca 100644 --- a/task/backend/analytical_storage.go +++ b/task/backend/analytical_storage.go @@ -222,9 +222,29 @@ func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter influxdb.RunFi return nil, 0, fmt.Errorf("unexpected internal error while decoding run response: %v", err) } - runs = append(runs, re.runs...) + runs = as.combineRuns(runs, re.runs) - return runs, n, err + return runs, len(runs), err +} + +// remove any kv runs that exist in the list of completed runs +func (as *AnalyticalStorage) combineRuns(currentRuns, completeRuns []*influxdb.Run) []*influxdb.Run { + crMap := map[influxdb.ID]int{} + + // track the current runs + for i, cr := range currentRuns { + crMap[cr.ID] = i + } + + // if we find a complete run that matches a current run the current run is out dated and + // should be removed. + for _, completeRun := range completeRuns { + if i, ok := crMap[completeRun.ID]; ok { + currentRuns = append(currentRuns[:i], currentRuns[i+1:]...) + } + } + + return append(currentRuns, completeRuns...) } // FindRunByID returns a single run. diff --git a/task/backend/analytical_storage_test.go b/task/backend/analytical_storage_test.go index 8b7b447a1b..e218233316 100644 --- a/task/backend/analytical_storage_test.go +++ b/task/backend/analytical_storage_test.go @@ -11,6 +11,7 @@ import ( icontext "github.com/influxdata/influxdb/context" "github.com/influxdata/influxdb/inmem" "github.com/influxdata/influxdb/kv" + "github.com/influxdata/influxdb/mock" "github.com/influxdata/influxdb/query" _ "github.com/influxdata/influxdb/query/builtin" "github.com/influxdata/influxdb/query/control" @@ -54,6 +55,52 @@ func TestAnalyticalStore(t *testing.T) { ) } +func TestDeduplicateRuns(t *testing.T) { + svc := kv.NewService(inmem.NewKVStore()) + if err := svc.Initialize(context.Background()); err != nil { + t.Fatalf("error initializing kv service: %v", err) + } + + ab := newAnalyticalBackend(t, svc, svc) + defer ab.Close(t) + + mockTS := &mock.TaskService{ + FindTaskByIDFn: func(context.Context, influxdb.ID) (*influxdb.Task, error) { + return &influxdb.Task{ID: 1, OrganizationID: 20}, nil + }, + FindRunsFn: func(context.Context, influxdb.RunFilter) ([]*influxdb.Run, int, error) { + return []*influxdb.Run{ + &influxdb.Run{ID: 2, Status: "started"}, + }, 1, nil + }, + } + mockTCS := &mock.TaskControlService{ + FinishRunFn: func(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) { + return &influxdb.Run{ID: 2, TaskID: 1, Status: "success", ScheduledFor: "1", StartedAt: "2", FinishedAt: "3"}, nil + }, + } + + svcStack := backend.NewAnalyticalStorage(zaptest.NewLogger(t), mockTS, mockTCS, ab.PointsWriter(), ab.QueryService()) + + _, err := svcStack.FinishRun(context.Background(), 1, 2) + if err != nil { + t.Fatal(err) + } + + runs, _, err := svcStack.FindRuns(context.Background(), influxdb.RunFilter{Task: 1}) + if err != nil { + t.Fatal(err) + } + + if len(runs) != 1 { + t.Fatalf("expected 1 run but got %d", len(runs)) + } + + if runs[0].Status != "success" { + t.Fatalf("expected the deduped run to be 'success', got: %s", runs[0].Status) + } +} + type analyticalBackend struct { queryController *control.Controller rootDir string