diff --git a/tsdb/engine/tsi1/index.go b/tsdb/engine/tsi1/index.go index 833384a253..4549a32b50 100644 --- a/tsdb/engine/tsi1/index.go +++ b/tsdb/engine/tsi1/index.go @@ -55,8 +55,8 @@ func (i *Index) measurement(name []byte) *tsdb.Measurement { // TODO: Remove concept of series ids. m.AddSeries(&tsdb.Series{ ID: id, - Key: string(e.Name), - Tags: models.CopyTags(e.Tags), + Key: string(e.Name()), + Tags: models.CopyTags(e.Tags()), }) // TEMPORARY: Increment ID. @@ -74,7 +74,7 @@ func (i *Index) Measurements() (tsdb.Measurements, error) { var mms tsdb.Measurements itr := i.file.MeasurementIterator() for e := itr.Next(); e != nil; e = itr.Next() { - mms = append(mms, i.measurement(e.Name)) + mms = append(mms, i.measurement(e.Name())) } return mms, nil } @@ -164,17 +164,17 @@ func (i *Index) measurementsByNameFilter(op influxql.Token, val string, regex *r var matched bool switch op { case influxql.EQ: - matched = string(e.Name) == val + matched = string(e.Name()) == val case influxql.NEQ: - matched = string(e.Name) != val + matched = string(e.Name()) != val case influxql.EQREGEX: - matched = regex.Match(e.Name) + matched = regex.Match(e.Name()) case influxql.NEQREGEX: - matched = !regex.Match(e.Name) + matched = !regex.Match(e.Name()) } if matched { - mms = append(mms, i.measurement(e.Name)) + mms = append(mms, i.measurement(e.Name())) } } sort.Sort(mms) @@ -185,7 +185,7 @@ func (i *Index) measurementsByTagFilter(op influxql.Token, key, val string, rege var mms tsdb.Measurements itr := i.file.MeasurementIterator() for e := itr.Next(); e != nil; e = itr.Next() { - mm := i.measurement(e.Name) + mm := i.measurement(e.Name()) tagVals := mm.SeriesByTagKeyValue(key) if tagVals == nil { @@ -233,8 +233,8 @@ func (i *Index) MeasurementsByName(names []string) ([]*tsdb.Measurement, error) mms := make([]*tsdb.Measurement, 0, len(names)) for e := itr.Next(); e != nil; e = itr.Next() { for _, name := range names { - if string(e.Name) == name { - mms = append(mms, i.measurement(e.Name)) + if string(e.Name()) == name { + mms = append(mms, i.measurement(e.Name())) break } } @@ -246,8 +246,8 @@ func (i *Index) MeasurementsByRegex(re *regexp.Regexp) (tsdb.Measurements, error itr := i.file.MeasurementIterator() var mms tsdb.Measurements for e := itr.Next(); e != nil; e = itr.Next() { - if re.Match(e.Name) { - mms = append(mms, i.measurement(e.Name)) + if re.Match(e.Name()) { + mms = append(mms, i.measurement(e.Name())) } } return mms, nil diff --git a/tsdb/engine/tsi1/index_file.go b/tsdb/engine/tsi1/index_file.go index 123f8e8e56..f43c1a30d3 100644 --- a/tsdb/engine/tsi1/index_file.go +++ b/tsdb/engine/tsi1/index_file.go @@ -5,6 +5,8 @@ import ( "encoding/binary" "errors" "io" + + "github.com/influxdata/influxdb/models" ) // IndexFileVersion is the current TSI1 index file version. @@ -110,8 +112,8 @@ func (i *IndexFile) TagValueElem(name, key, value []byte) (TagSetValueElem, erro // tagSetBlock returns a tag set block for a measurement. func (i *IndexFile) tagSetBlock(e *MeasurementBlockElem) (TagSet, error) { // Slice tag set data. - buf := i.data[e.TagSet.Offset:] - buf = buf[:e.TagSet.Size] + buf := i.data[e.tagSet.offset:] + buf = buf[:e.tagSet.size] // Unmarshal block. var blk TagSet @@ -131,13 +133,13 @@ func (i *IndexFile) MeasurementSeriesIterator(name []byte) SeriesIterator { // Find measurement element. e, ok := i.mblk.Elem(name) if !ok { - return &seriesIterator{} + return &rawSeriesIterator{n: 0} } // Return iterator. return &rawSeriesIterator{ - n: e.Series.N, - data: e.Series.Data, + n: e.series.n, + data: e.series.data, seriesList: &i.slist, } } @@ -151,12 +153,12 @@ type rawSeriesIterator struct { seriesList *SeriesList // reusable buffer - e SeriesElem + e rawSeriesElem } // Next returns the next decoded series. Uses name & tags as reusable buffers. // Returns nils when the iterator is complete. -func (itr *rawSeriesIterator) Next() *SeriesElem { +func (itr *rawSeriesIterator) Next() SeriesElem { // Return nil if we've reached the end. if itr.i == itr.n { return nil @@ -166,7 +168,7 @@ func (itr *rawSeriesIterator) Next() *SeriesElem { offset := binary.BigEndian.Uint32(itr.data[itr.i*SeriesIDSize:]) // Read from series list into buffers. - itr.seriesList.DecodeSeriesAt(offset, &itr.e.Name, &itr.e.Tags, &itr.e.Deleted) + itr.seriesList.DecodeSeriesAt(offset, &itr.e.name, &itr.e.tags, &itr.e.deleted) // Move iterator forward. itr.i++ @@ -174,6 +176,16 @@ func (itr *rawSeriesIterator) Next() *SeriesElem { return &itr.e } +type rawSeriesElem struct { + name []byte + tags models.Tags + deleted bool +} + +func (e *rawSeriesElem) Name() []byte { return e.name } +func (e *rawSeriesElem) Tags() models.Tags { return e.tags } +func (e *rawSeriesElem) Deleted() bool { return e.deleted } + // ReadIndexFileTrailer returns the index file trailer from data. func ReadIndexFileTrailer(data []byte) (IndexFileTrailer, error) { var t IndexFileTrailer diff --git a/tsdb/engine/tsi1/index_files.go b/tsdb/engine/tsi1/index_files.go index 7ee845cebb..031e642bc9 100644 --- a/tsdb/engine/tsi1/index_files.go +++ b/tsdb/engine/tsi1/index_files.go @@ -13,7 +13,7 @@ func (p *IndexFiles) MeasurementNames() [][]byte { itr := p.MeasurementIterator() var names [][]byte for e := itr.Next(); e != nil; e = itr.Next() { - names = append(names, copyBytes(e.Name)) + names = append(names, copyBytes(e.Name())) } sort.Sort(byteSlices(names)) return names @@ -98,7 +98,7 @@ func (p *IndexFiles) writeSeriesListTo(w io.Writer, info *indexCompactInfo, n *i // Write all series. sw := NewSeriesListWriter() for e := itr.Next(); e != nil; e = itr.Next() { - if err := sw.Add(e.Name, e.Tags); err != nil { + if err := sw.Add(e.Name(), e.Tags()); err != nil { return err } } @@ -132,18 +132,18 @@ func (p *IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactI tsw := NewTagSetWriter() for ke := kitr.Next(); ke != nil; ke = kitr.Next() { // Mark tag deleted. - if ke.Deleted { - tsw.DeleteTag(ke.Key) + if ke.Deleted() { + tsw.DeleteTag(ke.Key()) } // Iterate over tag values. - vitr := p.TagValueIterator(name, ke.Key) + vitr := ke.TagValueIterator() for ve := vitr.Next(); ve != nil; ve = vitr.Next() { // Look-up series ids. - sitr := p.TagValueSeriesIterator(name, ke.Key, ve.Value) + sitr := ve.SeriesIterator() var seriesIDs []uint32 for se := sitr.Next(); se != nil; se = sitr.Next() { - seriesID := info.sw.Offset(se.Name, se.Tags) + seriesID := info.sw.Offset(se.Name(), se.Tags()) if seriesID == 0 { panic("expected series id") } @@ -152,7 +152,7 @@ func (p *IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactI sort.Sort(uint32Slice(seriesIDs)) // Insert tag value into writer. - tsw.AddTagValue(name, ve.Value, ve.Deleted, seriesIDs) + tsw.AddTagValue(name, ve.Value(), ve.Deleted(), seriesIDs) } } @@ -184,7 +184,7 @@ func (p *IndexFiles) writeMeasurementBlockTo(w io.Writer, info *indexCompactInfo itr := p.MeasurementSeriesIterator(name) var seriesIDs []uint32 for e := itr.Next(); e != nil; e = itr.Next() { - seriesID := info.sw.Offset(e.Name, e.Tags) + seriesID := info.sw.Offset(e.Name(), e.Tags()) if seriesID == 0 { panic("expected series id") } diff --git a/tsdb/engine/tsi1/log_file.go b/tsdb/engine/tsi1/log_file.go index 80e08abb3a..327a0e793d 100644 --- a/tsdb/engine/tsi1/log_file.go +++ b/tsdb/engine/tsi1/log_file.go @@ -215,14 +215,11 @@ func (f *LogFile) measurement(name []byte) logMeasurement { // MeasurementIterator returns an iterator over all the measurements in the file. func (f *LogFile) MeasurementIterator() MeasurementIterator { - var itr measurementIterator + var itr logMeasurementIterator for _, mm := range f.mms { - itr.elems = append(itr.elems, MeasurementElem{ - Name: mm.name, - Deleted: mm.deleted, - }) + itr.mms = append(itr.mms, mm) } - sort.Sort(MeasurementElems(itr.elems)) + sort.Sort(logMeasurementSlice(itr.mms)) return &itr } @@ -555,6 +552,31 @@ type logMeasurement struct { seriesIDs []uint32 // series offsets } +func (m *logMeasurement) Name() []byte { return m.name } +func (m *logMeasurement) Deleted() bool { return m.deleted } +func (m *logMeasurement) TagKeyIterator() TagKeyIterator { panic("TODO") } + +// logMeasurementSlice is a sortable list of log measurements. +type logMeasurementSlice []logMeasurement + +func (a logMeasurementSlice) Len() int { return len(a) } +func (a logMeasurementSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a logMeasurementSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 } + +// logMeasurementIterator represents an iterator over a slice of measurements. +type logMeasurementIterator struct { + mms []logMeasurement +} + +// Next returns the next element in the iterator. +func (itr *logMeasurementIterator) Next() (e MeasurementElem) { + if len(itr.mms) == 0 { + return nil + } + e, itr.mms = &itr.mms[0], itr.mms[1:] + return e +} + type logTagSet struct { name []byte tagValues map[string]logTagValue diff --git a/tsdb/engine/tsi1/log_file_test.go b/tsdb/engine/tsi1/log_file_test.go index b504f6dc1d..aaae05d9c0 100644 --- a/tsdb/engine/tsi1/log_file_test.go +++ b/tsdb/engine/tsi1/log_file_test.go @@ -4,7 +4,6 @@ import ( "fmt" "io/ioutil" "os" - "reflect" "testing" "github.com/influxdata/influxdb/models" @@ -27,9 +26,9 @@ func TestLogFile_AddSeries(t *testing.T) { // Verify data. itr := f.MeasurementIterator() - if e := itr.Next(); e == nil || string(e.Name) != "cpu" { + if e := itr.Next(); e == nil || string(e.Name()) != "cpu" { t.Fatalf("unexpected measurement: %#v", e) - } else if e := itr.Next(); e == nil || string(e.Name) != "mem" { + } else if e := itr.Next(); e == nil || string(e.Name()) != "mem" { t.Fatalf("unexpected measurement: %#v", e) } else if e := itr.Next(); e != nil { t.Fatalf("expected eof, got: %#v", e) @@ -57,10 +56,10 @@ func TestLogFile_DeleteMeasurement(t *testing.T) { // Verify data. itr := f.MeasurementIterator() - if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("cpu"), Deleted: true}) { - t.Fatalf("unexpected measurement: %#v", e) - } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("mem")}) { - t.Fatalf("unexpected measurement: %#v", e) + if e := itr.Next(); string(e.Name()) != "cpu" || !e.Deleted() { + t.Fatalf("unexpected measurement: %s/%v", e.Name(), e.Deleted()) + } else if e := itr.Next(); string(e.Name()) != "mem" || e.Deleted() { + t.Fatalf("unexpected measurement: %s/%v", e.Name(), e.Deleted()) } else if e := itr.Next(); e != nil { t.Fatalf("expected eof, got: %#v", e) } diff --git a/tsdb/engine/tsi1/measurement_block.go b/tsdb/engine/tsi1/measurement_block.go index 63b16d40f7..5deec49415 100644 --- a/tsdb/engine/tsi1/measurement_block.go +++ b/tsdb/engine/tsi1/measurement_block.go @@ -72,12 +72,12 @@ func (blk *MeasurementBlock) Elem(name []byte) (e MeasurementBlockElem, ok bool) e.UnmarshalBinary(blk.data[offset:]) // Return if name match. - if bytes.Equal(e.Name, name) { + if bytes.Equal(e.name, name) { return e, true } // Check if we've exceeded the probe distance. - if d > dist(hashKey(e.Name), pos, int(n)) { + if d > dist(hashKey(e.name), pos, int(n)) { return MeasurementBlockElem{}, false } } @@ -124,29 +124,22 @@ func (blk *MeasurementBlock) Iterator() MeasurementIterator { // blockMeasurementIterator iterates over a list measurements in a block. type blockMeasurementIterator struct { - elem MeasurementElem + elem MeasurementBlockElem data []byte } // Next returns the next measurement. Returns nil when iterator is complete. -func (itr *blockMeasurementIterator) Next() *MeasurementElem { +func (itr *blockMeasurementIterator) Next() MeasurementElem { // Return nil when we run out of data. if len(itr.data) == 0 { return nil } // Unmarshal the element at the current position. - var elem MeasurementBlockElem - elem.UnmarshalBinary(itr.data) - - // Copy to a generic measurement element. - itr.elem = MeasurementElem{ - Name: elem.Name, - Deleted: elem.Deleted(), - } + itr.elem.UnmarshalBinary(itr.data) // Move the data forward past the record. - itr.data = itr.data[elem.Size:] + itr.data = itr.data[itr.elem.size:] return &itr.elem } @@ -200,37 +193,49 @@ type MeasurementBlockTrailer struct { // MeasurementBlockElem represents an internal measurement element. type MeasurementBlockElem struct { - Flag byte // flag - Name []byte // measurement name + flag byte // flag + name []byte // measurement name - TagSet struct { - Offset int64 - Size int64 + tagSet struct { + offset int64 + size int64 } - Series struct { - N uint32 // series count - Data []byte // serialized series data + series struct { + n uint32 // series count + data []byte // serialized series data } - // Size in bytes, set after unmarshaling. - Size int + // size in bytes, set after unmarshaling. + size int } +// Name returns the measurement name. +func (e *MeasurementBlockElem) Name() []byte { return e.name } + // Deleted returns true if the tombstone flag is set. func (e *MeasurementBlockElem) Deleted() bool { - return (e.Flag & MeasurementTombstoneFlag) != 0 + return (e.flag & MeasurementTombstoneFlag) != 0 } +// TagKeyIterator returns an iterator over the measurement's keys. +func (e *MeasurementBlockElem) TagKeyIterator() TagKeyIterator { panic("TODO") } + +// TagSetOffset returns the offset of the measurement's tagset block. +func (e *MeasurementBlockElem) TagSetOffset() int64 { return e.tagSet.offset } + +// TagSetSize returns the size of the measurement's tagset block. +func (e *MeasurementBlockElem) TagSetSize() int64 { return e.tagSet.size } + // SeriesID returns series ID at an index. func (e *MeasurementBlockElem) SeriesID(i int) uint32 { - return binary.BigEndian.Uint32(e.Series.Data[i*SeriesIDSize:]) + return binary.BigEndian.Uint32(e.series.data[i*SeriesIDSize:]) } // SeriesIDs returns a list of decoded series ids. func (e *MeasurementBlockElem) SeriesIDs() []uint32 { - a := make([]uint32, e.Series.N) - for i := 0; i < int(e.Series.N); i++ { + a := make([]uint32, e.series.n) + for i := 0; i < int(e.series.n); i++ { a[i] = e.SeriesID(i) } return a @@ -241,23 +246,23 @@ func (e *MeasurementBlockElem) UnmarshalBinary(data []byte) error { start := len(data) // Parse flag data. - e.Flag, data = data[0], data[1:] + e.flag, data = data[0], data[1:] // Parse tagset offset. - e.TagSet.Offset, data = int64(binary.BigEndian.Uint64(data)), data[8:] - e.TagSet.Size, data = int64(binary.BigEndian.Uint64(data)), data[8:] + e.tagSet.offset, data = int64(binary.BigEndian.Uint64(data)), data[8:] + e.tagSet.size, data = int64(binary.BigEndian.Uint64(data)), data[8:] // Parse name. sz, n := binary.Uvarint(data) - e.Name, data = data[n:n+int(sz)], data[n+int(sz):] + e.name, data = data[n:n+int(sz)], data[n+int(sz):] // Parse series data. v, n := binary.Uvarint(data) - e.Series.N, data = uint32(v), data[n:] - e.Series.Data, data = data[:e.Series.N*SeriesIDSize], data[e.Series.N*SeriesIDSize:] + e.series.n, data = uint32(v), data[n:] + e.series.data, data = data[:e.series.n*SeriesIDSize], data[e.series.n*SeriesIDSize:] // Save length of elem. - e.Size = start - len(data) + e.size = start - len(data) return nil } diff --git a/tsdb/engine/tsi1/measurement_block_test.go b/tsdb/engine/tsi1/measurement_block_test.go index ccf81d1ed4..8b38257e92 100644 --- a/tsdb/engine/tsi1/measurement_block_test.go +++ b/tsdb/engine/tsi1/measurement_block_test.go @@ -33,24 +33,24 @@ func TestMeasurementBlockWriter(t *testing.T) { // Verify data in block. if e, ok := blk.Elem([]byte("foo")); !ok { t.Fatal("expected element") - } else if e.TagSet.Offset != 100 || e.TagSet.Size != 10 { - t.Fatalf("unexpected offset/size: %v/%v", e.TagSet.Offset, e.TagSet.Size) + } else if e.TagSetOffset() != 100 || e.TagSetSize() != 10 { + t.Fatalf("unexpected offset/size: %v/%v", e.TagSetOffset(), e.TagSetSize()) } else if !reflect.DeepEqual(e.SeriesIDs(), []uint32{1, 3, 4}) { t.Fatalf("unexpected series data: %#v", e.SeriesIDs()) } if e, ok := blk.Elem([]byte("bar")); !ok { t.Fatal("expected element") - } else if e.TagSet.Offset != 200 || e.TagSet.Size != 20 { - t.Fatalf("unexpected offset/size: %v/%v", e.TagSet.Offset, e.TagSet.Size) + } else if e.TagSetOffset() != 200 || e.TagSetSize() != 20 { + t.Fatalf("unexpected offset/size: %v/%v", e.TagSetOffset(), e.TagSetSize()) } else if !reflect.DeepEqual(e.SeriesIDs(), []uint32{2}) { t.Fatalf("unexpected series data: %#v", e.SeriesIDs()) } if e, ok := blk.Elem([]byte("baz")); !ok { t.Fatal("expected element") - } else if e.TagSet.Offset != 300 || e.TagSet.Size != 30 { - t.Fatalf("unexpected offset/size: %v/%v", e.TagSet.Offset, e.TagSet.Size) + } else if e.TagSetOffset() != 300 || e.TagSetSize() != 30 { + t.Fatalf("unexpected offset/size: %v/%v", e.TagSetOffset(), e.TagSetSize()) } else if !reflect.DeepEqual(e.SeriesIDs(), []uint32{5, 6}) { t.Fatalf("unexpected series data: %#v", e.SeriesIDs()) } diff --git a/tsdb/engine/tsi1/tsi1.go b/tsdb/engine/tsi1/tsi1.go index 92a2f128e1..0d341b2b7a 100644 --- a/tsdb/engine/tsi1/tsi1.go +++ b/tsdb/engine/tsi1/tsi1.go @@ -12,9 +12,10 @@ import ( ) // MeasurementElem represents a generic measurement element. -type MeasurementElem struct { - Deleted bool - Name []byte +type MeasurementElem interface { + Name() []byte + Deleted() bool + TagKeyIterator() TagKeyIterator } // MeasurementElems represents a list of MeasurementElem. @@ -22,30 +23,11 @@ type MeasurementElems []MeasurementElem func (a MeasurementElems) Len() int { return len(a) } func (a MeasurementElems) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a MeasurementElems) Less(i, j int) bool { return bytes.Compare(a[i].Name, a[j].Name) == -1 } +func (a MeasurementElems) Less(i, j int) bool { return bytes.Compare(a[i].Name(), a[j].Name()) == -1 } // MeasurementIterator represents a iterator over a list of measurements. type MeasurementIterator interface { - Next() *MeasurementElem -} - -// NewMeasurementIterator returns an iterator that operates on an in-memory slice. -func NewMeasurementIterator(elems []MeasurementElem) MeasurementIterator { - return &measurementIterator{elems: elems} -} - -// measurementIterator represents an iterator over a slice of measurements. -type measurementIterator struct { - elems []MeasurementElem -} - -// Next shifts the next element off the list. -func (itr *measurementIterator) Next() (e *MeasurementElem) { - if len(itr.elems) == 0 { - return nil - } - e, itr.elems = &itr.elems[0], itr.elems[1:] - return e + Next() MeasurementElem } // MergeMeasurementIterators returns an iterator that merges a set of iterators. @@ -59,9 +41,7 @@ func MergeMeasurementIterators(itrs ...MeasurementIterator) MeasurementIterator // Initialize buffers. for i := range itr.itrs { - if e := itr.itrs[i].Next(); e != nil { - itr.buf[i] = *e - } + itr.buf[i] = itr.itrs[i].Next() } return itr @@ -77,16 +57,14 @@ type measurementMergeIterator struct { // // If multiple iterators contain the same name then the first is returned // and the remaining ones are skipped. -func (itr *measurementMergeIterator) Next() *MeasurementElem { - itr.e = MeasurementElem{} - +func (itr *measurementMergeIterator) Next() MeasurementElem { // Find next lowest name amongst the buffers. var name []byte for i := range itr.buf { - if len(itr.buf[i].Name) == 0 { + if itr.buf[i] == nil { continue - } else if name == nil || bytes.Compare(itr.buf[i].Name, name) == -1 { - name = itr.buf[i].Name + } else if name == nil || bytes.Compare(itr.buf[i].Name(), name) == -1 { + name = itr.buf[i].Name() } } @@ -96,55 +74,34 @@ func (itr *measurementMergeIterator) Next() *MeasurementElem { } // Refill buffer. - for i := range itr.buf { - if !bytes.Equal(itr.buf[i].Name, name) { + var e MeasurementElem + for i, buf := range itr.buf { + if buf == nil || !bytes.Equal(buf.Name(), name) { continue } // Copy first matching buffer to the return buffer. - if len(itr.e.Name) == 0 { - itr.e = itr.buf[i] + if e == nil { + e = buf } // Fill buffer with next element. - if e := itr.itrs[i].Next(); e != nil { - itr.buf[i] = *e - } else { - itr.buf[i] = MeasurementElem{} - } + itr.buf[i] = itr.itrs[i].Next() } - return &itr.e + return e } // TagKeyElem represents a generic tag key element. -type TagKeyElem struct { - Key []byte - Deleted bool +type TagKeyElem interface { + Key() []byte + Deleted() bool + TagValueIterator() TagValueIterator } // TagKeyIterator represents a iterator over a list of tag keys. type TagKeyIterator interface { - Next() *TagKeyElem -} - -// NewTagKeyIterator returns an iterator that operates on an in-memory slice. -func NewTagKeyIterator(a []TagKeyElem) TagKeyIterator { - return &tagKeyIterator{elems: a} -} - -// tagKeyIterator represents an iterator over a slice of tag keys. -type tagKeyIterator struct { - elems []TagKeyElem -} - -// Next returns the next element. -func (itr *tagKeyIterator) Next() (e *TagKeyElem) { - if len(itr.elems) == 0 { - return nil - } - e, itr.elems = &itr.elems[0], itr.elems[1:] - return e + Next() TagKeyElem } // MergeTagKeyIterators returns an iterator that merges a set of iterators. @@ -158,9 +115,7 @@ func MergeTagKeyIterators(itrs ...TagKeyIterator) TagKeyIterator { // Initialize buffers. for i := range itr.itrs { - if e := itr.itrs[i].Next(); e != nil { - itr.buf[i] = *e - } + itr.buf[i] = itr.itrs[i].Next() } return itr @@ -176,16 +131,14 @@ type tagKeyMergeIterator struct { // // If multiple iterators contain the same key then the first is returned // and the remaining ones are skipped. -func (itr *tagKeyMergeIterator) Next() *TagKeyElem { - itr.e = TagKeyElem{} - +func (itr *tagKeyMergeIterator) Next() TagKeyElem { // Find next lowest key amongst the buffers. var key []byte for i := range itr.buf { - if len(itr.buf[i].Key) == 0 { + if itr.buf[i] == nil { continue - } else if key == nil || bytes.Compare(itr.buf[i].Key, key) == -1 { - key = itr.buf[i].Key + } else if key == nil || bytes.Compare(itr.buf[i].Key(), key) == -1 { + key = itr.buf[i].Key() } } @@ -195,55 +148,34 @@ func (itr *tagKeyMergeIterator) Next() *TagKeyElem { } // Refill buffer. + var e TagKeyElem for i := range itr.buf { - if !bytes.Equal(itr.buf[i].Key, key) { + if itr.buf[i] == nil || !bytes.Equal(itr.buf[i].Key(), key) { continue } // Copy first matching buffer to the return buffer. - if len(itr.e.Key) == 0 { - itr.e = itr.buf[i] + if e == nil { + e = itr.buf[i] } // Fill buffer with next element. - if e := itr.itrs[i].Next(); e != nil { - itr.buf[i] = *e - } else { - itr.buf[i] = TagKeyElem{} - } + itr.buf[i] = itr.itrs[i].Next() } - return &itr.e + return e } // TagValueElem represents a generic tag value element. -type TagValueElem struct { - Value []byte - Deleted bool +type TagValueElem interface { + Value() []byte + Deleted() bool + SeriesIterator() SeriesIterator } // TagValueIterator represents a iterator over a list of tag values. type TagValueIterator interface { - Next() *TagValueElem -} - -// NewTagValueIterator returns an iterator that operates on an in-memory slice. -func NewTagValueIterator(a []TagValueElem) TagValueIterator { - return &tagValueIterator{elems: a} -} - -// tagValueIterator represents an iterator over a slice of tag values. -type tagValueIterator struct { - elems []TagValueElem -} - -// Next returns the next element. -func (itr *tagValueIterator) Next() (e *TagValueElem) { - if len(itr.elems) == 0 { - return nil - } - e, itr.elems = &itr.elems[0], itr.elems[1:] - return e + Next() TagValueElem } // MergeTagValueIterators returns an iterator that merges a set of iterators. @@ -257,9 +189,7 @@ func MergeTagValueIterators(itrs ...TagValueIterator) TagValueIterator { // Initialize buffers. for i := range itr.itrs { - if e := itr.itrs[i].Next(); e != nil { - itr.buf[i] = *e - } + itr.buf[i] = itr.itrs[i].Next() } return itr @@ -275,16 +205,14 @@ type tagValueMergeIterator struct { // // If multiple iterators contain the same value then the first is returned // and the remaining ones are skipped. -func (itr *tagValueMergeIterator) Next() *TagValueElem { - itr.e = TagValueElem{} - +func (itr *tagValueMergeIterator) Next() TagValueElem { // Find next lowest value amongst the buffers. var value []byte for i := range itr.buf { - if len(itr.buf[i].Value) == 0 { + if itr.buf[i] == nil { continue - } else if value == nil || bytes.Compare(itr.buf[i].Value, value) == -1 { - value = itr.buf[i].Value + } else if value == nil || bytes.Compare(itr.buf[i].Value(), value) == -1 { + value = itr.buf[i].Value() } } @@ -294,56 +222,33 @@ func (itr *tagValueMergeIterator) Next() *TagValueElem { } // Refill buffer. + var e TagValueElem for i := range itr.buf { - if !bytes.Equal(itr.buf[i].Value, value) { + if itr.buf[i] == nil || !bytes.Equal(itr.buf[i].Value(), value) { continue } // Copy first matching buffer to the return buffer. - if len(itr.e.Value) == 0 { - itr.e = itr.buf[i] + if e == nil { + e = itr.buf[i] } // Fill buffer with next element. - if e := itr.itrs[i].Next(); e != nil { - itr.buf[i] = *e - } else { - itr.buf[i] = TagValueElem{} - } + itr.buf[i] = itr.itrs[i].Next() } - - return &itr.e + return e } // SeriesElem represents a generic series element. -type SeriesElem struct { - Name []byte - Tags models.Tags - Deleted bool +type SeriesElem interface { + Name() []byte + Tags() models.Tags + Deleted() bool } // SeriesIterator represents a iterator over a list of series. type SeriesIterator interface { - Next() *SeriesElem -} - -// NewSeriesIterator returns an iterator that operates on an in-memory slice. -func NewSeriesIterator(a []SeriesElem) SeriesIterator { - return &seriesIterator{elems: a} -} - -// seriesIterator represents an iterator over a slice of tag values. -type seriesIterator struct { - elems []SeriesElem -} - -// Next returns the next element. -func (itr *seriesIterator) Next() (e *SeriesElem) { - if len(itr.elems) == 0 { - return nil - } - e, itr.elems = &itr.elems[0], itr.elems[1:] - return e + Next() SeriesElem } // MergeSeriesIterators returns an iterator that merges a set of iterators. @@ -357,16 +262,13 @@ func MergeSeriesIterators(itrs ...SeriesIterator) SeriesIterator { // Initialize buffers. for i := range itr.itrs { - if e := itr.itrs[i].Next(); e != nil { - itr.buf[i] = *e - } + itr.buf[i] = itr.itrs[i].Next() } return itr } type seriesMergeIterator struct { - e SeriesElem buf []SeriesElem itrs []SeriesIterator } @@ -375,27 +277,25 @@ type seriesMergeIterator struct { // // If multiple iterators contain the same name/tags then the first is returned // and the remaining ones are skipped. -func (itr *seriesMergeIterator) Next() *SeriesElem { - itr.e = SeriesElem{} - +func (itr *seriesMergeIterator) Next() SeriesElem { // Find next lowest name/tags amongst the buffers. var name []byte var tags models.Tags for i := range itr.buf { // Skip empty buffers. - if len(itr.buf[i].Name) == 0 { + if itr.buf[i] == nil { continue } // If the name is not set the pick the first non-empty name. if name == nil { - name, tags = itr.buf[i].Name, itr.buf[i].Tags + name, tags = itr.buf[i].Name(), itr.buf[i].Tags() continue } // Set name/tags if they are lower than what has been seen. - if cmp := bytes.Compare(itr.buf[i].Name, name); cmp == -1 || (cmp == 0 && models.CompareTags(itr.buf[i].Tags, tags) == -1) { - name, tags = itr.buf[i].Name, itr.buf[i].Tags + if cmp := bytes.Compare(itr.buf[i].Name(), name); cmp == -1 || (cmp == 0 && models.CompareTags(itr.buf[i].Tags(), tags) == -1) { + name, tags = itr.buf[i].Name(), itr.buf[i].Tags() } } @@ -405,25 +305,21 @@ func (itr *seriesMergeIterator) Next() *SeriesElem { } // Refill buffer. + var e SeriesElem for i := range itr.buf { - if !bytes.Equal(itr.buf[i].Name, name) || models.CompareTags(itr.buf[i].Tags, tags) != 0 { + if itr.buf[i] == nil || !bytes.Equal(itr.buf[i].Name(), name) || models.CompareTags(itr.buf[i].Tags(), tags) != 0 { continue } // Copy first matching buffer to the return buffer. - if len(itr.e.Name) == 0 { - itr.e = itr.buf[i] + if e == nil { + e = itr.buf[i] } // Fill buffer with next element. - if e := itr.itrs[i].Next(); e != nil { - itr.buf[i] = *e - } else { - itr.buf[i] = SeriesElem{} - } + itr.buf[i] = itr.itrs[i].Next() } - - return &itr.e + return e } // writeTo writes write v into w. Updates n. diff --git a/tsdb/engine/tsi1/tsi1_test.go b/tsdb/engine/tsi1/tsi1_test.go index b97c6b35a7..1190b180a3 100644 --- a/tsdb/engine/tsi1/tsi1_test.go +++ b/tsdb/engine/tsi1/tsi1_test.go @@ -10,12 +10,12 @@ import ( // Ensure iterator can operate over an in-memory list of elements. func TestMeasurementIterator(t *testing.T) { - elems := []tsi1.MeasurementElem{ - {Name: []byte("cpu"), Deleted: true}, - {Name: []byte("mem")}, + elems := []MeasurementElem{ + MeasurementElem{name: []byte("cpu"), deleted: true}, + MeasurementElem{name: []byte("mem")}, } - itr := tsi1.NewMeasurementIterator(elems) + itr := MeasurementIterator{Elems: elems} if e := itr.Next(); !reflect.DeepEqual(&elems[0], e) { t.Fatalf("unexpected elem(0): %#v", e) } else if e := itr.Next(); !reflect.DeepEqual(&elems[1], e) { @@ -28,26 +28,26 @@ func TestMeasurementIterator(t *testing.T) { // Ensure iterator can merge multiple iterators together. func TestMergeMeasurementIterators(t *testing.T) { itr := tsi1.MergeMeasurementIterators( - tsi1.NewMeasurementIterator([]tsi1.MeasurementElem{ - {Name: []byte("aaa")}, - {Name: []byte("bbb"), Deleted: true}, - {Name: []byte("ccc")}, - }), - tsi1.NewMeasurementIterator(nil), - tsi1.NewMeasurementIterator([]tsi1.MeasurementElem{ - {Name: []byte("bbb")}, - {Name: []byte("ccc"), Deleted: true}, - {Name: []byte("ddd")}, - }), + &MeasurementIterator{Elems: []MeasurementElem{ + {name: []byte("aaa")}, + {name: []byte("bbb"), deleted: true}, + {name: []byte("ccc")}, + }}, + &MeasurementIterator{}, + &MeasurementIterator{Elems: []MeasurementElem{ + {name: []byte("bbb")}, + {name: []byte("ccc"), deleted: true}, + {name: []byte("ddd")}, + }}, ) - if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("aaa")}) { + if e := itr.Next(); !reflect.DeepEqual(e, &MeasurementElem{name: []byte("aaa")}) { t.Fatalf("unexpected elem(0): %#v", e) - } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("bbb"), Deleted: true}) { + } else if e := itr.Next(); !reflect.DeepEqual(e, &MeasurementElem{name: []byte("bbb"), deleted: true}) { t.Fatalf("unexpected elem(1): %#v", e) - } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("ccc")}) { + } else if e := itr.Next(); !reflect.DeepEqual(e, &MeasurementElem{name: []byte("ccc")}) { t.Fatalf("unexpected elem(2): %#v", e) - } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("ddd")}) { + } else if e := itr.Next(); !reflect.DeepEqual(e, &MeasurementElem{name: []byte("ddd")}) { t.Fatalf("unexpected elem(3): %#v", e) } else if e := itr.Next(); e != nil { t.Fatalf("expected nil elem: %#v", e) @@ -56,12 +56,12 @@ func TestMergeMeasurementIterators(t *testing.T) { // Ensure iterator can operate over an in-memory list of tag key elements. func TestTagKeyIterator(t *testing.T) { - elems := []tsi1.TagKeyElem{ - {Key: []byte("aaa"), Deleted: true}, - {Key: []byte("bbb")}, + elems := []TagKeyElem{ + {key: []byte("aaa"), deleted: true}, + {key: []byte("bbb")}, } - itr := tsi1.NewTagKeyIterator(elems) + itr := TagKeyIterator{Elems: elems} if e := itr.Next(); !reflect.DeepEqual(&elems[0], e) { t.Fatalf("unexpected elem(0): %#v", e) } else if e := itr.Next(); !reflect.DeepEqual(&elems[1], e) { @@ -74,26 +74,26 @@ func TestTagKeyIterator(t *testing.T) { // Ensure iterator can merge multiple iterators together. func TestMergeTagKeyIterators(t *testing.T) { itr := tsi1.MergeTagKeyIterators( - tsi1.NewTagKeyIterator([]tsi1.TagKeyElem{ - {Key: []byte("aaa")}, - {Key: []byte("bbb"), Deleted: true}, - {Key: []byte("ccc")}, - }), - tsi1.NewTagKeyIterator(nil), - tsi1.NewTagKeyIterator([]tsi1.TagKeyElem{ - {Key: []byte("bbb")}, - {Key: []byte("ccc"), Deleted: true}, - {Key: []byte("ddd")}, - }), + &TagKeyIterator{Elems: []TagKeyElem{ + {key: []byte("aaa")}, + {key: []byte("bbb"), deleted: true}, + {key: []byte("ccc")}, + }}, + &TagKeyIterator{}, + &TagKeyIterator{Elems: []TagKeyElem{ + {key: []byte("bbb")}, + {key: []byte("ccc"), deleted: true}, + {key: []byte("ddd")}, + }}, ) - if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagKeyElem{Key: []byte("aaa")}) { + if e := itr.Next(); !reflect.DeepEqual(e, &TagKeyElem{key: []byte("aaa")}) { t.Fatalf("unexpected elem(0): %#v", e) - } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagKeyElem{Key: []byte("bbb"), Deleted: true}) { + } else if e := itr.Next(); !reflect.DeepEqual(e, &TagKeyElem{key: []byte("bbb"), deleted: true}) { t.Fatalf("unexpected elem(1): %#v", e) - } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagKeyElem{Key: []byte("ccc")}) { + } else if e := itr.Next(); !reflect.DeepEqual(e, &TagKeyElem{key: []byte("ccc")}) { t.Fatalf("unexpected elem(2): %#v", e) - } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagKeyElem{Key: []byte("ddd")}) { + } else if e := itr.Next(); !reflect.DeepEqual(e, &TagKeyElem{key: []byte("ddd")}) { t.Fatalf("unexpected elem(3): %#v", e) } else if e := itr.Next(); e != nil { t.Fatalf("expected nil elem: %#v", e) @@ -102,12 +102,12 @@ func TestMergeTagKeyIterators(t *testing.T) { // Ensure iterator can operate over an in-memory list of tag value elements. func TestTagValueIterator(t *testing.T) { - elems := []tsi1.TagValueElem{ - {Value: []byte("aaa"), Deleted: true}, - {Value: []byte("bbb")}, + elems := []TagValueElem{ + {value: []byte("aaa"), deleted: true}, + {value: []byte("bbb")}, } - itr := tsi1.NewTagValueIterator(elems) + itr := &TagValueIterator{Elems: elems} if e := itr.Next(); !reflect.DeepEqual(&elems[0], e) { t.Fatalf("unexpected elem(0): %#v", e) } else if e := itr.Next(); !reflect.DeepEqual(&elems[1], e) { @@ -120,26 +120,26 @@ func TestTagValueIterator(t *testing.T) { // Ensure iterator can merge multiple iterators together. func TestMergeTagValueIterators(t *testing.T) { itr := tsi1.MergeTagValueIterators( - tsi1.NewTagValueIterator([]tsi1.TagValueElem{ - {Value: []byte("aaa")}, - {Value: []byte("bbb"), Deleted: true}, - {Value: []byte("ccc")}, - }), - tsi1.NewTagValueIterator(nil), - tsi1.NewTagValueIterator([]tsi1.TagValueElem{ - {Value: []byte("bbb")}, - {Value: []byte("ccc"), Deleted: true}, - {Value: []byte("ddd")}, - }), + &TagValueIterator{Elems: []TagValueElem{ + {value: []byte("aaa")}, + {value: []byte("bbb"), deleted: true}, + {value: []byte("ccc")}, + }}, + &TagValueIterator{}, + &TagValueIterator{Elems: []TagValueElem{ + {value: []byte("bbb")}, + {value: []byte("ccc"), deleted: true}, + {value: []byte("ddd")}, + }}, ) - if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagValueElem{Value: []byte("aaa")}) { + if e := itr.Next(); !reflect.DeepEqual(e, &TagValueElem{value: []byte("aaa")}) { t.Fatalf("unexpected elem(0): %#v", e) - } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagValueElem{Value: []byte("bbb"), Deleted: true}) { + } else if e := itr.Next(); !reflect.DeepEqual(e, &TagValueElem{value: []byte("bbb"), deleted: true}) { t.Fatalf("unexpected elem(1): %#v", e) - } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagValueElem{Value: []byte("ccc")}) { + } else if e := itr.Next(); !reflect.DeepEqual(e, &TagValueElem{value: []byte("ccc")}) { t.Fatalf("unexpected elem(2): %#v", e) - } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagValueElem{Value: []byte("ddd")}) { + } else if e := itr.Next(); !reflect.DeepEqual(e, &TagValueElem{value: []byte("ddd")}) { t.Fatalf("unexpected elem(3): %#v", e) } else if e := itr.Next(); e != nil { t.Fatalf("expected nil elem: %#v", e) @@ -148,12 +148,12 @@ func TestMergeTagValueIterators(t *testing.T) { // Ensure iterator can operate over an in-memory list of series. func TestSeriesIterator(t *testing.T) { - elems := []tsi1.SeriesElem{ - {Name: []byte("cpu"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}, Deleted: true}, - {Name: []byte("mem")}, + elems := []SeriesElem{ + {name: []byte("cpu"), tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}, deleted: true}, + {name: []byte("mem")}, } - itr := tsi1.NewSeriesIterator(elems) + itr := SeriesIterator{Elems: elems} if e := itr.Next(); !reflect.DeepEqual(&elems[0], e) { t.Fatalf("unexpected elem(0): %#v", e) } else if e := itr.Next(); !reflect.DeepEqual(&elems[1], e) { @@ -166,32 +166,129 @@ func TestSeriesIterator(t *testing.T) { // Ensure iterator can merge multiple iterators together. func TestMergeSeriesIterators(t *testing.T) { itr := tsi1.MergeSeriesIterators( - tsi1.NewSeriesIterator([]tsi1.SeriesElem{ - {Name: []byte("aaa"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}, Deleted: true}, - {Name: []byte("bbb"), Deleted: true}, - {Name: []byte("ccc")}, - }), - tsi1.NewSeriesIterator(nil), - tsi1.NewSeriesIterator([]tsi1.SeriesElem{ - {Name: []byte("aaa"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}}, - {Name: []byte("aaa"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-west")}}}, - {Name: []byte("bbb")}, - {Name: []byte("ccc"), Deleted: true}, - {Name: []byte("ddd")}, - }), + &SeriesIterator{Elems: []SeriesElem{ + {name: []byte("aaa"), tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}, deleted: true}, + {name: []byte("bbb"), deleted: true}, + {name: []byte("ccc")}, + }}, + &SeriesIterator{}, + &SeriesIterator{Elems: []SeriesElem{ + {name: []byte("aaa"), tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}}, + {name: []byte("aaa"), tags: models.Tags{{Key: []byte("region"), Value: []byte("us-west")}}}, + {name: []byte("bbb")}, + {name: []byte("ccc"), deleted: true}, + {name: []byte("ddd")}, + }}, ) - if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.SeriesElem{Name: []byte("aaa"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}, Deleted: true}) { + if e := itr.Next(); !reflect.DeepEqual(e, &SeriesElem{name: []byte("aaa"), tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}, deleted: true}) { t.Fatalf("unexpected elem(0): %#v", e) - } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.SeriesElem{Name: []byte("aaa"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-west")}}}) { + } else if e := itr.Next(); !reflect.DeepEqual(e, &SeriesElem{name: []byte("aaa"), tags: models.Tags{{Key: []byte("region"), Value: []byte("us-west")}}}) { t.Fatalf("unexpected elem(1): %#v", e) - } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.SeriesElem{Name: []byte("bbb"), Deleted: true}) { + } else if e := itr.Next(); !reflect.DeepEqual(e, &SeriesElem{name: []byte("bbb"), deleted: true}) { t.Fatalf("unexpected elem(2): %#v", e) - } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.SeriesElem{Name: []byte("ccc")}) { + } else if e := itr.Next(); !reflect.DeepEqual(e, &SeriesElem{name: []byte("ccc")}) { t.Fatalf("unexpected elem(3): %#v", e) - } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.SeriesElem{Name: []byte("ddd")}) { + } else if e := itr.Next(); !reflect.DeepEqual(e, &SeriesElem{name: []byte("ddd")}) { t.Fatalf("unexpected elem(4): %#v", e) } else if e := itr.Next(); e != nil { t.Fatalf("expected nil elem: %#v", e) } } + +// MeasurementElem represents a test implementation of tsi1.MeasurementElem. +type MeasurementElem struct { + name []byte + deleted bool +} + +func (e *MeasurementElem) Name() []byte { return e.name } +func (e *MeasurementElem) Deleted() bool { return e.deleted } +func (e *MeasurementElem) TagKeyIterator() tsi1.TagKeyIterator { return nil } + +// MeasurementIterator represents an iterator over a slice of measurements. +type MeasurementIterator struct { + Elems []MeasurementElem +} + +// Next returns the next element in the iterator. +func (itr *MeasurementIterator) Next() (e tsi1.MeasurementElem) { + if len(itr.Elems) == 0 { + return nil + } + e, itr.Elems = &itr.Elems[0], itr.Elems[1:] + return e +} + +// TagKeyElem represents a test implementation of tsi1.TagKeyElem. +type TagKeyElem struct { + key []byte + deleted bool +} + +func (e *TagKeyElem) Key() []byte { return e.key } +func (e *TagKeyElem) Deleted() bool { return e.deleted } +func (e *TagKeyElem) TagValueIterator() tsi1.TagValueIterator { return nil } + +// TagKeyIterator represents an iterator over a slice of tag keys. +type TagKeyIterator struct { + Elems []TagKeyElem +} + +// Next returns the next element in the iterator. +func (itr *TagKeyIterator) Next() (e tsi1.TagKeyElem) { + if len(itr.Elems) == 0 { + return nil + } + e, itr.Elems = &itr.Elems[0], itr.Elems[1:] + return e +} + +// TagValueElem represents a test implementation of tsi1.TagValueElem. +type TagValueElem struct { + value []byte + deleted bool +} + +func (e *TagValueElem) Value() []byte { return e.value } +func (e *TagValueElem) Deleted() bool { return e.deleted } +func (e *TagValueElem) SeriesIterator() tsi1.SeriesIterator { return nil } + +// TagValueIterator represents an iterator over a slice of tag values. +type TagValueIterator struct { + Elems []TagValueElem +} + +// Next returns the next element in the iterator. +func (itr *TagValueIterator) Next() (e tsi1.TagValueElem) { + if len(itr.Elems) == 0 { + return nil + } + e, itr.Elems = &itr.Elems[0], itr.Elems[1:] + return e +} + +// SeriesElem represents a test implementation of tsi1.SeriesElem. +type SeriesElem struct { + name []byte + tags models.Tags + deleted bool +} + +func (e *SeriesElem) Name() []byte { return e.name } +func (e *SeriesElem) Tags() models.Tags { return e.tags } +func (e *SeriesElem) Deleted() bool { return e.deleted } + +// SeriesIterator represents an iterator over a slice of tag values. +type SeriesIterator struct { + Elems []SeriesElem +} + +// Next returns the next element in the iterator. +func (itr *SeriesIterator) Next() (e tsi1.SeriesElem) { + if len(itr.Elems) == 0 { + return nil + } + e, itr.Elems = &itr.Elems[0], itr.Elems[1:] + return e +}