From 3cc2638bbfcc3372ce28ff19307d55d638bd9a67 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 22 Jul 2020 10:12:27 -0600 Subject: [PATCH] feat(tsi1): Add optional mincore limiter to TSI --- pkg/mincore/limiter.go | 4 +++ storage/engine.go | 2 ++ tsdb/seriesfile/series_file.go | 18 ++++++++++++++ tsdb/seriesfile/series_index.go | 12 +++++++++ tsdb/seriesfile/series_partition.go | 20 ++++++++++++--- tsdb/seriesfile/series_segment.go | 13 ++++++++++ tsdb/tsi1/index.go | 31 +++++++++++++++++------ tsdb/tsi1/index_file.go | 38 +++++++++++++++-------------- tsdb/tsi1/index_files.go | 2 +- tsdb/tsi1/log_file.go | 5 ++-- tsdb/tsi1/measurement_block.go | 25 ++++++++++++++----- tsdb/tsi1/measurement_block_test.go | 8 +++--- tsdb/tsi1/partition.go | 10 ++++++-- tsdb/tsi1/tag_block.go | 35 +++++++++++++++++--------- tsdb/tsi1/tag_block_test.go | 12 ++++----- tsdb/tsi1/tsi1.go | 7 +++--- tsdb/tsi1/tsi1_test.go | 7 +++--- 17 files changed, 182 insertions(+), 67 deletions(-) diff --git a/pkg/mincore/limiter.go b/pkg/mincore/limiter.go index 2c673ec6dc..04ba6846cd 100644 --- a/pkg/mincore/limiter.go +++ b/pkg/mincore/limiter.go @@ -34,6 +34,10 @@ type Limiter struct { // NewLimiter returns a new instance of Limiter associated with an mmap. // The underlying limiter can be shared to limit faults across the entire process. func NewLimiter(underlying *rate.Limiter, data []byte) *Limiter { + if underlying == nil { + return nil + } + return &Limiter{ underlying: underlying, data: data, diff --git a/storage/engine.go b/storage/engine.go index cf1d07e89d..207f494a2a 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -171,6 +171,8 @@ func WithWritePointsValidationEnabled(v bool) Option { func WithPageFaultLimiter(limiter *rate.Limiter) Option { return func(e *Engine) { e.engine.WithPageFaultLimiter(limiter) + e.index.WithPageFaultLimiter(limiter) + e.sfile.WithPageFaultLimiter(limiter) } } diff --git a/tsdb/seriesfile/series_file.go b/tsdb/seriesfile/series_file.go index b83a77d3dc..5352f22565 100644 --- a/tsdb/seriesfile/series_file.go +++ b/tsdb/seriesfile/series_file.go @@ -17,12 +17,14 @@ import ( "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/pkg/binaryutil" "github.com/influxdata/influxdb/v2/pkg/lifecycle" + "github.com/influxdata/influxdb/v2/pkg/mincore" "github.com/influxdata/influxdb/v2/pkg/rhh" "github.com/influxdata/influxdb/v2/tsdb" "github.com/prometheus/client_golang/prometheus" "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" ) var ( @@ -50,6 +52,8 @@ type SeriesFile struct { defaultMetricLabels prometheus.Labels metricsEnabled bool + pageFaultLimiter *rate.Limiter // Limits page faults by the series file + LargeWriteThreshold int Logger *zap.Logger @@ -86,6 +90,11 @@ func (f *SeriesFile) DisableMetrics() { f.metricsEnabled = false } +// WithPageFaultLimiter sets a limiter to restrict the number of page faults. +func (f *SeriesFile) WithPageFaultLimiter(limiter *rate.Limiter) { + f.pageFaultLimiter = limiter +} + // Open memory maps the data file at the file's path. func (f *SeriesFile) Open(ctx context.Context) error { f.mu.Lock() @@ -129,6 +138,7 @@ func (f *SeriesFile) Open(ctx context.Context) error { p := NewSeriesPartition(i, f.SeriesPartitionPath(i)) p.LargeWriteThreshold = f.LargeWriteThreshold p.Logger = f.Logger.With(zap.Int("partition", p.ID())) + p.pageFaultLimiter = f.pageFaultLimiter // For each series file index, rhh trackers are used to track the RHH Hashmap. 
// Each of the trackers needs to be given slightly different default @@ -608,3 +618,11 @@ func SeriesKeySize(name []byte, tags models.Tags) int { n += binaryutil.UvarintSize(uint64(n)) return n } + +// wait rate limits page faults to the underlying data. Skipped if limiter is not set. +func wait(limiter *mincore.Limiter, b []byte) error { + if limiter == nil { + return nil + } + return limiter.WaitRange(context.Background(), b) +} diff --git a/tsdb/seriesfile/series_index.go b/tsdb/seriesfile/series_index.go index 5cbeb3f10e..0c6515b6a2 100644 --- a/tsdb/seriesfile/series_index.go +++ b/tsdb/seriesfile/series_index.go @@ -8,10 +8,12 @@ import ( "os" "github.com/influxdata/influxdb/v2/models" + "github.com/influxdata/influxdb/v2/pkg/mincore" "github.com/influxdata/influxdb/v2/pkg/mmap" "github.com/influxdata/influxdb/v2/pkg/rhh" "github.com/influxdata/influxdb/v2/tsdb" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/time/rate" ) const ( @@ -63,6 +65,8 @@ type SeriesIndex struct { keyIDMap *rhh.HashMap idOffsetMap map[tsdb.SeriesID]int64 tombstones map[tsdb.SeriesID]struct{} + + limiter *mincore.Limiter // Limits page faults by the partition } func NewSeriesIndex(path string) *SeriesIndex { @@ -124,6 +128,12 @@ func (idx *SeriesIndex) Close() (err error) { return err } +// SetPageFaultLimiter sets the limiter used for rate limiting page faults. +// Must be called after Open(). +func (idx *SeriesIndex) SetPageFaultLimiter(limiter *rate.Limiter) { + idx.limiter = mincore.NewLimiter(limiter, idx.data) +} + // Recover rebuilds the in-memory index for all new entries. func (idx *SeriesIndex) Recover(segments []*SeriesSegment) error { // Allocate new in-memory maps. @@ -247,6 +257,7 @@ func (idx *SeriesIndex) FindIDBySeriesKey(segments []*SeriesSegment, key []byte) for d, pos := int64(0), hash&idx.mask; ; d, pos = d+1, (pos+1)&idx.mask { elem := idx.keyIDData[(pos * SeriesIndexElemSize):] elemOffset := int64(binary.BigEndian.Uint64(elem[:SeriesOffsetSize])) + _ = wait(idx.limiter, elem[:SeriesOffsetSize]) // elem size is two uint64s if elemOffset == 0 { return tsdb.SeriesIDTyped{} @@ -298,6 +309,7 @@ func (idx *SeriesIndex) FindOffsetByID(id tsdb.SeriesID) int64 { for d, pos := int64(0), hash&idx.mask; ; d, pos = d+1, (pos+1)&idx.mask { elem := idx.idOffsetData[(pos * SeriesIndexElemSize):] elemID := tsdb.NewSeriesID(binary.BigEndian.Uint64(elem[:SeriesIDSize])) + _ = wait(idx.limiter, elem[:SeriesIDSize]) if elemID == id { return int64(binary.BigEndian.Uint64(elem[SeriesIDSize:])) diff --git a/tsdb/seriesfile/series_partition.go b/tsdb/seriesfile/series_partition.go index eae963a55a..5e8c293a18 100644 --- a/tsdb/seriesfile/series_partition.go +++ b/tsdb/seriesfile/series_partition.go @@ -19,6 +19,7 @@ import ( "github.com/influxdata/influxdb/v2/tsdb" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" + "golang.org/x/time/rate" ) var ( @@ -48,6 +49,8 @@ type SeriesPartition struct { compacting bool compactionsDisabled int + pageFaultLimiter *rate.Limiter // Limits page faults by the partition + CompactThreshold int LargeWriteThreshold int @@ -94,7 +97,10 @@ func (p *SeriesPartition) Open() error { if err := p.index.Open(); err != nil { return err - } else if err = p.index.Recover(p.segments); err != nil { + } + p.index.SetPageFaultLimiter(p.pageFaultLimiter) + + if err = p.index.Recover(p.segments); err != nil { return err } return nil @@ -124,6 +130,7 @@ func (p *SeriesPartition) openSegments() error { if err := segment.Open(); err != nil { return err } + 
segment.SetPageFaultLimiter(p.pageFaultLimiter) p.segments = append(p.segments, segment) } @@ -142,6 +149,7 @@ func (p *SeriesPartition) openSegments() error { if err != nil { return err } + segment.SetPageFaultLimiter(p.pageFaultLimiter) p.segments = append(p.segments, segment) } @@ -569,6 +577,7 @@ func (p *SeriesPartition) createSegment() (*SeriesSegment, error) { if err != nil { return nil, err } + segment.SetPageFaultLimiter(p.pageFaultLimiter) p.segments = append(p.segments, segment) // Allow segment to write. @@ -591,7 +600,9 @@ func (p *SeriesPartition) seriesKeyByOffset(offset int64) []byte { continue } - key, _ := ReadSeriesKey(segment.Slice(pos + SeriesEntryHeaderSize)) + buf := segment.Slice(pos + SeriesEntryHeaderSize) + key, _ := ReadSeriesKey(buf) + _ = wait(segment.limiter, buf[:len(key)]) return key } @@ -769,7 +780,10 @@ func (c *SeriesPartitionCompactor) Compact(p *SeriesPartition) (time.Duration, e return err } else if err := fs.RenameFileWithReplacement(indexPath, index.path); err != nil { return err - } else if err := p.index.Open(); err != nil { + } + + if err := p.index.Open(); err != nil { return err } + p.index.SetPageFaultLimiter(p.pageFaultLimiter) diff --git a/tsdb/seriesfile/series_segment.go b/tsdb/seriesfile/series_segment.go index fc1c39b2bf..5d619f7e84 100644 --- a/tsdb/seriesfile/series_segment.go +++ b/tsdb/seriesfile/series_segment.go @@ -12,8 +12,10 @@ import ( "strconv" "github.com/influxdata/influxdb/v2/pkg/fs" + "github.com/influxdata/influxdb/v2/pkg/mincore" "github.com/influxdata/influxdb/v2/pkg/mmap" "github.com/influxdata/influxdb/v2/tsdb" + "golang.org/x/time/rate" ) const ( @@ -47,6 +49,8 @@ type SeriesSegment struct { file *os.File // write file handle w *bufio.Writer // bufferred file handle size uint32 // current file size + + limiter *mincore.Limiter } // NewSeriesSegment returns a new instance of SeriesSegment. @@ -125,6 +129,7 @@ func (s *SeriesSegment) InitForWrite() (err error) { if !IsValidSeriesEntryFlag(flag) { break } + _ = wait(s.limiter, s.data[s.size:int64(s.size)+sz]) s.size += uint32(sz) } @@ -172,6 +177,12 @@ func (s *SeriesSegment) CloseForWrite() (err error) { return err } +// SetPageFaultLimiter sets the limiter used for rate limiting page faults. +// Must be called after Open(). +func (s *SeriesSegment) SetPageFaultLimiter(limiter *rate.Limiter) { + s.limiter = mincore.NewLimiter(limiter, s.data) +} + // Data returns the raw data. 
func (s *SeriesSegment) Data() []byte { return s.data } @@ -247,6 +258,7 @@ func (s *SeriesSegment) ForEachEntry(fn func(flag uint8, id tsdb.SeriesIDTyped, if !IsValidSeriesEntryFlag(flag) { break } + _ = wait(s.limiter, s.data[pos:int64(pos)+sz]) offset := JoinSeriesOffset(s.id, pos) if err := fn(flag, id, offset, key); err != nil { @@ -335,6 +347,7 @@ func ReadSeriesKeyFromSegments(a []*SeriesSegment, offset int64) []byte { } buf := segment.Slice(pos) key, _ := ReadSeriesKey(buf) + _ = wait(segment.limiter, buf[:len(key)]) return key } diff --git a/tsdb/tsi1/index.go b/tsdb/tsi1/index.go index e8da81ef7c..a4f766aacd 100644 --- a/tsdb/tsi1/index.go +++ b/tsdb/tsi1/index.go @@ -21,6 +21,7 @@ import ( "github.com/influxdata/influxdb/v2/kit/tracing" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/pkg/lifecycle" + "github.com/influxdata/influxdb/v2/pkg/mincore" "github.com/influxdata/influxdb/v2/pkg/slices" "github.com/influxdata/influxdb/v2/query" "github.com/influxdata/influxdb/v2/tsdb" @@ -29,6 +30,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" ) // ErrCompactionInterrupted is returned if compactions are disabled or @@ -115,13 +117,14 @@ type Index struct { metricsEnabled bool // The following may be set when initializing an Index. - path string // Root directory of the index partitions. - disableCompactions bool // Initially disables compactions on the index. - maxLogFileSize int64 // Maximum size of a LogFile before it's compacted. - logfileBufferSize int // The size of the buffer used by the LogFile. - disableFsync bool // Disables flushing buffers and fsyning files. Used when working with indexes offline. - logger *zap.Logger // Index's logger. - config Config // The index configuration + path string // Root directory of the index partitions. + disableCompactions bool // Initially disables compactions on the index. + maxLogFileSize int64 // Maximum size of a LogFile before it's compacted. + logfileBufferSize int // The size of the buffer used by the LogFile. + disableFsync bool // Disables flushing buffers and fsyncing files. Used when working with indexes offline. + pageFaultLimiter *rate.Limiter // Limits page faults by the index. + logger *zap.Logger // Index's logger. + config Config // The index configuration // The following must be set when initializing an Index. sfile *seriesfile.SeriesFile // series lookup file @@ -162,6 +165,11 @@ func NewIndex(sfile *seriesfile.SeriesFile, c Config, options ...IndexOption) *I return idx } +// WithPageFaultLimiter sets a limiter to restrict the number of page faults. +func (i *Index) WithPageFaultLimiter(limiter *rate.Limiter) { + i.pageFaultLimiter = limiter +} + // SetDefaultMetricLabels sets the default labels on the trackers. func (i *Index) SetDefaultMetricLabels(labels prometheus.Labels) { i.defaultLabels = make(prometheus.Labels, len(labels)) @@ -253,6 +261,7 @@ func (i *Index) Open(ctx context.Context) error { p.StatsTTL = i.StatsTTL p.nosync = i.disableFsync p.logbufferSize = i.logfileBufferSize + p.pageFaultLimiter = i.pageFaultLimiter p.logger = i.logger.With(zap.String("tsi1_partition", fmt.Sprint(j+1))) // Each of the trackers needs to be given slightly different default @@ -1686,3 +1695,11 @@ type DropSeriesItem struct { SeriesID tsdb.SeriesID Key []byte } + +// wait rate limits page faults to the underlying data. Skipped if limiter is not set. 
+func wait(limiter *mincore.Limiter, b []byte) error { + if limiter == nil { + return nil + } + return limiter.WaitRange(context.Background(), b) +} diff --git a/tsdb/tsi1/index_file.go b/tsdb/tsi1/index_file.go index fe82c54ea2..89a984c42b 100644 --- a/tsdb/tsi1/index_file.go +++ b/tsdb/tsi1/index_file.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/pkg/lifecycle" + "github.com/influxdata/influxdb/v2/pkg/mincore" "github.com/influxdata/influxdb/v2/pkg/mmap" "github.com/influxdata/influxdb/v2/tsdb" "github.com/influxdata/influxdb/v2/tsdb/seriesfile" @@ -73,6 +74,8 @@ type IndexFile struct { // Path to data file. path string + + pageFaultLimiter *mincore.Limiter } // NewIndexFile returns a new instance of IndexFile. @@ -215,7 +218,7 @@ func (f *IndexFile) UnmarshalBinary(data []byte) error { // Unmarshal each tag block. f.tblks = make(map[string]*TagBlock) - itr := f.mblk.Iterator() + itr := f.mblk.Iterator(f.pageFaultLimiter) for m := itr.Next(); m != nil; m = itr.Next() { e := m.(*MeasurementBlockElem) @@ -243,7 +246,7 @@ func (f *IndexFile) SeriesIDSet() (*tsdb.SeriesIDSet, error) { if err := ss.UnmarshalBinary(f.seriesIDSetData); err != nil { return nil, err } - return ss, nil + return ss, wait(f.pageFaultLimiter, f.seriesIDSetData) } func (f *IndexFile) TombstoneSeriesIDSet() (*tsdb.SeriesIDSet, error) { @@ -251,12 +254,12 @@ func (f *IndexFile) TombstoneSeriesIDSet() (*tsdb.SeriesIDSet, error) { if err := ss.UnmarshalBinaryUnsafe(f.tombstoneSeriesIDSetData); err != nil { return nil, err } - return ss, nil + return ss, wait(f.pageFaultLimiter, f.tombstoneSeriesIDSetData) } // Measurement returns a measurement element. func (f *IndexFile) Measurement(name []byte) MeasurementElem { - e, ok := f.mblk.Elem(name) + e, ok := f.mblk.Elem(name, f.pageFaultLimiter) if !ok { return nil } @@ -265,7 +268,7 @@ func (f *IndexFile) Measurement(name []byte) MeasurementElem { // MeasurementN returns the number of measurements in the file. func (f *IndexFile) MeasurementN() (n uint64) { - mitr := f.mblk.Iterator() + mitr := f.mblk.Iterator(f.pageFaultLimiter) for me := mitr.Next(); me != nil; me = mitr.Next() { n++ } @@ -274,7 +277,7 @@ func (f *IndexFile) MeasurementN() (n uint64) { // MeasurementHasSeries returns true if a measurement has any non-tombstoned series. func (f *IndexFile) MeasurementHasSeries(ss *tsdb.SeriesIDSet, name []byte) (ok bool) { - e, ok := f.mblk.Elem(name) + e, ok := f.mblk.Elem(name, f.pageFaultLimiter) if !ok { return false } @@ -299,13 +302,13 @@ func (f *IndexFile) TagValueIterator(name, key []byte) TagValueIterator { } // Find key element. - ke := tblk.TagKeyElem(key) + ke := tblk.TagKeyElem(key, f.pageFaultLimiter) if ke == nil { return nil } // Merge all value series iterators together. - return ke.TagValueIterator() + return ke.TagValueIterator(f.pageFaultLimiter) } // TagKeySeriesIDIterator returns a series iterator for a tag key and a flag @@ -317,13 +320,13 @@ func (f *IndexFile) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDItera } // Find key element. - ke := tblk.TagKeyElem(key) + ke := tblk.TagKeyElem(key, f.pageFaultLimiter) if ke == nil { return nil, nil } // Merge all value series iterators together. - vitr := ke.TagValueIterator() + vitr := ke.TagValueIterator(f.pageFaultLimiter) var itrs []tsdb.SeriesIDIterator for ve := vitr.Next(); ve != nil; ve = vitr.Next() { @@ -351,7 +354,7 @@ func (f *IndexFile) TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesID // Find value element. 
var valueElem TagBlockValueElem - if !tblk.DecodeTagValueElem(key, value, &valueElem) { + if !tblk.DecodeTagValueElem(key, value, &valueElem, f.pageFaultLimiter) { return nil, nil } else if valueElem.SeriesN() == 0 { return nil, nil @@ -365,7 +368,7 @@ func (f *IndexFile) TagKey(name, key []byte) TagKeyElem { if tblk == nil { return nil } - return tblk.TagKeyElem(key) + return tblk.TagKeyElem(key, f.pageFaultLimiter) } // TagValue returns a tag value. @@ -374,7 +377,7 @@ func (f *IndexFile) TagValue(name, key, value []byte) TagValueElem { if tblk == nil { return nil } - return tblk.TagValueElem(key, value) + return tblk.TagValueElem(key, value, f.pageFaultLimiter) } // HasSeries returns flags indicating if the series exists and if it is tombstoned. @@ -388,12 +391,12 @@ func (f *IndexFile) TagValueElem(name, key, value []byte) TagValueElem { if !ok { return nil } - return tblk.TagValueElem(key, value) + return tblk.TagValueElem(key, value, f.pageFaultLimiter) } // MeasurementIterator returns an iterator over all measurements. func (f *IndexFile) MeasurementIterator() MeasurementIterator { - return f.mblk.Iterator() + return f.mblk.Iterator(f.pageFaultLimiter) } // TagKeyIterator returns an iterator over all tag keys for a measurement. @@ -402,13 +405,12 @@ func (f *IndexFile) TagKeyIterator(name []byte) TagKeyIterator { if blk == nil { return nil } - - return blk.TagKeyIterator() + return blk.TagKeyIterator(f.pageFaultLimiter) } // MeasurementSeriesIDIterator returns an iterator over a measurement's series. func (f *IndexFile) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterator { - return f.mblk.SeriesIDIterator(name) + return f.mblk.SeriesIDIterator(name, f.pageFaultLimiter) } // ReadIndexFileTrailer returns the index file trailer from data. diff --git a/tsdb/tsi1/index_files.go b/tsdb/tsi1/index_files.go index 5a4897d8c0..62ecf5917c 100644 --- a/tsdb/tsi1/index_files.go +++ b/tsdb/tsi1/index_files.go @@ -284,7 +284,7 @@ func (p IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactIn } // Iterate over tag values. - vitr := ke.TagValueIterator() + vitr := ke.TagValueIterator(nil) for ve := vitr.Next(); ve != nil; ve = vitr.Next() { seriesIDs = seriesIDs[:0] diff --git a/tsdb/tsi1/log_file.go b/tsdb/tsi1/log_file.go index 0b93e25656..3a9332a50a 100644 --- a/tsdb/tsi1/log_file.go +++ b/tsdb/tsi1/log_file.go @@ -17,6 +17,7 @@ import ( "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/pkg/bloom" "github.com/influxdata/influxdb/v2/pkg/lifecycle" + "github.com/influxdata/influxdb/v2/pkg/mincore" "github.com/influxdata/influxdb/v2/pkg/mmap" "github.com/influxdata/influxdb/v2/tsdb" "github.com/influxdata/influxdb/v2/tsdb/seriesfile" @@ -453,7 +454,7 @@ func (f *LogFile) TagValueIterator(name, key []byte) TagValueIterator { return nil } - return tk.TagValueIterator() + return tk.TagValueIterator(nil) } // deleteTagKey adds a tombstone for a tag key to the log file without a lock. 
@@ -1390,7 +1391,7 @@ func (tk *logTagKey) bytes() int { func (tk *logTagKey) Key() []byte { return tk.name } func (tk *logTagKey) Deleted() bool { return tk.deleted } -func (tk *logTagKey) TagValueIterator() TagValueIterator { +func (tk *logTagKey) TagValueIterator(_ *mincore.Limiter) TagValueIterator { a := make([]logTagValue, 0, len(tk.tagValues)) for _, v := range tk.tagValues { a = append(a, v) diff --git a/tsdb/tsi1/measurement_block.go b/tsdb/tsi1/measurement_block.go index 744fe881e7..0ce8c9977a 100644 --- a/tsdb/tsi1/measurement_block.go +++ b/tsdb/tsi1/measurement_block.go @@ -8,6 +8,7 @@ import ( "sort" "unsafe" + "github.com/influxdata/influxdb/v2/pkg/mincore" "github.com/influxdata/influxdb/v2/pkg/rhh" "github.com/influxdata/influxdb/v2/tsdb" ) @@ -70,7 +71,8 @@ func (blk *MeasurementBlock) bytes() int { func (blk *MeasurementBlock) Version() int { return blk.version } // Elem returns an element for a measurement. -func (blk *MeasurementBlock) Elem(name []byte) (e MeasurementBlockElem, ok bool) { +func (blk *MeasurementBlock) Elem(name []byte, limiter *mincore.Limiter) (e MeasurementBlockElem, ok bool) { + _ = wait(limiter, blk.hashData[:MeasurementNSize]) n := int64(binary.BigEndian.Uint64(blk.hashData[:MeasurementNSize])) hash := rhh.HashKey(name) pos := hash % n @@ -79,6 +81,7 @@ func (blk *MeasurementBlock) Elem(name []byte) (e MeasurementBlockElem, ok bool) var d int64 for { // Find offset of measurement. + _ = wait(limiter, blk.hashData[MeasurementNSize+(pos*MeasurementOffsetSize):MeasurementNSize+(pos*MeasurementOffsetSize)+8]) offset := binary.BigEndian.Uint64(blk.hashData[MeasurementNSize+(pos*MeasurementOffsetSize):]) if offset == 0 { return MeasurementBlockElem{}, false @@ -88,6 +91,7 @@ func (blk *MeasurementBlock) Elem(name []byte) (e MeasurementBlockElem, ok bool) if offset > 0 { // Parse into element. var e MeasurementBlockElem + _ = wait(limiter, blk.data[offset:offset+1]) e.UnmarshalBinary(blk.data[offset:]) // Return if name match. @@ -132,18 +136,22 @@ func (blk *MeasurementBlock) UnmarshalBinary(data []byte) error { } // Iterator returns an iterator over all measurements. -func (blk *MeasurementBlock) Iterator() MeasurementIterator { - return &blockMeasurementIterator{data: blk.data[MeasurementFillSize:]} +func (blk *MeasurementBlock) Iterator(limiter *mincore.Limiter) MeasurementIterator { + return &blockMeasurementIterator{ + data: blk.data[MeasurementFillSize:], + limiter: limiter, + } } // SeriesIDIterator returns an iterator for all series ids in a measurement. -func (blk *MeasurementBlock) SeriesIDIterator(name []byte) tsdb.SeriesIDIterator { +func (blk *MeasurementBlock) SeriesIDIterator(name []byte, limiter *mincore.Limiter) tsdb.SeriesIDIterator { // Find measurement element. - e, ok := blk.Elem(name) + e, ok := blk.Elem(name, limiter) if !ok { return &rawSeriesIDIterator{} } if e.seriesIDSet != nil { + _ = wait(limiter, e.seriesIDSetData) return tsdb.NewSeriesIDSetIterator(e.seriesIDSet) } return &rawSeriesIDIterator{n: e.series.n, data: e.series.data} @@ -153,6 +161,8 @@ func (blk *MeasurementBlock) SeriesIDIterator(name []byte) tsdb.SeriesIDIterator type blockMeasurementIterator struct { elem MeasurementBlockElem data []byte + + limiter *mincore.Limiter } // Next returns the next measurement. Returns nil when iterator is complete. @@ -164,6 +174,7 @@ func (itr *blockMeasurementIterator) Next() MeasurementElem { // Unmarshal the element at the current position. 
itr.elem.UnmarshalBinary(itr.data) + _ = wait(itr.limiter, itr.data[:itr.elem.size]) // Move the data forward past the record. itr.data = itr.data[itr.elem.size:] @@ -304,7 +315,8 @@ type MeasurementBlockElem struct { data []byte // serialized series data } - seriesIDSet *tsdb.SeriesIDSet + seriesIDSet *tsdb.SeriesIDSet + seriesIDSetData []byte // size in bytes, set after unmarshaling. size int @@ -420,6 +432,7 @@ func (e *MeasurementBlockElem) UnmarshalBinary(data []byte) error { } else { // data = memalign(data) e.seriesIDSet = tsdb.NewSeriesIDSet() + e.seriesIDSetData = data[:sz] if err = e.seriesIDSet.UnmarshalBinaryUnsafe(data[:sz]); err != nil { return err } diff --git a/tsdb/tsi1/measurement_block_test.go b/tsdb/tsi1/measurement_block_test.go index 90a630b162..b3b694a08a 100644 --- a/tsdb/tsi1/measurement_block_test.go +++ b/tsdb/tsi1/measurement_block_test.go @@ -117,7 +117,7 @@ func TestMeasurementBlockWriter(t *testing.T) { } // Verify data in block. - if e, ok := blk.Elem([]byte("foo")); !ok { + if e, ok := blk.Elem([]byte("foo"), nil); !ok { t.Fatal("expected element") } else if e.TagBlockOffset() != 100 || e.TagBlockSize() != 10 { t.Fatalf("unexpected offset/size: %v/%v", e.TagBlockOffset(), e.TagBlockSize()) @@ -125,7 +125,7 @@ func TestMeasurementBlockWriter(t *testing.T) { t.Fatalf("unexpected series data: %#v", e.SeriesIDs()) } - if e, ok := blk.Elem([]byte("bar")); !ok { + if e, ok := blk.Elem([]byte("bar"), nil); !ok { t.Fatal("expected element") } else if e.TagBlockOffset() != 200 || e.TagBlockSize() != 20 { t.Fatalf("unexpected offset/size: %v/%v", e.TagBlockOffset(), e.TagBlockSize()) @@ -133,7 +133,7 @@ func TestMeasurementBlockWriter(t *testing.T) { t.Fatalf("unexpected series data: %#v", e.SeriesIDs()) } - if e, ok := blk.Elem([]byte("baz")); !ok { + if e, ok := blk.Elem([]byte("baz"), nil); !ok { t.Fatal("expected element") } else if e.TagBlockOffset() != 300 || e.TagBlockSize() != 30 { t.Fatalf("unexpected offset/size: %v/%v", e.TagBlockOffset(), e.TagBlockSize()) @@ -142,7 +142,7 @@ func TestMeasurementBlockWriter(t *testing.T) { } // Verify non-existent measurement doesn't exist. - if _, ok := blk.Elem([]byte("BAD_MEASUREMENT")); ok { + if _, ok := blk.Elem([]byte("BAD_MEASUREMENT"), nil); ok { t.Fatal("expected no element") } } diff --git a/tsdb/tsi1/partition.go b/tsdb/tsi1/partition.go index 787991b3a3..74dd4cb7fc 100644 --- a/tsdb/tsi1/partition.go +++ b/tsdb/tsi1/partition.go @@ -20,11 +20,13 @@ import ( "github.com/influxdata/influxdb/v2/pkg/bytesutil" "github.com/influxdata/influxdb/v2/pkg/fs" "github.com/influxdata/influxdb/v2/pkg/lifecycle" + "github.com/influxdata/influxdb/v2/pkg/mincore" "github.com/influxdata/influxdb/v2/tsdb" "github.com/influxdata/influxdb/v2/tsdb/seriesfile" "github.com/influxdata/influxql" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" + "golang.org/x/time/rate" ) // Version is the current version of the TSI index. @@ -86,6 +88,8 @@ type Partition struct { nosync bool // when true, flushing and syncing of LogFile will be disabled. logbufferSize int // the LogFile's buffer is set to this value. + pageFaultLimiter *rate.Limiter + logger *zap.Logger // Current size of MANIFEST. Used to determine partition size. 
@@ -294,7 +298,6 @@ func (p *Partition) openLogFile(path string) (*LogFile, error) { f := NewLogFile(p.sfile, path) f.nosync = p.nosync f.bufferSize = p.logbufferSize - if err := f.Open(); err != nil { return nil, err } @@ -308,6 +311,7 @@ func (p *Partition) openIndexFile(path string) (*IndexFile, error) { if err := f.Open(); err != nil { return nil, err } + f.pageFaultLimiter = mincore.NewLimiter(p.pageFaultLimiter, f.data) return f, nil } @@ -628,7 +632,7 @@ func (p *Partition) DropMeasurement(name []byte) error { } // Delete each value in key. - if vitr := k.TagValueIterator(); vitr != nil { + if vitr := k.TagValueIterator(nil); vitr != nil { for v := vitr.Next(); v != nil; v = vitr.Next() { if !v.Deleted() { if err := func() error { @@ -1095,6 +1099,7 @@ func (p *Partition) compactToLevel(files []*IndexFile, frefs lifecycle.Reference log.Error("Cannot open new index file", zap.Error(err)) return } + file.pageFaultLimiter = mincore.NewLimiter(p.pageFaultLimiter, file.data) // Obtain lock to swap in index file and write manifest. if err = func() error { @@ -1262,6 +1267,7 @@ func (p *Partition) compactLogFile(ctx context.Context, logFile *LogFile, interr log.Error("Cannot open compacted index file", zap.Error(err), zap.String("path", file.Path())) return } + file.pageFaultLimiter = mincore.NewLimiter(p.pageFaultLimiter, file.data) // Obtain lock to swap in index file and write manifest. if err := func() error { diff --git a/tsdb/tsi1/tag_block.go b/tsdb/tsi1/tag_block.go index 653a9c4777..6de6d3c1f2 100644 --- a/tsdb/tsi1/tag_block.go +++ b/tsdb/tsi1/tag_block.go @@ -7,6 +7,7 @@ import ( "fmt" "io" + "github.com/influxdata/influxdb/v2/pkg/mincore" "github.com/influxdata/influxdb/v2/pkg/rhh" "github.com/influxdata/influxdb/v2/tsdb" ) @@ -91,15 +92,15 @@ func (blk *TagBlock) UnmarshalBinary(data []byte) error { // TagKeyElem returns an element for a tag key. // Returns an element with a nil key if not found. -func (blk *TagBlock) TagKeyElem(key []byte) TagKeyElem { +func (blk *TagBlock) TagKeyElem(key []byte, limiter *mincore.Limiter) TagKeyElem { var elem TagBlockKeyElem - if !blk.DecodeTagKeyElem(key, &elem) { + if !blk.DecodeTagKeyElem(key, &elem, limiter) { return nil } return &elem } -func (blk *TagBlock) DecodeTagKeyElem(key []byte, elem *TagBlockKeyElem) bool { +func (blk *TagBlock) DecodeTagKeyElem(key []byte, elem *TagBlockKeyElem, limiter *mincore.Limiter) bool { keyN := int64(binary.BigEndian.Uint64(blk.hashData[:TagKeyNSize])) hash := rhh.HashKey(key) pos := hash % keyN @@ -108,6 +109,7 @@ func (blk *TagBlock) DecodeTagKeyElem(key []byte, elem *TagBlockKeyElem) bool { var d int64 for { // Find offset of tag key. + _ = wait(limiter, blk.hashData[TagKeyNSize+(pos*TagKeyOffsetSize):TagKeyNSize+(pos*TagKeyOffsetSize)+8]) offset := binary.BigEndian.Uint64(blk.hashData[TagKeyNSize+(pos*TagKeyOffsetSize):]) if offset == 0 { return false @@ -115,6 +117,7 @@ func (blk *TagBlock) DecodeTagKeyElem(key []byte, elem *TagBlockKeyElem) bool { // Parse into element. elem.unmarshal(blk.data[offset:], blk.data) + _ = wait(limiter, blk.data[offset:offset+uint64(elem.size)]) // Return if keys match. if bytes.Equal(elem.key, key) { @@ -137,25 +140,26 @@ func (blk *TagBlock) DecodeTagKeyElem(key []byte, elem *TagBlockKeyElem) bool { } // TagValueElem returns an element for a tag value. 
-func (blk *TagBlock) TagValueElem(key, value []byte) TagValueElem { +func (blk *TagBlock) TagValueElem(key, value []byte, limiter *mincore.Limiter) TagValueElem { var valueElem TagBlockValueElem - if !blk.DecodeTagValueElem(key, value, &valueElem) { + if !blk.DecodeTagValueElem(key, value, &valueElem, limiter) { return nil } return &valueElem } // DecodeTagValueElem returns an element for a tag value. -func (blk *TagBlock) DecodeTagValueElem(key, value []byte, valueElem *TagBlockValueElem) bool { +func (blk *TagBlock) DecodeTagValueElem(key, value []byte, valueElem *TagBlockValueElem, limiter *mincore.Limiter) bool { // Find key element, exit if not found. var keyElem TagBlockKeyElem - if !blk.DecodeTagKeyElem(key, &keyElem) { + if !blk.DecodeTagKeyElem(key, &keyElem, limiter) { return false } // Slice hash index data. hashData := keyElem.hashIndex.buf + _ = wait(limiter, hashData[:TagValueNSize]) valueN := int64(binary.BigEndian.Uint64(hashData[:TagValueNSize])) hash := rhh.HashKey(value) pos := hash % valueN @@ -164,6 +168,7 @@ func (blk *TagBlock) DecodeTagValueElem(key, value []byte, valueElem *TagBlockVa var d int64 for { // Find offset of tag value. + _ = wait(limiter, hashData[TagValueNSize+(pos*TagValueOffsetSize):TagValueNSize+(pos*TagValueOffsetSize)+8]) offset := binary.BigEndian.Uint64(hashData[TagValueNSize+(pos*TagValueOffsetSize):]) if offset == 0 { return false @@ -171,6 +176,7 @@ func (blk *TagBlock) DecodeTagValueElem(key, value []byte, valueElem *TagBlockVa // Parse into element. valueElem.unmarshal(blk.data[offset:]) + _ = wait(limiter, blk.data[offset:offset+uint64(valueElem.size)]) // Return if values match. if bytes.Equal(valueElem.value, value) { @@ -194,10 +200,11 @@ } // TagKeyIterator returns an iterator over all the keys in the block. -func (blk *TagBlock) TagKeyIterator() TagKeyIterator { +func (blk *TagBlock) TagKeyIterator(limiter *mincore.Limiter) TagKeyIterator { return &tagBlockKeyIterator{ blk: blk, keyData: blk.keyData, + limiter: limiter, } } @@ -206,6 +213,7 @@ type tagBlockKeyIterator struct { blk *TagBlock keyData []byte e TagBlockKeyElem + limiter *mincore.Limiter } // Next returns the next element in the iterator. @@ -217,6 +225,7 @@ func (itr *tagBlockKeyIterator) Next() TagKeyElem { // Unmarshal next element & move data forward. itr.e.unmarshal(itr.keyData, itr.blk.data) + _ = wait(itr.limiter, itr.keyData[:itr.e.size]) itr.keyData = itr.keyData[itr.e.size:] assert(len(itr.e.Key()) > 0, "invalid zero-length tag key") @@ -225,8 +234,9 @@ // tagBlockValueIterator represents an iterator over all values for a tag key. type tagBlockValueIterator struct { - data []byte - e TagBlockValueElem + data []byte + e TagBlockValueElem + limiter *mincore.Limiter } // Next returns the next element in the iterator. @@ -239,6 +249,7 @@ func (itr *tagBlockValueIterator) Next() TagValueElem { // Unmarshal next element & move data forward. itr.e.unmarshal(itr.data) + _ = wait(itr.limiter, itr.data[:itr.e.size]) itr.data = itr.data[itr.e.size:] assert(len(itr.e.Value()) > 0, "invalid zero-length tag value") return &itr.e } @@ -273,8 +284,8 @@ func (e *TagBlockKeyElem) Deleted() bool { return (e.flag & TagKeyTombstoneFlag) func (e *TagBlockKeyElem) Key() []byte { return e.key } // TagValueIterator returns an iterator over the key's values. 
-func (e *TagBlockKeyElem) TagValueIterator() TagValueIterator { - return &tagBlockValueIterator{data: e.data.buf} +func (e *TagBlockKeyElem) TagValueIterator(limiter *mincore.Limiter) TagValueIterator { + return &tagBlockValueIterator{data: e.data.buf, limiter: limiter} } // unmarshal unmarshals buf into e. diff --git a/tsdb/tsi1/tag_block_test.go b/tsdb/tsi1/tag_block_test.go index 664144b0d1..bf5aa0b1db 100644 --- a/tsdb/tsi1/tag_block_test.go +++ b/tsdb/tsi1/tag_block_test.go @@ -56,7 +56,7 @@ func TestTagBlockWriter(t *testing.T) { } // Verify data. - if e := blk.TagValueElem([]byte("region"), []byte("us-east")); e == nil { + if e := blk.TagValueElem([]byte("region"), []byte("us-east"), nil); e == nil { t.Fatal("expected element") } else if a, err := e.(*tsi1.TagBlockValueElem).SeriesIDs(); err != nil { t.Fatalf("unexpected error: %v", err) @@ -64,28 +64,28 @@ func TestTagBlockWriter(t *testing.T) { t.Fatalf("unexpected series ids: %#v", a) } - if e := blk.TagValueElem([]byte("region"), []byte("us-west")); e == nil { + if e := blk.TagValueElem([]byte("region"), []byte("us-west"), nil); e == nil { t.Fatal("expected element") } else if a, err := e.(*tsi1.TagBlockValueElem).SeriesIDs(); err != nil { t.Fatalf("unexpected error: %v", err) } else if !reflect.DeepEqual(a, []uint64{3}) { t.Fatalf("unexpected series ids: %#v", a) } - if e := blk.TagValueElem([]byte("host"), []byte("server0")); e == nil { + if e := blk.TagValueElem([]byte("host"), []byte("server0"), nil); e == nil { t.Fatal("expected element") } else if a, err := e.(*tsi1.TagBlockValueElem).SeriesIDs(); err != nil { t.Fatalf("unexpected error: %v", err) } else if !reflect.DeepEqual(a, []uint64{1}) { t.Fatalf("unexpected series ids: %#v", a) } - if e := blk.TagValueElem([]byte("host"), []byte("server1")); e == nil { + if e := blk.TagValueElem([]byte("host"), []byte("server1"), nil); e == nil { t.Fatal("expected element") } else if a, err := e.(*tsi1.TagBlockValueElem).SeriesIDs(); err != nil { t.Fatalf("unexpected error: %v", err) } else if !reflect.DeepEqual(a, []uint64{2}) { t.Fatalf("unexpected series ids: %#v", a) } - if e := blk.TagValueElem([]byte("host"), []byte("server2")); e == nil { + if e := blk.TagValueElem([]byte("host"), []byte("server2"), nil); e == nil { t.Fatal("expected element") } else if a, err := e.(*tsi1.TagBlockValueElem).SeriesIDs(); err != nil { t.Fatalf("unexpected error: %v", err) @@ -149,7 +149,7 @@ func benchmarkTagBlock_SeriesN(b *testing.B, tagN, valueN int, blk **tsi1.TagBlo key, value := []byte("0"), []byte("0") for i := 0; i < b.N; i++ { - if e := (*blk).TagValueElem(key, value); e == nil { + if e := (*blk).TagValueElem(key, value, nil); e == nil { b.Fatal("expected element") } else if n := e.(*tsi1.TagBlockValueElem).SeriesN(); n != 1 { b.Fatalf("unexpected series count: %d", n) diff --git a/tsdb/tsi1/tsi1.go b/tsdb/tsi1/tsi1.go index 190349def8..b59f726bf2 100644 --- a/tsdb/tsi1/tsi1.go +++ b/tsdb/tsi1/tsi1.go @@ -6,6 +6,7 @@ import ( "fmt" "io" + "github.com/influxdata/influxdb/v2/pkg/mincore" "github.com/influxdata/influxdb/v2/tsdb" ) @@ -143,7 +144,7 @@ func (itr *tsdbMeasurementIteratorAdapter) Next() ([]byte, error) { type TagKeyElem interface { Key() []byte Deleted() bool - TagValueIterator() TagValueIterator + TagValueIterator(*mincore.Limiter) TagValueIterator } // TagKeyIterator represents a iterator over a list of tag keys. @@ -261,14 +262,14 @@ func (p tagKeyMergeElem) Deleted() bool { } // TagValueIterator returns a merge iterator for all elements until a tombstone occurs. 
-func (p tagKeyMergeElem) TagValueIterator() TagValueIterator { +func (p tagKeyMergeElem) TagValueIterator(limiter *mincore.Limiter) TagValueIterator { if len(p) == 0 { return nil } a := make([]TagValueIterator, 0, len(p)) for _, e := range p { - itr := e.TagValueIterator() + itr := e.TagValueIterator(limiter) a = append(a, itr) if e.Deleted() { diff --git a/tsdb/tsi1/tsi1_test.go b/tsdb/tsi1/tsi1_test.go index b4c743d41f..1dadd1c791 100644 --- a/tsdb/tsi1/tsi1_test.go +++ b/tsdb/tsi1/tsi1_test.go @@ -10,6 +10,7 @@ import ( "testing" "github.com/influxdata/influxdb/v2/models" + "github.com/influxdata/influxdb/v2/pkg/mincore" "github.com/influxdata/influxdb/v2/tsdb" "github.com/influxdata/influxdb/v2/tsdb/seriesfile" "github.com/influxdata/influxdb/v2/tsdb/tsi1" @@ -203,9 +204,9 @@ type TagKeyElem struct { deleted bool } -func (e *TagKeyElem) Key() []byte { return e.key } -func (e *TagKeyElem) Deleted() bool { return e.deleted } -func (e *TagKeyElem) TagValueIterator() tsi1.TagValueIterator { return nil } +func (e *TagKeyElem) Key() []byte { return e.key } +func (e *TagKeyElem) Deleted() bool { return e.deleted } +func (e *TagKeyElem) TagValueIterator(_ *mincore.Limiter) tsi1.TagValueIterator { return nil } // TagKeyIterator represents an iterator over a slice of tag keys. type TagKeyIterator struct {
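--
Usage sketch (illustrative, not part of the patch): a single *rate.Limiter fans out through storage.WithPageFaultLimiter to the engine, the TSI index, and the series file; each mmap then wraps it in a mincore.Limiter, and the read paths above call wait() on the byte ranges they touch. The snippet below assumes storage.NewEngine(path, config, ...Option) as in this tree, and the rate and burst values are made up for the example:

	import (
		"github.com/influxdata/influxdb/v2/storage"
		"golang.org/x/time/rate"
	)

	// Allow roughly 1000 page faults per second across the process, in
	// bursts of up to 10 pages. Passing a nil limiter disables limiting:
	// with this patch, mincore.NewLimiter returns nil for a nil underlying
	// limiter, and every wait() call is then a no-op.
	limiter := rate.NewLimiter(rate.Limit(1000), 10)
	engine := storage.NewEngine(path, config, storage.WithPageFaultLimiter(limiter))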