feat: migrate task records to always have an owner id (#18513)

* feat: migrate task records to always have an owner id

Co-authored-by: Alirie Gray <alirie.gray@gmail.com>
Co-authored-by: George <me@georgemac.com>
pull/18571/head
Lyon Hill 2020-06-17 12:30:37 -06:00 committed by GitHub
parent 285502a8ae
commit c9177b8913
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 222 additions and 0 deletions

View File

@ -104,6 +104,14 @@ func NewService(log *zap.Logger, kv Store, configs ...ServiceConfig) *Service {
),
// add index user resource mappings by user id
s.urmByUserIndex.Migration(),
NewAnonymousMigration(
"migrate task owner id",
s.TaskOwnerIDUpMigration,
func(context.Context, Store) error {
return nil
},
),
// and new migrations below here (and move this comment down):
)

View File

@ -1777,3 +1777,145 @@ func taskRunKey(taskID, runID influxdb.ID) ([]byte, error) {
return []byte(string(encodedID) + "/" + string(encodedRunID)), nil
}
func (s *Service) TaskOwnerIDUpMigration(ctx context.Context, store Store) error {
var ownerlessTasks []*influxdb.Task
// loop through the tasks and collect a set of tasks that are missing the owner id.
err := store.View(ctx, func(tx Tx) error {
taskBucket, err := tx.Bucket(taskBucket)
if err != nil {
return influxdb.ErrUnexpectedTaskBucketErr(err)
}
c, err := taskBucket.ForwardCursor([]byte{})
if err != nil {
return influxdb.ErrUnexpectedTaskBucketErr(err)
}
for k, v := c.Next(); k != nil; k, v = c.Next() {
kvTask := &kvTask{}
if err := json.Unmarshal(v, kvTask); err != nil {
return influxdb.ErrInternalTaskServiceError(err)
}
t := kvToInfluxTask(kvTask)
if !t.OwnerID.Valid() {
ownerlessTasks = append(ownerlessTasks, t)
}
}
if err := c.Err(); err != nil {
return err
}
return c.Close()
})
if err != nil {
return err
}
// loop through tasks
for _, t := range ownerlessTasks {
// open transaction
err := store.Update(ctx, func(tx Tx) error {
taskKey, err := taskKey(t.ID)
if err != nil {
return err
}
b, err := tx.Bucket(taskBucket)
if err != nil {
return influxdb.ErrUnexpectedTaskBucketErr(err)
}
if !t.OwnerID.Valid() {
v, err := b.Get(taskKey)
if IsNotFound(err) {
return influxdb.ErrTaskNotFound
}
authType := struct {
AuthorizationID influxdb.ID `json:"authorizationID"`
}{}
if err := json.Unmarshal(v, &authType); err != nil {
return influxdb.ErrInternalTaskServiceError(err)
}
// try populating the owner from auth
encodedID, err := authType.AuthorizationID.Encode()
if err == nil {
authBucket, err := tx.Bucket([]byte("authorizationsv1"))
if err != nil {
return err
}
a, err := authBucket.Get(encodedID)
if err == nil {
auth := &influxdb.Authorization{}
if err := json.Unmarshal(a, auth); err != nil {
return err
}
t.OwnerID = auth.GetUserID()
}
}
}
// try populating owner from urm
if !t.OwnerID.Valid() {
b, err := tx.Bucket([]byte("userresourcemappingsv1"))
if err != nil {
return err
}
id, err := t.OrganizationID.Encode()
if err != nil {
return err
}
cur, err := b.ForwardCursor(id, WithCursorPrefix(id))
if err != nil {
return err
}
for k, v := cur.Next(); k != nil; k, v = cur.Next() {
m := &influxdb.UserResourceMapping{}
if err := json.Unmarshal(v, m); err != nil {
return err
}
if m.ResourceID == t.OrganizationID && m.ResourceType == influxdb.OrgsResourceType && m.UserType == influxdb.Owner {
t.OwnerID = m.UserID
break
}
}
if err := cur.Close(); err != nil {
return err
}
}
// if population fails return error
if !t.OwnerID.Valid() {
return &influxdb.Error{
Code: influxdb.EInternal,
Msg: "could not populate owner ID for task",
}
}
// save task
taskBytes, err := json.Marshal(t)
if err != nil {
return influxdb.ErrInternalTaskServiceError(err)
}
err = b.Put(taskKey, taskBytes)
if err != nil {
return influxdb.ErrUnexpectedTaskBucketErr(err)
}
return nil
})
if err != nil {
return err
}
}
return nil
}

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"testing"
"time"
@ -315,3 +316,74 @@ func TestTaskRunCancellation(t *testing.T) {
t.Fatalf("expected task run to be cancelled")
}
}
func TestTaskMigrate(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
ts := newService(t, ctx, nil)
defer ts.Close()
id := "05da585043e02000"
// create a task that has auth set and no ownerID
err := ts.Store.Update(context.Background(), func(tx kv.Tx) error {
b, err := tx.Bucket([]byte("tasksv1"))
if err != nil {
t.Fatal(err)
}
taskBody := fmt.Sprintf(`{"id":"05da585043e02000","type":"system","orgID":"05d3ae3492c9c000","org":"whos","authorizationID":"%s","name":"asdf","status":"active","flux":"option v = {\n bucket: \"bucks\",\n timeRangeStart: -1h,\n timeRangeStop: now()\n}\n\noption task = { \n name: \"asdf\",\n every: 5m,\n}\n\nfrom(bucket: \"_monitoring\")\n |\u003e range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |\u003e filter(fn: (r) =\u003e r[\"_measurement\"] == \"boltdb_reads_total\")\n |\u003e filter(fn: (r) =\u003e r[\"_field\"] == \"counter\")\n |\u003e to(bucket: \"bucks\", org: \"whos\")","every":"5m","latestCompleted":"2020-06-16T17:01:26.083319Z","latestScheduled":"2020-06-16T17:01:26.083319Z","lastRunStatus":"success","createdAt":"2020-06-15T19:10:29Z","updatedAt":"0001-01-01T00:00:00Z"}`, ts.Auth.ID.String())
err = b.Put([]byte(id), []byte(taskBody))
if err != nil {
t.Fatal(err)
}
return nil
})
if err != nil {
t.Fatal(err)
}
err = ts.Service.TaskOwnerIDUpMigration(context.Background(), ts.Store)
if err != nil {
t.Fatal(err)
}
idType, _ := influxdb.IDFromString(id)
task, err := ts.Service.FindTaskByID(context.Background(), *idType)
if err != nil {
t.Fatal(err)
}
if task.OwnerID != ts.User.ID {
t.Fatal("failed to fill in ownerID")
}
// create a task that has no auth or owner id but a urm exists
err = ts.Store.Update(context.Background(), func(tx kv.Tx) error {
b, err := tx.Bucket([]byte("tasksv1"))
if err != nil {
t.Fatal(err)
}
taskBody := fmt.Sprintf(`{"id":"05da585043e02000","type":"system","orgID":"%s","org":"whos","name":"asdf","status":"active","flux":"option v = {\n bucket: \"bucks\",\n timeRangeStart: -1h,\n timeRangeStop: now()\n}\n\noption task = { \n name: \"asdf\",\n every: 5m,\n}\n\nfrom(bucket: \"_monitoring\")\n |\u003e range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |\u003e filter(fn: (r) =\u003e r[\"_measurement\"] == \"boltdb_reads_total\")\n |\u003e filter(fn: (r) =\u003e r[\"_field\"] == \"counter\")\n |\u003e to(bucket: \"bucks\", org: \"whos\")","every":"5m","latestCompleted":"2020-06-16T17:01:26.083319Z","latestScheduled":"2020-06-16T17:01:26.083319Z","lastRunStatus":"success","createdAt":"2020-06-15T19:10:29Z","updatedAt":"0001-01-01T00:00:00Z"}`, ts.Org.ID.String())
err = b.Put([]byte(id), []byte(taskBody))
if err != nil {
t.Fatal(err)
}
return nil
})
if err != nil {
t.Fatal(err)
}
err = ts.Service.TaskOwnerIDUpMigration(context.Background(), ts.Store)
if err != nil {
t.Fatal(err)
}
task, err = ts.Service.FindTaskByID(context.Background(), *idType)
if err != nil {
t.Fatal(err)
}
if task.OwnerID != ts.User.ID {
t.Fatal("failed to fill in ownerID")
}
}