influxdb/cmd/influx/task.go

605 lines
13 KiB
Go
Raw Normal View History

2018-10-23 17:51:13 +00:00
package main
import (
"context"
"fmt"
"os"
"time"
2018-10-23 17:51:13 +00:00
"github.com/influxdata/flux/repl"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/cmd/influx/internal"
"github.com/influxdata/influxdb/http"
2018-10-23 17:51:13 +00:00
"github.com/spf13/cobra"
)
func cmdTask(f *globalFlags, opt genericCLIOpts) *cobra.Command {
runE := func(cmd *cobra.Command, args []string) error {
if flags.local {
return fmt.Errorf("local flag not supported for task command")
}
seeHelp(cmd, args)
return nil
}
cmd := opt.newCmd("task", runE, false)
cmd.Short = "Task management commands"
cmd.AddCommand(
taskLogCmd(opt),
taskRunCmd(opt),
taskCreateCmd(opt),
taskDeleteCmd(opt),
taskFindCmd(opt),
taskUpdateCmd(opt),
)
2018-10-23 17:51:13 +00:00
return cmd
2018-10-23 17:51:13 +00:00
}
var taskCreateFlags struct {
org organization
2018-10-23 17:51:13 +00:00
}
func taskCreateCmd(opt genericCLIOpts) *cobra.Command {
cmd := opt.newCmd("create [query literal or @/path/to/query.flux]", taskCreateF, true)
cmd.Args = cobra.ExactArgs(1)
cmd.Short = "Create task"
2018-10-23 17:51:13 +00:00
taskCreateFlags.org.register(cmd, false)
2018-10-23 17:51:13 +00:00
return cmd
2018-10-23 17:51:13 +00:00
}
func taskCreateF(cmd *cobra.Command, args []string) error {
if err := taskCreateFlags.org.validOrgFlags(&flags); err != nil {
return err
2018-10-23 17:51:13 +00:00
}
client, err := newHTTPClient()
if err != nil {
return err
}
2018-10-23 17:51:13 +00:00
s := &http.TaskService{
Client: client,
InsecureSkipVerify: flags.skipVerify,
2018-10-23 17:51:13 +00:00
}
flux, err := repl.LoadQuery(args[0])
if err != nil {
return fmt.Errorf("error parsing flux script: %s", err)
2018-10-23 17:51:13 +00:00
}
tc := influxdb.TaskCreate{
Flux: flux,
Organization: taskCreateFlags.org.name,
2018-10-23 17:51:13 +00:00
}
if taskCreateFlags.org.id != "" || taskCreateFlags.org.name != "" {
svc, err := newOrganizationService()
if err != nil {
return nil
}
oid, err := taskCreateFlags.org.getID(svc)
2018-10-23 17:51:13 +00:00
if err != nil {
return fmt.Errorf("error parsing organization ID: %s", err)
2018-10-23 17:51:13 +00:00
}
tc.OrganizationID = oid
2018-10-23 17:51:13 +00:00
}
t, err := s.CreateTask(context.Background(), tc)
if err != nil {
return err
2018-10-23 17:51:13 +00:00
}
w := internal.NewTabWriter(os.Stdout)
w.WriteHeaders(
"ID",
"Name",
"OrganizationID",
2018-10-23 17:51:13 +00:00
"Organization",
"AuthorizationID",
2018-10-23 17:51:13 +00:00
"Status",
"Every",
"Cron",
)
w.Write(map[string]interface{}{
"ID": t.ID.String(),
"Name": t.Name,
"OrganizationID": t.OrganizationID.String(),
"Organization": t.Organization,
"Status": t.Status,
"Every": t.Every,
"Cron": t.Cron,
2018-10-23 17:51:13 +00:00
})
w.Flush()
return nil
2018-10-23 17:51:13 +00:00
}
var taskFindFlags struct {
user string
id string
limit int
headers bool
org organization
2018-10-23 17:51:13 +00:00
}
func taskFindCmd(opt genericCLIOpts) *cobra.Command {
cmd := opt.newCmd("list", taskFindF, true)
cmd.Short = "List tasks"
cmd.Aliases = []string{"find", "ls"}
2018-10-23 17:51:13 +00:00
taskFindFlags.org.register(cmd, false)
cmd.Flags().StringVarP(&taskFindFlags.id, "id", "i", "", "task ID")
cmd.Flags().StringVarP(&taskFindFlags.user, "user-id", "n", "", "task owner ID")
cmd.Flags().IntVarP(&taskFindFlags.limit, "limit", "", influxdb.TaskDefaultPageSize, "the number of tasks to find")
cmd.Flags().BoolVar(&taskFindFlags.headers, "headers", true, "To print the table headers; defaults true")
2018-10-23 17:51:13 +00:00
return cmd
2018-10-23 17:51:13 +00:00
}
func taskFindF(cmd *cobra.Command, args []string) error {
if err := taskFindFlags.org.validOrgFlags(&flags); err != nil {
return err
}
client, err := newHTTPClient()
if err != nil {
return err
}
2018-10-23 17:51:13 +00:00
s := &http.TaskService{
Client: client,
InsecureSkipVerify: flags.skipVerify,
2018-10-23 17:51:13 +00:00
}
filter := influxdb.TaskFilter{}
2018-10-23 17:51:13 +00:00
if taskFindFlags.user != "" {
id, err := influxdb.IDFromString(taskFindFlags.user)
2018-10-23 17:51:13 +00:00
if err != nil {
return err
2018-10-23 17:51:13 +00:00
}
filter.User = id
}
if taskFindFlags.org.name != "" {
filter.Organization = taskFindFlags.org.name
}
if taskFindFlags.org.id != "" {
id, err := influxdb.IDFromString(taskFindFlags.org.id)
2018-10-23 17:51:13 +00:00
if err != nil {
return err
2018-10-23 17:51:13 +00:00
}
filter.OrganizationID = id
2018-10-23 17:51:13 +00:00
}
if taskFindFlags.limit < 1 || taskFindFlags.limit > influxdb.TaskMaxPageSize {
return fmt.Errorf("limit must be between 1 and %d", influxdb.TaskMaxPageSize)
2018-11-29 16:28:17 +00:00
}
filter.Limit = taskFindFlags.limit
var tasks []http.Task
2018-10-23 17:51:13 +00:00
if taskFindFlags.id != "" {
id, err := influxdb.IDFromString(taskFindFlags.id)
2018-10-23 17:51:13 +00:00
if err != nil {
return err
2018-10-23 17:51:13 +00:00
}
task, err := s.FindTaskByID(context.Background(), *id)
if err != nil {
return err
2018-10-23 17:51:13 +00:00
}
tasks = append(tasks, *task)
2018-10-23 17:51:13 +00:00
} else {
tasks, _, err = s.FindTasks(context.Background(), filter)
if err != nil {
return err
2018-10-23 17:51:13 +00:00
}
}
w := internal.NewTabWriter(os.Stdout)
w.HideHeaders(!taskFindFlags.headers)
2018-10-23 17:51:13 +00:00
w.WriteHeaders(
"ID",
"Name",
"OrganizationID",
2018-10-23 17:51:13 +00:00
"Organization",
"AuthorizationID",
2018-10-23 17:51:13 +00:00
"Status",
"Every",
"Cron",
)
for _, t := range tasks {
w.Write(map[string]interface{}{
"ID": t.ID.String(),
"Name": t.Name,
"OrganizationID": t.OrganizationID.String(),
"Organization": t.Organization,
"Status": t.Status,
"Every": t.Every,
"Cron": t.Cron,
2018-10-23 17:51:13 +00:00
})
}
w.Flush()
return nil
2018-10-23 17:51:13 +00:00
}
var taskUpdateFlags struct {
2018-10-23 17:51:13 +00:00
id string
status string
}
func taskUpdateCmd(opt genericCLIOpts) *cobra.Command {
cmd := opt.newCmd("update", taskUpdateF, true)
cmd.Short = "Update task"
2018-10-23 17:51:13 +00:00
cmd.Flags().StringVarP(&taskUpdateFlags.id, "id", "i", "", "task ID (required)")
cmd.Flags().StringVarP(&taskUpdateFlags.status, "status", "", "", "update task status")
cmd.MarkFlagRequired("id")
2018-10-23 17:51:13 +00:00
return cmd
2018-10-23 17:51:13 +00:00
}
func taskUpdateF(cmd *cobra.Command, args []string) error {
client, err := newHTTPClient()
if err != nil {
return err
}
2018-10-23 17:51:13 +00:00
s := &http.TaskService{
Client: client,
InsecureSkipVerify: flags.skipVerify,
2018-10-23 17:51:13 +00:00
}
var id influxdb.ID
2018-10-23 17:51:13 +00:00
if err := id.DecodeFromString(taskUpdateFlags.id); err != nil {
return err
2018-10-23 17:51:13 +00:00
}
update := influxdb.TaskUpdate{}
2018-10-23 17:51:13 +00:00
if taskUpdateFlags.status != "" {
update.Status = &taskUpdateFlags.status
}
if len(args) > 0 {
flux, err := repl.LoadQuery(args[0])
if err != nil {
return fmt.Errorf("error parsing flux script: %s", err)
2018-10-23 17:51:13 +00:00
}
update.Flux = &flux
}
t, err := s.UpdateTask(context.Background(), id, update)
if err != nil {
return err
2018-10-23 17:51:13 +00:00
}
w := internal.NewTabWriter(os.Stdout)
w.WriteHeaders(
"ID",
"Name",
"OrganizationID",
2018-10-23 17:51:13 +00:00
"Organization",
"AuthorizationID",
2018-10-23 17:51:13 +00:00
"Status",
"Every",
"Cron",
)
w.Write(map[string]interface{}{
"ID": t.ID.String(),
"Name": t.Name,
"OrganizationID": t.OrganizationID.String(),
"Organization": t.Organization,
"Status": t.Status,
"Every": t.Every,
"Cron": t.Cron,
2018-10-23 17:51:13 +00:00
})
w.Flush()
return nil
2018-10-23 17:51:13 +00:00
}
var taskDeleteFlags struct {
2018-10-23 17:51:13 +00:00
id string
}
func taskDeleteCmd(opt genericCLIOpts) *cobra.Command {
cmd := opt.newCmd("delete", taskDeleteF, true)
cmd.Short = "Delete task"
2018-10-23 17:51:13 +00:00
cmd.Flags().StringVarP(&taskDeleteFlags.id, "id", "i", "", "task id (required)")
cmd.MarkFlagRequired("id")
2018-10-23 17:51:13 +00:00
return cmd
2018-10-23 17:51:13 +00:00
}
func taskDeleteF(cmd *cobra.Command, args []string) error {
client, err := newHTTPClient()
if err != nil {
return err
}
2018-10-23 17:51:13 +00:00
s := &http.TaskService{
Client: client,
InsecureSkipVerify: flags.skipVerify,
2018-10-23 17:51:13 +00:00
}
var id influxdb.ID
err = id.DecodeFromString(taskDeleteFlags.id)
2018-10-23 17:51:13 +00:00
if err != nil {
return err
2018-10-23 17:51:13 +00:00
}
ctx := context.TODO()
t, err := s.FindTaskByID(ctx, id)
if err != nil {
return err
2018-10-23 17:51:13 +00:00
}
if err = s.DeleteTask(ctx, id); err != nil {
return err
2018-10-23 17:51:13 +00:00
}
w := internal.NewTabWriter(os.Stdout)
w.WriteHeaders(
"ID",
"Name",
"OrganizationID",
2018-10-23 17:51:13 +00:00
"Organization",
"AuthorizationID",
2018-10-23 17:51:13 +00:00
"Status",
"Every",
"Cron",
)
w.Write(map[string]interface{}{
"ID": t.ID.String(),
"Name": t.Name,
"OrganizationID": t.OrganizationID.String(),
"Organization": t.Organization,
"Status": t.Status,
"Every": t.Every,
"Cron": t.Cron,
2018-10-23 17:51:13 +00:00
})
w.Flush()
return nil
2018-10-23 17:51:13 +00:00
}
func taskLogCmd(opt genericCLIOpts) *cobra.Command {
cmd := opt.newCmd("log", nil, false)
cmd.Run = seeHelp
cmd.Short = "Log related commands"
cmd.AddCommand(
taskLogFindCmd(opt),
)
return cmd
}
var taskLogFindFlags struct {
2018-10-23 17:51:13 +00:00
taskID string
runID string
}
func taskLogFindCmd(opt genericCLIOpts) *cobra.Command {
cmd := opt.newCmd("list", taskLogFindF, true)
cmd.Short = "List logs for task"
cmd.Aliases = []string{"find", "ls"}
2018-10-23 17:51:13 +00:00
cmd.Flags().StringVarP(&taskLogFindFlags.taskID, "task-id", "", "", "task id (required)")
cmd.Flags().StringVarP(&taskLogFindFlags.runID, "run-id", "", "", "run id")
cmd.MarkFlagRequired("task-id")
2018-10-23 17:51:13 +00:00
return cmd
2018-10-23 17:51:13 +00:00
}
func taskLogFindF(cmd *cobra.Command, args []string) error {
client, err := newHTTPClient()
if err != nil {
return err
}
2018-10-23 17:51:13 +00:00
s := &http.TaskService{
Client: client,
InsecureSkipVerify: flags.skipVerify,
2018-10-23 17:51:13 +00:00
}
var filter influxdb.LogFilter
id, err := influxdb.IDFromString(taskLogFindFlags.taskID)
2018-10-23 17:51:13 +00:00
if err != nil {
return err
2018-10-23 17:51:13 +00:00
}
filter.Task = *id
2018-10-23 17:51:13 +00:00
if taskLogFindFlags.runID != "" {
id, err := influxdb.IDFromString(taskLogFindFlags.runID)
2018-10-23 17:51:13 +00:00
if err != nil {
return err
2018-10-23 17:51:13 +00:00
}
filter.Run = id
}
ctx := context.TODO()
logs, _, err := s.FindLogs(ctx, filter)
if err != nil {
return err
2018-10-23 17:51:13 +00:00
}
w := internal.NewTabWriter(os.Stdout)
w.WriteHeaders(
2019-06-05 17:53:44 +00:00
"RunID",
"Time",
"Message",
2018-10-23 17:51:13 +00:00
)
for _, log := range logs {
w.Write(map[string]interface{}{
2019-06-05 17:53:44 +00:00
"RunID": log.RunID,
"Time": log.Time,
"Message": log.Message,
2018-10-23 17:51:13 +00:00
})
}
w.Flush()
return nil
2018-10-23 17:51:13 +00:00
}
func taskRunCmd(opt genericCLIOpts) *cobra.Command {
cmd := opt.newCmd("run", nil, false)
cmd.Run = seeHelp
cmd.Short = "List runs for a task"
cmd.AddCommand(
taskRunFindCmd(opt),
taskRunRetryCmd(opt),
)
return cmd
}
var taskRunFindFlags struct {
2018-10-23 17:51:13 +00:00
runID string
taskID string
afterTime string
beforeTime string
limit int
}
func taskRunFindCmd(opt genericCLIOpts) *cobra.Command {
cmd := opt.newCmd("list", taskRunFindF, true)
cmd.Short = "List runs for a task"
cmd.Aliases = []string{"find", "ls"}
2018-10-23 17:51:13 +00:00
cmd.Flags().StringVarP(&taskRunFindFlags.taskID, "task-id", "", "", "task id (required)")
cmd.Flags().StringVarP(&taskRunFindFlags.runID, "run-id", "", "", "run id")
cmd.Flags().StringVarP(&taskRunFindFlags.afterTime, "after", "", "", "after time for filtering")
cmd.Flags().StringVarP(&taskRunFindFlags.beforeTime, "before", "", "", "before time for filtering")
cmd.Flags().IntVarP(&taskRunFindFlags.limit, "limit", "", 0, "limit the results")
2018-10-23 17:51:13 +00:00
cmd.MarkFlagRequired("task-id")
2018-10-23 17:51:13 +00:00
return cmd
2018-10-23 17:51:13 +00:00
}
func taskRunFindF(cmd *cobra.Command, args []string) error {
client, err := newHTTPClient()
if err != nil {
return err
}
2018-10-23 17:51:13 +00:00
s := &http.TaskService{
Client: client,
InsecureSkipVerify: flags.skipVerify,
2018-10-23 17:51:13 +00:00
}
filter := influxdb.RunFilter{
2018-10-23 17:51:13 +00:00
Limit: taskRunFindFlags.limit,
AfterTime: taskRunFindFlags.afterTime,
BeforeTime: taskRunFindFlags.beforeTime,
}
taskID, err := influxdb.IDFromString(taskRunFindFlags.taskID)
2018-10-23 17:51:13 +00:00
if err != nil {
return err
2018-10-23 17:51:13 +00:00
}
filter.Task = *taskID
2018-10-23 17:51:13 +00:00
var runs []*influxdb.Run
2018-10-23 17:51:13 +00:00
if taskRunFindFlags.runID != "" {
id, err := influxdb.IDFromString(taskRunFindFlags.runID)
2018-10-23 17:51:13 +00:00
if err != nil {
return err
2018-10-23 17:51:13 +00:00
}
run, err := s.FindRunByID(context.Background(), filter.Task, *id)
2018-10-23 17:51:13 +00:00
if err != nil {
return err
2018-10-23 17:51:13 +00:00
}
runs = append(runs, run)
} else {
runs, _, err = s.FindRuns(context.Background(), filter)
if err != nil {
return err
2018-10-23 17:51:13 +00:00
}
}
w := internal.NewTabWriter(os.Stdout)
w.WriteHeaders(
"ID",
"TaskID",
"Status",
"ScheduledFor",
"StartedAt",
"FinishedAt",
"RequestedAt",
)
2018-10-23 17:51:13 +00:00
for _, r := range runs {
scheduledFor := r.ScheduledFor.Format(time.RFC3339)
startedAt := r.StartedAt.Format(time.RFC3339Nano)
finishedAt := r.FinishedAt.Format(time.RFC3339Nano)
requestedAt := r.RequestedAt.Format(time.RFC3339Nano)
2018-10-23 17:51:13 +00:00
w.Write(map[string]interface{}{
"ID": r.ID,
"TaskID": r.TaskID,
"Status": r.Status,
"ScheduledFor": scheduledFor,
"StartedAt": startedAt,
"FinishedAt": finishedAt,
"RequestedAt": requestedAt,
2018-10-23 17:51:13 +00:00
})
}
w.Flush()
return nil
2018-10-23 17:51:13 +00:00
}
var runRetryFlags struct {
taskID, runID string
2018-10-23 17:51:13 +00:00
}
func taskRunRetryCmd(opt genericCLIOpts) *cobra.Command {
cmd := opt.newCmd("retry", runRetryF, true)
cmd.Short = "retry a run"
2018-10-23 17:51:13 +00:00
cmd.Flags().StringVarP(&runRetryFlags.taskID, "task-id", "i", "", "task id (required)")
cmd.Flags().StringVarP(&runRetryFlags.runID, "run-id", "r", "", "run id (required)")
cmd.MarkFlagRequired("task-id")
cmd.MarkFlagRequired("run-id")
2018-10-23 17:51:13 +00:00
return cmd
2018-10-23 17:51:13 +00:00
}
func runRetryF(cmd *cobra.Command, args []string) error {
client, err := newHTTPClient()
if err != nil {
return err
}
2018-10-23 17:51:13 +00:00
s := &http.TaskService{
Client: client,
InsecureSkipVerify: flags.skipVerify,
2018-10-23 17:51:13 +00:00
}
var taskID, runID influxdb.ID
if err := taskID.DecodeFromString(runRetryFlags.taskID); err != nil {
return err
}
if err := runID.DecodeFromString(runRetryFlags.runID); err != nil {
return err
2018-10-23 17:51:13 +00:00
}
ctx := context.TODO()
newRun, err := s.RetryRun(ctx, taskID, runID)
2018-11-20 21:53:52 +00:00
if err != nil {
return err
2018-10-23 17:51:13 +00:00
}
2018-11-20 21:53:52 +00:00
fmt.Printf("Retry for task %s's run %s queued as run %s.\n", taskID, runID, newRun.ID)
return nil
2018-10-23 17:51:13 +00:00
}