From 16b9158b39d85af42ef13625f283a9d8a1cbba90 Mon Sep 17 00:00:00 2001 From: "Christopher M. Wolff" Date: Mon, 8 Apr 2019 09:18:09 -0700 Subject: [PATCH] refactor(task): make tasks use new Query interface (#13219) Fixes #12883. --- cmd/influxd/launcher/launcher_helpers.go | 21 ++++++---- cmd/influxd/launcher/tasks_test.go | 12 +++--- query/influxql/compiler.go | 2 +- task/backend/executor/executor.go | 52 +++++++++++++----------- task/backend/executor/executor_test.go | 26 +++++------- 5 files changed, 61 insertions(+), 52 deletions(-) diff --git a/cmd/influxd/launcher/launcher_helpers.go b/cmd/influxd/launcher/launcher_helpers.go index 9360fd8ea8..6bab6f232a 100644 --- a/cmd/influxd/launcher/launcher_helpers.go +++ b/cmd/influxd/launcher/launcher_helpers.go @@ -197,11 +197,18 @@ func (tl *TestLauncher) ExecuteQuery(q string) (*QueryResults, error) { if err != nil { return nil, err } - if err = fq.Err(); err != nil { - return nil, fq.Err() + + results := make([]flux.Result, 0, 1) + for res := range fq.Results() { + results = append(results, res) } + + if fq.Err() != nil { + return nil, err + } + return &QueryResults{ - Results: <-fq.Ready(), + Results: results, Query: fq, }, nil } @@ -335,7 +342,7 @@ func (r *QueryResult) TablesN() int { // QueryResults wraps a set of query results with some helper methods. type QueryResults struct { - Results map[string]flux.Result + Results []flux.Result Query flux.Query } @@ -360,14 +367,14 @@ func (r *QueryResults) HasTableCount(t *testing.T, n int) { } } -// Names returns the sorted set of table names for the query results. +// Names returns the sorted set of result names for the query results. func (r *QueryResults) Names() []string { if len(r.Results) == 0 { return nil } names := make([]string, len(r.Results), 0) - for k := range r.Results { - names = append(names, k) + for _, r := range r.Results { + names = append(names, r.Name()) } return names } diff --git a/cmd/influxd/launcher/tasks_test.go b/cmd/influxd/launcher/tasks_test.go index f1e03be48e..6e35e09b1f 100644 --- a/cmd/influxd/launcher/tasks_test.go +++ b/cmd/influxd/launcher/tasks_test.go @@ -169,14 +169,14 @@ from(bucket:"my_bucket_in") |> range(start:-5m) |> to(bucket:"%s", org:"%s")`, b want := make(map[string][]*executetest.Table) // Want the original. i = 0 - for k, v := range res.Results { + for _, r := range res.Results { i++ - if err := v.Tables().Do(func(tbl flux.Table) error { + if err := r.Tables().Do(func(tbl flux.Table) error { ct, err := executetest.ConvertTable(tbl) if err != nil { return err } - want[k] = append(want[k], ct) + want[r.Name()] = append(want[r.Name()], ct) return nil }); err != nil { t.Fatal(err) @@ -189,13 +189,13 @@ from(bucket:"my_bucket_in") |> range(start:-5m) |> to(bucket:"%s", org:"%s")`, b res = be.MustExecuteQuery(nowOpt + `from(bucket:"my_bucket_out") |> range(start:-5m)`) defer res.Done() got := make(map[string][]*executetest.Table) - for k, v := range res.Results { - if err := v.Tables().Do(func(tbl flux.Table) error { + for _, r := range res.Results { + if err := r.Tables().Do(func(tbl flux.Table) error { ct, err := executetest.ConvertTable(tbl) if err != nil { return err } - got[k] = append(got[k], ct) + got[r.Name()] = append(got[r.Name()], ct) return nil }); err != nil { t.Fatal(err) diff --git a/query/influxql/compiler.go b/query/influxql/compiler.go index 57c3b4c98f..83dd7ea923 100644 --- a/query/influxql/compiler.go +++ b/query/influxql/compiler.go @@ -74,7 +74,7 @@ func (c *Compiler) Compile(ctx context.Context) (flux.Program, error) { if err != nil { return nil, err } - return lang.Program{ + return &lang.Program{ PlanSpec: ps, }, nil } diff --git a/task/backend/executor/executor.go b/task/backend/executor/executor.go index 94f0a28de5..9a09196140 100644 --- a/task/backend/executor/executor.go +++ b/task/backend/executor/executor.go @@ -317,38 +317,44 @@ func (p *asyncRunPromise) followQuery(wg *sync.WaitGroup) { // Always need to call Done after query is finished. defer p.q.Done() - select { - case <-p.ready: - // The promise was finished somewhere else, so we don't need to call p.finish. - // But we do need to cancel the flux. This could be a no-op. - p.q.Cancel() - case results, ok := <-p.q.Ready(): - if !ok { - // Something went wrong with the flux. Set the error in the run result. - rr := &runResult{err: p.q.Err()} - p.finish(rr, nil) + var rwg sync.WaitGroup +SelectLoop: + for { + select { + case <-p.ready: + // The promise was finished somewhere else, so we don't need to call p.finish. + // But we do need to cancel the flux. This could be a no-op. + p.q.Cancel() return - } + case r, ok := <-p.q.Results(): + if !ok { + break SelectLoop + } - // Exhaust the results so we don't leave unfinished iterators around. - var wg sync.WaitGroup - wg.Add(len(results)) - for _, res := range results { - r := res + + rwg.Add(1) go func() { - defer wg.Done() + defer rwg.Done() if err := exhaustResultIterators(r); err != nil { p.logger.Info("Error exhausting result iterator", zap.Error(err), zap.String("name", r.Name())) } }() } - wg.Wait() - - // Otherwise, query was successful. - // Must call query.Done before collecting statistics. It's safe to call multiple times. - p.q.Done() - p.finish(&runResult{statistics: p.q.Statistics()}, nil) } + + rwg.Wait() + + if p.q.Err() != nil { + // Something went wrong with the flux. Set the error in the run result. + rr := &runResult{err: p.q.Err()} + p.finish(rr, nil) + return + } + + // Otherwise, query was successful. + // Must call query.Done before collecting statistics. It's safe to call multiple times. + p.q.Done() + p.finish(&runResult{statistics: p.q.Statistics()}, nil) } func (p *asyncRunPromise) finish(res *runResult, err error) { diff --git a/task/backend/executor/executor_test.go b/task/backend/executor/executor_test.go index a0c67c687d..2db99db95d 100644 --- a/task/backend/executor/executor_test.go +++ b/task/backend/executor/executor_test.go @@ -77,8 +77,8 @@ func (s *fakeQueryService) Query(ctx context.Context, req *query.Request) (flux. } fq := &fakeQuery{ - wait: make(chan struct{}), - ready: make(chan map[string]flux.Result), + wait: make(chan struct{}), + results: make(chan flux.Result), } s.queries[makeSpecString(sc.Spec)] = fq @@ -140,7 +140,7 @@ func (s *fakeQueryService) WaitForQueryLive(t *testing.T, script string) { } type fakeQuery struct { - ready chan map[string]flux.Result + results chan flux.Result wait chan struct{} // Blocks Ready from returning. forcedError error // Value to return from Err() method. @@ -149,11 +149,10 @@ type fakeQuery struct { var _ flux.Query = (*fakeQuery)(nil) -func (q *fakeQuery) Spec() *flux.Spec { return nil } -func (q *fakeQuery) Done() {} -func (q *fakeQuery) Cancel() { close(q.ready) } -func (q *fakeQuery) Statistics() flux.Statistics { return flux.Statistics{} } -func (q *fakeQuery) Ready() <-chan map[string]flux.Result { return q.ready } +func (q *fakeQuery) Done() {} +func (q *fakeQuery) Cancel() { close(q.results) } +func (q *fakeQuery) Statistics() flux.Statistics { return flux.Statistics{} } +func (q *fakeQuery) Results() <-chan flux.Result { return q.results } func (q *fakeQuery) Err() error { if q.ctxErr != nil { @@ -163,13 +162,14 @@ func (q *fakeQuery) Err() error { } // run is intended to be run on its own goroutine. -// It blocks until q.wait is closed, then sends a fake result on the q.ready channel. +// It blocks until q.wait is closed, then sends a fake result on the q.results channel. func (q *fakeQuery) run(ctx context.Context) { + defer close(q.results) + // Wait for call to set query success/fail. select { case <-ctx.Done(): q.ctxErr = ctx.Err() - close(q.ready) return case <-q.wait: // Normal case. @@ -177,11 +177,7 @@ func (q *fakeQuery) run(ctx context.Context) { if q.forcedError == nil { res := newFakeResult() - q.ready <- map[string]flux.Result{ - res.Name(): res, - } - } else { - close(q.ready) + q.results <- res } }