refactor(task): make tasks use new Query interface (#13219)

Fixes #12883.
pull/13405/head
Christopher M. Wolff 2019-04-08 09:18:09 -07:00
parent dec149d22d
commit 16b9158b39
5 changed files with 61 additions and 52 deletions

View File

@ -197,11 +197,18 @@ func (tl *TestLauncher) ExecuteQuery(q string) (*QueryResults, error) {
if err != nil {
return nil, err
}
if err = fq.Err(); err != nil {
return nil, fq.Err()
results := make([]flux.Result, 0, 1)
for res := range fq.Results() {
results = append(results, res)
}
if fq.Err() != nil {
return nil, err
}
return &QueryResults{
Results: <-fq.Ready(),
Results: results,
Query: fq,
}, nil
}
@ -335,7 +342,7 @@ func (r *QueryResult) TablesN() int {
// QueryResults wraps a set of query results with some helper methods.
type QueryResults struct {
Results map[string]flux.Result
Results []flux.Result
Query flux.Query
}
@ -360,14 +367,14 @@ func (r *QueryResults) HasTableCount(t *testing.T, n int) {
}
}
// Names returns the sorted set of table names for the query results.
// Names returns the sorted set of result names for the query results.
func (r *QueryResults) Names() []string {
if len(r.Results) == 0 {
return nil
}
names := make([]string, len(r.Results), 0)
for k := range r.Results {
names = append(names, k)
for _, r := range r.Results {
names = append(names, r.Name())
}
return names
}

View File

@ -169,14 +169,14 @@ from(bucket:"my_bucket_in") |> range(start:-5m) |> to(bucket:"%s", org:"%s")`, b
want := make(map[string][]*executetest.Table) // Want the original.
i = 0
for k, v := range res.Results {
for _, r := range res.Results {
i++
if err := v.Tables().Do(func(tbl flux.Table) error {
if err := r.Tables().Do(func(tbl flux.Table) error {
ct, err := executetest.ConvertTable(tbl)
if err != nil {
return err
}
want[k] = append(want[k], ct)
want[r.Name()] = append(want[r.Name()], ct)
return nil
}); err != nil {
t.Fatal(err)
@ -189,13 +189,13 @@ from(bucket:"my_bucket_in") |> range(start:-5m) |> to(bucket:"%s", org:"%s")`, b
res = be.MustExecuteQuery(nowOpt + `from(bucket:"my_bucket_out") |> range(start:-5m)`)
defer res.Done()
got := make(map[string][]*executetest.Table)
for k, v := range res.Results {
if err := v.Tables().Do(func(tbl flux.Table) error {
for _, r := range res.Results {
if err := r.Tables().Do(func(tbl flux.Table) error {
ct, err := executetest.ConvertTable(tbl)
if err != nil {
return err
}
got[k] = append(got[k], ct)
got[r.Name()] = append(got[r.Name()], ct)
return nil
}); err != nil {
t.Fatal(err)

View File

@ -74,7 +74,7 @@ func (c *Compiler) Compile(ctx context.Context) (flux.Program, error) {
if err != nil {
return nil, err
}
return lang.Program{
return &lang.Program{
PlanSpec: ps,
}, nil
}

View File

@ -317,38 +317,44 @@ func (p *asyncRunPromise) followQuery(wg *sync.WaitGroup) {
// Always need to call Done after query is finished.
defer p.q.Done()
select {
case <-p.ready:
// The promise was finished somewhere else, so we don't need to call p.finish.
// But we do need to cancel the flux. This could be a no-op.
p.q.Cancel()
case results, ok := <-p.q.Ready():
if !ok {
// Something went wrong with the flux. Set the error in the run result.
rr := &runResult{err: p.q.Err()}
p.finish(rr, nil)
var rwg sync.WaitGroup
SelectLoop:
for {
select {
case <-p.ready:
// The promise was finished somewhere else, so we don't need to call p.finish.
// But we do need to cancel the flux. This could be a no-op.
p.q.Cancel()
return
}
case r, ok := <-p.q.Results():
if !ok {
break SelectLoop
}
// Exhaust the results so we don't leave unfinished iterators around.
var wg sync.WaitGroup
wg.Add(len(results))
for _, res := range results {
r := res
rwg.Add(1)
go func() {
defer wg.Done()
defer rwg.Done()
if err := exhaustResultIterators(r); err != nil {
p.logger.Info("Error exhausting result iterator", zap.Error(err), zap.String("name", r.Name()))
}
}()
}
wg.Wait()
// Otherwise, query was successful.
// Must call query.Done before collecting statistics. It's safe to call multiple times.
p.q.Done()
p.finish(&runResult{statistics: p.q.Statistics()}, nil)
}
rwg.Wait()
if p.q.Err() != nil {
// Something went wrong with the flux. Set the error in the run result.
rr := &runResult{err: p.q.Err()}
p.finish(rr, nil)
return
}
// Otherwise, query was successful.
// Must call query.Done before collecting statistics. It's safe to call multiple times.
p.q.Done()
p.finish(&runResult{statistics: p.q.Statistics()}, nil)
}
func (p *asyncRunPromise) finish(res *runResult, err error) {

View File

@ -77,8 +77,8 @@ func (s *fakeQueryService) Query(ctx context.Context, req *query.Request) (flux.
}
fq := &fakeQuery{
wait: make(chan struct{}),
ready: make(chan map[string]flux.Result),
wait: make(chan struct{}),
results: make(chan flux.Result),
}
s.queries[makeSpecString(sc.Spec)] = fq
@ -140,7 +140,7 @@ func (s *fakeQueryService) WaitForQueryLive(t *testing.T, script string) {
}
type fakeQuery struct {
ready chan map[string]flux.Result
results chan flux.Result
wait chan struct{} // Blocks Ready from returning.
forcedError error // Value to return from Err() method.
@ -149,11 +149,10 @@ type fakeQuery struct {
var _ flux.Query = (*fakeQuery)(nil)
func (q *fakeQuery) Spec() *flux.Spec { return nil }
func (q *fakeQuery) Done() {}
func (q *fakeQuery) Cancel() { close(q.ready) }
func (q *fakeQuery) Statistics() flux.Statistics { return flux.Statistics{} }
func (q *fakeQuery) Ready() <-chan map[string]flux.Result { return q.ready }
func (q *fakeQuery) Done() {}
func (q *fakeQuery) Cancel() { close(q.results) }
func (q *fakeQuery) Statistics() flux.Statistics { return flux.Statistics{} }
func (q *fakeQuery) Results() <-chan flux.Result { return q.results }
func (q *fakeQuery) Err() error {
if q.ctxErr != nil {
@ -163,13 +162,14 @@ func (q *fakeQuery) Err() error {
}
// run is intended to be run on its own goroutine.
// It blocks until q.wait is closed, then sends a fake result on the q.ready channel.
// It blocks until q.wait is closed, then sends a fake result on the q.results channel.
func (q *fakeQuery) run(ctx context.Context) {
defer close(q.results)
// Wait for call to set query success/fail.
select {
case <-ctx.Done():
q.ctxErr = ctx.Err()
close(q.ready)
return
case <-q.wait:
// Normal case.
@ -177,11 +177,7 @@ func (q *fakeQuery) run(ctx context.Context) {
if q.forcedError == nil {
res := newFakeResult()
q.ready <- map[string]flux.Result{
res.Name(): res,
}
} else {
close(q.ready)
q.results <- res
}
}