feat(task): store authorization ID with task

Accept token when creating or updating a task, but only report back the
authorization ID.

This means the executor and the platform adapter are now both aware of
an Authorization Service.
pull/11899/head
Mark Rushakoff 2019-02-06 17:34:54 -08:00 committed by Mark Rushakoff
parent dcfab169c6
commit caf08b5078
18 changed files with 478 additions and 203 deletions

View File

@ -5,6 +5,9 @@ import (
"fmt"
)
// AuthorizationKind is returned by (*Authorization).Kind().
const AuthorizationKind = "authorization"
var (
// ErrUnableToCreateToken sanitized error message for all errors when a user cannot create a token
ErrUnableToCreateToken = &Error{
@ -64,7 +67,7 @@ func (a *Authorization) GetUserID() ID {
}
// Kind returns session and is used for auditing.
func (a *Authorization) Kind() string { return "authorization" }
func (a *Authorization) Kind() string { return AuthorizationKind }
// Identifier returns the authorizations ID and is used for auditing.
func (a *Authorization) Identifier() ID { return a.ID }

View File

@ -402,7 +402,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
queryService := query.QueryServiceBridge{AsyncQueryService: m.queryController}
lr := taskbackend.NewQueryLogReader(queryService)
taskSvc = task.PlatformAdapter(coordinator.New(m.logger.With(zap.String("service", "task-coordinator")), m.scheduler, boltStore), lr, m.scheduler)
taskSvc = task.PlatformAdapter(coordinator.New(m.logger.With(zap.String("service", "task-coordinator")), m.scheduler, boltStore), lr, m.scheduler, authSvc)
taskSvc = task.NewValidator(taskSvc, bucketSvc)
m.taskStore = boltStore
}

View File

@ -3734,7 +3734,7 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/Task"
$ref: "#/components/schemas/TaskUpdateRequest"
default:
description: unexpected error
content:
@ -5021,6 +5021,9 @@ components:
- inactive
labels:
$ref: "#/components/schemas/Labels"
authorizationID:
description: The ID of the authorization used when this task communicates with the query engine.
type: string
flux:
description: The Flux script to run for this task.
type: string
@ -6921,6 +6924,7 @@ components:
- u
- ns
TaskCreateRequest:
type: object
properties:
orgID:
description: The ID of the organization that owns this Task.
@ -6938,4 +6942,35 @@ components:
flux:
description: The Flux script to run for this task.
type: string
required: [flux]
token:
description: The token to use for authenticating this task when it executes queries. If omitted, uses the token associated with the request that creates the task.
type: string
required: [flux]
TaskUpdateRequest:
type: object
properties:
status:
description: Starting state of the task. 'inactive' tasks are not run until they are updated to 'active'
default: active
type: string
enum:
- active
- inactive
flux:
description: The Flux script to run for this task.
type: string
name:
description: Override the 'name' option in the flux script.
type: string
every:
description: Override the 'every' option in the flux script.
type: string
cron:
description: Override the 'cron' option in the flux script.
type: string
offset:
description: Override the 'offset' option in the flux script.
type: string
token:
description: Override the existing token associated with the task.
type: string

View File

@ -20,12 +20,13 @@ import (
platformtesting "github.com/influxdata/influxdb/testing"
"github.com/julienschmidt/httprouter"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)
// NewMockTaskBackend returns a TaskBackend with mock services.
func NewMockTaskBackend() *TaskBackend {
func NewMockTaskBackend(t *testing.T) *TaskBackend {
return &TaskBackend{
Logger: zap.NewNop().With(zap.String("handler", "task")),
Logger: zaptest.NewLogger(t).With(zap.String("handler", "task")),
AuthorizationService: mock.NewAuthorizationService(),
TaskService: &mock.TaskService{},
@ -74,16 +75,18 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
FindTasksFn: func(ctx context.Context, f platform.TaskFilter) ([]*platform.Task, int, error) {
tasks := []*platform.Task{
{
ID: 1,
Name: "task1",
OrganizationID: 1,
Owner: platform.User{ID: 1, Name: "user1"},
ID: 1,
Name: "task1",
OrganizationID: 1,
Owner: platform.User{ID: 1, Name: "user1"},
AuthorizationID: 0x100,
},
{
ID: 2,
Name: "task2",
OrganizationID: 2,
Owner: platform.User{ID: 2, Name: "user2"},
ID: 2,
Name: "task2",
OrganizationID: 2,
Owner: platform.User{ID: 2, Name: "user2"},
AuthorizationID: 0x200,
},
}
return tasks, len(tasks), nil
@ -136,6 +139,7 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
"orgID": "0000000000000001",
"org": "test",
"status": "",
"authorizationID": "0000000000000100",
"flux": ""
},
{
@ -161,6 +165,7 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
"orgID": "0000000000000002",
"org": "test",
"status": "",
"authorizationID": "0000000000000200",
"flux": ""
}
]
@ -174,7 +179,7 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
r := httptest.NewRequest("GET", "http://any.url", nil)
w := httptest.NewRecorder()
taskBackend := NewMockTaskBackend()
taskBackend := NewMockTaskBackend(t)
taskBackend.TaskService = tt.fields.taskService
taskBackend.LabelService = tt.fields.labelService
h := NewTaskHandler(taskBackend)
@ -199,7 +204,7 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
func TestTaskHandler_handlePostTasks(t *testing.T) {
type args struct {
task platform.Task
taskCreate platform.TaskCreate
}
type fields struct {
taskService platform.TaskService
@ -219,24 +224,23 @@ func TestTaskHandler_handlePostTasks(t *testing.T) {
{
name: "create task",
args: args{
task: platform.Task{
Name: "task1",
taskCreate: platform.TaskCreate{
OrganizationID: 1,
Owner: platform.User{
ID: 1,
Name: "user1",
},
Token: "mytoken",
Flux: "abc",
},
},
fields: fields{
taskService: &mock.TaskService{
CreateTaskFn: func(ctx context.Context, tc platform.TaskCreate) (*platform.Task, error) {
return &platform.Task{
ID: 1,
Name: "task1",
OrganizationID: 1,
Organization: "test",
Owner: platform.User{ID: 1, Name: "user1"},
ID: 1,
Name: "task1",
OrganizationID: 1,
Organization: "test",
Owner: platform.User{ID: 1, Name: "user1"},
AuthorizationID: 0x100,
Flux: "abc",
}, nil
},
},
@ -260,7 +264,8 @@ func TestTaskHandler_handlePostTasks(t *testing.T) {
"orgID": "0000000000000001",
"org": "test",
"status": "",
"flux": ""
"authorizationID": "0000000000000100",
"flux": "abc"
}
`,
},
@ -269,7 +274,7 @@ func TestTaskHandler_handlePostTasks(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b, err := json.Marshal(tt.args.task)
b, err := json.Marshal(tt.args.taskCreate)
if err != nil {
t.Fatalf("failed to unmarshal task: %v", err)
}
@ -280,7 +285,7 @@ func TestTaskHandler_handlePostTasks(t *testing.T) {
w := httptest.NewRecorder()
taskBackend := NewMockTaskBackend()
taskBackend := NewMockTaskBackend(t)
taskBackend.TaskService = tt.fields.taskService
h := NewTaskHandler(taskBackend)
h.handlePostTask(w, r)
@ -385,7 +390,7 @@ func TestTaskHandler_handleGetRun(t *testing.T) {
},
}))
w := httptest.NewRecorder()
taskBackend := NewMockTaskBackend()
taskBackend := NewMockTaskBackend(t)
taskBackend.TaskService = tt.fields.taskService
h := NewTaskHandler(taskBackend)
h.handleGetRun(w, r)
@ -494,7 +499,7 @@ func TestTaskHandler_handleGetRuns(t *testing.T) {
},
}))
w := httptest.NewRecorder()
taskBackend := NewMockTaskBackend()
taskBackend := NewMockTaskBackend(t)
taskBackend.TaskService = tt.fields.taskService
h := NewTaskHandler(taskBackend)
h.handleGetRuns(w, r)
@ -520,7 +525,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
// Ensure that the HTTP handlers return 404s for missing resources, and OKs for matching.
im := inmem.NewService()
taskBackend := NewMockTaskBackend()
taskBackend := NewMockTaskBackend(t)
h := NewTaskHandler(taskBackend)
h.UserResourceMappingService = im
h.LabelService = im
@ -782,8 +787,9 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
func TestTaskUserResourceMap(t *testing.T) {
task := platform.Task{
Name: "task1",
OrganizationID: 1,
Name: "task1",
OrganizationID: 1,
AuthorizationID: 0x100,
}
b, err := json.Marshal(task)
@ -813,7 +819,7 @@ func TestTaskUserResourceMap(t *testing.T) {
},
}
taskBackend := NewMockTaskBackend()
taskBackend := NewMockTaskBackend(t)
taskBackend.UserResourceMappingService = urms
h := NewTaskHandler(taskBackend)
taskID := platform.ID(1)
@ -907,7 +913,7 @@ func TestService_handlePostTaskLabel(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
taskBE := NewMockTaskBackend()
taskBE := NewMockTaskBackend(t)
taskBE.LabelService = tt.fields.LabelService
h := NewTaskHandler(taskBE)

View File

@ -22,15 +22,18 @@ func httpTaskServiceFactory(t *testing.T) (*servicetest.System, context.CancelFu
ctx, cancel := context.WithCancel(context.Background())
backingTS := task.PlatformAdapter(store, rrw, sch)
i := inmem.NewService()
backingTS := task.PlatformAdapter(store, rrw, sch, i)
h := http.NewAuthenticationHandler()
h.AuthorizationService = i
th := http.NewTaskHandler(http.NewMockTaskBackend())
th := http.NewTaskHandler(http.NewMockTaskBackend(t))
th.TaskService = backingTS
th.AuthorizationService = i
th.OrganizationService = i
th.UserService = i
th.UserResourceMappingService = i
h.Handler = th
org := &platform.Organization{Name: t.Name() + "_org"}
@ -59,14 +62,20 @@ func httpTaskServiceFactory(t *testing.T) (*servicetest.System, context.CancelFu
}
}
cFunc := func() (o, u platform.ID, tok string, err error) {
return org.ID, user.ID, auth.Token, nil
cFunc := func() (servicetest.TestCreds, error) {
return servicetest.TestCreds{
OrgID: org.ID,
UserID: user.ID,
AuthorizationID: auth.ID,
Token: auth.Token,
}, nil
}
return &servicetest.System{
S: store,
LR: rrw,
LW: rrw,
I: i,
Ctx: ctx,
TaskServiceFunc: tsFunc,
CredsFunc: cFunc,

13
task.go
View File

@ -23,6 +23,7 @@ type Task struct {
ID ID `json:"id,omitempty"`
OrganizationID ID `json:"orgID"`
Organization string `json:"org"`
AuthorizationID ID `json:"authorizationID"`
Name string `json:"name"`
Status string `json:"status"`
Owner User `json:"-"`
@ -95,6 +96,7 @@ type TaskCreate struct {
Status string `json:"status,omitempty"`
OrganizationID ID `json:"orgID,omitempty"`
Organization string `json:"org,omitempty"`
Token string `json:"token,omitempty"`
}
// TaskUpdate represents updates to a task. Options updates override any options set in the Flux field.
@ -103,6 +105,9 @@ type TaskUpdate struct {
Status *string `json:"status,omitempty"`
// Options gets unmarshalled from json as if it was flat, with the same level as Flux and Status.
Options options.Options // when we unmarshal this gets unmarshalled from flat key-values
// Optional token override.
Token string `json:"token,omitempty"`
}
func (t *TaskUpdate) UnmarshalJSON(data []byte) error {
@ -126,6 +131,8 @@ func (t *TaskUpdate) UnmarshalJSON(data []byte) error {
Concurrency int64 `json:"concurrency,omitempty"`
Retry int64 `json:"retry,omitempty"`
Token string `json:"token,omitempty"`
}{}
if err := json.Unmarshal(data, &jo); err != nil {
@ -139,6 +146,7 @@ func (t *TaskUpdate) UnmarshalJSON(data []byte) error {
t.Options.Retry = jo.Retry
t.Flux = jo.Flux
t.Status = jo.Status
t.Token = jo.Token
return nil
}
@ -161,6 +169,8 @@ func (t TaskUpdate) MarshalJSON() ([]byte, error) {
Concurrency int64 `json:"concurrency,omitempty"`
Retry int64 `json:"retry,omitempty"`
Token string `json:"token,omitempty"`
}{}
jo.Name = t.Options.Name
jo.Cron = t.Options.Cron
@ -170,6 +180,7 @@ func (t TaskUpdate) MarshalJSON() ([]byte, error) {
jo.Retry = t.Options.Retry
jo.Flux = t.Flux
jo.Status = t.Status
jo.Token = t.Token
return json.Marshal(jo)
}
@ -177,7 +188,7 @@ func (t TaskUpdate) Validate() error {
switch {
case t.Options.Every != 0 && t.Options.Cron != "":
return errors.New("cannot specify both every and cron")
case t.Flux == nil && t.Status == nil && t.Options.IsZero():
case t.Flux == nil && t.Status == nil && t.Options.IsZero() && t.Token == "":
return errors.New("cannot update task without content")
}
return nil

View File

@ -255,9 +255,13 @@ func (s *Store) UpdateTask(ctx context.Context, req backend.UpdateTaskRequest) (
}
stm.UpdatedAt = time.Now().Unix()
res.OldStatus = backend.TaskStatus(stm.Status)
if req.Status != "" {
stm.Status = string(req.Status)
}
if req.AuthorizationID.Valid() {
stm.AuthorizationID = uint64(req.AuthorizationID)
}
stmBytes, err = stm.Marshal()
if err != nil {
return err

View File

@ -37,7 +37,8 @@ func TestCoordinator(t *testing.T) {
orgID := platformtesting.MustIDBase16("69746f7175650d0a")
usrID := platformtesting.MustIDBase16("6c61757320657420")
id, err := coord.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, User: usrID, Script: script})
authzID := platform.ID(123456)
id, err := coord.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, User: usrID, AuthorizationID: authzID, Script: script})
if err != nil {
t.Fatal(err)
}
@ -69,7 +70,7 @@ func TestCoordinator(t *testing.T) {
t.Fatal("task sent to scheduler doesnt match task created")
}
id, err = coord.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, User: usrID, Script: script})
id, err = coord.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, User: usrID, AuthorizationID: authzID, Script: script})
if err != nil {
t.Fatal(err)
}
@ -141,7 +142,7 @@ func TestCoordinator_DeleteUnclaimedTask(t *testing.T) {
coord := coordinator.New(zaptest.NewLogger(t), sched, st)
// Create an isolated task directly through the store so the coordinator doesn't know about it.
id, err := st.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, Script: script})
id, err := st.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, AuthorizationID: 3, Script: script})
if err != nil {
t.Fatal(err)
}
@ -170,7 +171,7 @@ func TestCoordinator_ClaimExistingTasks(t *testing.T) {
createdIDs := make([]platform.ID, numTasks)
for i := 0; i < numTasks; i++ {
id, err := st.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, Script: script})
id, err := st.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, AuthorizationID: 3, Script: script})
if err != nil {
t.Fatal(err)
}

View File

@ -283,12 +283,13 @@ from(bucket: "one") |> http.to(url: "http://example.com")`
func testExecutorQuerySuccess(t *testing.T, fn createSysFn) {
var orgID = platformtesting.MustIDBase16("aaaaaaaaaaaaaaaa")
var userID = platformtesting.MustIDBase16("baaaaaaaaaaaaaab")
var authzID = platformtesting.MustIDBase16("caaaaaaaaaaaaaac")
sys := fn()
t.Run(sys.name+"/QuerySuccess", func(t *testing.T) {
t.Parallel()
script := fmt.Sprintf(fmtTestScript, t.Name())
tid, err := sys.st.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, User: userID, Script: script})
tid, err := sys.st.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, User: userID, AuthorizationID: authzID, Script: script})
if err != nil {
t.Fatal(err)
}
@ -338,11 +339,12 @@ func testExecutorQuerySuccess(t *testing.T, fn createSysFn) {
func testExecutorQueryFailure(t *testing.T, fn createSysFn) {
var orgID = platformtesting.MustIDBase16("aaaaaaaaaaaaaaaa")
var userID = platformtesting.MustIDBase16("baaaaaaaaaaaaaab")
var authzID = platformtesting.MustIDBase16("caaaaaaaaaaaaaac")
sys := fn()
t.Run(sys.name+"/QueryFail", func(t *testing.T) {
t.Parallel()
script := fmt.Sprintf(fmtTestScript, t.Name())
tid, err := sys.st.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, User: userID, Script: script})
tid, err := sys.st.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, User: userID, AuthorizationID: authzID, Script: script})
if err != nil {
t.Fatal(err)
}
@ -368,11 +370,12 @@ func testExecutorQueryFailure(t *testing.T, fn createSysFn) {
func testExecutorPromiseCancel(t *testing.T, fn createSysFn) {
var orgID = platformtesting.MustIDBase16("aaaaaaaaaaaaaaaa")
var userID = platformtesting.MustIDBase16("baaaaaaaaaaaaaab")
var authzID = platformtesting.MustIDBase16("caaaaaaaaaaaaaac")
sys := fn()
t.Run(sys.name+"/PromiseCancel", func(t *testing.T) {
t.Parallel()
script := fmt.Sprintf(fmtTestScript, t.Name())
tid, err := sys.st.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, User: userID, Script: script})
tid, err := sys.st.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, User: userID, AuthorizationID: authzID, Script: script})
if err != nil {
t.Fatal(err)
}
@ -397,11 +400,12 @@ func testExecutorPromiseCancel(t *testing.T, fn createSysFn) {
func testExecutorServiceError(t *testing.T, fn createSysFn) {
var orgID = platformtesting.MustIDBase16("aaaaaaaaaaaaaaaa")
var userID = platformtesting.MustIDBase16("baaaaaaaaaaaaaab")
var authzID = platformtesting.MustIDBase16("caaaaaaaaaaaaaac")
sys := fn()
t.Run(sys.name+"/ServiceError", func(t *testing.T) {
t.Parallel()
script := fmt.Sprintf(fmtTestScript, t.Name())
tid, err := sys.st.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, User: userID, Script: script})
tid, err := sys.st.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, User: userID, AuthorizationID: authzID, Script: script})
if err != nil {
t.Fatal(err)
}
@ -438,6 +442,7 @@ func testExecutorWait(t *testing.T, createSys createSysFn) {
var orgID = platformtesting.MustIDBase16("aaaaaaaaaaaaaaaa")
var userID = platformtesting.MustIDBase16("baaaaaaaaaaaaaab")
var authzID = platformtesting.MustIDBase16("caaaaaaaaaaaaaac")
// Other executor tests create a single sys and share it among subtests.
// For this set of tests, we are testing Wait, which does not allow calling Execute concurrently,
@ -470,7 +475,7 @@ func testExecutorWait(t *testing.T, createSys createSysFn) {
defer ctxCancel()
script := fmt.Sprintf(fmtTestScript, t.Name())
tid, err := sys.st.CreateTask(ctx, backend.CreateTaskRequest{Org: orgID, User: userID, Script: script})
tid, err := sys.st.CreateTask(ctx, backend.CreateTaskRequest{Org: orgID, User: userID, AuthorizationID: authzID, Script: script})
if err != nil {
t.Fatal(err)
}
@ -509,7 +514,7 @@ func testExecutorWait(t *testing.T, createSys createSysFn) {
ctx := context.Background()
script := fmt.Sprintf(fmtTestScript, t.Name())
tid, err := sys.st.CreateTask(ctx, backend.CreateTaskRequest{Org: orgID, User: userID, Script: script})
tid, err := sys.st.CreateTask(ctx, backend.CreateTaskRequest{Org: orgID, User: userID, AuthorizationID: authzID, Script: script})
if err != nil {
t.Fatal(err)
}
@ -549,7 +554,7 @@ func testExecutorWait(t *testing.T, createSys createSysFn) {
ctx := context.Background()
script := fmt.Sprintf(fmtTestScript, t.Name())
tid, err := sys.st.CreateTask(ctx, backend.CreateTaskRequest{Org: orgID, User: userID, Script: script})
tid, err := sys.st.CreateTask(ctx, backend.CreateTaskRequest{Org: orgID, User: userID, AuthorizationID: authzID, Script: script})
if err != nil {
t.Fatal(err)
}
@ -589,7 +594,7 @@ func testExecutorWait(t *testing.T, createSys createSysFn) {
ctx := context.Background()
script := fmt.Sprintf(fmtTestScript, t.Name())
tid, err := sys.st.CreateTask(ctx, backend.CreateTaskRequest{Org: orgID, User: userID, Script: script})
tid, err := sys.st.CreateTask(ctx, backend.CreateTaskRequest{Org: orgID, User: userID, AuthorizationID: authzID, Script: script})
if err != nil {
t.Fatal(err)
}

View File

@ -115,9 +115,14 @@ func (s *inmem) UpdateTask(_ context.Context, req UpdateTaskRequest) (UpdateTask
if req.Status != "" {
// Changing the status.
stm.Status = string(req.Status)
}
if req.AuthorizationID.Valid() {
stm.AuthorizationID = uint64(req.AuthorizationID)
}
s.meta[req.ID] = stm
res.NewMeta = stm
return res, nil
}

View File

@ -21,6 +21,7 @@ func NewStoreTaskMeta(req CreateTaskRequest, o options.Options) StoreTaskMeta {
CreatedAt: time.Now().Unix(),
EffectiveCron: o.EffectiveCronString(),
Offset: int32(o.Offset / time.Second),
AuthorizationID: uint64(req.AuthorizationID),
}
if stm.Status == "" {

View File

@ -35,17 +35,19 @@ type StoreTaskMeta struct {
// effective_cron is the effective cron string as reported by the task's options.
EffectiveCron string `protobuf:"bytes,5,opt,name=effective_cron,json=effectiveCron,proto3" json:"effective_cron,omitempty"`
// Task's configured delay, in seconds.
Offset int32 `protobuf:"varint,6,opt,name=offset,proto3" json:"offset,omitempty"`
CreatedAt int64 `protobuf:"varint,7,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"`
UpdatedAt int64 `protobuf:"varint,8,opt,name=updated_at,json=updatedAt,proto3" json:"updated_at,omitempty"`
ManualRuns []*StoreTaskMetaManualRun `protobuf:"bytes,16,rep,name=manual_runs,json=manualRuns,proto3" json:"manual_runs,omitempty"`
Offset int32 `protobuf:"varint,6,opt,name=offset,proto3" json:"offset,omitempty"`
CreatedAt int64 `protobuf:"varint,7,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"`
UpdatedAt int64 `protobuf:"varint,8,opt,name=updated_at,json=updatedAt,proto3" json:"updated_at,omitempty"`
// The Authorization ID associated with the task.
AuthorizationID uint64 `protobuf:"varint,9,opt,name=authorization_id,json=authorizationId,proto3" json:"authorization_id,omitempty"`
ManualRuns []*StoreTaskMetaManualRun `protobuf:"bytes,16,rep,name=manual_runs,json=manualRuns,proto3" json:"manual_runs,omitempty"`
}
func (m *StoreTaskMeta) Reset() { *m = StoreTaskMeta{} }
func (m *StoreTaskMeta) String() string { return proto.CompactTextString(m) }
func (*StoreTaskMeta) ProtoMessage() {}
func (*StoreTaskMeta) Descriptor() ([]byte, []int) {
return fileDescriptor_meta_301ed152bff7072d, []int{0}
return fileDescriptor_meta_841ef32afee093f0, []int{0}
}
func (m *StoreTaskMeta) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -130,6 +132,13 @@ func (m *StoreTaskMeta) GetUpdatedAt() int64 {
return 0
}
func (m *StoreTaskMeta) GetAuthorizationID() uint64 {
if m != nil {
return m.AuthorizationID
}
return 0
}
func (m *StoreTaskMeta) GetManualRuns() []*StoreTaskMetaManualRun {
if m != nil {
return m.ManualRuns
@ -155,7 +164,7 @@ func (m *StoreTaskMetaRun) Reset() { *m = StoreTaskMetaRun{} }
func (m *StoreTaskMetaRun) String() string { return proto.CompactTextString(m) }
func (*StoreTaskMetaRun) ProtoMessage() {}
func (*StoreTaskMetaRun) Descriptor() ([]byte, []int) {
return fileDescriptor_meta_301ed152bff7072d, []int{1}
return fileDescriptor_meta_841ef32afee093f0, []int{1}
}
func (m *StoreTaskMetaRun) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -245,7 +254,7 @@ func (m *StoreTaskMetaManualRun) Reset() { *m = StoreTaskMetaManualRun{}
func (m *StoreTaskMetaManualRun) String() string { return proto.CompactTextString(m) }
func (*StoreTaskMetaManualRun) ProtoMessage() {}
func (*StoreTaskMetaManualRun) Descriptor() ([]byte, []int) {
return fileDescriptor_meta_301ed152bff7072d, []int{2}
return fileDescriptor_meta_841ef32afee093f0, []int{2}
}
func (m *StoreTaskMetaManualRun) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -378,6 +387,11 @@ func (m *StoreTaskMeta) MarshalTo(dAtA []byte) (int, error) {
i++
i = encodeVarintMeta(dAtA, i, uint64(m.UpdatedAt))
}
if m.AuthorizationID != 0 {
dAtA[i] = 0x48
i++
i = encodeVarintMeta(dAtA, i, uint64(m.AuthorizationID))
}
if len(m.ManualRuns) > 0 {
for _, msg := range m.ManualRuns {
dAtA[i] = 0x82
@ -530,6 +544,9 @@ func (m *StoreTaskMeta) Size() (n int) {
if m.UpdatedAt != 0 {
n += 1 + sovMeta(uint64(m.UpdatedAt))
}
if m.AuthorizationID != 0 {
n += 1 + sovMeta(uint64(m.AuthorizationID))
}
if len(m.ManualRuns) > 0 {
for _, e := range m.ManualRuns {
l = e.Size()
@ -816,6 +833,25 @@ func (m *StoreTaskMeta) Unmarshal(dAtA []byte) error {
break
}
}
case 9:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field AuthorizationID", wireType)
}
m.AuthorizationID = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMeta
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.AuthorizationID |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 16:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ManualRuns", wireType)
@ -1282,40 +1318,42 @@ var (
ErrIntOverflowMeta = fmt.Errorf("proto: integer overflow")
)
func init() { proto.RegisterFile("meta.proto", fileDescriptor_meta_301ed152bff7072d) }
func init() { proto.RegisterFile("meta.proto", fileDescriptor_meta_841ef32afee093f0) }
var fileDescriptor_meta_301ed152bff7072d = []byte{
// 510 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x93, 0xc1, 0x6e, 0xd3, 0x40,
0x10, 0x86, 0x63, 0x1c, 0xa7, 0xcd, 0x84, 0xb4, 0x61, 0x55, 0x55, 0x16, 0x08, 0x37, 0x8d, 0x40,
0x84, 0x8b, 0x91, 0x40, 0xe2, 0xc4, 0x25, 0x0d, 0x1c, 0x7a, 0xe8, 0x65, 0xcb, 0x09, 0x09, 0x59,
0x5b, 0x7b, 0x1d, 0x45, 0xb1, 0x77, 0xc3, 0x7a, 0x16, 0x92, 0xb7, 0xe0, 0x51, 0xb8, 0x22, 0xf1,
0x00, 0x1c, 0x7b, 0xe4, 0x84, 0x50, 0xf2, 0x22, 0x68, 0x77, 0x9d, 0x00, 0x25, 0x07, 0xc4, 0x6d,
0xf6, 0x1b, 0xef, 0xec, 0xfc, 0xff, 0x8c, 0x01, 0x4a, 0x8e, 0x2c, 0x9e, 0x2b, 0x89, 0x92, 0x3c,
0x48, 0x65, 0x19, 0x4f, 0x45, 0x5e, 0xe8, 0x45, 0xc6, 0x0c, 0x2d, 0x18, 0xe6, 0x52, 0x95, 0x31,
0xb2, 0x6a, 0x16, 0x5f, 0xb1, 0x74, 0xc6, 0x45, 0x76, 0xf7, 0x68, 0x22, 0x27, 0xd2, 0x5e, 0x78,
0x62, 0x22, 0x77, 0x77, 0xf0, 0xc5, 0x87, 0xee, 0x25, 0x4a, 0xc5, 0x5f, 0xb3, 0x6a, 0x76, 0xc1,
0x91, 0x91, 0x47, 0x70, 0x58, 0xb2, 0x45, 0x92, 0x4a, 0x91, 0x6a, 0xa5, 0xb8, 0x48, 0x97, 0xa1,
0xd7, 0xf7, 0x86, 0x01, 0x3d, 0x28, 0xd9, 0x62, 0xfc, 0x8b, 0x92, 0xc7, 0xd0, 0x2b, 0x18, 0xf2,
0x0a, 0x93, 0x54, 0x96, 0xf3, 0x82, 0x23, 0xcf, 0xc2, 0x5b, 0x7d, 0x6f, 0xe8, 0xd3, 0x43, 0xc7,
0xc7, 0x1b, 0x4c, 0x8e, 0xa1, 0x55, 0x21, 0x43, 0x5d, 0x85, 0x7e, 0xdf, 0x1b, 0xb6, 0x69, 0x7d,
0x22, 0x29, 0xdc, 0x71, 0xe5, 0xb0, 0x58, 0x26, 0x4a, 0x0b, 0x31, 0x15, 0x93, 0xb0, 0xd9, 0xf7,
0x87, 0x9d, 0xa7, 0xcf, 0xe3, 0x7f, 0x51, 0x15, 0xff, 0xd1, 0x3b, 0xd5, 0x82, 0xf6, 0xb6, 0x05,
0xa9, 0xab, 0x47, 0x1e, 0xc2, 0x01, 0xcf, 0x73, 0x9e, 0xe2, 0xf4, 0x3d, 0x4f, 0x52, 0x25, 0x45,
0x18, 0xd8, 0x26, 0xba, 0x5b, 0x3a, 0x56, 0x52, 0x98, 0x1e, 0x65, 0x9e, 0x57, 0x1c, 0xc3, 0x96,
0x95, 0x5b, 0x9f, 0xc8, 0x7d, 0x80, 0x54, 0x71, 0x86, 0x3c, 0x4b, 0x18, 0x86, 0x7b, 0x56, 0x60,
0xbb, 0x26, 0x23, 0x9b, 0xd6, 0xf3, 0x6c, 0x93, 0xde, 0x77, 0xe9, 0x9a, 0x8c, 0x90, 0xbc, 0x85,
0x4e, 0xc9, 0x84, 0x66, 0x85, 0x91, 0x57, 0x85, 0x3d, 0xab, 0xed, 0xc5, 0x7f, 0x68, 0xbb, 0xb0,
0x55, 0x8c, 0x42, 0x28, 0x37, 0x61, 0x35, 0xf8, 0xec, 0x41, 0xef, 0xa6, 0x05, 0xa4, 0x07, 0xbe,
0x90, 0x1f, 0xec, 0xd4, 0x7c, 0x6a, 0x42, 0x43, 0x50, 0x2d, 0xed, 0x74, 0xba, 0xd4, 0x84, 0xa4,
0x0f, 0x2d, 0xa5, 0x45, 0x32, 0xcd, 0xec, 0x44, 0x9a, 0x67, 0xed, 0xd5, 0xf7, 0x93, 0x80, 0x6a,
0x71, 0xfe, 0x92, 0x06, 0x4a, 0x8b, 0xf3, 0x8c, 0x9c, 0x40, 0x47, 0x31, 0x31, 0xe1, 0x49, 0x85,
0x4c, 0x61, 0xd8, 0xb4, 0xd5, 0xc0, 0xa2, 0x4b, 0x43, 0xc8, 0x3d, 0x68, 0xbb, 0x0f, 0xb8, 0xc8,
0xac, 0xa5, 0x3e, 0xdd, 0xb7, 0xe0, 0x95, 0xc8, 0xc8, 0x29, 0xdc, 0x56, 0xfc, 0x9d, 0xe6, 0x55,
0x6d, 0x4c, 0xcb, 0xe6, 0x3b, 0x5b, 0x36, 0xc2, 0xc1, 0x27, 0x0f, 0x8e, 0x77, 0x4b, 0x24, 0x47,
0x10, 0xb8, 0x57, 0x9d, 0x06, 0x77, 0x30, 0x2a, 0xcc, 0x53, 0x6e, 0xc7, 0x4c, 0xb8, 0x73, 0x05,
0xfd, 0xdd, 0x2b, 0x78, 0xb3, 0xa1, 0xe6, 0x5f, 0x0d, 0xfd, 0xe6, 0x49, 0xb0, 0xdb, 0x93, 0xb3,
0xd3, 0xaf, 0xab, 0xc8, 0xbb, 0x5e, 0x45, 0xde, 0x8f, 0x55, 0xe4, 0x7d, 0x5c, 0x47, 0x8d, 0xeb,
0x75, 0xd4, 0xf8, 0xb6, 0x8e, 0x1a, 0x6f, 0xf6, 0xea, 0xa1, 0x5d, 0xb5, 0xec, 0x7f, 0xf5, 0xec,
0x67, 0x00, 0x00, 0x00, 0xff, 0xff, 0xb0, 0xe6, 0x7b, 0x45, 0xa1, 0x03, 0x00, 0x00,
var fileDescriptor_meta_841ef32afee093f0 = []byte{
// 543 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x93, 0x41, 0x6f, 0xd3, 0x30,
0x14, 0xc7, 0x1b, 0xd2, 0x74, 0xab, 0x4b, 0xd7, 0x60, 0xa6, 0x29, 0x02, 0x91, 0x66, 0x15, 0x88,
0x72, 0x09, 0x12, 0x48, 0x9c, 0x10, 0x52, 0x57, 0x38, 0xec, 0xb0, 0x8b, 0xc7, 0x09, 0x09, 0x45,
0x5e, 0xe2, 0x94, 0xa8, 0x89, 0x5d, 0x9c, 0x67, 0x68, 0xf9, 0x14, 0x7c, 0x14, 0xae, 0x7c, 0x03,
0x8e, 0x3b, 0x72, 0x9a, 0x50, 0xfb, 0x35, 0x38, 0x20, 0x3b, 0x69, 0xd9, 0x46, 0x0f, 0x68, 0xb7,
0xe7, 0xdf, 0x8b, 0x9f, 0xdf, 0xff, 0xff, 0x5e, 0x10, 0x2a, 0x18, 0xd0, 0x70, 0x26, 0x05, 0x08,
0xfc, 0x30, 0x16, 0x45, 0x98, 0xf1, 0x34, 0x57, 0xf3, 0x84, 0x6a, 0x9a, 0x53, 0x48, 0x85, 0x2c,
0x42, 0xa0, 0xe5, 0x34, 0x3c, 0xa3, 0xf1, 0x94, 0xf1, 0xe4, 0xde, 0xfe, 0x44, 0x4c, 0x84, 0xb9,
0xf0, 0x54, 0x47, 0xd5, 0xdd, 0xc1, 0x6f, 0x1b, 0x75, 0x4f, 0x41, 0x48, 0xf6, 0x96, 0x96, 0xd3,
0x13, 0x06, 0x14, 0x3f, 0x46, 0xbd, 0x82, 0xce, 0xa3, 0x58, 0xf0, 0x58, 0x49, 0xc9, 0x78, 0xbc,
0xf0, 0xac, 0xc0, 0x1a, 0x3a, 0x64, 0xaf, 0xa0, 0xf3, 0xf1, 0x5f, 0x8a, 0x9f, 0x20, 0x37, 0xa7,
0xc0, 0x4a, 0x88, 0x62, 0x51, 0xcc, 0x72, 0x06, 0x2c, 0xf1, 0x6e, 0x05, 0xd6, 0xd0, 0x26, 0xbd,
0x8a, 0x8f, 0xd7, 0x18, 0x1f, 0xa0, 0x56, 0x09, 0x14, 0x54, 0xe9, 0xd9, 0x81, 0x35, 0x6c, 0x93,
0xfa, 0x84, 0x63, 0x74, 0xa7, 0x2a, 0x07, 0xf9, 0x22, 0x92, 0x8a, 0xf3, 0x8c, 0x4f, 0xbc, 0x66,
0x60, 0x0f, 0x3b, 0xcf, 0x5e, 0x84, 0xff, 0xa3, 0x2a, 0xbc, 0xd2, 0x3b, 0x51, 0x9c, 0xb8, 0x9b,
0x82, 0xa4, 0xaa, 0x87, 0x1f, 0xa1, 0x3d, 0x96, 0xa6, 0x2c, 0x86, 0xec, 0x13, 0x8b, 0x62, 0x29,
0xb8, 0xe7, 0x98, 0x26, 0xba, 0x1b, 0x3a, 0x96, 0x82, 0xeb, 0x1e, 0x45, 0x9a, 0x96, 0x0c, 0xbc,
0x96, 0x91, 0x5b, 0x9f, 0xf0, 0x03, 0x84, 0x62, 0xc9, 0x28, 0xb0, 0x24, 0xa2, 0xe0, 0xed, 0x18,
0x81, 0xed, 0x9a, 0x8c, 0x4c, 0x5a, 0xcd, 0x92, 0x75, 0x7a, 0xb7, 0x4a, 0xd7, 0x64, 0x04, 0xf8,
0x15, 0x72, 0xa9, 0x82, 0x0f, 0x42, 0x66, 0x5f, 0x28, 0x64, 0x82, 0x47, 0x59, 0xe2, 0xb5, 0x03,
0x6b, 0xd8, 0x3c, 0xba, 0xbb, 0xbc, 0xe8, 0xf7, 0x46, 0x97, 0x73, 0xc7, 0xaf, 0x49, 0xef, 0xca,
0xc7, 0xc7, 0x09, 0x7e, 0x8f, 0x3a, 0x05, 0xe5, 0x8a, 0xe6, 0xda, 0x9e, 0xd2, 0x73, 0x8d, 0x37,
0x2f, 0x6f, 0xe0, 0xcd, 0x89, 0xa9, 0xa2, 0x1d, 0x42, 0xc5, 0x3a, 0x2c, 0x07, 0xdf, 0x2d, 0xe4,
0x5e, 0xb7, 0x10, 0xbb, 0xc8, 0xe6, 0xe2, 0xb3, 0x99, 0xba, 0x4d, 0x74, 0xa8, 0x09, 0xc8, 0x85,
0x99, 0x6e, 0x97, 0xe8, 0x10, 0x07, 0xa8, 0x25, 0x95, 0x51, 0x63, 0x1b, 0x35, 0xed, 0xe5, 0x45,
0xdf, 0x21, 0x4a, 0x6b, 0x70, 0xa4, 0xd2, 0x9d, 0xf7, 0x51, 0x47, 0x52, 0x3e, 0x61, 0x51, 0x09,
0x54, 0x82, 0xd7, 0x34, 0xd5, 0x90, 0x41, 0xa7, 0x9a, 0xe0, 0xfb, 0xa8, 0x5d, 0x7d, 0xc0, 0x78,
0x62, 0x46, 0x62, 0x93, 0x5d, 0x03, 0xde, 0xf0, 0x04, 0x1f, 0xa2, 0xdb, 0x92, 0x7d, 0x54, 0xac,
0xac, 0x8d, 0x6d, 0x99, 0x7c, 0x67, 0xc3, 0x46, 0x30, 0xf8, 0x66, 0xa1, 0x83, 0xed, 0x12, 0xf1,
0x3e, 0x72, 0xaa, 0x57, 0x2b, 0x0d, 0xd5, 0x41, 0xab, 0xd0, 0x4f, 0x55, 0x3b, 0xaa, 0xc3, 0xad,
0x2b, 0x6c, 0x6f, 0x5f, 0xe1, 0xeb, 0x0d, 0x35, 0xff, 0x69, 0xe8, 0x92, 0x27, 0xce, 0x76, 0x4f,
0x8e, 0x0e, 0x7f, 0x2c, 0x7d, 0xeb, 0x7c, 0xe9, 0x5b, 0xbf, 0x96, 0xbe, 0xf5, 0x75, 0xe5, 0x37,
0xce, 0x57, 0x7e, 0xe3, 0xe7, 0xca, 0x6f, 0xbc, 0xdb, 0xa9, 0x87, 0x76, 0xd6, 0x32, 0xff, 0xe5,
0xf3, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x74, 0xb0, 0xab, 0x79, 0xe1, 0x03, 0x00, 0x00,
}

View File

@ -30,6 +30,9 @@ message StoreTaskMeta {
int64 created_at = 7;
int64 updated_at = 8;
// The Authorization ID associated with the task.
uint64 authorization_id = 9 [(gogoproto.customname) = "AuthorizationID"];
// Fields below here are less likely to be present, so we're counting from 16 in order to
// use the 1-byte-encodable values where we can be more sure they're present.

View File

@ -150,12 +150,17 @@ type CreateTaskRequest struct {
// Owners.
Org, User platform.ID
// Authorization ID to use when executing the task later.
// This is stored directly in the storage layer,
// so it is the caller's responsibility to ensure the user is permitted to access the authorization.
AuthorizationID platform.ID
// Script content of the task.
Script string
// Unix timestamp (seconds elapsed since January 1, 1970 UTC).
// The first run of the task will be run according to the earliest time after ScheduleAfter,
// matching the task's schedul via its cron or every option.
// matching the task's schedule via its cron or every option.
ScheduleAfter int64
// The initial task status.
@ -176,6 +181,10 @@ type UpdateTaskRequest struct {
// If empty, do not modify the existing status.
Status TaskStatus
// The new authorization ID.
// If zero, do not modify the existing authorization ID.
AuthorizationID platform.ID
// These options are for editing options via request. Zeroed options will be ignored.
options.Options
}
@ -426,6 +435,9 @@ func (StoreValidation) CreateArgs(req CreateTaskRequest) (options.Options, error
if !req.User.Valid() {
missing = append(missing, "user ID")
}
if !req.AuthorizationID.Valid() {
missing = append(missing, "authorization ID")
}
if len(missing) > 0 {
return o, fmt.Errorf("missing required fields to create task: %s", strings.Join(missing, ", "))
@ -444,8 +456,8 @@ func (StoreValidation) CreateArgs(req CreateTaskRequest) (options.Options, error
func (StoreValidation) UpdateArgs(req UpdateTaskRequest) (options.Options, error) {
var missing []string
o := req.Options
if req.Script == "" && req.Status == "" && req.Options.IsZero() {
missing = append(missing, "script or status or options")
if req.Script == "" && req.Status == "" && req.Options.IsZero() && !req.AuthorizationID.Valid() {
missing = append(missing, "script or status or options or authorizationID")
}
if req.Script != "" {
err := req.UpdateFlux(req.Script)

View File

@ -85,12 +85,14 @@ from(bucket:"test") |> range(start:-1h)`
org, user platform.ID
script string
status backend.TaskStatus
omitAuthz bool
noerr bool
}{
{caseName: "happy path", org: platform.ID(1), user: platform.ID(2), script: script, noerr: true},
{caseName: "missing org", org: platform.ID(0), user: platform.ID(2), script: script},
{caseName: "missing user", org: platform.ID(1), user: platform.ID(0), script: script},
{caseName: "missing name", org: platform.ID(1), user: platform.ID(2), script: scriptNoName},
{caseName: "missing authz", org: platform.ID(1), user: platform.ID(2), script: script, omitAuthz: true},
{caseName: "missing script", org: platform.ID(1), user: platform.ID(2), script: ""},
{caseName: "repeated name and org", org: platform.ID(1), user: platform.ID(3), script: script, noerr: true},
{caseName: "repeated name and user", org: platform.ID(3), user: platform.ID(2), script: script, noerr: true},
@ -106,6 +108,10 @@ from(bucket:"test") |> range(start:-1h)`
Script: args.script,
Status: args.status,
}
if !args.omitAuthz {
req.AuthorizationID = 54321
}
_, err := s.CreateTask(context.Background(), req)
if args.noerr && err != nil {
t.Fatalf("expected no err but got %v", err)
@ -140,7 +146,9 @@ from(bucket:"y") |> range(start:-1h)`
s := create(t)
defer destroy(t, s)
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, Script: script})
const origAuthzID = 3
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, AuthorizationID: origAuthzID, Script: script})
if err != nil {
t.Fatal(err)
}
@ -164,6 +172,9 @@ from(bucket:"y") |> range(start:-1h)`
// but in this case we need to be certain of the initial status so we can toggle it correctly.
t.Fatalf("expected task to be created active, got %q", meta.Status)
}
if meta.AuthorizationID != origAuthzID {
t.Fatalf("expected same authorization ID, got %v", meta.AuthorizationID)
}
// Modify just the status.
res, err = s.UpdateTask(context.Background(), backend.UpdateTaskRequest{ID: id, Status: backend.TaskInactive})
@ -218,6 +229,17 @@ from(bucket:"y") |> range(start:-1h)`
if meta.Status != string(backend.TaskInactive) {
t.Fatalf("expected task status to be inactive, got %q", meta.Status)
}
// Modify only the authorization ID.
const newAuthzID = 987654
res, err = s.UpdateTask(context.Background(), backend.UpdateTaskRequest{ID: id, AuthorizationID: newAuthzID})
if err != nil {
t.Fatal(err)
}
meta = res.NewMeta
if meta.AuthorizationID != newAuthzID {
t.Fatalf("expected authz ID to update to %v; got %v", newAuthzID, meta.AuthorizationID)
}
})
for _, args := range []struct {
@ -241,11 +263,11 @@ from(bucket:"y") |> range(start:-1h)`
t.Run("name repetition", func(t *testing.T) {
s := create(t)
defer destroy(t, s)
id1, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, Script: script})
id1, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, AuthorizationID: 3, Script: script})
if err != nil {
t.Fatal(err)
}
_, err = s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, Script: script2})
_, err = s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, AuthorizationID: 3, Script: script2})
if err != nil {
t.Fatal(err)
}
@ -268,8 +290,9 @@ from(bucket:"test") |> range(start:-1h)`
orgID := platform.ID(1)
userID := platform.ID(2)
authzID := platform.ID(3)
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, User: userID, Script: fmt.Sprintf(scriptFmt, 0)})
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, User: userID, AuthorizationID: authzID, Script: fmt.Sprintf(scriptFmt, 0)})
if err != nil {
t.Fatal(err)
}
@ -326,7 +349,7 @@ from(bucket:"test") |> range(start:-1h)`
t.Fatalf("expected no results for bad user ID, got %d result(s)", len(ts))
}
newID, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, User: userID, Script: fmt.Sprintf(scriptFmt, 1), Status: backend.TaskInactive})
newID, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, User: userID, AuthorizationID: authzID, Script: fmt.Sprintf(scriptFmt, 1), Status: backend.TaskInactive})
if err != nil {
t.Fatal(err)
}
@ -362,6 +385,7 @@ from(bucket:"test") |> range(start:-1h)`
orgID := platform.ID(1)
userID := platform.ID(2)
authzID := platform.ID(3)
type createdTask struct {
id platform.ID
@ -379,7 +403,7 @@ from(bucket:"test") |> range(start:-1h)`
tasks[i].name = fmt.Sprintf("my_bucket_%d", i)
tasks[i].script = fmt.Sprintf(script, i, i)
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, User: userID, Script: tasks[i].script})
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, User: userID, AuthorizationID: authzID, Script: tasks[i].script})
if err != nil {
t.Fatalf("failed to create task %d: %v", i, err)
}
@ -455,8 +479,9 @@ from(bucket:"test") |> range(start:-1h)`
org := platform.ID(1)
user := platform.ID(2)
authz := platform.ID(3)
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: org, User: user, Script: script})
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: org, User: user, AuthorizationID: authz, Script: script})
if err != nil {
t.Fatal(err)
}
@ -513,8 +538,9 @@ from(bucket:"test") |> range(start:-1h)`
org := platform.ID(1)
user := platform.ID(2)
authz := platform.ID(3)
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: org, User: user, Script: script, ScheduleAfter: 6000})
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: org, User: user, AuthorizationID: authz, Script: script, ScheduleAfter: 6000})
if err != nil {
t.Fatal(err)
}
@ -587,7 +613,7 @@ from(bucket:"test") |> range(start:-1h)`
defer destroy(t, s)
for _, st := range []backend.TaskStatus{backend.TaskActive, backend.TaskInactive} {
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, Script: script, Status: st})
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, AuthorizationID: 3, Script: script, Status: st})
if err != nil {
t.Fatal(err)
}
@ -629,7 +655,7 @@ from(bucket: "b") |> range(start:-1h) |> http.to(url: "http://example.com")`
{e: "2d", sa: (2 * secondsPerDay) + 1, lc: 2 * secondsPerDay},
} {
script := fmt.Sprintf(scriptFmt, tc.e)
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, Script: script, ScheduleAfter: tc.sa})
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, AuthorizationID: 3, Script: script, ScheduleAfter: tc.sa})
if err != nil {
t.Fatal(err)
}
@ -660,8 +686,9 @@ from(bucket:"test") |> range(start:-1h)`
org := platform.ID(1)
user := platform.ID(2)
authz := platform.ID(3)
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: org, User: user, Script: script, ScheduleAfter: 6000})
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: org, User: user, AuthorizationID: authz, Script: script, ScheduleAfter: 6000})
if err != nil {
t.Fatal(err)
}
@ -725,8 +752,8 @@ from(bucket:"test") |> range(start:-1h)`
s := create(t)
defer destroy(t, s)
org, user := idGen.ID(), idGen.ID()
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: org, User: user, Script: script})
org, user, authz := idGen.ID(), idGen.ID(), idGen.ID()
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: org, User: user, AuthorizationID: authz, Script: script})
if err != nil {
t.Fatal(err)
}
@ -757,7 +784,7 @@ from(bucket:"test") |> range(start:-1h)`
}
// It's safe to reuse the same name, for the same org with a new user, after deleting the original.
id, err = s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: org, User: idGen.ID(), Script: script})
id, err = s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: org, User: idGen.ID(), AuthorizationID: idGen.ID(), Script: script})
if err != nil {
t.Fatalf("Error when reusing task name that was previously deleted: %v", err)
}
@ -766,7 +793,7 @@ from(bucket:"test") |> range(start:-1h)`
}
// Reuse the same name, for the original user but a new org.
if _, err = s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: idGen.ID(), User: user, Script: script}); err != nil {
if _, err = s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: idGen.ID(), User: user, AuthorizationID: idGen.ID(), Script: script}); err != nil {
t.Fatalf("Error when reusing task name that was previously deleted: %v", err)
}
})
@ -786,7 +813,7 @@ from(bucket:"test") |> range(start:-1h)`
defer destroy(t, s)
t.Run("no queue", func(t *testing.T) {
taskID, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, Script: script, ScheduleAfter: 30})
taskID, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, AuthorizationID: 3, Script: script, ScheduleAfter: 30})
if err != nil {
t.Fatal(err)
}
@ -843,7 +870,7 @@ from(bucket:"test") |> range(start:-1h)`
}
from(bucket:"test") |> range(start:-1h)`
taskID, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 5, User: 6, Script: script, ScheduleAfter: 2999})
taskID, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 5, User: 6, AuthorizationID: 7, Script: script, ScheduleAfter: 2999})
if err != nil {
t.Fatal(err)
}
@ -933,7 +960,7 @@ from(bucket:"test") |> range(start:-1h)`
s := create(t)
defer destroy(t, s)
task, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, Script: script})
task, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, AuthorizationID: 3, Script: script})
if err != nil {
t.Fatal(err)
}
@ -962,7 +989,7 @@ from(bucket:"test") |> range(start:-1h)`
s := create(t)
defer destroy(t, s)
taskID, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, Script: script})
taskID, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, AuthorizationID: 3, Script: script})
if err != nil {
t.Fatal(err)
}

View File

@ -3,6 +3,7 @@ package task
import (
"context"
"errors"
"fmt"
"time"
platform "github.com/influxdata/influxdb"
@ -17,14 +18,17 @@ type RunController interface {
}
// PlatformAdapter wraps a task.Store into the platform.TaskService interface.
func PlatformAdapter(s backend.Store, r backend.LogReader, rc RunController) platform.TaskService {
return pAdapter{s: s, r: r, rc: rc}
func PlatformAdapter(s backend.Store, r backend.LogReader, rc RunController, as platform.AuthorizationService) platform.TaskService {
return pAdapter{s: s, r: r, rc: rc, as: as}
}
type pAdapter struct {
s backend.Store
rc RunController
r backend.LogReader
// Needed to look up authorization ID from token during create.
as platform.AuthorizationService
}
var _ platform.TaskService = pAdapter{}
@ -90,10 +94,14 @@ func (p pAdapter) CreateTask(ctx context.Context, t platform.TaskCreate) (*platf
req := backend.CreateTaskRequest{
Org: t.OrganizationID,
User: auth.GetUserID(),
Script: t.Flux,
ScheduleAfter: scheduleAfter,
Status: backend.TaskStatus(t.Status),
Script: t.Flux,
User: auth.GetUserID(),
}
req.AuthorizationID, err = p.authorizationIDFromToken(ctx, t.Token)
if err != nil {
return nil, err
}
id, err := p.s.CreateTask(ctx, req)
@ -101,13 +109,14 @@ func (p pAdapter) CreateTask(ctx context.Context, t platform.TaskCreate) (*platf
return nil, err
}
task := &platform.Task{
ID: id,
Flux: t.Flux,
Cron: opts.Cron,
Name: opts.Name,
OrganizationID: t.OrganizationID,
Owner: platform.User{ID: req.User},
Status: t.Status,
ID: id,
Flux: t.Flux,
Cron: opts.Cron,
Name: opts.Name,
OrganizationID: t.OrganizationID,
Owner: platform.User{ID: req.User},
Status: t.Status,
AuthorizationID: req.AuthorizationID,
}
if opts.Every != 0 {
@ -121,7 +130,8 @@ func (p pAdapter) CreateTask(ctx context.Context, t platform.TaskCreate) (*platf
}
func (p pAdapter) UpdateTask(ctx context.Context, id platform.ID, upd platform.TaskUpdate) (*platform.Task, error) {
if err := upd.Validate(); err != nil {
err := upd.Validate()
if err != nil {
return nil, err
}
req := backend.UpdateTaskRequest{ID: id}
@ -132,6 +142,12 @@ func (p pAdapter) UpdateTask(ctx context.Context, id platform.ID, upd platform.T
req.Status = backend.TaskStatus(*upd.Status)
}
req.Options = upd.Options
req.AuthorizationID, err = p.authorizationIDFromToken(ctx, upd.Token)
if err != nil {
return nil, err
}
res, err := p.s.UpdateTask(ctx, req)
if err != nil {
return nil, err
@ -223,6 +239,49 @@ func (p pAdapter) CancelRun(ctx context.Context, taskID, runID platform.ID) erro
return p.rc.CancelRun(ctx, taskID, runID)
}
var errTokenUnreadable = errors.New("token invalid or unreadable by the current user")
// authorizationIDFromToken looks up the authorization ID from the given token,
// and returns that ID iff the authorizer on the context is allowed to view that authorization.
func (p pAdapter) authorizationIDFromToken(ctx context.Context, token string) (platform.ID, error) {
authorizer, err := icontext.GetAuthorizer(ctx)
if err != nil {
return 0, err
}
if token == "" {
// No explicit token. Use the authorization ID from the context's authorizer.
k := authorizer.Kind()
if k != platform.AuthorizationKind {
return 0, fmt.Errorf("unable to create task using authorization of kind %s", k)
}
return authorizer.Identifier(), nil
}
// Token was explicitly provided. Look it up.
a, err := p.as.FindAuthorizationByToken(ctx, token)
if err != nil {
// TODO(mr): log the actual error.
return 0, errTokenUnreadable
}
// It's a valid token. Is it our token?
if a.GetUserID() != authorizer.GetUserID() {
// The auth token isn't ours. Ensure we're allowed to read it.
p, err := platform.NewPermissionAtID(a.ID, platform.ReadAction, platform.AuthorizationsResourceType, a.OrgID)
if err != nil {
// TODO(mr): log the actual error.
return 0, errTokenUnreadable
}
if !authorizer.Allowed(*p) {
return 0, errTokenUnreadable
}
}
return a.ID, nil
}
func toPlatformTask(t backend.StoreTask, m *backend.StoreTaskMeta) (*platform.Task, error) {
opts, err := options.FromScript(t.Script)
if err != nil {
@ -255,6 +314,7 @@ func toPlatformTask(t backend.StoreTask, m *backend.StoreTaskMeta) (*platform.Ta
if m.UpdatedAt != 0 {
pt.UpdatedAt = time.Unix(m.UpdatedAt, 0).Format(time.RFC3339)
}
pt.AuthorizationID = platform.ID(m.AuthorizationID)
}
return pt, nil
}

View File

@ -7,6 +7,7 @@ import (
"testing"
bolt "github.com/coreos/bbolt"
"github.com/influxdata/influxdb/inmem"
_ "github.com/influxdata/influxdb/query/builtin"
"github.com/influxdata/influxdb/task/backend"
boltstore "github.com/influxdata/influxdb/task/backend/bolt"
@ -23,7 +24,7 @@ func inMemFactory(t *testing.T) (*servicetest.System, context.CancelFunc) {
st.Close()
}()
return &servicetest.System{S: st, LR: lrw, LW: lrw, Ctx: ctx}, cancel
return &servicetest.System{S: st, LR: lrw, LW: lrw, Ctx: ctx, I: inmem.NewService()}, cancel
}
func boltFactory(t *testing.T) (*servicetest.System, context.CancelFunc) {
@ -53,7 +54,7 @@ func boltFactory(t *testing.T) (*servicetest.System, context.CancelFunc) {
}
}()
return &servicetest.System{S: st, LR: lrw, LW: lrw, Ctx: ctx}, cancel
return &servicetest.System{S: st, LR: lrw, LW: lrw, Ctx: ctx, I: inmem.NewService()}, cancel
}
func TestTaskService(t *testing.T) {

View File

@ -17,7 +17,7 @@ import (
"github.com/google/go-cmp/cmp"
platform "github.com/influxdata/influxdb"
icontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/snowflake"
"github.com/influxdata/influxdb/inmem"
"github.com/influxdata/influxdb/task"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/options"
@ -43,7 +43,7 @@ func TestTaskService(t *testing.T, fn BackendComponentFactory) {
sys, cancel := fn(t)
defer cancel()
if sys.TaskServiceFunc == nil {
sys.ts = task.PlatformAdapter(sys.S, sys.LR, sys.Sch)
sys.ts = task.PlatformAdapter(sys.S, sys.LR, sys.Sch, sys.I)
} else {
sys.ts = sys.TaskServiceFunc()
}
@ -80,6 +80,21 @@ func TestTaskService(t *testing.T, fn BackendComponentFactory) {
})
}
// TestCreds encapsulates credentials needed for a system to properly work with tasks.
type TestCreds struct {
OrgID, UserID, AuthorizationID platform.ID
Token string
}
func (tc TestCreds) Authorizer() platform.Authorizer {
return &platform.Authorization{
ID: tc.AuthorizationID,
OrgID: tc.OrgID,
UserID: tc.UserID,
Token: tc.Token,
}
}
// System, as in "system under test", encapsulates the required parts of a platform.TaskAdapter
// (the underlying Store, LogReader, and LogWriter) for low-level operations.
type System struct {
@ -88,6 +103,9 @@ type System struct {
LW backend.LogWriter
Sch backend.Scheduler
// Used in the Creds function to create valid organizations, users, tokens, etc.
I *inmem.Service
// Set this context, to be used in tests, so that any spawned goroutines watching Ctx.Done()
// will clean up after themselves.
Ctx context.Context
@ -98,10 +116,10 @@ type System struct {
// Override for accessing credentials for an individual test.
// Callers can leave this nil and the test will create its own random IDs for each test.
// However, if the system needs to verify a token, organization, or user,
// the caller should set this value and return valid IDs and a token.
// However, if the system needs to verify credentials,
// the caller should set this value and return valid IDs and a valid token.
// It is safe if this returns the same values every time it is called.
CredsFunc func() (orgID, userID platform.ID, token string, err error)
CredsFunc func() (TestCreds, error)
// Underlying task service, initialized inside TestTaskService,
// either by instantiating a PlatformAdapter directly or by calling TaskServiceFunc.
@ -109,19 +127,19 @@ type System struct {
}
func testTaskCRUD(t *testing.T, sys *System) {
orgID, userID, tok := creds(t, sys)
authorizer := &platform.Authorization{
ID: orgID + userID,
Token: tok,
OrgID: orgID,
UserID: userID,
}
cr := creds(t, sys)
authzID := cr.AuthorizationID
// Create a task.
ct := platform.TaskCreate{
OrganizationID: orgID,
tc := platform.TaskCreate{
OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token,
}
task, err := sys.ts.CreateTask(icontext.SetAuthorizer(sys.Ctx, authorizer), ct)
authorizedCtx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer())
task, err := sys.ts.CreateTask(authorizedCtx, tc)
if err != nil {
t.Fatal(err)
}
@ -152,14 +170,14 @@ func testTaskCRUD(t *testing.T, sys *System) {
}
found["FindTaskByID"] = f
fs, _, err := sys.ts.FindTasks(sys.Ctx, platform.TaskFilter{Organization: &orgID})
fs, _, err := sys.ts.FindTasks(sys.Ctx, platform.TaskFilter{Organization: &cr.OrgID})
if err != nil {
t.Fatal(err)
}
f = findTask(fs, task.ID)
found["FindTasks with Organization filter"] = f
fs, _, err = sys.ts.FindTasks(sys.Ctx, platform.TaskFilter{User: &userID})
fs, _, err = sys.ts.FindTasks(sys.Ctx, platform.TaskFilter{User: &cr.UserID})
if err != nil {
t.Fatal(err)
}
@ -168,8 +186,11 @@ func testTaskCRUD(t *testing.T, sys *System) {
found["FindTasks with User filter"] = f
for fn, f := range found {
if f.OrganizationID != orgID {
t.Fatalf("%s: wrong organization returned; want %s, got %s", fn, orgID.String(), f.OrganizationID.String())
if f.OrganizationID != cr.OrgID {
t.Fatalf("%s: wrong organization returned; want %s, got %s", fn, cr.OrgID.String(), f.OrganizationID.String())
}
if f.AuthorizationID != authzID {
t.Fatalf("%s: wrong authorization ID returned; want %s, got %s", fn, authzID.String(), f.AuthorizationID.String())
}
if f.Name != "task #0" {
t.Fatalf(`%s: wrong name returned; want "task #0", got %q`, fn, f.Name)
@ -191,7 +212,7 @@ func testTaskCRUD(t *testing.T, sys *System) {
// Update task: script only.
newFlux := fmt.Sprintf(scriptFmt, 99)
origID := f.ID
f, err = sys.ts.UpdateTask(sys.Ctx, origID, platform.TaskUpdate{Flux: &newFlux})
f, err = sys.ts.UpdateTask(authorizedCtx, origID, platform.TaskUpdate{Flux: &newFlux})
if err != nil {
t.Fatal(err)
}
@ -208,7 +229,7 @@ func testTaskCRUD(t *testing.T, sys *System) {
// Update task: status only.
newStatus := string(backend.TaskInactive)
f, err = sys.ts.UpdateTask(sys.Ctx, origID, platform.TaskUpdate{Status: &newStatus})
f, err = sys.ts.UpdateTask(authorizedCtx, origID, platform.TaskUpdate{Status: &newStatus})
if err != nil {
t.Fatal(err)
}
@ -222,7 +243,7 @@ func testTaskCRUD(t *testing.T, sys *System) {
// Update task: reactivate status and update script.
newStatus = string(backend.TaskActive)
newFlux = fmt.Sprintf(scriptFmt, 98)
f, err = sys.ts.UpdateTask(sys.Ctx, origID, platform.TaskUpdate{Flux: &newFlux, Status: &newStatus})
f, err = sys.ts.UpdateTask(authorizedCtx, origID, platform.TaskUpdate{Flux: &newFlux, Status: &newStatus})
if err != nil {
t.Fatal(err)
}
@ -236,7 +257,7 @@ func testTaskCRUD(t *testing.T, sys *System) {
// Update task: just update an option.
newStatus = string(backend.TaskActive)
newFlux = fmt.Sprintf(scriptDifferentName, 98)
f, err = sys.ts.UpdateTask(sys.Ctx, origID, platform.TaskUpdate{Options: options.Options{Name: "task-changed #98"}})
f, err = sys.ts.UpdateTask(authorizedCtx, origID, platform.TaskUpdate{Options: options.Options{Name: "task-changed #98"}})
if err != nil {
t.Fatal(err)
}
@ -248,6 +269,20 @@ func testTaskCRUD(t *testing.T, sys *System) {
t.Fatalf("expected task status to be active, got %q", f.Status)
}
// Update task: just update the token.
// First we need to make a new authorization in order to get a new token.
newAuthz := &platform.Authorization{OrgID: cr.OrgID, UserID: cr.UserID}
if err := sys.I.CreateAuthorization(sys.Ctx, newAuthz); err != nil {
t.Fatal(err)
}
f, err = sys.ts.UpdateTask(authorizedCtx, origID, platform.TaskUpdate{Token: newAuthz.Token})
if err != nil {
t.Fatal(err)
}
if f.AuthorizationID != newAuthz.ID {
t.Fatalf("expected authorization ID %v, got %v", newAuthz.ID, f.AuthorizationID)
}
// Delete task.
if err := sys.ts.DeleteTask(sys.Ctx, origID); err != nil {
t.Fatal(err)
@ -260,20 +295,16 @@ func testTaskCRUD(t *testing.T, sys *System) {
}
func testMetaUpdate(t *testing.T, sys *System) {
orgID, userID, tok := creds(t, sys)
authorizer := &platform.Authorization{
ID: orgID + userID,
Token: tok,
OrgID: orgID,
UserID: userID,
}
cr := creds(t, sys)
now := time.Now()
ct := platform.TaskCreate{
OrganizationID: orgID,
OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token,
}
task, err := sys.ts.CreateTask(icontext.SetAuthorizer(sys.Ctx, authorizer), ct)
authorizedCtx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer())
task, err := sys.ts.CreateTask(authorizedCtx, ct)
if err != nil {
t.Fatal(err)
}
@ -327,7 +358,7 @@ func testMetaUpdate(t *testing.T, sys *System) {
now = time.Now()
flux := fmt.Sprintf(scriptFmt, 1)
task, err = sys.ts.UpdateTask(sys.Ctx, task.ID, platform.TaskUpdate{Flux: &flux})
task, err = sys.ts.UpdateTask(authorizedCtx, task.ID, platform.TaskUpdate{Flux: &flux})
if err != nil {
t.Fatal(err)
}
@ -361,13 +392,7 @@ func testMetaUpdate(t *testing.T, sys *System) {
}
func testTaskRuns(t *testing.T, sys *System) {
orgID, userID, tok := creds(t, sys)
authorizer := &platform.Authorization{
ID: orgID + userID,
Token: tok,
OrgID: orgID,
UserID: userID,
}
cr := creds(t, sys)
t.Run("FindRuns and FindRunByID", func(t *testing.T) {
t.Parallel()
@ -375,10 +400,11 @@ func testTaskRuns(t *testing.T, sys *System) {
// Script is set to run every minute. The platform adapter is currently hardcoded to schedule after "now",
// which makes timing of runs somewhat difficult.
ct := platform.TaskCreate{
OrganizationID: orgID,
OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token,
}
task, err := sys.ts.CreateTask(icontext.SetAuthorizer(sys.Ctx, authorizer), ct)
task, err := sys.ts.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct)
if err != nil {
t.Fatal(err)
}
@ -437,7 +463,7 @@ func testTaskRuns(t *testing.T, sys *System) {
}
// Limit 1 should only return the earlier run.
runs, _, err := sys.ts.FindRuns(sys.Ctx, platform.RunFilter{Org: &orgID, Task: &task.ID, Limit: 1})
runs, _, err := sys.ts.FindRuns(sys.Ctx, platform.RunFilter{Org: &cr.OrgID, Task: &task.ID, Limit: 1})
if err != nil {
t.Fatal(err)
}
@ -458,7 +484,7 @@ func testTaskRuns(t *testing.T, sys *System) {
}
// Unspecified limit returns both runs.
runs, _, err = sys.ts.FindRuns(sys.Ctx, platform.RunFilter{Org: &orgID, Task: &task.ID})
runs, _, err = sys.ts.FindRuns(sys.Ctx, platform.RunFilter{Org: &cr.OrgID, Task: &task.ID})
if err != nil {
t.Fatal(err)
}
@ -527,10 +553,11 @@ func testTaskRuns(t *testing.T, sys *System) {
// Script is set to run every minute. The platform adapter is currently hardcoded to schedule after "now",
// which makes timing of runs somewhat difficult.
ct := platform.TaskCreate{
OrganizationID: orgID,
OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token,
}
task, err := sys.ts.CreateTask(icontext.SetAuthorizer(sys.Ctx, authorizer), ct)
task, err := sys.ts.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct)
if err != nil {
t.Fatal(err)
}
@ -622,10 +649,11 @@ func testTaskRuns(t *testing.T, sys *System) {
t.Parallel()
ct := platform.TaskCreate{
OrganizationID: orgID,
OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token,
}
task, err := sys.ts.CreateTask(icontext.SetAuthorizer(sys.Ctx, authorizer), ct)
task, err := sys.ts.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct)
if err != nil {
t.Fatal(err)
}
@ -667,10 +695,11 @@ func testTaskRuns(t *testing.T, sys *System) {
t.Parallel()
ct := platform.TaskCreate{
OrganizationID: orgID,
OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token,
}
task, err := sys.ts.CreateTask(icontext.SetAuthorizer(sys.Ctx, authorizer), ct)
task, err := sys.ts.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct)
if err != nil {
t.Fatal(err)
}
@ -718,7 +747,7 @@ func testTaskRuns(t *testing.T, sys *System) {
// Ensure it is returned when filtering logs by run ID.
logs, err := sys.LR.ListLogs(sys.Ctx, platform.LogFilter{
Org: &orgID,
Org: &cr.OrgID,
Task: &task.ID,
Run: &rc1.Created.RunID,
})
@ -740,7 +769,7 @@ func testTaskRuns(t *testing.T, sys *System) {
// Ensure both returned when filtering logs by task ID.
logs, err = sys.LR.ListLogs(sys.Ctx, platform.LogFilter{
Org: &orgID,
Org: &cr.OrgID,
Task: &task.ID,
})
if err != nil {
@ -756,13 +785,7 @@ func testTaskRuns(t *testing.T, sys *System) {
}
func testTaskConcurrency(t *testing.T, sys *System) {
orgID, userID, tok := creds(t, sys)
authorizer := &platform.Authorization{
ID: orgID + userID,
Token: tok,
OrgID: orgID,
UserID: userID,
}
cr := creds(t, sys)
const numTasks = 450 // Arbitrarily chosen to get a reasonable count of concurrent creates and deletes.
createTaskCh := make(chan platform.TaskCreate, numTasks)
@ -778,7 +801,7 @@ func testTaskConcurrency(t *testing.T, sys *System) {
createWg.Add(1)
go func() {
defer createWg.Done()
aCtx := icontext.SetAuthorizer(sys.Ctx, authorizer)
aCtx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer())
for ct := range createTaskCh {
task, err := sys.ts.CreateTask(aCtx, ct)
if err != nil {
@ -818,7 +841,7 @@ func testTaskConcurrency(t *testing.T, sys *System) {
}
// Get all the tasks.
tasks, _, err := sys.ts.FindTasks(sys.Ctx, platform.TaskFilter{Organization: &orgID})
tasks, _, err := sys.ts.FindTasks(sys.Ctx, platform.TaskFilter{Organization: &cr.OrgID})
if err != nil {
t.Errorf("error finding tasks: %v", err)
return
@ -876,7 +899,7 @@ func testTaskConcurrency(t *testing.T, sys *System) {
}
// Get all the tasks.
tasks, _, err := sys.ts.FindTasks(sys.Ctx, platform.TaskFilter{Organization: &orgID})
tasks, _, err := sys.ts.FindTasks(sys.Ctx, platform.TaskFilter{Organization: &cr.OrgID})
if err != nil {
t.Errorf("error finding tasks: %v", err)
return
@ -915,8 +938,9 @@ func testTaskConcurrency(t *testing.T, sys *System) {
// Start adding tasks.
for i := 0; i < numTasks; i++ {
createTaskCh <- platform.TaskCreate{
OrganizationID: orgID,
OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, i),
Token: cr.Token,
}
}
@ -926,18 +950,50 @@ func testTaskConcurrency(t *testing.T, sys *System) {
extraWg.Wait()
}
func creds(t *testing.T, s *System) (orgID, userID platform.ID, token string) {
func creds(t *testing.T, s *System) TestCreds {
t.Helper()
if s.CredsFunc == nil {
return idGen.ID(), idGen.ID(), idGen.ID().String()
u := &platform.User{Name: t.Name() + "-user"}
if err := s.I.CreateUser(s.Ctx, u); err != nil {
t.Fatal(err)
}
o := &platform.Organization{Name: t.Name() + "-org"}
if err := s.I.CreateOrganization(s.Ctx, o); err != nil {
t.Fatal(err)
}
if err := s.I.CreateUserResourceMapping(s.Ctx, &platform.UserResourceMapping{
ResourceType: platform.OrgsResourceType,
ResourceID: o.ID,
UserID: u.ID,
UserType: platform.Owner,
}); err != nil {
t.Fatal(err)
}
authz := platform.Authorization{
OrgID: o.ID,
UserID: u.ID,
Permissions: platform.OperPermissions(),
}
if err := s.I.CreateAuthorization(context.Background(), &authz); err != nil {
t.Fatal(err)
}
return TestCreds{
OrgID: o.ID,
UserID: u.ID,
AuthorizationID: authz.ID,
Token: authz.Token,
}
}
o, u, tok, err := s.CredsFunc()
c, err := s.CredsFunc()
if err != nil {
t.Fatal(err)
}
return o, u, tok
return c
}
const (
@ -965,5 +1021,3 @@ option task = {
from(bucket: "b")
|> http.to(url: "http://example.com")`
)
var idGen = snowflake.NewIDGenerator()