Store series ID sets in LogFile as bitmaps
This commit swaps out map[uint64]struct{} implementations for roaring bitmaps, which in turn improves memory usage and read performance. The bitmap implementation is abstracted such that for low cardinality sets a simple slice of ids is used, to reduce in-use memory.pull/10048/head
parent
13f896b9ff
commit
96ed566e6c
|
@ -288,7 +288,10 @@ func (f *LogFile) MeasurementHasSeries(ss *tsdb.SeriesIDSet, name []byte) bool {
|
|||
if !ok {
|
||||
return false
|
||||
}
|
||||
for id := range mm.series {
|
||||
|
||||
// TODO(edd): if mm is using a bitmap then this could be changed to do a fast
|
||||
// intersection.
|
||||
for _, id := range mm.seriesIDs() {
|
||||
if ss.Contains(id) {
|
||||
return true
|
||||
}
|
||||
|
@ -345,10 +348,10 @@ func (f *LogFile) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator
|
|||
// Combine iterators across all tag keys.
|
||||
itrs := make([]tsdb.SeriesIDIterator, 0, len(tk.tagValues))
|
||||
for _, tv := range tk.tagValues {
|
||||
if len(tv.series) == 0 {
|
||||
if tv.cardinality() == 0 {
|
||||
continue
|
||||
}
|
||||
itrs = append(itrs, newLogSeriesIDIterator(tv.series))
|
||||
itrs = append(itrs, tsdb.NewSeriesIDSetIterator(tv.seriesIDSet()))
|
||||
}
|
||||
|
||||
return tsdb.MergeSeriesIDIterators(itrs...)
|
||||
|
@ -462,11 +465,11 @@ func (f *LogFile) TagValueSeriesIDIterator(name, key, value []byte) tsdb.SeriesI
|
|||
tv, ok := tk.tagValues[string(value)]
|
||||
if !ok {
|
||||
return nil
|
||||
} else if len(tv.series) == 0 {
|
||||
} else if tv.cardinality() == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return newLogSeriesIDIterator(tv.series)
|
||||
return tsdb.NewSeriesIDSetIterator(tv.seriesIDSet())
|
||||
}
|
||||
|
||||
// MeasurementN returns the total number of measurements.
|
||||
|
@ -582,7 +585,7 @@ func (f *LogFile) SeriesN() (n uint64) {
|
|||
defer f.mu.RUnlock()
|
||||
|
||||
for _, mm := range f.mms {
|
||||
n += uint64(len(mm.series))
|
||||
n += uint64(mm.cardinality())
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
@ -636,6 +639,7 @@ func (f *LogFile) execDeleteMeasurementEntry(e *LogEntry) {
|
|||
mm.deleted = true
|
||||
mm.tagSet = make(map[string]logTagKey)
|
||||
mm.series = make(map[uint64]struct{})
|
||||
mm.bitmap = nil
|
||||
|
||||
// Update measurement tombstone sketch.
|
||||
f.mTSketch.Add(e.Name)
|
||||
|
@ -693,9 +697,9 @@ func (f *LogFile) execSeriesEntry(e *LogEntry) {
|
|||
mm := f.createMeasurementIfNotExists(name)
|
||||
mm.deleted = false
|
||||
if !deleted {
|
||||
mm.series[e.SeriesID] = struct{}{}
|
||||
mm.addSeriesID(e.SeriesID)
|
||||
} else {
|
||||
delete(mm.series, e.SeriesID)
|
||||
mm.removeSeriesID(e.SeriesID)
|
||||
}
|
||||
|
||||
// Read tag count.
|
||||
|
@ -710,9 +714,9 @@ func (f *LogFile) execSeriesEntry(e *LogEntry) {
|
|||
|
||||
// Add/remove a reference to the series on the tag value.
|
||||
if !deleted {
|
||||
tv.series[e.SeriesID] = struct{}{}
|
||||
tv.addSeriesID(e.SeriesID)
|
||||
} else {
|
||||
delete(tv.series, e.SeriesID)
|
||||
tv.removeSeriesID(e.SeriesID)
|
||||
}
|
||||
|
||||
ts.tagValues[string(v)] = tv
|
||||
|
@ -737,11 +741,25 @@ func (f *LogFile) SeriesIDIterator() tsdb.SeriesIDIterator {
|
|||
defer f.mu.RUnlock()
|
||||
|
||||
ss := tsdb.NewSeriesIDSet()
|
||||
allBitMaps := make([]*tsdb.SeriesIDSet, 0, len(f.mms))
|
||||
|
||||
for _, mm := range f.mms {
|
||||
for seriesID := range mm.series {
|
||||
ss.AddNoLock(seriesID)
|
||||
if mm.bitmap != nil {
|
||||
allBitMaps = append(allBitMaps, mm.bitmap)
|
||||
continue
|
||||
}
|
||||
|
||||
// measurement is not using bitmap to store series IDs.
|
||||
mm.forEach(func(seriesID uint64) {
|
||||
ss.AddNoLock(seriesID)
|
||||
})
|
||||
}
|
||||
|
||||
// Fast merge all bitmaps.
|
||||
if len(allBitMaps) > 0 {
|
||||
ss.Merge(allBitMaps...)
|
||||
}
|
||||
|
||||
return tsdb.NewSeriesIDSetIterator(ss)
|
||||
}
|
||||
|
||||
|
@ -781,10 +799,10 @@ func (f *LogFile) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterator
|
|||
defer f.mu.RUnlock()
|
||||
|
||||
mm := f.mms[string(name)]
|
||||
if mm == nil || len(mm.series) == 0 {
|
||||
if mm == nil || mm.cardinality() == 0 {
|
||||
return nil
|
||||
}
|
||||
return newLogSeriesIDIterator(mm.series)
|
||||
return tsdb.NewSeriesIDSetIterator(mm.seriesIDSet())
|
||||
}
|
||||
|
||||
// CompactTo compacts the log file and writes it to w.
|
||||
|
@ -1175,30 +1193,96 @@ type logMeasurement struct {
|
|||
tagSet map[string]logTagKey
|
||||
deleted bool
|
||||
series map[uint64]struct{}
|
||||
bitmap *tsdb.SeriesIDSet
|
||||
}
|
||||
|
||||
// bytes estimates the memory footprint of this logMeasurement, in bytes.
|
||||
func (mm *logMeasurement) bytes() int {
|
||||
func (m *logMeasurement) bytes() int {
|
||||
var b int
|
||||
b += len(mm.name)
|
||||
for k, v := range mm.tagSet {
|
||||
b += len(m.name)
|
||||
for k, v := range m.tagSet {
|
||||
b += len(k)
|
||||
b += v.bytes()
|
||||
}
|
||||
b += len(mm.series) * 8
|
||||
b += int(unsafe.Sizeof(*mm))
|
||||
b += (int(m.cardinality()) * 8)
|
||||
b += int(unsafe.Sizeof(*m))
|
||||
return b
|
||||
}
|
||||
|
||||
func (mm *logMeasurement) seriesIDs() []uint64 {
|
||||
a := make([]uint64, 0, len(mm.series))
|
||||
for seriesID := range mm.series {
|
||||
func (m *logMeasurement) addSeriesID(x uint64) {
|
||||
if m.bitmap != nil {
|
||||
m.bitmap.AddNoLock(x)
|
||||
return
|
||||
}
|
||||
|
||||
m.series[x] = struct{}{}
|
||||
|
||||
// If the map is getting too big it can be converted into a roaring bitmap.
|
||||
if len(m.series) > 25 {
|
||||
m.bitmap = tsdb.NewSeriesIDSet()
|
||||
for id := range m.series {
|
||||
m.bitmap.AddNoLock(id)
|
||||
}
|
||||
m.series = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (m *logMeasurement) removeSeriesID(x uint64) {
|
||||
if m.bitmap != nil {
|
||||
m.bitmap.RemoveNoLock(x)
|
||||
return
|
||||
}
|
||||
delete(m.series, x)
|
||||
}
|
||||
|
||||
func (m *logMeasurement) cardinality() int64 {
|
||||
if m.bitmap != nil {
|
||||
return int64(m.bitmap.Cardinality())
|
||||
}
|
||||
return int64(len(m.series))
|
||||
}
|
||||
|
||||
// forEach applies fn to every series ID in the logMeasurement.
|
||||
func (m *logMeasurement) forEach(fn func(uint64)) {
|
||||
if m.bitmap != nil {
|
||||
m.bitmap.ForEachNoLock(fn)
|
||||
return
|
||||
}
|
||||
|
||||
for seriesID := range m.series {
|
||||
fn(seriesID)
|
||||
}
|
||||
}
|
||||
|
||||
// seriesIDs returns a sorted set of seriesIDs.
|
||||
func (m *logMeasurement) seriesIDs() []uint64 {
|
||||
a := make([]uint64, 0, m.cardinality())
|
||||
if m.bitmap != nil {
|
||||
m.bitmap.ForEachNoLock(func(id uint64) { a = append(a, id) })
|
||||
return a // IDs are already sorted.
|
||||
}
|
||||
|
||||
for seriesID := range m.series {
|
||||
a = append(a, seriesID)
|
||||
}
|
||||
sort.Sort(uint64Slice(a))
|
||||
return a
|
||||
}
|
||||
|
||||
// seriesIDSet returns a copy of the logMeasurement's bitmap, or creates a new
|
||||
// one
|
||||
func (m *logMeasurement) seriesIDSet() *tsdb.SeriesIDSet {
|
||||
if m.bitmap != nil {
|
||||
return m.bitmap.CloneNoLock()
|
||||
}
|
||||
|
||||
ss := tsdb.NewSeriesIDSet()
|
||||
for seriesID := range m.series {
|
||||
ss.AddNoLock(seriesID)
|
||||
}
|
||||
return ss
|
||||
}
|
||||
|
||||
func (m *logMeasurement) Name() []byte { return m.name }
|
||||
func (m *logMeasurement) Deleted() bool { return m.deleted }
|
||||
|
||||
|
@ -1289,19 +1373,59 @@ type logTagValue struct {
|
|||
name []byte
|
||||
deleted bool
|
||||
series map[uint64]struct{}
|
||||
bitmap *tsdb.SeriesIDSet
|
||||
}
|
||||
|
||||
// bytes estimates the memory footprint of this logTagValue, in bytes.
|
||||
func (tv *logTagValue) bytes() int {
|
||||
var b int
|
||||
b += len(tv.name)
|
||||
b += len(tv.series) * 8
|
||||
b += int(unsafe.Sizeof(*tv))
|
||||
b += (int(tv.cardinality()) * 8)
|
||||
return b
|
||||
}
|
||||
|
||||
func (tv *logTagValue) addSeriesID(x uint64) {
|
||||
if tv.bitmap != nil {
|
||||
tv.bitmap.AddNoLock(x)
|
||||
return
|
||||
}
|
||||
|
||||
tv.series[x] = struct{}{}
|
||||
|
||||
// If the map is getting too big it can be converted into a roaring bitmap.
|
||||
if len(tv.series) > 25 {
|
||||
tv.bitmap = tsdb.NewSeriesIDSet()
|
||||
for id := range tv.series {
|
||||
tv.bitmap.AddNoLock(id)
|
||||
}
|
||||
tv.series = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (tv *logTagValue) removeSeriesID(x uint64) {
|
||||
if tv.bitmap != nil {
|
||||
tv.bitmap.RemoveNoLock(x)
|
||||
return
|
||||
}
|
||||
delete(tv.series, x)
|
||||
}
|
||||
|
||||
func (tv *logTagValue) cardinality() int64 {
|
||||
if tv.bitmap != nil {
|
||||
return int64(tv.bitmap.Cardinality())
|
||||
}
|
||||
return int64(len(tv.series))
|
||||
}
|
||||
|
||||
// seriesIDs returns a sorted set of seriesIDs.
|
||||
func (tv *logTagValue) seriesIDs() []uint64 {
|
||||
a := make([]uint64, 0, len(tv.series))
|
||||
a := make([]uint64, 0, tv.cardinality())
|
||||
if tv.bitmap != nil {
|
||||
tv.bitmap.ForEachNoLock(func(id uint64) { a = append(a, id) })
|
||||
return a // IDs are already sorted.
|
||||
}
|
||||
|
||||
for seriesID := range tv.series {
|
||||
a = append(a, seriesID)
|
||||
}
|
||||
|
@ -1309,6 +1433,20 @@ func (tv *logTagValue) seriesIDs() []uint64 {
|
|||
return a
|
||||
}
|
||||
|
||||
// seriesIDSet returns a copy of the logMeasurement's bitmap, or creates a new
|
||||
// one
|
||||
func (tv *logTagValue) seriesIDSet() *tsdb.SeriesIDSet {
|
||||
if tv.bitmap != nil {
|
||||
return tv.bitmap.CloneNoLock()
|
||||
}
|
||||
|
||||
ss := tsdb.NewSeriesIDSet()
|
||||
for seriesID := range tv.series {
|
||||
ss.AddNoLock(seriesID)
|
||||
}
|
||||
return ss
|
||||
}
|
||||
|
||||
func (tv *logTagValue) Value() []byte { return tv.name }
|
||||
func (tv *logTagValue) Deleted() bool { return tv.deleted }
|
||||
|
||||
|
@ -1359,19 +1497,6 @@ func (itr *logTagValueIterator) Next() (e TagValueElem) {
|
|||
return e
|
||||
}
|
||||
|
||||
// newLogSeriesIDIterator returns a new iterator. All series are copied to the iterator.
|
||||
func newLogSeriesIDIterator(m map[uint64]struct{}) tsdb.SeriesIDIterator {
|
||||
if len(m) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
ss := tsdb.NewSeriesIDSet()
|
||||
for seriesID := range m {
|
||||
ss.AddNoLock(seriesID)
|
||||
}
|
||||
return tsdb.NewSeriesIDSetIterator(ss)
|
||||
}
|
||||
|
||||
// FormatLogFileName generates a log filename for the given index.
|
||||
func FormatLogFileName(id int) string {
|
||||
return fmt.Sprintf("L0-%08d%s", id, LogFileExt)
|
||||
|
|
|
@ -135,7 +135,8 @@ func (s *SeriesIDSet) AndNot(other *SeriesIDSet) *SeriesIDSet {
|
|||
return &SeriesIDSet{bitmap: roaring.AndNot(s.bitmap, other.bitmap)}
|
||||
}
|
||||
|
||||
// ForEach calls f for each id in the set.
|
||||
// ForEach calls f for each id in the set. The function is applied to the IDs
|
||||
// in ascending order.
|
||||
func (s *SeriesIDSet) ForEach(f func(id uint64)) {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
|
@ -145,6 +146,14 @@ func (s *SeriesIDSet) ForEach(f func(id uint64)) {
|
|||
}
|
||||
}
|
||||
|
||||
// ForEachNoLock calls f for each id in the set without taking a lock.
|
||||
func (s *SeriesIDSet) ForEachNoLock(f func(id uint64)) {
|
||||
itr := s.bitmap.Iterator()
|
||||
for itr.HasNext() {
|
||||
f(uint64(itr.Next()))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SeriesIDSet) String() string {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
|
@ -161,6 +170,20 @@ func (s *SeriesIDSet) Diff(other *SeriesIDSet) {
|
|||
s.bitmap = roaring.AndNot(s.bitmap, other.bitmap)
|
||||
}
|
||||
|
||||
// Clone returns a new SeriesIDSet with a deep copy of the underlying bitmap.
|
||||
func (s *SeriesIDSet) Clone() *SeriesIDSet {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
return s.CloneNoLock()
|
||||
}
|
||||
|
||||
// CloneNoLock calls Clone without taking a lock.
|
||||
func (s *SeriesIDSet) CloneNoLock() *SeriesIDSet {
|
||||
new := NewSeriesIDSet()
|
||||
new.bitmap = s.bitmap.Clone()
|
||||
return new
|
||||
}
|
||||
|
||||
// Iterator returns an iterator to the underlying bitmap.
|
||||
// This iterator is not protected by a lock.
|
||||
func (s *SeriesIDSet) Iterator() SeriesIDSetIterable {
|
||||
|
|
Loading…
Reference in New Issue