diff --git a/task/backend/bolt/bolt.go b/task/backend/bolt/bolt.go index b2c2392f30..87e78266b8 100644 --- a/task/backend/bolt/bolt.go +++ b/task/backend/bolt/bolt.go @@ -24,6 +24,7 @@ import ( bolt "github.com/coreos/bbolt" "github.com/influxdata/platform" + "github.com/influxdata/platform/snowflake" "github.com/influxdata/platform/task/backend" "github.com/influxdata/platform/task/options" ) @@ -46,6 +47,7 @@ var ErrNotFound = errors.New("task not found") type Store struct { db *bolt.DB bucket []byte + idGen platform.IDGenerator } const basePath = "/tasks/v1/" @@ -90,7 +92,7 @@ func New(db *bolt.DB, rootBucket string) (*Store, error) { if err != nil { return nil, err } - return &Store{db: db, bucket: bucket}, nil + return &Store{db: db, bucket: bucket, idGen: snowflake.NewDefaultIDGenerator()}, nil } // CreateTask creates a task in the boltdb task store. @@ -99,15 +101,12 @@ func (s *Store) CreateTask(ctx context.Context, req backend.CreateTaskRequest) ( if err != nil { return platform.InvalidID(), err } - - var id platform.ID + // Get ID + id := s.idGen.ID() err = s.db.Update(func(tx *bolt.Tx) error { // get the root bucket b := tx.Bucket(s.bucket) name := []byte(o.Name) - // Get ID - idi, _ := b.NextSequence() // we ignore this err check, because this can't err inside an Update call - id = platform.ID(idi) // Encode ID encodedID, err := id.Encode() if err != nil { @@ -631,21 +630,14 @@ func (s *Store) CreateNextRun(ctx context.Context, taskID platform.ID, now int64 } var stm backend.StoreTaskMeta - if err := stm.Unmarshal(stmBytes); err != nil { + err := stm.Unmarshal(stmBytes) + if err != nil { return err } - makeID := func() (platform.ID, error) { - idi, err := b.Bucket(runIDs).NextSequence() - if err != nil { - return platform.InvalidID(), err - } - - return platform.ID(idi), nil - } - - var err error - rc, err = stm.CreateNextRun(now, makeID) + rc, err = stm.CreateNextRun(now, func() (platform.ID, error) { + return s.idGen.ID(), nil + }) if err != nil { return err } diff --git a/task/backend/storetest/storetest.go b/task/backend/storetest/storetest.go index 967507b686..b29ef85b45 100644 --- a/task/backend/storetest/storetest.go +++ b/task/backend/storetest/storetest.go @@ -769,9 +769,8 @@ from(bucket:"test") |> range(start:-1h)` if err != nil { t.Fatal(err) } - if rc.Created.TaskID != taskID { - t.Fatalf("bad created task ID; exp %x got %x", taskID, rc.Created.TaskID) + t.Fatalf("bad created task ID; exp %s got %s", taskID, rc.Created.TaskID) } if rc.Created.Now != 60 { t.Fatalf("unexpected time for created run: %d", rc.Created.Now) diff --git a/task/mock/scheduler.go b/task/mock/scheduler.go index 04fb4a61ed..c680c061ef 100644 --- a/task/mock/scheduler.go +++ b/task/mock/scheduler.go @@ -3,6 +3,7 @@ package mock import ( "context" + "errors" "fmt" "strings" "sync" @@ -195,7 +196,9 @@ func (d *DesiredState) SetTaskMeta(taskID platform.ID, meta backend.StoreTaskMet func (d *DesiredState) CreateNextRun(_ context.Context, taskID platform.ID, now int64) (backend.RunCreation, error) { d.mu.Lock() defer d.mu.Unlock() - + if !taskID.Valid() { + return backend.RunCreation{}, errors.New("Invalid task id") + } tid := taskID.String() meta, ok := d.meta[tid]