fix(reads): Fix race condition when done channel was triggered

tl;dr
Previously, `Close` was being called concurrently by multiple
goroutines, resulting in a race condition. This commit resolves those
issues.

Background

The `Close` method was performing multiple duties, closing resources
and triggering that the table reading by the `Do` method was done.

Additionally, state to track whether more records existed and if the
table was empty, was ported from the more complicated gRPC
implementation. This logic has been simplified.

This new behavior:

* `table#Do` is responsible for triggering it is done, by closing the
  done channel
* The creator of the `table` is responsible for releasing the resources
  by calling the `table#Close` method
* The `table#Do` reading can be cancelled by calling the `Cancel`
  function, which is safe for concurrent use.
* the Do and Close methods are protected by a mutex to protect storage
  resources, such as cursors.
pull/10616/head
Stuart Carnie 2018-10-31 16:21:54 -07:00
parent 952c1440b9
commit b7d9505ac1
No known key found for this signature in database
GPG Key ID: 848D9C9718D78B4F
4 changed files with 227 additions and 187 deletions

View File

@ -19,7 +19,7 @@ import (
type storageTable interface {
flux.Table
Close()
Done() chan struct{}
Cancel()
}
type storeReader struct {
@ -151,23 +151,23 @@ READ:
}
key := groupKeyForSeries(rs.Tags(), &bi.readSpec, bi.bounds)
done := make(chan struct{})
switch typedCur := cur.(type) {
case cursors.IntegerArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TInt)
table = newIntegerTable(typedCur, bi.bounds, key, cols, rs.Tags(), defs)
table = newIntegerTable(done, typedCur, bi.bounds, key, cols, rs.Tags(), defs)
case cursors.FloatArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TFloat)
table = newFloatTable(typedCur, bi.bounds, key, cols, rs.Tags(), defs)
table = newFloatTable(done, typedCur, bi.bounds, key, cols, rs.Tags(), defs)
case cursors.UnsignedArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TUInt)
table = newUnsignedTable(typedCur, bi.bounds, key, cols, rs.Tags(), defs)
table = newUnsignedTable(done, typedCur, bi.bounds, key, cols, rs.Tags(), defs)
case cursors.BooleanArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TBool)
table = newBooleanTable(typedCur, bi.bounds, key, cols, rs.Tags(), defs)
table = newBooleanTable(done, typedCur, bi.bounds, key, cols, rs.Tags(), defs)
case cursors.StringArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString)
table = newStringTable(typedCur, bi.bounds, key, cols, rs.Tags(), defs)
table = newStringTable(done, typedCur, bi.bounds, key, cols, rs.Tags(), defs)
default:
panic(fmt.Sprintf("unreachable: %T", typedCur))
}
@ -175,10 +175,6 @@ READ:
cur = nil
if !table.Empty() {
// Evaluate table.Done early to avoid a data race if table.Close() is run on another goroutine
// and reassigns the underlying channel.
done := table.Done()
if err := f(table); err != nil {
table.Close()
table = nil
@ -187,6 +183,7 @@ READ:
select {
case <-done:
case <-bi.ctx.Done():
table.Cancel()
break READ
}
}
@ -217,12 +214,9 @@ READ:
}
key := groupKeyForSeries(rs.Tags(), &bi.readSpec, bi.bounds)
done := make(chan struct{})
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString)
table = newTableNoPoints(bi.bounds, key, cols, rs.Tags(), defs)
// Evaluate table.Done early to avoid a data race if table.Close() is run on another goroutine
// and reassigns the underlying channel.
done := table.Done()
table = newTableNoPoints(done, bi.bounds, key, cols, rs.Tags(), defs)
if err := f(table); err != nil {
table.Close()
@ -232,6 +226,7 @@ READ:
select {
case <-done:
case <-bi.ctx.Done():
table.Cancel()
break READ
}
@ -279,22 +274,23 @@ READ:
}
key := groupKeyForGroup(gc.PartitionKeyVals(), &bi.readSpec, bi.bounds)
done := make(chan struct{})
switch typedCur := cur.(type) {
case cursors.IntegerArrayCursor:
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TInt)
table = newIntegerGroupTable(gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs)
table = newIntegerGroupTable(done, gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs)
case cursors.FloatArrayCursor:
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TFloat)
table = newFloatGroupTable(gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs)
table = newFloatGroupTable(done, gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs)
case cursors.UnsignedArrayCursor:
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TUInt)
table = newUnsignedGroupTable(gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs)
table = newUnsignedGroupTable(done, gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs)
case cursors.BooleanArrayCursor:
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TBool)
table = newBooleanGroupTable(gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs)
table = newBooleanGroupTable(done, gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs)
case cursors.StringArrayCursor:
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TString)
table = newStringGroupTable(gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs)
table = newStringGroupTable(done, gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs)
default:
panic(fmt.Sprintf("unreachable: %T", typedCur))
}
@ -303,19 +299,15 @@ READ:
cur = nil
gc = nil
// Evaluate table.Done early to avoid a data race if table.Close() is run on another goroutine
// and reassigns the underlying channel.
done := table.Done()
if err := f(table); err != nil {
table.Close()
table = nil
return err
}
// Wait until the table has been read.
select {
case <-done:
case <-bi.ctx.Done():
table.Cancel()
break READ
}
@ -348,25 +340,21 @@ func (bi *tableIterator) handleGroupReadNoPoints(f func(flux.Table) error, rs Gr
READ:
for gc != nil {
key := groupKeyForGroup(gc.PartitionKeyVals(), &bi.readSpec, bi.bounds)
done := make(chan struct{})
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TString)
table = newGroupTableNoPoints(bi.bounds, key, cols, defs)
table = newGroupTableNoPoints(done, bi.bounds, key, cols, defs)
gc.Close()
gc = nil
// Evaluate table.Done early to avoid a data race if table.Close() is run on another goroutine
// and reassigns the underlying channel.
done := table.Done()
if err := f(table); err != nil {
table.Close()
table = nil
return err
}
// Wait until the table has been read.
select {
case <-done:
case <-bi.ctx.Done():
table.Cancel()
break READ
}

View File

@ -7,6 +7,8 @@
package reads
import (
"sync"
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/platform/models"
@ -20,11 +22,13 @@ import (
type floatTable struct {
table
cur cursors.FloatArrayCursor
valBuf []float64
mu sync.Mutex
cur cursors.FloatArrayCursor
}
func newFloatTable(
done chan struct{},
cur cursors.FloatArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
@ -33,32 +37,34 @@ func newFloatTable(
defs [][]byte,
) *floatTable {
t := &floatTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *floatTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *floatTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -94,7 +100,6 @@ func (t *floatTable) advance() bool {
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
@ -102,12 +107,14 @@ func (t *floatTable) advance() bool {
type floatGroupTable struct {
table
valBuf []float64
mu sync.Mutex
gc GroupCursor
cur cursors.FloatArrayCursor
valBuf []float64
}
func newFloatGroupTable(
done chan struct{},
gc GroupCursor,
cur cursors.FloatArrayCursor,
bounds execute.Bounds,
@ -117,17 +124,18 @@ func newFloatGroupTable(
defs [][]byte,
) *floatGroupTable {
t := &floatGroupTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
gc: gc,
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *floatGroupTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
@ -136,18 +144,19 @@ func (t *floatGroupTable) Close() {
t.gc.Close()
t.gc = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *floatGroupTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -188,7 +197,6 @@ RETRY:
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
@ -221,11 +229,13 @@ func (t *floatGroupTable) advanceCursor() bool {
type integerTable struct {
table
cur cursors.IntegerArrayCursor
valBuf []int64
mu sync.Mutex
cur cursors.IntegerArrayCursor
}
func newIntegerTable(
done chan struct{},
cur cursors.IntegerArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
@ -234,32 +244,34 @@ func newIntegerTable(
defs [][]byte,
) *integerTable {
t := &integerTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *integerTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *integerTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -295,7 +307,6 @@ func (t *integerTable) advance() bool {
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
@ -303,12 +314,14 @@ func (t *integerTable) advance() bool {
type integerGroupTable struct {
table
valBuf []int64
mu sync.Mutex
gc GroupCursor
cur cursors.IntegerArrayCursor
valBuf []int64
}
func newIntegerGroupTable(
done chan struct{},
gc GroupCursor,
cur cursors.IntegerArrayCursor,
bounds execute.Bounds,
@ -318,17 +331,18 @@ func newIntegerGroupTable(
defs [][]byte,
) *integerGroupTable {
t := &integerGroupTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
gc: gc,
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *integerGroupTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
@ -337,18 +351,19 @@ func (t *integerGroupTable) Close() {
t.gc.Close()
t.gc = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *integerGroupTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -389,7 +404,6 @@ RETRY:
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
@ -422,11 +436,13 @@ func (t *integerGroupTable) advanceCursor() bool {
type unsignedTable struct {
table
cur cursors.UnsignedArrayCursor
valBuf []uint64
mu sync.Mutex
cur cursors.UnsignedArrayCursor
}
func newUnsignedTable(
done chan struct{},
cur cursors.UnsignedArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
@ -435,32 +451,34 @@ func newUnsignedTable(
defs [][]byte,
) *unsignedTable {
t := &unsignedTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *unsignedTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *unsignedTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -496,7 +514,6 @@ func (t *unsignedTable) advance() bool {
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
@ -504,12 +521,14 @@ func (t *unsignedTable) advance() bool {
type unsignedGroupTable struct {
table
valBuf []uint64
mu sync.Mutex
gc GroupCursor
cur cursors.UnsignedArrayCursor
valBuf []uint64
}
func newUnsignedGroupTable(
done chan struct{},
gc GroupCursor,
cur cursors.UnsignedArrayCursor,
bounds execute.Bounds,
@ -519,17 +538,18 @@ func newUnsignedGroupTable(
defs [][]byte,
) *unsignedGroupTable {
t := &unsignedGroupTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
gc: gc,
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *unsignedGroupTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
@ -538,18 +558,19 @@ func (t *unsignedGroupTable) Close() {
t.gc.Close()
t.gc = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *unsignedGroupTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -590,7 +611,6 @@ RETRY:
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
@ -623,11 +643,13 @@ func (t *unsignedGroupTable) advanceCursor() bool {
type stringTable struct {
table
cur cursors.StringArrayCursor
valBuf []string
mu sync.Mutex
cur cursors.StringArrayCursor
}
func newStringTable(
done chan struct{},
cur cursors.StringArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
@ -636,32 +658,34 @@ func newStringTable(
defs [][]byte,
) *stringTable {
t := &stringTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *stringTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *stringTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -697,7 +721,6 @@ func (t *stringTable) advance() bool {
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
@ -705,12 +728,14 @@ func (t *stringTable) advance() bool {
type stringGroupTable struct {
table
valBuf []string
mu sync.Mutex
gc GroupCursor
cur cursors.StringArrayCursor
valBuf []string
}
func newStringGroupTable(
done chan struct{},
gc GroupCursor,
cur cursors.StringArrayCursor,
bounds execute.Bounds,
@ -720,17 +745,18 @@ func newStringGroupTable(
defs [][]byte,
) *stringGroupTable {
t := &stringGroupTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
gc: gc,
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *stringGroupTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
@ -739,18 +765,19 @@ func (t *stringGroupTable) Close() {
t.gc.Close()
t.gc = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *stringGroupTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -791,7 +818,6 @@ RETRY:
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
@ -824,11 +850,13 @@ func (t *stringGroupTable) advanceCursor() bool {
type booleanTable struct {
table
cur cursors.BooleanArrayCursor
valBuf []bool
mu sync.Mutex
cur cursors.BooleanArrayCursor
}
func newBooleanTable(
done chan struct{},
cur cursors.BooleanArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
@ -837,32 +865,34 @@ func newBooleanTable(
defs [][]byte,
) *booleanTable {
t := &booleanTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *booleanTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *booleanTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -898,7 +928,6 @@ func (t *booleanTable) advance() bool {
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
@ -906,12 +935,14 @@ func (t *booleanTable) advance() bool {
type booleanGroupTable struct {
table
valBuf []bool
mu sync.Mutex
gc GroupCursor
cur cursors.BooleanArrayCursor
valBuf []bool
}
func newBooleanGroupTable(
done chan struct{},
gc GroupCursor,
cur cursors.BooleanArrayCursor,
bounds execute.Bounds,
@ -921,17 +952,18 @@ func newBooleanGroupTable(
defs [][]byte,
) *booleanGroupTable {
t := &booleanGroupTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
gc: gc,
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *booleanGroupTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
@ -940,18 +972,19 @@ func (t *booleanGroupTable) Close() {
t.gc.Close()
t.gc = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *booleanGroupTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -992,7 +1025,6 @@ RETRY:
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}

View File

@ -1,6 +1,8 @@
package reads
import (
"sync"
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/platform/models"
@ -14,11 +16,13 @@ import (
type {{.name}}Table struct {
table
cur cursors.{{.Name}}ArrayCursor
valBuf []{{.Type}}
mu sync.Mutex
cur cursors.{{.Name}}ArrayCursor
}
func new{{.Name}}Table(
done chan struct{},
cur cursors.{{.Name}}ArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
@ -27,32 +31,34 @@ func new{{.Name}}Table(
defs [][]byte,
) *{{.name}}Table {
t := &{{.name}}Table{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *{{.name}}Table) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *{{.name}}Table) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -88,7 +94,6 @@ func (t *{{.name}}Table) advance() bool {
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}
@ -96,12 +101,14 @@ func (t *{{.name}}Table) advance() bool {
type {{.name}}GroupTable struct {
table
valBuf []{{.Type}}
mu sync.Mutex
gc GroupCursor
cur cursors.{{.Name}}ArrayCursor
valBuf []{{.Type}}
}
func new{{.Name}}GroupTable(
done chan struct{},
gc GroupCursor,
cur cursors.{{.Name}}ArrayCursor,
bounds execute.Bounds,
@ -111,17 +118,18 @@ func new{{.Name}}GroupTable(
defs [][]byte,
) *{{.name}}GroupTable {
t := &{{.name}}GroupTable{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
gc: gc,
cur: cur,
}
t.readTags(tags)
t.more = t.advance()
t.advance()
return t
}
func (t *{{.name}}GroupTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
@ -130,18 +138,19 @@ func (t *{{.name}}GroupTable) Close() {
t.gc.Close()
t.gc = nil
}
if t.done != nil {
close(t.done)
t.done = nil
}
t.mu.Unlock()
}
func (t *{{.name}}GroupTable) Do(f func(flux.ColReader) error) error {
defer t.Close()
t.mu.Lock()
defer func() {
t.closeDone()
t.mu.Unlock()
}()
if t.more {
if !t.Empty() {
t.err = f(t)
for t.err == nil && t.advance() {
for !t.isCancelled() && t.err == nil && t.advance() {
t.err = f(t)
}
}
@ -182,7 +191,6 @@ RETRY:
t.colBufs[valueColIdx] = t.valBuf
t.appendTags()
t.appendBounds()
t.empty = false
return true
}

View File

@ -4,6 +4,7 @@ package reads
import (
"fmt"
"sync/atomic"
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
@ -31,36 +32,42 @@ type table struct {
err error
empty bool
more bool
cancelled int32
}
func newTable(
done chan struct{},
bounds execute.Bounds,
key flux.GroupKey,
cols []flux.ColMeta,
defs [][]byte,
) table {
return table{
done: done,
bounds: bounds,
key: key,
tags: make([][]byte, len(cols)),
defs: defs,
colBufs: make([]interface{}, len(cols)),
cols: cols,
done: make(chan struct{}),
empty: true,
}
}
func (t *table) Done() chan struct{} { return t.done }
func (t *table) Key() flux.GroupKey { return t.key }
func (t *table) Cols() []flux.ColMeta { return t.cols }
func (t *table) RefCount(n int) {}
func (t *table) Err() error { return t.err }
func (t *table) Empty() bool { return t.empty }
func (t *table) Empty() bool { return t.l == 0 }
func (t *table) Len() int { return t.l }
func (t *table) Cancel() {
atomic.StoreInt32(&t.cancelled, 1)
}
func (t *table) isCancelled() bool {
return atomic.LoadInt32(&t.cancelled) != 0
}
func (t *table) Bools(j int) []bool {
execute.CheckColType(t.cols[j], flux.TBool)
return t.colBufs[j].([]bool)
@ -150,6 +157,13 @@ func (t *table) appendBounds() {
}
}
func (t *table) closeDone() {
if t.done != nil {
close(t.done)
t.done = nil
}
}
// hasPoints returns true if the next block from cur has data. If cur is not
// nil, it will be closed.
func hasPoints(cur cursors.Cursor) bool {
@ -186,6 +200,7 @@ type tableNoPoints struct {
}
func newTableNoPoints(
done chan struct{},
bounds execute.Bounds,
key flux.GroupKey,
cols []flux.ColMeta,
@ -193,23 +208,21 @@ func newTableNoPoints(
defs [][]byte,
) *tableNoPoints {
t := &tableNoPoints{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
}
t.readTags(tags)
return t
}
func (t *tableNoPoints) Close() {
if t.done != nil {
close(t.done)
t.done = nil
}
}
func (t *tableNoPoints) Close() {}
func (t *tableNoPoints) Do(f func(flux.ColReader) error) error {
if t.isCancelled() {
return nil
}
t.err = f(t)
t.Close()
t.closeDone()
return t.err
}
@ -218,27 +231,26 @@ type groupTableNoPoints struct {
}
func newGroupTableNoPoints(
done chan struct{},
bounds execute.Bounds,
key flux.GroupKey,
cols []flux.ColMeta,
defs [][]byte,
) *groupTableNoPoints {
t := &groupTableNoPoints{
table: newTable(bounds, key, cols, defs),
table: newTable(done, bounds, key, cols, defs),
}
return t
}
func (t *groupTableNoPoints) Close() {
if t.done != nil {
close(t.done)
t.done = nil
}
}
func (t *groupTableNoPoints) Close() {}
func (t *groupTableNoPoints) Do(f func(flux.ColReader) error) error {
if t.isCancelled() {
return nil
}
t.err = f(t)
t.Close()
t.closeDone()
return t.err
}