package kv import ( "context" "encoding/json" "regexp" "strings" "time" "github.com/influxdata/influxdb/v2" icontext "github.com/influxdata/influxdb/v2/context" "github.com/influxdata/influxdb/v2/kit/feature" "github.com/influxdata/influxdb/v2/resource" "github.com/influxdata/influxdb/v2/task/options" "go.uber.org/zap" ) // Task Storage Schema // taskBucket: // : task data storage // taskRunBucket: // /: run data storage // /manualRuns: list of runs to run manually // /latestCompleted: run data for the latest completed run of a task // taskIndexBucket // /: index for tasks by org // We may want to add a / index to allow us to look up tasks by task name. var ( taskBucket = []byte("tasksv1") taskRunBucket = []byte("taskRunsv1") taskIndexBucket = []byte("taskIndexsv1") ) var _ influxdb.TaskService = (*Service)(nil) type kvTask struct { ID influxdb.ID `json:"id"` Type string `json:"type,omitempty"` OrganizationID influxdb.ID `json:"orgID"` Organization string `json:"org"` OwnerID influxdb.ID `json:"ownerID"` Name string `json:"name"` Description string `json:"description,omitempty"` Status string `json:"status"` Flux string `json:"flux"` Every string `json:"every,omitempty"` Cron string `json:"cron,omitempty"` LastRunStatus string `json:"lastRunStatus,omitempty"` LastRunError string `json:"lastRunError,omitempty"` Offset influxdb.Duration `json:"offset,omitempty"` LatestCompleted time.Time `json:"latestCompleted,omitempty"` LatestScheduled time.Time `json:"latestScheduled,omitempty"` CreatedAt time.Time `json:"createdAt,omitempty"` UpdatedAt time.Time `json:"updatedAt,omitempty"` Metadata map[string]interface{} `json:"metadata,omitempty"` } func kvToInfluxTask(k *kvTask) *influxdb.Task { return &influxdb.Task{ ID: k.ID, Type: k.Type, OrganizationID: k.OrganizationID, Organization: k.Organization, OwnerID: k.OwnerID, Name: k.Name, Description: k.Description, Status: k.Status, Flux: k.Flux, Every: k.Every, Cron: k.Cron, LastRunStatus: k.LastRunStatus, LastRunError: k.LastRunError, Offset: k.Offset.Duration, LatestCompleted: k.LatestCompleted, LatestScheduled: k.LatestScheduled, CreatedAt: k.CreatedAt, UpdatedAt: k.UpdatedAt, Metadata: k.Metadata, } } func (s *Service) initializeTasks(ctx context.Context, tx Tx) error { if _, err := tx.Bucket(taskBucket); err != nil { return err } if _, err := tx.Bucket(taskRunBucket); err != nil { return err } if _, err := tx.Bucket(taskIndexBucket); err != nil { return err } return nil } // FindTaskByID returns a single task func (s *Service) FindTaskByID(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) { var t *influxdb.Task err := s.kv.View(ctx, func(tx Tx) error { if influxdb.FindTaskAuthRequired(ctx) { task, err := s.findTaskByIDWithAuth(ctx, tx, id) if err != nil { return err } t = task } else { task, err := s.findTaskByID(ctx, tx, id) if err != nil { return err } t = task } return nil }) if err != nil { return nil, err } return t, nil } // findTaskByIDWithAuth is a task lookup that populates the auth // This is to be used when we want to satisfy the FindTaskByID method // But is more taxing on the system then if we want to find the task alone. func (s *Service) findTaskByIDWithAuth(ctx context.Context, tx Tx, id influxdb.ID) (*influxdb.Task, error) { t, err := s.findTaskByID(ctx, tx, id) if err != nil { return nil, err } t.Authorization = &influxdb.Authorization{ Status: influxdb.Active, ID: influxdb.ID(1), OrgID: t.OrganizationID, UserID: t.OwnerID, } if t.OwnerID.Valid() { ctx = icontext.SetAuthorizer(ctx, t.Authorization) // populate task Auth ps, err := s.maxPermissions(ctx, tx, t.OwnerID) if err != nil { return nil, err } t.Authorization.Permissions = ps } return t, nil } // findTaskByID is an internal method used to do any action with tasks internally // that do not require authorization. func (s *Service) findTaskByID(ctx context.Context, tx Tx, id influxdb.ID) (*influxdb.Task, error) { taskKey, err := taskKey(id) if err != nil { return nil, err } b, err := tx.Bucket(taskBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } v, err := b.Get(taskKey) if IsNotFound(err) { return nil, influxdb.ErrTaskNotFound } if err != nil { return nil, err } kvTask := &kvTask{} if err := json.Unmarshal(v, kvTask); err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } t := kvToInfluxTask(kvTask) if t.LatestCompleted.IsZero() { t.LatestCompleted = t.CreatedAt } // Attempt to fill in the owner ID based on the auth. if !t.OwnerID.Valid() { authType := struct { AuthorizationID influxdb.ID `json:"authorizationID"` }{} if err := json.Unmarshal(v, &authType); err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } auth, err := s.findAuthorizationByID(ctx, tx, authType.AuthorizationID) if err == nil { t.OwnerID = auth.GetUserID() } } // Attempt to fill in the ownerID based on the organization owners. // If they have multiple owners just use the first one because any org owner // will have sufficient permissions to run a task. if !t.OwnerID.Valid() { owners, err := s.findUserResourceMappings(ctx, tx, influxdb.UserResourceMappingFilter{ ResourceID: t.OrganizationID, ResourceType: influxdb.OrgsResourceType, UserType: influxdb.Owner, }) if err == nil && len(owners) > 0 { t.OwnerID = owners[0].UserID } } return t, nil } // FindTasks returns a list of tasks that match a filter (limit 100) and the total count // of matching tasks. func (s *Service) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { var ts []*influxdb.Task err := s.kv.View(ctx, func(tx Tx) error { tasks, _, err := s.findTasks(ctx, tx, filter) if err != nil { return err } ts = tasks return nil }) if err != nil { return nil, 0, err } return ts, len(ts), nil } func (s *Service) findTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { var org *influxdb.Organization var err error if filter.OrganizationID != nil { org, err = s.findOrganizationByID(ctx, tx, *filter.OrganizationID) if err != nil { return nil, 0, err } } else if filter.Organization != "" { org, err = s.findOrganizationByName(ctx, tx, filter.Organization) if err != nil { return nil, 0, err } } // complain about limits if filter.Limit < 0 { return nil, 0, influxdb.ErrPageSizeTooSmall } if filter.Limit > influxdb.TaskMaxPageSize { return nil, 0, influxdb.ErrPageSizeTooLarge } if filter.Limit == 0 { filter.Limit = influxdb.TaskDefaultPageSize } // if no user or organization is passed, assume contexts auth is the user we are looking for. // it is possible for a internal system to call this with no auth so we shouldnt fail if no auth is found. if org == nil && filter.User == nil { userAuth, err := icontext.GetAuthorizer(ctx) if err == nil { userID := userAuth.GetUserID() if userID.Valid() { filter.User = &userID } } } // filter by user id. if filter.User != nil { return s.findTasksByUser(ctx, tx, filter) } else if org != nil { return s.findTasksByOrg(ctx, tx, filter) } return s.findAllTasks(ctx, tx, filter) } // findTasksByUser is a subset of the find tasks function. Used for cleanliness func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { if feature.UrmFreeTasks().Enabled(ctx) { return s.findTasksByUserUrmFree(ctx, tx, filter) } return s.findTasksByUserWithURM(ctx, tx, filter) } // findTasksByUser is a subset of the find tasks function. Used for cleanliness func (s *Service) findTasksByUserWithURM(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { if filter.User == nil { return nil, 0, influxdb.ErrTaskNotFound } var org *influxdb.Organization var err error if filter.OrganizationID != nil { org, err = s.findOrganizationByID(ctx, tx, *filter.OrganizationID) if err != nil { return nil, 0, err } } else if filter.Organization != "" { org, err = s.findOrganizationByName(ctx, tx, filter.Organization) if err != nil { return nil, 0, err } } var ts []*influxdb.Task maps, err := s.findUserResourceMappings( ctx, tx, influxdb.UserResourceMappingFilter{ ResourceType: influxdb.TasksResourceType, UserID: *filter.User, UserType: influxdb.Owner, }, ) if err != nil { return nil, 0, err } var ( afterSeen bool after = func(task *influxdb.Task) bool { if filter.After == nil || afterSeen { return true } if task.ID == *filter.After { afterSeen = true } return false } matchFn = newTaskMatchFn(filter, org) ) for _, m := range maps { task, err := s.findTaskByIDWithAuth(ctx, tx, m.ResourceID) if err != nil && err != influxdb.ErrTaskNotFound { return nil, 0, err } if err == influxdb.ErrTaskNotFound { continue } if matchFn == nil || matchFn(task) { if !after(task) { continue } ts = append(ts, task) if len(ts) >= filter.Limit { break } } } return ts, len(ts), nil } func (s *Service) findTasksByUserUrmFree(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { var ts []*influxdb.Task taskBucket, err := tx.Bucket(taskBucket) if err != nil { return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err) } var ( seek []byte opts []CursorOption ) if filter.After != nil { seek, err = taskKey(*filter.After) if err != nil { return nil, 0, err } opts = append(opts, WithCursorSkipFirstItem()) } c, err := taskBucket.ForwardCursor(seek, opts...) if err != nil { return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err) } ps, err := s.maxPermissions(ctx, tx, *filter.User) if err != nil { return nil, 0, err } matchFn := newTaskMatchFn(filter, nil) for k, v := c.Next(); k != nil; k, v = c.Next() { kvTask := &kvTask{} if err := json.Unmarshal(v, kvTask); err != nil { return nil, 0, influxdb.ErrInternalTaskServiceError(err) } t := kvToInfluxTask(kvTask) if matchFn == nil || matchFn(t) { t.Authorization = &influxdb.Authorization{ Status: influxdb.Active, UserID: t.OwnerID, ID: influxdb.ID(1), OrgID: t.OrganizationID, Permissions: ps, } ts = append(ts, t) if len(ts) >= filter.Limit { break } } } if err := c.Err(); err != nil { return nil, 0, err } return ts, len(ts), c.Close() } // findTasksByOrg is a subset of the find tasks function. Used for cleanliness func (s *Service) findTasksByOrg(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { var org *influxdb.Organization var err error if filter.OrganizationID != nil { org, err = s.findOrganizationByID(ctx, tx, *filter.OrganizationID) if err != nil { return nil, 0, err } } else if filter.Organization != "" { org, err = s.findOrganizationByName(ctx, tx, filter.Organization) if err != nil { return nil, 0, err } } if org == nil { return nil, 0, influxdb.ErrTaskNotFound } var ts []*influxdb.Task indexBucket, err := tx.Bucket(taskIndexBucket) if err != nil { return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err) } prefix, err := org.ID.Encode() if err != nil { return nil, 0, influxdb.ErrInvalidTaskID } var ( key = prefix opts []CursorOption ) // we can filter by orgID if filter.After != nil { key, err = taskOrgKey(org.ID, *filter.After) if err != nil { return nil, 0, err } opts = append(opts, WithCursorSkipFirstItem()) } c, err := indexBucket.ForwardCursor( key, append(opts, WithCursorPrefix(prefix))..., ) if err != nil { return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err) } // free cursor resources defer c.Close() matchFn := newTaskMatchFn(filter, nil) for k, v := c.Next(); k != nil; k, v = c.Next() { id, err := influxdb.IDFromString(string(v)) if err != nil { return nil, 0, influxdb.ErrInvalidTaskID } t, err := s.findTaskByIDWithAuth(ctx, tx, *id) if err != nil { if err == influxdb.ErrTaskNotFound { // we might have some crufty index's err = nil continue } return nil, 0, err } // If the new task doesn't belong to the org we have looped outside the org filter if org != nil && t.OrganizationID != org.ID { break } if matchFn == nil || matchFn(t) { ts = append(ts, t) // Check if we are over running the limit if len(ts) >= filter.Limit { break } } } return ts, len(ts), c.Err() } type taskMatchFn func(*influxdb.Task) bool // newTaskMatchFn returns a function for validating // a task matches the filter. Will return nil if // the filter should match all tasks. func newTaskMatchFn(f influxdb.TaskFilter, org *influxdb.Organization) func(t *influxdb.Task) bool { var fn taskMatchFn if org != nil { expected := org.ID prevFn := fn fn = func(t *influxdb.Task) bool { res := prevFn == nil || prevFn(t) return res && expected == t.OrganizationID } } if f.Type != nil { expected := *f.Type prevFn := fn fn = func(t *influxdb.Task) bool { res := prevFn == nil || prevFn(t) return res && ((expected == influxdb.TaskSystemType && (t.Type == influxdb.TaskSystemType || t.Type == "")) || expected == t.Type) } } if f.Name != nil { expected := *f.Name prevFn := fn fn = func(t *influxdb.Task) bool { res := prevFn == nil || prevFn(t) return res && (expected == t.Name) } } if f.Status != nil { prevFn := fn fn = func(t *influxdb.Task) bool { res := prevFn == nil || prevFn(t) return res && (t.Status == *f.Status) } } if f.User != nil { prevFn := fn fn = func(t *influxdb.Task) bool { res := prevFn == nil || prevFn(t) return res && t.OwnerID == *f.User } } return fn } // findAllTasks is a subset of the find tasks function. Used for cleanliness. // This function should only be executed internally because it doesn't force organization or user filtering. // Enforcing filters should be done in a validation layer. func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { var ts []*influxdb.Task taskBucket, err := tx.Bucket(taskBucket) if err != nil { return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err) } var ( seek []byte opts []CursorOption ) if filter.After != nil { seek, err = taskKey(*filter.After) if err != nil { return nil, 0, err } opts = append(opts, WithCursorSkipFirstItem()) } c, err := taskBucket.ForwardCursor(seek, opts...) if err != nil { return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err) } // free cursor resources defer c.Close() matchFn := newTaskMatchFn(filter, nil) for k, v := c.Next(); k != nil; k, v = c.Next() { kvTask := &kvTask{} if err := json.Unmarshal(v, kvTask); err != nil { return nil, 0, influxdb.ErrInternalTaskServiceError(err) } t := kvToInfluxTask(kvTask) if matchFn == nil || matchFn(t) { ts = append(ts, t) if len(ts) >= filter.Limit { break } } } if err := c.Err(); err != nil { return nil, 0, err } return ts, len(ts), err } // CreateTask creates a new task. // The owner of the task is inferred from the authorizer associated with ctx. func (s *Service) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) { var t *influxdb.Task err := s.kv.Update(ctx, func(tx Tx) error { task, err := s.createTask(ctx, tx, tc) if err != nil { return err } t = task return nil }) if err != nil { return nil, err } return t, nil } func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) (*influxdb.Task, error) { var err error var org *influxdb.Organization if tc.OrganizationID.Valid() { org, err = s.findOrganizationByID(ctx, tx, tc.OrganizationID) if err != nil { return nil, err } } else if tc.Organization != "" { org, err = s.findOrganizationByName(ctx, tx, tc.Organization) if err != nil { return nil, err } } if org == nil { return nil, influxdb.ErrOrgNotFound } // TODO: Uncomment this once the checks/notifications no longer create tasks in kv // confirm the owner is a real user. // if _, err = s.findUserByID(ctx, tx, tc.OwnerID); err != nil { // return nil, influxdb.ErrInvalidOwnerID // } opts, err := ExtractTaskOptions(ctx, s.FluxLanguageService, tc.Flux) if err != nil { return nil, influxdb.ErrTaskOptionParse(err) } if tc.Status == "" { tc.Status = string(influxdb.TaskActive) } createdAt := s.clock.Now().Truncate(time.Second).UTC() task := &influxdb.Task{ ID: s.IDGenerator.ID(), Type: tc.Type, OrganizationID: org.ID, Organization: org.Name, OwnerID: tc.OwnerID, Metadata: tc.Metadata, Name: opts.Name, Description: tc.Description, Status: tc.Status, Flux: tc.Flux, Every: opts.Every.String(), Cron: opts.Cron, CreatedAt: createdAt, LatestCompleted: createdAt, LatestScheduled: createdAt, } if opts.Offset != nil { off, err := time.ParseDuration(opts.Offset.String()) if err != nil { return nil, influxdb.ErrTaskTimeParse(err) } task.Offset = off } taskBucket, err := tx.Bucket(taskBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } indexBucket, err := tx.Bucket(taskIndexBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } taskBytes, err := json.Marshal(task) if err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } taskKey, err := taskKey(task.ID) if err != nil { return nil, err } orgKey, err := taskOrgKey(task.OrganizationID, task.ID) if err != nil { return nil, err } // write the task err = taskBucket.Put(taskKey, taskBytes) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } // write the org index err = indexBucket.Put(orgKey, taskKey) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } if !feature.UrmFreeTasks().Enabled(ctx) { if err := s.createTaskURM(ctx, tx, task); err != nil { s.log.Info("Error creating user resource mapping for task", zap.Stringer("taskID", task.ID), zap.Error(err)) } } // populate permissions so the task can be used immediately // if we cant populate here we shouldn't error. ps, _ := s.maxPermissions(ctx, tx, task.OwnerID) task.Authorization = &influxdb.Authorization{ Status: influxdb.Active, ID: influxdb.ID(1), OrgID: task.OrganizationID, Permissions: ps, } uid, _ := icontext.GetUserID(ctx) if err := s.audit.Log(resource.Change{ Type: resource.Create, ResourceID: task.ID, ResourceType: influxdb.TasksResourceType, OrganizationID: task.OrganizationID, UserID: uid, ResourceBody: taskBytes, Time: time.Now(), }); err != nil { return nil, err } return task, nil } func (s *Service) createTaskURM(ctx context.Context, tx Tx, t *influxdb.Task) error { // TODO(jsteenb2): should not be getting authorizer inside the store, should terminate at the // transport layer then pass user id everywhere else. userAuth, err := icontext.GetAuthorizer(ctx) if err != nil { return err } return s.createUserResourceMapping(ctx, tx, &influxdb.UserResourceMapping{ ResourceType: influxdb.TasksResourceType, ResourceID: t.ID, UserID: userAuth.GetUserID(), UserType: influxdb.Owner, }) } // UpdateTask updates a single task with changeset. func (s *Service) UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error) { var t *influxdb.Task err := s.kv.Update(ctx, func(tx Tx) error { task, err := s.updateTask(ctx, tx, id, upd) if err != nil { return err } t = task return nil }) if err != nil { return nil, err } return t, nil } func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error) { // retrieve the task task, err := s.findTaskByID(ctx, tx, id) if err != nil { return nil, err } updatedAt := s.clock.Now().UTC() // update the flux script if !upd.Options.IsZero() || upd.Flux != nil { if err = upd.UpdateFlux(s.FluxLanguageService, task.Flux); err != nil { return nil, err } task.Flux = *upd.Flux opts, err := ExtractTaskOptions(ctx, s.FluxLanguageService, *upd.Flux) if err != nil { return nil, influxdb.ErrTaskOptionParse(err) } task.Name = opts.Name task.Every = opts.Every.String() task.Cron = opts.Cron var off time.Duration if opts.Offset != nil { off, err = time.ParseDuration(opts.Offset.String()) if err != nil { return nil, influxdb.ErrTaskTimeParse(err) } } task.Offset = off task.UpdatedAt = updatedAt } if upd.Description != nil { task.Description = *upd.Description task.UpdatedAt = updatedAt } if upd.Status != nil && task.Status != *upd.Status { task.Status = *upd.Status task.UpdatedAt = updatedAt // task is transitioning from inactive to active, ensure scheduled and completed are updated if task.Status == influxdb.TaskStatusActive { updatedAtTrunc := updatedAt.Truncate(time.Second).UTC() task.LatestCompleted = updatedAtTrunc task.LatestScheduled = updatedAtTrunc } } if upd.Metadata != nil { task.Metadata = upd.Metadata task.UpdatedAt = updatedAt } if upd.LatestCompleted != nil { // make sure we only update latest completed one way tlc := task.LatestCompleted ulc := *upd.LatestCompleted if !ulc.IsZero() && ulc.After(tlc) { task.LatestCompleted = *upd.LatestCompleted } } if upd.LatestScheduled != nil { // make sure we only update latest scheduled one way if upd.LatestScheduled.After(task.LatestScheduled) { task.LatestScheduled = *upd.LatestScheduled } } if upd.LastRunStatus != nil { task.LastRunStatus = *upd.LastRunStatus if *upd.LastRunStatus == "failed" && upd.LastRunError != nil { task.LastRunError = *upd.LastRunError } else { task.LastRunError = "" } } // save the updated task bucket, err := tx.Bucket(taskBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } key, err := taskKey(id) if err != nil { return nil, err } taskBytes, err := json.Marshal(task) if err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } err = bucket.Put(key, taskBytes) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } uid, _ := icontext.GetUserID(ctx) if err := s.audit.Log(resource.Change{ Type: resource.Update, ResourceID: task.ID, ResourceType: influxdb.TasksResourceType, OrganizationID: task.OrganizationID, UserID: uid, ResourceBody: taskBytes, Time: time.Now(), }); err != nil { return nil, err } return task, nil } // DeleteTask removes a task by ID and purges all associated data and scheduled runs. func (s *Service) DeleteTask(ctx context.Context, id influxdb.ID) error { err := s.kv.Update(ctx, func(tx Tx) error { err := s.deleteTask(ctx, tx, id) if err != nil { return err } return nil }) if err != nil { return err } return nil } func (s *Service) deleteTask(ctx context.Context, tx Tx, id influxdb.ID) error { taskBucket, err := tx.Bucket(taskBucket) if err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } runBucket, err := tx.Bucket(taskRunBucket) if err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } indexBucket, err := tx.Bucket(taskIndexBucket) if err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } // retrieve the task task, err := s.findTaskByID(ctx, tx, id) if err != nil { return err } // remove the orgs index orgKey, err := taskOrgKey(task.OrganizationID, task.ID) if err != nil { return err } if err := indexBucket.Delete(orgKey); err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } // remove latest completed lastCompletedKey, err := taskLatestCompletedKey(task.ID) if err != nil { return err } if err := runBucket.Delete(lastCompletedKey); err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } // remove the runs runs, _, err := s.findRuns(ctx, tx, influxdb.RunFilter{Task: task.ID}) if err != nil { return err } for _, run := range runs { key, err := taskRunKey(task.ID, run.ID) if err != nil { return err } if err := runBucket.Delete(key); err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } } // remove the task key, err := taskKey(task.ID) if err != nil { return err } if err := taskBucket.Delete(key); err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } if err := s.deleteUserResourceMapping(ctx, tx, influxdb.UserResourceMappingFilter{ ResourceID: task.ID, }); err != nil { s.log.Debug("Error deleting user resource mapping for task", zap.Stringer("taskID", task.ID), zap.Error(err)) } uid, _ := icontext.GetUserID(ctx) return s.audit.Log(resource.Change{ Type: resource.Delete, ResourceID: task.ID, ResourceType: influxdb.TasksResourceType, OrganizationID: task.OrganizationID, UserID: uid, Time: time.Now(), }) } // FindLogs returns logs for a run. func (s *Service) FindLogs(ctx context.Context, filter influxdb.LogFilter) ([]*influxdb.Log, int, error) { var logs []*influxdb.Log err := s.kv.View(ctx, func(tx Tx) error { ls, _, err := s.findLogs(ctx, tx, filter) if err != nil { return err } logs = ls return nil }) if err != nil { return nil, 0, err } return logs, len(logs), nil } func (s *Service) findLogs(ctx context.Context, tx Tx, filter influxdb.LogFilter) ([]*influxdb.Log, int, error) { if filter.Run != nil { r, err := s.findRunByID(ctx, tx, filter.Task, *filter.Run) if err != nil { return nil, 0, err } rtn := make([]*influxdb.Log, len(r.Log)) for i := 0; i < len(r.Log); i++ { rtn[i] = &r.Log[i] } return rtn, len(rtn), nil } runs, _, err := s.findRuns(ctx, tx, influxdb.RunFilter{Task: filter.Task}) if err != nil { return nil, 0, err } var logs []*influxdb.Log for _, run := range runs { for i := 0; i < len(run.Log); i++ { logs = append(logs, &run.Log[i]) } } return logs, len(logs), nil } // FindRuns returns a list of runs that match a filter and the total count of returned runs. func (s *Service) FindRuns(ctx context.Context, filter influxdb.RunFilter) ([]*influxdb.Run, int, error) { var runs []*influxdb.Run err := s.kv.View(ctx, func(tx Tx) error { rs, _, err := s.findRuns(ctx, tx, filter) if err != nil { return err } runs = rs return nil }) if err != nil { return nil, 0, err } return runs, len(runs), nil } func (s *Service) findRuns(ctx context.Context, tx Tx, filter influxdb.RunFilter) ([]*influxdb.Run, int, error) { if filter.Limit == 0 { filter.Limit = influxdb.TaskDefaultPageSize } if filter.Limit < 0 || filter.Limit > influxdb.TaskMaxPageSize { return nil, 0, influxdb.ErrOutOfBoundsLimit } var runs []*influxdb.Run // manual runs manualRuns, err := s.manualRuns(ctx, tx, filter.Task) if err != nil { return nil, 0, err } for _, run := range manualRuns { runs = append(runs, run) if len(runs) >= filter.Limit { return runs, len(runs), nil } } // append currently running currentlyRunning, err := s.currentlyRunning(ctx, tx, filter.Task) if err != nil { return nil, 0, err } for _, run := range currentlyRunning { runs = append(runs, run) if len(runs) >= filter.Limit { return runs, len(runs), nil } } return runs, len(runs), nil } // FindRunByID returns a single run. func (s *Service) FindRunByID(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) { var run *influxdb.Run err := s.kv.View(ctx, func(tx Tx) error { r, err := s.findRunByID(ctx, tx, taskID, runID) if err != nil { return err } run = r return nil }) if err != nil { return nil, err } return run, nil } func (s *Service) findRunByID(ctx context.Context, tx Tx, taskID, runID influxdb.ID) (*influxdb.Run, error) { bucket, err := tx.Bucket(taskRunBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } key, err := taskRunKey(taskID, runID) if err != nil { return nil, err } runBytes, err := bucket.Get(key) if err != nil { if IsNotFound(err) { return nil, influxdb.ErrRunNotFound } return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } run := &influxdb.Run{} err = json.Unmarshal(runBytes, run) if err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } return run, nil } // CancelRun cancels a currently running run. func (s *Service) CancelRun(ctx context.Context, taskID, runID influxdb.ID) error { err := s.kv.Update(ctx, func(tx Tx) error { err := s.cancelRun(ctx, tx, taskID, runID) if err != nil { return err } return nil }) return err } func (s *Service) cancelRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID) error { // get the run run, err := s.findRunByID(ctx, tx, taskID, runID) if err != nil { return err } // set status to canceled run.Status = "canceled" // save bucket, err := tx.Bucket(taskRunBucket) if err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } runBytes, err := json.Marshal(run) if err != nil { return influxdb.ErrInternalTaskServiceError(err) } runKey, err := taskRunKey(taskID, runID) if err != nil { return err } if err := bucket.Put(runKey, runBytes); err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } return nil } // RetryRun creates and returns a new run (which is a retry of another run). func (s *Service) RetryRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) { var r *influxdb.Run err := s.kv.Update(ctx, func(tx Tx) error { run, err := s.retryRun(ctx, tx, taskID, runID) if err != nil { return err } r = run return nil }) return r, err } func (s *Service) retryRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID) (*influxdb.Run, error) { // find the run r, err := s.findRunByID(ctx, tx, taskID, runID) if err != nil { return nil, err } r.ID = s.IDGenerator.ID() r.Status = influxdb.RunScheduled.String() r.StartedAt = time.Time{} r.FinishedAt = time.Time{} r.RequestedAt = time.Time{} // add a clean copy of the run to the manual runs bucket, err := tx.Bucket(taskRunBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } key, err := taskManualRunKey(taskID) if err != nil { return nil, err } runs := []*influxdb.Run{} runsBytes, err := bucket.Get(key) if err != nil { if err != ErrKeyNotFound { return nil, influxdb.ErrRunNotFound } return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } if runsBytes != nil { if err := json.Unmarshal(runsBytes, &runs); err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } } runs = append(runs, r) // save manual runs runsBytes, err = json.Marshal(runs) if err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } if err := bucket.Put(key, runsBytes); err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } return r, nil } // ForceRun forces a run to occur with unix timestamp scheduledFor, to be executed as soon as possible. // The value of scheduledFor may or may not align with the task's schedule. func (s *Service) ForceRun(ctx context.Context, taskID influxdb.ID, scheduledFor int64) (*influxdb.Run, error) { var r *influxdb.Run err := s.kv.Update(ctx, func(tx Tx) error { run, err := s.forceRun(ctx, tx, taskID, scheduledFor) if err != nil { return err } r = run return nil }) return r, err } func (s *Service) forceRun(ctx context.Context, tx Tx, taskID influxdb.ID, scheduledFor int64) (*influxdb.Run, error) { // create a run t := time.Unix(scheduledFor, 0).UTC() r := &influxdb.Run{ ID: s.IDGenerator.ID(), TaskID: taskID, Status: influxdb.RunScheduled.String(), RequestedAt: time.Now().UTC(), ScheduledFor: t, Log: []influxdb.Log{}, } // add a clean copy of the run to the manual runs bucket, err := tx.Bucket(taskRunBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } runs, err := s.manualRuns(ctx, tx, taskID) if err != nil { return nil, err } // check to see if this run is already queued for _, run := range runs { if run.ScheduledFor == r.ScheduledFor { return nil, influxdb.ErrTaskRunAlreadyQueued } } runs = append(runs, r) // save manual runs runsBytes, err := json.Marshal(runs) if err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } key, err := taskManualRunKey(taskID) if err != nil { return nil, err } if err := bucket.Put(key, runsBytes); err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } return r, nil } // CreateRun creates a run with a scheduledFor time as now. func (s *Service) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) { var r *influxdb.Run err := s.kv.Update(ctx, func(tx Tx) error { run, err := s.createRun(ctx, tx, taskID, scheduledFor, runAt) if err != nil { return err } r = run return nil }) return r, err } func (s *Service) createRun(ctx context.Context, tx Tx, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) { id := s.IDGenerator.ID() t := time.Unix(scheduledFor.Unix(), 0).UTC() run := influxdb.Run{ ID: id, TaskID: taskID, ScheduledFor: t, RunAt: runAt, Status: influxdb.RunScheduled.String(), Log: []influxdb.Log{}, } b, err := tx.Bucket(taskRunBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } runBytes, err := json.Marshal(run) if err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } runKey, err := taskRunKey(taskID, run.ID) if err != nil { return nil, err } if err := b.Put(runKey, runBytes); err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } return &run, nil } func (s *Service) CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) { var runs []*influxdb.Run err := s.kv.View(ctx, func(tx Tx) error { rs, err := s.currentlyRunning(ctx, tx, taskID) if err != nil { return err } runs = rs return nil }) if err != nil { return nil, err } return runs, nil } func (s *Service) currentlyRunning(ctx context.Context, tx Tx, taskID influxdb.ID) ([]*influxdb.Run, error) { bucket, err := tx.Bucket(taskRunBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } c, err := bucket.Cursor(WithCursorHintPrefix(taskID.String())) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } var runs []*influxdb.Run taskKey, err := taskKey(taskID) if err != nil { return nil, err } k, v := c.Seek(taskKey) for { if k == nil || !strings.HasPrefix(string(k), string(taskKey)) { break } if strings.HasSuffix(string(k), "manualRuns") || strings.HasSuffix(string(k), "latestCompleted") { k, v = c.Next() continue } r := &influxdb.Run{} if err := json.Unmarshal(v, r); err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } // if the run no longer belongs to the task we are done if r.TaskID != taskID { break } runs = append(runs, r) k, v = c.Next() } return runs, nil } func (s *Service) ManualRuns(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) { var runs []*influxdb.Run err := s.kv.View(ctx, func(tx Tx) error { rs, err := s.manualRuns(ctx, tx, taskID) if err != nil { return err } runs = rs return nil }) if err != nil { return nil, err } return runs, nil } func (s *Service) manualRuns(ctx context.Context, tx Tx, taskID influxdb.ID) ([]*influxdb.Run, error) { b, err := tx.Bucket(taskRunBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } key, err := taskManualRunKey(taskID) if err != nil { return nil, err } runs := []*influxdb.Run{} val, err := b.Get(key) if err != nil { if err == ErrKeyNotFound { return runs, nil } return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } if err := json.Unmarshal(val, &runs); err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } return runs, nil } func (s *Service) StartManualRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) { var r *influxdb.Run err := s.kv.Update(ctx, func(tx Tx) error { run, err := s.startManualRun(ctx, tx, taskID, runID) if err != nil { return err } r = run return nil }) return r, err } func (s *Service) startManualRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID) (*influxdb.Run, error) { mRuns, err := s.manualRuns(ctx, tx, taskID) if err != nil { return nil, influxdb.ErrRunNotFound } if len(mRuns) < 1 { return nil, influxdb.ErrRunNotFound } var run *influxdb.Run for i, r := range mRuns { if r.ID == runID { run = r mRuns = append(mRuns[:i], mRuns[i+1:]...) } } if run == nil { return nil, influxdb.ErrRunNotFound } // save manual runs mRunsBytes, err := json.Marshal(mRuns) if err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } runsKey, err := taskManualRunKey(taskID) if err != nil { return nil, err } b, err := tx.Bucket(taskRunBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } if err := b.Put(runsKey, mRunsBytes); err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } // add mRun to the list of currently running mRunBytes, err := json.Marshal(run) if err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } runKey, err := taskRunKey(taskID, run.ID) if err != nil { return nil, err } if err := b.Put(runKey, mRunBytes); err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } return run, nil } // FinishRun removes runID from the list of running tasks and if its `now` is later then last completed update it. func (s *Service) FinishRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) { var run *influxdb.Run err := s.kv.Update(ctx, func(tx Tx) error { r, err := s.finishRun(ctx, tx, taskID, runID) if err != nil { return err } run = r return nil }) return run, err } func (s *Service) finishRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID) (*influxdb.Run, error) { // get the run r, err := s.findRunByID(ctx, tx, taskID, runID) if err != nil { return nil, err } // tell task to update latest completed scheduled := r.ScheduledFor _, err = s.updateTask(ctx, tx, taskID, influxdb.TaskUpdate{ LatestCompleted: &scheduled, LastRunStatus: &r.Status, LastRunError: func() *string { if r.Status == "failed" { // prefer the second to last log message as the error message // per https://github.com/influxdata/influxdb/issues/15153#issuecomment-547706005 if len(r.Log) > 1 { return &r.Log[len(r.Log)-2].Message } else if len(r.Log) > 0 { return &r.Log[len(r.Log)-1].Message } } return nil }(), }) if err != nil { return nil, err } // remove run bucket, err := tx.Bucket(taskRunBucket) if err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } key, err := taskRunKey(taskID, runID) if err != nil { return nil, err } if err := bucket.Delete(key); err != nil { return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } return r, nil } // UpdateRunState sets the run state at the respective time. func (s *Service) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state influxdb.RunStatus) error { err := s.kv.Update(ctx, func(tx Tx) error { err := s.updateRunState(ctx, tx, taskID, runID, when, state) if err != nil { return err } return nil }) return err } func (s *Service) updateRunState(ctx context.Context, tx Tx, taskID, runID influxdb.ID, when time.Time, state influxdb.RunStatus) error { // find run run, err := s.findRunByID(ctx, tx, taskID, runID) if err != nil { return err } // update state run.Status = state.String() switch state { case influxdb.RunStarted: run.StartedAt = when case influxdb.RunSuccess, influxdb.RunFail, influxdb.RunCanceled: run.FinishedAt = when } // save run b, err := tx.Bucket(taskRunBucket) if err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } runBytes, err := json.Marshal(run) if err != nil { return influxdb.ErrInternalTaskServiceError(err) } runKey, err := taskRunKey(taskID, run.ID) if err != nil { return err } if err := b.Put(runKey, runBytes); err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } return nil } // AddRunLog adds a log line to the run. func (s *Service) AddRunLog(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error { err := s.kv.Update(ctx, func(tx Tx) error { err := s.addRunLog(ctx, tx, taskID, runID, when, log) if err != nil { return err } return nil }) return err } func (s *Service) addRunLog(ctx context.Context, tx Tx, taskID, runID influxdb.ID, when time.Time, log string) error { // find run run, err := s.findRunByID(ctx, tx, taskID, runID) if err != nil { return err } // update log l := influxdb.Log{RunID: runID, Time: when.Format(time.RFC3339Nano), Message: log} run.Log = append(run.Log, l) // save run b, err := tx.Bucket(taskRunBucket) if err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } runBytes, err := json.Marshal(run) if err != nil { return influxdb.ErrInternalTaskServiceError(err) } runKey, err := taskRunKey(taskID, run.ID) if err != nil { return err } if err := b.Put(runKey, runBytes); err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } return nil } func taskKey(taskID influxdb.ID) ([]byte, error) { encodedID, err := taskID.Encode() if err != nil { return nil, influxdb.ErrInvalidTaskID } return encodedID, nil } func taskLatestCompletedKey(taskID influxdb.ID) ([]byte, error) { encodedID, err := taskID.Encode() if err != nil { return nil, influxdb.ErrInvalidTaskID } return []byte(string(encodedID) + "/latestCompleted"), nil } func taskManualRunKey(taskID influxdb.ID) ([]byte, error) { encodedID, err := taskID.Encode() if err != nil { return nil, influxdb.ErrInvalidTaskID } return []byte(string(encodedID) + "/manualRuns"), nil } func taskOrgKey(orgID, taskID influxdb.ID) ([]byte, error) { encodedOrgID, err := orgID.Encode() if err != nil { return nil, influxdb.ErrInvalidTaskID } encodedID, err := taskID.Encode() if err != nil { return nil, influxdb.ErrInvalidTaskID } return []byte(string(encodedOrgID) + "/" + string(encodedID)), nil } func taskRunKey(taskID, runID influxdb.ID) ([]byte, error) { encodedID, err := taskID.Encode() if err != nil { return nil, influxdb.ErrInvalidTaskID } encodedRunID, err := runID.Encode() if err != nil { return nil, influxdb.ErrInvalidTaskID } return []byte(string(encodedID) + "/" + string(encodedRunID)), nil } func (s *Service) TaskOwnerIDUpMigration(ctx context.Context, store Store) error { var ownerlessTasks []*influxdb.Task // loop through the tasks and collect a set of tasks that are missing the owner id. err := store.View(ctx, func(tx Tx) error { taskBucket, err := tx.Bucket(taskBucket) if err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } c, err := taskBucket.ForwardCursor([]byte{}) if err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } for k, v := c.Next(); k != nil; k, v = c.Next() { kvTask := &kvTask{} if err := json.Unmarshal(v, kvTask); err != nil { return influxdb.ErrInternalTaskServiceError(err) } t := kvToInfluxTask(kvTask) if !t.OwnerID.Valid() { ownerlessTasks = append(ownerlessTasks, t) } } if err := c.Err(); err != nil { return err } return c.Close() }) if err != nil { return err } // loop through tasks for _, t := range ownerlessTasks { // open transaction err := store.Update(ctx, func(tx Tx) error { taskKey, err := taskKey(t.ID) if err != nil { return err } b, err := tx.Bucket(taskBucket) if err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } if !t.OwnerID.Valid() { v, err := b.Get(taskKey) if IsNotFound(err) { return influxdb.ErrTaskNotFound } authType := struct { AuthorizationID influxdb.ID `json:"authorizationID"` }{} if err := json.Unmarshal(v, &authType); err != nil { return influxdb.ErrInternalTaskServiceError(err) } // try populating the owner from auth encodedID, err := authType.AuthorizationID.Encode() if err == nil { authBucket, err := tx.Bucket([]byte("authorizationsv1")) if err != nil { return err } a, err := authBucket.Get(encodedID) if err == nil { auth := &influxdb.Authorization{} if err := json.Unmarshal(a, auth); err != nil { return err } t.OwnerID = auth.GetUserID() } } } // try populating owner from urm if !t.OwnerID.Valid() { b, err := tx.Bucket([]byte("userresourcemappingsv1")) if err != nil { return err } id, err := t.OrganizationID.Encode() if err != nil { return err } cur, err := b.ForwardCursor(id, WithCursorPrefix(id)) if err != nil { return err } for k, v := cur.Next(); k != nil; k, v = cur.Next() { m := &influxdb.UserResourceMapping{} if err := json.Unmarshal(v, m); err != nil { return err } if m.ResourceID == t.OrganizationID && m.ResourceType == influxdb.OrgsResourceType && m.UserType == influxdb.Owner { t.OwnerID = m.UserID break } } if err := cur.Close(); err != nil { return err } } // if population fails return error if !t.OwnerID.Valid() { return &influxdb.Error{ Code: influxdb.EInternal, Msg: "could not populate owner ID for task", } } // save task taskBytes, err := json.Marshal(t) if err != nil { return influxdb.ErrInternalTaskServiceError(err) } err = b.Put(taskKey, taskBytes) if err != nil { return influxdb.ErrUnexpectedTaskBucketErr(err) } return nil }) if err != nil { return err } } return nil } var taskOptionsPattern = regexp.MustCompile(`option\s+task\s*=\s*{.*}`) // ExtractTaskOptions is a feature-flag driven switch between normal options // parsing and a more simplified variant. // // The simplified variant extracts the options assignment and passes only that // content through the parser. This allows us to allow scenarios like [1] to // pass through options validation. One clear drawback of this is that it // requires constant values for the parameter assignments. However, most people // are doing that anyway. // // [1]: https://github.com/influxdata/influxdb/issues/17666 func ExtractTaskOptions(ctx context.Context, lang influxdb.FluxLanguageService, flux string) (options.Options, error) { if !feature.SimpleTaskOptionsExtraction().Enabled(ctx) { return options.FromScript(lang, flux) } matches := taskOptionsPattern.FindAllString(flux, -1) if len(matches) == 0 { return options.Options{}, &influxdb.Error{ Code: influxdb.EInvalid, Msg: "no task options defined", } } if len(matches) > 1 { return options.Options{}, &influxdb.Error{ Code: influxdb.EInvalid, Msg: "multiple task options defined", } } return options.FromScript(lang, matches[0]) }