feat(tsi1): Add optional mincore limiter to TSI

pull/19018/head
Ben Johnson 2020-07-22 10:12:27 -06:00
parent 3c6b728702
commit 3cc2638bbf
17 changed files with 182 additions and 67 deletions

View File

@ -34,6 +34,10 @@ type Limiter struct {
// NewLimiter returns a new instance of Limiter associated with an mmap.
// The underlying limiter can be shared to limit faults across the entire process.
func NewLimiter(underlying *rate.Limiter, data []byte) *Limiter {
if underlying == nil {
return nil
}
return &Limiter{
underlying: underlying,
data: data,

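Since NewLimiter now maps a nil underlying limiter to a nil *Limiter, call sites can construct one unconditionally and leave the nil check to a helper such as wait(). A minimal usage sketch, using only the constructor and WaitRange shown in this commit; the rate numbers and the byte slice are illustrative:

package main

import (
    "context"

    "github.com/influxdata/influxdb/v2/pkg/mincore"
    "golang.org/x/time/rate"
)

func main() {
    data := make([]byte, 1<<16) // stand-in for an mmap'd region

    // One token bucket shared across the process; numbers are illustrative.
    underlying := rate.NewLimiter(rate.Limit(1000), 100)

    lim := mincore.NewLimiter(underlying, data)
    _ = lim.WaitRange(context.Background(), data[:4096]) // throttled access

    // A nil underlying limiter yields a nil *Limiter, so disabling the
    // feature costs callers nothing but a nil check.
    if mincore.NewLimiter(nil, data) == nil {
        // limiting disabled
    }
}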
View File

@ -171,6 +171,8 @@ func WithWritePointsValidationEnabled(v bool) Option {
func WithPageFaultLimiter(limiter *rate.Limiter) Option {
return func(e *Engine) {
e.engine.WithPageFaultLimiter(limiter)
e.index.WithPageFaultLimiter(limiter)
e.sfile.WithPageFaultLimiter(limiter)
}
}

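One limiter configured at the engine level now governs page faults in the engine, the TSI index, and the series file together. A self-contained sketch of the same functional-option fan-out; the Engine type below is a simplified stand-in for the real storage engine, not its actual definition:

package main

import "golang.org/x/time/rate"

// Engine is a simplified stand-in with one limited component per field.
type Engine struct {
    engineLim, indexLim, sfileLim *rate.Limiter
}

type Option func(*Engine)

// WithPageFaultLimiter mirrors the option above: the same limiter is handed
// to every component, so their combined fault rate shares one budget.
func WithPageFaultLimiter(limiter *rate.Limiter) Option {
    return func(e *Engine) {
        e.engineLim = limiter
        e.indexLim = limiter
        e.sfileLim = limiter
    }
}

func main() {
    shared := rate.NewLimiter(rate.Limit(500), 10) // illustrative numbers
    e := &Engine{}
    WithPageFaultLimiter(shared)(e)
}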
View File

@ -17,12 +17,14 @@ import (
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/binaryutil"
"github.com/influxdata/influxdb/v2/pkg/lifecycle"
"github.com/influxdata/influxdb/v2/pkg/mincore"
"github.com/influxdata/influxdb/v2/pkg/rhh"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
)
var (
@ -50,6 +52,8 @@ type SeriesFile struct {
defaultMetricLabels prometheus.Labels
metricsEnabled bool
pageFaultLimiter *rate.Limiter // Limits page faults by the series file
LargeWriteThreshold int
Logger *zap.Logger
@ -86,6 +90,11 @@ func (f *SeriesFile) DisableMetrics() {
f.metricsEnabled = false
}
// WithPageFaultLimiter sets a limiter to restrict the number of page faults.
func (f *SeriesFile) WithPageFaultLimiter(limiter *rate.Limiter) {
f.pageFaultLimiter = limiter
}
// Open memory maps the data file at the file's path.
func (f *SeriesFile) Open(ctx context.Context) error {
f.mu.Lock()
@ -129,6 +138,7 @@ func (f *SeriesFile) Open(ctx context.Context) error {
p := NewSeriesPartition(i, f.SeriesPartitionPath(i))
p.LargeWriteThreshold = f.LargeWriteThreshold
p.Logger = f.Logger.With(zap.Int("partition", p.ID()))
p.pageFaultLimiter = f.pageFaultLimiter
// For each series file index, rhh trackers are used to track the RHH Hashmap.
// Each of the trackers needs to be given slightly different default
@ -608,3 +618,11 @@ func SeriesKeySize(name []byte, tags models.Tags) int {
n += binaryutil.UvarintSize(uint64(n))
return n
}
// wait rate limits page faults to the underlying data. Skipped if limiter is not set.
func wait(limiter *mincore.Limiter, b []byte) error {
if limiter == nil {
return nil
}
return limiter.WaitRange(context.Background(), b)
}

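Open copies f.pageFaultLimiter into each SeriesPartition as it creates them, so the limiter must be installed before Open is called. A sketch of the wiring, assuming a NewSeriesFile constructor and an illustrative path; WithPageFaultLimiter and Open(ctx) are as shown in this commit:

package main

import (
    "context"

    "github.com/influxdata/influxdb/v2/tsdb/seriesfile"
    "golang.org/x/time/rate"
)

func main() {
    // NewSeriesFile and the path are illustrative assumptions.
    sfile := seriesfile.NewSeriesFile("/var/lib/influxdb/_series")

    // Set the limiter first: Open copies it into every SeriesPartition,
    // which in turn passes it to each index and segment it opens.
    sfile.WithPageFaultLimiter(rate.NewLimiter(rate.Limit(1000), 100))

    if err := sfile.Open(context.Background()); err != nil {
        panic(err)
    }
    defer sfile.Close()
}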
View File

@ -8,10 +8,12 @@ import (
"os"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/mincore"
"github.com/influxdata/influxdb/v2/pkg/mmap"
"github.com/influxdata/influxdb/v2/pkg/rhh"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
)
const (
@ -63,6 +65,8 @@ type SeriesIndex struct {
keyIDMap *rhh.HashMap
idOffsetMap map[tsdb.SeriesID]int64
tombstones map[tsdb.SeriesID]struct{}
limiter *mincore.Limiter // Limits page faults by the partition
}
func NewSeriesIndex(path string) *SeriesIndex {
@ -124,6 +128,12 @@ func (idx *SeriesIndex) Close() (err error) {
return err
}
// SetPageFaultLimiter sets the limiter used for rate limiting page faults.
// Must be called after Open().
func (idx *SeriesIndex) SetPageFaultLimiter(limiter *rate.Limiter) {
idx.limiter = mincore.NewLimiter(limiter, idx.data)
}
// Recover rebuilds the in-memory index for all new entries.
func (idx *SeriesIndex) Recover(segments []*SeriesSegment) error {
// Allocate new in-memory maps.
@ -247,6 +257,7 @@ func (idx *SeriesIndex) FindIDBySeriesKey(segments []*SeriesSegment, key []byte)
for d, pos := int64(0), hash&idx.mask; ; d, pos = d+1, (pos+1)&idx.mask {
elem := idx.keyIDData[(pos * SeriesIndexElemSize):]
elemOffset := int64(binary.BigEndian.Uint64(elem[:SeriesOffsetSize]))
_ = wait(idx.limiter, elem[:SeriesOffsetSize]) // elem size is two uint64s
if elemOffset == 0 {
return tsdb.SeriesIDTyped{}
@ -298,6 +309,7 @@ func (idx *SeriesIndex) FindOffsetByID(id tsdb.SeriesID) int64 {
for d, pos := int64(0), hash&idx.mask; ; d, pos = d+1, (pos+1)&idx.mask {
elem := idx.idOffsetData[(pos * SeriesIndexElemSize):]
elemID := tsdb.NewSeriesID(binary.BigEndian.Uint64(elem[:SeriesIDSize]))
_ = wait(idx.limiter, elem[:SeriesIDSize])
if elemID == id {
return int64(binary.BigEndian.Uint64(elem[SeriesIDSize:]))

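SetPageFaultLimiter wraps the mapping that Open establishes in idx.data, which is why it must run after Open; called earlier it would wrap a nil slice. A sketch of the required ordering, assuming a no-argument Open; the path and rate are illustrative:

package main

import (
    "github.com/influxdata/influxdb/v2/tsdb/seriesfile"
    "golang.org/x/time/rate"
)

func openIndex(path string, lim *rate.Limiter) (*seriesfile.SeriesIndex, error) {
    idx := seriesfile.NewSeriesIndex(path)
    if err := idx.Open(); err != nil {
        return nil, err
    }
    // Open has mapped the file into idx.data; only now can SetPageFaultLimiter
    // build its mincore.Limiter over that mapping.
    idx.SetPageFaultLimiter(lim)
    return idx, nil
}

func main() {
    idx, err := openIndex("/path/to/partition/index", rate.NewLimiter(rate.Limit(1000), 100))
    if err != nil {
        panic(err)
    }
    defer idx.Close()
}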
View File

@ -19,6 +19,7 @@ import (
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/time/rate"
)
var (
@ -48,6 +49,8 @@ type SeriesPartition struct {
compacting bool
compactionsDisabled int
pageFaultLimiter *rate.Limiter // Limits page faults by the partition
CompactThreshold int
LargeWriteThreshold int
@ -94,7 +97,10 @@ func (p *SeriesPartition) Open() error {
if err := p.index.Open(); err != nil {
return err
} else if err = p.index.Recover(p.segments); err != nil {
}
p.index.SetPageFaultLimiter(p.pageFaultLimiter)
if err = p.index.Recover(p.segments); err != nil {
return err
}
return nil
@ -124,6 +130,7 @@ func (p *SeriesPartition) openSegments() error {
if err := segment.Open(); err != nil {
return err
}
segment.SetPageFaultLimiter(p.pageFaultLimiter)
p.segments = append(p.segments, segment)
}
@ -142,6 +149,7 @@ func (p *SeriesPartition) openSegments() error {
if err != nil {
return err
}
segment.SetPageFaultLimiter(p.pageFaultLimiter)
p.segments = append(p.segments, segment)
}
@ -569,6 +577,7 @@ func (p *SeriesPartition) createSegment() (*SeriesSegment, error) {
if err != nil {
return nil, err
}
segment.SetPageFaultLimiter(p.pageFaultLimiter)
p.segments = append(p.segments, segment)
// Allow segment to write.
@ -591,7 +600,9 @@ func (p *SeriesPartition) seriesKeyByOffset(offset int64) []byte {
continue
}
key, _ := ReadSeriesKey(segment.Slice(pos + SeriesEntryHeaderSize))
buf := segment.Slice(pos + SeriesEntryHeaderSize)
key, _ := ReadSeriesKey(buf)
_ = wait(segment.limiter, buf[:len(key)])
return key
}
@ -769,7 +780,10 @@ func (c *SeriesPartitionCompactor) Compact(p *SeriesPartition) (time.Duration, e
return err
} else if err := fs.RenameFileWithReplacement(indexPath, index.path); err != nil {
return err
} else if err := p.index.Open(); err != nil {
}
p.index.SetPageFaultLimiter(p.pageFaultLimiter)
if err := p.index.Open(); err != nil {
return err
}

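The partition stores the raw *rate.Limiter and hands it to its index and to every segment, each of which wraps its own mapping in a fresh mincore.Limiter. A standalone sketch of that fan-out: one shared token bucket, one wrapper per mapping, so faults across all files draw from a single budget. The sizes and rate are illustrative:

package main

import (
    "github.com/influxdata/influxdb/v2/pkg/mincore"
    "golang.org/x/time/rate"
)

func main() {
    // One shared bucket: the global fault budget.
    shared := rate.NewLimiter(rate.Limit(1000), 100)

    indexData := make([]byte, 1<<20)   // stand-in for the index mmap
    segmentData := make([]byte, 1<<20) // stand-in for a segment mmap

    // Each mapping gets its own wrapper, but all draw from the same bucket,
    // so faults across every partition file share one rate limit.
    idxLim := mincore.NewLimiter(shared, indexData)
    segLim := mincore.NewLimiter(shared, segmentData)
    _, _ = idxLim, segLim
}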
View File

@ -12,8 +12,10 @@ import (
"strconv"
"github.com/influxdata/influxdb/v2/pkg/fs"
"github.com/influxdata/influxdb/v2/pkg/mincore"
"github.com/influxdata/influxdb/v2/pkg/mmap"
"github.com/influxdata/influxdb/v2/tsdb"
"golang.org/x/time/rate"
)
const (
@ -47,6 +49,8 @@ type SeriesSegment struct {
file *os.File // write file handle
w *bufio.Writer // buffered file handle
size uint32 // current file size
limiter *mincore.Limiter
}
// NewSeriesSegment returns a new instance of SeriesSegment.
@ -125,6 +129,7 @@ func (s *SeriesSegment) InitForWrite() (err error) {
if !IsValidSeriesEntryFlag(flag) {
break
}
_ = wait(s.limiter, s.data[s.size:int64(s.size)+sz])
s.size += uint32(sz)
}
@ -172,6 +177,12 @@ func (s *SeriesSegment) CloseForWrite() (err error) {
return err
}
// SetPageFaultLimiter sets the limiter used for rate limiting page faults.
// Must be called after Open().
func (s *SeriesSegment) SetPageFaultLimiter(limiter *rate.Limiter) {
s.limiter = mincore.NewLimiter(limiter, s.data)
}
// Data returns the raw data.
func (s *SeriesSegment) Data() []byte { return s.data }
@ -247,6 +258,7 @@ func (s *SeriesSegment) ForEachEntry(fn func(flag uint8, id tsdb.SeriesIDTyped,
if !IsValidSeriesEntryFlag(flag) {
break
}
_ = wait(s.limiter, s.data[pos:int64(pos)+sz])
offset := JoinSeriesOffset(s.id, pos)
if err := fn(flag, id, offset, key); err != nil {
@ -335,6 +347,7 @@ func ReadSeriesKeyFromSegments(a []*SeriesSegment, offset int64) []byte {
}
buf := segment.Slice(pos)
key, _ := ReadSeriesKey(buf)
_ = wait(segment.limiter, buf[:len(key)])
return key
}

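InitForWrite and ForEachEntry charge the limiter for each entry's bytes as the scan reaches them, so reading a fully paged-out segment faults at a bounded rate. A toy sketch of that scan pattern over length-prefixed entries; waitRange stands in for the wait() helper, and the entry format is invented for illustration:

package main

import "encoding/binary"

// waitRange stands in for the wait() helper: in the real code it charges a
// shared limiter for the pages backing b before they are touched.
func waitRange(b []byte) error { return nil }

func main() {
    // A toy segment: 2-byte big-endian length prefix, then the entry body.
    data := []byte{0, 3, 'a', 'b', 'c', 0, 1, 'x'}

    for pos := 0; pos+2 <= len(data); {
        sz := int(binary.BigEndian.Uint16(data[pos : pos+2]))
        end := pos + 2 + sz
        // Charge for the whole entry before decoding it, mirroring the
        // wait() calls added to InitForWrite/ForEachEntry above.
        _ = waitRange(data[pos:end])
        _ = data[pos+2 : end] // decode the entry here
        pos = end
    }
}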
View File

@ -21,6 +21,7 @@ import (
"github.com/influxdata/influxdb/v2/kit/tracing"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/lifecycle"
"github.com/influxdata/influxdb/v2/pkg/mincore"
"github.com/influxdata/influxdb/v2/pkg/slices"
"github.com/influxdata/influxdb/v2/query"
"github.com/influxdata/influxdb/v2/tsdb"
@ -29,6 +30,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
)
// ErrCompactionInterrupted is returned if compactions are disabled or
@ -115,13 +117,14 @@ type Index struct {
metricsEnabled bool
// The following may be set when initializing an Index.
path string // Root directory of the index partitions.
disableCompactions bool // Initially disables compactions on the index.
maxLogFileSize int64 // Maximum size of a LogFile before it's compacted.
logfileBufferSize int // The size of the buffer used by the LogFile.
disableFsync bool // Disables flushing buffers and fsyncing files. Used when working with indexes offline.
logger *zap.Logger // Index's logger.
config Config // The index configuration
path string // Root directory of the index partitions.
disableCompactions bool // Initially disables compactions on the index.
maxLogFileSize int64 // Maximum size of a LogFile before it's compacted.
logfileBufferSize int // The size of the buffer used by the LogFile.
disableFsync bool // Disables flushing buffers and fsyncing files. Used when working with indexes offline.
pageFaultLimiter *rate.Limiter // Limits page faults by the index.
logger *zap.Logger // Index's logger.
config Config // The index configuration
// The following must be set when initializing an Index.
sfile *seriesfile.SeriesFile // series lookup file
@ -162,6 +165,11 @@ func NewIndex(sfile *seriesfile.SeriesFile, c Config, options ...IndexOption) *I
return idx
}
// WithPageFaultLimiter sets a limiter to restrict the number of page faults.
func (i *Index) WithPageFaultLimiter(limiter *rate.Limiter) {
i.pageFaultLimiter = limiter
}
// SetDefaultMetricLabels sets the default labels on the trackers.
func (i *Index) SetDefaultMetricLabels(labels prometheus.Labels) {
i.defaultLabels = make(prometheus.Labels, len(labels))
@ -253,6 +261,7 @@ func (i *Index) Open(ctx context.Context) error {
p.StatsTTL = i.StatsTTL
p.nosync = i.disableFsync
p.logbufferSize = i.logfileBufferSize
p.pageFaultLimiter = i.pageFaultLimiter
p.logger = i.logger.With(zap.String("tsi1_partition", fmt.Sprint(j+1)))
// Each of the trackers needs to be given slightly different default
@ -1686,3 +1695,11 @@ type DropSeriesItem struct {
SeriesID tsdb.SeriesID
Key []byte
}
// wait rate limits page faults to the underlying data. Skipped if limiter is not set.
func wait(limiter *mincore.Limiter, b []byte) error {
if limiter == nil {
return nil
}
return limiter.WaitRange(context.Background(), b)
}

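As with the series file, Index.Open copies the limiter into each partition, so WithPageFaultLimiter must be called before Open. A sketch of the wiring; tsi1.NewConfig and the series-file setup are assumptions here, while NewIndex, WithPageFaultLimiter, and Open(ctx) match this commit:

package main

import (
    "context"

    "github.com/influxdata/influxdb/v2/tsdb/seriesfile"
    "github.com/influxdata/influxdb/v2/tsdb/tsi1"
    "golang.org/x/time/rate"
)

func main() {
    sfile := seriesfile.NewSeriesFile("/var/lib/influxdb/_series") // illustrative; open elided
    shared := rate.NewLimiter(rate.Limit(1000), 100)

    idx := tsi1.NewIndex(sfile, tsi1.NewConfig()) // NewConfig is an assumption

    // Set the limiter before Open: Open copies it into every partition.
    idx.WithPageFaultLimiter(shared)
    if err := idx.Open(context.Background()); err != nil {
        panic(err)
    }
}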
View File

@ -11,6 +11,7 @@ import (
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/lifecycle"
"github.com/influxdata/influxdb/v2/pkg/mincore"
"github.com/influxdata/influxdb/v2/pkg/mmap"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/seriesfile"
@ -73,6 +74,8 @@ type IndexFile struct {
// Path to data file.
path string
pageFaultLimiter *mincore.Limiter
}
// NewIndexFile returns a new instance of IndexFile.
@ -215,7 +218,7 @@ func (f *IndexFile) UnmarshalBinary(data []byte) error {
// Unmarshal each tag block.
f.tblks = make(map[string]*TagBlock)
itr := f.mblk.Iterator()
itr := f.mblk.Iterator(f.pageFaultLimiter)
for m := itr.Next(); m != nil; m = itr.Next() {
e := m.(*MeasurementBlockElem)
@ -243,7 +246,7 @@ func (f *IndexFile) SeriesIDSet() (*tsdb.SeriesIDSet, error) {
if err := ss.UnmarshalBinary(f.seriesIDSetData); err != nil {
return nil, err
}
return ss, nil
return ss, wait(f.pageFaultLimiter, f.seriesIDSetData)
}
func (f *IndexFile) TombstoneSeriesIDSet() (*tsdb.SeriesIDSet, error) {
@ -251,12 +254,12 @@ func (f *IndexFile) TombstoneSeriesIDSet() (*tsdb.SeriesIDSet, error) {
if err := ss.UnmarshalBinaryUnsafe(f.tombstoneSeriesIDSetData); err != nil {
return nil, err
}
return ss, nil
return ss, wait(f.pageFaultLimiter, f.tombstoneSeriesIDSetData)
}
// Measurement returns a measurement element.
func (f *IndexFile) Measurement(name []byte) MeasurementElem {
e, ok := f.mblk.Elem(name)
e, ok := f.mblk.Elem(name, f.pageFaultLimiter)
if !ok {
return nil
}
@ -265,7 +268,7 @@ func (f *IndexFile) Measurement(name []byte) MeasurementElem {
// MeasurementN returns the number of measurements in the file.
func (f *IndexFile) MeasurementN() (n uint64) {
mitr := f.mblk.Iterator()
mitr := f.mblk.Iterator(f.pageFaultLimiter)
for me := mitr.Next(); me != nil; me = mitr.Next() {
n++
}
@ -274,7 +277,7 @@ func (f *IndexFile) MeasurementN() (n uint64) {
// MeasurementHasSeries returns true if a measurement has any non-tombstoned series.
func (f *IndexFile) MeasurementHasSeries(ss *tsdb.SeriesIDSet, name []byte) (ok bool) {
e, ok := f.mblk.Elem(name)
e, ok := f.mblk.Elem(name, f.pageFaultLimiter)
if !ok {
return false
}
@ -299,13 +302,13 @@ func (f *IndexFile) TagValueIterator(name, key []byte) TagValueIterator {
}
// Find key element.
ke := tblk.TagKeyElem(key)
ke := tblk.TagKeyElem(key, f.pageFaultLimiter)
if ke == nil {
return nil
}
// Merge all value series iterators together.
return ke.TagValueIterator()
return ke.TagValueIterator(f.pageFaultLimiter)
}
// TagKeySeriesIDIterator returns a series iterator for a tag key and a flag
@ -317,13 +320,13 @@ func (f *IndexFile) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDItera
}
// Find key element.
ke := tblk.TagKeyElem(key)
ke := tblk.TagKeyElem(key, f.pageFaultLimiter)
if ke == nil {
return nil, nil
}
// Merge all value series iterators together.
vitr := ke.TagValueIterator()
vitr := ke.TagValueIterator(f.pageFaultLimiter)
var itrs []tsdb.SeriesIDIterator
for ve := vitr.Next(); ve != nil; ve = vitr.Next() {
@ -351,7 +354,7 @@ func (f *IndexFile) TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesID
// Find value element.
var valueElem TagBlockValueElem
if !tblk.DecodeTagValueElem(key, value, &valueElem) {
if !tblk.DecodeTagValueElem(key, value, &valueElem, f.pageFaultLimiter) {
return nil, nil
} else if valueElem.SeriesN() == 0 {
return nil, nil
@ -365,7 +368,7 @@ func (f *IndexFile) TagKey(name, key []byte) TagKeyElem {
if tblk == nil {
return nil
}
return tblk.TagKeyElem(key)
return tblk.TagKeyElem(key, f.pageFaultLimiter)
}
// TagValue returns a tag value.
@ -374,7 +377,7 @@ func (f *IndexFile) TagValue(name, key, value []byte) TagValueElem {
if tblk == nil {
return nil
}
return tblk.TagValueElem(key, value)
return tblk.TagValueElem(key, value, f.pageFaultLimiter)
}
// HasSeries returns flags indicating if the series exists and if it is tombstoned.
@ -388,12 +391,12 @@ func (f *IndexFile) TagValueElem(name, key, value []byte) TagValueElem {
if !ok {
return nil
}
return tblk.TagValueElem(key, value)
return tblk.TagValueElem(key, value, f.pageFaultLimiter)
}
// MeasurementIterator returns an iterator over all measurements.
func (f *IndexFile) MeasurementIterator() MeasurementIterator {
return f.mblk.Iterator()
return f.mblk.Iterator(f.pageFaultLimiter)
}
// TagKeyIterator returns an iterator over all tag keys for a measurement.
@ -402,13 +405,12 @@ func (f *IndexFile) TagKeyIterator(name []byte) TagKeyIterator {
if blk == nil {
return nil
}
return blk.TagKeyIterator()
return blk.TagKeyIterator(f.pageFaultLimiter)
}
// MeasurementSeriesIDIterator returns an iterator over a measurement's series.
func (f *IndexFile) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterator {
return f.mblk.SeriesIDIterator(name)
return f.mblk.SeriesIDIterator(name, f.pageFaultLimiter)
}
// ReadIndexFileTrailer returns the index file trailer from data.

View File

@ -284,7 +284,7 @@ func (p IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactIn
}
// Iterate over tag values.
vitr := ke.TagValueIterator()
vitr := ke.TagValueIterator(nil)
for ve := vitr.Next(); ve != nil; ve = vitr.Next() {
seriesIDs = seriesIDs[:0]

View File

@ -17,6 +17,7 @@ import (
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/bloom"
"github.com/influxdata/influxdb/v2/pkg/lifecycle"
"github.com/influxdata/influxdb/v2/pkg/mincore"
"github.com/influxdata/influxdb/v2/pkg/mmap"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/seriesfile"
@ -453,7 +454,7 @@ func (f *LogFile) TagValueIterator(name, key []byte) TagValueIterator {
return nil
}
return tk.TagValueIterator()
return tk.TagValueIterator(nil)
}
// deleteTagKey adds a tombstone for a tag key to the log file without a lock.
@ -1390,7 +1391,7 @@ func (tk *logTagKey) bytes() int {
func (tk *logTagKey) Key() []byte { return tk.name }
func (tk *logTagKey) Deleted() bool { return tk.deleted }
func (tk *logTagKey) TagValueIterator() TagValueIterator {
func (tk *logTagKey) TagValueIterator(_ *mincore.Limiter) TagValueIterator {
a := make([]logTagValue, 0, len(tk.tagValues))
for _, v := range tk.tagValues {
a = append(a, v)

View File

@ -8,6 +8,7 @@ import (
"sort"
"unsafe"
"github.com/influxdata/influxdb/v2/pkg/mincore"
"github.com/influxdata/influxdb/v2/pkg/rhh"
"github.com/influxdata/influxdb/v2/tsdb"
)
@ -70,7 +71,8 @@ func (blk *MeasurementBlock) bytes() int {
func (blk *MeasurementBlock) Version() int { return blk.version }
// Elem returns an element for a measurement.
func (blk *MeasurementBlock) Elem(name []byte) (e MeasurementBlockElem, ok bool) {
func (blk *MeasurementBlock) Elem(name []byte, limiter *mincore.Limiter) (e MeasurementBlockElem, ok bool) {
_ = wait(limiter, blk.hashData[:MeasurementNSize])
n := int64(binary.BigEndian.Uint64(blk.hashData[:MeasurementNSize]))
hash := rhh.HashKey(name)
pos := hash % n
@ -79,6 +81,7 @@ func (blk *MeasurementBlock) Elem(name []byte) (e MeasurementBlockElem, ok bool)
var d int64
for {
// Find offset of measurement.
_ = wait(limiter, blk.hashData[MeasurementNSize+(pos*MeasurementOffsetSize):MeasurementNSize+(pos*MeasurementOffsetSize)+8])
offset := binary.BigEndian.Uint64(blk.hashData[MeasurementNSize+(pos*MeasurementOffsetSize):])
if offset == 0 {
return MeasurementBlockElem{}, false
@ -88,6 +91,7 @@ func (blk *MeasurementBlock) Elem(name []byte) (e MeasurementBlockElem, ok bool)
if offset > 0 {
// Parse into element.
var e MeasurementBlockElem
_ = wait(limiter, blk.data[offset:offset+1])
e.UnmarshalBinary(blk.data[offset:])
// Return if name match.
@ -132,18 +136,22 @@ func (blk *MeasurementBlock) UnmarshalBinary(data []byte) error {
}
// Iterator returns an iterator over all measurements.
func (blk *MeasurementBlock) Iterator() MeasurementIterator {
return &blockMeasurementIterator{data: blk.data[MeasurementFillSize:]}
func (blk *MeasurementBlock) Iterator(limiter *mincore.Limiter) MeasurementIterator {
return &blockMeasurementIterator{
data: blk.data[MeasurementFillSize:],
limiter: limiter,
}
}
// SeriesIDIterator returns an iterator for all series ids in a measurement.
func (blk *MeasurementBlock) SeriesIDIterator(name []byte) tsdb.SeriesIDIterator {
func (blk *MeasurementBlock) SeriesIDIterator(name []byte, limiter *mincore.Limiter) tsdb.SeriesIDIterator {
// Find measurement element.
e, ok := blk.Elem(name)
e, ok := blk.Elem(name, limiter)
if !ok {
return &rawSeriesIDIterator{}
}
if e.seriesIDSet != nil {
_ = wait(limiter, e.seriesIDSetData)
return tsdb.NewSeriesIDSetIterator(e.seriesIDSet)
}
return &rawSeriesIDIterator{n: e.series.n, data: e.series.data}
@ -153,6 +161,8 @@ func (blk *MeasurementBlock) SeriesIDIterator(name []byte) tsdb.SeriesIDIterator
type blockMeasurementIterator struct {
elem MeasurementBlockElem
data []byte
limiter *mincore.Limiter
}
// Next returns the next measurement. Returns nil when iterator is complete.
@ -164,6 +174,7 @@ func (itr *blockMeasurementIterator) Next() MeasurementElem {
// Unmarshal the element at the current position.
itr.elem.UnmarshalBinary(itr.data)
_ = wait(itr.limiter, itr.data[:itr.elem.size])
// Move the data forward past the record.
itr.data = itr.data[itr.elem.size:]
@ -304,7 +315,8 @@ type MeasurementBlockElem struct {
data []byte // serialized series data
}
seriesIDSet *tsdb.SeriesIDSet
seriesIDSet *tsdb.SeriesIDSet
seriesIDSetData []byte
// size in bytes, set after unmarshaling.
size int
@ -420,6 +432,7 @@ func (e *MeasurementBlockElem) UnmarshalBinary(data []byte) error {
} else {
// data = memalign(data)
e.seriesIDSet = tsdb.NewSeriesIDSet()
e.seriesIDSetData = data[:sz]
if err = e.seriesIDSet.UnmarshalBinaryUnsafe(data[:sz]); err != nil {
return err
}

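The probe loop in Elem charges the limiter for exactly the bytes it is about to read: first the 8-byte entry count, then each 8-byte offset slot as it probes. A toy sketch of that idiom against a fabricated hash index; waitRange stands in for the wait() helper:

package main

import (
    "encoding/binary"
    "fmt"
)

// waitRange stands in for wait()/(*mincore.Limiter).WaitRange.
func waitRange(b []byte) error { return nil }

func main() {
    // A toy hash index: an 8-byte entry count followed by 8-byte offset
    // slots, shaped like the measurement block's hashData above.
    hashData := make([]byte, 8+2*8)
    binary.BigEndian.PutUint64(hashData[0:], 2)    // n = 2 buckets
    binary.BigEndian.PutUint64(hashData[8+8:], 42) // bucket 1 -> offset 42

    _ = waitRange(hashData[:8]) // charge for the count before reading it
    n := int64(binary.BigEndian.Uint64(hashData[:8]))

    pos := int64(1) % n
    slot := 8 + pos*8
    // Charge for exactly the 8-byte slot the probe reads next, mirroring
    // the wait() calls added to Elem() above.
    _ = waitRange(hashData[slot : slot+8])
    fmt.Println(binary.BigEndian.Uint64(hashData[slot:])) // 42
}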
View File

@ -117,7 +117,7 @@ func TestMeasurementBlockWriter(t *testing.T) {
}
// Verify data in block.
if e, ok := blk.Elem([]byte("foo")); !ok {
if e, ok := blk.Elem([]byte("foo"), nil); !ok {
t.Fatal("expected element")
} else if e.TagBlockOffset() != 100 || e.TagBlockSize() != 10 {
t.Fatalf("unexpected offset/size: %v/%v", e.TagBlockOffset(), e.TagBlockSize())
@ -125,7 +125,7 @@ func TestMeasurementBlockWriter(t *testing.T) {
t.Fatalf("unexpected series data: %#v", e.SeriesIDs())
}
if e, ok := blk.Elem([]byte("bar")); !ok {
if e, ok := blk.Elem([]byte("bar"), nil); !ok {
t.Fatal("expected element")
} else if e.TagBlockOffset() != 200 || e.TagBlockSize() != 20 {
t.Fatalf("unexpected offset/size: %v/%v", e.TagBlockOffset(), e.TagBlockSize())
@ -133,7 +133,7 @@ func TestMeasurementBlockWriter(t *testing.T) {
t.Fatalf("unexpected series data: %#v", e.SeriesIDs())
}
if e, ok := blk.Elem([]byte("baz")); !ok {
if e, ok := blk.Elem([]byte("baz"), nil); !ok {
t.Fatal("expected element")
} else if e.TagBlockOffset() != 300 || e.TagBlockSize() != 30 {
t.Fatalf("unexpected offset/size: %v/%v", e.TagBlockOffset(), e.TagBlockSize())
@ -142,7 +142,7 @@ func TestMeasurementBlockWriter(t *testing.T) {
}
// Verify non-existent measurement doesn't exist.
if _, ok := blk.Elem([]byte("BAD_MEASUREMENT")); ok {
if _, ok := blk.Elem([]byte("BAD_MEASUREMENT"), nil); ok {
t.Fatal("expected no element")
}
}

View File

@ -20,11 +20,13 @@ import (
"github.com/influxdata/influxdb/v2/pkg/bytesutil"
"github.com/influxdata/influxdb/v2/pkg/fs"
"github.com/influxdata/influxdb/v2/pkg/lifecycle"
"github.com/influxdata/influxdb/v2/pkg/mincore"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/seriesfile"
"github.com/influxdata/influxql"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/time/rate"
)
// Version is the current version of the TSI index.
@ -86,6 +88,8 @@ type Partition struct {
nosync bool // when true, flushing and syncing of LogFile will be disabled.
logbufferSize int // the LogFile's buffer is set to this value.
pageFaultLimiter *rate.Limiter
logger *zap.Logger
// Current size of MANIFEST. Used to determine partition size.
@ -294,7 +298,6 @@ func (p *Partition) openLogFile(path string) (*LogFile, error) {
f := NewLogFile(p.sfile, path)
f.nosync = p.nosync
f.bufferSize = p.logbufferSize
if err := f.Open(); err != nil {
return nil, err
}
@ -308,6 +311,7 @@ func (p *Partition) openIndexFile(path string) (*IndexFile, error) {
if err := f.Open(); err != nil {
return nil, err
}
f.pageFaultLimiter = mincore.NewLimiter(p.pageFaultLimiter, f.data)
return f, nil
}
@ -628,7 +632,7 @@ func (p *Partition) DropMeasurement(name []byte) error {
}
// Delete each value in key.
if vitr := k.TagValueIterator(); vitr != nil {
if vitr := k.TagValueIterator(nil); vitr != nil {
for v := vitr.Next(); v != nil; v = vitr.Next() {
if !v.Deleted() {
if err := func() error {
@ -1095,6 +1099,7 @@ func (p *Partition) compactToLevel(files []*IndexFile, frefs lifecycle.Reference
log.Error("Cannot open new index file", zap.Error(err))
return
}
file.pageFaultLimiter = mincore.NewLimiter(p.pageFaultLimiter, file.data)
// Obtain lock to swap in index file and write manifest.
if err = func() error {
@ -1262,6 +1267,7 @@ func (p *Partition) compactLogFile(ctx context.Context, logFile *LogFile, interr
log.Error("Cannot open compacted index file", zap.Error(err), zap.String("path", file.Path()))
return
}
file.pageFaultLimiter = mincore.NewLimiter(p.pageFaultLimiter, file.data)
// Obtain lock to swap in index file and write manifest.
if err := func() error {

View File

@ -7,6 +7,7 @@ import (
"fmt"
"io"
"github.com/influxdata/influxdb/v2/pkg/mincore"
"github.com/influxdata/influxdb/v2/pkg/rhh"
"github.com/influxdata/influxdb/v2/tsdb"
)
@ -91,15 +92,15 @@ func (blk *TagBlock) UnmarshalBinary(data []byte) error {
// TagKeyElem returns an element for a tag key.
// Returns an element with a nil key if not found.
func (blk *TagBlock) TagKeyElem(key []byte) TagKeyElem {
func (blk *TagBlock) TagKeyElem(key []byte, limiter *mincore.Limiter) TagKeyElem {
var elem TagBlockKeyElem
if !blk.DecodeTagKeyElem(key, &elem) {
if !blk.DecodeTagKeyElem(key, &elem, limiter) {
return nil
}
return &elem
}
func (blk *TagBlock) DecodeTagKeyElem(key []byte, elem *TagBlockKeyElem) bool {
func (blk *TagBlock) DecodeTagKeyElem(key []byte, elem *TagBlockKeyElem, limiter *mincore.Limiter) bool {
keyN := int64(binary.BigEndian.Uint64(blk.hashData[:TagKeyNSize]))
hash := rhh.HashKey(key)
pos := hash % keyN
@ -108,6 +109,7 @@ func (blk *TagBlock) DecodeTagKeyElem(key []byte, elem *TagBlockKeyElem) bool {
var d int64
for {
// Find offset of tag key.
_ = wait(limiter, blk.hashData[TagKeyNSize+(pos*TagKeyOffsetSize):TagKeyNSize+(pos*TagKeyOffsetSize)+8])
offset := binary.BigEndian.Uint64(blk.hashData[TagKeyNSize+(pos*TagKeyOffsetSize):])
if offset == 0 {
return false
@ -115,6 +117,7 @@ func (blk *TagBlock) DecodeTagKeyElem(key []byte, elem *TagBlockKeyElem) bool {
// Parse into element.
elem.unmarshal(blk.data[offset:], blk.data)
_ = wait(limiter, blk.data[offset:offset+uint64(elem.size)])
// Return if keys match.
if bytes.Equal(elem.key, key) {
@ -137,25 +140,26 @@ func (blk *TagBlock) DecodeTagKeyElem(key []byte, elem *TagBlockKeyElem) bool {
}
// TagValueElem returns an element for a tag value.
func (blk *TagBlock) TagValueElem(key, value []byte) TagValueElem {
func (blk *TagBlock) TagValueElem(key, value []byte, limiter *mincore.Limiter) TagValueElem {
var valueElem TagBlockValueElem
if !blk.DecodeTagValueElem(key, value, &valueElem) {
if !blk.DecodeTagValueElem(key, value, &valueElem, limiter) {
return nil
}
return &valueElem
}
// DecodeTagValueElem returns an element for a tag value.
func (blk *TagBlock) DecodeTagValueElem(key, value []byte, valueElem *TagBlockValueElem) bool {
func (blk *TagBlock) DecodeTagValueElem(key, value []byte, valueElem *TagBlockValueElem, limiter *mincore.Limiter) bool {
// Find key element, exit if not found.
var keyElem TagBlockKeyElem
if !blk.DecodeTagKeyElem(key, &keyElem) {
if !blk.DecodeTagKeyElem(key, &keyElem, limiter) {
return false
}
// Slice hash index data.
hashData := keyElem.hashIndex.buf
_ = wait(limiter, hashData[:TagValueNSize])
valueN := int64(binary.BigEndian.Uint64(hashData[:TagValueNSize]))
hash := rhh.HashKey(value)
pos := hash % valueN
@ -164,6 +168,7 @@ func (blk *TagBlock) DecodeTagValueElem(key, value []byte, valueElem *TagBlockVa
var d int64
for {
// Find offset of tag value.
_ = wait(limiter, hashData[TagValueNSize+(pos*TagValueOffsetSize):TagValueNSize+(pos*TagValueOffsetSize)+8])
offset := binary.BigEndian.Uint64(hashData[TagValueNSize+(pos*TagValueOffsetSize):])
if offset == 0 {
return false
@ -171,6 +176,7 @@ func (blk *TagBlock) DecodeTagValueElem(key, value []byte, valueElem *TagBlockVa
// Parse into element.
valueElem.unmarshal(blk.data[offset:])
_ = wait(limiter, blk.data[offset:offset+uint64(valueElem.size)])
// Return if values match.
if bytes.Equal(valueElem.value, value) {
@ -194,10 +200,11 @@ func (blk *TagBlock) DecodeTagValueElem(key, value []byte, valueElem *TagBlockVa
}
// TagKeyIterator returns an iterator over all the keys in the block.
func (blk *TagBlock) TagKeyIterator() TagKeyIterator {
func (blk *TagBlock) TagKeyIterator(limiter *mincore.Limiter) TagKeyIterator {
return &tagBlockKeyIterator{
blk: blk,
keyData: blk.keyData,
limiter: limiter,
}
}
@ -206,6 +213,7 @@ type tagBlockKeyIterator struct {
blk *TagBlock
keyData []byte
e TagBlockKeyElem
limiter *mincore.Limiter
}
// Next returns the next element in the iterator.
@ -217,6 +225,7 @@ func (itr *tagBlockKeyIterator) Next() TagKeyElem {
// Unmarshal next element & move data forward.
itr.e.unmarshal(itr.keyData, itr.blk.data)
_ = wait(itr.limiter, itr.keyData[:itr.e.size])
itr.keyData = itr.keyData[itr.e.size:]
assert(len(itr.e.Key()) > 0, "invalid zero-length tag key")
@ -225,8 +234,9 @@ func (itr *tagBlockKeyIterator) Next() TagKeyElem {
// tagBlockValueIterator represents an iterator over all values for a tag key.
type tagBlockValueIterator struct {
data []byte
e TagBlockValueElem
data []byte
e TagBlockValueElem
limiter *mincore.Limiter
}
// Next returns the next element in the iterator.
@ -239,6 +249,7 @@ func (itr *tagBlockValueIterator) Next() TagValueElem {
// Unmarshal next element & move data forward.
itr.e.unmarshal(itr.data)
_ = wait(itr.limiter, itr.data[:itr.e.size])
itr.data = itr.data[itr.e.size:]
assert(len(itr.e.Value()) > 0, "invalid zero-length tag value")
return &itr.e
@ -273,8 +284,8 @@ func (e *TagBlockKeyElem) Deleted() bool { return (e.flag & TagKeyTombstoneFlag)
func (e *TagBlockKeyElem) Key() []byte { return e.key }
// TagValueIterator returns an iterator over the key's values.
func (e *TagBlockKeyElem) TagValueIterator() TagValueIterator {
return &tagBlockValueIterator{data: e.data.buf}
func (e *TagBlockKeyElem) TagValueIterator(limiter *mincore.Limiter) TagValueIterator {
return &tagBlockValueIterator{data: e.data.buf, limiter: limiter}
}
// unmarshal unmarshals buf into e.

View File

@ -56,7 +56,7 @@ func TestTagBlockWriter(t *testing.T) {
}
// Verify data.
if e := blk.TagValueElem([]byte("region"), []byte("us-east")); e == nil {
if e := blk.TagValueElem([]byte("region"), []byte("us-east"), nil); e == nil {
t.Fatal("expected element")
} else if a, err := e.(*tsi1.TagBlockValueElem).SeriesIDs(); err != nil {
t.Fatalf("unexpected error: %v", err)
@ -64,28 +64,28 @@ func TestTagBlockWriter(t *testing.T) {
t.Fatalf("unexpected series ids: %#v", a)
}
if e := blk.TagValueElem([]byte("region"), []byte("us-west")); e == nil {
if e := blk.TagValueElem([]byte("region"), []byte("us-west"), nil); e == nil {
t.Fatal("expected element")
} else if a, err := e.(*tsi1.TagBlockValueElem).SeriesIDs(); err != nil {
t.Fatalf("unexpected error: %v", err)
} else if !reflect.DeepEqual(a, []uint64{3}) {
t.Fatalf("unexpected series ids: %#v", a)
}
if e := blk.TagValueElem([]byte("host"), []byte("server0")); e == nil {
if e := blk.TagValueElem([]byte("host"), []byte("server0"), nil); e == nil {
t.Fatal("expected element")
} else if a, err := e.(*tsi1.TagBlockValueElem).SeriesIDs(); err != nil {
t.Fatalf("unexpected error: %v", err)
} else if !reflect.DeepEqual(a, []uint64{1}) {
t.Fatalf("unexpected series ids: %#v", a)
}
if e := blk.TagValueElem([]byte("host"), []byte("server1")); e == nil {
if e := blk.TagValueElem([]byte("host"), []byte("server1"), nil); e == nil {
t.Fatal("expected element")
} else if a, err := e.(*tsi1.TagBlockValueElem).SeriesIDs(); err != nil {
t.Fatalf("unexpected error: %v", err)
} else if !reflect.DeepEqual(a, []uint64{2}) {
t.Fatalf("unexpected series ids: %#v", a)
}
if e := blk.TagValueElem([]byte("host"), []byte("server2")); e == nil {
if e := blk.TagValueElem([]byte("host"), []byte("server2"), nil); e == nil {
t.Fatal("expected element")
} else if a, err := e.(*tsi1.TagBlockValueElem).SeriesIDs(); err != nil {
t.Fatalf("unexpected error: %v", err)
@ -149,7 +149,7 @@ func benchmarkTagBlock_SeriesN(b *testing.B, tagN, valueN int, blk **tsi1.TagBlo
key, value := []byte("0"), []byte("0")
for i := 0; i < b.N; i++ {
if e := (*blk).TagValueElem(key, value); e == nil {
if e := (*blk).TagValueElem(key, value, nil); e == nil {
b.Fatal("expected element")
} else if n := e.(*tsi1.TagBlockValueElem).SeriesN(); n != 1 {
b.Fatalf("unexpected series count: %d", n)

View File

@ -6,6 +6,7 @@ import (
"fmt"
"io"
"github.com/influxdata/influxdb/v2/pkg/mincore"
"github.com/influxdata/influxdb/v2/tsdb"
)
@ -143,7 +144,7 @@ func (itr *tsdbMeasurementIteratorAdapter) Next() ([]byte, error) {
type TagKeyElem interface {
Key() []byte
Deleted() bool
TagValueIterator() TagValueIterator
TagValueIterator(*mincore.Limiter) TagValueIterator
}
// TagKeyIterator represents an iterator over a list of tag keys.
@ -261,14 +262,14 @@ func (p tagKeyMergeElem) Deleted() bool {
}
// TagValueIterator returns a merge iterator for all elements until a tombstone occurs.
func (p tagKeyMergeElem) TagValueIterator() TagValueIterator {
func (p tagKeyMergeElem) TagValueIterator(limiter *mincore.Limiter) TagValueIterator {
if len(p) == 0 {
return nil
}
a := make([]TagValueIterator, 0, len(p))
for _, e := range p {
itr := e.TagValueIterator()
itr := e.TagValueIterator(limiter)
a = append(a, itr)
if e.Deleted() {

View File

@ -10,6 +10,7 @@ import (
"testing"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/mincore"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/seriesfile"
"github.com/influxdata/influxdb/v2/tsdb/tsi1"
@ -203,9 +204,9 @@ type TagKeyElem struct {
deleted bool
}
func (e *TagKeyElem) Key() []byte { return e.key }
func (e *TagKeyElem) Deleted() bool { return e.deleted }
func (e *TagKeyElem) TagValueIterator() tsi1.TagValueIterator { return nil }
func (e *TagKeyElem) Key() []byte { return e.key }
func (e *TagKeyElem) Deleted() bool { return e.deleted }
func (e *TagKeyElem) TagValueIterator(_ *mincore.Limiter) tsi1.TagValueIterator { return nil }
// TagKeyIterator represents an iterator over a slice of tag keys.
type TagKeyIterator struct {