influxdb/http/task_service.go

1820 lines
44 KiB
Go
Raw Normal View History

2018-06-05 23:47:32 +00:00
package http
import (
2018-10-23 17:51:13 +00:00
"bytes"
2018-06-05 23:47:32 +00:00
"context"
"encoding/json"
2018-10-23 17:51:13 +00:00
"errors"
"fmt"
2018-06-05 23:47:32 +00:00
"net/http"
2018-10-23 17:51:13 +00:00
"net/url"
"path"
"strconv"
"strings"
"time"
2018-06-05 23:47:32 +00:00
2019-08-22 02:08:51 +00:00
"github.com/influxdata/influxdb"
pcontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/kit/tracing"
2019-02-20 17:04:46 +00:00
"github.com/influxdata/influxdb/kv"
"github.com/influxdata/influxdb/task/backend"
"github.com/julienschmidt/httprouter"
"go.uber.org/zap"
2018-06-05 23:47:32 +00:00
)
// TaskBackend is all services and associated parameters required to construct
// the TaskHandler.
type TaskBackend struct {
2019-08-22 02:08:51 +00:00
influxdb.HTTPErrorHandler
Logger *zap.Logger
2019-08-22 02:08:51 +00:00
TaskService influxdb.TaskService
AuthorizationService influxdb.AuthorizationService
OrganizationService influxdb.OrganizationService
UserResourceMappingService influxdb.UserResourceMappingService
LabelService influxdb.LabelService
UserService influxdb.UserService
BucketService influxdb.BucketService
}
// NewTaskBackend returns a new instance of TaskBackend.
func NewTaskBackend(b *APIBackend) *TaskBackend {
return &TaskBackend{
HTTPErrorHandler: b.HTTPErrorHandler,
Logger: b.Logger.With(zap.String("handler", "task")),
TaskService: b.TaskService,
AuthorizationService: b.AuthorizationService,
OrganizationService: b.OrganizationService,
UserResourceMappingService: b.UserResourceMappingService,
LabelService: b.LabelService,
UserService: b.UserService,
BucketService: b.BucketService,
}
}
2018-06-05 23:47:32 +00:00
// TaskHandler represents an HTTP API handler for tasks.
type TaskHandler struct {
*httprouter.Router
2019-08-22 02:08:51 +00:00
influxdb.HTTPErrorHandler
logger *zap.Logger
2019-08-22 02:08:51 +00:00
TaskService influxdb.TaskService
AuthorizationService influxdb.AuthorizationService
OrganizationService influxdb.OrganizationService
UserResourceMappingService influxdb.UserResourceMappingService
LabelService influxdb.LabelService
UserService influxdb.UserService
BucketService influxdb.BucketService
2018-06-05 23:47:32 +00:00
}
const (
tasksPath = "/api/v2/tasks"
tasksIDPath = "/api/v2/tasks/:id"
tasksIDLogsPath = "/api/v2/tasks/:id/logs"
tasksIDMembersPath = "/api/v2/tasks/:id/members"
tasksIDMembersIDPath = "/api/v2/tasks/:id/members/:userID"
tasksIDOwnersPath = "/api/v2/tasks/:id/owners"
tasksIDOwnersIDPath = "/api/v2/tasks/:id/owners/:userID"
tasksIDRunsPath = "/api/v2/tasks/:id/runs"
tasksIDRunsIDPath = "/api/v2/tasks/:id/runs/:rid"
tasksIDRunsIDLogsPath = "/api/v2/tasks/:id/runs/:rid/logs"
tasksIDRunsIDRetryPath = "/api/v2/tasks/:id/runs/:rid/retry"
tasksIDLabelsPath = "/api/v2/tasks/:id/labels"
tasksIDLabelsIDPath = "/api/v2/tasks/:id/labels/:lid"
)
2018-06-05 23:47:32 +00:00
// NewTaskHandler returns a new instance of TaskHandler.
func NewTaskHandler(b *TaskBackend) *TaskHandler {
2018-06-05 23:47:32 +00:00
h := &TaskHandler{
Router: NewRouter(b.HTTPErrorHandler),
HTTPErrorHandler: b.HTTPErrorHandler,
logger: b.Logger,
TaskService: b.TaskService,
AuthorizationService: b.AuthorizationService,
OrganizationService: b.OrganizationService,
UserResourceMappingService: b.UserResourceMappingService,
LabelService: b.LabelService,
UserService: b.UserService,
BucketService: b.BucketService,
2018-06-05 23:47:32 +00:00
}
h.HandlerFunc("GET", tasksPath, h.handleGetTasks)
h.HandlerFunc("POST", tasksPath, h.handlePostTask)
h.HandlerFunc("GET", tasksIDPath, h.handleGetTask)
h.HandlerFunc("PATCH", tasksIDPath, h.handleUpdateTask)
h.HandlerFunc("DELETE", tasksIDPath, h.handleDeleteTask)
h.HandlerFunc("GET", tasksIDLogsPath, h.handleGetLogs)
h.HandlerFunc("GET", tasksIDRunsIDLogsPath, h.handleGetLogs)
memberBackend := MemberBackend{
HTTPErrorHandler: b.HTTPErrorHandler,
Logger: b.Logger.With(zap.String("handler", "member")),
2019-08-22 02:08:51 +00:00
ResourceType: influxdb.TasksResourceType,
UserType: influxdb.Member,
UserResourceMappingService: b.UserResourceMappingService,
UserService: b.UserService,
}
h.HandlerFunc("POST", tasksIDMembersPath, newPostMemberHandler(memberBackend))
h.HandlerFunc("GET", tasksIDMembersPath, newGetMembersHandler(memberBackend))
h.HandlerFunc("DELETE", tasksIDMembersIDPath, newDeleteMemberHandler(memberBackend))
ownerBackend := MemberBackend{
HTTPErrorHandler: b.HTTPErrorHandler,
Logger: b.Logger.With(zap.String("handler", "member")),
2019-08-22 02:08:51 +00:00
ResourceType: influxdb.TasksResourceType,
UserType: influxdb.Owner,
UserResourceMappingService: b.UserResourceMappingService,
UserService: b.UserService,
}
h.HandlerFunc("POST", tasksIDOwnersPath, newPostMemberHandler(ownerBackend))
h.HandlerFunc("GET", tasksIDOwnersPath, newGetMembersHandler(ownerBackend))
h.HandlerFunc("DELETE", tasksIDOwnersIDPath, newDeleteMemberHandler(ownerBackend))
h.HandlerFunc("GET", tasksIDRunsPath, h.handleGetRuns)
h.HandlerFunc("POST", tasksIDRunsPath, h.handleForceRun)
h.HandlerFunc("GET", tasksIDRunsIDPath, h.handleGetRun)
h.HandlerFunc("POST", tasksIDRunsIDRetryPath, h.handleRetryRun)
2018-10-04 17:12:43 +00:00
h.HandlerFunc("DELETE", tasksIDRunsIDPath, h.handleCancelRun)
labelBackend := &LabelBackend{
HTTPErrorHandler: b.HTTPErrorHandler,
Logger: b.Logger.With(zap.String("handler", "label")),
LabelService: b.LabelService,
2019-08-22 02:08:51 +00:00
ResourceType: influxdb.TasksResourceType,
}
h.HandlerFunc("GET", tasksIDLabelsPath, newGetLabelsHandler(labelBackend))
h.HandlerFunc("POST", tasksIDLabelsPath, newPostLabelHandler(labelBackend))
h.HandlerFunc("DELETE", tasksIDLabelsIDPath, newDeleteLabelHandler(labelBackend))
2018-06-05 23:47:32 +00:00
return h
}
type taskResponse struct {
Links map[string]string `json:"links"`
2019-08-22 02:08:51 +00:00
Labels []influxdb.Label `json:"labels"`
influxdb.Task
}
2019-08-22 02:08:51 +00:00
func newTaskResponse(t influxdb.Task, labels []*influxdb.Label) taskResponse {
response := taskResponse{
Links: map[string]string{
"self": fmt.Sprintf("/api/v2/tasks/%s", t.ID),
"members": fmt.Sprintf("/api/v2/tasks/%s/members", t.ID),
"owners": fmt.Sprintf("/api/v2/tasks/%s/owners", t.ID),
"labels": fmt.Sprintf("/api/v2/tasks/%s/labels", t.ID),
"runs": fmt.Sprintf("/api/v2/tasks/%s/runs", t.ID),
"logs": fmt.Sprintf("/api/v2/tasks/%s/logs", t.ID),
},
Task: t,
2019-08-22 02:08:51 +00:00
Labels: []influxdb.Label{},
}
for _, l := range labels {
response.Labels = append(response.Labels, *l)
}
return response
}
2019-08-22 02:08:51 +00:00
func newTasksPagingLinks(basePath string, ts []*influxdb.Task, f influxdb.TaskFilter) *influxdb.PagingLinks {
2019-03-06 04:57:50 +00:00
var self, next string
u := url.URL{
Path: basePath,
}
values := url.Values{}
for k, vs := range f.QueryParams() {
for _, v := range vs {
if v != "" {
values.Add(k, v)
}
}
}
u.RawQuery = values.Encode()
self = u.String()
if len(ts) >= f.Limit {
values.Set("after", ts[f.Limit-1].ID.String())
u.RawQuery = values.Encode()
next = u.String()
}
2019-08-22 02:08:51 +00:00
links := &influxdb.PagingLinks{
2019-03-06 04:57:50 +00:00
Self: self,
Next: next,
}
return links
}
2019-03-06 04:57:50 +00:00
type tasksResponse struct {
2019-08-22 02:08:51 +00:00
Links *influxdb.PagingLinks `json:"links"`
2019-03-06 04:57:50 +00:00
Tasks []taskResponse `json:"tasks"`
}
2019-08-22 02:08:51 +00:00
func newTasksResponse(ctx context.Context, ts []*influxdb.Task, f influxdb.TaskFilter, labelService influxdb.LabelService) tasksResponse {
rs := tasksResponse{
2019-03-06 04:57:50 +00:00
Links: newTasksPagingLinks(tasksPath, ts, f),
Tasks: make([]taskResponse, len(ts)),
}
2018-11-28 22:52:11 +00:00
for i := range ts {
2019-08-22 02:08:51 +00:00
labels, _ := labelService.FindResourceLabels(ctx, influxdb.LabelMappingFilter{ResourceID: ts[i].ID})
rs.Tasks[i] = newTaskResponse(*ts[i], labels)
}
return rs
}
type runResponse struct {
2018-11-20 21:53:52 +00:00
Links map[string]string `json:"links,omitempty"`
httpRun
}
// httpRun is a version of the Run object used to communicate over the API
// it uses a pointer to a time.Time instead of a time.Time so that we can pass a nil
// value for empty time values
type httpRun struct {
ID influxdb.ID `json:"id,omitempty"`
TaskID influxdb.ID `json:"taskID"`
Status string `json:"status"`
ScheduledFor *time.Time `json:"scheduledFor"`
StartedAt *time.Time `json:"startedAt,omitempty"`
FinishedAt *time.Time `json:"finishedAt,omitempty"`
RequestedAt *time.Time `json:"requestedAt,omitempty"`
Log []influxdb.Log `json:"log,omitempty"`
}
2019-08-22 02:08:51 +00:00
func newRunResponse(r influxdb.Run) runResponse {
run := httpRun{
ID: r.ID,
TaskID: r.TaskID,
Status: r.Status,
Log: r.Log,
ScheduledFor: &r.ScheduledFor,
}
if !r.StartedAt.IsZero() {
run.StartedAt = &r.StartedAt
}
if !r.FinishedAt.IsZero() {
run.FinishedAt = &r.FinishedAt
}
if !r.RequestedAt.IsZero() {
run.RequestedAt = &r.RequestedAt
}
return runResponse{
Links: map[string]string{
"self": fmt.Sprintf("/api/v2/tasks/%s/runs/%s", r.TaskID, r.ID),
"task": fmt.Sprintf("/api/v2/tasks/%s", r.TaskID),
"logs": fmt.Sprintf("/api/v2/tasks/%s/runs/%s/logs", r.TaskID, r.ID),
"retry": fmt.Sprintf("/api/v2/tasks/%s/runs/%s/retry", r.TaskID, r.ID),
},
httpRun: run,
}
}
func convertRun(r httpRun) *influxdb.Run {
run := &influxdb.Run{
ID: r.ID,
TaskID: r.TaskID,
Status: r.Status,
Log: r.Log,
}
if r.StartedAt != nil {
run.StartedAt = *r.StartedAt
}
if r.FinishedAt != nil {
run.FinishedAt = *r.FinishedAt
}
if r.RequestedAt != nil {
run.RequestedAt = *r.RequestedAt
}
if r.ScheduledFor != nil {
run.ScheduledFor = *r.ScheduledFor
}
return run
}
type runsResponse struct {
Links map[string]string `json:"links"`
Runs []*runResponse `json:"runs"`
}
2019-08-22 02:08:51 +00:00
func newRunsResponse(rs []*influxdb.Run, taskID influxdb.ID) runsResponse {
r := runsResponse{
Links: map[string]string{
"self": fmt.Sprintf("/api/v2/tasks/%s/runs", taskID),
"task": fmt.Sprintf("/api/v2/tasks/%s", taskID),
},
Runs: make([]*runResponse, len(rs)),
}
for i := range rs {
rs := newRunResponse(*rs[i])
r.Runs[i] = &rs
}
return r
}
func (h *TaskHandler) handleGetTasks(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
2019-02-20 17:04:46 +00:00
req, err := decodeGetTasksRequest(ctx, r, h.OrganizationService)
if err != nil {
2019-08-22 02:08:51 +00:00
err = &influxdb.Error{
Err: err,
2019-08-22 02:08:51 +00:00
Code: influxdb.EInvalid,
Msg: "failed to decode request",
}
h.HandleHTTPError(ctx, err, w)
return
}
tasks, _, err := h.TaskService.FindTasks(ctx, req.filter)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
2019-07-09 15:16:26 +00:00
h.logger.Debug("tasks retrived", zap.String("tasks", fmt.Sprint(tasks)))
2019-03-06 04:57:50 +00:00
if err := encodeResponse(ctx, w, http.StatusOK, newTasksResponse(ctx, tasks, req.filter, h.LabelService)); err != nil {
logEncodingError(h.logger, r, err)
return
}
}
type getTasksRequest struct {
2019-08-22 02:08:51 +00:00
filter influxdb.TaskFilter
}
2019-08-22 02:08:51 +00:00
func decodeGetTasksRequest(ctx context.Context, r *http.Request, orgs influxdb.OrganizationService) (*getTasksRequest, error) {
qp := r.URL.Query()
req := &getTasksRequest{}
if after := qp.Get("after"); after != "" {
2019-08-22 02:08:51 +00:00
id, err := influxdb.IDFromString(after)
if err != nil {
return nil, err
}
req.filter.After = id
}
2019-02-20 17:04:46 +00:00
if orgName := qp.Get("org"); orgName != "" {
2019-08-22 02:08:51 +00:00
o, err := orgs.FindOrganization(ctx, influxdb.OrganizationFilter{Name: &orgName})
2019-02-20 17:04:46 +00:00
if err != nil {
2019-08-22 02:08:51 +00:00
if pErr, ok := err.(*influxdb.Error); ok && pErr != nil {
if kv.IsNotFound(err) || pErr.Code == influxdb.EUnauthorized {
return nil, &influxdb.Error{
2019-02-20 17:04:46 +00:00
Err: errors.New("org not found or unauthorized"),
Msg: "org " + orgName + " not found or unauthorized",
}
}
}
return nil, err
}
req.filter.Organization = o.Name
req.filter.OrganizationID = &o.ID
}
if oid := qp.Get("orgID"); oid != "" {
2019-08-22 02:08:51 +00:00
orgID, err := influxdb.IDFromString(oid)
if err != nil {
return nil, err
}
req.filter.OrganizationID = orgID
}
if userID := qp.Get("user"); userID != "" {
2019-08-22 02:08:51 +00:00
id, err := influxdb.IDFromString(userID)
if err != nil {
return nil, err
}
req.filter.User = id
}
2018-11-29 16:28:17 +00:00
if limit := qp.Get("limit"); limit != "" {
lim, err := strconv.Atoi(limit)
if err != nil {
return nil, err
}
2019-08-22 02:08:51 +00:00
if lim < 1 || lim > influxdb.TaskMaxPageSize {
return nil, &influxdb.Error{
Code: influxdb.EUnprocessableEntity,
Msg: fmt.Sprintf("limit must be between 1 and %d", influxdb.TaskMaxPageSize),
}
2018-11-29 16:28:17 +00:00
}
req.filter.Limit = lim
} else {
2019-08-22 02:08:51 +00:00
req.filter.Limit = influxdb.TaskDefaultPageSize
2018-11-29 16:28:17 +00:00
}
feat(task): Allow tasks to run more isolated from other task systems (#15384) * feat(task): Allow tasks to run more isolated from other task systems To allow the task internal system to be used for user created tasks as well as checks, notification and other future additions we needed to take 2 actions: 1 - We need to use type as a first class citizen, meaning that task's have a type and each system that will be creating tasks will set the task type through the api. This is a change to the previous assumption that any user could set task types. This change will allow us to have other service's white label the task service for their own purposes and not have to worry about colissions between the types. 2 - We needed to allow other systems to add data specific to the problem they are trying to solve. For this purpose adding a `metadata` field to the internal task system which should allow other systems to use the task service. These changes will allow us in the future to allow for the current check's and notifications implementations to create a task with meta data instead of creating a check object and a task object in the database. By allowing this new behavior checks, notifications, and user task's can all follow the same pattern: Field an api request in a system specific http endpoint, use a small translation to the `TaskService` function call, translate the results to what the api expects for this system, and return results. * fix(task): undo additional check for ownerID because check is not ready
2019-10-11 14:53:38 +00:00
// the task api can only create or lookup system tasks.
req.filter.Type = &influxdb.TaskSystemType
2019-08-06 16:27:52 +00:00
if name := qp.Get("name"); name != "" {
req.filter.Name = &name
}
return req, nil
}
2018-06-05 23:47:32 +00:00
func (h *TaskHandler) handlePostTask(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := decodePostTaskRequest(ctx, r)
if err != nil {
2019-08-22 02:08:51 +00:00
err = &influxdb.Error{
Err: err,
2019-08-22 02:08:51 +00:00
Code: influxdb.EInvalid,
Msg: "failed to decode request",
}
h.HandleHTTPError(ctx, err, w)
2018-06-05 23:47:32 +00:00
return
}
if err := h.populateTaskCreateOrg(ctx, &req.TaskCreate); err != nil {
2019-08-22 02:08:51 +00:00
err = &influxdb.Error{
Err: err,
Msg: "could not identify organization",
}
h.HandleHTTPError(ctx, err, w)
return
}
if !req.TaskCreate.OrganizationID.Valid() {
2019-08-22 02:08:51 +00:00
err := &influxdb.Error{
Code: influxdb.EInvalid,
2018-11-20 12:13:12 +00:00
Msg: "invalid organization id",
}
h.HandleHTTPError(ctx, err, w)
2018-11-20 12:13:12 +00:00
return
}
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
task, err := h.TaskService.CreateTask(ctx, req.TaskCreate)
if err != nil {
if e, ok := err.(AuthzError); ok {
h.logger.Error("failed authentication", zap.Errors("error messages", []error{err, e.AuthzError()}))
}
2019-08-22 02:08:51 +00:00
// if the error is not already a influxdb.error then make it into one
if _, ok := err.(*influxdb.Error); !ok {
err = &influxdb.Error{
Err: err,
2019-08-22 02:08:51 +00:00
Code: influxdb.EInternal,
Msg: "failed to create task",
}
}
h.HandleHTTPError(ctx, err, w)
2018-06-05 23:47:32 +00:00
return
}
2019-08-22 02:08:51 +00:00
if err := encodeResponse(ctx, w, http.StatusCreated, newTaskResponse(*task, []*influxdb.Label{})); err != nil {
logEncodingError(h.logger, r, err)
2018-06-05 23:47:32 +00:00
return
}
}
type postTaskRequest struct {
2019-08-22 02:08:51 +00:00
TaskCreate influxdb.TaskCreate
2018-06-05 23:47:32 +00:00
}
func decodePostTaskRequest(ctx context.Context, r *http.Request) (*postTaskRequest, error) {
2019-08-22 02:08:51 +00:00
var tc influxdb.TaskCreate
if err := json.NewDecoder(r.Body).Decode(&tc); err != nil {
2018-06-05 23:47:32 +00:00
return nil, err
}
// pull auth from ctx, populate OwnerID
auth, err := pcontext.GetAuthorizer(ctx)
if err != nil {
return nil, err
}
tc.OwnerID = auth.GetUserID()
feat(task): Allow tasks to run more isolated from other task systems (#15384) * feat(task): Allow tasks to run more isolated from other task systems To allow the task internal system to be used for user created tasks as well as checks, notification and other future additions we needed to take 2 actions: 1 - We need to use type as a first class citizen, meaning that task's have a type and each system that will be creating tasks will set the task type through the api. This is a change to the previous assumption that any user could set task types. This change will allow us to have other service's white label the task service for their own purposes and not have to worry about colissions between the types. 2 - We needed to allow other systems to add data specific to the problem they are trying to solve. For this purpose adding a `metadata` field to the internal task system which should allow other systems to use the task service. These changes will allow us in the future to allow for the current check's and notifications implementations to create a task with meta data instead of creating a check object and a task object in the database. By allowing this new behavior checks, notifications, and user task's can all follow the same pattern: Field an api request in a system specific http endpoint, use a small translation to the `TaskService` function call, translate the results to what the api expects for this system, and return results. * fix(task): undo additional check for ownerID because check is not ready
2019-10-11 14:53:38 +00:00
// when creating a task we set the type so we can filter later.
tc.Type = influxdb.TaskSystemType
if err := tc.Validate(); err != nil {
return nil, err
}
2018-06-05 23:47:32 +00:00
return &postTaskRequest{
TaskCreate: tc,
2018-06-05 23:47:32 +00:00
}, nil
}
func (h *TaskHandler) handleGetTask(w http.ResponseWriter, r *http.Request) {
2018-06-05 23:47:32 +00:00
ctx := r.Context()
req, err := decodeGetTaskRequest(ctx, r)
2018-06-05 23:47:32 +00:00
if err != nil {
2019-08-22 02:08:51 +00:00
err = &influxdb.Error{
Err: err,
2019-08-22 02:08:51 +00:00
Code: influxdb.EInvalid,
Msg: "failed to decode request",
}
h.HandleHTTPError(ctx, err, w)
2018-06-05 23:47:32 +00:00
return
}
task, err := h.TaskService.FindTaskByID(ctx, req.TaskID)
2018-06-05 23:47:32 +00:00
if err != nil {
2019-08-22 02:08:51 +00:00
err = &influxdb.Error{
Err: err,
2019-08-22 02:08:51 +00:00
Code: influxdb.ENotFound,
Msg: "failed to find task",
}
h.HandleHTTPError(ctx, err, w)
2018-06-05 23:47:32 +00:00
return
}
2019-08-22 02:08:51 +00:00
labels, err := h.LabelService.FindResourceLabels(ctx, influxdb.LabelMappingFilter{ResourceID: task.ID})
if err != nil {
2019-08-22 02:08:51 +00:00
err = &influxdb.Error{
Err: err,
Msg: "failed to find resource labels",
}
h.HandleHTTPError(ctx, err, w)
return
}
2019-07-09 15:16:26 +00:00
h.logger.Debug("task retrived", zap.String("tasks", fmt.Sprint(task)))
if err := encodeResponse(ctx, w, http.StatusOK, newTaskResponse(*task, labels)); err != nil {
logEncodingError(h.logger, r, err)
2018-06-05 23:47:32 +00:00
return
}
}
type getTaskRequest struct {
2019-08-22 02:08:51 +00:00
TaskID influxdb.ID
2018-06-05 23:47:32 +00:00
}
func decodeGetTaskRequest(ctx context.Context, r *http.Request) (*getTaskRequest, error) {
params := httprouter.ParamsFromContext(ctx)
id := params.ByName("id")
if id == "" {
2019-08-22 02:08:51 +00:00
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "url missing id",
}
2018-06-05 23:47:32 +00:00
}
2019-08-22 02:08:51 +00:00
var i influxdb.ID
if err := i.DecodeFromString(id); err != nil {
return nil, err
2018-06-05 23:47:32 +00:00
}
req := &getTaskRequest{
TaskID: i,
2018-06-05 23:47:32 +00:00
}
return req, nil
}
func (h *TaskHandler) handleUpdateTask(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := decodeUpdateTaskRequest(ctx, r)
if err != nil {
2019-08-22 02:08:51 +00:00
err = &influxdb.Error{
Err: err,
2019-08-22 02:08:51 +00:00
Code: influxdb.EInvalid,
Msg: "failed to decode request",
}
h.HandleHTTPError(ctx, err, w)
return
}
task, err := h.TaskService.UpdateTask(ctx, req.TaskID, req.Update)
if err != nil {
2019-08-22 02:08:51 +00:00
err := &influxdb.Error{
Err: err,
Msg: "failed to update task",
}
if err.Err == influxdb.ErrTaskNotFound {
2019-08-22 02:08:51 +00:00
err.Code = influxdb.ENotFound
}
h.HandleHTTPError(ctx, err, w)
return
}
2019-08-22 02:08:51 +00:00
labels, err := h.LabelService.FindResourceLabels(ctx, influxdb.LabelMappingFilter{ResourceID: task.ID})
if err != nil {
2019-08-22 02:08:51 +00:00
err = &influxdb.Error{
Err: err,
Msg: "failed to find resource labels",
}
h.HandleHTTPError(ctx, err, w)
return
}
2019-07-09 15:16:26 +00:00
h.logger.Debug("tasks updated", zap.String("task", fmt.Sprint(task)))
if err := encodeResponse(ctx, w, http.StatusOK, newTaskResponse(*task, labels)); err != nil {
logEncodingError(h.logger, r, err)
return
}
}
type updateTaskRequest struct {
2019-08-22 02:08:51 +00:00
Update influxdb.TaskUpdate
TaskID influxdb.ID
}
func decodeUpdateTaskRequest(ctx context.Context, r *http.Request) (*updateTaskRequest, error) {
params := httprouter.ParamsFromContext(ctx)
id := params.ByName("id")
if id == "" {
2019-08-22 02:08:51 +00:00
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "you must provide a task ID",
}
}
2019-08-22 02:08:51 +00:00
var i influxdb.ID
if err := i.DecodeFromString(id); err != nil {
return nil, err
}
2019-08-22 02:08:51 +00:00
var upd influxdb.TaskUpdate
if err := json.NewDecoder(r.Body).Decode(&upd); err != nil {
return nil, err
}
if err := upd.Validate(); err != nil {
return nil, err
}
return &updateTaskRequest{
Update: upd,
TaskID: i,
}, nil
}
func (h *TaskHandler) handleDeleteTask(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := decodeDeleteTaskRequest(ctx, r)
if err != nil {
2019-08-22 02:08:51 +00:00
err = &influxdb.Error{
Err: err,
2019-08-22 02:08:51 +00:00
Code: influxdb.EInvalid,
Msg: "failed to decode request",
}
h.HandleHTTPError(ctx, err, w)
return
}
if err := h.TaskService.DeleteTask(ctx, req.TaskID); err != nil {
2019-08-22 02:08:51 +00:00
err := &influxdb.Error{
Err: err,
Msg: "failed to delete task",
}
if err.Err == influxdb.ErrTaskNotFound {
2019-08-22 02:08:51 +00:00
err.Code = influxdb.ENotFound
}
h.HandleHTTPError(ctx, err, w)
return
}
2019-07-09 15:16:26 +00:00
h.logger.Debug("tasks deleted", zap.String("taskID", fmt.Sprint(req.TaskID)))
2018-10-29 19:10:33 +00:00
w.WriteHeader(http.StatusNoContent)
}
type deleteTaskRequest struct {
2019-08-22 02:08:51 +00:00
TaskID influxdb.ID
}
func decodeDeleteTaskRequest(ctx context.Context, r *http.Request) (*deleteTaskRequest, error) {
params := httprouter.ParamsFromContext(ctx)
id := params.ByName("id")
if id == "" {
2019-08-22 02:08:51 +00:00
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "you must provide a task ID",
}
}
2019-08-22 02:08:51 +00:00
var i influxdb.ID
if err := i.DecodeFromString(id); err != nil {
return nil, err
}
return &deleteTaskRequest{
TaskID: i,
}, nil
}
func (h *TaskHandler) handleGetLogs(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := decodeGetLogsRequest(ctx, r)
if err != nil {
2019-08-22 02:08:51 +00:00
err = &influxdb.Error{
Err: err,
2019-08-22 02:08:51 +00:00
Code: influxdb.EInvalid,
Msg: "failed to decode request",
}
h.HandleHTTPError(ctx, err, w)
return
}
auth, err := pcontext.GetAuthorizer(ctx)
if err != nil {
2019-08-22 02:08:51 +00:00
err = &influxdb.Error{
Err: err,
2019-08-22 02:08:51 +00:00
Code: influxdb.EUnauthorized,
Msg: "failed to get authorizer",
}
h.HandleHTTPError(ctx, err, w)
return
}
2019-08-22 02:08:51 +00:00
if k := auth.Kind(); k != influxdb.AuthorizationKind {
// Get the authorization for the task, if allowed.
authz, err := h.getAuthorizationForTask(ctx, auth, req.filter.Task)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
// We were able to access the authorizer for the task, so reassign that on the context for the rest of this call.
ctx = pcontext.SetAuthorizer(ctx, authz)
}
logs, _, err := h.TaskService.FindLogs(ctx, req.filter)
if err != nil {
2019-08-22 02:08:51 +00:00
err := &influxdb.Error{
Err: err,
Msg: "failed to find task logs",
}
if err.Err == influxdb.ErrTaskNotFound || err.Err == influxdb.ErrNoRunsFound {
2019-08-22 02:08:51 +00:00
err.Code = influxdb.ENotFound
}
h.HandleHTTPError(ctx, err, w)
return
}
if err := encodeResponse(ctx, w, http.StatusOK, &getLogsResponse{Events: logs}); err != nil {
logEncodingError(h.logger, r, err)
return
}
}
type getLogsRequest struct {
2019-08-22 02:08:51 +00:00
filter influxdb.LogFilter
}
type getLogsResponse struct {
2019-08-22 02:08:51 +00:00
Events []*influxdb.Log `json:"events"`
}
func decodeGetLogsRequest(ctx context.Context, r *http.Request) (*getLogsRequest, error) {
params := httprouter.ParamsFromContext(ctx)
id := params.ByName("id")
if id == "" {
2019-08-22 02:08:51 +00:00
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "you must provide a task ID",
}
}
req := &getLogsRequest{}
2019-08-22 02:08:51 +00:00
taskID, err := influxdb.IDFromString(id)
if err != nil {
return nil, err
}
req.filter.Task = *taskID
2018-11-07 20:15:53 +00:00
if runID := params.ByName("rid"); runID != "" {
2019-08-22 02:08:51 +00:00
id, err := influxdb.IDFromString(runID)
if err != nil {
return nil, err
}
req.filter.Run = id
}
return req, nil
}
func (h *TaskHandler) handleGetRuns(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := decodeGetRunsRequest(ctx, r)
if err != nil {
2019-08-22 02:08:51 +00:00
err = &influxdb.Error{
Err: err,
2019-08-22 02:08:51 +00:00
Code: influxdb.EInvalid,
Msg: "failed to decode request",
}
h.HandleHTTPError(ctx, err, w)
return
}
auth, err := pcontext.GetAuthorizer(ctx)
if err != nil {
2019-08-22 02:08:51 +00:00
err = &influxdb.Error{
Err: err,
2019-08-22 02:08:51 +00:00
Code: influxdb.EUnauthorized,
Msg: "failed to get authorizer",
}
h.HandleHTTPError(ctx, err, w)
return
}
2019-08-22 02:08:51 +00:00
if k := auth.Kind(); k != influxdb.AuthorizationKind {
// Get the authorization for the task, if allowed.
authz, err := h.getAuthorizationForTask(ctx, auth, req.filter.Task)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
// We were able to access the authorizer for the task, so reassign that on the context for the rest of this call.
ctx = pcontext.SetAuthorizer(ctx, authz)
}
runs, _, err := h.TaskService.FindRuns(ctx, req.filter)
if err != nil {
2019-08-22 02:08:51 +00:00
err := &influxdb.Error{
Err: err,
Msg: "failed to find runs",
}
if err.Err == influxdb.ErrTaskNotFound || err.Err == influxdb.ErrNoRunsFound {
2019-08-22 02:08:51 +00:00
err.Code = influxdb.ENotFound
}
h.HandleHTTPError(ctx, err, w)
return
}
if err := encodeResponse(ctx, w, http.StatusOK, newRunsResponse(runs, req.filter.Task)); err != nil {
logEncodingError(h.logger, r, err)
return
}
}
type getRunsRequest struct {
2019-08-22 02:08:51 +00:00
filter influxdb.RunFilter
}
func decodeGetRunsRequest(ctx context.Context, r *http.Request) (*getRunsRequest, error) {
params := httprouter.ParamsFromContext(ctx)
id := params.ByName("id")
if id == "" {
2019-08-22 02:08:51 +00:00
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "you must provide a task ID",
}
}
req := &getRunsRequest{}
2019-08-22 02:08:51 +00:00
taskID, err := influxdb.IDFromString(id)
if err != nil {
return nil, err
}
req.filter.Task = *taskID
qp := r.URL.Query()
if id := qp.Get("after"); id != "" {
2019-08-22 02:08:51 +00:00
afterID, err := influxdb.IDFromString(id)
if err != nil {
return nil, err
}
req.filter.After = afterID
}
if limit := qp.Get("limit"); limit != "" {
i, err := strconv.Atoi(limit)
if err != nil {
return nil, err
}
if i < 1 || i > influxdb.TaskMaxPageSize {
return nil, influxdb.ErrOutOfBoundsLimit
}
req.filter.Limit = i
}
2018-11-28 12:45:25 +00:00
var at, bt string
var afterTime, beforeTime time.Time
2018-11-28 12:45:25 +00:00
if at = qp.Get("afterTime"); at != "" {
afterTime, err = time.Parse(time.RFC3339, at)
if err != nil {
return nil, err
}
req.filter.AfterTime = at
}
2018-11-28 12:45:25 +00:00
if bt = qp.Get("beforeTime"); bt != "" {
beforeTime, err = time.Parse(time.RFC3339, bt)
if err != nil {
return nil, err
}
req.filter.BeforeTime = bt
}
2018-11-29 14:30:37 +00:00
if at != "" && bt != "" && !beforeTime.After(afterTime) {
2019-08-22 02:08:51 +00:00
return nil, &influxdb.Error{
Code: influxdb.EUnprocessableEntity,
Msg: "beforeTime must be later than afterTime",
}
}
return req, nil
}
func (h *TaskHandler) handleForceRun(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := decodeForceRunRequest(ctx, r)
if err != nil {
2019-08-22 02:08:51 +00:00
err = &influxdb.Error{
Err: err,
2019-08-22 02:08:51 +00:00
Code: influxdb.EInvalid,
Msg: "failed to decode request",
}
h.HandleHTTPError(ctx, err, w)
return
}
run, err := h.TaskService.ForceRun(ctx, req.TaskID, req.Timestamp)
if err != nil {
2019-08-22 02:08:51 +00:00
err := &influxdb.Error{
Err: err,
Msg: "failed to force run",
}
if err.Err == influxdb.ErrTaskNotFound {
2019-08-22 02:08:51 +00:00
err.Code = influxdb.ENotFound
}
h.HandleHTTPError(ctx, err, w)
return
}
if err := encodeResponse(ctx, w, http.StatusCreated, newRunResponse(*run)); err != nil {
logEncodingError(h.logger, r, err)
return
}
}
type forceRunRequest struct {
2019-08-22 02:08:51 +00:00
TaskID influxdb.ID
Timestamp int64
}
func decodeForceRunRequest(ctx context.Context, r *http.Request) (forceRunRequest, error) {
params := httprouter.ParamsFromContext(ctx)
tid := params.ByName("id")
if tid == "" {
2019-08-22 02:08:51 +00:00
return forceRunRequest{}, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "you must provide a task ID",
}
}
2019-08-22 02:08:51 +00:00
var ti influxdb.ID
if err := ti.DecodeFromString(tid); err != nil {
return forceRunRequest{}, err
}
var req struct {
ScheduledFor string `json:"scheduledFor"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return forceRunRequest{}, err
}
var t time.Time
if req.ScheduledFor == "" {
t = time.Now()
} else {
var err error
t, err = time.Parse(time.RFC3339, req.ScheduledFor)
if err != nil {
return forceRunRequest{}, err
}
}
return forceRunRequest{
TaskID: ti,
Timestamp: t.Unix(),
}, nil
}
func (h *TaskHandler) handleGetRun(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := decodeGetRunRequest(ctx, r)
if err != nil {
2019-08-22 02:08:51 +00:00
err = &influxdb.Error{
Err: err,
2019-08-22 02:08:51 +00:00
Code: influxdb.EInvalid,
Msg: "failed to decode request",
}
h.HandleHTTPError(ctx, err, w)
return
}
auth, err := pcontext.GetAuthorizer(ctx)
if err != nil {
2019-08-22 02:08:51 +00:00
err = &influxdb.Error{
Err: err,
2019-08-22 02:08:51 +00:00
Code: influxdb.EUnauthorized,
Msg: "failed to get authorizer",
}
h.HandleHTTPError(ctx, err, w)
return
}
2019-08-22 02:08:51 +00:00
if k := auth.Kind(); k != influxdb.AuthorizationKind {
// Get the authorization for the task, if allowed.
authz, err := h.getAuthorizationForTask(ctx, auth, req.TaskID)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
// We were able to access the authorizer for the task, so reassign that on the context for the rest of this call.
ctx = pcontext.SetAuthorizer(ctx, authz)
}
run, err := h.TaskService.FindRunByID(ctx, req.TaskID, req.RunID)
if err != nil {
2019-08-22 02:08:51 +00:00
err := &influxdb.Error{
Err: err,
Msg: "failed to find run",
}
if err.Err == influxdb.ErrTaskNotFound || err.Err == influxdb.ErrRunNotFound {
2019-08-22 02:08:51 +00:00
err.Code = influxdb.ENotFound
}
h.HandleHTTPError(ctx, err, w)
return
}
if err := encodeResponse(ctx, w, http.StatusOK, newRunResponse(*run)); err != nil {
logEncodingError(h.logger, r, err)
return
}
}
type getRunRequest struct {
2019-08-22 02:08:51 +00:00
TaskID influxdb.ID
RunID influxdb.ID
}
func decodeGetRunRequest(ctx context.Context, r *http.Request) (*getRunRequest, error) {
params := httprouter.ParamsFromContext(ctx)
tid := params.ByName("id")
if tid == "" {
2019-08-22 02:08:51 +00:00
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "you must provide a task ID",
}
}
rid := params.ByName("rid")
if rid == "" {
2019-08-22 02:08:51 +00:00
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "you must provide a run ID",
}
}
2019-08-22 02:08:51 +00:00
var ti, ri influxdb.ID
if err := ti.DecodeFromString(tid); err != nil {
return nil, err
}
if err := ri.DecodeFromString(rid); err != nil {
return nil, err
}
return &getRunRequest{
RunID: ri,
TaskID: ti,
}, nil
}
2018-10-04 17:12:43 +00:00
type cancelRunRequest struct {
2019-08-22 02:08:51 +00:00
RunID influxdb.ID
TaskID influxdb.ID
2018-10-04 17:12:43 +00:00
}
func decodeCancelRunRequest(ctx context.Context, r *http.Request) (*cancelRunRequest, error) {
params := httprouter.ParamsFromContext(ctx)
rid := params.ByName("rid")
if rid == "" {
2019-08-22 02:08:51 +00:00
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "you must provide a run ID",
}
2018-10-04 17:12:43 +00:00
}
tid := params.ByName("id")
2018-10-04 17:12:43 +00:00
if tid == "" {
2019-08-22 02:08:51 +00:00
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "you must provide a task ID",
}
2018-10-04 17:12:43 +00:00
}
2019-08-22 02:08:51 +00:00
var i influxdb.ID
2018-10-04 17:12:43 +00:00
if err := i.DecodeFromString(rid); err != nil {
return nil, err
}
2019-08-22 02:08:51 +00:00
var t influxdb.ID
2018-10-04 17:12:43 +00:00
if err := t.DecodeFromString(tid); err != nil {
return nil, err
}
return &cancelRunRequest{
RunID: i,
TaskID: t,
}, nil
}
func (h *TaskHandler) handleCancelRun(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := decodeCancelRunRequest(ctx, r)
if err != nil {
2019-08-22 02:08:51 +00:00
err = &influxdb.Error{
Err: err,
2019-08-22 02:08:51 +00:00
Code: influxdb.EInvalid,
Msg: "failed to decode request",
}
h.HandleHTTPError(ctx, err, w)
2018-10-04 17:12:43 +00:00
return
}
err = h.TaskService.CancelRun(ctx, req.TaskID, req.RunID)
if err != nil {
2019-08-22 02:08:51 +00:00
err := &influxdb.Error{
Err: err,
Msg: "failed to cancel run",
}
if err.Err == influxdb.ErrTaskNotFound || err.Err == influxdb.ErrRunNotFound {
2019-08-22 02:08:51 +00:00
err.Code = influxdb.ENotFound
}
h.HandleHTTPError(ctx, err, w)
2018-10-04 17:12:43 +00:00
return
}
}
func (h *TaskHandler) handleRetryRun(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := decodeRetryRunRequest(ctx, r)
if err != nil {
2019-08-22 02:08:51 +00:00
err = &influxdb.Error{
Err: err,
2019-08-22 02:08:51 +00:00
Code: influxdb.EInvalid,
Msg: "failed to decode request",
}
h.HandleHTTPError(ctx, err, w)
return
}
auth, err := pcontext.GetAuthorizer(ctx)
if err != nil {
2019-08-22 02:08:51 +00:00
err = &influxdb.Error{
Err: err,
2019-08-22 02:08:51 +00:00
Code: influxdb.EUnauthorized,
Msg: "failed to get authorizer",
}
h.HandleHTTPError(ctx, err, w)
return
}
2019-08-22 02:08:51 +00:00
if k := auth.Kind(); k != influxdb.AuthorizationKind {
// Get the authorization for the task, if allowed.
authz, err := h.getAuthorizationForTask(ctx, auth, req.TaskID)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
// We were able to access the authorizer for the task, so reassign that on the context for the rest of this call.
ctx = pcontext.SetAuthorizer(ctx, authz)
}
run, err := h.TaskService.RetryRun(ctx, req.TaskID, req.RunID)
2018-11-20 21:53:52 +00:00
if err != nil {
2019-08-22 02:08:51 +00:00
err := &influxdb.Error{
Err: err,
Msg: "failed to retry run",
}
if err.Err == influxdb.ErrTaskNotFound || err.Err == influxdb.ErrRunNotFound {
2019-08-22 02:08:51 +00:00
err.Code = influxdb.ENotFound
}
h.HandleHTTPError(ctx, err, w)
2018-11-20 21:53:52 +00:00
return
}
if err := encodeResponse(ctx, w, http.StatusOK, newRunResponse(*run)); err != nil {
logEncodingError(h.logger, r, err)
return
}
}
type retryRunRequest struct {
2019-08-22 02:08:51 +00:00
RunID, TaskID influxdb.ID
}
func decodeRetryRunRequest(ctx context.Context, r *http.Request) (*retryRunRequest, error) {
params := httprouter.ParamsFromContext(ctx)
tid := params.ByName("id")
if tid == "" {
2019-08-22 02:08:51 +00:00
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "you must provide a task ID",
}
}
rid := params.ByName("rid")
if rid == "" {
2019-08-22 02:08:51 +00:00
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "you must provide a run ID",
}
}
2019-08-22 02:08:51 +00:00
var ti, ri influxdb.ID
if err := ti.DecodeFromString(tid); err != nil {
return nil, err
}
if err := ri.DecodeFromString(rid); err != nil {
return nil, err
}
return &retryRunRequest{
RunID: ri,
TaskID: ti,
}, nil
}
2018-10-23 17:51:13 +00:00
2019-08-22 02:08:51 +00:00
func (h *TaskHandler) populateTaskCreateOrg(ctx context.Context, tc *influxdb.TaskCreate) error {
if tc.OrganizationID.Valid() && tc.Organization != "" {
return nil
}
if !tc.OrganizationID.Valid() && tc.Organization == "" {
return errors.New("missing orgID and organization name")
}
if tc.OrganizationID.Valid() {
o, err := h.OrganizationService.FindOrganizationByID(ctx, tc.OrganizationID)
if err != nil {
return err
}
tc.Organization = o.Name
} else {
2019-08-22 02:08:51 +00:00
o, err := h.OrganizationService.FindOrganization(ctx, influxdb.OrganizationFilter{Name: &tc.Organization})
if err != nil {
return err
}
tc.OrganizationID = o.ID
}
return nil
}
// getAuthorizationForTask looks up the authorization associated with taskID,
// ensuring that the authorizer on ctx is allowed to view the task and the authorization.
//
2019-08-22 02:08:51 +00:00
// This method returns a *influxdb.Error, suitable for directly passing to h.HandleHTTPError.
func (h *TaskHandler) getAuthorizationForTask(ctx context.Context, auth influxdb.Authorizer, taskID influxdb.ID) (*influxdb.Authorization, *influxdb.Error) {
sess, ok := auth.(*influxdb.Session)
if !ok {
2019-08-22 02:08:51 +00:00
return nil, &influxdb.Error{
Code: influxdb.EUnauthorized,
Msg: "unable to authorize session",
}
}
// First look up the task, if we're allowed.
// This assumes h.TaskService validates access.
t, err := h.TaskService.FindTaskByID(ctx, taskID)
if err != nil {
2019-08-22 02:08:51 +00:00
return nil, &influxdb.Error{
Err: err,
2019-08-22 02:08:51 +00:00
Code: influxdb.EUnauthorized,
Msg: "task ID unknown or unauthorized",
}
}
return sess.EphemeralAuth(t.OrganizationID), nil
}
2018-10-29 19:10:33 +00:00
// TaskService connects to Influx via HTTP using tokens to manage tasks.
2018-10-23 17:51:13 +00:00
type TaskService struct {
Addr string
Token string
InsecureSkipVerify bool
}
2018-10-29 19:10:33 +00:00
// FindTaskByID returns a single task
2019-08-22 02:08:51 +00:00
func (t TaskService) FindTaskByID(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) {
2019-03-06 00:18:04 +00:00
span, _ := tracing.StartSpanFromContext(ctx)
2019-03-05 00:38:10 +00:00
defer span.Finish()
u, err := NewURL(t.Addr, taskIDPath(id))
2018-10-23 17:51:13 +00:00
if err != nil {
return nil, err
}
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, err
}
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
2018-10-23 17:51:13 +00:00
resp, err := hc.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
2018-10-23 17:51:13 +00:00
2019-01-24 18:56:05 +00:00
if err := CheckError(resp); err != nil {
2019-08-22 02:08:51 +00:00
if influxdb.ErrorCode(err) == influxdb.ENotFound {
2018-10-23 17:51:13 +00:00
// ErrTaskNotFound is expected as part of the FindTaskByID contract,
// so return that actual error instead of a different error that looks like it.
2019-01-24 18:56:05 +00:00
// TODO cleanup backend task service error implementation
return nil, influxdb.ErrTaskNotFound
2018-10-23 17:51:13 +00:00
}
return nil, err
}
var tr taskResponse
if err := json.NewDecoder(resp.Body).Decode(&tr); err != nil {
2018-10-23 17:51:13 +00:00
return nil, err
}
return &tr.Task, nil
2018-10-23 17:51:13 +00:00
}
2018-10-29 19:10:33 +00:00
// FindTasks returns a list of tasks that match a filter (limit 100) and the total count
// of matching tasks.
2019-08-22 02:08:51 +00:00
func (t TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
2019-03-06 00:18:04 +00:00
span, _ := tracing.StartSpanFromContext(ctx)
2019-03-05 00:38:10 +00:00
defer span.Finish()
u, err := NewURL(t.Addr, tasksPath)
2018-10-23 17:51:13 +00:00
if err != nil {
return nil, 0, err
}
val := url.Values{}
if filter.After != nil {
val.Add("after", filter.After.String())
}
if filter.OrganizationID != nil {
val.Add("orgID", filter.OrganizationID.String())
}
if filter.Organization != "" {
val.Add("org", filter.Organization)
2018-10-23 17:51:13 +00:00
}
if filter.User != nil {
val.Add("user", filter.User.String())
}
2018-11-29 16:28:17 +00:00
if filter.Limit != 0 {
val.Add("limit", strconv.Itoa(filter.Limit))
}
2018-10-23 17:51:13 +00:00
2019-08-06 16:27:52 +00:00
if filter.Type != nil {
val.Add("type", *filter.Type)
}
2018-10-23 17:51:13 +00:00
u.RawQuery = val.Encode()
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, 0, err
}
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
2018-10-23 17:51:13 +00:00
resp, err := hc.Do(req)
if err != nil {
return nil, 0, err
}
defer resp.Body.Close()
2018-10-23 17:51:13 +00:00
2019-01-24 18:56:05 +00:00
if err := CheckError(resp); err != nil {
2018-10-23 17:51:13 +00:00
return nil, 0, err
}
var tr tasksResponse
if err := json.NewDecoder(resp.Body).Decode(&tr); err != nil {
2018-10-23 17:51:13 +00:00
return nil, 0, err
}
2019-08-22 02:08:51 +00:00
tasks := make([]*influxdb.Task, len(tr.Tasks))
2018-11-28 22:52:11 +00:00
for i := range tr.Tasks {
2018-11-21 06:11:48 +00:00
tasks[i] = &tr.Tasks[i].Task
}
2018-10-23 17:51:13 +00:00
return tasks, len(tasks), nil
}
2018-10-29 19:10:33 +00:00
// CreateTask creates a new task.
2019-08-22 02:08:51 +00:00
func (t TaskService) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) {
2019-03-06 00:18:04 +00:00
span, _ := tracing.StartSpanFromContext(ctx)
2019-03-05 00:38:10 +00:00
defer span.Finish()
u, err := NewURL(t.Addr, tasksPath)
2018-10-23 17:51:13 +00:00
if err != nil {
return nil, err
2018-10-23 17:51:13 +00:00
}
taskBytes, err := json.Marshal(tc)
2018-10-23 17:51:13 +00:00
if err != nil {
return nil, err
2018-10-23 17:51:13 +00:00
}
req, err := http.NewRequest("POST", u.String(), bytes.NewReader(taskBytes))
if err != nil {
return nil, err
2018-10-23 17:51:13 +00:00
}
req.Header.Set("Content-Type", "application/json")
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
2018-10-23 17:51:13 +00:00
resp, err := hc.Do(req)
if err != nil {
return nil, err
2018-10-23 17:51:13 +00:00
}
defer resp.Body.Close()
2018-10-23 17:51:13 +00:00
2019-01-24 01:02:37 +00:00
if err := CheckError(resp); err != nil {
return nil, err
2018-10-23 17:51:13 +00:00
}
var tr taskResponse
if err := json.NewDecoder(resp.Body).Decode(&tr); err != nil {
return nil, err
2018-10-23 17:51:13 +00:00
}
return &tr.Task, nil
2018-10-23 17:51:13 +00:00
}
2018-10-29 19:10:33 +00:00
// UpdateTask updates a single task with changeset.
2019-08-22 02:08:51 +00:00
func (t TaskService) UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error) {
2019-03-06 00:18:04 +00:00
span, _ := tracing.StartSpanFromContext(ctx)
2019-03-05 00:38:10 +00:00
defer span.Finish()
u, err := NewURL(t.Addr, taskIDPath(id))
2018-10-23 17:51:13 +00:00
if err != nil {
return nil, err
}
taskBytes, err := json.Marshal(upd)
if err != nil {
return nil, err
}
req, err := http.NewRequest("PATCH", u.String(), bytes.NewReader(taskBytes))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
2018-10-23 17:51:13 +00:00
resp, err := hc.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
2019-01-24 18:56:05 +00:00
if err := CheckError(resp); err != nil {
2018-10-23 17:51:13 +00:00
return nil, err
}
var tr taskResponse
if err := json.NewDecoder(resp.Body).Decode(&tr); err != nil {
2018-10-23 17:51:13 +00:00
return nil, err
}
return &tr.Task, nil
2018-10-23 17:51:13 +00:00
}
2018-10-29 19:10:33 +00:00
// DeleteTask removes a task by ID and purges all associated data and scheduled runs.
2019-08-22 02:08:51 +00:00
func (t TaskService) DeleteTask(ctx context.Context, id influxdb.ID) error {
2019-03-06 00:18:04 +00:00
span, _ := tracing.StartSpanFromContext(ctx)
2019-03-05 00:38:10 +00:00
defer span.Finish()
u, err := NewURL(t.Addr, taskIDPath(id))
2018-10-23 17:51:13 +00:00
if err != nil {
return err
}
req, err := http.NewRequest("DELETE", u.String(), nil)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
2018-10-23 17:51:13 +00:00
resp, err := hc.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
2018-10-29 19:10:33 +00:00
return CheckErrorStatus(http.StatusNoContent, resp)
2018-10-23 17:51:13 +00:00
}
2018-10-29 19:10:33 +00:00
// FindLogs returns logs for a run.
2019-08-22 02:08:51 +00:00
func (t TaskService) FindLogs(ctx context.Context, filter influxdb.LogFilter) ([]*influxdb.Log, int, error) {
2019-03-06 00:18:04 +00:00
span, _ := tracing.StartSpanFromContext(ctx)
2019-03-05 00:38:10 +00:00
defer span.Finish()
if !filter.Task.Valid() {
return nil, 0, errors.New("task ID required")
}
var urlPath string
if filter.Run == nil {
urlPath = path.Join(taskIDPath(filter.Task), "logs")
} else {
urlPath = path.Join(taskIDRunIDPath(filter.Task, *filter.Run), "logs")
}
u, err := NewURL(t.Addr, urlPath)
if err != nil {
return nil, 0, err
}
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, 0, err
}
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
return nil, 0, err
}
defer resp.Body.Close()
2019-01-24 18:56:05 +00:00
if err := CheckError(resp); err != nil {
return nil, 0, err
}
var logs getLogsResponse
if err := json.NewDecoder(resp.Body).Decode(&logs); err != nil {
return nil, 0, err
}
return logs.Events, len(logs.Events), nil
2018-10-23 17:51:13 +00:00
}
2018-10-29 19:10:33 +00:00
// FindRuns returns a list of runs that match a filter and the total count of returned runs.
2019-08-22 02:08:51 +00:00
func (t TaskService) FindRuns(ctx context.Context, filter influxdb.RunFilter) ([]*influxdb.Run, int, error) {
2019-03-06 00:18:04 +00:00
span, _ := tracing.StartSpanFromContext(ctx)
2019-03-05 00:38:10 +00:00
defer span.Finish()
if !filter.Task.Valid() {
2018-10-23 17:51:13 +00:00
return nil, 0, errors.New("task ID required")
}
u, err := NewURL(t.Addr, taskIDRunsPath(filter.Task))
2018-10-23 17:51:13 +00:00
if err != nil {
return nil, 0, err
}
val := url.Values{}
if filter.After != nil {
val.Set("after", filter.After.String())
}
if filter.Limit < 0 || filter.Limit > influxdb.TaskMaxPageSize {
return nil, 0, influxdb.ErrOutOfBoundsLimit
}
val.Set("limit", strconv.Itoa(filter.Limit))
2018-10-23 17:51:13 +00:00
u.RawQuery = val.Encode()
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, 0, err
}
req.Header.Set("Content-Type", "application/json")
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
2018-10-23 17:51:13 +00:00
resp, err := hc.Do(req)
if err != nil {
return nil, 0, err
}
defer resp.Body.Close()
2019-01-24 18:56:05 +00:00
if err := CheckError(resp); err != nil {
2018-10-23 17:51:13 +00:00
return nil, 0, err
}
var rs runsResponse
if err := json.NewDecoder(resp.Body).Decode(&rs); err != nil {
2018-10-23 17:51:13 +00:00
return nil, 0, err
}
2019-08-22 02:08:51 +00:00
runs := make([]*influxdb.Run, len(rs.Runs))
for i := range rs.Runs {
runs[i] = convertRun(rs.Runs[i].httpRun)
}
2018-10-23 17:51:13 +00:00
return runs, len(runs), nil
}
2018-10-29 19:10:33 +00:00
// FindRunByID returns a single run of a specific task.
2019-08-22 02:08:51 +00:00
func (t TaskService) FindRunByID(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
2019-03-06 00:18:04 +00:00
span, _ := tracing.StartSpanFromContext(ctx)
2019-03-05 00:38:10 +00:00
defer span.Finish()
u, err := NewURL(t.Addr, taskIDRunIDPath(taskID, runID))
if err != nil {
return nil, err
}
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, err
}
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
2019-01-24 18:56:05 +00:00
if err := CheckError(resp); err != nil {
2019-08-22 02:08:51 +00:00
if influxdb.ErrorCode(err) == influxdb.ENotFound {
// ErrRunNotFound is expected as part of the FindRunByID contract,
// so return that actual error instead of a different error that looks like it.
2019-01-24 18:56:05 +00:00
// TODO cleanup backend error implementation
2019-08-22 02:08:51 +00:00
return nil, influxdb.ErrRunNotFound
}
return nil, err
}
2018-11-20 21:53:52 +00:00
var rs = &runResponse{}
if err := json.NewDecoder(resp.Body).Decode(rs); err != nil {
return nil, err
}
return convertRun(rs.httpRun), nil
2018-10-23 17:51:13 +00:00
}
2018-10-29 19:10:33 +00:00
// RetryRun creates and returns a new run (which is a retry of another run).
2019-08-22 02:08:51 +00:00
func (t TaskService) RetryRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
2019-03-06 00:18:04 +00:00
span, _ := tracing.StartSpanFromContext(ctx)
2019-03-05 00:38:10 +00:00
defer span.Finish()
p := path.Join(taskIDRunIDPath(taskID, runID), "retry")
u, err := NewURL(t.Addr, p)
if err != nil {
2018-11-20 21:53:52 +00:00
return nil, err
}
req, err := http.NewRequest("POST", u.String(), nil)
if err != nil {
2018-11-20 21:53:52 +00:00
return nil, err
}
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
2018-11-20 21:53:52 +00:00
return nil, err
}
defer resp.Body.Close()
2019-01-24 18:56:05 +00:00
if err := CheckError(resp); err != nil {
2019-08-22 02:08:51 +00:00
if influxdb.ErrorCode(err) == influxdb.ENotFound {
// ErrRunNotFound is expected as part of the RetryRun contract,
// so return that actual error instead of a different error that looks like it.
2019-01-24 18:56:05 +00:00
// TODO cleanup backend task error implementation
2019-08-22 02:08:51 +00:00
return nil, influxdb.ErrRunNotFound
}
// RequestStillQueuedError is also part of the contract.
if e := backend.ParseRequestStillQueuedError(err.Error()); e != nil {
return nil, *e
}
return nil, err
}
rs := &runResponse{}
if err := json.NewDecoder(resp.Body).Decode(rs); err != nil {
return nil, err
}
return convertRun(rs.httpRun), nil
}
// ForceRun starts a run manually right now.
2019-08-22 02:08:51 +00:00
func (t TaskService) ForceRun(ctx context.Context, taskID influxdb.ID, scheduledFor int64) (*influxdb.Run, error) {
2019-03-06 00:18:04 +00:00
span, _ := tracing.StartSpanFromContext(ctx)
2019-03-05 00:38:10 +00:00
defer span.Finish()
u, err := NewURL(t.Addr, taskIDRunsPath(taskID))
if err != nil {
return nil, err
}
body := fmt.Sprintf(`{"scheduledFor": %q}`, time.Unix(scheduledFor, 0).UTC().Format(time.RFC3339))
req, err := http.NewRequest("POST", u.String(), strings.NewReader(body))
if err != nil {
return nil, err
}
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
2019-01-24 18:56:05 +00:00
if err := CheckError(resp); err != nil {
2019-08-22 02:08:51 +00:00
if influxdb.ErrorCode(err) == influxdb.ENotFound {
// ErrRunNotFound is expected as part of the RetryRun contract,
// so return that actual error instead of a different error that looks like it.
return nil, influxdb.ErrRunNotFound
}
// RequestStillQueuedError is also part of the contract.
if e := backend.ParseRequestStillQueuedError(err.Error()); e != nil {
2018-11-20 21:53:52 +00:00
return nil, *e
}
2018-11-20 21:53:52 +00:00
return nil, err
}
2018-11-20 21:53:52 +00:00
rs := &runResponse{}
if err := json.NewDecoder(resp.Body).Decode(rs); err != nil {
return nil, err
}
return convertRun(rs.httpRun), nil
2018-10-23 17:51:13 +00:00
}
2019-08-22 02:08:51 +00:00
func cancelPath(taskID, runID influxdb.ID) string {
2018-10-04 17:12:43 +00:00
return path.Join(taskID.String(), runID.String())
}
// CancelRun stops a longer running run.
2019-08-22 02:08:51 +00:00
func (t TaskService) CancelRun(ctx context.Context, taskID, runID influxdb.ID) error {
2019-03-06 00:18:04 +00:00
span, _ := tracing.StartSpanFromContext(ctx)
2019-03-05 00:38:10 +00:00
defer span.Finish()
u, err := NewURL(t.Addr, cancelPath(taskID, runID))
2018-10-04 17:12:43 +00:00
if err != nil {
return err
}
req, err := http.NewRequest("DELETE", u.String(), nil)
if err != nil {
return err
}
SetToken(t.Token, req)
hc := NewClient(u.Scheme, t.InsecureSkipVerify)
2018-10-04 17:12:43 +00:00
resp, err := hc.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
2019-01-24 18:56:05 +00:00
if err := CheckError(resp); err != nil {
2018-10-04 17:12:43 +00:00
return err
}
return nil
}
2019-08-22 02:08:51 +00:00
func taskIDPath(id influxdb.ID) string {
2018-10-23 17:51:13 +00:00
return path.Join(tasksPath, id.String())
}
2019-08-22 02:08:51 +00:00
func taskIDRunsPath(id influxdb.ID) string {
2018-10-23 17:51:13 +00:00
return path.Join(tasksPath, id.String(), "runs")
}
2019-08-22 02:08:51 +00:00
func taskIDRunIDPath(taskID, runID influxdb.ID) string {
return path.Join(tasksPath, taskID.String(), "runs", runID.String())
}