fix(tasks): use influxdb errors in scheduler (#15145)
parent 5040dc7036
commit aef199bcc1
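Throughout this change, errors flow through the influxdb.Error type, whose Code field doubles as the Prometheus label for the new errors counters. A minimal sketch of that shape, reconstructed from the hunks below (the Error method body is an assumption, not copied from the repo):

package main

import "fmt"

// Local stand-in mirroring the influxdb.Error fields this commit uses.
type Error struct {
	Code string // machine-readable code, e.g. "internal error"; becomes the error_type label
	Msg  string // human-readable message
	Op   string // logical operation, e.g. "kv/taskScheduler"
	Err  error  // wrapped cause
}

// Error satisfies the error interface so the scheduler can log it.
// (Assumed implementation; the real one lives in the influxdb package.)
func (e *Error) Error() string {
	return fmt.Sprintf("%s: %s: %v", e.Op, e.Msg, e.Err)
}

func main() {
	e := &Error{
		Code: "internal error",
		Msg:  "could not execute task run; Err: query timed out",
		Op:   "kv/taskScheduler",
		Err:  fmt.Errorf("query timed out"),
	}
	fmt.Println(e.Code) // the value the errors counter is keyed on
}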
@@ -59,7 +59,7 @@ func NewExecutorMetrics(te *TaskExecutor) *ExecutorMetrics {
 			Namespace: namespace,
 			Subsystem: subsystem,
 			Name:      "errors_counter",
-			Help:      "The number of errors thrown by the executor with the type of error (ex. Flux compile, query, etc).",
+			Help:      "The number of errors thrown by the executor with the type of error (ex. Invalid, Internal, etc.)",
 		}, []string{"errorType"}),
 
 		manualRunsCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
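For reference, the CounterVec pattern used in this hunk creates one time series per label value; a self-contained sketch of create, register, increment (the namespace and subsystem values are assumptions, not taken from the package):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Same shape as the executor's errors_counter above.
	errorsCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "task",     // assumed value
		Subsystem: "executor", // assumed value
		Name:      "errors_counter",
		Help:      "The number of errors thrown by the executor with the type of error (ex. Invalid, Internal, etc.)",
	}, []string{"errorType"})

	reg := prometheus.NewRegistry()
	reg.MustRegister(errorsCounter)

	// One increment per observed error, keyed by its code.
	errorsCounter.WithLabelValues("invalid").Inc()
	errorsCounter.WithLabelValues("internal error").Inc()
	errorsCounter.WithLabelValues("internal error").Inc()

	fmt.Println(testutil.ToFloat64(errorsCounter.WithLabelValues("internal error"))) // 2
}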
@@ -11,6 +11,7 @@ import (
 	"time"
 
 	"github.com/influxdata/flux"
+	"github.com/influxdata/influxdb"
 	platform "github.com/influxdata/influxdb"
 	"github.com/influxdata/influxdb/kit/tracing"
 	"github.com/influxdata/influxdb/logger"
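Note that after this hunk the file imports github.com/influxdata/influxdb twice: once unnamed, providing the influxdb identifier used by the new error handling, and once under the pre-existing platform alias. Both names refer to the same package, so influxdb.Error and platform.ID come from the same module.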
@@ -711,17 +712,21 @@ func (r *runner) clearRunning(id platform.ID) {
 }
 
 // fail sets r's state to failed, and marks this runner as idle.
-func (r *runner) fail(qr QueuedRun, runLogger *zap.Logger, stage string, reason error) {
-	if err := r.taskControlService.AddRunLog(r.ts.authCtx, r.task.ID, qr.RunID, time.Now(), stage+": "+reason.Error()); err != nil {
+func (r *runner) fail(qr QueuedRun, runLogger *zap.Logger, stage string, reason influxdb.Error) {
+	influxErr := &reason
+
+	err := r.taskControlService.AddRunLog(r.ts.authCtx, r.task.ID, qr.RunID, time.Now(), stage+": "+reason.Error())
+
+	if err != nil {
 		runLogger.Info("Failed to update run log", zap.Error(err))
+		influxErr = influxdb.ErrCouldNotLogError(err)
 	}
 
-	r.updateRunState(qr, RunFail, runLogger)
+	r.updateRunState(qr, RunFail, runLogger, influxErr)
 	atomic.StoreUint32(r.state, runnerIdle)
 }
 
 func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *zap.Logger) {
-	r.updateRunState(qr, RunStarted, runLogger)
+	r.updateRunState(qr, RunStarted, runLogger, nil)
 	qr.startedAt = time.Now()
 	defer r.wg.Done()
 	errMsg := "Failed to finish run"
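fail now receives the influxdb.Error by value, and influxErr := &reason takes the address of that local copy; if writing the run log fails, the pointer is redirected to ErrCouldNotLogError so the metric records the logging failure instead of the original reason. A toy sketch of that pointer swap (all names are local to the example):

package main

import "fmt"

type Error struct{ Code, Msg string }

// Simulate AddRunLog failing so the reported error gets replaced.
func addRunLog(line string) error { return fmt.Errorf("store unavailable") }

func fail(reason Error) *Error {
	influxErr := &reason // address of the local copy; letting a local escape is fine in Go
	if err := addRunLog("stage: " + reason.Msg); err != nil {
		// Mirrors influxdb.ErrCouldNotLogError(err).
		influxErr = &Error{Code: "internal error", Msg: "unable to log error; Err: " + err.Error()}
	}
	return influxErr
}

func main() {
	got := fail(Error{Code: "internal error", Msg: "could not execute task run"})
	fmt.Println(got.Msg) // unable to log error; Err: store unavailable
}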
@@ -743,7 +748,9 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *zap.Logger) {
 		runLogger.Info("Failed to begin run execution", zap.Error(err))
 		errMsg = "Beginning run execution failed, " + errMsg
 		// TODO(mr): retry?
-		r.fail(qr, runLogger, "Run failed to begin execution", err)
+		influxErr := *influxdb.ErrRunExecutionError(err)
+
+		r.fail(qr, runLogger, "Run failed to begin execution", influxErr)
 		return
 	}
 
@@ -769,7 +776,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *zap.Logger) {
 	close(ready)
 	if err != nil {
 		if err == platform.ErrRunCanceled {
-			r.updateRunState(qr, RunCanceled, runLogger)
+			r.updateRunState(qr, RunCanceled, runLogger, err.(*influxdb.Error))
 			errMsg = "Waiting for execution result failed, " + errMsg
 			// Move on to the next execution, for a canceled run.
 			r.startFromWorking(atomic.LoadInt64(r.ts.now))
@@ -779,7 +786,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *zap.Logger) {
 		runLogger.Info("Failed to wait for execution result", zap.Error(err))
 
 		// TODO(mr): retry?
-		r.fail(qr, runLogger, "Waiting for execution result", err)
+		r.fail(qr, runLogger, "Waiting for execution result", *influxdb.ErrRunExecutionError(err))
 		return
 	}
 	if err := rr.Err(); err != nil {
@@ -787,28 +794,31 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *zap.Logger) {
 		errMsg = "Run failed to execute, " + errMsg
 
 		// TODO(mr): retry?
-		r.fail(qr, runLogger, "Run failed to execute", err)
+		r.fail(qr, runLogger, "Run failed to execute", *influxdb.ErrRunExecutionError(err))
 		return
 	}
 
 	stats := rr.Statistics()
 
+	var influxErr *influxdb.Error
 	b, err := json.Marshal(stats)
-	if err == nil {
+	if err != nil {
+		influxErr = influxdb.ErrJsonMarshalError(err)
+	} else {
 		// authctx can be updated mid process
 		r.ts.nextDueMu.RLock()
 		authCtx := r.ts.authCtx
 		r.ts.nextDueMu.RUnlock()
 		r.taskControlService.AddRunLog(authCtx, r.task.ID, qr.RunID, time.Now(), string(b))
 	}
-	r.updateRunState(qr, RunSuccess, runLogger)
+	r.updateRunState(qr, RunSuccess, runLogger, influxErr)
 	runLogger.Debug("Execution succeeded")
 
 	// Check again if there is a new run available, without returning to idle state.
 	r.startFromWorking(atomic.LoadInt64(r.ts.now))
 }
 
-func (r *runner) updateRunState(qr QueuedRun, s RunStatus, runLogger *zap.Logger) {
+func (r *runner) updateRunState(qr QueuedRun, s RunStatus, runLogger *zap.Logger, err *influxdb.Error) {
 	switch s {
 	case RunStarted:
 		dueAt := time.Unix(qr.DueAt, 0)
@@ -827,6 +837,9 @@ func (r *runner) updateRunState(qr QueuedRun, s RunStatus, runLogger *zap.Logger) {
 		// There is not really a notion of being queued in this runner architecture.
 		runLogger.Warn("Unhandled run state", zap.Stringer("state", s))
 	}
+	if err != nil {
+		r.ts.metrics.LogError(err)
+	}
 
 	if err := r.taskControlService.UpdateRunState(r.ctx, r.task.ID, qr.RunID, time.Now(), s); err != nil {
 		runLogger.Info("Error updating run state", zap.Stringer("state", s), zap.Error(err))
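With this hunk every state transition funnels through one place: callers pass nil on the happy path (RunStarted, and RunSuccess when the stats marshal cleanly), and any non-nil *influxdb.Error is counted before the state is persisted, so the metric is recorded even if UpdateRunState itself subsequently fails.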
@@ -3,6 +3,7 @@ package backend
 import (
 	"time"
 
+	"github.com/influxdata/influxdb"
 	"github.com/prometheus/client_golang/prometheus"
 )
 
@@ -16,6 +17,8 @@ type schedulerMetrics struct {
 	runsComplete *prometheus.CounterVec
 	runsActive   *prometheus.GaugeVec
 
+	errorsCounter *prometheus.CounterVec
+
 	claimsComplete *prometheus.CounterVec
 	claimsActive   prometheus.Gauge
 
@ -54,6 +57,13 @@ func newSchedulerMetrics() *schedulerMetrics {
|
||||||
Help: "Total number of runs that have started but not yet completed, split out by task ID.",
|
Help: "Total number of runs that have started but not yet completed, split out by task ID.",
|
||||||
}, []string{"task_id"}),
|
}, []string{"task_id"}),
|
||||||
|
|
||||||
|
errorsCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "errors_counter",
|
||||||
|
Help: "The number of errors thrown by scheduler with the type of error (ex. Invalid, Internal, etc.",
|
||||||
|
}, []string{"error_type"}),
|
||||||
|
|
||||||
claimsComplete: prometheus.NewCounterVec(prometheus.CounterOpts{
|
claimsComplete: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
Subsystem: subsystem,
|
Subsystem: subsystem,
|
||||||
|
@@ -94,6 +104,7 @@ func (sm *schedulerMetrics) PrometheusCollectors() []prometheus.Collector {
 		sm.claimsActive,
 		sm.queueDelta,
 		sm.executionDelta,
+		sm.errorsCounter,
 	}
 }
 
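The new vector also has to be returned from PrometheusCollectors, as this hunk does; a collector that is created but never handed to the registry would silently export nothing.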
@@ -120,6 +131,11 @@ func (sm *schedulerMetrics) FinishRun(tid string, succeeded bool, executionDelta
 	sm.executionDelta.WithLabelValues(tid).Observe(executionDelta.Seconds())
 }
 
+// LogError increments the count of errors.
+func (sm *schedulerMetrics) LogError(err *influxdb.Error) {
+	sm.errorsCounter.WithLabelValues(err.Code).Inc()
+}
+
 // ClaimTask adjusts the metrics to indicate the result of an attempted claim.
 func (sm *schedulerMetrics) ClaimTask(succeeded bool) {
 	status := statusString(succeeded)
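LogError takes *influxdb.Error rather than a plain error so the label set stays bounded to the handful of influxdb error codes. A sketch of the behavior added above, with a map standing in for the counter (all types local to the example):

package main

import "fmt"

type Error struct{ Code string }

type schedulerMetrics struct {
	counts map[string]int // stand-in for errorsCounter.WithLabelValues(code).Inc()
}

// LogError increments the per-code count, mirroring
// sm.errorsCounter.WithLabelValues(err.Code).Inc() in the hunk above.
func (sm *schedulerMetrics) LogError(err *Error) {
	sm.counts[err.Code]++
}

func main() {
	sm := &schedulerMetrics{counts: map[string]int{}}
	sm.LogError(&Error{Code: "internal error"})
	sm.LogError(&Error{Code: "invalid"})
	sm.LogError(&Error{Code: "internal error"})
	fmt.Println(sm.counts["internal error"], sm.counts["invalid"]) // 2 1
}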
@@ -921,6 +921,11 @@ func TestScheduler_Metrics(t *testing.T) {
 		t.Fatalf("expected 1 run failed for task ID %s, got %v", task.ID.String(), got)
 	}
 
+	m = promtest.MustFindMetric(t, mfs, "task_scheduler_errors_counter", map[string]string{"error_type": "internal error"})
+	if got := *m.Counter.Value; got != 1 {
+		t.Fatalf("expected error type in metric to be internal error, got %v", got)
+	}
+
 	// Runs label removed after task released.
 	if err := s.ReleaseTask(task.ID); err != nil {
 		t.Fatal(err)
@@ -53,6 +53,11 @@
 		Msg:  "run not found",
 	}
 
+	ErrRunKeyNotFound = &Error{
+		Code: ENotFound,
+		Msg:  "run key not found",
+	}
+
 	ErrPageSizeTooSmall = &Error{
 		Msg:  "cannot have negative page limit",
 		Code: EInvalid,
@@ -109,7 +114,7 @@ func ErrQueryError(err error) *Error {
 // ErrResultIteratorError is returned when an error is thrown by exhaustResultIterators in the executor
 func ErrResultIteratorError(err error) *Error {
 	return &Error{
-		Code: EInternal,
+		Code: EInvalid,
 		Msg:  fmt.Sprintf("Error exhausting result iterator; Err: %v", err),
 		Op:   "kv/taskExecutor",
 		Err:  err,
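Note the one-line reclassification in this hunk: exhausting the result iterator now reports EInvalid instead of EInternal, which changes the error_type label the new counters record for it and, downstream, how any handler that maps influxdb codes to HTTP status classes would surface it.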
@@ -161,3 +166,30 @@ func ErrRunNotDueYet(dueAt int64) *Error {
 		Msg: fmt.Sprintf("run not due until: %v", time.Unix(dueAt, 0).UTC().Format(time.RFC3339)),
 	}
 }
+
+func ErrCouldNotLogError(err error) *Error {
+	return &Error{
+		Code: EInternal,
+		Msg:  fmt.Sprintf("unable to log error; Err: %v", err),
+		Op:   "kv/taskScheduler",
+		Err:  err,
+	}
+}
+
+func ErrJsonMarshalError(err error) *Error {
+	return &Error{
+		Code: EInvalid,
+		Msg:  fmt.Sprintf("unable to marshal JSON; Err: %v", err),
+		Op:   "kv/taskScheduler",
+		Err:  err,
+	}
+}
+
+func ErrRunExecutionError(err error) *Error {
+	return &Error{
+		Code: EInternal,
+		Msg:  fmt.Sprintf("could not execute task run; Err: %v", err),
+		Op:   "kv/taskScheduler",
+		Err:  err,
+	}
+}
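The three constructors share one shape: wrap the underlying error, attach an operation, and pick a code that the errors_counter will use as its label. Callers in the scheduler dereference the returned pointer (e.g. *influxdb.ErrRunExecutionError(err)) because fail takes the error by value; a condensed sketch of that round trip, with local stand-ins for the influxdb types:

package main

import "fmt"

const EInternal = "internal error" // same literal the influxdb package uses for this code

type Error struct {
	Code string
	Msg  string
	Op   string
	Err  error
}

func (e *Error) Error() string { return e.Msg }

// Stand-in for influxdb.ErrRunExecutionError.
func ErrRunExecutionError(err error) *Error {
	return &Error{
		Code: EInternal,
		Msg:  fmt.Sprintf("could not execute task run; Err: %v", err),
		Op:   "kv/taskScheduler",
		Err:  err,
	}
}

func fail(reason Error) {
	// The metric label comes straight from the code.
	fmt.Printf("errors_counter{error_type=%q}++\n", reason.Code)
}

func main() {
	err := fmt.Errorf("query timed out")
	fail(*ErrRunExecutionError(err)) // dereference to satisfy the by-value parameter
}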