Refactor file set tag value iterators to support series sets & tombstones.
parent
cb828f0187
commit
80d01325f8
|
@ -187,21 +187,6 @@ func (fs *FileSet) LastContiguousIndexFilesByLevel(level int) []*IndexFile {
|
|||
return a
|
||||
}
|
||||
|
||||
/*
|
||||
// SeriesIDIterator returns an iterator over all series in the index.
|
||||
func (fs *FileSet) SeriesIDIterator() tsdb.SeriesIDIterator {
|
||||
a := make([]tsdb.SeriesIDIterator, 0, len(fs.files))
|
||||
for _, f := range fs.files {
|
||||
itr := f.SeriesIDIterator()
|
||||
if itr == nil {
|
||||
continue
|
||||
}
|
||||
a = append(a, itr)
|
||||
}
|
||||
return FilterUndeletedSeriesIterator(MergeSeriesIterators(a...))
|
||||
}
|
||||
*/
|
||||
|
||||
// Measurement returns a measurement by name.
|
||||
func (fs *FileSet) Measurement(name []byte) MeasurementElem {
|
||||
for _, f := range fs.files {
|
||||
|
@ -396,16 +381,31 @@ func (fs *FileSet) TagValueIterator(name, key []byte) TagValueIterator {
|
|||
|
||||
// TagValueSeriesIDIterator returns a series iterator for a single tag value.
|
||||
func (fs *FileSet) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
|
||||
a := make([]tsdb.SeriesIDIterator, 0, len(fs.files))
|
||||
for _, f := range fs.files {
|
||||
itr, err := f.TagValueSeriesIDIterator(name, key, value)
|
||||
ss := tsdb.NewSeriesIDSet()
|
||||
|
||||
var ftss *tsdb.SeriesIDSet
|
||||
for i := len(fs.files) - 1; i >= 0; i-- {
|
||||
f := fs.files[i]
|
||||
|
||||
// Remove tombstones set in previous file.
|
||||
if ftss != nil && ftss.Cardinality() > 0 {
|
||||
ss = ss.AndNot(ftss)
|
||||
}
|
||||
|
||||
// Fetch tag value series set for this file and merge into overall set.
|
||||
fss, err := f.TagValueSeriesIDSet(name, key, value)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if itr != nil {
|
||||
a = append(a, itr)
|
||||
} else if fss != nil {
|
||||
ss.Merge(fss)
|
||||
}
|
||||
|
||||
// Fetch tombstone set to be processed on next file.
|
||||
if ftss, err = f.TombstoneSeriesIDSet(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return tsdb.MergeSeriesIDIterators(a...), nil
|
||||
return tsdb.NewSeriesIDSetIterator(ss), nil
|
||||
}
|
||||
|
||||
// MeasurementsSketches returns the merged measurement sketches for the FileSet.
|
||||
|
@ -455,7 +455,7 @@ type File interface {
|
|||
// Series iteration.
|
||||
MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterator
|
||||
TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator
|
||||
TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error)
|
||||
TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesIDSet, error)
|
||||
|
||||
// Sketches for cardinality estimation
|
||||
MergeMeasurementsSketches(s, t estimator.Sketch) error
|
||||
|
|
|
@ -329,9 +329,8 @@ func (f *IndexFile) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterat
|
|||
return tsdb.MergeSeriesIDIterators(itrs...)
|
||||
}
|
||||
|
||||
// TagValueSeriesIDIterator returns a series iterator for a tag value and a flag
|
||||
// indicating if a tombstone exists on the measurement, key, or value.
|
||||
func (f *IndexFile) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
|
||||
// TagValueSeriesIDSet returns a series id set for a tag value.
|
||||
func (f *IndexFile) TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesIDSet, error) {
|
||||
tblk := f.tblks[string(name)]
|
||||
if tblk == nil {
|
||||
return nil, nil
|
||||
|
@ -341,21 +340,10 @@ func (f *IndexFile) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.Seri
|
|||
var valueElem TagBlockValueElem
|
||||
if !tblk.DecodeTagValueElem(key, value, &valueElem) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Return iterator based on uvarint encoding, if set.
|
||||
if valueElem.SeriesN() == 0 {
|
||||
} else if valueElem.SeriesN() == 0 {
|
||||
return nil, nil
|
||||
} else if valueElem.SeriesData() != nil {
|
||||
return &rawSeriesIDIterator{n: valueElem.SeriesN(), data: valueElem.SeriesData()}, nil
|
||||
}
|
||||
|
||||
// Otherwise return iterator over roaring.
|
||||
ss, err := valueElem.SeriesIDSet()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return tsdb.NewSeriesIDSetIterator(ss), nil
|
||||
return valueElem.SeriesIDSet()
|
||||
}
|
||||
|
||||
// TagKey returns a tag key.
|
||||
|
|
|
@ -138,19 +138,17 @@ func (p IndexFiles) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterat
|
|||
return tsdb.MergeSeriesIDIterators(a...)
|
||||
}
|
||||
|
||||
// TagValueSeriesIDIterator returns an iterator that merges series across all files.
|
||||
func (p IndexFiles) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
|
||||
a := make([]tsdb.SeriesIDIterator, 0, len(p))
|
||||
|
||||
// TagValueSeriesIDSet returns a series ID set that merges series across all files.
|
||||
func (p IndexFiles) TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesIDSet, error) {
|
||||
ss := tsdb.NewSeriesIDSet()
|
||||
for i := range p {
|
||||
itr, err := p[i].TagValueSeriesIDIterator(name, key, value)
|
||||
if err != nil {
|
||||
if fss, err := p[i].TagValueSeriesIDSet(name, key, value); err != nil {
|
||||
return nil, err
|
||||
} else if itr != nil {
|
||||
a = append(a, itr)
|
||||
} else if fss != nil {
|
||||
ss.Merge(fss)
|
||||
}
|
||||
}
|
||||
return tsdb.MergeSeriesIDIterators(a...), nil
|
||||
return ss, nil
|
||||
}
|
||||
|
||||
// CompactTo merges all index files and writes them to w.
|
||||
|
@ -310,7 +308,6 @@ func (p IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactIn
|
|||
return err
|
||||
}
|
||||
|
||||
var seriesN int
|
||||
enc := NewTagBlockEncoder(w)
|
||||
for ke := kitr.Next(); ke != nil; ke = kitr.Next() {
|
||||
// Encode key.
|
||||
|
@ -325,33 +322,11 @@ func (p IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactIn
|
|||
|
||||
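// Merge all series together.
// NOTE(review): a non-nil error from this closure is answered with `return nil`
// below, which appears to discard the failure — confirm whether `return err` was intended.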
|
||||
if err := func() error {
|
||||
sitr, err := p.TagValueSeriesIDIterator(name, ke.Key(), ve.Value())
|
||||
ss, err := p.TagValueSeriesIDSet(name, ke.Key(), ve.Value())
|
||||
if err != nil {
|
||||
return err
|
||||
} else if sitr != nil {
|
||||
defer sitr.Close()
|
||||
for {
|
||||
se, err := sitr.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
} else if se.SeriesID == 0 {
|
||||
break
|
||||
}
|
||||
seriesIDs = append(seriesIDs, se.SeriesID)
|
||||
|
||||
// Check for cancellation periodically.
|
||||
if seriesN++; seriesN%1000 == 0 {
|
||||
select {
|
||||
case <-info.cancel:
|
||||
return ErrCompactionInterrupted
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Encode value.
|
||||
return enc.EncodeValue(ve.Value(), ve.Deleted(), seriesIDs)
|
||||
return enc.EncodeValue(ve.Value(), ve.Deleted(), ss)
|
||||
}(); err != nil {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -450,8 +450,8 @@ func (f *LogFile) DeleteTagKey(name, key []byte) error {
|
|||
return f.FlushAndSync()
|
||||
}
|
||||
|
||||
// TagValueSeriesIDIterator returns a series iterator for a tag value.
|
||||
func (f *LogFile) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
|
||||
// TagValueSeriesIDSet returns a series ID set for a tag value.
|
||||
func (f *LogFile) TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesIDSet, error) {
|
||||
f.mu.RLock()
|
||||
defer f.mu.RUnlock()
|
||||
|
||||
|
@ -472,7 +472,7 @@ func (f *LogFile) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.Series
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
return tsdb.NewSeriesIDSetIterator(tv.seriesIDSet()), nil
|
||||
return tv.seriesIDSet(), nil
|
||||
}
|
||||
|
||||
// MeasurementN returns the total number of measurements.
|
||||
|
@ -960,7 +960,7 @@ func (f *LogFile) writeTagsetTo(w io.Writer, name string, info *logFileCompactIn
|
|||
// Add each value.
|
||||
for _, v := range values {
|
||||
value := tag.tagValues[v]
|
||||
if err := enc.EncodeValue(value.name, value.deleted, value.seriesIDs()); err != nil {
|
||||
if err := enc.EncodeValue(value.name, value.deleted, value.seriesIDSet()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -600,7 +600,7 @@ func (enc *TagBlockEncoder) EncodeKey(key []byte, deleted bool) error {
|
|||
|
||||
// EncodeValue writes a tag value to the underlying writer.
|
||||
// The tag key must be lexicographically sorted after the previous encoded tag key.
|
||||
func (enc *TagBlockEncoder) EncodeValue(value []byte, deleted bool, seriesIDs []uint64) error {
|
||||
func (enc *TagBlockEncoder) EncodeValue(value []byte, deleted bool, ss *tsdb.SeriesIDSet) error {
|
||||
if len(enc.keys) == 0 {
|
||||
return fmt.Errorf("tag key must be encoded before encoding values")
|
||||
} else if len(value) == 0 {
|
||||
|
@ -631,16 +631,12 @@ func (enc *TagBlockEncoder) EncodeValue(value []byte, deleted bool, seriesIDs []
|
|||
|
||||
// Build series data in buffer.
|
||||
enc.buf.Reset()
|
||||
ss := tsdb.NewSeriesIDSet()
|
||||
for _, seriesID := range seriesIDs {
|
||||
ss.AddNoLock(seriesID)
|
||||
}
|
||||
if _, err := ss.WriteTo(&enc.buf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Write series count.
|
||||
if err := writeUvarintTo(enc.w, uint64(len(seriesIDs)), &enc.n); err != nil {
|
||||
if err := writeUvarintTo(enc.w, uint64(ss.Cardinality()), &enc.n); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
"github.com/influxdata/influxdb/tsdb/index/tsi1"
|
||||
)
|
||||
|
||||
|
@ -17,19 +18,19 @@ func TestTagBlockWriter(t *testing.T) {
|
|||
|
||||
if err := enc.EncodeKey([]byte("host"), false); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := enc.EncodeValue([]byte("server0"), false, []uint64{1}); err != nil {
|
||||
} else if err := enc.EncodeValue([]byte("server0"), false, tsdb.NewSeriesIDSet(1)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := enc.EncodeValue([]byte("server1"), false, []uint64{2}); err != nil {
|
||||
} else if err := enc.EncodeValue([]byte("server1"), false, tsdb.NewSeriesIDSet(2)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := enc.EncodeValue([]byte("server2"), false, []uint64{3}); err != nil {
|
||||
} else if err := enc.EncodeValue([]byte("server2"), false, tsdb.NewSeriesIDSet(3)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := enc.EncodeKey([]byte("region"), false); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := enc.EncodeValue([]byte("us-east"), false, []uint64{1, 2}); err != nil {
|
||||
} else if err := enc.EncodeValue([]byte("us-east"), false, tsdb.NewSeriesIDSet(1, 2)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := enc.EncodeValue([]byte("us-west"), false, []uint64{3}); err != nil {
|
||||
} else if err := enc.EncodeValue([]byte("us-west"), false, tsdb.NewSeriesIDSet(3)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
@ -115,7 +116,7 @@ func benchmarkTagBlock_SeriesN(b *testing.B, tagN, valueN int, blk **tsi1.TagBlo
|
|||
}
|
||||
|
||||
for j := 0; j < valueN; j++ {
|
||||
if err := enc.EncodeValue([]byte(fmt.Sprintf("%08d", j)), false, []uint64{1}); err != nil {
|
||||
if err := enc.EncodeValue([]byte(fmt.Sprintf("%08d", j)), false, tsdb.NewSeriesIDSet(1)); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,10 +15,16 @@ type SeriesIDSet struct {
|
|||
}
|
||||
|
||||
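// NewSeriesIDSet returns a new instance of SeriesIDSet, optionally seeded with
// the given series IDs.
// NOTE(review): each ID is converted with uint32(...) before being added to the
// bitmap, so values above math.MaxUint32 would be truncated — confirm callers
// never pass larger IDs.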
|
||||
func NewSeriesIDSet() *SeriesIDSet {
|
||||
return &SeriesIDSet{
|
||||
bitmap: roaring.NewBitmap(),
|
||||
func NewSeriesIDSet(a ...uint64) *SeriesIDSet {
|
||||
ss := &SeriesIDSet{bitmap: roaring.NewBitmap()}
|
||||
if len(a) > 0 {
|
||||
a32 := make([]uint32, len(a))
|
||||
for i := range a {
|
||||
a32[i] = uint32(a[i])
|
||||
}
|
||||
ss.bitmap.AddMany(a32)
|
||||
}
|
||||
return ss
|
||||
}
|
||||
|
||||
// Bytes estimates the memory footprint of this SeriesIDSet, in bytes.
|
||||
|
|
Loading…
Reference in New Issue