fix(task): Remove older unused task store code. (#14068)

* fix(task): Remove older unused task store code.
Lyon Hill 2019-06-06 10:05:27 -06:00 committed by GitHub
parent eeb32beb49
commit ab79b68840
26 changed files with 204 additions and 7104 deletions

@ -13,7 +13,7 @@
# SUBDIRS are directories that have their own Makefile.
# It is required that all subdirs have the `all` and `clean` targets.
SUBDIRS := http ui chronograf query storage task
SUBDIRS := http ui chronograf query storage
GO_ARGS=-tags '$(GO_TAGS)'
# Test vars can be used by all recursive Makefiles

@ -8,79 +8,82 @@ import (
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/http"
"github.com/influxdata/influxdb/inmem"
"github.com/influxdata/influxdb/kv"
_ "github.com/influxdata/influxdb/query/builtin"
"github.com/influxdata/influxdb/task"
"github.com/influxdata/influxdb/task/backend"
tmock "github.com/influxdata/influxdb/task/mock"
"github.com/influxdata/influxdb/task/servicetest"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)
func httpTaskServiceFactory(t *testing.T) (*servicetest.System, context.CancelFunc) {
store := backend.NewInMemStore()
rrw := backend.NewInMemRunReaderWriter()
sch := tmock.NewScheduler()
ctx, cancel := context.WithCancel(context.Background())
i := inmem.NewService()
backingTS := task.PlatformAdapter(store, rrw, sch, i, i, i)
h := http.NewAuthenticationHandler()
h.AuthorizationService = i
th := http.NewTaskHandler(http.NewMockTaskBackend(t))
th.TaskService = backingTS
th.AuthorizationService = i
th.OrganizationService = i
th.UserService = i
th.UserResourceMappingService = i
h.Handler = th
org := &platform.Organization{Name: t.Name() + "_org"}
if err := i.CreateOrganization(ctx, org); err != nil {
t.Fatal(err)
}
user := &platform.User{Name: t.Name() + "_user"}
if err := i.CreateUser(ctx, user); err != nil {
t.Fatal(err)
}
auth := platform.Authorization{UserID: user.ID, OrgID: org.ID}
if err := i.CreateAuthorization(ctx, &auth); err != nil {
t.Fatal(err)
}
server := httptest.NewServer(h)
go func() {
<-ctx.Done()
server.Close()
}()
taskService := http.TaskService{
Addr: server.URL,
Token: auth.Token,
}
cFunc := func() (servicetest.TestCreds, error) {
return servicetest.TestCreds{
OrgID: org.ID,
Org: org.Name,
UserID: user.ID,
AuthorizationID: auth.ID,
Token: auth.Token,
}, nil
}
return &servicetest.System{
TaskControlService: backend.TaskControlAdaptor(store, rrw, rrw),
TaskService: taskService,
Ctx: ctx,
I: i,
CredsFunc: cFunc,
}, cancel
}
func TestTaskService(t *testing.T) {
t.Parallel()
servicetest.TestTaskService(
t,
func(t *testing.T) (*servicetest.System, context.CancelFunc) {
servicetest.TestTaskService(t, httpTaskServiceFactory)
service := kv.NewService(inmem.NewKVStore())
ctx, cancelFunc := context.WithCancel(context.Background())
if err := service.Initialize(ctx); err != nil {
t.Fatalf("error initializing urm service: %v", err)
}
h := http.NewAuthenticationHandler()
h.AuthorizationService = service
th := http.NewTaskHandler(&http.TaskBackend{
Logger: zaptest.NewLogger(t).With(zap.String("handler", "task")),
TaskService: service,
AuthorizationService: service,
OrganizationService: service,
UserResourceMappingService: service,
LabelService: service,
UserService: service,
BucketService: service,
})
h.Handler = th
org := &platform.Organization{Name: t.Name() + "_org"}
if err := service.CreateOrganization(ctx, org); err != nil {
t.Fatal(err)
}
user := &platform.User{Name: t.Name() + "_user"}
if err := service.CreateUser(ctx, user); err != nil {
t.Fatal(err)
}
auth := platform.Authorization{UserID: user.ID, OrgID: org.ID}
if err := service.CreateAuthorization(ctx, &auth); err != nil {
t.Fatal(err)
}
server := httptest.NewServer(h)
go func() {
<-ctx.Done()
server.Close()
}()
taskService := http.TaskService{
Addr: server.URL,
Token: auth.Token,
}
cFunc := func() (servicetest.TestCreds, error) {
return servicetest.TestCreds{
OrgID: org.ID,
Org: org.Name,
UserID: user.ID,
AuthorizationID: auth.ID,
Token: auth.Token,
}, nil
}
return &servicetest.System{
TaskControlService: service,
TaskService: taskService,
I: service,
Ctx: ctx,
CredsFunc: cFunc,
}, cancelFunc
},
"transactional",
)
}

@ -1,26 +0,0 @@
# List any generated files here
TARGETS =
# List any source files used to generate the targets here
SOURCES =
# List any directories that have their own Makefile here
SUBDIRS = backend
# Default target
all: $(SUBDIRS) $(TARGETS)
# Recurse into subdirs for same make goal
$(SUBDIRS):
$(MAKE) -C $@ $(MAKECMDGOALS)
# Clean all targets recursively
clean: $(SUBDIRS)
rm -f $(TARGETS)
# Define go generate if not already defined
GO_GENERATE := go generate
# Run go generate for the targets
$(TARGETS): $(SOURCES)
$(GO_GENERATE) -x
.PHONY: all clean $(SUBDIRS)
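# Illustrative note (not part of the original file): with this pattern,
# `make clean` first recurses into each SUBDIR via `$(MAKE) -C $@ clean`
# (because clean depends on $(SUBDIRS)) and only then removes this
# directory's own generated TARGETS.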

@ -1,26 +0,0 @@
# List any generated files here
TARGETS = meta.pb.go
# List any source files used to generate the targets here
SOURCES = meta.proto
# List any directories that have their own Makefile here
SUBDIRS =
# Default target
all: $(SUBDIRS) $(TARGETS)
# Recurse into subdirs for same make goal
$(SUBDIRS):
$(MAKE) -C $@ $(MAKECMDGOALS)
# Clean all targets recursively
clean: $(SUBDIRS)
rm -f $(TARGETS)
# Define go generate if not already defined
GO_GENERATE := go generate
# Run go generate for the targets
$(TARGETS): $(SOURCES)
$(GO_GENERATE) -x
.PHONY: all clean $(SUBDIRS)

@ -3,12 +3,14 @@ package backend
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/influxdata/flux"
"github.com/influxdata/flux/lang"
"github.com/influxdata/influxdb"
platform "github.com/influxdata/influxdb"
pctx "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/query"
@ -16,6 +18,29 @@ import (
"github.com/influxdata/influxdb/tsdb"
)
const (
runIDField = "runID"
scheduledForField = "scheduledFor"
startedAtField = "startedAt"
finishedAtField = "finishedAt"
requestedAtField = "requestedAt"
statusField = "status"
logField = "logs"
taskIDTag = "taskID"
// Fixed system bucket ID for task and run logs.
taskSystemBucketID platform.ID = 10
)
var (
// ErrTaskNotFound indicates no task could be found for given parameters.
ErrTaskNotFound = errors.New("task not found")
// ErrRunNotFound is returned when searching for a single run that doesn't exist.
ErrRunNotFound = errors.New("run not found")
)
// NewAnalyticalStorage creates a new analytical store with access to the necessary systems for storing data and for acting as middleware.
func NewAnalyticalStorage(ts influxdb.TaskService, tcs TaskControlService, pw storage.PointsWriter, qs query.QueryService) *AnalyticalStorage {
return &AnalyticalStorage{

@ -1,717 +0,0 @@
// Package bolt provides a bolt-backed store implementation.
//
// The data stored in bolt is structured as follows:
//
// bucket(/tasks/v1/tasks) key(:task_id) -> Content of submitted task (i.e. flux code).
// bucket(/tasks/v1/task_meta) key(:task_id) -> Protocol Buffer encoded backend.StoreTaskMeta,
// so we have a consistent view of runs in progress and max concurrency.
// bucket(/tasks/v1/org_by_task_id) key(task_id) -> The organization ID (stored as encoded string) associated with given task.
// bucket(/tasks/v1/name_by_task_id) key(:task_id) -> The user-supplied name of the script.
// bucket(/tasks/v1/run_ids) -> Counter for run IDs
// bucket(/tasks/v1/orgs).bucket(:org_id) key(:task_id) -> Empty content; presence of :task_id allows for lookup from org to tasks.
// Note that task IDs are stored big-endian uint64s for sorting purposes,
// but presented to the users with leading 0-bytes stripped.
// Like other components of the system, IDs presented to users may be `0f12` rather than `f12`.
package bolt
import (
"bytes"
"context"
"errors"
"fmt"
"math"
"time"
bolt "github.com/coreos/bbolt"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/snowflake"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/options"
)
// ErrDBReadOnly is an error for when the database is set to read only.
// Tasks need to be able to write to the db.
var ErrDBReadOnly = errors.New("db is read only")
// ErrMaxConcurrency is an error for when a task's max concurrency has already
// been reached and another run is scheduled.
var ErrMaxConcurrency = errors.New("max concurrency reached")
// ErrRunNotFound is an error for when a run isn't found in a FinishRun method.
var ErrRunNotFound = errors.New("run not found")
// ErrNotFound is an error for when a task could not be found.
var ErrNotFound = errors.New("task not found")
// Store is task store for bolt.
type Store struct {
db *bolt.DB
bucket []byte
idGen platform.IDGenerator
minLatestCompleted int64
}
const basePath = "/tasks/v1/"
var (
tasksPath = []byte(basePath + "tasks")
orgsPath = []byte(basePath + "orgs")
taskMetaPath = []byte(basePath + "task_meta")
orgByTaskID = []byte(basePath + "org_by_task_id")
nameByTaskID = []byte(basePath + "name_by_task_id")
runIDs = []byte(basePath + "run_ids")
)
// Option is an optional configuration for the store.
type Option func(*Store)
// NoCatchUp allows you to skip any runs that were scheduled during downtime.
func NoCatchUp(st *Store) { st.minLatestCompleted = time.Now().Unix() }
// New gives us a new Store based on "github.com/coreos/bbolt"
func New(db *bolt.DB, rootBucket string, opts ...Option) (*Store, error) {
if db.IsReadOnly() {
return nil, ErrDBReadOnly
}
bucket := []byte(rootBucket)
err := db.Update(func(tx *bolt.Tx) error {
// create root
root, err := tx.CreateBucketIfNotExists(bucket)
if err != nil {
return err
}
// create the buckets inside the root
for _, b := range [][]byte{
tasksPath, orgsPath, taskMetaPath,
orgByTaskID, nameByTaskID, runIDs,
} {
_, err := root.CreateBucketIfNotExists(b)
if err != nil {
return err
}
}
return nil
})
if err != nil {
return nil, err
}
st := &Store{db: db, bucket: bucket, idGen: snowflake.NewDefaultIDGenerator(), minLatestCompleted: math.MinInt64}
for _, opt := range opts {
opt(st)
}
return st, nil
}
// CreateTask creates a task in the boltdb task store.
func (s *Store) CreateTask(ctx context.Context, req backend.CreateTaskRequest) (platform.ID, error) {
o, err := backend.StoreValidator.CreateArgs(req)
if err != nil {
return platform.InvalidID(), err
}
// Get ID
id := s.idGen.ID()
err = s.db.Update(func(tx *bolt.Tx) error {
// get the root bucket
b := tx.Bucket(s.bucket)
name := []byte(o.Name)
// Encode ID
encodedID, err := id.Encode()
if err != nil {
return err
}
// write script
err = b.Bucket(tasksPath).Put(encodedID, []byte(req.Script))
if err != nil {
return err
}
// name
err = b.Bucket(nameByTaskID).Put(encodedID, name)
if err != nil {
return err
}
// Encode org ID
encodedOrg, err := req.Org.Encode()
if err != nil {
return err
}
// org
orgB, err := b.Bucket(orgsPath).CreateBucketIfNotExists(encodedOrg)
if err != nil {
return err
}
err = orgB.Put(encodedID, nil)
if err != nil {
return err
}
err = b.Bucket(orgByTaskID).Put(encodedID, encodedOrg)
if err != nil {
return err
}
stm := backend.NewStoreTaskMeta(req, o)
stmBytes, err := stm.Marshal()
if err != nil {
return err
}
metaB := b.Bucket(taskMetaPath)
return metaB.Put(encodedID, stmBytes)
})
if err != nil {
return platform.InvalidID(), err
}
return id, nil
}
func (s *Store) UpdateTask(ctx context.Context, req backend.UpdateTaskRequest) (backend.UpdateTaskResult, error) {
var res backend.UpdateTaskResult
op, err := backend.StoreValidator.UpdateArgs(req)
if err != nil {
return res, err
}
encodedID, err := req.ID.Encode()
if err != nil {
return res, err
}
err = s.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucket)
bt := b.Bucket(tasksPath)
v := bt.Get(encodedID)
if v == nil {
return backend.ErrTaskNotFound
}
res.OldScript = string(v)
if res.OldScript == "" {
return errors.New("task script not stored properly")
}
var newScript string
if !req.Options.IsZero() || req.Script != "" {
if err = req.UpdateFlux(res.OldScript); err != nil {
return err
}
newScript = req.Script
}
if req.Script == "" {
// Need to build op from existing script.
op, err = options.FromScript(res.OldScript)
if err != nil {
return err
}
newScript = res.OldScript
} else {
op, err = options.FromScript(req.Script)
if err != nil {
return err
}
if err := bt.Put(encodedID, []byte(req.Script)); err != nil {
return err
}
if err := b.Bucket(nameByTaskID).Put(encodedID, []byte(op.Name)); err != nil {
return err
}
}
var orgID platform.ID
if err := orgID.Decode(b.Bucket(orgByTaskID).Get(encodedID)); err != nil {
return err
}
stmBytes := b.Bucket(taskMetaPath).Get(encodedID)
if stmBytes == nil {
return backend.ErrTaskNotFound
}
var stm backend.StoreTaskMeta
if err := stm.Unmarshal(stmBytes); err != nil {
return err
}
stm.UpdatedAt = time.Now().Unix()
res.OldStatus = backend.TaskStatus(stm.Status)
if req.Status != "" {
stm.Status = string(req.Status)
}
if req.AuthorizationID.Valid() {
stm.AuthorizationID = uint64(req.AuthorizationID)
}
stmBytes, err = stm.Marshal()
if err != nil {
return err
}
if err := b.Bucket(taskMetaPath).Put(encodedID, stmBytes); err != nil {
return err
}
res.NewMeta = stm
res.NewTask = backend.StoreTask{
ID: req.ID,
Org: orgID,
Name: op.Name,
Script: newScript,
}
return nil
})
return res, err
}
// ListTasks lists the tasks based on a filter.
func (s *Store) ListTasks(ctx context.Context, params backend.TaskSearchParams) ([]backend.StoreTaskWithMeta, error) {
if params.PageSize < 0 {
return nil, errors.New("ListTasks: PageSize must be positive")
}
if params.PageSize > platform.TaskMaxPageSize {
return nil, fmt.Errorf("ListTasks: PageSize exceeds maximum of %d", platform.TaskMaxPageSize)
}
lim := params.PageSize
if lim == 0 {
lim = platform.TaskDefaultPageSize
}
taskIDs := make([]platform.ID, 0, lim)
var tasks []backend.StoreTaskWithMeta
if err := s.db.View(func(tx *bolt.Tx) error {
var c *bolt.Cursor
b := tx.Bucket(s.bucket)
if params.Org.Valid() {
encodedOrg, err := params.Org.Encode()
if err != nil {
return err
}
orgB := b.Bucket(orgsPath).Bucket(encodedOrg)
if orgB == nil {
return ErrNotFound
}
c = orgB.Cursor()
} else {
c = b.Bucket(tasksPath).Cursor()
}
if params.After.Valid() {
encodedAfter, err := params.After.Encode()
if err != nil {
return err
}
// If the taskID returned by c.Seek is greater than after param, append taskID to taskIDs.
k, _ := c.Seek(encodedAfter)
if bytes.Compare(k, encodedAfter) > 0 {
var nID platform.ID
if err := nID.Decode(k); err != nil {
return err
}
taskIDs = append(taskIDs, nID)
}
for k, _ := c.Next(); k != nil && len(taskIDs) < lim; k, _ = c.Next() {
var nID platform.ID
if err := nID.Decode(k); err != nil {
return err
}
taskIDs = append(taskIDs, nID)
}
} else {
for k, _ := c.First(); k != nil && len(taskIDs) < lim; k, _ = c.Next() {
var nID platform.ID
if err := nID.Decode(k); err != nil {
return err
}
taskIDs = append(taskIDs, nID)
}
}
tasks = make([]backend.StoreTaskWithMeta, len(taskIDs))
for i := range taskIDs {
// TODO(docmerlin): optimization: don't check <-ctx.Done() every time through the loop
select {
case <-ctx.Done():
return ctx.Err()
default:
// TODO(docmerlin): change the setup to reduce the number of lookups to 1 or 2.
encodedID, err := taskIDs[i].Encode()
if err != nil {
return err
}
tasks[i].Task.ID = taskIDs[i]
tasks[i].Task.Script = string(b.Bucket(tasksPath).Get(encodedID))
tasks[i].Task.Name = string(b.Bucket(nameByTaskID).Get(encodedID))
}
}
if params.Org.Valid() {
for i := range taskIDs {
select {
case <-ctx.Done():
return ctx.Err()
default:
tasks[i].Task.Org = params.Org
}
}
goto POPULATE_META
}
for i := range taskIDs {
select {
case <-ctx.Done():
return ctx.Err()
default:
encodedID, err := taskIDs[i].Encode()
if err != nil {
return err
}
var orgID platform.ID
if err := orgID.Decode(b.Bucket(orgByTaskID).Get(encodedID)); err != nil {
return err
}
tasks[i].Task.Org = orgID
}
}
POPULATE_META:
for i := range taskIDs {
select {
case <-ctx.Done():
return ctx.Err()
default:
encodedID, err := taskIDs[i].Encode()
if err != nil {
return err
}
var stm backend.StoreTaskMeta
if err := stm.Unmarshal(b.Bucket(taskMetaPath).Get(encodedID)); err != nil {
return err
}
if stm.LatestCompleted < s.minLatestCompleted {
stm.LatestCompleted = s.minLatestCompleted
stm.AlignLatestCompleted()
}
tasks[i].Meta = stm
}
}
return nil
}); err != nil {
if err == ErrNotFound {
return nil, nil
}
return nil, err
}
return tasks, nil
}
// FindTaskByID finds a task with a given ID. It will return nil if the task does not exist.
func (s *Store) FindTaskByID(ctx context.Context, id platform.ID) (*backend.StoreTask, error) {
var orgID platform.ID
var script, name string
encodedID, err := id.Encode()
if err != nil {
return nil, err
}
err = s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucket)
scriptBytes := b.Bucket(tasksPath).Get(encodedID)
if scriptBytes == nil {
return backend.ErrTaskNotFound
}
script = string(scriptBytes)
if err := orgID.Decode(b.Bucket(orgByTaskID).Get(encodedID)); err != nil {
return err
}
name = string(b.Bucket(nameByTaskID).Get(encodedID))
return nil
})
if err != nil {
return nil, err
}
return &backend.StoreTask{
ID: id,
Org: orgID,
Name: name,
Script: script,
}, err
}
func (s *Store) FindTaskMetaByID(ctx context.Context, id platform.ID) (*backend.StoreTaskMeta, error) {
var stm backend.StoreTaskMeta
encodedID, err := id.Encode()
if err != nil {
return nil, err
}
err = s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucket)
stmBytes := b.Bucket(taskMetaPath).Get(encodedID)
if stmBytes == nil {
return backend.ErrTaskNotFound
}
return stm.Unmarshal(stmBytes)
})
if err != nil {
return nil, err
}
if stm.LatestCompleted < s.minLatestCompleted {
stm.LatestCompleted = s.minLatestCompleted
stm.AlignLatestCompleted()
}
return &stm, nil
}
func (s *Store) FindTaskByIDWithMeta(ctx context.Context, id platform.ID) (*backend.StoreTask, *backend.StoreTaskMeta, error) {
var stmBytes []byte
var orgID platform.ID
var script, name string
encodedID, err := id.Encode()
if err != nil {
return nil, nil, err
}
err = s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucket)
scriptBytes := b.Bucket(tasksPath).Get(encodedID)
if scriptBytes == nil {
return backend.ErrTaskNotFound
}
script = string(scriptBytes)
// Assign copies of everything so we don't hold a stale reference to a bolt-maintained byte slice.
stmBytes = append(stmBytes, b.Bucket(taskMetaPath).Get(encodedID)...)
if err := orgID.Decode(b.Bucket(orgByTaskID).Get(encodedID)); err != nil {
return err
}
name = string(b.Bucket(nameByTaskID).Get(encodedID))
return nil
})
if err != nil {
return nil, nil, err
}
stm := backend.StoreTaskMeta{}
if err := stm.Unmarshal(stmBytes); err != nil {
return nil, nil, err
}
if stm.LatestCompleted < s.minLatestCompleted {
stm.LatestCompleted = s.minLatestCompleted
stm.AlignLatestCompleted()
}
return &backend.StoreTask{
ID: id,
Org: orgID,
Name: name,
Script: script,
}, &stm, nil
}
// DeleteTask deletes the task.
func (s *Store) DeleteTask(ctx context.Context, id platform.ID) (deleted bool, err error) {
encodedID, err := id.Encode()
if err != nil {
return false, err
}
err = s.db.Batch(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucket)
if check := b.Bucket(tasksPath).Get(encodedID); check == nil {
return backend.ErrTaskNotFound
}
if err := b.Bucket(taskMetaPath).Delete(encodedID); err != nil {
return err
}
if err := b.Bucket(tasksPath).Delete(encodedID); err != nil {
return err
}
if err := b.Bucket(nameByTaskID).Delete(encodedID); err != nil {
return err
}
org := b.Bucket(orgByTaskID).Get(encodedID)
if len(org) > 0 {
if err := b.Bucket(orgsPath).Bucket(org).Delete(encodedID); err != nil {
return err
}
}
return b.Bucket(orgByTaskID).Delete(encodedID)
})
if err != nil {
if err == backend.ErrTaskNotFound {
return false, nil
}
return false, err
}
return true, nil
}
func (s *Store) CreateNextRun(ctx context.Context, taskID platform.ID, now int64) (backend.RunCreation, error) {
var rc backend.RunCreation
encodedID, err := taskID.Encode()
if err != nil {
return rc, err
}
if err := s.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucket)
stmBytes := b.Bucket(taskMetaPath).Get(encodedID)
if stmBytes == nil {
return backend.ErrTaskNotFound
}
var stm backend.StoreTaskMeta
err := stm.Unmarshal(stmBytes)
if err != nil {
return err
}
if stm.LatestCompleted < s.minLatestCompleted {
stm.LatestCompleted = s.minLatestCompleted
stm.AlignLatestCompleted()
}
rc, err = stm.CreateNextRun(now, func() (platform.ID, error) {
return s.idGen.ID(), nil
})
if err != nil {
return err
}
rc.Created.TaskID = taskID
stmBytes, err = stm.Marshal()
if err != nil {
return err
}
return tx.Bucket(s.bucket).Bucket(taskMetaPath).Put(encodedID, stmBytes)
}); err != nil {
return backend.RunCreation{}, err
}
return rc, nil
}
// FinishRun removes runID from the list of running tasks and, if its `now` is later than the last completed time, updates it.
func (s *Store) FinishRun(ctx context.Context, taskID, runID platform.ID) error {
encodedID, err := taskID.Encode()
if err != nil {
return err
}
return s.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucket)
stmBytes := b.Bucket(taskMetaPath).Get(encodedID)
var stm backend.StoreTaskMeta
if err := stm.Unmarshal(stmBytes); err != nil {
return err
}
if !stm.FinishRun(runID) {
return ErrRunNotFound
}
stmBytes, err := stm.Marshal()
if err != nil {
return err
}
return tx.Bucket(s.bucket).Bucket(taskMetaPath).Put(encodedID, stmBytes)
})
}
func (s *Store) ManuallyRunTimeRange(_ context.Context, taskID platform.ID, start, end, requestedAt int64) (*backend.StoreTaskMetaManualRun, error) {
encodedID, err := taskID.Encode()
if err != nil {
return nil, err
}
var mRun *backend.StoreTaskMetaManualRun
if err = s.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucket)
stmBytes := b.Bucket(taskMetaPath).Get(encodedID)
var stm backend.StoreTaskMeta
if err := stm.Unmarshal(stmBytes); err != nil {
return err
}
makeID := func() (platform.ID, error) { return s.idGen.ID(), nil }
if err := stm.ManuallyRunTimeRange(start, end, requestedAt, makeID); err != nil {
return err
}
stmBytes, err := stm.Marshal()
if err != nil {
return err
}
mRun = stm.ManualRuns[len(stm.ManualRuns)-1]
return tx.Bucket(s.bucket).Bucket(taskMetaPath).Put(encodedID, stmBytes)
}); err != nil {
return nil, err
}
return mRun, nil
}
// Close closes the store
func (s *Store) Close() error {
return s.db.Close()
}
// DeleteOrg synchronously deletes an org and all of its tasks from a bolt store.
func (s *Store) DeleteOrg(ctx context.Context, id platform.ID) error {
orgID, err := id.Encode()
if err != nil {
return err
}
return s.db.Batch(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucket)
ob := b.Bucket(orgsPath).Bucket(orgID)
if ob == nil {
return backend.ErrOrgNotFound
}
c := ob.Cursor()
i := 0
for k, _ := c.First(); k != nil; k, _ = c.Next() {
i++
// check for cancelation every 256 tasks deleted
if i&0xFF == 0 {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}
if err := b.Bucket(tasksPath).Delete(k); err != nil {
return err
}
if err := b.Bucket(taskMetaPath).Delete(k); err != nil {
return err
}
if err := b.Bucket(orgByTaskID).Delete(k); err != nil {
return err
}
if err := b.Bucket(nameByTaskID).Delete(k); err != nil {
return err
}
}
// check for cancelation one last time before we return
select {
case <-ctx.Done():
return ctx.Err()
default:
return b.Bucket(orgsPath).DeleteBucket(orgID)
}
})
}
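
The package comment above spells out the full bucket layout. Below is a minimal sketch, not part of this commit, of how that layout resolves a task ID to its script; the database path "tasks.db", the root bucket name "testbucket", and the task ID 42 are assumptions for illustration only:

package main

import (
	"encoding/binary"
	"fmt"
	"log"

	bolt "github.com/coreos/bbolt"
)

func main() {
	db, err := bolt.Open("tasks.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Task IDs are stored as big-endian uint64s, per the package comment.
	key := make([]byte, 8)
	binary.BigEndian.PutUint64(key, 42) // hypothetical task ID

	err = db.View(func(tx *bolt.Tx) error {
		root := tx.Bucket([]byte("testbucket"))
		if root == nil {
			return fmt.Errorf("root bucket not found")
		}
		// bucket(/tasks/v1/tasks) key(:task_id) -> submitted flux script
		script := root.Bucket([]byte("/tasks/v1/tasks")).Get(key)
		fmt.Printf("script: %s\n", script)
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}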

@ -1,147 +0,0 @@
package bolt_test
import (
"context"
"io/ioutil"
"os"
"testing"
"time"
bolt "github.com/coreos/bbolt"
"github.com/influxdata/influxdb"
_ "github.com/influxdata/influxdb/query/builtin"
"github.com/influxdata/influxdb/task/backend"
boltstore "github.com/influxdata/influxdb/task/backend/bolt"
"github.com/influxdata/influxdb/task/backend/storetest"
"github.com/influxdata/influxdb/task/options"
)
func init() {
// TODO(mr): remove as part of https://github.com/influxdata/platform/issues/484.
options.EnableScriptCacheForTest()
}
func TestBoltStore(t *testing.T) {
var f *os.File
storetest.NewStoreTest(
"boltstore",
func(t *testing.T) backend.Store {
var err error
f, err = ioutil.TempFile("", "influx_bolt_task_store_test")
if err != nil {
t.Fatalf("failed to create tempfile for test db %v\n", err)
}
db, err := bolt.Open(f.Name(), os.ModeTemporary, nil)
if err != nil {
t.Fatalf("failed to open bolt db for test db %v\n", err)
}
s, err := boltstore.New(db, "testbucket")
if err != nil {
t.Fatalf("failed to create new bolt store %v\n", err)
}
return s
},
func(t *testing.T, s backend.Store) {
if err := s.Close(); err != nil {
t.Error(err)
}
err := os.Remove(f.Name())
if err != nil {
t.Error(err)
}
},
)(t)
}
func TestSkip(t *testing.T) {
f, err := ioutil.TempFile("", "influx_bolt_task_store_test")
if err != nil {
t.Fatalf("failed to create tempfile for test db %v\n", err)
}
defer f.Close()
defer os.Remove(f.Name())
db, err := bolt.Open(f.Name(), os.ModeTemporary, nil)
if err != nil {
t.Fatalf("failed to open bolt db for test db %v\n", err)
}
s, err := boltstore.New(db, "testbucket")
if err != nil {
t.Fatalf("failed to create new bolt store %v\n", err)
}
schedAfter := time.Now().Add(-time.Minute)
tskID, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{
Org: influxdb.ID(1),
AuthorizationID: influxdb.ID(2),
Script: `option task = {name:"x", every:1s} from(bucket:"b-src") |> range(start:-1m) |> to(bucket:"b-dst", org:"o")`,
ScheduleAfter: schedAfter.Unix(),
Status: backend.TaskActive,
})
if err != nil {
t.Fatalf("failed to create new task %v\n", err)
}
rc, err := s.CreateNextRun(context.Background(), tskID, schedAfter.Add(10*time.Second).Unix())
if err != nil {
t.Fatalf("failed to create new run %v\n", err)
}
if err := s.FinishRun(context.Background(), tskID, rc.Created.RunID); err != nil {
t.Fatalf("failed to finish run %v\n", err)
}
meta, err := s.FindTaskMetaByID(context.Background(), tskID)
if err != nil {
t.Fatalf("failed to pull meta %v\n", err)
}
if meta.LatestCompleted <= schedAfter.Unix() {
t.Fatal("failed to update latestCompleted")
}
latestCompleted := meta.LatestCompleted
s.Close()
db, err = bolt.Open(f.Name(), os.ModeTemporary, nil)
if err != nil {
t.Fatalf("failed to open bolt db for test db %v\n", err)
}
s, err = boltstore.New(db, "testbucket", boltstore.NoCatchUp)
if err != nil {
t.Fatalf("failed to create new bolt store %v\n", err)
}
defer s.Close()
meta, err = s.FindTaskMetaByID(context.Background(), tskID)
if err != nil {
t.Fatalf("failed to pull meta %v\n", err)
}
if meta.LatestCompleted == latestCompleted {
t.Fatal("failed to overwrite latest completed on new meta pull")
}
latestCompleted = meta.LatestCompleted
rc, err = s.CreateNextRun(context.Background(), tskID, time.Now().Add(10*time.Second).Unix())
if err != nil {
t.Fatalf("failed to create new run %v\n", err)
}
if err := s.FinishRun(context.Background(), tskID, rc.Created.RunID); err != nil {
t.Fatalf("failed to finish run %v\n", err)
}
tasks, err := s.ListTasks(context.Background(), backend.TaskSearchParams{})
if err != nil {
t.Fatalf("failed to pull meta %v\n", err)
}
if len(tasks) != 1 {
t.Fatal("task not found")
}
if tasks[0].Meta.LatestCompleted == latestCompleted {
t.Fatal("failed to run after an override")
}
}

@ -18,9 +18,9 @@ import (
platform "github.com/influxdata/influxdb"
icontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/inmem"
"github.com/influxdata/influxdb/kv"
"github.com/influxdata/influxdb/query"
_ "github.com/influxdata/influxdb/query/builtin"
"github.com/influxdata/influxdb/task"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/backend/executor"
"go.uber.org/zap"
@ -230,10 +230,6 @@ func (ts tables) Do(f func(flux.Table) error) error {
func (ts tables) Statistics() flux.Statistics { return flux.Statistics{} }
type noopRunCanceler struct{}
func (noopRunCanceler) CancelRun(ctx context.Context, taskID, runID platform.ID) error { return nil }
type system struct {
name string
svc *fakeQueryService
@ -241,39 +237,45 @@ type system struct {
ex backend.Executor
// We really just want an authorization service here, but we take a whole inmem service
// to ensure that the authorization service validates org and user existence properly.
i *inmem.Service
i *kv.Service
}
type createSysFn func() *system
func createAsyncSystem() *system {
svc := newFakeQueryService()
i := inmem.NewService()
ts := task.PlatformAdapter(backend.NewInMemStore(), backend.NopLogReader{}, noopRunCanceler{}, i, i, i)
i := kv.NewService(inmem.NewKVStore())
if err := i.Initialize(context.Background()); err != nil {
panic(err)
}
return &system{
name: "AsyncExecutor",
svc: svc,
ts: ts,
ex: executor.NewAsyncQueryServiceExecutor(zap.NewNop(), svc, i, ts),
ts: i,
ex: executor.NewAsyncQueryServiceExecutor(zap.NewNop(), svc, i, i),
i: i,
}
}
func createSyncSystem() *system {
svc := newFakeQueryService()
i := inmem.NewService()
ts := task.PlatformAdapter(backend.NewInMemStore(), backend.NopLogReader{}, noopRunCanceler{}, i, i, i)
i := kv.NewService(inmem.NewKVStore())
if err := i.Initialize(context.Background()); err != nil {
panic(err)
}
return &system{
name: "SynchronousExecutor",
svc: svc,
ts: ts,
ts: i,
ex: executor.NewQueryServiceExecutor(
zap.NewNop(),
query.QueryServiceBridge{
AsyncQueryService: svc,
},
i,
ts,
i,
),
i: i,
}
@ -660,7 +662,7 @@ type testCreds struct {
Auth *platform.Authorization
}
func createCreds(t *testing.T, i *inmem.Service) testCreds {
func createCreds(t *testing.T, i *kv.Service) testCreds {
t.Helper()
org := &platform.Organization{Name: t.Name() + "-org"}

@ -1,158 +0,0 @@
package backend
import (
"context"
"errors"
"sync"
"time"
platform "github.com/influxdata/influxdb"
)
// orgtask is used as a key for storing runs by org and task ID.
// This is only relevant for the in-memory run store.
type orgtask struct {
o, t platform.ID
}
type runReaderWriter struct {
mu sync.RWMutex
byOrgTask map[orgtask][]*platform.Run
byRunID map[string]*platform.Run
}
func NewInMemRunReaderWriter() *runReaderWriter {
return &runReaderWriter{byRunID: map[string]*platform.Run{}, byOrgTask: map[orgtask][]*platform.Run{}}
}
func (r *runReaderWriter) UpdateRunState(ctx context.Context, rlb RunLogBase, when time.Time, status RunStatus) error {
r.mu.Lock()
defer r.mu.Unlock()
timeSetter := func(r *platform.Run) {
whenStr := when.UTC().Format(time.RFC3339Nano)
switch status {
case RunStarted:
r.StartedAt = whenStr
case RunFail, RunSuccess, RunCanceled:
r.FinishedAt = whenStr
}
}
ridStr := rlb.RunID.String()
existingRun, ok := r.byRunID[ridStr]
if !ok {
sf := time.Unix(rlb.RunScheduledFor, 0).UTC()
run := &platform.Run{
ID: rlb.RunID,
TaskID: rlb.Task.ID,
Status: status.String(),
ScheduledFor: sf.Format(time.RFC3339),
}
if rlb.RequestedAt != 0 {
run.RequestedAt = time.Unix(rlb.RequestedAt, 0).UTC().Format(time.RFC3339)
}
timeSetter(run)
r.byRunID[ridStr] = run
ot := orgtask{o: rlb.Task.Org, t: rlb.Task.ID}
r.byOrgTask[ot] = append(r.byOrgTask[ot], run)
return nil
}
timeSetter(existingRun)
existingRun.Status = status.String()
return nil
}
func (r *runReaderWriter) AddRunLog(ctx context.Context, rlb RunLogBase, when time.Time, log string) error {
r.mu.Lock()
defer r.mu.Unlock()
pLog := platform.Log{Time: when.Format(time.RFC3339Nano), Message: log}
ridStr := rlb.RunID.String()
existingRun, ok := r.byRunID[ridStr]
if !ok {
return ErrRunNotFound
}
existingRun.Log = append(existingRun.Log, pLog)
return nil
}
func (r *runReaderWriter) ListRuns(ctx context.Context, orgID platform.ID, runFilter platform.RunFilter) ([]*platform.Run, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if !runFilter.Task.Valid() {
return nil, errors.New("task is required")
}
ex, ok := r.byOrgTask[orgtask{o: orgID, t: runFilter.Task}]
if !ok {
return []*platform.Run{}, nil
}
afterID := ""
if runFilter.After != nil {
afterID = runFilter.After.String()
}
runs := make([]*platform.Run, 0, len(ex))
for _, r := range ex {
// Skip this entry if we would be filtering it out.
if runFilter.BeforeTime != "" && runFilter.BeforeTime <= r.ScheduledFor {
continue
}
if runFilter.AfterTime != "" && runFilter.AfterTime >= r.ScheduledFor {
continue
}
if r.ID.String() <= afterID {
continue
}
// Copy the element, to avoid a data race if the original Run is modified in UpdateRunState or AddRunLog.
r := *r
runs = append(runs, &r)
if runFilter.Limit > 0 && len(runs) >= runFilter.Limit {
break
}
}
return runs, nil
}
func (r *runReaderWriter) FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error) {
r.mu.RLock()
defer r.mu.RUnlock()
run, ok := r.byRunID[runID.String()]
if !ok {
return nil, ErrRunNotFound
}
rtnRun := *run
return &rtnRun, nil
}
func (r *runReaderWriter) ListLogs(ctx context.Context, orgID platform.ID, logFilter platform.LogFilter) ([]platform.Log, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if !logFilter.Task.Valid() {
return nil, errors.New("task ID required")
}
if logFilter.Run != nil {
run, ok := r.byRunID[logFilter.Run.String()]
if !ok {
return nil, ErrRunNotFound
}
// TODO(mr): validate that task ID matches, if task is also set. Needs test.
return run.Log, nil
}
var logs []platform.Log
ot := orgtask{o: orgID, t: logFilter.Task}
for _, run := range r.byOrgTask[ot] {
logs = append(logs, run.Log...)
}
return logs, nil
}

@ -1,358 +0,0 @@
package backend
import (
"context"
"errors"
"fmt"
"sync"
"time"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/snowflake"
"github.com/influxdata/influxdb/task/options"
)
var _ Store = (*inmem)(nil)
// inmem is an in-memory task store.
type inmem struct {
idgen platform.IDGenerator
mu sync.RWMutex
// It might be more natural to use a map of ID to task,
// but then we wouldn't have guaranteed ordering for paging.
tasks []StoreTask
meta map[platform.ID]StoreTaskMeta
}
// NewInMemStore returns a new in-memory store.
// This store is not designed to be efficient; it exists for testing purposes.
func NewInMemStore() Store {
return &inmem{
idgen: snowflake.NewIDGenerator(),
meta: map[platform.ID]StoreTaskMeta{},
}
}
func (s *inmem) CreateTask(_ context.Context, req CreateTaskRequest) (platform.ID, error) {
o, err := StoreValidator.CreateArgs(req)
if err != nil {
return platform.InvalidID(), err
}
id := s.idgen.ID()
task := StoreTask{
ID: id,
Org: req.Org,
Name: o.Name,
Script: req.Script,
}
s.mu.Lock()
defer s.mu.Unlock()
s.tasks = append(s.tasks, task)
s.meta[id] = NewStoreTaskMeta(req, o)
return id, nil
}
func (s *inmem) UpdateTask(_ context.Context, req UpdateTaskRequest) (UpdateTaskResult, error) {
var res UpdateTaskResult
op, err := StoreValidator.UpdateArgs(req)
if err != nil {
return res, err
}
idStr := req.ID.String()
s.mu.Lock()
defer s.mu.Unlock()
found := false
for n, t := range s.tasks {
if t.ID != req.ID {
continue
}
found = true
res.OldScript = t.Script
if err = req.UpdateFlux(t.Script); err != nil {
return res, err
}
if req.Script == "" {
op, err = options.FromScript(t.Script)
if err != nil {
return res, err
}
} else {
t.Script = req.Script
}
t.Name = op.Name
s.tasks[n] = t
res.NewTask = t
break
}
if !found {
return res, fmt.Errorf("modifyTask: record not found for %s", idStr)
}
stm, ok := s.meta[req.ID]
if !ok {
panic("inmem store: had task without runner for task ID " + idStr)
}
stm.UpdatedAt = time.Now().Unix()
res.OldStatus = TaskStatus(stm.Status)
if req.Status != "" {
// Changing the status.
stm.Status = string(req.Status)
}
if req.AuthorizationID.Valid() {
stm.AuthorizationID = uint64(req.AuthorizationID)
}
s.meta[req.ID] = stm
res.NewMeta = stm
return res, nil
}
func (s *inmem) ListTasks(_ context.Context, params TaskSearchParams) ([]StoreTaskWithMeta, error) {
if params.PageSize < 0 {
return nil, errors.New("ListTasks: PageSize must be positive")
}
if params.PageSize > platform.TaskMaxPageSize {
return nil, fmt.Errorf("ListTasks: PageSize exceeds maximum of %d", platform.TaskMaxPageSize)
}
lim := params.PageSize
if lim == 0 {
lim = platform.TaskDefaultPageSize
}
out := make([]StoreTaskWithMeta, 0, lim)
org := params.Org
var after platform.ID
if !params.After.Valid() {
after = platform.ID(1)
} else {
after = params.After
}
s.mu.RLock()
defer s.mu.RUnlock()
for _, t := range s.tasks {
if after >= t.ID {
continue
}
if org.Valid() && org != t.Org {
continue
}
out = append(out, StoreTaskWithMeta{Task: t})
if len(out) >= lim {
break
}
}
for i := range out {
id := out[i].Task.ID
out[i].Meta = s.meta[id]
}
return out, nil
}
func (s *inmem) FindTaskByID(_ context.Context, id platform.ID) (*StoreTask, error) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, t := range s.tasks {
if t.ID == id {
// Return a copy of the task.
task := new(StoreTask)
*task = t
return task, nil
}
}
return nil, ErrTaskNotFound
}
func (s *inmem) FindTaskByIDWithMeta(_ context.Context, id platform.ID) (*StoreTask, *StoreTaskMeta, error) {
s.mu.RLock()
defer s.mu.RUnlock()
var task *StoreTask
for _, t := range s.tasks {
if t.ID == id {
// Return a copy of the task.
task = new(StoreTask)
*task = t
break
}
}
if task == nil {
return nil, nil, ErrTaskNotFound
}
meta, ok := s.meta[id]
if !ok {
return nil, nil, errors.New("task meta not found")
}
return task, &meta, nil
}
func (s *inmem) FindTaskMetaByID(ctx context.Context, id platform.ID) (*StoreTaskMeta, error) {
s.mu.RLock()
defer s.mu.RUnlock()
meta, ok := s.meta[id]
if !ok {
return nil, ErrTaskNotFound
}
return &meta, nil
}
func (s *inmem) DeleteTask(_ context.Context, id platform.ID) (deleted bool, err error) {
s.mu.Lock()
defer s.mu.Unlock()
idx := -1
for i, t := range s.tasks {
if t.ID == id {
idx = i
break
}
}
if idx < 0 {
return false, nil
}
// Delete entry from slice.
s.tasks = append(s.tasks[:idx], s.tasks[idx+1:]...)
delete(s.meta, id)
return true, nil
}
func (s *inmem) Close() error {
return nil
}
func (s *inmem) CreateNextRun(ctx context.Context, taskID platform.ID, now int64) (RunCreation, error) {
s.mu.Lock()
defer s.mu.Unlock()
stm, ok := s.meta[taskID]
if !ok {
return RunCreation{}, errors.New("task not found")
}
makeID := func() (platform.ID, error) {
return s.idgen.ID(), nil
}
rc, err := stm.CreateNextRun(now, makeID)
if err != nil {
return RunCreation{}, err
}
rc.Created.TaskID = taskID
s.meta[taskID] = stm
return rc, nil
}
// FinishRun removes runID from the list of running tasks and, if its `now` is later than the last completed time, updates it.
func (s *inmem) FinishRun(ctx context.Context, taskID, runID platform.ID) error {
s.mu.RLock()
stm, ok := s.meta[taskID]
s.mu.RUnlock()
if !ok {
return errors.New("taskRunner not found")
}
if !stm.FinishRun(runID) {
return errors.New("run not found")
}
s.mu.Lock()
s.meta[taskID] = stm
s.mu.Unlock()
return nil
}
func (s *inmem) ManuallyRunTimeRange(_ context.Context, taskID platform.ID, start, end, requestedAt int64) (*StoreTaskMetaManualRun, error) {
s.mu.Lock()
defer s.mu.Unlock()
stm, ok := s.meta[taskID]
if !ok {
return nil, errors.New("task not found")
}
if err := stm.ManuallyRunTimeRange(start, end, requestedAt, func() (platform.ID, error) { return s.idgen.ID(), nil }); err != nil {
return nil, err
}
s.meta[taskID] = stm
mr := stm.ManualRuns[len(stm.ManualRuns)-1]
return mr, nil
}
func (s *inmem) delete(ctx context.Context, id platform.ID, f func(StoreTask) platform.ID) error {
s.mu.Lock()
defer s.mu.Unlock()
newTasks := []StoreTask{}
deletingTasks := []platform.ID{}
for i := range s.tasks {
if f(s.tasks[i]) != id {
newTasks = append(newTasks, s.tasks[i])
} else {
deletingTasks = append(deletingTasks, s.tasks[i].ID)
}
// check for cancelations
if i&(1024-1) == 0 {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}
}
// last check for cancelations
select {
case <-ctx.Done():
return ctx.Err()
default:
}
for i := range deletingTasks {
delete(s.meta, s.tasks[i].ID)
}
s.tasks = newTasks
return nil
}
func getOrg(st StoreTask) platform.ID {
return st.Org
}
// DeleteOrg synchronously deletes an org and all of its tasks from an in-mem store.
func (s *inmem) DeleteOrg(ctx context.Context, id platform.ID) error {
return s.delete(ctx, id, getOrg)
}
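
A minimal usage sketch of this in-memory store (not part of the commit, and written against the pre-removal backend package), borrowing the create-request shape and script from the bolt store test above:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/influxdata/influxdb"
	_ "github.com/influxdata/influxdb/query/builtin"
	"github.com/influxdata/influxdb/task/backend"
)

func main() {
	s := backend.NewInMemStore()
	defer s.Close()

	// Create a task; the script is validated and its options parsed on create.
	id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{
		Org:             influxdb.ID(1),
		AuthorizationID: influxdb.ID(2),
		Script:          `option task = {name:"x", every:1s} from(bucket:"b-src") |> range(start:-1m) |> to(bucket:"b-dst", org:"o")`,
		Status:          backend.TaskActive,
	})
	if err != nil {
		log.Fatal(err)
	}

	task, err := s.FindTaskByID(context.Background(), id)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(task.Name) // "x", parsed from the script's task options
}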

@ -1,24 +0,0 @@
package backend_test
import (
"testing"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/backend/storetest"
"github.com/influxdata/influxdb/task/options"
)
func init() {
// TODO(mr): remove as part of https://github.com/influxdata/platform/issues/484.
options.EnableScriptCacheForTest()
}
func TestInMemStore(t *testing.T) {
storetest.NewStoreTest(
"in-mem store",
func(t *testing.T) backend.Store {
return backend.NewInMemStore()
},
func(t *testing.T, s backend.Store) {},
)(t)
}

@ -1,157 +0,0 @@
package backend_test
import (
"context"
"fmt"
"io/ioutil"
"os"
"testing"
"time"
"github.com/influxdata/flux/execute"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/inmem"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/query/control"
"github.com/influxdata/influxdb/storage"
"github.com/influxdata/influxdb/storage/readservice"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/backend/storetest"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)
func TestInMemRunStore(t *testing.T) {
storetest.NewRunStoreTest(
"inmem",
func(t *testing.T) (backend.LogWriter, backend.LogReader, storetest.MakeNewAuthorizationFunc) {
rw := backend.NewInMemRunReaderWriter()
return rw, rw, nil
},
func(t *testing.T, w backend.LogWriter, r backend.LogReader) {})(t)
}
func TestQueryStorageRunStore(t *testing.T) {
storetest.NewRunStoreTest(
"PointLogWriter and QueryLogReader",
func(t *testing.T) (backend.LogWriter, backend.LogReader, storetest.MakeNewAuthorizationFunc) {
lrw := newFullStackAwareLogReaderWriter(t)
return lrw, lrw, lrw.makeNewAuthorization
},
func(t *testing.T, w backend.LogWriter, r backend.LogReader) {
if w.(*fullStackAwareLogReaderWriter) != r.(*fullStackAwareLogReaderWriter) {
panic("impossible cleanup values")
}
w.(*fullStackAwareLogReaderWriter).Close(t)
},
)(t)
}
type fullStackAwareLogReaderWriter struct {
*backend.PointLogWriter
*backend.QueryLogReader
queryController *control.Controller
rootDir string
storageEngine *storage.Engine
i *inmem.Service
}
func (lrw *fullStackAwareLogReaderWriter) Close(t *testing.T) {
if err := lrw.queryController.Shutdown(context.Background()); err != nil {
t.Error(err)
}
if err := lrw.storageEngine.Close(); err != nil {
t.Error(err)
}
if err := os.RemoveAll(lrw.rootDir); err != nil {
t.Error(err)
}
}
func (lrw *fullStackAwareLogReaderWriter) makeNewAuthorization(ctx context.Context, t *testing.T) *influxdb.Authorization {
o := &influxdb.Organization{Name: fmt.Sprintf("org-%s-%d", t.Name(), time.Now().UnixNano())}
if err := lrw.i.CreateOrganization(ctx, o); err != nil {
t.Fatal(err)
}
u := &influxdb.User{Name: fmt.Sprintf("user-%s-%d", t.Name(), time.Now().UnixNano())}
if err := lrw.i.CreateUser(ctx, u); err != nil {
t.Fatal(err)
}
a := &influxdb.Authorization{
UserID: u.ID,
OrgID: o.ID,
Permissions: influxdb.OperPermissions(),
}
if err := lrw.i.CreateAuthorization(ctx, a); err != nil {
t.Fatal(err)
}
return a
}
func newFullStackAwareLogReaderWriter(t *testing.T) *fullStackAwareLogReaderWriter {
// Mostly copied out of cmd/influxd/main.go.
logger := zaptest.NewLogger(t)
rootDir, err := ioutil.TempDir("", "task-logreaderwriter-")
if err != nil {
t.Fatal(err)
}
engine := storage.NewEngine(rootDir, storage.NewConfig())
engine.WithLogger(logger)
if err := engine.Open(context.Background()); err != nil {
t.Fatal(err)
}
defer func() {
if t.Failed() {
engine.Close()
}
}()
svc := inmem.NewService()
const (
concurrencyQuota = 10
memoryBytesQuotaPerQuery = 1e6
queueSize = 10
)
cc := control.Config{
ExecutorDependencies: make(execute.Dependencies),
ConcurrencyQuota: concurrencyQuota,
MemoryBytesQuotaPerQuery: int64(memoryBytesQuotaPerQuery),
QueueSize: queueSize,
Logger: logger.With(zap.String("service", "storage-reads")),
}
if err := readservice.AddControllerConfigDependencies(
&cc, engine, svc, svc,
); err != nil {
t.Fatal(err)
}
queryController, err := control.New(cc)
if err != nil {
t.Fatal(err)
}
return &fullStackAwareLogReaderWriter{
PointLogWriter: backend.NewPointLogWriter(engine),
QueryLogReader: backend.NewQueryLogReader(query.QueryServiceBridge{AsyncQueryService: queryController}),
queryController: queryController,
rootDir: rootDir,
storageEngine: engine,
i: svc,
}
}

@ -1,351 +0,0 @@
package backend
import (
"errors"
"math"
"strings"
"time"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/task/options"
cron "gopkg.in/robfig/cron.v2"
)
// This file contains helper methods for the StoreTaskMeta type defined in protobuf.
// NewStoreTaskMeta returns a new StoreTaskMeta based on the given request and parsed options.
// Do not call this without validating the request and options first.
func NewStoreTaskMeta(req CreateTaskRequest, o options.Options) StoreTaskMeta {
stm := StoreTaskMeta{
Status: string(req.Status),
LatestCompleted: req.ScheduleAfter,
CreatedAt: time.Now().Unix(),
EffectiveCron: o.EffectiveCronString(),
AuthorizationID: uint64(req.AuthorizationID),
}
if o.Concurrency != nil {
stm.MaxConcurrency = int32(*o.Concurrency)
}
if o.Offset != nil {
offset, _ := o.Offset.DurationFrom(time.Unix(req.ScheduleAfter, 0)) // we can do this because it is validated already.
stm.Offset = offset.String()
}
if stm.Status == "" {
stm.Status = string(DefaultTaskStatus)
}
stm.AlignLatestCompleted()
return stm
}
// AlignLatestCompleted aligns the latest completed time to the minute/hour/day boundary.
func (stm *StoreTaskMeta) AlignLatestCompleted() {
if strings.HasPrefix(stm.EffectiveCron, "@every ") {
everyString := strings.TrimPrefix(stm.EffectiveCron, "@every ")
every := options.Duration{}
err := every.Parse(everyString)
if err != nil {
// We cannot align an invalid time
return
}
t := time.Unix(stm.LatestCompleted, 0)
everyDur, err := every.DurationFrom(t)
if err != nil {
return
}
t = t.Truncate(everyDur)
if t.Unix() == stm.LatestCompleted {
// For example, every 1m truncates to exactly on the minute.
// But the input request is schedule after, not "on or after".
// Add one interval.
tafter, err := every.Add(t)
if err != nil {
return
}
t = tafter
}
stm.LatestCompleted = t.Truncate(time.Second).Unix()
}
}
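// Worked example (illustrative, not in the original source): with
// EffectiveCron "@every 1h" and LatestCompleted = 5400, truncating to the
// hour gives 3600, so LatestCompleted becomes 3600. If LatestCompleted were
// already exactly 3600, one interval is added and it becomes 7200, because
// scheduling is "after", not "on or after".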
// FinishRun removes the run matching runID from m's CurrentlyRunning slice,
// and if that run's Now value is greater than m's LatestCompleted value,
// updates the value of LatestCompleted to the run's Now value.
//
// If runID matched a run, FinishRun returns true. Otherwise it returns false.
func (stm *StoreTaskMeta) FinishRun(runID platform.ID) bool {
for i, runner := range stm.CurrentlyRunning {
if platform.ID(runner.RunID) != runID {
continue
}
stm.CurrentlyRunning = append(stm.CurrentlyRunning[:i], stm.CurrentlyRunning[i+1:]...)
rs, re, ra := runner.RangeStart, runner.RangeEnd, runner.RequestedAt
if rs == 0 && re == 0 && ra == 0 {
// It must be a naturally scheduled run.
if runner.Now > stm.LatestCompleted {
stm.LatestCompleted = runner.Now
}
} else {
// It was a requested run. Check if we need to update a latest completed.
for _, q := range stm.ManualRuns {
if q.Start == rs && q.End == re && q.RequestedAt == ra {
// Match.
if runner.Now > q.LatestCompleted {
q.LatestCompleted = runner.Now
}
break
}
}
}
return true
}
return false
}
// CreateNextRun attempts to update stm's CurrentlyRunning slice with a new run.
// The new run's now is assigned the earliest possible time according to stm.EffectiveCron
// that is later than any in-progress run and stm's LatestCompleted timestamp.
// If the run's now would be later than the passed-in now, CreateNextRun returns a RunNotYetDueError.
//
// makeID is a function provided by the caller to create an ID, in case we can create a run.
// Because a StoreTaskMeta doesn't know the ID of the task it belongs to, it never sets RunCreation.Created.TaskID.
func (stm *StoreTaskMeta) CreateNextRun(now int64, makeID func() (platform.ID, error)) (RunCreation, error) {
if len(stm.CurrentlyRunning) >= int(stm.MaxConcurrency) {
return RunCreation{}, errors.New("cannot create next run when max concurrency already reached")
}
// Not calling stm.DueAt here because we reuse sch.
// We can definitely optimize (minimize) cron parsing at a later point in time.
sch, err := cron.Parse(stm.EffectiveCron)
if err != nil {
return RunCreation{}, err
}
latest := stm.LatestCompleted
for _, cr := range stm.CurrentlyRunning {
if cr.Now > latest {
latest = cr.Now
}
}
nowTime := time.Unix(now, 0)
nextScheduled := sch.Next(time.Unix(latest, 0))
nextScheduledUnix := nextScheduled.Unix()
offset := &options.Duration{}
if err := offset.Parse(stm.Offset); err != nil {
return RunCreation{}, err
}
dueAt, err := offset.Add(nextScheduled)
if err != nil {
return RunCreation{}, err
}
if dueAt.After(nowTime) {
// Can't schedule yet.
if len(stm.ManualRuns) > 0 {
return stm.createNextRunFromQueue(now, dueAt.Unix(), sch, makeID)
}
return RunCreation{}, RunNotYetDueError{DueAt: dueAt.Unix()}
}
id, err := makeID()
if err != nil {
return RunCreation{}, err
}
stm.CurrentlyRunning = append(stm.CurrentlyRunning, &StoreTaskMetaRun{
Now: nextScheduledUnix,
Try: 1,
RunID: uint64(id),
})
nextDue, err := offset.Add(sch.Next(nextScheduled))
if err != nil {
return RunCreation{}, err
}
return RunCreation{
Created: QueuedRun{
RunID: id,
DueAt: dueAt.Unix(),
Now: nextScheduledUnix,
},
NextDue: nextDue.Unix(),
HasQueue: len(stm.ManualRuns) > 0,
}, nil
}
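// Worked example (illustrative, mirroring TestMeta_CreateNextRun below): with
// EffectiveCron "* * * * *" and LatestCompleted = 60, CreateNextRun(59, makeID)
// returns RunNotYetDueError{DueAt: 120}, while CreateNextRun(300, makeID)
// creates a run with Now = 120 and reports NextDue = 180.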
// createNextRunFromQueue creates the next run from a queue.
// This should only be called when the queue is not empty.
func (stm *StoreTaskMeta) createNextRunFromQueue(now, nextDue int64, sch cron.Schedule, makeID func() (platform.ID, error)) (RunCreation, error) {
if len(stm.ManualRuns) == 0 {
return RunCreation{}, errors.New("cannot create run from empty queue")
}
q := stm.ManualRuns[0]
latest := q.LatestCompleted
for _, r := range stm.CurrentlyRunning {
if r.RangeStart != q.Start || r.RangeEnd != q.End || r.RequestedAt != q.RequestedAt {
// Doesn't match our queue.
continue
}
if r.Now > latest {
latest = r.Now
}
}
runNow := sch.Next(time.Unix(latest, 0)).Unix()
// Already validated that we have room to create another run, in CreateNextRun.
id := platform.ID(q.RunID)
if !id.Valid() {
var err error
id, err = makeID()
if err != nil {
return RunCreation{}, err
}
}
stm.CurrentlyRunning = append(stm.CurrentlyRunning, &StoreTaskMetaRun{
Now: runNow,
Try: 1,
RunID: uint64(id),
RangeStart: q.Start,
RangeEnd: q.End,
RequestedAt: q.RequestedAt,
})
if runNow >= q.End {
// Drop the queue.
stm.ManualRuns = append(stm.ManualRuns[:0], stm.ManualRuns[1:]...)
}
return RunCreation{
Created: QueuedRun{
RunID: id,
Now: runNow,
DueAt: time.Now().UTC().Unix(),
RequestedAt: q.RequestedAt,
},
NextDue: nextDue,
HasQueue: len(stm.ManualRuns) > 0,
}, nil
}
// NextDueRun returns the Unix timestamp of when the next call to CreateNextRun will be ready.
// The returned timestamp reflects the task's delay, so it does not necessarily exactly match the schedule time.
func (stm *StoreTaskMeta) NextDueRun() (int64, error) {
sch, err := cron.Parse(stm.EffectiveCron)
if err != nil {
return 0, err
}
latest := stm.LatestCompleted
currRun := make([]*StoreTaskMetaRun, len(stm.CurrentlyRunning))
copy(currRun, stm.CurrentlyRunning)
for _, cr := range currRun {
if cr.Now > latest {
latest = cr.Now
}
}
offset := &options.Duration{}
if err := offset.Parse(stm.Offset); err != nil {
return 0, err
}
nextDue, err := offset.Add(sch.Next(time.Unix(latest, 0)))
if err != nil {
return 0, err
}
return nextDue.Unix(), nil
}
// ManuallyRunTimeRange requests a manual run covering the approximate range specified by the Unix timestamps start and end.
// More specifically, it requests runs scheduled no earlier than start, but possibly later than start,
// if start does not land on the task's schedule; and as late as, but not necessarily equal to, end.
// requestedAt is the Unix timestamp indicating when this run range was requested.
//
// There is no schedule validation in this method,
// so ManuallyRunTimeRange can be used to create a run at a specific time that isn't aligned with the task's schedule.
//
// If adding the range would exceed the queue size, ManuallyRunTimeRange returns ErrManualQueueFull.
func (stm *StoreTaskMeta) ManuallyRunTimeRange(start, end, requestedAt int64, makeID func() (platform.ID, error)) error {
// Arbitrarily chosen upper limit that seems unlikely to be reached except in pathological cases.
const maxQueueSize = 32
if len(stm.ManualRuns) >= maxQueueSize {
return ErrManualQueueFull
}
lc := start - 1
if start == math.MinInt64 {
// Don't roll over in pathological case of starting at minimum int64.
lc = start
}
for _, mr := range stm.ManualRuns {
if mr.Start == start && mr.End == end {
return RequestStillQueuedError{Start: start, End: end}
}
}
run := &StoreTaskMetaManualRun{
Start: start,
End: end,
LatestCompleted: lc,
RequestedAt: requestedAt,
}
if start == end && makeID != nil {
id, err := makeID()
if err != nil {
return err
}
run.RunID = uint64(id)
}
stm.ManualRuns = append(stm.ManualRuns, run)
return nil
}
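// Worked example (illustrative, mirroring TestMeta_CreateNextRun_Queue below):
// ManuallyRunTimeRange(0, 120, 3005, nil) on a task with cron "* * * * *"
// queues runs for t = 0, 60, and 120; the queue's LatestCompleted starts at
// start-1 = -1, so the run for t = 0 is not skipped.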
// Equal returns true if all of stm's fields compare equal to other.
// Note that this method operates on values, unlike the other methods which operate on pointers.
//
// Equal is probably not very useful outside of test.
func (stm StoreTaskMeta) Equal(other StoreTaskMeta) bool {
if stm.MaxConcurrency != other.MaxConcurrency ||
stm.LatestCompleted != other.LatestCompleted ||
stm.Status != other.Status ||
stm.EffectiveCron != other.EffectiveCron ||
stm.Offset != other.Offset ||
len(stm.CurrentlyRunning) != len(other.CurrentlyRunning) ||
len(stm.ManualRuns) != len(other.ManualRuns) {
return false
}
for i, o := range other.CurrentlyRunning {
s := stm.CurrentlyRunning[i]
if s.Now != o.Now ||
s.Try != o.Try ||
s.RunID != o.RunID ||
s.RangeStart != o.RangeStart ||
s.RangeEnd != o.RangeEnd ||
s.RequestedAt != o.RequestedAt {
return false
}
}
for i, o := range other.ManualRuns {
s := stm.ManualRuns[i]
if s.Start != o.Start ||
s.End != o.End ||
s.LatestCompleted != o.LatestCompleted ||
s.RequestedAt != o.RequestedAt {
return false
}
}
return true
}

File diff suppressed because it is too large.

@ -1,76 +0,0 @@
syntax = "proto3";
import "gogoproto/gogo.proto";
package com.influxdata.platform.task.backend;
option go_package = "backend";
// StoreTaskMeta is the internal state of a task.
message StoreTaskMeta {
int32 max_concurrency = 1;
// latest_completed is the unix timestamp of the latest "naturally" completed run.
// If a run for time t finishes before a run for time t - u, latest_completed will reflect time t.
int64 latest_completed = 2;
// status indicates if the task is enabled or disabled.
string status = 3;
// currently_running is the collection of runs in-progress.
// If a runner crashes or otherwise disappears, this indicates to the new runner what needs to be picked up.
repeated StoreTaskMetaRun currently_running = 4;
// effective_cron is the effective cron string as reported by the task's options.
string effective_cron = 5;
// Task's configured delay, as a duration string.
string offset = 6;
int64 created_at = 7;
int64 updated_at = 8;
// The Authorization ID associated with the task.
uint64 authorization_id = 9 [(gogoproto.customname) = "AuthorizationID"];
// Fields below here are less likely to be present, so we're counting from 16 in order to
// use the 1-byte-encodable values where we can be more sure they're present.
repeated StoreTaskMetaManualRun manual_runs = 16;
}
message StoreTaskMetaRun {
// now is the unix timestamp of the "now" value for the run.
int64 now = 1;
uint32 try = 2;
uint64 run_id = 3 [(gogoproto.customname) = "RunID"];
// range_start is the start of the manual run's time range.
int64 range_start = 4;
// range_end is the end of the manual run's time range.
int64 range_end = 5;
// requested_at is the unix timestamp indicating when this run was requested.
// It is the same value as the "parent" StoreTaskMetaManualRun, if this run was the result of a manual request.
int64 requested_at = 6;
}
// StoreTaskMetaManualRun indicates a manually requested run for a time range.
// It has a start and end pair of unix timestamps indicating the time range covered by the request.
message StoreTaskMetaManualRun {
// start is the earliest allowable unix timestamp for this queue of runs.
int64 start = 1;
// end is the latest allowable unix timestamp for this queue of runs.
int64 end = 2;
// latest_completed is the timestamp of the latest completed run from this queue.
int64 latest_completed = 3;
// requested_at is the unix timestamp indicating when this run was requested.
int64 requested_at = 4;
// run_id is set ahead of time for retries of individual runs. Manually run time ranges do not receive an ID.
uint64 run_id = 5 [(gogoproto.customname) = "RunID"];
}
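
A minimal round-trip sketch of the generated type (not part of the commit, and written against the pre-removal backend package); it exercises the Marshal/Unmarshal methods the bolt store used around Put/Get, plus the Equal helper from storetaskmeta.go:

package main

import (
	"fmt"
	"log"

	"github.com/influxdata/influxdb/task/backend"
)

func main() {
	stm := backend.StoreTaskMeta{
		MaxConcurrency:  2,
		Status:          "enabled",
		EffectiveCron:   "* * * * *",
		LatestCompleted: 60,
	}

	// Marshal to the protobuf wire format, as the bolt store did before Put.
	b, err := stm.Marshal()
	if err != nil {
		log.Fatal(err)
	}

	var got backend.StoreTaskMeta
	if err := got.Unmarshal(b); err != nil {
		log.Fatal(err)
	}
	fmt.Println(got.Equal(stm)) // true
}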

@ -1,281 +0,0 @@
package backend_test
import (
"errors"
"strings"
"testing"
"time"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/snowflake"
"github.com/influxdata/influxdb/task/backend"
)
var idGen = snowflake.NewIDGenerator()
func makeID() (platform.ID, error) {
return idGen.ID(), nil
}
func TestMeta_CreateNextRun(t *testing.T) {
good := backend.StoreTaskMeta{
MaxConcurrency: 2,
Status: "enabled",
EffectiveCron: "* * * * *", // Every minute.
LatestCompleted: 60, // It has run once for the first minute.
}
_, err := good.CreateNextRun(59, makeID)
if e, ok := err.(backend.RunNotYetDueError); !ok {
t.Fatalf("expected RunNotYetDueError, got %v (%T)", err, err)
} else if e.DueAt != 120 {
t.Fatalf("expected run due at 120, got %d", e.DueAt)
}
bad := new(backend.StoreTaskMeta)
*bad = good
bad.MaxConcurrency = 0
if _, err := bad.CreateNextRun(120, makeID); err == nil || !strings.Contains(err.Error(), "max concurrency") {
t.Fatalf("expected error about max concurrency, got %v", err)
}
*bad = good
bad.EffectiveCron = "not a cron"
if _, err := bad.CreateNextRun(120, makeID); err == nil {
t.Fatal("expected error with bad cron")
}
idErr := errors.New("error making ID")
*bad = good
if _, err := bad.CreateNextRun(120, func() (platform.ID, error) {
return platform.InvalidID(), idErr
}); err != idErr {
t.Fatalf("expected id creation error, got %v", err)
}
rc, err := good.CreateNextRun(300, makeID)
if err != nil {
t.Fatal(err)
}
if rc.Created.TaskID.Valid() {
t.Fatalf("CreateNextRun should not have set task ID; got %v", rc.Created.TaskID)
}
if !rc.Created.RunID.Valid() {
t.Fatal("CreateNextRun should have set run ID but didn't")
}
if rc.Created.Now != 120 {
t.Fatalf("expected created run to have time 120, got %d", rc.Created.Now)
}
if rc.NextDue != 180 {
t.Fatalf("unexpected next run time: %d", rc.NextDue)
}
rc, err = good.CreateNextRun(300, makeID)
if err != nil {
t.Fatal(err)
}
if rc.Created.TaskID.Valid() {
t.Fatalf("CreateNextRun should not have set task ID; got %v", rc.Created.TaskID)
}
if !rc.Created.RunID.Valid() {
t.Fatal("CreateNextRun should have set run ID but didn't")
}
if rc.Created.Now != 180 {
t.Fatalf("expected created run to have time 180, got %d", rc.Created.Now)
}
if rc.NextDue != 240 {
t.Fatalf("unexpected next run time: %d", rc.NextDue)
}
if _, err := good.CreateNextRun(300, makeID); err == nil || !strings.Contains(err.Error(), "max concurrency") {
t.Fatalf("expected error about max concurrency, got %v", err)
}
}
func TestMeta_CreateNextRun_Queue(t *testing.T) {
stm := backend.StoreTaskMeta{
MaxConcurrency: 9,
Status: "enabled",
EffectiveCron: "* * * * *", // Every minute.
LatestCompleted: 3000, // The latest completed run was at unix time 3000.
}
// Should run on 0, 60, and 120.
if err := stm.ManuallyRunTimeRange(0, 120, 3005, nil); err != nil {
t.Fatal(err)
}
// Should run once: 240.
if err := stm.ManuallyRunTimeRange(240, 240, 3005, nil); err != nil {
t.Fatal(err)
}
// Run once on the next natural schedule.
rc, err := stm.CreateNextRun(3060, makeID)
if err != nil {
t.Fatal(err)
}
if rc.Created.Now != 3060 {
t.Fatalf("expected created now of 3060, got %d", rc.Created.Now)
}
if rc.NextDue != 3120 {
t.Fatalf("expected NextDue = 3120, got %d", rc.NextDue)
}
if !rc.HasQueue {
t.Fatal("expected to have queue but didn't")
}
// 0 from queue.
rc, err = stm.CreateNextRun(3060, makeID)
if err != nil {
t.Fatal(err)
}
if rc.Created.Now != 0 {
t.Fatalf("expected created now of 0, got %d", rc.Created.Now)
}
if rc.NextDue != 3120 {
t.Fatalf("expected NextDue = 3120, got %d", rc.NextDue)
}
if !rc.HasQueue {
t.Fatal("expected to have queue but didn't")
}
// 60 from queue.
rc, err = stm.CreateNextRun(3060, makeID)
if err != nil {
t.Fatal(err)
}
if rc.Created.Now != 60 {
t.Fatalf("expected created now of 60, got %d", rc.Created.Now)
}
if rc.NextDue != 3120 {
t.Fatalf("expected NextDue = 3120, got %d", rc.NextDue)
}
if !rc.HasQueue {
t.Fatal("expected to have queue but didn't")
}
// 120 from queue.
rc, err = stm.CreateNextRun(3060, makeID)
if err != nil {
t.Fatal(err)
}
if rc.Created.Now != 120 {
t.Fatalf("expected created now of 120, got %d", rc.Created.Now)
}
if rc.NextDue != 3120 {
t.Fatalf("expected NextDue = 3120, got %d", rc.NextDue)
}
if !rc.HasQueue {
t.Fatal("expected to have queue but didn't")
}
// 240 from queue.
rc, err = stm.CreateNextRun(3060, makeID)
if err != nil {
t.Fatal(err)
}
if rc.Created.Now != 240 {
t.Fatalf("expected created now of 240, got %d", rc.Created.Now)
}
if rc.NextDue != 3120 {
t.Fatalf("expected NextDue = 3120, got %d", rc.NextDue)
}
if rc.HasQueue {
t.Fatal("expected to have empty queue but didn't")
}
}
func TestMeta_CreateNextRun_Delay(t *testing.T) {
stm := backend.StoreTaskMeta{
MaxConcurrency: 2,
Status: "enabled",
EffectiveCron: "* * * * *", // Every minute.
Offset: "5s",
LatestCompleted: 30, // Arbitrary non-overlap starting point.
}
_, err := stm.CreateNextRun(61, makeID)
if e, ok := err.(backend.RunNotYetDueError); !ok {
t.Fatalf("expected RunNotYetDueError, got %v (%T)", err, err)
} else if e.DueAt != 65 {
t.Fatalf("expected run due at 65, got %d", e.DueAt)
}
rc, err := stm.CreateNextRun(300, makeID)
if err != nil {
t.Fatal(err)
}
if rc.Created.Now != 60 {
t.Fatalf("expected created run to have time 60, got %d", rc.Created.Now)
}
if rc.NextDue != 125 {
t.Fatalf("unexpected next run time: %d", rc.NextDue)
}
}
func TestMeta_ManuallyRunTimeRange(t *testing.T) {
now := time.Now().Unix()
stm := backend.StoreTaskMeta{
MaxConcurrency: 2,
Status: "enabled",
EffectiveCron: "* * * * *", // Every minute.
Offset: "5s",
LatestCompleted: 30, // Arbitrary non-overlap starting point.
}
// Constant defined in (*StoreTaskMeta).ManuallyRunTimeRange.
const maxQueueSize = 32
for i := int64(0); i < maxQueueSize; i++ {
j := i * 10
if err := stm.ManuallyRunTimeRange(j, j+5, j+now, nil); err != nil {
t.Fatal(err)
}
if int64(len(stm.ManualRuns)) != i+1 {
t.Fatalf("expected %d runs queued, got %d", i+1, len(stm.ManualRuns))
}
run := stm.ManualRuns[len(stm.ManualRuns)-1]
if run.Start != j {
t.Fatalf("expected start %d, got %d", j, run.Start)
}
if run.End != j+5 {
t.Fatalf("expected end %d, got %d", j+5, run.End)
}
if run.LatestCompleted != j-1 {
t.Fatalf("expected LatestCompleted %d, got %d", j-1, run.LatestCompleted)
}
if run.RequestedAt != j+now {
t.Fatalf("expected RequestedAt %d, got %d", j+now, run.RequestedAt)
}
}
// One more should cause ErrManualQueueFull.
if err := stm.ManuallyRunTimeRange(maxQueueSize*100, maxQueueSize*200, maxQueueSize+now, nil); err != backend.ErrManualQueueFull {
t.Fatalf("expected ErrManualQueueFull, got %v", err)
}
if len(stm.ManualRuns) != maxQueueSize {
t.Fatalf("expected to be unable to exceed queue size of %d; got %d", maxQueueSize, len(stm.ManualRuns))
}
// Reset manual runs.
stm.ManualRuns = stm.ManualRuns[:0]
// Duplicate manual run with single timestamp should be rejected.
if err := stm.ManuallyRunTimeRange(1, 1, 2, nil); err != nil {
t.Fatal(err)
}
if exp, err := (backend.RequestStillQueuedError{Start: 1, End: 1}), stm.ManuallyRunTimeRange(1, 1, 3, func() (platform.ID, error) { return platform.ID(1099), nil }); err != exp {
t.Fatalf("expected %v, got %v", exp, err)
}
// Duplicate manual run with time range should be rejected.
if err := stm.ManuallyRunTimeRange(100, 200, 201, nil); err != nil {
t.Fatal(err)
}
if exp, err := (backend.RequestStillQueuedError{Start: 100, End: 200}), stm.ManuallyRunTimeRange(100, 200, 202, nil); err != exp {
t.Fatalf("expected %v, got %v", exp, err)
}
// Not currently enforcing one way or another when a newly requested time range overlaps with an existing one.
}

View File

@ -1,90 +0,0 @@
package backend
import (
"context"
"time"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
)
const (
lineField = "line"
runIDField = "runID"
scheduledForField = "scheduledFor"
startedAtField = "startedAt"
finishedAtField = "finishedAt"
requestedAtField = "requestedAt"
statusField = "status"
logField = "logs"
taskIDTag = "taskID"
// Fixed system bucket ID for task and run logs.
taskSystemBucketID platform.ID = 10
)
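// Note: this fixed ID is what the log reader's Flux queries reference as
// bucketID "000000000000000a"; an influxdb ID renders as 16 hex characters,
// and 10 == 0xa. Illustrative check, assuming platform.ID's hex String encoding:
//
//	fmt.Println(platform.ID(10).String()) // "000000000000000a"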
// Copy of storage.PointsWriter interface.
// Duplicating it here to avoid having tasks/backend depend directly on storage.
type PointsWriter interface {
WritePoints(ctx context.Context, points []models.Point) error
}
// PointLogWriter writes task and run logs as time-series points.
type PointLogWriter struct {
pointsWriter PointsWriter
}
// NewPointLogWriter returns a PointLogWriter.
func NewPointLogWriter(pw PointsWriter) *PointLogWriter {
return &PointLogWriter{pointsWriter: pw}
}
func (p *PointLogWriter) UpdateRunState(ctx context.Context, rlb RunLogBase, when time.Time, status RunStatus) error {
tags := models.Tags{
models.NewTag([]byte(taskIDTag), []byte(rlb.Task.ID.String())),
}
fields := make(map[string]interface{}, 4)
fields[statusField] = status.String()
fields[runIDField] = rlb.RunID.String()
fields[scheduledForField] = time.Unix(rlb.RunScheduledFor, 0).UTC().Format(time.RFC3339)
if rlb.RequestedAt != 0 {
fields[requestedAtField] = time.Unix(rlb.RequestedAt, 0).UTC().Format(time.RFC3339)
}
pt, err := models.NewPoint("records", tags, fields, when)
if err != nil {
return err
}
// TODO(mr): it would probably be lighter-weight to just build exploded points in the first place.
exploded, err := tsdb.ExplodePoints(rlb.Task.Org, taskSystemBucketID, []models.Point{pt})
if err != nil {
return err
}
return p.pointsWriter.WritePoints(ctx, exploded)
}
func (p *PointLogWriter) AddRunLog(ctx context.Context, rlb RunLogBase, when time.Time, log string) error {
tags := models.Tags{
models.NewTag([]byte(taskIDTag), []byte(rlb.Task.ID.String())),
}
fields := map[string]interface{}{
runIDField: rlb.RunID.String(),
lineField: log,
}
pt, err := models.NewPoint("logs", tags, fields, when)
if err != nil {
return err
}
// TODO(mr): it would probably be lighter-weight to just build exploded points in the first place.
exploded, err := tsdb.ExplodePoints(rlb.Task.Org, taskSystemBucketID, []models.Point{pt})
if err != nil {
return err
}
return p.pointsWriter.WritePoints(ctx, exploded)
}
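To see how this writer is wired up, here is a minimal sketch using an in-memory PointsWriter stub. The memPointsWriter type and the IDs are illustrative only; they are not part of the original code.

package backend_test

import (
	"context"
	"testing"
	"time"

	platform "github.com/influxdata/influxdb"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/task/backend"
)

// memPointsWriter is a hypothetical in-memory PointsWriter used only for this sketch.
type memPointsWriter struct {
	points []models.Point
}

func (m *memPointsWriter) WritePoints(ctx context.Context, points []models.Point) error {
	m.points = append(m.points, points...)
	return nil
}

func TestPointLogWriterSketch(t *testing.T) {
	mw := &memPointsWriter{}
	w := backend.NewPointLogWriter(mw)

	rlb := backend.RunLogBase{
		Task:            &backend.StoreTask{ID: platform.ID(1), Org: platform.ID(2)},
		RunID:           platform.ID(3),
		RunScheduledFor: time.Now().Unix(),
	}
	if err := w.AddRunLog(context.Background(), rlb, time.Now(), "hello"); err != nil {
		t.Fatal(err)
	}
	// Each log line is exploded into one or more storage points in the system bucket.
	if len(mw.points) == 0 {
		t.Fatal("expected points to be written")
	}
}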

View File

@ -1,370 +0,0 @@
package backend
import (
"context"
"errors"
"fmt"
"sort"
"time"
"github.com/influxdata/flux/values"
"github.com/influxdata/flux"
"github.com/influxdata/flux/lang"
"github.com/influxdata/flux/semantic"
platform "github.com/influxdata/influxdb"
pctx "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/query"
)
type QueryLogReader struct {
queryService query.QueryService
}
var _ LogReader = (*QueryLogReader)(nil)
func NewQueryLogReader(qs query.QueryService) *QueryLogReader {
return &QueryLogReader{
queryService: qs,
}
}
func (qlr *QueryLogReader) ListLogs(ctx context.Context, orgID platform.ID, logFilter platform.LogFilter) ([]platform.Log, error) {
if !logFilter.Task.Valid() {
return nil, errors.New("task ID required to list logs")
}
filterPart := ""
if logFilter.Run != nil {
filterPart = fmt.Sprintf(`|> filter(fn: (r) => r.runID == %q)`, logFilter.Run.String())
}
// TODO(lh): Change the range to something more reasonable. Not sure what that range will be.
listScript := fmt.Sprintf(`from(bucketID: "000000000000000a")
|> range(start: -100h)
|> filter(fn: (r) => r._measurement == "logs" and r.taskID == %q)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
%s
|> group(columns: ["taskID", "runID", "_measurement"])
`, logFilter.Task.String(), filterPart)
auth, err := pctx.GetAuthorizer(ctx)
if err != nil {
return nil, err
}
if auth.Kind() != "authorization" {
return nil, platform.ErrAuthorizerNotSupported
}
request := &query.Request{Authorization: auth.(*platform.Authorization), OrganizationID: orgID, Compiler: lang.FluxCompiler{Query: listScript}}
ittr, err := qlr.queryService.Query(ctx, request)
if err != nil {
return nil, err
}
defer ittr.Release()
re := newRunExtractor()
for ittr.More() {
if err := ittr.Next().Tables().Do(re.Extract); err != nil {
return nil, err
}
}
if err := ittr.Err(); err != nil {
return nil, err
}
runs := re.Runs()
var logs []platform.Log
for _, r := range runs {
logs = append(logs, r.Log...)
}
return logs, nil
}
func (qlr *QueryLogReader) ListRuns(ctx context.Context, orgID platform.ID, runFilter platform.RunFilter) ([]*platform.Run, error) {
if !runFilter.Task.Valid() {
return nil, errors.New("task required")
}
limit := "|> limit(n: 100)\n"
if runFilter.Limit > 0 {
limit = fmt.Sprintf("|> limit(n: %d)\n", runFilter.Limit)
}
afterID := ""
if runFilter.After != nil {
afterID = runFilter.After.String()
}
scheduledAfter := runFilter.AfterTime // Fine if this is empty string.
scheduledBefore := "Z" // Arbitrary string that occurs after numbers, so it won't reject anything.
if runFilter.BeforeTime != "" {
scheduledBefore = runFilter.BeforeTime
}
// Because Flux doesn't support pivoting on a row key that might not exist, we first check whether we can pivot with "requestedAt",
// and if that fails we fall back to pivoting without "requestedAt".
// TODO(lh): After we transition to a separation of transactional and analytical stores this can be simplified.
pivotWithRequestedAt := `|> pivot(rowKey:["runID", "scheduledFor", "requestedAt"], columnKey: ["status"], valueColumn: "_time")`
pivotWithOutRequestedAt := `|> pivot(rowKey:["runID", "scheduledFor"], columnKey: ["status"], valueColumn: "_time")`
listFmtString := `
import "influxdata/influxdb/v1"
from(bucketID: "000000000000000a")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "records" and r.taskID == %q)
|> drop(columns: ["_start", "_stop"])
|> group(columns: ["_measurement", "taskID", "scheduledFor", "status", "runID"])
|> v1.fieldsAsCols()
|> filter(fn: (r) => r.scheduledFor < %q and r.scheduledFor > %q and r.runID > %q)
%s
%s
`
listScript := fmt.Sprintf(listFmtString, runFilter.Task.String(), scheduledBefore, scheduledAfter, afterID, pivotWithRequestedAt, limit)
auth, err := pctx.GetAuthorizer(ctx)
if err != nil {
return nil, err
}
if auth.Kind() != "authorization" {
return nil, platform.ErrAuthorizerNotSupported
}
request := &query.Request{Authorization: auth.(*platform.Authorization), OrganizationID: orgID, Compiler: lang.FluxCompiler{Query: listScript}}
ittr, err := qlr.queryService.Query(ctx, request)
if err != nil {
return nil, err
}
runs, err := queryIttrToRuns(ittr)
if err != nil {
// try re-running the script without the "requestedAt" pivot
listScript := fmt.Sprintf(listFmtString, runFilter.Task.String(), scheduledBefore, scheduledAfter, afterID, pivotWithOutRequestedAt, limit)
request := &query.Request{Authorization: auth.(*platform.Authorization), OrganizationID: orgID, Compiler: lang.FluxCompiler{Query: listScript}}
ittr, err := qlr.queryService.Query(ctx, request)
if err != nil {
return nil, err
}
runs, err = queryIttrToRuns(ittr)
if err != nil {
return nil, err
}
}
return runs, nil
}
func (qlr *QueryLogReader) FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error) {
pivotWithRequestedAt := `|> pivot(rowKey:["runID", "scheduledFor", "requestedAt"], columnKey: ["status"], valueColumn: "_time")`
pivotWithOutRequestedAt := `|> pivot(rowKey:["runID", "scheduledFor"], columnKey: ["status"], valueColumn: "_time")`
showFmtScript := `
import "influxdata/influxdb/v1"
logs = from(bucketID: "000000000000000a")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "logs")
|> drop(columns: ["_start", "_stop"])
|> v1.fieldsAsCols()
|> filter(fn: (r) => r.runID == %q)
|> yield(name: "logs")
from(bucketID: "000000000000000a")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "records")
|> drop(columns: ["_start", "_stop"])
|> group(columns: ["_measurement", "taskID", "scheduledFor", "status", "runID"])
|> v1.fieldsAsCols()
|> filter(fn: (r) => r.runID == %q)
%s
|> yield(name: "result")
`
showScript := fmt.Sprintf(showFmtScript, runID.String(), runID.String(), pivotWithRequestedAt)
auth, err := pctx.GetAuthorizer(ctx)
if err != nil {
return nil, err
}
if auth.Kind() != "authorization" {
return nil, platform.ErrAuthorizerNotSupported
}
request := &query.Request{Authorization: auth.(*platform.Authorization), OrganizationID: orgID, Compiler: lang.FluxCompiler{Query: showScript}}
ittr, err := qlr.queryService.Query(ctx, request)
if err != nil {
return nil, err
}
runs, err := queryIttrToRuns(ittr)
if err != nil {
showScript := fmt.Sprintf(showFmtScript, runID.String(), runID.String(), pivotWithOutRequestedAt)
request := &query.Request{Authorization: auth.(*platform.Authorization), OrganizationID: orgID, Compiler: lang.FluxCompiler{Query: showScript}}
ittr, err := qlr.queryService.Query(ctx, request)
if err != nil {
return nil, err
}
runs, err = queryIttrToRuns(ittr)
if err != nil {
return nil, err
}
}
if len(runs) == 0 {
return nil, ErrRunNotFound
}
if len(runs) > 1 {
return nil, fmt.Errorf("expected one run, got %d", len(runs))
}
return runs[0], nil
}
func queryIttrToRuns(results flux.ResultIterator) ([]*platform.Run, error) {
defer results.Release()
re := newRunExtractor()
for results.More() {
if err := results.Next().Tables().Do(re.Extract); err != nil {
return nil, err
}
}
if err := results.Err(); err != nil {
return nil, err
}
return re.Runs(), nil
}
// runExtractor is used to decode query results to runs.
type runExtractor struct {
runs map[platform.ID]platform.Run
}
func newRunExtractor() *runExtractor {
return &runExtractor{runs: make(map[platform.ID]platform.Run)}
}
// Runs returns the runExtractor's stored runs as a slice.
func (re *runExtractor) Runs() []*platform.Run {
runs := make([]*platform.Run, 0, len(re.runs))
for _, r := range re.runs {
r := r
runs = append(runs, &r)
}
sort.Slice(runs, func(i, j int) bool { return runs[i].ID < runs[j].ID })
return runs
}
// Extract extracts the run information from the given table.
func (re *runExtractor) Extract(tbl flux.Table) error {
key := tbl.Key()
if !key.HasCol("_measurement") {
return fmt.Errorf("table key missing _measurement: %s", key.String())
}
mv := key.LabelValue("_measurement")
if n := mv.Type().Nature(); n != semantic.String {
return fmt.Errorf("table key has invalid _measurement type: %s, type = %s", key.String(), n)
}
switch mv.Str() {
case "records":
return tbl.Do(re.extractRecord)
case "logs":
return tbl.Do(re.extractLog)
default:
return fmt.Errorf("unknown measurement: %q", mv.Str())
}
}
func (re *runExtractor) extractRecord(cr flux.ColReader) error {
for i := 0; i < cr.Len(); i++ {
var r platform.Run
for j, col := range cr.Cols() {
switch col.Label {
case requestedAtField:
r.RequestedAt = cr.Strings(j).ValueString(i)
case scheduledForField:
r.ScheduledFor = cr.Strings(j).ValueString(i)
case "runID":
id, err := platform.IDFromString(cr.Strings(j).ValueString(i))
if err != nil {
return err
}
r.ID = *id
case "taskID":
id, err := platform.IDFromString(cr.Strings(j).ValueString(i))
if err != nil {
return err
}
r.TaskID = *id
case RunStarted.String():
r.StartedAt = values.Time(cr.Times(j).Value(i)).Time().Format(time.RFC3339Nano)
if r.Status == "" {
// Only set status if it wasn't already set.
r.Status = col.Label
}
case RunSuccess.String(), RunFail.String(), RunCanceled.String():
if cr.Times(j).Value(i) != 0 {
r.FinishedAt = values.Time(cr.Times(j).Value(i)).Time().Format(time.RFC3339Nano)
// Finished can be set unconditionally;
// it's fine to overwrite if the status was already set to started.
r.Status = col.Label
}
}
}
if !r.ID.Valid() {
return errors.New("extractRecord: did not find valid run ID in table")
}
if ex, ok := re.runs[r.ID]; ok {
r.Log = ex.Log
}
re.runs[r.ID] = r
}
return nil
}
func (re *runExtractor) extractLog(cr flux.ColReader) error {
entries := make(map[platform.ID][]platform.Log)
for i := 0; i < cr.Len(); i++ {
var runID platform.ID
var when, line string
for j, col := range cr.Cols() {
switch col.Label {
case "runID":
id, err := platform.IDFromString(cr.Strings(j).ValueString(i))
if err != nil {
return err
}
runID = *id
case "_time":
when = values.Time(cr.Times(j).Value(i)).Time().Format(time.RFC3339Nano)
case "line":
line = cr.Strings(j).ValueString(i)
}
}
if !runID.Valid() {
return errors.New("extractLog: did not find valid run ID in table")
}
entries[runID] = append(entries[runID], platform.Log{Time: when, Message: line})
}
for id, logs := range entries {
run := re.runs[id]
run.Log = append(run.Log, logs...)
re.runs[id] = run
}
return nil
}
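For orientation, a minimal sketch of driving this reader from calling code. The methods above require an *platform.Authorization authorizer in the context; the helper below assumes the caller has already attached one via the influxdb context package.

import (
	"context"

	platform "github.com/influxdata/influxdb"
	"github.com/influxdata/influxdb/task/backend"
)

// listRecentRuns is an illustrative helper, not part of the original code.
func listRecentRuns(ctx context.Context, qlr *backend.QueryLogReader, orgID, taskID platform.ID) ([]*platform.Run, error) {
	// ctx must carry a *platform.Authorization; the reader rejects other authorizer kinds.
	return qlr.ListRuns(ctx, orgID, platform.RunFilter{Task: taskID, Limit: 10})
}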

View File

@ -18,17 +18,6 @@ import (
"go.uber.org/zap"
)
var (
// ErrRunCanceled is returned from the RunResult when a Run is Canceled. It is used mostly internally.
ErrRunCanceled = errors.New("run canceled")
// ErrTaskNotClaimed is returned when attempting to operate against a task that must be claimed but is not.
ErrTaskNotClaimed = errors.New("task not claimed")
// ErrTaskAlreadyClaimed is returned when attempting to operate against a task that must not be claimed but is.
ErrTaskAlreadyClaimed = errors.New("task already claimed")
)
// Executor handles execution of a run.
type Executor interface {
// Execute attempts to begin execution of a run.
@ -43,6 +32,18 @@ type Executor interface {
Wait()
}
// RunCreation is returned by CreateNextRun.
type RunCreation struct {
Created QueuedRun
// Unix timestamp for when the next run is due.
NextDue int64
// Whether there are any manual runs queued for this task.
// If so, the scheduler should begin executing them after handling real-time tasks.
HasQueue bool
}
// QueuedRun is a task run that has been assigned an ID,
// but whose execution has not necessarily started.
type QueuedRun struct {

View File

@ -1,447 +0,0 @@
package backend
// The tooling needed to correctly run go generate is managed by the Makefile.
// Run `make` from the project root to ensure these generate commands execute correctly.
//go:generate protoc -I ../../internal -I . --plugin ../../scripts/protoc-gen-gogofaster --gogofaster_out=plugins=grpc:. ./meta.proto
import (
"context"
"errors"
"fmt"
"strings"
"time"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/task/options"
)
var (
// ErrTaskNotFound indicates no task could be found for given parameters.
ErrTaskNotFound = errors.New("task not found")
// ErrOrgNotFound is an error for when we can't find an org
ErrOrgNotFound = errors.New("org not found")
// ErrManualQueueFull is returned when a manual run request cannot be completed.
ErrManualQueueFull = errors.New("manual queue at capacity")
// ErrRunNotFound is returned when searching for a single run that doesn't exist.
ErrRunNotFound = errors.New("run not found")
// ErrNoRunsFound is returned when searching for a range of runs, but none are found.
ErrNoRunsFound = errors.New("no matching runs found")
// ErrRunNotFinished is returned when a retry is invalid due to the run not being finished yet.
ErrRunNotFinished = errors.New("run is still in progress")
)
type TaskStatus string
const (
TaskActive TaskStatus = "active"
TaskInactive TaskStatus = "inactive"
DefaultTaskStatus TaskStatus = TaskActive
)
// validate returns an error if s is not a known task status.
func (s TaskStatus) validate(allowEmpty bool) error {
if allowEmpty && s == "" {
return nil
}
if s == TaskActive || s == TaskInactive {
return nil
}
return fmt.Errorf("invalid task status: %q", s)
}
type RunStatus int
const (
RunStarted RunStatus = iota
RunSuccess
RunFail
RunCanceled
RunScheduled
)
func (r RunStatus) String() string {
switch r {
case RunStarted:
return "started"
case RunSuccess:
return "success"
case RunFail:
return "failed"
case RunCanceled:
return "canceled"
case RunScheduled:
return "scheduled"
}
panic(fmt.Sprintf("unknown RunStatus: %d", r))
}
// RunNotYetDueError is returned from CreateNextRun if a run is not yet due.
type RunNotYetDueError struct {
// DueAt is the unix timestamp of when the next run is due.
DueAt int64
}
func (e RunNotYetDueError) Error() string {
return "run not due until " + time.Unix(e.DueAt, 0).UTC().Format(time.RFC3339)
}
// RequestStillQueuedError is returned when attempting to retry a run which has not yet completed.
type RequestStillQueuedError struct {
// Unix timestamps matching existing request's start and end.
Start, End int64
}
const fmtRequestStillQueued = "previous retry for start=%s end=%s has not yet finished"
func (e RequestStillQueuedError) Error() string {
return fmt.Sprintf(fmtRequestStillQueued,
time.Unix(e.Start, 0).UTC().Format(time.RFC3339),
time.Unix(e.End, 0).UTC().Format(time.RFC3339),
)
}
// ParseRequestStillQueuedError attempts to parse a RequestStillQueuedError from msg.
// If msg is formatted correctly, the resultant error is returned; otherwise it returns nil.
func ParseRequestStillQueuedError(msg string) *RequestStillQueuedError {
var s, e string
n, err := fmt.Sscanf(msg, fmtRequestStillQueued, &s, &e)
if err != nil || n != 2 {
return nil
}
start, err := time.Parse(time.RFC3339, s)
if err != nil {
return nil
}
end, err := time.Parse(time.RFC3339, e)
if err != nil {
return nil
}
return &RequestStillQueuedError{Start: start.Unix(), End: end.Unix()}
}
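// A quick round-trip sketch (illustrative, not part of the original file):
// Error renders Start and End as RFC3339 timestamps, and
// ParseRequestStillQueuedError recovers the same pair from that message.
func ExampleParseRequestStillQueuedError() {
	orig := RequestStillQueuedError{Start: 0, End: 60}
	parsed := ParseRequestStillQueuedError(orig.Error())
	fmt.Println(parsed.Start, parsed.End)
	// Output: 0 60
}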
// RunCreation is returned by CreateNextRun.
type RunCreation struct {
Created QueuedRun
// Unix timestamp for when the next run is due.
NextDue int64
// Whether there are any manual runs queued for this task.
// If so, the scheduler should begin executing them after handling real-time tasks.
HasQueue bool
}
// CreateTaskRequest encapsulates state of a new task to be created.
type CreateTaskRequest struct {
// Owner.
Org platform.ID
// Authorization ID to use when executing the task later.
// This is stored directly in the storage layer,
// so it is the caller's responsibility to ensure the user is permitted to access the authorization.
AuthorizationID platform.ID
// Script content of the task.
Script string
// Unix timestamp (seconds elapsed since January 1, 1970 UTC).
// The first run of the task will be scheduled for the earliest time after ScheduleAfter
// that matches the task's schedule via its cron or every option.
ScheduleAfter int64
// The initial task status.
// If empty, will be treated as DefaultTaskStatus.
Status TaskStatus
}
// UpdateTaskRequest encapsulates requested changes to a task.
type UpdateTaskRequest struct {
// ID of the task.
ID platform.ID
// New script content of the task.
// If empty, do not modify the existing script.
Script string
// The new desired task status.
// If empty, do not modify the existing status.
Status TaskStatus
// The new authorization ID.
// If zero, do not modify the existing authorization ID.
AuthorizationID platform.ID
// These options are for editing options via request. Zeroed options will be ignored.
options.Options
}
// UpdateFlux converts the TaskUpdate from updating options to updating a Flux string that has those updated options applied.
// It zeros the options in the TaskUpdate.
func (t *UpdateTaskRequest) UpdateFlux(oldFlux string) error {
if t.Options.IsZero() {
return nil
}
tu := platform.TaskUpdate{
Options: t.Options,
Flux: &t.Script,
}
if err := tu.UpdateFlux(oldFlux); err != nil {
return err
}
t.Script = *tu.Flux
t.Options.Clear()
return nil
}
// UpdateTaskResult describes the result of modifying a single task.
// Having the content returned from UpdateTask makes it much simpler for callers
// to decide how to notify on status changes, etc.
type UpdateTaskResult struct {
OldScript string
OldStatus TaskStatus
NewTask StoreTask
NewMeta StoreTaskMeta
}
// Store is the interface around persisted tasks.
type Store interface {
// CreateTask creates a task from the given CreateTaskRequest.
// If the task is created successfully, the ID of the new task is returned.
CreateTask(ctx context.Context, req CreateTaskRequest) (platform.ID, error)
// UpdateTask updates an existing task.
// It returns an error if there was no task matching the given ID.
// If the returned error is not nil, the returned result should not be inspected.
UpdateTask(ctx context.Context, req UpdateTaskRequest) (UpdateTaskResult, error)
// ListTasks lists the tasks in the store that match the search params.
ListTasks(ctx context.Context, params TaskSearchParams) ([]StoreTaskWithMeta, error)
// FindTaskByID returns the task with the given ID.
// If no task matches the ID, the returned task is nil and ErrTaskNotFound is returned.
FindTaskByID(ctx context.Context, id platform.ID) (*StoreTask, error)
// FindTaskMetaByID returns the metadata about a task.
// If no task meta matches the ID, the returned meta is nil and ErrTaskNotFound is returned.
FindTaskMetaByID(ctx context.Context, id platform.ID) (*StoreTaskMeta, error)
// FindTaskByIDWithMeta combines finding the task and the meta into a single call.
FindTaskByIDWithMeta(ctx context.Context, id platform.ID) (*StoreTask, *StoreTaskMeta, error)
// DeleteTask returns whether an entry matching the given ID was deleted.
// If err is non-nil, deleted is false.
// If err is nil, deleted is false if no entry matched the ID,
// or deleted is true if there was a matching entry and it was deleted.
DeleteTask(ctx context.Context, id platform.ID) (deleted bool, err error)
// CreateNextRun creates the earliest needed run scheduled no later than the given Unix timestamp now.
// Internally, the Store should rely on the underlying task's StoreTaskMeta to create the next run.
CreateNextRun(ctx context.Context, taskID platform.ID, now int64) (RunCreation, error)
// FinishRun removes runID from the list of running tasks and, if its `now` is later than the latest completed time, updates it.
FinishRun(ctx context.Context, taskID, runID platform.ID) error
// ManuallyRunTimeRange enqueues a request to run the task with the given ID for all schedules no earlier than start and no later than end (Unix timestamps).
// requestedAt is the Unix timestamp when the request was initiated.
// ManuallyRunTimeRange must delegate to an underlying StoreTaskMeta's ManuallyRunTimeRange method.
ManuallyRunTimeRange(ctx context.Context, taskID platform.ID, start, end, requestedAt int64) (*StoreTaskMetaManualRun, error)
// DeleteOrg deletes the org.
DeleteOrg(ctx context.Context, orgID platform.ID) error
// Close closes the store for usage and cleans up running processes.
Close() error
}
// RunLogBase is the base information for logs about an individual run.
type RunLogBase struct {
// The parent task that owns the run.
Task *StoreTask
// The ID of the run.
RunID platform.ID
// The Unix timestamp indicating the run's scheduled time.
RunScheduledFor int64
// The Unix timestamp of when the run was requested. Should be ignored when it is zero.
RequestedAt int64
}
// LogWriter writes task logs and task state changes to a store.
type LogWriter interface {
// UpdateRunState sets the run state and the respective time.
UpdateRunState(ctx context.Context, base RunLogBase, when time.Time, state RunStatus) error
// AddRunLog adds a log line to the run.
AddRunLog(ctx context.Context, base RunLogBase, when time.Time, log string) error
}
// NopLogWriter is a LogWriter that doesn't do anything when its methods are called.
// This is useful for tests, but not much else.
type NopLogWriter struct{}
var _ LogWriter = NopLogWriter{}
func (NopLogWriter) UpdateRunState(context.Context, RunLogBase, time.Time, RunStatus) error {
return nil
}
func (NopLogWriter) AddRunLog(context.Context, RunLogBase, time.Time, string) error {
return nil
}
// LogReader reads log information and log data from a store.
type LogReader interface {
// ListRuns returns a list of runs belonging to a task.
// orgID is necessary to look in the correct system bucket.
ListRuns(ctx context.Context, orgID platform.ID, runFilter platform.RunFilter) ([]*platform.Run, error)
// FindRunByID finds a run given a orgID and runID.
// orgID is necessary to look in the correct system bucket.
FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error)
// ListLogs lists logs for a task or a specified run of a task.
// orgID is necessary to look in the correct system bucket.
ListLogs(ctx context.Context, orgID platform.ID, logFilter platform.LogFilter) ([]platform.Log, error)
}
// NopLogReader is a LogReader that doesn't do anything when its methods are called.
// This is useful for tests, but not much else.
type NopLogReader struct{}
var _ LogReader = NopLogReader{}
func (NopLogReader) ListRuns(ctx context.Context, orgID platform.ID, runFilter platform.RunFilter) ([]*platform.Run, error) {
return nil, nil
}
func (NopLogReader) FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error) {
return nil, nil
}
func (NopLogReader) ListLogs(ctx context.Context, orgID platform.ID, logFilter platform.LogFilter) ([]platform.Log, error) {
return nil, nil
}
// TaskSearchParams is used when searching or listing tasks.
type TaskSearchParams struct {
// Return tasks belonging to this exact organization ID. May be zero.
Org platform.ID
// Return tasks starting after this ID.
After platform.ID
// Size of each page. Must be non-negative.
// If zero, the implementation picks an appropriate default page size.
// Valid page sizes are implementation-dependent.
PageSize int
}
// StoreTask is a stored representation of a Task.
type StoreTask struct {
ID platform.ID
// IDs for the owning organization and user.
Org platform.ID
// The user-supplied name of the Task.
Name string
// The script content of the task.
Script string
}
// StoreTaskWithMeta is a single struct with a StoreTask and a StoreTaskMeta.
type StoreTaskWithMeta struct {
Task StoreTask
Meta StoreTaskMeta
}
// StoreValidator is a package-level StoreValidation, so that you can write
// backend.StoreValidator.CreateArgs(...)
var StoreValidator StoreValidation
// StoreValidation is used for namespacing the store validation methods.
type StoreValidation struct{}
// CreateArgs returns the script's parsed options,
// and an error if any of the provided fields are invalid for creating a task.
func (StoreValidation) CreateArgs(req CreateTaskRequest) (options.Options, error) {
var missing []string
var o options.Options
if req.Script == "" {
missing = append(missing, "script")
} else {
var err error
o, err = options.FromScript(req.Script)
if err != nil {
return o, err
}
}
if !req.Org.Valid() {
missing = append(missing, "organization ID")
}
if !req.AuthorizationID.Valid() {
missing = append(missing, "authorization ID")
}
if len(missing) > 0 {
return o, fmt.Errorf("missing required fields to create task: %s", strings.Join(missing, ", "))
}
if err := req.Status.validate(true); err != nil {
return o, err
}
return o, nil
}
// UpdateArgs validates the UpdateTaskRequest.
// If the update only includes a new status (i.e. req.Script is empty), the returned options are zero.
// If the update contains neither a new script nor a new status, or if the script is invalid, an error is returned.
func (StoreValidation) UpdateArgs(req UpdateTaskRequest) (options.Options, error) {
var missing []string
o := req.Options
if req.Script == "" && req.Status == "" && req.Options.IsZero() && !req.AuthorizationID.Valid() {
missing = append(missing, "script or status or options or authorizationID")
}
if req.Script != "" {
err := req.UpdateFlux(req.Script)
if err != nil {
return o, err
}
req.Clear()
o, err = options.FromScript(req.Script)
if err != nil {
return o, err
}
}
if err := req.Status.validate(true); err != nil {
return o, err
}
if !req.ID.Valid() {
missing = append(missing, "task ID")
}
if len(missing) > 0 {
return o, fmt.Errorf("missing required fields to modify task: %s", strings.Join(missing, ", "))
}
return o, nil
}
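As a brief illustration of the validator above, the sketch below builds a CreateTaskRequest and runs it through CreateArgs. The script and IDs are arbitrary example values; err is nil only when both IDs are valid and the script parses as a task.

import (
	platform "github.com/influxdata/influxdb"
	"github.com/influxdata/influxdb/task/backend"
)

// validateCreateSketch is illustrative, not part of the original code.
func validateCreateSketch() error {
	req := backend.CreateTaskRequest{
		Org:             platform.ID(1),
		AuthorizationID: platform.ID(2),
		Script: `option task = {name: "example", every: 1m}
from(bucket: "b") |> range(start: -1m)`,
	}
	// Returns the parsed options and an error naming any missing fields.
	_, err := backend.StoreValidator.CreateArgs(req)
	return err
}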

View File

@ -1,531 +0,0 @@
package storetest
import (
"context"
"fmt"
"reflect"
"testing"
"time"
"github.com/google/go-cmp/cmp"
platform "github.com/influxdata/influxdb"
pcontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/task/backend"
platformtesting "github.com/influxdata/influxdb/testing"
)
// MakeNewAuthorizationFunc is a function that creates a new authorization associated with a valid org and user.
// The authorization's permissions should allow everything (see influxdb.OperPermissions).
type MakeNewAuthorizationFunc func(context.Context, *testing.T) *platform.Authorization
// CreateRunStoreFunc returns a new LogWriter and LogReader.
// If the writer and reader are associated with a backend that validates authorizations,
// it must return a valid MakeNewAuthorizationFunc; otherwise the returned MakeNewAuthorizationFunc may be nil,
// in which case the tests will use authorizations associated with a random org and user ID.
type CreateRunStoreFunc func(*testing.T) (backend.LogWriter, backend.LogReader, MakeNewAuthorizationFunc)
type DestroyRunStoreFunc func(*testing.T, backend.LogWriter, backend.LogReader)
func NewRunStoreTest(name string, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) func(*testing.T) {
return func(t *testing.T) {
t.Run(name, func(t *testing.T) {
t.Parallel()
t.Run("UpdateRunState", func(t *testing.T) {
t.Parallel()
updateRunState(t, crf, drf)
})
t.Run("RunLog", func(t *testing.T) {
t.Parallel()
runLogTest(t, crf, drf)
})
t.Run("ListRuns", func(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test in short mode.")
}
t.Parallel()
listRunsTest(t, crf, drf)
})
t.Run("FindRunByID", func(t *testing.T) {
t.Parallel()
findRunByIDTest(t, crf, drf)
})
t.Run("ListLogs", func(t *testing.T) {
t.Parallel()
listLogsTest(t, crf, drf)
})
})
}
}
func updateRunState(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) {
writer, reader, makeAuthz := crf(t)
defer drf(t, writer, reader)
now := time.Now().UTC()
task := &backend.StoreTask{
ID: platformtesting.MustIDBase16("ab01ab01ab01ab01"),
Org: platformtesting.MustIDBase16("ab01ab01ab01ab05"),
}
scheduledFor := now.Add(-3 * time.Second)
run := platform.Run{
ID: platformtesting.MustIDBase16("2c20766972747573"),
TaskID: task.ID,
Status: "started",
ScheduledFor: scheduledFor.Format(time.RFC3339),
}
rlb := backend.RunLogBase{
Task: task,
RunID: run.ID,
RunScheduledFor: scheduledFor.Unix(),
}
ctx := context.Background()
ctx = pcontext.SetAuthorizer(ctx, makeNewAuthorization(ctx, t, makeAuthz))
startAt := now.Add(-2 * time.Second)
if err := writer.UpdateRunState(ctx, rlb, startAt, backend.RunStarted); err != nil {
t.Fatal(err)
}
run.StartedAt = startAt.Format(time.RFC3339Nano)
run.Status = "started"
returnedRun, err := reader.FindRunByID(ctx, task.Org, run.ID)
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(run, *returnedRun); diff != "" {
t.Fatalf("unexpected run found: -want/+got: %s", diff)
}
endAt := now.Add(-1 * time.Second)
if err := writer.UpdateRunState(ctx, rlb, endAt, backend.RunSuccess); err != nil {
t.Fatal(err)
}
run.FinishedAt = endAt.Format(time.RFC3339Nano)
run.Status = "success"
returnedRun, err = reader.FindRunByID(ctx, task.Org, run.ID)
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(run, *returnedRun); diff != "" {
t.Fatalf("unexpected run found: -want/+got: %s", diff)
}
now = time.Now().UTC()
// create a failed run
scheduledFor2 := now.Add(-2 * time.Second)
run2 := platform.Run{
ID: platformtesting.MustIDBase16("2c20766972747574"),
TaskID: task.ID,
Status: "started",
ScheduledFor: scheduledFor2.Format(time.RFC3339),
}
rlb2 := backend.RunLogBase{
Task: task,
RunID: run2.ID,
RunScheduledFor: scheduledFor2.Unix(),
}
startAt2 := now.Add(-1 * time.Second)
if err := writer.UpdateRunState(ctx, rlb2, startAt2, backend.RunStarted); err != nil {
t.Fatal(err)
}
endAt2 := now.Add(-1 * time.Millisecond)
if err := writer.UpdateRunState(ctx, rlb2, endAt2, backend.RunFail); err != nil {
t.Fatal(err)
}
run2.StartedAt = startAt2.Format(time.RFC3339Nano)
run2.FinishedAt = endAt2.Format(time.RFC3339Nano)
run2.Status = "failed"
runs, err := reader.ListRuns(ctx, task.Org, platform.RunFilter{Task: task.ID})
if err != nil {
t.Fatal(err)
}
if len(runs) != 2 {
t.Fatalf("expected 2 runs, got: %d", len(runs))
}
if diff := cmp.Diff(runs, []*platform.Run{&run, &run2}); diff != "" {
for i, r := range runs {
t.Logf("returned run[%d]: %+#v", i, *r)
}
t.Fatalf("unexpected run2 found: -want/+got: %s", diff)
}
}
func runLogTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) {
writer, reader, makeAuthz := crf(t)
defer drf(t, writer, reader)
task := &backend.StoreTask{
ID: platformtesting.MustIDBase16("ab01ab01ab01ab01"),
Org: platformtesting.MustIDBase16("ab01ab01ab01ab05"),
}
sf := time.Now().UTC()
sa := sf.Add(-10 * time.Second)
run := platform.Run{
ID: platformtesting.MustIDBase16("2c20766972747573"),
TaskID: task.ID,
Status: "started",
ScheduledFor: sf.Format(time.RFC3339),
StartedAt: sa.Format(time.RFC3339Nano),
}
rlb := backend.RunLogBase{
Task: task,
RunID: run.ID,
RunScheduledFor: sf.Unix(),
}
ctx := context.Background()
ctx = pcontext.SetAuthorizer(ctx, makeNewAuthorization(ctx, t, makeAuthz))
if err := writer.UpdateRunState(ctx, rlb, sa, backend.RunStarted); err != nil {
t.Fatal(err)
}
if err := writer.AddRunLog(ctx, rlb, sa.Add(time.Second), "first"); err != nil {
t.Fatal(err)
}
if err := writer.AddRunLog(ctx, rlb, sa.Add(2*time.Second), "second"); err != nil {
t.Fatal(err)
}
if err := writer.AddRunLog(ctx, rlb, sa.Add(3*time.Second), "third"); err != nil {
t.Fatal(err)
}
run.Log = []platform.Log{
platform.Log{Time: sa.Add(time.Second).Format(time.RFC3339Nano), Message: "first"},
platform.Log{Time: sa.Add(2 * time.Second).Format(time.RFC3339Nano), Message: "second"},
platform.Log{Time: sa.Add(3 * time.Second).Format(time.RFC3339Nano), Message: "third"},
}
returnedRun, err := reader.FindRunByID(ctx, task.Org, run.ID)
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(run, *returnedRun); diff != "" {
t.Fatalf("unexpected run found: -want/+got: %s", diff)
}
}
func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) {
writer, reader, makeAuthz := crf(t)
defer drf(t, writer, reader)
task := &backend.StoreTask{
ID: platformtesting.MustIDBase16("ab01ab01ab01ab01"),
Org: platformtesting.MustIDBase16("ab01ab01ab01ab05"),
}
ctx := context.Background()
ctx = pcontext.SetAuthorizer(ctx, makeNewAuthorization(ctx, t, makeAuthz))
{
r, err := reader.ListRuns(ctx, task.ID, platform.RunFilter{Task: task.ID})
if err != nil {
// TODO(lh): We may get an error here in the future when the system is more aggressive at checking orgIDs
t.Fatalf("got an error with bad orgID when we should have returned an empty list: %v", err)
}
if len(r) != 0 {
t.Fatalf("expected 0 runs, got: %d", len(r))
}
}
{
r, err := reader.ListRuns(ctx, task.Org, platform.RunFilter{Task: task.Org})
if err != nil {
t.Fatalf("got an error with bad taskID when we should have returned a empty list: %v", err)
}
if len(r) != 0 {
t.Fatalf("expected 0 runs, got: %d", len(r))
}
}
now := time.Now().UTC()
const nRuns = 150
runs := make([]platform.Run, nRuns)
for i := 0; i < len(runs); i++ {
// Scheduled for times ascending with IDs.
scheduledFor := now.Add(time.Duration(-2*(nRuns-i)) * time.Second)
id := platform.ID(i + 1)
runs[i] = platform.Run{
ID: id,
Status: "started",
ScheduledFor: scheduledFor.Format(time.RFC3339),
}
rlb := backend.RunLogBase{
Task: task,
RunID: runs[i].ID,
RunScheduledFor: scheduledFor.Unix(),
}
err := writer.UpdateRunState(ctx, rlb, scheduledFor.Add(time.Second), backend.RunStarted)
if err != nil {
t.Fatal(err)
}
}
if _, err := reader.ListRuns(ctx, task.Org, platform.RunFilter{}); err == nil {
t.Fatal("failed to error with invalid task ID")
}
{
r, err := reader.ListRuns(ctx, 9999999, platform.RunFilter{Task: task.ID})
if err != nil {
t.Fatalf("got an error with a bad orgID when we should have returned a empty list: %v", err)
}
if len(r) != 0 {
t.Fatalf("expected 0 runs, got: %d", len(r))
}
}
listRuns, err := reader.ListRuns(ctx, task.Org, platform.RunFilter{
Task: task.ID,
Limit: 2 * nRuns,
})
if err != nil {
t.Fatal(err)
}
if len(listRuns) != len(runs) {
t.Fatalf("retrieved: %d, expected: %d", len(listRuns), len(runs))
}
const afterIDIdx = 20
listRuns, err = reader.ListRuns(ctx, task.Org, platform.RunFilter{
Task: task.ID,
After: &runs[afterIDIdx].ID,
Limit: 2 * nRuns,
})
if err != nil {
t.Fatal(err)
}
if len(listRuns) != len(runs)-(afterIDIdx+1) {
t.Fatalf("retrieved: %d, expected: %d", len(listRuns), len(runs)-(afterIDIdx+1))
}
listRuns, err = reader.ListRuns(ctx, task.Org, platform.RunFilter{
Task: task.ID,
Limit: 30,
})
if err != nil {
t.Fatal(err)
}
if len(listRuns) != 30 {
t.Fatalf("retrieved: %d, expected: %d", len(listRuns), 30)
}
const afterTimeIdx = 34
scheduledFor, _ := time.Parse(time.RFC3339, runs[afterTimeIdx].ScheduledFor)
listRuns, err = reader.ListRuns(ctx, task.Org, platform.RunFilter{
Task: task.ID,
AfterTime: scheduledFor.Format(time.RFC3339),
Limit: 2 * nRuns,
})
if err != nil {
t.Fatal(err)
}
if len(listRuns) != len(runs)-(afterTimeIdx+1) {
t.Fatalf("retrieved: %d, expected: %d", len(listRuns), len(runs)-(afterTimeIdx+1))
}
const beforeTimeIdx = 34
scheduledFor, _ = time.Parse(time.RFC3339, runs[beforeTimeIdx].ScheduledFor)
listRuns, err = reader.ListRuns(ctx, task.Org, platform.RunFilter{
Task: task.ID,
BeforeTime: scheduledFor.Add(time.Millisecond).Format(time.RFC3339),
})
if err != nil {
t.Fatal(err)
}
if len(listRuns) != beforeTimeIdx {
t.Fatalf("retrieved: %d, expected: %d", len(listRuns), beforeTimeIdx)
}
// Add a run, then list again, this time with a requestedAt set.
scheduledFor = now.Add(time.Duration(-2*(nRuns-len(runs))) * time.Second)
run := platform.Run{
ID: platform.ID(len(runs) + 1),
Status: "started",
ScheduledFor: scheduledFor.Format(time.RFC3339),
RequestedAt: scheduledFor.Format(time.RFC3339),
}
runs = append(runs, run)
rlb := backend.RunLogBase{
Task: task,
RunID: run.ID,
RunScheduledFor: scheduledFor.Unix(),
RequestedAt: scheduledFor.Unix(),
}
if err := writer.UpdateRunState(ctx, rlb, scheduledFor.Add(time.Second), backend.RunStarted); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
listRuns, err = reader.ListRuns(ctx, task.Org, platform.RunFilter{
Task: task.ID,
Limit: 2 * nRuns,
})
if err != nil {
t.Fatal(err)
}
if len(listRuns) != len(runs) {
t.Fatalf("retrieved: %d, expected: %d", len(listRuns), len(runs))
}
}
func findRunByIDTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) {
writer, reader, makeAuthz := crf(t)
defer drf(t, writer, reader)
if _, err := reader.FindRunByID(context.Background(), platform.InvalidID(), platform.InvalidID()); err == nil {
t.Fatal("failed to error with bad id")
}
task := &backend.StoreTask{
ID: platformtesting.MustIDBase16("ab01ab01ab01ab01"),
Org: platformtesting.MustIDBase16("ab01ab01ab01ab05"),
}
sf := time.Now().UTC().Add(-10 * time.Second)
sa := sf.Add(time.Second)
run := platform.Run{
ID: platformtesting.MustIDBase16("2c20766972747573"),
TaskID: task.ID,
Status: "started",
ScheduledFor: sf.Format(time.RFC3339),
StartedAt: sa.Format(time.RFC3339Nano),
}
rlb := backend.RunLogBase{
Task: task,
RunID: run.ID,
RunScheduledFor: sf.Unix(),
}
ctx := context.Background()
ctx = pcontext.SetAuthorizer(ctx, makeNewAuthorization(ctx, t, makeAuthz))
if err := writer.UpdateRunState(ctx, rlb, sa, backend.RunStarted); err != nil {
t.Fatal(err)
}
returnedRun, err := reader.FindRunByID(ctx, task.Org, run.ID)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(run, *returnedRun) {
t.Fatalf("expected:\n%#v, got: \n%#v", run, *returnedRun)
}
returnedRun.Log = []platform.Log{platform.Log{Message: "cows"}}
rr2, err := reader.FindRunByID(ctx, task.Org, run.ID)
if err != nil {
t.Fatal(err)
}
if reflect.DeepEqual(returnedRun, rr2) {
t.Fatalf("updateing returned run modified RunStore data")
}
_, err = reader.FindRunByID(ctx, task.Org, 0xccc)
if err != backend.ErrRunNotFound {
t.Fatalf("expected finding run with invalid ID to return %v, got %v", backend.ErrRunNotFound, err)
}
}
func listLogsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) {
writer, reader, makeAuthz := crf(t)
defer drf(t, writer, reader)
task := &backend.StoreTask{
ID: platformtesting.MustIDBase16("ab01ab01ab01ab01"),
Org: platformtesting.MustIDBase16("ab01ab01ab01ab05"),
}
ctx := context.Background()
ctx = pcontext.SetAuthorizer(ctx, makeNewAuthorization(ctx, t, makeAuthz))
if _, err := reader.ListLogs(ctx, task.Org, platform.LogFilter{}); err == nil {
t.Fatalf("expected error when task ID missing, but got nil")
}
r, err := reader.ListLogs(ctx, 9999999, platform.LogFilter{Task: task.ID})
if err != nil {
t.Fatalf("with bad org ID, expected no error: %v", err)
}
if len(r) != 0 {
t.Fatalf("with bad org id expected no runs, got: %d", len(r))
}
now := time.Now().UTC()
const nRuns = 20
runs := make([]platform.Run, nRuns)
for i := 0; i < len(runs); i++ {
sf := now.Add(time.Duration(i-nRuns) * time.Second)
id := platform.ID(i + 1)
runs[i] = platform.Run{
ID: id,
Status: "started",
ScheduledFor: sf.UTC().Format(time.RFC3339),
}
rlb := backend.RunLogBase{
Task: task,
RunID: runs[i].ID,
RunScheduledFor: sf.Unix(),
}
err := writer.UpdateRunState(ctx, rlb, sf.Add(time.Millisecond), backend.RunStarted)
if err != nil {
t.Fatal(err)
}
writer.AddRunLog(ctx, rlb, sf.Add(2*time.Millisecond), fmt.Sprintf("log%d", i))
}
const targetRun = 4
logs, err := reader.ListLogs(ctx, task.Org, platform.LogFilter{Task: task.ID, Run: &runs[targetRun].ID})
if err != nil {
t.Fatal(err)
}
if len(logs) != 1 {
t.Fatalf("expected 1 log, got %d", len(logs))
}
fmtTimelog := now.Add(time.Duration(targetRun-nRuns)*time.Second + 2*time.Millisecond).Format(time.RFC3339Nano)
if logs[0].Time != fmtTimelog {
t.Fatalf("expected: %q, got: %q", fmtTimelog, logs[0].Time)
}
if logs[0].Message != "log4" {
t.Fatalf("expected: %q, got: %q", "log4", logs[0].Message)
}
logs, err = reader.ListLogs(ctx, task.Org, platform.LogFilter{Task: task.ID})
if err != nil {
t.Fatal(err)
}
if len(logs) != len(runs) {
t.Fatal("not all logs retrieved")
}
}
func makeNewAuthorization(ctx context.Context, t *testing.T, makeAuthz MakeNewAuthorizationFunc) *platform.Authorization {
if makeAuthz != nil {
return makeAuthz(ctx, t)
}
return &platform.Authorization{
ID: platformtesting.MustIDBase16("ab01ab01ab01ab01"),
UserID: platformtesting.MustIDBase16("ab01ab01ab01ab01"),
OrgID: platformtesting.MustIDBase16("ab01ab01ab01ab05"),
Permissions: platform.OperPermissions(),
}
}
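A consumer of this harness supplies the two factory functions. A skeletal wiring sketch follows (import paths assumed from the repo layout); the Nop log writer and reader are used only to show the shape of the hookup, since a real implementation must actually persist and read back runs for these tests to pass.

import (
	"testing"

	"github.com/influxdata/influxdb/task/backend"
	"github.com/influxdata/influxdb/task/backend/storetest"
)

func TestMyRunStore(t *testing.T) {
	crf := func(t *testing.T) (backend.LogWriter, backend.LogReader, storetest.MakeNewAuthorizationFunc) {
		// Returning nil for MakeNewAuthorizationFunc makes the tests use
		// authorizations with fixed org and user IDs.
		return backend.NopLogWriter{}, backend.NopLogReader{}, nil
	}
	drf := func(*testing.T, backend.LogWriter, backend.LogReader) {}
	t.Run("runstore", storetest.NewRunStoreTest("nop", crf, drf))
}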

File diff suppressed because it is too large

View File

@ -2,10 +2,11 @@ package backend
import (
"context"
"errors"
"fmt"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/task/options"
)
// TaskControlService is a low-level controller interface, intended to be passed to
@ -34,212 +35,98 @@ type TaskControlService interface {
AddRunLog(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error
}
// TaskControlAdaptor creates a TaskControlService for the older TaskStore system.
// TODO(lh): remove task control adaptor when we transition away from Store.
func TaskControlAdaptor(s Store, lw LogWriter, lr LogReader) TaskControlService {
return &taskControlAdaptor{s, lw, lr}
type TaskStatus string
const (
TaskActive TaskStatus = "active"
TaskInactive TaskStatus = "inactive"
DefaultTaskStatus TaskStatus = TaskActive
)
type RunStatus int
const (
RunStarted RunStatus = iota
RunSuccess
RunFail
RunCanceled
RunScheduled
)
func (r RunStatus) String() string {
switch r {
case RunStarted:
return "started"
case RunSuccess:
return "success"
case RunFail:
return "failed"
case RunCanceled:
return "canceled"
case RunScheduled:
return "scheduled"
}
panic(fmt.Sprintf("unknown RunStatus: %d", r))
}
// taskControlAdaptor adapts a Store and log readers and writers to implement the task control service.
type taskControlAdaptor struct {
s Store
lw LogWriter
lr LogReader
var (
// ErrRunCanceled is returned from the RunResult when a Run is Canceled. It is used mostly internally.
ErrRunCanceled = errors.New("run canceled")
// ErrTaskNotClaimed is returned when attempting to operate against a task that must be claimed but is not.
ErrTaskNotClaimed = errors.New("task not claimed")
// ErrNoRunsFound is returned when searching for a range of runs, but none are found.
ErrNoRunsFound = errors.New("no matching runs found")
// ErrTaskAlreadyClaimed is returned when attempting to operate against a task that must not be claimed but is.
ErrTaskAlreadyClaimed = errors.New("task already claimed")
)
// RunNotYetDueError is returned from CreateNextRun if a run is not yet due.
type RunNotYetDueError struct {
// DueAt is the unix timestamp of when the next run is due.
DueAt int64
}
func (tcs *taskControlAdaptor) CreateNextRun(ctx context.Context, taskID influxdb.ID, now int64) (RunCreation, error) {
return tcs.s.CreateNextRun(ctx, taskID, now)
func (e RunNotYetDueError) Error() string {
return "run not due until " + time.Unix(e.DueAt, 0).UTC().Format(time.RFC3339)
}
func (tcs *taskControlAdaptor) FinishRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
// Once we completely switch over to the new system we can look at the returned run in the tests.
task, err := tcs.s.FindTaskByID(ctx, taskID)
if err != nil {
return nil, err
}
tcs.lr.FindRunByID(ctx, task.Org, runID)
return nil, tcs.s.FinishRun(ctx, taskID, runID)
// RequestStillQueuedError is returned when attempting to retry a run which has not yet completed.
type RequestStillQueuedError struct {
// Unix timestamps matching existing request's start and end.
Start, End int64
}
func (tcs *taskControlAdaptor) CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) {
t, m, err := tcs.s.FindTaskByIDWithMeta(ctx, taskID)
if err != nil {
return nil, err
}
const fmtRequestStillQueued = "previous retry for start=%s end=%s has not yet finished"
var rtn = make([]*influxdb.Run, len(m.CurrentlyRunning))
for i, cr := range m.CurrentlyRunning {
rtn[i] = &influxdb.Run{
ID: influxdb.ID(cr.RunID),
TaskID: t.ID,
ScheduledFor: time.Unix(cr.Now, 0).UTC().Format(time.RFC3339),
}
if cr.RequestedAt != 0 {
rtn[i].RequestedAt = time.Unix(cr.RequestedAt, 0).UTC().Format(time.RFC3339)
}
}
return rtn, nil
}
func (tcs *taskControlAdaptor) ManualRuns(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) {
t, m, err := tcs.s.FindTaskByIDWithMeta(ctx, taskID)
if err != nil {
return nil, err
}
var rtn = make([]*influxdb.Run, len(m.ManualRuns))
for i, cr := range m.ManualRuns {
rtn[i] = &influxdb.Run{
ID: influxdb.ID(cr.RunID),
TaskID: t.ID,
ScheduledFor: time.Unix(cr.Start, 0).UTC().Format(time.RFC3339),
}
if cr.RequestedAt != 0 {
rtn[i].RequestedAt = time.Unix(cr.RequestedAt, 0).Format(time.RFC3339)
}
}
return rtn, nil
}
func (tcs *taskControlAdaptor) NextDueRun(ctx context.Context, taskID influxdb.ID) (int64, error) {
m, err := tcs.s.FindTaskMetaByID(ctx, taskID)
if err != nil {
return 0, err
}
return m.NextDueRun()
}
func (tcs *taskControlAdaptor) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state RunStatus) error {
st, m, err := tcs.s.FindTaskByIDWithMeta(ctx, taskID)
if err != nil {
return err
}
var (
schedFor, reqAt time.Time
func (e RequestStillQueuedError) Error() string {
return fmt.Sprintf(fmtRequestStillQueued,
time.Unix(e.Start, 0).UTC().Format(time.RFC3339),
time.Unix(e.End, 0).UTC().Format(time.RFC3339),
)
// check the log store
r, err := tcs.lr.FindRunByID(ctx, st.Org, runID)
if err == nil && r != nil {
schedFor, err = time.Parse(time.RFC3339, r.ScheduledFor)
if err != nil {
return err
}
if r.RequestedAt != "" {
reqAt, err = time.Parse(time.RFC3339, r.RequestedAt)
if err != nil {
return err
}
}
}
// in the old system the log store may not have the run until after the first
// state update, so we need to pull it from the currently running list.
if schedFor.IsZero() {
for _, cr := range m.CurrentlyRunning {
if influxdb.ID(cr.RunID) == runID {
schedFor = time.Unix(cr.Now, 0)
if cr.RequestedAt != 0 {
reqAt = time.Unix(cr.RequestedAt, 0)
}
}
}
}
rlb := RunLogBase{
Task: st,
RunID: runID,
RunScheduledFor: schedFor.Unix(),
}
if !reqAt.IsZero() {
rlb.RequestedAt = reqAt.Unix()
}
if err := tcs.lw.UpdateRunState(ctx, rlb, when, state); err != nil {
return err
}
return nil
}
func (tcs *taskControlAdaptor) AddRunLog(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error {
st, m, err := tcs.s.FindTaskByIDWithMeta(ctx, taskID)
// ParseRequestStillQueuedError attempts to parse a RequestStillQueuedError from msg.
// If msg is formatted correctly, the resultant error is returned; otherwise it returns nil.
func ParseRequestStillQueuedError(msg string) *RequestStillQueuedError {
var s, e string
n, err := fmt.Sscanf(msg, fmtRequestStillQueued, &s, &e)
if err != nil || n != 2 {
return nil
}
start, err := time.Parse(time.RFC3339, s)
if err != nil {
return err
return nil
}
var (
schedFor, reqAt time.Time
)
r, err := tcs.lr.FindRunByID(ctx, st.Org, runID)
if err == nil && r != nil {
schedFor, err = time.Parse(time.RFC3339, r.ScheduledFor)
if err != nil {
return err
}
if r.RequestedAt != "" {
reqAt, err = time.Parse(time.RFC3339, r.RequestedAt)
if err != nil {
return err
}
}
}
// in the old system the log store may not have the run until after the first
// state update, so we need to pull it from the currently running list.
if schedFor.IsZero() {
for _, cr := range m.CurrentlyRunning {
if influxdb.ID(cr.RunID) == runID {
schedFor = time.Unix(cr.Now, 0)
if cr.RequestedAt != 0 {
reqAt = time.Unix(cr.RequestedAt, 0)
}
}
}
}
rlb := RunLogBase{
Task: st,
RunID: runID,
RunScheduledFor: schedFor.Unix(),
}
if !reqAt.IsZero() {
rlb.RequestedAt = reqAt.Unix()
}
return tcs.lw.AddRunLog(ctx, rlb, when, log)
}
// ToInfluxTask converts a backend tas and meta to a influxdb.Task
// TODO(lh): remove this when we no longer need the backend store.
func ToInfluxTask(t *StoreTask, m *StoreTaskMeta) (*influxdb.Task, error) {
opts, err := options.FromScript(t.Script)
end, err := time.Parse(time.RFC3339, e)
if err != nil {
return nil, err
return nil
}
pt := &influxdb.Task{
ID: t.ID,
OrganizationID: t.Org,
Name: t.Name,
Flux: t.Script,
Cron: opts.Cron,
AuthorizationID: influxdb.ID(m.AuthorizationID),
}
if !opts.Every.IsZero() {
pt.Every = opts.Every.String()
}
if opts.Offset != nil && !opts.Offset.IsZero() {
pt.Offset = opts.Offset.String()
}
if m != nil {
pt.Status = string(m.Status)
pt.LatestCompleted = time.Unix(m.LatestCompleted, 0).UTC().Format(time.RFC3339)
if m.CreatedAt != 0 {
pt.CreatedAt = time.Unix(m.CreatedAt, 0).UTC().Format(time.RFC3339)
}
if m.UpdatedAt != 0 {
pt.UpdatedAt = time.Unix(m.UpdatedAt, 0).UTC().Format(time.RFC3339)
}
pt.AuthorizationID = influxdb.ID(m.AuthorizationID)
}
return pt, nil
return &RequestStillQueuedError{Start: start.Unix(), End: end.Unix()}
}
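Taken together, Error and ParseRequestStillQueuedError are intended to round-trip. A minimal sketch of that property (an illustrative snippet, not part of this change; it assumes the task/backend import path used elsewhere in this repo):
package main
import (
"fmt"
"github.com/influxdata/influxdb/task/backend"
)
func main() {
orig := backend.RequestStillQueuedError{Start: 0, End: 60}
parsed := backend.ParseRequestStillQueuedError(orig.Error())
// prints true: Start and End are whole seconds, so RFC3339 formatting is lossless
fmt.Println(parsed != nil && *parsed == orig)
}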

View File

@ -1,465 +0,0 @@
package task
import (
"context"
"errors"
"fmt"
"time"
platform "github.com/influxdata/influxdb"
icontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/options"
)
// RunController wraps the scheduler and exposes only canceling
// of runs.
type RunController interface {
CancelRun(ctx context.Context, taskID, runID platform.ID) error
// TODO: add retry run to this.
}
// PlatformAdapter wraps a task.Store into the platform.TaskService interface.
func PlatformAdapter(s backend.Store, r backend.LogReader, rc RunController, as platform.AuthorizationService, urm platform.UserResourceMappingService, orgSvc platform.OrganizationService) platform.TaskService {
return pAdapter{s: s, r: r, rc: rc, as: as, urm: urm, orgSvc: orgSvc}
}
type pAdapter struct {
s backend.Store
rc RunController
r backend.LogReader
// Needed to look up authorization ID from token during create.
as platform.AuthorizationService
urm platform.UserResourceMappingService
orgSvc platform.OrganizationService
}
var _ platform.TaskService = pAdapter{}
func (p pAdapter) FindTaskByID(ctx context.Context, id platform.ID) (*platform.Task, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
t, m, err := p.s.FindTaskByIDWithMeta(ctx, id)
if err != nil {
return nil, err
}
// The store interface specifies that a returned task is nil if the operation succeeded without a match.
if t == nil {
return nil, nil
}
return p.toPlatformTask(ctx, *t, m)
}
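Given that contract, callers must handle the (nil, nil) case explicitly rather than relying on the error alone. A hedged call-site sketch (svc and id are illustrative names, not part of this change; backend.ErrTaskNotFound is the sentinel this codebase uses elsewhere):
t, err := svc.FindTaskByID(ctx, id)
if err != nil {
return nil, err
}
if t == nil {
// success with no match: translate the nil task into a not-found error here
return nil, backend.ErrTaskNotFound
}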
func (p pAdapter) FindTasks(ctx context.Context, filter platform.TaskFilter) ([]*platform.Task, int, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
params := backend.TaskSearchParams{PageSize: filter.Limit}
org := platform.Organization{
Name: filter.Organization,
}
if filter.OrganizationID != nil {
org.ID = *filter.OrganizationID
}
if filter.Organization != "" {
err := p.populateOrg(ctx, &org)
if err != nil {
return nil, 0, err
}
}
params.Org = org.ID
if filter.User != nil {
ownedTasks, _, err := p.urm.FindUserResourceMappings(
ctx,
platform.UserResourceMappingFilter{
UserID: *filter.User,
UserType: platform.Owner,
ResourceType: platform.TasksResourceType,
},
)
if err != nil {
return nil, 0, err
}
tasks := make([]*platform.Task, 0, len(ownedTasks))
for _, ownedTask := range ownedTasks {
storeTask, meta, err := p.s.FindTaskByIDWithMeta(ctx, ownedTask.ResourceID)
if err != nil {
// It's possible we had an entry in the list a moment ago and it's since been deleted.
if err == backend.ErrTaskNotFound {
// If so, just move on.
continue
}
return nil, 0, err
}
task, err := p.toPlatformTask(ctx, *storeTask, meta)
if err != nil {
return nil, 0, err
}
tasks = append(tasks, task)
}
return tasks, len(tasks), nil
}
if filter.After != nil {
params.After = *filter.After
}
ts, err := p.s.ListTasks(ctx, params)
if err != nil {
return nil, 0, err
}
pts := make([]*platform.Task, len(ts))
for i, t := range ts {
pts[i], err = p.toPlatformTask(ctx, t.Task, &t.Meta)
if err != nil {
return nil, 0, err
}
}
return pts, len(pts), nil
}
func (p pAdapter) CreateTask(ctx context.Context, t platform.TaskCreate) (*platform.Task, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
auth, err := icontext.GetAuthorizer(ctx)
if err != nil {
return nil, err
}
// validate options before creating the task by parsing and checking
// every/cron/etc.
if _, err := options.FromScript(t.Flux); err != nil {
return nil, err
}
// TODO(mr): decide whether we allow user to configure scheduleAfter. https://github.com/influxdata/influxdb/issues/10884
scheduleAfter := time.Now().Unix()
if t.Status == "" {
t.Status = string(backend.DefaultTaskStatus)
}
org := platform.Organization{ID: t.OrganizationID, Name: t.Organization}
if err := p.populateOrg(ctx, &org); err != nil {
return nil, err
}
req := backend.CreateTaskRequest{
Org: org.ID,
ScheduleAfter: scheduleAfter,
Status: backend.TaskStatus(t.Status),
Script: t.Flux,
}
req.AuthorizationID, err = p.authorizationIDFromToken(ctx, t.Token)
if err != nil {
return nil, err
}
id, err := p.s.CreateTask(ctx, req)
if err != nil {
return nil, err
}
mapping := &platform.UserResourceMapping{
UserID: auth.GetUserID(),
UserType: platform.Owner,
ResourceType: platform.TasksResourceType,
ResourceID: id,
}
if err := p.urm.CreateUserResourceMapping(ctx, mapping); err != nil {
// clean up the task if we fail to map the user and resource
// TODO(lh): Multi-step creates could benefit from a service-wide transactional request
if derr := p.DeleteTask(ctx, id); derr != nil {
err = fmt.Errorf("%s: failed to clean up task: %s", err.Error(), derr.Error())
}
return nil, err
}
backendTask, meta, err := p.s.FindTaskByIDWithMeta(ctx, id)
if err != nil {
return nil, err
}
return p.toPlatformTask(ctx, *backendTask, meta)
}
func (p pAdapter) UpdateTask(ctx context.Context, id platform.ID, upd platform.TaskUpdate) (*platform.Task, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
err := upd.Validate()
if err != nil {
return nil, err
}
req := backend.UpdateTaskRequest{ID: id}
if upd.Flux != nil {
req.Script = *upd.Flux
}
if upd.Status != nil {
req.Status = backend.TaskStatus(*upd.Status)
}
req.Options = upd.Options
if upd.Token != "" {
req.AuthorizationID, err = p.authorizationIDFromToken(ctx, upd.Token)
if err != nil {
return nil, err
}
}
res, err := p.s.UpdateTask(ctx, req)
if err != nil {
return nil, err
}
if res.NewTask.Script == "" {
return nil, errors.New("script not defined in the store")
}
return p.FindTaskByID(ctx, id)
}
func (p pAdapter) DeleteTask(ctx context.Context, id platform.ID) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
_, err := p.s.DeleteTask(ctx, id)
if err != nil {
return err
}
// TODO(mr): Store.DeleteTask returns false, nil if ID didn't match; do we want to handle that case?
// clean up resource maps for deleted task
urms, _, err := p.urm.FindUserResourceMappings(ctx, platform.UserResourceMappingFilter{
ResourceID: id,
ResourceType: platform.TasksResourceType,
})
if err != nil {
return err
}
for _, m := range urms {
if err := p.urm.DeleteUserResourceMapping(ctx, m.ResourceID, m.UserID); err != nil {
return err
}
}
return nil
}
func (p pAdapter) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
task, err := p.s.FindTaskByID(ctx, filter.Task)
if err != nil {
return nil, 0, err
}
logs, err := p.r.ListLogs(ctx, task.Org, filter)
logPointers := make([]*platform.Log, len(logs))
for i := range logs {
logPointers[i] = &logs[i]
}
return logPointers, len(logs), err
}
func (p pAdapter) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
task, err := p.s.FindTaskByID(ctx, filter.Task)
if err != nil {
return nil, 0, err
}
runs, err := p.r.ListRuns(ctx, task.Org, filter)
return runs, len(runs), err
}
func (p pAdapter) FindRunByID(ctx context.Context, taskID, id platform.ID) (*platform.Run, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
task, err := p.s.FindTaskByID(ctx, taskID)
if err != nil {
return nil, err
}
return p.r.FindRunByID(ctx, task.Org, id)
}
func (p pAdapter) RetryRun(ctx context.Context, taskID, id platform.ID) (*platform.Run, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
task, err := p.s.FindTaskByID(ctx, taskID)
if err != nil {
return nil, err
}
run, err := p.r.FindRunByID(ctx, task.Org, id)
if err != nil {
return nil, err
}
if run.Status == backend.RunStarted.String() {
return nil, backend.ErrRunNotFinished
}
scheduledTime, err := time.Parse(time.RFC3339, run.ScheduledFor)
if err != nil {
return nil, err
}
t := scheduledTime.UTC().Unix()
requestedAt := time.Now().Unix()
m, err := p.s.ManuallyRunTimeRange(ctx, run.TaskID, t, t, requestedAt)
if err != nil {
return nil, err
}
return &platform.Run{
ID: platform.ID(m.RunID),
TaskID: run.TaskID,
RequestedAt: time.Unix(requestedAt, 0).UTC().Format(time.RFC3339),
Status: backend.RunScheduled.String(),
ScheduledFor: run.ScheduledFor,
}, nil
}
func (p pAdapter) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*platform.Run, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
requestedAt := time.Now()
m, err := p.s.ManuallyRunTimeRange(ctx, taskID, scheduledFor, scheduledFor, requestedAt.Unix())
if err != nil {
return nil, err
}
return &platform.Run{
ID: platform.ID(m.RunID),
TaskID: taskID,
RequestedAt: requestedAt.UTC().Format(time.RFC3339),
Status: backend.RunScheduled.String(),
ScheduledFor: time.Unix(scheduledFor, 0).UTC().Format(time.RFC3339),
}, nil
}
func (p pAdapter) CancelRun(ctx context.Context, taskID, runID platform.ID) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
return p.rc.CancelRun(ctx, taskID, runID)
}
var errTokenUnreadable = errors.New("token invalid or unreadable by the current user")
// authorizationIDFromToken looks up the authorization ID from the given token,
// and returns that ID iff the authorizer on the context is allowed to view that authorization.
func (p pAdapter) authorizationIDFromToken(ctx context.Context, token string) (platform.ID, error) {
authorizer, err := icontext.GetAuthorizer(ctx)
if err != nil {
return 0, err
}
if token == "" {
// No explicit token. Use the authorization ID from the context's authorizer.
k := authorizer.Kind()
if k != platform.AuthorizationKind {
return 0, fmt.Errorf("unable to create task using authorization of kind %s", k)
}
return authorizer.Identifier(), nil
}
// Token was explicitly provided. Look it up.
a, err := p.as.FindAuthorizationByToken(ctx, token)
if err != nil {
// TODO(mr): log the actual error.
return 0, errTokenUnreadable
}
// It's a valid token. Is it our token?
if a.GetUserID() != authorizer.GetUserID() {
// The auth token isn't ours. Ensure we're allowed to read it.
p, err := platform.NewPermissionAtID(a.ID, platform.ReadAction, platform.AuthorizationsResourceType, a.OrgID)
if err != nil {
// TODO(mr): log the actual error.
return 0, errTokenUnreadable
}
if !authorizer.Allowed(*p) {
return 0, errTokenUnreadable
}
}
return a.ID, nil
}
func (p *pAdapter) toPlatformTask(ctx context.Context, t backend.StoreTask, m *backend.StoreTaskMeta) (*platform.Task, error) {
opts, err := options.FromScript(t.Script)
if err != nil {
return nil, err
}
org := platform.Organization{ID: t.Org}
if err := p.populateOrg(ctx, &org); err != nil {
return nil, err
}
pt := &platform.Task{
ID: t.ID,
OrganizationID: org.ID,
Organization: org.Name,
Name: t.Name,
Flux: t.Script,
Cron: opts.Cron,
}
if !opts.Every.IsZero() {
pt.Every = opts.Every.String()
}
if opts.Offset != nil && !(*opts.Offset).IsZero() {
pt.Offset = opts.Offset.String()
}
if m != nil {
pt.Status = string(m.Status)
pt.LatestCompleted = time.Unix(m.LatestCompleted, 0).UTC().Format(time.RFC3339)
if m.CreatedAt != 0 {
pt.CreatedAt = time.Unix(m.CreatedAt, 0).UTC().Format(time.RFC3339)
}
if m.UpdatedAt != 0 {
pt.UpdatedAt = time.Unix(m.UpdatedAt, 0).UTC().Format(time.RFC3339)
}
}
pt.AuthorizationID = platform.ID(m.AuthorizationID)
}
return pt, nil
}
func (p *pAdapter) populateOrg(ctx context.Context, org *platform.Organization) error {
if org.ID.Valid() && org.Name != "" {
return nil
}
if !org.ID.Valid() && org.Name == "" {
return errors.New("missing orgID and organization name")
}
if org.ID.Valid() {
o, err := p.orgSvc.FindOrganizationByID(ctx, org.ID)
if err != nil {
return err
}
org.Name = o.Name
} else {
o, err := p.orgSvc.FindOrganization(ctx, platform.OrganizationFilter{Name: &org.Name})
if err != nil {
return err
}
org.ID = o.ID
}
return nil
}
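Put differently, populateOrg lets a caller supply either a valid ID or a non-empty name and have the other field resolved. A short in-package sketch (illustrative values, not part of this change):
org := platform.Organization{Name: "myorg"} // ID intentionally left zero
if err := p.populateOrg(ctx, &org); err != nil {
return nil, err
}
// org.ID is now valid and usable for store lookups; the symmetric case
// (ID set, Name empty) resolves the name instead.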

View File

@ -1,83 +0,0 @@
package task_test
import (
"context"
"io/ioutil"
"os"
"testing"
bolt "github.com/coreos/bbolt"
"github.com/influxdata/influxdb/inmem"
_ "github.com/influxdata/influxdb/query/builtin"
"github.com/influxdata/influxdb/task/backend"
boltstore "github.com/influxdata/influxdb/task/backend/bolt"
"github.com/influxdata/influxdb/task/mock"
"github.com/influxdata/influxdb/task/servicetest"
)
func inMemFactory(t *testing.T) (*servicetest.System, context.CancelFunc) {
st := backend.NewInMemStore()
lrw := backend.NewInMemRunReaderWriter()
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-ctx.Done()
st.Close()
}()
i := inmem.NewService()
return &servicetest.System{
TaskControlService: backend.TaskControlAdaptor(st, lrw, lrw),
Ctx: ctx,
I: i,
TaskService: servicetest.UsePlatformAdaptor(st, lrw, mock.NewScheduler(), i),
}, cancel
}
func boltFactory(t *testing.T) (*servicetest.System, context.CancelFunc) {
lrw := backend.NewInMemRunReaderWriter()
f, err := ioutil.TempFile("", "platform_adapter_test_bolt")
if err != nil {
t.Fatal(err)
}
db, err := bolt.Open(f.Name(), 0644, nil)
if err != nil {
t.Fatal(err)
}
st, err := boltstore.New(db, "testbucket")
if err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-ctx.Done()
if err := st.Close(); err != nil {
t.Logf("error closing bolt: %v", err)
}
if err := os.Remove(f.Name()); err != nil {
t.Logf("error removing bolt tempfile: %v", err)
}
}()
i := inmem.NewService()
return &servicetest.System{
TaskControlService: backend.TaskControlAdaptor(st, lrw, lrw),
TaskService: servicetest.UsePlatformAdaptor(st, lrw, mock.NewScheduler(), i),
Ctx: ctx,
I: i,
}, cancel
}
func TestTaskService(t *testing.T) {
t.Run("in-mem", func(t *testing.T) {
t.Parallel()
servicetest.TestTaskService(t, inMemFactory)
})
t.Run("bolt", func(t *testing.T) {
t.Parallel()
servicetest.TestTaskService(t, boltFactory)
})
}

View File

@ -18,8 +18,6 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb"
icontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/inmem"
"github.com/influxdata/influxdb/task"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/options"
)
@ -37,11 +35,6 @@ func init() {
// If creating the System fails, the implementer should call t.Fatal.
type BackendComponentFactory func(t *testing.T) (*System, context.CancelFunc)
// UsePlatformAdaptor allows you to set the platform adaptor as your TaskService.
func UsePlatformAdaptor(s backend.Store, lr backend.LogReader, rc task.RunController, i *inmem.Service) influxdb.TaskService {
return task.PlatformAdapter(s, lr, rc, i, i, i)
}
// TestTaskService should be called by consumers of the servicetest package.
// This will call fn once to create a single influxdb.TaskService
// used across all subtests in TestTaskService.