Merge pull request #1262 from influxdata/sgc-fix-race

Fix race condition as `Close` was called by multiple goroutines
pull/10616/head
Stuart Carnie 2018-11-01 14:22:32 -07:00 committed by GitHub
commit 56510ba0dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 227 additions and 187 deletions

View File

@ -19,7 +19,7 @@ import (
type storageTable interface {
flux.Table
Close()
Done() chan struct{}
Cancel()
}
type storeReader struct {
@ -151,23 +151,23 @@ READ:
}
key := groupKeyForSeries(rs.Tags(), &bi.readSpec, bi.bounds)
done := make(chan struct{})
switch typedCur := cur.(type) {
case cursors.IntegerArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TInt)
table = newIntegerTable(typedCur, bi.bounds, key, cols, rs.Tags(), defs)
table = newIntegerTable(done, typedCur, bi.bounds, key, cols, rs.Tags(), defs)
case cursors.FloatArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TFloat)
table = newFloatTable(typedCur, bi.bounds, key, cols, rs.Tags(), defs)
table = newFloatTable(done, typedCur, bi.bounds, key, cols, rs.Tags(), defs)
case cursors.UnsignedArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TUInt)
table = newUnsignedTable(typedCur, bi.bounds, key, cols, rs.Tags(), defs)
table = newUnsignedTable(done, typedCur, bi.bounds, key, cols, rs.Tags(), defs)
case cursors.BooleanArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TBool)
table = newBooleanTable(typedCur, bi.bounds, key, cols, rs.Tags(), defs)
table = newBooleanTable(done, typedCur, bi.bounds, key, cols, rs.Tags(), defs)
case cursors.StringArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString)
table = newStringTable(typedCur, bi.bounds, key, cols, rs.Tags(), defs)
table = newStringTable(done, typedCur, bi.bounds, key, cols, rs.Tags(), defs)
default:
panic(fmt.Sprintf("unreachable: %T", typedCur))
}
@ -175,10 +175,6 @@ READ:
cur = nil
if !table.Empty() {
// Evaluate table.Done early to avoid a data race if table.Close() is run on another goroutine
// and reassigns the underlying channel.
done := table.Done()
if err := f(table); err != nil {
table.Close()
table = nil
@ -187,6 +183,7 @@ READ:
select {
case <-done:
case <-bi.ctx.Done():
table.Cancel()
break READ
}
}
@ -217,12 +214,9 @@ READ:
}
key := groupKeyForSeries(rs.Tags(), &bi.readSpec, bi.bounds)
done := make(chan struct{})
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString)
table = newTableNoPoints(bi.bounds, key, cols, rs.Tags(), defs)
// Evaluate table.Done early to avoid a data race if table.Close() is run on another goroutine
// and reassigns the underlying channel.
done := table.Done()
table = newTableNoPoints(done, bi.bounds, key, cols, rs.Tags(), defs)
if err := f(table); err != nil {
table.Close()
@ -232,6 +226,7 @@ READ:
select {
case <-done:
case <-bi.ctx.Done():
table.Cancel()
break READ
}
@ -279,22 +274,23 @@ READ:
}
key := groupKeyForGroup(gc.PartitionKeyVals(), &bi.readSpec, bi.bounds)
done := make(chan struct{})
switch typedCur := cur.(type) {
case cursors.IntegerArrayCursor:
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TInt)
table = newIntegerGroupTable(gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs)
table = newIntegerGroupTable(done, gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs)
case cursors.FloatArrayCursor:
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TFloat)
table = newFloatGroupTable(gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs)
table = newFloatGroupTable(done, gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs)
case cursors.UnsignedArrayCursor:
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TUInt)
table = newUnsignedGroupTable(gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs)
table = newUnsignedGroupTable(done, gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs)
case cursors.BooleanArrayCursor:
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TBool)
table = newBooleanGroupTable(gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs)
table = newBooleanGroupTable(done, gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs)
case cursors.StringArrayCursor:
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TString)
table = newStringGroupTable(gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs)
table = newStringGroupTable(done, gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs)
default:
panic(fmt.Sprintf("unreachable: %T", typedCur))
}
@ -303,19 +299,15 @@ READ:
cur = nil
gc = nil
// Evaluate table.Done early to avoid a data race if table.Close() is run on another goroutine
// and reassigns the underlying channel.
done := table.Done()
if err := f(table); err != nil {
table.Close()
table = nil
return err
}
// Wait until the table has been read.
select {
case <-done:
case <-bi.ctx.Done():
table.Cancel()
break READ
}
@ -348,25 +340,21 @@ func (bi *tableIterator) handleGroupReadNoPoints(f func(flux.Table) error, rs Gr
READ:
for gc != nil {
key := groupKeyForGroup(gc.PartitionKeyVals(), &bi.readSpec, bi.bounds)
done := make(chan struct{})
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TString)
table = newGroupTableNoPoints(bi.bounds, key, cols, defs)
table = newGroupTableNoPoints(done, bi.bounds, key, cols, defs)
gc.Close()
gc = nil
// Evaluate table.Done early to avoid a data race if table.Close() is run on another goroutine
// and reassigns the underlying channel.
done := table.Done()
if err := f(table); err != nil {
table.Close()
table = nil
return err
}
// Wait until the table has been read.
select {
case <-done:
case <-bi.ctx.Done():
table.Cancel()
break READ
}

View File

@ -7,6 +7,8 @@
package reads
import (
"sync"
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/platform/models"
@ -20,11 +22,13 @@ import (
type floatTable struct {
table
cur cursors.FloatArrayCursor
valBuf []float64
mu sync.Mutex
cur cursors.FloatArrayCursor
}
func newFloatTable(
done chan struct{},
cur cursors.FloatArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
@ -33,32 +37,34 @@ func newFloatTable(
defs [][]byte,
) *floatTable {
t := &floatTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *floatTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *floatTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -94,7 +100,6 @@ func (t *floatTable) advance() bool {
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
@ -102,12 +107,14 @@ func (t *floatTable) advance() bool {
type floatGroupTable struct {
table
valBuf []float64
mu sync.Mutex
gc GroupCursor
cur cursors.FloatArrayCursor
valBuf []float64
}
func newFloatGroupTable(
done chan struct{},
gc GroupCursor,
cur cursors.FloatArrayCursor,
bounds execute.Bounds,
@ -117,17 +124,18 @@ func newFloatGroupTable(
defs [][]byte,
) *floatGroupTable {
t := &floatGroupTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
gc: gc,
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *floatGroupTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
@ -136,18 +144,19 @@ func (t *floatGroupTable) Close() {
t.gc.Close()
t.gc = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *floatGroupTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -188,7 +197,6 @@ RETRY:
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
@ -221,11 +229,13 @@ func (t *floatGroupTable) advanceCursor() bool {
type integerTable struct {
table
cur cursors.IntegerArrayCursor
valBuf []int64
mu sync.Mutex
cur cursors.IntegerArrayCursor
}
func newIntegerTable(
done chan struct{},
cur cursors.IntegerArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
@ -234,32 +244,34 @@ func newIntegerTable(
defs [][]byte,
) *integerTable {
t := &integerTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *integerTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *integerTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -295,7 +307,6 @@ func (t *integerTable) advance() bool {
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
@ -303,12 +314,14 @@ func (t *integerTable) advance() bool {
type integerGroupTable struct {
table
valBuf []int64
mu sync.Mutex
gc GroupCursor
cur cursors.IntegerArrayCursor
valBuf []int64
}
func newIntegerGroupTable(
done chan struct{},
gc GroupCursor,
cur cursors.IntegerArrayCursor,
bounds execute.Bounds,
@ -318,17 +331,18 @@ func newIntegerGroupTable(
defs [][]byte,
) *integerGroupTable {
t := &integerGroupTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
gc: gc,
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *integerGroupTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
@ -337,18 +351,19 @@ func (t *integerGroupTable) Close() {
t.gc.Close()
t.gc = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *integerGroupTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -389,7 +404,6 @@ RETRY:
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
@ -422,11 +436,13 @@ func (t *integerGroupTable) advanceCursor() bool {
type unsignedTable struct {
table
cur cursors.UnsignedArrayCursor
valBuf []uint64
mu sync.Mutex
cur cursors.UnsignedArrayCursor
}
func newUnsignedTable(
done chan struct{},
cur cursors.UnsignedArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
@ -435,32 +451,34 @@ func newUnsignedTable(
defs [][]byte,
) *unsignedTable {
t := &unsignedTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *unsignedTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *unsignedTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -496,7 +514,6 @@ func (t *unsignedTable) advance() bool {
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
@ -504,12 +521,14 @@ func (t *unsignedTable) advance() bool {
type unsignedGroupTable struct {
table
valBuf []uint64
mu sync.Mutex
gc GroupCursor
cur cursors.UnsignedArrayCursor
valBuf []uint64
}
func newUnsignedGroupTable(
done chan struct{},
gc GroupCursor,
cur cursors.UnsignedArrayCursor,
bounds execute.Bounds,
@ -519,17 +538,18 @@ func newUnsignedGroupTable(
defs [][]byte,
) *unsignedGroupTable {
t := &unsignedGroupTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
gc: gc,
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *unsignedGroupTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
@ -538,18 +558,19 @@ func (t *unsignedGroupTable) Close() {
t.gc.Close()
t.gc = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *unsignedGroupTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -590,7 +611,6 @@ RETRY:
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
@ -623,11 +643,13 @@ func (t *unsignedGroupTable) advanceCursor() bool {
type stringTable struct {
table
cur cursors.StringArrayCursor
valBuf []string
mu sync.Mutex
cur cursors.StringArrayCursor
}
func newStringTable(
done chan struct{},
cur cursors.StringArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
@ -636,32 +658,34 @@ func newStringTable(
defs [][]byte,
) *stringTable {
t := &stringTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *stringTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *stringTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -697,7 +721,6 @@ func (t *stringTable) advance() bool {
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
@ -705,12 +728,14 @@ func (t *stringTable) advance() bool {
type stringGroupTable struct {
table
valBuf []string
mu sync.Mutex
gc GroupCursor
cur cursors.StringArrayCursor
valBuf []string
}
func newStringGroupTable(
done chan struct{},
gc GroupCursor,
cur cursors.StringArrayCursor,
bounds execute.Bounds,
@ -720,17 +745,18 @@ func newStringGroupTable(
defs [][]byte,
) *stringGroupTable {
t := &stringGroupTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
gc: gc,
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *stringGroupTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
@ -739,18 +765,19 @@ func (t *stringGroupTable) Close() {
t.gc.Close()
t.gc = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *stringGroupTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -791,7 +818,6 @@ RETRY:
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
@ -824,11 +850,13 @@ func (t *stringGroupTable) advanceCursor() bool {
type booleanTable struct {
table
cur cursors.BooleanArrayCursor
valBuf []bool
mu sync.Mutex
cur cursors.BooleanArrayCursor
}
func newBooleanTable(
done chan struct{},
cur cursors.BooleanArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
@ -837,32 +865,34 @@ func newBooleanTable(
defs [][]byte,
) *booleanTable {
t := &booleanTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *booleanTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *booleanTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -898,7 +928,6 @@ func (t *booleanTable) advance() bool {
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
@ -906,12 +935,14 @@ func (t *booleanTable) advance() bool {
type booleanGroupTable struct {
table
valBuf []bool
mu sync.Mutex
gc GroupCursor
cur cursors.BooleanArrayCursor
valBuf []bool
}
func newBooleanGroupTable(
done chan struct{},
gc GroupCursor,
cur cursors.BooleanArrayCursor,
bounds execute.Bounds,
@ -921,17 +952,18 @@ func newBooleanGroupTable(
defs [][]byte,
) *booleanGroupTable {
t := &booleanGroupTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
gc: gc,
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *booleanGroupTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
@ -940,18 +972,19 @@ func (t *booleanGroupTable) Close() {
t.gc.Close()
t.gc = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *booleanGroupTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -992,7 +1025,6 @@ RETRY:
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}

View File

@ -1,6 +1,8 @@
package reads
import (
"sync"
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/platform/models"
@ -14,11 +16,13 @@ import (
type {{.name}}Table struct {
table
cur cursors.{{.Name}}ArrayCursor
valBuf []{{.Type}}
mu sync.Mutex
cur cursors.{{.Name}}ArrayCursor
}
func new{{.Name}}Table(
done chan struct{},
cur cursors.{{.Name}}ArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
@ -27,32 +31,34 @@ func new{{.Name}}Table(
defs [][]byte,
) *{{.name}}Table {
t := &{{.name}}Table{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *{{.name}}Table) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *{{.name}}Table) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -88,7 +94,6 @@ func (t *{{.name}}Table) advance() bool {
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
@ -96,12 +101,14 @@ func (t *{{.name}}Table) advance() bool {
type {{.name}}GroupTable struct {
table
valBuf []{{.Type}}
mu sync.Mutex
gc GroupCursor
cur cursors.{{.Name}}ArrayCursor
valBuf []{{.Type}}
}
func new{{.Name}}GroupTable(
done chan struct{},
gc GroupCursor,
cur cursors.{{.Name}}ArrayCursor,
bounds execute.Bounds,
@ -111,17 +118,18 @@ func new{{.Name}}GroupTable(
defs [][]byte,
) *{{.name}}GroupTable {
t := &{{.name}}GroupTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
gc: gc,
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *{{.name}}GroupTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
@ -130,18 +138,19 @@ func (t *{{.name}}GroupTable) Close() {
t.gc.Close()
t.gc = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *{{.name}}GroupTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -182,7 +191,6 @@ RETRY:
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}

View File

@ -4,6 +4,7 @@ package reads
import (
"fmt"
"sync/atomic"
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
@ -31,36 +32,42 @@ type table struct {
err error
empty bool
more bool
cancelled int32
}
func newTable(
done chan struct{},
bounds execute.Bounds,
key flux.GroupKey,
cols []flux.ColMeta,
defs [][]byte,
) table {
return table{
done: done,
bounds: bounds,
key: key,
tags: make([][]byte, len(cols)),
defs: defs,
colBufs: make([]interface{}, len(cols)),
cols: cols,
done: make(chan struct{}),
empty: true,
}
}
func (t *table) Done() chan struct{} { return t.done }
func (t *table) Key() flux.GroupKey { return t.key }
func (t *table) Cols() []flux.ColMeta { return t.cols }
func (t *table) RefCount(n int) {}
func (t *table) Err() error { return t.err }
func (t *table) Empty() bool { return t.empty }
func (t *table) Empty() bool { return t.l == 0 }
func (t *table) Len() int { return t.l }
func (t *table) Cancel() {
atomic.StoreInt32(&t.cancelled, 1)
}
func (t *table) isCancelled() bool {
return atomic.LoadInt32(&t.cancelled) != 0
}
func (t *table) Bools(j int) []bool {
execute.CheckColType(t.cols[j], flux.TBool)
return t.colBufs[j].([]bool)
@ -150,6 +157,13 @@ func (t *table) appendBounds() {
}
}
func (t *table) closeDone() {
if t.done != nil {
close(t.done)
t.done = nil
}
}
// hasPoints returns true if the next block from cur has data. If cur is not
// nil, it will be closed.
func hasPoints(cur cursors.Cursor) bool {
@ -186,6 +200,7 @@ type tableNoPoints struct {
}
func newTableNoPoints(
done chan struct{},
bounds execute.Bounds,
key flux.GroupKey,
cols []flux.ColMeta,
@ -193,23 +208,21 @@ func newTableNoPoints(
defs [][]byte,
) *tableNoPoints {
t := &tableNoPoints{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
}
t.readTags(tags)
return t
}
func (t *tableNoPoints) Close() {
if t.done != nil {
close(t.done)
t.done = nil
}
}
func (t *tableNoPoints) Close() {}
func (t *tableNoPoints) Do(f func(flux.ColReader) error) error {
if t.isCancelled() {
return nil
}
t.err = f(t)
t.Close()
t.closeDone()
return t.err
}
@ -218,27 +231,26 @@ type groupTableNoPoints struct {
}
func newGroupTableNoPoints(
done chan struct{},
bounds execute.Bounds,
key flux.GroupKey,
cols []flux.ColMeta,
defs [][]byte,
) *groupTableNoPoints {
t := &groupTableNoPoints{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
}
return t
}
func (t *groupTableNoPoints) Close() {
if t.done != nil {
close(t.done)
t.done = nil
}
}
func (t *groupTableNoPoints) Close() {}
func (t *groupTableNoPoints) Do(f func(flux.ColReader) error) error {
if t.isCancelled() {
return nil
}
t.err = f(t)
t.Close()
t.closeDone()
return t.err
}