Fixing up tests.

pull/9150/head
Ben Johnson 2017-12-02 16:52:34 -07:00
parent 68de5ca24f
commit e0df47d54f
No known key found for this signature in database
GPG Key ID: 81741CD251883081
11 changed files with 121 additions and 83 deletions

View File

@ -554,7 +554,7 @@ func init() { proto.RegisterFile("internal/internal.proto", fileDescriptorIntern
var fileDescriptorInternal = []byte{ var fileDescriptorInternal = []byte{
// 796 bytes of a gzipped FileDescriptorProto // 796 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0x6d, 0x6f, 0xe3, 0x44, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x55, 0x6d, 0x6f, 0xe3, 0x44,
0x10, 0x96, 0xe3, 0x3a, 0x8d, 0x27, 0xcd, 0xf5, 0x58, 0x4a, 0x59, 0xa1, 0x13, 0xb2, 0x2c, 0x40, 0x10, 0x96, 0xe3, 0x3a, 0x8d, 0x27, 0xcd, 0xf5, 0x58, 0x4a, 0x59, 0xa1, 0x13, 0xb2, 0x2c, 0x40,
0x16, 0xa0, 0x22, 0xf5, 0x13, 0x9f, 0x90, 0x72, 0xf4, 0x8a, 0x2a, 0xdd, 0xb5, 0xa7, 0x4d, 0xe9, 0x16, 0xa0, 0x22, 0xf5, 0x13, 0x9f, 0x90, 0x72, 0xf4, 0x8a, 0x2a, 0xdd, 0xb5, 0xa7, 0x4d, 0xe9,
0xf7, 0x25, 0x9e, 0x5a, 0x2b, 0x39, 0xeb, 0xb0, 0x5e, 0xa3, 0xe4, 0x07, 0xf4, 0x87, 0xf1, 0x13, 0xf7, 0x25, 0x9e, 0x5a, 0x2b, 0x39, 0xeb, 0xb0, 0x5e, 0xa3, 0xe4, 0x07, 0xf4, 0x87, 0xf1, 0x13,

View File

@ -14,7 +14,6 @@ import (
"sync" "sync"
"time" "time"
"github.com/davecgh/go-spew/spew"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/influxdata/influxql" "github.com/influxdata/influxql"
) )
@ -141,6 +140,7 @@ func newFloatMergeIterator(inputs []FloatIterator, opt IteratorOptions) *floatMe
// Append to the heap. // Append to the heap.
itr.heap.items = append(itr.heap.items, &floatMergeHeapItem{itr: bufInput}) itr.heap.items = append(itr.heap.items, &floatMergeHeapItem{itr: bufInput})
} }
return itr return itr
} }
@ -960,8 +960,6 @@ func (itr *floatAuxIterator) stream() {
} }
} }
spew.Dump(itr.input)
itr.Close()
close(itr.output) close(itr.output)
itr.fields.close() itr.fields.close()
} }

View File

@ -276,8 +276,7 @@ func (e *QueryExecutor) ExecuteQuery(query *influxql.Query, opt ExecutionOptions
func (e *QueryExecutor) executeQuery(query *influxql.Query, opt ExecutionOptions, closing <-chan struct{}, results chan *Result) { func (e *QueryExecutor) executeQuery(query *influxql.Query, opt ExecutionOptions, closing <-chan struct{}, results chan *Result) {
defer close(results) defer close(results)
println("dbg/no.recover") defer e.recover(query, results)
// defer e.recover(query, results)
atomic.AddInt64(&e.stats.ActiveQueries, 1) atomic.AddInt64(&e.stats.ActiveQueries, 1)
atomic.AddInt64(&e.stats.ExecutedQueries, 1) atomic.AddInt64(&e.stats.ExecutedQueries, 1)

View File

@ -85,6 +85,7 @@ type SeriesElem interface {
// SeriesIterator represents a iterator over a list of series. // SeriesIterator represents a iterator over a list of series.
type SeriesIterator interface { type SeriesIterator interface {
Close() error
Next() (SeriesElem, error) Next() (SeriesElem, error)
} }
@ -101,6 +102,8 @@ type seriesIteratorAdapter struct {
itr SeriesIDIterator itr SeriesIDIterator
} }
func (itr *seriesIteratorAdapter) Close() error { return itr.itr.Close() }
func (itr *seriesIteratorAdapter) Next() (SeriesElem, error) { func (itr *seriesIteratorAdapter) Next() (SeriesElem, error) {
elem, err := itr.itr.Next() elem, err := itr.itr.Next()
if err != nil { if err != nil {
@ -334,9 +337,7 @@ type seriesIDMergeIterator struct {
} }
func (itr *seriesIDMergeIterator) Close() error { func (itr *seriesIDMergeIterator) Close() error {
for i := range itr.itrs { SeriesIDIterators(itr.itrs).Close()
itr.itrs[i].Close()
}
return nil return nil
} }
@ -697,6 +698,7 @@ func (itr *seriesPointIterator) Next() (*query.FloatPoint, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} else if e.SeriesID == 0 { } else if e.SeriesID == 0 {
itr.sitr.Close()
itr.sitr = nil itr.sitr = nil
continue continue
} }
@ -1042,6 +1044,7 @@ func (is IndexSet) MeasurementSeriesKeysByExpr(sfile *SeriesFile, name []byte, e
} else if itr == nil { } else if itr == nil {
return nil, nil return nil, nil
} }
defer itr.Close()
// Iterate over all series and generate keys. // Iterate over all series and generate keys.
var keys [][]byte var keys [][]byte
@ -1087,6 +1090,9 @@ func (is IndexSet) seriesByExprIterator(name []byte, expr influxql.Expr, mf *Mea
// Get the series IDs and filter expressions for the RHS. // Get the series IDs and filter expressions for the RHS.
ritr, err := is.seriesByExprIterator(name, expr.RHS, mf) ritr, err := is.seriesByExprIterator(name, expr.RHS, mf)
if err != nil { if err != nil {
if litr != nil {
litr.Close()
}
return nil, err return nil, err
} }
@ -1285,11 +1291,11 @@ func (is IndexSet) matchTagValueEqualEmptySeriesIDIterator(name, key []byte, val
defer vitr.Close() defer vitr.Close()
var itrs []SeriesIDIterator var itrs []SeriesIDIterator
if err := func() error {
for { for {
e, err := vitr.Next() e, err := vitr.Next()
if err != nil { if err != nil {
SeriesIDIterators(itrs).Close() return err
return nil, err
} else if e != nil { } else if e != nil {
break break
} }
@ -1297,12 +1303,16 @@ func (is IndexSet) matchTagValueEqualEmptySeriesIDIterator(name, key []byte, val
if !value.Match(e) { if !value.Match(e) {
itr, err := is.TagValueSeriesIDIterator(name, key, e) itr, err := is.TagValueSeriesIDIterator(name, key, e)
if err != nil { if err != nil {
SeriesIDIterators(itrs).Close() return err
return nil, err
} }
itrs = append(itrs, itr) itrs = append(itrs, itr)
} }
} }
return nil
}(); err != nil {
SeriesIDIterators(itrs).Close()
return nil, err
}
mitr, err := is.MeasurementSeriesIDIterator(name) mitr, err := is.MeasurementSeriesIDIterator(name)
if err != nil { if err != nil {
@ -1427,6 +1437,7 @@ func (is IndexSet) TagValuesByKeyAndExpr(auth query.Authorizer, sfile *SeriesFil
} else if itr == nil { } else if itr == nil {
return nil return nil
} }
defer itr.Close()
keyIdxs := make(map[string]int, len(keys)) keyIdxs := make(map[string]int, len(keys))
for ki, key := range keys { for ki, key := range keys {

View File

@ -606,8 +606,9 @@ type fileSetSeriesIDIterator struct {
itr tsdb.SeriesIDIterator itr tsdb.SeriesIDIterator
} }
func newFileSetSeriesIDIterator(fs *FileSet, itr tsdb.SeriesIDIterator) *fileSetSeriesIDIterator { func newFileSetSeriesIDIterator(fs *FileSet, itr tsdb.SeriesIDIterator) tsdb.SeriesIDIterator {
if itr == nil { if itr == nil {
fs.Release()
return nil return nil
} }
return &fileSetSeriesIDIterator{fs: fs, itr: itr} return &fileSetSeriesIDIterator{fs: fs, itr: itr}

View File

@ -33,17 +33,25 @@ func TestFileSet_SeriesIDIterator(t *testing.T) {
t.Fatal("expected iterator") t.Fatal("expected iterator")
} }
if name, tags := fs.SeriesFile().Series(itr.Next().SeriesID); string(name) != `cpu` || tags.String() != `[{region east}]` { if elem, err := itr.Next(); err != nil {
t.Fatal(err)
} else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region east}]` {
t.Fatalf("unexpected series: %s/%s", name, tags.String()) t.Fatalf("unexpected series: %s/%s", name, tags.String())
} }
if name, tags := fs.SeriesFile().Series(itr.Next().SeriesID); string(name) != `cpu` || tags.String() != `[{region west}]` { if elem, err := itr.Next(); err != nil {
t.Fatal(err)
} else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region west}]` {
t.Fatalf("unexpected series: %s/%s", name, tags.String()) t.Fatalf("unexpected series: %s/%s", name, tags.String())
} }
if name, tags := fs.SeriesFile().Series(itr.Next().SeriesID); string(name) != `mem` || tags.String() != `[{region east}]` { if elem, err := itr.Next(); err != nil {
t.Fatal(err)
} else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `mem` || tags.String() != `[{region east}]` {
t.Fatalf("unexpected series: %s/%s", name, tags.String()) t.Fatalf("unexpected series: %s/%s", name, tags.String())
} }
if e := itr.Next(); e.SeriesID != 0 { if elem, err := itr.Next(); err != nil {
t.Fatalf("expected eof, got: %d", e.SeriesID) t.Fatal(err)
} else if elem.SeriesID != 0 {
t.Fatalf("expected eof, got: %d", elem.SeriesID)
} }
}) })
/* /*
@ -115,14 +123,20 @@ func TestFileSet_MeasurementSeriesIDIterator(t *testing.T) {
t.Fatal("expected iterator") t.Fatal("expected iterator")
} }
if name, tags := fs.SeriesFile().Series(itr.Next().SeriesID); string(name) != `cpu` || tags.String() != `[{region east}]` { if elem, err := itr.Next(); err != nil {
t.Fatal(err)
} else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region east}]` {
t.Fatalf("unexpected series: %s/%s", name, tags.String()) t.Fatalf("unexpected series: %s/%s", name, tags.String())
} }
if name, tags := fs.SeriesFile().Series(itr.Next().SeriesID); string(name) != `cpu` || tags.String() != `[{region west}]` { if elem, err := itr.Next(); err != nil {
t.Fatal(err)
} else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region west}]` {
t.Fatalf("unexpected series: %s/%s", name, tags.String()) t.Fatalf("unexpected series: %s/%s", name, tags.String())
} }
if e := itr.Next(); e.SeriesID != 0 { if elem, err := itr.Next(); err != nil {
t.Fatalf("expected eof, got: %d", e.SeriesID) t.Fatal(err)
} else if elem.SeriesID != 0 {
t.Fatalf("expected eof, got: %d", elem.SeriesID)
} }
}) })
@ -144,17 +158,25 @@ func TestFileSet_MeasurementSeriesIDIterator(t *testing.T) {
t.Fatalf("expected iterator") t.Fatalf("expected iterator")
} }
if name, tags := fs.SeriesFile().Series(itr.Next().SeriesID); string(name) != `cpu` || tags.String() != `[{region east}]` { if elem, err := itr.Next(); err != nil {
t.Fatal(err)
} else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region east}]` {
t.Fatalf("unexpected series: %s/%s", name, tags.String()) t.Fatalf("unexpected series: %s/%s", name, tags.String())
} }
if name, tags := fs.SeriesFile().Series(itr.Next().SeriesID); string(name) != `cpu` || tags.String() != `[{region west}]` { if elem, err := itr.Next(); err != nil {
t.Fatal(err)
} else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region west}]` {
t.Fatalf("unexpected series: %s/%s", name, tags.String()) t.Fatalf("unexpected series: %s/%s", name, tags.String())
} }
if name, tags := fs.SeriesFile().Series(itr.Next().SeriesID); string(name) != `cpu` || tags.String() != `[{region north}]` { if elem, err := itr.Next(); err != nil {
t.Fatal(err)
} else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region north}]` {
t.Fatalf("unexpected series: %s/%s", name, tags.String()) t.Fatalf("unexpected series: %s/%s", name, tags.String())
} }
if e := itr.Next(); e.SeriesID != 0 { if elem, err := itr.Next(); err != nil {
t.Fatalf("expected eof, got: %d", e.SeriesID) t.Fatal(err)
} else if elem.SeriesID != 0 {
t.Fatalf("expected eof, got: %d", elem.SeriesID)
} }
}) })
} }

View File

@ -389,9 +389,10 @@ func (i *Index) MeasurementIterator() (tsdb.MeasurementIterator, error) {
if err != nil { if err != nil {
tsdb.MeasurementIterators(itrs).Close() tsdb.MeasurementIterators(itrs).Close()
return nil, err return nil, err
} } else if itr != nil {
itrs = append(itrs, itr) itrs = append(itrs, itr)
} }
}
return tsdb.MergeMeasurementIterators(itrs...), nil return tsdb.MergeMeasurementIterators(itrs...), nil
} }
@ -403,9 +404,10 @@ func (i *Index) MeasurementSeriesIDIterator(name []byte) (tsdb.SeriesIDIterator,
if err != nil { if err != nil {
tsdb.SeriesIDIterators(itrs).Close() tsdb.SeriesIDIterators(itrs).Close()
return nil, err return nil, err
} } else if itr != nil {
itrs = append(itrs, itr) itrs = append(itrs, itr)
} }
}
return tsdb.MergeSeriesIDIterators(itrs...), nil return tsdb.MergeSeriesIDIterators(itrs...), nil
} }
@ -833,15 +835,4 @@ func (i *Index) UnassignShard(k string, shardID uint64, ts int64) error {
return i.DropSeries([]byte(k), ts) return i.DropSeries([]byte(k), ts)
} }
/*
// SeriesIDIterator returns a series iterator over all matching series.
func (i *Index) SeriesIDIterator(opt query.IteratorOptions) (tsdb.SeriesIDIterator, error) {
itrs := make([]tsdb.SeriesIDIterator, 0, len(i.partitions))
for k, p := range i.partitions {
itrs = append(itrs, p.seriesIDIterator(opt))
}
return tsdb.MergeSeriesIDIterators(itrs...), nil
}
*/
func (i *Index) Rebuild() {} func (i *Index) Rebuild() {}

View File

@ -109,8 +109,10 @@ func TestLogFile_SeriesStoredInOrder(t *testing.T) {
var prevSeriesID uint64 var prevSeriesID uint64
for i := 0; i < len(tvs); i++ { for i := 0; i < len(tvs); i++ {
elem := itr.Next() elem, err := itr.Next()
if elem.SeriesID == 0 { if err != nil {
t.Fatal(err)
} else if elem.SeriesID == 0 {
t.Fatal("got nil series") t.Fatal("got nil series")
} else if elem.SeriesID < prevSeriesID { } else if elem.SeriesID < prevSeriesID {
t.Fatalf("series out of order: %d !< %d ", elem.SeriesID, prevSeriesID) t.Fatalf("series out of order: %d !< %d ", elem.SeriesID, prevSeriesID)

View File

@ -168,36 +168,6 @@ func TestSeriesIDIterator(t *testing.T) {
} }
} }
// Ensure iterator can merge multiple iterators together.
func TestMergeSeriesIDIterators(t *testing.T) {
itr := tsi1.MergeSeriesIDIterators(
&SeriesIDIterator{Elems: []tsdb.SeriesIDElem{
{SeriesID: 1},
{SeriesID: 2},
{SeriesID: 3},
}},
&SeriesIDIterator{},
&SeriesIDIterator{Elems: []tsdb.SeriesIDElem{
{SeriesID: 1},
{SeriesID: 2},
{SeriesID: 3},
{SeriesID: 4},
}},
)
if e := itr.Next(); !reflect.DeepEqual(e, tsdb.SeriesIDElem{SeriesID: 1}) {
t.Fatalf("unexpected elem(0): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, tsdb.SeriesIDElem{SeriesID: 2}) {
t.Fatalf("unexpected elem(1): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, tsdb.SeriesIDElem{SeriesID: 3}) {
t.Fatalf("unexpected elem(2): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, tsdb.SeriesIDElem{SeriesID: 4}) {
t.Fatalf("unexpected elem(3): %#v", e)
} else if e := itr.Next(); e.SeriesID != 0 {
t.Fatalf("expected nil elem: %#v", e)
}
}
// MeasurementElem represents a test implementation of tsi1.MeasurementElem. // MeasurementElem represents a test implementation of tsi1.MeasurementElem.
type MeasurementElem struct { type MeasurementElem struct {
name []byte name []byte

43
tsdb/index_test.go Normal file
View File

@ -0,0 +1,43 @@
package tsdb_test
import (
"reflect"
"testing"
"github.com/influxdata/influxdb/tsdb"
)
// Ensure iterator can merge multiple iterators together.
func TestMergeSeriesIDIterators(t *testing.T) {
itr := tsdb.MergeSeriesIDIterators(
tsdb.NewSeriesIDSliceIterator([]uint64{1, 2, 3}),
tsdb.NewSeriesIDSliceIterator(nil),
tsdb.NewSeriesIDSliceIterator([]uint64{1, 2, 3, 4}),
)
if e, err := itr.Next(); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(e, tsdb.SeriesIDElem{SeriesID: 1}) {
t.Fatalf("unexpected elem(0): %#v", e)
}
if e, err := itr.Next(); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(e, tsdb.SeriesIDElem{SeriesID: 2}) {
t.Fatalf("unexpected elem(1): %#v", e)
}
if e, err := itr.Next(); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(e, tsdb.SeriesIDElem{SeriesID: 3}) {
t.Fatalf("unexpected elem(2): %#v", e)
}
if e, err := itr.Next(); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(e, tsdb.SeriesIDElem{SeriesID: 4}) {
t.Fatalf("unexpected elem(3): %#v", e)
}
if e, err := itr.Next(); err != nil {
t.Fatal(err)
} else if e.SeriesID != 0 {
t.Fatalf("expected nil elem: %#v", e)
}
}

View File

@ -669,6 +669,7 @@ cpu,host=serverB,region=uswest value=25 0
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer itr.Close()
fitr := itr.(query.FloatIterator) fitr := itr.(query.FloatIterator)
// Read values from iterator. // Read values from iterator.