Add measurement, tag key, tag value, & series iterators.

pull/7913/head
Ben Johnson 2016-10-27 09:47:41 -06:00
parent 2bfafaed76
commit 0294e717a0
9 changed files with 867 additions and 168 deletions

View File

@ -48,22 +48,15 @@ func (i *Index) measurement(name []byte) *tsdb.Measurement {
itr := i.file.MeasurementSeriesIterator(name)
var id uint64 // TEMPORARY
var sname []byte
var tags models.Tags
var deleted bool
for {
if itr.Next(&sname, &tags, &deleted); sname == nil {
break
}
for e := itr.Next(); e != nil; e = itr.Next() {
// TODO: Handle deleted series.
// Append series to the measurement.
// TODO: Remove concept of series ids.
m.AddSeries(&tsdb.Series{
ID: id,
Key: string(sname),
Tags: models.CopyTags(tags),
Key: string(e.Name),
Tags: models.CopyTags(e.Tags),
})
// TEMPORARY: Increment ID.

View File

@ -5,8 +5,6 @@ import (
"encoding/binary"
"errors"
"io"
"github.com/influxdata/influxdb/models"
)
// IndexFileVersion is the current TSI1 index file version.
@ -94,17 +92,17 @@ func (i *IndexFile) Close() error {
}
// TagValueElem returns the tag value element, containing series ids, for a measurement/tag/value.
func (i *IndexFile) TagValueElem(name, key, value []byte) (TagValueElem, error) {
func (i *IndexFile) TagValueElem(name, key, value []byte) (TagSetValueElem, error) {
// Find measurement.
e, ok := i.mblk.Elem(name)
if !ok {
return TagValueElem{}, nil
return TagSetValueElem{}, nil
}
// Find tag set block.
tblk, err := i.tagSetBlock(&e)
if err != nil {
return TagValueElem{}, err
return TagSetValueElem{}, err
}
return tblk.TagValueElem(key, value), nil
}
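// Hypothetical caller-side sketch (not part of this commit): it only uses the
// TagValueElem and SeriesIDs methods shown elsewhere in this diff and
// illustrates how the renamed TagSetValueElem would typically be consumed.
func tagValueSeriesIDs(f *IndexFile, name, key, value []byte) ([]uint32, error) {
	e, err := f.TagValueElem(name, key, value)
	if err != nil {
		return nil, err
	}
	// A zero-length Value means the measurement/tag/value was not found.
	if len(e.Value) == 0 {
		return nil, nil
	}
	// SeriesIDs decodes the element's big-endian uint32 series ids.
	return e.SeriesIDs(), nil
}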
@ -137,42 +135,44 @@ func (i *IndexFile) MeasurementSeriesIterator(name []byte) SeriesIterator {
}
// Return iterator.
return &seriesIterator{
return &rawSeriesIterator{
n: e.Series.N,
data: e.Series.Data,
seriesList: &i.slist,
}
}
// seriesIterator iterates over a list of raw data.
type seriesIterator struct {
i, n uint32
data []byte
// rawSeriesIterator iterates over a list of raw data.
type rawSeriesIterator struct {
i, n uint32 // index & total count
data []byte // raw data
// series list used for decoding
seriesList *SeriesList
// reusable buffer
e SeriesElem
}
// Next returns the next decoded series. The returned element is reused between calls.
// Returns nil when the iterator is complete.
func (itr *seriesIterator) Next(name *[]byte, tags *models.Tags, deleted *bool) {
func (itr *rawSeriesIterator) Next() *SeriesElem {
// Return nil if we've reached the end.
if itr.i == itr.n {
*name, *tags = nil, nil
return
return nil
}
// Move forward and retrieve the offset.
offset := binary.BigEndian.Uint32(itr.data[itr.i*SeriesIDSize:])
// Read from series list into buffers.
itr.seriesList.DecodeSeriesAt(offset, name, tags, deleted)
itr.seriesList.DecodeSeriesAt(offset, &itr.e.Name, &itr.e.Tags, &itr.e.Deleted)
// Move iterator forward.
itr.i++
return &itr.e
}
// IndexFiles represents a layered set of index files.
type IndexFiles []*IndexFile
// ReadIndexFileTrailer returns the index file trailer from data.
func ReadIndexFileTrailer(data []byte) (IndexFileTrailer, error) {

View File

@ -1,113 +0,0 @@
package tsi1
/*
import (
"bytes"
"io"
"sort"
"github.com/influxdata/influxdb/models"
)
// IndexFileWriter represents a naive implementation of an index file builder.
type IndexFileWriter struct {
series indexFileSeries
mms indexFileMeasurements
}
// NewIndexFileWriter returns a new instance of IndexFileWriter.
func NewIndexFileWriter() *IndexFileWriter {
return &IndexFileWriter{
mms: make(indexFileMeasurements),
}
}
// AddSeries adds a series to the index file.
func (iw *IndexFileWriter) AddSeries(name []byte, tags models.Tags) {
// Add to series list.
iw.series = append(iw.series, indexFileSerie{name: name, tags: tags})
// Find or create measurement.
mm, ok := iw.mms[string(name)]
if !ok {
mm.name = name
mm.tagset = make(indexFileTagset)
iw.mms[string(name)] = mm
}
// Add tagset.
for _, tag := range tags {
t, ok := mm.tagset[string(tag.Key)]
if !ok {
t.name = tag.Key
t.values = make(indexFileValues)
mm.tagset[string(tag.Key)] = t
}
v, ok := t.values[string(tag.Value)]
if !ok {
v.name = tag.Value
t.values[string(tag.Value)] = v
}
}
}
// DeleteSeries removes a series from the writer.
func (iw *IndexFileWriter) DeleteSeries(name []byte, tags models.Tags) {
}
type indexFileSerie struct {
name []byte
tags models.Tags
deleted bool
offset uint32
}
type indexFileSeries []indexFileSerie
func (a indexFileSeries) Len() int { return len(a) }
func (a indexFileSeries) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a indexFileSeries) Less(i, j int) bool {
if cmp := bytes.Compare(a[i].name, a[j].name); cmp != 0 {
return cmp == -1
}
return models.CompareTags(a[i].tags, a[j].tags) == -1
}
type indexFileMeasurement struct {
name []byte
deleted bool
tagset indexFileTagset
offset int64 // tagset offset
size int64 // tagset size
seriesIDs []uint32
}
type indexFileMeasurements map[string]indexFileMeasurement
// Names returns a sorted list of measurement names.
func (m indexFileMeasurements) Names() []string {
a := make([]string, 0, len(m))
for name := range m {
a = append(a, name)
}
sort.Strings(a)
return a
}
type indexFileTag struct {
name []byte
deleted bool
values indexFileValues
}
type indexFileTagset map[string]indexFileTag
type indexFileValue struct {
name []byte
deleted bool
seriesIDs []uint32
}
type indexFileValues map[string]indexFileValue
*/

View File

@ -0,0 +1,228 @@
package tsi1
import (
"io"
"sort"
)
// IndexFiles represents a layered set of index files.
type IndexFiles []*IndexFile
// MeasurementNames returns a sorted list of all measurement names for all files.
func (p *IndexFiles) MeasurementNames() [][]byte {
itr := p.MeasurementIterator()
var names [][]byte
for e := itr.Next(); e != nil; e = itr.Next() {
names = append(names, copyBytes(e.Name))
}
sort.Sort(byteSlices(names))
return names
}
// MeasurementIterator returns an iterator that merges measurements across all files.
func (p *IndexFiles) MeasurementIterator() MeasurementIterator {
panic("TODO")
}
// TagKeyIterator returns an iterator that merges tag keys across all files.
func (p *IndexFiles) TagKeyIterator(name []byte) TagKeyIterator {
panic("TODO")
}
// TagValueIterator returns an iterator that merges tag values across all files.
func (p *IndexFiles) TagValueIterator(name, key []byte) TagValueIterator {
panic("TODO")
}
// SeriesIterator returns an iterator that merges series across all files.
func (p *IndexFiles) SeriesIterator() SeriesIterator {
panic("TODO")
}
// MeasurementSeriesIterator returns an iterator that merges series for a measurement across all files.
func (p *IndexFiles) MeasurementSeriesIterator(name []byte) SeriesIterator {
panic("TODO")
}
// TagValueSeriesIterator returns an iterator that merges series for a tag value across all files.
func (p *IndexFiles) TagValueSeriesIterator(name, key, value []byte) SeriesIterator {
panic("TODO")
}
// CompactTo merges all index files and writes them to w.
func (p *IndexFiles) CompactTo(w io.Writer) (n int64, err error) {
var t IndexFileTrailer
// Setup context object to track shared data for this compaction.
var info indexCompactInfo
info.tagSets = make(map[string]indexTagSetPos)
info.names = p.MeasurementNames()
// Write magic number.
if err := writeTo(w, []byte(FileSignature), &n); err != nil {
return n, err
}
// Write combined series list.
t.SeriesList.Offset = n
if err := p.writeSeriesListTo(w, &info, &n); err != nil {
return n, err
}
t.SeriesList.Size = n - t.SeriesList.Offset
// Write tagset blocks in measurement order.
if err := p.writeTagsetsTo(w, &info, &n); err != nil {
return n, err
}
// Write measurement block.
t.MeasurementBlock.Offset = n
if err := p.writeMeasurementBlockTo(w, &info, &n); err != nil {
return n, err
}
t.MeasurementBlock.Size = n - t.MeasurementBlock.Offset
// Write trailer.
nn, err := t.WriteTo(w)
n += nn
if err != nil {
return n, err
}
return n, nil
}
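// Hypothetical usage sketch (not part of this commit): once the merge iterators
// above are implemented, the intended compaction flow is to layer the index
// files and stream the merged result to a single writer via CompactTo.
func compactIndexFiles(files []*IndexFile, w io.Writer) (int64, error) {
	p := IndexFiles(files)
	// CompactTo writes, in order: series list, tagset blocks, measurement
	// block, and the trailer describing their offsets/sizes.
	return p.CompactTo(w)
}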
func (p *IndexFiles) writeSeriesListTo(w io.Writer, info *indexCompactInfo, n *int64) error {
itr := p.SeriesIterator()
// Write all series.
sw := NewSeriesListWriter()
for e := itr.Next(); e != nil; e = itr.Next() {
if err := sw.Add(e.Name, e.Tags); err != nil {
return err
}
}
// Flush series list.
nn, err := sw.WriteTo(w)
*n += nn
if err != nil {
return err
}
// Attach writer to info so we can obtain series offsets later.
info.sw = sw
return nil
}
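// Hypothetical illustration (not part of this commit; assumes the models
// package is imported) of the SeriesListWriter contract relied on below:
// Add every series, flush with WriteTo, then Offset returns a non-zero id
// for each series that was added (zero means not found).
func seriesListOffsets(w io.Writer, names [][]byte, tagsets []models.Tags) ([]uint32, error) {
	sw := NewSeriesListWriter()
	for i := range names {
		if err := sw.Add(names[i], tagsets[i]); err != nil {
			return nil, err
		}
	}
	if _, err := sw.WriteTo(w); err != nil {
		return nil, err
	}
	offsets := make([]uint32, len(names))
	for i := range names {
		offsets[i] = sw.Offset(names[i], tagsets[i])
	}
	return offsets, nil
}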
func (p *IndexFiles) writeTagsetsTo(w io.Writer, info *indexCompactInfo, n *int64) error {
for _, name := range info.names {
if err := p.writeTagsetTo(w, name, info, n); err != nil {
return err
}
}
return nil
}
// writeTagsetTo writes a single tagset to w and saves the tagset offset.
func (p *IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactInfo, n *int64) error {
kitr := p.TagKeyIterator(name)
tsw := NewTagSetWriter()
for ke := kitr.Next(); ke != nil; ke = kitr.Next() {
// Mark tag deleted.
if ke.Deleted {
tsw.DeleteTag(ke.Key)
}
// Iterate over tag values.
vitr := p.TagValueIterator(name, ke.Key)
for ve := vitr.Next(); ve != nil; ve = vitr.Next() {
// Look-up series ids.
sitr := p.TagValueSeriesIterator(name, ke.Key, ve.Value)
var seriesIDs []uint32
for se := sitr.Next(); se != nil; se = sitr.Next() {
seriesID := info.sw.Offset(se.Name, se.Tags)
if seriesID == 0 {
panic("expected series id")
}
seriesIDs = append(seriesIDs, seriesID)
}
sort.Sort(uint32Slice(seriesIDs))
// Insert tag value into writer.
tsw.AddTagValue(ke.Key, ve.Value, ve.Deleted, seriesIDs)
}
}
// Save tagset offset to measurement.
pos := info.tagSets[string(name)]
pos.offset = *n
// Write tagset to writer.
nn, err := tsw.WriteTo(w)
*n += nn
if err != nil {
return err
}
// Save tagset size to measurement.
pos.size = *n - pos.offset
info.tagSets[string(name)] = pos
return nil
}
func (p *IndexFiles) writeMeasurementBlockTo(w io.Writer, info *indexCompactInfo, n *int64) error {
mw := NewMeasurementBlockWriter()
// Add measurement data.
for _, name := range info.names {
// Look-up series ids.
itr := p.MeasurementSeriesIterator(name)
var seriesIDs []uint32
for e := itr.Next(); e != nil; e = itr.Next() {
seriesID := info.sw.Offset(e.Name, e.Tags)
if seriesID == 0 {
panic("expected series id")
}
seriesIDs = append(seriesIDs, seriesID)
}
sort.Sort(uint32Slice(seriesIDs))
// Add measurement to writer.
pos := info.tagSets[string(name)]
mw.Add(name, pos.offset, pos.size, seriesIDs)
}
// Write data to writer.
nn, err := mw.WriteTo(w)
*n += nn
if err != nil {
return err
}
return nil
}
// indexCompactInfo is a context object used for tracking position information
// during the compaction of index files.
type indexCompactInfo struct {
// Sorted list of all measurements.
// This is stored so it doesn't have to be recomputed.
names [][]byte
// Saved to look up series offsets.
sw *SeriesListWriter
// Tracks offset/size for each measurement's tagset.
tagSets map[string]indexTagSetPos
}
// indexTagSetPos stores the offset/size of tagsets.
type indexTagSetPos struct {
offset int64
size int64
}

View File

@ -0,0 +1 @@
package tsi1_test

View File

@ -217,12 +217,12 @@ func (f *LogFile) measurement(name []byte) logMeasurement {
func (f *LogFile) MeasurementIterator() MeasurementIterator {
var itr measurementIterator
for _, mm := range f.mms {
itr.mms = append(itr.mms, MeasurementElem{
itr.elems = append(itr.elems, MeasurementElem{
Name: mm.name,
Deleted: mm.deleted,
})
}
sort.Sort(MeasurementElems(itr.mms))
sort.Sort(MeasurementElems(itr.elems))
return &itr
}
@ -334,7 +334,7 @@ func (f *LogFile) writeTagsetTo(w io.Writer, name string, n *int64) error {
for _, tag := range mm.tagSet {
// Mark tag deleted.
if tag.deleted {
tsw.AddTag(tag.name, true)
tsw.DeleteTag(tag.name)
continue
}

View File

@ -60,7 +60,7 @@ func (ts *TagSet) Version() int { return ts.version }
// TagKeyElem returns an element for a tag key.
// Returns an element with a nil key if not found.
func (ts *TagSet) TagKeyElem(key []byte) TagKeyElem {
func (ts *TagSet) TagKeyElem(key []byte) TagSetKeyElem {
keyN := binary.BigEndian.Uint32(ts.hashData[:TagKeyNSize])
hash := hashKey(key)
pos := int(hash % keyN)
@ -75,7 +75,7 @@ func (ts *TagSet) TagKeyElem(key []byte) TagKeyElem {
// Evaluate key if offset is not empty.
if offset > 0 {
// Parse into element.
var e TagKeyElem
var e TagSetKeyElem
e.UnmarshalBinary(ts.data[offset:])
// Return if keys match.
@ -85,7 +85,7 @@ func (ts *TagSet) TagKeyElem(key []byte) TagKeyElem {
// Check if we've exceeded the probe distance.
if d > dist(hashKey(e.Key), pos, int(keyN)) {
return TagKeyElem{}
return TagSetKeyElem{}
}
}
@ -97,11 +97,11 @@ func (ts *TagSet) TagKeyElem(key []byte) TagKeyElem {
// TagValueElem returns an element for a tag value.
// Returns an element with a nil value if not found.
func (ts *TagSet) TagValueElem(key, value []byte) TagValueElem {
func (ts *TagSet) TagValueElem(key, value []byte) TagSetValueElem {
// Find key element, exit if not found.
kelem := ts.TagKeyElem(key)
if len(kelem.Key) == 0 {
return TagValueElem{}
return TagSetValueElem{}
}
hashData := ts.data[kelem.Offset:]
@ -119,7 +119,7 @@ func (ts *TagSet) TagValueElem(key, value []byte) TagValueElem {
// Evaluate value if offset is not empty.
if offset > 0 {
// Parse into element.
var e TagValueElem
var e TagSetValueElem
e.UnmarshalBinary(ts.data[offset:])
// Return if values match.
@ -129,7 +129,7 @@ func (ts *TagSet) TagValueElem(key, value []byte) TagValueElem {
// Check if we've exceeded the probe distance.
if d > dist(hashKey(e.Value), pos, int(valueN)) {
return TagValueElem{}
return TagSetValueElem{}
}
}
@ -172,15 +172,15 @@ func (ts *TagSet) UnmarshalBinary(data []byte) error {
return nil
}
// TagKeyElem represents a tag key element.
type TagKeyElem struct {
// TagSetKeyElem represents a tag key element in a TagSetBlock.
type TagSetKeyElem struct {
Flag byte
Key []byte
Offset uint64 // Value block offset
}
// UnmarshalBinary unmarshals data into e.
func (e *TagKeyElem) UnmarshalBinary(data []byte) {
func (e *TagSetKeyElem) UnmarshalBinary(data []byte) {
// Parse flag data.
e.Flag, data = data[0], data[1:]
@ -193,8 +193,8 @@ func (e *TagKeyElem) UnmarshalBinary(data []byte) {
e.Key = data[:sz]
}
// TagValueElem represents a tag value element.
type TagValueElem struct {
// TagSetValueElem represents a tag value element.
type TagSetValueElem struct {
Flag byte
Value []byte
Series struct {
@ -204,12 +204,12 @@ type TagValueElem struct {
}
// SeriesID returns series ID at an index.
func (e *TagValueElem) SeriesID(i int) uint32 {
func (e *TagSetValueElem) SeriesID(i int) uint32 {
return binary.BigEndian.Uint32(e.Series.Data[i*SeriesIDSize:])
}
// SeriesIDs returns a list of decoded series ids.
func (e *TagValueElem) SeriesIDs() []uint32 {
func (e *TagSetValueElem) SeriesIDs() []uint32 {
a := make([]uint32, e.Series.N)
for i := 0; i < int(e.Series.N); i++ {
a[i] = e.SeriesID(i)
@ -218,7 +218,7 @@ func (e *TagValueElem) SeriesIDs() []uint32 {
}
// UnmarshalBinary unmarshals data into e.
func (e *TagValueElem) UnmarshalBinary(data []byte) {
func (e *TagSetValueElem) UnmarshalBinary(data []byte) {
// Parse flag data.
e.Flag, data = data[0], data[1:]
@ -246,10 +246,10 @@ func NewTagSetWriter() *TagSetWriter {
}
}
// AddTag adds a key without any associated values.
func (tsw *TagSetWriter) AddTag(key []byte, deleted bool) {
// DeleteTag marks a key as deleted.
func (tsw *TagSetWriter) DeleteTag(key []byte) {
ts := tsw.sets[string(key)]
ts.deleted = deleted
ts.deleted = true
tsw.sets[string(key)] = ts
}
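// Hypothetical example (not part of this commit) combining the TagSetWriter
// methods visible in this diff: DeleteTag tombstones a key, AddTagValue records
// a live key/value pair with its series ids, and WriteTo flushes the block.
func writeExampleTagSet(w io.Writer) (int64, error) {
	tsw := NewTagSetWriter()
	tsw.DeleteTag([]byte("host")) // key marked as deleted
	tsw.AddTagValue([]byte("region"), []byte("us-east"), false, []uint32{1, 2})
	return tsw.WriteTo(w)
}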

View File

@ -29,24 +29,401 @@ type MeasurementIterator interface {
Next() *MeasurementElem
}
// NewMeasurementIterator returns an iterator that operates on an in-memory slice.
func NewMeasurementIterator(elems []MeasurementElem) MeasurementIterator {
return &measurementIterator{elems: elems}
}
// measurementIterator represents an iterator over a slice of measurements.
type measurementIterator struct {
mms []MeasurementElem
elems []MeasurementElem
}
// Next shifts the next element off the list.
func (itr *measurementIterator) Next() *MeasurementElem {
if len(itr.mms) == 0 {
func (itr *measurementIterator) Next() (e *MeasurementElem) {
if len(itr.elems) == 0 {
return nil
}
mm := itr.mms[0]
itr.mms = itr.mms[1:]
return &mm
e, itr.elems = &itr.elems[0], itr.elems[1:]
return e
}
// MergeMeasurementIterators returns an iterator that merges a set of iterators.
// Iterators that are first in the list take precedence and a deletion by those
// early iterators will invalidate elements from later iterators.
func MergeMeasurementIterators(itrs ...MeasurementIterator) MeasurementIterator {
itr := &measurementMergeIterator{
buf: make([]MeasurementElem, len(itrs)),
itrs: itrs,
}
// Initialize buffers.
for i := range itr.itrs {
if e := itr.itrs[i].Next(); e != nil {
itr.buf[i] = *e
}
}
return itr
}
type measurementMergeIterator struct {
e MeasurementElem
buf []MeasurementElem
itrs []MeasurementIterator
}
// Next returns the element with the next lowest name across the iterators.
//
// If multiple iterators contain the same name then the first is returned
// and the remaining ones are skipped.
func (itr *measurementMergeIterator) Next() *MeasurementElem {
itr.e = MeasurementElem{}
// Find next lowest name amongst the buffers.
var name []byte
for i := range itr.buf {
if len(itr.buf[i].Name) == 0 {
continue
} else if name == nil || bytes.Compare(itr.buf[i].Name, name) == -1 {
name = itr.buf[i].Name
}
}
// Return nil if no elements remaining.
if len(name) == 0 {
return nil
}
// Refill buffer.
for i := range itr.buf {
if !bytes.Equal(itr.buf[i].Name, name) {
continue
}
// Copy first matching buffer to the return buffer.
if len(itr.e.Name) == 0 {
itr.e = itr.buf[i]
}
// Fill buffer with next element.
if e := itr.itrs[i].Next(); e != nil {
itr.buf[i] = *e
} else {
itr.buf[i] = MeasurementElem{}
}
}
return &itr.e
}
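// Hypothetical precedence illustration (not part of this commit): when two
// inputs both contain "cpu", the element from the earlier iterator wins, so a
// deletion recorded by an earlier-listed iterator shadows the later one.
func exampleMergedMeasurements() []MeasurementElem {
	itr := MergeMeasurementIterators(
		NewMeasurementIterator([]MeasurementElem{{Name: []byte("cpu"), Deleted: true}}),
		NewMeasurementIterator([]MeasurementElem{{Name: []byte("cpu")}, {Name: []byte("mem")}}),
	)
	var elems []MeasurementElem
	for e := itr.Next(); e != nil; e = itr.Next() {
		elems = append(elems, *e) // yields cpu (Deleted=true), then mem
	}
	return elems
}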
// TagKeyElem represents a generic tag key element.
type TagKeyElem struct {
Key []byte
Deleted bool
}
// TagKeyIterator represents an iterator over a list of tag keys.
type TagKeyIterator interface {
Next() *TagKeyElem
}
// NewTagKeyIterator returns an iterator that operates on an in-memory slice.
func NewTagKeyIterator(a []TagKeyElem) TagKeyIterator {
return &tagKeyIterator{elems: a}
}
// tagKeyIterator represents an iterator over a slice of tag keys.
type tagKeyIterator struct {
elems []TagKeyElem
}
// Next returns the next element.
func (itr *tagKeyIterator) Next() (e *TagKeyElem) {
if len(itr.elems) == 0 {
return nil
}
e, itr.elems = &itr.elems[0], itr.elems[1:]
return e
}
// MergeTagKeyIterators returns an iterator that merges a set of iterators.
// Iterators that are first in the list take precedence and a deletion by those
// early iterators will invalidate elements from later iterators.
func MergeTagKeyIterators(itrs ...TagKeyIterator) TagKeyIterator {
itr := &tagKeyMergeIterator{
buf: make([]TagKeyElem, len(itrs)),
itrs: itrs,
}
// Initialize buffers.
for i := range itr.itrs {
if e := itr.itrs[i].Next(); e != nil {
itr.buf[i] = *e
}
}
return itr
}
type tagKeyMergeIterator struct {
e TagKeyElem
buf []TagKeyElem
itrs []TagKeyIterator
}
// Next returns the element with the next lowest key across the iterators.
//
// If multiple iterators contain the same key then the first is returned
// and the remaining ones are skipped.
func (itr *tagKeyMergeIterator) Next() *TagKeyElem {
itr.e = TagKeyElem{}
// Find next lowest key amongst the buffers.
var key []byte
for i := range itr.buf {
if len(itr.buf[i].Key) == 0 {
continue
} else if key == nil || bytes.Compare(itr.buf[i].Key, key) == -1 {
key = itr.buf[i].Key
}
}
// Return nil if no elements remaining.
if len(key) == 0 {
return nil
}
// Refill buffer.
for i := range itr.buf {
if !bytes.Equal(itr.buf[i].Key, key) {
continue
}
// Copy first matching buffer to the return buffer.
if len(itr.e.Key) == 0 {
itr.e = itr.buf[i]
}
// Fill buffer with next element.
if e := itr.itrs[i].Next(); e != nil {
itr.buf[i] = *e
} else {
itr.buf[i] = TagKeyElem{}
}
}
return &itr.e
}
// TagValueElem represents a generic tag value element.
type TagValueElem struct {
Value []byte
Deleted bool
}
// TagValueIterator represents an iterator over a list of tag values.
type TagValueIterator interface {
Next() *TagValueElem
}
// NewTagValueIterator returns an iterator that operates on an in-memory slice.
func NewTagValueIterator(a []TagValueElem) TagValueIterator {
return &tagValueIterator{elems: a}
}
// tagValueIterator represents an iterator over a slice of tag values.
type tagValueIterator struct {
elems []TagValueElem
}
// Next returns the next element.
func (itr *tagValueIterator) Next() (e *TagValueElem) {
if len(itr.elems) == 0 {
return nil
}
e, itr.elems = &itr.elems[0], itr.elems[1:]
return e
}
// MergeTagValueIterators returns an iterator that merges a set of iterators.
// Iterators that are first in the list take precedence and a deletion by those
// early iterators will invalidate elements from later iterators.
func MergeTagValueIterators(itrs ...TagValueIterator) TagValueIterator {
itr := &tagValueMergeIterator{
buf: make([]TagValueElem, len(itrs)),
itrs: itrs,
}
// Initialize buffers.
for i := range itr.itrs {
if e := itr.itrs[i].Next(); e != nil {
itr.buf[i] = *e
}
}
return itr
}
type tagValueMergeIterator struct {
e TagValueElem
buf []TagValueElem
itrs []TagValueIterator
}
// Next returns the element with the next lowest value across the iterators.
//
// If multiple iterators contain the same value then the first is returned
// and the remaining ones are skipped.
func (itr *tagValueMergeIterator) Next() *TagValueElem {
itr.e = TagValueElem{}
// Find next lowest value amongst the buffers.
var value []byte
for i := range itr.buf {
if len(itr.buf[i].Value) == 0 {
continue
} else if value == nil || bytes.Compare(itr.buf[i].Value, value) == -1 {
value = itr.buf[i].Value
}
}
// Return nil if no elements remaining.
if len(value) == 0 {
return nil
}
// Refill buffer.
for i := range itr.buf {
if !bytes.Equal(itr.buf[i].Value, value) {
continue
}
// Copy first matching buffer to the return buffer.
if len(itr.e.Value) == 0 {
itr.e = itr.buf[i]
}
// Fill buffer with next element.
if e := itr.itrs[i].Next(); e != nil {
itr.buf[i] = *e
} else {
itr.buf[i] = TagValueElem{}
}
}
return &itr.e
}
// SeriesElem represents a generic series element.
type SeriesElem struct {
Name []byte
Tags models.Tags
Deleted bool
}
// SeriesIterator represents an iterator over a list of series.
type SeriesIterator interface {
Next(name *[]byte, tags *models.Tags, deleted *bool)
Next() *SeriesElem
}
// NewSeriesIterator returns an iterator that operates on an in-memory slice.
func NewSeriesIterator(a []SeriesElem) SeriesIterator {
return &seriesIterator{elems: a}
}
// seriesIterator represents an iterator over a slice of series.
type seriesIterator struct {
elems []SeriesElem
}
// Next returns the next element.
func (itr *seriesIterator) Next() (e *SeriesElem) {
if len(itr.elems) == 0 {
return nil
}
e, itr.elems = &itr.elems[0], itr.elems[1:]
return e
}
// MergeSeriesIterators returns an iterator that merges a set of iterators.
// Iterators that are first in the list take precedence and a deletion by those
// early iterators will invalidate elements from later iterators.
func MergeSeriesIterators(itrs ...SeriesIterator) SeriesIterator {
itr := &seriesMergeIterator{
buf: make([]SeriesElem, len(itrs)),
itrs: itrs,
}
// Initialize buffers.
for i := range itr.itrs {
if e := itr.itrs[i].Next(); e != nil {
itr.buf[i] = *e
}
}
return itr
}
type seriesMergeIterator struct {
e SeriesElem
buf []SeriesElem
itrs []SeriesIterator
}
// Next returns the element with the next lowest name/tags across the iterators.
//
// If multiple iterators contain the same name/tags then the first is returned
// and the remaining ones are skipped.
func (itr *seriesMergeIterator) Next() *SeriesElem {
itr.e = SeriesElem{}
// Find next lowest name/tags amongst the buffers.
var name []byte
var tags models.Tags
for i := range itr.buf {
// Skip empty buffers.
if len(itr.buf[i].Name) == 0 {
continue
}
// If the name is not set then pick the first non-empty name.
if name == nil {
name, tags = itr.buf[i].Name, itr.buf[i].Tags
continue
}
// Set name/tags if they are lower than what has been seen.
if cmp := bytes.Compare(itr.buf[i].Name, name); cmp == -1 || (cmp == 0 && models.CompareTags(itr.buf[i].Tags, tags) == -1) {
name, tags = itr.buf[i].Name, itr.buf[i].Tags
}
}
// Return nil if no elements remaining.
if len(name) == 0 {
return nil
}
// Refill buffer.
for i := range itr.buf {
if !bytes.Equal(itr.buf[i].Name, name) || models.CompareTags(itr.buf[i].Tags, tags) != 0 {
continue
}
// Copy first matching buffer to the return buffer.
if len(itr.e.Name) == 0 {
itr.e = itr.buf[i]
}
// Fill buffer with next element.
if e := itr.itrs[i].Next(); e != nil {
itr.buf[i] = *e
} else {
itr.buf[i] = SeriesElem{}
}
}
return &itr.e
}
// writeTo writes v to w. Updates n.
@ -134,3 +511,19 @@ type uint32Slice []uint32
func (a uint32Slice) Len() int { return len(a) }
func (a uint32Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a uint32Slice) Less(i, j int) bool { return a[i] < a[j] }
type byteSlices [][]byte
func (a byteSlices) Len() int { return len(a) }
func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
// copyBytes returns a copy of b.
func copyBytes(b []byte) []byte {
if b == nil {
return nil
}
buf := make([]byte, len(b))
copy(buf, b)
return buf
}

View File

@ -0,0 +1,197 @@
package tsi1_test
import (
"reflect"
"testing"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb/engine/tsi1"
)
// Ensure iterator can operate over an in-memory list of elements.
func TestMeasurementIterator(t *testing.T) {
elems := []tsi1.MeasurementElem{
{Name: []byte("cpu"), Deleted: true},
{Name: []byte("mem")},
}
itr := tsi1.NewMeasurementIterator(elems)
if e := itr.Next(); !reflect.DeepEqual(&elems[0], e) {
t.Fatalf("unexpected elem(0): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(&elems[1], e) {
t.Fatalf("unexpected elem(1): %#v", e)
} else if e := itr.Next(); e != nil {
t.Fatalf("expected nil elem: %#v", e)
}
}
// Ensure iterator can merge multiple iterators together.
func TestMergeMeasurementIterators(t *testing.T) {
itr := tsi1.MergeMeasurementIterators(
tsi1.NewMeasurementIterator([]tsi1.MeasurementElem{
{Name: []byte("aaa")},
{Name: []byte("bbb"), Deleted: true},
{Name: []byte("ccc")},
}),
tsi1.NewMeasurementIterator(nil),
tsi1.NewMeasurementIterator([]tsi1.MeasurementElem{
{Name: []byte("bbb")},
{Name: []byte("ccc"), Deleted: true},
{Name: []byte("ddd")},
}),
)
if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("aaa")}) {
t.Fatalf("unexpected elem(0): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("bbb"), Deleted: true}) {
t.Fatalf("unexpected elem(1): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("ccc")}) {
t.Fatalf("unexpected elem(2): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("ddd")}) {
t.Fatalf("unexpected elem(3): %#v", e)
} else if e := itr.Next(); e != nil {
t.Fatalf("expected nil elem: %#v", e)
}
}
// Ensure iterator can operate over an in-memory list of tag key elements.
func TestTagKeyIterator(t *testing.T) {
elems := []tsi1.TagKeyElem{
{Key: []byte("aaa"), Deleted: true},
{Key: []byte("bbb")},
}
itr := tsi1.NewTagKeyIterator(elems)
if e := itr.Next(); !reflect.DeepEqual(&elems[0], e) {
t.Fatalf("unexpected elem(0): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(&elems[1], e) {
t.Fatalf("unexpected elem(1): %#v", e)
} else if e := itr.Next(); e != nil {
t.Fatalf("expected nil elem: %#v", e)
}
}
// Ensure iterator can merge multiple iterators together.
func TestMergeTagKeyIterators(t *testing.T) {
itr := tsi1.MergeTagKeyIterators(
tsi1.NewTagKeyIterator([]tsi1.TagKeyElem{
{Key: []byte("aaa")},
{Key: []byte("bbb"), Deleted: true},
{Key: []byte("ccc")},
}),
tsi1.NewTagKeyIterator(nil),
tsi1.NewTagKeyIterator([]tsi1.TagKeyElem{
{Key: []byte("bbb")},
{Key: []byte("ccc"), Deleted: true},
{Key: []byte("ddd")},
}),
)
if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagKeyElem{Key: []byte("aaa")}) {
t.Fatalf("unexpected elem(0): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagKeyElem{Key: []byte("bbb"), Deleted: true}) {
t.Fatalf("unexpected elem(1): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagKeyElem{Key: []byte("ccc")}) {
t.Fatalf("unexpected elem(2): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagKeyElem{Key: []byte("ddd")}) {
t.Fatalf("unexpected elem(3): %#v", e)
} else if e := itr.Next(); e != nil {
t.Fatalf("expected nil elem: %#v", e)
}
}
// Ensure iterator can operate over an in-memory list of tag value elements.
func TestTagValueIterator(t *testing.T) {
elems := []tsi1.TagValueElem{
{Value: []byte("aaa"), Deleted: true},
{Value: []byte("bbb")},
}
itr := tsi1.NewTagValueIterator(elems)
if e := itr.Next(); !reflect.DeepEqual(&elems[0], e) {
t.Fatalf("unexpected elem(0): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(&elems[1], e) {
t.Fatalf("unexpected elem(1): %#v", e)
} else if e := itr.Next(); e != nil {
t.Fatalf("expected nil elem: %#v", e)
}
}
// Ensure iterator can merge multiple iterators together.
func TestMergeTagValueIterators(t *testing.T) {
itr := tsi1.MergeTagValueIterators(
tsi1.NewTagValueIterator([]tsi1.TagValueElem{
{Value: []byte("aaa")},
{Value: []byte("bbb"), Deleted: true},
{Value: []byte("ccc")},
}),
tsi1.NewTagValueIterator(nil),
tsi1.NewTagValueIterator([]tsi1.TagValueElem{
{Value: []byte("bbb")},
{Value: []byte("ccc"), Deleted: true},
{Value: []byte("ddd")},
}),
)
if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagValueElem{Value: []byte("aaa")}) {
t.Fatalf("unexpected elem(0): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagValueElem{Value: []byte("bbb"), Deleted: true}) {
t.Fatalf("unexpected elem(1): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagValueElem{Value: []byte("ccc")}) {
t.Fatalf("unexpected elem(2): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagValueElem{Value: []byte("ddd")}) {
t.Fatalf("unexpected elem(3): %#v", e)
} else if e := itr.Next(); e != nil {
t.Fatalf("expected nil elem: %#v", e)
}
}
// Ensure iterator can operate over an in-memory list of series.
func TestSeriesIterator(t *testing.T) {
elems := []tsi1.SeriesElem{
{Name: []byte("cpu"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}, Deleted: true},
{Name: []byte("mem")},
}
itr := tsi1.NewSeriesIterator(elems)
if e := itr.Next(); !reflect.DeepEqual(&elems[0], e) {
t.Fatalf("unexpected elem(0): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(&elems[1], e) {
t.Fatalf("unexpected elem(1): %#v", e)
} else if e := itr.Next(); e != nil {
t.Fatalf("expected nil elem: %#v", e)
}
}
// Ensure iterator can merge multiple iterators together.
func TestMergeSeriesIterators(t *testing.T) {
itr := tsi1.MergeSeriesIterators(
tsi1.NewSeriesIterator([]tsi1.SeriesElem{
{Name: []byte("aaa"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}, Deleted: true},
{Name: []byte("bbb"), Deleted: true},
{Name: []byte("ccc")},
}),
tsi1.NewSeriesIterator(nil),
tsi1.NewSeriesIterator([]tsi1.SeriesElem{
{Name: []byte("aaa"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}},
{Name: []byte("aaa"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-west")}}},
{Name: []byte("bbb")},
{Name: []byte("ccc"), Deleted: true},
{Name: []byte("ddd")},
}),
)
if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.SeriesElem{Name: []byte("aaa"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}, Deleted: true}) {
t.Fatalf("unexpected elem(0): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.SeriesElem{Name: []byte("aaa"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-west")}}}) {
t.Fatalf("unexpected elem(1): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.SeriesElem{Name: []byte("bbb"), Deleted: true}) {
t.Fatalf("unexpected elem(2): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.SeriesElem{Name: []byte("ccc")}) {
t.Fatalf("unexpected elem(3): %#v", e)
} else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.SeriesElem{Name: []byte("ddd")}) {
t.Fatalf("unexpected elem(4): %#v", e)
} else if e := itr.Next(); e != nil {
t.Fatalf("expected nil elem: %#v", e)
}
}