Ensure aux and cond cursors are closed when iterator is closed

pull/7084/head
Jason Wilder 2016-07-27 16:03:20 -06:00
parent 0b60862248
commit 602a2e80ce
2 changed files with 80 additions and 5 deletions

View File

@ -23,6 +23,7 @@ type cursor interface {
// cursorAt provides a bufferred cursor interface. // cursorAt provides a bufferred cursor interface.
// This required for literal value cursors which don't have a time value. // This required for literal value cursors which don't have a time value.
type cursorAt interface { type cursorAt interface {
close() error
peek() (k int64, v interface{}) peek() (k int64, v interface{})
nextAt(seek int64) interface{} nextAt(seek int64) interface{}
} }
@ -47,6 +48,10 @@ func newBufCursor(cur cursor, ascending bool) *bufCursor {
return &bufCursor{cur: cur, ascending: ascending} return &bufCursor{cur: cur, ascending: ascending}
} }
func (c *bufCursor) close() error {
return c.cur.close()
}
// next returns the buffer, if filled. Otherwise returns the next key/value from the cursor. // next returns the buffer, if filled. Otherwise returns the next key/value from the cursor.
func (c *bufCursor) next() (int64, interface{}) { func (c *bufCursor) next() (int64, interface{}) {
if c.buf.filled { if c.buf.filled {
@ -229,7 +234,18 @@ func (itr *floatIterator) Stats() influxql.IteratorStats {
} }
// Close closes the iterator. // Close closes the iterator.
func (itr *floatIterator) Close() error { return itr.cur.close() } func (itr *floatIterator) Close() error {
for _, c := range itr.aux {
c.close()
}
for _, c := range itr.conds.curs {
c.close()
}
if itr.cur != nil {
return itr.cur.close()
}
return nil
}
// floatLimitIterator // floatLimitIterator
type floatLimitIterator struct { type floatLimitIterator struct {
@ -518,6 +534,7 @@ type floatLiteralCursor struct {
value float64 value float64
} }
func (c *floatLiteralCursor) close() error { return nil }
func (c *floatLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, c.value } func (c *floatLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *floatLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, c.value } func (c *floatLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *floatLiteralCursor) nextAt(seek int64) interface{} { return c.value } func (c *floatLiteralCursor) nextAt(seek int64) interface{} { return c.value }
@ -526,6 +543,7 @@ func (c *floatLiteralCursor) nextAt(seek int64) interface{} { return c.value }
// It doesn't not have a time value so it can only be used with nextAt(). // It doesn't not have a time value so it can only be used with nextAt().
type floatNilLiteralCursor struct{} type floatNilLiteralCursor struct{}
func (c *floatNilLiteralCursor) close() error { return nil }
func (c *floatNilLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, (*float64)(nil) } func (c *floatNilLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, (*float64)(nil) }
func (c *floatNilLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, (*float64)(nil) } func (c *floatNilLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, (*float64)(nil) }
func (c *floatNilLiteralCursor) nextAt(seek int64) interface{} { return (*float64)(nil) } func (c *floatNilLiteralCursor) nextAt(seek int64) interface{} { return (*float64)(nil) }
@ -651,7 +669,18 @@ func (itr *integerIterator) Stats() influxql.IteratorStats {
} }
// Close closes the iterator. // Close closes the iterator.
func (itr *integerIterator) Close() error { return itr.cur.close() } func (itr *integerIterator) Close() error {
for _, c := range itr.aux {
c.close()
}
for _, c := range itr.conds.curs {
c.close()
}
if itr.cur != nil {
return itr.cur.close()
}
return nil
}
// integerLimitIterator // integerLimitIterator
type integerLimitIterator struct { type integerLimitIterator struct {
@ -940,6 +969,7 @@ type integerLiteralCursor struct {
value int64 value int64
} }
func (c *integerLiteralCursor) close() error { return nil }
func (c *integerLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, c.value } func (c *integerLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *integerLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, c.value } func (c *integerLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *integerLiteralCursor) nextAt(seek int64) interface{} { return c.value } func (c *integerLiteralCursor) nextAt(seek int64) interface{} { return c.value }
@ -948,6 +978,7 @@ func (c *integerLiteralCursor) nextAt(seek int64) interface{} { return c.value
// It doesn't not have a time value so it can only be used with nextAt(). // It doesn't not have a time value so it can only be used with nextAt().
type integerNilLiteralCursor struct{} type integerNilLiteralCursor struct{}
func (c *integerNilLiteralCursor) close() error { return nil }
func (c *integerNilLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, (*int64)(nil) } func (c *integerNilLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, (*int64)(nil) }
func (c *integerNilLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, (*int64)(nil) } func (c *integerNilLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, (*int64)(nil) }
func (c *integerNilLiteralCursor) nextAt(seek int64) interface{} { return (*int64)(nil) } func (c *integerNilLiteralCursor) nextAt(seek int64) interface{} { return (*int64)(nil) }
@ -1073,7 +1104,18 @@ func (itr *stringIterator) Stats() influxql.IteratorStats {
} }
// Close closes the iterator. // Close closes the iterator.
func (itr *stringIterator) Close() error { return itr.cur.close() } func (itr *stringIterator) Close() error {
for _, c := range itr.aux {
c.close()
}
for _, c := range itr.conds.curs {
c.close()
}
if itr.cur != nil {
return itr.cur.close()
}
return nil
}
// stringLimitIterator // stringLimitIterator
type stringLimitIterator struct { type stringLimitIterator struct {
@ -1362,6 +1404,7 @@ type stringLiteralCursor struct {
value string value string
} }
func (c *stringLiteralCursor) close() error { return nil }
func (c *stringLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, c.value } func (c *stringLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *stringLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, c.value } func (c *stringLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *stringLiteralCursor) nextAt(seek int64) interface{} { return c.value } func (c *stringLiteralCursor) nextAt(seek int64) interface{} { return c.value }
@ -1370,6 +1413,7 @@ func (c *stringLiteralCursor) nextAt(seek int64) interface{} { return c.value }
// It doesn't not have a time value so it can only be used with nextAt(). // It doesn't not have a time value so it can only be used with nextAt().
type stringNilLiteralCursor struct{} type stringNilLiteralCursor struct{}
func (c *stringNilLiteralCursor) close() error { return nil }
func (c *stringNilLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, (*string)(nil) } func (c *stringNilLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, (*string)(nil) }
func (c *stringNilLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, (*string)(nil) } func (c *stringNilLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, (*string)(nil) }
func (c *stringNilLiteralCursor) nextAt(seek int64) interface{} { return (*string)(nil) } func (c *stringNilLiteralCursor) nextAt(seek int64) interface{} { return (*string)(nil) }
@ -1495,7 +1539,18 @@ func (itr *booleanIterator) Stats() influxql.IteratorStats {
} }
// Close closes the iterator. // Close closes the iterator.
func (itr *booleanIterator) Close() error { return itr.cur.close() } func (itr *booleanIterator) Close() error {
for _, c := range itr.aux {
c.close()
}
for _, c := range itr.conds.curs {
c.close()
}
if itr.cur != nil {
return itr.cur.close()
}
return nil
}
// booleanLimitIterator // booleanLimitIterator
type booleanLimitIterator struct { type booleanLimitIterator struct {
@ -1784,6 +1839,7 @@ type booleanLiteralCursor struct {
value bool value bool
} }
func (c *booleanLiteralCursor) close() error { return nil }
func (c *booleanLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, c.value } func (c *booleanLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *booleanLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, c.value } func (c *booleanLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *booleanLiteralCursor) nextAt(seek int64) interface{} { return c.value } func (c *booleanLiteralCursor) nextAt(seek int64) interface{} { return c.value }
@ -1792,6 +1848,7 @@ func (c *booleanLiteralCursor) nextAt(seek int64) interface{} { return c.value
// It doesn't not have a time value so it can only be used with nextAt(). // It doesn't not have a time value so it can only be used with nextAt().
type booleanNilLiteralCursor struct{} type booleanNilLiteralCursor struct{}
func (c *booleanNilLiteralCursor) close() error { return nil }
func (c *booleanNilLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, (*bool)(nil) } func (c *booleanNilLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, (*bool)(nil) }
func (c *booleanNilLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, (*bool)(nil) } func (c *booleanNilLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, (*bool)(nil) }
func (c *booleanNilLiteralCursor) nextAt(seek int64) interface{} { return (*bool)(nil) } func (c *booleanNilLiteralCursor) nextAt(seek int64) interface{} { return (*bool)(nil) }

View File

@ -17,6 +17,7 @@ type cursor interface {
// cursorAt provides a bufferred cursor interface. // cursorAt provides a bufferred cursor interface.
// This required for literal value cursors which don't have a time value. // This required for literal value cursors which don't have a time value.
type cursorAt interface { type cursorAt interface {
close() error
peek() (k int64, v interface{}) peek() (k int64, v interface{})
nextAt(seek int64) interface{} nextAt(seek int64) interface{}
} }
@ -40,6 +41,10 @@ func newBufCursor(cur cursor, ascending bool) *bufCursor {
return &bufCursor{cur: cur, ascending: ascending} return &bufCursor{cur: cur, ascending: ascending}
} }
func (c *bufCursor) close() error {
return c.cur.close()
}
// next returns the buffer, if filled. Otherwise returns the next key/value from the cursor. // next returns the buffer, if filled. Otherwise returns the next key/value from the cursor.
func (c *bufCursor) next() (int64, interface{}) { func (c *bufCursor) next() (int64, interface{}) {
if c.buf.filled { if c.buf.filled {
@ -225,7 +230,18 @@ func (itr *{{.name}}Iterator) Stats() influxql.IteratorStats {
} }
// Close closes the iterator. // Close closes the iterator.
func (itr *{{.name}}Iterator) Close() error { return itr.cur.close() } func (itr *{{.name}}Iterator) Close() error {
for _, c := range itr.aux {
c.close()
}
for _, c := range itr.conds.curs {
c.close()
}
if itr.cur != nil {
return itr.cur.close()
}
return nil
}
// {{.name}}LimitIterator // {{.name}}LimitIterator
type {{.name}}LimitIterator struct { type {{.name}}LimitIterator struct {
@ -514,6 +530,7 @@ type {{.name}}LiteralCursor struct {
value {{.Type}} value {{.Type}}
} }
func (c *{{.name}}LiteralCursor) close() error { return nil }
func (c *{{.name}}LiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, c.value } func (c *{{.name}}LiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *{{.name}}LiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, c.value } func (c *{{.name}}LiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *{{.name}}LiteralCursor) nextAt(seek int64) interface{} { return c.value } func (c *{{.name}}LiteralCursor) nextAt(seek int64) interface{} { return c.value }
@ -523,6 +540,7 @@ func (c *{{.name}}LiteralCursor) nextAt(seek int64) interface{} { return c.value
// It doesn't not have a time value so it can only be used with nextAt(). // It doesn't not have a time value so it can only be used with nextAt().
type {{.name}}NilLiteralCursor struct {} type {{.name}}NilLiteralCursor struct {}
func (c *{{.name}}NilLiteralCursor) close() error { return nil }
func (c *{{.name}}NilLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, (*{{.Type}})(nil) } func (c *{{.name}}NilLiteralCursor) peek() (t int64, v interface{}) { return tsdb.EOF, (*{{.Type}})(nil) }
func (c *{{.name}}NilLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, (*{{.Type}})(nil) } func (c *{{.name}}NilLiteralCursor) next() (t int64, v interface{}) { return tsdb.EOF, (*{{.Type}})(nil) }
func (c *{{.name}}NilLiteralCursor) nextAt(seek int64) interface{} { return (*{{.Type}})(nil) } func (c *{{.name}}NilLiteralCursor) nextAt(seek int64) interface{} { return (*{{.Type}})(nil) }