Refactor merge iterators.

pull/7913/head
Ben Johnson 2016-10-31 08:46:07 -06:00
parent 0294e717a0
commit ce9e3181a5
No known key found for this signature in database
GPG Key ID: 81741CD251883081
9 changed files with 364 additions and 333 deletions

View File

@ -55,8 +55,8 @@ func (i *Index) measurement(name []byte) *tsdb.Measurement {
// TODO: Remove concept of series ids.
m.AddSeries(&tsdb.Series{
ID: id,
Key: string(e.Name),
Tags: models.CopyTags(e.Tags),
Key: string(e.Name()),
Tags: models.CopyTags(e.Tags()),
})
// TEMPORARY: Increment ID.
@ -74,7 +74,7 @@ func (i *Index) Measurements() (tsdb.Measurements, error) {
var mms tsdb.Measurements
itr := i.file.MeasurementIterator()
for e := itr.Next(); e != nil; e = itr.Next() {
mms = append(mms, i.measurement(e.Name))
mms = append(mms, i.measurement(e.Name()))
}
return mms, nil
}
@ -164,17 +164,17 @@ func (i *Index) measurementsByNameFilter(op influxql.Token, val string, regex *r
var matched bool
switch op {
case influxql.EQ:
matched = string(e.Name) == val
matched = string(e.Name()) == val
case influxql.NEQ:
matched = string(e.Name) != val
matched = string(e.Name()) != val
case influxql.EQREGEX:
matched = regex.Match(e.Name)
matched = regex.Match(e.Name())
case influxql.NEQREGEX:
matched = !regex.Match(e.Name)
matched = !regex.Match(e.Name())
}
if matched {
mms = append(mms, i.measurement(e.Name))
mms = append(mms, i.measurement(e.Name()))
}
}
sort.Sort(mms)
@ -185,7 +185,7 @@ func (i *Index) measurementsByTagFilter(op influxql.Token, key, val string, rege
var mms tsdb.Measurements
itr := i.file.MeasurementIterator()
for e := itr.Next(); e != nil; e = itr.Next() {
mm := i.measurement(e.Name)
mm := i.measurement(e.Name())
tagVals := mm.SeriesByTagKeyValue(key)
if tagVals == nil {
@ -233,8 +233,8 @@ func (i *Index) MeasurementsByName(names []string) ([]*tsdb.Measurement, error)
mms := make([]*tsdb.Measurement, 0, len(names))
for e := itr.Next(); e != nil; e = itr.Next() {
for _, name := range names {
if string(e.Name) == name {
mms = append(mms, i.measurement(e.Name))
if string(e.Name()) == name {
mms = append(mms, i.measurement(e.Name()))
break
}
}
@ -246,8 +246,8 @@ func (i *Index) MeasurementsByRegex(re *regexp.Regexp) (tsdb.Measurements, error
itr := i.file.MeasurementIterator()
var mms tsdb.Measurements
for e := itr.Next(); e != nil; e = itr.Next() {
if re.Match(e.Name) {
mms = append(mms, i.measurement(e.Name))
if re.Match(e.Name()) {
mms = append(mms, i.measurement(e.Name()))
}
}
return mms, nil

View File

@ -5,6 +5,8 @@ import (
"encoding/binary"
"errors"
"io"
"github.com/influxdata/influxdb/models"
)
// IndexFileVersion is the current TSI1 index file version.
@ -110,8 +112,8 @@ func (i *IndexFile) TagValueElem(name, key, value []byte) (TagSetValueElem, erro
// tagSetBlock returns a tag set block for a measurement.
func (i *IndexFile) tagSetBlock(e *MeasurementBlockElem) (TagSet, error) {
// Slice tag set data.
buf := i.data[e.TagSet.Offset:]
buf = buf[:e.TagSet.Size]
buf := i.data[e.tagSet.offset:]
buf = buf[:e.tagSet.size]
// Unmarshal block.
var blk TagSet
@ -131,13 +133,13 @@ func (i *IndexFile) MeasurementSeriesIterator(name []byte) SeriesIterator {
// Find measurement element.
e, ok := i.mblk.Elem(name)
if !ok {
return &seriesIterator{}
return &rawSeriesIterator{n: 0}
}
// Return iterator.
return &rawSeriesIterator{
n: e.Series.N,
data: e.Series.Data,
n: e.series.n,
data: e.series.data,
seriesList: &i.slist,
}
}
@ -151,12 +153,12 @@ type rawSeriesIterator struct {
seriesList *SeriesList
// reusable buffer
e SeriesElem
e rawSeriesElem
}
// Next returns the next decoded series. Uses name & tags as reusable buffers.
// Returns nils when the iterator is complete.
func (itr *rawSeriesIterator) Next() *SeriesElem {
func (itr *rawSeriesIterator) Next() SeriesElem {
// Return nil if we've reached the end.
if itr.i == itr.n {
return nil
@ -166,7 +168,7 @@ func (itr *rawSeriesIterator) Next() *SeriesElem {
offset := binary.BigEndian.Uint32(itr.data[itr.i*SeriesIDSize:])
// Read from series list into buffers.
itr.seriesList.DecodeSeriesAt(offset, &itr.e.Name, &itr.e.Tags, &itr.e.Deleted)
itr.seriesList.DecodeSeriesAt(offset, &itr.e.name, &itr.e.tags, &itr.e.deleted)
// Move iterator forward.
itr.i++
@ -174,6 +176,16 @@ func (itr *rawSeriesIterator) Next() *SeriesElem {
return &itr.e
}
type rawSeriesElem struct {
name []byte
tags models.Tags
deleted bool
}
func (e *rawSeriesElem) Name() []byte { return e.name }
func (e *rawSeriesElem) Tags() models.Tags { return e.tags }
func (e *rawSeriesElem) Deleted() bool { return e.deleted }
// ReadIndexFileTrailer returns the index file trailer from data.
func ReadIndexFileTrailer(data []byte) (IndexFileTrailer, error) {
var t IndexFileTrailer

View File

@ -13,7 +13,7 @@ func (p *IndexFiles) MeasurementNames() [][]byte {
itr := p.MeasurementIterator()
var names [][]byte
for e := itr.Next(); e != nil; e = itr.Next() {
names = append(names, copyBytes(e.Name))
names = append(names, copyBytes(e.Name()))
}
sort.Sort(byteSlices(names))
return names
@ -98,7 +98,7 @@ func (p *IndexFiles) writeSeriesListTo(w io.Writer, info *indexCompactInfo, n *i
// Write all series.
sw := NewSeriesListWriter()
for e := itr.Next(); e != nil; e = itr.Next() {
if err := sw.Add(e.Name, e.Tags); err != nil {
if err := sw.Add(e.Name(), e.Tags()); err != nil {
return err
}
}
@ -132,18 +132,18 @@ func (p *IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactI
tsw := NewTagSetWriter()
for ke := kitr.Next(); ke != nil; ke = kitr.Next() {
// Mark tag deleted.
if ke.Deleted {
tsw.DeleteTag(ke.Key)
if ke.Deleted() {
tsw.DeleteTag(ke.Key())
}
// Iterate over tag values.
vitr := p.TagValueIterator(name, ke.Key)
vitr := ke.TagValueIterator()
for ve := vitr.Next(); ve != nil; ve = vitr.Next() {
// Look-up series ids.
sitr := p.TagValueSeriesIterator(name, ke.Key, ve.Value)
sitr := ve.SeriesIterator()
var seriesIDs []uint32
for se := sitr.Next(); se != nil; se = sitr.Next() {
seriesID := info.sw.Offset(se.Name, se.Tags)
seriesID := info.sw.Offset(se.Name(), se.Tags())
if seriesID == 0 {
panic("expected series id")
}
@ -152,7 +152,7 @@ func (p *IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactI
sort.Sort(uint32Slice(seriesIDs))
// Insert tag value into writer.
tsw.AddTagValue(name, ve.Value, ve.Deleted, seriesIDs)
tsw.AddTagValue(name, ve.Value(), ve.Deleted(), seriesIDs)
}
}
@ -184,7 +184,7 @@ func (p *IndexFiles) writeMeasurementBlockTo(w io.Writer, info *indexCompactInfo
itr := p.MeasurementSeriesIterator(name)
var seriesIDs []uint32
for e := itr.Next(); e != nil; e = itr.Next() {
seriesID := info.sw.Offset(e.Name, e.Tags)
seriesID := info.sw.Offset(e.Name(), e.Tags())
if seriesID == 0 {
panic("expected series id")
}

View File

@ -215,14 +215,11 @@ func (f *LogFile) measurement(name []byte) logMeasurement {
// MeasurementIterator returns an iterator over all the measurements in the file.
func (f *LogFile) MeasurementIterator() MeasurementIterator {
var itr measurementIterator
var itr logMeasurementIterator
for _, mm := range f.mms {
itr.elems = append(itr.elems, MeasurementElem{
Name: mm.name,
Deleted: mm.deleted,
})
itr.mms = append(itr.mms, mm)
}
sort.Sort(MeasurementElems(itr.elems))
sort.Sort(logMeasurementSlice(itr.mms))
return &itr
}
@ -555,6 +552,31 @@ type logMeasurement struct {
seriesIDs []uint32 // series offsets
}
func (m *logMeasurement) Name() []byte { return m.name }
func (m *logMeasurement) Deleted() bool { return m.deleted }
func (m *logMeasurement) TagKeyIterator() TagKeyIterator { panic("TODO") }
// logMeasurementSlice is a sortable list of log measurements.
type logMeasurementSlice []logMeasurement
func (a logMeasurementSlice) Len() int { return len(a) }
func (a logMeasurementSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a logMeasurementSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 }
// logMeasurementIterator represents an iterator over a slice of measurements.
type logMeasurementIterator struct {
mms []logMeasurement
}
// Next returns the next element in the iterator.
func (itr *logMeasurementIterator) Next() (e MeasurementElem) {
if len(itr.mms) == 0 {
return nil
}
e, itr.mms = &itr.mms[0], itr.mms[1:]
return e
}
type logTagSet struct {
name []byte
tagValues map[string]logTagValue

View File

@ -4,7 +4,6 @@ import (
"fmt"
"io/ioutil"
"os"
"reflect"
"testing"
"github.com/influxdata/influxdb/models"
@ -27,9 +26,9 @@ func TestLogFile_AddSeries(t *testing.T) {
// Verify data.
itr := f.MeasurementIterator()
if e := itr.Next(); e == nil || string(e.Name) != "cpu" {
if e := itr.Next(); e == nil || string(e.Name()) != "cpu" {
t.Fatalf("unexpected measurement: %#v", e)
} else if e := itr.Next(); e == nil || string(e.Name) != "mem" {
} else if e := itr.Next(); e == nil || string(e.Name()) != "mem" {
t.Fatalf("unexpected measurement: %#v", e)
} else if e := itr.Next(); e != nil {
t.Fatalf("expected eof, got: %#v", e)
@ -57,10 +56,10 @@ func TestLogFile_DeleteMeasurement(t *testing.T) {
// Verify data.
itr := f.MeasurementIterator()
if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("cpu"), Deleted: true}) {
t.Fatalf("unexpected measurement: %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("mem")}) {
t.Fatalf("unexpected measurement: %#v", e)
if e := itr.Next(); string(e.Name()) != "cpu" || !e.Deleted() {
t.Fatalf("unexpected measurement: %s/%v", e.Name(), e.Deleted())
} else if e := itr.Next(); string(e.Name()) != "mem" || e.Deleted() {
t.Fatalf("unexpected measurement: %s/%v", e.Name(), e.Deleted())
} else if e := itr.Next(); e != nil {
t.Fatalf("expected eof, got: %#v", e)
}

View File

@ -72,12 +72,12 @@ func (blk *MeasurementBlock) Elem(name []byte) (e MeasurementBlockElem, ok bool)
e.UnmarshalBinary(blk.data[offset:])
// Return if name match.
if bytes.Equal(e.Name, name) {
if bytes.Equal(e.name, name) {
return e, true
}
// Check if we've exceeded the probe distance.
if d > dist(hashKey(e.Name), pos, int(n)) {
if d > dist(hashKey(e.name), pos, int(n)) {
return MeasurementBlockElem{}, false
}
}
@ -124,29 +124,22 @@ func (blk *MeasurementBlock) Iterator() MeasurementIterator {
// blockMeasurementIterator iterates over a list measurements in a block.
type blockMeasurementIterator struct {
elem MeasurementElem
elem MeasurementBlockElem
data []byte
}
// Next returns the next measurement. Returns nil when iterator is complete.
func (itr *blockMeasurementIterator) Next() *MeasurementElem {
func (itr *blockMeasurementIterator) Next() MeasurementElem {
// Return nil when we run out of data.
if len(itr.data) == 0 {
return nil
}
// Unmarshal the element at the current position.
var elem MeasurementBlockElem
elem.UnmarshalBinary(itr.data)
// Copy to a generic measurement element.
itr.elem = MeasurementElem{
Name: elem.Name,
Deleted: elem.Deleted(),
}
itr.elem.UnmarshalBinary(itr.data)
// Move the data forward past the record.
itr.data = itr.data[elem.Size:]
itr.data = itr.data[itr.elem.size:]
return &itr.elem
}
@ -200,37 +193,49 @@ type MeasurementBlockTrailer struct {
// MeasurementBlockElem represents an internal measurement element.
type MeasurementBlockElem struct {
Flag byte // flag
Name []byte // measurement name
flag byte // flag
name []byte // measurement name
TagSet struct {
Offset int64
Size int64
tagSet struct {
offset int64
size int64
}
Series struct {
N uint32 // series count
Data []byte // serialized series data
series struct {
n uint32 // series count
data []byte // serialized series data
}
// Size in bytes, set after unmarshaling.
Size int
// size in bytes, set after unmarshaling.
size int
}
// Name returns the measurement name.
func (e *MeasurementBlockElem) Name() []byte { return e.name }
// Deleted returns true if the tombstone flag is set.
func (e *MeasurementBlockElem) Deleted() bool {
return (e.Flag & MeasurementTombstoneFlag) != 0
return (e.flag & MeasurementTombstoneFlag) != 0
}
// TagKeyIterator returns an iterator over the measurement's keys.
func (e *MeasurementBlockElem) TagKeyIterator() TagKeyIterator { panic("TODO") }
// TagSetOffset returns the offset of the measurement's tagset block.
func (e *MeasurementBlockElem) TagSetOffset() int64 { return e.tagSet.offset }
// TagSetSize returns the size of the measurement's tagset block.
func (e *MeasurementBlockElem) TagSetSize() int64 { return e.tagSet.size }
// SeriesID returns series ID at an index.
func (e *MeasurementBlockElem) SeriesID(i int) uint32 {
return binary.BigEndian.Uint32(e.Series.Data[i*SeriesIDSize:])
return binary.BigEndian.Uint32(e.series.data[i*SeriesIDSize:])
}
// SeriesIDs returns a list of decoded series ids.
func (e *MeasurementBlockElem) SeriesIDs() []uint32 {
a := make([]uint32, e.Series.N)
for i := 0; i < int(e.Series.N); i++ {
a := make([]uint32, e.series.n)
for i := 0; i < int(e.series.n); i++ {
a[i] = e.SeriesID(i)
}
return a
@ -241,23 +246,23 @@ func (e *MeasurementBlockElem) UnmarshalBinary(data []byte) error {
start := len(data)
// Parse flag data.
e.Flag, data = data[0], data[1:]
e.flag, data = data[0], data[1:]
// Parse tagset offset.
e.TagSet.Offset, data = int64(binary.BigEndian.Uint64(data)), data[8:]
e.TagSet.Size, data = int64(binary.BigEndian.Uint64(data)), data[8:]
e.tagSet.offset, data = int64(binary.BigEndian.Uint64(data)), data[8:]
e.tagSet.size, data = int64(binary.BigEndian.Uint64(data)), data[8:]
// Parse name.
sz, n := binary.Uvarint(data)
e.Name, data = data[n:n+int(sz)], data[n+int(sz):]
e.name, data = data[n:n+int(sz)], data[n+int(sz):]
// Parse series data.
v, n := binary.Uvarint(data)
e.Series.N, data = uint32(v), data[n:]
e.Series.Data, data = data[:e.Series.N*SeriesIDSize], data[e.Series.N*SeriesIDSize:]
e.series.n, data = uint32(v), data[n:]
e.series.data, data = data[:e.series.n*SeriesIDSize], data[e.series.n*SeriesIDSize:]
// Save length of elem.
e.Size = start - len(data)
e.size = start - len(data)
return nil
}

View File

@ -33,24 +33,24 @@ func TestMeasurementBlockWriter(t *testing.T) {
// Verify data in block.
if e, ok := blk.Elem([]byte("foo")); !ok {
t.Fatal("expected element")
} else if e.TagSet.Offset != 100 || e.TagSet.Size != 10 {
t.Fatalf("unexpected offset/size: %v/%v", e.TagSet.Offset, e.TagSet.Size)
} else if e.TagSetOffset() != 100 || e.TagSetSize() != 10 {
t.Fatalf("unexpected offset/size: %v/%v", e.TagSetOffset(), e.TagSetSize())
} else if !reflect.DeepEqual(e.SeriesIDs(), []uint32{1, 3, 4}) {
t.Fatalf("unexpected series data: %#v", e.SeriesIDs())
}
if e, ok := blk.Elem([]byte("bar")); !ok {
t.Fatal("expected element")
} else if e.TagSet.Offset != 200 || e.TagSet.Size != 20 {
t.Fatalf("unexpected offset/size: %v/%v", e.TagSet.Offset, e.TagSet.Size)
} else if e.TagSetOffset() != 200 || e.TagSetSize() != 20 {
t.Fatalf("unexpected offset/size: %v/%v", e.TagSetOffset(), e.TagSetSize())
} else if !reflect.DeepEqual(e.SeriesIDs(), []uint32{2}) {
t.Fatalf("unexpected series data: %#v", e.SeriesIDs())
}
if e, ok := blk.Elem([]byte("baz")); !ok {
t.Fatal("expected element")
} else if e.TagSet.Offset != 300 || e.TagSet.Size != 30 {
t.Fatalf("unexpected offset/size: %v/%v", e.TagSet.Offset, e.TagSet.Size)
} else if e.TagSetOffset() != 300 || e.TagSetSize() != 30 {
t.Fatalf("unexpected offset/size: %v/%v", e.TagSetOffset(), e.TagSetSize())
} else if !reflect.DeepEqual(e.SeriesIDs(), []uint32{5, 6}) {
t.Fatalf("unexpected series data: %#v", e.SeriesIDs())
}

View File

@ -12,9 +12,10 @@ import (
)
// MeasurementElem represents a generic measurement element.
type MeasurementElem struct {
Deleted bool
Name []byte
type MeasurementElem interface {
Name() []byte
Deleted() bool
TagKeyIterator() TagKeyIterator
}
// MeasurementElems represents a list of MeasurementElem.
@ -22,30 +23,11 @@ type MeasurementElems []MeasurementElem
func (a MeasurementElems) Len() int { return len(a) }
func (a MeasurementElems) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a MeasurementElems) Less(i, j int) bool { return bytes.Compare(a[i].Name, a[j].Name) == -1 }
func (a MeasurementElems) Less(i, j int) bool { return bytes.Compare(a[i].Name(), a[j].Name()) == -1 }
// MeasurementIterator represents a iterator over a list of measurements.
type MeasurementIterator interface {
Next() *MeasurementElem
}
// NewMeasurementIterator returns an iterator that operates on an in-memory slice.
func NewMeasurementIterator(elems []MeasurementElem) MeasurementIterator {
return &measurementIterator{elems: elems}
}
// measurementIterator represents an iterator over a slice of measurements.
type measurementIterator struct {
elems []MeasurementElem
}
// Next shifts the next element off the list.
func (itr *measurementIterator) Next() (e *MeasurementElem) {
if len(itr.elems) == 0 {
return nil
}
e, itr.elems = &itr.elems[0], itr.elems[1:]
return e
Next() MeasurementElem
}
// MergeMeasurementIterators returns an iterator that merges a set of iterators.
@ -59,9 +41,7 @@ func MergeMeasurementIterators(itrs ...MeasurementIterator) MeasurementIterator
// Initialize buffers.
for i := range itr.itrs {
if e := itr.itrs[i].Next(); e != nil {
itr.buf[i] = *e
}
itr.buf[i] = itr.itrs[i].Next()
}
return itr
@ -77,16 +57,14 @@ type measurementMergeIterator struct {
//
// If multiple iterators contain the same name then the first is returned
// and the remaining ones are skipped.
func (itr *measurementMergeIterator) Next() *MeasurementElem {
itr.e = MeasurementElem{}
func (itr *measurementMergeIterator) Next() MeasurementElem {
// Find next lowest name amongst the buffers.
var name []byte
for i := range itr.buf {
if len(itr.buf[i].Name) == 0 {
if itr.buf[i] == nil {
continue
} else if name == nil || bytes.Compare(itr.buf[i].Name, name) == -1 {
name = itr.buf[i].Name
} else if name == nil || bytes.Compare(itr.buf[i].Name(), name) == -1 {
name = itr.buf[i].Name()
}
}
@ -96,55 +74,34 @@ func (itr *measurementMergeIterator) Next() *MeasurementElem {
}
// Refill buffer.
for i := range itr.buf {
if !bytes.Equal(itr.buf[i].Name, name) {
var e MeasurementElem
for i, buf := range itr.buf {
if buf == nil || !bytes.Equal(buf.Name(), name) {
continue
}
// Copy first matching buffer to the return buffer.
if len(itr.e.Name) == 0 {
itr.e = itr.buf[i]
if e == nil {
e = buf
}
// Fill buffer with next element.
if e := itr.itrs[i].Next(); e != nil {
itr.buf[i] = *e
} else {
itr.buf[i] = MeasurementElem{}
}
itr.buf[i] = itr.itrs[i].Next()
}
return &itr.e
return e
}
// TagKeyElem represents a generic tag key element.
type TagKeyElem struct {
Key []byte
Deleted bool
type TagKeyElem interface {
Key() []byte
Deleted() bool
TagValueIterator() TagValueIterator
}
// TagKeyIterator represents a iterator over a list of tag keys.
type TagKeyIterator interface {
Next() *TagKeyElem
}
// NewTagKeyIterator returns an iterator that operates on an in-memory slice.
func NewTagKeyIterator(a []TagKeyElem) TagKeyIterator {
return &tagKeyIterator{elems: a}
}
// tagKeyIterator represents an iterator over a slice of tag keys.
type tagKeyIterator struct {
elems []TagKeyElem
}
// Next returns the next element.
func (itr *tagKeyIterator) Next() (e *TagKeyElem) {
if len(itr.elems) == 0 {
return nil
}
e, itr.elems = &itr.elems[0], itr.elems[1:]
return e
Next() TagKeyElem
}
// MergeTagKeyIterators returns an iterator that merges a set of iterators.
@ -158,9 +115,7 @@ func MergeTagKeyIterators(itrs ...TagKeyIterator) TagKeyIterator {
// Initialize buffers.
for i := range itr.itrs {
if e := itr.itrs[i].Next(); e != nil {
itr.buf[i] = *e
}
itr.buf[i] = itr.itrs[i].Next()
}
return itr
@ -176,16 +131,14 @@ type tagKeyMergeIterator struct {
//
// If multiple iterators contain the same key then the first is returned
// and the remaining ones are skipped.
func (itr *tagKeyMergeIterator) Next() *TagKeyElem {
itr.e = TagKeyElem{}
func (itr *tagKeyMergeIterator) Next() TagKeyElem {
// Find next lowest key amongst the buffers.
var key []byte
for i := range itr.buf {
if len(itr.buf[i].Key) == 0 {
if itr.buf[i] == nil {
continue
} else if key == nil || bytes.Compare(itr.buf[i].Key, key) == -1 {
key = itr.buf[i].Key
} else if key == nil || bytes.Compare(itr.buf[i].Key(), key) == -1 {
key = itr.buf[i].Key()
}
}
@ -195,55 +148,34 @@ func (itr *tagKeyMergeIterator) Next() *TagKeyElem {
}
// Refill buffer.
var e TagKeyElem
for i := range itr.buf {
if !bytes.Equal(itr.buf[i].Key, key) {
if itr.buf[i] == nil || !bytes.Equal(itr.buf[i].Key(), key) {
continue
}
// Copy first matching buffer to the return buffer.
if len(itr.e.Key) == 0 {
itr.e = itr.buf[i]
if e == nil {
e = itr.buf[i]
}
// Fill buffer with next element.
if e := itr.itrs[i].Next(); e != nil {
itr.buf[i] = *e
} else {
itr.buf[i] = TagKeyElem{}
}
itr.buf[i] = itr.itrs[i].Next()
}
return &itr.e
return e
}
// TagValueElem represents a generic tag value element.
type TagValueElem struct {
Value []byte
Deleted bool
type TagValueElem interface {
Value() []byte
Deleted() bool
SeriesIterator() SeriesIterator
}
// TagValueIterator represents a iterator over a list of tag values.
type TagValueIterator interface {
Next() *TagValueElem
}
// NewTagValueIterator returns an iterator that operates on an in-memory slice.
func NewTagValueIterator(a []TagValueElem) TagValueIterator {
return &tagValueIterator{elems: a}
}
// tagValueIterator represents an iterator over a slice of tag values.
type tagValueIterator struct {
elems []TagValueElem
}
// Next returns the next element.
func (itr *tagValueIterator) Next() (e *TagValueElem) {
if len(itr.elems) == 0 {
return nil
}
e, itr.elems = &itr.elems[0], itr.elems[1:]
return e
Next() TagValueElem
}
// MergeTagValueIterators returns an iterator that merges a set of iterators.
@ -257,9 +189,7 @@ func MergeTagValueIterators(itrs ...TagValueIterator) TagValueIterator {
// Initialize buffers.
for i := range itr.itrs {
if e := itr.itrs[i].Next(); e != nil {
itr.buf[i] = *e
}
itr.buf[i] = itr.itrs[i].Next()
}
return itr
@ -275,16 +205,14 @@ type tagValueMergeIterator struct {
//
// If multiple iterators contain the same value then the first is returned
// and the remaining ones are skipped.
func (itr *tagValueMergeIterator) Next() *TagValueElem {
itr.e = TagValueElem{}
func (itr *tagValueMergeIterator) Next() TagValueElem {
// Find next lowest value amongst the buffers.
var value []byte
for i := range itr.buf {
if len(itr.buf[i].Value) == 0 {
if itr.buf[i] == nil {
continue
} else if value == nil || bytes.Compare(itr.buf[i].Value, value) == -1 {
value = itr.buf[i].Value
} else if value == nil || bytes.Compare(itr.buf[i].Value(), value) == -1 {
value = itr.buf[i].Value()
}
}
@ -294,56 +222,33 @@ func (itr *tagValueMergeIterator) Next() *TagValueElem {
}
// Refill buffer.
var e TagValueElem
for i := range itr.buf {
if !bytes.Equal(itr.buf[i].Value, value) {
if itr.buf[i] == nil || !bytes.Equal(itr.buf[i].Value(), value) {
continue
}
// Copy first matching buffer to the return buffer.
if len(itr.e.Value) == 0 {
itr.e = itr.buf[i]
if e == nil {
e = itr.buf[i]
}
// Fill buffer with next element.
if e := itr.itrs[i].Next(); e != nil {
itr.buf[i] = *e
} else {
itr.buf[i] = TagValueElem{}
}
itr.buf[i] = itr.itrs[i].Next()
}
return &itr.e
return e
}
// SeriesElem represents a generic series element.
type SeriesElem struct {
Name []byte
Tags models.Tags
Deleted bool
type SeriesElem interface {
Name() []byte
Tags() models.Tags
Deleted() bool
}
// SeriesIterator represents a iterator over a list of series.
type SeriesIterator interface {
Next() *SeriesElem
}
// NewSeriesIterator returns an iterator that operates on an in-memory slice.
func NewSeriesIterator(a []SeriesElem) SeriesIterator {
return &seriesIterator{elems: a}
}
// seriesIterator represents an iterator over a slice of tag values.
type seriesIterator struct {
elems []SeriesElem
}
// Next returns the next element.
func (itr *seriesIterator) Next() (e *SeriesElem) {
if len(itr.elems) == 0 {
return nil
}
e, itr.elems = &itr.elems[0], itr.elems[1:]
return e
Next() SeriesElem
}
// MergeSeriesIterators returns an iterator that merges a set of iterators.
@ -357,16 +262,13 @@ func MergeSeriesIterators(itrs ...SeriesIterator) SeriesIterator {
// Initialize buffers.
for i := range itr.itrs {
if e := itr.itrs[i].Next(); e != nil {
itr.buf[i] = *e
}
itr.buf[i] = itr.itrs[i].Next()
}
return itr
}
type seriesMergeIterator struct {
e SeriesElem
buf []SeriesElem
itrs []SeriesIterator
}
@ -375,27 +277,25 @@ type seriesMergeIterator struct {
//
// If multiple iterators contain the same name/tags then the first is returned
// and the remaining ones are skipped.
func (itr *seriesMergeIterator) Next() *SeriesElem {
itr.e = SeriesElem{}
func (itr *seriesMergeIterator) Next() SeriesElem {
// Find next lowest name/tags amongst the buffers.
var name []byte
var tags models.Tags
for i := range itr.buf {
// Skip empty buffers.
if len(itr.buf[i].Name) == 0 {
if itr.buf[i] == nil {
continue
}
// If the name is not set the pick the first non-empty name.
if name == nil {
name, tags = itr.buf[i].Name, itr.buf[i].Tags
name, tags = itr.buf[i].Name(), itr.buf[i].Tags()
continue
}
// Set name/tags if they are lower than what has been seen.
if cmp := bytes.Compare(itr.buf[i].Name, name); cmp == -1 || (cmp == 0 && models.CompareTags(itr.buf[i].Tags, tags) == -1) {
name, tags = itr.buf[i].Name, itr.buf[i].Tags
if cmp := bytes.Compare(itr.buf[i].Name(), name); cmp == -1 || (cmp == 0 && models.CompareTags(itr.buf[i].Tags(), tags) == -1) {
name, tags = itr.buf[i].Name(), itr.buf[i].Tags()
}
}
@ -405,25 +305,21 @@ func (itr *seriesMergeIterator) Next() *SeriesElem {
}
// Refill buffer.
var e SeriesElem
for i := range itr.buf {
if !bytes.Equal(itr.buf[i].Name, name) || models.CompareTags(itr.buf[i].Tags, tags) != 0 {
if itr.buf[i] == nil || !bytes.Equal(itr.buf[i].Name(), name) || models.CompareTags(itr.buf[i].Tags(), tags) != 0 {
continue
}
// Copy first matching buffer to the return buffer.
if len(itr.e.Name) == 0 {
itr.e = itr.buf[i]
if e == nil {
e = itr.buf[i]
}
// Fill buffer with next element.
if e := itr.itrs[i].Next(); e != nil {
itr.buf[i] = *e
} else {
itr.buf[i] = SeriesElem{}
}
itr.buf[i] = itr.itrs[i].Next()
}
return &itr.e
return e
}
// writeTo writes write v into w. Updates n.

View File

@ -10,12 +10,12 @@ import (
// Ensure iterator can operate over an in-memory list of elements.
func TestMeasurementIterator(t *testing.T) {
elems := []tsi1.MeasurementElem{
{Name: []byte("cpu"), Deleted: true},
{Name: []byte("mem")},
elems := []MeasurementElem{
MeasurementElem{name: []byte("cpu"), deleted: true},
MeasurementElem{name: []byte("mem")},
}
itr := tsi1.NewMeasurementIterator(elems)
itr := MeasurementIterator{Elems: elems}
if e := itr.Next(); !reflect.DeepEqual(&elems[0], e) {
t.Fatalf("unexpected elem(0): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(&elems[1], e) {
@ -28,26 +28,26 @@ func TestMeasurementIterator(t *testing.T) {
// Ensure iterator can merge multiple iterators together.
func TestMergeMeasurementIterators(t *testing.T) {
itr := tsi1.MergeMeasurementIterators(
tsi1.NewMeasurementIterator([]tsi1.MeasurementElem{
{Name: []byte("aaa")},
{Name: []byte("bbb"), Deleted: true},
{Name: []byte("ccc")},
}),
tsi1.NewMeasurementIterator(nil),
tsi1.NewMeasurementIterator([]tsi1.MeasurementElem{
{Name: []byte("bbb")},
{Name: []byte("ccc"), Deleted: true},
{Name: []byte("ddd")},
}),
&MeasurementIterator{Elems: []MeasurementElem{
{name: []byte("aaa")},
{name: []byte("bbb"), deleted: true},
{name: []byte("ccc")},
}},
&MeasurementIterator{},
&MeasurementIterator{Elems: []MeasurementElem{
{name: []byte("bbb")},
{name: []byte("ccc"), deleted: true},
{name: []byte("ddd")},
}},
)
if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("aaa")}) {
if e := itr.Next(); !reflect.DeepEqual(e, &MeasurementElem{name: []byte("aaa")}) {
t.Fatalf("unexpected elem(0): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("bbb"), Deleted: true}) {
} else if e := itr.Next(); !reflect.DeepEqual(e, &MeasurementElem{name: []byte("bbb"), deleted: true}) {
t.Fatalf("unexpected elem(1): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("ccc")}) {
} else if e := itr.Next(); !reflect.DeepEqual(e, &MeasurementElem{name: []byte("ccc")}) {
t.Fatalf("unexpected elem(2): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("ddd")}) {
} else if e := itr.Next(); !reflect.DeepEqual(e, &MeasurementElem{name: []byte("ddd")}) {
t.Fatalf("unexpected elem(3): %#v", e)
} else if e := itr.Next(); e != nil {
t.Fatalf("expected nil elem: %#v", e)
@ -56,12 +56,12 @@ func TestMergeMeasurementIterators(t *testing.T) {
// Ensure iterator can operate over an in-memory list of tag key elements.
func TestTagKeyIterator(t *testing.T) {
elems := []tsi1.TagKeyElem{
{Key: []byte("aaa"), Deleted: true},
{Key: []byte("bbb")},
elems := []TagKeyElem{
{key: []byte("aaa"), deleted: true},
{key: []byte("bbb")},
}
itr := tsi1.NewTagKeyIterator(elems)
itr := TagKeyIterator{Elems: elems}
if e := itr.Next(); !reflect.DeepEqual(&elems[0], e) {
t.Fatalf("unexpected elem(0): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(&elems[1], e) {
@ -74,26 +74,26 @@ func TestTagKeyIterator(t *testing.T) {
// Ensure iterator can merge multiple iterators together.
func TestMergeTagKeyIterators(t *testing.T) {
itr := tsi1.MergeTagKeyIterators(
tsi1.NewTagKeyIterator([]tsi1.TagKeyElem{
{Key: []byte("aaa")},
{Key: []byte("bbb"), Deleted: true},
{Key: []byte("ccc")},
}),
tsi1.NewTagKeyIterator(nil),
tsi1.NewTagKeyIterator([]tsi1.TagKeyElem{
{Key: []byte("bbb")},
{Key: []byte("ccc"), Deleted: true},
{Key: []byte("ddd")},
}),
&TagKeyIterator{Elems: []TagKeyElem{
{key: []byte("aaa")},
{key: []byte("bbb"), deleted: true},
{key: []byte("ccc")},
}},
&TagKeyIterator{},
&TagKeyIterator{Elems: []TagKeyElem{
{key: []byte("bbb")},
{key: []byte("ccc"), deleted: true},
{key: []byte("ddd")},
}},
)
if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagKeyElem{Key: []byte("aaa")}) {
if e := itr.Next(); !reflect.DeepEqual(e, &TagKeyElem{key: []byte("aaa")}) {
t.Fatalf("unexpected elem(0): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagKeyElem{Key: []byte("bbb"), Deleted: true}) {
} else if e := itr.Next(); !reflect.DeepEqual(e, &TagKeyElem{key: []byte("bbb"), deleted: true}) {
t.Fatalf("unexpected elem(1): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagKeyElem{Key: []byte("ccc")}) {
} else if e := itr.Next(); !reflect.DeepEqual(e, &TagKeyElem{key: []byte("ccc")}) {
t.Fatalf("unexpected elem(2): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagKeyElem{Key: []byte("ddd")}) {
} else if e := itr.Next(); !reflect.DeepEqual(e, &TagKeyElem{key: []byte("ddd")}) {
t.Fatalf("unexpected elem(3): %#v", e)
} else if e := itr.Next(); e != nil {
t.Fatalf("expected nil elem: %#v", e)
@ -102,12 +102,12 @@ func TestMergeTagKeyIterators(t *testing.T) {
// Ensure iterator can operate over an in-memory list of tag value elements.
func TestTagValueIterator(t *testing.T) {
elems := []tsi1.TagValueElem{
{Value: []byte("aaa"), Deleted: true},
{Value: []byte("bbb")},
elems := []TagValueElem{
{value: []byte("aaa"), deleted: true},
{value: []byte("bbb")},
}
itr := tsi1.NewTagValueIterator(elems)
itr := &TagValueIterator{Elems: elems}
if e := itr.Next(); !reflect.DeepEqual(&elems[0], e) {
t.Fatalf("unexpected elem(0): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(&elems[1], e) {
@ -120,26 +120,26 @@ func TestTagValueIterator(t *testing.T) {
// Ensure iterator can merge multiple iterators together.
func TestMergeTagValueIterators(t *testing.T) {
itr := tsi1.MergeTagValueIterators(
tsi1.NewTagValueIterator([]tsi1.TagValueElem{
{Value: []byte("aaa")},
{Value: []byte("bbb"), Deleted: true},
{Value: []byte("ccc")},
}),
tsi1.NewTagValueIterator(nil),
tsi1.NewTagValueIterator([]tsi1.TagValueElem{
{Value: []byte("bbb")},
{Value: []byte("ccc"), Deleted: true},
{Value: []byte("ddd")},
}),
&TagValueIterator{Elems: []TagValueElem{
{value: []byte("aaa")},
{value: []byte("bbb"), deleted: true},
{value: []byte("ccc")},
}},
&TagValueIterator{},
&TagValueIterator{Elems: []TagValueElem{
{value: []byte("bbb")},
{value: []byte("ccc"), deleted: true},
{value: []byte("ddd")},
}},
)
if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagValueElem{Value: []byte("aaa")}) {
if e := itr.Next(); !reflect.DeepEqual(e, &TagValueElem{value: []byte("aaa")}) {
t.Fatalf("unexpected elem(0): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagValueElem{Value: []byte("bbb"), Deleted: true}) {
} else if e := itr.Next(); !reflect.DeepEqual(e, &TagValueElem{value: []byte("bbb"), deleted: true}) {
t.Fatalf("unexpected elem(1): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagValueElem{Value: []byte("ccc")}) {
} else if e := itr.Next(); !reflect.DeepEqual(e, &TagValueElem{value: []byte("ccc")}) {
t.Fatalf("unexpected elem(2): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagValueElem{Value: []byte("ddd")}) {
} else if e := itr.Next(); !reflect.DeepEqual(e, &TagValueElem{value: []byte("ddd")}) {
t.Fatalf("unexpected elem(3): %#v", e)
} else if e := itr.Next(); e != nil {
t.Fatalf("expected nil elem: %#v", e)
@ -148,12 +148,12 @@ func TestMergeTagValueIterators(t *testing.T) {
// Ensure iterator can operate over an in-memory list of series.
func TestSeriesIterator(t *testing.T) {
elems := []tsi1.SeriesElem{
{Name: []byte("cpu"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}, Deleted: true},
{Name: []byte("mem")},
elems := []SeriesElem{
{name: []byte("cpu"), tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}, deleted: true},
{name: []byte("mem")},
}
itr := tsi1.NewSeriesIterator(elems)
itr := SeriesIterator{Elems: elems}
if e := itr.Next(); !reflect.DeepEqual(&elems[0], e) {
t.Fatalf("unexpected elem(0): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(&elems[1], e) {
@ -166,32 +166,129 @@ func TestSeriesIterator(t *testing.T) {
// Ensure iterator can merge multiple iterators together.
func TestMergeSeriesIterators(t *testing.T) {
itr := tsi1.MergeSeriesIterators(
tsi1.NewSeriesIterator([]tsi1.SeriesElem{
{Name: []byte("aaa"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}, Deleted: true},
{Name: []byte("bbb"), Deleted: true},
{Name: []byte("ccc")},
}),
tsi1.NewSeriesIterator(nil),
tsi1.NewSeriesIterator([]tsi1.SeriesElem{
{Name: []byte("aaa"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}},
{Name: []byte("aaa"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-west")}}},
{Name: []byte("bbb")},
{Name: []byte("ccc"), Deleted: true},
{Name: []byte("ddd")},
}),
&SeriesIterator{Elems: []SeriesElem{
{name: []byte("aaa"), tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}, deleted: true},
{name: []byte("bbb"), deleted: true},
{name: []byte("ccc")},
}},
&SeriesIterator{},
&SeriesIterator{Elems: []SeriesElem{
{name: []byte("aaa"), tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}},
{name: []byte("aaa"), tags: models.Tags{{Key: []byte("region"), Value: []byte("us-west")}}},
{name: []byte("bbb")},
{name: []byte("ccc"), deleted: true},
{name: []byte("ddd")},
}},
)
if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.SeriesElem{Name: []byte("aaa"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}, Deleted: true}) {
if e := itr.Next(); !reflect.DeepEqual(e, &SeriesElem{name: []byte("aaa"), tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}, deleted: true}) {
t.Fatalf("unexpected elem(0): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.SeriesElem{Name: []byte("aaa"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-west")}}}) {
} else if e := itr.Next(); !reflect.DeepEqual(e, &SeriesElem{name: []byte("aaa"), tags: models.Tags{{Key: []byte("region"), Value: []byte("us-west")}}}) {
t.Fatalf("unexpected elem(1): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.SeriesElem{Name: []byte("bbb"), Deleted: true}) {
} else if e := itr.Next(); !reflect.DeepEqual(e, &SeriesElem{name: []byte("bbb"), deleted: true}) {
t.Fatalf("unexpected elem(2): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.SeriesElem{Name: []byte("ccc")}) {
} else if e := itr.Next(); !reflect.DeepEqual(e, &SeriesElem{name: []byte("ccc")}) {
t.Fatalf("unexpected elem(3): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.SeriesElem{Name: []byte("ddd")}) {
} else if e := itr.Next(); !reflect.DeepEqual(e, &SeriesElem{name: []byte("ddd")}) {
t.Fatalf("unexpected elem(4): %#v", e)
} else if e := itr.Next(); e != nil {
t.Fatalf("expected nil elem: %#v", e)
}
}
// MeasurementElem represents a test implementation of tsi1.MeasurementElem.
type MeasurementElem struct {
name []byte
deleted bool
}
func (e *MeasurementElem) Name() []byte { return e.name }
func (e *MeasurementElem) Deleted() bool { return e.deleted }
func (e *MeasurementElem) TagKeyIterator() tsi1.TagKeyIterator { return nil }
// MeasurementIterator represents an iterator over a slice of measurements.
type MeasurementIterator struct {
Elems []MeasurementElem
}
// Next returns the next element in the iterator.
func (itr *MeasurementIterator) Next() (e tsi1.MeasurementElem) {
if len(itr.Elems) == 0 {
return nil
}
e, itr.Elems = &itr.Elems[0], itr.Elems[1:]
return e
}
// TagKeyElem represents a test implementation of tsi1.TagKeyElem.
type TagKeyElem struct {
key []byte
deleted bool
}
func (e *TagKeyElem) Key() []byte { return e.key }
func (e *TagKeyElem) Deleted() bool { return e.deleted }
func (e *TagKeyElem) TagValueIterator() tsi1.TagValueIterator { return nil }
// TagKeyIterator represents an iterator over a slice of tag keys.
type TagKeyIterator struct {
Elems []TagKeyElem
}
// Next returns the next element in the iterator.
func (itr *TagKeyIterator) Next() (e tsi1.TagKeyElem) {
if len(itr.Elems) == 0 {
return nil
}
e, itr.Elems = &itr.Elems[0], itr.Elems[1:]
return e
}
// TagValueElem represents a test implementation of tsi1.TagValueElem.
type TagValueElem struct {
value []byte
deleted bool
}
func (e *TagValueElem) Value() []byte { return e.value }
func (e *TagValueElem) Deleted() bool { return e.deleted }
func (e *TagValueElem) SeriesIterator() tsi1.SeriesIterator { return nil }
// TagValueIterator represents an iterator over a slice of tag values.
type TagValueIterator struct {
Elems []TagValueElem
}
// Next returns the next element in the iterator.
func (itr *TagValueIterator) Next() (e tsi1.TagValueElem) {
if len(itr.Elems) == 0 {
return nil
}
e, itr.Elems = &itr.Elems[0], itr.Elems[1:]
return e
}
// SeriesElem represents a test implementation of tsi1.SeriesElem.
type SeriesElem struct {
name []byte
tags models.Tags
deleted bool
}
func (e *SeriesElem) Name() []byte { return e.name }
func (e *SeriesElem) Tags() models.Tags { return e.tags }
func (e *SeriesElem) Deleted() bool { return e.deleted }
// SeriesIterator represents an iterator over a slice of tag values.
type SeriesIterator struct {
Elems []SeriesElem
}
// Next returns the next element in the iterator.
func (itr *SeriesIterator) Next() (e tsi1.SeriesElem) {
if len(itr.Elems) == 0 {
return nil
}
e, itr.Elems = &itr.Elems[0], itr.Elems[1:]
return e
}