Merge pull request #1242 from influxdata/usesnowflake
Use snowflake for bolt task idspull/10616/head
commit
5a8dd6bfef
|
@ -24,6 +24,7 @@ import (
|
||||||
|
|
||||||
bolt "github.com/coreos/bbolt"
|
bolt "github.com/coreos/bbolt"
|
||||||
"github.com/influxdata/platform"
|
"github.com/influxdata/platform"
|
||||||
|
"github.com/influxdata/platform/snowflake"
|
||||||
"github.com/influxdata/platform/task/backend"
|
"github.com/influxdata/platform/task/backend"
|
||||||
"github.com/influxdata/platform/task/options"
|
"github.com/influxdata/platform/task/options"
|
||||||
)
|
)
|
||||||
|
@ -46,6 +47,7 @@ var ErrNotFound = errors.New("task not found")
|
||||||
type Store struct {
|
type Store struct {
|
||||||
db *bolt.DB
|
db *bolt.DB
|
||||||
bucket []byte
|
bucket []byte
|
||||||
|
idGen platform.IDGenerator
|
||||||
}
|
}
|
||||||
|
|
||||||
const basePath = "/tasks/v1/"
|
const basePath = "/tasks/v1/"
|
||||||
|
@ -90,7 +92,7 @@ func New(db *bolt.DB, rootBucket string) (*Store, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &Store{db: db, bucket: bucket}, nil
|
return &Store{db: db, bucket: bucket, idGen: snowflake.NewDefaultIDGenerator()}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateTask creates a task in the boltdb task store.
|
// CreateTask creates a task in the boltdb task store.
|
||||||
|
@ -99,15 +101,12 @@ func (s *Store) CreateTask(ctx context.Context, req backend.CreateTaskRequest) (
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return platform.InvalidID(), err
|
return platform.InvalidID(), err
|
||||||
}
|
}
|
||||||
|
// Get ID
|
||||||
var id platform.ID
|
id := s.idGen.ID()
|
||||||
err = s.db.Update(func(tx *bolt.Tx) error {
|
err = s.db.Update(func(tx *bolt.Tx) error {
|
||||||
// get the root bucket
|
// get the root bucket
|
||||||
b := tx.Bucket(s.bucket)
|
b := tx.Bucket(s.bucket)
|
||||||
name := []byte(o.Name)
|
name := []byte(o.Name)
|
||||||
// Get ID
|
|
||||||
idi, _ := b.NextSequence() // we ignore this err check, because this can't err inside an Update call
|
|
||||||
id = platform.ID(idi)
|
|
||||||
// Encode ID
|
// Encode ID
|
||||||
encodedID, err := id.Encode()
|
encodedID, err := id.Encode()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -631,21 +630,14 @@ func (s *Store) CreateNextRun(ctx context.Context, taskID platform.ID, now int64
|
||||||
}
|
}
|
||||||
|
|
||||||
var stm backend.StoreTaskMeta
|
var stm backend.StoreTaskMeta
|
||||||
if err := stm.Unmarshal(stmBytes); err != nil {
|
err := stm.Unmarshal(stmBytes)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
makeID := func() (platform.ID, error) {
|
rc, err = stm.CreateNextRun(now, func() (platform.ID, error) {
|
||||||
idi, err := b.Bucket(runIDs).NextSequence()
|
return s.idGen.ID(), nil
|
||||||
if err != nil {
|
})
|
||||||
return platform.InvalidID(), err
|
|
||||||
}
|
|
||||||
|
|
||||||
return platform.ID(idi), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var err error
|
|
||||||
rc, err = stm.CreateNextRun(now, makeID)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -769,9 +769,8 @@ from(bucket:"test") |> range(start:-1h)`
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if rc.Created.TaskID != taskID {
|
if rc.Created.TaskID != taskID {
|
||||||
t.Fatalf("bad created task ID; exp %x got %x", taskID, rc.Created.TaskID)
|
t.Fatalf("bad created task ID; exp %s got %s", taskID, rc.Created.TaskID)
|
||||||
}
|
}
|
||||||
if rc.Created.Now != 60 {
|
if rc.Created.Now != 60 {
|
||||||
t.Fatalf("unexpected time for created run: %d", rc.Created.Now)
|
t.Fatalf("unexpected time for created run: %d", rc.Created.Now)
|
||||||
|
|
|
@ -3,6 +3,7 @@ package mock
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -195,7 +196,9 @@ func (d *DesiredState) SetTaskMeta(taskID platform.ID, meta backend.StoreTaskMet
|
||||||
func (d *DesiredState) CreateNextRun(_ context.Context, taskID platform.ID, now int64) (backend.RunCreation, error) {
|
func (d *DesiredState) CreateNextRun(_ context.Context, taskID platform.ID, now int64) (backend.RunCreation, error) {
|
||||||
d.mu.Lock()
|
d.mu.Lock()
|
||||||
defer d.mu.Unlock()
|
defer d.mu.Unlock()
|
||||||
|
if !taskID.Valid() {
|
||||||
|
return backend.RunCreation{}, errors.New("Invalid task id")
|
||||||
|
}
|
||||||
tid := taskID.String()
|
tid := taskID.String()
|
||||||
|
|
||||||
meta, ok := d.meta[tid]
|
meta, ok := d.meta[tid]
|
||||||
|
|
Loading…
Reference in New Issue