Remove iterators

pull/10616/head
Edd Robinson 2018-11-20 19:17:33 +00:00
parent 2471024253
commit 308a5148cf
6 changed files with 0 additions and 3199 deletions

View File

@ -29,7 +29,6 @@ import (
"go.uber.org/zap"
)
//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@iterator.gen.go.tmpldata iterator.gen.go.tmpl array_cursor.gen.go.tmpl array_cursor_iterator.gen.go.tmpl
//go:generate env GO111MODULE=on go run github.com/influxdata/platform/tools/tmpl -i -data=file_store.gen.go.tmpldata file_store.gen.go.tmpl=file_store.gen.go
//go:generate env GO111MODULE=on go run github.com/influxdata/platform/tools/tmpl -i -d isArray=y -data=file_store.gen.go.tmpldata file_store.gen.go.tmpl=file_store_array.gen.go
//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@encoding.gen.go.tmpldata encoding.gen.go.tmpl

File diff suppressed because it is too large Load Diff

View File

@ -1,574 +0,0 @@
package tsm1
import (
"sort"
"fmt"
"runtime"
"sync"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/tsdb"
"github.com/influxdata/influxql"
"go.uber.org/zap"
)
type cursor interface {
close() error
next() (t int64, v interface{})
}
// cursorAt provides a bufferred cursor interface.
// This required for literal value cursors which don't have a time value.
type cursorAt interface {
close() error
peek() (k int64, v interface{})
nextAt(seek int64) interface{}
}
type nilCursor struct {}
func (nilCursor) next() (int64, interface{}) { return tsdb.EOF, nil }
func (nilCursor) close() error { return nil }
// bufCursor implements a bufferred cursor.
type bufCursor struct {
cur cursor
buf struct {
key int64
value interface{}
filled bool
}
ascending bool
}
// newBufCursor returns a bufferred wrapper for cur.
func newBufCursor(cur cursor, ascending bool) *bufCursor {
return &bufCursor{cur: cur, ascending: ascending}
}
func (c *bufCursor) close() error {
if c.cur == nil {
return nil
}
err := c.cur.close()
c.cur = nil
return err
}
// next returns the buffer, if filled. Otherwise returns the next key/value from the cursor.
func (c *bufCursor) next() (int64, interface{}) {
if c.buf.filled {
k, v := c.buf.key, c.buf.value
c.buf.filled = false
return k, v
}
return c.cur.next()
}
// unread pushes k and v onto the buffer.
func (c *bufCursor) unread(k int64, v interface{}) {
c.buf.key, c.buf.value = k, v
c.buf.filled = true
}
// peek reads next next key/value without removing them from the cursor.
func (c *bufCursor) peek() (k int64, v interface{}) {
k, v = c.next()
c.unread(k, v)
return
}
// nextAt returns the next value where key is equal to seek.
// Skips over any keys that are less than seek.
// If the key doesn't exist then a nil value is returned instead.
func (c *bufCursor) nextAt(seek int64) interface{} {
for {
k, v := c.next()
if k != tsdb.EOF {
if k == seek {
return v
} else if c.ascending && k < seek {
continue
} else if !c.ascending && k > seek {
continue
}
c.unread(k, v)
}
// Return "nil" value for type.
switch c.cur.(type) {
case floatCursor:
return (*float64)(nil)
case integerCursor:
return (*int64)(nil)
case unsignedCursor:
return (*uint64)(nil)
case stringCursor:
return (*string)(nil)
case booleanCursor:
return (*bool)(nil)
default:
panic("unreachable")
}
}
}
// statsBufferCopyIntervalN is the number of points that are read before
// copying the stats buffer to the iterator's stats field. This is used to
// amortize the cost of using a mutex when updating stats.
const statsBufferCopyIntervalN = 100
{{range .}}
type {{.name}}FinalizerIterator struct {
query.{{.Name}}Iterator
logger *zap.Logger
}
func new{{.Name}}FinalizerIterator(inner query.{{.Name}}Iterator, logger *zap.Logger) *{{.name}}FinalizerIterator {
itr := &{{.name}}FinalizerIterator{ {{.Name}}Iterator: inner, logger: logger}
runtime.SetFinalizer(itr, (*{{.name}}FinalizerIterator).closeGC)
return itr
}
func (itr *{{.name}}FinalizerIterator) closeGC() {
go func() {
itr.logger.Error("{{.Name}}Iterator finalized by GC")
itr.Close()
}()
}
func (itr *{{.name}}FinalizerIterator) Close() error {
runtime.SetFinalizer(itr, nil)
return itr.{{.Name}}Iterator.Close()
}
type {{.name}}Iterator struct {
cur {{.name}}Cursor
aux []cursorAt
conds struct {
names []string
curs []cursorAt
}
opt query.IteratorOptions
m map[string]interface{} // map used for condition evaluation
point query.{{.Name}}Point // reusable buffer
statsLock sync.Mutex
stats query.IteratorStats
statsBuf query.IteratorStats
}
func new{{.Name}}Iterator(name string, tags query.Tags, opt query.IteratorOptions, cur {{.name}}Cursor, aux []cursorAt, conds []cursorAt, condNames []string) *{{.name}}Iterator {
itr := &{{.name}}Iterator{
cur: cur,
aux: aux,
opt: opt,
point: query.{{.Name}}Point{
Name: name,
Tags: tags,
},
statsBuf: query.IteratorStats{
SeriesN: 1,
},
}
itr.stats = itr.statsBuf
if len(aux) > 0 {
itr.point.Aux = make([]interface{}, len(aux))
}
if opt.Condition != nil {
itr.m = make(map[string]interface{}, len(aux)+len(conds))
}
itr.conds.names = condNames
itr.conds.curs = conds
return itr
}
// Next returns the next point from the iterator.
func (itr *{{.name}}Iterator) Next() (*query.{{.Name}}Point, error) {
for {
seek := tsdb.EOF
if itr.cur != nil {
// Read from the main cursor if we have one.
itr.point.Time, itr.point.Value = itr.cur.next{{.Name}}()
seek = itr.point.Time
} else {
// Otherwise find lowest aux timestamp.
for i := range itr.aux {
if k, _ := itr.aux[i].peek(); k != tsdb.EOF {
if seek == tsdb.EOF || (itr.opt.Ascending && k < seek) || (!itr.opt.Ascending && k > seek) {
seek = k
}
}
}
itr.point.Time = seek
}
// Exit if we have no more points or we are outside our time range.
if itr.point.Time == tsdb.EOF {
itr.copyStats()
return nil, nil
} else if itr.opt.Ascending && itr.point.Time > itr.opt.EndTime {
itr.copyStats()
return nil, nil
} else if !itr.opt.Ascending && itr.point.Time < itr.opt.StartTime {
itr.copyStats()
return nil, nil
}
// Read from each auxiliary cursor.
for i := range itr.opt.Aux {
itr.point.Aux[i] = itr.aux[i].nextAt(seek)
}
// Read from condition field cursors.
for i := range itr.conds.curs {
itr.m[itr.conds.names[i]] = itr.conds.curs[i].nextAt(seek)
}
// Evaluate condition, if one exists. Retry if it fails.
valuer := influxql.ValuerEval{
Valuer: influxql.MultiValuer(
query.MathValuer{},
influxql.MapValuer(itr.m),
),
}
if itr.opt.Condition != nil && !valuer.EvalBool(itr.opt.Condition) {
continue
}
// Track points returned.
itr.statsBuf.PointN++
// Copy buffer to stats periodically.
if itr.statsBuf.PointN % statsBufferCopyIntervalN == 0 {
itr.copyStats()
}
return &itr.point, nil
}
}
// copyStats copies from the itr stats buffer to the stats under lock.
func (itr *{{.name}}Iterator) copyStats() {
itr.statsLock.Lock()
itr.stats = itr.statsBuf
itr.statsLock.Unlock()
}
// Stats returns stats on the points processed.
func (itr *{{.name}}Iterator) Stats() query.IteratorStats {
itr.statsLock.Lock()
stats := itr.stats
itr.statsLock.Unlock()
return stats
}
// Close closes the iterator.
func (itr *{{.name}}Iterator) Close() error {
cursorsAt(itr.aux).close()
itr.aux = nil
cursorsAt(itr.conds.curs).close()
itr.conds.curs = nil
if itr.cur != nil {
err := itr.cur.close()
itr.cur = nil
return err
}
return nil
}
// {{.name}}LimitIterator
type {{.name}}LimitIterator struct {
input query.{{.Name}}Iterator
opt query.IteratorOptions
n int
}
func new{{.Name}}LimitIterator(input query.{{.Name}}Iterator, opt query.IteratorOptions) *{{.name}}LimitIterator {
return &{{.name}}LimitIterator{
input: input,
opt: opt,
}
}
func (itr *{{.name}}LimitIterator) Stats() query.IteratorStats { return itr.input.Stats() }
func (itr *{{.name}}LimitIterator) Close() error { return itr.input.Close() }
func (itr *{{.name}}LimitIterator) Next() (*query.{{.Name}}Point, error) {
// Check if we are beyond the limit.
if (itr.n-itr.opt.Offset) > itr.opt.Limit {
return nil, nil
}
// Read the next point.
p, err := itr.input.Next()
if p == nil || err != nil {
return nil, err
}
// Increment counter.
itr.n++
// Offsets are handled by a higher level iterator so return all points.
return p, nil
}
// {{.name}}Cursor represents an object for iterating over a single {{.name}} field.
type {{.name}}Cursor interface {
cursor
next{{.Name}}() (t int64, v {{.Type}})
}
func new{{.Name}}Cursor(seek int64, ascending bool, cacheValues Values, tsmKeyCursor *KeyCursor) {{.name}}Cursor {
if ascending {
return new{{.Name}}AscendingCursor(seek, cacheValues, tsmKeyCursor)
}
return new{{.Name}}DescendingCursor(seek, cacheValues, tsmKeyCursor)
}
type {{.name}}AscendingCursor struct {
cache struct {
values Values
pos int
}
tsm struct {
values []{{.Name}}Value
pos int
keyCursor *KeyCursor
}
}
func new{{.Name}}AscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *{{.name}}AscendingCursor {
c := &{{.name}}AscendingCursor{}
c.cache.values = cacheValues
c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
return c.cache.values[i].UnixNano() >= seek
})
c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.values)
c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool {
return c.tsm.values[i].UnixNano() >= seek
})
return c
}
// peekCache returns the current time/value from the cache.
func (c *{{.name}}AscendingCursor) peekCache() (t int64, v {{.Type}}) {
if c.cache.pos >= len(c.cache.values) {
return tsdb.EOF, {{.Nil}}
}
item := c.cache.values[c.cache.pos]
return item.UnixNano(), item.({{.ValueType}}).value
}
// peekTSM returns the current time/value from tsm.
func (c *{{.name}}AscendingCursor) peekTSM() (t int64, v {{.Type}}) {
if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) {
return tsdb.EOF, {{.Nil}}
}
item := c.tsm.values[c.tsm.pos]
return item.UnixNano(), item.value
}
// close closes the cursor and any dependent cursors.
func (c *{{.name}}AscendingCursor) close() (error) {
if c.tsm.keyCursor == nil {
return nil
}
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}
// next returns the next key/value for the cursor.
func (c *{{.name}}AscendingCursor) next() (int64, interface{}) { return c.next{{.Name}}() }
// next{{.Name}} returns the next key/value for the cursor.
func (c *{{.name}}AscendingCursor) next{{.Name}}() (int64, {{.Type}}) {
ckey, cvalue := c.peekCache()
tkey, tvalue := c.peekTSM()
// No more data in cache or in TSM files.
if ckey == tsdb.EOF && tkey == tsdb.EOF {
return tsdb.EOF, {{.Nil}}
}
// Both cache and tsm files have the same key, cache takes precedence.
if ckey == tkey {
c.nextCache()
c.nextTSM()
return ckey, cvalue
}
// Buffered cache key precedes that in TSM file.
if ckey != tsdb.EOF && (ckey < tkey || tkey == tsdb.EOF) {
c.nextCache()
return ckey, cvalue
}
// Buffered TSM key precedes that in cache.
c.nextTSM()
return tkey, tvalue
}
// nextCache returns the next value from the cache.
func (c *{{.name}}AscendingCursor) nextCache() {
if c.cache.pos >= len(c.cache.values) {
return
}
c.cache.pos++
}
// nextTSM returns the next value from the TSM files.
func (c *{{.name}}AscendingCursor) nextTSM() {
c.tsm.pos++
if c.tsm.pos >= len(c.tsm.values) {
c.tsm.keyCursor.Next()
c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.values)
if len(c.tsm.values) == 0 {
return
}
c.tsm.pos = 0
}
}
type {{.name}}DescendingCursor struct {
cache struct {
values Values
pos int
}
tsm struct {
values []{{.Name}}Value
pos int
keyCursor *KeyCursor
}
}
func new{{.Name}}DescendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *{{.name}}DescendingCursor {
c := &{{.name}}DescendingCursor{}
c.cache.values = cacheValues
c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
return c.cache.values[i].UnixNano() >= seek
})
if t, _ := c.peekCache(); t != seek {
c.cache.pos--
}
c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.values)
c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool {
return c.tsm.values[i].UnixNano() >= seek
})
if t, _ := c.peekTSM(); t != seek {
c.tsm.pos--
}
return c
}
// peekCache returns the current time/value from the cache.
func (c *{{.name}}DescendingCursor) peekCache() (t int64, v {{.Type}}) {
if c.cache.pos < 0 || c.cache.pos >= len(c.cache.values) {
return tsdb.EOF, {{.Nil}}
}
item := c.cache.values[c.cache.pos]
return item.UnixNano(), item.({{.ValueType}}).value
}
// peekTSM returns the current time/value from tsm.
func (c *{{.name}}DescendingCursor) peekTSM() (t int64, v {{.Type}}) {
if c.tsm.pos < 0 || c.tsm.pos >= len(c.tsm.values) {
return tsdb.EOF, {{.Nil}}
}
item := c.tsm.values[c.tsm.pos]
return item.UnixNano(), item.value
}
// close closes the cursor and any dependent cursors.
func (c *{{.name}}DescendingCursor) close() (error) {
if c.tsm.keyCursor == nil {
return nil
}
c.tsm.keyCursor.Close()
c.tsm.keyCursor = nil
c.cache.values = nil
c.tsm.values = nil
return nil
}
// next returns the next key/value for the cursor.
func (c *{{.name}}DescendingCursor) next() (int64, interface{}) { return c.next{{.Name}}() }
// next{{.Name}} returns the next key/value for the cursor.
func (c *{{.name}}DescendingCursor) next{{.Name}}() (int64, {{.Type}}) {
ckey, cvalue := c.peekCache()
tkey, tvalue := c.peekTSM()
// No more data in cache or in TSM files.
if ckey == tsdb.EOF && tkey == tsdb.EOF {
return tsdb.EOF, {{.Nil}}
}
// Both cache and tsm files have the same key, cache takes precedence.
if ckey == tkey {
c.nextCache()
c.nextTSM()
return ckey, cvalue
}
// Buffered cache key precedes that in TSM file.
if ckey != tsdb.EOF && (ckey > tkey || tkey == tsdb.EOF) {
c.nextCache()
return ckey, cvalue
}
// Buffered TSM key precedes that in cache.
c.nextTSM()
return tkey, tvalue
}
// nextCache returns the next value from the cache.
func (c *{{.name}}DescendingCursor) nextCache() {
if c.cache.pos < 0 {
return
}
c.cache.pos--
}
// nextTSM returns the next value from the TSM files.
func (c *{{.name}}DescendingCursor) nextTSM() {
c.tsm.pos--
if c.tsm.pos < 0 {
c.tsm.keyCursor.Next()
c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}Block(&c.tsm.values)
if len(c.tsm.values) == 0 {
return
}
c.tsm.pos = len(c.tsm.values) - 1
}
}
{{end}}
var _ = fmt.Print

View File

@ -1,42 +0,0 @@
[
{
"Name":"Float",
"name":"float",
"Type":"float64",
"ValueType":"FloatValue",
"Nil":"0",
"Size":"8"
},
{
"Name":"Integer",
"name":"integer",
"Type":"int64",
"ValueType":"IntegerValue",
"Nil":"0",
"Size":"8"
},
{
"Name":"Unsigned",
"name":"unsigned",
"Type":"uint64",
"ValueType":"UnsignedValue",
"Nil":"0",
"Size":"8"
},
{
"Name":"String",
"name":"string",
"Type":"string",
"ValueType":"StringValue",
"Nil":"\"\"",
"Size":"0"
},
{
"Name":"Boolean",
"name":"boolean",
"Type":"bool",
"ValueType":"BooleanValue",
"Nil":"false",
"Size":"1"
}
]

View File

@ -1,53 +0,0 @@
package tsm1
import (
"fmt"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/tsdb"
"go.uber.org/zap"
)
// literalValueCursor represents a cursor that always returns a single value.
// It doesn't not have a time value so it can only be used with nextAt().
type literalValueCursor struct {
value interface{}
}
func (c *literalValueCursor) close() error { return nil }
func (c *literalValueCursor) peek() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *literalValueCursor) next() (t int64, v interface{}) { return tsdb.EOF, c.value }
func (c *literalValueCursor) nextAt(seek int64) interface{} { return c.value }
type cursorsAt []cursorAt
func (c cursorsAt) close() {
for _, cur := range c {
cur.close()
}
}
// newFinalizerIterator creates a new iterator that installs a runtime finalizer
// to ensure close is eventually called if the iterator is garbage collected.
// This additional guard attempts to protect against clients of CreateIterator not
// correctly closing them and leaking cursors.
func newFinalizerIterator(itr query.Iterator, log *zap.Logger) query.Iterator {
if itr == nil {
return nil
}
switch inner := itr.(type) {
case query.FloatIterator:
return newFloatFinalizerIterator(inner, log)
case query.IntegerIterator:
return newIntegerFinalizerIterator(inner, log)
case query.UnsignedIterator:
return newUnsignedFinalizerIterator(inner, log)
case query.StringIterator:
return newStringFinalizerIterator(inner, log)
case query.BooleanIterator:
return newBooleanFinalizerIterator(inner, log)
default:
panic(fmt.Sprintf("unsupported finalizer iterator type: %T", itr))
}
}

View File

@ -1,161 +0,0 @@
package tsm1
import (
"os"
"runtime"
"testing"
"time"
"github.com/influxdata/influxql"
"github.com/influxdata/platform/logger"
"github.com/influxdata/platform/query"
)
func BenchmarkIntegerIterator_Next(b *testing.B) {
opt := query.IteratorOptions{
Aux: []influxql.VarRef{{Val: "f1"}, {Val: "f1"}, {Val: "f1"}, {Val: "f1"}},
}
aux := []cursorAt{
&literalValueCursor{value: "foo bar"},
&literalValueCursor{value: int64(1e3)},
&literalValueCursor{value: float64(1e3)},
&literalValueCursor{value: true},
}
cur := newIntegerIterator("m0", query.Tags{}, opt, &infiniteIntegerCursor{}, aux, nil, nil)
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
cur.Next()
}
}
type infiniteIntegerCursor struct{}
func (*infiniteIntegerCursor) close() error {
return nil
}
func (*infiniteIntegerCursor) next() (t int64, v interface{}) {
return 0, 0
}
func (*infiniteIntegerCursor) nextInteger() (t int64, v int64) {
return 0, 0
}
type testFinalizerIterator struct {
OnClose func()
}
func (itr *testFinalizerIterator) Next() (*query.FloatPoint, error) {
return nil, nil
}
func (itr *testFinalizerIterator) Close() error {
// Act as if this is a slow finalizer and ensure that it doesn't block
// the finalizer background thread.
itr.OnClose()
return nil
}
func (itr *testFinalizerIterator) Stats() query.IteratorStats {
return query.IteratorStats{}
}
func TestFinalizerIterator(t *testing.T) {
var (
step1 = make(chan struct{})
step2 = make(chan struct{})
step3 = make(chan struct{})
)
l := logger.New(os.Stderr)
done := make(chan struct{})
func() {
itr := &testFinalizerIterator{
OnClose: func() {
// Simulate a slow closing iterator by waiting for the done channel
// to be closed. The done channel is closed by a later finalizer.
close(step1)
<-done
close(step3)
},
}
newFinalizerIterator(itr, l)
}()
for i := 0; i < 100; i++ {
runtime.GC()
}
timer := time.NewTimer(100 * time.Millisecond)
select {
case <-timer.C:
t.Fatal("The finalizer for the iterator did not run")
close(done)
case <-step1:
// The finalizer has successfully run, but should not have completed yet.
timer.Stop()
}
select {
case <-step3:
t.Fatal("The finalizer should not have finished yet")
default:
}
// Use a fake value that will be collected by the garbage collector and have
// the finalizer close the channel. This finalizer should run after the iterator's
// finalizer.
value := func() int {
foo := &struct {
value int
}{value: 1}
runtime.SetFinalizer(foo, func(value interface{}) {
close(done)
close(step2)
})
return foo.value + 2
}()
if value < 2 {
t.Log("This should never be output")
}
for i := 0; i < 100; i++ {
runtime.GC()
}
timer.Reset(100 * time.Millisecond)
select {
case <-timer.C:
t.Fatal("The second finalizer did not run")
case <-step2:
// The finalizer has successfully run and should have
// closed the done channel.
timer.Stop()
}
// Wait for step3 to finish where the closed value should be set.
timer.Reset(100 * time.Millisecond)
select {
case <-timer.C:
t.Fatal("The iterator was not finalized")
case <-step3:
timer.Stop()
}
}
func TestBufCursor_DoubleClose(t *testing.T) {
c := newBufCursor(nilCursor{}, true)
if err := c.close(); err != nil {
t.Fatalf("error closing: %v", err)
}
// This shouldn't panic
if err := c.close(); err != nil {
t.Fatalf("error closing: %v", err)
}
}