2018-07-26 20:27:35 +00:00
|
|
|
// Package bolt provides an bolt-backed store implementation.
|
|
|
|
//
|
|
|
|
// The data stored in bolt is structured as follows:
|
|
|
|
//
|
|
|
|
// bucket(/tasks/v1/tasks) key(:task_id) -> Content of submitted task (i.e. flux code).
|
2018-08-02 21:02:38 +00:00
|
|
|
// bucket(/tasks/v1/task_meta) Key(:task_id) -> Protocol Buffer encoded backend.StoreTaskMeta,
|
2018-07-26 20:27:35 +00:00
|
|
|
// so we have a consistent view of runs in progress and max concurrency.
|
|
|
|
// bucket(/tasks/v1/org_by_task_id) key(task_id) -> The organization ID (stored as encoded string) associated with given task.
|
|
|
|
// bucket(/tasks/v1/user_by_task_id) key(:task_id) -> The user ID (stored as encoded string) associated with given task.
|
|
|
|
// buket(/tasks/v1/name_by_task_id) key(:task_id) -> The user-supplied name of the script.
|
2018-08-09 23:54:02 +00:00
|
|
|
// bucket(/tasks/v1/name_by_org) key(:org_id) -> Task ID. This allows us to make task names unique for org
|
|
|
|
// bucket(/tasks/v1/name_by_user) key(:user_id) -> Task ID. This allows us to make task names unique for user
|
2018-07-26 20:27:35 +00:00
|
|
|
// bucket(/tasks/v1/run_ids) -> Counter for run IDs
|
|
|
|
// bucket(/tasks/v1/orgs).bucket(:org_id) key(:task_id) -> Empty content; presence of :task_id allows for lookup from org to tasks.
|
|
|
|
// bucket(/tasks/v1/users).bucket(:user_id) key(:task_id) -> Empty content; presence of :task_id allows for lookup from user to tasks.
|
|
|
|
// Note that task IDs are stored big-endian uint64s for sorting purposes,
|
|
|
|
// but presented to the users with leading 0-bytes stripped.
|
|
|
|
// Like other components of the system, IDs presented to users may be `0f12` rather than `f12`.
|
|
|
|
package bolt
|
|
|
|
|
|
|
|
import (
|
2018-08-01 18:54:32 +00:00
|
|
|
"bytes"
|
2018-07-26 20:27:35 +00:00
|
|
|
"context"
|
2018-08-01 18:54:32 +00:00
|
|
|
"encoding/binary"
|
2018-07-26 20:27:35 +00:00
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
|
|
|
|
bolt "github.com/coreos/bbolt"
|
|
|
|
"github.com/influxdata/platform"
|
|
|
|
"github.com/influxdata/platform/task/backend"
|
|
|
|
)
|
|
|
|
|
|
|
|
// ErrDBReadOnly is an error for when the database is set to read only.
|
|
|
|
// Tasks needs to be able to write to the db.
|
|
|
|
var ErrDBReadOnly = errors.New("db is read only")
|
|
|
|
|
|
|
|
// ErrMaxConcurrency is an error for when the max concurrency is already
|
|
|
|
// reached for a task when you try to schedule a task.
|
|
|
|
var ErrMaxConcurrency = errors.New("MaxConcurrency reached")
|
|
|
|
|
|
|
|
// ErrRunNotFound is an error for when a run isn't found in a FinishRun method.
|
|
|
|
var ErrRunNotFound = errors.New("run not found")
|
|
|
|
|
|
|
|
// ErrNotFound is an error for when a task could not be found
|
|
|
|
var ErrNotFound = errors.New("task not found")
|
|
|
|
|
|
|
|
// Store is task store for bolt.
|
|
|
|
type Store struct {
|
|
|
|
db *bolt.DB
|
|
|
|
bucket []byte
|
|
|
|
}
|
|
|
|
|
|
|
|
const basePath = "/tasks/v1/"
|
|
|
|
|
|
|
|
var (
|
|
|
|
tasksPath = []byte(basePath + "tasks")
|
|
|
|
orgsPath = []byte(basePath + "orgs")
|
|
|
|
usersPath = []byte(basePath + "users")
|
|
|
|
taskMetaPath = []byte(basePath + "task_meta")
|
|
|
|
orgByTaskID = []byte(basePath + "org_by_task_id")
|
|
|
|
userByTaskID = []byte(basePath + "user_by_task_id")
|
2018-08-09 23:54:02 +00:00
|
|
|
nameByUser = []byte(basePath + "name_by_user")
|
|
|
|
nameByOrg = []byte(basePath + "name_by_org")
|
2018-07-26 20:27:35 +00:00
|
|
|
nameByTaskID = []byte(basePath + "name_by_task_id")
|
|
|
|
runIDs = []byte(basePath + "run_ids")
|
|
|
|
)
|
|
|
|
|
|
|
|
// New gives us a new Store based on "github.com/coreos/bbolt"
|
|
|
|
func New(db *bolt.DB, rootBucket string) (*Store, error) {
|
|
|
|
if db.IsReadOnly() {
|
|
|
|
return nil, ErrDBReadOnly
|
|
|
|
}
|
|
|
|
bucket := []byte(rootBucket)
|
|
|
|
|
|
|
|
err := db.Update(func(tx *bolt.Tx) error {
|
|
|
|
// create root
|
|
|
|
root, err := tx.CreateBucketIfNotExists(bucket)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// create the buckets inside the root
|
|
|
|
for _, b := range [][]byte{
|
|
|
|
tasksPath, orgsPath, usersPath, taskMetaPath,
|
2018-08-09 23:54:02 +00:00
|
|
|
orgByTaskID, userByTaskID, nameByUser, nameByOrg,
|
|
|
|
nameByTaskID, runIDs,
|
2018-07-26 20:27:35 +00:00
|
|
|
} {
|
|
|
|
_, err := root.CreateBucketIfNotExists(b)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return &Store{db: db, bucket: bucket}, nil
|
|
|
|
}
|
|
|
|
|
2018-08-09 23:54:02 +00:00
|
|
|
// checkIfNameIsUsed is a helper function that returns an error if a name if already used
|
|
|
|
func (s *Store) checkIfNameIsUsed(b *bolt.Bucket, name []byte, org, user, taskID platform.ID) error {
|
|
|
|
var bNameByOrg *bolt.Bucket
|
|
|
|
if bNameByOrg = b.Bucket(nameByOrg); bNameByOrg != nil {
|
|
|
|
if bnameByOrgOrg := bNameByOrg.Bucket([]byte(org)); bnameByOrgOrg != nil {
|
|
|
|
gName := bnameByOrgOrg.Get(name)
|
|
|
|
if gName != nil && !bytes.Equal(gName, taskID) {
|
|
|
|
return backend.ErrTaskNameTaken
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
var bNameByUser *bolt.Bucket
|
|
|
|
if bNameByUser = b.Bucket(nameByUser); bNameByUser != nil {
|
|
|
|
if bnameByUserUser := bNameByUser.Bucket([]byte(user)); bnameByUserUser != nil {
|
|
|
|
gUser := bnameByUserUser.Get(name)
|
|
|
|
if gUser != nil && !bytes.Equal(gUser, taskID) {
|
|
|
|
return backend.ErrTaskNameTaken
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2018-07-26 20:27:35 +00:00
|
|
|
// CreateTask creates a task in the boltdb task store.
|
2018-08-10 23:02:13 +00:00
|
|
|
func (s *Store) CreateTask(ctx context.Context, org, user platform.ID, script string, scheduleAfter int64) (platform.ID, error) {
|
2018-07-26 20:27:35 +00:00
|
|
|
o, err := backend.StoreValidator.CreateArgs(org, user, script)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2018-08-01 18:54:32 +00:00
|
|
|
id := make(platform.ID, 8)
|
2018-08-09 23:54:02 +00:00
|
|
|
var upid []byte
|
2018-07-26 20:27:35 +00:00
|
|
|
err = s.db.Update(func(tx *bolt.Tx) error {
|
|
|
|
// get the root bucket
|
|
|
|
b := tx.Bucket(s.bucket)
|
2018-08-09 23:54:02 +00:00
|
|
|
name := []byte(o.Name)
|
2018-07-26 20:27:35 +00:00
|
|
|
// Get ID
|
|
|
|
idi, _ := b.NextSequence() // we ignore this err check, because this can't err inside an Update call
|
2018-08-01 18:54:32 +00:00
|
|
|
binary.BigEndian.PutUint64(id, idi)
|
2018-08-09 23:54:02 +00:00
|
|
|
upid = unpadID(id)
|
|
|
|
if err := s.checkIfNameIsUsed(b, name, org, user, upid); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2018-07-26 20:27:35 +00:00
|
|
|
// write script
|
2018-08-01 18:54:32 +00:00
|
|
|
err := b.Bucket(tasksPath).Put(id, []byte(script))
|
2018-07-26 20:27:35 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// name
|
2018-08-09 23:54:02 +00:00
|
|
|
err = b.Bucket(nameByTaskID).Put(id, name)
|
2018-07-26 20:27:35 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// org
|
2018-08-01 18:54:32 +00:00
|
|
|
orgB, err := b.Bucket(orgsPath).CreateBucketIfNotExists([]byte(org))
|
2018-07-26 20:27:35 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2018-08-01 18:54:32 +00:00
|
|
|
err = orgB.Put(id, nil)
|
2018-07-26 20:27:35 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2018-08-09 23:54:02 +00:00
|
|
|
// name by org
|
|
|
|
orgB, err = b.Bucket(nameByOrg).CreateBucketIfNotExists([]byte(org))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
err = orgB.Put(name, upid)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2018-08-01 18:54:32 +00:00
|
|
|
err = b.Bucket(orgByTaskID).Put(id, []byte(org))
|
2018-07-26 20:27:35 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// user
|
2018-08-01 18:54:32 +00:00
|
|
|
userB, err := b.Bucket(usersPath).CreateBucketIfNotExists([]byte(user))
|
2018-07-26 20:27:35 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2018-08-01 18:54:32 +00:00
|
|
|
err = userB.Put(id, nil)
|
2018-07-26 20:27:35 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2018-08-09 23:54:02 +00:00
|
|
|
// name by user
|
|
|
|
userB, err = b.Bucket(nameByUser).CreateBucketIfNotExists([]byte(user))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
err = userB.Put(name, upid)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2018-07-26 20:27:35 +00:00
|
|
|
|
2018-08-01 18:54:32 +00:00
|
|
|
err = b.Bucket(userByTaskID).Put(id, []byte(user))
|
2018-07-26 20:27:35 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// metadata
|
2018-08-02 21:02:38 +00:00
|
|
|
stm := backend.StoreTaskMeta{
|
2018-07-31 15:47:12 +00:00
|
|
|
MaxConcurrency: int32(o.Concurrency),
|
2018-08-06 17:54:32 +00:00
|
|
|
Status: string(backend.TaskEnabled),
|
2018-08-10 23:02:13 +00:00
|
|
|
LastCompleted: scheduleAfter,
|
2018-08-15 23:13:35 +00:00
|
|
|
EffectiveCron: o.EffectiveCronString(),
|
2018-07-26 20:27:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
stmBytes, err := stm.Marshal()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
metaB := b.Bucket(taskMetaPath)
|
2018-08-01 18:54:32 +00:00
|
|
|
return metaB.Put(id, stmBytes)
|
2018-07-26 20:27:35 +00:00
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2018-08-09 23:54:02 +00:00
|
|
|
return upid, nil
|
2018-07-26 20:27:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// ModifyTask changes a task with a new script, it should error if the task does not exist.
|
|
|
|
func (s *Store) ModifyTask(ctx context.Context, id platform.ID, newScript string) error {
|
2018-08-09 23:54:02 +00:00
|
|
|
op, err := backend.StoreValidator.ModifyArgs(id, newScript)
|
|
|
|
if err != nil {
|
2018-07-26 20:27:35 +00:00
|
|
|
return err
|
|
|
|
}
|
2018-08-01 18:54:32 +00:00
|
|
|
paddedID := padID(id)
|
2018-07-26 20:27:35 +00:00
|
|
|
return s.db.Update(func(tx *bolt.Tx) error {
|
2018-08-09 23:54:02 +00:00
|
|
|
b := tx.Bucket(s.bucket)
|
|
|
|
bt := b.Bucket(tasksPath)
|
|
|
|
if v := bt.Get(paddedID); v == nil { // this is so we can error if the task doesn't exist
|
2018-07-26 20:27:35 +00:00
|
|
|
return ErrNotFound
|
|
|
|
}
|
2018-08-09 23:54:02 +00:00
|
|
|
err = bt.Put(paddedID, []byte(newScript))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// TODO(docmerlin): org and user should be passed in, somehow, maybe via context or as an arg or something, these lookups are unecessairly expensive
|
|
|
|
org := b.Bucket(orgByTaskID).Get(paddedID)
|
|
|
|
user := b.Bucket(userByTaskID).Get(paddedID)
|
|
|
|
name := []byte(op.Name)
|
|
|
|
if err := s.checkIfNameIsUsed(b, name, org, user, id); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err = b.Bucket(nameByOrg).Bucket(org).Put(name, id); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if b.Bucket(nameByUser).Bucket(user).Put(name, id); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return b.Bucket(nameByTaskID).Put(paddedID, name)
|
2018-07-26 20:27:35 +00:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// ListTasks lists the tasks based on a filter.
|
|
|
|
func (s *Store) ListTasks(ctx context.Context, params backend.TaskSearchParams) ([]backend.StoreTask, error) {
|
2018-08-01 18:54:32 +00:00
|
|
|
if len(params.Org) > 0 && len(params.User) > 0 {
|
2018-07-26 20:27:35 +00:00
|
|
|
return nil, errors.New("ListTasks: org and user filters are mutually exclusive")
|
|
|
|
}
|
|
|
|
|
|
|
|
const (
|
|
|
|
defaultPageSize = 100
|
|
|
|
maxPageSize = 500
|
|
|
|
)
|
|
|
|
if params.PageSize < 0 {
|
|
|
|
return nil, errors.New("ListTasks: PageSize must be positive")
|
|
|
|
}
|
|
|
|
if params.PageSize > maxPageSize {
|
|
|
|
return nil, fmt.Errorf("ListTasks: PageSize exceeds maximum of %d", maxPageSize)
|
|
|
|
}
|
|
|
|
lim := params.PageSize
|
|
|
|
if lim == 0 {
|
|
|
|
lim = defaultPageSize
|
|
|
|
}
|
|
|
|
taskIDs := make([]platform.ID, 0, params.PageSize)
|
|
|
|
|
|
|
|
err := s.db.View(func(tx *bolt.Tx) error {
|
|
|
|
var c *bolt.Cursor
|
|
|
|
b := tx.Bucket(s.bucket)
|
2018-08-01 18:54:32 +00:00
|
|
|
if len(params.Org) > 0 {
|
|
|
|
orgB := b.Bucket(orgsPath).Bucket(params.Org)
|
2018-07-26 20:27:35 +00:00
|
|
|
if orgB == nil {
|
|
|
|
return ErrNotFound
|
|
|
|
}
|
|
|
|
c = orgB.Cursor()
|
2018-08-01 18:54:32 +00:00
|
|
|
} else if len(params.User) > 0 {
|
|
|
|
userB := b.Bucket(usersPath).Bucket(params.User)
|
2018-07-26 20:27:35 +00:00
|
|
|
if userB == nil {
|
|
|
|
return ErrNotFound
|
|
|
|
}
|
|
|
|
c = userB.Cursor()
|
|
|
|
} else {
|
|
|
|
c = b.Bucket(tasksPath).Cursor()
|
|
|
|
}
|
2018-08-01 18:54:32 +00:00
|
|
|
if len(params.After) > 0 {
|
|
|
|
c.Seek(padID(params.After))
|
2018-07-26 20:27:35 +00:00
|
|
|
for k, _ := c.Next(); k != nil && len(taskIDs) < lim; k, _ = c.Next() {
|
2018-08-01 18:54:32 +00:00
|
|
|
taskIDs = append(taskIDs, k)
|
2018-07-26 20:27:35 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
for k, _ := c.First(); k != nil && len(taskIDs) < lim; k, _ = c.Next() {
|
2018-08-01 18:54:32 +00:00
|
|
|
taskIDs = append(taskIDs, k)
|
2018-07-26 20:27:35 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
if err == ErrNotFound {
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
// now lookup each task
|
|
|
|
tasks := make([]backend.StoreTask, len(taskIDs))
|
|
|
|
if err := s.db.View(func(tx *bolt.Tx) error {
|
|
|
|
b := tx.Bucket(s.bucket)
|
|
|
|
for i := range taskIDs {
|
|
|
|
// TODO(docmerlin): optimization: don't check <-ctx.Done() every time though the loop
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return ctx.Err()
|
|
|
|
default:
|
|
|
|
// TODO(docmerlin): change the setup to reduce the number of lookups to 1 or 2.
|
2018-08-01 18:54:32 +00:00
|
|
|
paddedID := taskIDs[i]
|
|
|
|
tasks[i].ID = unpadID(paddedID)
|
|
|
|
tasks[i].Script = string(b.Bucket(tasksPath).Get(paddedID))
|
|
|
|
tasks[i].Name = string(b.Bucket(nameByTaskID).Get(paddedID))
|
2018-07-26 20:27:35 +00:00
|
|
|
}
|
|
|
|
}
|
2018-08-01 18:54:32 +00:00
|
|
|
if len(params.Org) > 0 {
|
2018-07-26 20:27:35 +00:00
|
|
|
for i := range taskIDs {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return ctx.Err()
|
|
|
|
default:
|
2018-08-01 18:54:32 +00:00
|
|
|
paddedID := taskIDs[i]
|
2018-07-26 20:27:35 +00:00
|
|
|
tasks[i].Org = params.Org
|
2018-08-01 18:54:32 +00:00
|
|
|
tasks[i].User = b.Bucket(userByTaskID).Get(paddedID)
|
2018-07-26 20:27:35 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
2018-08-01 18:54:32 +00:00
|
|
|
if len(params.User) > 0 {
|
2018-07-26 20:27:35 +00:00
|
|
|
for i := range taskIDs {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return ctx.Err()
|
|
|
|
default:
|
2018-08-01 18:54:32 +00:00
|
|
|
paddedID := taskIDs[i]
|
2018-07-26 20:27:35 +00:00
|
|
|
tasks[i].User = params.User
|
2018-08-01 18:54:32 +00:00
|
|
|
tasks[i].Org = b.Bucket(orgByTaskID).Get(paddedID)
|
2018-07-26 20:27:35 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
for i := range taskIDs {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return ctx.Err()
|
|
|
|
default:
|
2018-08-01 18:54:32 +00:00
|
|
|
paddedID := taskIDs[i]
|
|
|
|
tasks[i].User = b.Bucket(userByTaskID).Get(paddedID)
|
|
|
|
tasks[i].Org = b.Bucket(orgByTaskID).Get(paddedID)
|
2018-07-26 20:27:35 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return tasks, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// FindTaskByID finds a task with a given an ID. It will return nil if the task does not exist.
|
|
|
|
func (s *Store) FindTaskByID(ctx context.Context, id platform.ID) (*backend.StoreTask, error) {
|
|
|
|
var stmBytes []byte
|
|
|
|
var script []byte
|
2018-08-01 18:54:32 +00:00
|
|
|
var userID []byte
|
2018-07-26 20:27:35 +00:00
|
|
|
var name []byte
|
2018-08-01 18:54:32 +00:00
|
|
|
var org []byte
|
|
|
|
paddedID := padID(id)
|
|
|
|
err := s.db.View(func(tx *bolt.Tx) error {
|
2018-07-26 20:27:35 +00:00
|
|
|
b := tx.Bucket(s.bucket)
|
2018-08-01 18:54:32 +00:00
|
|
|
script = b.Bucket(tasksPath).Get(paddedID)
|
2018-07-26 20:27:35 +00:00
|
|
|
if script == nil {
|
|
|
|
return ErrNotFound
|
|
|
|
}
|
2018-08-01 18:54:32 +00:00
|
|
|
stmBytes = b.Bucket(taskMetaPath).Get(paddedID)
|
|
|
|
userID = b.Bucket(userByTaskID).Get(paddedID)
|
|
|
|
name = b.Bucket(nameByTaskID).Get(paddedID)
|
|
|
|
org = b.Bucket(orgByTaskID).Get(paddedID)
|
2018-07-26 20:27:35 +00:00
|
|
|
return nil
|
|
|
|
})
|
|
|
|
if err == ErrNotFound {
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2018-08-02 21:02:38 +00:00
|
|
|
stm := backend.StoreTaskMeta{}
|
2018-07-26 20:27:35 +00:00
|
|
|
err = stm.Unmarshal(stmBytes)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return &backend.StoreTask{
|
2018-08-01 18:54:32 +00:00
|
|
|
ID: append([]byte(nil), id...), // copy of input id
|
|
|
|
Org: org,
|
2018-07-26 20:27:35 +00:00
|
|
|
User: userID,
|
|
|
|
Name: string(name),
|
|
|
|
Script: string(script),
|
|
|
|
}, err
|
|
|
|
}
|
|
|
|
|
2018-08-06 17:54:32 +00:00
|
|
|
func (s *Store) EnableTask(ctx context.Context, id platform.ID) error {
|
|
|
|
paddedID := padID(id)
|
|
|
|
return s.db.Update(func(tx *bolt.Tx) error {
|
|
|
|
b := tx.Bucket(s.bucket).Bucket(taskMetaPath)
|
|
|
|
stmBytes := b.Get(paddedID)
|
|
|
|
if stmBytes == nil {
|
|
|
|
return errors.New("task meta not found")
|
|
|
|
}
|
|
|
|
stm := backend.StoreTaskMeta{}
|
|
|
|
err := stm.Unmarshal(stmBytes)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
stm.Status = string(backend.TaskEnabled)
|
|
|
|
stmBytes, err = stm.Marshal()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return b.Put(paddedID, stmBytes)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Store) DisableTask(ctx context.Context, id platform.ID) error {
|
|
|
|
paddedID := padID(id)
|
|
|
|
return s.db.Update(func(tx *bolt.Tx) error {
|
|
|
|
b := tx.Bucket(s.bucket).Bucket(taskMetaPath)
|
|
|
|
stmBytes := b.Get(paddedID)
|
|
|
|
if stmBytes == nil {
|
|
|
|
return errors.New("task meta not found")
|
|
|
|
}
|
|
|
|
stm := backend.StoreTaskMeta{}
|
|
|
|
err := stm.Unmarshal(stmBytes)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
stm.Status = string(backend.TaskDisabled)
|
|
|
|
stmBytes, err = stm.Marshal()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return b.Put(paddedID, stmBytes)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2018-08-02 21:02:38 +00:00
|
|
|
func (s *Store) FindTaskMetaByID(ctx context.Context, id platform.ID) (*backend.StoreTaskMeta, error) {
|
2018-07-31 15:47:12 +00:00
|
|
|
var stmBytes []byte
|
2018-08-01 18:54:32 +00:00
|
|
|
paddedID := padID(id)
|
|
|
|
err := s.db.View(func(tx *bolt.Tx) error {
|
2018-07-31 15:47:12 +00:00
|
|
|
b := tx.Bucket(s.bucket)
|
2018-08-01 18:54:32 +00:00
|
|
|
stmBytes = b.Bucket(taskMetaPath).Get(paddedID)
|
2018-07-31 15:47:12 +00:00
|
|
|
if stmBytes == nil {
|
2018-07-31 16:23:35 +00:00
|
|
|
return errors.New("task meta not found")
|
2018-07-31 15:47:12 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2018-08-02 21:02:38 +00:00
|
|
|
stm := backend.StoreTaskMeta{}
|
2018-07-31 15:47:12 +00:00
|
|
|
err = stm.Unmarshal(stmBytes)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2018-07-31 16:23:35 +00:00
|
|
|
return &stm, nil
|
2018-07-31 15:47:12 +00:00
|
|
|
}
|
|
|
|
|
2018-07-26 20:27:35 +00:00
|
|
|
// DeleteTask deletes the task
|
|
|
|
func (s *Store) DeleteTask(ctx context.Context, id platform.ID) (deleted bool, err error) {
|
2018-08-01 18:54:32 +00:00
|
|
|
paddedID := padID(id)
|
2018-07-26 20:27:35 +00:00
|
|
|
err = s.db.Batch(func(tx *bolt.Tx) error {
|
|
|
|
b := tx.Bucket(s.bucket)
|
2018-08-01 18:54:32 +00:00
|
|
|
if check := b.Bucket(tasksPath).Get(paddedID); check == nil {
|
2018-07-26 20:27:35 +00:00
|
|
|
return ErrNotFound
|
|
|
|
}
|
2018-08-01 18:54:32 +00:00
|
|
|
if err := b.Bucket(taskMetaPath).Delete(paddedID); err != nil {
|
2018-07-26 20:27:35 +00:00
|
|
|
return err
|
|
|
|
}
|
2018-08-01 18:54:32 +00:00
|
|
|
if err := b.Bucket(tasksPath).Delete(paddedID); err != nil {
|
2018-07-26 20:27:35 +00:00
|
|
|
return err
|
|
|
|
}
|
2018-08-01 18:54:32 +00:00
|
|
|
user := b.Bucket(userByTaskID).Get(paddedID)
|
2018-07-26 20:27:35 +00:00
|
|
|
if len(user) > 0 {
|
2018-08-01 18:54:32 +00:00
|
|
|
if err := b.Bucket(usersPath).Bucket(user).Delete(paddedID); err != nil {
|
2018-07-26 20:27:35 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2018-08-01 18:54:32 +00:00
|
|
|
if err := b.Bucket(userByTaskID).Delete(paddedID); err != nil {
|
2018-07-26 20:27:35 +00:00
|
|
|
return err
|
|
|
|
}
|
2018-08-01 18:54:32 +00:00
|
|
|
if err := b.Bucket(nameByTaskID).Delete(paddedID); err != nil {
|
2018-07-26 20:27:35 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2018-08-01 18:54:32 +00:00
|
|
|
org := b.Bucket(orgByTaskID).Get(paddedID)
|
2018-07-26 20:27:35 +00:00
|
|
|
if len(org) > 0 {
|
2018-08-01 18:54:32 +00:00
|
|
|
if err := b.Bucket(orgsPath).Bucket(org).Delete(paddedID); err != nil {
|
2018-07-26 20:27:35 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2018-08-01 18:54:32 +00:00
|
|
|
return b.Bucket(orgByTaskID).Delete(paddedID)
|
2018-07-26 20:27:35 +00:00
|
|
|
})
|
|
|
|
if err == ErrNotFound {
|
|
|
|
return false, nil
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return false, err
|
|
|
|
}
|
|
|
|
return true, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// CreateRun adds `now` to the task's metaData if we have not exceeded 'max_concurrency'.
|
2018-08-14 21:24:15 +00:00
|
|
|
func (s *Store) CreateRun(ctx context.Context, taskID platform.ID, now int64) (backend.QueuedRun, error) {
|
|
|
|
queuedRun := backend.QueuedRun{TaskID: append([]byte(nil), taskID...), Now: now}
|
2018-08-02 21:02:38 +00:00
|
|
|
stm := backend.StoreTaskMeta{}
|
2018-08-01 18:54:32 +00:00
|
|
|
paddedID := padID(taskID)
|
2018-07-26 20:27:35 +00:00
|
|
|
if err := s.db.Update(func(tx *bolt.Tx) error {
|
|
|
|
b := tx.Bucket(s.bucket)
|
2018-08-01 18:54:32 +00:00
|
|
|
stmBytes := b.Bucket(taskMetaPath).Get(paddedID)
|
2018-07-26 20:27:35 +00:00
|
|
|
if err := stm.Unmarshal(stmBytes); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if len(stm.CurrentlyRunning) >= int(stm.MaxConcurrency) {
|
|
|
|
return ErrMaxConcurrency
|
|
|
|
}
|
2018-08-02 21:02:38 +00:00
|
|
|
|
|
|
|
id := make(platform.ID, 8)
|
|
|
|
idi, err := b.Bucket(runIDs).NextSequence()
|
2018-07-26 20:27:35 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2018-08-02 21:02:38 +00:00
|
|
|
binary.BigEndian.PutUint64(id, idi)
|
|
|
|
running := &backend.StoreTaskMetaRun{
|
|
|
|
Now: now,
|
|
|
|
Try: 1,
|
|
|
|
RunID: id,
|
2018-07-26 20:27:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
stm.CurrentlyRunning = append(stm.CurrentlyRunning, running)
|
|
|
|
stmBytes, err = stm.Marshal()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2018-08-14 21:24:15 +00:00
|
|
|
queuedRun.RunID = id
|
2018-07-26 20:27:35 +00:00
|
|
|
|
2018-08-01 18:54:32 +00:00
|
|
|
return tx.Bucket(s.bucket).Bucket(taskMetaPath).Put(paddedID, stmBytes)
|
2018-07-26 20:27:35 +00:00
|
|
|
}); err != nil {
|
2018-08-14 21:24:15 +00:00
|
|
|
return queuedRun, err
|
2018-07-26 20:27:35 +00:00
|
|
|
}
|
|
|
|
|
2018-08-14 21:24:15 +00:00
|
|
|
return queuedRun, nil
|
2018-07-26 20:27:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// FinishRun removes runID from the list of running tasks and if its `now` is later then last completed update it.
|
|
|
|
func (s *Store) FinishRun(ctx context.Context, taskID, runID platform.ID) error {
|
2018-08-01 18:54:32 +00:00
|
|
|
paddedID := padID(taskID)
|
|
|
|
|
2018-07-26 20:27:35 +00:00
|
|
|
return s.db.Update(func(tx *bolt.Tx) error {
|
|
|
|
b := tx.Bucket(s.bucket)
|
2018-08-01 18:54:32 +00:00
|
|
|
stmBytes := b.Bucket(taskMetaPath).Get(paddedID)
|
2018-08-14 21:00:37 +00:00
|
|
|
var stm backend.StoreTaskMeta
|
2018-07-26 20:27:35 +00:00
|
|
|
if err := stm.Unmarshal(stmBytes); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2018-08-14 21:00:37 +00:00
|
|
|
if !stm.FinishRun(runID) {
|
2018-07-26 20:27:35 +00:00
|
|
|
return ErrRunNotFound
|
|
|
|
}
|
|
|
|
|
|
|
|
stmBytes, err := stm.Marshal()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2018-08-01 18:54:32 +00:00
|
|
|
return tx.Bucket(s.bucket).Bucket(taskMetaPath).Put(paddedID, stmBytes)
|
2018-07-26 20:27:35 +00:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close closes the store
|
|
|
|
func (s *Store) Close() error {
|
|
|
|
return s.db.Close()
|
|
|
|
}
|
|
|
|
|
2018-08-01 18:54:32 +00:00
|
|
|
// unpadID returns a copy of id with leading 0-bytes removed.
|
|
|
|
// This allows user-facing IDs to look prettier.
|
|
|
|
func unpadID(id platform.ID) platform.ID {
|
|
|
|
trimmed := bytes.TrimLeft(id, "\x00")
|
|
|
|
return append([]byte(nil), trimmed...)
|
|
|
|
}
|
|
|
|
|
|
|
|
// padID returns an id, copying it and padding it with leading `0` bytes, if it is less than 8 long.
|
|
|
|
// it does not copy the id if it is already 8 long
|
|
|
|
// This allows us to accept pretty user-facing IDs but pad them internally for boltdb sorting.
|
|
|
|
func padID(id platform.ID) platform.ID {
|
|
|
|
if len(id) >= 8 {
|
|
|
|
// don't pad if the id is long enough
|
|
|
|
return id
|
2018-08-01 12:50:06 +00:00
|
|
|
}
|
|
|
|
|
2018-08-01 18:54:32 +00:00
|
|
|
var buf [8]byte
|
|
|
|
copy(buf[len(buf)-len(id):], id)
|
|
|
|
return buf[:]
|
|
|
|
}
|
2018-08-01 12:50:06 +00:00
|
|
|
|
2018-08-01 18:54:32 +00:00
|
|
|
// DeleteUser syncronously deletes a user and all their tasks from a bolt store.
|
|
|
|
func (s *Store) DeleteUser(ctx context.Context, id platform.ID) error {
|
|
|
|
userID := padID(id)
|
|
|
|
err := s.db.Update(func(tx *bolt.Tx) error {
|
|
|
|
b := tx.Bucket(s.bucket)
|
|
|
|
ub := b.Bucket(usersPath).Bucket(userID)
|
2018-07-31 15:59:59 +00:00
|
|
|
if ub == nil {
|
|
|
|
return backend.ErrUserNotFound
|
|
|
|
}
|
|
|
|
c := ub.Cursor()
|
|
|
|
i := 0
|
|
|
|
for k, _ := c.First(); k != nil; k, _ = c.Next() {
|
|
|
|
i++
|
|
|
|
// check for cancelation every 256 tasks deleted
|
|
|
|
if i&0xFF == 0 {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return ctx.Err()
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if err := b.Bucket(tasksPath).Delete(k); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := b.Bucket(taskMetaPath).Delete(k); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := b.Bucket(orgByTaskID).Delete(k); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := b.Bucket(userByTaskID).Delete(k); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := b.Bucket(nameByTaskID).Delete(k); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
org := b.Bucket(orgByTaskID).Get(k)
|
|
|
|
if len(org) > 0 {
|
|
|
|
ob := b.Bucket(orgsPath).Bucket(org)
|
|
|
|
if ob != nil {
|
|
|
|
if err := ob.Delete(k); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// check for cancelation one last time before we return
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return ctx.Err()
|
|
|
|
default:
|
2018-08-01 18:54:32 +00:00
|
|
|
return b.Bucket(usersPath).DeleteBucket(userID)
|
2018-07-31 15:59:59 +00:00
|
|
|
}
|
|
|
|
})
|
|
|
|
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// DeleteOrg syncronously deletes an org and all their tasks from a bolt store.
|
|
|
|
func (s *Store) DeleteOrg(ctx context.Context, id platform.ID) error {
|
2018-08-01 18:54:32 +00:00
|
|
|
orgID := padID(id)
|
2018-07-31 15:59:59 +00:00
|
|
|
return s.db.Batch(func(tx *bolt.Tx) error {
|
|
|
|
b := tx.Bucket(s.bucket)
|
2018-08-01 18:54:32 +00:00
|
|
|
ob := b.Bucket(orgsPath).Bucket(orgID)
|
2018-07-31 15:59:59 +00:00
|
|
|
if ob == nil {
|
|
|
|
return backend.ErrOrgNotFound
|
|
|
|
}
|
|
|
|
c := ob.Cursor()
|
|
|
|
i := 0
|
|
|
|
for k, _ := c.First(); k != nil; k, _ = c.Next() {
|
|
|
|
i++
|
|
|
|
// check for cancelation every 256 tasks deleted
|
|
|
|
if i&0xFF == 0 {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return ctx.Err()
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if err := b.Bucket(tasksPath).Delete(k); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := b.Bucket(taskMetaPath).Delete(k); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := b.Bucket(orgByTaskID).Delete(k); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := b.Bucket(userByTaskID).Delete(k); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := b.Bucket(nameByTaskID).Delete(k); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
user := b.Bucket(userByTaskID).Get(k)
|
|
|
|
if len(user) > 0 {
|
|
|
|
ub := b.Bucket(usersPath).Bucket(user)
|
|
|
|
if ub != nil {
|
|
|
|
if err := ub.Delete(k); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// check for cancelation one last time before we return
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return ctx.Err()
|
|
|
|
default:
|
2018-08-01 18:54:32 +00:00
|
|
|
return b.Bucket(orgsPath).DeleteBucket(orgID)
|
2018-07-31 15:59:59 +00:00
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|