2019-04-09 22:52:54 +00:00
|
|
|
package kv_test
|
|
|
|
|
|
|
|
import (
|
2019-08-23 22:52:55 +00:00
|
|
|
"bytes"
|
2019-04-09 22:52:54 +00:00
|
|
|
"context"
|
2019-08-23 22:52:55 +00:00
|
|
|
"encoding/json"
|
2020-06-17 18:30:37 +00:00
|
|
|
"fmt"
|
2019-04-09 22:52:54 +00:00
|
|
|
"testing"
|
2019-08-15 21:28:35 +00:00
|
|
|
"time"
|
2019-04-09 22:52:54 +00:00
|
|
|
|
2019-12-06 03:55:26 +00:00
|
|
|
"github.com/benbjohnson/clock"
|
|
|
|
"github.com/google/go-cmp/cmp"
|
2020-04-03 17:39:20 +00:00
|
|
|
"github.com/influxdata/influxdb/v2"
|
|
|
|
icontext "github.com/influxdata/influxdb/v2/context"
|
|
|
|
"github.com/influxdata/influxdb/v2/kv"
|
|
|
|
_ "github.com/influxdata/influxdb/v2/query/builtin"
|
2020-04-06 17:17:47 +00:00
|
|
|
"github.com/influxdata/influxdb/v2/query/fluxlang"
|
2020-04-03 17:39:20 +00:00
|
|
|
"github.com/influxdata/influxdb/v2/task/servicetest"
|
2019-12-04 23:10:23 +00:00
|
|
|
"go.uber.org/zap/zaptest"
|
2019-04-09 22:52:54 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
func TestBoltTaskService(t *testing.T) {
|
|
|
|
servicetest.TestTaskService(
|
|
|
|
t,
|
|
|
|
func(t *testing.T) (*servicetest.System, context.CancelFunc) {
|
2019-12-04 23:10:23 +00:00
|
|
|
store, close, err := NewTestBoltStore(t)
|
2019-04-09 22:52:54 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
2020-03-05 20:36:58 +00:00
|
|
|
service := kv.NewService(zaptest.NewLogger(t), store, kv.ServiceConfig{
|
|
|
|
FluxLanguageService: fluxlang.DefaultService,
|
|
|
|
})
|
2019-04-09 22:52:54 +00:00
|
|
|
ctx, cancelFunc := context.WithCancel(context.Background())
|
|
|
|
if err := service.Initialize(ctx); err != nil {
|
|
|
|
t.Fatalf("error initializing urm service: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
<-ctx.Done()
|
|
|
|
close()
|
|
|
|
}()
|
|
|
|
|
|
|
|
return &servicetest.System{
|
|
|
|
TaskControlService: service,
|
|
|
|
TaskService: service,
|
|
|
|
I: service,
|
|
|
|
Ctx: ctx,
|
|
|
|
}, cancelFunc
|
|
|
|
},
|
|
|
|
"transactional",
|
|
|
|
)
|
|
|
|
}
|
2019-08-15 21:28:35 +00:00
|
|
|
|
2019-12-06 03:55:26 +00:00
|
|
|
type testService struct {
|
|
|
|
Store kv.Store
|
|
|
|
Service *kv.Service
|
|
|
|
Org influxdb.Organization
|
|
|
|
User influxdb.User
|
|
|
|
Auth influxdb.Authorization
|
|
|
|
Clock clock.Clock
|
|
|
|
|
|
|
|
storeCloseFn func()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *testService) Close() {
|
|
|
|
s.storeCloseFn()
|
|
|
|
}
|
|
|
|
|
|
|
|
func newService(t *testing.T, ctx context.Context, c clock.Clock) *testService {
|
|
|
|
t.Helper()
|
|
|
|
|
|
|
|
if c == nil {
|
|
|
|
c = clock.New()
|
|
|
|
}
|
|
|
|
|
|
|
|
ts := &testService{}
|
|
|
|
var err error
|
|
|
|
ts.Store, ts.storeCloseFn, err = NewTestInmemStore(t)
|
2019-08-23 22:52:55 +00:00
|
|
|
if err != nil {
|
2019-12-06 03:55:26 +00:00
|
|
|
t.Fatal("failed to create InmemStore", err)
|
2019-08-23 22:52:55 +00:00
|
|
|
}
|
|
|
|
|
2020-03-05 20:36:58 +00:00
|
|
|
ts.Service = kv.NewService(zaptest.NewLogger(t), ts.Store, kv.ServiceConfig{
|
|
|
|
Clock: c,
|
|
|
|
FluxLanguageService: fluxlang.DefaultService,
|
|
|
|
})
|
2019-12-06 03:55:26 +00:00
|
|
|
err = ts.Service.Initialize(ctx)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal("Service.Initialize", err)
|
2019-08-23 22:52:55 +00:00
|
|
|
}
|
2019-12-06 03:55:26 +00:00
|
|
|
|
|
|
|
ts.User = influxdb.User{Name: t.Name() + "-user"}
|
|
|
|
if err := ts.Service.CreateUser(ctx, &ts.User); err != nil {
|
2019-08-23 22:52:55 +00:00
|
|
|
t.Fatal(err)
|
|
|
|
}
|
2019-12-06 03:55:26 +00:00
|
|
|
ts.Org = influxdb.Organization{Name: t.Name() + "-org"}
|
|
|
|
if err := ts.Service.CreateOrganization(ctx, &ts.Org); err != nil {
|
2019-08-23 22:52:55 +00:00
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
2019-12-06 03:55:26 +00:00
|
|
|
if err := ts.Service.CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{
|
2019-08-23 22:52:55 +00:00
|
|
|
ResourceType: influxdb.OrgsResourceType,
|
2019-12-06 03:55:26 +00:00
|
|
|
ResourceID: ts.Org.ID,
|
|
|
|
UserID: ts.User.ID,
|
2019-08-23 22:52:55 +00:00
|
|
|
UserType: influxdb.Owner,
|
|
|
|
}); err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
2019-12-06 03:55:26 +00:00
|
|
|
ts.Auth = influxdb.Authorization{
|
|
|
|
OrgID: ts.Org.ID,
|
|
|
|
UserID: ts.User.ID,
|
2019-08-23 22:52:55 +00:00
|
|
|
Permissions: influxdb.OperPermissions(),
|
|
|
|
}
|
2019-12-06 03:55:26 +00:00
|
|
|
if err := ts.Service.CreateAuthorization(context.Background(), &ts.Auth); err != nil {
|
2019-08-23 22:52:55 +00:00
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
2019-12-06 03:55:26 +00:00
|
|
|
return ts
|
|
|
|
}
|
2019-08-23 22:52:55 +00:00
|
|
|
|
2019-12-06 03:55:26 +00:00
|
|
|
func TestRetrieveTaskWithBadAuth(t *testing.T) {
|
|
|
|
ctx, cancelFunc := context.WithCancel(context.Background())
|
|
|
|
defer cancelFunc()
|
|
|
|
|
|
|
|
ts := newService(t, ctx, nil)
|
|
|
|
defer ts.Close()
|
|
|
|
|
|
|
|
ctx = icontext.SetAuthorizer(ctx, &ts.Auth)
|
|
|
|
|
|
|
|
task, err := ts.Service.CreateTask(ctx, influxdb.TaskCreate{
|
2019-08-23 22:52:55 +00:00
|
|
|
Flux: `option task = {name: "a task",every: 1h} from(bucket:"test") |> range(start:-1h)`,
|
2019-12-06 03:55:26 +00:00
|
|
|
OrganizationID: ts.Org.ID,
|
|
|
|
OwnerID: ts.User.ID,
|
2020-03-06 22:19:32 +00:00
|
|
|
Status: string(influxdb.TaskActive),
|
2019-08-23 22:52:55 +00:00
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// convert task to old one with a bad auth
|
2019-12-06 03:55:26 +00:00
|
|
|
err = ts.Store.Update(ctx, func(tx kv.Tx) error {
|
2019-08-23 22:52:55 +00:00
|
|
|
b, err := tx.Bucket([]byte("tasksv1"))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
bID, err := task.ID.Encode()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
task.OwnerID = influxdb.ID(1)
|
|
|
|
task.AuthorizationID = influxdb.ID(132) // bad id or an id that doesnt match any auth
|
|
|
|
tbyte, err := json.Marshal(task)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// have to actually hack the bytes here because the system doesnt like us to encode bad id's.
|
|
|
|
tbyte = bytes.Replace(tbyte, []byte(`,"ownerID":"0000000000000001"`), []byte{}, 1)
|
|
|
|
if err := b.Put(bID, tbyte); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// lets see if we can list and find the task
|
2019-12-06 03:55:26 +00:00
|
|
|
newTask, err := ts.Service.FindTaskByID(ctx, task.ID)
|
2019-08-23 22:52:55 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
if newTask.ID != task.ID {
|
|
|
|
t.Fatal("miss matching taskID's")
|
|
|
|
}
|
|
|
|
|
2020-06-17 14:20:05 +00:00
|
|
|
tasks, _, err := ts.Service.FindTasks(context.Background(), influxdb.TaskFilter{})
|
2019-08-23 22:52:55 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
if len(tasks) != 1 {
|
|
|
|
t.Fatal("failed to return task")
|
|
|
|
}
|
2019-12-02 22:16:10 +00:00
|
|
|
|
|
|
|
// test status filter
|
2020-03-06 22:19:32 +00:00
|
|
|
active := string(influxdb.TaskActive)
|
2020-06-17 14:20:05 +00:00
|
|
|
tasksWithActiveFilter, _, err := ts.Service.FindTasks(context.Background(), influxdb.TaskFilter{Status: &active})
|
2019-12-02 22:16:10 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatal("could not find tasks")
|
|
|
|
}
|
|
|
|
if len(tasksWithActiveFilter) != 1 {
|
|
|
|
t.Fatal("failed to find active task with filter")
|
|
|
|
}
|
2019-08-23 22:52:55 +00:00
|
|
|
}
|
2019-12-06 03:55:26 +00:00
|
|
|
|
|
|
|
func TestService_UpdateTask_InactiveToActive(t *testing.T) {
|
|
|
|
ctx, cancelFunc := context.WithCancel(context.Background())
|
|
|
|
defer cancelFunc()
|
|
|
|
|
|
|
|
c := clock.NewMock()
|
|
|
|
c.Set(time.Unix(1000, 0))
|
|
|
|
|
|
|
|
ts := newService(t, ctx, c)
|
|
|
|
defer ts.Close()
|
|
|
|
|
|
|
|
ctx = icontext.SetAuthorizer(ctx, &ts.Auth)
|
|
|
|
|
|
|
|
originalTask, err := ts.Service.CreateTask(ctx, influxdb.TaskCreate{
|
|
|
|
Flux: `option task = {name: "a task",every: 1h} from(bucket:"test") |> range(start:-1h)`,
|
|
|
|
OrganizationID: ts.Org.ID,
|
|
|
|
OwnerID: ts.User.ID,
|
2020-03-06 22:19:32 +00:00
|
|
|
Status: string(influxdb.TaskActive),
|
2019-12-06 03:55:26 +00:00
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal("CreateTask", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
v := influxdb.TaskStatusInactive
|
|
|
|
c.Add(1 * time.Second)
|
|
|
|
exp := c.Now()
|
|
|
|
updatedTask, err := ts.Service.UpdateTask(ctx, originalTask.ID, influxdb.TaskUpdate{Status: &v, LatestCompleted: &exp, LatestScheduled: &exp})
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal("UpdateTask", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if got := updatedTask.LatestScheduled; !got.Equal(exp) {
|
|
|
|
t.Fatalf("unexpected -got/+exp\n%s", cmp.Diff(got.String(), exp.String()))
|
|
|
|
}
|
|
|
|
if got := updatedTask.LatestCompleted; !got.Equal(exp) {
|
|
|
|
t.Fatalf("unexpected -got/+exp\n%s", cmp.Diff(got.String(), exp.String()))
|
|
|
|
}
|
|
|
|
|
|
|
|
c.Add(10 * time.Second)
|
|
|
|
exp = c.Now()
|
|
|
|
v = influxdb.TaskStatusActive
|
|
|
|
updatedTask, err = ts.Service.UpdateTask(ctx, originalTask.ID, influxdb.TaskUpdate{Status: &v})
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal("UpdateTask", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if got := updatedTask.LatestScheduled; !got.Equal(exp) {
|
|
|
|
t.Fatalf("unexpected -got/+exp\n%s", cmp.Diff(got.String(), exp.String()))
|
|
|
|
}
|
|
|
|
}
|
2019-12-17 04:08:24 +00:00
|
|
|
|
|
|
|
func TestTaskRunCancellation(t *testing.T) {
|
|
|
|
store, close, err := NewTestBoltStore(t)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
defer close()
|
|
|
|
|
2020-03-05 20:36:58 +00:00
|
|
|
service := kv.NewService(zaptest.NewLogger(t), store, kv.ServiceConfig{
|
|
|
|
FluxLanguageService: fluxlang.DefaultService,
|
|
|
|
})
|
2019-12-17 04:08:24 +00:00
|
|
|
ctx, cancelFunc := context.WithCancel(context.Background())
|
|
|
|
if err := service.Initialize(ctx); err != nil {
|
|
|
|
t.Fatalf("error initializing urm service: %v", err)
|
|
|
|
}
|
|
|
|
defer cancelFunc()
|
|
|
|
u := &influxdb.User{Name: t.Name() + "-user"}
|
|
|
|
if err := service.CreateUser(ctx, u); err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
o := &influxdb.Organization{Name: t.Name() + "-org"}
|
|
|
|
if err := service.CreateOrganization(ctx, o); err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := service.CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{
|
|
|
|
ResourceType: influxdb.OrgsResourceType,
|
|
|
|
ResourceID: o.ID,
|
|
|
|
UserID: u.ID,
|
|
|
|
UserType: influxdb.Owner,
|
|
|
|
}); err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
authz := influxdb.Authorization{
|
|
|
|
OrgID: o.ID,
|
|
|
|
UserID: u.ID,
|
|
|
|
Permissions: influxdb.OperPermissions(),
|
|
|
|
}
|
|
|
|
if err := service.CreateAuthorization(context.Background(), &authz); err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
ctx = icontext.SetAuthorizer(ctx, &authz)
|
|
|
|
|
|
|
|
task, err := service.CreateTask(ctx, influxdb.TaskCreate{
|
|
|
|
Flux: `option task = {name: "a task",cron: "0 * * * *", offset: 20s} from(bucket:"test") |> range(start:-1h)`,
|
|
|
|
OrganizationID: o.ID,
|
|
|
|
OwnerID: u.ID,
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
2020-01-02 19:41:21 +00:00
|
|
|
run, err := service.CreateRun(ctx, task.ID, time.Now().Add(time.Hour), time.Now().Add(time.Hour))
|
2019-12-17 04:08:24 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
2020-01-02 19:41:21 +00:00
|
|
|
if err := service.CancelRun(ctx, run.TaskID, run.ID); err != nil {
|
2019-12-17 04:08:24 +00:00
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
2020-01-02 19:41:21 +00:00
|
|
|
canceled, err := service.FindRunByID(ctx, run.TaskID, run.ID)
|
2019-12-17 04:08:24 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
2020-03-06 22:19:32 +00:00
|
|
|
if canceled.Status != influxdb.RunCanceled.String() {
|
2019-12-17 04:08:24 +00:00
|
|
|
t.Fatalf("expected task run to be cancelled")
|
|
|
|
}
|
|
|
|
}
|
2020-06-17 18:30:37 +00:00
|
|
|
|
|
|
|
func TestTaskMigrate(t *testing.T) {
|
|
|
|
ctx, cancelFunc := context.WithCancel(context.Background())
|
|
|
|
defer cancelFunc()
|
|
|
|
|
|
|
|
ts := newService(t, ctx, nil)
|
|
|
|
defer ts.Close()
|
|
|
|
|
|
|
|
id := "05da585043e02000"
|
|
|
|
// create a task that has auth set and no ownerID
|
|
|
|
err := ts.Store.Update(context.Background(), func(tx kv.Tx) error {
|
|
|
|
b, err := tx.Bucket([]byte("tasksv1"))
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
taskBody := fmt.Sprintf(`{"id":"05da585043e02000","type":"system","orgID":"05d3ae3492c9c000","org":"whos","authorizationID":"%s","name":"asdf","status":"active","flux":"option v = {\n bucket: \"bucks\",\n timeRangeStart: -1h,\n timeRangeStop: now()\n}\n\noption task = { \n name: \"asdf\",\n every: 5m,\n}\n\nfrom(bucket: \"_monitoring\")\n |\u003e range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |\u003e filter(fn: (r) =\u003e r[\"_measurement\"] == \"boltdb_reads_total\")\n |\u003e filter(fn: (r) =\u003e r[\"_field\"] == \"counter\")\n |\u003e to(bucket: \"bucks\", org: \"whos\")","every":"5m","latestCompleted":"2020-06-16T17:01:26.083319Z","latestScheduled":"2020-06-16T17:01:26.083319Z","lastRunStatus":"success","createdAt":"2020-06-15T19:10:29Z","updatedAt":"0001-01-01T00:00:00Z"}`, ts.Auth.ID.String())
|
|
|
|
err = b.Put([]byte(id), []byte(taskBody))
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
err = ts.Service.TaskOwnerIDUpMigration(context.Background(), ts.Store)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
idType, _ := influxdb.IDFromString(id)
|
|
|
|
task, err := ts.Service.FindTaskByID(context.Background(), *idType)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
if task.OwnerID != ts.User.ID {
|
|
|
|
t.Fatal("failed to fill in ownerID")
|
|
|
|
}
|
|
|
|
|
|
|
|
// create a task that has no auth or owner id but a urm exists
|
|
|
|
err = ts.Store.Update(context.Background(), func(tx kv.Tx) error {
|
|
|
|
b, err := tx.Bucket([]byte("tasksv1"))
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
taskBody := fmt.Sprintf(`{"id":"05da585043e02000","type":"system","orgID":"%s","org":"whos","name":"asdf","status":"active","flux":"option v = {\n bucket: \"bucks\",\n timeRangeStart: -1h,\n timeRangeStop: now()\n}\n\noption task = { \n name: \"asdf\",\n every: 5m,\n}\n\nfrom(bucket: \"_monitoring\")\n |\u003e range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |\u003e filter(fn: (r) =\u003e r[\"_measurement\"] == \"boltdb_reads_total\")\n |\u003e filter(fn: (r) =\u003e r[\"_field\"] == \"counter\")\n |\u003e to(bucket: \"bucks\", org: \"whos\")","every":"5m","latestCompleted":"2020-06-16T17:01:26.083319Z","latestScheduled":"2020-06-16T17:01:26.083319Z","lastRunStatus":"success","createdAt":"2020-06-15T19:10:29Z","updatedAt":"0001-01-01T00:00:00Z"}`, ts.Org.ID.String())
|
|
|
|
err = b.Put([]byte(id), []byte(taskBody))
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
err = ts.Service.TaskOwnerIDUpMigration(context.Background(), ts.Store)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
task, err = ts.Service.FindTaskByID(context.Background(), *idType)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
if task.OwnerID != ts.User.ID {
|
|
|
|
t.Fatal("failed to fill in ownerID")
|
|
|
|
}
|
|
|
|
}
|