fix: series file index compaction (#23916)

Series file indices grew monotonically, even
when series were deleted. Also stop
ignoring an error in series index recovery.

Partially closes https://github.com/influxdata/EAR/issues/3643
davidby-influx 2023-06-01 10:49:23 -07:00 committed by GitHub
parent ab85be01a2
commit 53856cdaae
1 changed file with 76 additions and 60 deletions

@@ -4,12 +4,14 @@ import (
 	"encoding/binary"
 	"errors"
 	"fmt"
+	"math"
 	"os"
 	"path/filepath"
 	"sync"
 
 	"github.com/influxdata/influxdb/logger"
 	"github.com/influxdata/influxdb/models"
+	errors2 "github.com/influxdata/influxdb/pkg/errors"
 	"github.com/influxdata/influxdb/pkg/limiter"
 	"github.com/influxdata/influxdb/pkg/rhh"
 	"go.uber.org/zap"
@@ -86,7 +88,7 @@ func (p *SeriesPartition) Open() error {
 	p.index = NewSeriesIndex(p.IndexPath())
 	if err := p.index.Open(); err != nil {
 		return err
-	} else if p.index.Recover(p.segments); err != nil {
+	} else if err = p.index.Recover(p.segments); err != nil {
 		return err
 	}
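
The hunk above fixes a subtle Go pitfall: in the removed line, p.index.Recover(p.segments) was the else-if's init statement, so its return value was silently discarded, and the condition re-tested the stale err set by p.index.Open(), which is always nil on that path. A minimal, self-contained sketch of the pattern, using hypothetical open/recoverIndex stand-ins for the index methods:

package main

import (
	"errors"
	"fmt"
)

func open() error         { return nil }
func recoverIndex() error { return errors.New("recovery failed") }

func brokenOpen() error {
	if err := open(); err != nil {
		return err
	} else if recoverIndex(); err != nil { // BUG: recoverIndex's error is discarded;
		return err // err is still the nil result of open()
	}
	return nil
}

func fixedOpen() error {
	if err := open(); err != nil {
		return err
	} else if err = recoverIndex(); err != nil { // assign, then test
		return err
	}
	return nil
}

func main() {
	fmt.Println("broken:", brokenOpen()) // broken: <nil>  (the failure is ignored)
	fmt.Println("fixed:", fixedOpen())   // fixed: recovery failed
}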
@@ -573,94 +575,108 @@ func (c *SeriesPartitionCompactor) Compact(p *SeriesPartition) error {
 	return nil
 }
 
-func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) error {
+var errDone error = errors.New("done")
+
+func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) (err error) {
 	hdr := NewSeriesIndexHeader()
-	hdr.Count = seriesN
-	hdr.Capacity = pow2((int64(hdr.Count) * 100) / SeriesIndexLoadFactor)
-
-	// Allocate space for maps.
-	keyIDMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))
-	idOffsetMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))
+	var keyIDMap []byte
+	var idOffsetMap []byte
+	hdr.Count = math.MaxUint64
 
-	// Reindex all partitions.
-	var entryN int
-	for _, segment := range segments {
-		errDone := errors.New("done")
-
-		if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
-			// Make sure we don't go past the offset where the compaction began.
-			if offset > index.maxOffset {
-				return errDone
-			}
-
-			// Check for cancellation periodically.
-			if entryN++; entryN%1000 == 0 {
-				select {
-				case <-c.cancel:
-					return ErrSeriesPartitionCompactionCancelled
-				default:
-				}
-			}
-
-			// Only process insert entries.
-			switch flag {
-			case SeriesEntryInsertFlag: // fallthrough
-			case SeriesEntryTombstoneFlag:
-				return nil
-			default:
-				return fmt.Errorf("unexpected series partition log entry flag: %d", flag)
-			}
-
-			// Save max series identifier processed.
-			hdr.MaxSeriesID, hdr.MaxOffset = id, offset
-
-			// Ignore entry if tombstoned.
-			if index.IsDeleted(id) {
-				return nil
-			}
-
-			// Insert into maps.
-			c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset)
-			return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id)
-		}); err == errDone {
-			break
-		} else if err != nil {
-			return err
+	// seriesN is the current size of the index. Because it may contain tombstones
+	// for deleted series, we recalculate that number (as seriesCount) without the
+	// deleted series as we rebuild the index. If the count of existing series does
+	// not equal the seriesN passed in (meaning there were tombstones), we rebuild
+	// the index a second time with the correct size.
+	seriesCount := seriesN
+	for {
+		seriesN = seriesCount
+		seriesCount = uint64(0)
+
+		// This only loops if there are deleted entries, which shrinks the size
+		hdr.Capacity = pow2((int64(seriesN) * 100) / SeriesIndexLoadFactor)
+
+		// Allocate space for maps, guaranteeing slices are initialized to zero
+		keyIDMap = make([]byte, hdr.Capacity*SeriesIndexElemSize)
+		idOffsetMap = make([]byte, hdr.Capacity*SeriesIndexElemSize)
+
+		// Reindex all partitions.
+		var entryN int
+		for _, segment := range segments {
+			if err = segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
+				// Make sure we don't go past the offset where the compaction began.
+				if offset > index.maxOffset {
+					return errDone
+				}
+
+				// Check for cancellation periodically.
+				if entryN++; entryN%1000 == 0 {
+					select {
+					case <-c.cancel:
+						return ErrSeriesPartitionCompactionCancelled
+					default:
+					}
+				}
+
+				// Only process insert entries.
+				switch flag {
+				case SeriesEntryInsertFlag: // does not fallthrough
+				case SeriesEntryTombstoneFlag:
+					return nil
+				default:
+					return fmt.Errorf("unexpected series partition log entry flag: %d", flag)
+				}
+
+				// Save max series identifier processed.
+				hdr.MaxSeriesID, hdr.MaxOffset = id, offset
+
+				// Ignore entry if tombstoned.
+				if index.IsDeleted(id) {
+					return nil
+				}
+				seriesCount++
+				// Insert into maps.
+				c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset)
+				return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id)
+			}); err == errDone {
+				break
+			} else if err != nil {
+				return err
+			}
+		}
+		hdr.Count = seriesCount
+		if seriesN != seriesCount {
+			continue
+		} else {
+			break
 		}
 	}
 
 	// Open file handler.
 	f, err := os.Create(path)
 	if err != nil {
 		return err
 	}
-	defer f.Close()
+	defer errors2.Capture(&err, f.Close)()
 
 	// Calculate map positions.
 	hdr.KeyIDMap.Offset, hdr.KeyIDMap.Size = SeriesIndexHeaderSize, int64(len(keyIDMap))
 	hdr.IDOffsetMap.Offset, hdr.IDOffsetMap.Size = hdr.KeyIDMap.Offset+hdr.KeyIDMap.Size, int64(len(idOffsetMap))
 
 	// Write header.
-	if _, err := hdr.WriteTo(f); err != nil {
+	if _, err = hdr.WriteTo(f); err != nil {
 		return err
 	}
 
 	// Write maps.
-	if _, err := f.Write(keyIDMap); err != nil {
+	if _, err = f.Write(keyIDMap); err != nil {
 		return err
 	} else if _, err := f.Write(idOffsetMap); err != nil {
 		return err
 	}
 
-	// Sync & close.
-	if err := f.Sync(); err != nil {
-		return err
-	} else if err := f.Close(); err != nil {
-		return err
-	}
-
-	return nil
+	// Sync, then deferred close
+	return f.Sync()
 }
 
 func (c *SeriesPartitionCompactor) insertKeyIDMap(dst []byte, capacity int64, segments []*SeriesSegment, key []byte, offset int64, id uint64) error {
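
Two notes on the compaction rewrite above. First, the new outer for loop makes index sizing a two-pass affair: seriesN arrives counting tombstoned series, so the first pass sizes the hash maps from that stale count while tallying live series, and if the tally comes up short the maps are reallocated once at the smaller power-of-two capacity. A standalone sketch of the same idea, with illustrative stand-ins for pow2, the load factor, and the insert step (not the tsdb implementation):

package main

import "fmt"

const loadFactor = 90 // percent; stand-in for SeriesIndexLoadFactor

type entry struct {
	id      uint64
	deleted bool // tombstone
}

// pow2 returns the smallest power of two >= v, mirroring the tsdb helper.
func pow2(v int64) int64 {
	for i := int64(2); ; i *= 2 {
		if i >= v {
			return i
		}
	}
}

func rebuild(entries []entry) (capacity int64, live uint64) {
	live = uint64(len(entries)) // stale count: may include tombstones
	for {
		n := live
		live = 0
		capacity = pow2(int64(n) * 100 / loadFactor)
		table := make([]uint64, capacity) // zeroed on every pass
		for _, e := range entries {
			if e.deleted {
				continue // skip tombstones; the live count shrinks
			}
			table[e.id%uint64(capacity)] = e.id // toy insert
			live++
		}
		if live == n {
			return capacity, live // the count was right; the table is final
		}
		// Tombstones were found: loop once more with the smaller count.
	}
}

func main() {
	entries := []entry{{1, false}, {2, true}, {3, false}, {4, true}}
	capacity, live := rebuild(entries)
	fmt.Println(capacity, live) // 2 2: capacity tracks live series, not every entry ever written
}

Second, replacing the explicit Sync-then-Close chain with defer errors2.Capture(&err, f.Close)() relies on the named return err: the helper closes the file on every exit path and folds a Close failure into the function's result. A rough sketch of that pattern (not the pkg/errors source):

// capture runs fn at defer time and, if the surrounding function is not
// already returning an error, surfaces fn's error through the named return.
func capture(rErr *error, fn func() error) func() {
	return func() {
		if err := fn(); err != nil && *rErr == nil {
			*rErr = err
		}
	}
}

It is invoked as defer capture(&err, f.Close)() inside a function whose signature declares (err error), so even the early return paths close the file exactly once.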