From 0294e717a04a25d95136de197af5e127bff075fc Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Thu, 27 Oct 2016 09:47:41 -0600 Subject: [PATCH] Add mm, tag key, tag value, & series iterators. --- tsdb/engine/tsi1/index.go | 13 +- tsdb/engine/tsi1/index_file.go | 34 +-- tsdb/engine/tsi1/index_file_writer.go | 113 ------- tsdb/engine/tsi1/index_files.go | 228 +++++++++++++++ tsdb/engine/tsi1/index_files_test.go | 1 + tsdb/engine/tsi1/log_file.go | 6 +- tsdb/engine/tsi1/tag_set.go | 36 +-- tsdb/engine/tsi1/tsi1.go | 407 +++++++++++++++++++++++++- tsdb/engine/tsi1/tsi1_test.go | 197 +++++++++++++ 9 files changed, 867 insertions(+), 168 deletions(-) delete mode 100644 tsdb/engine/tsi1/index_file_writer.go create mode 100644 tsdb/engine/tsi1/index_files.go create mode 100644 tsdb/engine/tsi1/index_files_test.go create mode 100644 tsdb/engine/tsi1/tsi1_test.go diff --git a/tsdb/engine/tsi1/index.go b/tsdb/engine/tsi1/index.go index 0d0b62c30e..833384a253 100644 --- a/tsdb/engine/tsi1/index.go +++ b/tsdb/engine/tsi1/index.go @@ -48,22 +48,15 @@ func (i *Index) measurement(name []byte) *tsdb.Measurement { itr := i.file.MeasurementSeriesIterator(name) var id uint64 // TEMPORARY - var sname []byte - var tags models.Tags - var deleted bool - for { - if itr.Next(&sname, &tags, &deleted); sname == nil { - break - } - + for e := itr.Next(); e != nil; e = itr.Next() { // TODO: Handle deleted series. // Append series to to measurement. // TODO: Remove concept of series ids. m.AddSeries(&tsdb.Series{ ID: id, - Key: string(sname), - Tags: models.CopyTags(tags), + Key: string(e.Name), + Tags: models.CopyTags(e.Tags), }) // TEMPORARY: Increment ID. diff --git a/tsdb/engine/tsi1/index_file.go b/tsdb/engine/tsi1/index_file.go index be19d62237..123f8e8e56 100644 --- a/tsdb/engine/tsi1/index_file.go +++ b/tsdb/engine/tsi1/index_file.go @@ -5,8 +5,6 @@ import ( "encoding/binary" "errors" "io" - - "github.com/influxdata/influxdb/models" ) // IndexFileVersion is the current TSI1 index file version. @@ -94,17 +92,17 @@ func (i *IndexFile) Close() error { } // TagValueElem returns a list of series ids for a measurement/tag/value. -func (i *IndexFile) TagValueElem(name, key, value []byte) (TagValueElem, error) { +func (i *IndexFile) TagValueElem(name, key, value []byte) (TagSetValueElem, error) { // Find measurement. e, ok := i.mblk.Elem(name) if !ok { - return TagValueElem{}, nil + return TagSetValueElem{}, nil } // Find tag set block. tblk, err := i.tagSetBlock(&e) if err != nil { - return TagValueElem{}, err + return TagSetValueElem{}, err } return tblk.TagValueElem(key, value), nil } @@ -137,42 +135,44 @@ func (i *IndexFile) MeasurementSeriesIterator(name []byte) SeriesIterator { } // Return iterator. - return &seriesIterator{ + return &rawSeriesIterator{ n: e.Series.N, data: e.Series.Data, seriesList: &i.slist, } } -// seriesIterator iterates over a list of raw data. -type seriesIterator struct { - i, n uint32 - data []byte +// rawSeriesIterator iterates over a list of raw data. +type rawSeriesIterator struct { + i, n uint32 // index & total count + data []byte // raw data + // series list used for decoding seriesList *SeriesList + + // reusable buffer + e SeriesElem } // Next returns the next decoded series. Uses name & tags as reusable buffers. // Returns nils when the iterator is complete. -func (itr *seriesIterator) Next(name *[]byte, tags *models.Tags, deleted *bool) { +func (itr *rawSeriesIterator) Next() *SeriesElem { // Return nil if we've reached the end. 
if itr.i == itr.n { - *name, *tags = nil, nil - return + return nil } // Move forward and retrieved offset. offset := binary.BigEndian.Uint32(itr.data[itr.i*SeriesIDSize:]) // Read from series list into buffers. - itr.seriesList.DecodeSeriesAt(offset, name, tags, deleted) + itr.seriesList.DecodeSeriesAt(offset, &itr.e.Name, &itr.e.Tags, &itr.e.Deleted) // Move iterator forward. itr.i++ -} -// IndexFiles represents a layered set of index files. -type IndexFiles []*IndexFile + return &itr.e +} // ReadIndexFileTrailer returns the index file trailer from data. func ReadIndexFileTrailer(data []byte) (IndexFileTrailer, error) { diff --git a/tsdb/engine/tsi1/index_file_writer.go b/tsdb/engine/tsi1/index_file_writer.go deleted file mode 100644 index 76b2ebb9cb..0000000000 --- a/tsdb/engine/tsi1/index_file_writer.go +++ /dev/null @@ -1,113 +0,0 @@ -package tsi1 - -/* -import ( - "bytes" - "io" - "sort" - - "github.com/influxdata/influxdb/models" -) - -// IndexFileWriter represents a naive implementation of an index file builder. -type IndexFileWriter struct { - series indexFileSeries - mms indexFileMeasurements -} - -// NewIndexFileWriter returns a new instance of IndexFileWriter. -func NewIndexFileWriter() *IndexFileWriter { - return &IndexFileWriter{ - mms: make(indexFileMeasurements), - } -} - -// AddSeries adds a series to the index file. -func (iw *IndexFileWriter) AddSeries(name []byte, tags models.Tags) { - // Add to series list. - iw.series = append(iw.series, indexFileSerie{name: name, tags: tags}) - - // Find or create measurement. - mm, ok := iw.mms[string(name)] - if !ok { - mm.name = name - mm.tagset = make(indexFileTagset) - iw.mms[string(name)] = mm - } - - // Add tagset. - for _, tag := range tags { - t, ok := mm.tagset[string(tag.Key)] - if !ok { - t.name = tag.Key - t.values = make(indexFileValues) - mm.tagset[string(tag.Key)] = t - } - - v, ok := t.values[string(tag.Value)] - if !ok { - v.name = tag.Value - t.values[string(tag.Value)] = v - } - } -} - -// DeleteSeries removes a series from the writer. -func (iw *IndexFileWriter) DeleteSeries(name []byte, tags models.Tags) { -} - -type indexFileSerie struct { - name []byte - tags models.Tags - deleted bool - offset uint32 -} - -type indexFileSeries []indexFileSerie - -func (a indexFileSeries) Len() int { return len(a) } -func (a indexFileSeries) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a indexFileSeries) Less(i, j int) bool { - if cmp := bytes.Compare(a[i].name, a[j].name); cmp != 0 { - return cmp == -1 - } - return models.CompareTags(a[i].tags, a[j].tags) == -1 -} - -type indexFileMeasurement struct { - name []byte - deleted bool - tagset indexFileTagset - offset int64 // tagset offset - size int64 // tagset size - seriesIDs []uint32 -} - -type indexFileMeasurements map[string]indexFileMeasurement - -// Names returns a sorted list of measurement names. 
-func (m indexFileMeasurements) Names() []string { - a := make([]string, 0, len(m)) - for name := range m { - a = append(a, name) - } - sort.Strings(a) - return a -} - -type indexFileTag struct { - name []byte - deleted bool - values indexFileValues -} - -type indexFileTagset map[string]indexFileTag - -type indexFileValue struct { - name []byte - deleted bool - seriesIDs []uint32 -} - -type indexFileValues map[string]indexFileValue -*/ diff --git a/tsdb/engine/tsi1/index_files.go b/tsdb/engine/tsi1/index_files.go new file mode 100644 index 0000000000..7ee845cebb --- /dev/null +++ b/tsdb/engine/tsi1/index_files.go @@ -0,0 +1,228 @@ +package tsi1 + +import ( + "io" + "sort" +) + +// IndexFiles represents a layered set of index files. +type IndexFiles []*IndexFile + +// MeasurementNames returns a sorted list of all measurement names for all files. +func (p *IndexFiles) MeasurementNames() [][]byte { + itr := p.MeasurementIterator() + var names [][]byte + for e := itr.Next(); e != nil; e = itr.Next() { + names = append(names, copyBytes(e.Name)) + } + sort.Sort(byteSlices(names)) + return names +} + +// MeasurementIterator returns an iterator that merges measurements across all files. +func (p *IndexFiles) MeasurementIterator() MeasurementIterator { + panic("TODO") +} + +// TagKeyIterator returns an iterator that merges tag keys across all files. +func (p *IndexFiles) TagKeyIterator(name []byte) TagKeyIterator { + panic("TODO") +} + +// TagValueIterator returns an iterator that merges tag values across all files. +func (p *IndexFiles) TagValueIterator(name, key []byte) TagValueIterator { + panic("TODO") +} + +// SeriesIterator returns an iterator that merges series across all files. +func (p *IndexFiles) SeriesIterator() SeriesIterator { + panic("TODO") +} + +// MeasurementSeriesIterator returns an iterator that merges series across all files. +func (p *IndexFiles) MeasurementSeriesIterator(name []byte) SeriesIterator { + panic("TODO") +} + +// TagValueSeriesIterator returns an iterator that merges series across all files. +func (p *IndexFiles) TagValueSeriesIterator(name, key, value []byte) SeriesIterator { + panic("TODO") +} + +// CompactTo merges all index files and writes them to w. +func (p *IndexFiles) CompactTo(w io.Writer) (n int64, err error) { + var t IndexFileTrailer + + // Setup context object to track shared data for this compaction. + var info indexCompactInfo + info.tagSets = make(map[string]indexTagSetPos) + info.names = p.MeasurementNames() + + // Write magic number. + if err := writeTo(w, []byte(FileSignature), &n); err != nil { + return n, err + } + + // Write combined series list. + t.SeriesList.Offset = n + if err := p.writeSeriesListTo(w, &info, &n); err != nil { + return n, err + } + t.SeriesList.Size = n - t.SeriesList.Offset + + // Write tagset blocks in measurement order. + if err := p.writeTagsetsTo(w, &info, &n); err != nil { + return n, err + } + + // Write measurement block. + t.MeasurementBlock.Offset = n + if err := p.writeMeasurementBlockTo(w, &info, &n); err != nil { + return n, err + } + t.MeasurementBlock.Size = n - t.MeasurementBlock.Offset + + // Write trailer. + nn, err := t.WriteTo(w) + n += nn + if err != nil { + return n, err + } + + return n, nil +} + +func (p *IndexFiles) writeSeriesListTo(w io.Writer, info *indexCompactInfo, n *int64) error { + itr := p.SeriesIterator() + + // Write all series. 
+ sw := NewSeriesListWriter() + for e := itr.Next(); e != nil; e = itr.Next() { + if err := sw.Add(e.Name, e.Tags); err != nil { + return err + } + } + + // Flush series list. + nn, err := sw.WriteTo(w) + *n += nn + if err != nil { + return err + } + + // Attach writer to info so we can obtain series offsets later. + info.sw = sw + + return nil +} + +func (p *IndexFiles) writeTagsetsTo(w io.Writer, info *indexCompactInfo, n *int64) error { + for _, name := range info.names { + if err := p.writeTagsetTo(w, name, info, n); err != nil { + return err + } + } + return nil +} + +// writeTagsetTo writes a single tagset to w and saves the tagset offset. +func (p *IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactInfo, n *int64) error { + kitr := p.TagKeyIterator(name) + + tsw := NewTagSetWriter() + for ke := kitr.Next(); ke != nil; ke = kitr.Next() { + // Mark tag deleted. + if ke.Deleted { + tsw.DeleteTag(ke.Key) + } + + // Iterate over tag values. + vitr := p.TagValueIterator(name, ke.Key) + for ve := vitr.Next(); ve != nil; ve = vitr.Next() { + // Look-up series ids. + sitr := p.TagValueSeriesIterator(name, ke.Key, ve.Value) + var seriesIDs []uint32 + for se := sitr.Next(); se != nil; se = sitr.Next() { + seriesID := info.sw.Offset(se.Name, se.Tags) + if seriesID == 0 { + panic("expected series id") + } + seriesIDs = append(seriesIDs, seriesID) + } + sort.Sort(uint32Slice(seriesIDs)) + + // Insert tag value into writer. + tsw.AddTagValue(name, ve.Value, ve.Deleted, seriesIDs) + } + } + + // Save tagset offset to measurement. + pos := info.tagSets[string(name)] + pos.offset = *n + + // Write tagset to writer. + nn, err := tsw.WriteTo(w) + *n += nn + if err != nil { + return err + } + + // Save tagset size to measurement. + pos.size = *n - pos.offset + + info.tagSets[string(name)] = pos + + return nil +} + +func (p *IndexFiles) writeMeasurementBlockTo(w io.Writer, info *indexCompactInfo, n *int64) error { + mw := NewMeasurementBlockWriter() + + // Add measurement data. + for _, name := range info.names { + // Look-up series ids. + itr := p.MeasurementSeriesIterator(name) + var seriesIDs []uint32 + for e := itr.Next(); e != nil; e = itr.Next() { + seriesID := info.sw.Offset(e.Name, e.Tags) + if seriesID == 0 { + panic("expected series id") + } + seriesIDs = append(seriesIDs, seriesID) + } + sort.Sort(uint32Slice(seriesIDs)) + + // Add measurement to writer. + pos := info.tagSets[string(name)] + mw.Add(name, pos.offset, pos.size, seriesIDs) + } + + // Write data to writer. + nn, err := mw.WriteTo(w) + *n += nn + if err != nil { + return err + } + + return nil +} + +// indexCompactInfo is a context object used for tracking position information +// during the compaction of index files. +type indexCompactInfo struct { + // Sorted list of all measurements. + // This is stored so it doesn't have to be recomputed. + names [][]byte + + // Saved to look up series offsets. + sw *SeriesListWriter + + // Tracks offset/size for each measurement's tagset. + tagSets map[string]indexTagSetPos +} + +// indexTagSetPos stores the offset/size of tagsets. 
+type indexTagSetPos struct { + offset int64 + size int64 +} diff --git a/tsdb/engine/tsi1/index_files_test.go b/tsdb/engine/tsi1/index_files_test.go new file mode 100644 index 0000000000..290bda1ef3 --- /dev/null +++ b/tsdb/engine/tsi1/index_files_test.go @@ -0,0 +1 @@ +package tsi1_test diff --git a/tsdb/engine/tsi1/log_file.go b/tsdb/engine/tsi1/log_file.go index 5f5c433efe..80e08abb3a 100644 --- a/tsdb/engine/tsi1/log_file.go +++ b/tsdb/engine/tsi1/log_file.go @@ -217,12 +217,12 @@ func (f *LogFile) measurement(name []byte) logMeasurement { func (f *LogFile) MeasurementIterator() MeasurementIterator { var itr measurementIterator for _, mm := range f.mms { - itr.mms = append(itr.mms, MeasurementElem{ + itr.elems = append(itr.elems, MeasurementElem{ Name: mm.name, Deleted: mm.deleted, }) } - sort.Sort(MeasurementElems(itr.mms)) + sort.Sort(MeasurementElems(itr.elems)) return &itr } @@ -334,7 +334,7 @@ func (f *LogFile) writeTagsetTo(w io.Writer, name string, n *int64) error { for _, tag := range mm.tagSet { // Mark tag deleted. if tag.deleted { - tsw.AddTag(tag.name, true) + tsw.DeleteTag(tag.name) continue } diff --git a/tsdb/engine/tsi1/tag_set.go b/tsdb/engine/tsi1/tag_set.go index 7e66ce744d..19e5736196 100644 --- a/tsdb/engine/tsi1/tag_set.go +++ b/tsdb/engine/tsi1/tag_set.go @@ -60,7 +60,7 @@ func (ts *TagSet) Version() int { return ts.version } // TagKeyElem returns an element for a tag key. // Returns an element with a nil key if not found. -func (ts *TagSet) TagKeyElem(key []byte) TagKeyElem { +func (ts *TagSet) TagKeyElem(key []byte) TagSetKeyElem { keyN := binary.BigEndian.Uint32(ts.hashData[:TagKeyNSize]) hash := hashKey(key) pos := int(hash % keyN) @@ -75,7 +75,7 @@ func (ts *TagSet) TagKeyElem(key []byte) TagKeyElem { // Evaluate key if offset is not empty. if offset > 0 { // Parse into element. - var e TagKeyElem + var e TagSetKeyElem e.UnmarshalBinary(ts.data[offset:]) // Return if keys match. @@ -85,7 +85,7 @@ func (ts *TagSet) TagKeyElem(key []byte) TagKeyElem { // Check if we've exceeded the probe distance. if d > dist(hashKey(e.Key), pos, int(keyN)) { - return TagKeyElem{} + return TagSetKeyElem{} } } @@ -97,11 +97,11 @@ func (ts *TagSet) TagKeyElem(key []byte) TagKeyElem { // TagValueElem returns an element for a tag value. // Returns an element with a nil value if not found. -func (ts *TagSet) TagValueElem(key, value []byte) TagValueElem { +func (ts *TagSet) TagValueElem(key, value []byte) TagSetValueElem { // Find key element, exit if not found. kelem := ts.TagKeyElem(key) if len(kelem.Key) == 0 { - return TagValueElem{} + return TagSetValueElem{} } hashData := ts.data[kelem.Offset:] @@ -119,7 +119,7 @@ func (ts *TagSet) TagValueElem(key, value []byte) TagValueElem { // Evaluate value if offset is not empty. if offset > 0 { // Parse into element. - var e TagValueElem + var e TagSetValueElem e.UnmarshalBinary(ts.data[offset:]) // Return if values match. @@ -129,7 +129,7 @@ func (ts *TagSet) TagValueElem(key, value []byte) TagValueElem { // Check if we've exceeded the probe distance. if d > dist(hashKey(e.Value), pos, int(valueN)) { - return TagValueElem{} + return TagSetValueElem{} } } @@ -172,15 +172,15 @@ func (ts *TagSet) UnmarshalBinary(data []byte) error { return nil } -// TagKeyElem represents a tag key element. -type TagKeyElem struct { +// TagSetKeyElem represents a tag key element in a TagSetBlock. +type TagSetKeyElem struct { Flag byte Key []byte Offset uint64 // Value block offset } // UnmarshalBinary unmarshals data into e. 
-func (e *TagKeyElem) UnmarshalBinary(data []byte) { +func (e *TagSetKeyElem) UnmarshalBinary(data []byte) { // Parse flag data. e.Flag, data = data[0], data[1:] @@ -193,8 +193,8 @@ func (e *TagKeyElem) UnmarshalBinary(data []byte) { e.Key = data[:sz] } -// TagValueElem represents a tag value element. -type TagValueElem struct { +// TagSetValueElem represents a tag value element. +type TagSetValueElem struct { Flag byte Value []byte Series struct { @@ -204,12 +204,12 @@ type TagValueElem struct { } // SeriesID returns series ID at an index. -func (e *TagValueElem) SeriesID(i int) uint32 { +func (e *TagSetValueElem) SeriesID(i int) uint32 { return binary.BigEndian.Uint32(e.Series.Data[i*SeriesIDSize:]) } // SeriesIDs returns a list decoded series ids. -func (e *TagValueElem) SeriesIDs() []uint32 { +func (e *TagSetValueElem) SeriesIDs() []uint32 { a := make([]uint32, e.Series.N) for i := 0; i < int(e.Series.N); i++ { a[i] = e.SeriesID(i) @@ -218,7 +218,7 @@ func (e *TagValueElem) SeriesIDs() []uint32 { } // UnmarshalBinary unmarshals data into e. -func (e *TagValueElem) UnmarshalBinary(data []byte) { +func (e *TagSetValueElem) UnmarshalBinary(data []byte) { // Parse flag data. e.Flag, data = data[0], data[1:] @@ -246,10 +246,10 @@ func NewTagSetWriter() *TagSetWriter { } } -// AddTag adds a key without any associated values. -func (tsw *TagSetWriter) AddTag(key []byte, deleted bool) { +// DeleteTag marks a key as deleted. +func (tsw *TagSetWriter) DeleteTag(key []byte) { ts := tsw.sets[string(key)] - ts.deleted = deleted + ts.deleted = true tsw.sets[string(key)] = ts } diff --git a/tsdb/engine/tsi1/tsi1.go b/tsdb/engine/tsi1/tsi1.go index 6cabcc8dd1..92a2f128e1 100644 --- a/tsdb/engine/tsi1/tsi1.go +++ b/tsdb/engine/tsi1/tsi1.go @@ -29,24 +29,401 @@ type MeasurementIterator interface { Next() *MeasurementElem } +// NewMeasurementIterator returns an iterator that operates on an in-memory slice. +func NewMeasurementIterator(elems []MeasurementElem) MeasurementIterator { + return &measurementIterator{elems: elems} +} + // measurementIterator represents an iterator over a slice of measurements. type measurementIterator struct { - mms []MeasurementElem + elems []MeasurementElem } // Next shifts the next element off the list. -func (itr *measurementIterator) Next() *MeasurementElem { - if len(itr.mms) == 0 { +func (itr *measurementIterator) Next() (e *MeasurementElem) { + if len(itr.elems) == 0 { return nil } - mm := itr.mms[0] - itr.mms = itr.mms[1:] - return &mm + e, itr.elems = &itr.elems[0], itr.elems[1:] + return e +} + +// MergeMeasurementIterators returns an iterator that merges a set of iterators. +// Iterators that are first in the list take precendence and a deletion by those +// early iterators will invalidate elements by later iterators. +func MergeMeasurementIterators(itrs ...MeasurementIterator) MeasurementIterator { + itr := &measurementMergeIterator{ + buf: make([]MeasurementElem, len(itrs)), + itrs: itrs, + } + + // Initialize buffers. + for i := range itr.itrs { + if e := itr.itrs[i].Next(); e != nil { + itr.buf[i] = *e + } + } + + return itr +} + +type measurementMergeIterator struct { + e MeasurementElem + buf []MeasurementElem + itrs []MeasurementIterator +} + +// Next returns the element with the next lowest name across the iterators. +// +// If multiple iterators contain the same name then the first is returned +// and the remaining ones are skipped. 
+func (itr *measurementMergeIterator) Next() *MeasurementElem { + itr.e = MeasurementElem{} + + // Find next lowest name amongst the buffers. + var name []byte + for i := range itr.buf { + if len(itr.buf[i].Name) == 0 { + continue + } else if name == nil || bytes.Compare(itr.buf[i].Name, name) == -1 { + name = itr.buf[i].Name + } + } + + // Return nil if no elements remaining. + if len(name) == 0 { + return nil + } + + // Refill buffer. + for i := range itr.buf { + if !bytes.Equal(itr.buf[i].Name, name) { + continue + } + + // Copy first matching buffer to the return buffer. + if len(itr.e.Name) == 0 { + itr.e = itr.buf[i] + } + + // Fill buffer with next element. + if e := itr.itrs[i].Next(); e != nil { + itr.buf[i] = *e + } else { + itr.buf[i] = MeasurementElem{} + } + } + + return &itr.e +} + +// TagKeyElem represents a generic tag key element. +type TagKeyElem struct { + Key []byte + Deleted bool +} + +// TagKeyIterator represents a iterator over a list of tag keys. +type TagKeyIterator interface { + Next() *TagKeyElem +} + +// NewTagKeyIterator returns an iterator that operates on an in-memory slice. +func NewTagKeyIterator(a []TagKeyElem) TagKeyIterator { + return &tagKeyIterator{elems: a} +} + +// tagKeyIterator represents an iterator over a slice of tag keys. +type tagKeyIterator struct { + elems []TagKeyElem +} + +// Next returns the next element. +func (itr *tagKeyIterator) Next() (e *TagKeyElem) { + if len(itr.elems) == 0 { + return nil + } + e, itr.elems = &itr.elems[0], itr.elems[1:] + return e +} + +// MergeTagKeyIterators returns an iterator that merges a set of iterators. +// Iterators that are first in the list take precendence and a deletion by those +// early iterators will invalidate elements by later iterators. +func MergeTagKeyIterators(itrs ...TagKeyIterator) TagKeyIterator { + itr := &tagKeyMergeIterator{ + buf: make([]TagKeyElem, len(itrs)), + itrs: itrs, + } + + // Initialize buffers. + for i := range itr.itrs { + if e := itr.itrs[i].Next(); e != nil { + itr.buf[i] = *e + } + } + + return itr +} + +type tagKeyMergeIterator struct { + e TagKeyElem + buf []TagKeyElem + itrs []TagKeyIterator +} + +// Next returns the element with the next lowest key across the iterators. +// +// If multiple iterators contain the same key then the first is returned +// and the remaining ones are skipped. +func (itr *tagKeyMergeIterator) Next() *TagKeyElem { + itr.e = TagKeyElem{} + + // Find next lowest key amongst the buffers. + var key []byte + for i := range itr.buf { + if len(itr.buf[i].Key) == 0 { + continue + } else if key == nil || bytes.Compare(itr.buf[i].Key, key) == -1 { + key = itr.buf[i].Key + } + } + + // Return nil if no elements remaining. + if len(key) == 0 { + return nil + } + + // Refill buffer. + for i := range itr.buf { + if !bytes.Equal(itr.buf[i].Key, key) { + continue + } + + // Copy first matching buffer to the return buffer. + if len(itr.e.Key) == 0 { + itr.e = itr.buf[i] + } + + // Fill buffer with next element. + if e := itr.itrs[i].Next(); e != nil { + itr.buf[i] = *e + } else { + itr.buf[i] = TagKeyElem{} + } + } + + return &itr.e +} + +// TagValueElem represents a generic tag value element. +type TagValueElem struct { + Value []byte + Deleted bool +} + +// TagValueIterator represents a iterator over a list of tag values. +type TagValueIterator interface { + Next() *TagValueElem +} + +// NewTagValueIterator returns an iterator that operates on an in-memory slice. 
+func NewTagValueIterator(a []TagValueElem) TagValueIterator { + return &tagValueIterator{elems: a} +} + +// tagValueIterator represents an iterator over a slice of tag values. +type tagValueIterator struct { + elems []TagValueElem +} + +// Next returns the next element. +func (itr *tagValueIterator) Next() (e *TagValueElem) { + if len(itr.elems) == 0 { + return nil + } + e, itr.elems = &itr.elems[0], itr.elems[1:] + return e +} + +// MergeTagValueIterators returns an iterator that merges a set of iterators. +// Iterators that are first in the list take precendence and a deletion by those +// early iterators will invalidate elements by later iterators. +func MergeTagValueIterators(itrs ...TagValueIterator) TagValueIterator { + itr := &tagValueMergeIterator{ + buf: make([]TagValueElem, len(itrs)), + itrs: itrs, + } + + // Initialize buffers. + for i := range itr.itrs { + if e := itr.itrs[i].Next(); e != nil { + itr.buf[i] = *e + } + } + + return itr +} + +type tagValueMergeIterator struct { + e TagValueElem + buf []TagValueElem + itrs []TagValueIterator +} + +// Next returns the element with the next lowest value across the iterators. +// +// If multiple iterators contain the same value then the first is returned +// and the remaining ones are skipped. +func (itr *tagValueMergeIterator) Next() *TagValueElem { + itr.e = TagValueElem{} + + // Find next lowest value amongst the buffers. + var value []byte + for i := range itr.buf { + if len(itr.buf[i].Value) == 0 { + continue + } else if value == nil || bytes.Compare(itr.buf[i].Value, value) == -1 { + value = itr.buf[i].Value + } + } + + // Return nil if no elements remaining. + if len(value) == 0 { + return nil + } + + // Refill buffer. + for i := range itr.buf { + if !bytes.Equal(itr.buf[i].Value, value) { + continue + } + + // Copy first matching buffer to the return buffer. + if len(itr.e.Value) == 0 { + itr.e = itr.buf[i] + } + + // Fill buffer with next element. + if e := itr.itrs[i].Next(); e != nil { + itr.buf[i] = *e + } else { + itr.buf[i] = TagValueElem{} + } + } + + return &itr.e +} + +// SeriesElem represents a generic series element. +type SeriesElem struct { + Name []byte + Tags models.Tags + Deleted bool } // SeriesIterator represents a iterator over a list of series. type SeriesIterator interface { - Next(name *[]byte, tags *models.Tags, deleted *bool) + Next() *SeriesElem +} + +// NewSeriesIterator returns an iterator that operates on an in-memory slice. +func NewSeriesIterator(a []SeriesElem) SeriesIterator { + return &seriesIterator{elems: a} +} + +// seriesIterator represents an iterator over a slice of tag values. +type seriesIterator struct { + elems []SeriesElem +} + +// Next returns the next element. +func (itr *seriesIterator) Next() (e *SeriesElem) { + if len(itr.elems) == 0 { + return nil + } + e, itr.elems = &itr.elems[0], itr.elems[1:] + return e +} + +// MergeSeriesIterators returns an iterator that merges a set of iterators. +// Iterators that are first in the list take precendence and a deletion by those +// early iterators will invalidate elements by later iterators. +func MergeSeriesIterators(itrs ...SeriesIterator) SeriesIterator { + itr := &seriesMergeIterator{ + buf: make([]SeriesElem, len(itrs)), + itrs: itrs, + } + + // Initialize buffers. 
+ for i := range itr.itrs { + if e := itr.itrs[i].Next(); e != nil { + itr.buf[i] = *e + } + } + + return itr +} + +type seriesMergeIterator struct { + e SeriesElem + buf []SeriesElem + itrs []SeriesIterator +} + +// Next returns the element with the next lowest name/tags across the iterators. +// +// If multiple iterators contain the same name/tags then the first is returned +// and the remaining ones are skipped. +func (itr *seriesMergeIterator) Next() *SeriesElem { + itr.e = SeriesElem{} + + // Find next lowest name/tags amongst the buffers. + var name []byte + var tags models.Tags + for i := range itr.buf { + // Skip empty buffers. + if len(itr.buf[i].Name) == 0 { + continue + } + + // If the name is not set the pick the first non-empty name. + if name == nil { + name, tags = itr.buf[i].Name, itr.buf[i].Tags + continue + } + + // Set name/tags if they are lower than what has been seen. + if cmp := bytes.Compare(itr.buf[i].Name, name); cmp == -1 || (cmp == 0 && models.CompareTags(itr.buf[i].Tags, tags) == -1) { + name, tags = itr.buf[i].Name, itr.buf[i].Tags + } + } + + // Return nil if no elements remaining. + if len(name) == 0 { + return nil + } + + // Refill buffer. + for i := range itr.buf { + if !bytes.Equal(itr.buf[i].Name, name) || models.CompareTags(itr.buf[i].Tags, tags) != 0 { + continue + } + + // Copy first matching buffer to the return buffer. + if len(itr.e.Name) == 0 { + itr.e = itr.buf[i] + } + + // Fill buffer with next element. + if e := itr.itrs[i].Next(); e != nil { + itr.buf[i] = *e + } else { + itr.buf[i] = SeriesElem{} + } + } + + return &itr.e } // writeTo writes write v into w. Updates n. @@ -134,3 +511,19 @@ type uint32Slice []uint32 func (a uint32Slice) Len() int { return len(a) } func (a uint32Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a uint32Slice) Less(i, j int) bool { return a[i] < a[j] } + +type byteSlices [][]byte + +func (a byteSlices) Len() int { return len(a) } +func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 } + +// copyBytes returns a copy of b. +func copyBytes(b []byte) []byte { + if b == nil { + return nil + } + buf := make([]byte, len(b)) + copy(buf, b) + return buf +} diff --git a/tsdb/engine/tsi1/tsi1_test.go b/tsdb/engine/tsi1/tsi1_test.go new file mode 100644 index 0000000000..b97c6b35a7 --- /dev/null +++ b/tsdb/engine/tsi1/tsi1_test.go @@ -0,0 +1,197 @@ +package tsi1_test + +import ( + "reflect" + "testing" + + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/tsdb/engine/tsi1" +) + +// Ensure iterator can operate over an in-memory list of elements. +func TestMeasurementIterator(t *testing.T) { + elems := []tsi1.MeasurementElem{ + {Name: []byte("cpu"), Deleted: true}, + {Name: []byte("mem")}, + } + + itr := tsi1.NewMeasurementIterator(elems) + if e := itr.Next(); !reflect.DeepEqual(&elems[0], e) { + t.Fatalf("unexpected elem(0): %#v", e) + } else if e := itr.Next(); !reflect.DeepEqual(&elems[1], e) { + t.Fatalf("unexpected elem(1): %#v", e) + } else if e := itr.Next(); e != nil { + t.Fatalf("expected nil elem: %#v", e) + } +} + +// Ensure iterator can merge multiple iterators together. 
+func TestMergeMeasurementIterators(t *testing.T) { + itr := tsi1.MergeMeasurementIterators( + tsi1.NewMeasurementIterator([]tsi1.MeasurementElem{ + {Name: []byte("aaa")}, + {Name: []byte("bbb"), Deleted: true}, + {Name: []byte("ccc")}, + }), + tsi1.NewMeasurementIterator(nil), + tsi1.NewMeasurementIterator([]tsi1.MeasurementElem{ + {Name: []byte("bbb")}, + {Name: []byte("ccc"), Deleted: true}, + {Name: []byte("ddd")}, + }), + ) + + if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("aaa")}) { + t.Fatalf("unexpected elem(0): %#v", e) + } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("bbb"), Deleted: true}) { + t.Fatalf("unexpected elem(1): %#v", e) + } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("ccc")}) { + t.Fatalf("unexpected elem(2): %#v", e) + } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.MeasurementElem{Name: []byte("ddd")}) { + t.Fatalf("unexpected elem(3): %#v", e) + } else if e := itr.Next(); e != nil { + t.Fatalf("expected nil elem: %#v", e) + } +} + +// Ensure iterator can operate over an in-memory list of tag key elements. +func TestTagKeyIterator(t *testing.T) { + elems := []tsi1.TagKeyElem{ + {Key: []byte("aaa"), Deleted: true}, + {Key: []byte("bbb")}, + } + + itr := tsi1.NewTagKeyIterator(elems) + if e := itr.Next(); !reflect.DeepEqual(&elems[0], e) { + t.Fatalf("unexpected elem(0): %#v", e) + } else if e := itr.Next(); !reflect.DeepEqual(&elems[1], e) { + t.Fatalf("unexpected elem(1): %#v", e) + } else if e := itr.Next(); e != nil { + t.Fatalf("expected nil elem: %#v", e) + } +} + +// Ensure iterator can merge multiple iterators together. +func TestMergeTagKeyIterators(t *testing.T) { + itr := tsi1.MergeTagKeyIterators( + tsi1.NewTagKeyIterator([]tsi1.TagKeyElem{ + {Key: []byte("aaa")}, + {Key: []byte("bbb"), Deleted: true}, + {Key: []byte("ccc")}, + }), + tsi1.NewTagKeyIterator(nil), + tsi1.NewTagKeyIterator([]tsi1.TagKeyElem{ + {Key: []byte("bbb")}, + {Key: []byte("ccc"), Deleted: true}, + {Key: []byte("ddd")}, + }), + ) + + if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagKeyElem{Key: []byte("aaa")}) { + t.Fatalf("unexpected elem(0): %#v", e) + } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagKeyElem{Key: []byte("bbb"), Deleted: true}) { + t.Fatalf("unexpected elem(1): %#v", e) + } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagKeyElem{Key: []byte("ccc")}) { + t.Fatalf("unexpected elem(2): %#v", e) + } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagKeyElem{Key: []byte("ddd")}) { + t.Fatalf("unexpected elem(3): %#v", e) + } else if e := itr.Next(); e != nil { + t.Fatalf("expected nil elem: %#v", e) + } +} + +// Ensure iterator can operate over an in-memory list of tag value elements. +func TestTagValueIterator(t *testing.T) { + elems := []tsi1.TagValueElem{ + {Value: []byte("aaa"), Deleted: true}, + {Value: []byte("bbb")}, + } + + itr := tsi1.NewTagValueIterator(elems) + if e := itr.Next(); !reflect.DeepEqual(&elems[0], e) { + t.Fatalf("unexpected elem(0): %#v", e) + } else if e := itr.Next(); !reflect.DeepEqual(&elems[1], e) { + t.Fatalf("unexpected elem(1): %#v", e) + } else if e := itr.Next(); e != nil { + t.Fatalf("expected nil elem: %#v", e) + } +} + +// Ensure iterator can merge multiple iterators together. 
+func TestMergeTagValueIterators(t *testing.T) { + itr := tsi1.MergeTagValueIterators( + tsi1.NewTagValueIterator([]tsi1.TagValueElem{ + {Value: []byte("aaa")}, + {Value: []byte("bbb"), Deleted: true}, + {Value: []byte("ccc")}, + }), + tsi1.NewTagValueIterator(nil), + tsi1.NewTagValueIterator([]tsi1.TagValueElem{ + {Value: []byte("bbb")}, + {Value: []byte("ccc"), Deleted: true}, + {Value: []byte("ddd")}, + }), + ) + + if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagValueElem{Value: []byte("aaa")}) { + t.Fatalf("unexpected elem(0): %#v", e) + } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagValueElem{Value: []byte("bbb"), Deleted: true}) { + t.Fatalf("unexpected elem(1): %#v", e) + } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagValueElem{Value: []byte("ccc")}) { + t.Fatalf("unexpected elem(2): %#v", e) + } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.TagValueElem{Value: []byte("ddd")}) { + t.Fatalf("unexpected elem(3): %#v", e) + } else if e := itr.Next(); e != nil { + t.Fatalf("expected nil elem: %#v", e) + } +} + +// Ensure iterator can operate over an in-memory list of series. +func TestSeriesIterator(t *testing.T) { + elems := []tsi1.SeriesElem{ + {Name: []byte("cpu"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}, Deleted: true}, + {Name: []byte("mem")}, + } + + itr := tsi1.NewSeriesIterator(elems) + if e := itr.Next(); !reflect.DeepEqual(&elems[0], e) { + t.Fatalf("unexpected elem(0): %#v", e) + } else if e := itr.Next(); !reflect.DeepEqual(&elems[1], e) { + t.Fatalf("unexpected elem(1): %#v", e) + } else if e := itr.Next(); e != nil { + t.Fatalf("expected nil elem: %#v", e) + } +} + +// Ensure iterator can merge multiple iterators together. +func TestMergeSeriesIterators(t *testing.T) { + itr := tsi1.MergeSeriesIterators( + tsi1.NewSeriesIterator([]tsi1.SeriesElem{ + {Name: []byte("aaa"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}, Deleted: true}, + {Name: []byte("bbb"), Deleted: true}, + {Name: []byte("ccc")}, + }), + tsi1.NewSeriesIterator(nil), + tsi1.NewSeriesIterator([]tsi1.SeriesElem{ + {Name: []byte("aaa"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}}, + {Name: []byte("aaa"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-west")}}}, + {Name: []byte("bbb")}, + {Name: []byte("ccc"), Deleted: true}, + {Name: []byte("ddd")}, + }), + ) + + if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.SeriesElem{Name: []byte("aaa"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-east")}}, Deleted: true}) { + t.Fatalf("unexpected elem(0): %#v", e) + } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.SeriesElem{Name: []byte("aaa"), Tags: models.Tags{{Key: []byte("region"), Value: []byte("us-west")}}}) { + t.Fatalf("unexpected elem(1): %#v", e) + } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.SeriesElem{Name: []byte("bbb"), Deleted: true}) { + t.Fatalf("unexpected elem(2): %#v", e) + } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.SeriesElem{Name: []byte("ccc")}) { + t.Fatalf("unexpected elem(3): %#v", e) + } else if e := itr.Next(); !reflect.DeepEqual(e, &tsi1.SeriesElem{Name: []byte("ddd")}) { + t.Fatalf("unexpected elem(4): %#v", e) + } else if e := itr.Next(); e != nil { + t.Fatalf("expected nil elem: %#v", e) + } +}
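
Example: the merge iterators added in tsi1.go give earlier iterators precedence, so a deletion recorded in a newer source masks the same element coming from older sources. The sketch below is a hypothetical standalone program, not part of the patch; it mirrors the TestMergeMeasurementIterators case using only the exported API introduced here, with the import path taken from the test file.

    package main

    import (
    	"fmt"

    	"github.com/influxdata/influxdb/tsdb/engine/tsi1"
    )

    func main() {
    	// Pass the newer data first so its elements, including tombstones,
    	// shadow matching elements from the older iterator.
    	itr := tsi1.MergeMeasurementIterators(
    		tsi1.NewMeasurementIterator([]tsi1.MeasurementElem{
    			{Name: []byte("cpu"), Deleted: true}, // masks the "cpu" in the iterator below
    			{Name: []byte("mem")},
    		}),
    		tsi1.NewMeasurementIterator([]tsi1.MeasurementElem{
    			{Name: []byte("cpu")},
    			{Name: []byte("disk")},
    		}),
    	)

    	// Elements come back in name order: "cpu" (deleted), "disk", "mem".
    	for e := itr.Next(); e != nil; e = itr.Next() {
    		fmt.Printf("%s deleted=%v\n", e.Name, e.Deleted)
    	}
    }

The same pattern applies to MergeTagKeyIterators, MergeTagValueIterators, and MergeSeriesIterators, which is presumably what IndexFiles.MeasurementIterator and the other panic("TODO") methods in index_files.go will build on.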