Refactor tsi1 merge iterators, finish multi-file compaction.
parent
e3af4b0dad
commit
8863e3c0f3
|
@ -1617,6 +1617,17 @@ type Tag struct {
|
|||
Value []byte
|
||||
}
|
||||
|
||||
// String returns the string reprsentation of the tag.
|
||||
func (t *Tag) String() string {
|
||||
var buf bytes.Buffer
|
||||
buf.WriteByte('{')
|
||||
buf.WriteString(string(t.Key))
|
||||
buf.WriteByte(' ')
|
||||
buf.WriteString(string(t.Value))
|
||||
buf.WriteByte('}')
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
// Tags represents a sorted list of tags.
|
||||
type Tags []Tag
|
||||
|
||||
|
@ -1633,14 +1644,23 @@ func NewTags(m map[string]string) Tags {
|
|||
return a
|
||||
}
|
||||
|
||||
// Len implements sort.Interface.
|
||||
func (a Tags) Len() int { return len(a) }
|
||||
// String returns the string representation of the tags.
|
||||
func (a Tags) String() string {
|
||||
var buf bytes.Buffer
|
||||
buf.WriteByte('[')
|
||||
for i := range a {
|
||||
buf.WriteString(a[i].String())
|
||||
if i < len(a)-1 {
|
||||
buf.WriteByte(' ')
|
||||
}
|
||||
}
|
||||
buf.WriteByte(']')
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
// Less implements sort.Interface.
|
||||
func (a Tags) Len() int { return len(a) }
|
||||
func (a Tags) Less(i, j int) bool { return bytes.Compare(a[i].Key, a[j].Key) == -1 }
|
||||
|
||||
// Swap implements sort.Interface.
|
||||
func (a Tags) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a Tags) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
||||
// Equal returns true if a equals other.
|
||||
func (a Tags) Equal(other Tags) bool {
|
||||
|
|
|
@ -2,6 +2,7 @@ package rhh
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"sort"
|
||||
|
||||
"github.com/cespare/xxhash"
|
||||
)
|
||||
|
@ -178,6 +179,20 @@ func (m *HashMap) dist(hash uint32, i int) int {
|
|||
return int(uint32(i+m.capacity-int(hash&m.mask)) & m.mask)
|
||||
}
|
||||
|
||||
// Keys returns a list of sorted keys.
|
||||
func (m *HashMap) Keys() [][]byte {
|
||||
a := make([][]byte, 0, m.Len())
|
||||
for i := 0; i < m.Cap(); i++ {
|
||||
k, v := m.Elem(i)
|
||||
if v == nil {
|
||||
continue
|
||||
}
|
||||
a = append(a, k)
|
||||
}
|
||||
sort.Sort(byteSlices(a))
|
||||
return a
|
||||
}
|
||||
|
||||
type hashElem struct {
|
||||
key []byte
|
||||
value interface{}
|
||||
|
@ -206,3 +221,9 @@ func pow2(v int) int {
|
|||
}
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
type byteSlices [][]byte
|
||||
|
||||
func (a byteSlices) Len() int { return len(a) }
|
||||
func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
|
||||
func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
|
|
@ -129,16 +129,15 @@ func (i *IndexFile) MeasurementIterator() MeasurementIterator {
|
|||
// TagKeyIterator returns an iterator over all tag keys for a measurement.
|
||||
func (i *IndexFile) TagKeyIterator(name []byte) (TagKeyIterator, error) {
|
||||
// Create an internal iterator.
|
||||
itr, err := i.tagBlockKeyIterator(name)
|
||||
bitr, err := i.tagBlockKeyIterator(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Decode into an externally accessible iterator.
|
||||
return &tagKeyDecodeIterator{
|
||||
itr: itr,
|
||||
sblk: &i.sblk,
|
||||
}, nil
|
||||
itr := newTagKeyDecodeIterator(&i.sblk)
|
||||
itr.itr = bitr
|
||||
return &itr, nil
|
||||
}
|
||||
|
||||
// tagBlockKeyIterator returns an internal iterator over all tag keys for a measurement.
|
||||
|
@ -167,7 +166,7 @@ func (i *IndexFile) MeasurementSeriesIterator(name []byte) SeriesIterator {
|
|||
|
||||
// SeriesIterator returns an iterator over all series.
|
||||
func (i *IndexFile) SeriesIterator() SeriesIterator {
|
||||
return i.sblk.Iterator()
|
||||
return i.sblk.SeriesIterator()
|
||||
}
|
||||
|
||||
// ReadIndexFileTrailer returns the index file trailer from data.
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package tsi1
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"sort"
|
||||
)
|
||||
|
@ -53,8 +54,12 @@ func (p IndexFiles) SeriesIterator() SeriesIterator {
|
|||
}
|
||||
|
||||
// MeasurementSeriesIterator returns an iterator that merges series across all files.
|
||||
func (p *IndexFiles) MeasurementSeriesIterator(name []byte) SeriesIterator {
|
||||
panic("TODO")
|
||||
func (p IndexFiles) MeasurementSeriesIterator(name []byte) SeriesIterator {
|
||||
a := make([]SeriesIterator, len(p))
|
||||
for i := range p {
|
||||
a[i] = p[i].MeasurementSeriesIterator(name)
|
||||
}
|
||||
return MergeSeriesIterators(a...)
|
||||
}
|
||||
|
||||
// TagValueSeriesIterator returns an iterator that merges series across all files.
|
||||
|
@ -168,7 +173,7 @@ func (p *IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactI
|
|||
sort.Sort(uint32Slice(seriesIDs))
|
||||
|
||||
// Insert tag value into writer.
|
||||
tw.AddTagValue(name, ve.Value(), ve.Deleted(), seriesIDs)
|
||||
tw.AddTagValue(ke.Key(), ve.Value(), ve.Deleted(), seriesIDs)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -202,7 +207,7 @@ func (p *IndexFiles) writeMeasurementBlockTo(w io.Writer, info *indexCompactInfo
|
|||
for e := itr.Next(); e != nil; e = itr.Next() {
|
||||
seriesID := info.sw.Offset(e.Name(), e.Tags())
|
||||
if seriesID == 0 {
|
||||
panic("expected series id")
|
||||
panic(fmt.Sprintf("expected series id: %s %s", e.Name(), e.Tags().String()))
|
||||
}
|
||||
seriesIDs = append(seriesIDs, seriesID)
|
||||
}
|
||||
|
|
|
@ -44,5 +44,10 @@ func TestIndexFiles_WriteTo(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// TODO: Verify data in compacted file.
|
||||
// Verify data in compacted file.
|
||||
if e, err := f.TagValueElem([]byte("cpu"), []byte("region"), []byte("west")); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if e.SeriesN() != 1 {
|
||||
t.Fatalf("unexpected series count: %d", e.SeriesN())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"sort"
|
||||
|
@ -116,40 +117,48 @@ func (blk *SeriesBlock) decodeElemAt(offset uint32, e *seriesBlockElem) {
|
|||
|
||||
// Read length.
|
||||
n, sz := binary.Uvarint(data)
|
||||
e.size += int64(n)
|
||||
data = data[sz:]
|
||||
e.size += int64(sz) + int64(n)
|
||||
|
||||
// Decode the name and tags into the element.
|
||||
blk.DecodeSeries(data[sz:], &e.name, &e.tags)
|
||||
blk.DecodeSeries(data[:n], &e.name, &e.tags)
|
||||
}
|
||||
|
||||
// DecodeSeries decodes a dictionary encoded series into a name and tagset.
|
||||
func (blk *SeriesBlock) DecodeSeries(v []byte, name *[]byte, tags *models.Tags) {
|
||||
func (blk *SeriesBlock) DecodeSeries(buf []byte, name *[]byte, tags *models.Tags) {
|
||||
// Read name.
|
||||
offset, n := binary.Uvarint(v)
|
||||
*name, v = blk.DecodeTerm(uint32(offset)), v[n:]
|
||||
offset, n := binary.Uvarint(buf)
|
||||
*name, buf = blk.DecodeTerm(uint32(offset)), buf[n:]
|
||||
|
||||
// Read tag count.
|
||||
tagN, n := binary.Uvarint(v)
|
||||
v = v[n:]
|
||||
tagN, n := binary.Uvarint(buf)
|
||||
buf = buf[n:]
|
||||
|
||||
// Clear tags, if necessary.
|
||||
if len(*tags) > 0 {
|
||||
*tags = (*tags)[0:]
|
||||
*tags = (*tags)[:0]
|
||||
}
|
||||
|
||||
// Loop over tag key/values.
|
||||
for i := 0; i < int(tagN); i++ {
|
||||
// Read key.
|
||||
offset, n := binary.Uvarint(v)
|
||||
key, v := blk.DecodeTerm(uint32(offset)), v[n:]
|
||||
offset, n := binary.Uvarint(buf)
|
||||
key := blk.DecodeTerm(uint32(offset))
|
||||
buf = buf[n:]
|
||||
|
||||
// Read value.
|
||||
offset, n = binary.Uvarint(v)
|
||||
value, v := blk.DecodeTerm(uint32(offset)), v[n:]
|
||||
offset, n = binary.Uvarint(buf)
|
||||
value := blk.DecodeTerm(uint32(offset))
|
||||
buf = buf[n:]
|
||||
|
||||
// Add to tagset.
|
||||
tags.Set(key, value)
|
||||
}
|
||||
|
||||
// Ensure that the whole slice was read.
|
||||
if len(buf) != 0 {
|
||||
panic(fmt.Sprintf("remaining unmarshaled data in series: % x", buf))
|
||||
}
|
||||
}
|
||||
|
||||
// DecodeTerm returns the term at the given offset.
|
||||
|
@ -197,8 +206,8 @@ func (blk *SeriesBlock) SeriesCount() uint32 {
|
|||
return binary.BigEndian.Uint32(blk.seriesData[:SeriesCountSize])
|
||||
}
|
||||
|
||||
// Iterator returns an iterator over all the series.
|
||||
func (blk *SeriesBlock) Iterator() SeriesIterator {
|
||||
// SeriesIterator returns an iterator over all the series.
|
||||
func (blk *SeriesBlock) SeriesIterator() SeriesIterator {
|
||||
return &seriesBlockIterator{
|
||||
n: blk.SeriesCount(),
|
||||
offset: uint32(len(blk.termData) + SeriesCountSize),
|
||||
|
@ -259,6 +268,11 @@ type seriesDecodeIterator struct {
|
|||
e seriesBlockElem // buffer
|
||||
}
|
||||
|
||||
// newSeriesDecodeIterator returns a new instance of seriesDecodeIterator.
|
||||
func newSeriesDecodeIterator(sblk *SeriesBlock) seriesDecodeIterator {
|
||||
return seriesDecodeIterator{sblk: sblk}
|
||||
}
|
||||
|
||||
// Next returns the next series element.
|
||||
func (itr *seriesDecodeIterator) Next() SeriesElem {
|
||||
// Read next series id.
|
||||
|
@ -330,7 +344,11 @@ func (sw *SeriesBlockWriter) append(name []byte, tags models.Tags, deleted bool)
|
|||
}
|
||||
|
||||
// Append series to list.
|
||||
sw.series = append(sw.series, serie{name: name, tags: tags, deleted: deleted})
|
||||
sw.series = append(sw.series, serie{
|
||||
name: copyBytes(name),
|
||||
tags: models.CopyTags(tags),
|
||||
deleted: deleted,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -195,6 +195,7 @@ func (itr *tagBlockKeyIterator) next() *TagBlockKeyElem {
|
|||
itr.e.unmarshal(itr.keyData, itr.blk.data)
|
||||
itr.keyData = itr.keyData[itr.e.size:]
|
||||
|
||||
assert(len(itr.e.Key()) > 0, "invalid zero-length tag key")
|
||||
return &itr.e
|
||||
}
|
||||
|
||||
|
@ -215,6 +216,7 @@ func (itr *tagBlockValueIterator) next() *TagBlockValueElem {
|
|||
itr.e.unmarshal(itr.data)
|
||||
itr.data = itr.data[itr.e.size:]
|
||||
|
||||
assert(len(itr.e.Value()) > 0, "invalid zero-length tag value")
|
||||
return &itr.e
|
||||
}
|
||||
|
||||
|
@ -288,9 +290,8 @@ func (e *TagBlockKeyElem) unmarshal(buf, data []byte) {
|
|||
|
||||
// tagKeyDecodeElem provides an adapter for tagBlockKeyElem to TagKeyElem.
|
||||
type tagKeyDecodeElem struct {
|
||||
e *TagBlockKeyElem
|
||||
sblk *SeriesBlock
|
||||
itr tagValueDecodeIterator
|
||||
e *TagBlockKeyElem
|
||||
itr tagValueDecodeIterator
|
||||
}
|
||||
|
||||
// Key returns the key from the underlying element.
|
||||
|
@ -301,10 +302,7 @@ func (e *tagKeyDecodeElem) Deleted() bool { return e.e.Deleted() }
|
|||
|
||||
// TagValueIterator returns a decode iterator for the underlying value iterator.
|
||||
func (e *tagKeyDecodeElem) TagValueIterator() TagValueIterator {
|
||||
e.itr = tagValueDecodeIterator{
|
||||
itr: e.e.tagValueIterator(),
|
||||
sblk: e.sblk,
|
||||
}
|
||||
e.itr.itr = e.e.tagValueIterator()
|
||||
return &e.itr
|
||||
}
|
||||
|
||||
|
@ -315,6 +313,16 @@ type tagKeyDecodeIterator struct {
|
|||
e tagKeyDecodeElem
|
||||
}
|
||||
|
||||
// newTagKeyDecodeIterator returns a new instance of tagKeyDecodeIterator.
|
||||
func newTagKeyDecodeIterator(sblk *SeriesBlock) tagKeyDecodeIterator {
|
||||
return tagKeyDecodeIterator{
|
||||
sblk: sblk,
|
||||
e: tagKeyDecodeElem{
|
||||
itr: newTagValueDecodeIterator(sblk),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Next returns the next element in the iterator.
|
||||
func (itr *tagKeyDecodeIterator) Next() TagKeyElem {
|
||||
// Find next internal element.
|
||||
|
@ -383,6 +391,7 @@ func (e *TagBlockValueElem) unmarshal(buf []byte) {
|
|||
|
||||
// Save reference to series data.
|
||||
e.series.data = buf[:e.series.n*SeriesIDSize]
|
||||
buf = buf[e.series.n*SeriesIDSize:]
|
||||
|
||||
// Save length of elem.
|
||||
e.size = start - len(buf)
|
||||
|
@ -396,9 +405,8 @@ func (e *TagBlockValueElem) seriesIDIterator() seriesIDIterator {
|
|||
|
||||
// tagValueDecodeElem provides an adapter for tagBlockValueElem to TagValueElem.
|
||||
type tagValueDecodeElem struct {
|
||||
e *TagBlockValueElem
|
||||
sblk *SeriesBlock
|
||||
itr seriesDecodeIterator
|
||||
e *TagBlockValueElem
|
||||
itr seriesDecodeIterator
|
||||
}
|
||||
|
||||
// Value returns the value from the underlying element.
|
||||
|
@ -409,10 +417,7 @@ func (e *tagValueDecodeElem) Deleted() bool { return e.e.Deleted() }
|
|||
|
||||
// SeriesIterator returns a decode iterator for the underlying value iterator.
|
||||
func (e *tagValueDecodeElem) SeriesIterator() SeriesIterator {
|
||||
e.itr = seriesDecodeIterator{
|
||||
itr: e.e.seriesIDIterator(),
|
||||
sblk: e.sblk,
|
||||
}
|
||||
e.itr.itr = e.e.seriesIDIterator()
|
||||
return &e.itr
|
||||
}
|
||||
|
||||
|
@ -423,6 +428,16 @@ type tagValueDecodeIterator struct {
|
|||
e tagValueDecodeElem
|
||||
}
|
||||
|
||||
// newTagValueDecodeIterator returns a new instance of tagValueDecodeIterator.
|
||||
func newTagValueDecodeIterator(sblk *SeriesBlock) tagValueDecodeIterator {
|
||||
return tagValueDecodeIterator{
|
||||
sblk: sblk,
|
||||
e: tagValueDecodeElem{
|
||||
itr: newSeriesDecodeIterator(sblk),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Next returns the next element in the iterator.
|
||||
func (itr *tagValueDecodeIterator) Next() TagValueElem {
|
||||
// Find next internal element.
|
||||
|
@ -546,6 +561,8 @@ func NewTagBlockWriter() *TagBlockWriter {
|
|||
|
||||
// DeleteTag marks a key as deleted.
|
||||
func (tw *TagBlockWriter) DeleteTag(key []byte) {
|
||||
assert(len(key) > 0, "cannot delete zero-length tag")
|
||||
|
||||
ts := tw.sets[string(key)]
|
||||
ts.deleted = true
|
||||
tw.sets[string(key)] = ts
|
||||
|
@ -553,6 +570,10 @@ func (tw *TagBlockWriter) DeleteTag(key []byte) {
|
|||
|
||||
// AddTagValue adds a key/value pair with an associated list of series.
|
||||
func (tw *TagBlockWriter) AddTagValue(key, value []byte, deleted bool, seriesIDs []uint32) {
|
||||
assert(len(key) > 0, "cannot add zero-length key")
|
||||
assert(len(value) > 0, "cannot add zero-length value")
|
||||
assert(len(seriesIDs) > 0, "cannot add tag value without series ids")
|
||||
|
||||
ts, ok := tw.sets[string(key)]
|
||||
if !ok || ts.values == nil {
|
||||
ts.values = make(map[string]tagValue)
|
||||
|
@ -627,19 +648,18 @@ func (tw *TagBlockWriter) writeTagValueBlockTo(w io.Writer, ts *tagSet, n *int64
|
|||
Capacity: len(ts.values),
|
||||
LoadFactor: 90,
|
||||
})
|
||||
for value, tv := range ts.values {
|
||||
m.Put([]byte(value), tv)
|
||||
for value := range ts.values {
|
||||
tv := ts.values[value]
|
||||
m.Put([]byte(value), &tv)
|
||||
}
|
||||
|
||||
// Encode value list.
|
||||
ts.data.offset = *n
|
||||
offsets := make([]int64, m.Cap())
|
||||
for i := 0; i < m.Cap(); i++ {
|
||||
k, v := m.Elem(i)
|
||||
tv, _ := v.(tagValue)
|
||||
for _, k := range m.Keys() {
|
||||
tv := m.Get(k).(*tagValue)
|
||||
|
||||
// Save current offset so we can use it in the hash index.
|
||||
offsets[i] = *n
|
||||
tv.offset = *n
|
||||
|
||||
// Write value block.
|
||||
if err := tw.writeTagValueTo(w, k, tv, n); err != nil {
|
||||
|
@ -655,8 +675,15 @@ func (tw *TagBlockWriter) writeTagValueBlockTo(w io.Writer, ts *tagSet, n *int64
|
|||
}
|
||||
|
||||
// Encode hash map offset entries.
|
||||
for i := range offsets {
|
||||
if err := writeUint64To(w, uint64(offsets[i]), n); err != nil {
|
||||
for i := 0; i < m.Cap(); i++ {
|
||||
var offset int64
|
||||
|
||||
_, v := m.Elem(i)
|
||||
if v != nil {
|
||||
offset = v.(*tagValue).offset
|
||||
}
|
||||
|
||||
if err := writeUint64To(w, uint64(offset), n); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -666,7 +693,9 @@ func (tw *TagBlockWriter) writeTagValueBlockTo(w io.Writer, ts *tagSet, n *int64
|
|||
}
|
||||
|
||||
// writeTagValueTo encodes a single tag value entry into w.
|
||||
func (tw *TagBlockWriter) writeTagValueTo(w io.Writer, v []byte, tv tagValue, n *int64) error {
|
||||
func (tw *TagBlockWriter) writeTagValueTo(w io.Writer, v []byte, tv *tagValue, n *int64) error {
|
||||
assert(len(v) > 0, "cannot write zero-length tag value")
|
||||
|
||||
// Write flag.
|
||||
if err := writeUint8To(w, tv.flag(), n); err != nil {
|
||||
return err
|
||||
|
@ -696,14 +725,10 @@ func (tw *TagBlockWriter) writeTagValueTo(w io.Writer, v []byte, tv tagValue, n
|
|||
|
||||
// writeTagKeyBlockTo encodes keys from a tag set into w.
|
||||
func (tw *TagBlockWriter) writeTagKeyBlockTo(w io.Writer, m *rhh.HashMap, t *TagBlockTrailer, n *int64) error {
|
||||
// Encode key list.
|
||||
// Encode key list in sorted order.
|
||||
t.KeyData.Offset = *n
|
||||
for i := 0; i < m.Cap(); i++ {
|
||||
k, v := m.Elem(i)
|
||||
if v == nil {
|
||||
continue
|
||||
}
|
||||
ts := v.(*tagSet)
|
||||
for _, k := range m.Keys() {
|
||||
ts := m.Get(k).(*tagSet)
|
||||
|
||||
// Save current offset so we can use it in the hash index.
|
||||
ts.offset = *n
|
||||
|
@ -741,6 +766,8 @@ func (tw *TagBlockWriter) writeTagKeyBlockTo(w io.Writer, m *rhh.HashMap, t *Tag
|
|||
|
||||
// writeTagKeyTo encodes a single tag key entry into w.
|
||||
func (tw *TagBlockWriter) writeTagKeyTo(w io.Writer, k []byte, ts *tagSet, n *int64) error {
|
||||
assert(len(k) > 0, "cannot write zero-length tag key")
|
||||
|
||||
if err := writeUint8To(w, ts.flag(), n); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -791,6 +818,8 @@ func (ts tagSet) flag() byte {
|
|||
type tagValue struct {
|
||||
seriesIDs []uint32
|
||||
deleted bool
|
||||
|
||||
offset int64
|
||||
}
|
||||
|
||||
func (tv tagValue) flag() byte {
|
||||
|
|
|
@ -38,21 +38,15 @@ func MergeMeasurementIterators(itrs ...MeasurementIterator) MeasurementIterator
|
|||
return nil
|
||||
}
|
||||
|
||||
itr := &measurementMergeIterator{
|
||||
return &measurementMergeIterator{
|
||||
e: make(measurementMergeElem, 0, len(itrs)),
|
||||
buf: make([]MeasurementElem, len(itrs)),
|
||||
itrs: itrs,
|
||||
}
|
||||
|
||||
// Initialize buffers.
|
||||
for i := range itr.itrs {
|
||||
itr.buf[i] = itr.itrs[i].Next()
|
||||
}
|
||||
|
||||
return itr
|
||||
}
|
||||
|
||||
type measurementMergeIterator struct {
|
||||
e MeasurementElem
|
||||
e measurementMergeElem
|
||||
buf []MeasurementElem
|
||||
itrs []MeasurementIterator
|
||||
}
|
||||
|
@ -64,36 +58,72 @@ type measurementMergeIterator struct {
|
|||
func (itr *measurementMergeIterator) Next() MeasurementElem {
|
||||
// Find next lowest name amongst the buffers.
|
||||
var name []byte
|
||||
for i := range itr.buf {
|
||||
if itr.buf[i] == nil {
|
||||
continue
|
||||
} else if name == nil || bytes.Compare(itr.buf[i].Name(), name) == -1 {
|
||||
for i, buf := range itr.buf {
|
||||
// Fill buffer if empty.
|
||||
if buf == nil {
|
||||
if buf = itr.itrs[i].Next(); buf != nil {
|
||||
itr.buf[i] = buf
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Find next lowest name.
|
||||
if name == nil || bytes.Compare(itr.buf[i].Name(), name) == -1 {
|
||||
name = itr.buf[i].Name()
|
||||
}
|
||||
}
|
||||
|
||||
// Return nil if no elements remaining.
|
||||
if len(name) == 0 {
|
||||
if name == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Refill buffer.
|
||||
var e MeasurementElem
|
||||
// Merge all elements together and clear buffers.
|
||||
itr.e = itr.e[:0]
|
||||
for i, buf := range itr.buf {
|
||||
if buf == nil || !bytes.Equal(buf.Name(), name) {
|
||||
continue
|
||||
}
|
||||
itr.e = append(itr.e, buf)
|
||||
itr.buf[i] = nil
|
||||
}
|
||||
return itr.e
|
||||
}
|
||||
|
||||
// Copy first matching buffer to the return buffer.
|
||||
if e == nil {
|
||||
e = buf
|
||||
}
|
||||
// measurementMergeElem represents a merged measurement element.
|
||||
type measurementMergeElem []MeasurementElem
|
||||
|
||||
// Fill buffer with next element.
|
||||
itr.buf[i] = itr.itrs[i].Next()
|
||||
// Name returns the name of the first element.
|
||||
func (p measurementMergeElem) Name() []byte {
|
||||
if len(p) == 0 {
|
||||
return nil
|
||||
}
|
||||
return p[0].Name()
|
||||
}
|
||||
|
||||
// Deleted returns the deleted flag of the first element.
|
||||
func (p measurementMergeElem) Deleted() bool {
|
||||
if len(p) == 0 {
|
||||
return false
|
||||
}
|
||||
return p[0].Deleted()
|
||||
}
|
||||
|
||||
// TagKeyIterator returns a merge iterator for all elements until a tombstone occurs.
|
||||
func (p measurementMergeElem) TagKeyIterator() TagKeyIterator {
|
||||
if len(p) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return e
|
||||
a := make([]TagKeyIterator, 0, len(p))
|
||||
for _, e := range p {
|
||||
a = append(a, e.TagKeyIterator())
|
||||
if e.Deleted() {
|
||||
break
|
||||
}
|
||||
}
|
||||
return MergeTagKeyIterators(a...)
|
||||
}
|
||||
|
||||
// TagKeyElem represents a generic tag key element.
|
||||
|
@ -116,21 +146,15 @@ func MergeTagKeyIterators(itrs ...TagKeyIterator) TagKeyIterator {
|
|||
return nil
|
||||
}
|
||||
|
||||
itr := &tagKeyMergeIterator{
|
||||
return &tagKeyMergeIterator{
|
||||
e: make(tagKeyMergeElem, 0, len(itrs)),
|
||||
buf: make([]TagKeyElem, len(itrs)),
|
||||
itrs: itrs,
|
||||
}
|
||||
|
||||
// Initialize buffers.
|
||||
for i := range itr.itrs {
|
||||
itr.buf[i] = itr.itrs[i].Next()
|
||||
}
|
||||
|
||||
return itr
|
||||
}
|
||||
|
||||
type tagKeyMergeIterator struct {
|
||||
e TagKeyElem
|
||||
e tagKeyMergeElem
|
||||
buf []TagKeyElem
|
||||
itrs []TagKeyIterator
|
||||
}
|
||||
|
@ -142,36 +166,73 @@ type tagKeyMergeIterator struct {
|
|||
func (itr *tagKeyMergeIterator) Next() TagKeyElem {
|
||||
// Find next lowest key amongst the buffers.
|
||||
var key []byte
|
||||
for i := range itr.buf {
|
||||
if itr.buf[i] == nil {
|
||||
continue
|
||||
} else if key == nil || bytes.Compare(itr.buf[i].Key(), key) == -1 {
|
||||
key = itr.buf[i].Key()
|
||||
for i, buf := range itr.buf {
|
||||
// Fill buffer.
|
||||
if buf == nil {
|
||||
if buf = itr.itrs[i].Next(); buf != nil {
|
||||
itr.buf[i] = buf
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Find next lowest key.
|
||||
if key == nil || bytes.Compare(buf.Key(), key) == -1 {
|
||||
key = buf.Key()
|
||||
}
|
||||
}
|
||||
|
||||
// Return nil if no elements remaining.
|
||||
if len(key) == 0 {
|
||||
if key == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Refill buffer.
|
||||
var e TagKeyElem
|
||||
for i := range itr.buf {
|
||||
if itr.buf[i] == nil || !bytes.Equal(itr.buf[i].Key(), key) {
|
||||
// Merge elements together & clear buffer.
|
||||
itr.e = itr.e[:0]
|
||||
for i, buf := range itr.buf {
|
||||
if buf == nil || !bytes.Equal(buf.Key(), key) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Copy first matching buffer to the return buffer.
|
||||
if e == nil {
|
||||
e = itr.buf[i]
|
||||
}
|
||||
|
||||
// Fill buffer with next element.
|
||||
itr.buf[i] = itr.itrs[i].Next()
|
||||
itr.e = append(itr.e, buf)
|
||||
itr.buf[i] = nil
|
||||
}
|
||||
|
||||
return e
|
||||
return itr.e
|
||||
}
|
||||
|
||||
// tagKeyMergeElem represents a merged tag key element.
|
||||
type tagKeyMergeElem []TagKeyElem
|
||||
|
||||
// Key returns the key of the first element.
|
||||
func (p tagKeyMergeElem) Key() []byte {
|
||||
if len(p) == 0 {
|
||||
return nil
|
||||
}
|
||||
return p[0].Key()
|
||||
}
|
||||
|
||||
// Deleted returns the deleted flag of the first element.
|
||||
func (p tagKeyMergeElem) Deleted() bool {
|
||||
if len(p) == 0 {
|
||||
return false
|
||||
}
|
||||
return p[0].Deleted()
|
||||
}
|
||||
|
||||
// TagValueIterator returns a merge iterator for all elements until a tombstone occurs.
|
||||
func (p tagKeyMergeElem) TagValueIterator() TagValueIterator {
|
||||
if len(p) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
a := make([]TagValueIterator, 0, len(p))
|
||||
for _, e := range p {
|
||||
a = append(a, e.TagValueIterator())
|
||||
if e.Deleted() {
|
||||
break
|
||||
}
|
||||
}
|
||||
return MergeTagValueIterators(a...)
|
||||
}
|
||||
|
||||
// TagValueElem represents a generic tag value element.
|
||||
|
@ -194,21 +255,15 @@ func MergeTagValueIterators(itrs ...TagValueIterator) TagValueIterator {
|
|||
return nil
|
||||
}
|
||||
|
||||
itr := &tagValueMergeIterator{
|
||||
return &tagValueMergeIterator{
|
||||
e: make(tagValueMergeElem, 0, len(itrs)),
|
||||
buf: make([]TagValueElem, len(itrs)),
|
||||
itrs: itrs,
|
||||
}
|
||||
|
||||
// Initialize buffers.
|
||||
for i := range itr.itrs {
|
||||
itr.buf[i] = itr.itrs[i].Next()
|
||||
}
|
||||
|
||||
return itr
|
||||
}
|
||||
|
||||
type tagValueMergeIterator struct {
|
||||
e TagValueElem
|
||||
e tagValueMergeElem
|
||||
buf []TagValueElem
|
||||
itrs []TagValueIterator
|
||||
}
|
||||
|
@ -220,35 +275,72 @@ type tagValueMergeIterator struct {
|
|||
func (itr *tagValueMergeIterator) Next() TagValueElem {
|
||||
// Find next lowest value amongst the buffers.
|
||||
var value []byte
|
||||
for i := range itr.buf {
|
||||
if itr.buf[i] == nil {
|
||||
continue
|
||||
} else if value == nil || bytes.Compare(itr.buf[i].Value(), value) == -1 {
|
||||
value = itr.buf[i].Value()
|
||||
for i, buf := range itr.buf {
|
||||
// Fill buffer.
|
||||
if buf == nil {
|
||||
if buf = itr.itrs[i].Next(); buf != nil {
|
||||
itr.buf[i] = buf
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Find next lowest value.
|
||||
if value == nil || bytes.Compare(buf.Value(), value) == -1 {
|
||||
value = buf.Value()
|
||||
}
|
||||
}
|
||||
|
||||
// Return nil if no elements remaining.
|
||||
if len(value) == 0 {
|
||||
if value == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Refill buffer.
|
||||
var e TagValueElem
|
||||
for i := range itr.buf {
|
||||
if itr.buf[i] == nil || !bytes.Equal(itr.buf[i].Value(), value) {
|
||||
// Merge elements and clear buffers.
|
||||
itr.e = itr.e[:0]
|
||||
for i, buf := range itr.buf {
|
||||
if buf == nil || !bytes.Equal(buf.Value(), value) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Copy first matching buffer to the return buffer.
|
||||
if e == nil {
|
||||
e = itr.buf[i]
|
||||
}
|
||||
|
||||
// Fill buffer with next element.
|
||||
itr.buf[i] = itr.itrs[i].Next()
|
||||
itr.e = append(itr.e, buf)
|
||||
itr.buf[i] = nil
|
||||
}
|
||||
return e
|
||||
return itr.e
|
||||
}
|
||||
|
||||
// tagValueMergeElem represents a merged tag value element.
|
||||
type tagValueMergeElem []TagValueElem
|
||||
|
||||
// Name returns the value of the first element.
|
||||
func (p tagValueMergeElem) Value() []byte {
|
||||
if len(p) == 0 {
|
||||
return nil
|
||||
}
|
||||
return p[0].Value()
|
||||
}
|
||||
|
||||
// Deleted returns the deleted flag of the first element.
|
||||
func (p tagValueMergeElem) Deleted() bool {
|
||||
if len(p) == 0 {
|
||||
return false
|
||||
}
|
||||
return p[0].Deleted()
|
||||
}
|
||||
|
||||
// SeriesIterator returns a merge iterator for all elements until a tombstone occurs.
|
||||
func (p tagValueMergeElem) SeriesIterator() SeriesIterator {
|
||||
if len(p) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
a := make([]SeriesIterator, 0, len(p))
|
||||
for _, e := range p {
|
||||
a = append(a, e.SeriesIterator())
|
||||
if e.Deleted() {
|
||||
break
|
||||
}
|
||||
}
|
||||
return MergeSeriesIterators(a...)
|
||||
}
|
||||
|
||||
// SeriesElem represents a generic series element.
|
||||
|
@ -297,43 +389,47 @@ func (itr *seriesMergeIterator) Next() SeriesElem {
|
|||
// Find next lowest name/tags amongst the buffers.
|
||||
var name []byte
|
||||
var tags models.Tags
|
||||
for i := range itr.buf {
|
||||
// Skip empty buffers.
|
||||
if itr.buf[i] == nil {
|
||||
continue
|
||||
for i, buf := range itr.buf {
|
||||
// Fill buffer.
|
||||
if buf == nil {
|
||||
if buf = itr.itrs[i].Next(); buf != nil {
|
||||
itr.buf[i] = buf
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// If the name is not set the pick the first non-empty name.
|
||||
if name == nil {
|
||||
name, tags = itr.buf[i].Name(), itr.buf[i].Tags()
|
||||
name, tags = buf.Name(), buf.Tags()
|
||||
continue
|
||||
}
|
||||
|
||||
// Set name/tags if they are lower than what has been seen.
|
||||
if cmp := bytes.Compare(itr.buf[i].Name(), name); cmp == -1 || (cmp == 0 && models.CompareTags(itr.buf[i].Tags(), tags) == -1) {
|
||||
name, tags = itr.buf[i].Name(), itr.buf[i].Tags()
|
||||
if cmp := bytes.Compare(buf.Name(), name); cmp == -1 || (cmp == 0 && models.CompareTags(buf.Tags(), tags) == -1) {
|
||||
name, tags = buf.Name(), buf.Tags()
|
||||
}
|
||||
}
|
||||
|
||||
// Return nil if no elements remaining.
|
||||
if len(name) == 0 {
|
||||
if name == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Refill buffer.
|
||||
var e SeriesElem
|
||||
for i := range itr.buf {
|
||||
if itr.buf[i] == nil || !bytes.Equal(itr.buf[i].Name(), name) || models.CompareTags(itr.buf[i].Tags(), tags) != 0 {
|
||||
for i, buf := range itr.buf {
|
||||
if buf == nil || !bytes.Equal(buf.Name(), name) || models.CompareTags(buf.Tags(), tags) != 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Copy first matching buffer to the return buffer.
|
||||
if e == nil {
|
||||
e = itr.buf[i]
|
||||
e = buf
|
||||
}
|
||||
|
||||
// Fill buffer with next element.
|
||||
itr.buf[i] = itr.itrs[i].Next()
|
||||
// Clear buffer.
|
||||
itr.buf[i] = nil
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
@ -448,6 +544,7 @@ func writeUvarintTo(w io.Writer, v uint64, n *int64) error {
|
|||
}
|
||||
|
||||
func Hexdump(data []byte) {
|
||||
var buf bytes.Buffer
|
||||
addr := 0
|
||||
for len(data) > 0 {
|
||||
n := len(data)
|
||||
|
@ -455,12 +552,14 @@ func Hexdump(data []byte) {
|
|||
n = 16
|
||||
}
|
||||
|
||||
fmt.Fprintf(os.Stderr, "%07x % x\n", addr, data[:n])
|
||||
fmt.Fprintf(&buf, "%07x % x\n", addr, data[:n])
|
||||
|
||||
data = data[n:]
|
||||
addr += n
|
||||
}
|
||||
fmt.Fprintln(os.Stderr, "")
|
||||
fmt.Fprintln(&buf, "")
|
||||
|
||||
buf.WriteTo(os.Stderr)
|
||||
}
|
||||
|
||||
// hashKey hashes a key using murmur3.
|
||||
|
@ -498,3 +597,10 @@ func copyBytes(b []byte) []byte {
|
|||
copy(buf, b)
|
||||
return buf
|
||||
}
|
||||
|
||||
// assert will panic with a given formatted message if the given condition is false.
|
||||
func assert(condition bool, msg string, v ...interface{}) {
|
||||
if !condition {
|
||||
panic(fmt.Sprintf("assert failed: "+msg, v...))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package tsi1_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
|
@ -41,14 +42,14 @@ func TestMergeMeasurementIterators(t *testing.T) {
|
|||
}},
|
||||
)
|
||||
|
||||
if e := itr.Next(); !reflect.DeepEqual(e, &MeasurementElem{name: []byte("aaa")}) {
|
||||
t.Fatalf("unexpected elem(0): %#v", e)
|
||||
} else if e := itr.Next(); !reflect.DeepEqual(e, &MeasurementElem{name: []byte("bbb"), deleted: true}) {
|
||||
t.Fatalf("unexpected elem(1): %#v", e)
|
||||
} else if e := itr.Next(); !reflect.DeepEqual(e, &MeasurementElem{name: []byte("ccc")}) {
|
||||
t.Fatalf("unexpected elem(2): %#v", e)
|
||||
} else if e := itr.Next(); !reflect.DeepEqual(e, &MeasurementElem{name: []byte("ddd")}) {
|
||||
t.Fatalf("unexpected elem(3): %#v", e)
|
||||
if e := itr.Next(); !bytes.Equal(e.Name(), []byte("aaa")) || e.Deleted() {
|
||||
t.Fatalf("unexpected elem(0): %s/%v", e.Name(), e.Deleted())
|
||||
} else if e := itr.Next(); !bytes.Equal(e.Name(), []byte("bbb")) || !e.Deleted() {
|
||||
t.Fatalf("unexpected elem(1): %s/%v", e.Name(), e.Deleted())
|
||||
} else if e := itr.Next(); !bytes.Equal(e.Name(), []byte("ccc")) || e.Deleted() {
|
||||
t.Fatalf("unexpected elem(2): %s/%v", e.Name(), e.Deleted())
|
||||
} else if e := itr.Next(); !bytes.Equal(e.Name(), []byte("ddd")) || e.Deleted() {
|
||||
t.Fatalf("unexpected elem(3): %s/%v", e.Name(), e.Deleted())
|
||||
} else if e := itr.Next(); e != nil {
|
||||
t.Fatalf("expected nil elem: %#v", e)
|
||||
}
|
||||
|
@ -87,14 +88,14 @@ func TestMergeTagKeyIterators(t *testing.T) {
|
|||
}},
|
||||
)
|
||||
|
||||
if e := itr.Next(); !reflect.DeepEqual(e, &TagKeyElem{key: []byte("aaa")}) {
|
||||
t.Fatalf("unexpected elem(0): %#v", e)
|
||||
} else if e := itr.Next(); !reflect.DeepEqual(e, &TagKeyElem{key: []byte("bbb"), deleted: true}) {
|
||||
t.Fatalf("unexpected elem(1): %#v", e)
|
||||
} else if e := itr.Next(); !reflect.DeepEqual(e, &TagKeyElem{key: []byte("ccc")}) {
|
||||
t.Fatalf("unexpected elem(2): %#v", e)
|
||||
} else if e := itr.Next(); !reflect.DeepEqual(e, &TagKeyElem{key: []byte("ddd")}) {
|
||||
t.Fatalf("unexpected elem(3): %#v", e)
|
||||
if e := itr.Next(); !bytes.Equal(e.Key(), []byte("aaa")) || e.Deleted() {
|
||||
t.Fatalf("unexpected elem(0): %s/%v", e.Key(), e.Deleted())
|
||||
} else if e := itr.Next(); !bytes.Equal(e.Key(), []byte("bbb")) || !e.Deleted() {
|
||||
t.Fatalf("unexpected elem(1): %s/%v", e.Key(), e.Deleted())
|
||||
} else if e := itr.Next(); !bytes.Equal(e.Key(), []byte("ccc")) || e.Deleted() {
|
||||
t.Fatalf("unexpected elem(2): %s/%v", e.Key(), e.Deleted())
|
||||
} else if e := itr.Next(); !bytes.Equal(e.Key(), []byte("ddd")) || e.Deleted() {
|
||||
t.Fatalf("unexpected elem(3): %s/%v", e.Key(), e.Deleted())
|
||||
} else if e := itr.Next(); e != nil {
|
||||
t.Fatalf("expected nil elem: %#v", e)
|
||||
}
|
||||
|
@ -133,14 +134,14 @@ func TestMergeTagValueIterators(t *testing.T) {
|
|||
}},
|
||||
)
|
||||
|
||||
if e := itr.Next(); !reflect.DeepEqual(e, &TagValueElem{value: []byte("aaa")}) {
|
||||
t.Fatalf("unexpected elem(0): %#v", e)
|
||||
} else if e := itr.Next(); !reflect.DeepEqual(e, &TagValueElem{value: []byte("bbb"), deleted: true}) {
|
||||
t.Fatalf("unexpected elem(1): %#v", e)
|
||||
} else if e := itr.Next(); !reflect.DeepEqual(e, &TagValueElem{value: []byte("ccc")}) {
|
||||
t.Fatalf("unexpected elem(2): %#v", e)
|
||||
} else if e := itr.Next(); !reflect.DeepEqual(e, &TagValueElem{value: []byte("ddd")}) {
|
||||
t.Fatalf("unexpected elem(3): %#v", e)
|
||||
if e := itr.Next(); !bytes.Equal(e.Value(), []byte("aaa")) || e.Deleted() {
|
||||
t.Fatalf("unexpected elem(0): %s/%v", e.Value(), e.Deleted())
|
||||
} else if e := itr.Next(); !bytes.Equal(e.Value(), []byte("bbb")) || !e.Deleted() {
|
||||
t.Fatalf("unexpected elem(1): %s/%v", e.Value(), e.Deleted())
|
||||
} else if e := itr.Next(); !bytes.Equal(e.Value(), []byte("ccc")) || e.Deleted() {
|
||||
t.Fatalf("unexpected elem(2): %s/%v", e.Value(), e.Deleted())
|
||||
} else if e := itr.Next(); !bytes.Equal(e.Value(), []byte("ddd")) || e.Deleted() {
|
||||
t.Fatalf("unexpected elem(3): %s/%v", e.Value(), e.Deleted())
|
||||
} else if e := itr.Next(); e != nil {
|
||||
t.Fatalf("expected nil elem: %#v", e)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue