Release cursor/iterator resources aggressively

pull/7108/head
Jason Wilder 2016-08-02 22:51:16 -06:00
parent e8e6bc44a7
commit 19546faab3
4 changed files with 109 additions and 10 deletions

View File

@ -164,6 +164,9 @@ func (itr *floatMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
itr.curr = nil
itr.inputs = nil
itr.heap.items = nil
return nil
}
@ -440,6 +443,7 @@ type floatParallelIterator struct {
once sync.Once
closing chan struct{}
wg sync.WaitGroup
}
// newFloatParallelIterator returns a new instance of floatParallelIterator.
@ -449,6 +453,7 @@ func newFloatParallelIterator(input FloatIterator) *floatParallelIterator {
ch: make(chan floatPointError, 1),
closing: make(chan struct{}),
}
itr.wg.Add(1)
go itr.monitor()
return itr
}
@ -459,6 +464,7 @@ func (itr *floatParallelIterator) Stats() IteratorStats { return itr.input.Stats
// Close closes the underlying iterators.
func (itr *floatParallelIterator) Close() error {
itr.once.Do(func() { close(itr.closing) })
itr.wg.Wait()
return itr.input.Close()
}
@ -474,6 +480,7 @@ func (itr *floatParallelIterator) Next() (*FloatPoint, error) {
// monitor runs in a separate goroutine and actively pulls the next point.
func (itr *floatParallelIterator) monitor() {
defer close(itr.ch)
defer itr.wg.Done()
for {
// Read next point.
@ -2221,6 +2228,9 @@ func (itr *integerMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
itr.curr = nil
itr.inputs = nil
itr.heap.items = nil
return nil
}
@ -2497,6 +2507,7 @@ type integerParallelIterator struct {
once sync.Once
closing chan struct{}
wg sync.WaitGroup
}
// newIntegerParallelIterator returns a new instance of integerParallelIterator.
@ -2506,6 +2517,7 @@ func newIntegerParallelIterator(input IntegerIterator) *integerParallelIterator
ch: make(chan integerPointError, 1),
closing: make(chan struct{}),
}
itr.wg.Add(1)
go itr.monitor()
return itr
}
@ -2516,6 +2528,7 @@ func (itr *integerParallelIterator) Stats() IteratorStats { return itr.input.Sta
// Close closes the underlying iterators.
func (itr *integerParallelIterator) Close() error {
itr.once.Do(func() { close(itr.closing) })
itr.wg.Wait()
return itr.input.Close()
}
@ -2531,6 +2544,7 @@ func (itr *integerParallelIterator) Next() (*IntegerPoint, error) {
// monitor runs in a separate goroutine and actively pulls the next point.
func (itr *integerParallelIterator) monitor() {
defer close(itr.ch)
defer itr.wg.Done()
for {
// Read next point.
@ -4275,6 +4289,9 @@ func (itr *stringMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
itr.curr = nil
itr.inputs = nil
itr.heap.items = nil
return nil
}
@ -4551,6 +4568,7 @@ type stringParallelIterator struct {
once sync.Once
closing chan struct{}
wg sync.WaitGroup
}
// newStringParallelIterator returns a new instance of stringParallelIterator.
@ -4560,6 +4578,7 @@ func newStringParallelIterator(input StringIterator) *stringParallelIterator {
ch: make(chan stringPointError, 1),
closing: make(chan struct{}),
}
itr.wg.Add(1)
go itr.monitor()
return itr
}
@ -4570,6 +4589,7 @@ func (itr *stringParallelIterator) Stats() IteratorStats { return itr.input.Stat
// Close closes the underlying iterators.
func (itr *stringParallelIterator) Close() error {
itr.once.Do(func() { close(itr.closing) })
itr.wg.Wait()
return itr.input.Close()
}
@ -4585,6 +4605,7 @@ func (itr *stringParallelIterator) Next() (*StringPoint, error) {
// monitor runs in a separate goroutine and actively pulls the next point.
func (itr *stringParallelIterator) monitor() {
defer close(itr.ch)
defer itr.wg.Done()
for {
// Read next point.
@ -6329,6 +6350,9 @@ func (itr *booleanMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
itr.curr = nil
itr.inputs = nil
itr.heap.items = nil
return nil
}
@ -6605,6 +6629,7 @@ type booleanParallelIterator struct {
once sync.Once
closing chan struct{}
wg sync.WaitGroup
}
// newBooleanParallelIterator returns a new instance of booleanParallelIterator.
@ -6614,6 +6639,7 @@ func newBooleanParallelIterator(input BooleanIterator) *booleanParallelIterator
ch: make(chan booleanPointError, 1),
closing: make(chan struct{}),
}
itr.wg.Add(1)
go itr.monitor()
return itr
}
@ -6624,6 +6650,7 @@ func (itr *booleanParallelIterator) Stats() IteratorStats { return itr.input.Sta
// Close closes the underlying iterators.
func (itr *booleanParallelIterator) Close() error {
itr.once.Do(func() { close(itr.closing) })
itr.wg.Wait()
return itr.input.Close()
}
@ -6639,6 +6666,7 @@ func (itr *booleanParallelIterator) Next() (*BooleanPoint, error) {
// monitor runs in a separate goroutine and actively pulls the next point.
func (itr *booleanParallelIterator) monitor() {
defer close(itr.ch)
defer itr.wg.Done()
for {
// Read next point.

View File

@ -161,6 +161,9 @@ func (itr *{{$k.name}}MergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
itr.curr = nil
itr.inputs = nil
itr.heap.items = nil
return nil
}
@ -435,9 +438,10 @@ type {{$k.name}}SortedMergeHeapItem struct {
type {{$k.name}}ParallelIterator struct {
input {{$k.Name}}Iterator
ch chan {{$k.name}}PointError
once sync.Once
closing chan struct{}
wg sync.WaitGroup
}
// new{{$k.Name}}ParallelIterator returns a new instance of {{$k.name}}ParallelIterator.
@ -445,8 +449,9 @@ func new{{$k.Name}}ParallelIterator(input {{$k.Name}}Iterator) *{{$k.name}}Paral
itr := &{{$k.name}}ParallelIterator{
input: input,
ch: make(chan {{$k.name}}PointError, 1),
closing: make(chan struct{}),
closing: make(chan struct{}),
}
itr.wg.Add(1)
go itr.monitor()
return itr
}
@ -457,7 +462,8 @@ func (itr *{{$k.name}}ParallelIterator) Stats() IteratorStats { return itr.input
// Close closes the underlying iterators.
func (itr *{{$k.name}}ParallelIterator) Close() error {
itr.once.Do(func() { close(itr.closing) })
return itr.input.Close()
itr.wg.Wait()
return itr.input.Close()
}
// Next returns the next point from the iterator.
@ -472,6 +478,7 @@ func (itr *{{$k.name}}ParallelIterator) Next() (*{{$k.Name}}Point, error) {
// monitor runs in a separate goroutine and actively pulls the next point.
func (itr *{{$k.name}}ParallelIterator) monitor() {
defer close(itr.ch)
defer itr.wg.Done()
for {
// Read next point.

View File

@ -49,7 +49,9 @@ func newBufCursor(cur cursor, ascending bool) *bufCursor {
}
func (c *bufCursor) close() error {
return c.cur.close()
err := c.cur.close()
c.cur = nil
return err
}
// next returns the buffer, if filled. Otherwise returns the next key/value from the cursor.
@ -238,11 +240,15 @@ func (itr *floatIterator) Close() error {
for _, c := range itr.aux {
c.close()
}
itr.aux = nil
for _, c := range itr.conds.curs {
c.close()
}
itr.conds.curs = nil
if itr.cur != nil {
return itr.cur.close()
err := itr.cur.close()
itr.cur = nil
return err
}
return nil
}
@ -355,6 +361,10 @@ func (c *floatAscendingCursor) peekTSM() (t int64, v float64) {
// close closes the cursor and any dependent cursors.
func (c *floatAscendingCursor) close() error {
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.tsm.buf = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}
@ -473,6 +483,10 @@ func (c *floatDescendingCursor) peekTSM() (t int64, v float64) {
// close closes the cursor and any dependent cursors.
func (c *floatDescendingCursor) close() error {
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.tsm.buf = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}
@ -673,11 +687,15 @@ func (itr *integerIterator) Close() error {
for _, c := range itr.aux {
c.close()
}
itr.aux = nil
for _, c := range itr.conds.curs {
c.close()
}
itr.conds.curs = nil
if itr.cur != nil {
return itr.cur.close()
err := itr.cur.close()
itr.cur = nil
return err
}
return nil
}
@ -790,6 +808,10 @@ func (c *integerAscendingCursor) peekTSM() (t int64, v int64) {
// close closes the cursor and any dependent cursors.
func (c *integerAscendingCursor) close() error {
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.tsm.buf = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}
@ -908,6 +930,10 @@ func (c *integerDescendingCursor) peekTSM() (t int64, v int64) {
// close closes the cursor and any dependent cursors.
func (c *integerDescendingCursor) close() error {
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.tsm.buf = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}
@ -1108,11 +1134,15 @@ func (itr *stringIterator) Close() error {
for _, c := range itr.aux {
c.close()
}
itr.aux = nil
for _, c := range itr.conds.curs {
c.close()
}
itr.conds.curs = nil
if itr.cur != nil {
return itr.cur.close()
err := itr.cur.close()
itr.cur = nil
return err
}
return nil
}
@ -1225,6 +1255,10 @@ func (c *stringAscendingCursor) peekTSM() (t int64, v string) {
// close closes the cursor and any dependent cursors.
func (c *stringAscendingCursor) close() error {
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.tsm.buf = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}
@ -1343,6 +1377,10 @@ func (c *stringDescendingCursor) peekTSM() (t int64, v string) {
// close closes the cursor and any dependent cursors.
func (c *stringDescendingCursor) close() error {
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.tsm.buf = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}
@ -1543,11 +1581,15 @@ func (itr *booleanIterator) Close() error {
for _, c := range itr.aux {
c.close()
}
itr.aux = nil
for _, c := range itr.conds.curs {
c.close()
}
itr.conds.curs = nil
if itr.cur != nil {
return itr.cur.close()
err := itr.cur.close()
itr.cur = nil
return err
}
return nil
}
@ -1660,6 +1702,10 @@ func (c *booleanAscendingCursor) peekTSM() (t int64, v bool) {
// close closes the cursor and any dependent cursors.
func (c *booleanAscendingCursor) close() error {
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.tsm.buf = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}
@ -1778,6 +1824,10 @@ func (c *booleanDescendingCursor) peekTSM() (t int64, v bool) {
// close closes the cursor and any dependent cursors.
func (c *booleanDescendingCursor) close() error {
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.tsm.buf = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}

View File

@ -42,7 +42,9 @@ func newBufCursor(cur cursor, ascending bool) *bufCursor {
}
func (c *bufCursor) close() error {
return c.cur.close()
err := c.cur.close()
c.cur = nil
return err
}
// next returns the buffer, if filled. Otherwise returns the next key/value from the cursor.
@ -234,11 +236,15 @@ func (itr *{{.name}}Iterator) Close() error {
for _, c := range itr.aux {
c.close()
}
itr.aux = nil
for _, c := range itr.conds.curs {
c.close()
}
itr.conds.curs = nil
if itr.cur != nil {
return itr.cur.close()
err := itr.cur.close()
itr.cur = nil
return err
}
return nil
}
@ -351,6 +357,10 @@ func (c *{{.name}}AscendingCursor) peekTSM() (t int64, v {{.Type}}) {
// close closes the cursor and any dependent cursors.
func (c *{{.name}}AscendingCursor) close() (error) {
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.tsm.buf = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}
@ -469,6 +479,10 @@ func (c *{{.name}}DescendingCursor) peekTSM() (t int64, v {{.Type}}) {
// close closes the cursor and any dependent cursors.
func (c *{{.name}}DescendingCursor) close() (error) {
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.tsm.buf = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}