Merge pull request #7108 from influxdata/jw-defers

Query path GC
pull/7093/merge
Jason Wilder 2016-08-03 15:46:59 -06:00 committed by GitHub
commit a00fe4adee
5 changed files with 168 additions and 52 deletions

View File

@ -164,6 +164,9 @@ func (itr *floatMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
itr.curr = nil
itr.inputs = nil
itr.heap.items = nil
return nil
}
@ -440,6 +443,7 @@ type floatParallelIterator struct {
once sync.Once
closing chan struct{}
wg sync.WaitGroup
}
// newFloatParallelIterator returns a new instance of floatParallelIterator.
@ -449,6 +453,7 @@ func newFloatParallelIterator(input FloatIterator) *floatParallelIterator {
ch: make(chan floatPointError, 1),
closing: make(chan struct{}),
}
itr.wg.Add(1)
go itr.monitor()
return itr
}
@ -459,6 +464,7 @@ func (itr *floatParallelIterator) Stats() IteratorStats { return itr.input.Stats
// Close closes the underlying iterators.
func (itr *floatParallelIterator) Close() error {
itr.once.Do(func() { close(itr.closing) })
itr.wg.Wait()
return itr.input.Close()
}
@ -474,6 +480,7 @@ func (itr *floatParallelIterator) Next() (*FloatPoint, error) {
// monitor runs in a separate goroutine and actively pulls the next point.
func (itr *floatParallelIterator) monitor() {
defer close(itr.ch)
defer itr.wg.Done()
for {
// Read next point.
@ -2221,6 +2228,9 @@ func (itr *integerMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
itr.curr = nil
itr.inputs = nil
itr.heap.items = nil
return nil
}
@ -2497,6 +2507,7 @@ type integerParallelIterator struct {
once sync.Once
closing chan struct{}
wg sync.WaitGroup
}
// newIntegerParallelIterator returns a new instance of integerParallelIterator.
@ -2506,6 +2517,7 @@ func newIntegerParallelIterator(input IntegerIterator) *integerParallelIterator
ch: make(chan integerPointError, 1),
closing: make(chan struct{}),
}
itr.wg.Add(1)
go itr.monitor()
return itr
}
@ -2516,6 +2528,7 @@ func (itr *integerParallelIterator) Stats() IteratorStats { return itr.input.Sta
// Close closes the underlying iterators.
func (itr *integerParallelIterator) Close() error {
itr.once.Do(func() { close(itr.closing) })
itr.wg.Wait()
return itr.input.Close()
}
@ -2531,6 +2544,7 @@ func (itr *integerParallelIterator) Next() (*IntegerPoint, error) {
// monitor runs in a separate goroutine and actively pulls the next point.
func (itr *integerParallelIterator) monitor() {
defer close(itr.ch)
defer itr.wg.Done()
for {
// Read next point.
@ -4275,6 +4289,9 @@ func (itr *stringMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
itr.curr = nil
itr.inputs = nil
itr.heap.items = nil
return nil
}
@ -4551,6 +4568,7 @@ type stringParallelIterator struct {
once sync.Once
closing chan struct{}
wg sync.WaitGroup
}
// newStringParallelIterator returns a new instance of stringParallelIterator.
@ -4560,6 +4578,7 @@ func newStringParallelIterator(input StringIterator) *stringParallelIterator {
ch: make(chan stringPointError, 1),
closing: make(chan struct{}),
}
itr.wg.Add(1)
go itr.monitor()
return itr
}
@ -4570,6 +4589,7 @@ func (itr *stringParallelIterator) Stats() IteratorStats { return itr.input.Stat
// Close closes the underlying iterators.
func (itr *stringParallelIterator) Close() error {
itr.once.Do(func() { close(itr.closing) })
itr.wg.Wait()
return itr.input.Close()
}
@ -4585,6 +4605,7 @@ func (itr *stringParallelIterator) Next() (*StringPoint, error) {
// monitor runs in a separate goroutine and actively pulls the next point.
func (itr *stringParallelIterator) monitor() {
defer close(itr.ch)
defer itr.wg.Done()
for {
// Read next point.
@ -6329,6 +6350,9 @@ func (itr *booleanMergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
itr.curr = nil
itr.inputs = nil
itr.heap.items = nil
return nil
}
@ -6605,6 +6629,7 @@ type booleanParallelIterator struct {
once sync.Once
closing chan struct{}
wg sync.WaitGroup
}
// newBooleanParallelIterator returns a new instance of booleanParallelIterator.
@ -6614,6 +6639,7 @@ func newBooleanParallelIterator(input BooleanIterator) *booleanParallelIterator
ch: make(chan booleanPointError, 1),
closing: make(chan struct{}),
}
itr.wg.Add(1)
go itr.monitor()
return itr
}
@ -6624,6 +6650,7 @@ func (itr *booleanParallelIterator) Stats() IteratorStats { return itr.input.Sta
// Close closes the underlying iterators.
func (itr *booleanParallelIterator) Close() error {
itr.once.Do(func() { close(itr.closing) })
itr.wg.Wait()
return itr.input.Close()
}
@ -6639,6 +6666,7 @@ func (itr *booleanParallelIterator) Next() (*BooleanPoint, error) {
// monitor runs in a separate goroutine and actively pulls the next point.
func (itr *booleanParallelIterator) monitor() {
defer close(itr.ch)
defer itr.wg.Done()
for {
// Read next point.

View File

@ -161,6 +161,9 @@ func (itr *{{$k.name}}MergeIterator) Close() error {
for _, input := range itr.inputs {
input.Close()
}
itr.curr = nil
itr.inputs = nil
itr.heap.items = nil
return nil
}
@ -435,9 +438,10 @@ type {{$k.name}}SortedMergeHeapItem struct {
type {{$k.name}}ParallelIterator struct {
input {{$k.Name}}Iterator
ch chan {{$k.name}}PointError
once sync.Once
closing chan struct{}
wg sync.WaitGroup
}
// new{{$k.Name}}ParallelIterator returns a new instance of {{$k.name}}ParallelIterator.
@ -445,8 +449,9 @@ func new{{$k.Name}}ParallelIterator(input {{$k.Name}}Iterator) *{{$k.name}}Paral
itr := &{{$k.name}}ParallelIterator{
input: input,
ch: make(chan {{$k.name}}PointError, 1),
closing: make(chan struct{}),
closing: make(chan struct{}),
}
itr.wg.Add(1)
go itr.monitor()
return itr
}
@ -457,7 +462,8 @@ func (itr *{{$k.name}}ParallelIterator) Stats() IteratorStats { return itr.input
// Close closes the underlying iterators.
func (itr *{{$k.name}}ParallelIterator) Close() error {
itr.once.Do(func() { close(itr.closing) })
return itr.input.Close()
itr.wg.Wait()
return itr.input.Close()
}
// Next returns the next point from the iterator.
@ -472,6 +478,7 @@ func (itr *{{$k.name}}ParallelIterator) Next() (*{{$k.Name}}Point, error) {
// monitor runs in a separate goroutine and actively pulls the next point.
func (itr *{{$k.name}}ParallelIterator) monitor() {
defer close(itr.ch)
defer itr.wg.Done()
for {
// Read next point.

View File

@ -49,7 +49,9 @@ func newBufCursor(cur cursor, ascending bool) *bufCursor {
}
func (c *bufCursor) close() error {
return c.cur.close()
err := c.cur.close()
c.cur = nil
return err
}
// next returns the buffer, if filled. Otherwise returns the next key/value from the cursor.
@ -238,11 +240,15 @@ func (itr *floatIterator) Close() error {
for _, c := range itr.aux {
c.close()
}
itr.aux = nil
for _, c := range itr.conds.curs {
c.close()
}
itr.conds.curs = nil
if itr.cur != nil {
return itr.cur.close()
err := itr.cur.close()
itr.cur = nil
return err
}
return nil
}
@ -355,6 +361,10 @@ func (c *floatAscendingCursor) peekTSM() (t int64, v float64) {
// close closes the cursor and any dependent cursors.
func (c *floatAscendingCursor) close() error {
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.tsm.buf = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}
@ -473,6 +483,10 @@ func (c *floatDescendingCursor) peekTSM() (t int64, v float64) {
// close closes the cursor and any dependent cursors.
func (c *floatDescendingCursor) close() error {
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.tsm.buf = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}
@ -673,11 +687,15 @@ func (itr *integerIterator) Close() error {
for _, c := range itr.aux {
c.close()
}
itr.aux = nil
for _, c := range itr.conds.curs {
c.close()
}
itr.conds.curs = nil
if itr.cur != nil {
return itr.cur.close()
err := itr.cur.close()
itr.cur = nil
return err
}
return nil
}
@ -790,6 +808,10 @@ func (c *integerAscendingCursor) peekTSM() (t int64, v int64) {
// close closes the cursor and any dependent cursors.
func (c *integerAscendingCursor) close() error {
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.tsm.buf = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}
@ -908,6 +930,10 @@ func (c *integerDescendingCursor) peekTSM() (t int64, v int64) {
// close closes the cursor and any dependent cursors.
func (c *integerDescendingCursor) close() error {
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.tsm.buf = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}
@ -1108,11 +1134,15 @@ func (itr *stringIterator) Close() error {
for _, c := range itr.aux {
c.close()
}
itr.aux = nil
for _, c := range itr.conds.curs {
c.close()
}
itr.conds.curs = nil
if itr.cur != nil {
return itr.cur.close()
err := itr.cur.close()
itr.cur = nil
return err
}
return nil
}
@ -1225,6 +1255,10 @@ func (c *stringAscendingCursor) peekTSM() (t int64, v string) {
// close closes the cursor and any dependent cursors.
func (c *stringAscendingCursor) close() error {
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.tsm.buf = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}
@ -1343,6 +1377,10 @@ func (c *stringDescendingCursor) peekTSM() (t int64, v string) {
// close closes the cursor and any dependent cursors.
func (c *stringDescendingCursor) close() error {
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.tsm.buf = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}
@ -1543,11 +1581,15 @@ func (itr *booleanIterator) Close() error {
for _, c := range itr.aux {
c.close()
}
itr.aux = nil
for _, c := range itr.conds.curs {
c.close()
}
itr.conds.curs = nil
if itr.cur != nil {
return itr.cur.close()
err := itr.cur.close()
itr.cur = nil
return err
}
return nil
}
@ -1660,6 +1702,10 @@ func (c *booleanAscendingCursor) peekTSM() (t int64, v bool) {
// close closes the cursor and any dependent cursors.
func (c *booleanAscendingCursor) close() error {
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.tsm.buf = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}
@ -1778,6 +1824,10 @@ func (c *booleanDescendingCursor) peekTSM() (t int64, v bool) {
// close closes the cursor and any dependent cursors.
func (c *booleanDescendingCursor) close() error {
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.tsm.buf = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}

View File

@ -42,7 +42,9 @@ func newBufCursor(cur cursor, ascending bool) *bufCursor {
}
func (c *bufCursor) close() error {
return c.cur.close()
err := c.cur.close()
c.cur = nil
return err
}
// next returns the buffer, if filled. Otherwise returns the next key/value from the cursor.
@ -234,11 +236,15 @@ func (itr *{{.name}}Iterator) Close() error {
for _, c := range itr.aux {
c.close()
}
itr.aux = nil
for _, c := range itr.conds.curs {
c.close()
}
itr.conds.curs = nil
if itr.cur != nil {
return itr.cur.close()
err := itr.cur.close()
itr.cur = nil
return err
}
return nil
}
@ -351,6 +357,10 @@ func (c *{{.name}}AscendingCursor) peekTSM() (t int64, v {{.Type}}) {
// close closes the cursor and any dependent cursors.
func (c *{{.name}}AscendingCursor) close() (error) {
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.tsm.buf = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}
@ -469,6 +479,10 @@ func (c *{{.name}}DescendingCursor) peekTSM() (t int64, v {{.Type}}) {
// close closes the cursor and any dependent cursors.
func (c *{{.name}}DescendingCursor) close() (error) {
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.tsm.buf = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}

View File

@ -232,10 +232,10 @@ func (t *TSMReader) applyTombstones() error {
}
func (t *TSMReader) Path() string {
t.mu.Lock()
defer t.mu.Unlock()
return t.accessor.path()
t.mu.RLock()
p := t.accessor.path()
t.mu.RUnlock()
return p
}
func (t *TSMReader) Key(index int) (string, []IndexEntry) {
@ -249,54 +249,59 @@ func (t *TSMReader) KeyAt(idx int) (string, byte) {
func (t *TSMReader) ReadAt(entry *IndexEntry, vals []Value) ([]Value, error) {
t.mu.RLock()
defer t.mu.RUnlock()
return t.accessor.readBlock(entry, vals)
v, err := t.accessor.readBlock(entry, vals)
t.mu.RUnlock()
return v, err
}
func (t *TSMReader) ReadFloatBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *FloatDecoder, vals *[]FloatValue) ([]FloatValue, error) {
t.mu.RLock()
defer t.mu.RUnlock()
return t.accessor.readFloatBlock(entry, tdec, vdec, vals)
v, err := t.accessor.readFloatBlock(entry, tdec, vdec, vals)
t.mu.RUnlock()
return v, err
}
func (t *TSMReader) ReadIntegerBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *IntegerDecoder, vals *[]IntegerValue) ([]IntegerValue, error) {
t.mu.RLock()
defer t.mu.RUnlock()
return t.accessor.readIntegerBlock(entry, tdec, vdec, vals)
v, err := t.accessor.readIntegerBlock(entry, tdec, vdec, vals)
t.mu.RUnlock()
return v, err
}
func (t *TSMReader) ReadStringBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *StringDecoder, vals *[]StringValue) ([]StringValue, error) {
t.mu.RLock()
defer t.mu.RUnlock()
return t.accessor.readStringBlock(entry, tdec, vdec, vals)
v, err := t.accessor.readStringBlock(entry, tdec, vdec, vals)
t.mu.RUnlock()
return v, err
}
func (t *TSMReader) ReadBooleanBlockAt(entry *IndexEntry, tdec *TimeDecoder, vdec *BooleanDecoder, vals *[]BooleanValue) ([]BooleanValue, error) {
t.mu.RLock()
defer t.mu.RUnlock()
return t.accessor.readBooleanBlock(entry, tdec, vdec, vals)
v, err := t.accessor.readBooleanBlock(entry, tdec, vdec, vals)
t.mu.RUnlock()
return v, err
}
func (t *TSMReader) Read(key string, timestamp int64) ([]Value, error) {
t.mu.RLock()
defer t.mu.RUnlock()
return t.accessor.read(key, timestamp)
v, err := t.accessor.read(key, timestamp)
t.mu.RUnlock()
return v, err
}
// ReadAll returns all values for a key in all blocks.
func (t *TSMReader) ReadAll(key string) ([]Value, error) {
t.mu.Lock()
defer t.mu.Unlock()
return t.accessor.readAll(key)
t.mu.RLock()
v, err := t.accessor.readAll(key)
t.mu.RUnlock()
return v, err
}
func (t *TSMReader) readBytes(e *IndexEntry, b []byte) (uint32, []byte, error) {
t.mu.RLock()
defer t.mu.RUnlock()
return t.accessor.readBytes(e, b)
n, v, err := t.accessor.readBytes(e, b)
t.mu.RUnlock()
return n, v, err
}
func (t *TSMReader) Type(key string) (byte, error) {
@ -426,35 +431,40 @@ func (t *TSMReader) IndexSize() uint32 {
func (t *TSMReader) Size() uint32 {
t.mu.RLock()
defer t.mu.RUnlock()
return uint32(t.size)
size := t.size
t.mu.RUnlock()
return uint32(size)
}
func (t *TSMReader) LastModified() int64 {
t.mu.RLock()
defer t.mu.RUnlock()
return t.lastModified
lm := t.lastModified
t.mu.RUnlock()
return lm
}
// HasTombstones return true if there are any tombstone entries recorded.
func (t *TSMReader) HasTombstones() bool {
t.mu.RLock()
defer t.mu.RUnlock()
return t.tombstoner.HasTombstones()
b := t.tombstoner.HasTombstones()
t.mu.RUnlock()
return b
}
// TombstoneFiles returns any tombstone files associated with this TSM file.
func (t *TSMReader) TombstoneFiles() []FileStat {
t.mu.RLock()
defer t.mu.RUnlock()
return t.tombstoner.TombstoneFiles()
fs := t.tombstoner.TombstoneFiles()
t.mu.RUnlock()
return fs
}
// TombstoneRange returns ranges of time that are deleted for the given key.
func (t *TSMReader) TombstoneRange(key string) []TimeRange {
t.mu.RLock()
defer t.mu.RUnlock()
return t.index.TombstoneRange(key)
tr := t.index.TombstoneRange(key)
t.mu.RUnlock()
return tr
}
func (t *TSMReader) Stats() FileStat {
@ -1026,14 +1036,15 @@ func (m *mmapAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, er
func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *FloatDecoder, values *[]FloatValue) ([]FloatValue, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return nil, ErrTSMClosed
}
//TODO: Validate checksum
a, err := DecodeFloatBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], tdec, vdec, values)
m.mu.RUnlock()
if err != nil {
return nil, err
}
@ -1043,13 +1054,15 @@ func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, tdec *TimeDecoder, vdec
func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *IntegerDecoder, values *[]IntegerValue) ([]IntegerValue, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return nil, ErrTSMClosed
}
//TODO: Validate checksum
a, err := DecodeIntegerBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], tdec, vdec, values)
m.mu.RUnlock()
if err != nil {
return nil, err
}
@ -1059,13 +1072,15 @@ func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, tdec *TimeDecoder, vd
func (m *mmapAccessor) readStringBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *StringDecoder, values *[]StringValue) ([]StringValue, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return nil, ErrTSMClosed
}
//TODO: Validate checksum
a, err := DecodeStringBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], tdec, vdec, values)
m.mu.RUnlock()
if err != nil {
return nil, err
}
@ -1075,13 +1090,15 @@ func (m *mmapAccessor) readStringBlock(entry *IndexEntry, tdec *TimeDecoder, vde
func (m *mmapAccessor) readBooleanBlock(entry *IndexEntry, tdec *TimeDecoder, vdec *BooleanDecoder, values *[]BooleanValue) ([]BooleanValue, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return nil, ErrTSMClosed
}
//TODO: Validate checksum
a, err := DecodeBooleanBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], tdec, vdec, values)
m.mu.RUnlock()
if err != nil {
return nil, err
}