package kv import ( "context" "encoding/json" "strings" "time" "github.com/influxdata/influxdb" icontext "github.com/influxdata/influxdb/context" "github.com/influxdata/influxdb/task/backend" "github.com/influxdata/influxdb/task/options" "go.uber.org/zap" cron "gopkg.in/robfig/cron.v2" ) // Task Storage Schema // taskBucket: // : task data storage // taskRunBucket: // /: run data storage // /manualRuns: list of runs to run manually // /latestCompleted: run data for the latest completed run of a task // taskIndexBucket // /: index for tasks by org // We may want to add a / index to allow us to look up tasks by task name. var ( taskBucket = []byte("tasksv1") taskRunBucket = []byte("taskRunsv1") taskIndexBucket = []byte("taskIndexsv1") ) var _ influxdb.TaskService = (*Service)(nil) var _ backend.TaskControlService = (*Service)(nil) type kvTask struct { ID influxdb.ID `json:"id"` Type string `json:"type,omitempty"` OrganizationID influxdb.ID `json:"orgID"` Organization string `json:"org"` OwnerID influxdb.ID `json:"ownerID"` Name string `json:"name"` Description string `json:"description,omitempty"` Status string `json:"status"` Flux string `json:"flux"` Every string `json:"every,omitempty"` Cron string `json:"cron,omitempty"` LastRunStatus string `json:"lastRunStatus,omitempty"` LastRunError string `json:"lastRunError,omitempty"` Offset influxdb.Duration `json:"offset,omitempty"` LatestCompleted time.Time `json:"latestCompleted,omitempty"` LatestScheduled time.Time `json:"latestScheduled,omitempty"` CreatedAt time.Time `json:"createdAt,omitempty"` UpdatedAt time.Time `json:"updatedAt,omitempty"` Metadata map[string]interface{} `json:"metadata,omitempty"` } func kvToInfluxTask(k *kvTask) *influxdb.Task { return &influxdb.Task{ ID: k.ID, Type: k.Type, OrganizationID: k.OrganizationID, Organization: k.Organization, OwnerID: k.OwnerID, Name: k.Name, Description: k.Description, Status: k.Status, Flux: k.Flux, Every: k.Every, Cron: k.Cron, LastRunStatus: k.LastRunStatus, 
LastRunError: k.LastRunError, Offset: k.Offset.Duration, LatestCompleted: k.LatestCompleted, LatestScheduled: k.LatestScheduled, CreatedAt: k.CreatedAt, UpdatedAt: k.UpdatedAt, Metadata: k.Metadata, } } func (s *Service) initializeTasks(ctx context.Context, tx Tx) error { if _, err := tx.Bucket(taskBucket); err != nil { return err } if _, err := tx.Bucket(taskRunBucket); err != nil { return err } if _, err := tx.Bucket(taskIndexBucket); err != nil { return err } return nil } // FindTaskByID returns a single task func (s *Service) FindTaskByID(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) { var t *influxdb.Task err := s.kv.View(ctx, func(tx Tx) error { if influxdb.FindTaskAuthRequired(ctx) { task, err := s.findTaskByIDWithAuth(ctx, tx, id) if err != nil { return err } t = task } else { task, err := s.findTaskByID(ctx, tx, id) if err != nil { return err } t = task } return nil }) if err != nil { return nil, err } return t, nil } // findTaskByIDWithAuth is a task lookup that populates the auth // This is to be used when we want to satisfy the FindTaskByID method // But is more taxing on the system then if we want to find the task alone. func (s *Service) findTaskByIDWithAuth(ctx context.Context, tx Tx, id influxdb.ID) (*influxdb.Task, error) { t, err := s.findTaskByID(ctx, tx, id) if err != nil { return nil, err } t.Authorization = &influxdb.Authorization{ Status: influxdb.Active, ID: influxdb.ID(1), OrgID: t.OrganizationID, } if t.OwnerID.Valid() { // populate task Auth ps, err := s.maxPermissions(ctx, tx, t.OwnerID) if err != nil { return nil, err } t.Authorization.Permissions = ps } return t, nil } // findTaskByID is an internal method used to do any action with tasks internally // that do not require authorization. 
func (s *Service) findTaskByID(ctx context.Context, tx Tx, id influxdb.ID) (*influxdb.Task, error) { taskKey, err := taskKey(id) if err != nil { return nil, err } b, err := tx.Bucket(taskBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } v, err := b.Get(taskKey) if IsNotFound(err) { return nil, influxdb.ErrTaskNotFound } if err != nil { return nil, err } kvTask := &kvTask{} if err := json.Unmarshal(v, kvTask); err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } t := kvToInfluxTask(kvTask) if t.LatestCompleted.IsZero() { t.LatestCompleted = t.CreatedAt } // Attempt to fill in the owner ID based on the auth. if !t.OwnerID.Valid() { authType := struct { AuthorizationID influxdb.ID `json:"authorizationID"` }{} if err := json.Unmarshal(v, &authType); err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } auth, err := s.findAuthorizationByID(ctx, tx, authType.AuthorizationID) if err == nil { t.OwnerID = auth.GetUserID() } } // Attempt to fill in the ownerID based on the organization owners. // If they have multiple owners just use the first one because any org owner // will have sufficient permissions to run a task. if !t.OwnerID.Valid() { owners, err := s.findUserResourceMappings(ctx, tx, influxdb.UserResourceMappingFilter{ ResourceID: t.OrganizationID, ResourceType: influxdb.OrgsResourceType, UserType: influxdb.Owner, }) if err == nil && len(owners) > 0 { t.OwnerID = owners[0].UserID } } return t, nil } // FindTasks returns a list of tasks that match a filter (limit 100) and the total count // of matching tasks. 
func (s *Service) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { var ts []*influxdb.Task err := s.kv.View(ctx, func(tx Tx) error { tasks, _, err := s.findTasks(ctx, tx, filter) if err != nil { return err } ts = tasks return nil }) if err != nil { return nil, 0, err } return ts, len(ts), nil } func (s *Service) findTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { var org *influxdb.Organization var err error if filter.OrganizationID != nil { org, err = s.findOrganizationByID(ctx, tx, *filter.OrganizationID) if err != nil { return nil, 0, err } } else if filter.Organization != "" { org, err = s.findOrganizationByName(ctx, tx, filter.Organization) if err != nil { return nil, 0, err } } // complain about limits if filter.Limit < 0 { return nil, 0, influxdb.ErrPageSizeTooSmall } if filter.Limit > influxdb.TaskMaxPageSize { return nil, 0, influxdb.ErrPageSizeTooLarge } if filter.Limit == 0 { filter.Limit = influxdb.TaskDefaultPageSize } // if no user or organization is passed, assume contexts auth is the user we are looking for. // it is possible for a internal system to call this with no auth so we shouldnt fail if no auth is found. if org == nil && filter.User == nil { userAuth, err := icontext.GetAuthorizer(ctx) if err == nil { userID := userAuth.GetUserID() filter.User = &userID } } // filter by user id. if filter.User != nil { return s.findTasksByUser(ctx, tx, filter) } else if org != nil { return s.findTasksByOrg(ctx, tx, filter) } return s.findAllTasks(ctx, tx, filter) } // findTasksByUser is a subset of the find tasks function. 
Used for cleanliness func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { if filter.User == nil { return nil, 0, influxdb.ErrTaskNotFound } var org *influxdb.Organization var err error if filter.OrganizationID != nil { org, err = s.findOrganizationByID(ctx, tx, *filter.OrganizationID) if err != nil { return nil, 0, err } } else if filter.Organization != "" { org, err = s.findOrganizationByName(ctx, tx, filter.Organization) if err != nil { return nil, 0, err } } var ts []*influxdb.Task maps, err := s.findUserResourceMappings( ctx, tx, influxdb.UserResourceMappingFilter{ ResourceType: influxdb.TasksResourceType, UserID: *filter.User, UserType: influxdb.Owner, }, ) if err != nil { return nil, 0, err } for _, m := range maps { task, err := s.findTaskByIDWithAuth(ctx, tx, m.ResourceID) if err != nil && err != influxdb.ErrTaskNotFound { return nil, 0, err } if err == influxdb.ErrTaskNotFound { continue } if org != nil && task.OrganizationID != org.ID { continue } if taskFilterMatch(filter.Type, task.Type) { ts = append(ts, task) } if len(ts) >= filter.Limit { break } } return ts, len(ts), nil } // findTasksByOrg is a subset of the find tasks function. 
Used for cleanliness func (s *Service) findTasksByOrg(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { var org *influxdb.Organization var err error if filter.OrganizationID != nil { org, err = s.findOrganizationByID(ctx, tx, *filter.OrganizationID) if err != nil { return nil, 0, err } } else if filter.Organization != "" { org, err = s.findOrganizationByName(ctx, tx, filter.Organization) if err != nil { return nil, 0, err } } if org == nil { return nil, 0, influxdb.ErrTaskNotFound } var ts []*influxdb.Task indexBucket, err := tx.Bucket(taskIndexBucket) if err != nil { return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err) } c, err := indexBucket.Cursor() if err != nil { return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err) } // we can filter by orgID if filter.After != nil { key, err := taskOrgKey(org.ID, *filter.After) if err != nil { return nil, 0, err } // ignore the key:val returned in this seek because we are starting "after" // this key c.Seek(key) } else { // if we dont have an after we just move the cursor to the first instance of the // orgID key, err := org.ID.Encode() if err != nil { return nil, 0, influxdb.ErrInvalidTaskID } k, v := c.Seek(key) if k != nil { id, err := influxdb.IDFromString(string(v)) if err != nil { return nil, 0, influxdb.ErrInvalidTaskID } t, err := s.findTaskByIDWithAuth(ctx, tx, *id) if err != nil && err != influxdb.ErrTaskNotFound { // we might have some crufty index's return nil, 0, err } if t != nil { if taskFilterMatch(filter.Type, t.Type) { ts = append(ts, t) } } } } // if someone has a limit of 1 if len(ts) >= filter.Limit { return ts, len(ts), nil } for { k, v := c.Next() if k == nil { break } id, err := influxdb.IDFromString(string(v)) if err != nil { return nil, 0, influxdb.ErrInvalidTaskID } t, err := s.findTaskByIDWithAuth(ctx, tx, *id) if err != nil { if err == influxdb.ErrTaskNotFound { // we might have some crufty index's continue } return nil, 0, err } // If the new task 
doesn't belong to the org we have looped outside the org filter if org != nil && t.OrganizationID != org.ID { break } if !taskFilterMatch(filter.Type, t.Type) { continue } // insert the new task into the list ts = append(ts, t) // Check if we are over running the limit if len(ts) >= filter.Limit { break } } if filter.Name != nil { ts = filterByName(ts, *filter.Name) } return ts, len(ts), err } type taskMatchFn func(*influxdb.Task) bool // newTaskMatchFn returns a function for validating // a task matches the filter. Will return nil if // the filter should match all tasks. func newTaskMatchFn(f influxdb.TaskFilter, org *influxdb.Organization) func(t *influxdb.Task) bool { var fn taskMatchFn if org != nil { expected := org.ID prevFn := fn fn = func(t *influxdb.Task) bool { res := prevFn == nil || prevFn(t) return res && expected == t.OrganizationID } } if f.Type != nil { expected := *f.Type prevFn := fn fn = func(t *influxdb.Task) bool { res := prevFn == nil || prevFn(t) return res && ((expected == influxdb.TaskSystemType && (t.Type == influxdb.TaskSystemType || t.Type == "")) || expected == t.Type) } } if f.Name != nil { expected := *f.Name prevFn := fn fn = func(t *influxdb.Task) bool { res := prevFn == nil || prevFn(t) return res && (expected == t.Name) } } return fn } // findAllTasks is a subset of the find tasks function. Used for cleanliness. // This function should only be executed internally because it doesn't force organization or user filtering. // Enforcing filters should be done in a validation layer. 
func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { var ts []*influxdb.Task taskBucket, err := tx.Bucket(taskBucket) if err != nil { return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err) } c, err := taskBucket.Cursor() if err != nil { return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err) } var k, v []byte // we can filter by orgID if filter.After != nil { key, err := taskKey(*filter.After) if err != nil { return nil, 0, err } // ignore the key:val returned in this seek because we are starting "after" // this key c.Seek(key) k, v = c.Next() } else { k, v = c.First() } matchFn := newTaskMatchFn(filter, nil) for k != nil { kvTask := &kvTask{} if err := json.Unmarshal(v, kvTask); err != nil { return nil, 0, influxdb.ErrInternalTaskServiceError(err) } t := kvToInfluxTask(kvTask) if matchFn == nil || matchFn(t) { ts = append(ts, t) if len(ts) >= filter.Limit { break } } k, v = c.Next() } return ts, len(ts), err } func filterByName(ts []*influxdb.Task, taskName string) []*influxdb.Task { filtered := []*influxdb.Task{} for _, task := range ts { if task.Name == taskName { filtered = append(filtered, task) } } return filtered } // CreateTask creates a new task. // The owner of the task is inferred from the authorizer associated with ctx. 
func (s *Service) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) { var t *influxdb.Task err := s.kv.Update(ctx, func(tx Tx) error { task, err := s.createTask(ctx, tx, tc) if err != nil { return err } t = task return nil }) if err != nil { return nil, err } return t, nil } func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) (*influxdb.Task, error) { var err error var org *influxdb.Organization if tc.OrganizationID.Valid() { org, err = s.findOrganizationByID(ctx, tx, tc.OrganizationID) if err != nil { return nil, err } } else if tc.Organization != "" { org, err = s.findOrganizationByName(ctx, tx, tc.Organization) if err != nil { return nil, err } } if org == nil { return nil, influxdb.ErrOrgNotFound } // TODO: Uncomment this once the checks/notifications no longer create tasks in kv // confirm the owner is a real user. // if _, err = s.findUserByID(ctx, tx, tc.OwnerID); err != nil { // return nil, influxdb.ErrInvalidOwnerID // } opt, err := options.FromScript(tc.Flux) if err != nil { return nil, influxdb.ErrTaskOptionParse(err) } if tc.Status == "" { tc.Status = string(backend.TaskActive) } createdAt := time.Now().Truncate(time.Second).UTC() task := &influxdb.Task{ ID: s.IDGenerator.ID(), Type: tc.Type, OrganizationID: org.ID, Organization: org.Name, OwnerID: tc.OwnerID, Metadata: tc.Metadata, Name: opt.Name, Description: tc.Description, Status: tc.Status, Flux: tc.Flux, Every: opt.Every.String(), Cron: opt.Cron, CreatedAt: createdAt, LatestCompleted: createdAt, } if opt.Offset != nil { off, err := time.ParseDuration(opt.Offset.String()) if err != nil { return nil, influxdb.ErrTaskTimeParse(err) } task.Offset = off } taskBucket, err := tx.Bucket(taskBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } indexBucket, err := tx.Bucket(taskIndexBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } taskBytes, err := json.Marshal(task) if err != nil { return nil, 
influxdb.ErrInternalTaskServiceError(err) } taskKey, err := taskKey(task.ID) if err != nil { return nil, err } orgKey, err := taskOrgKey(task.OrganizationID, task.ID) if err != nil { return nil, err } // write the task err = taskBucket.Put(taskKey, taskBytes) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } // write the org index err = indexBucket.Put(orgKey, taskKey) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } if err := s.createTaskURM(ctx, tx, task); err != nil { s.Logger.Info("error creating user resource mapping for task", zap.Stringer("taskID", task.ID), zap.Error(err)) } // populate permissions so the task can be used immediately // if we cant populate here we shouldn't error. ps, _ := s.maxPermissions(ctx, tx, task.OwnerID) task.Authorization = &influxdb.Authorization{ Status: influxdb.Active, ID: influxdb.ID(1), OrgID: task.OrganizationID, Permissions: ps, } return task, nil } func (s *Service) createTaskURM(ctx context.Context, tx Tx, t *influxdb.Task) error { userAuth, err := icontext.GetAuthorizer(ctx) if err != nil { return err } return s.createUserResourceMapping(ctx, tx, &influxdb.UserResourceMapping{ ResourceType: influxdb.TasksResourceType, ResourceID: t.ID, UserID: userAuth.GetUserID(), UserType: influxdb.Owner, }) } // UpdateTask updates a single task with changeset. 
func (s *Service) UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error) { var t *influxdb.Task err := s.kv.Update(ctx, func(tx Tx) error { task, err := s.updateTask(ctx, tx, id, upd) if err != nil { return err } t = task return nil }) if err != nil { return nil, err } return t, nil } func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error) { // retrieve the task task, err := s.findTaskByID(ctx, tx, id) if err != nil { return nil, err } updatedAt := time.Now().UTC() // update the flux script if !upd.Options.IsZero() || upd.Flux != nil { if err = upd.UpdateFlux(task.Flux); err != nil { return nil, err } task.Flux = *upd.Flux options, err := options.FromScript(*upd.Flux) if err != nil { return nil, influxdb.ErrTaskOptionParse(err) } task.Name = options.Name task.Every = options.Every.String() task.Cron = options.Cron var off time.Duration if options.Offset != nil { off, err = time.ParseDuration(options.Offset.String()) if err != nil { return nil, influxdb.ErrTaskTimeParse(err) } } task.Offset = off task.UpdatedAt = updatedAt } if upd.Description != nil { task.Description = *upd.Description task.UpdatedAt = updatedAt } if upd.Status != nil { task.Status = *upd.Status task.UpdatedAt = updatedAt } if upd.Metadata != nil { task.Metadata = upd.Metadata task.UpdatedAt = updatedAt } if upd.LatestCompleted != nil { // make sure we only update latest completed one way tlc := task.LatestCompleted ulc := *upd.LatestCompleted if !ulc.IsZero() && ulc.After(tlc) { task.LatestCompleted = *upd.LatestCompleted } } if upd.LatestScheduled != nil { // make sure we only update latest scheduled one way if upd.LatestScheduled.After(task.LatestScheduled) { task.LatestScheduled = *upd.LatestScheduled } } if upd.LastRunStatus != nil { task.LastRunStatus = *upd.LastRunStatus if *upd.LastRunStatus == "failed" && upd.LastRunError != nil { task.LastRunError = *upd.LastRunError } else { 
task.LastRunError = "" } } // save the updated task bucket, err := tx.Bucket(taskBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } key, err := taskKey(id) if err != nil { return nil, err } taskBytes, err := json.Marshal(task) if err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } return task, bucket.Put(key, taskBytes) } // DeleteTask removes a task by ID and purges all associated data and scheduled runs. func (s *Service) DeleteTask(ctx context.Context, id influxdb.ID) error { err := s.kv.Update(ctx, func(tx Tx) error { err := s.deleteTask(ctx, tx, id) if err != nil { return err } return nil }) if err != nil { return err } return nil } func (s *Service) deleteTask(ctx context.Context, tx Tx, id influxdb.ID) error { taskBucket, err := tx.Bucket(taskBucket) if err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } runBucket, err := tx.Bucket(taskRunBucket) if err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } indexBucket, err := tx.Bucket(taskIndexBucket) if err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } // retrieve the task task, err := s.findTaskByID(ctx, tx, id) if err != nil { return err } // remove the orgs index orgKey, err := taskOrgKey(task.OrganizationID, task.ID) if err != nil { return err } if err := indexBucket.Delete(orgKey); err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } // remove latest completed lastCompletedKey, err := taskLatestCompletedKey(task.ID) if err != nil { return err } if err := runBucket.Delete(lastCompletedKey); err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } // remove the runs runs, _, err := s.findRuns(ctx, tx, influxdb.RunFilter{Task: task.ID}) if err != nil { return err } for _, run := range runs { key, err := taskRunKey(task.ID, run.ID) if err != nil { return err } if err := runBucket.Delete(key); err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } } // remove the task key, err := taskKey(task.ID) if err != nil 
{ return err } if err := taskBucket.Delete(key); err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } if err := s.deleteUserResourceMapping(ctx, tx, influxdb.UserResourceMappingFilter{ ResourceID: task.ID, }); err != nil { s.Logger.Info("error deleting user resource mapping for task", zap.Stringer("taskID", task.ID), zap.Error(err)) } return nil } // FindLogs returns logs for a run. func (s *Service) FindLogs(ctx context.Context, filter influxdb.LogFilter) ([]*influxdb.Log, int, error) { var logs []*influxdb.Log err := s.kv.View(ctx, func(tx Tx) error { ls, _, err := s.findLogs(ctx, tx, filter) if err != nil { return err } logs = ls return nil }) if err != nil { return nil, 0, err } return logs, len(logs), nil } func (s *Service) findLogs(ctx context.Context, tx Tx, filter influxdb.LogFilter) ([]*influxdb.Log, int, error) { if filter.Run != nil { r, err := s.findRunByID(ctx, tx, filter.Task, *filter.Run) if err != nil { return nil, 0, err } rtn := make([]*influxdb.Log, len(r.Log)) for i := 0; i < len(r.Log); i++ { rtn[i] = &r.Log[i] } return rtn, len(rtn), nil } runs, _, err := s.findRuns(ctx, tx, influxdb.RunFilter{Task: filter.Task}) if err != nil { return nil, 0, err } var logs []*influxdb.Log for _, run := range runs { for i := 0; i < len(run.Log); i++ { logs = append(logs, &run.Log[i]) } } return logs, len(logs), nil } // FindRuns returns a list of runs that match a filter and the total count of returned runs. 
func (s *Service) FindRuns(ctx context.Context, filter influxdb.RunFilter) ([]*influxdb.Run, int, error) { var runs []*influxdb.Run err := s.kv.View(ctx, func(tx Tx) error { rs, _, err := s.findRuns(ctx, tx, filter) if err != nil { return err } runs = rs return nil }) if err != nil { return nil, 0, err } return runs, len(runs), nil } func (s *Service) findRuns(ctx context.Context, tx Tx, filter influxdb.RunFilter) ([]*influxdb.Run, int, error) { if filter.Limit == 0 { filter.Limit = influxdb.TaskDefaultPageSize } if filter.Limit < 0 || filter.Limit > influxdb.TaskMaxPageSize { return nil, 0, influxdb.ErrOutOfBoundsLimit } var runs []*influxdb.Run // manual runs manualRuns, err := s.manualRuns(ctx, tx, filter.Task) if err != nil { return nil, 0, err } for _, run := range manualRuns { runs = append(runs, run) if len(runs) >= filter.Limit { return runs, len(runs), nil } } // append currently running currentlyRunning, err := s.currentlyRunning(ctx, tx, filter.Task) if err != nil { return nil, 0, err } for _, run := range currentlyRunning { runs = append(runs, run) if len(runs) >= filter.Limit { return runs, len(runs), nil } } return runs, len(runs), nil } // FindRunByID returns a single run. 
func (s *Service) FindRunByID(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) { var run *influxdb.Run err := s.kv.View(ctx, func(tx Tx) error { r, err := s.findRunByID(ctx, tx, taskID, runID) if err != nil { return err } run = r return nil }) if err != nil { return nil, err } return run, nil } func (s *Service) findRunByID(ctx context.Context, tx Tx, taskID, runID influxdb.ID) (*influxdb.Run, error) { bucket, err := tx.Bucket(taskRunBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } key, err := taskRunKey(taskID, runID) if err != nil { return nil, err } runBytes, err := bucket.Get(key) if err != nil { if IsNotFound(err) { return nil, influxdb.ErrRunNotFound } return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } run := &influxdb.Run{} err = json.Unmarshal(runBytes, run) if err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } return run, nil } // CancelRun cancels a currently running run. func (s *Service) CancelRun(ctx context.Context, taskID, runID influxdb.ID) error { err := s.kv.Update(ctx, func(tx Tx) error { err := s.cancelRun(ctx, tx, taskID, runID) if err != nil { return err } return nil }) return err } func (s *Service) cancelRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID) error { // get the run run, err := s.findRunByID(ctx, tx, taskID, runID) if err != nil { return err } // set status to canceled run.Status = "canceled" // save bucket, err := tx.Bucket(taskBucket) if err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } runBytes, err := json.Marshal(run) if err != nil { return influxdb.ErrInternalTaskServiceError(err) } runKey, err := taskRunKey(taskID, runID) if err != nil { return err } if err := bucket.Put(runKey, runBytes); err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } return nil } // RetryRun creates and returns a new run (which is a retry of another run). 
func (s *Service) RetryRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) { var r *influxdb.Run err := s.kv.Update(ctx, func(tx Tx) error { run, err := s.retryRun(ctx, tx, taskID, runID) if err != nil { return err } r = run return nil }) return r, err } func (s *Service) retryRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID) (*influxdb.Run, error) { // find the run r, err := s.findRunByID(ctx, tx, taskID, runID) if err != nil { return nil, err } r.ID = s.IDGenerator.ID() r.Status = backend.RunScheduled.String() r.StartedAt = time.Time{} r.FinishedAt = time.Time{} r.RequestedAt = time.Time{} // add a clean copy of the run to the manual runs bucket, err := tx.Bucket(taskRunBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } key, err := taskManualRunKey(taskID) if err != nil { return nil, err } runs := []*influxdb.Run{} runsBytes, err := bucket.Get(key) if err != nil { if err != ErrKeyNotFound { return nil, influxdb.ErrRunNotFound } return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } if runsBytes != nil { if err := json.Unmarshal(runsBytes, &runs); err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } } runs = append(runs, r) // save manual runs runsBytes, err = json.Marshal(runs) if err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } if err := bucket.Put(key, runsBytes); err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } return r, nil } // ForceRun forces a run to occur with unix timestamp scheduledFor, to be executed as soon as possible. // The value of scheduledFor may or may not align with the task's schedule. 
func (s *Service) ForceRun(ctx context.Context, taskID influxdb.ID, scheduledFor int64) (*influxdb.Run, error) { var r *influxdb.Run err := s.kv.Update(ctx, func(tx Tx) error { run, err := s.forceRun(ctx, tx, taskID, scheduledFor) if err != nil { return err } r = run return nil }) return r, err } func (s *Service) forceRun(ctx context.Context, tx Tx, taskID influxdb.ID, scheduledFor int64) (*influxdb.Run, error) { // create a run t := time.Unix(scheduledFor, 0).UTC() r := &influxdb.Run{ ID: s.IDGenerator.ID(), TaskID: taskID, Status: backend.RunScheduled.String(), RequestedAt: time.Now().UTC(), ScheduledFor: t, Log: []influxdb.Log{}, } // add a clean copy of the run to the manual runs bucket, err := tx.Bucket(taskRunBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } runs, err := s.manualRuns(ctx, tx, taskID) if err != nil { return nil, err } // check to see if this run is already queued for _, run := range runs { if run.ScheduledFor == r.ScheduledFor { return nil, influxdb.ErrTaskRunAlreadyQueued } } runs = append(runs, r) // save manual runs runsBytes, err := json.Marshal(runs) if err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } key, err := taskManualRunKey(taskID) if err != nil { return nil, err } if err := bucket.Put(key, runsBytes); err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } return r, nil } // CreateNextRun creates the earliest needed run scheduled no later than the given Unix timestamp now. // Internally, the Store should rely on the underlying task's StoreTaskMeta to create the next run. 
func (s *Service) CreateNextRun(ctx context.Context, taskID influxdb.ID, now int64) (backend.RunCreation, error) {
	var rc backend.RunCreation
	err := s.kv.Update(ctx, func(tx Tx) error {
		runCreate, err := s.createNextRun(ctx, tx, taskID, now)
		if err != nil {
			return err
		}
		rc = runCreate
		return nil
	})
	return rc, err
}

// createNextRun produces the next run for a task.
//
// When manual runs are queued, the first one is taken off the stored list,
// written under its own run key and returned, whatever the value of now.
// Otherwise a scheduled run is created, provided the due time reported by
// nextDueRun is not after now; if it is, ErrRunNotDueYet is returned.
func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID, now int64) (backend.RunCreation, error) {
	// pull the scheduler for the task
	task, err := s.findTaskByID(ctx, tx, taskID)
	if err != nil {
		return backend.RunCreation{}, err
	}

	// check if we have any manual runs queued
	mRuns, err := s.manualRuns(ctx, tx, taskID)
	if err != nil {
		return backend.RunCreation{}, err
	}

	// The next due time is needed on both paths: it is reported back with a
	// manual run and decides whether a scheduled run may be created.
	nextDue, scheduledFor, err := s.nextDueRun(ctx, tx, taskID)
	if err != nil {
		return backend.RunCreation{}, err
	}

	if len(mRuns) > 0 {
		mRun := mRuns[0]
		// From here on mRuns is the remaining queue, without the run taken.
		mRuns := mRuns[1:]
		// save manual runs
		b, err := tx.Bucket(taskRunBucket)
		if err != nil {
			return backend.RunCreation{}, influxdb.ErrUnexpectedTaskBucketErr(err)
		}
		mRunsBytes, err := json.Marshal(mRuns)
		if err != nil {
			return backend.RunCreation{}, influxdb.ErrInternalTaskServiceError(err)
		}

		runsKey, err := taskManualRunKey(taskID)
		if err != nil {
			return backend.RunCreation{}, err
		}

		if err := b.Put(runsKey, mRunsBytes); err != nil {
			return backend.RunCreation{}, influxdb.ErrUnexpectedTaskBucketErr(err)
		}
		// add mRun to the list of currently running
		mRunBytes, err := json.Marshal(mRun)
		if err != nil {
			return backend.RunCreation{}, influxdb.ErrInternalTaskServiceError(err)
		}

		runKey, err := taskRunKey(taskID, mRun.ID)
		if err != nil {
			return backend.RunCreation{}, err
		}

		if err := b.Put(runKey, mRunBytes); err != nil {
			return backend.RunCreation{}, influxdb.ErrUnexpectedTaskBucketErr(err)
		}

		// return mRun
		schedFor := mRun.ScheduledFor
		reqAt := mRun.RequestedAt

		// A manual run is due immediately; Now carries the time it was
		// scheduled for.
		rc := backend.RunCreation{
			Created: backend.QueuedRun{
				TaskID: taskID,
				RunID:  mRun.ID,
				DueAt:  time.Now().UTC().Unix(),
				Now:    schedFor.Unix(),
			},
			NextDue:  nextDue,
			HasQueue: len(mRuns) > 0,
		}
		// RequestedAt is only reported when the stored run has one.
		if !reqAt.IsZero() {
			rc.Created.RequestedAt = reqAt.Unix()
		}
		return rc, nil
	}

	dueAt := time.Unix(nextDue, 0)
	// if its not due yet lets get outa here
	if dueAt.After(time.Unix(now, 0)) {
		return backend.RunCreation{}, influxdb.ErrRunNotDueYet(dueAt.Unix())
	}

	// Create and store the scheduled run.
	id := s.IDGenerator.ID()

	run := influxdb.Run{
		ID:           id,
		TaskID:       task.ID,
		ScheduledFor: time.Unix(scheduledFor, 0).UTC(),
		Status:       backend.RunScheduled.String(),
		Log:          []influxdb.Log{},
	}

	b, err := tx.Bucket(taskRunBucket)
	if err != nil {
		return backend.RunCreation{}, influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	runBytes, err := json.Marshal(run)
	if err != nil {
		return backend.RunCreation{}, influxdb.ErrInternalTaskServiceError(err)
	}

	runKey, err := taskRunKey(taskID, run.ID)
	if err != nil {
		return backend.RunCreation{}, err
	}
	if err := b.Put(runKey, runBytes); err != nil {
		return backend.RunCreation{}, influxdb.ErrUnexpectedTaskBucketErr(err)
	}

	// We need to know when the next one is due based this run's due at time
	sch, err := cron.Parse(task.EffectiveCron())
	if err != nil {
		return backend.RunCreation{}, influxdb.ErrTaskTimeParse(err)
	}
	nextScheduled := sch.Next(time.Unix(scheduledFor, 0)).UTC()
	// The following due time is the next scheduled time plus the task offset.
	offset := &options.Duration{}
	if err := offset.Parse(task.Offset.String()); err != nil {
		return backend.RunCreation{}, influxdb.ErrTaskTimeParse(err)
	}
	nextDueAt, err := offset.Add(nextScheduled)
	if err != nil {
		return backend.RunCreation{}, influxdb.ErrTaskTimeParse(err)
	}
	// populate RunCreation
	return backend.RunCreation{
		Created: backend.QueuedRun{
			TaskID: taskID,
			RunID:  id,
			DueAt:  dueAt.Unix(),
			Now:    scheduledFor,
		},
		NextDue:  nextDueAt.Unix(),
		HasQueue: false,
	}, nil
}

// CreateRun creates a run with a scheduledFor time as now.
func (s *Service) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) { var r *influxdb.Run err := s.kv.Update(ctx, func(tx Tx) error { run, err := s.createRun(ctx, tx, taskID, scheduledFor) if err != nil { return err } r = run return nil }) return r, err } func (s *Service) createRun(ctx context.Context, tx Tx, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) { id := s.IDGenerator.ID() run := influxdb.Run{ ID: id, TaskID: taskID, ScheduledFor: scheduledFor, Status: backend.RunScheduled.String(), Log: []influxdb.Log{}, } b, err := tx.Bucket(taskRunBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } runBytes, err := json.Marshal(run) if err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } runKey, err := taskRunKey(taskID, run.ID) if err != nil { return nil, err } if err := b.Put(runKey, runBytes); err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } return &run, nil } func (s *Service) CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) { var runs []*influxdb.Run err := s.kv.View(ctx, func(tx Tx) error { rs, err := s.currentlyRunning(ctx, tx, taskID) if err != nil { return err } runs = rs return nil }) if err != nil { return nil, err } return runs, nil } func (s *Service) currentlyRunning(ctx context.Context, tx Tx, taskID influxdb.ID) ([]*influxdb.Run, error) { bucket, err := tx.Bucket(taskRunBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } c, err := bucket.Cursor(WithCursorHintPrefix(taskID.String())) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } var runs []*influxdb.Run taskKey, err := taskKey(taskID) if err != nil { return nil, err } k, v := c.Seek(taskKey) for { if k == nil || !strings.HasPrefix(string(k), string(taskKey)) { break } if strings.HasSuffix(string(k), "manualRuns") || strings.HasSuffix(string(k), "latestCompleted") { k, v = c.Next() continue } 
r := &influxdb.Run{} if err := json.Unmarshal(v, r); err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } // if the run no longer belongs to the task we are done if r.TaskID != taskID { break } runs = append(runs, r) k, v = c.Next() } return runs, nil } func (s *Service) ManualRuns(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) { var runs []*influxdb.Run err := s.kv.View(ctx, func(tx Tx) error { rs, err := s.manualRuns(ctx, tx, taskID) if err != nil { return err } runs = rs return nil }) if err != nil { return nil, err } return runs, nil } func (s *Service) manualRuns(ctx context.Context, tx Tx, taskID influxdb.ID) ([]*influxdb.Run, error) { b, err := tx.Bucket(taskRunBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } key, err := taskManualRunKey(taskID) if err != nil { return nil, err } runs := []*influxdb.Run{} val, err := b.Get(key) if err != nil { if err == ErrKeyNotFound { return runs, nil } return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } if err := json.Unmarshal(val, &runs); err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } return runs, nil } func (s *Service) StartManualRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) { var r *influxdb.Run err := s.kv.Update(ctx, func(tx Tx) error { run, err := s.startManualRun(ctx, tx, taskID, runID) if err != nil { return err } r = run return nil }) return r, err } func (s *Service) startManualRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID) (*influxdb.Run, error) { mRuns, err := s.manualRuns(ctx, tx, taskID) if err != nil { return nil, influxdb.ErrRunNotFound } if len(mRuns) < 1 { return nil, influxdb.ErrRunNotFound } var run *influxdb.Run for i, r := range mRuns { if r.ID == runID { run = r mRuns = append(mRuns[:i], mRuns[i+1:]...) 
} } if run == nil { return nil, influxdb.ErrRunNotFound } // save manual runs mRunsBytes, err := json.Marshal(mRuns) if err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } runsKey, err := taskManualRunKey(taskID) if err != nil { return nil, err } b, err := tx.Bucket(taskRunBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } if err := b.Put(runsKey, mRunsBytes); err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } // add mRun to the list of currently running mRunBytes, err := json.Marshal(run) if err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } runKey, err := taskRunKey(taskID, run.ID) if err != nil { return nil, err } if err := b.Put(runKey, mRunBytes); err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } return run, nil } // FinishRun removes runID from the list of running tasks and if its `now` is later then last completed update it. func (s *Service) FinishRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) { var run *influxdb.Run err := s.kv.Update(ctx, func(tx Tx) error { r, err := s.finishRun(ctx, tx, taskID, runID) if err != nil { return err } run = r return nil }) return run, err } func (s *Service) finishRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID) (*influxdb.Run, error) { // get the run r, err := s.findRunByID(ctx, tx, taskID, runID) if err != nil { return nil, err } // tell task to update latest completed scheduled := r.ScheduledFor _, err = s.updateTask(ctx, tx, taskID, influxdb.TaskUpdate{ LatestCompleted: &scheduled, LastRunStatus: &r.Status, LastRunError: func() *string { if r.Status == "failed" { // prefer the second to last log message as the error message // per https://github.com/influxdata/influxdb/issues/15153#issuecomment-547706005 if len(r.Log) > 1 { return &r.Log[len(r.Log)-2].Message } else if len(r.Log) > 0 { return &r.Log[len(r.Log)-1].Message } } return nil }(), }) if err != nil { return nil, err } // remove 
run bucket, err := tx.Bucket(taskRunBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } key, err := taskRunKey(taskID, runID) if err != nil { return nil, err } if err := bucket.Delete(key); err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } return r, nil } // NextDueRun returns the Unix timestamp of when the next call to CreateNextRun will be ready. // The returned timestamp reflects the task's offset, so it does not necessarily exactly match the schedule time. func (s *Service) NextDueRun(ctx context.Context, taskID influxdb.ID) (int64, error) { var nextDue int64 err := s.kv.View(ctx, func(tx Tx) error { due, _, err := s.nextDueRun(ctx, tx, taskID) if err != nil { return err } nextDue = due return nil }) if err != nil { return 0, err } return nextDue, nil } // nextDueRun finds out when the next run is due as well as returning the expected Now time of the run func (s *Service) nextDueRun(ctx context.Context, tx Tx, taskID influxdb.ID) (int64, int64, error) { task, err := s.findTaskByID(ctx, tx, taskID) if err != nil { return 0, 0, err } latestCompleted, err := s.findLatestScheduledTime(ctx, tx, taskID) if err != nil { return 0, 0, err } // Align create to the hour/minute { if strings.HasPrefix(task.EffectiveCron(), "@every ") { everyString := strings.TrimPrefix(task.EffectiveCron(), "@every ") every := options.Duration{} err := every.Parse(everyString) if err != nil { // We cannot align a invalid time goto NoChange } // drop nanoseconds t := time.Unix(latestCompleted.Unix(), 0) everyDur, err := every.DurationFrom(t) if err != nil { goto NoChange } // truncate the duration from the time we are going to use t = t.Truncate(everyDur) latestCompleted = t.Truncate(time.Second) } NoChange: } // create a run if possible sch, err := cron.Parse(task.EffectiveCron()) if err != nil { return 0, 0, influxdb.ErrTaskTimeParse(err) } nextScheduled := sch.Next(latestCompleted).UTC() offset := &options.Duration{} if err := 
offset.Parse(task.Offset.String()); err != nil { return 0, 0, influxdb.ErrTaskTimeParse(err) } dueAt, err := offset.Add(nextScheduled) if err != nil { return 0, 0, influxdb.ErrTaskTimeParse(err) } return dueAt.Unix(), nextScheduled.Unix(), nil } // UpdateRunState sets the run state at the respective time. func (s *Service) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state backend.RunStatus) error { err := s.kv.Update(ctx, func(tx Tx) error { err := s.updateRunState(ctx, tx, taskID, runID, when, state) if err != nil { return err } return nil }) return err } func (s *Service) updateRunState(ctx context.Context, tx Tx, taskID, runID influxdb.ID, when time.Time, state backend.RunStatus) error { // find run run, err := s.findRunByID(ctx, tx, taskID, runID) if err != nil { return err } // update state run.Status = state.String() switch state { case backend.RunStarted: run.StartedAt = when case backend.RunSuccess, backend.RunFail, backend.RunCanceled: run.FinishedAt = when } // save run b, err := tx.Bucket(taskRunBucket) if err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } runBytes, err := json.Marshal(run) if err != nil { return influxdb.ErrInternalTaskServiceError(err) } runKey, err := taskRunKey(taskID, run.ID) if err != nil { return err } if err := b.Put(runKey, runBytes); err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } return nil } // AddRunLog adds a log line to the run. 
func (s *Service) AddRunLog(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error { err := s.kv.Update(ctx, func(tx Tx) error { err := s.addRunLog(ctx, tx, taskID, runID, when, log) if err != nil { return err } return nil }) return err } func (s *Service) addRunLog(ctx context.Context, tx Tx, taskID, runID influxdb.ID, when time.Time, log string) error { // find run run, err := s.findRunByID(ctx, tx, taskID, runID) if err != nil { return err } // update log l := influxdb.Log{RunID: runID, Time: when.Format(time.RFC3339Nano), Message: log} run.Log = append(run.Log, l) // save run b, err := tx.Bucket(taskRunBucket) if err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } runBytes, err := json.Marshal(run) if err != nil { return influxdb.ErrInternalTaskServiceError(err) } runKey, err := taskRunKey(taskID, run.ID) if err != nil { return err } if err := b.Put(runKey, runBytes); err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } return nil } func (s *Service) findLatestScheduledTimeForTask(ctx context.Context, tx Tx, task *influxdb.Task) (time.Time, error) { // Get the latest completed time // This can come from whichever is latest between: // - CreatedAt time of the task // - LatestCompleted time of the task // - Latest scheduled currently running task // - or the latest completed run's ScheduleFor time var ( latestCompleted time.Time err error ) if task.LatestCompleted.IsZero() { latestCompleted = task.CreatedAt } else { latestCompleted = task.LatestCompleted } // find out if we have a currently running schedule that is after the latest completed currentRunning, err := s.currentlyRunning(ctx, tx, task.ID) if err != nil { return time.Time{}, err } for _, cr := range currentRunning { crTime := cr.ScheduledFor if crTime.After(latestCompleted) { latestCompleted = crTime } } return latestCompleted, nil } func (s *Service) findLatestScheduledTime(ctx context.Context, tx Tx, id influxdb.ID) (time.Time, error) { task, err := 
s.findTaskByID(ctx, tx, id) if err != nil { return time.Time{}, err } return s.findLatestScheduledTimeForTask(ctx, tx, task) } func taskKey(taskID influxdb.ID) ([]byte, error) { encodedID, err := taskID.Encode() if err != nil { return nil, influxdb.ErrInvalidTaskID } return encodedID, nil } func taskLatestCompletedKey(taskID influxdb.ID) ([]byte, error) { encodedID, err := taskID.Encode() if err != nil { return nil, influxdb.ErrInvalidTaskID } return []byte(string(encodedID) + "/latestCompleted"), nil } func taskManualRunKey(taskID influxdb.ID) ([]byte, error) { encodedID, err := taskID.Encode() if err != nil { return nil, influxdb.ErrInvalidTaskID } return []byte(string(encodedID) + "/manualRuns"), nil } func taskOrgKey(orgID, taskID influxdb.ID) ([]byte, error) { encodedOrgID, err := orgID.Encode() if err != nil { return nil, influxdb.ErrInvalidTaskID } encodedID, err := taskID.Encode() if err != nil { return nil, influxdb.ErrInvalidTaskID } return []byte(string(encodedOrgID) + "/" + string(encodedID)), nil } func taskRunKey(taskID, runID influxdb.ID) ([]byte, error) { encodedID, err := taskID.Encode() if err != nil { return nil, influxdb.ErrInvalidTaskID } encodedRunID, err := runID.Encode() if err != nil { return nil, influxdb.ErrInvalidTaskID } return []byte(string(encodedID) + "/" + string(encodedRunID)), nil } func taskFilterMatch(filter *string, ttype string) bool { // if they want a system task the record may be system or an empty string if filter != nil { // if the task is either "system" or "" it qaulifies as a system task if *filter == influxdb.TaskSystemType && (ttype == influxdb.TaskSystemType || ttype == "") { return true } // otherwise check task type against the filter if *filter != ttype { return false } } return true }