From e1ecceaca00df9923f784458de00c4c01fb5ff16 Mon Sep 17 00:00:00 2001 From: Lyon Hill Date: Wed, 8 Aug 2018 12:50:24 -0600 Subject: [PATCH] refactor(task): make setting now on every execution By switching to passing a spec it allows us more control --- task/backend/executor/executor.go | 16 +++++++- task/backend/executor/executor_test.go | 53 +++++++++++++++++++++----- 2 files changed, 58 insertions(+), 11 deletions(-) diff --git a/task/backend/executor/executor.go b/task/backend/executor/executor.go index d4ea4d57cc..150b88c667 100644 --- a/task/backend/executor/executor.go +++ b/task/backend/executor/executor.go @@ -5,6 +5,7 @@ package executor import ( "context" "sync" + "time" "github.com/influxdata/influxdb/logger" "github.com/influxdata/platform/query" @@ -116,7 +117,13 @@ func (p *syncRunPromise) finish(res *runResult, err error) { } func (p *syncRunPromise) doQuery() { - it, err := p.svc.QueryWithCompile(p.ctx, p.t.Org, p.t.Script) + spec, err := query.Compile(p.ctx, p.t.Script, time.Unix(p.qr.Now, 0)) + if err != nil { + p.finish(nil, err) + return + } + + it, err := p.svc.Query(p.ctx, p.t.Org, spec) if err != nil { // Assume the error should not be part of the runResult. p.finish(nil, err) @@ -164,7 +171,12 @@ func (e *asyncQueryServiceExecutor) Execute(ctx context.Context, run backend.Que return nil, err } - q, err := e.svc.QueryWithCompile(ctx, t.Org, t.Script) + spec, err := query.Compile(ctx, t.Script, time.Unix(run.Now, 0)) + if err != nil { + return nil, err + } + + q, err := e.svc.Query(ctx, t.Org, spec) if err != nil { return nil, err } diff --git a/task/backend/executor/executor_test.go b/task/backend/executor/executor_test.go index 7d99679912..3c51addfdf 100644 --- a/task/backend/executor/executor_test.go +++ b/task/backend/executor/executor_test.go @@ -2,6 +2,7 @@ package executor_test import ( "context" + "encoding/json" "errors" "math" "reflect" @@ -27,12 +28,43 @@ type fakeQueryService struct { var _ query.AsyncQueryService = (*fakeQueryService)(nil) +func makeSpec(q string) *query.Spec { + qs, err := query.Compile(context.Background(), q, time.Unix(123, 0)) + if err != nil { + panic(err) + } + return qs +} + +func makeSpecString(q *query.Spec) string { + b, err := json.Marshal(q) + if err != nil { + panic(err) + } + return string(b) +} + func newFakeQueryService() *fakeQueryService { return &fakeQueryService{queries: make(map[string]*fakeQuery)} } -func (s *fakeQueryService) Query(ctx context.Context, orgID platform.ID, query *query.Spec) (query.Query, error) { - panic("not implemented") +func (s *fakeQueryService) Query(ctx context.Context, orgID platform.ID, q *query.Spec) (query.Query, error) { + s.mu.Lock() + defer s.mu.Unlock() + if s.queryErr != nil { + err := s.queryErr + s.queryErr = nil + return nil, err + } + + fq := &fakeQuery{ + wait: make(chan struct{}), + ready: make(chan map[string]query.Result), + } + s.queries[makeSpecString(q)] = fq + go fq.run() + + return fq, nil } func (s *fakeQueryService) QueryWithCompile(ctx context.Context, orgID platform.ID, q string) (query.Query, error) { @@ -48,7 +80,7 @@ func (s *fakeQueryService) QueryWithCompile(ctx context.Context, orgID platform. wait: make(chan struct{}), ready: make(chan map[string]query.Result), } - s.queries[q] = fq + s.queries[makeSpecString(makeSpec(q))] = fq go fq.run() return fq, nil @@ -60,8 +92,9 @@ func (s *fakeQueryService) SucceedQuery(script string) { defer s.mu.Unlock() // Unblock the query. - close(s.queries[script].wait) - delete(s.queries, script) + spec := makeSpecString(makeSpec(script)) + close(s.queries[spec].wait) + delete(s.queries, spec) } // FailQuery closes the running query's Ready channel and sets its error to the given value. @@ -70,9 +103,10 @@ func (s *fakeQueryService) FailQuery(script string, forced error) { defer s.mu.Unlock() // Unblock the query. - s.queries[script].forcedError = forced - close(s.queries[script].wait) - delete(s.queries, script) + spec := makeSpecString(makeSpec(script)) + s.queries[spec].forcedError = forced + close(s.queries[spec].wait) + delete(s.queries, spec) } // FailNextQuery causes the next call to QueryWithCompile to return the given error. @@ -85,13 +119,14 @@ func (s *fakeQueryService) FailNextQuery(forced error) { // because the execution starts on a separate goroutine. func (s *fakeQueryService) WaitForQueryLive(t *testing.T, script string) { const attempts = 10 + spec := makeSpecString(makeSpec(script)) for i := 0; i < attempts; i++ { if i != 0 { time.Sleep(5 * time.Millisecond) } s.mu.Lock() - _, ok := s.queries[script] + _, ok := s.queries[spec] s.mu.Unlock() if ok { return