refactor(tasks): add Client to http TaskService (#16912)

pull/16994/head
Alirie Gray 2020-02-24 11:41:21 -08:00 committed by GitHub
parent c24445665e
commit 3120b4c47e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 310 additions and 496 deletions

View File

@ -7,7 +7,7 @@ import (
"time"
"github.com/influxdata/flux/repl"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/cmd/influx/internal"
"github.com/influxdata/influxdb/http"
"github.com/spf13/cobra"
@ -57,9 +57,13 @@ func taskCreateF(cmd *cobra.Command, args []string) error {
return err
}
client, err := newHTTPClient()
if err != nil {
return err
}
s := &http.TaskService{
Addr: flags.host,
Token: flags.token,
Client: client,
InsecureSkipVerify: flags.skipVerify,
}
@ -68,7 +72,7 @@ func taskCreateF(cmd *cobra.Command, args []string) error {
return fmt.Errorf("error parsing flux script: %s", err)
}
tc := platform.TaskCreate{
tc := influxdb.TaskCreate{
Flux: flux,
Organization: taskCreateFlags.org.name,
}
@ -129,7 +133,7 @@ func taskFindCmd(opt genericCLIOpts) *cobra.Command {
taskFindFlags.org.register(cmd, false)
cmd.Flags().StringVarP(&taskFindFlags.id, "id", "i", "", "task ID")
cmd.Flags().StringVarP(&taskFindFlags.user, "user-id", "n", "", "task owner ID")
cmd.Flags().IntVarP(&taskFindFlags.limit, "limit", "", platform.TaskDefaultPageSize, "the number of tasks to find")
cmd.Flags().IntVarP(&taskFindFlags.limit, "limit", "", influxdb.TaskDefaultPageSize, "the number of tasks to find")
cmd.Flags().BoolVar(&taskFindFlags.headers, "headers", true, "To print the table headers; defaults true")
return cmd
@ -139,15 +143,20 @@ func taskFindF(cmd *cobra.Command, args []string) error {
if err := taskFindFlags.org.validOrgFlags(); err != nil {
return err
}
client, err := newHTTPClient()
if err != nil {
return err
}
s := &http.TaskService{
Addr: flags.host,
Token: flags.token,
Client: client,
InsecureSkipVerify: flags.skipVerify,
}
filter := platform.TaskFilter{}
filter := influxdb.TaskFilter{}
if taskFindFlags.user != "" {
id, err := platform.IDFromString(taskFindFlags.user)
id, err := influxdb.IDFromString(taskFindFlags.user)
if err != nil {
return err
}
@ -158,23 +167,22 @@ func taskFindF(cmd *cobra.Command, args []string) error {
filter.Organization = taskFindFlags.org.name
}
if taskFindFlags.org.id != "" {
id, err := platform.IDFromString(taskFindFlags.org.id)
id, err := influxdb.IDFromString(taskFindFlags.org.id)
if err != nil {
return err
}
filter.OrganizationID = id
}
if taskFindFlags.limit < 1 || taskFindFlags.limit > platform.TaskMaxPageSize {
return fmt.Errorf("limit must be between 1 and %d", platform.TaskMaxPageSize)
if taskFindFlags.limit < 1 || taskFindFlags.limit > influxdb.TaskMaxPageSize {
return fmt.Errorf("limit must be between 1 and %d", influxdb.TaskMaxPageSize)
}
filter.Limit = taskFindFlags.limit
var tasks []http.Task
var err error
if taskFindFlags.id != "" {
id, err := platform.IDFromString(taskFindFlags.id)
id, err := influxdb.IDFromString(taskFindFlags.id)
if err != nil {
return err
}
@ -237,18 +245,22 @@ func taskUpdateCmd(opt genericCLIOpts) *cobra.Command {
}
func taskUpdateF(cmd *cobra.Command, args []string) error {
client, err := newHTTPClient()
if err != nil {
return err
}
s := &http.TaskService{
Addr: flags.host,
Token: flags.token,
Client: client,
InsecureSkipVerify: flags.skipVerify,
}
var id platform.ID
var id influxdb.ID
if err := id.DecodeFromString(taskUpdateFlags.id); err != nil {
return err
}
update := platform.TaskUpdate{}
update := influxdb.TaskUpdate{}
if taskUpdateFlags.status != "" {
update.Status = &taskUpdateFlags.status
}
@ -306,14 +318,18 @@ func taskDeleteCmd(opt genericCLIOpts) *cobra.Command {
}
func taskDeleteF(cmd *cobra.Command, args []string) error {
client, err := newHTTPClient()
if err != nil {
return err
}
s := &http.TaskService{
Addr: flags.host,
Token: flags.token,
Client: client,
InsecureSkipVerify: flags.skipVerify,
}
var id platform.ID
err := id.DecodeFromString(taskDeleteFlags.id)
var id influxdb.ID
err = id.DecodeFromString(taskDeleteFlags.id)
if err != nil {
return err
}
@ -382,21 +398,25 @@ func taskLogFindCmd(opt genericCLIOpts) *cobra.Command {
}
func taskLogFindF(cmd *cobra.Command, args []string) error {
client, err := newHTTPClient()
if err != nil {
return err
}
s := &http.TaskService{
Addr: flags.host,
Token: flags.token,
Client: client,
InsecureSkipVerify: flags.skipVerify,
}
var filter platform.LogFilter
id, err := platform.IDFromString(taskLogFindFlags.taskID)
var filter influxdb.LogFilter
id, err := influxdb.IDFromString(taskLogFindFlags.taskID)
if err != nil {
return err
}
filter.Task = *id
if taskLogFindFlags.runID != "" {
id, err := platform.IDFromString(taskLogFindFlags.runID)
id, err := influxdb.IDFromString(taskLogFindFlags.runID)
if err != nil {
return err
}
@ -463,26 +483,30 @@ func taskRunFindCmd(opt genericCLIOpts) *cobra.Command {
}
func taskRunFindF(cmd *cobra.Command, args []string) error {
client, err := newHTTPClient()
if err != nil {
return err
}
s := &http.TaskService{
Addr: flags.host,
Token: flags.token,
Client: client,
InsecureSkipVerify: flags.skipVerify,
}
filter := platform.RunFilter{
filter := influxdb.RunFilter{
Limit: taskRunFindFlags.limit,
AfterTime: taskRunFindFlags.afterTime,
BeforeTime: taskRunFindFlags.beforeTime,
}
taskID, err := platform.IDFromString(taskRunFindFlags.taskID)
taskID, err := influxdb.IDFromString(taskRunFindFlags.taskID)
if err != nil {
return err
}
filter.Task = *taskID
var runs []*platform.Run
var runs []*influxdb.Run
if taskRunFindFlags.runID != "" {
id, err := platform.IDFromString(taskRunFindFlags.runID)
id, err := influxdb.IDFromString(taskRunFindFlags.runID)
if err != nil {
return err
}
@ -547,13 +571,17 @@ func taskRunRetryCmd(opt genericCLIOpts) *cobra.Command {
}
func runRetryF(cmd *cobra.Command, args []string) error {
client, err := newHTTPClient()
if err != nil {
return err
}
s := &http.TaskService{
Addr: flags.host,
Token: flags.token,
Client: client,
InsecureSkipVerify: flags.skipVerify,
}
var taskID, runID platform.ID
var taskID, runID influxdb.ID
if err := taskID.DecodeFromString(runRetryFlags.taskID); err != nil {
return err
}

View File

@ -42,6 +42,7 @@ type Service struct {
*AuthorizationService
*BackupService
*BucketService
*TaskService
*DashboardService
*OrganizationService
*UserService
@ -66,6 +67,7 @@ func NewService(addr, token string) (*Service, error) {
Token: token,
},
BucketService: &BucketService{Client: httpClient},
TaskService: &TaskService{Client: httpClient},
DashboardService: &DashboardService{Client: httpClient},
OrganizationService: &OrganizationService{Client: httpClient},
UserService: &UserService{Client: httpClient},

View File

@ -1,7 +1,6 @@
package http
import (
"bytes"
"context"
"encoding/json"
"errors"
@ -10,7 +9,6 @@ import (
"net/url"
"path"
"strconv"
"strings"
"time"
"github.com/influxdata/httprouter"
@ -18,6 +16,7 @@ import (
pcontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/kv"
"github.com/influxdata/influxdb/pkg/httpc"
"github.com/influxdata/influxdb/task/backend"
"go.uber.org/zap"
)
@ -152,6 +151,8 @@ func NewTaskHandler(log *zap.Logger, b *TaskBackend) *TaskHandler {
return h
}
// Task is a package-specific Task format that preserves the expected format for the API,
// where time values are represented as strings
type Task struct {
ID influxdb.ID `json:"id"`
OrganizationID influxdb.ID `json:"orgID"`
@ -604,7 +605,6 @@ func decodePostTaskRequest(ctx context.Context, r *http.Request) (*postTaskReque
return nil, err
}
tc.OwnerID = auth.GetUserID()
// when creating a task we set the type so we can filter later.
tc.Type = influxdb.TaskSystemType
@ -1411,6 +1411,7 @@ func (h *TaskHandler) getAuthorizationForTask(ctx context.Context, auth influxdb
type TaskService struct {
Addr string
Token string
Client *httpc.Client
InsecureSkipVerify bool
}
@ -1419,36 +1420,9 @@ func (t TaskService) FindTaskByID(ctx context.Context, id influxdb.ID) (*Task, e
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
u, err := NewURL(t.Addr, taskIDPath(id))
if err != nil {
return nil, err
}
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, err
}
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if err := CheckError(resp); err != nil {
if influxdb.ErrorCode(err) == influxdb.ENotFound {
// ErrTaskNotFound is expected as part of the FindTaskByID contract,
// so return that actual error instead of a different error that looks like it.
// TODO cleanup backend task service error implementation
return nil, influxdb.ErrTaskNotFound
}
return nil, err
}
var tr taskResponse
if err := json.NewDecoder(resp.Body).Decode(&tr); err != nil {
err := t.Client.Get(taskIDPath(id)).DecodeJSON(&tr).Do(ctx)
if err != nil {
return nil, err
}
@ -1461,57 +1435,40 @@ func (t TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter)
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
u, err := NewURL(t.Addr, prefixTasks)
if err != nil {
return nil, 0, err
}
// slice of 2-capacity string slices for storing parameter key-value pairs
var params [][2]string
val := url.Values{}
if filter.After != nil {
val.Add("after", filter.After.String())
params = append(params, [2]string{"after", filter.After.String()})
}
if filter.OrganizationID != nil {
val.Add("orgID", filter.OrganizationID.String())
params = append(params, [2]string{"orgID", filter.OrganizationID.String()})
}
if filter.Organization != "" {
val.Add("org", filter.Organization)
params = append(params, [2]string{"org", filter.Organization})
}
if filter.User != nil {
val.Add("user", filter.User.String())
params = append(params, [2]string{"user", filter.User.String()})
}
if filter.Limit != 0 {
val.Add("limit", strconv.Itoa(filter.Limit))
params = append(params, [2]string{"limit", strconv.Itoa(filter.Limit)})
}
if filter.Status != nil {
val.Add("status", *filter.Status)
params = append(params, [2]string{"status", *filter.Status})
}
if filter.Type != nil {
val.Add("type", *filter.Type)
}
u.RawQuery = val.Encode()
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, 0, err
}
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
return nil, 0, err
}
defer resp.Body.Close()
if err := CheckError(resp); err != nil {
return nil, 0, err
params = append(params, [2]string{"type", *filter.Type})
}
var tr tasksResponse
if err := json.NewDecoder(resp.Body).Decode(&tr); err != nil {
err := t.Client.
Get(prefixTasks).
QueryParams(params...).
DecodeJSON(&tr).
Do(ctx)
if err != nil {
return nil, 0, err
}
@ -1526,41 +1483,16 @@ func (t TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter)
func (t TaskService) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*Task, error) {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
u, err := NewURL(t.Addr, prefixTasks)
if err != nil {
return nil, err
}
taskBytes, err := json.Marshal(tc)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", u.String(), bytes.NewReader(taskBytes))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if err := CheckError(resp); err != nil {
return nil, err
}
var tr taskResponse
if err := json.NewDecoder(resp.Body).Decode(&tr); err != nil {
err := t.Client.
PostJSON(tc, prefixTasks).
DecodeJSON(&tr).
Do(ctx)
if err != nil {
return nil, err
}
return &tr.Task, nil
}
@ -1569,38 +1501,11 @@ func (t TaskService) UpdateTask(ctx context.Context, id influxdb.ID, upd influxd
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
u, err := NewURL(t.Addr, taskIDPath(id))
if err != nil {
return nil, err
}
taskBytes, err := json.Marshal(upd)
if err != nil {
return nil, err
}
req, err := http.NewRequest("PATCH", u.String(), bytes.NewReader(taskBytes))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if err := CheckError(resp); err != nil {
return nil, err
}
var tr taskResponse
if err := json.NewDecoder(resp.Body).Decode(&tr); err != nil {
err := t.Client.
PatchJSON(&upd, taskIDPath(id)).
Do(ctx)
if err != nil {
return nil, err
}
@ -1612,28 +1517,9 @@ func (t TaskService) DeleteTask(ctx context.Context, id influxdb.ID) error {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
u, err := NewURL(t.Addr, taskIDPath(id))
if err != nil {
return err
}
req, err := http.NewRequest("DELETE", u.String(), nil)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
return CheckErrorStatus(http.StatusNoContent, resp)
return t.Client.
Delete(taskIDPath(id)).
Do(ctx)
}
// FindLogs returns logs for a run.
@ -1652,31 +1538,13 @@ func (t TaskService) FindLogs(ctx context.Context, filter influxdb.LogFilter) ([
urlPath = path.Join(taskIDRunIDPath(filter.Task, *filter.Run), "logs")
}
u, err := NewURL(t.Addr, urlPath)
if err != nil {
return nil, 0, err
}
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, 0, err
}
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
return nil, 0, err
}
defer resp.Body.Close()
if err := CheckError(resp); err != nil {
return nil, 0, err
}
var logs getLogsResponse
if err := json.NewDecoder(resp.Body).Decode(&logs); err != nil {
err := t.Client.
Get(urlPath).
DecodeJSON(&logs).
Do(ctx)
if err != nil {
return nil, 0, err
}
@ -1688,48 +1556,29 @@ func (t TaskService) FindRuns(ctx context.Context, filter influxdb.RunFilter) ([
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
var params [][2]string
if !filter.Task.Valid() {
return nil, 0, errors.New("task ID required")
}
u, err := NewURL(t.Addr, taskIDRunsPath(filter.Task))
if err != nil {
return nil, 0, err
}
val := url.Values{}
if filter.After != nil {
val.Set("after", filter.After.String())
params = append(params, [2]string{"after", filter.After.String()})
}
if filter.Limit < 0 || filter.Limit > influxdb.TaskMaxPageSize {
return nil, 0, influxdb.ErrOutOfBoundsLimit
}
val.Set("limit", strconv.Itoa(filter.Limit))
u.RawQuery = val.Encode()
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, 0, err
}
req.Header.Set("Content-Type", "application/json")
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
return nil, 0, err
}
defer resp.Body.Close()
if err := CheckError(resp); err != nil {
return nil, 0, err
}
params = append(params, [2]string{"limit", strconv.Itoa(filter.Limit)})
var rs runsResponse
if err := json.NewDecoder(resp.Body).Decode(&rs); err != nil {
err := t.Client.
Get(taskIDRunsPath(filter.Task)).
QueryParams(params...).
DecodeJSON(&rs).
Do(ctx)
if err != nil {
return nil, 0, err
}
@ -1746,27 +1595,13 @@ func (t TaskService) FindRunByID(ctx context.Context, taskID, runID influxdb.ID)
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
u, err := NewURL(t.Addr, taskIDRunIDPath(taskID, runID))
var rs = &runResponse{}
err := t.Client.
Get(taskIDRunIDPath(taskID, runID)).
DecodeJSON(rs).
Do(ctx)
if err != nil {
return nil, err
}
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, err
}
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if err := CheckError(resp); err != nil {
if influxdb.ErrorCode(err) == influxdb.ENotFound {
// ErrRunNotFound is expected as part of the FindRunByID contract,
// so return that actual error instead of a different error that looks like it.
@ -1776,10 +1611,7 @@ func (t TaskService) FindRunByID(ctx context.Context, taskID, runID influxdb.ID)
return nil, err
}
var rs = &runResponse{}
if err := json.NewDecoder(resp.Body).Decode(rs); err != nil {
return nil, err
}
return convertRun(rs.httpRun), nil
}
@ -1788,28 +1620,13 @@ func (t TaskService) RetryRun(ctx context.Context, taskID, runID influxdb.ID) (*
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
p := path.Join(taskIDRunIDPath(taskID, runID), "retry")
u, err := NewURL(t.Addr, p)
var rs runResponse
err := t.Client.
Post(nil, path.Join(taskIDRunIDPath(taskID, runID), "retry")).
DecodeJSON(&rs).
Do(ctx)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", u.String(), nil)
if err != nil {
return nil, err
}
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if err := CheckError(resp); err != nil {
if influxdb.ErrorCode(err) == influxdb.ENotFound {
// ErrRunNotFound is expected as part of the RetryRun contract,
// so return that actual error instead of a different error that looks like it.
@ -1824,10 +1641,6 @@ func (t TaskService) RetryRun(ctx context.Context, taskID, runID influxdb.ID) (*
return nil, err
}
rs := &runResponse{}
if err := json.NewDecoder(resp.Body).Decode(rs); err != nil {
return nil, err
}
return convertRun(rs.httpRun), nil
}
@ -1836,28 +1649,18 @@ func (t TaskService) ForceRun(ctx context.Context, taskID influxdb.ID, scheduled
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
u, err := NewURL(t.Addr, taskIDRunsPath(taskID))
if err != nil {
return nil, err
type body struct {
scheduledFor string
}
b := body{scheduledFor: time.Unix(scheduledFor, 0).UTC().Format(time.RFC3339)}
rs := &runResponse{}
err := t.Client.
PostJSON(b, taskIDRunsPath(taskID)).
DecodeJSON(&rs).
Do(ctx)
body := fmt.Sprintf(`{"scheduledFor": %q}`, time.Unix(scheduledFor, 0).UTC().Format(time.RFC3339))
req, err := http.NewRequest("POST", u.String(), strings.NewReader(body))
if err != nil {
return nil, err
}
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if err := CheckError(resp); err != nil {
if influxdb.ErrorCode(err) == influxdb.ENotFound {
// ErrRunNotFound is expected as part of the RetryRun contract,
// so return that actual error instead of a different error that looks like it.
@ -1872,10 +1675,6 @@ func (t TaskService) ForceRun(ctx context.Context, taskID influxdb.ID, scheduled
return nil, err
}
rs := &runResponse{}
if err := json.NewDecoder(resp.Body).Decode(rs); err != nil {
return nil, err
}
return convertRun(rs.httpRun), nil
}
@ -1888,30 +1687,14 @@ func (t TaskService) CancelRun(ctx context.Context, taskID, runID influxdb.ID) e
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
u, err := NewURL(t.Addr, cancelPath(taskID, runID))
err := t.Client.
Delete(cancelPath(taskID, runID)).
Do(ctx)
if err != nil {
return err
}
req, err := http.NewRequest("DELETE", u.String(), nil)
if err != nil {
return err
}
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if err := CheckError(resp); err != nil {
return err
}
return nil
}

View File

@ -14,13 +14,14 @@ import (
"time"
"github.com/influxdata/httprouter"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb"
pcontext "github.com/influxdata/influxdb/context"
kithttp "github.com/influxdata/influxdb/kit/transport/http"
"github.com/influxdata/influxdb/mock"
_ "github.com/influxdata/influxdb/query/builtin"
"github.com/influxdata/influxdb/task/backend"
platformtesting "github.com/influxdata/influxdb/testing"
influxdbtesting "github.com/influxdata/influxdb/testing"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)
@ -34,17 +35,17 @@ func NewMockTaskBackend(t *testing.T) *TaskBackend {
AuthorizationService: mock.NewAuthorizationService(),
TaskService: &mock.TaskService{},
OrganizationService: &mock.OrganizationService{
FindOrganizationByIDF: func(ctx context.Context, id platform.ID) (*platform.Organization, error) {
return &platform.Organization{ID: id, Name: "test"}, nil
FindOrganizationByIDF: func(ctx context.Context, id influxdb.ID) (*influxdb.Organization, error) {
return &influxdb.Organization{ID: id, Name: "test"}, nil
},
FindOrganizationF: func(ctx context.Context, filter platform.OrganizationFilter) (*platform.Organization, error) {
org := &platform.Organization{}
FindOrganizationF: func(ctx context.Context, filter influxdb.OrganizationFilter) (*influxdb.Organization, error) {
org := &influxdb.Organization{}
if filter.Name != nil {
if *filter.Name == "non-existent-org" {
return nil, &platform.Error{
return nil, &influxdb.Error{
Err: errors.New("org not found or unauthorized"),
Msg: "org " + *filter.Name + " not found or unauthorized",
Code: platform.ENotFound,
Code: influxdb.ENotFound,
}
}
org.Name = *filter.Name
@ -64,8 +65,8 @@ func NewMockTaskBackend(t *testing.T) *TaskBackend {
func TestTaskHandler_handleGetTasks(t *testing.T) {
type fields struct {
taskService platform.TaskService
labelService platform.LabelService
taskService influxdb.TaskService
labelService influxdb.LabelService
}
type wants struct {
statusCode int
@ -83,8 +84,8 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
name: "get tasks",
fields: fields{
taskService: &mock.TaskService{
FindTasksFn: func(ctx context.Context, f platform.TaskFilter) ([]*platform.Task, int, error) {
tasks := []*platform.Task{
FindTasksFn: func(ctx context.Context, f influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
tasks := []*influxdb.Task{
{
ID: 1,
Name: "task1",
@ -107,10 +108,10 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
},
},
labelService: &mock.LabelService{
FindResourceLabelsFn: func(ctx context.Context, f platform.LabelMappingFilter) ([]*platform.Label, error) {
labels := []*platform.Label{
FindResourceLabelsFn: func(ctx context.Context, f influxdb.LabelMappingFilter) ([]*influxdb.Label, error) {
labels := []*influxdb.Label{
{
ID: platformtesting.MustIDBase16("fc3dc670a4be9b9a"),
ID: influxdbtesting.MustIDBase16("fc3dc670a4be9b9a"),
Name: "label",
Properties: map[string]string{
"color": "fff000",
@ -192,8 +193,8 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
getParams: "after=0000000000000001&limit=1",
fields: fields{
taskService: &mock.TaskService{
FindTasksFn: func(ctx context.Context, f platform.TaskFilter) ([]*platform.Task, int, error) {
tasks := []*platform.Task{
FindTasksFn: func(ctx context.Context, f influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
tasks := []*influxdb.Task{
{
ID: 2,
Name: "task2",
@ -207,10 +208,10 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
},
},
labelService: &mock.LabelService{
FindResourceLabelsFn: func(ctx context.Context, f platform.LabelMappingFilter) ([]*platform.Label, error) {
labels := []*platform.Label{
FindResourceLabelsFn: func(ctx context.Context, f influxdb.LabelMappingFilter) ([]*influxdb.Label, error) {
labels := []*influxdb.Label{
{
ID: platformtesting.MustIDBase16("fc3dc670a4be9b9a"),
ID: influxdbtesting.MustIDBase16("fc3dc670a4be9b9a"),
Name: "label",
Properties: map[string]string{
"color": "fff000",
@ -266,8 +267,8 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
getParams: "org=test2",
fields: fields{
taskService: &mock.TaskService{
FindTasksFn: func(ctx context.Context, f platform.TaskFilter) ([]*platform.Task, int, error) {
tasks := []*platform.Task{
FindTasksFn: func(ctx context.Context, f influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
tasks := []*influxdb.Task{
{
ID: 2,
Name: "task2",
@ -281,10 +282,10 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
},
},
labelService: &mock.LabelService{
FindResourceLabelsFn: func(ctx context.Context, f platform.LabelMappingFilter) ([]*platform.Label, error) {
labels := []*platform.Label{
FindResourceLabelsFn: func(ctx context.Context, f influxdb.LabelMappingFilter) ([]*influxdb.Label, error) {
labels := []*influxdb.Label{
{
ID: platformtesting.MustIDBase16("fc3dc670a4be9b9a"),
ID: influxdbtesting.MustIDBase16("fc3dc670a4be9b9a"),
Name: "label",
Properties: map[string]string{
"color": "fff000",
@ -339,8 +340,8 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
getParams: "org=non-existent-org",
fields: fields{
taskService: &mock.TaskService{
FindTasksFn: func(ctx context.Context, f platform.TaskFilter) ([]*platform.Task, int, error) {
tasks := []*platform.Task{
FindTasksFn: func(ctx context.Context, f influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
tasks := []*influxdb.Task{
{
ID: 1,
Name: "task1",
@ -362,10 +363,10 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
},
},
labelService: &mock.LabelService{
FindResourceLabelsFn: func(ctx context.Context, f platform.LabelMappingFilter) ([]*platform.Label, error) {
labels := []*platform.Label{
FindResourceLabelsFn: func(ctx context.Context, f influxdb.LabelMappingFilter) ([]*influxdb.Label, error) {
labels := []*influxdb.Label{
{
ID: platformtesting.MustIDBase16("fc3dc670a4be9b9a"),
ID: influxdbtesting.MustIDBase16("fc3dc670a4be9b9a"),
Name: "label",
Properties: map[string]string{
"color": "fff000",
@ -422,10 +423,10 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
func TestTaskHandler_handlePostTasks(t *testing.T) {
type args struct {
taskCreate platform.TaskCreate
taskCreate influxdb.TaskCreate
}
type fields struct {
taskService platform.TaskService
taskService influxdb.TaskService
}
type wants struct {
statusCode int
@ -442,15 +443,15 @@ func TestTaskHandler_handlePostTasks(t *testing.T) {
{
name: "create task",
args: args{
taskCreate: platform.TaskCreate{
taskCreate: influxdb.TaskCreate{
OrganizationID: 1,
Flux: "abc",
},
},
fields: fields{
taskService: &mock.TaskService{
CreateTaskFn: func(ctx context.Context, tc platform.TaskCreate) (*platform.Task, error) {
return &platform.Task{
CreateTaskFn: func(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) {
return &influxdb.Task{
ID: 1,
Name: "task1",
Description: "Brand New Task",
@ -489,20 +490,20 @@ func TestTaskHandler_handlePostTasks(t *testing.T) {
},
},
{
name: "create task - platform error creating task",
name: "create task - influxdb error creating task",
args: args{
taskCreate: platform.TaskCreate{
taskCreate: influxdb.TaskCreate{
OrganizationID: 1,
Flux: "abc",
},
},
fields: fields{
taskService: &mock.TaskService{
CreateTaskFn: func(ctx context.Context, tc platform.TaskCreate) (*platform.Task, error) {
return nil, platform.NewError(
platform.WithErrorErr(errors.New("something went wrong")),
platform.WithErrorMsg("something really went wrong"),
platform.WithErrorCode(platform.EInvalid),
CreateTaskFn: func(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) {
return nil, influxdb.NewError(
influxdb.WithErrorErr(errors.New("something went wrong")),
influxdb.WithErrorMsg("something really went wrong"),
influxdb.WithErrorCode(influxdb.EInvalid),
)
},
},
@ -521,14 +522,14 @@ func TestTaskHandler_handlePostTasks(t *testing.T) {
{
name: "create task - error creating task",
args: args{
taskCreate: platform.TaskCreate{
taskCreate: influxdb.TaskCreate{
OrganizationID: 1,
Flux: "abc",
},
},
fields: fields{
taskService: &mock.TaskService{
CreateTaskFn: func(ctx context.Context, tc platform.TaskCreate) (*platform.Task, error) {
CreateTaskFn: func(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) {
return nil, errors.New("something bad happened")
},
},
@ -554,7 +555,7 @@ func TestTaskHandler_handlePostTasks(t *testing.T) {
}
r := httptest.NewRequest("POST", "http://any.url", bytes.NewReader(b))
ctx := pcontext.SetAuthorizer(context.TODO(), new(platform.Authorization))
ctx := pcontext.SetAuthorizer(context.TODO(), new(influxdb.Authorization))
r = r.WithContext(ctx)
w := httptest.NewRecorder()
@ -588,11 +589,11 @@ func TestTaskHandler_handlePostTasks(t *testing.T) {
func TestTaskHandler_handleGetRun(t *testing.T) {
type fields struct {
taskService platform.TaskService
taskService influxdb.TaskService
}
type args struct {
taskID platform.ID
runID platform.ID
taskID influxdb.ID
runID influxdb.ID
}
type wants struct {
statusCode int
@ -610,12 +611,12 @@ func TestTaskHandler_handleGetRun(t *testing.T) {
name: "get a run by id",
fields: fields{
taskService: &mock.TaskService{
FindRunByIDFn: func(ctx context.Context, taskID platform.ID, runID platform.ID) (*platform.Run, error) {
FindRunByIDFn: func(ctx context.Context, taskID influxdb.ID, runID influxdb.ID) (*influxdb.Run, error) {
scheduledFor, _ := time.Parse(time.RFC3339, "2018-12-01T17:00:13Z")
startedAt, _ := time.Parse(time.RFC3339Nano, "2018-12-01T17:00:03.155645Z")
finishedAt, _ := time.Parse(time.RFC3339Nano, "2018-12-01T17:00:13.155645Z")
requestedAt, _ := time.Parse(time.RFC3339, "2018-12-01T17:00:13Z")
run := platform.Run{
run := influxdb.Run{
ID: runID,
TaskID: taskID,
Status: "success",
@ -671,7 +672,7 @@ func TestTaskHandler_handleGetRun(t *testing.T) {
Value: tt.args.runID.String(),
},
}))
r = r.WithContext(pcontext.SetAuthorizer(r.Context(), &platform.Authorization{Permissions: platform.OperPermissions()}))
r = r.WithContext(pcontext.SetAuthorizer(r.Context(), &influxdb.Authorization{Permissions: influxdb.OperPermissions()}))
w := httptest.NewRecorder()
taskBackend := NewMockTaskBackend(t)
taskBackend.HTTPErrorHandler = kithttp.ErrorHandler(0)
@ -702,10 +703,10 @@ func TestTaskHandler_handleGetRun(t *testing.T) {
func TestTaskHandler_handleGetRuns(t *testing.T) {
type fields struct {
taskService platform.TaskService
taskService influxdb.TaskService
}
type args struct {
taskID platform.ID
taskID influxdb.ID
}
type wants struct {
statusCode int
@ -723,14 +724,14 @@ func TestTaskHandler_handleGetRuns(t *testing.T) {
name: "get runs by task id",
fields: fields{
taskService: &mock.TaskService{
FindRunsFn: func(ctx context.Context, f platform.RunFilter) ([]*platform.Run, int, error) {
FindRunsFn: func(ctx context.Context, f influxdb.RunFilter) ([]*influxdb.Run, int, error) {
scheduledFor, _ := time.Parse(time.RFC3339, "2018-12-01T17:00:13Z")
startedAt, _ := time.Parse(time.RFC3339Nano, "2018-12-01T17:00:03.155645Z")
finishedAt, _ := time.Parse(time.RFC3339Nano, "2018-12-01T17:00:13.155645Z")
requestedAt, _ := time.Parse(time.RFC3339, "2018-12-01T17:00:13Z")
runs := []*platform.Run{
runs := []*influxdb.Run{
{
ID: platform.ID(2),
ID: influxdb.ID(2),
TaskID: f.Task,
Status: "success",
ScheduledFor: scheduledFor,
@ -789,7 +790,7 @@ func TestTaskHandler_handleGetRuns(t *testing.T) {
Value: tt.args.taskID.String(),
},
}))
r = r.WithContext(pcontext.SetAuthorizer(r.Context(), &platform.Authorization{Permissions: platform.OperPermissions()}))
r = r.WithContext(pcontext.SetAuthorizer(r.Context(), &influxdb.Authorization{Permissions: influxdb.OperPermissions()}))
w := httptest.NewRecorder()
taskBackend := NewMockTaskBackend(t)
taskBackend.HTTPErrorHandler = kithttp.ErrorHandler(0)
@ -830,16 +831,16 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
h.UserService = im
h.OrganizationService = im
o := platform.Organization{Name: "o"}
o := influxdb.Organization{Name: "o"}
ctx := context.Background()
if err := h.OrganizationService.CreateOrganization(ctx, &o); err != nil {
t.Fatal(err)
}
// Create a session to associate with the contexts, so authorization checks pass.
authz := &platform.Authorization{Permissions: platform.OperPermissions()}
authz := &influxdb.Authorization{Permissions: influxdb.OperPermissions()}
const taskID, runID = platform.ID(0xCCCCCC), platform.ID(0xAAAAAA)
const taskID, runID = influxdb.ID(0xCCCCCC), influxdb.ID(0xAAAAAA)
var (
okTask = []interface{}{taskID}
@ -867,12 +868,12 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
{
name: "get task",
svc: &mock.TaskService{
FindTaskByIDFn: func(_ context.Context, id platform.ID) (*platform.Task, error) {
FindTaskByIDFn: func(_ context.Context, id influxdb.ID) (*influxdb.Task, error) {
if id == taskID {
return &platform.Task{ID: taskID, Organization: "o"}, nil
return &influxdb.Task{ID: taskID, Organization: "o"}, nil
}
return nil, platform.ErrTaskNotFound
return nil, influxdb.ErrTaskNotFound
},
},
method: http.MethodGet,
@ -883,12 +884,12 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
{
name: "update task",
svc: &mock.TaskService{
UpdateTaskFn: func(_ context.Context, id platform.ID, _ platform.TaskUpdate) (*platform.Task, error) {
UpdateTaskFn: func(_ context.Context, id influxdb.ID, _ influxdb.TaskUpdate) (*influxdb.Task, error) {
if id == taskID {
return &platform.Task{ID: taskID, Organization: "o"}, nil
return &influxdb.Task{ID: taskID, Organization: "o"}, nil
}
return nil, platform.ErrTaskNotFound
return nil, influxdb.ErrTaskNotFound
},
},
method: http.MethodPatch,
@ -900,12 +901,12 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
{
name: "delete task",
svc: &mock.TaskService{
DeleteTaskFn: func(_ context.Context, id platform.ID) error {
DeleteTaskFn: func(_ context.Context, id influxdb.ID) error {
if id == taskID {
return nil
}
return platform.ErrTaskNotFound
return influxdb.ErrTaskNotFound
},
},
method: http.MethodDelete,
@ -916,12 +917,12 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
{
name: "get task logs",
svc: &mock.TaskService{
FindLogsFn: func(_ context.Context, f platform.LogFilter) ([]*platform.Log, int, error) {
FindLogsFn: func(_ context.Context, f influxdb.LogFilter) ([]*influxdb.Log, int, error) {
if f.Task == taskID {
return nil, 0, nil
}
return nil, 0, platform.ErrTaskNotFound
return nil, 0, influxdb.ErrTaskNotFound
},
},
method: http.MethodGet,
@ -932,12 +933,12 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
{
name: "get run logs",
svc: &mock.TaskService{
FindLogsFn: func(_ context.Context, f platform.LogFilter) ([]*platform.Log, int, error) {
FindLogsFn: func(_ context.Context, f influxdb.LogFilter) ([]*influxdb.Log, int, error) {
if f.Task != taskID {
return nil, 0, platform.ErrTaskNotFound
return nil, 0, influxdb.ErrTaskNotFound
}
if *f.Run != runID {
return nil, 0, platform.ErrNoRunsFound
return nil, 0, influxdb.ErrNoRunsFound
}
return nil, 0, nil
@ -951,9 +952,9 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
{
name: "get runs: task not found",
svc: &mock.TaskService{
FindRunsFn: func(_ context.Context, f platform.RunFilter) ([]*platform.Run, int, error) {
FindRunsFn: func(_ context.Context, f influxdb.RunFilter) ([]*influxdb.Run, int, error) {
if f.Task != taskID {
return nil, 0, platform.ErrTaskNotFound
return nil, 0, influxdb.ErrTaskNotFound
}
return nil, 0, nil
@ -967,9 +968,9 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
{
name: "get runs: task found but no runs found",
svc: &mock.TaskService{
FindRunsFn: func(_ context.Context, f platform.RunFilter) ([]*platform.Run, int, error) {
FindRunsFn: func(_ context.Context, f influxdb.RunFilter) ([]*influxdb.Run, int, error) {
if f.Task != taskID {
return nil, 0, platform.ErrNoRunsFound
return nil, 0, influxdb.ErrNoRunsFound
}
return nil, 0, nil
@ -983,12 +984,12 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
{
name: "force run",
svc: &mock.TaskService{
ForceRunFn: func(_ context.Context, tid platform.ID, _ int64) (*platform.Run, error) {
ForceRunFn: func(_ context.Context, tid influxdb.ID, _ int64) (*influxdb.Run, error) {
if tid != taskID {
return nil, platform.ErrTaskNotFound
return nil, influxdb.ErrTaskNotFound
}
return &platform.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil
return &influxdb.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil
},
},
method: http.MethodPost,
@ -1000,15 +1001,15 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
{
name: "get run",
svc: &mock.TaskService{
FindRunByIDFn: func(_ context.Context, tid, rid platform.ID) (*platform.Run, error) {
FindRunByIDFn: func(_ context.Context, tid, rid influxdb.ID) (*influxdb.Run, error) {
if tid != taskID {
return nil, platform.ErrTaskNotFound
return nil, influxdb.ErrTaskNotFound
}
if rid != runID {
return nil, platform.ErrRunNotFound
return nil, influxdb.ErrRunNotFound
}
return &platform.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil
return &influxdb.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil
},
},
method: http.MethodGet,
@ -1019,15 +1020,15 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
{
name: "retry run",
svc: &mock.TaskService{
RetryRunFn: func(_ context.Context, tid, rid platform.ID) (*platform.Run, error) {
RetryRunFn: func(_ context.Context, tid, rid influxdb.ID) (*influxdb.Run, error) {
if tid != taskID {
return nil, platform.ErrTaskNotFound
return nil, influxdb.ErrTaskNotFound
}
if rid != runID {
return nil, platform.ErrRunNotFound
return nil, influxdb.ErrRunNotFound
}
return &platform.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil
return &influxdb.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil
},
},
method: http.MethodPost,
@ -1038,12 +1039,12 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
{
name: "cancel run",
svc: &mock.TaskService{
CancelRunFn: func(_ context.Context, tid, rid platform.ID) error {
CancelRunFn: func(_ context.Context, tid, rid influxdb.ID) error {
if tid != taskID {
return platform.ErrTaskNotFound
return influxdb.ErrTaskNotFound
}
if rid != runID {
return platform.ErrRunNotFound
return influxdb.ErrRunNotFound
}
return nil
@ -1108,11 +1109,11 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
func TestService_handlePostTaskLabel(t *testing.T) {
type fields struct {
LabelService platform.LabelService
LabelService influxdb.LabelService
}
type args struct {
labelMapping *platform.LabelMapping
taskID platform.ID
labelMapping *influxdb.LabelMapping
taskID influxdb.ID
}
type wants struct {
statusCode int
@ -1130,8 +1131,8 @@ func TestService_handlePostTaskLabel(t *testing.T) {
name: "add label to task",
fields: fields{
LabelService: &mock.LabelService{
FindLabelByIDFn: func(ctx context.Context, id platform.ID) (*platform.Label, error) {
return &platform.Label{
FindLabelByIDFn: func(ctx context.Context, id influxdb.ID) (*influxdb.Label, error) {
return &influxdb.Label{
ID: 1,
Name: "label",
Properties: map[string]string{
@ -1139,11 +1140,11 @@ func TestService_handlePostTaskLabel(t *testing.T) {
},
}, nil
},
CreateLabelMappingFn: func(ctx context.Context, m *platform.LabelMapping) error { return nil },
CreateLabelMappingFn: func(ctx context.Context, m *influxdb.LabelMapping) error { return nil },
},
},
args: args{
labelMapping: &platform.LabelMapping{
labelMapping: &influxdb.LabelMapping{
ResourceID: 100,
LabelID: 1,
},
@ -1215,37 +1216,37 @@ func TestTaskHandler_CreateTaskWithOrgName(t *testing.T) {
ctx := context.Background()
// Set up user and org.
u := &platform.User{Name: "u"}
u := &influxdb.User{Name: "u"}
if err := i.CreateUser(ctx, u); err != nil {
t.Fatal(err)
}
o := &platform.Organization{Name: "o"}
o := &influxdb.Organization{Name: "o"}
if err := i.CreateOrganization(ctx, o); err != nil {
t.Fatal(err)
}
// Source and destination buckets for use in task.
bSrc := platform.Bucket{OrgID: o.ID, Name: "b-src"}
bSrc := influxdb.Bucket{OrgID: o.ID, Name: "b-src"}
if err := i.CreateBucket(ctx, &bSrc); err != nil {
t.Fatal(err)
}
bDst := platform.Bucket{OrgID: o.ID, Name: "b-dst"}
bDst := influxdb.Bucket{OrgID: o.ID, Name: "b-dst"}
if err := i.CreateBucket(ctx, &bDst); err != nil {
t.Fatal(err)
}
authz := platform.Authorization{OrgID: o.ID, UserID: u.ID, Permissions: platform.OperPermissions()}
authz := influxdb.Authorization{OrgID: o.ID, UserID: u.ID, Permissions: influxdb.OperPermissions()}
if err := i.CreateAuthorization(ctx, &authz); err != nil {
t.Fatal(err)
}
ts := &mock.TaskService{
CreateTaskFn: func(_ context.Context, tc platform.TaskCreate) (*platform.Task, error) {
CreateTaskFn: func(_ context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) {
if tc.OrganizationID != o.ID {
t.Fatalf("expected task to be created with org ID %s, got %s", o.ID, tc.OrganizationID)
}
return &platform.Task{ID: 9, OrganizationID: o.ID, OwnerID: o.ID, AuthorizationID: authz.ID, Name: "x", Flux: tc.Flux}, nil
return &influxdb.Task{ID: 9, OrganizationID: o.ID, OwnerID: o.ID, AuthorizationID: authz.ID, Name: "x", Flux: tc.Flux}, nil
},
}
@ -1265,7 +1266,7 @@ func TestTaskHandler_CreateTaskWithOrgName(t *testing.T) {
url := "http://localhost:9999/api/v2/tasks"
b, err := json.Marshal(platform.TaskCreate{
b, err := json.Marshal(influxdb.TaskCreate{
Flux: script,
Organization: o.Name,
})
@ -1292,7 +1293,7 @@ func TestTaskHandler_CreateTaskWithOrgName(t *testing.T) {
}
// The task should have been created with a valid token.
var createdTask platform.Task
var createdTask influxdb.Task
if err := json.Unmarshal([]byte(body), &createdTask); err != nil {
t.Fatal(err)
}
@ -1309,38 +1310,38 @@ func TestTaskHandler_Sessions(t *testing.T) {
ctx := context.Background()
// Set up user and org.
u := &platform.User{Name: "u"}
u := &influxdb.User{Name: "u"}
if err := i.CreateUser(ctx, u); err != nil {
t.Fatal(err)
}
o := &platform.Organization{Name: "o"}
o := &influxdb.Organization{Name: "o"}
if err := i.CreateOrganization(ctx, o); err != nil {
t.Fatal(err)
}
// Map user to org.
if err := i.CreateUserResourceMapping(ctx, &platform.UserResourceMapping{
ResourceType: platform.OrgsResourceType,
if err := i.CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{
ResourceType: influxdb.OrgsResourceType,
ResourceID: o.ID,
UserID: u.ID,
UserType: platform.Owner,
UserType: influxdb.Owner,
}); err != nil {
t.Fatal(err)
}
// Source and destination buckets for use in task.
bSrc := platform.Bucket{OrgID: o.ID, Name: "b-src"}
bSrc := influxdb.Bucket{OrgID: o.ID, Name: "b-src"}
if err := i.CreateBucket(ctx, &bSrc); err != nil {
t.Fatal(err)
}
bDst := platform.Bucket{OrgID: o.ID, Name: "b-dst"}
bDst := influxdb.Bucket{OrgID: o.ID, Name: "b-dst"}
if err := i.CreateBucket(ctx, &bDst); err != nil {
t.Fatal(err)
}
sessionAllPermsCtx := pcontext.SetAuthorizer(context.Background(), &platform.Session{
sessionAllPermsCtx := pcontext.SetAuthorizer(context.Background(), &influxdb.Session{
UserID: u.ID,
Permissions: platform.OperPermissions(),
Permissions: influxdb.OperPermissions(),
ExpiresAt: time.Now().Add(24 * time.Hour),
})
@ -1361,33 +1362,33 @@ func TestTaskHandler_Sessions(t *testing.T) {
t.Run("get runs for a task", func(t *testing.T) {
// Unique authorization to associate with our fake task.
taskAuth := &platform.Authorization{OrgID: o.ID, UserID: u.ID}
taskAuth := &influxdb.Authorization{OrgID: o.ID, UserID: u.ID}
if err := i.CreateAuthorization(ctx, taskAuth); err != nil {
t.Fatal(err)
}
const taskID = platform.ID(12345)
const runID = platform.ID(9876)
const taskID = influxdb.ID(12345)
const runID = influxdb.ID(9876)
var findRunsCtx context.Context
ts := &mock.TaskService{
FindRunsFn: func(ctx context.Context, f platform.RunFilter) ([]*platform.Run, int, error) {
FindRunsFn: func(ctx context.Context, f influxdb.RunFilter) ([]*influxdb.Run, int, error) {
findRunsCtx = ctx
if f.Task != taskID {
t.Fatalf("expected task ID %v, got %v", taskID, f.Task)
}
return []*platform.Run{
return []*influxdb.Run{
{ID: runID, TaskID: taskID},
}, 1, nil
},
FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) {
FindTaskByIDFn: func(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) {
if id != taskID {
return nil, platform.ErrTaskNotFound
return nil, influxdb.ErrTaskNotFound
}
return &platform.Task{
return &influxdb.Task{
ID: taskID,
OrganizationID: o.ID,
AuthorizationID: taskAuth.ID,
@ -1416,23 +1417,23 @@ func TestTaskHandler_Sessions(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if authr.Kind() != platform.AuthorizationKind {
t.Fatalf("expected context's authorizer to be of kind %q, got %q", platform.AuthorizationKind, authr.Kind())
if authr.Kind() != influxdb.AuthorizationKind {
t.Fatalf("expected context's authorizer to be of kind %q, got %q", influxdb.AuthorizationKind, authr.Kind())
}
orgID := authr.(*platform.Authorization).OrgID
orgID := authr.(*influxdb.Authorization).OrgID
if orgID != o.ID {
t.Fatalf("expected context's authorizer org ID to be %v, got %v", o.ID, orgID)
}
// Other user without permissions on the task or authorization should be disallowed.
otherUser := &platform.User{Name: "other-" + t.Name()}
otherUser := &influxdb.User{Name: "other-" + t.Name()}
if err := i.CreateUser(ctx, otherUser); err != nil {
t.Fatal(err)
}
valCtx = pcontext.SetAuthorizer(valCtx, &platform.Session{
valCtx = pcontext.SetAuthorizer(valCtx, &influxdb.Session{
UserID: otherUser.ID,
ExpiresAt: time.Now().Add(24 * time.Hour),
})
@ -1454,17 +1455,17 @@ func TestTaskHandler_Sessions(t *testing.T) {
t.Run("get single run for a task", func(t *testing.T) {
// Unique authorization to associate with our fake task.
taskAuth := &platform.Authorization{OrgID: o.ID, UserID: u.ID}
taskAuth := &influxdb.Authorization{OrgID: o.ID, UserID: u.ID}
if err := i.CreateAuthorization(ctx, taskAuth); err != nil {
t.Fatal(err)
}
const taskID = platform.ID(12345)
const runID = platform.ID(9876)
const taskID = influxdb.ID(12345)
const runID = influxdb.ID(9876)
var findRunByIDCtx context.Context
ts := &mock.TaskService{
FindRunByIDFn: func(ctx context.Context, tid, rid platform.ID) (*platform.Run, error) {
FindRunByIDFn: func(ctx context.Context, tid, rid influxdb.ID) (*influxdb.Run, error) {
findRunByIDCtx = ctx
if tid != taskID {
t.Fatalf("expected task ID %v, got %v", taskID, tid)
@ -1473,15 +1474,15 @@ func TestTaskHandler_Sessions(t *testing.T) {
t.Fatalf("expected run ID %v, got %v", runID, rid)
}
return &platform.Run{ID: runID, TaskID: taskID}, nil
return &influxdb.Run{ID: runID, TaskID: taskID}, nil
},
FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) {
FindTaskByIDFn: func(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) {
if id != taskID {
return nil, platform.ErrTaskNotFound
return nil, influxdb.ErrTaskNotFound
}
return &platform.Task{
return &influxdb.Task{
ID: taskID,
OrganizationID: o.ID,
AuthorizationID: taskAuth.ID,
@ -1514,20 +1515,20 @@ func TestTaskHandler_Sessions(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if authr.Kind() != platform.AuthorizationKind {
t.Fatalf("expected context's authorizer to be of kind %q, got %q", platform.AuthorizationKind, authr.Kind())
if authr.Kind() != influxdb.AuthorizationKind {
t.Fatalf("expected context's authorizer to be of kind %q, got %q", influxdb.AuthorizationKind, authr.Kind())
}
if authr.Identifier() != taskAuth.ID {
t.Fatalf("expected context's authorizer ID to be %v, got %v", taskAuth.ID, authr.Identifier())
}
// Other user without permissions on the task or authorization should be disallowed.
otherUser := &platform.User{Name: "other-" + t.Name()}
otherUser := &influxdb.User{Name: "other-" + t.Name()}
if err := i.CreateUser(ctx, otherUser); err != nil {
t.Fatal(err)
}
valCtx = pcontext.SetAuthorizer(valCtx, &platform.Session{
valCtx = pcontext.SetAuthorizer(valCtx, &influxdb.Session{
UserID: otherUser.ID,
ExpiresAt: time.Now().Add(24 * time.Hour),
})
@ -1549,17 +1550,17 @@ func TestTaskHandler_Sessions(t *testing.T) {
t.Run("get logs for a run", func(t *testing.T) {
// Unique authorization to associate with our fake task.
taskAuth := &platform.Authorization{OrgID: o.ID, UserID: u.ID}
taskAuth := &influxdb.Authorization{OrgID: o.ID, UserID: u.ID}
if err := i.CreateAuthorization(ctx, taskAuth); err != nil {
t.Fatal(err)
}
const taskID = platform.ID(12345)
const runID = platform.ID(9876)
const taskID = influxdb.ID(12345)
const runID = influxdb.ID(9876)
var findLogsCtx context.Context
ts := &mock.TaskService{
FindLogsFn: func(ctx context.Context, f platform.LogFilter) ([]*platform.Log, int, error) {
FindLogsFn: func(ctx context.Context, f influxdb.LogFilter) ([]*influxdb.Log, int, error) {
findLogsCtx = ctx
if f.Task != taskID {
t.Fatalf("expected task ID %v, got %v", taskID, f.Task)
@ -1568,16 +1569,16 @@ func TestTaskHandler_Sessions(t *testing.T) {
t.Fatalf("expected run ID %v, got %v", runID, *f.Run)
}
line := platform.Log{Time: "time", Message: "a log line"}
return []*platform.Log{&line}, 1, nil
line := influxdb.Log{Time: "time", Message: "a log line"}
return []*influxdb.Log{&line}, 1, nil
},
FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) {
FindTaskByIDFn: func(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) {
if id != taskID {
return nil, platform.ErrTaskNotFound
return nil, influxdb.ErrTaskNotFound
}
return &platform.Task{
return &influxdb.Task{
ID: taskID,
OrganizationID: o.ID,
AuthorizationID: taskAuth.ID,
@ -1610,20 +1611,20 @@ func TestTaskHandler_Sessions(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if authr.Kind() != platform.AuthorizationKind {
t.Fatalf("expected context's authorizer to be of kind %q, got %q", platform.AuthorizationKind, authr.Kind())
if authr.Kind() != influxdb.AuthorizationKind {
t.Fatalf("expected context's authorizer to be of kind %q, got %q", influxdb.AuthorizationKind, authr.Kind())
}
if authr.Identifier() != taskAuth.ID {
t.Fatalf("expected context's authorizer ID to be %v, got %v", taskAuth.ID, authr.Identifier())
}
// Other user without permissions on the task or authorization should be disallowed.
otherUser := &platform.User{Name: "other-" + t.Name()}
otherUser := &influxdb.User{Name: "other-" + t.Name()}
if err := i.CreateUser(ctx, otherUser); err != nil {
t.Fatal(err)
}
valCtx = pcontext.SetAuthorizer(valCtx, &platform.Session{
valCtx = pcontext.SetAuthorizer(valCtx, &influxdb.Session{
UserID: otherUser.ID,
ExpiresAt: time.Now().Add(24 * time.Hour),
})
@ -1645,17 +1646,17 @@ func TestTaskHandler_Sessions(t *testing.T) {
t.Run("retry a run", func(t *testing.T) {
// Unique authorization to associate with our fake task.
taskAuth := &platform.Authorization{OrgID: o.ID, UserID: u.ID}
taskAuth := &influxdb.Authorization{OrgID: o.ID, UserID: u.ID}
if err := i.CreateAuthorization(ctx, taskAuth); err != nil {
t.Fatal(err)
}
const taskID = platform.ID(12345)
const runID = platform.ID(9876)
const taskID = influxdb.ID(12345)
const runID = influxdb.ID(9876)
var retryRunCtx context.Context
ts := &mock.TaskService{
RetryRunFn: func(ctx context.Context, tid, rid platform.ID) (*platform.Run, error) {
RetryRunFn: func(ctx context.Context, tid, rid influxdb.ID) (*influxdb.Run, error) {
retryRunCtx = ctx
if tid != taskID {
t.Fatalf("expected task ID %v, got %v", taskID, tid)
@ -1664,15 +1665,15 @@ func TestTaskHandler_Sessions(t *testing.T) {
t.Fatalf("expected run ID %v, got %v", runID, rid)
}
return &platform.Run{ID: 10 * runID, TaskID: taskID}, nil
return &influxdb.Run{ID: 10 * runID, TaskID: taskID}, nil
},
FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) {
FindTaskByIDFn: func(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) {
if id != taskID {
return nil, platform.ErrTaskNotFound
return nil, influxdb.ErrTaskNotFound
}
return &platform.Task{
return &influxdb.Task{
ID: taskID,
OrganizationID: o.ID,
AuthorizationID: taskAuth.ID,
@ -1705,20 +1706,20 @@ func TestTaskHandler_Sessions(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if authr.Kind() != platform.AuthorizationKind {
t.Fatalf("expected context's authorizer to be of kind %q, got %q", platform.AuthorizationKind, authr.Kind())
if authr.Kind() != influxdb.AuthorizationKind {
t.Fatalf("expected context's authorizer to be of kind %q, got %q", influxdb.AuthorizationKind, authr.Kind())
}
if authr.Identifier() != taskAuth.ID {
t.Fatalf("expected context's authorizer ID to be %v, got %v", taskAuth.ID, authr.Identifier())
}
// Other user without permissions on the task or authorization should be disallowed.
otherUser := &platform.User{Name: "other-" + t.Name()}
otherUser := &influxdb.User{Name: "other-" + t.Name()}
if err := i.CreateUser(ctx, otherUser); err != nil {
t.Fatal(err)
}
valCtx = pcontext.SetAuthorizer(valCtx, &platform.Session{
valCtx = pcontext.SetAuthorizer(valCtx, &influxdb.Session{
UserID: otherUser.ID,
ExpiresAt: time.Now().Add(24 * time.Hour),
})