influxdb/kv/task.go

1661 lines
41 KiB
Go
Raw Normal View History

package kv
import (
"context"
"encoding/json"
"strings"
"time"
2020-01-13 14:22:52 +00:00
"github.com/influxdata/influxdb/resource"
"github.com/influxdata/influxdb"
icontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/options"
feat(checks): add first pass at creating tasks from checks First pass at flux AST generation from check Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): format call expression Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): cleanup CheckDefinition Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): clean up threshold functions Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): clean up message function Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): misc fixes Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): remove dead code Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): move threshold flux generation to check pkg Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): move base ast generation to its own package Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): add comment for GenerateFluxAST Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> docs(notification/flux): add comments to each exported function Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> feat(notification/check): add tests for GenerateFlux Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> feat(notification/check): add task options to generated flux 
fix(notification/check): use flux compatible duration type test(notification/check): add task option to task definition test(http): use check Duration in checks http handlers feat(check): add TaskID to checks base fix(notification/check): hack around issue with formatting ast package wtih multiple files test(check): create task when check is created A lot of little changes had to happen as a result of this. This change was rather painful. feat(checks): add update and delete of task for check fix(notifications/check): hack around the alerts package not being available test(kv): temporarily skip check tests while we merge the pr above
2019-08-07 22:34:07 +00:00
"go.uber.org/zap"
)
// Task Storage Schema
// taskBucket:
// <taskID>: task data storage
// taskRunBucket:
// <taskID>/<runID>: run data storage
// <taskID>/manualRuns: list of runs to run manually
// <taskID>/latestCompleted: run data for the latest completed run of a task
// taskIndexBucket
// <orgID>/<taskID>: index for tasks by org
// We may want to add a <taskName>/<taskID> index to allow us to look up tasks by task name.
var (
// taskBucket names the bucket holding task data, keyed by <taskID>.
taskBucket = []byte("tasksv1")
// taskRunBucket names the bucket holding run data, keyed under the owning <taskID>.
taskRunBucket = []byte("taskRunsv1")
// taskIndexBucket names the bucket holding the <orgID>/<taskID> index of tasks by org.
taskIndexBucket = []byte("taskIndexsv1")
)
// Compile-time assertions that *Service satisfies the task service interfaces.
var (
	_ influxdb.TaskService       = (*Service)(nil)
	_ backend.TaskControlService = (*Service)(nil)
)
// kvTask is the JSON shape used when decoding task records read from the
// task bucket. kvToInfluxTask converts it into an influxdb.Task.
//
// Offset is decoded as an influxdb.Duration; kvToInfluxTask reads its
// Duration field when building the influxdb.Task.
//
// NOTE(review): encoding/json never treats a struct value as empty, so the
// omitempty option on the time.Time fields has no effect when marshaling.
type kvTask struct {
ID influxdb.ID `json:"id"`
Type string `json:"type,omitempty"`
OrganizationID influxdb.ID `json:"orgID"`
Organization string `json:"org"`
OwnerID influxdb.ID `json:"ownerID"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Status string `json:"status"`
Flux string `json:"flux"`
Every string `json:"every,omitempty"`
Cron string `json:"cron,omitempty"`
LastRunStatus string `json:"lastRunStatus,omitempty"`
LastRunError string `json:"lastRunError,omitempty"`
Offset influxdb.Duration `json:"offset,omitempty"`
LatestCompleted time.Time `json:"latestCompleted,omitempty"`
LatestScheduled time.Time `json:"latestScheduled,omitempty"`
CreatedAt time.Time `json:"createdAt,omitempty"`
UpdatedAt time.Time `json:"updatedAt,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
// kvToInfluxTask converts a decoded kvTask into a freshly allocated
// influxdb.Task. Every field is copied across as-is, except Offset, which is
// taken from the Duration field of the stored influxdb.Duration.
func kvToInfluxTask(k *kvTask) *influxdb.Task {
	task := new(influxdb.Task)

	// Identity and ownership.
	task.ID = k.ID
	task.Type = k.Type
	task.OrganizationID = k.OrganizationID
	task.Organization = k.Organization
	task.OwnerID = k.OwnerID

	// User-facing definition.
	task.Name = k.Name
	task.Description = k.Description
	task.Status = k.Status
	task.Flux = k.Flux
	task.Every = k.Every
	task.Cron = k.Cron
	task.Offset = k.Offset.Duration

	// Run bookkeeping and timestamps.
	task.LastRunStatus = k.LastRunStatus
	task.LastRunError = k.LastRunError
	task.LatestCompleted = k.LatestCompleted
	task.LatestScheduled = k.LatestScheduled
	task.CreatedAt = k.CreatedAt
	task.UpdatedAt = k.UpdatedAt

	task.Metadata = k.Metadata

	return task
}
// initializeTasks requests each of the task buckets (tasks, runs and the org
// index) from tx, in that order, and returns the first error encountered.
func (s *Service) initializeTasks(ctx context.Context, tx Tx) error {
	for _, name := range [][]byte{taskBucket, taskRunBucket, taskIndexBucket} {
		if _, err := tx.Bucket(name); err != nil {
			return err
		}
	}
	return nil
}
// FindTaskByID returns the task with the given id, read inside a view
// transaction. When influxdb.FindTaskAuthRequired(ctx) reports true the task
// is loaded via findTaskByIDWithAuth so its Authorization is populated;
// otherwise the plain findTaskByID lookup is used.
func (s *Service) FindTaskByID(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) {
	var found *influxdb.Task

	txErr := s.kv.View(ctx, func(tx Tx) error {
		lookup := s.findTaskByID
		if influxdb.FindTaskAuthRequired(ctx) {
			lookup = s.findTaskByIDWithAuth
		}

		task, err := lookup(ctx, tx, id)
		if err == nil {
			found = task
		}
		return err
	})
	if txErr != nil {
		return nil, txErr
	}

	return found, nil
}
// findTaskByIDWithAuth looks up a task and attaches an Authorization to it.
// It exists to satisfy FindTaskByID and is more expensive than findTaskByID,
// so prefer the latter when only the task itself is needed.
//
// The attached authorization is always active, has ID 1 and is scoped to the
// task's organization. Its permissions are filled from maxPermissions only
// when the task has a valid owner; a failure there is returned to the caller.
func (s *Service) findTaskByIDWithAuth(ctx context.Context, tx Tx, id influxdb.ID) (*influxdb.Task, error) {
	task, err := s.findTaskByID(ctx, tx, id)
	if err != nil {
		return nil, err
	}

	auth := &influxdb.Authorization{
		Status: influxdb.Active,
		ID:     influxdb.ID(1),
		OrgID:  task.OrganizationID,
	}

	if task.OwnerID.Valid() {
		perms, err := s.maxPermissions(ctx, tx, task.OwnerID)
		if err != nil {
			return nil, err
		}
		auth.Permissions = perms
	}

	task.Authorization = auth
	return task, nil
}
// findTaskByID loads a task from the task bucket for internal callers that
// do not need authorization attached.
//
// It returns influxdb.ErrTaskNotFound when the key is absent. A zero
// LatestCompleted is replaced with CreatedAt. When the stored record has no
// valid owner, the owner is filled in on a best-effort basis: first from the
// user of the authorization named by the record's "authorizationID" field,
// then from the first owner mapped to the task's organization. Lookup
// failures in either fallback are ignored and leave OwnerID unset.
func (s *Service) findTaskByID(ctx context.Context, tx Tx, id influxdb.ID) (*influxdb.Task, error) {
	key, err := taskKey(id)
	if err != nil {
		return nil, err
	}

	bucket, err := tx.Bucket(taskBucket)
	if err != nil {
		return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	raw, err := bucket.Get(key)
	switch {
	case IsNotFound(err):
		return nil, influxdb.ErrTaskNotFound
	case err != nil:
		return nil, err
	}

	var stored kvTask
	if err := json.Unmarshal(raw, &stored); err != nil {
		return nil, influxdb.ErrInternalTaskServiceError(err)
	}

	task := kvToInfluxTask(&stored)
	if task.LatestCompleted.IsZero() {
		task.LatestCompleted = task.CreatedAt
	}

	if task.OwnerID.Valid() {
		return task, nil
	}

	// First fallback: derive the owner from the authorization referenced by
	// the stored record.
	var ref struct {
		AuthorizationID influxdb.ID `json:"authorizationID"`
	}
	if err := json.Unmarshal(raw, &ref); err != nil {
		return nil, influxdb.ErrInternalTaskServiceError(err)
	}
	if auth, err := s.findAuthorizationByID(ctx, tx, ref.AuthorizationID); err == nil {
		task.OwnerID = auth.GetUserID()
	}

	if task.OwnerID.Valid() {
		return task, nil
	}

	// Second fallback: use the first owner of the organization, since any org
	// owner will have sufficient permissions to run a task.
	orgOwners, err := s.findUserResourceMappings(ctx, tx, influxdb.UserResourceMappingFilter{
		ResourceID:   task.OrganizationID,
		ResourceType: influxdb.OrgsResourceType,
		UserType:     influxdb.Owner,
	})
	if err == nil && len(orgOwners) > 0 {
		task.OwnerID = orgOwners[0].UserID
	}

	return task, nil
}
// FindTasks returns the tasks matching filter, read inside a view
// transaction. The returned count is the number of tasks in the returned
// slice; the count reported by findTasks is discarded.
func (s *Service) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
	var found []*influxdb.Task

	txErr := s.kv.View(ctx, func(tx Tx) error {
		tasks, _, err := s.findTasks(ctx, tx, filter)
		if err == nil {
			found = tasks
		}
		return err
	})
	if txErr != nil {
		return nil, 0, txErr
	}

	return found, len(found), nil
}
// findTasks validates filter and dispatches to the matching lookup strategy.
//
// The organization named by the filter (by ID first, then by name) is
// resolved up front so an unknown org is reported as an error. A negative
// limit yields influxdb.ErrPageSizeTooSmall, a limit above
// influxdb.TaskMaxPageSize yields influxdb.ErrPageSizeTooLarge, and a zero
// limit is replaced with influxdb.TaskDefaultPageSize.
//
// A user filter takes precedence over an org filter; with neither, all tasks
// are scanned.
func (s *Service) findTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
	var (
		org *influxdb.Organization
		err error
	)
	switch {
	case filter.OrganizationID != nil:
		org, err = s.findOrganizationByID(ctx, tx, *filter.OrganizationID)
	case filter.Organization != "":
		org, err = s.findOrganizationByName(ctx, tx, filter.Organization)
	}
	if err != nil {
		return nil, 0, err
	}

	// Complain about limits.
	switch {
	case filter.Limit < 0:
		return nil, 0, influxdb.ErrPageSizeTooSmall
	case filter.Limit > influxdb.TaskMaxPageSize:
		return nil, 0, influxdb.ErrPageSizeTooLarge
	case filter.Limit == 0:
		filter.Limit = influxdb.TaskDefaultPageSize
	}

	// With neither a user nor an organization supplied, assume the authorizer
	// on the context is the user being asked about. Internal callers may have
	// no authorizer at all, so a missing one is not an error.
	if org == nil && filter.User == nil {
		if authorizer, authErr := icontext.GetAuthorizer(ctx); authErr == nil {
			uid := authorizer.GetUserID()
			filter.User = &uid
		}
	}

	switch {
	case filter.User != nil:
		return s.findTasksByUser(ctx, tx, filter)
	case org != nil:
		return s.findTasksByOrg(ctx, tx, filter)
	default:
		return s.findAllTasks(ctx, tx, filter)
	}
}
// findTasksByUser is a subset of the find tasks function. Used for cleanliness
func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
if filter.User == nil {
return nil, 0, influxdb.ErrTaskNotFound
}
var org *influxdb.Organization
var err error
if filter.OrganizationID != nil {
org, err = s.findOrganizationByID(ctx, tx, *filter.OrganizationID)
if err != nil {
return nil, 0, err
}
} else if filter.Organization != "" {
org, err = s.findOrganizationByName(ctx, tx, filter.Organization)
if err != nil {
return nil, 0, err
}
}
var ts []*influxdb.Task
maps, err := s.findUserResourceMappings(
ctx,
tx,
influxdb.UserResourceMappingFilter{
ResourceType: influxdb.TasksResourceType,
UserID: *filter.User,
UserType: influxdb.Owner,
},
)
if err != nil {
return nil, 0, err
}
matchFn := newTaskMatchFn(filter, org)
for _, m := range maps {
task, err := s.findTaskByIDWithAuth(ctx, tx, m.ResourceID)
if err != nil && err != influxdb.ErrTaskNotFound {
return nil, 0, err
}
if err == influxdb.ErrTaskNotFound {
continue
}
if matchFn == nil || matchFn(task) {
feat(task): Allow tasks to run more isolated from other task systems (#15384) * feat(task): Allow tasks to run more isolated from other task systems To allow the task internal system to be used for user created tasks as well as checks, notification and other future additions we needed to take 2 actions: 1 - We need to use type as a first class citizen, meaning that task's have a type and each system that will be creating tasks will set the task type through the api. This is a change to the previous assumption that any user could set task types. This change will allow us to have other service's white label the task service for their own purposes and not have to worry about colissions between the types. 2 - We needed to allow other systems to add data specific to the problem they are trying to solve. For this purpose adding a `metadata` field to the internal task system which should allow other systems to use the task service. These changes will allow us in the future to allow for the current check's and notifications implementations to create a task with meta data instead of creating a check object and a task object in the database. By allowing this new behavior checks, notifications, and user task's can all follow the same pattern: Field an api request in a system specific http endpoint, use a small translation to the `TaskService` function call, translate the results to what the api expects for this system, and return results. * fix(task): undo additional check for ownerID because check is not ready
2019-10-11 14:53:38 +00:00
ts = append(ts, task)
2019-08-06 16:27:52 +00:00
if len(ts) >= filter.Limit {
break
}
}
}
return ts, len(ts), nil
}
// findTasksByOrg returns the tasks of the organization named by filter (by
// ID first, then by name) that satisfy the rest of filter.
//
// It returns influxdb.ErrTaskNotFound when the filter names no organization.
// Tasks are discovered by walking the org index from the org's key prefix,
// or from just past filter.After when it is set. Index entries whose task no
// longer exists are skipped. Collection stops once filter.Limit tasks have
// been gathered. The returned count is the number of tasks returned.
//
// NOTE(review): the cursor is neither closed nor checked for an iteration
// error here; confirm whether ForwardCursor exposes Err/Close and whether
// they should be used.
func (s *Service) findTasksByOrg(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
	var (
		org *influxdb.Organization
		err error
	)
	switch {
	case filter.OrganizationID != nil:
		org, err = s.findOrganizationByID(ctx, tx, *filter.OrganizationID)
	case filter.Organization != "":
		org, err = s.findOrganizationByName(ctx, tx, filter.Organization)
	}
	if err != nil {
		return nil, 0, err
	}
	if org == nil {
		return nil, 0, influxdb.ErrTaskNotFound
	}

	index, err := tx.Bucket(taskIndexBucket)
	if err != nil {
		return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	orgPrefix, err := org.ID.Encode()
	if err != nil {
		return nil, 0, influxdb.ErrInvalidTaskID
	}

	// Start at the beginning of the org's entries unless paging.
	start := orgPrefix
	if filter.After != nil {
		start, err = taskOrgKey(org.ID, *filter.After)
		if err != nil {
			return nil, 0, err
		}
		// Append an extra byte so the seek lands just past the "after" record.
		start = []byte(string(start) + "\x00")
	}

	cur, err := index.ForwardCursor(start, WithCursorPrefix(orgPrefix))
	if err != nil {
		return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	matches := newTaskMatchFn(filter, nil)

	var found []*influxdb.Task
	for k, v := cur.Next(); k != nil; k, v = cur.Next() {
		taskID, err := influxdb.IDFromString(string(v))
		if err != nil {
			return nil, 0, influxdb.ErrInvalidTaskID
		}

		task, err := s.findTaskByIDWithAuth(ctx, tx, *taskID)
		if err == influxdb.ErrTaskNotFound {
			// The index may contain stale entries.
			continue
		}
		if err != nil {
			return nil, 0, err
		}

		// A task from another org means the walk has left this org's entries.
		if task.OrganizationID != org.ID {
			break
		}

		if matches != nil && !matches(task) {
			continue
		}

		found = append(found, task)
		if len(found) >= filter.Limit {
			break
		}
	}

	return found, len(found), nil
}
// taskMatchFn reports whether a task satisfies a filter condition.
type taskMatchFn func(*influxdb.Task) bool
// newTaskMatchFn builds a predicate that reports whether a task satisfies
// every condition present in f, plus membership in org when org is non-nil.
// It returns nil when there is nothing to check, meaning all tasks match.
//
// Conditions are evaluated in the order org, type, name, status. A type
// filter of influxdb.TaskSystemType also accepts tasks with an empty type.
func newTaskMatchFn(f influxdb.TaskFilter, org *influxdb.Organization) func(t *influxdb.Task) bool {
	var checks []taskMatchFn

	if org != nil {
		orgID := org.ID
		checks = append(checks, func(t *influxdb.Task) bool {
			return orgID == t.OrganizationID
		})
	}

	if f.Type != nil {
		wantType := *f.Type
		checks = append(checks, func(t *influxdb.Task) bool {
			if wantType == t.Type {
				return true
			}
			// Tasks stored without a type count as system tasks.
			return wantType == influxdb.TaskSystemType && t.Type == ""
		})
	}

	if f.Name != nil {
		wantName := *f.Name
		checks = append(checks, func(t *influxdb.Task) bool {
			return wantName == t.Name
		})
	}

	if f.Status != nil {
		checks = append(checks, func(t *influxdb.Task) bool {
			return t.Status == *f.Status
		})
	}

	if len(checks) == 0 {
		return nil
	}

	return func(t *influxdb.Task) bool {
		for _, check := range checks {
			if !check(t) {
				return false
			}
		}
		return true
	}
}
// findAllTasks is a subset of the find tasks function. Used for cleanliness.
// This function should only be executed internally because it doesn't force organization or user filtering.
// Enforcing filters should be done in a validation layer.
func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
var ts []*influxdb.Task
taskBucket, err := tx.Bucket(taskBucket)
if err != nil {
return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err)
}
var seek []byte
if filter.After != nil {
seek, err = taskKey(*filter.After)
if err != nil {
return nil, 0, err
}
// append a extra character because we want to skip "after" record
seek = []byte(string(seek) + "\x00")
}
c, err := taskBucket.ForwardCursor(seek)
if err != nil {
return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err)
}
matchFn := newTaskMatchFn(filter, nil)
for k, v := c.Next(); k != nil; k, v = c.Next() {
kvTask := &kvTask{}
if err := json.Unmarshal(v, kvTask); err != nil {
return nil, 0, influxdb.ErrInternalTaskServiceError(err)
}
feat(task): Allow tasks to run more isolated from other task systems (#15384) * feat(task): Allow tasks to run more isolated from other task systems To allow the task internal system to be used for user created tasks as well as checks, notification and other future additions we needed to take 2 actions: 1 - We need to use type as a first class citizen, meaning that task's have a type and each system that will be creating tasks will set the task type through the api. This is a change to the previous assumption that any user could set task types. This change will allow us to have other service's white label the task service for their own purposes and not have to worry about colissions between the types. 2 - We needed to allow other systems to add data specific to the problem they are trying to solve. For this purpose adding a `metadata` field to the internal task system which should allow other systems to use the task service. These changes will allow us in the future to allow for the current check's and notifications implementations to create a task with meta data instead of creating a check object and a task object in the database. By allowing this new behavior checks, notifications, and user task's can all follow the same pattern: Field an api request in a system specific http endpoint, use a small translation to the `TaskService` function call, translate the results to what the api expects for this system, and return results. * fix(task): undo additional check for ownerID because check is not ready
2019-10-11 14:53:38 +00:00
t := kvToInfluxTask(kvTask)
if matchFn == nil || matchFn(t) {
ts = append(ts, t)
if len(ts) >= filter.Limit {
break
}
}
}
return ts, len(ts), err
}
// CreateTask creates a new task from tc inside an update transaction and
// returns the stored task. All of the work is delegated to createTask; any
// error it reports is returned unchanged and no task is returned.
func (s *Service) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) {
	var created *influxdb.Task

	txErr := s.kv.Update(ctx, func(tx Tx) error {
		task, err := s.createTask(ctx, tx, tc)
		if err == nil {
			created = task
		}
		return err
	})
	if txErr != nil {
		return nil, txErr
	}

	return created, nil
}
func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) (*influxdb.Task, error) {
var err error
var org *influxdb.Organization
if tc.OrganizationID.Valid() {
org, err = s.findOrganizationByID(ctx, tx, tc.OrganizationID)
if err != nil {
return nil, err
}
} else if tc.Organization != "" {
org, err = s.findOrganizationByName(ctx, tx, tc.Organization)
if err != nil {
return nil, err
}
}
if org == nil {
return nil, influxdb.ErrOrgNotFound
}
// TODO: Uncomment this once the checks/notifications no longer create tasks in kv
// confirm the owner is a real user.
// if _, err = s.findUserByID(ctx, tx, tc.OwnerID); err != nil {
// return nil, influxdb.ErrInvalidOwnerID
// }
opt, err := options.FromScript(tc.Flux)
if err != nil {
return nil, influxdb.ErrTaskOptionParse(err)
}
if tc.Status == "" {
tc.Status = string(backend.TaskActive)
}
createdAt := s.clock.Now().Truncate(time.Second).UTC()
task := &influxdb.Task{
ID: s.IDGenerator.ID(),
2019-08-06 16:27:52 +00:00
Type: tc.Type,
OrganizationID: org.ID,
Organization: org.Name,
OwnerID: tc.OwnerID,
feat(task): Allow tasks to run more isolated from other task systems (#15384) * feat(task): Allow tasks to run more isolated from other task systems To allow the task internal system to be used for user created tasks as well as checks, notification and other future additions we needed to take 2 actions: 1 - We need to use type as a first class citizen, meaning that task's have a type and each system that will be creating tasks will set the task type through the api. This is a change to the previous assumption that any user could set task types. This change will allow us to have other service's white label the task service for their own purposes and not have to worry about colissions between the types. 2 - We needed to allow other systems to add data specific to the problem they are trying to solve. For this purpose adding a `metadata` field to the internal task system which should allow other systems to use the task service. These changes will allow us in the future to allow for the current check's and notifications implementations to create a task with meta data instead of creating a check object and a task object in the database. By allowing this new behavior checks, notifications, and user task's can all follow the same pattern: Field an api request in a system specific http endpoint, use a small translation to the `TaskService` function call, translate the results to what the api expects for this system, and return results. * fix(task): undo additional check for ownerID because check is not ready
2019-10-11 14:53:38 +00:00
Metadata: tc.Metadata,
Name: opt.Name,
Description: tc.Description,
Status: tc.Status,
Flux: tc.Flux,
Every: opt.Every.String(),
Cron: opt.Cron,
CreatedAt: createdAt,
LatestCompleted: createdAt,
LatestScheduled: createdAt,
}
if opt.Offset != nil {
off, err := time.ParseDuration(opt.Offset.String())
if err != nil {
return nil, influxdb.ErrTaskTimeParse(err)
}
task.Offset = off
}
taskBucket, err := tx.Bucket(taskBucket)
if err != nil {
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
}
indexBucket, err := tx.Bucket(taskIndexBucket)
if err != nil {
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
}
taskBytes, err := json.Marshal(task)
if err != nil {
return nil, influxdb.ErrInternalTaskServiceError(err)
}
taskKey, err := taskKey(task.ID)
if err != nil {
return nil, err
}
orgKey, err := taskOrgKey(task.OrganizationID, task.ID)
if err != nil {
return nil, err
}
// write the task
err = taskBucket.Put(taskKey, taskBytes)
if err != nil {
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
}
// write the org index
err = indexBucket.Put(orgKey, taskKey)
if err != nil {
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
}
feat(checks): add first pass at creating tasks from checks First pass at flux AST generation from check Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): format call expression Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): cleanup CheckDefinition Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): clean up threshold functions Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): clean up message function Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): misc fixes Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): remove dead code Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): move threshold flux generation to check pkg Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): move base ast generation to its own package Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): add comment for GenerateFluxAST Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> docs(notification/flux): add comments to each exported function Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> feat(notification/check): add tests for GenerateFlux Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> feat(notification/check): add task options to generated flux 
fix(notification/check): use flux compatible duration type test(notification/check): add task option to task definition test(http): use check Duration in checks http handlers feat(check): add TaskID to checks base fix(notification/check): hack around issue with formatting ast package wtih multiple files test(check): create task when check is created A lot of little changes had to happen as a result of this. This change was rather painful. feat(checks): add update and delete of task for check fix(notifications/check): hack around the alerts package not being available test(kv): temporarily skip check tests while we merge the pr above
2019-08-07 22:34:07 +00:00
if err := s.createTaskURM(ctx, tx, task); err != nil {
s.log.Info("Error creating user resource mapping for task", zap.Stringer("taskID", task.ID), zap.Error(err))
}
// populate permissions so the task can be used immediately
// if we cant populate here we shouldn't error.
ps, _ := s.maxPermissions(ctx, tx, task.OwnerID)
task.Authorization = &influxdb.Authorization{
Status: influxdb.Active,
ID: influxdb.ID(1),
OrgID: task.OrganizationID,
Permissions: ps,
}
2020-01-13 14:22:52 +00:00
uid, _ := icontext.GetUserID(ctx)
if err := s.audit.Log(resource.Change{
Type: resource.Create,
ResourceID: task.ID,
ResourceType: influxdb.TasksResourceType,
OrganizationID: task.OrganizationID,
UserID: uid,
ResourceBody: taskBytes,
Time: time.Now(),
}); err != nil {
return nil, err
}
return task, nil
}
feat(checks): add first pass at creating tasks from checks First pass at flux AST generation from check Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): format call expression Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): cleanup CheckDefinition Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): clean up threshold functions Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): clean up message function Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): misc fixes Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): remove dead code Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): move threshold flux generation to check pkg Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): move base ast generation to its own package Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): add comment for GenerateFluxAST Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> docs(notification/flux): add comments to each exported function Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> feat(notification/check): add tests for GenerateFlux Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> feat(notification/check): add task options to generated flux 
fix(notification/check): use flux compatible duration type test(notification/check): add task option to task definition test(http): use check Duration in checks http handlers feat(check): add TaskID to checks base fix(notification/check): hack around issue with formatting ast package wtih multiple files test(check): create task when check is created A lot of little changes had to happen as a result of this. This change was rather painful. feat(checks): add update and delete of task for check fix(notifications/check): hack around the alerts package not being available test(kv): temporarily skip check tests while we merge the pr above
2019-08-07 22:34:07 +00:00
func (s *Service) createTaskURM(ctx context.Context, tx Tx, t *influxdb.Task) error {
// TODO(jsteenb2): should not be getting authorizer inside the store, should terminate at the
// transport layer then pass user id everywhere else.
feat(checks): add first pass at creating tasks from checks First pass at flux AST generation from check Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): format call expression Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): cleanup CheckDefinition Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): clean up threshold functions Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): clean up message function Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): misc fixes Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): remove dead code Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): move threshold flux generation to check pkg Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): move base ast generation to its own package Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): add comment for GenerateFluxAST Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> docs(notification/flux): add comments to each exported function Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> feat(notification/check): add tests for GenerateFlux Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> feat(notification/check): add task options to generated flux 
fix(notification/check): use flux compatible duration type test(notification/check): add task option to task definition test(http): use check Duration in checks http handlers feat(check): add TaskID to checks base fix(notification/check): hack around issue with formatting ast package wtih multiple files test(check): create task when check is created A lot of little changes had to happen as a result of this. This change was rather painful. feat(checks): add update and delete of task for check fix(notifications/check): hack around the alerts package not being available test(kv): temporarily skip check tests while we merge the pr above
2019-08-07 22:34:07 +00:00
userAuth, err := icontext.GetAuthorizer(ctx)
if err != nil {
return err
}
return s.createUserResourceMapping(ctx, tx, &influxdb.UserResourceMapping{
ResourceType: influxdb.TasksResourceType,
ResourceID: t.ID,
UserID: userAuth.GetUserID(),
UserType: influxdb.Owner,
})
}
// UpdateTask applies the changeset upd to the task with the given id inside
// an update transaction and returns the updated task. All of the work is
// delegated to updateTask; any error it reports is returned unchanged and no
// task is returned.
func (s *Service) UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error) {
	var updated *influxdb.Task

	txErr := s.kv.Update(ctx, func(tx Tx) error {
		task, err := s.updateTask(ctx, tx, id, upd)
		if err == nil {
			updated = task
		}
		return err
	})
	if txErr != nil {
		return nil, txErr
	}

	return updated, nil
}
func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error) {
// retrieve the task
task, err := s.findTaskByID(ctx, tx, id)
if err != nil {
return nil, err
}
updatedAt := s.clock.Now().UTC()
// update the flux script
if !upd.Options.IsZero() || upd.Flux != nil {
if err = upd.UpdateFlux(task.Flux); err != nil {
return nil, err
}
task.Flux = *upd.Flux
options, err := options.FromScript(*upd.Flux)
if err != nil {
return nil, influxdb.ErrTaskOptionParse(err)
}
task.Name = options.Name
task.Every = options.Every.String()
task.Cron = options.Cron
var off time.Duration
if options.Offset != nil {
off, err = time.ParseDuration(options.Offset.String())
if err != nil {
return nil, influxdb.ErrTaskTimeParse(err)
}
}
task.Offset = off
task.UpdatedAt = updatedAt
}
if upd.Description != nil {
task.Description = *upd.Description
task.UpdatedAt = updatedAt
}
if upd.Status != nil && task.Status != *upd.Status {
task.Status = *upd.Status
task.UpdatedAt = updatedAt
// task is transitioning from inactive to active, ensure scheduled and completed are updated
if task.Status == influxdb.TaskStatusActive {
updatedAtTrunc := updatedAt.Truncate(time.Second).UTC()
task.LatestCompleted = updatedAtTrunc
task.LatestScheduled = updatedAtTrunc
}
}
feat(task): Allow tasks to run more isolated from other task systems (#15384) * feat(task): Allow tasks to run more isolated from other task systems To allow the task internal system to be used for user created tasks as well as checks, notification and other future additions we needed to take 2 actions: 1 - We need to use type as a first class citizen, meaning that task's have a type and each system that will be creating tasks will set the task type through the api. This is a change to the previous assumption that any user could set task types. This change will allow us to have other service's white label the task service for their own purposes and not have to worry about colissions between the types. 2 - We needed to allow other systems to add data specific to the problem they are trying to solve. For this purpose adding a `metadata` field to the internal task system which should allow other systems to use the task service. These changes will allow us in the future to allow for the current check's and notifications implementations to create a task with meta data instead of creating a check object and a task object in the database. By allowing this new behavior checks, notifications, and user task's can all follow the same pattern: Field an api request in a system specific http endpoint, use a small translation to the `TaskService` function call, translate the results to what the api expects for this system, and return results. * fix(task): undo additional check for ownerID because check is not ready
2019-10-11 14:53:38 +00:00
if upd.Metadata != nil {
task.Metadata = upd.Metadata
task.UpdatedAt = updatedAt
}
if upd.LatestCompleted != nil {
// make sure we only update latest completed one way
tlc := task.LatestCompleted
ulc := *upd.LatestCompleted
if !ulc.IsZero() && ulc.After(tlc) {
task.LatestCompleted = *upd.LatestCompleted
}
}
if upd.LatestScheduled != nil {
// make sure we only update latest scheduled one way
if upd.LatestScheduled.After(task.LatestScheduled) {
task.LatestScheduled = *upd.LatestScheduled
}
}
if upd.LastRunStatus != nil {
task.LastRunStatus = *upd.LastRunStatus
if *upd.LastRunStatus == "failed" && upd.LastRunError != nil {
task.LastRunError = *upd.LastRunError
} else {
task.LastRunError = ""
}
}
// save the updated task
bucket, err := tx.Bucket(taskBucket)
if err != nil {
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
}
key, err := taskKey(id)
if err != nil {
return nil, err
}
taskBytes, err := json.Marshal(task)
if err != nil {
return nil, influxdb.ErrInternalTaskServiceError(err)
}
2020-01-13 14:22:52 +00:00
err = bucket.Put(key, taskBytes)
if err != nil {
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
}
uid, _ := icontext.GetUserID(ctx)
if err := s.audit.Log(resource.Change{
Type: resource.Update,
ResourceID: task.ID,
ResourceType: influxdb.TasksResourceType,
OrganizationID: task.OrganizationID,
UserID: uid,
ResourceBody: taskBytes,
Time: time.Now(),
}); err != nil {
return nil, err
}
return task, nil
}
// DeleteTask removes the task identified by id, together with the associated
// records that deleteTask cleans up, inside a single read-write transaction.
func (s *Service) DeleteTask(ctx context.Context, id influxdb.ID) error {
	return s.kv.Update(ctx, func(tx Tx) error {
		return s.deleteTask(ctx, tx, id)
	})
}
func (s *Service) deleteTask(ctx context.Context, tx Tx, id influxdb.ID) error {
taskBucket, err := tx.Bucket(taskBucket)
if err != nil {
return influxdb.ErrUnexpectedTaskBucketErr(err)
}
runBucket, err := tx.Bucket(taskRunBucket)
if err != nil {
return influxdb.ErrUnexpectedTaskBucketErr(err)
}
indexBucket, err := tx.Bucket(taskIndexBucket)
if err != nil {
return influxdb.ErrUnexpectedTaskBucketErr(err)
}
// retrieve the task
task, err := s.findTaskByID(ctx, tx, id)
if err != nil {
return err
}
// remove the orgs index
orgKey, err := taskOrgKey(task.OrganizationID, task.ID)
if err != nil {
return err
}
if err := indexBucket.Delete(orgKey); err != nil {
return influxdb.ErrUnexpectedTaskBucketErr(err)
}
// remove latest completed
lastCompletedKey, err := taskLatestCompletedKey(task.ID)
if err != nil {
return err
}
if err := runBucket.Delete(lastCompletedKey); err != nil {
return influxdb.ErrUnexpectedTaskBucketErr(err)
}
// remove the runs
runs, _, err := s.findRuns(ctx, tx, influxdb.RunFilter{Task: task.ID})
if err != nil {
return err
}
for _, run := range runs {
key, err := taskRunKey(task.ID, run.ID)
if err != nil {
return err
}
if err := runBucket.Delete(key); err != nil {
return influxdb.ErrUnexpectedTaskBucketErr(err)
}
}
// remove the task
key, err := taskKey(task.ID)
if err != nil {
return err
}
if err := taskBucket.Delete(key); err != nil {
return influxdb.ErrUnexpectedTaskBucketErr(err)
}
feat(checks): add first pass at creating tasks from checks First pass at flux AST generation from check Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): format call expression Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): cleanup CheckDefinition Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): clean up threshold functions Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): clean up message function Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): misc fixes Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): remove dead code Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): move threshold flux generation to check pkg Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): move base ast generation to its own package Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): add comment for GenerateFluxAST Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> docs(notification/flux): add comments to each exported function Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> feat(notification/check): add tests for GenerateFlux Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> feat(notification/check): add task options to generated flux 
fix(notification/check): use flux compatible duration type test(notification/check): add task option to task definition test(http): use check Duration in checks http handlers feat(check): add TaskID to checks base fix(notification/check): hack around issue with formatting ast package wtih multiple files test(check): create task when check is created A lot of little changes had to happen as a result of this. This change was rather painful. feat(checks): add update and delete of task for check fix(notifications/check): hack around the alerts package not being available test(kv): temporarily skip check tests while we merge the pr above
2019-08-07 22:34:07 +00:00
if err := s.deleteUserResourceMapping(ctx, tx, influxdb.UserResourceMappingFilter{
ResourceID: task.ID,
feat(checks): add first pass at creating tasks from checks First pass at flux AST generation from check Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): format call expression Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): cleanup CheckDefinition Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): clean up threshold functions Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): clean up message function Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): misc fixes Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): remove dead code Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): move threshold flux generation to check pkg Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): move base ast generation to its own package Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): add comment for GenerateFluxAST Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> docs(notification/flux): add comments to each exported function Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> feat(notification/check): add tests for GenerateFlux Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> feat(notification/check): add task options to generated flux 
fix(notification/check): use flux compatible duration type test(notification/check): add task option to task definition test(http): use check Duration in checks http handlers feat(check): add TaskID to checks base fix(notification/check): hack around issue with formatting ast package wtih multiple files test(check): create task when check is created A lot of little changes had to happen as a result of this. This change was rather painful. feat(checks): add update and delete of task for check fix(notifications/check): hack around the alerts package not being available test(kv): temporarily skip check tests while we merge the pr above
2019-08-07 22:34:07 +00:00
}); err != nil {
s.log.Info("Error deleting user resource mapping for task", zap.Stringer("taskID", task.ID), zap.Error(err))
feat(checks): add first pass at creating tasks from checks First pass at flux AST generation from check Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): format call expression Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): cleanup CheckDefinition Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): clean up threshold functions Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): clean up message function Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): misc fixes Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): remove dead code Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): move threshold flux generation to check pkg Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): move base ast generation to its own package Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> fix(notification/check): add comment for GenerateFluxAST Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> docs(notification/flux): add comments to each exported function Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> feat(notification/check): add tests for GenerateFlux Co-authored-by: Michael Desa <mjdesa@gmail.com> Co-authored-by: Deniz Kusefoglu <deniz@influxdata.com> feat(notification/check): add task options to generated flux 
fix(notification/check): use flux compatible duration type test(notification/check): add task option to task definition test(http): use check Duration in checks http handlers feat(check): add TaskID to checks base fix(notification/check): hack around issue with formatting ast package wtih multiple files test(check): create task when check is created A lot of little changes had to happen as a result of this. This change was rather painful. feat(checks): add update and delete of task for check fix(notifications/check): hack around the alerts package not being available test(kv): temporarily skip check tests while we merge the pr above
2019-08-07 22:34:07 +00:00
}
2020-01-13 14:22:52 +00:00
uid, _ := icontext.GetUserID(ctx)
return s.audit.Log(resource.Change{
Type: resource.Delete,
ResourceID: task.ID,
ResourceType: influxdb.TasksResourceType,
OrganizationID: task.OrganizationID,
UserID: uid,
Time: time.Now(),
})
}
// FindLogs returns the log entries selected by filter together with their
// count. The lookup runs in a read-only transaction.
func (s *Service) FindLogs(ctx context.Context, filter influxdb.LogFilter) ([]*influxdb.Log, int, error) {
	var found []*influxdb.Log
	viewErr := s.kv.View(ctx, func(tx Tx) error {
		entries, _, err := s.findLogs(ctx, tx, filter)
		if err == nil {
			found = entries
		}
		return err
	})
	if viewErr != nil {
		return nil, 0, viewErr
	}
	return found, len(found), nil
}
// findLogs collects pointers to log entries for filter.Task. When filter.Run
// is set only that run's entries are returned; otherwise the entries of every
// run reported by findRuns are concatenated. The second result is the number
// of entries returned.
func (s *Service) findLogs(ctx context.Context, tx Tx, filter influxdb.LogFilter) ([]*influxdb.Log, int, error) {
	if filter.Run == nil {
		runs, _, err := s.findRuns(ctx, tx, influxdb.RunFilter{Task: filter.Task})
		if err != nil {
			return nil, 0, err
		}

		var all []*influxdb.Log
		for _, run := range runs {
			for i := range run.Log {
				all = append(all, &run.Log[i])
			}
		}
		return all, len(all), nil
	}

	// Single-run lookup.
	run, err := s.findRunByID(ctx, tx, filter.Task, *filter.Run)
	if err != nil {
		return nil, 0, err
	}
	entries := make([]*influxdb.Log, len(run.Log))
	for i := range run.Log {
		entries[i] = &run.Log[i]
	}
	return entries, len(entries), nil
}
// FindRuns returns the runs selected by filter and the number of runs
// returned. The lookup runs in a read-only transaction.
func (s *Service) FindRuns(ctx context.Context, filter influxdb.RunFilter) ([]*influxdb.Run, int, error) {
	var found []*influxdb.Run
	viewErr := s.kv.View(ctx, func(tx Tx) error {
		rs, _, err := s.findRuns(ctx, tx, filter)
		if err == nil {
			found = rs
		}
		return err
	})
	if viewErr != nil {
		return nil, 0, viewErr
	}
	return found, len(found), nil
}
// findRuns returns the manual runs of filter.Task followed by its currently
// running runs, capped at filter.Limit entries. A zero limit selects
// influxdb.TaskDefaultPageSize; a negative limit or one above
// influxdb.TaskMaxPageSize yields influxdb.ErrOutOfBoundsLimit. The second
// result is the number of runs returned.
func (s *Service) findRuns(ctx context.Context, tx Tx, filter influxdb.RunFilter) ([]*influxdb.Run, int, error) {
	limit := filter.Limit
	if limit == 0 {
		limit = influxdb.TaskDefaultPageSize
	}
	if limit < 0 || limit > influxdb.TaskMaxPageSize {
		return nil, 0, influxdb.ErrOutOfBoundsLimit
	}

	var runs []*influxdb.Run

	// Manual runs come first. If they already fill the page, the currently
	// running runs are not consulted at all.
	manual, err := s.manualRuns(ctx, tx, filter.Task)
	if err != nil {
		return nil, 0, err
	}
	if len(manual) >= limit {
		runs = append(runs, manual[:limit]...)
		return runs, len(runs), nil
	}
	runs = append(runs, manual...)

	// Fill the remainder of the page with currently running runs.
	running, err := s.currentlyRunning(ctx, tx, filter.Task)
	if err != nil {
		return nil, 0, err
	}
	if room := limit - len(runs); len(running) > room {
		running = running[:room]
	}
	runs = append(runs, running...)

	return runs, len(runs), nil
}
// FindRunByID looks up the run runID of task taskID in a read-only
// transaction.
func (s *Service) FindRunByID(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
	var found *influxdb.Run
	viewErr := s.kv.View(ctx, func(tx Tx) error {
		res, err := s.findRunByID(ctx, tx, taskID, runID)
		if err == nil {
			found = res
		}
		return err
	})
	if viewErr != nil {
		return nil, viewErr
	}
	return found, nil
}
// findRunByID reads and decodes the run stored under the taskID/runID key of
// the run bucket. A missing key is reported as influxdb.ErrRunNotFound.
func (s *Service) findRunByID(ctx context.Context, tx Tx, taskID, runID influxdb.ID) (*influxdb.Run, error) {
	runStore, err := tx.Bucket(taskRunBucket)
	if err != nil {
		return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	runKey, err := taskRunKey(taskID, runID)
	if err != nil {
		return nil, err
	}

	raw, err := runStore.Get(runKey)
	switch {
	case err == nil:
		// found; decode below
	case IsNotFound(err):
		return nil, influxdb.ErrRunNotFound
	default:
		return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	var run influxdb.Run
	if err := json.Unmarshal(raw, &run); err != nil {
		return nil, influxdb.ErrInternalTaskServiceError(err)
	}
	return &run, nil
}
// CancelRun marks the run runID of task taskID as canceled inside a
// read-write transaction.
func (s *Service) CancelRun(ctx context.Context, taskID, runID influxdb.ID) error {
	return s.kv.Update(ctx, func(tx Tx) error {
		return s.cancelRun(ctx, tx, taskID, runID)
	})
}
// cancelRun loads the run, sets its status to "canceled" and writes it back
// under the same key.
func (s *Service) cancelRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID) error {
	run, err := s.findRunByID(ctx, tx, taskID, runID)
	if err != nil {
		return err
	}
	run.Status = "canceled"

	// Persist the modified run.
	runStore, err := tx.Bucket(taskRunBucket)
	if err != nil {
		return influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	encoded, err := json.Marshal(run)
	if err != nil {
		return influxdb.ErrInternalTaskServiceError(err)
	}

	key, err := taskRunKey(taskID, runID)
	if err != nil {
		return err
	}

	if err = runStore.Put(key, encoded); err != nil {
		return influxdb.ErrUnexpectedTaskBucketErr(err)
	}
	return nil
}
// RetryRun queues a new manual run that is a retry of run runID of task
// taskID and returns it. The work happens inside a read-write transaction.
func (s *Service) RetryRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
	var retried *influxdb.Run
	txErr := s.kv.Update(ctx, func(tx Tx) error {
		res, err := s.retryRun(ctx, tx, taskID, runID)
		if err == nil {
			retried = res
		}
		return err
	})
	return retried, txErr
}
// retryRun loads the run runID of task taskID, gives it a fresh ID, resets its
// status to scheduled and clears its started/finished/requested timestamps,
// then appends it to the task's manual-run list and returns it.
//
// A task with no manual-run list yet is treated as having an empty list, the
// same way manualRuns treats it. Any other read failure is reported as an
// unexpected bucket error.
func (s *Service) retryRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID) (*influxdb.Run, error) {
	// find the run
	r, err := s.findRunByID(ctx, tx, taskID, runID)
	if err != nil {
		return nil, err
	}

	r.ID = s.IDGenerator.ID()
	r.Status = backend.RunScheduled.String()
	r.StartedAt = time.Time{}
	r.FinishedAt = time.Time{}
	r.RequestedAt = time.Time{}

	// add the reset copy of the run to the manual runs
	bucket, err := tx.Bucket(taskRunBucket)
	if err != nil {
		return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	key, err := taskManualRunKey(taskID)
	if err != nil {
		return nil, err
	}

	runs := []*influxdb.Run{}
	runsBytes, err := bucket.Get(key)
	switch {
	case err == nil:
		if runsBytes != nil {
			if err := json.Unmarshal(runsBytes, &runs); err != nil {
				return nil, influxdb.ErrInternalTaskServiceError(err)
			}
		}
	case IsNotFound(err):
		// No manual runs have been queued for this task yet; start from an
		// empty list.
	default:
		return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	runs = append(runs, r)

	// save manual runs
	runsBytes, err = json.Marshal(runs)
	if err != nil {
		return nil, influxdb.ErrInternalTaskServiceError(err)
	}

	if err := bucket.Put(key, runsBytes); err != nil {
		return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	return r, nil
}
// ForceRun queues a manual run of task taskID for the unix timestamp
// scheduledFor and returns it. The value of scheduledFor may or may not align
// with the task's schedule.
func (s *Service) ForceRun(ctx context.Context, taskID influxdb.ID, scheduledFor int64) (*influxdb.Run, error) {
	var forced *influxdb.Run
	txErr := s.kv.Update(ctx, func(tx Tx) error {
		res, err := s.forceRun(ctx, tx, taskID, scheduledFor)
		if err == nil {
			forced = res
		}
		return err
	})
	return forced, txErr
}
// forceRun builds a scheduled run for the unix timestamp scheduledFor (UTC)
// and appends it to the manual-run list of task taskID. It returns
// influxdb.ErrTaskRunAlreadyQueued when a manual run for the same instant is
// already in the list.
func (s *Service) forceRun(ctx context.Context, tx Tx, taskID influxdb.ID, scheduledFor int64) (*influxdb.Run, error) {
	// create a run
	t := time.Unix(scheduledFor, 0).UTC()
	r := &influxdb.Run{
		ID:           s.IDGenerator.ID(),
		TaskID:       taskID,
		Status:       backend.RunScheduled.String(),
		RequestedAt:  time.Now().UTC(),
		ScheduledFor: t,
		Log:          []influxdb.Log{},
	}

	// add the run to the manual runs
	bucket, err := tx.Bucket(taskRunBucket)
	if err != nil {
		return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	runs, err := s.manualRuns(ctx, tx, taskID)
	if err != nil {
		return nil, err
	}

	// Check whether a run for this instant is already queued. Equal compares
	// the instant itself, whereas == on time.Time also compares the location
	// and would miss a stored value that denotes the same moment.
	for _, run := range runs {
		if run.ScheduledFor.Equal(r.ScheduledFor) {
			return nil, influxdb.ErrTaskRunAlreadyQueued
		}
	}
	runs = append(runs, r)

	// save manual runs
	runsBytes, err := json.Marshal(runs)
	if err != nil {
		return nil, influxdb.ErrInternalTaskServiceError(err)
	}

	key, err := taskManualRunKey(taskID)
	if err != nil {
		return nil, err
	}

	if err := bucket.Put(key, runsBytes); err != nil {
		return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	return r, nil
}
// CreateRun stores a new scheduled run for task taskID with the given
// scheduledFor and runAt times and returns it. The work happens inside a
// read-write transaction.
func (s *Service) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) {
	var created *influxdb.Run
	txErr := s.kv.Update(ctx, func(tx Tx) error {
		res, err := s.createRun(ctx, tx, taskID, scheduledFor, runAt)
		if err == nil {
			created = res
		}
		return err
	})
	return created, txErr
}
// createRun writes a new run in the scheduled state under the taskID/runID key
// of the run bucket. scheduledFor is reduced to whole seconds and converted to
// UTC before it is stored; runAt is stored as given.
func (s *Service) createRun(ctx context.Context, tx Tx, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) {
	newRun := influxdb.Run{
		ID:           s.IDGenerator.ID(),
		TaskID:       taskID,
		ScheduledFor: time.Unix(scheduledFor.Unix(), 0).UTC(),
		RunAt:        runAt,
		Status:       backend.RunScheduled.String(),
		Log:          []influxdb.Log{},
	}

	runStore, err := tx.Bucket(taskRunBucket)
	if err != nil {
		return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	encoded, err := json.Marshal(newRun)
	if err != nil {
		return nil, influxdb.ErrInternalTaskServiceError(err)
	}

	key, err := taskRunKey(taskID, newRun.ID)
	if err != nil {
		return nil, err
	}

	if err = runStore.Put(key, encoded); err != nil {
		return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
	}
	return &newRun, nil
}
// CurrentlyRunning returns the runs stored in the run bucket for task taskID,
// read in a read-only transaction.
func (s *Service) CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) {
	var running []*influxdb.Run
	viewErr := s.kv.View(ctx, func(tx Tx) error {
		rs, err := s.currentlyRunning(ctx, tx, taskID)
		if err == nil {
			running = rs
		}
		return err
	})
	if viewErr != nil {
		return nil, viewErr
	}
	return running, nil
}
// currentlyRunning walks the run bucket over the keys that start with the
// encoded task ID and decodes every entry as a run. The "manualRuns" and
// "latestCompleted" bookkeeping keys are skipped. The walk stops at the first
// decoded run whose TaskID differs from taskID.
func (s *Service) currentlyRunning(ctx context.Context, tx Tx, taskID influxdb.ID) ([]*influxdb.Run, error) {
	runStore, err := tx.Bucket(taskRunBucket)
	if err != nil {
		return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	cur, err := runStore.Cursor(WithCursorHintPrefix(taskID.String()))
	if err != nil {
		return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	var runs []*influxdb.Run

	prefix, err := taskKey(taskID)
	if err != nil {
		return nil, err
	}

	for k, v := cur.Seek(prefix); k != nil && strings.HasPrefix(string(k), string(prefix)); k, v = cur.Next() {
		name := string(k)
		if strings.HasSuffix(name, "manualRuns") || strings.HasSuffix(name, "latestCompleted") {
			continue
		}

		run := &influxdb.Run{}
		if err := json.Unmarshal(v, run); err != nil {
			return nil, influxdb.ErrInternalTaskServiceError(err)
		}

		// if the run no longer belongs to the task we are done
		if run.TaskID != taskID {
			break
		}
		runs = append(runs, run)
	}
	return runs, nil
}
// ManualRuns returns the manual-run list stored for task taskID, read in a
// read-only transaction.
func (s *Service) ManualRuns(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) {
	var queued []*influxdb.Run
	viewErr := s.kv.View(ctx, func(tx Tx) error {
		rs, err := s.manualRuns(ctx, tx, taskID)
		if err == nil {
			queued = rs
		}
		return err
	})
	if viewErr != nil {
		return nil, viewErr
	}
	return queued, nil
}
// manualRuns reads and decodes the manual-run list stored under the task's
// manual-run key. A missing key yields an empty, non-nil list.
func (s *Service) manualRuns(ctx context.Context, tx Tx, taskID influxdb.ID) ([]*influxdb.Run, error) {
	runStore, err := tx.Bucket(taskRunBucket)
	if err != nil {
		return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	listKey, err := taskManualRunKey(taskID)
	if err != nil {
		return nil, err
	}

	queued := []*influxdb.Run{}
	raw, err := runStore.Get(listKey)
	switch {
	case err == nil:
		// decode below
	case err == ErrKeyNotFound:
		return queued, nil
	default:
		return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	if err := json.Unmarshal(raw, &queued); err != nil {
		return nil, influxdb.ErrInternalTaskServiceError(err)
	}
	return queued, nil
}
// StartManualRun moves the manual run runID of task taskID out of the
// manual-run list and stores it under its own run key, inside a read-write
// transaction. It returns the run that was moved.
func (s *Service) StartManualRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
	var started *influxdb.Run
	txErr := s.kv.Update(ctx, func(tx Tx) error {
		res, err := s.startManualRun(ctx, tx, taskID, runID)
		if err == nil {
			started = res
		}
		return err
	})
	return started, txErr
}
// startManualRun removes the run with ID runID from the manual-run list of
// task taskID, saves the shortened list, and stores the run under its own
// taskID/runID key in the run bucket. It returns influxdb.ErrRunNotFound when
// the list cannot be read, is empty, or does not contain runID.
func (s *Service) startManualRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID) (*influxdb.Run, error) {
	mRuns, err := s.manualRuns(ctx, tx, taskID)
	if err != nil {
		return nil, influxdb.ErrRunNotFound
	}

	if len(mRuns) < 1 {
		return nil, influxdb.ErrRunNotFound
	}

	var run *influxdb.Run
	for i, r := range mRuns {
		if r.ID == runID {
			run = r
			mRuns = append(mRuns[:i], mRuns[i+1:]...)
			// Stop at the first match: the range keeps iterating over the
			// original length, so continuing after the removal would visit
			// shifted elements and could re-slice past the shortened list.
			break
		}
	}
	if run == nil {
		return nil, influxdb.ErrRunNotFound
	}

	// save manual runs
	mRunsBytes, err := json.Marshal(mRuns)
	if err != nil {
		return nil, influxdb.ErrInternalTaskServiceError(err)
	}

	runsKey, err := taskManualRunKey(taskID)
	if err != nil {
		return nil, err
	}

	b, err := tx.Bucket(taskRunBucket)
	if err != nil {
		return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	if err := b.Put(runsKey, mRunsBytes); err != nil {
		return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	// add mRun to the list of currently running
	mRunBytes, err := json.Marshal(run)
	if err != nil {
		return nil, influxdb.ErrInternalTaskServiceError(err)
	}

	runKey, err := taskRunKey(taskID, run.ID)
	if err != nil {
		return nil, err
	}

	if err := b.Put(runKey, mRunBytes); err != nil {
		return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	return run, nil
}
// FinishRun records the outcome of run runID on task taskID and removes the
// run from the run bucket, inside a read-write transaction. It returns the
// run that was removed.
func (s *Service) FinishRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
	var finished *influxdb.Run
	txErr := s.kv.Update(ctx, func(tx Tx) error {
		res, err := s.finishRun(ctx, tx, taskID, runID)
		if err == nil {
			finished = res
		}
		return err
	})
	return finished, txErr
}
// finishRun loads the run, pushes its scheduled time, status and (for a failed
// run) an error message to the owning task via updateTask, and then deletes
// the run's entry from the run bucket.
func (s *Service) finishRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID) (*influxdb.Run, error) {
	run, err := s.findRunByID(ctx, tx, taskID, runID)
	if err != nil {
		return nil, err
	}

	// For a failed run, prefer the second to last log message as the error
	// message, falling back to the only message when there is just one.
	// per https://github.com/influxdata/influxdb/issues/15153#issuecomment-547706005
	var lastRunError *string
	if run.Status == "failed" {
		switch n := len(run.Log); {
		case n > 1:
			lastRunError = &run.Log[n-2].Message
		case n > 0:
			lastRunError = &run.Log[n-1].Message
		}
	}

	// Tell the task about the completed run.
	scheduledFor := run.ScheduledFor
	update := influxdb.TaskUpdate{
		LatestCompleted: &scheduledFor,
		LastRunStatus:   &run.Status,
		LastRunError:    lastRunError,
	}
	if _, err = s.updateTask(ctx, tx, taskID, update); err != nil {
		return nil, err
	}

	// Remove the run's own entry.
	runStore, err := tx.Bucket(taskRunBucket)
	if err != nil {
		return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
	}
	runKey, err := taskRunKey(taskID, runID)
	if err != nil {
		return nil, err
	}
	if err = runStore.Delete(runKey); err != nil {
		return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	return run, nil
}
// UpdateRunState stores state as the status of run runID on task taskID and
// stamps the matching timestamp with when, inside a read-write transaction.
func (s *Service) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state backend.RunStatus) error {
	return s.kv.Update(ctx, func(tx Tx) error {
		return s.updateRunState(ctx, tx, taskID, runID, when, state)
	})
}
// updateRunState loads the run, sets its status to state and writes it back.
// when becomes StartedAt for backend.RunStarted and FinishedAt for
// backend.RunSuccess, backend.RunFail and backend.RunCanceled; other states
// leave both timestamps untouched.
func (s *Service) updateRunState(ctx context.Context, tx Tx, taskID, runID influxdb.ID, when time.Time, state backend.RunStatus) error {
	target, err := s.findRunByID(ctx, tx, taskID, runID)
	if err != nil {
		return err
	}

	target.Status = state.String()
	if state == backend.RunStarted {
		target.StartedAt = when
	} else if state == backend.RunSuccess || state == backend.RunFail || state == backend.RunCanceled {
		target.FinishedAt = when
	}

	// Persist the modified run.
	runStore, err := tx.Bucket(taskRunBucket)
	if err != nil {
		return influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	encoded, err := json.Marshal(target)
	if err != nil {
		return influxdb.ErrInternalTaskServiceError(err)
	}

	key, err := taskRunKey(taskID, target.ID)
	if err != nil {
		return err
	}

	if err = runStore.Put(key, encoded); err != nil {
		return influxdb.ErrUnexpectedTaskBucketErr(err)
	}
	return nil
}
// AddRunLog appends the message log, timestamped with when, to the log of run
// runID on task taskID, inside a read-write transaction.
func (s *Service) AddRunLog(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error {
	return s.kv.Update(ctx, func(tx Tx) error {
		return s.addRunLog(ctx, tx, taskID, runID, when, log)
	})
}
func (s *Service) addRunLog(ctx context.Context, tx Tx, taskID, runID influxdb.ID, when time.Time, log string) error {
// find run
run, err := s.findRunByID(ctx, tx, taskID, runID)
if err != nil {
return err
}
// update log
2019-06-05 17:53:44 +00:00
l := influxdb.Log{RunID: runID, Time: when.Format(time.RFC3339Nano), Message: log}
run.Log = append(run.Log, l)
// save run
b, err := tx.Bucket(taskRunBucket)
if err != nil {
return influxdb.ErrUnexpectedTaskBucketErr(err)
}
runBytes, err := json.Marshal(run)
if err != nil {
return influxdb.ErrInternalTaskServiceError(err)
}
runKey, err := taskRunKey(taskID, run.ID)
if err != nil {
return err
}
if err := b.Put(runKey, runBytes); err != nil {
return influxdb.ErrUnexpectedTaskBucketErr(err)
}
return nil
}
// taskKey returns the encoded form of taskID for use as a bucket key. An ID
// that fails to encode is reported as influxdb.ErrInvalidTaskID.
func taskKey(taskID influxdb.ID) ([]byte, error) {
	if key, err := taskID.Encode(); err == nil {
		return key, nil
	}
	return nil, influxdb.ErrInvalidTaskID
}
// taskLatestCompletedKey returns the key "<encoded taskID>/latestCompleted".
// An ID that fails to encode is reported as influxdb.ErrInvalidTaskID.
func taskLatestCompletedKey(taskID influxdb.ID) ([]byte, error) {
	const suffix = "/latestCompleted"

	id, err := taskID.Encode()
	if err != nil {
		return nil, influxdb.ErrInvalidTaskID
	}

	key := make([]byte, 0, len(id)+len(suffix))
	key = append(key, id...)
	key = append(key, suffix...)
	return key, nil
}
// taskManualRunKey returns the key "<encoded taskID>/manualRuns". An ID that
// fails to encode is reported as influxdb.ErrInvalidTaskID.
func taskManualRunKey(taskID influxdb.ID) ([]byte, error) {
	const suffix = "/manualRuns"

	id, err := taskID.Encode()
	if err != nil {
		return nil, influxdb.ErrInvalidTaskID
	}

	key := make([]byte, 0, len(id)+len(suffix))
	key = append(key, id...)
	key = append(key, suffix...)
	return key, nil
}
// taskOrgKey returns the key "<encoded orgID>/<encoded taskID>". A failure to
// encode either ID is reported as influxdb.ErrInvalidTaskID.
func taskOrgKey(orgID, taskID influxdb.ID) ([]byte, error) {
	org, err := orgID.Encode()
	if err != nil {
		return nil, influxdb.ErrInvalidTaskID
	}
	task, err := taskID.Encode()
	if err != nil {
		return nil, influxdb.ErrInvalidTaskID
	}

	key := make([]byte, 0, len(org)+1+len(task))
	key = append(key, org...)
	key = append(key, '/')
	key = append(key, task...)
	return key, nil
}
// taskRunKey returns the key "<encoded taskID>/<encoded runID>". A failure to
// encode either ID is reported as influxdb.ErrInvalidTaskID.
func taskRunKey(taskID, runID influxdb.ID) ([]byte, error) {
	task, err := taskID.Encode()
	if err != nil {
		return nil, influxdb.ErrInvalidTaskID
	}
	run, err := runID.Encode()
	if err != nil {
		return nil, influxdb.ErrInvalidTaskID
	}

	key := make([]byte, 0, len(task)+1+len(run))
	key = append(key, task...)
	key = append(key, '/')
	key = append(key, run...)
	return key, nil
}