From 19546faab3cc1cf315a89e0be619bfb71e8a5cef Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 2 Aug 2016 22:51:16 -0600 Subject: [PATCH] Release cursor/iterator resources aggressively --- influxql/iterator.gen.go | 28 +++++++++++++ influxql/iterator.gen.go.tmpl | 13 ++++-- tsdb/engine/tsm1/iterator.gen.go | 60 ++++++++++++++++++++++++--- tsdb/engine/tsm1/iterator.gen.go.tmpl | 18 +++++++- 4 files changed, 109 insertions(+), 10 deletions(-) diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index 82f119df13..b222251224 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -164,6 +164,9 @@ func (itr *floatMergeIterator) Close() error { for _, input := range itr.inputs { input.Close() } + itr.curr = nil + itr.inputs = nil + itr.heap.items = nil return nil } @@ -440,6 +443,7 @@ type floatParallelIterator struct { once sync.Once closing chan struct{} + wg sync.WaitGroup } // newFloatParallelIterator returns a new instance of floatParallelIterator. @@ -449,6 +453,7 @@ func newFloatParallelIterator(input FloatIterator) *floatParallelIterator { ch: make(chan floatPointError, 1), closing: make(chan struct{}), } + itr.wg.Add(1) go itr.monitor() return itr } @@ -459,6 +464,7 @@ func (itr *floatParallelIterator) Stats() IteratorStats { return itr.input.Stats // Close closes the underlying iterators. func (itr *floatParallelIterator) Close() error { itr.once.Do(func() { close(itr.closing) }) + itr.wg.Wait() return itr.input.Close() } @@ -474,6 +480,7 @@ func (itr *floatParallelIterator) Next() (*FloatPoint, error) { // monitor runs in a separate goroutine and actively pulls the next point. func (itr *floatParallelIterator) monitor() { defer close(itr.ch) + defer itr.wg.Done() for { // Read next point. @@ -2221,6 +2228,9 @@ func (itr *integerMergeIterator) Close() error { for _, input := range itr.inputs { input.Close() } + itr.curr = nil + itr.inputs = nil + itr.heap.items = nil return nil } @@ -2497,6 +2507,7 @@ type integerParallelIterator struct { once sync.Once closing chan struct{} + wg sync.WaitGroup } // newIntegerParallelIterator returns a new instance of integerParallelIterator. @@ -2506,6 +2517,7 @@ func newIntegerParallelIterator(input IntegerIterator) *integerParallelIterator ch: make(chan integerPointError, 1), closing: make(chan struct{}), } + itr.wg.Add(1) go itr.monitor() return itr } @@ -2516,6 +2528,7 @@ func (itr *integerParallelIterator) Stats() IteratorStats { return itr.input.Sta // Close closes the underlying iterators. func (itr *integerParallelIterator) Close() error { itr.once.Do(func() { close(itr.closing) }) + itr.wg.Wait() return itr.input.Close() } @@ -2531,6 +2544,7 @@ func (itr *integerParallelIterator) Next() (*IntegerPoint, error) { // monitor runs in a separate goroutine and actively pulls the next point. func (itr *integerParallelIterator) monitor() { defer close(itr.ch) + defer itr.wg.Done() for { // Read next point. @@ -4275,6 +4289,9 @@ func (itr *stringMergeIterator) Close() error { for _, input := range itr.inputs { input.Close() } + itr.curr = nil + itr.inputs = nil + itr.heap.items = nil return nil } @@ -4551,6 +4568,7 @@ type stringParallelIterator struct { once sync.Once closing chan struct{} + wg sync.WaitGroup } // newStringParallelIterator returns a new instance of stringParallelIterator. @@ -4560,6 +4578,7 @@ func newStringParallelIterator(input StringIterator) *stringParallelIterator { ch: make(chan stringPointError, 1), closing: make(chan struct{}), } + itr.wg.Add(1) go itr.monitor() return itr } @@ -4570,6 +4589,7 @@ func (itr *stringParallelIterator) Stats() IteratorStats { return itr.input.Stat // Close closes the underlying iterators. func (itr *stringParallelIterator) Close() error { itr.once.Do(func() { close(itr.closing) }) + itr.wg.Wait() return itr.input.Close() } @@ -4585,6 +4605,7 @@ func (itr *stringParallelIterator) Next() (*StringPoint, error) { // monitor runs in a separate goroutine and actively pulls the next point. func (itr *stringParallelIterator) monitor() { defer close(itr.ch) + defer itr.wg.Done() for { // Read next point. @@ -6329,6 +6350,9 @@ func (itr *booleanMergeIterator) Close() error { for _, input := range itr.inputs { input.Close() } + itr.curr = nil + itr.inputs = nil + itr.heap.items = nil return nil } @@ -6605,6 +6629,7 @@ type booleanParallelIterator struct { once sync.Once closing chan struct{} + wg sync.WaitGroup } // newBooleanParallelIterator returns a new instance of booleanParallelIterator. @@ -6614,6 +6639,7 @@ func newBooleanParallelIterator(input BooleanIterator) *booleanParallelIterator ch: make(chan booleanPointError, 1), closing: make(chan struct{}), } + itr.wg.Add(1) go itr.monitor() return itr } @@ -6624,6 +6650,7 @@ func (itr *booleanParallelIterator) Stats() IteratorStats { return itr.input.Sta // Close closes the underlying iterators. func (itr *booleanParallelIterator) Close() error { itr.once.Do(func() { close(itr.closing) }) + itr.wg.Wait() return itr.input.Close() } @@ -6639,6 +6666,7 @@ func (itr *booleanParallelIterator) Next() (*BooleanPoint, error) { // monitor runs in a separate goroutine and actively pulls the next point. func (itr *booleanParallelIterator) monitor() { defer close(itr.ch) + defer itr.wg.Done() for { // Read next point. diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index 08e2566175..c7d33625f9 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -161,6 +161,9 @@ func (itr *{{$k.name}}MergeIterator) Close() error { for _, input := range itr.inputs { input.Close() } + itr.curr = nil + itr.inputs = nil + itr.heap.items = nil return nil } @@ -435,9 +438,10 @@ type {{$k.name}}SortedMergeHeapItem struct { type {{$k.name}}ParallelIterator struct { input {{$k.Name}}Iterator ch chan {{$k.name}}PointError - + once sync.Once closing chan struct{} + wg sync.WaitGroup } // new{{$k.Name}}ParallelIterator returns a new instance of {{$k.name}}ParallelIterator. @@ -445,8 +449,9 @@ func new{{$k.Name}}ParallelIterator(input {{$k.Name}}Iterator) *{{$k.name}}Paral itr := &{{$k.name}}ParallelIterator{ input: input, ch: make(chan {{$k.name}}PointError, 1), - closing: make(chan struct{}), + closing: make(chan struct{}), } + itr.wg.Add(1) go itr.monitor() return itr } @@ -457,7 +462,8 @@ func (itr *{{$k.name}}ParallelIterator) Stats() IteratorStats { return itr.input // Close closes the underlying iterators. func (itr *{{$k.name}}ParallelIterator) Close() error { itr.once.Do(func() { close(itr.closing) }) - return itr.input.Close() + itr.wg.Wait() + return itr.input.Close() } // Next returns the next point from the iterator. @@ -472,6 +478,7 @@ func (itr *{{$k.name}}ParallelIterator) Next() (*{{$k.Name}}Point, error) { // monitor runs in a separate goroutine and actively pulls the next point. func (itr *{{$k.name}}ParallelIterator) monitor() { defer close(itr.ch) + defer itr.wg.Done() for { // Read next point. diff --git a/tsdb/engine/tsm1/iterator.gen.go b/tsdb/engine/tsm1/iterator.gen.go index cc01cdb6f2..4a19c3e916 100644 --- a/tsdb/engine/tsm1/iterator.gen.go +++ b/tsdb/engine/tsm1/iterator.gen.go @@ -49,7 +49,9 @@ func newBufCursor(cur cursor, ascending bool) *bufCursor { } func (c *bufCursor) close() error { - return c.cur.close() + err := c.cur.close() + c.cur = nil + return err } // next returns the buffer, if filled. Otherwise returns the next key/value from the cursor. @@ -238,11 +240,15 @@ func (itr *floatIterator) Close() error { for _, c := range itr.aux { c.close() } + itr.aux = nil for _, c := range itr.conds.curs { c.close() } + itr.conds.curs = nil if itr.cur != nil { - return itr.cur.close() + err := itr.cur.close() + itr.cur = nil + return err } return nil } @@ -355,6 +361,10 @@ func (c *floatAscendingCursor) peekTSM() (t int64, v float64) { // close closes the cursor and any dependent cursors. func (c *floatAscendingCursor) close() error { c.tsm.keyCursor.Close() + c.tsm.keyCursor = nil + c.tsm.buf = nil + c.cache.values = nil + c.tsm.values = nil return nil } @@ -473,6 +483,10 @@ func (c *floatDescendingCursor) peekTSM() (t int64, v float64) { // close closes the cursor and any dependent cursors. func (c *floatDescendingCursor) close() error { c.tsm.keyCursor.Close() + c.tsm.keyCursor = nil + c.tsm.buf = nil + c.cache.values = nil + c.tsm.values = nil return nil } @@ -673,11 +687,15 @@ func (itr *integerIterator) Close() error { for _, c := range itr.aux { c.close() } + itr.aux = nil for _, c := range itr.conds.curs { c.close() } + itr.conds.curs = nil if itr.cur != nil { - return itr.cur.close() + err := itr.cur.close() + itr.cur = nil + return err } return nil } @@ -790,6 +808,10 @@ func (c *integerAscendingCursor) peekTSM() (t int64, v int64) { // close closes the cursor and any dependent cursors. func (c *integerAscendingCursor) close() error { c.tsm.keyCursor.Close() + c.tsm.keyCursor = nil + c.tsm.buf = nil + c.cache.values = nil + c.tsm.values = nil return nil } @@ -908,6 +930,10 @@ func (c *integerDescendingCursor) peekTSM() (t int64, v int64) { // close closes the cursor and any dependent cursors. func (c *integerDescendingCursor) close() error { c.tsm.keyCursor.Close() + c.tsm.keyCursor = nil + c.tsm.buf = nil + c.cache.values = nil + c.tsm.values = nil return nil } @@ -1108,11 +1134,15 @@ func (itr *stringIterator) Close() error { for _, c := range itr.aux { c.close() } + itr.aux = nil for _, c := range itr.conds.curs { c.close() } + itr.conds.curs = nil if itr.cur != nil { - return itr.cur.close() + err := itr.cur.close() + itr.cur = nil + return err } return nil } @@ -1225,6 +1255,10 @@ func (c *stringAscendingCursor) peekTSM() (t int64, v string) { // close closes the cursor and any dependent cursors. func (c *stringAscendingCursor) close() error { c.tsm.keyCursor.Close() + c.tsm.keyCursor = nil + c.tsm.buf = nil + c.cache.values = nil + c.tsm.values = nil return nil } @@ -1343,6 +1377,10 @@ func (c *stringDescendingCursor) peekTSM() (t int64, v string) { // close closes the cursor and any dependent cursors. func (c *stringDescendingCursor) close() error { c.tsm.keyCursor.Close() + c.tsm.keyCursor = nil + c.tsm.buf = nil + c.cache.values = nil + c.tsm.values = nil return nil } @@ -1543,11 +1581,15 @@ func (itr *booleanIterator) Close() error { for _, c := range itr.aux { c.close() } + itr.aux = nil for _, c := range itr.conds.curs { c.close() } + itr.conds.curs = nil if itr.cur != nil { - return itr.cur.close() + err := itr.cur.close() + itr.cur = nil + return err } return nil } @@ -1660,6 +1702,10 @@ func (c *booleanAscendingCursor) peekTSM() (t int64, v bool) { // close closes the cursor and any dependent cursors. func (c *booleanAscendingCursor) close() error { c.tsm.keyCursor.Close() + c.tsm.keyCursor = nil + c.tsm.buf = nil + c.cache.values = nil + c.tsm.values = nil return nil } @@ -1778,6 +1824,10 @@ func (c *booleanDescendingCursor) peekTSM() (t int64, v bool) { // close closes the cursor and any dependent cursors. func (c *booleanDescendingCursor) close() error { c.tsm.keyCursor.Close() + c.tsm.keyCursor = nil + c.tsm.buf = nil + c.cache.values = nil + c.tsm.values = nil return nil } diff --git a/tsdb/engine/tsm1/iterator.gen.go.tmpl b/tsdb/engine/tsm1/iterator.gen.go.tmpl index 0cc57db04d..92b22a405b 100644 --- a/tsdb/engine/tsm1/iterator.gen.go.tmpl +++ b/tsdb/engine/tsm1/iterator.gen.go.tmpl @@ -42,7 +42,9 @@ func newBufCursor(cur cursor, ascending bool) *bufCursor { } func (c *bufCursor) close() error { - return c.cur.close() + err := c.cur.close() + c.cur = nil + return err } // next returns the buffer, if filled. Otherwise returns the next key/value from the cursor. @@ -234,11 +236,15 @@ func (itr *{{.name}}Iterator) Close() error { for _, c := range itr.aux { c.close() } + itr.aux = nil for _, c := range itr.conds.curs { c.close() } + itr.conds.curs = nil if itr.cur != nil { - return itr.cur.close() + err := itr.cur.close() + itr.cur = nil + return err } return nil } @@ -351,6 +357,10 @@ func (c *{{.name}}AscendingCursor) peekTSM() (t int64, v {{.Type}}) { // close closes the cursor and any dependent cursors. func (c *{{.name}}AscendingCursor) close() (error) { c.tsm.keyCursor.Close() + c.tsm.keyCursor = nil + c.tsm.buf = nil + c.cache.values = nil + c.tsm.values = nil return nil } @@ -469,6 +479,10 @@ func (c *{{.name}}DescendingCursor) peekTSM() (t int64, v {{.Type}}) { // close closes the cursor and any dependent cursors. func (c *{{.name}}DescendingCursor) close() (error) { c.tsm.keyCursor.Close() + c.tsm.keyCursor = nil + c.tsm.buf = nil + c.cache.values = nil + c.tsm.values = nil return nil }