From a2de2cc5c8b275faf00e490a1cad1853bb0169f2 Mon Sep 17 00:00:00 2001 From: Brett Buddin Date: Mon, 16 Dec 2019 23:08:24 -0500 Subject: [PATCH] fix(kv): Store canceled task runs in the correct bucket. Task runs are stored and retrieved from the `taskRunsv1` bucket, but when they are canceled they are incorrectly placed in the `tasksv1` bucket. Once this has been done, further look ups of the task run fail, because it is located in the wrong bucket. This addresses the problem by placing them back into the `taskRunsv1` bucket. An additional test has been added to ensure we are able to successfully read a canceled run. --- kv/task.go | 2 +- kv/task_test.go | 70 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/kv/task.go b/kv/task.go index fe863a94b8..2aa2083bb4 100644 --- a/kv/task.go +++ b/kv/task.go @@ -1051,7 +1051,7 @@ func (s *Service) cancelRun(ctx context.Context, tx Tx, taskID, runID influxdb.I run.Status = "canceled" // save - bucket, err := tx.Bucket(taskBucket) + bucket, err := tx.Bucket(taskRunBucket) if err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } diff --git a/kv/task_test.go b/kv/task_test.go index cadb462a06..c6713a58b8 100644 --- a/kv/task_test.go +++ b/kv/task_test.go @@ -370,3 +370,73 @@ func TestService_UpdateTask_InactiveToActive(t *testing.T) { t.Fatalf("unexpected -got/+exp\n%s", cmp.Diff(got.String(), exp.String())) } } + +func TestTaskRunCancellation(t *testing.T) { + store, close, err := NewTestBoltStore(t) + if err != nil { + t.Fatal(err) + } + defer close() + + service := kv.NewService(zaptest.NewLogger(t), store) + ctx, cancelFunc := context.WithCancel(context.Background()) + if err := service.Initialize(ctx); err != nil { + t.Fatalf("error initializing urm service: %v", err) + } + defer cancelFunc() + u := &influxdb.User{Name: t.Name() + "-user"} + if err := service.CreateUser(ctx, u); err != nil { + t.Fatal(err) + } + o := &influxdb.Organization{Name: t.Name() + "-org"} + if err := service.CreateOrganization(ctx, o); err != nil { + t.Fatal(err) + } + + if err := service.CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{ + ResourceType: influxdb.OrgsResourceType, + ResourceID: o.ID, + UserID: u.ID, + UserType: influxdb.Owner, + }); err != nil { + t.Fatal(err) + } + + authz := influxdb.Authorization{ + OrgID: o.ID, + UserID: u.ID, + Permissions: influxdb.OperPermissions(), + } + if err := service.CreateAuthorization(context.Background(), &authz); err != nil { + t.Fatal(err) + } + + ctx = icontext.SetAuthorizer(ctx, &authz) + + task, err := service.CreateTask(ctx, influxdb.TaskCreate{ + Flux: `option task = {name: "a task",cron: "0 * * * *", offset: 20s} from(bucket:"test") |> range(start:-1h)`, + OrganizationID: o.ID, + OwnerID: u.ID, + }) + if err != nil { + t.Fatal(err) + } + + run, err := service.CreateNextRun(ctx, task.ID, time.Now().Add(time.Hour).Unix()) + if err != nil { + t.Fatal(err) + } + + if err := service.CancelRun(ctx, run.Created.TaskID, run.Created.RunID); err != nil { + t.Fatal(err) + } + + canceled, err := service.FindRunByID(ctx, run.Created.TaskID, run.Created.RunID) + if err != nil { + t.Fatal(err) + } + + if canceled.Status != backend.RunCanceled.String() { + t.Fatalf("expected task run to be cancelled") + } +}