fix(tsi1): optimize the comparison of SeriesIDSet. (#21013)
parent 8ab90d335f
commit 4f535d281a
@@ -285,13 +285,7 @@ func (f *LogFile) MeasurementHasSeries(ss *tsdb.SeriesIDSet, name []byte) bool {
 		return false
 	}
 
-	// TODO(edd): if mm is using a seriesSet then this could be changed to do a fast intersection.
-	for _, id := range mm.seriesIDs() {
-		if ss.Contains(id) {
-			return true
-		}
-	}
-	return false
+	return mm.hasSeries(ss)
 }
 
 // MeasurementNames returns an ordered list of measurement names.

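This hunk resolves the old TODO: instead of walking every series ID in the measurement and probing ss.Contains one ID at a time, MeasurementHasSeries now delegates to the new logMeasurement.hasSeries helper added further down, which can answer the question with a single set-intersection test when the measurement tracks its series in a SeriesIDSet. A minimal sketch of the two shapes, written against plain SeriesIDSets; the names containsAnyLinear and containsAnyIntersect are illustrative only and not part of this change:

package sketch

import "github.com/influxdata/influxdb/v2/tsdb"

// containsAnyLinear mirrors the removed loop: one Contains lookup per
// series ID, so the cost grows with the number of series in the measurement.
func containsAnyLinear(ids []uint64, ss *tsdb.SeriesIDSet) bool {
	for _, id := range ids {
		if ss.Contains(id) {
			return true
		}
	}
	return false
}

// containsAnyIntersect mirrors the new path: a single intersection test
// between the measurement's own series set and ss.
func containsAnyIntersect(measurementSet, ss *tsdb.SeriesIDSet) bool {
	return measurementSet.Intersects(ss)
}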
@@ -680,7 +674,7 @@ func (f *LogFile) execSeriesEntry(e *LogEntry) {
 	// the entire database and the server is restarted. This would cause
 	// the log to replay its insert but the key cannot be found.
 	//
-	// https://github.com/influxdata/influxdb/v2/issues/9444
+	// https://github.com/influxdata/influxdb/issues/9444
 	if seriesKey == nil {
 		return
 	}

@@ -1300,6 +1294,20 @@ func (m *logMeasurement) seriesIDSet() *tsdb.SeriesIDSet {
 	return ss
 }
 
+func (m *logMeasurement) hasSeries(ss *tsdb.SeriesIDSet) bool {
+	if m.seriesSet != nil {
+		return m.seriesSet.Intersects(ss)
+	}
+
+	for seriesID := range m.series {
+		if ss.Contains(seriesID) {
+			return true
+		}
+	}
+
+	return false
+}
+
 func (m *logMeasurement) Name() []byte { return m.name }
 func (m *logMeasurement) Deleted() bool { return m.deleted }
 
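hasSeries takes the fast path only when the measurement has materialized a SeriesIDSet (m.seriesSet != nil); otherwise it falls back to scanning the in-memory m.series map, preserving the old per-ID behaviour. The fast path relies on the Intersects primitive of the roaring bitmap that backs SeriesIDSet; the roaring.AndNot call visible in the SeriesIDSet hunk at the end of this commit suggests the usual github.com/RoaringBitmap/roaring package. A small, hedged sketch of that primitive in isolation, assuming that import path:

package main

import (
	"fmt"

	"github.com/RoaringBitmap/roaring"
)

func main() {
	a := roaring.New()
	b := roaring.New()

	a.Add(1)
	a.Add(2)
	b.Add(2)
	b.Add(3)

	// Intersects answers "do these sets share any member?" without
	// materializing the intersection, so it can stop at the first
	// overlapping container rather than visiting every ID.
	fmt.Println(a.Intersects(b)) // true
}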
@@ -15,6 +15,7 @@ import (
 	"time"
 
 	"github.com/influxdata/influxdb/v2/pkg/slices"
+	"github.com/stretchr/testify/require"
 
 	"github.com/influxdata/influxdb/v2/models"
 	"github.com/influxdata/influxdb/v2/pkg/bloom"

@@ -313,6 +314,60 @@ func TestLogFile_Open(t *testing.T) {
 	})
 }
 
+func TestLogFile_MeasurementHasSeries(t *testing.T) {
+	sfile := MustOpenSeriesFile()
+	defer sfile.Close()
+
+	f := MustOpenLogFile(sfile.SeriesFile)
+	defer f.Close()
+
+	measurementN, seriesValueN, seriesKeyN := 3, 2, 5
+	tagValueN := pow(seriesValueN, seriesKeyN)
+
+	seriesSet := tsdb.NewSeriesIDSet()        // all series in all measurements
+	seriesIDs := make([]uint64, 0, tagValueN) // all series ids in measurement0
+
+	// add series to all measurements
+	for i := 0; i < measurementN; i++ {
+		name := []byte(fmt.Sprintf("measurement%d", i))
+
+		names := make([][]byte, tagValueN)
+		tags := make([]models.Tags, tagValueN)
+
+		for j := 0; j < tagValueN; j++ {
+			var tag models.Tags
+			for k := 0; k < seriesKeyN; k++ {
+				key := []byte(fmt.Sprintf("key%d", k))
+				value := []byte(fmt.Sprintf("value%d", j/pow(seriesValueN, k)%seriesValueN))
+				tag = append(tag, models.NewTag(key, value))
+			}
+
+			names[j] = name
+			tags[j] = tag
+		}
+
+		ids, err := f.AddSeriesList(seriesSet, names, tags)
+		require.NoError(t, err)
+
+		if i == 0 {
+			seriesIDs = append(seriesIDs, ids...)
+		}
+	}
+
+	// remove series from measurement0
+	name := []byte("measurement0")
+	for i := 0; i < tagValueN; i++ {
+		// measurement0 still has series before the last one is removed
+		require.True(t, f.MeasurementHasSeries(seriesSet, name))
+
+		require.NoError(t, f.DeleteSeriesID(seriesIDs[i]))
+		seriesSet.Remove(seriesIDs[i])
+	}
+
+	// measurement0 has no series once the last one is removed
+	require.False(t, f.MeasurementHasSeries(seriesSet, name))
+}
+
 // LogFile is a test wrapper for tsi1.LogFile.
 type LogFile struct {
 	*tsi1.LogFile

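The test builds seriesValueN^seriesKeyN series per measurement by treating the series index j as a base-seriesValueN number and using its k-th digit to pick the value for key k, so every tag combination is generated exactly once (pow is assumed to be an integer-power helper already available in the test package). A standalone sketch of that decoding, with an illustrative intPow stand-in:

package main

import "fmt"

// intPow is an illustrative stand-in for the test's pow helper.
func intPow(base, exp int) int {
	n := 1
	for i := 0; i < exp; i++ {
		n *= base
	}
	return n
}

func main() {
	seriesKeyN, seriesValueN := 3, 2 // 2^3 = 8 series
	for j := 0; j < intPow(seriesValueN, seriesKeyN); j++ {
		tags := make([]string, 0, seriesKeyN)
		for k := 0; k < seriesKeyN; k++ {
			digit := j / intPow(seriesValueN, k) % seriesValueN
			tags = append(tags, fmt.Sprintf("key%d=value%d", k, digit))
		}
		fmt.Println(j, tags) // e.g. 5 [key0=value1 key1=value0 key2=value1]
	}
}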
@@ -493,6 +548,70 @@ func BenchmarkLogFile_WriteTo(b *testing.B) {
 	}
 }
 
+func benchmarkLogFile_MeasurementHasSeries(b *testing.B, seriesKeyN, seriesValueN int) {
+	b.StopTimer()
+
+	sfile := MustOpenSeriesFile()
+	defer sfile.Close()
+
+	f := MustOpenLogFile(sfile.SeriesFile)
+	defer f.Close()
+
+	measurementN := 2
+	tagValueN := pow(seriesValueN, seriesKeyN)
+
+	seriesSet := tsdb.NewSeriesIDSet()        // all series in all measurements
+	seriesIDs := make([]uint64, 0, tagValueN) // all series ids in measurement0
+
+	// add series to all measurements
+	for i := 0; i < measurementN; i++ {
+		name := []byte(fmt.Sprintf("measurement%d", i))
+
+		names := make([][]byte, tagValueN)
+		tags := make([]models.Tags, tagValueN)
+
+		for j := 0; j < tagValueN; j++ {
+			var tag models.Tags
+			for k := 0; k < seriesKeyN; k++ {
+				key := []byte(fmt.Sprintf("key%d", k))
+				value := []byte(fmt.Sprintf("value%d", j/pow(seriesValueN, k)%seriesValueN))
+				tag = append(tag, models.NewTag(key, value))
+			}
+
+			names[j] = name
+			tags[j] = tag
+		}
+
+		ids, err := f.AddSeriesList(seriesSet, names, tags)
+		require.NoError(b, err)
+
+		if i == 0 {
+			seriesIDs = append(seriesIDs, ids...)
+		}
+	}
+
+	// remove some series in measurement0
+	name := []byte("measurement0")
+	for i := 0; i < 50; i++ {
+		require.NoError(b, f.DeleteSeriesID(seriesIDs[i]))
+		seriesSet.Remove(seriesIDs[i])
+	}
+
+	b.StartTimer()
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		if !f.MeasurementHasSeries(seriesSet, name) {
+			b.Fatal("expect true, got false")
+		}
+	}
+}
+
+func BenchmarkLogFile_MeasurementHasSeries_2_10(b *testing.B) { benchmarkLogFile_MeasurementHasSeries(b, 2, 10) } // 100 series
+func BenchmarkLogFile_MeasurementHasSeries_3_10(b *testing.B) { benchmarkLogFile_MeasurementHasSeries(b, 3, 10) } // 1k series
+func BenchmarkLogFile_MeasurementHasSeries_4_10(b *testing.B) { benchmarkLogFile_MeasurementHasSeries(b, 4, 10) } // 10k series
+func BenchmarkLogFile_MeasurementHasSeries_5_10(b *testing.B) { benchmarkLogFile_MeasurementHasSeries(b, 5, 10) } // 100k series
+
 // MustStartCPUProfile starts a cpu profile in a temporary path based on name.
 func MustStartCPUProfile(name string) {
 	name = regexp.MustCompile(`\W+`).ReplaceAllString(name, "-")

@@ -206,6 +206,17 @@ func (s *SeriesIDSet) Diff(other *SeriesIDSet) {
 	s.bitmap = roaring.AndNot(s.bitmap, other.bitmap)
 }
 
+// Intersects reports whether s and other share at least one series ID; neither set is modified.
+func (s *SeriesIDSet) Intersects(other *SeriesIDSet) bool {
+	other.RLock()
+	defer other.RUnlock()
+
+	s.RLock()
+	defer s.RUnlock()
+
+	return s.bitmap.Intersects(other.bitmap)
+}
+
 // Clone returns a new SeriesIDSet with a deep copy of the underlying bitmap.
 func (s *SeriesIDSet) Clone() *SeriesIDSet {
 	s.RLock()
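A minimal usage sketch for the new method, assuming the same tsdb import path used elsewhere in this commit; Add and Remove are existing SeriesIDSet methods, and both sets are only read-locked for the duration of the call:

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/v2/tsdb"
)

func main() {
	a := tsdb.NewSeriesIDSet()
	b := tsdb.NewSeriesIDSet()

	a.Add(1)
	a.Add(2)
	b.Add(2)
	b.Add(3)

	fmt.Println(a.Intersects(b)) // true: both sets contain series ID 2

	b.Remove(2)
	fmt.Println(a.Intersects(b)) // false: no series IDs in common
}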