diff --git a/storage/reads/reader.go b/storage/reads/reader.go index 5181b5026f..ab97b232c1 100644 --- a/storage/reads/reader.go +++ b/storage/reads/reader.go @@ -19,7 +19,7 @@ import ( type storageTable interface { flux.Table Close() - Done() chan struct{} + Cancel() } type storeReader struct { @@ -151,23 +151,23 @@ READ: } key := groupKeyForSeries(rs.Tags(), &bi.readSpec, bi.bounds) - + done := make(chan struct{}) switch typedCur := cur.(type) { case cursors.IntegerArrayCursor: cols, defs := determineTableColsForSeries(rs.Tags(), flux.TInt) - table = newIntegerTable(typedCur, bi.bounds, key, cols, rs.Tags(), defs) + table = newIntegerTable(done, typedCur, bi.bounds, key, cols, rs.Tags(), defs) case cursors.FloatArrayCursor: cols, defs := determineTableColsForSeries(rs.Tags(), flux.TFloat) - table = newFloatTable(typedCur, bi.bounds, key, cols, rs.Tags(), defs) + table = newFloatTable(done, typedCur, bi.bounds, key, cols, rs.Tags(), defs) case cursors.UnsignedArrayCursor: cols, defs := determineTableColsForSeries(rs.Tags(), flux.TUInt) - table = newUnsignedTable(typedCur, bi.bounds, key, cols, rs.Tags(), defs) + table = newUnsignedTable(done, typedCur, bi.bounds, key, cols, rs.Tags(), defs) case cursors.BooleanArrayCursor: cols, defs := determineTableColsForSeries(rs.Tags(), flux.TBool) - table = newBooleanTable(typedCur, bi.bounds, key, cols, rs.Tags(), defs) + table = newBooleanTable(done, typedCur, bi.bounds, key, cols, rs.Tags(), defs) case cursors.StringArrayCursor: cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString) - table = newStringTable(typedCur, bi.bounds, key, cols, rs.Tags(), defs) + table = newStringTable(done, typedCur, bi.bounds, key, cols, rs.Tags(), defs) default: panic(fmt.Sprintf("unreachable: %T", typedCur)) } @@ -175,10 +175,6 @@ READ: cur = nil if !table.Empty() { - // Evaluate table.Done early to avoid a data race if table.Close() is run on another goroutine - // and reassigns the underlying channel. - done := table.Done() - if err := f(table); err != nil { table.Close() table = nil @@ -187,6 +183,7 @@ READ: select { case <-done: case <-bi.ctx.Done(): + table.Cancel() break READ } } @@ -217,12 +214,9 @@ READ: } key := groupKeyForSeries(rs.Tags(), &bi.readSpec, bi.bounds) + done := make(chan struct{}) cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString) - table = newTableNoPoints(bi.bounds, key, cols, rs.Tags(), defs) - - // Evaluate table.Done early to avoid a data race if table.Close() is run on another goroutine - // and reassigns the underlying channel. - done := table.Done() + table = newTableNoPoints(done, bi.bounds, key, cols, rs.Tags(), defs) if err := f(table); err != nil { table.Close() @@ -232,6 +226,7 @@ READ: select { case <-done: case <-bi.ctx.Done(): + table.Cancel() break READ } @@ -279,22 +274,23 @@ READ: } key := groupKeyForGroup(gc.PartitionKeyVals(), &bi.readSpec, bi.bounds) + done := make(chan struct{}) switch typedCur := cur.(type) { case cursors.IntegerArrayCursor: cols, defs := determineTableColsForGroup(gc.Keys(), flux.TInt) - table = newIntegerGroupTable(gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs) + table = newIntegerGroupTable(done, gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs) case cursors.FloatArrayCursor: cols, defs := determineTableColsForGroup(gc.Keys(), flux.TFloat) - table = newFloatGroupTable(gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs) + table = newFloatGroupTable(done, gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs) case cursors.UnsignedArrayCursor: cols, defs := determineTableColsForGroup(gc.Keys(), flux.TUInt) - table = newUnsignedGroupTable(gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs) + table = newUnsignedGroupTable(done, gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs) case cursors.BooleanArrayCursor: cols, defs := determineTableColsForGroup(gc.Keys(), flux.TBool) - table = newBooleanGroupTable(gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs) + table = newBooleanGroupTable(done, gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs) case cursors.StringArrayCursor: cols, defs := determineTableColsForGroup(gc.Keys(), flux.TString) - table = newStringGroupTable(gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs) + table = newStringGroupTable(done, gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs) default: panic(fmt.Sprintf("unreachable: %T", typedCur)) } @@ -303,19 +299,15 @@ READ: cur = nil gc = nil - // Evaluate table.Done early to avoid a data race if table.Close() is run on another goroutine - // and reassigns the underlying channel. - done := table.Done() - if err := f(table); err != nil { table.Close() table = nil return err } - // Wait until the table has been read. select { case <-done: case <-bi.ctx.Done(): + table.Cancel() break READ } @@ -348,25 +340,21 @@ func (bi *tableIterator) handleGroupReadNoPoints(f func(flux.Table) error, rs Gr READ: for gc != nil { key := groupKeyForGroup(gc.PartitionKeyVals(), &bi.readSpec, bi.bounds) + done := make(chan struct{}) cols, defs := determineTableColsForGroup(gc.Keys(), flux.TString) - table = newGroupTableNoPoints(bi.bounds, key, cols, defs) + table = newGroupTableNoPoints(done, bi.bounds, key, cols, defs) gc.Close() gc = nil - // Evaluate table.Done early to avoid a data race if table.Close() is run on another goroutine - // and reassigns the underlying channel. - done := table.Done() - if err := f(table); err != nil { table.Close() table = nil return err } - - // Wait until the table has been read. select { case <-done: case <-bi.ctx.Done(): + table.Cancel() break READ } diff --git a/storage/reads/table.gen.go b/storage/reads/table.gen.go index cbbb915447..f019fc6f4b 100644 --- a/storage/reads/table.gen.go +++ b/storage/reads/table.gen.go @@ -7,6 +7,8 @@ package reads import ( + "sync" + "github.com/influxdata/flux" "github.com/influxdata/flux/execute" "github.com/influxdata/platform/models" @@ -20,11 +22,13 @@ import ( type floatTable struct { table - cur cursors.FloatArrayCursor valBuf []float64 + mu sync.Mutex + cur cursors.FloatArrayCursor } func newFloatTable( + done chan struct{}, cur cursors.FloatArrayCursor, bounds execute.Bounds, key flux.GroupKey, @@ -33,32 +37,34 @@ func newFloatTable( defs [][]byte, ) *floatTable { t := &floatTable{ - table: newTable(bounds, key, cols, defs), + table: newTable(done, bounds, key, cols, defs), cur: cur, } t.readTags(tags) - t.more = t.advance() + t.advance() return t } func (t *floatTable) Close() { + t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil } - if t.done != nil { - close(t.done) - t.done = nil - } + t.mu.Unlock() } func (t *floatTable) Do(f func(flux.ColReader) error) error { - defer t.Close() + t.mu.Lock() + defer func() { + t.closeDone() + t.mu.Unlock() + }() - if t.more { + if !t.Empty() { t.err = f(t) - for t.err == nil && t.advance() { + for !t.isCancelled() && t.err == nil && t.advance() { t.err = f(t) } } @@ -94,7 +100,6 @@ func (t *floatTable) advance() bool { t.colBufs[valueColIdx] = t.valBuf t.appendTags() t.appendBounds() - t.empty = false return true } @@ -102,12 +107,14 @@ func (t *floatTable) advance() bool { type floatGroupTable struct { table + valBuf []float64 + mu sync.Mutex gc GroupCursor cur cursors.FloatArrayCursor - valBuf []float64 } func newFloatGroupTable( + done chan struct{}, gc GroupCursor, cur cursors.FloatArrayCursor, bounds execute.Bounds, @@ -117,17 +124,18 @@ func newFloatGroupTable( defs [][]byte, ) *floatGroupTable { t := &floatGroupTable{ - table: newTable(bounds, key, cols, defs), + table: newTable(done, bounds, key, cols, defs), gc: gc, cur: cur, } t.readTags(tags) - t.more = t.advance() + t.advance() return t } func (t *floatGroupTable) Close() { + t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil @@ -136,18 +144,19 @@ func (t *floatGroupTable) Close() { t.gc.Close() t.gc = nil } - if t.done != nil { - close(t.done) - t.done = nil - } + t.mu.Unlock() } func (t *floatGroupTable) Do(f func(flux.ColReader) error) error { - defer t.Close() + t.mu.Lock() + defer func() { + t.closeDone() + t.mu.Unlock() + }() - if t.more { + if !t.Empty() { t.err = f(t) - for t.err == nil && t.advance() { + for !t.isCancelled() && t.err == nil && t.advance() { t.err = f(t) } } @@ -188,7 +197,6 @@ RETRY: t.colBufs[valueColIdx] = t.valBuf t.appendTags() t.appendBounds() - t.empty = false return true } @@ -221,11 +229,13 @@ func (t *floatGroupTable) advanceCursor() bool { type integerTable struct { table - cur cursors.IntegerArrayCursor valBuf []int64 + mu sync.Mutex + cur cursors.IntegerArrayCursor } func newIntegerTable( + done chan struct{}, cur cursors.IntegerArrayCursor, bounds execute.Bounds, key flux.GroupKey, @@ -234,32 +244,34 @@ func newIntegerTable( defs [][]byte, ) *integerTable { t := &integerTable{ - table: newTable(bounds, key, cols, defs), + table: newTable(done, bounds, key, cols, defs), cur: cur, } t.readTags(tags) - t.more = t.advance() + t.advance() return t } func (t *integerTable) Close() { + t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil } - if t.done != nil { - close(t.done) - t.done = nil - } + t.mu.Unlock() } func (t *integerTable) Do(f func(flux.ColReader) error) error { - defer t.Close() + t.mu.Lock() + defer func() { + t.closeDone() + t.mu.Unlock() + }() - if t.more { + if !t.Empty() { t.err = f(t) - for t.err == nil && t.advance() { + for !t.isCancelled() && t.err == nil && t.advance() { t.err = f(t) } } @@ -295,7 +307,6 @@ func (t *integerTable) advance() bool { t.colBufs[valueColIdx] = t.valBuf t.appendTags() t.appendBounds() - t.empty = false return true } @@ -303,12 +314,14 @@ func (t *integerTable) advance() bool { type integerGroupTable struct { table + valBuf []int64 + mu sync.Mutex gc GroupCursor cur cursors.IntegerArrayCursor - valBuf []int64 } func newIntegerGroupTable( + done chan struct{}, gc GroupCursor, cur cursors.IntegerArrayCursor, bounds execute.Bounds, @@ -318,17 +331,18 @@ func newIntegerGroupTable( defs [][]byte, ) *integerGroupTable { t := &integerGroupTable{ - table: newTable(bounds, key, cols, defs), + table: newTable(done, bounds, key, cols, defs), gc: gc, cur: cur, } t.readTags(tags) - t.more = t.advance() + t.advance() return t } func (t *integerGroupTable) Close() { + t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil @@ -337,18 +351,19 @@ func (t *integerGroupTable) Close() { t.gc.Close() t.gc = nil } - if t.done != nil { - close(t.done) - t.done = nil - } + t.mu.Unlock() } func (t *integerGroupTable) Do(f func(flux.ColReader) error) error { - defer t.Close() + t.mu.Lock() + defer func() { + t.closeDone() + t.mu.Unlock() + }() - if t.more { + if !t.Empty() { t.err = f(t) - for t.err == nil && t.advance() { + for !t.isCancelled() && t.err == nil && t.advance() { t.err = f(t) } } @@ -389,7 +404,6 @@ RETRY: t.colBufs[valueColIdx] = t.valBuf t.appendTags() t.appendBounds() - t.empty = false return true } @@ -422,11 +436,13 @@ func (t *integerGroupTable) advanceCursor() bool { type unsignedTable struct { table - cur cursors.UnsignedArrayCursor valBuf []uint64 + mu sync.Mutex + cur cursors.UnsignedArrayCursor } func newUnsignedTable( + done chan struct{}, cur cursors.UnsignedArrayCursor, bounds execute.Bounds, key flux.GroupKey, @@ -435,32 +451,34 @@ func newUnsignedTable( defs [][]byte, ) *unsignedTable { t := &unsignedTable{ - table: newTable(bounds, key, cols, defs), + table: newTable(done, bounds, key, cols, defs), cur: cur, } t.readTags(tags) - t.more = t.advance() + t.advance() return t } func (t *unsignedTable) Close() { + t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil } - if t.done != nil { - close(t.done) - t.done = nil - } + t.mu.Unlock() } func (t *unsignedTable) Do(f func(flux.ColReader) error) error { - defer t.Close() + t.mu.Lock() + defer func() { + t.closeDone() + t.mu.Unlock() + }() - if t.more { + if !t.Empty() { t.err = f(t) - for t.err == nil && t.advance() { + for !t.isCancelled() && t.err == nil && t.advance() { t.err = f(t) } } @@ -496,7 +514,6 @@ func (t *unsignedTable) advance() bool { t.colBufs[valueColIdx] = t.valBuf t.appendTags() t.appendBounds() - t.empty = false return true } @@ -504,12 +521,14 @@ func (t *unsignedTable) advance() bool { type unsignedGroupTable struct { table + valBuf []uint64 + mu sync.Mutex gc GroupCursor cur cursors.UnsignedArrayCursor - valBuf []uint64 } func newUnsignedGroupTable( + done chan struct{}, gc GroupCursor, cur cursors.UnsignedArrayCursor, bounds execute.Bounds, @@ -519,17 +538,18 @@ func newUnsignedGroupTable( defs [][]byte, ) *unsignedGroupTable { t := &unsignedGroupTable{ - table: newTable(bounds, key, cols, defs), + table: newTable(done, bounds, key, cols, defs), gc: gc, cur: cur, } t.readTags(tags) - t.more = t.advance() + t.advance() return t } func (t *unsignedGroupTable) Close() { + t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil @@ -538,18 +558,19 @@ func (t *unsignedGroupTable) Close() { t.gc.Close() t.gc = nil } - if t.done != nil { - close(t.done) - t.done = nil - } + t.mu.Unlock() } func (t *unsignedGroupTable) Do(f func(flux.ColReader) error) error { - defer t.Close() + t.mu.Lock() + defer func() { + t.closeDone() + t.mu.Unlock() + }() - if t.more { + if !t.Empty() { t.err = f(t) - for t.err == nil && t.advance() { + for !t.isCancelled() && t.err == nil && t.advance() { t.err = f(t) } } @@ -590,7 +611,6 @@ RETRY: t.colBufs[valueColIdx] = t.valBuf t.appendTags() t.appendBounds() - t.empty = false return true } @@ -623,11 +643,13 @@ func (t *unsignedGroupTable) advanceCursor() bool { type stringTable struct { table - cur cursors.StringArrayCursor valBuf []string + mu sync.Mutex + cur cursors.StringArrayCursor } func newStringTable( + done chan struct{}, cur cursors.StringArrayCursor, bounds execute.Bounds, key flux.GroupKey, @@ -636,32 +658,34 @@ func newStringTable( defs [][]byte, ) *stringTable { t := &stringTable{ - table: newTable(bounds, key, cols, defs), + table: newTable(done, bounds, key, cols, defs), cur: cur, } t.readTags(tags) - t.more = t.advance() + t.advance() return t } func (t *stringTable) Close() { + t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil } - if t.done != nil { - close(t.done) - t.done = nil - } + t.mu.Unlock() } func (t *stringTable) Do(f func(flux.ColReader) error) error { - defer t.Close() + t.mu.Lock() + defer func() { + t.closeDone() + t.mu.Unlock() + }() - if t.more { + if !t.Empty() { t.err = f(t) - for t.err == nil && t.advance() { + for !t.isCancelled() && t.err == nil && t.advance() { t.err = f(t) } } @@ -697,7 +721,6 @@ func (t *stringTable) advance() bool { t.colBufs[valueColIdx] = t.valBuf t.appendTags() t.appendBounds() - t.empty = false return true } @@ -705,12 +728,14 @@ func (t *stringTable) advance() bool { type stringGroupTable struct { table + valBuf []string + mu sync.Mutex gc GroupCursor cur cursors.StringArrayCursor - valBuf []string } func newStringGroupTable( + done chan struct{}, gc GroupCursor, cur cursors.StringArrayCursor, bounds execute.Bounds, @@ -720,17 +745,18 @@ func newStringGroupTable( defs [][]byte, ) *stringGroupTable { t := &stringGroupTable{ - table: newTable(bounds, key, cols, defs), + table: newTable(done, bounds, key, cols, defs), gc: gc, cur: cur, } t.readTags(tags) - t.more = t.advance() + t.advance() return t } func (t *stringGroupTable) Close() { + t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil @@ -739,18 +765,19 @@ func (t *stringGroupTable) Close() { t.gc.Close() t.gc = nil } - if t.done != nil { - close(t.done) - t.done = nil - } + t.mu.Unlock() } func (t *stringGroupTable) Do(f func(flux.ColReader) error) error { - defer t.Close() + t.mu.Lock() + defer func() { + t.closeDone() + t.mu.Unlock() + }() - if t.more { + if !t.Empty() { t.err = f(t) - for t.err == nil && t.advance() { + for !t.isCancelled() && t.err == nil && t.advance() { t.err = f(t) } } @@ -791,7 +818,6 @@ RETRY: t.colBufs[valueColIdx] = t.valBuf t.appendTags() t.appendBounds() - t.empty = false return true } @@ -824,11 +850,13 @@ func (t *stringGroupTable) advanceCursor() bool { type booleanTable struct { table - cur cursors.BooleanArrayCursor valBuf []bool + mu sync.Mutex + cur cursors.BooleanArrayCursor } func newBooleanTable( + done chan struct{}, cur cursors.BooleanArrayCursor, bounds execute.Bounds, key flux.GroupKey, @@ -837,32 +865,34 @@ func newBooleanTable( defs [][]byte, ) *booleanTable { t := &booleanTable{ - table: newTable(bounds, key, cols, defs), + table: newTable(done, bounds, key, cols, defs), cur: cur, } t.readTags(tags) - t.more = t.advance() + t.advance() return t } func (t *booleanTable) Close() { + t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil } - if t.done != nil { - close(t.done) - t.done = nil - } + t.mu.Unlock() } func (t *booleanTable) Do(f func(flux.ColReader) error) error { - defer t.Close() + t.mu.Lock() + defer func() { + t.closeDone() + t.mu.Unlock() + }() - if t.more { + if !t.Empty() { t.err = f(t) - for t.err == nil && t.advance() { + for !t.isCancelled() && t.err == nil && t.advance() { t.err = f(t) } } @@ -898,7 +928,6 @@ func (t *booleanTable) advance() bool { t.colBufs[valueColIdx] = t.valBuf t.appendTags() t.appendBounds() - t.empty = false return true } @@ -906,12 +935,14 @@ func (t *booleanTable) advance() bool { type booleanGroupTable struct { table + valBuf []bool + mu sync.Mutex gc GroupCursor cur cursors.BooleanArrayCursor - valBuf []bool } func newBooleanGroupTable( + done chan struct{}, gc GroupCursor, cur cursors.BooleanArrayCursor, bounds execute.Bounds, @@ -921,17 +952,18 @@ func newBooleanGroupTable( defs [][]byte, ) *booleanGroupTable { t := &booleanGroupTable{ - table: newTable(bounds, key, cols, defs), + table: newTable(done, bounds, key, cols, defs), gc: gc, cur: cur, } t.readTags(tags) - t.more = t.advance() + t.advance() return t } func (t *booleanGroupTable) Close() { + t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil @@ -940,18 +972,19 @@ func (t *booleanGroupTable) Close() { t.gc.Close() t.gc = nil } - if t.done != nil { - close(t.done) - t.done = nil - } + t.mu.Unlock() } func (t *booleanGroupTable) Do(f func(flux.ColReader) error) error { - defer t.Close() + t.mu.Lock() + defer func() { + t.closeDone() + t.mu.Unlock() + }() - if t.more { + if !t.Empty() { t.err = f(t) - for t.err == nil && t.advance() { + for !t.isCancelled() && t.err == nil && t.advance() { t.err = f(t) } } @@ -992,7 +1025,6 @@ RETRY: t.colBufs[valueColIdx] = t.valBuf t.appendTags() t.appendBounds() - t.empty = false return true } diff --git a/storage/reads/table.gen.go.tmpl b/storage/reads/table.gen.go.tmpl index 6a1ab6d7fa..56b3f27188 100644 --- a/storage/reads/table.gen.go.tmpl +++ b/storage/reads/table.gen.go.tmpl @@ -1,6 +1,8 @@ package reads import ( + "sync" + "github.com/influxdata/flux" "github.com/influxdata/flux/execute" "github.com/influxdata/platform/models" @@ -14,11 +16,13 @@ import ( type {{.name}}Table struct { table - cur cursors.{{.Name}}ArrayCursor valBuf []{{.Type}} + mu sync.Mutex + cur cursors.{{.Name}}ArrayCursor } func new{{.Name}}Table( + done chan struct{}, cur cursors.{{.Name}}ArrayCursor, bounds execute.Bounds, key flux.GroupKey, @@ -27,32 +31,34 @@ func new{{.Name}}Table( defs [][]byte, ) *{{.name}}Table { t := &{{.name}}Table{ - table: newTable(bounds, key, cols, defs), + table: newTable(done, bounds, key, cols, defs), cur: cur, } t.readTags(tags) - t.more = t.advance() + t.advance() return t } func (t *{{.name}}Table) Close() { + t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil } - if t.done != nil { - close(t.done) - t.done = nil - } + t.mu.Unlock() } func (t *{{.name}}Table) Do(f func(flux.ColReader) error) error { - defer t.Close() + t.mu.Lock() + defer func() { + t.closeDone() + t.mu.Unlock() + }() - if t.more { + if !t.Empty() { t.err = f(t) - for t.err == nil && t.advance() { + for !t.isCancelled() && t.err == nil && t.advance() { t.err = f(t) } } @@ -88,7 +94,6 @@ func (t *{{.name}}Table) advance() bool { t.colBufs[valueColIdx] = t.valBuf t.appendTags() t.appendBounds() - t.empty = false return true } @@ -96,12 +101,14 @@ func (t *{{.name}}Table) advance() bool { type {{.name}}GroupTable struct { table + valBuf []{{.Type}} + mu sync.Mutex gc GroupCursor cur cursors.{{.Name}}ArrayCursor - valBuf []{{.Type}} } func new{{.Name}}GroupTable( + done chan struct{}, gc GroupCursor, cur cursors.{{.Name}}ArrayCursor, bounds execute.Bounds, @@ -111,17 +118,18 @@ func new{{.Name}}GroupTable( defs [][]byte, ) *{{.name}}GroupTable { t := &{{.name}}GroupTable{ - table: newTable(bounds, key, cols, defs), + table: newTable(done, bounds, key, cols, defs), gc: gc, cur: cur, } t.readTags(tags) - t.more = t.advance() + t.advance() return t } func (t *{{.name}}GroupTable) Close() { + t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil @@ -130,18 +138,19 @@ func (t *{{.name}}GroupTable) Close() { t.gc.Close() t.gc = nil } - if t.done != nil { - close(t.done) - t.done = nil - } + t.mu.Unlock() } func (t *{{.name}}GroupTable) Do(f func(flux.ColReader) error) error { - defer t.Close() + t.mu.Lock() + defer func() { + t.closeDone() + t.mu.Unlock() + }() - if t.more { + if !t.Empty() { t.err = f(t) - for t.err == nil && t.advance() { + for !t.isCancelled() && t.err == nil && t.advance() { t.err = f(t) } } @@ -182,7 +191,6 @@ RETRY: t.colBufs[valueColIdx] = t.valBuf t.appendTags() t.appendBounds() - t.empty = false return true } diff --git a/storage/reads/table.go b/storage/reads/table.go index 6ea659acf7..a7c9787b51 100644 --- a/storage/reads/table.go +++ b/storage/reads/table.go @@ -4,6 +4,7 @@ package reads import ( "fmt" + "sync/atomic" "github.com/influxdata/flux" "github.com/influxdata/flux/execute" @@ -31,36 +32,42 @@ type table struct { err error - empty bool - more bool + cancelled int32 } func newTable( + done chan struct{}, bounds execute.Bounds, key flux.GroupKey, cols []flux.ColMeta, defs [][]byte, ) table { return table{ + done: done, bounds: bounds, key: key, tags: make([][]byte, len(cols)), defs: defs, colBufs: make([]interface{}, len(cols)), cols: cols, - done: make(chan struct{}), - empty: true, } } -func (t *table) Done() chan struct{} { return t.done } func (t *table) Key() flux.GroupKey { return t.key } func (t *table) Cols() []flux.ColMeta { return t.cols } func (t *table) RefCount(n int) {} func (t *table) Err() error { return t.err } -func (t *table) Empty() bool { return t.empty } +func (t *table) Empty() bool { return t.l == 0 } func (t *table) Len() int { return t.l } +func (t *table) Cancel() { + atomic.StoreInt32(&t.cancelled, 1) +} + +func (t *table) isCancelled() bool { + return atomic.LoadInt32(&t.cancelled) != 0 +} + func (t *table) Bools(j int) []bool { execute.CheckColType(t.cols[j], flux.TBool) return t.colBufs[j].([]bool) @@ -150,6 +157,13 @@ func (t *table) appendBounds() { } } +func (t *table) closeDone() { + if t.done != nil { + close(t.done) + t.done = nil + } +} + // hasPoints returns true if the next block from cur has data. If cur is not // nil, it will be closed. func hasPoints(cur cursors.Cursor) bool { @@ -186,6 +200,7 @@ type tableNoPoints struct { } func newTableNoPoints( + done chan struct{}, bounds execute.Bounds, key flux.GroupKey, cols []flux.ColMeta, @@ -193,23 +208,21 @@ func newTableNoPoints( defs [][]byte, ) *tableNoPoints { t := &tableNoPoints{ - table: newTable(bounds, key, cols, defs), + table: newTable(done, bounds, key, cols, defs), } t.readTags(tags) return t } -func (t *tableNoPoints) Close() { - if t.done != nil { - close(t.done) - t.done = nil - } -} +func (t *tableNoPoints) Close() {} func (t *tableNoPoints) Do(f func(flux.ColReader) error) error { + if t.isCancelled() { + return nil + } t.err = f(t) - t.Close() + t.closeDone() return t.err } @@ -218,27 +231,26 @@ type groupTableNoPoints struct { } func newGroupTableNoPoints( + done chan struct{}, bounds execute.Bounds, key flux.GroupKey, cols []flux.ColMeta, defs [][]byte, ) *groupTableNoPoints { t := &groupTableNoPoints{ - table: newTable(bounds, key, cols, defs), + table: newTable(done, bounds, key, cols, defs), } return t } -func (t *groupTableNoPoints) Close() { - if t.done != nil { - close(t.done) - t.done = nil - } -} +func (t *groupTableNoPoints) Close() {} func (t *groupTableNoPoints) Do(f func(flux.ColReader) error) error { + if t.isCancelled() { + return nil + } t.err = f(t) - t.Close() + t.closeDone() return t.err }