feat: passing `type=basic` returns task metadata without query text (#22728)

pull/22735/head
Daniel Moran 2021-10-22 09:14:58 -04:00 committed by GitHub
parent de7f052e5a
commit 504f0e4413
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 234 additions and 241 deletions

View File

@ -571,8 +571,19 @@ func decodeGetTasksRequest(ctx context.Context, r *http.Request, orgs influxdb.O
req.filter.Status = &status
}
// the task api can only create or lookup system tasks.
req.filter.Type = &taskmodel.TaskSystemType
switch typ := qp.Get("type"); typ {
case "basic":
req.filter.Type = &taskmodel.TaskBasicType
case "system":
fallthrough
case "":
req.filter.Type = &taskmodel.TaskSystemType
default:
return nil, &errors2.Error{
Code: errors2.EInvalid,
Msg: fmt.Sprintf("%q is not a valid task type", typ),
}
}
if name := qp.Get("name"); name != "" {
req.filter.Name = &name

View File

@ -36,65 +36,108 @@ var (
var _ taskmodel.TaskService = (*Service)(nil)
type kvTask struct {
ID platform.ID `json:"id"`
Type string `json:"type,omitempty"`
OrganizationID platform.ID `json:"orgID"`
Organization string `json:"org"`
OwnerID platform.ID `json:"ownerID"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Status string `json:"status"`
Flux string `json:"flux"`
Every string `json:"every,omitempty"`
Cron string `json:"cron,omitempty"`
LastRunStatus string `json:"lastRunStatus,omitempty"`
LastRunError string `json:"lastRunError,omitempty"`
Offset influxdb.Duration `json:"offset,omitempty"`
LatestCompleted time.Time `json:"latestCompleted,omitempty"`
LatestScheduled time.Time `json:"latestScheduled,omitempty"`
LatestSuccess time.Time `json:"latestSuccess,omitempty"`
LatestFailure time.Time `json:"latestFailure,omitempty"`
CreatedAt time.Time `json:"createdAt,omitempty"`
UpdatedAt time.Time `json:"updatedAt,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
type matchableTask interface {
GetID() platform.ID
GetOrgID() platform.ID
GetOwnerID() platform.ID
GetType() string
GetName() string
GetStatus() string
ToInfluxDB() *taskmodel.Task
}
func kvToInfluxTask(k *kvTask) *taskmodel.Task {
type basicKvTask struct {
ID platform.ID `json:"id"`
Type string `json:"type,omitempty"`
OrganizationID platform.ID `json:"orgID"`
OwnerID platform.ID `json:"ownerID"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Status string `json:"status"`
Every string `json:"every,omitempty"`
Cron string `json:"cron,omitempty"`
LastRunStatus string `json:"lastRunStatus,omitempty"`
LastRunError string `json:"lastRunError,omitempty"`
Offset influxdb.Duration `json:"offset,omitempty"`
LatestCompleted time.Time `json:"latestCompleted,omitempty"`
LatestScheduled time.Time `json:"latestScheduled,omitempty"`
LatestSuccess time.Time `json:"latestSuccess,omitempty"`
LatestFailure time.Time `json:"latestFailure,omitempty"`
}
func (kv basicKvTask) GetID() platform.ID {
return kv.ID
}
func (kv basicKvTask) GetOrgID() platform.ID {
return kv.OrganizationID
}
func (kv basicKvTask) GetOwnerID() platform.ID {
return kv.OwnerID
}
func (kv basicKvTask) GetType() string {
return kv.Type
}
func (kv basicKvTask) GetName() string {
return kv.Name
}
func (kv basicKvTask) GetStatus() string {
return kv.Status
}
func (kv basicKvTask) ToInfluxDB() *taskmodel.Task {
return &taskmodel.Task{
ID: k.ID,
Type: k.Type,
OrganizationID: k.OrganizationID,
Organization: k.Organization,
OwnerID: k.OwnerID,
Name: k.Name,
Description: k.Description,
Status: k.Status,
Flux: k.Flux,
Every: k.Every,
Cron: k.Cron,
LastRunStatus: k.LastRunStatus,
LastRunError: k.LastRunError,
Offset: k.Offset.Duration,
LatestCompleted: k.LatestCompleted,
LatestScheduled: k.LatestScheduled,
LatestSuccess: k.LatestSuccess,
LatestFailure: k.LatestFailure,
CreatedAt: k.CreatedAt,
UpdatedAt: k.UpdatedAt,
Metadata: k.Metadata,
ID: kv.ID,
Type: kv.Type,
OrganizationID: kv.OrganizationID,
OwnerID: kv.OwnerID,
Name: kv.Name,
Description: kv.Description,
Status: kv.Status,
Every: kv.Every,
Cron: kv.Cron,
LastRunStatus: kv.LastRunStatus,
LastRunError: kv.LastRunError,
Offset: kv.Offset.Duration,
LatestCompleted: kv.LatestCompleted,
LatestScheduled: kv.LatestScheduled,
LatestSuccess: kv.LatestSuccess,
LatestFailure: kv.LatestFailure,
}
}
type kvTask struct {
basicKvTask
Organization string `json:"org"`
Flux string `json:"flux"`
CreatedAt time.Time `json:"createdAt,omitempty"`
UpdatedAt time.Time `json:"updatedAt,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
func (kv kvTask) ToInfluxDB() *taskmodel.Task {
res := kv.basicKvTask.ToInfluxDB()
res.Organization = kv.Organization
res.Flux = kv.Flux
res.CreatedAt = kv.CreatedAt
res.UpdatedAt = kv.UpdatedAt
res.Metadata = kv.Metadata
return res
}
// FindTaskByID returns a single task
func (s *Service) FindTaskByID(ctx context.Context, id platform.ID) (*taskmodel.Task, error) {
var t *taskmodel.Task
err := s.kv.View(ctx, func(tx Tx) error {
task, err := s.findTaskByID(ctx, tx, id)
task, err := s.findTaskByID(ctx, tx, id, false)
if err != nil {
return err
}
t = task
t = task.ToInfluxDB()
return nil
})
if err != nil {
@ -106,7 +149,7 @@ func (s *Service) FindTaskByID(ctx context.Context, id platform.ID) (*taskmodel.
// findTaskByID is an internal method used to do any action with tasks internally
// that do not require authorization.
func (s *Service) findTaskByID(ctx context.Context, tx Tx, id platform.ID) (*taskmodel.Task, error) {
func (s *Service) findTaskByID(ctx context.Context, tx Tx, id platform.ID, basicOnly bool) (matchableTask, error) {
taskKey, err := taskKey(id)
if err != nil {
return nil, err
@ -124,15 +167,14 @@ func (s *Service) findTaskByID(ctx context.Context, tx Tx, id platform.ID) (*tas
if err != nil {
return nil, err
}
kvTask := &kvTask{}
if err := json.Unmarshal(v, kvTask); err != nil {
return nil, taskmodel.ErrInternalTaskServiceError(err)
var t matchableTask
if basicOnly {
t = &basicKvTask{}
} else {
t = &kvTask{}
}
t := kvToInfluxTask(kvTask)
if t.LatestCompleted.IsZero() {
t.LatestCompleted = t.CreatedAt
if err := json.Unmarshal(v, t); err != nil {
return nil, taskmodel.ErrInternalTaskServiceError(err)
}
return t, nil
@ -230,17 +272,21 @@ func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter taskmodel.T
return nil, 0, taskmodel.ErrUnexpectedTaskBucketErr(err)
}
matchFn := newTaskMatchFn(filter, nil)
matchFn := newTaskMatchFn(filter)
for k, v := c.Next(); k != nil; k, v = c.Next() {
kvTask := &kvTask{}
if err := json.Unmarshal(v, kvTask); err != nil {
var task matchableTask
if filter.Type != nil && *filter.Type == taskmodel.TaskBasicType {
task = &basicKvTask{}
} else {
task = &kvTask{}
}
if err := json.Unmarshal(v, task); err != nil {
return nil, 0, taskmodel.ErrInternalTaskServiceError(err)
}
t := kvToInfluxTask(kvTask)
if matchFn == nil || matchFn(t) {
ts = append(ts, t)
if matchFn == nil || matchFn(task) {
ts = append(ts, task.ToInfluxDB())
if len(ts) >= filter.Limit {
break
@ -298,7 +344,7 @@ func (s *Service) findTasksByOrg(ctx context.Context, tx Tx, orgID platform.ID,
// free cursor resources
defer c.Close()
matchFn := newTaskMatchFn(filter, nil)
matchFn := newTaskMatchFn(filter)
for k, v := c.Next(); k != nil; k, v = c.Next() {
id, err := platform.IDFromString(string(v))
@ -306,7 +352,7 @@ func (s *Service) findTasksByOrg(ctx context.Context, tx Tx, orgID platform.ID,
return nil, 0, taskmodel.ErrInvalidTaskID
}
t, err := s.findTaskByID(ctx, tx, *id)
t, err := s.findTaskByID(ctx, tx, *id, filter.Type != nil && *filter.Type == taskmodel.TaskBasicType)
if err != nil {
if err == taskmodel.ErrTaskNotFound {
// we might have some crufty index's
@ -317,12 +363,12 @@ func (s *Service) findTasksByOrg(ctx context.Context, tx Tx, orgID platform.ID,
}
// If the new task doesn't belong to the org we have looped outside the org filter
if t.OrganizationID != orgID {
if t.GetOrgID() != orgID {
break
}
if matchFn == nil || matchFn(t) {
ts = append(ts, t)
ts = append(ts, t.ToInfluxDB())
// Check if we are over running the limit
if len(ts) >= filter.Limit {
break
@ -333,59 +379,44 @@ func (s *Service) findTasksByOrg(ctx context.Context, tx Tx, orgID platform.ID,
return ts, len(ts), c.Err()
}
type taskMatchFn func(*taskmodel.Task) bool
type taskMatchFn func(matchableTask) bool
// newTaskMatchFn returns a function for validating
// a task matches the filter. Will return nil if
// the filter should match all tasks.
func newTaskMatchFn(f taskmodel.TaskFilter, org *influxdb.Organization) func(t *taskmodel.Task) bool {
var fn taskMatchFn
if org != nil {
expected := org.ID
prevFn := fn
fn = func(t *taskmodel.Task) bool {
res := prevFn == nil || prevFn(t)
return res && expected == t.OrganizationID
}
func newTaskMatchFn(f taskmodel.TaskFilter) taskMatchFn {
if f.Type == nil && f.Name == nil && f.Status == nil && f.User == nil {
return nil
}
if f.Type != nil {
expected := *f.Type
prevFn := fn
fn = func(t *taskmodel.Task) bool {
res := prevFn == nil || prevFn(t)
return res &&
((expected == taskmodel.TaskSystemType && (t.Type == taskmodel.TaskSystemType || t.Type == "")) || expected == t.Type)
return func(t matchableTask) bool {
if f.Type != nil {
expected := *f.Type
if expected == taskmodel.TaskBasicType {
// "basic" type == get "system" type tasks, but without full metadata
expected = taskmodel.TaskSystemType
}
typ := t.GetType()
// Default to "system" for old tasks without a persisted type
if typ == "" {
typ = taskmodel.TaskSystemType
}
if expected != typ {
return false
}
}
}
if f.Name != nil {
expected := *f.Name
prevFn := fn
fn = func(t *taskmodel.Task) bool {
res := prevFn == nil || prevFn(t)
return res && (expected == t.Name)
if f.Name != nil && t.GetName() != *f.Name {
return false
}
}
if f.Status != nil {
prevFn := fn
fn = func(t *taskmodel.Task) bool {
res := prevFn == nil || prevFn(t)
return res && (t.Status == *f.Status)
if f.Status != nil && t.GetStatus() != *f.Status {
return false
}
}
if f.User != nil {
prevFn := fn
fn = func(t *taskmodel.Task) bool {
res := prevFn == nil || prevFn(t)
return res && t.OwnerID == *f.User
if f.User != nil && t.GetOwnerID() != *f.User {
return false
}
}
return fn
return true
}
}
// findAllTasks is a subset of the find tasks function. Used for cleanliness.
@ -420,18 +451,21 @@ func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter taskmodel.Task
// free cursor resources
defer c.Close()
matchFn := newTaskMatchFn(filter, nil)
matchFn := newTaskMatchFn(filter)
for k, v := c.Next(); k != nil; k, v = c.Next() {
kvTask := &kvTask{}
if err := json.Unmarshal(v, kvTask); err != nil {
var task matchableTask
if filter.Type != nil && *filter.Type == taskmodel.TaskBasicType {
task = &basicKvTask{}
} else {
task = &kvTask{}
}
if err := json.Unmarshal(v, task); err != nil {
return nil, 0, taskmodel.ErrInternalTaskServiceError(err)
}
t := kvToInfluxTask(kvTask)
if matchFn == nil || matchFn(t) {
ts = append(ts, t)
if matchFn == nil || matchFn(task) {
ts = append(ts, task.ToInfluxDB())
if len(ts) >= filter.Limit {
break
@ -595,10 +629,11 @@ func (s *Service) UpdateTask(ctx context.Context, id platform.ID, upd taskmodel.
func (s *Service) updateTask(ctx context.Context, tx Tx, id platform.ID, upd taskmodel.TaskUpdate) (*taskmodel.Task, error) {
// retrieve the task
task, err := s.findTaskByID(ctx, tx, id)
t, err := s.findTaskByID(ctx, tx, id, false)
if err != nil {
return nil, err
}
task := t.ToInfluxDB()
updatedAt := s.clock.Now().UTC()
@ -765,13 +800,13 @@ func (s *Service) deleteTask(ctx context.Context, tx Tx, id platform.ID) error {
}
// retrieve the task
task, err := s.findTaskByID(ctx, tx, id)
task, err := s.findTaskByID(ctx, tx, id, true)
if err != nil {
return err
}
// remove the orgs index
orgKey, err := taskOrgKey(task.OrganizationID, task.ID)
orgKey, err := taskOrgKey(task.GetOrgID(), task.GetID())
if err != nil {
return err
}
@ -781,7 +816,7 @@ func (s *Service) deleteTask(ctx context.Context, tx Tx, id platform.ID) error {
}
// remove latest completed
lastCompletedKey, err := taskLatestCompletedKey(task.ID)
lastCompletedKey, err := taskLatestCompletedKey(task.GetID())
if err != nil {
return err
}
@ -791,13 +826,13 @@ func (s *Service) deleteTask(ctx context.Context, tx Tx, id platform.ID) error {
}
// remove the runs
runs, _, err := s.findRuns(ctx, tx, taskmodel.RunFilter{Task: task.ID})
runs, _, err := s.findRuns(ctx, tx, taskmodel.RunFilter{Task: task.GetID()})
if err != nil {
return err
}
for _, run := range runs {
key, err := taskRunKey(task.ID, run.ID)
key, err := taskRunKey(task.GetID(), run.ID)
if err != nil {
return err
}
@ -807,7 +842,7 @@ func (s *Service) deleteTask(ctx context.Context, tx Tx, id platform.ID) error {
}
}
// remove the task
key, err := taskKey(task.ID)
key, err := taskKey(task.GetID())
if err != nil {
return err
}
@ -819,9 +854,9 @@ func (s *Service) deleteTask(ctx context.Context, tx Tx, id platform.ID) error {
uid, _ := icontext.GetUserID(ctx)
return s.audit.Log(resource.Change{
Type: resource.Delete,
ResourceID: task.ID,
ResourceID: task.GetID(),
ResourceType: influxdb.TasksResourceType,
OrganizationID: task.OrganizationID,
OrganizationID: task.GetOrgID(),
UserID: uid,
Time: time.Now(),
})

View File

@ -4,14 +4,12 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/task/taskmodel"
)
func Test_newTaskMatchFN(t *testing.T) {
ct := func(typ string, name string) *taskmodel.Task {
return &taskmodel.Task{
ct := func(typ string, name string) *basicKvTask {
return &basicKvTask{
Type: typ,
OrganizationID: 1,
Name: name,
@ -19,20 +17,12 @@ func Test_newTaskMatchFN(t *testing.T) {
}
const (
NoOrg = platform.ID(0)
NoTyp = "-"
NoNam = "-"
)
newMatch := func(orgID platform.ID, typ string, name string) taskMatchFn {
var (
org *influxdb.Organization
fil taskmodel.TaskFilter
)
if orgID != NoOrg {
org = &influxdb.Organization{ID: orgID}
}
newMatch := func(typ string, name string) taskMatchFn {
var fil taskmodel.TaskFilter
if typ != NoTyp {
fil.Type = &typ
@ -42,12 +32,12 @@ func Test_newTaskMatchFN(t *testing.T) {
fil.Name = &name
}
return newTaskMatchFn(fil, org)
return newTaskMatchFn(fil)
}
type test struct {
name string
task *taskmodel.Task
task matchableTask
fn taskMatchFn
exp bool
}
@ -56,48 +46,37 @@ func Test_newTaskMatchFN(t *testing.T) {
name string
tests []test
}{
{
"match org",
[]test{
{
name: "equal",
task: ct(taskmodel.TaskSystemType, "Foo"),
fn: newMatch(1, NoTyp, NoNam),
exp: true,
},
{
name: "not org",
task: ct(taskmodel.TaskSystemType, "Foo"),
fn: newMatch(2, NoTyp, NoNam),
exp: false,
},
},
},
{
"match type",
[]test{
{
name: "empty with system type",
task: ct("", "Foo"),
fn: newMatch(NoOrg, taskmodel.TaskSystemType, NoNam),
fn: newMatch(taskmodel.TaskSystemType, NoNam),
exp: true,
},
{
name: "system with system type",
task: ct(taskmodel.TaskSystemType, "Foo"),
fn: newMatch(NoOrg, taskmodel.TaskSystemType, NoNam),
fn: newMatch(taskmodel.TaskSystemType, NoNam),
exp: true,
},
{
name: "system with basic type",
task: ct(taskmodel.TaskSystemType, "Foo"),
fn: newMatch(taskmodel.TaskBasicType, NoNam),
exp: true,
},
{
name: "equal",
task: ct("other type", "Foo"),
fn: newMatch(NoOrg, "other type", NoNam),
fn: newMatch("other type", NoNam),
exp: true,
},
{
name: "not type",
task: ct(taskmodel.TaskSystemType, "Foo"),
fn: newMatch(NoOrg, "other type", NoNam),
fn: newMatch("other type", NoNam),
exp: false,
},
},
@ -108,88 +87,13 @@ func Test_newTaskMatchFN(t *testing.T) {
{
name: "equal",
task: ct(taskmodel.TaskSystemType, "Foo"),
fn: newMatch(NoOrg, NoTyp, "Foo"),
fn: newMatch(NoTyp, "Foo"),
exp: true,
},
{
name: "not name",
task: ct(taskmodel.TaskSystemType, "Foo"),
fn: newMatch(NoOrg, NoTyp, "Bar"),
exp: false,
},
},
},
{
"match org type",
[]test{
{
name: "equal",
task: ct(taskmodel.TaskSystemType, "Foo"),
fn: newMatch(1, taskmodel.TaskSystemType, NoNam),
exp: true,
},
{
name: "not type",
task: ct(taskmodel.TaskSystemType, "Foo"),
fn: newMatch(1, "wrong type", NoNam),
exp: false,
},
{
name: "not org",
task: ct(taskmodel.TaskSystemType, "Foo"),
fn: newMatch(2, taskmodel.TaskSystemType, NoNam),
exp: false,
},
{
name: "not org and type",
task: ct("check", "Foo"),
fn: newMatch(2, taskmodel.TaskSystemType, NoNam),
exp: false,
},
},
},
{
"match org name",
[]test{
{
name: "equal",
task: ct(taskmodel.TaskSystemType, "Foo"),
fn: newMatch(1, NoTyp, "Foo"),
exp: true,
},
{
name: "not org",
task: ct(taskmodel.TaskSystemType, "Foo"),
fn: newMatch(2, NoTyp, "Foo"),
exp: false,
},
},
},
{
"match org name type",
[]test{
{
name: "equal",
task: ct("check", "Foo"),
fn: newMatch(1, "check", "Foo"),
exp: true,
},
{
name: "not org",
task: ct("check", "Foo"),
fn: newMatch(2, "check", "Foo"),
exp: false,
},
{
name: "not name",
task: ct("check", "Foo"),
fn: newMatch(1, "check", "Bar"),
exp: false,
},
{
name: "not type",
task: ct("check", "Foo"),
fn: newMatch(1, "other", "Foo"),
fn: newMatch(NoTyp, "Bar"),
exp: false,
},
},
@ -208,7 +112,7 @@ func Test_newTaskMatchFN(t *testing.T) {
}
t.Run("match returns nil for no filter", func(t *testing.T) {
fn := newTaskMatchFn(taskmodel.TaskFilter{}, nil)
fn := newTaskMatchFn(taskmodel.TaskFilter{})
if fn != nil {
t.Error("expected nil")
}

View File

@ -10,7 +10,7 @@ declare -r ROOT_DIR=$(dirname ${SCRIPT_DIR})
declare -r STATIC_DIR="$ROOT_DIR/static"
# Pins the swagger that will be downloaded to a specific commit
declare -r OPENAPI_SHA=d6f9073685dfb58e36f20c2ed351cf872ad31a86
declare -r OPENAPI_SHA=2f8be204870af493560bd0e5cc3201db9da2e432
# Don't do a shallow clone since the commit we want might be several commits
# back; but do only clone the main branch.

View File

@ -63,6 +63,10 @@ func TestTaskService(t *testing.T, fn BackendComponentFactory, testCategory ...s
testTaskCRUD(t, sys)
})
t.Run("FindTasks basic", func(t *testing.T) {
testTaskFindTasksBasic(t, sys)
})
t.Run("FindTasks paging", func(t *testing.T) {
testTaskFindTasksPaging(t, sys)
})
@ -496,6 +500,43 @@ func testTaskCRUD(t *testing.T, sys *System) {
}
}
func testTaskFindTasksBasic(t *testing.T, sys *System) {
script := `option task = {name: "Task %03d", cron: "* * * * *", concurrency: 100, offset: 10s}
from(bucket: "b")
|> to(bucket: "two", orgID: "000000000000000")`
cr := creds(t, sys)
tc := taskmodel.TaskCreate{
OrganizationID: cr.OrgID,
OwnerID: cr.UserID,
Type: taskmodel.TaskSystemType,
}
authorizedCtx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer())
created := make([]*taskmodel.Task, 50)
for i := 0; i < 50; i++ {
tc.Flux = fmt.Sprintf(script, i/10)
tc.Description = fmt.Sprintf("Task %d", i)
tsk, err := sys.TaskService.CreateTask(authorizedCtx, tc)
require.NoError(t, err)
require.True(t, tsk.ID.Valid(), "no task ID set")
created[i] = tsk
}
tasks, _, err := sys.TaskService.FindTasks(sys.Ctx, taskmodel.TaskFilter{Type: &taskmodel.TaskBasicType})
require.NoError(t, err)
require.Equal(t, 50, len(tasks))
// Basic results should exclude query text, but include other metdata like description.
for i, tsk := range tasks {
require.Empty(t, tsk.Flux)
require.Equal(t, fmt.Sprintf("Task %d", i), tsk.Description)
}
}
func testTaskFindTasksPaging(t *testing.T, sys *System) {
script := `option task = {name: "Task %03d", cron: "* * * * *", concurrency: 100, offset: 10s}

View File

@ -30,6 +30,8 @@ const (
var (
// TaskSystemType is the type set in tasks' for all crud requests
TaskSystemType = "system"
// TaskBasicType is short-hand used by the UI to request a minimal subset of system task metadata
TaskBasicType = "basic"
)
// Task is a task. 🎊