feat(http): add endpoints for runs and logs

pull/10616/head
Jade McGough 2018-06-08 16:41:26 -07:00
parent e68b635402
commit d756083890
2 changed files with 253 additions and 9 deletions

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"net/http"
"strconv"
"github.com/influxdata/platform"
kerrors "github.com/influxdata/platform/kit/errors"
@ -24,9 +25,16 @@ func NewTaskHandler() *TaskHandler {
h.HandlerFunc("GET", "/v1/tasks", h.handleGetTasks)
h.HandlerFunc("POST", "/v1/tasks", h.handlePostTask)
h.HandlerFunc("GET", "/v1/tasks/:id", h.handleGetTask)
h.HandlerFunc("PATCH", "/v1/tasks/:id", h.handleUpdateTask)
h.HandlerFunc("DELETE", "/v1/tasks/:id", h.handleDeleteTask)
h.HandlerFunc("GET", "/v1/tasks/:tid", h.handleGetTask)
h.HandlerFunc("PATCH", "/v1/tasks/:tid", h.handleUpdateTask)
h.HandlerFunc("DELETE", "/v1/tasks/:tid", h.handleDeleteTask)
h.HandlerFunc("GET", "/v1/tasks/:tid/logs", h.handleGetLogs)
h.HandlerFunc("GET", "/v1/tasks/:tid/runs", h.handleGetRuns)
h.HandlerFunc("GET", "/v1/tasks/:tid/runs/:rid", h.handleGetRun)
h.HandlerFunc("POST", "/v1/tasks/:tid/runs/:rid/retry", h.handleRetryRun)
return h
}
@ -144,7 +152,7 @@ type getTaskRequest struct {
func decodeGetTaskRequest(ctx context.Context, r *http.Request) (*getTaskRequest, error) {
params := httprouter.ParamsFromContext(ctx)
id := params.ByName("id")
id := params.ByName("tid")
if id == "" {
return nil, kerrors.InvalidDataf("url missing id")
}
@ -189,7 +197,7 @@ type updateTaskRequest struct {
func decodeUpdateTaskRequest(ctx context.Context, r *http.Request) (*updateTaskRequest, error) {
params := httprouter.ParamsFromContext(ctx)
id := params.ByName("id")
id := params.ByName("tid")
if id == "" {
return nil, kerrors.InvalidDataf("url missing id")
}
@ -233,7 +241,7 @@ type deleteTaskRequest struct {
func decodeDeleteTaskRequest(ctx context.Context, r *http.Request) (*deleteTaskRequest, error) {
params := httprouter.ParamsFromContext(ctx)
id := params.ByName("id")
id := params.ByName("tid")
if id == "" {
return nil, kerrors.InvalidDataf("url missing id")
}
@ -247,3 +255,202 @@ func decodeDeleteTaskRequest(ctx context.Context, r *http.Request) (*deleteTaskR
TaskID: i,
}, nil
}
func (h *TaskHandler) handleGetLogs(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := decodeGetLogsRequest(ctx, r)
if err != nil {
kerrors.EncodeHTTP(ctx, err, w)
return
}
logs, _, err := h.TaskService.FindLogs(ctx, req.filter)
if err != nil {
kerrors.EncodeHTTP(ctx, err, w)
return
}
if err := encodeResponse(ctx, w, http.StatusOK, logs); err != nil {
kerrors.EncodeHTTP(ctx, err, w)
return
}
}
type getLogsRequest struct {
filter platform.LogFilter
}
func decodeGetLogsRequest(ctx context.Context, r *http.Request) (*getLogsRequest, error) {
qp := r.URL.Query()
req := &getLogsRequest{}
if id := qp.Get("task"); id != "" {
req.filter.Task = &platform.ID{}
if err := req.filter.Task.DecodeFromString(id); err != nil {
return nil, err
}
}
if id := qp.Get("run"); id != "" {
req.filter.Run = &platform.ID{}
if err := req.filter.Run.DecodeFromString(id); err != nil {
return nil, err
}
}
return req, nil
}
func (h *TaskHandler) handleGetRuns(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := decodeGetRunsRequest(ctx, r)
if err != nil {
kerrors.EncodeHTTP(ctx, err, w)
return
}
runs, _, err := h.TaskService.FindRuns(ctx, req.filter)
if err != nil {
kerrors.EncodeHTTP(ctx, err, w)
return
}
if err := encodeResponse(ctx, w, http.StatusOK, runs); err != nil {
kerrors.EncodeHTTP(ctx, err, w)
return
}
}
type getRunsRequest struct {
filter platform.RunFilter
}
func decodeGetRunsRequest(ctx context.Context, r *http.Request) (*getRunsRequest, error) {
qp := r.URL.Query()
req := &getRunsRequest{}
if id := qp.Get("task"); id != "" {
req.filter.Task = &platform.ID{}
if err := req.filter.Task.DecodeFromString(id); err != nil {
return nil, err
}
}
if id := qp.Get("after"); id != "" {
req.filter.After = &platform.ID{}
if err := req.filter.After.DecodeFromString(id); err != nil {
return nil, err
}
}
if limit := qp.Get("limit"); limit != "" {
i, err := strconv.Atoi(limit)
if err != nil {
return nil, err
}
if i < 1 || i > 100 {
return nil, kerrors.InvalidDataf("limit must be between 1 and 100")
}
req.filter.Limit = i
}
if time := qp.Get("afterTime"); time != "" {
// TODO (jm): verify format is correct, once we've decided on a format
req.filter.AfterTime = time
}
if time := qp.Get("beforeTime"); time != "" {
// TODO (jm): verify format is correct, once we've decided on a format
req.filter.BeforeTime = time
}
return req, nil
}
func (h *TaskHandler) handleGetRun(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := decodeGetRunRequest(ctx, r)
if err != nil {
kerrors.EncodeHTTP(ctx, err, w)
return
}
run, err := h.TaskService.FindRunByID(ctx, req.RunID)
if err != nil {
kerrors.EncodeHTTP(ctx, err, w)
return
}
if err := encodeResponse(ctx, w, http.StatusOK, run); err != nil {
kerrors.EncodeHTTP(ctx, err, w)
return
}
}
type getRunRequest struct {
RunID platform.ID
}
func decodeGetRunRequest(ctx context.Context, r *http.Request) (*getRunRequest, error) {
params := httprouter.ParamsFromContext(ctx)
id := params.ByName("rid")
if id == "" {
return nil, kerrors.InvalidDataf("you must provide a run ID")
}
var i platform.ID
if err := i.DecodeFromString(id); err != nil {
return nil, err
}
return &getRunRequest{
RunID: i,
}, nil
}
func (h *TaskHandler) handleRetryRun(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := decodeRetryRunRequest(ctx, r)
if err != nil {
kerrors.EncodeHTTP(ctx, err, w)
return
}
run, err := h.TaskService.RetryRun(ctx, req.RunID)
if err != nil {
kerrors.EncodeHTTP(ctx, err, w)
return
}
if err := encodeResponse(ctx, w, http.StatusOK, run); err != nil {
kerrors.EncodeHTTP(ctx, err, w)
return
}
}
type retryRunRequest struct {
RunID platform.ID
}
func decodeRetryRunRequest(ctx context.Context, r *http.Request) (*retryRunRequest, error) {
params := httprouter.ParamsFromContext(ctx)
id := params.ByName("rid")
if id == "" {
return nil, kerrors.InvalidDataf("you must provide a run ID")
}
var i platform.ID
if err := i.DecodeFromString(id); err != nil {
return nil, err
}
return &retryRunRequest{
RunID: i,
}, nil
}

43
task.go
View File

@ -11,15 +11,26 @@ type Task struct {
IFQL string `json:"ifql"`
Every string `json:"every,omitempty"`
Cron string `json:"cron,omitempty"`
Last Run `json:"last,omitempty"`
}
// Run is a record created when a run of a task is queued.
type Run struct {
ID ID `json:"id,omitempty"`
Status string `json:"status"`
QueuedAt string `json:"queuedAt"`
StartTime string `json:"startTime"`
EndTime string `json:"endTime"`
Log Log `json:"log"`
}
// Log represents a link to a log resource
type Log string
// TaskService represents a service for managing one-off and recurring tasks.
type TaskService interface {
FindTaskByID(ctx context.Context, id ID) (*Task, error)
// Returns a task that matches filter.
FindTask(ctx context.Context, filter TaskFilter) (*Task, error)
// Returns a list of tasks that match a filter (limit 100) and the total count
// of matching tasks.
FindTasks(ctx context.Context, filter TaskFilter) ([]*Task, int, error)
@ -32,6 +43,17 @@ type TaskService interface {
// Removes a task by ID and purges all associated data and scheduled runs
DeleteTask(ctx context.Context, id ID) error
// Returns logs for a run.
FindLogs(ctx context.Context, filter LogFilter) ([]*Log, int, error)
// Returns a list of runs that match a filter and the total count of returned runs.
FindRuns(ctx context.Context, filter RunFilter) ([]*Run, int, error)
FindRunByID(ctx context.Context, id ID) (*Run, error)
// Creates and returns a new run (which is a retry of another run)
RetryRun(ctx context.Context, id ID) (*Run, error)
}
// TaskUpdate represents updates to a task
@ -45,3 +67,18 @@ type TaskFilter struct {
Organization *ID
User *ID
}
// RunFilter represents a set of filters that restrict the returned results
type RunFilter struct {
Task *ID
After *ID
Limit int
AfterTime string
BeforeTime string
}
// LogFilter represents a set of filters that restrict the returned results
type LogFilter struct {
Task *ID
Run *ID
}