From de577125a8df02bd567c43287de8ed958f53f4d1 Mon Sep 17 00:00:00 2001 From: Mark Rushakoff Date: Fri, 17 Aug 2018 11:49:09 -0700 Subject: [PATCH] refactor(task): remove CreateRun method from Store This method has been superseded by CreateNextRun. --- task/backend/bolt/bolt.go | 44 ----------------------- task/backend/inmem_store.go | 32 ----------------- task/backend/scheduler_test.go | 2 +- task/backend/store.go | 3 -- task/backend/storetest/storetest.go | 54 +++++------------------------ task/mock/scheduler.go | 21 ----------- 6 files changed, 9 insertions(+), 147 deletions(-) diff --git a/task/backend/bolt/bolt.go b/task/backend/bolt/bolt.go index 5f9497dca8..10633fafe0 100644 --- a/task/backend/bolt/bolt.go +++ b/task/backend/bolt/bolt.go @@ -537,50 +537,6 @@ func (s *Store) DeleteTask(ctx context.Context, id platform.ID) (deleted bool, e return true, nil } -// CreateRun adds `now` to the task's metaData if we have not exceeded 'max_concurrency'. -func (s *Store) CreateRun(ctx context.Context, taskID platform.ID, now int64) (backend.QueuedRun, error) { - queuedRun := backend.QueuedRun{TaskID: append([]byte(nil), taskID...), Now: now} - stm := backend.StoreTaskMeta{} - paddedID := padID(taskID) - if err := s.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(s.bucket) - stmBytes := b.Bucket(taskMetaPath).Get(paddedID) - if err := stm.Unmarshal(stmBytes); err != nil { - return err - } - if len(stm.CurrentlyRunning) >= int(stm.MaxConcurrency) { - return ErrMaxConcurrency - } - - id := make(platform.ID, 8) - idi, err := b.Bucket(runIDs).NextSequence() - if err != nil { - return err - } - - binary.BigEndian.PutUint64(id, idi) - running := &backend.StoreTaskMetaRun{ - Now: now, - Try: 1, - RunID: id, - } - - stm.CurrentlyRunning = append(stm.CurrentlyRunning, running) - stmBytes, err = stm.Marshal() - if err != nil { - return err - } - - queuedRun.RunID = id - - return tx.Bucket(s.bucket).Bucket(taskMetaPath).Put(paddedID, stmBytes) - }); err != nil { - return queuedRun, err - } - - return queuedRun, nil -} - func (s *Store) CreateNextRun(ctx context.Context, taskID platform.ID, now int64) (backend.RunCreation, error) { var rc backend.RunCreation paddedID := padID(taskID) diff --git a/task/backend/inmem_store.go b/task/backend/inmem_store.go index 99b87ccce5..cf6d2544f9 100644 --- a/task/backend/inmem_store.go +++ b/task/backend/inmem_store.go @@ -233,38 +233,6 @@ func (s *inmem) Close() error { return nil } -// CreateRun adds `now` to the task's metaData if we have not exceeded 'max_concurrency'. -func (s *inmem) CreateRun(ctx context.Context, taskID platform.ID, now int64) (QueuedRun, error) { - queuedRun := QueuedRun{} - - stm, ok := s.runners[taskID.String()] - if !ok { - return queuedRun, errors.New("taskRunner not found") - } - - if len(stm.CurrentlyRunning) >= int(stm.MaxConcurrency) { - return queuedRun, errors.New("MaxConcurrency reached") - } - - runID := s.idgen.ID() - - running := &StoreTaskMetaRun{ - Now: now, - Try: 1, - RunID: runID, - } - - stm.CurrentlyRunning = append(stm.CurrentlyRunning, running) - s.mu.Lock() - s.runners[taskID.String()] = stm - s.mu.Unlock() - - queuedRun.TaskID = taskID - queuedRun.RunID = runID - queuedRun.Now = now - return queuedRun, nil -} - func (s *inmem) CreateNextRun(ctx context.Context, taskID platform.ID, now int64) (RunCreation, error) { s.mu.Lock() defer s.mu.Unlock() diff --git a/task/backend/scheduler_test.go b/task/backend/scheduler_test.go index e6cbb8a574..41c0b13876 100644 --- a/task/backend/scheduler_test.go +++ b/task/backend/scheduler_test.go @@ -56,7 +56,7 @@ func TestScheduler_StartScriptOnClaim(t *testing.T) { } } -func TestScheduler_CreateRunOnTick(t *testing.T) { +func TestScheduler_CreateNextRunOnTick(t *testing.T) { d := mock.NewDesiredState() e := mock.NewExecutor() o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5) diff --git a/task/backend/store.go b/task/backend/store.go index b1af3a5f95..395d756131 100644 --- a/task/backend/store.go +++ b/task/backend/store.go @@ -113,9 +113,6 @@ type Store interface { // or deleted is true if there was a matching entry and it was deleted. DeleteTask(ctx context.Context, id platform.ID) (deleted bool, err error) - // CreateRun adds `now` to the task's metaData if we have not exceeded 'max_concurrency'. - CreateRun(ctx context.Context, taskID platform.ID, now int64) (QueuedRun, error) - // CreateNextRun creates the earliest needed run scheduled no later than the given Unix timestamp now. // Internally, the Store should rely on the underlying task's StoreTaskMeta to create the next run. CreateNextRun(ctx context.Context, taskID platform.ID, now int64) (RunCreation, error) diff --git a/task/backend/storetest/storetest.go b/task/backend/storetest/storetest.go index 9aaa1cef7f..7ced2f156c 100644 --- a/task/backend/storetest/storetest.go +++ b/task/backend/storetest/storetest.go @@ -6,7 +6,6 @@ import ( "encoding/binary" "fmt" "math" - "strings" "testing" "time" @@ -29,7 +28,6 @@ func NewStoreTest(name string, cf CreateStoreFunc, df DestroyStoreFunc, funcName "FindMeta", "EnableDisableTask", "DeleteTask", - "CreateRun", "CreateNextRun", "FinishRun", } @@ -42,7 +40,6 @@ func NewStoreTest(name string, cf CreateStoreFunc, df DestroyStoreFunc, funcName "FindMeta": testStoreFindMeta, "EnableDisableTask": testStoreTaskEnableDisable, "DeleteTask": testStoreDelete, - "CreateRun": testStoreCreateRun, "CreateNextRun": testStoreCreateNextRun, "FinishRun": testStoreFinishRun, "DeleteOrg": testStoreDeleteOrg, @@ -408,7 +405,7 @@ func testStoreFindMeta(t *testing.T, create CreateStoreFunc, destroy DestroyStor name: "a task", cron: "* * * * *", concurrency: 3, - delay: 2m, + delay: 5s, } from(db:"test") |> range(start:-1h)` @@ -441,7 +438,7 @@ from(db:"test") |> range(start:-1h)` t.Fatalf("unexpected cron stored in meta: %q", meta.EffectiveCron) } - if time.Duration(meta.Delay)*time.Second != 2*time.Minute { + if time.Duration(meta.Delay)*time.Second != 5*time.Second { t.Fatalf("unexpected delay stored in meta: %v", meta.Delay) } @@ -454,17 +451,17 @@ from(db:"test") |> range(start:-1h)` t.Fatalf("expected nil meta when finding nonexistent ID, got %#v", meta) } - qr, err := s.CreateRun(context.Background(), id, 6060) + rc, err := s.CreateNextRun(context.Background(), id, 6065) if err != nil { t.Fatal(err) } - _, err = s.CreateRun(context.Background(), id, 6120) + _, err = s.CreateNextRun(context.Background(), id, 6125) if err != nil { t.Fatal(err) } - err = s.FinishRun(context.Background(), id, qr.RunID) + err = s.FinishRun(context.Background(), id, rc.Created.RunID) if err != nil { t.Fatal(err) } @@ -583,41 +580,6 @@ from(db:"test") |> range(start:-1h)` }) } -func testStoreCreateRun(t *testing.T, create CreateStoreFunc, destroy DestroyStoreFunc) { - const script = `option task = { - name: "a task", - cron: "* * * * *", - } - -from(db:"test") |> range(start:-1h)` - s := create(t) - defer destroy(t, s) - - task, err := s.CreateTask(context.Background(), []byte{1}, []byte{2}, script, 0) - if err != nil { - t.Fatal(err) - } - - run, err := s.CreateRun(context.Background(), task, 1) - if err != nil { - t.Fatal(err) - } - - if run.TaskID.String() != task.String() { - t.Fatalf("task id mismatch: want %q, got %q", task.String(), run.TaskID.String()) - } - - if run.Now != 1 { - t.Fatal("run now mismatch") - } - - if _, err := s.CreateRun(context.Background(), task, 1); err == nil { - t.Fatal("expected error for exceeding MaxConcurrency") - } else if !strings.Contains(err.Error(), "MaxConcurrency") { - t.Fatalf("expected error for MaxConcurrency, got %v", err) - } -} - func testStoreCreateNextRun(t *testing.T, create CreateStoreFunc, destroy DestroyStoreFunc) { const script = `option task = { name: "a task", @@ -695,16 +657,16 @@ from(db:"test") |> range(start:-1h)` t.Fatal(err) } - run, err := s.CreateRun(context.Background(), task, 1) + rc, err := s.CreateNextRun(context.Background(), task, 60) if err != nil { t.Fatal(err) } - if err := s.FinishRun(context.Background(), task, run.RunID); err != nil { + if err := s.FinishRun(context.Background(), task, rc.Created.RunID); err != nil { t.Fatal(err) } - if err := s.FinishRun(context.Background(), task, run.RunID); err == nil { + if err := s.FinishRun(context.Background(), task, rc.Created.RunID); err == nil { t.Fatal("expected failure when removing run that doesnt exist") } } diff --git a/task/mock/scheduler.go b/task/mock/scheduler.go index 18b44de646..7fab695bb8 100644 --- a/task/mock/scheduler.go +++ b/task/mock/scheduler.go @@ -148,27 +148,6 @@ func NewDesiredState() *DesiredState { } } -// TODO(mr): inject a way to treat CreateRun as blocking? -func (d *DesiredState) CreateRun(_ context.Context, taskID platform.ID, now int64) (backend.QueuedRun, error) { - d.mu.Lock() - defer d.mu.Unlock() - - tid := taskID.String() - d.runIDs[tid]++ - - runID := make([]byte, 4) - binary.BigEndian.PutUint32(runID, d.runIDs[tid]) - qr := backend.QueuedRun{ - TaskID: taskID, - RunID: runID, - Now: now, - } - - d.created[tid+platform.ID(runID).String()] = qr - - return qr, nil -} - // SetTaskMeta sets the task meta for the given task ID. // SetTaskMeta must be called before CreateNextRun, for a given task ID. func (d *DesiredState) SetTaskMeta(taskID platform.ID, meta backend.StoreTaskMeta) {