refactor(task): make setting now on every execution
By switching to passing a spec it allows us more controlpull/10616/head
parent
3873ac0c65
commit
e1ecceaca0
|
@ -5,6 +5,7 @@ package executor
|
|||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/logger"
|
||||
"github.com/influxdata/platform/query"
|
||||
|
@ -116,7 +117,13 @@ func (p *syncRunPromise) finish(res *runResult, err error) {
|
|||
}
|
||||
|
||||
func (p *syncRunPromise) doQuery() {
|
||||
it, err := p.svc.QueryWithCompile(p.ctx, p.t.Org, p.t.Script)
|
||||
spec, err := query.Compile(p.ctx, p.t.Script, time.Unix(p.qr.Now, 0))
|
||||
if err != nil {
|
||||
p.finish(nil, err)
|
||||
return
|
||||
}
|
||||
|
||||
it, err := p.svc.Query(p.ctx, p.t.Org, spec)
|
||||
if err != nil {
|
||||
// Assume the error should not be part of the runResult.
|
||||
p.finish(nil, err)
|
||||
|
@ -164,7 +171,12 @@ func (e *asyncQueryServiceExecutor) Execute(ctx context.Context, run backend.Que
|
|||
return nil, err
|
||||
}
|
||||
|
||||
q, err := e.svc.QueryWithCompile(ctx, t.Org, t.Script)
|
||||
spec, err := query.Compile(ctx, t.Script, time.Unix(run.Now, 0))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
q, err := e.svc.Query(ctx, t.Org, spec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package executor_test
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"math"
|
||||
"reflect"
|
||||
|
@ -27,12 +28,43 @@ type fakeQueryService struct {
|
|||
|
||||
var _ query.AsyncQueryService = (*fakeQueryService)(nil)
|
||||
|
||||
func makeSpec(q string) *query.Spec {
|
||||
qs, err := query.Compile(context.Background(), q, time.Unix(123, 0))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return qs
|
||||
}
|
||||
|
||||
func makeSpecString(q *query.Spec) string {
|
||||
b, err := json.Marshal(q)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return string(b)
|
||||
}
|
||||
|
||||
func newFakeQueryService() *fakeQueryService {
|
||||
return &fakeQueryService{queries: make(map[string]*fakeQuery)}
|
||||
}
|
||||
|
||||
func (s *fakeQueryService) Query(ctx context.Context, orgID platform.ID, query *query.Spec) (query.Query, error) {
|
||||
panic("not implemented")
|
||||
func (s *fakeQueryService) Query(ctx context.Context, orgID platform.ID, q *query.Spec) (query.Query, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.queryErr != nil {
|
||||
err := s.queryErr
|
||||
s.queryErr = nil
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fq := &fakeQuery{
|
||||
wait: make(chan struct{}),
|
||||
ready: make(chan map[string]query.Result),
|
||||
}
|
||||
s.queries[makeSpecString(q)] = fq
|
||||
go fq.run()
|
||||
|
||||
return fq, nil
|
||||
}
|
||||
|
||||
func (s *fakeQueryService) QueryWithCompile(ctx context.Context, orgID platform.ID, q string) (query.Query, error) {
|
||||
|
@ -48,7 +80,7 @@ func (s *fakeQueryService) QueryWithCompile(ctx context.Context, orgID platform.
|
|||
wait: make(chan struct{}),
|
||||
ready: make(chan map[string]query.Result),
|
||||
}
|
||||
s.queries[q] = fq
|
||||
s.queries[makeSpecString(makeSpec(q))] = fq
|
||||
go fq.run()
|
||||
|
||||
return fq, nil
|
||||
|
@ -60,8 +92,9 @@ func (s *fakeQueryService) SucceedQuery(script string) {
|
|||
defer s.mu.Unlock()
|
||||
|
||||
// Unblock the query.
|
||||
close(s.queries[script].wait)
|
||||
delete(s.queries, script)
|
||||
spec := makeSpecString(makeSpec(script))
|
||||
close(s.queries[spec].wait)
|
||||
delete(s.queries, spec)
|
||||
}
|
||||
|
||||
// FailQuery closes the running query's Ready channel and sets its error to the given value.
|
||||
|
@ -70,9 +103,10 @@ func (s *fakeQueryService) FailQuery(script string, forced error) {
|
|||
defer s.mu.Unlock()
|
||||
|
||||
// Unblock the query.
|
||||
s.queries[script].forcedError = forced
|
||||
close(s.queries[script].wait)
|
||||
delete(s.queries, script)
|
||||
spec := makeSpecString(makeSpec(script))
|
||||
s.queries[spec].forcedError = forced
|
||||
close(s.queries[spec].wait)
|
||||
delete(s.queries, spec)
|
||||
}
|
||||
|
||||
// FailNextQuery causes the next call to QueryWithCompile to return the given error.
|
||||
|
@ -85,13 +119,14 @@ func (s *fakeQueryService) FailNextQuery(forced error) {
|
|||
// because the execution starts on a separate goroutine.
|
||||
func (s *fakeQueryService) WaitForQueryLive(t *testing.T, script string) {
|
||||
const attempts = 10
|
||||
spec := makeSpecString(makeSpec(script))
|
||||
for i := 0; i < attempts; i++ {
|
||||
if i != 0 {
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
_, ok := s.queries[script]
|
||||
_, ok := s.queries[spec]
|
||||
s.mu.Unlock()
|
||||
if ok {
|
||||
return
|
||||
|
|
Loading…
Reference in New Issue