Update the scheduler to add a few task logs (#1029)
parent
5f9b0e951e
commit
610faf18e7
|
@ -294,7 +294,20 @@ func decodeDeleteTaskRequest(ctx context.Context, r *http.Request) (*deleteTaskR
|
||||||
func (h *TaskHandler) handleGetLogs(w http.ResponseWriter, r *http.Request) {
|
func (h *TaskHandler) handleGetLogs(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
|
|
||||||
req, err := decodeGetLogsRequest(ctx, r)
|
tok, err := GetToken(r)
|
||||||
|
if err != nil {
|
||||||
|
EncodeError(ctx, err, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
auth, err := h.AuthorizationService.FindAuthorizationByToken(ctx, tok)
|
||||||
|
if err != nil {
|
||||||
|
EncodeError(ctx, kerrors.Wrap(err, "invalid token", kerrors.InvalidData), w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ctx = pcontext.SetAuthorization(ctx, auth)
|
||||||
|
|
||||||
|
req, err := decodeGetLogsRequest(ctx, r, h.OrganizationService)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
EncodeError(ctx, err, w)
|
EncodeError(ctx, err, w)
|
||||||
return
|
return
|
||||||
|
@ -316,7 +329,7 @@ type getLogsRequest struct {
|
||||||
filter platform.LogFilter
|
filter platform.LogFilter
|
||||||
}
|
}
|
||||||
|
|
||||||
func decodeGetLogsRequest(ctx context.Context, r *http.Request) (*getLogsRequest, error) {
|
func decodeGetLogsRequest(ctx context.Context, r *http.Request, orgs platform.OrganizationService) (*getLogsRequest, error) {
|
||||||
params := httprouter.ParamsFromContext(ctx)
|
params := httprouter.ParamsFromContext(ctx)
|
||||||
id := params.ByName("tid")
|
id := params.ByName("tid")
|
||||||
if id == "" {
|
if id == "" {
|
||||||
|
@ -329,6 +342,17 @@ func decodeGetLogsRequest(ctx context.Context, r *http.Request) (*getLogsRequest
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qp := r.URL.Query()
|
||||||
|
|
||||||
|
if orgName := qp.Get("org"); orgName != "" {
|
||||||
|
o, err := orgs.FindOrganization(ctx, platform.OrganizationFilter{Name: &orgName})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
req.filter.Org = &o.ID
|
||||||
|
}
|
||||||
|
|
||||||
if id := params.ByName("rid"); id != "" {
|
if id := params.ByName("rid"); id != "" {
|
||||||
req.filter.Run = &platform.ID{}
|
req.filter.Run = &platform.ID{}
|
||||||
if err := req.filter.Run.DecodeFromString(id); err != nil {
|
if err := req.filter.Run.DecodeFromString(id); err != nil {
|
||||||
|
|
|
@ -3,6 +3,7 @@ package backend
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
@ -580,19 +581,6 @@ func (r *runner) executeAndWait(qr QueuedRun, runLogger *zap.Logger) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *runner) updateRunState(qr QueuedRun, s RunStatus, runLogger *zap.Logger) {
|
func (r *runner) updateRunState(qr QueuedRun, s RunStatus, runLogger *zap.Logger) {
|
||||||
switch s {
|
|
||||||
case RunStarted:
|
|
||||||
r.ts.metrics.StartRun(r.task.ID.String())
|
|
||||||
case RunSuccess:
|
|
||||||
r.ts.metrics.FinishRun(r.task.ID.String(), true)
|
|
||||||
case RunFail, RunCanceled:
|
|
||||||
r.ts.metrics.FinishRun(r.task.ID.String(), false)
|
|
||||||
default:
|
|
||||||
// We are deliberately not handling RunQueued yet.
|
|
||||||
// There is not really a notion of being queued in this runner architecture.
|
|
||||||
runLogger.Warn("Unhandled run state", zap.Stringer("state", s))
|
|
||||||
}
|
|
||||||
|
|
||||||
rlb := RunLogBase{
|
rlb := RunLogBase{
|
||||||
Task: r.task,
|
Task: r.task,
|
||||||
RunID: qr.RunID,
|
RunID: qr.RunID,
|
||||||
|
@ -600,6 +588,24 @@ func (r *runner) updateRunState(qr QueuedRun, s RunStatus, runLogger *zap.Logger
|
||||||
RequestedAt: qr.RequestedAt,
|
RequestedAt: qr.RequestedAt,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch s {
|
||||||
|
case RunStarted:
|
||||||
|
r.ts.metrics.StartRun(r.task.ID.String())
|
||||||
|
r.logWriter.AddRunLog(r.ctx, rlb, time.Now(), fmt.Sprintf("Started task from script: %q", r.task.Script))
|
||||||
|
case RunSuccess:
|
||||||
|
r.ts.metrics.FinishRun(r.task.ID.String(), true)
|
||||||
|
r.logWriter.AddRunLog(r.ctx, rlb, time.Now(), "Completed successfully")
|
||||||
|
case RunFail:
|
||||||
|
r.ts.metrics.FinishRun(r.task.ID.String(), false)
|
||||||
|
r.logWriter.AddRunLog(r.ctx, rlb, time.Now(), "Failed")
|
||||||
|
case RunCanceled:
|
||||||
|
r.ts.metrics.FinishRun(r.task.ID.String(), false)
|
||||||
|
r.logWriter.AddRunLog(r.ctx, rlb, time.Now(), "Canceled")
|
||||||
|
default: // We are deliberately not handling RunQueued yet.
|
||||||
|
// There is not really a notion of being queued in this runner architecture.
|
||||||
|
runLogger.Warn("Unhandled run state", zap.Stringer("state", s))
|
||||||
|
}
|
||||||
|
|
||||||
// Arbitrarily chosen short time limit for how fast the log write must complete.
|
// Arbitrarily chosen short time limit for how fast the log write must complete.
|
||||||
// If we start seeing errors from this, we know the time limit is too short or the system is overloaded.
|
// If we start seeing errors from this, we know the time limit is too short or the system is overloaded.
|
||||||
ctx, cancel := context.WithTimeout(r.ctx, 10*time.Millisecond)
|
ctx, cancel := context.WithTimeout(r.ctx, 10*time.Millisecond)
|
||||||
|
|
Loading…
Reference in New Issue