feat(task): Inject Task's LatestSuccess Timestamp In Flux Extern (#19402)
* feat(task): Inject latest success/failure into extern. * chore(task/backend): Don't specify an extern if there are no statements. * chore(task/executor): Don't apply the latest failure for now. * chore(changelog): Add 19402 to changelog. * chore(kit): Introduce feature flag for time injection. * chore(task/executor): Guard injection into extern by feature flag. * chore(task/executor): No need for this subtest pattern. * chore(task/executor): Add tests for extern injection.pull/19435/head
parent
342a29897e
commit
1ae2541bf3
|
@ -6,6 +6,7 @@
|
|||
1. [19334](https://github.com/influxdata/influxdb/pull/19334): Add --active-config flag to influx to set config for single command
|
||||
1. [19219](https://github.com/influxdata/influxdb/pull/19219): List buckets via the API now supports after (ID) parameter as an alternative to offset.
|
||||
1. [19390](https://github.com/influxdata/influxdb/pull/19390): Record last success and failure run times in the Task
|
||||
1. [19402](https://github.com/influxdata/influxdb/pull/19402): Inject Task's LatestSuccess Timestamp In Flux Extern
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
|
|
|
@ -138,3 +138,9 @@
|
|||
key: enforceOrgDashboardLimits
|
||||
default: false
|
||||
contact: Compute Team
|
||||
|
||||
- name: Inject Latest Success Time
|
||||
description: Inject the latest successful task run timestamp into a Task query extern when executing.
|
||||
key: injectLatestSuccessTime
|
||||
default: false
|
||||
contact: Compute Team
|
||||
|
|
|
@ -254,6 +254,20 @@ func EnforceOrganizationDashboardLimits() BoolFlag {
|
|||
return enforceOrgDashboardLimits
|
||||
}
|
||||
|
||||
var injectLatestSuccessTime = MakeBoolFlag(
|
||||
"Inject Latest Success Time",
|
||||
"injectLatestSuccessTime",
|
||||
"Compute Team",
|
||||
false,
|
||||
Temporary,
|
||||
false,
|
||||
)
|
||||
|
||||
// InjectLatestSuccessTime - Inject the latest successful task run timestamp into a Task query extern when executing.
|
||||
func InjectLatestSuccessTime() BoolFlag {
|
||||
return injectLatestSuccessTime
|
||||
}
|
||||
|
||||
var all = []Flag{
|
||||
appMetrics,
|
||||
backendExample,
|
||||
|
@ -273,6 +287,7 @@ var all = []Flag{
|
|||
pushDownGroupAggregateMinMax,
|
||||
orgOnlyMemberList,
|
||||
enforceOrgDashboardLimits,
|
||||
injectLatestSuccessTime,
|
||||
}
|
||||
|
||||
var byKey = map[string]Flag{
|
||||
|
@ -294,4 +309,5 @@ var byKey = map[string]Flag{
|
|||
"pushDownGroupAggregateMinMax": pushDownGroupAggregateMinMax,
|
||||
"orgOnlyMemberList": orgOnlyMemberList,
|
||||
"enforceOrgDashboardLimits": enforceOrgDashboardLimits,
|
||||
"injectLatestSuccessTime": injectLatestSuccessTime,
|
||||
}
|
||||
|
|
|
@ -2,11 +2,13 @@ package executor
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/ast"
|
||||
"github.com/influxdata/flux/lang"
|
||||
"github.com/influxdata/flux/runtime"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
|
@ -22,6 +24,9 @@ import (
|
|||
const (
|
||||
maxPromises = 1000
|
||||
defaultMaxWorkers = 100
|
||||
|
||||
latestSuccessOption = "tasks.latestSuccessTime"
|
||||
latestFailureOption = "tasks.latestFailureTime"
|
||||
)
|
||||
|
||||
var _ scheduler.Executor = (*Executor)(nil)
|
||||
|
@ -70,7 +75,31 @@ func WithMaxWorkers(n int) executorOption {
|
|||
|
||||
// CompilerBuilderFunc is a function that yields a new flux.Compiler. The
|
||||
// context.Context provided can be assumed to be an authorized context.
|
||||
type CompilerBuilderFunc func(ctx context.Context, query string, now time.Time) (flux.Compiler, error)
|
||||
type CompilerBuilderFunc func(ctx context.Context, query string, ts CompilerBuilderTimestamps) (flux.Compiler, error)
|
||||
|
||||
// CompilerBuilderTimestamps contains timestamps which should be provided along
|
||||
// with a Task query.
|
||||
type CompilerBuilderTimestamps struct {
|
||||
Now time.Time
|
||||
LatestSuccess time.Time
|
||||
}
|
||||
|
||||
func (ts CompilerBuilderTimestamps) Extern() *ast.File {
|
||||
var body []ast.Statement
|
||||
|
||||
if !ts.LatestSuccess.IsZero() {
|
||||
body = append(body, &ast.OptionStatement{
|
||||
Assignment: &ast.VariableAssignment{
|
||||
ID: &ast.Identifier{Name: latestSuccessOption},
|
||||
Init: &ast.DateTimeLiteral{
|
||||
Value: ts.LatestSuccess,
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
return &ast.File{Body: body}
|
||||
}
|
||||
|
||||
// WithSystemCompilerBuilder is an Executor option that configures a
|
||||
// CompilerBuilderFunc to be used when compiling queries for System Tasks.
|
||||
|
@ -416,8 +445,6 @@ func (w *worker) start(p *promise) {
|
|||
}
|
||||
|
||||
func (w *worker) finish(p *promise, rs influxdb.RunStatus, err error) {
|
||||
|
||||
// trace
|
||||
span, ctx := tracing.StartSpanFromContext(p.ctx)
|
||||
defer span.Finish()
|
||||
|
||||
|
@ -471,7 +498,10 @@ func (w *worker) executeQuery(p *promise) {
|
|||
if p.task.Type != influxdb.TaskSystemType {
|
||||
buildCompiler = w.nonSystemBuildCompiler
|
||||
}
|
||||
compiler, err := buildCompiler(ctx, p.task.Flux, p.run.ScheduledFor)
|
||||
compiler, err := buildCompiler(ctx, p.task.Flux, CompilerBuilderTimestamps{
|
||||
Now: p.run.ScheduledFor,
|
||||
LatestSuccess: p.task.LatestSuccess,
|
||||
})
|
||||
if err != nil {
|
||||
w.finish(p, influxdb.RunFail, influxdb.ErrFluxParseError(err))
|
||||
return
|
||||
|
@ -592,21 +622,45 @@ func exhaustResultIterators(res flux.Result) error {
|
|||
}
|
||||
|
||||
// NewASTCompiler parses a Flux query string into an AST representatation.
|
||||
func NewASTCompiler(_ context.Context, query string, now time.Time) (flux.Compiler, error) {
|
||||
func NewASTCompiler(ctx context.Context, query string, ts CompilerBuilderTimestamps) (flux.Compiler, error) {
|
||||
pkg, err := runtime.ParseToJSON(query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var externBytes []byte
|
||||
if feature.InjectLatestSuccessTime().Enabled(ctx) {
|
||||
extern := ts.Extern()
|
||||
if len(extern.Body) > 0 {
|
||||
var err error
|
||||
externBytes, err = json.Marshal(extern)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
return lang.ASTCompiler{
|
||||
AST: pkg,
|
||||
Now: now,
|
||||
AST: pkg,
|
||||
Now: ts.Now,
|
||||
Extern: externBytes,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// NewFluxCompiler wraps a Flux query string in a raw-query representation.
|
||||
func NewFluxCompiler(_ context.Context, query string, _ time.Time) (flux.Compiler, error) {
|
||||
func NewFluxCompiler(ctx context.Context, query string, ts CompilerBuilderTimestamps) (flux.Compiler, error) {
|
||||
var externBytes []byte
|
||||
if feature.InjectLatestSuccessTime().Enabled(ctx) {
|
||||
extern := ts.Extern()
|
||||
if len(extern.Body) > 0 {
|
||||
var err error
|
||||
externBytes, err = json.Marshal(extern)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
return lang.FluxCompiler{
|
||||
Query: query,
|
||||
Query: query,
|
||||
Extern: externBytes,
|
||||
// TODO(brett): This mitigates an immediate problem where
|
||||
// Checks/Notifications breaks when sending Now, and system Tasks do not
|
||||
// break when sending Now. We are currently sending C+N through using
|
||||
|
@ -617,7 +671,13 @@ func NewFluxCompiler(_ context.Context, query string, _ time.Time) (flux.Compile
|
|||
// we are able to locate the root cause and use Flux Compiler for all
|
||||
// Task types.
|
||||
//
|
||||
// This should be removed once we diagnose the problem.
|
||||
// It turns out this is due to the exclusive nature of the stop time in
|
||||
// Flux "from" and that we weren't including the left-hand boundary of
|
||||
// the range check for notifications. We're shipping a fix soon in
|
||||
//
|
||||
// https://github.com/influxdata/influxdb/pull/19392
|
||||
//
|
||||
// Once this has merged, we can send Now again.
|
||||
//
|
||||
// Now: now,
|
||||
}, nil
|
||||
|
|
|
@ -12,14 +12,17 @@ import (
|
|||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/ast"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
icontext "github.com/influxdata/influxdb/v2/context"
|
||||
"github.com/influxdata/influxdb/v2/inmem"
|
||||
"github.com/influxdata/influxdb/v2/kit/feature"
|
||||
"github.com/influxdata/influxdb/v2/kit/prom"
|
||||
"github.com/influxdata/influxdb/v2/kit/prom/promtest"
|
||||
tracetest "github.com/influxdata/influxdb/v2/kit/tracing/testing"
|
||||
"github.com/influxdata/influxdb/v2/kv"
|
||||
"github.com/influxdata/influxdb/v2/kv/migration/all"
|
||||
influxdbmock "github.com/influxdata/influxdb/v2/mock"
|
||||
"github.com/influxdata/influxdb/v2/query"
|
||||
"github.com/influxdata/influxdb/v2/query/fluxlang"
|
||||
"github.com/influxdata/influxdb/v2/task/backend"
|
||||
|
@ -85,19 +88,7 @@ func taskExecutorSystem(t *testing.T) tes {
|
|||
}
|
||||
}
|
||||
|
||||
func TestTaskExecutor(t *testing.T) {
|
||||
t.Run("QuerySuccess", testQuerySuccess)
|
||||
t.Run("QueryFailure", testQueryFailure)
|
||||
t.Run("ManualRun", testManualRun)
|
||||
t.Run("ResumeRun", testResumingRun)
|
||||
t.Run("WorkerLimit", testWorkerLimit)
|
||||
t.Run("LimitFunc", testLimitFunc)
|
||||
t.Run("Metrics", testMetrics)
|
||||
t.Run("IteratorFailure", testIteratorFailure)
|
||||
t.Run("ErrorHandling", testErrorHandling)
|
||||
}
|
||||
|
||||
func testQuerySuccess(t *testing.T) {
|
||||
func TestTaskExecutor_QuerySuccess(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tes := taskExecutorSystem(t)
|
||||
|
@ -133,8 +124,8 @@ func testQuerySuccess(t *testing.T) {
|
|||
t.Fatalf("did not correctly set RunAt value, got: %v", run.RunAt)
|
||||
}
|
||||
|
||||
tes.svc.WaitForQueryLive(t, script)
|
||||
tes.svc.SucceedQuery(script)
|
||||
tes.svc.WaitForQueryLive(t, script, nil)
|
||||
tes.svc.SucceedQuery(script, nil)
|
||||
|
||||
<-promise.Done()
|
||||
|
||||
|
@ -165,7 +156,107 @@ func testQuerySuccess(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func testQueryFailure(t *testing.T) {
|
||||
func TestTaskExecutor_QuerySuccessWithExternInjection(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tes := taskExecutorSystem(t)
|
||||
|
||||
var (
|
||||
script = fmt.Sprintf(fmtTestScript, t.Name())
|
||||
ctx = icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
|
||||
span = opentracing.GlobalTracer().StartSpan("test-span")
|
||||
)
|
||||
ctx = opentracing.ContextWithSpan(ctx, span)
|
||||
|
||||
task, err := tes.i.CreateTask(ctx, influxdb.TaskCreate{
|
||||
OrganizationID: tes.tc.OrgID,
|
||||
OwnerID: tes.tc.Auth.GetUserID(),
|
||||
Flux: script,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Simulate previous run to establish a timestamp
|
||||
latestSuccess := time.Now().UTC()
|
||||
task, err = tes.i.UpdateTask(ctx, task.ID, influxdb.TaskUpdate{
|
||||
LatestSuccess: &latestSuccess,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
extern := &ast.File{
|
||||
Body: []ast.Statement{&ast.OptionStatement{
|
||||
Assignment: &ast.VariableAssignment{
|
||||
ID: &ast.Identifier{Name: latestSuccessOption},
|
||||
Init: &ast.DateTimeLiteral{
|
||||
Value: latestSuccess,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
ctx, err = feature.Annotate(ctx, influxdbmock.NewFlagger(map[feature.Flag]interface{}{
|
||||
feature.InjectLatestSuccessTime(): true,
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
promiseID := influxdb.ID(promise.ID())
|
||||
|
||||
run, err := tes.i.FindRunByID(context.Background(), task.ID, promiseID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if run.ID != promiseID {
|
||||
t.Fatal("promise and run dont match")
|
||||
}
|
||||
|
||||
if run.RunAt != time.Unix(126, 0).UTC() {
|
||||
t.Fatalf("did not correctly set RunAt value, got: %v", run.RunAt)
|
||||
}
|
||||
|
||||
tes.svc.WaitForQueryLive(t, script, extern)
|
||||
tes.svc.SucceedQuery(script, extern)
|
||||
|
||||
<-promise.Done()
|
||||
|
||||
if got := promise.Error(); got != nil {
|
||||
t.Fatal(got)
|
||||
}
|
||||
|
||||
// confirm run is removed from in-mem store
|
||||
run, err = tes.i.FindRunByID(context.Background(), task.ID, run.ID)
|
||||
if run != nil || err == nil || !strings.Contains(err.Error(), "run not found") {
|
||||
t.Fatal("run was returned when it should have been removed from kv")
|
||||
}
|
||||
|
||||
// ensure the run returned by TaskControlService.FinishRun(...)
|
||||
// has run logs formatted as expected
|
||||
if run = tes.tcs.run; run == nil {
|
||||
t.Fatal("expected run returned by FinishRun to not be nil")
|
||||
}
|
||||
|
||||
if len(run.Log) < 3 {
|
||||
t.Fatalf("expected 3 run logs, found %d", len(run.Log))
|
||||
}
|
||||
|
||||
sctx := span.Context().(jaeger.SpanContext)
|
||||
expectedMessage := fmt.Sprintf("trace_id=%s is_sampled=true", sctx.TraceID())
|
||||
if expectedMessage != run.Log[1].Message {
|
||||
t.Errorf("expected %q, found %q", expectedMessage, run.Log[1].Message)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTaskExecutor_QueryFailure(t *testing.T) {
|
||||
t.Parallel()
|
||||
tes := taskExecutorSystem(t)
|
||||
|
||||
|
@ -191,8 +282,8 @@ func testQueryFailure(t *testing.T) {
|
|||
t.Fatal("promise and run dont match")
|
||||
}
|
||||
|
||||
tes.svc.WaitForQueryLive(t, script)
|
||||
tes.svc.FailQuery(script, errors.New("blargyblargblarg"))
|
||||
tes.svc.WaitForQueryLive(t, script, nil)
|
||||
tes.svc.FailQuery(script, nil, errors.New("blargyblargblarg"))
|
||||
|
||||
<-promise.Done()
|
||||
|
||||
|
@ -201,7 +292,7 @@ func testQueryFailure(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func testManualRun(t *testing.T) {
|
||||
func TestManualRun(t *testing.T) {
|
||||
t.Parallel()
|
||||
tes := taskExecutorSystem(t)
|
||||
|
||||
|
@ -240,15 +331,15 @@ func testManualRun(t *testing.T) {
|
|||
t.Fatal("promise and run and manual run dont match")
|
||||
}
|
||||
|
||||
tes.svc.WaitForQueryLive(t, script)
|
||||
tes.svc.SucceedQuery(script)
|
||||
tes.svc.WaitForQueryLive(t, script, nil)
|
||||
tes.svc.SucceedQuery(script, nil)
|
||||
|
||||
if got := promise.Error(); got != nil {
|
||||
t.Fatal(got)
|
||||
}
|
||||
}
|
||||
|
||||
func testResumingRun(t *testing.T) {
|
||||
func TestTaskExecutor_ResumingRun(t *testing.T) {
|
||||
t.Parallel()
|
||||
tes := taskExecutorSystem(t)
|
||||
|
||||
|
@ -283,15 +374,15 @@ func testResumingRun(t *testing.T) {
|
|||
t.Fatal("promise and run and manual run dont match")
|
||||
}
|
||||
|
||||
tes.svc.WaitForQueryLive(t, script)
|
||||
tes.svc.SucceedQuery(script)
|
||||
tes.svc.WaitForQueryLive(t, script, nil)
|
||||
tes.svc.SucceedQuery(script, nil)
|
||||
|
||||
if got := promise.Error(); got != nil {
|
||||
t.Fatal(got)
|
||||
}
|
||||
}
|
||||
|
||||
func testWorkerLimit(t *testing.T) {
|
||||
func TestTaskExecutor_WorkerLimit(t *testing.T) {
|
||||
t.Parallel()
|
||||
tes := taskExecutorSystem(t)
|
||||
|
||||
|
@ -311,8 +402,8 @@ func testWorkerLimit(t *testing.T) {
|
|||
t.Fatal("expected a worker to be started")
|
||||
}
|
||||
|
||||
tes.svc.WaitForQueryLive(t, script)
|
||||
tes.svc.FailQuery(script, errors.New("blargyblargblarg"))
|
||||
tes.svc.WaitForQueryLive(t, script, nil)
|
||||
tes.svc.FailQuery(script, nil, errors.New("blargyblargblarg"))
|
||||
|
||||
<-promise.Done()
|
||||
|
||||
|
@ -321,7 +412,7 @@ func testWorkerLimit(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func testLimitFunc(t *testing.T) {
|
||||
func TestTaskExecutor_LimitFunc(t *testing.T) {
|
||||
t.Parallel()
|
||||
tes := taskExecutorSystem(t)
|
||||
|
||||
|
@ -360,7 +451,7 @@ func testLimitFunc(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func testMetrics(t *testing.T) {
|
||||
func TestTaskExecutor_Metrics(t *testing.T) {
|
||||
t.Parallel()
|
||||
tes := taskExecutorSystem(t)
|
||||
metrics := tes.metrics
|
||||
|
@ -395,7 +486,7 @@ func testMetrics(t *testing.T) {
|
|||
t.Fatal("promise and run dont match")
|
||||
}
|
||||
|
||||
tes.svc.WaitForQueryLive(t, script)
|
||||
tes.svc.WaitForQueryLive(t, script, nil)
|
||||
|
||||
mg = promtest.MustGather(t, reg)
|
||||
m = promtest.MustFindMetric(t, mg, "task_executor_total_runs_active", nil)
|
||||
|
@ -403,7 +494,7 @@ func testMetrics(t *testing.T) {
|
|||
t.Fatalf("expected 1 total runs active, got %v", got)
|
||||
}
|
||||
|
||||
tes.svc.SucceedQuery(script)
|
||||
tes.svc.SucceedQuery(script, nil)
|
||||
<-promise.Done()
|
||||
|
||||
mg = promtest.MustGather(t, reg)
|
||||
|
@ -457,7 +548,7 @@ func testMetrics(t *testing.T) {
|
|||
|
||||
}
|
||||
|
||||
func testIteratorFailure(t *testing.T) {
|
||||
func TestTaskExecutor_IteratorFailure(t *testing.T) {
|
||||
t.Parallel()
|
||||
tes := taskExecutorSystem(t)
|
||||
|
||||
|
@ -495,8 +586,8 @@ func testIteratorFailure(t *testing.T) {
|
|||
t.Fatal("promise and run dont match")
|
||||
}
|
||||
|
||||
tes.svc.WaitForQueryLive(t, script)
|
||||
tes.svc.SucceedQuery(script)
|
||||
tes.svc.WaitForQueryLive(t, script, nil)
|
||||
tes.svc.SucceedQuery(script, nil)
|
||||
|
||||
<-promise.Done()
|
||||
|
||||
|
@ -505,7 +596,7 @@ func testIteratorFailure(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func testErrorHandling(t *testing.T) {
|
||||
func TestTaskExecutor_ErrorHandling(t *testing.T) {
|
||||
t.Parallel()
|
||||
tes := taskExecutorSystem(t)
|
||||
|
||||
|
@ -551,7 +642,7 @@ func testErrorHandling(t *testing.T) {
|
|||
*/
|
||||
}
|
||||
|
||||
func TestPromiseFailure(t *testing.T) {
|
||||
func TestTaskExecutor_PromiseFailure(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tes := taskExecutorSystem(t)
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/ast"
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/flux/lang"
|
||||
"github.com/influxdata/flux/memory"
|
||||
|
@ -31,14 +32,24 @@ type fakeQueryService struct {
|
|||
|
||||
var _ query.AsyncQueryService = (*fakeQueryService)(nil)
|
||||
|
||||
func makeAST(q string) lang.ASTCompiler {
|
||||
func makeAST(q string, extern *ast.File) lang.ASTCompiler {
|
||||
pkg, err := runtime.ParseToJSON(q)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
var externBytes []byte
|
||||
if extern != nil && len(extern.Body) > 0 {
|
||||
var err error
|
||||
externBytes, err = json.Marshal(extern)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
return lang.ASTCompiler{
|
||||
AST: pkg,
|
||||
Now: time.Unix(123, 0),
|
||||
AST: pkg,
|
||||
Now: time.Unix(123, 0),
|
||||
Extern: externBytes,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -85,12 +96,12 @@ func (s *fakeQueryService) Query(ctx context.Context, req *query.Request) (flux.
|
|||
}
|
||||
|
||||
// SucceedQuery allows the running query matching the given script to return on its Ready channel.
|
||||
func (s *fakeQueryService) SucceedQuery(script string) {
|
||||
func (s *fakeQueryService) SucceedQuery(script string, extern *ast.File) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// Unblock the flux.
|
||||
ast := makeAST(script)
|
||||
ast := makeAST(script, extern)
|
||||
spec := makeASTString(ast)
|
||||
fq, ok := s.queries[spec]
|
||||
if !ok {
|
||||
|
@ -103,12 +114,12 @@ func (s *fakeQueryService) SucceedQuery(script string) {
|
|||
}
|
||||
|
||||
// FailQuery closes the running query's Ready channel and sets its error to the given value.
|
||||
func (s *fakeQueryService) FailQuery(script string, forced error) {
|
||||
func (s *fakeQueryService) FailQuery(script string, extern *ast.File, forced error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// Unblock the flux.
|
||||
ast := makeAST(script)
|
||||
ast := makeAST(script, nil)
|
||||
spec := makeASTString(ast)
|
||||
fq, ok := s.queries[spec]
|
||||
if !ok {
|
||||
|
@ -129,12 +140,12 @@ func (s *fakeQueryService) FailNextQuery(forced error) {
|
|||
// WaitForQueryLive ensures that the query has made it into the service.
|
||||
// This is particularly useful for the synchronous executor,
|
||||
// because the execution starts on a separate goroutine.
|
||||
func (s *fakeQueryService) WaitForQueryLive(t *testing.T, script string) {
|
||||
func (s *fakeQueryService) WaitForQueryLive(t *testing.T, script string, extern *ast.File) {
|
||||
t.Helper()
|
||||
|
||||
const attempts = 10
|
||||
ast := makeAST(script)
|
||||
astUTC := makeAST(script)
|
||||
ast := makeAST(script, extern)
|
||||
astUTC := makeAST(script, extern)
|
||||
astUTC.Now = ast.Now.UTC()
|
||||
spec := makeASTString(ast)
|
||||
specUTC := makeASTString(astUTC)
|
||||
|
|
Loading…
Reference in New Issue