Implement bitset iterator

pull/10044/head
Ben Johnson 2018-07-04 11:53:13 -06:00
parent ef4e525546
commit 979d790154
No known key found for this signature in database
GPG Key ID: 81741CD251883081
5 changed files with 160 additions and 64 deletions

View File

@ -164,6 +164,53 @@ type SeriesIDIterator interface {
Close() error
}
// SeriesIDSetIterator represents an iterator that can produce a SeriesIDSet.
type SeriesIDSetIterator interface {
SeriesIDIterator
SeriesIDSet() *SeriesIDSet
}
type seriesIDSetIterator struct {
ss *SeriesIDSet
itr SeriesIDSetIterable
}
func NewSeriesIDSetIterator(ss *SeriesIDSet) SeriesIDSetIterator {
if ss == nil || ss.bitmap == nil {
return nil
}
return &seriesIDSetIterator{ss: ss, itr: ss.Iterator()}
}
func (itr *seriesIDSetIterator) Next() (SeriesIDElem, error) {
if !itr.itr.HasNext() {
return SeriesIDElem{}, nil
}
return SeriesIDElem{SeriesID: uint64(itr.itr.Next())}, nil
}
func (itr *seriesIDSetIterator) Close() error { return nil }
func (itr *seriesIDSetIterator) SeriesIDSet() *SeriesIDSet { return itr.ss }
// NewSeriesIDSetIterators returns a slice of SeriesIDSetIterator if all itrs
// can be type casted. Otherwise returns nil.
func NewSeriesIDSetIterators(itrs []SeriesIDIterator) []SeriesIDSetIterator {
if len(itrs) == 0 {
return nil
}
a := make([]SeriesIDSetIterator, len(itrs))
for i := range itrs {
if itr, ok := itrs[i].(SeriesIDSetIterator); ok {
a[i] = itr
} else {
return nil
}
}
return a
}
// ReadAllSeriesIDIterator returns all ids from the iterator.
func ReadAllSeriesIDIterator(itr SeriesIDIterator) ([]uint64, error) {
if itr == nil {
@ -205,6 +252,15 @@ func (itr *SeriesIDSliceIterator) Next() (SeriesIDElem, error) {
func (itr *SeriesIDSliceIterator) Close() error { return nil }
// SeriesIDSet returns a set of all remaining ids.
func (itr *SeriesIDSliceIterator) SeriesIDSet() *SeriesIDSet {
s := NewSeriesIDSet()
for _, id := range itr.ids {
s.AddNoLock(id)
}
return s
}
type SeriesIDIterators []SeriesIDIterator
func (a SeriesIDIterators) Close() (err error) {
@ -359,6 +415,19 @@ func MergeSeriesIDIterators(itrs ...SeriesIDIterator) SeriesIDIterator {
return itrs[0]
}
// Merge as series id sets, if available.
if a := NewSeriesIDSetIterators(itrs); a != nil {
sets := make([]*SeriesIDSet, len(a))
for i := range a {
sets[i] = a[i].SeriesIDSet()
}
ss := NewSeriesIDSet()
ss.Merge(sets...)
SeriesIDIterators(itrs).Close()
return NewSeriesIDSetIterator(ss)
}
return &seriesIDMergeIterator{
buf: make([]SeriesIDElem, len(itrs)),
itrs: itrs,
@ -427,6 +496,13 @@ func IntersectSeriesIDIterators(itr0, itr1 SeriesIDIterator) SeriesIDIterator {
return nil
}
// Create series id set, if available.
if a := NewSeriesIDSetIterators([]SeriesIDIterator{itr0, itr1}); a != nil {
itr0.Close()
itr1.Close()
return NewSeriesIDSetIterator(a[0].SeriesIDSet().And(a[1].SeriesIDSet()))
}
return &seriesIDIntersectIterator{itrs: [2]SeriesIDIterator{itr0, itr1}}
}
@ -509,6 +585,15 @@ func UnionSeriesIDIterators(itr0, itr1 SeriesIDIterator) SeriesIDIterator {
return itr0
}
// Create series id set, if available.
if a := NewSeriesIDSetIterators([]SeriesIDIterator{itr0, itr1}); a != nil {
itr0.Close()
itr1.Close()
ss := NewSeriesIDSet()
ss.Merge(a[0].SeriesIDSet(), a[1].SeriesIDSet())
return NewSeriesIDSetIterator(ss)
}
return &seriesIDUnionIterator{itrs: [2]SeriesIDIterator{itr0, itr1}}
}
@ -586,6 +671,14 @@ func DifferenceSeriesIDIterators(itr0, itr1 SeriesIDIterator) SeriesIDIterator {
itr1.Close()
return nil
}
// Create series id set, if available.
if a := NewSeriesIDSetIterators([]SeriesIDIterator{itr0, itr1}); a != nil {
itr0.Close()
itr1.Close()
return NewSeriesIDSetIterator(a[0].SeriesIDSet().AndNot(a[1].SeriesIDSet()))
}
return &seriesIDDifferenceIterator{itrs: [2]SeriesIDIterator{itr0, itr1}}
}

View File

@ -496,6 +496,9 @@ func newFileSetSeriesIDIterator(fs *FileSet, itr tsdb.SeriesIDIterator) tsdb.Ser
fs.Release()
return nil
}
if itr, ok := itr.(tsdb.SeriesIDSetIterator); ok {
return &fileSetSeriesIDSetIterator{fs: fs, itr: itr}
}
return &fileSetSeriesIDIterator{fs: fs, itr: itr}
}
@ -508,6 +511,26 @@ func (itr *fileSetSeriesIDIterator) Close() error {
return itr.itr.Close()
}
// fileSetSeriesIDSetIterator attaches a fileset to an iterator that is released on close.
type fileSetSeriesIDSetIterator struct {
once sync.Once
fs *FileSet
itr tsdb.SeriesIDSetIterator
}
func (itr *fileSetSeriesIDSetIterator) Next() (tsdb.SeriesIDElem, error) {
return itr.itr.Next()
}
func (itr *fileSetSeriesIDSetIterator) Close() error {
itr.once.Do(func() { itr.fs.Release() })
return itr.itr.Close()
}
func (itr *fileSetSeriesIDSetIterator) SeriesIDSet() *tsdb.SeriesIDSet {
return itr.itr.SeriesIDSet()
}
// fileSetMeasurementIterator attaches a fileset to an iterator that is released on close.
type fileSetMeasurementIterator struct {
once sync.Once

View File

@ -738,49 +738,13 @@ func (f *LogFile) SeriesIDIterator() tsdb.SeriesIDIterator {
f.mu.RLock()
defer f.mu.RUnlock()
// Determine total series count across all measurements.
var n int
mSeriesIdx := make([]int, len(f.mms))
mSeries := make([][]tsdb.SeriesIDElem, 0, len(f.mms))
ss := tsdb.NewSeriesIDSet()
for _, mm := range f.mms {
n += len(mm.series)
a := make([]tsdb.SeriesIDElem, 0, len(mm.series))
for seriesID := range mm.series {
a = append(a, tsdb.SeriesIDElem{SeriesID: seriesID})
ss.AddNoLock(seriesID)
}
sort.Sort(tsdb.SeriesIDElems(a))
mSeries = append(mSeries, a)
}
// Combine series across all measurements by merging the already sorted
// series lists.
sBuffer := make([]tsdb.SeriesIDElem, len(f.mms))
series := make([]tsdb.SeriesIDElem, 0, n)
var minElem tsdb.SeriesIDElem
var minElemIdx int
for s := 0; s < cap(series); s++ {
for i := 0; i < len(sBuffer); i++ {
// Are there still serie to pull from this measurement?
if mSeriesIdx[i] < len(mSeries[i]) && sBuffer[i].SeriesID == 0 {
// Fill the buffer slot for this measurement.
sBuffer[i] = mSeries[i][mSeriesIdx[i]]
mSeriesIdx[i]++
}
// Does this measurement have the smallest current serie out of
// all those in the buffer?
if minElem.SeriesID == 0 || (sBuffer[i].SeriesID != 0 && sBuffer[i].SeriesID < minElem.SeriesID) {
minElem, minElemIdx = sBuffer[i], i
}
}
series, minElem.SeriesID, sBuffer[minElemIdx].SeriesID = append(series, minElem), 0, 0
}
if len(series) == 0 {
return nil
}
return &logSeriesIDIterator{series: series}
return tsdb.NewSeriesIDSetIterator(ss)
}
// createMeasurementIfNotExists returns a measurement by name.
@ -1397,37 +1361,17 @@ func (itr *logTagValueIterator) Next() (e TagValueElem) {
return e
}
// logSeriesIDIterator represents an iterator over a slice of series.
type logSeriesIDIterator struct {
series []tsdb.SeriesIDElem
}
// newLogSeriesIDIterator returns a new instance of logSeriesIDIterator.
// All series are copied to the iterator.
func newLogSeriesIDIterator(m map[uint64]struct{}) *logSeriesIDIterator {
// newLogSeriesIDIterator returns a new iterator. All series are copied to the iterator.
func newLogSeriesIDIterator(m map[uint64]struct{}) tsdb.SeriesIDIterator {
if len(m) == 0 {
return nil
}
itr := logSeriesIDIterator{series: make([]tsdb.SeriesIDElem, 0, len(m))}
ss := tsdb.NewSeriesIDSet()
for seriesID := range m {
itr.series = append(itr.series, tsdb.SeriesIDElem{SeriesID: seriesID})
ss.AddNoLock(seriesID)
}
sort.Sort(tsdb.SeriesIDElems(itr.series))
return &itr
}
func (itr *logSeriesIDIterator) Close() error { return nil }
// Next returns the next element in the iterator.
func (itr *logSeriesIDIterator) Next() (tsdb.SeriesIDElem, error) {
if len(itr.series) == 0 {
return tsdb.SeriesIDElem{}, nil
}
elem := itr.series[0]
itr.series = itr.series[1:]
return elem, nil
return tsdb.NewSeriesIDSetIterator(ss)
}
// FormatLogFileName generates a log filename for the given index.

View File

@ -213,6 +213,22 @@ func (itr *rawSeriesIDIterator) Next() (tsdb.SeriesIDElem, error) {
return tsdb.SeriesIDElem{SeriesID: seriesID}, nil
}
func (itr *rawSeriesIDIterator) SeriesIDSet() *tsdb.SeriesIDSet {
ss := tsdb.NewSeriesIDSet()
for data, prev := itr.data, uint64(0); len(data) > 0; {
delta, n, err := uvarint(data)
if err != nil {
break
}
data = data[n:]
seriesID := prev + uint64(delta)
prev = seriesID
ss.AddNoLock(seriesID)
}
return ss
}
// MeasurementBlockTrailer represents meta data at the end of a MeasurementBlock.
type MeasurementBlockTrailer struct {
Version int // Encoding version

View File

@ -115,6 +115,15 @@ func (s *SeriesIDSet) Equals(other *SeriesIDSet) bool {
return s.bitmap.Equals(other.bitmap)
}
// And returns a new SeriesIDSet containing elements that were present in s and other.
func (s *SeriesIDSet) And(other *SeriesIDSet) *SeriesIDSet {
s.RLock()
defer s.RUnlock()
other.RLock()
defer other.RUnlock()
return &SeriesIDSet{bitmap: roaring.And(s.bitmap, other.bitmap)}
}
// AndNot returns a new SeriesIDSet containing elements that were present in s,
// but not present in other.
func (s *SeriesIDSet) AndNot(other *SeriesIDSet) *SeriesIDSet {
@ -152,6 +161,12 @@ func (s *SeriesIDSet) Diff(other *SeriesIDSet) {
s.bitmap = roaring.AndNot(s.bitmap, other.bitmap)
}
// Iterator returns an iterator to the underlying bitmap.
// This iterator is not protected by a lock.
func (s *SeriesIDSet) Iterator() SeriesIDSetIterable {
return s.bitmap.Iterator()
}
// UnmarshalBinary unmarshals data into the set.
func (s *SeriesIDSet) UnmarshalBinary(data []byte) error {
s.Lock()
@ -165,3 +180,8 @@ func (s *SeriesIDSet) WriteTo(w io.Writer) (int64, error) {
defer s.RUnlock()
return s.bitmap.WriteTo(w)
}
type SeriesIDSetIterable interface {
HasNext() bool
Next() uint32
}