diff --git a/query/select.go b/query/select.go index 6fc6bfe9f4..c8702ba178 100644 --- a/query/select.go +++ b/query/select.go @@ -6,11 +6,13 @@ import ( "io" "sort" "strings" + "sync" "time" "github.com/influxdata/influxdb/pkg/tracing" "github.com/influxdata/influxdb/query/internal/gota" "github.com/influxdata/influxql" + "golang.org/x/sync/errgroup" ) var DefaultTypeMapper = influxql.MultiTypeMapper( @@ -726,28 +728,44 @@ func buildCursor(ctx context.Context, stmt *influxql.SelectStatement, ic Iterato // Produce an iterator for every single call and create an iterator scanner // associated with it. + var g errgroup.Group + var mu sync.Mutex scanners := make([]IteratorScanner, 0, len(valueMapper.calls)) for call := range valueMapper.calls { + call := call + driver := valueMapper.table[call] if driver.Type == influxql.Unknown { // The primary driver of this call is of unknown type, so skip this. continue } - itr, err := buildFieldIterator(ctx, call, ic, stmt.Sources, opt, selector, stmt.Target != nil) - if err != nil { - for _, s := range scanners { - s.Close() + g.Go(func() error { + itr, err := buildFieldIterator(ctx, call, ic, stmt.Sources, opt, selector, stmt.Target != nil) + if err != nil { + return err } - return nil, err + + keys := make([]influxql.VarRef, 0, len(auxKeys)+1) + keys = append(keys, driver) + keys = append(keys, auxKeys...) + + scanner := NewIteratorScanner(itr, keys, opt.FillValue) + + mu.Lock() + scanners = append(scanners, scanner) + mu.Unlock() + + return nil + }) + } + + // Close all scanners if any iterator fails. + if err := g.Wait(); err != nil { + for _, s := range scanners { + s.Close() } - - keys := make([]influxql.VarRef, 0, len(auxKeys)+1) - keys = append(keys, driver) - keys = append(keys, auxKeys...) - - scanner := NewIteratorScanner(itr, keys, opt.FillValue) - scanners = append(scanners, scanner) + return nil, err } if len(scanners) == 0 {