Add roaring bitmaps to TSI index files.
parent
468497c11f
commit
fdfd038401
|
@ -20,8 +20,8 @@
|
|||
[[projects]]
|
||||
name = "github.com/RoaringBitmap/roaring"
|
||||
packages = ["."]
|
||||
revision = "d6540aab65a17321470b1661bfc52da1823871e9"
|
||||
version = "v0.4.3"
|
||||
revision = "084ecabb327a0f98dbfca86a3774eb313921f90f"
|
||||
source = "https://github.com/benbjohnson/roaring.git"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
|
@ -407,6 +407,6 @@
|
|||
[solve-meta]
|
||||
analyzer-name = "dep"
|
||||
analyzer-version = 1
|
||||
inputs-digest = "d895b14175ef4f2cad07769402d5920dc8284d07d90f843d1ec6f45f633548e9"
|
||||
inputs-digest = "7930c85c9857de37ed365f33b510cf55d632fae1a07dabe92a5e0badf8e5ca43"
|
||||
solver-name = "gps-cdcl"
|
||||
solver-version = 1
|
||||
|
|
|
@ -8,7 +8,8 @@
|
|||
|
||||
[[constraint]]
|
||||
name = "github.com/RoaringBitmap/roaring"
|
||||
version = "0.4.3"
|
||||
source = "https://github.com/influxdata/roaring.git"
|
||||
revision = "ec86e26aba5545a1819e1ad68e9faa0f1745fff5"
|
||||
|
||||
[[constraint]]
|
||||
name = "github.com/boltdb/bolt"
|
||||
|
|
|
@ -353,7 +353,10 @@ func (cmd *Command) printTagValueSeries(sfile *tsdb.SeriesFile, fs *tsi1.FileSet
|
|||
|
||||
// Iterate over each series.
|
||||
tw := tabwriter.NewWriter(cmd.Stdout, 8, 8, 1, '\t', 0)
|
||||
itr := fs.TagValueSeriesIDIterator(name, key, value)
|
||||
itr, err := fs.TagValueSeriesIDIterator(name, key, value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for {
|
||||
e, err := itr.Next()
|
||||
if err != nil {
|
||||
|
|
|
@ -395,15 +395,17 @@ func (fs *FileSet) TagValueIterator(name, key []byte) TagValueIterator {
|
|||
}
|
||||
|
||||
// TagValueSeriesIDIterator returns a series iterator for a single tag value.
|
||||
func (fs *FileSet) TagValueSeriesIDIterator(name, key, value []byte) tsdb.SeriesIDIterator {
|
||||
func (fs *FileSet) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
|
||||
a := make([]tsdb.SeriesIDIterator, 0, len(fs.files))
|
||||
for _, f := range fs.files {
|
||||
itr := f.TagValueSeriesIDIterator(name, key, value)
|
||||
if itr != nil {
|
||||
itr, err := f.TagValueSeriesIDIterator(name, key, value)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if itr != nil {
|
||||
a = append(a, itr)
|
||||
}
|
||||
}
|
||||
return tsdb.MergeSeriesIDIterators(a...)
|
||||
return tsdb.MergeSeriesIDIterators(a...), nil
|
||||
}
|
||||
|
||||
// MeasurementsSketches returns the merged measurement sketches for the FileSet.
|
||||
|
@ -453,7 +455,7 @@ type File interface {
|
|||
// Series iteration.
|
||||
MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterator
|
||||
TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator
|
||||
TagValueSeriesIDIterator(name, key, value []byte) tsdb.SeriesIDIterator
|
||||
TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error)
|
||||
|
||||
// Sketches for cardinality estimation
|
||||
MergeMeasurementsSketches(s, t estimator.Sketch) error
|
||||
|
|
|
@ -838,8 +838,10 @@ func (i *Index) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator,
|
|||
func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
|
||||
a := make([]tsdb.SeriesIDIterator, 0, len(i.partitions))
|
||||
for _, p := range i.partitions {
|
||||
itr := p.TagValueSeriesIDIterator(name, key, value)
|
||||
if itr != nil {
|
||||
itr, err := p.TagValueSeriesIDIterator(name, key, value)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if itr != nil {
|
||||
a = append(a, itr)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -331,20 +331,31 @@ func (f *IndexFile) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterat
|
|||
|
||||
// TagValueSeriesIDIterator returns a series iterator for a tag value and a flag
|
||||
// indicating if a tombstone exists on the measurement, key, or value.
|
||||
func (f *IndexFile) TagValueSeriesIDIterator(name, key, value []byte) tsdb.SeriesIDIterator {
|
||||
func (f *IndexFile) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
|
||||
tblk := f.tblks[string(name)]
|
||||
if tblk == nil {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Find value element.
|
||||
n, data := tblk.TagValueSeriesData(key, value)
|
||||
if n == 0 {
|
||||
return nil
|
||||
var valueElem TagBlockValueElem
|
||||
if !tblk.DecodeTagValueElem(key, value, &valueElem) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Create an iterator over value's series.
|
||||
return &rawSeriesIDIterator{n: n, data: data}
|
||||
// Return iterator based on uvarint encoding, if set.
|
||||
if valueElem.SeriesN() == 0 {
|
||||
return nil, nil
|
||||
} else if valueElem.SeriesData() != nil {
|
||||
return &rawSeriesIDIterator{n: valueElem.SeriesN(), data: valueElem.SeriesData()}, nil
|
||||
}
|
||||
|
||||
// Otherwise return iterator over roaring.
|
||||
ss, err := valueElem.SeriesIDSet()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return tsdb.NewSeriesIDSetIterator(ss), nil
|
||||
}
|
||||
|
||||
// TagKey returns a tag key.
|
||||
|
|
|
@ -139,16 +139,18 @@ func (p IndexFiles) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterat
|
|||
}
|
||||
|
||||
// TagValueSeriesIDIterator returns an iterator that merges series across all files.
|
||||
func (p IndexFiles) TagValueSeriesIDIterator(name, key, value []byte) tsdb.SeriesIDIterator {
|
||||
func (p IndexFiles) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
|
||||
a := make([]tsdb.SeriesIDIterator, 0, len(p))
|
||||
|
||||
for i := range p {
|
||||
itr := p[i].TagValueSeriesIDIterator(name, key, value)
|
||||
if itr != nil {
|
||||
itr, err := p[i].TagValueSeriesIDIterator(name, key, value)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if itr != nil {
|
||||
a = append(a, itr)
|
||||
}
|
||||
}
|
||||
return tsdb.MergeSeriesIDIterators(a...)
|
||||
return tsdb.MergeSeriesIDIterators(a...), nil
|
||||
}
|
||||
|
||||
// CompactTo merges all index files and writes them to w.
|
||||
|
@ -185,6 +187,13 @@ func (p IndexFiles) CompactTo(w io.Writer, sfile *tsdb.SeriesFile, m, k uint64,
|
|||
return n, err
|
||||
}
|
||||
|
||||
// Ensure block is word aligned.
|
||||
if offset := (n) % 8; offset != 0 {
|
||||
if err := writeTo(bw, make([]byte, 8-offset), &n); err != nil {
|
||||
return n, err
|
||||
}
|
||||
}
|
||||
|
||||
// Write measurement block.
|
||||
t.MeasurementBlock.Offset = n
|
||||
if err := p.writeMeasurementBlockTo(bw, &info, &n); err != nil {
|
||||
|
@ -289,6 +298,13 @@ func (p IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactIn
|
|||
default:
|
||||
}
|
||||
|
||||
// Ensure block is word aligned.
|
||||
if offset := (*n) % 8; offset != 0 {
|
||||
if err := writeTo(w, make([]byte, 8-offset), n); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
kitr, err := p.TagKeyIterator(name)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -309,8 +325,10 @@ func (p IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactIn
|
|||
|
||||
// Merge all series together.
|
||||
if err := func() error {
|
||||
sitr := p.TagValueSeriesIDIterator(name, ke.Key(), ve.Value())
|
||||
if sitr != nil {
|
||||
sitr, err := p.TagValueSeriesIDIterator(name, ke.Key(), ve.Value())
|
||||
if err != nil {
|
||||
return err
|
||||
} else if sitr != nil {
|
||||
defer sitr.Close()
|
||||
for {
|
||||
se, err := sitr.Next()
|
||||
|
|
|
@ -451,28 +451,28 @@ func (f *LogFile) DeleteTagKey(name, key []byte) error {
|
|||
}
|
||||
|
||||
// TagValueSeriesIDIterator returns a series iterator for a tag value.
|
||||
func (f *LogFile) TagValueSeriesIDIterator(name, key, value []byte) tsdb.SeriesIDIterator {
|
||||
func (f *LogFile) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
|
||||
f.mu.RLock()
|
||||
defer f.mu.RUnlock()
|
||||
|
||||
mm, ok := f.mms[string(name)]
|
||||
if !ok {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
tk, ok := mm.tagSet[string(key)]
|
||||
if !ok {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
tv, ok := tk.tagValues[string(value)]
|
||||
if !ok {
|
||||
return nil
|
||||
return nil, nil
|
||||
} else if tv.cardinality() == 0 {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return tsdb.NewSeriesIDSetIterator(tv.seriesIDSet())
|
||||
return tsdb.NewSeriesIDSetIterator(tv.seriesIDSet()), nil
|
||||
}
|
||||
|
||||
// MeasurementN returns the total number of measurements.
|
||||
|
@ -846,6 +846,13 @@ func (f *LogFile) CompactTo(w io.Writer, m, k uint64, cancel <-chan struct{}) (n
|
|||
return n, err
|
||||
}
|
||||
|
||||
// Ensure block is word aligned.
|
||||
if offset := n % 8; offset != 0 {
|
||||
if err := writeTo(bw, make([]byte, 8-offset), &n); err != nil {
|
||||
return n, err
|
||||
}
|
||||
}
|
||||
|
||||
// Write measurement block.
|
||||
t.MeasurementBlock.Offset = n
|
||||
if err := f.writeMeasurementBlockTo(bw, names, info, &n); err != nil {
|
||||
|
@ -924,6 +931,13 @@ func (f *LogFile) writeTagsetTo(w io.Writer, name string, info *logFileCompactIn
|
|||
default:
|
||||
}
|
||||
|
||||
// Ensure block is word aligned.
|
||||
if offset := (*n) % 8; offset != 0 {
|
||||
if err := writeTo(w, make([]byte, 8-offset), n); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
enc := NewTagBlockEncoder(w)
|
||||
var valueN int
|
||||
for _, k := range mm.keys() {
|
||||
|
|
|
@ -19,7 +19,8 @@ const MeasurementBlockVersion = 1
|
|||
|
||||
// Measurement flag constants.
|
||||
const (
|
||||
MeasurementTombstoneFlag = 0x01
|
||||
MeasurementTombstoneFlag = 0x01
|
||||
MeasurementSeriesIDSetFlag = 0x02
|
||||
)
|
||||
|
||||
// Measurement field size constants.
|
||||
|
@ -162,6 +163,9 @@ func (blk *MeasurementBlock) SeriesIDIterator(name []byte) tsdb.SeriesIDIterator
|
|||
if !ok {
|
||||
return &rawSeriesIDIterator{}
|
||||
}
|
||||
if e.seriesIDSet != nil {
|
||||
return tsdb.NewSeriesIDSetIterator(e.seriesIDSet)
|
||||
}
|
||||
return &rawSeriesIDIterator{n: e.series.n, data: e.series.data}
|
||||
}
|
||||
|
||||
|
@ -343,6 +347,8 @@ type MeasurementBlockElem struct {
|
|||
data []byte // serialized series data
|
||||
}
|
||||
|
||||
seriesIDSet *tsdb.SeriesIDSet
|
||||
|
||||
// size in bytes, set after unmarshaling.
|
||||
size int
|
||||
}
|
||||
|
@ -388,6 +394,17 @@ func (e *MeasurementBlockElem) SeriesIDs() []uint64 {
|
|||
}
|
||||
|
||||
func (e *MeasurementBlockElem) ForEachSeriesID(fn func(uint64) error) error {
|
||||
// Read from roaring, if available.
|
||||
if e.seriesIDSet != nil {
|
||||
itr := e.seriesIDSet.Iterator()
|
||||
for itr.HasNext() {
|
||||
if err := fn(uint64(itr.Next())); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Read from uvarint encoded data, if available.
|
||||
var prev uint64
|
||||
for data := e.series.data; len(data) > 0; {
|
||||
delta, n, err := uvarint(data)
|
||||
|
@ -426,18 +443,31 @@ func (e *MeasurementBlockElem) UnmarshalBinary(data []byte) error {
|
|||
}
|
||||
e.name, data = data[n:n+int(sz)], data[n+int(sz):]
|
||||
|
||||
// Parse series data.
|
||||
// Parse series count.
|
||||
v, n, err := uvarint(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.series.n, data = uint64(v), data[n:]
|
||||
|
||||
// Parse series data size.
|
||||
sz, n, err = uvarint(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
data = data[n:]
|
||||
e.series.data, data = data[:sz], data[sz:]
|
||||
|
||||
// Parse series data (original uvarint encoded or roaring bitmap).
|
||||
if e.flag&MeasurementSeriesIDSetFlag == 0 {
|
||||
e.series.data, data = data[:sz], data[sz:]
|
||||
} else {
|
||||
data = memalign(data)
|
||||
e.seriesIDSet = tsdb.NewSeriesIDSet()
|
||||
if err = e.seriesIDSet.UnmarshalBinaryUnsafe(data[:sz]); err != nil {
|
||||
return err
|
||||
}
|
||||
data = data[sz:]
|
||||
}
|
||||
|
||||
// Save length of elem.
|
||||
e.size = start - len(data)
|
||||
|
@ -469,7 +499,14 @@ func (mw *MeasurementBlockWriter) Add(name []byte, deleted bool, offset, size in
|
|||
mm.deleted = deleted
|
||||
mm.tagBlock.offset = offset
|
||||
mm.tagBlock.size = size
|
||||
mm.seriesIDs = seriesIDs
|
||||
|
||||
if mm.seriesIDSet == nil {
|
||||
mm.seriesIDSet = tsdb.NewSeriesIDSet()
|
||||
}
|
||||
for _, seriesID := range seriesIDs {
|
||||
mm.seriesIDSet.AddNoLock(seriesID)
|
||||
}
|
||||
|
||||
mw.mms[string(name)] = mm
|
||||
|
||||
if deleted {
|
||||
|
@ -592,21 +629,12 @@ func (mw *MeasurementBlockWriter) writeMeasurementTo(w io.Writer, name []byte, m
|
|||
|
||||
// Write series data to buffer.
|
||||
mw.buf.Reset()
|
||||
var prev uint64
|
||||
for _, seriesID := range mm.seriesIDs {
|
||||
delta := seriesID - prev
|
||||
|
||||
var buf [binary.MaxVarintLen32]byte
|
||||
i := binary.PutUvarint(buf[:], uint64(delta))
|
||||
if _, err := mw.buf.Write(buf[:i]); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
prev = seriesID
|
||||
if _, err := mm.seriesIDSet.WriteTo(&mw.buf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Write series count.
|
||||
if err := writeUvarintTo(w, uint64(len(mm.seriesIDs)), n); err != nil {
|
||||
if err := writeUvarintTo(w, mm.seriesIDSet.Cardinality(), n); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -614,6 +642,14 @@ func (mw *MeasurementBlockWriter) writeMeasurementTo(w io.Writer, name []byte, m
|
|||
if err := writeUvarintTo(w, uint64(mw.buf.Len()), n); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Word align bitmap data.
|
||||
if offset := (*n) % 8; offset != 0 {
|
||||
if err := writeTo(w, make([]byte, 8-offset), n); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
nn, err := mw.buf.WriteTo(w)
|
||||
*n += nn
|
||||
return err
|
||||
|
@ -639,12 +675,12 @@ type measurement struct {
|
|||
offset int64
|
||||
size int64
|
||||
}
|
||||
seriesIDs []uint64
|
||||
offset int64
|
||||
seriesIDSet *tsdb.SeriesIDSet
|
||||
offset int64
|
||||
}
|
||||
|
||||
func (mm measurement) flag() byte {
|
||||
var flag byte
|
||||
flag := byte(MeasurementSeriesIDSetFlag)
|
||||
if mm.deleted {
|
||||
flag |= MeasurementTombstoneFlag
|
||||
}
|
||||
|
|
|
@ -765,18 +765,20 @@ func (p *Partition) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterat
|
|||
}
|
||||
|
||||
// TagValueSeriesIDIterator returns a series iterator for a single key value.
|
||||
func (p *Partition) TagValueSeriesIDIterator(name, key, value []byte) tsdb.SeriesIDIterator {
|
||||
func (p *Partition) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
|
||||
fs, err := p.RetainFileSet()
|
||||
if err != nil {
|
||||
return nil // TODO(edd): this should probably return an error.
|
||||
return nil, err // TODO(edd): this should probably return an error.
|
||||
}
|
||||
|
||||
itr := fs.TagValueSeriesIDIterator(name, key, value)
|
||||
if itr == nil {
|
||||
itr, err := fs.TagValueSeriesIDIterator(name, key, value)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if itr == nil {
|
||||
fs.Release()
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
return newFileSetSeriesIDIterator(fs, itr)
|
||||
return newFileSetSeriesIDIterator(fs, itr), nil
|
||||
}
|
||||
|
||||
// MeasurementTagKeysByExpr extracts the tag keys wanted by the expression.
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"io"
|
||||
|
||||
"github.com/influxdata/influxdb/pkg/rhh"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
)
|
||||
|
||||
// TagBlockVersion is the version of the tag block.
|
||||
|
@ -20,7 +21,8 @@ const (
|
|||
|
||||
// Tag value flag constants.
|
||||
const (
|
||||
TagValueTombstoneFlag = 0x01
|
||||
TagValueTombstoneFlag = 0x01
|
||||
TagValueSeriesIDSetFlag = 0x02
|
||||
)
|
||||
|
||||
// TagBlock variable size constants.
|
||||
|
@ -143,15 +145,6 @@ func (blk *TagBlock) TagValueElem(key, value []byte) TagValueElem {
|
|||
return &valueElem
|
||||
}
|
||||
|
||||
// TagValueElem returns an element for a tag value.
|
||||
func (blk *TagBlock) TagValueSeriesData(key, value []byte) (uint64, []byte) {
|
||||
var valueElem TagBlockValueElem
|
||||
if !blk.DecodeTagValueElem(key, value, &valueElem) {
|
||||
return 0, nil
|
||||
}
|
||||
return valueElem.series.n, valueElem.series.data
|
||||
}
|
||||
|
||||
// DecodeTagValueElem returns an element for a tag value.
|
||||
func (blk *TagBlock) DecodeTagValueElem(key, value []byte, valueElem *TagBlockValueElem) bool {
|
||||
// Find key element, exit if not found.
|
||||
|
@ -318,13 +311,16 @@ func (e *TagBlockKeyElem) unmarshal(buf, data []byte) {
|
|||
|
||||
// TagBlockValueElem represents a tag value element.
|
||||
type TagBlockValueElem struct {
|
||||
flag byte
|
||||
value []byte
|
||||
flag byte
|
||||
value []byte
|
||||
|
||||
series struct {
|
||||
n uint64 // Series count
|
||||
data []byte // Raw series data
|
||||
}
|
||||
|
||||
seriesIDSetData []byte
|
||||
|
||||
size int
|
||||
}
|
||||
|
||||
|
@ -347,6 +343,14 @@ func (e *TagBlockValueElem) SeriesID(i int) uint64 {
|
|||
|
||||
// SeriesIDs returns a list decoded series ids.
|
||||
func (e *TagBlockValueElem) SeriesIDs() ([]uint64, error) {
|
||||
if e.seriesIDSetData != nil {
|
||||
ss, err := e.SeriesIDSet()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ss.Slice(), nil
|
||||
}
|
||||
|
||||
a := make([]uint64, 0, e.series.n)
|
||||
var prev uint64
|
||||
for data := e.series.data; len(data) > 0; {
|
||||
|
@ -363,6 +367,34 @@ func (e *TagBlockValueElem) SeriesIDs() ([]uint64, error) {
|
|||
return a, nil
|
||||
}
|
||||
|
||||
// SeriesIDSet returns a set of series ids.
|
||||
func (e *TagBlockValueElem) SeriesIDSet() (*tsdb.SeriesIDSet, error) {
|
||||
ss := tsdb.NewSeriesIDSet()
|
||||
|
||||
// Read bitmap data directly from mmap, if available.
|
||||
if e.seriesIDSetData != nil {
|
||||
if err := ss.UnmarshalBinaryUnsafe(e.seriesIDSetData); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ss, nil
|
||||
}
|
||||
|
||||
// Otherwise decode series ids from uvarint encoding.
|
||||
var prev uint64
|
||||
for data := e.series.data; len(data) > 0; {
|
||||
delta, n, err := uvarint(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
data = data[n:]
|
||||
|
||||
seriesID := prev + uint64(delta)
|
||||
ss.AddNoLock(seriesID)
|
||||
prev = seriesID
|
||||
}
|
||||
return ss, nil
|
||||
}
|
||||
|
||||
// Size returns the size of the element.
|
||||
func (e *TagBlockValueElem) Size() int { return e.size }
|
||||
|
||||
|
@ -386,9 +418,13 @@ func (e *TagBlockValueElem) unmarshal(buf []byte) {
|
|||
sz, n = binary.Uvarint(buf)
|
||||
buf = buf[n:]
|
||||
|
||||
// Save reference to series data.
|
||||
e.series.data = buf[:sz]
|
||||
buf = buf[sz:]
|
||||
// Parse series data (original uvarint encoded or roaring bitmap).
|
||||
if e.flag&TagValueSeriesIDSetFlag == 0 {
|
||||
e.series.data, buf = buf[:sz], buf[sz:]
|
||||
} else {
|
||||
buf = memalign(buf)
|
||||
e.seriesIDSetData, buf = buf, buf[sz:]
|
||||
}
|
||||
|
||||
// Save length of elem.
|
||||
e.size = start - len(buf)
|
||||
|
@ -591,17 +627,12 @@ func (enc *TagBlockEncoder) EncodeValue(value []byte, deleted bool, seriesIDs []
|
|||
|
||||
// Build series data in buffer.
|
||||
enc.buf.Reset()
|
||||
var prev uint64
|
||||
ss := tsdb.NewSeriesIDSet()
|
||||
for _, seriesID := range seriesIDs {
|
||||
delta := seriesID - prev
|
||||
|
||||
var buf [binary.MaxVarintLen32]byte
|
||||
i := binary.PutUvarint(buf[:], uint64(delta))
|
||||
if _, err := enc.buf.Write(buf[:i]); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
prev = seriesID
|
||||
ss.AddNoLock(seriesID)
|
||||
}
|
||||
if _, err := ss.WriteTo(&enc.buf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Write series count.
|
||||
|
@ -613,6 +644,14 @@ func (enc *TagBlockEncoder) EncodeValue(value []byte, deleted bool, seriesIDs []
|
|||
if err := writeUvarintTo(enc.w, uint64(enc.buf.Len()), &enc.n); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Word align bitmap data.
|
||||
if offset := (enc.n) % 8; offset != 0 {
|
||||
if err := writeTo(enc.w, make([]byte, 8-offset), &enc.n); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
nn, err := enc.buf.WriteTo(enc.w)
|
||||
if enc.n += nn; err != nil {
|
||||
return err
|
||||
|
@ -778,7 +817,7 @@ func encodeTagKeyFlag(deleted bool) byte {
|
|||
}
|
||||
|
||||
func encodeTagValueFlag(deleted bool) byte {
|
||||
var flag byte
|
||||
flag := byte(TagValueSeriesIDSetFlag)
|
||||
if deleted {
|
||||
flag |= TagValueTombstoneFlag
|
||||
}
|
||||
|
|
|
@ -3,8 +3,11 @@ package tsi1
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"unsafe"
|
||||
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
)
|
||||
|
@ -545,8 +548,17 @@ func uvarint(data []byte) (value uint64, n int, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
// memalign returns data if its memory address is word align.
|
||||
// Otherwise returns the next word aligned memory location.
|
||||
func memalign(data []byte) []byte {
|
||||
if n := int(uintptr(unsafe.Pointer(&data[0])) % 8); n != 0 {
|
||||
data = data[8-n:]
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
// hexdump is a helper for dumping binary data to stderr.
|
||||
// func hexdump(data []byte) { os.Stderr.Write([]byte(hex.Dump(data))) }
|
||||
func hexdump(data []byte) { os.Stderr.Write([]byte(hex.Dump(data))) }
|
||||
|
||||
// stack is a helper for dumping a stack trace.
|
||||
// func stack() string {
|
||||
|
|
|
@ -197,6 +197,14 @@ func (s *SeriesIDSet) UnmarshalBinary(data []byte) error {
|
|||
return s.bitmap.UnmarshalBinary(data)
|
||||
}
|
||||
|
||||
// UnmarshalBinaryUnsafe unmarshals data into the set.
|
||||
// References to the underlying data are used so data should not be reused by caller.
|
||||
func (s *SeriesIDSet) UnmarshalBinaryUnsafe(data []byte) error {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
return s.bitmap.UnmarshalBinaryUnsafe(data)
|
||||
}
|
||||
|
||||
// WriteTo writes the set to w.
|
||||
func (s *SeriesIDSet) WriteTo(w io.Writer) (int64, error) {
|
||||
s.RLock()
|
||||
|
@ -204,6 +212,18 @@ func (s *SeriesIDSet) WriteTo(w io.Writer) (int64, error) {
|
|||
return s.bitmap.WriteTo(w)
|
||||
}
|
||||
|
||||
// Slice returns a slice of series ids.
|
||||
func (s *SeriesIDSet) Slice() []uint64 {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
|
||||
var a []uint64
|
||||
for _, seriesID := range s.bitmap.ToArray() {
|
||||
a = append(a, uint64(seriesID))
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
type SeriesIDSetIterable interface {
|
||||
HasNext() bool
|
||||
Next() uint32
|
||||
|
|
Loading…
Reference in New Issue