fix(task): Remove allowance for duplicate run's in run list (#14875)
If we are caching run's in the kv storage system it is possible to get the the cached version from the kv store and the recently completed run from the analytical store. We just need to only show analytical results if we find a duplicate.pull/14881/head
parent
d2d77305a3
commit
5d1c4d814b
|
@ -2,66 +2,109 @@ package mock
|
|||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
platform "github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/task/backend"
|
||||
)
|
||||
|
||||
var _ platform.TaskService = (*TaskService)(nil)
|
||||
var _ influxdb.TaskService = (*TaskService)(nil)
|
||||
var _ backend.TaskControlService = (*TaskControlService)(nil)
|
||||
|
||||
type TaskService struct {
|
||||
FindTaskByIDFn func(context.Context, platform.ID) (*platform.Task, error)
|
||||
FindTasksFn func(context.Context, platform.TaskFilter) ([]*platform.Task, int, error)
|
||||
CreateTaskFn func(context.Context, platform.TaskCreate) (*platform.Task, error)
|
||||
UpdateTaskFn func(context.Context, platform.ID, platform.TaskUpdate) (*platform.Task, error)
|
||||
DeleteTaskFn func(context.Context, platform.ID) error
|
||||
FindLogsFn func(context.Context, platform.LogFilter) ([]*platform.Log, int, error)
|
||||
FindRunsFn func(context.Context, platform.RunFilter) ([]*platform.Run, int, error)
|
||||
FindRunByIDFn func(context.Context, platform.ID, platform.ID) (*platform.Run, error)
|
||||
CancelRunFn func(context.Context, platform.ID, platform.ID) error
|
||||
RetryRunFn func(context.Context, platform.ID, platform.ID) (*platform.Run, error)
|
||||
ForceRunFn func(context.Context, platform.ID, int64) (*platform.Run, error)
|
||||
FindTaskByIDFn func(context.Context, influxdb.ID) (*influxdb.Task, error)
|
||||
FindTasksFn func(context.Context, influxdb.TaskFilter) ([]*influxdb.Task, int, error)
|
||||
CreateTaskFn func(context.Context, influxdb.TaskCreate) (*influxdb.Task, error)
|
||||
UpdateTaskFn func(context.Context, influxdb.ID, influxdb.TaskUpdate) (*influxdb.Task, error)
|
||||
DeleteTaskFn func(context.Context, influxdb.ID) error
|
||||
FindLogsFn func(context.Context, influxdb.LogFilter) ([]*influxdb.Log, int, error)
|
||||
FindRunsFn func(context.Context, influxdb.RunFilter) ([]*influxdb.Run, int, error)
|
||||
FindRunByIDFn func(context.Context, influxdb.ID, influxdb.ID) (*influxdb.Run, error)
|
||||
CancelRunFn func(context.Context, influxdb.ID, influxdb.ID) error
|
||||
RetryRunFn func(context.Context, influxdb.ID, influxdb.ID) (*influxdb.Run, error)
|
||||
ForceRunFn func(context.Context, influxdb.ID, int64) (*influxdb.Run, error)
|
||||
}
|
||||
|
||||
func (s *TaskService) FindTaskByID(ctx context.Context, id platform.ID) (*platform.Task, error) {
|
||||
func (s *TaskService) FindTaskByID(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) {
|
||||
return s.FindTaskByIDFn(ctx, id)
|
||||
}
|
||||
|
||||
func (s *TaskService) FindTasks(ctx context.Context, filter platform.TaskFilter) ([]*platform.Task, int, error) {
|
||||
func (s *TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
|
||||
return s.FindTasksFn(ctx, filter)
|
||||
}
|
||||
|
||||
func (s *TaskService) CreateTask(ctx context.Context, t platform.TaskCreate) (*platform.Task, error) {
|
||||
func (s *TaskService) CreateTask(ctx context.Context, t influxdb.TaskCreate) (*influxdb.Task, error) {
|
||||
return s.CreateTaskFn(ctx, t)
|
||||
}
|
||||
|
||||
func (s *TaskService) UpdateTask(ctx context.Context, id platform.ID, upd platform.TaskUpdate) (*platform.Task, error) {
|
||||
func (s *TaskService) UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error) {
|
||||
return s.UpdateTaskFn(ctx, id, upd)
|
||||
}
|
||||
|
||||
func (s *TaskService) DeleteTask(ctx context.Context, id platform.ID) error {
|
||||
func (s *TaskService) DeleteTask(ctx context.Context, id influxdb.ID) error {
|
||||
return s.DeleteTaskFn(ctx, id)
|
||||
}
|
||||
|
||||
func (s *TaskService) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) {
|
||||
func (s *TaskService) FindLogs(ctx context.Context, filter influxdb.LogFilter) ([]*influxdb.Log, int, error) {
|
||||
return s.FindLogsFn(ctx, filter)
|
||||
}
|
||||
|
||||
func (s *TaskService) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) {
|
||||
func (s *TaskService) FindRuns(ctx context.Context, filter influxdb.RunFilter) ([]*influxdb.Run, int, error) {
|
||||
return s.FindRunsFn(ctx, filter)
|
||||
}
|
||||
|
||||
func (s *TaskService) FindRunByID(ctx context.Context, taskID, runID platform.ID) (*platform.Run, error) {
|
||||
func (s *TaskService) FindRunByID(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
|
||||
return s.FindRunByIDFn(ctx, taskID, runID)
|
||||
}
|
||||
|
||||
func (s *TaskService) CancelRun(ctx context.Context, taskID, runID platform.ID) error {
|
||||
func (s *TaskService) CancelRun(ctx context.Context, taskID, runID influxdb.ID) error {
|
||||
return s.CancelRunFn(ctx, taskID, runID)
|
||||
}
|
||||
|
||||
func (s *TaskService) RetryRun(ctx context.Context, taskID, runID platform.ID) (*platform.Run, error) {
|
||||
func (s *TaskService) RetryRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
|
||||
return s.RetryRunFn(ctx, taskID, runID)
|
||||
}
|
||||
|
||||
func (s *TaskService) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*platform.Run, error) {
|
||||
func (s *TaskService) ForceRun(ctx context.Context, taskID influxdb.ID, scheduledFor int64) (*influxdb.Run, error) {
|
||||
return s.ForceRunFn(ctx, taskID, scheduledFor)
|
||||
}
|
||||
|
||||
type TaskControlService struct {
|
||||
CreateNextRunFn func(ctx context.Context, taskID influxdb.ID, now int64) (backend.RunCreation, error)
|
||||
NextDueRunFn func(ctx context.Context, taskID influxdb.ID) (int64, error)
|
||||
CreateRunFn func(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error)
|
||||
CurrentlyRunningFn func(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error)
|
||||
ManualRunsFn func(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error)
|
||||
StartManualRunFn func(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error)
|
||||
FinishRunFn func(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error)
|
||||
UpdateRunStateFn func(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state backend.RunStatus) error
|
||||
AddRunLogFn func(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error
|
||||
}
|
||||
|
||||
func (tcs *TaskControlService) CreateNextRun(ctx context.Context, taskID influxdb.ID, now int64) (backend.RunCreation, error) {
|
||||
return tcs.CreateNextRunFn(ctx, taskID, now)
|
||||
}
|
||||
func (tcs *TaskControlService) NextDueRun(ctx context.Context, taskID influxdb.ID) (int64, error) {
|
||||
return tcs.NextDueRunFn(ctx, taskID)
|
||||
}
|
||||
func (tcs *TaskControlService) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) {
|
||||
return tcs.CreateRunFn(ctx, taskID, scheduledFor)
|
||||
}
|
||||
func (tcs *TaskControlService) CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) {
|
||||
return tcs.CurrentlyRunningFn(ctx, taskID)
|
||||
}
|
||||
func (tcs *TaskControlService) ManualRuns(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) {
|
||||
return tcs.ManualRunsFn(ctx, taskID)
|
||||
}
|
||||
func (tcs *TaskControlService) StartManualRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
|
||||
return tcs.StartManualRunFn(ctx, taskID, runID)
|
||||
}
|
||||
func (tcs *TaskControlService) FinishRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
|
||||
return tcs.FinishRunFn(ctx, taskID, runID)
|
||||
}
|
||||
func (tcs *TaskControlService) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state backend.RunStatus) error {
|
||||
return tcs.UpdateRunStateFn(ctx, taskID, runID, when, state)
|
||||
}
|
||||
func (tcs *TaskControlService) AddRunLog(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error {
|
||||
return tcs.AddRunLogFn(ctx, taskID, runID, when, log)
|
||||
}
|
||||
|
|
|
@ -222,9 +222,29 @@ func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter influxdb.RunFi
|
|||
return nil, 0, fmt.Errorf("unexpected internal error while decoding run response: %v", err)
|
||||
}
|
||||
|
||||
runs = append(runs, re.runs...)
|
||||
runs = as.combineRuns(runs, re.runs)
|
||||
|
||||
return runs, n, err
|
||||
return runs, len(runs), err
|
||||
}
|
||||
|
||||
// remove any kv runs that exist in the list of completed runs
|
||||
func (as *AnalyticalStorage) combineRuns(currentRuns, completeRuns []*influxdb.Run) []*influxdb.Run {
|
||||
crMap := map[influxdb.ID]int{}
|
||||
|
||||
// track the current runs
|
||||
for i, cr := range currentRuns {
|
||||
crMap[cr.ID] = i
|
||||
}
|
||||
|
||||
// if we find a complete run that matches a current run the current run is out dated and
|
||||
// should be removed.
|
||||
for _, completeRun := range completeRuns {
|
||||
if i, ok := crMap[completeRun.ID]; ok {
|
||||
currentRuns = append(currentRuns[:i], currentRuns[i+1:]...)
|
||||
}
|
||||
}
|
||||
|
||||
return append(currentRuns, completeRuns...)
|
||||
}
|
||||
|
||||
// FindRunByID returns a single run.
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
icontext "github.com/influxdata/influxdb/context"
|
||||
"github.com/influxdata/influxdb/inmem"
|
||||
"github.com/influxdata/influxdb/kv"
|
||||
"github.com/influxdata/influxdb/mock"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
_ "github.com/influxdata/influxdb/query/builtin"
|
||||
"github.com/influxdata/influxdb/query/control"
|
||||
|
@ -54,6 +55,52 @@ func TestAnalyticalStore(t *testing.T) {
|
|||
)
|
||||
}
|
||||
|
||||
func TestDeduplicateRuns(t *testing.T) {
|
||||
svc := kv.NewService(inmem.NewKVStore())
|
||||
if err := svc.Initialize(context.Background()); err != nil {
|
||||
t.Fatalf("error initializing kv service: %v", err)
|
||||
}
|
||||
|
||||
ab := newAnalyticalBackend(t, svc, svc)
|
||||
defer ab.Close(t)
|
||||
|
||||
mockTS := &mock.TaskService{
|
||||
FindTaskByIDFn: func(context.Context, influxdb.ID) (*influxdb.Task, error) {
|
||||
return &influxdb.Task{ID: 1, OrganizationID: 20}, nil
|
||||
},
|
||||
FindRunsFn: func(context.Context, influxdb.RunFilter) ([]*influxdb.Run, int, error) {
|
||||
return []*influxdb.Run{
|
||||
&influxdb.Run{ID: 2, Status: "started"},
|
||||
}, 1, nil
|
||||
},
|
||||
}
|
||||
mockTCS := &mock.TaskControlService{
|
||||
FinishRunFn: func(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
|
||||
return &influxdb.Run{ID: 2, TaskID: 1, Status: "success", ScheduledFor: "1", StartedAt: "2", FinishedAt: "3"}, nil
|
||||
},
|
||||
}
|
||||
|
||||
svcStack := backend.NewAnalyticalStorage(zaptest.NewLogger(t), mockTS, mockTCS, ab.PointsWriter(), ab.QueryService())
|
||||
|
||||
_, err := svcStack.FinishRun(context.Background(), 1, 2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
runs, _, err := svcStack.FindRuns(context.Background(), influxdb.RunFilter{Task: 1})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(runs) != 1 {
|
||||
t.Fatalf("expected 1 run but got %d", len(runs))
|
||||
}
|
||||
|
||||
if runs[0].Status != "success" {
|
||||
t.Fatalf("expected the deduped run to be 'success', got: %s", runs[0].Status)
|
||||
}
|
||||
}
|
||||
|
||||
type analyticalBackend struct {
|
||||
queryController *control.Controller
|
||||
rootDir string
|
||||
|
|
Loading…
Reference in New Issue