test: bump & add timeouts in executor tests to avoid flaky failures (#20413)
parent 7f3f562b67
commit 6b4bde68fa
@@ -28,6 +28,7 @@ import (
 	"github.com/influxdata/influxdb/v2/task/backend/scheduler"
 	"github.com/influxdata/influxdb/v2/tenant"
 	"github.com/opentracing/opentracing-go"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/uber/jaeger-client-go"
 	"go.uber.org/zap/zaptest"
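
The newly imported assert package is what the rest of the diff switches to: three-line `if ... t.Fatal` blocks become single testify assertions. A minimal sketch of the two styles side by side (`doWork` is a hypothetical stand-in for calls like `CreateTask`, not from this codebase):

```go
package example

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

// doWork is a hypothetical stand-in for the operations under test.
func doWork() (uint64, error) { return 0, nil }

func TestStyles(t *testing.T) {
	got, err := doWork()

	// Old style: three lines per check, and t.Fatal aborts the test at
	// the first failure, so CI reports at most one problem per run.
	if err != nil {
		t.Fatal(err)
	}

	// New style: one line per check. assert marks the test failed but
	// keeps executing, so a single CI run can surface several bad values.
	// EqualValues converts types before comparing, which is why it works
	// for matching a uint64 gauge value against a plain integer literal.
	assert.NoError(t, err)
	assert.EqualValues(t, 0, got, "unexpected value")
}
```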
@@ -380,92 +381,66 @@ func testMetrics(t *testing.T) {
 
 	mg := promtest.MustGather(t, reg)
 	m := promtest.MustFindMetric(t, mg, "task_executor_total_runs_active", nil)
-	if got := *m.Gauge.Value; got != 0 {
-		t.Fatalf("expected 0 total runs active, got %v", got)
-	}
+	assert.EqualValues(t, 0, *m.Gauge.Value, "unexpected number of active runs")
 
 	script := fmt.Sprintf(fmtTestScript, t.Name())
 	ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
 	task, err := tes.i.CreateTask(ctx, influxdb.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script})
-	if err != nil {
-		t.Fatal(err)
-	}
+	assert.NoError(t, err)
 
 	promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0))
-	if err != nil {
-		t.Fatal(err)
-	}
-	promiseID := influxdb.ID(promise.ID())
+	assert.NoError(t, err)
+	promiseID := promise.ID()
 
 	run, err := tes.i.FindRunByID(context.Background(), task.ID, promiseID)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	if run.ID != promiseID {
-		t.Fatal("promise and run dont match")
-	}
+	assert.NoError(t, err)
+	assert.EqualValues(t, promiseID, run.ID, "promise and run dont match")
 
 	tes.svc.WaitForQueryLive(t, script)
 
 	mg = promtest.MustGather(t, reg)
 	m = promtest.MustFindMetric(t, mg, "task_executor_total_runs_active", nil)
-	if got := *m.Gauge.Value; got != 1 {
-		t.Fatalf("expected 1 total runs active, got %v", got)
-	}
+	assert.EqualValues(t, 1, *m.Gauge.Value, "unexpected number of active runs")
 
 	tes.svc.SucceedQuery(script)
 	<-promise.Done()
 
+	// N.B. You might think the _runs_complete and _runs_active metrics are updated atomically,
+	// but that's not the case. As a task run completes and is being cleaned up, there's a small
+	// window where it can be counted under both metrics.
+	//
+	// Our CI is very good at hitting this window, causing failures when we assert on the metric
+	// values below. We sleep a small amount before gathering metrics to avoid flaky errors.
+	time.Sleep(500 * time.Millisecond)
+
 	mg = promtest.MustGather(t, reg)
 
 	m = promtest.MustFindMetric(t, mg, "task_executor_total_runs_complete", map[string]string{"task_type": "", "status": "success"})
-	if got := *m.Counter.Value; got != 1 {
-		t.Fatalf("expected 1 active runs, got %v", got)
-	}
-	m = promtest.MustFindMetric(t, mg, "task_executor_total_runs_active", nil)
-	if got := *m.Gauge.Value; got != 0 {
-		t.Fatalf("expected 0 total runs active, got %v", got)
-	}
+	assert.EqualValues(t, 1, *m.Counter.Value, "unexpected number of successful runs")
 
-	if got := promise.Error(); got != nil {
-		t.Fatal(got)
-	}
+	m = promtest.MustFindMetric(t, mg, "task_executor_total_runs_active", nil)
+	assert.EqualValues(t, 0, *m.Gauge.Value, "unexpected number of active runs")
+
+	assert.NoError(t, promise.Error())
 
 	// manual runs metrics
 	mt, err := tes.i.CreateTask(ctx, influxdb.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script})
-	if err != nil {
-		t.Fatal(err)
-	}
+	assert.NoError(t, err)
 
 	scheduledFor := int64(123)
 
 	r, err := tes.i.ForceRun(ctx, mt.ID, scheduledFor)
-	if err != nil {
-		t.Fatal(err)
-	}
+	assert.NoError(t, err)
 
 	_, err = tes.ex.ManualRun(ctx, mt.ID, r.ID)
-	if err != nil {
-		t.Fatal(err)
-	}
+	assert.NoError(t, err)
 
 	mg = promtest.MustGather(t, reg)
 
-	m = promtest.MustFindMetric(t, mg, "task_executor_manual_runs_counter", map[string]string{"taskID": string(mt.ID.String())})
-	if got := *m.Counter.Value; got != 1 {
-		t.Fatalf("expected 1 manual run, got %v", got)
-	}
+	m = promtest.MustFindMetric(t, mg, "task_executor_manual_runs_counter", map[string]string{"taskID": mt.ID.String()})
+	assert.EqualValues(t, 1, *m.Counter.Value, "unexpected number of manual runs")
 
 	m = promtest.MustFindMetric(t, mg, "task_executor_run_latency_seconds", map[string]string{"task_type": ""})
-	if got := *m.Histogram.SampleCount; got < 1 {
-		t.Fatal("expected to find run latency metric")
-	}
-
-	if got := *m.Histogram.SampleSum; got <= 100 {
-		t.Fatalf("expected run latency metric to be very large, got %v", got)
-	}
-
+	assert.GreaterOrEqual(t, *m.Histogram.SampleCount, uint64(1), "run latency metric not found")
+	assert.Greater(t, *m.Histogram.SampleSum, float64(100), "run latency metric unexpectedly small")
 }
 
 func testIteratorFailure(t *testing.T) {
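
The N.B. comment added above is the heart of the fix, and the hazard it describes is generic to any state change that spans two metrics. A self-contained sketch of the window using prometheus/client_golang directly (the metric names here are made up, not the executor's):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()
	active := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "runs_active", Help: "runs currently executing",
	})
	complete := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "runs_complete", Help: "runs that have finished",
	})
	reg.MustRegister(active, complete)

	active.Inc() // a run starts

	// Finishing a run is two separate updates, not one atomic step.
	complete.Inc()

	// A gather landing between the two updates counts the run under BOTH
	// metrics: runs_active is still 1 and runs_complete is already 1.
	// This is the window the test's 500ms sleep steps over.
	mfs, _ := reg.Gather()
	for _, mf := range mfs {
		fmt.Printf("%s = %v\n", mf.GetName(), mf.GetMetric()[0])
	}

	active.Dec() // cleanup completes; the metrics agree again
}
```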
@@ -131,7 +131,7 @@ func (s *fakeQueryService) FailNextQuery(forced error) {
 func (s *fakeQueryService) WaitForQueryLive(t *testing.T, script string) {
 	t.Helper()
 
-	const attempts = 10
+	const attempts = 100
 	ast := makeAST(script)
 	astUTC := makeAST(script)
 	astUTC.Now = ast.Now.UTC()
@@ -139,7 +139,7 @@ func (s *fakeQueryService) WaitForQueryLive(t *testing.T, script string) {
 	specUTC := makeASTString(astUTC)
 	for i := 0; i < attempts; i++ {
 		if i != 0 {
-			time.Sleep(5 * time.Millisecond)
+			time.Sleep(10 * time.Millisecond)
 		}
 
 		s.mu.Lock()
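
Taken together, the two bumped constants change WaitForQueryLive's worst case from 10 attempts with 5ms pauses (about 45ms of sleeping, since the first attempt skips the pause) to 100 attempts with 10ms pauses (about 990ms). The loop's shape, pulled out into a generic helper purely for illustration (`waitFor` and `check` are hypothetical names, not part of the codebase):

```go
package example

import (
	"testing"
	"time"
)

// waitFor polls check until it reports true, mirroring the loop in
// WaitForQueryLive: no pause before the first attempt, then a fixed
// sleep between attempts. With attempts = 100 and a 10ms pause the
// caller blocks for at most 99 * 10ms ≈ 990ms before giving up.
func waitFor(t *testing.T, check func() bool) {
	t.Helper()
	const attempts = 100
	for i := 0; i < attempts; i++ {
		if i != 0 {
			time.Sleep(10 * time.Millisecond)
		}
		if check() {
			return
		}
	}
	t.Fatal("condition was not met after 100 attempts")
}
```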