diff --git a/query/internal/internal.pb.go b/query/internal/internal.pb.go index 0b2ba473db..242245ec85 100644 --- a/query/internal/internal.pb.go +++ b/query/internal/internal.pb.go @@ -554,7 +554,7 @@ func init() { proto.RegisterFile("internal/internal.proto", fileDescriptorIntern var fileDescriptorInternal = []byte{ // 796 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0x6d, 0x6f, 0xe3, 0x44, + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x55, 0x6d, 0x6f, 0xe3, 0x44, 0x10, 0x96, 0xe3, 0x3a, 0x8d, 0x27, 0xcd, 0xf5, 0x58, 0x4a, 0x59, 0xa1, 0x13, 0xb2, 0x2c, 0x40, 0x16, 0xa0, 0x22, 0xf5, 0x13, 0x9f, 0x90, 0x72, 0xf4, 0x8a, 0x2a, 0xdd, 0xb5, 0xa7, 0x4d, 0xe9, 0xf7, 0x25, 0x9e, 0x5a, 0x2b, 0x39, 0xeb, 0xb0, 0x5e, 0xa3, 0xe4, 0x07, 0xf4, 0x87, 0xf1, 0x13, diff --git a/query/iterator.gen.go b/query/iterator.gen.go index 850a49988a..c3e624a1d1 100644 --- a/query/iterator.gen.go +++ b/query/iterator.gen.go @@ -14,7 +14,6 @@ import ( "sync" "time" - "github.com/davecgh/go-spew/spew" "github.com/gogo/protobuf/proto" "github.com/influxdata/influxql" ) @@ -141,6 +140,7 @@ func newFloatMergeIterator(inputs []FloatIterator, opt IteratorOptions) *floatMe // Append to the heap. itr.heap.items = append(itr.heap.items, &floatMergeHeapItem{itr: bufInput}) } + return itr } @@ -960,8 +960,6 @@ func (itr *floatAuxIterator) stream() { } } - spew.Dump(itr.input) - itr.Close() close(itr.output) itr.fields.close() } diff --git a/query/query_executor.go b/query/query_executor.go index a49655f8a6..36a36fd871 100644 --- a/query/query_executor.go +++ b/query/query_executor.go @@ -276,8 +276,7 @@ func (e *QueryExecutor) ExecuteQuery(query *influxql.Query, opt ExecutionOptions func (e *QueryExecutor) executeQuery(query *influxql.Query, opt ExecutionOptions, closing <-chan struct{}, results chan *Result) { defer close(results) - println("dbg/no.recover") - // defer e.recover(query, results) + defer e.recover(query, results) atomic.AddInt64(&e.stats.ActiveQueries, 1) atomic.AddInt64(&e.stats.ExecutedQueries, 1) diff --git a/tsdb/index.go b/tsdb/index.go index 7fd52a0b12..950315bf1e 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -85,6 +85,7 @@ type SeriesElem interface { // SeriesIterator represents a iterator over a list of series. type SeriesIterator interface { + Close() error Next() (SeriesElem, error) } @@ -101,6 +102,8 @@ type seriesIteratorAdapter struct { itr SeriesIDIterator } +func (itr *seriesIteratorAdapter) Close() error { return itr.itr.Close() } + func (itr *seriesIteratorAdapter) Next() (SeriesElem, error) { elem, err := itr.itr.Next() if err != nil { @@ -334,9 +337,7 @@ type seriesIDMergeIterator struct { } func (itr *seriesIDMergeIterator) Close() error { - for i := range itr.itrs { - itr.itrs[i].Close() - } + SeriesIDIterators(itr.itrs).Close() return nil } @@ -697,6 +698,7 @@ func (itr *seriesPointIterator) Next() (*query.FloatPoint, error) { if err != nil { return nil, err } else if e.SeriesID == 0 { + itr.sitr.Close() itr.sitr = nil continue } @@ -1042,6 +1044,7 @@ func (is IndexSet) MeasurementSeriesKeysByExpr(sfile *SeriesFile, name []byte, e } else if itr == nil { return nil, nil } + defer itr.Close() // Iterate over all series and generate keys. var keys [][]byte @@ -1087,6 +1090,9 @@ func (is IndexSet) seriesByExprIterator(name []byte, expr influxql.Expr, mf *Mea // Get the series IDs and filter expressions for the RHS. ritr, err := is.seriesByExprIterator(name, expr.RHS, mf) if err != nil { + if litr != nil { + litr.Close() + } return nil, err } @@ -1285,23 +1291,27 @@ func (is IndexSet) matchTagValueEqualEmptySeriesIDIterator(name, key []byte, val defer vitr.Close() var itrs []SeriesIDIterator - for { - e, err := vitr.Next() - if err != nil { - SeriesIDIterators(itrs).Close() - return nil, err - } else if e != nil { - break - } - - if !value.Match(e) { - itr, err := is.TagValueSeriesIDIterator(name, key, e) + if err := func() error { + for { + e, err := vitr.Next() if err != nil { - SeriesIDIterators(itrs).Close() - return nil, err + return err + } else if e != nil { + break + } + + if !value.Match(e) { + itr, err := is.TagValueSeriesIDIterator(name, key, e) + if err != nil { + return err + } + itrs = append(itrs, itr) } - itrs = append(itrs, itr) } + return nil + }(); err != nil { + SeriesIDIterators(itrs).Close() + return nil, err } mitr, err := is.MeasurementSeriesIDIterator(name) @@ -1427,6 +1437,7 @@ func (is IndexSet) TagValuesByKeyAndExpr(auth query.Authorizer, sfile *SeriesFil } else if itr == nil { return nil } + defer itr.Close() keyIdxs := make(map[string]int, len(keys)) for ki, key := range keys { diff --git a/tsdb/index/tsi1/file_set.go b/tsdb/index/tsi1/file_set.go index d381810437..97933309df 100644 --- a/tsdb/index/tsi1/file_set.go +++ b/tsdb/index/tsi1/file_set.go @@ -606,8 +606,9 @@ type fileSetSeriesIDIterator struct { itr tsdb.SeriesIDIterator } -func newFileSetSeriesIDIterator(fs *FileSet, itr tsdb.SeriesIDIterator) *fileSetSeriesIDIterator { +func newFileSetSeriesIDIterator(fs *FileSet, itr tsdb.SeriesIDIterator) tsdb.SeriesIDIterator { if itr == nil { + fs.Release() return nil } return &fileSetSeriesIDIterator{fs: fs, itr: itr} diff --git a/tsdb/index/tsi1/file_set_test.go b/tsdb/index/tsi1/file_set_test.go index 756cbb1e90..12e49ef891 100644 --- a/tsdb/index/tsi1/file_set_test.go +++ b/tsdb/index/tsi1/file_set_test.go @@ -33,17 +33,25 @@ func TestFileSet_SeriesIDIterator(t *testing.T) { t.Fatal("expected iterator") } - if name, tags := fs.SeriesFile().Series(itr.Next().SeriesID); string(name) != `cpu` || tags.String() != `[{region east}]` { + if elem, err := itr.Next(); err != nil { + t.Fatal(err) + } else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region east}]` { t.Fatalf("unexpected series: %s/%s", name, tags.String()) } - if name, tags := fs.SeriesFile().Series(itr.Next().SeriesID); string(name) != `cpu` || tags.String() != `[{region west}]` { + if elem, err := itr.Next(); err != nil { + t.Fatal(err) + } else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region west}]` { t.Fatalf("unexpected series: %s/%s", name, tags.String()) } - if name, tags := fs.SeriesFile().Series(itr.Next().SeriesID); string(name) != `mem` || tags.String() != `[{region east}]` { + if elem, err := itr.Next(); err != nil { + t.Fatal(err) + } else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `mem` || tags.String() != `[{region east}]` { t.Fatalf("unexpected series: %s/%s", name, tags.String()) } - if e := itr.Next(); e.SeriesID != 0 { - t.Fatalf("expected eof, got: %d", e.SeriesID) + if elem, err := itr.Next(); err != nil { + t.Fatal(err) + } else if elem.SeriesID != 0 { + t.Fatalf("expected eof, got: %d", elem.SeriesID) } }) /* @@ -115,14 +123,20 @@ func TestFileSet_MeasurementSeriesIDIterator(t *testing.T) { t.Fatal("expected iterator") } - if name, tags := fs.SeriesFile().Series(itr.Next().SeriesID); string(name) != `cpu` || tags.String() != `[{region east}]` { + if elem, err := itr.Next(); err != nil { + t.Fatal(err) + } else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region east}]` { t.Fatalf("unexpected series: %s/%s", name, tags.String()) } - if name, tags := fs.SeriesFile().Series(itr.Next().SeriesID); string(name) != `cpu` || tags.String() != `[{region west}]` { + if elem, err := itr.Next(); err != nil { + t.Fatal(err) + } else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region west}]` { t.Fatalf("unexpected series: %s/%s", name, tags.String()) } - if e := itr.Next(); e.SeriesID != 0 { - t.Fatalf("expected eof, got: %d", e.SeriesID) + if elem, err := itr.Next(); err != nil { + t.Fatal(err) + } else if elem.SeriesID != 0 { + t.Fatalf("expected eof, got: %d", elem.SeriesID) } }) @@ -144,17 +158,25 @@ func TestFileSet_MeasurementSeriesIDIterator(t *testing.T) { t.Fatalf("expected iterator") } - if name, tags := fs.SeriesFile().Series(itr.Next().SeriesID); string(name) != `cpu` || tags.String() != `[{region east}]` { + if elem, err := itr.Next(); err != nil { + t.Fatal(err) + } else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region east}]` { t.Fatalf("unexpected series: %s/%s", name, tags.String()) } - if name, tags := fs.SeriesFile().Series(itr.Next().SeriesID); string(name) != `cpu` || tags.String() != `[{region west}]` { + if elem, err := itr.Next(); err != nil { + t.Fatal(err) + } else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region west}]` { t.Fatalf("unexpected series: %s/%s", name, tags.String()) } - if name, tags := fs.SeriesFile().Series(itr.Next().SeriesID); string(name) != `cpu` || tags.String() != `[{region north}]` { + if elem, err := itr.Next(); err != nil { + t.Fatal(err) + } else if name, tags := fs.SeriesFile().Series(elem.SeriesID); string(name) != `cpu` || tags.String() != `[{region north}]` { t.Fatalf("unexpected series: %s/%s", name, tags.String()) } - if e := itr.Next(); e.SeriesID != 0 { - t.Fatalf("expected eof, got: %d", e.SeriesID) + if elem, err := itr.Next(); err != nil { + t.Fatal(err) + } else if elem.SeriesID != 0 { + t.Fatalf("expected eof, got: %d", elem.SeriesID) } }) } diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 78a2dbb17a..8ab51a8159 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -389,8 +389,9 @@ func (i *Index) MeasurementIterator() (tsdb.MeasurementIterator, error) { if err != nil { tsdb.MeasurementIterators(itrs).Close() return nil, err + } else if itr != nil { + itrs = append(itrs, itr) } - itrs = append(itrs, itr) } return tsdb.MergeMeasurementIterators(itrs...), nil } @@ -403,8 +404,9 @@ func (i *Index) MeasurementSeriesIDIterator(name []byte) (tsdb.SeriesIDIterator, if err != nil { tsdb.SeriesIDIterators(itrs).Close() return nil, err + } else if itr != nil { + itrs = append(itrs, itr) } - itrs = append(itrs, itr) } return tsdb.MergeSeriesIDIterators(itrs...), nil } @@ -833,15 +835,4 @@ func (i *Index) UnassignShard(k string, shardID uint64, ts int64) error { return i.DropSeries([]byte(k), ts) } -/* -// SeriesIDIterator returns a series iterator over all matching series. -func (i *Index) SeriesIDIterator(opt query.IteratorOptions) (tsdb.SeriesIDIterator, error) { - itrs := make([]tsdb.SeriesIDIterator, 0, len(i.partitions)) - for k, p := range i.partitions { - itrs = append(itrs, p.seriesIDIterator(opt)) - } - return tsdb.MergeSeriesIDIterators(itrs...), nil -} -*/ - func (i *Index) Rebuild() {} diff --git a/tsdb/index/tsi1/log_file_test.go b/tsdb/index/tsi1/log_file_test.go index 1d7e795bd2..1a39702920 100644 --- a/tsdb/index/tsi1/log_file_test.go +++ b/tsdb/index/tsi1/log_file_test.go @@ -109,8 +109,10 @@ func TestLogFile_SeriesStoredInOrder(t *testing.T) { var prevSeriesID uint64 for i := 0; i < len(tvs); i++ { - elem := itr.Next() - if elem.SeriesID == 0 { + elem, err := itr.Next() + if err != nil { + t.Fatal(err) + } else if elem.SeriesID == 0 { t.Fatal("got nil series") } else if elem.SeriesID < prevSeriesID { t.Fatalf("series out of order: %d !< %d ", elem.SeriesID, prevSeriesID) diff --git a/tsdb/index/tsi1/tsi1_test.go b/tsdb/index/tsi1/tsi1_test.go index d18bcb0187..234c4a1f5d 100644 --- a/tsdb/index/tsi1/tsi1_test.go +++ b/tsdb/index/tsi1/tsi1_test.go @@ -168,36 +168,6 @@ func TestSeriesIDIterator(t *testing.T) { } } -// Ensure iterator can merge multiple iterators together. -func TestMergeSeriesIDIterators(t *testing.T) { - itr := tsi1.MergeSeriesIDIterators( - &SeriesIDIterator{Elems: []tsdb.SeriesIDElem{ - {SeriesID: 1}, - {SeriesID: 2}, - {SeriesID: 3}, - }}, - &SeriesIDIterator{}, - &SeriesIDIterator{Elems: []tsdb.SeriesIDElem{ - {SeriesID: 1}, - {SeriesID: 2}, - {SeriesID: 3}, - {SeriesID: 4}, - }}, - ) - - if e := itr.Next(); !reflect.DeepEqual(e, tsdb.SeriesIDElem{SeriesID: 1}) { - t.Fatalf("unexpected elem(0): %#v", e) - } else if e := itr.Next(); !reflect.DeepEqual(e, tsdb.SeriesIDElem{SeriesID: 2}) { - t.Fatalf("unexpected elem(1): %#v", e) - } else if e := itr.Next(); !reflect.DeepEqual(e, tsdb.SeriesIDElem{SeriesID: 3}) { - t.Fatalf("unexpected elem(2): %#v", e) - } else if e := itr.Next(); !reflect.DeepEqual(e, tsdb.SeriesIDElem{SeriesID: 4}) { - t.Fatalf("unexpected elem(3): %#v", e) - } else if e := itr.Next(); e.SeriesID != 0 { - t.Fatalf("expected nil elem: %#v", e) - } -} - // MeasurementElem represents a test implementation of tsi1.MeasurementElem. type MeasurementElem struct { name []byte diff --git a/tsdb/index_test.go b/tsdb/index_test.go new file mode 100644 index 0000000000..edf6ce924d --- /dev/null +++ b/tsdb/index_test.go @@ -0,0 +1,43 @@ +package tsdb_test + +import ( + "reflect" + "testing" + + "github.com/influxdata/influxdb/tsdb" +) + +// Ensure iterator can merge multiple iterators together. +func TestMergeSeriesIDIterators(t *testing.T) { + itr := tsdb.MergeSeriesIDIterators( + tsdb.NewSeriesIDSliceIterator([]uint64{1, 2, 3}), + tsdb.NewSeriesIDSliceIterator(nil), + tsdb.NewSeriesIDSliceIterator([]uint64{1, 2, 3, 4}), + ) + + if e, err := itr.Next(); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(e, tsdb.SeriesIDElem{SeriesID: 1}) { + t.Fatalf("unexpected elem(0): %#v", e) + } + if e, err := itr.Next(); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(e, tsdb.SeriesIDElem{SeriesID: 2}) { + t.Fatalf("unexpected elem(1): %#v", e) + } + if e, err := itr.Next(); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(e, tsdb.SeriesIDElem{SeriesID: 3}) { + t.Fatalf("unexpected elem(2): %#v", e) + } + if e, err := itr.Next(); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(e, tsdb.SeriesIDElem{SeriesID: 4}) { + t.Fatalf("unexpected elem(3): %#v", e) + } + if e, err := itr.Next(); err != nil { + t.Fatal(err) + } else if e.SeriesID != 0 { + t.Fatalf("expected nil elem: %#v", e) + } +} diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index b38ae7e7f0..64fcea7ca5 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -669,6 +669,7 @@ cpu,host=serverB,region=uswest value=25 0 if err != nil { t.Fatal(err) } + defer itr.Close() fitr := itr.(query.FloatIterator) // Read values from iterator.