Merge pull request #14454 from influxdata/tasks/executor-metrics

feat(tasks): add new executor prometheus metrics
Alirie Gray 2019-07-31 14:40:29 -07:00 committed by GitHub
commit ea0888a7d5
4 changed files with 167 additions and 46 deletions

View File

@@ -60,10 +60,10 @@ type Run struct {
ID ID `json:"id,omitempty"`
TaskID ID `json:"taskID"`
Status string `json:"status"`
ScheduledFor string `json:"scheduledFor"`
StartedAt string `json:"startedAt,omitempty"`
FinishedAt string `json:"finishedAt,omitempty"`
RequestedAt string `json:"requestedAt,omitempty"`
ScheduledFor string `json:"scheduledFor"` // ScheduledFor is the time the task is scheduled to run at
StartedAt string `json:"startedAt,omitempty"` // StartedAt is the time the executor begins running the task
FinishedAt string `json:"finishedAt,omitempty"` // FinishedAt is the time the executor finishes running the task
RequestedAt string `json:"requestedAt,omitempty"` // RequestedAt is the time the coordinator told the scheduler to schedule the task
Log []Log `json:"log,omitempty"`
}
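The comments added above document the run lifecycle that the new executor metrics are built around: the queue delta is the gap between the scheduled time and the actual start, and the run duration covers the execution itself (note that in this diff worker.finish measures the duration from ScheduledForTime, so it includes the queue delay). A small standalone sketch of that arithmetic, assuming the fields hold RFC3339 timestamps; the values below are made up for illustration:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative timestamps only: the three times a finished run carries.
	scheduled, _ := time.Parse(time.RFC3339, "2019-07-31T21:00:00Z")
	started, _ := time.Parse(time.RFC3339, "2019-07-31T21:00:02Z")
	finished, _ := time.Parse(time.RFC3339, "2019-07-31T21:00:07Z")

	queueDelta := started.Sub(scheduled) // time the run waited to start: 2s
	runDuration := finished.Sub(started) // time the run spent executing: 5s
	fmt.Println(queueDelta, runDuration) // 2s 5s
}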

View File

@@ -10,11 +10,22 @@ import (
type ExecutorMetrics struct {
totalRunsComplete *prometheus.CounterVec
totalRunsActive prometheus.Gauge
activeRuns prometheus.Collector
queueDelta prometheus.Summary
runDuration prometheus.Summary
errorsCounter prometheus.Counter
manualRunsCounter *prometheus.CounterVec
resumeRunsCounter *prometheus.CounterVec
}
func NewExecutorMetrics() *ExecutorMetrics {
type runCollector struct {
totalRunsActive *prometheus.Desc
workersBusy *prometheus.Desc
promiseQueueUsage *prometheus.Desc
te *TaskExecutor
}
func NewExecutorMetrics(te *TaskExecutor) *ExecutorMetrics {
const namespace = "task"
const subsystem = "executor"
@@ -26,12 +37,7 @@ func NewExecutorMetrics() *ExecutorMetrics {
Help: "Total number of runs completed across all tasks, split out by success or failure.",
}, []string{"status"}),
totalRunsActive: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "total_runs_active",
Help: "Total number of runs across all tasks that have started but not yet completed.",
}),
activeRuns: NewRunCollector(te),
queueDelta: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace,
@@ -40,6 +46,60 @@ func NewExecutorMetrics() *ExecutorMetrics {
Help: "The duration in seconds between a run being due to start and actually starting.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}),
runDuration: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "run_duration",
Help: "The duration in seconds between a run starting and finishing.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}),
errorsCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "errors_counter",
Help: "The number of errors thrown by the executor.",
}),
manualRunsCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "manual_runs_counter",
Help: "Total number of manual runs scheduled to run by task ID",
}, []string{"taskID"}),
resumeRunsCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "resume_runs_counter",
Help: "Total number of runs resumed by task ID",
}, []string{"taskID"}),
}
}
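All of these instruments share the "task" namespace and "executor" subsystem, so the names that show up in a scrape are prefixed task_executor_ (the test further down, for example, looks up task_executor_manual_runs_counter). A quick, illustrative check of how client_golang builds the fully qualified name; this snippet is not part of the diff:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Namespace "task" + subsystem "executor" + metric name => exported metric name.
	fmt.Println(prometheus.BuildFQName("task", "executor", "manual_runs_counter"))
	// Output: task_executor_manual_runs_counter
}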
// NewRunCollector returns a collector which exports metrics about the task executor's workers and currently active runs.
func NewRunCollector(te *TaskExecutor) prometheus.Collector {
return &runCollector{
workersBusy: prometheus.NewDesc(
"task_executor_workers_busy",
"Percent of total available workers that are currently busy",
nil,
prometheus.Labels{},
),
totalRunsActive: prometheus.NewDesc(
"task_executor_total_runs_active",
"Total number of workers currently running tasks",
nil,
prometheus.Labels{},
),
promiseQueueUsage: prometheus.NewDesc(
"task_executor_promise_queue_usage",
"Percent of the promise queue that is currently full",
nil,
prometheus.Labels{},
),
te: te,
}
}
@@ -47,20 +107,44 @@ func NewExecutorMetrics() *ExecutorMetrics {
func (em *ExecutorMetrics) PrometheusCollectors() []prometheus.Collector {
return []prometheus.Collector{
em.totalRunsComplete,
em.totalRunsActive,
em.activeRuns,
em.queueDelta,
em.errorsCounter,
em.runDuration,
em.manualRunsCounter,
em.resumeRunsCounter,
}
}
// StartRun adjusts the metrics to indicate a run is in progress for the given task ID.
// We are also storing the delta time between when a run is due to start and actually starting.
// StartRun stores the delta time between when a run is due to start and when it actually starts.
func (em *ExecutorMetrics) StartRun(taskID influxdb.ID, queueDelta time.Duration) {
em.totalRunsActive.Inc()
em.queueDelta.Observe(queueDelta.Seconds())
}
// FinishRun adjusts the metrics to indicate a run is no longer in progress for the given task ID.
func (em *ExecutorMetrics) FinishRun(taskID influxdb.ID, status backend.RunStatus) {
em.totalRunsActive.Dec()
func (em *ExecutorMetrics) FinishRun(taskID influxdb.ID, status backend.RunStatus, runDuration time.Duration) {
em.totalRunsComplete.WithLabelValues(status.String()).Inc()
em.runDuration.Observe(runDuration.Seconds())
}
// LogError increments the count of errors.
func (em *ExecutorMetrics) LogError() {
em.errorsCounter.Inc()
}
// Describe returns all descriptions associated with the run collector.
func (r *runCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- r.workersBusy
ch <- r.promiseQueueUsage
ch <- r.totalRunsActive
}
// Collect returns the current state of all metrics of the run collector.
func (r *runCollector) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(r.workersBusy, prometheus.GaugeValue, r.te.WorkersBusy())
ch <- prometheus.MustNewConstMetric(r.promiseQueueUsage, prometheus.GaugeValue, r.te.PromiseQueueUsage())
ch <- prometheus.MustNewConstMetric(r.totalRunsActive, prometheus.GaugeValue, float64(r.te.RunsActive()))
}
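ExecutorMetrics now mixes push-style instruments (the counters and summaries updated by StartRun, FinishRun and LogError) with a pull-style custom collector: each scrape calls runCollector.Collect, which samples WorkersBusy, PromiseQueueUsage and RunsActive from the live executor and emits them as constant gauges. A minimal sketch of registering and gathering everything in-process, along the lines of what the test file below does; the helper name is hypothetical and it is assumed to sit in the same package as the executor:

package executor

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// gatherExecutorMetrics is illustrative only: it registers everything
// ExecutorMetrics exposes and performs one in-process scrape. Gathering is
// what triggers runCollector.Collect against the live TaskExecutor.
func gatherExecutorMetrics(em *ExecutorMetrics) error {
	reg := prometheus.NewRegistry()
	reg.MustRegister(em.PrometheusCollectors()...)

	mfs, err := reg.Gather()
	if err != nil {
		return err
	}
	for _, mf := range mfs {
		fmt.Printf("%s (%d series)\n", mf.GetName(), len(mf.Metric))
	}
	return nil
}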

View File

@@ -32,13 +32,8 @@ func MultiLimit(limits ...LimitFunc) LimitFunc {
// LimitFunc is a function the executor will use to limit runs.
type LimitFunc func(*influxdb.Run) error
type Metrics interface {
StartRun(influxdb.ID, time.Duration)
FinishRun(influxdb.ID, backend.RunStatus)
}
// NewExecutor creates a new task executor
func NewExecutor(logger *zap.Logger, qs query.QueryService, as influxdb.AuthorizationService, ts influxdb.TaskService, tcs backend.TaskControlService, metrics Metrics) *TaskExecutor {
func NewExecutor(logger *zap.Logger, qs query.QueryService, as influxdb.AuthorizationService, ts influxdb.TaskService, tcs backend.TaskControlService) (*TaskExecutor, *ExecutorMetrics) {
te := &TaskExecutor{
logger: logger,
ts: ts,
@@ -46,19 +41,20 @@ func NewExecutor(logger *zap.Logger, qs query.QueryService, as influxdb.Authoriz
qs: qs,
as: as,
metrics: metrics,
currentPromises: sync.Map{},
promiseQueue: make(chan *Promise, 1000), //TODO(lh): make this configurable
workerLimit: make(chan struct{}, 100), //TODO(lh): make this configurable
limitFunc: func(*influxdb.Run) error { return nil }, // noop
}
te.metrics = NewExecutorMetrics(te)
wm := &workerMaker{
te: te,
}
te.workerPool = sync.Pool{New: wm.new}
return te
return te, te.metrics
}
// TaskExecutor is a task-specific executor that works with the new scheduler system.
@@ -70,7 +66,7 @@ type TaskExecutor struct {
qs query.QueryService
as influxdb.AuthorizationService
metrics Metrics
metrics *ExecutorMetrics
// currentPromises are all the promises we have made that have not yet been fulfilled
currentPromises sync.Map
@@ -101,14 +97,17 @@ func (e *TaskExecutor) Execute(ctx context.Context, id scheduler.ID, scheduledAt
var err error
// look for manual run by scheduledAt
p, err = e.startManualRun(ctx, iid, scheduledAt)
if err == nil && p != nil {
e.metrics.manualRunsCounter.WithLabelValues(string(iid)).Inc()
goto PROMISEMADE
}
// look in the currently running runs
p, err = e.resumeRun(ctx, iid, scheduledAt)
if err == nil && p != nil {
e.metrics.resumeRunsCounter.WithLabelValues(string(iid)).Inc()
goto PROMISEMADE
}
@@ -155,7 +154,6 @@ func (e *TaskExecutor) startManualRun(ctx context.Context, id influxdb.ID, sched
if err == nil && sa.UTC() == scheduledAt.UTC() {
r, err := e.tcs.StartManualRun(ctx, id, run.ID)
if err != nil {
fmt.Println("err", err)
return nil, err
}
return e.createPromise(ctx, r)
@@ -334,11 +332,14 @@ func (w *worker) finish(p *Promise, rs backend.RunStatus, err error) {
w.te.tcs.UpdateRunState(ctx, p.task.ID, p.run.ID, time.Now(), rs)
// add to metrics
w.te.metrics.FinishRun(p.task.ID, rs)
s, _ := p.run.ScheduledForTime()
rd := time.Since(s)
w.te.metrics.FinishRun(p.task.ID, rs, rd)
// log error
if err != nil {
w.te.logger.Debug("execution failed", zap.Error(err), zap.String("taskID", p.task.ID.String()))
w.te.metrics.LogError()
p.err = err
} else {
w.te.logger.Debug("Completed successfully", zap.String("taskID", p.task.ID.String()))
@@ -407,6 +408,22 @@ func (w *worker) executeQuery(p *Promise) {
w.finish(p, backend.RunSuccess, runErr)
}
// RunsActive returns the current number of workers, which is equivalent to
// the number of runs actively running
func (e *TaskExecutor) RunsActive() int {
return len(e.workerLimit)
}
// WorkersBusy returns the percent of total workers that are busy
func (e *TaskExecutor) WorkersBusy() float64 {
return float64(len(e.workerLimit)) / float64(cap(e.workerLimit))
}
// PromiseQueueUsage returns the percent of the Promise Queue that is currently filled
func (e *TaskExecutor) PromiseQueueUsage() float64 {
return float64(len(e.promiseQueue)) / float64(cap(e.promiseQueue))
}
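These helpers lean on the semantics of len and cap for buffered channels: len reports how many slots are currently occupied (busy workers, queued promises) and cap reports the total, so the ratio is a cheap point-in-time utilization figure for the collector to sample. A tiny standalone illustration of the technique; the channel and numbers here are hypothetical, not the executor's:

package main

import "fmt"

func main() {
	// A buffered channel used as a semaphore: each token held marks a busy worker.
	workerLimit := make(chan struct{}, 100)
	for i := 0; i < 25; i++ {
		workerLimit <- struct{}{}
	}

	// len = slots in use, cap = total slots; the ratio is the utilization gauge.
	busy := float64(len(workerLimit)) / float64(cap(workerLimit))
	fmt.Println(busy) // 0.25
}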
// Promise represents a promise the executor makes to finish a run's execution asynchronously.
type Promise struct {
run *influxdb.Run

View File

@@ -17,23 +17,18 @@ import (
"github.com/influxdata/influxdb/kit/prom/promtest"
"github.com/influxdata/influxdb/kv"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/backend/scheduler"
"go.uber.org/zap/zaptest"
)
type tes struct {
svc *fakeQueryService
ex *TaskExecutor
i *kv.Service
tc testCreds
svc *fakeQueryService
ex *TaskExecutor
metrics *ExecutorMetrics
i *kv.Service
tc testCreds
}
type noopMetrics struct{}
func (*noopMetrics) StartRun(influxdb.ID, time.Duration) {}
func (*noopMetrics) FinishRun(influxdb.ID, backend.RunStatus) {}
func taskExecutorSystem(t *testing.T) tes {
aqs := newFakeQueryService()
qs := query.QueryServiceBridge{
@@ -42,12 +37,13 @@ func taskExecutorSystem(t *testing.T) tes {
i := kv.NewService(inmem.NewKVStore())
ex := NewExecutor(zaptest.NewLogger(t), qs, i, i, i, &noopMetrics{})
ex, metrics := NewExecutor(zaptest.NewLogger(t), qs, i, i, i)
return tes{
svc: aqs,
ex: ex,
i: i,
tc: createCreds(t, i),
svc: aqs,
ex: ex,
metrics: metrics,
i: i,
tc: createCreds(t, i),
}
}
@@ -302,8 +298,7 @@ func testLimitFunc(t *testing.T) {
func testMetrics(t *testing.T) {
t.Parallel()
tes := taskExecutorSystem(t)
metrics := NewExecutorMetrics()
tes.ex.metrics = metrics
metrics := tes.metrics
reg := prom.NewRegistry()
reg.MustRegister(metrics.PrometheusCollectors()...)
@@ -360,6 +355,31 @@ func testMetrics(t *testing.T) {
if got := promise.Error(); got != nil {
t.Fatal(got)
}
// manual runs metrics
mt, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script})
if err != nil {
t.Fatal(err)
}
scheduledFor := int64(123)
_, err = tes.i.ForceRun(ctx, mt.ID, scheduledFor)
if err != nil {
t.Fatal(err)
}
scheduledForTime := time.Unix(scheduledFor, 0).UTC()
tes.ex.Execute(ctx, scheduler.ID(mt.ID), scheduledForTime)
mg = promtest.MustGather(t, reg)
m = promtest.MustFindMetric(t, mg, "task_executor_manual_runs_counter", map[string]string{"taskID": string(mt.ID)})
if got := *m.Counter.Value; got != 1 {
t.Fatalf("expected 1 manual run, got %v", got)
}
}
func testIteratorFailure(t *testing.T) {