package tsdb

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"sync"

	"github.com/influxdata/influxdb/v2/logger"
	"github.com/influxdata/influxdb/v2/models"
	"github.com/influxdata/influxdb/v2/pkg/limiter"
	"github.com/influxdata/influxdb/v2/pkg/rhh"
	"go.uber.org/zap"
)

var (
	ErrSeriesPartitionClosed              = errors.New("tsdb: series partition closed")
	ErrSeriesPartitionCompactionCancelled = errors.New("tsdb: series partition compaction cancelled")
)

// DefaultSeriesPartitionCompactThreshold is the number of series IDs to hold in the in-memory
// series map before compacting and rebuilding the on-disk representation.
const DefaultSeriesPartitionCompactThreshold = 1 << 17 // 128K

// SeriesPartition represents a subset of series file data.
type SeriesPartition struct {
	mu      sync.RWMutex
	wg      sync.WaitGroup
	id      int
	path    string
	closed  bool
	closing chan struct{}
	once    sync.Once

	segments []*SeriesSegment
	index    *SeriesIndex
	seq      uint64 // series id sequence

	compacting          bool
	compactionLimiter   limiter.Fixed
	compactionsDisabled int

	CompactThreshold int

	Logger *zap.Logger
}

// NewSeriesPartition returns a new instance of SeriesPartition.
func NewSeriesPartition(id int, path string, compactionLimiter limiter.Fixed) *SeriesPartition {
	return &SeriesPartition{
		id:                id,
		path:              path,
		closing:           make(chan struct{}),
		compactionLimiter: compactionLimiter,
		CompactThreshold:  DefaultSeriesPartitionCompactThreshold,
		Logger:            zap.NewNop(),
		seq:               uint64(id) + 1,
	}
}
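
// A minimal usage sketch (the path and limiter size are arbitrary,
// illustrative values; note that passing a nil limiter silently disables
// compactions, since compactionsEnabled requires a non-nil limiter):
//
//	p := NewSeriesPartition(0, "/tmp/_series/00", limiter.NewFixed(1))
//	if err := p.Open(); err != nil {
//		// handle error
//	}
//	defer p.Close()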

// Open memory maps the data file at the partition's path.
func (p *SeriesPartition) Open() error {
	if p.closed {
		return errors.New("tsdb: cannot reopen series partition")
	}

	// Create path if it doesn't exist.
	if err := os.MkdirAll(filepath.Join(p.path), 0777); err != nil {
		return err
	}

	// Open components.
	if err := func() (err error) {
		if err := p.openSegments(); err != nil {
			return err
		}

		// Init last segment for writes.
		if err := p.activeSegment().InitForWrite(); err != nil {
			return err
		}

		p.index = NewSeriesIndex(p.IndexPath())
		if err := p.index.Open(); err != nil {
			return err
		} else if err := p.index.Recover(p.segments); err != nil {
			return err
		}
		return nil
	}(); err != nil {
		p.Close()
		return err
	}

	return nil
}

// openSegments discovers and opens all segment files in the partition
// directory, then determines the next series id to assign.
func (p *SeriesPartition) openSegments() error {
	des, err := os.ReadDir(p.path)
	if err != nil {
		return err
	}

	for _, de := range des {
		segmentID, err := ParseSeriesSegmentFilename(de.Name())
		if err != nil {
			continue
		}

		segment := NewSeriesSegment(segmentID, filepath.Join(p.path, de.Name()))
		if err := segment.Open(); err != nil {
			return err
		}
		p.segments = append(p.segments, segment)
	}

	// Find max series id by searching segments in reverse order.
	for i := len(p.segments) - 1; i >= 0; i-- {
		if seq := p.segments[i].MaxSeriesID(); seq >= p.seq {
			// Reset our sequence num to the next one to assign.
			p.seq = seq + SeriesFilePartitionN
			break
		}
	}

	// Create initial segment if none exist.
	if len(p.segments) == 0 {
		segment, err := CreateSeriesSegment(0, filepath.Join(p.path, "0000"))
		if err != nil {
			return err
		}
		p.segments = append(p.segments, segment)
	}

	return nil
}

// Close unmaps the data files.
func (p *SeriesPartition) Close() (err error) {
	p.once.Do(func() { close(p.closing) })
	p.wg.Wait()

	p.mu.Lock()
	defer p.mu.Unlock()

	p.closed = true

	for _, s := range p.segments {
		if e := s.Close(); e != nil && err == nil {
			err = e
		}
	}
	p.segments = nil

	if p.index != nil {
		if e := p.index.Close(); e != nil && err == nil {
			err = e
		}
	}
	p.index = nil

	return err
}

// ID returns the partition id.
func (p *SeriesPartition) ID() int { return p.id }

// Path returns the path to the partition.
func (p *SeriesPartition) Path() string { return p.path }

// IndexPath returns the path to the series index.
func (p *SeriesPartition) IndexPath() string { return filepath.Join(p.path, "index") }

// Index returns the partition's index.
func (p *SeriesPartition) Index() *SeriesIndex { return p.index }

// Segments returns a list of partition segments. Used for testing.
func (p *SeriesPartition) Segments() []*SeriesSegment { return p.segments }

// FileSize returns the total size of the partition's segments, in bytes.
func (p *SeriesPartition) FileSize() (n int64, err error) {
	for _, ss := range p.segments {
		fi, err := os.Stat(ss.Path())
		if err != nil {
			return 0, err
		}
		n += fi.Size()
	}
	return n, nil
}

// CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist.
// The ids parameter is modified to contain series IDs for all keys belonging to this partition.
func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitionIDs []int, ids []uint64) error {
	var writeRequired bool
	p.mu.RLock()
	if p.closed {
		p.mu.RUnlock()
		return ErrSeriesPartitionClosed
	}
	for i := range keys {
		if keyPartitionIDs[i] != p.id {
			continue
		}
		id := p.index.FindIDBySeriesKey(p.segments, keys[i])
		if id == 0 {
			writeRequired = true
			continue
		}
		ids[i] = id
	}
	p.mu.RUnlock()

	// Exit if all series for this partition already exist.
	if !writeRequired {
		return nil
	}

	type keyRange struct {
		id     uint64
		offset int64
	}
	newKeyRanges := make([]keyRange, 0, len(keys))

	// Obtain write lock to create new series.
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		return ErrSeriesPartitionClosed
	}

	// Track offsets of duplicate series.
	newIDs := make(map[string]uint64, len(ids))

	for i := range keys {
		// Skip series that don't belong to the partition or have already been created.
		if keyPartitionIDs[i] != p.id || ids[i] != 0 {
			continue
		}

		// Re-attempt lookup under write lock.
		key := keys[i]
		if ids[i] = newIDs[string(key)]; ids[i] != 0 {
			continue
		} else if ids[i] = p.index.FindIDBySeriesKey(p.segments, key); ids[i] != 0 {
			continue
		}

		// Write to series log and save offset.
		id, offset, err := p.insert(key)
		if err != nil {
			return err
		}

		// Append new key to be added to hash map after flush.
		ids[i] = id
		newIDs[string(key)] = id
		newKeyRanges = append(newKeyRanges, keyRange{id, offset})
	}

	// Flush active segment writes so we can access data in mmap.
	if segment := p.activeSegment(); segment != nil {
		if err := segment.Flush(); err != nil {
			return err
		}
	}

	// Add keys to hash map(s).
	for _, keyRange := range newKeyRanges {
		p.index.Insert(p.seriesKeyByOffset(keyRange.offset), keyRange.id, keyRange.offset)
	}

	// Check if we've crossed the compaction threshold.
	if p.compactionsEnabled() && !p.compacting &&
		p.CompactThreshold != 0 && p.index.InMemCount() >= uint64(p.CompactThreshold) &&
		p.compactionLimiter.TryTake() {
		p.compacting = true
		log, logEnd := logger.NewOperation(context.TODO(), p.Logger, "Series partition compaction", "series_partition_compaction", zap.String("path", p.path))

		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			defer p.compactionLimiter.Release()

			compactor := NewSeriesPartitionCompactor()
			compactor.cancel = p.closing
			if err := compactor.Compact(p); err != nil {
				log.Error("series partition compaction failed", zap.Error(err))
			}

			logEnd()

			// Clear compaction flag.
			p.mu.Lock()
			p.compacting = false
			p.mu.Unlock()
		}()
	}

	return nil
}
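
// Call shape, as a sketch. The three slices are parallel and sized to the
// full key batch; the key values here are placeholders (real series keys use
// the binary series-key encoding from elsewhere in this package), and in
// practice keyPartitionIDs is derived by hashing each key:
//
//	keys := [][]byte{keyA, keyB}
//	keyPartitionIDs := []int{p.ID(), p.ID()}
//	ids := make([]uint64, len(keys))
//	if err := p.CreateSeriesListIfNotExists(keys, keyPartitionIDs, ids); err != nil {
//		// handle error
//	}
//	// ids[i] is now non-zero for every key assigned to this partition.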

// Compacting returns if the SeriesPartition is currently compacting.
func (p *SeriesPartition) Compacting() bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.compacting
}

// DeleteSeriesID flags a series as permanently deleted.
// If the series is reintroduced later then it must create a new id.
func (p *SeriesPartition) DeleteSeriesID(id uint64) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return ErrSeriesPartitionClosed
	}

	// Already tombstoned, ignore.
	if p.index.IsDeleted(id) {
		return nil
	}

	// Write tombstone entry.
	_, err := p.writeLogEntry(AppendSeriesEntry(nil, SeriesEntryTombstoneFlag, id, nil))
	if err != nil {
		return err
	}

	// Flush active segment write.
	if segment := p.activeSegment(); segment != nil {
		if err := segment.Flush(); err != nil {
			return err
		}
	}

	// Mark tombstone in memory.
	p.index.Delete(id)

	return nil
}

// IsDeleted returns true if the ID has been deleted before.
func (p *SeriesPartition) IsDeleted(id uint64) bool {
	p.mu.RLock()
	if p.closed {
		p.mu.RUnlock()
		return false
	}
	v := p.index.IsDeleted(id)
	p.mu.RUnlock()
	return v
}

// SeriesKey returns the series key for a given id.
func (p *SeriesPartition) SeriesKey(id uint64) []byte {
	if id == 0 {
		return nil
	}
	p.mu.RLock()
	if p.closed {
		p.mu.RUnlock()
		return nil
	}
	key := p.seriesKeyByOffset(p.index.FindOffsetByID(id))
	p.mu.RUnlock()
	return key
}

// Series returns the parsed series name and tags for a given id.
func (p *SeriesPartition) Series(id uint64) ([]byte, models.Tags) {
	key := p.SeriesKey(id)
	if key == nil {
		return nil, nil
	}
	return ParseSeriesKey(key)
}

// FindIDBySeriesKey returns the series id for the series key.
func (p *SeriesPartition) FindIDBySeriesKey(key []byte) uint64 {
	p.mu.RLock()
	if p.closed {
		p.mu.RUnlock()
		return 0
	}
	id := p.index.FindIDBySeriesKey(p.segments, key)
	p.mu.RUnlock()
	return id
}

// SeriesCount returns the number of series.
func (p *SeriesPartition) SeriesCount() uint64 {
	p.mu.RLock()
	if p.closed {
		p.mu.RUnlock()
		return 0
	}
	n := p.index.Count()
	p.mu.RUnlock()
	return n
}

// DisableCompactions stops any new compactions from starting until a
// matching call to EnableCompactions.
func (p *SeriesPartition) DisableCompactions() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.compactionsDisabled++
}

// EnableCompactions allows compactions to proceed again after a call to
// DisableCompactions.
func (p *SeriesPartition) EnableCompactions() {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Ignore if compactions are already enabled.
	if p.compactionsEnabled() {
		return
	}
	p.compactionsDisabled--
}

func (p *SeriesPartition) compactionsEnabled() bool {
	return p.compactionLimiter != nil && p.compactionsDisabled == 0
}
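
// Disable/enable calls are reference-counted rather than boolean, so nested
// disables compose; a sketch:
//
//	p.DisableCompactions() // compactionsDisabled == 1
//	p.DisableCompactions() // compactionsDisabled == 2
//	p.EnableCompactions()  // compactionsDisabled == 1, still disabled
//	p.EnableCompactions()  // compactionsDisabled == 0, enabled again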

// AppendSeriesIDs returns a list of all series ids.
func (p *SeriesPartition) AppendSeriesIDs(a []uint64) []uint64 {
	for _, segment := range p.segments {
		a = segment.AppendSeriesIDs(a)
	}
	return a
}

// activeSegment returns the last segment.
func (p *SeriesPartition) activeSegment() *SeriesSegment {
	if len(p.segments) == 0 {
		return nil
	}
	return p.segments[len(p.segments)-1]
}

// insert appends an insert entry for key to the series log and returns the
// newly assigned series id along with the entry's offset.
func (p *SeriesPartition) insert(key []byte) (id uint64, offset int64, err error) {
	id = p.seq
	offset, err = p.writeLogEntry(AppendSeriesEntry(nil, SeriesEntryInsertFlag, id, key))
	if err != nil {
		return 0, 0, err
	}

	p.seq += SeriesFilePartitionN
	return id, offset, nil
}
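
// Because seq starts at id+1 and advances by SeriesFilePartitionN, each
// partition hands out a disjoint stripe of series ids, and the owning
// partition is recoverable from an id alone. Assuming SeriesFilePartitionN
// is 8 (its value in the companion series file code at the time of writing):
//
//	partition 0 assigns ids 1, 9, 17, ...
//	partition 1 assigns ids 2, 10, 18, ...
//	owner := int((id - 1) % SeriesFilePartitionN)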

// writeLogEntry appends an entry to the end of the active segment.
// If there is no more room in the segment then a new segment is added.
func (p *SeriesPartition) writeLogEntry(data []byte) (offset int64, err error) {
	segment := p.activeSegment()
	if segment == nil || !segment.CanWrite(data) {
		if segment, err = p.createSegment(); err != nil {
			return 0, err
		}
	}
	return segment.WriteLogEntry(data)
}

// createSegment appends a new segment to the partition and initializes it
// for writing.
func (p *SeriesPartition) createSegment() (*SeriesSegment, error) {
	// Close writer for active segment, if one exists.
	if segment := p.activeSegment(); segment != nil {
		if err := segment.CloseForWrite(); err != nil {
			return nil, err
		}
	}

	// Generate a new sequential segment identifier.
	var id uint16
	if len(p.segments) > 0 {
		id = p.segments[len(p.segments)-1].ID() + 1
	}
	filename := fmt.Sprintf("%04x", id)

	// Generate new empty segment.
	segment, err := CreateSeriesSegment(id, filepath.Join(p.path, filename))
	if err != nil {
		return nil, err
	}
	p.segments = append(p.segments, segment)

	// Allow segment to write.
	if err := segment.InitForWrite(); err != nil {
		return nil, err
	}

	return segment, nil
}
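
// Segment filenames are the zero-padded hexadecimal segment id, matching the
// "0000" that openSegments creates for an empty partition; for example:
//
//	fmt.Sprintf("%04x", 0)   // "0000"
//	fmt.Sprintf("%04x", 10)  // "000a"
//	fmt.Sprintf("%04x", 255) // "00ff"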

// seriesKeyByOffset returns the series key stored at the given segment
// offset, or nil if the offset is zero or no segment matches.
func (p *SeriesPartition) seriesKeyByOffset(offset int64) []byte {
	if offset == 0 {
		return nil
	}

	segmentID, pos := SplitSeriesOffset(offset)
	for _, segment := range p.segments {
		if segment.ID() != segmentID {
			continue
		}

		key, _ := ReadSeriesKey(segment.Slice(pos + SeriesEntryHeaderSize))
		return key
	}

	return nil
}
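
// A series id resolves to an offset, and the offset encodes both the segment
// and the position within it. Assuming the SplitSeriesOffset/JoinSeriesOffset
// helpers in the companion segment code pack the segment id into the high
// bits and the position into the low 32 bits, the round trip looks like:
//
//	offset := JoinSeriesOffset(segmentID, pos) // e.g. int64(segmentID)<<32 | int64(pos)
//	segmentID, pos = SplitSeriesOffset(offset)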

// SeriesPartitionCompactor represents an object that reindexes a series
// partition and optionally compacts segments.
type SeriesPartitionCompactor struct {
	cancel <-chan struct{}
}

// NewSeriesPartitionCompactor returns a new instance of SeriesPartitionCompactor.
func NewSeriesPartitionCompactor() *SeriesPartitionCompactor {
	return &SeriesPartitionCompactor{}
}

// Compact rebuilds the series partition index.
func (c *SeriesPartitionCompactor) Compact(p *SeriesPartition) error {
	// Snapshot the segments and index so we can check tombstones and replay at the end under lock.
	p.mu.RLock()
	segments := CloneSeriesSegments(p.segments)
	index := p.index.Clone()
	seriesN := p.index.Count()
	p.mu.RUnlock()

	// Compact index to a temporary location.
	indexPath := index.path + ".compacting"
	if err := c.compactIndexTo(index, seriesN, segments, indexPath); err != nil {
		return err
	}

	// Swap compacted index under lock & replay since compaction.
	if err := func() error {
		p.mu.Lock()
		defer p.mu.Unlock()

		// Reopen index with new file.
		if err := p.index.Close(); err != nil {
			return err
		} else if err := os.Rename(indexPath, index.path); err != nil {
			return err
		} else if err := p.index.Open(); err != nil {
			return err
		}

		// Replay new entries.
		if err := p.index.Recover(p.segments); err != nil {
			return err
		}
		return nil
	}(); err != nil {
		return err
	}

	return nil
}

// compactIndexTo rebuilds the index from the given segments and writes the
// result to path.
func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) error {
	hdr := NewSeriesIndexHeader()
	hdr.Count = seriesN
	hdr.Capacity = pow2((int64(hdr.Count) * 100) / SeriesIndexLoadFactor)

	// Allocate space for maps.
	keyIDMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))
	idOffsetMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))

	// Reindex all segments.
	var entryN int
	for _, segment := range segments {
		errDone := errors.New("done")

		if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
			// Make sure we don't go past the offset where the compaction began.
			if offset > index.maxOffset {
				return errDone
			}

			// Check for cancellation periodically.
			if entryN++; entryN%1000 == 0 {
				select {
				case <-c.cancel:
					return ErrSeriesPartitionCompactionCancelled
				default:
				}
			}

			// Only process insert entries; tombstones are skipped here and
			// applied via the index's deleted set below.
			switch flag {
			case SeriesEntryInsertFlag: // handled below
			case SeriesEntryTombstoneFlag:
				return nil
			default:
				return fmt.Errorf("unexpected series partition log entry flag: %d", flag)
			}

			// Save max series identifier processed.
			hdr.MaxSeriesID, hdr.MaxOffset = id, offset

			// Ignore entry if tombstoned.
			if index.IsDeleted(id) {
				return nil
			}

			// Insert into maps.
			c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset)
			return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id)
		}); err == errDone {
			break
		} else if err != nil {
			return err
		}
	}

	// Open file handler.
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()

	// Calculate map positions.
	hdr.KeyIDMap.Offset, hdr.KeyIDMap.Size = SeriesIndexHeaderSize, int64(len(keyIDMap))
	hdr.IDOffsetMap.Offset, hdr.IDOffsetMap.Size = hdr.KeyIDMap.Offset+hdr.KeyIDMap.Size, int64(len(idOffsetMap))

	// Write header.
	if _, err := hdr.WriteTo(f); err != nil {
		return err
	}

	// Write maps.
	if _, err := f.Write(keyIDMap); err != nil {
		return err
	} else if _, err := f.Write(idOffsetMap); err != nil {
		return err
	}

	// Sync & close.
	if err := f.Sync(); err != nil {
		return err
	} else if err := f.Close(); err != nil {
		return err
	}

	return nil
}
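
// The two maps are sized to a power of two so probe positions can be computed
// with a mask. Assuming SeriesIndexLoadFactor is 90 (a 90% load target, its
// value in the companion index code at the time of writing):
//
//	count := int64(131072)                    // 128K series
//	capacity := pow2(count * 100 / 90)        // 145635 rounds up to 262144
//	mapSize := capacity * SeriesIndexElemSize // 16 bytes per element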

// insertKeyIDMap inserts an element mapping a series key (located via its
// segment offset) to its id, using Robin Hood hashing to resolve collisions.
func (c *SeriesPartitionCompactor) insertKeyIDMap(dst []byte, capacity int64, segments []*SeriesSegment, key []byte, offset int64, id uint64) error {
	mask := capacity - 1
	hash := rhh.HashKey(key)

	// Continue searching until we find an empty slot or lower probe distance.
	for i, dist, pos := int64(0), int64(0), hash&mask; ; i, dist, pos = i+1, dist+1, (pos+1)&mask {
		assert(i <= capacity, "key/id map full")
		elem := dst[(pos * SeriesIndexElemSize):]

		// If empty slot found or matching offset, insert and exit.
		elemOffset := int64(binary.BigEndian.Uint64(elem[:8]))
		elemID := binary.BigEndian.Uint64(elem[8:])
		if elemOffset == 0 || elemOffset == offset {
			binary.BigEndian.PutUint64(elem[:8], uint64(offset))
			binary.BigEndian.PutUint64(elem[8:], id)
			return nil
		}

		// Read key at position & hash.
		elemKey := ReadSeriesKeyFromSegments(segments, elemOffset+SeriesEntryHeaderSize)
		elemHash := rhh.HashKey(elemKey)

		// If the existing elem has probed less than us, then swap places with
		// existing elem, and keep going to find another slot for that elem.
		if d := rhh.Dist(elemHash, pos, capacity); d < dist {
			// Insert current values.
			binary.BigEndian.PutUint64(elem[:8], uint64(offset))
			binary.BigEndian.PutUint64(elem[8:], id)

			// Swap with values in that position.
			_, _, offset, id = elemHash, elemKey, elemOffset, elemID

			// Update current distance.
			dist = d
		}
	}
}

// insertIDOffsetMap inserts an element mapping a series id to its segment
// offset, using the same Robin Hood probing as insertKeyIDMap.
func (c *SeriesPartitionCompactor) insertIDOffsetMap(dst []byte, capacity int64, id uint64, offset int64) {
	mask := capacity - 1
	hash := rhh.HashUint64(id)

	// Continue searching until we find an empty slot or lower probe distance.
	for i, dist, pos := int64(0), int64(0), hash&mask; ; i, dist, pos = i+1, dist+1, (pos+1)&mask {
		assert(i <= capacity, "id/offset map full")
		elem := dst[(pos * SeriesIndexElemSize):]

		// If empty slot found or matching offset, insert and exit.
		elemID := binary.BigEndian.Uint64(elem[:8])
		elemOffset := int64(binary.BigEndian.Uint64(elem[8:]))
		if elemOffset == 0 || elemOffset == offset {
			binary.BigEndian.PutUint64(elem[:8], id)
			binary.BigEndian.PutUint64(elem[8:], uint64(offset))
			return
		}

		// Hash the resident id.
		elemHash := rhh.HashUint64(elemID)

		// If the existing elem has probed less than us, then swap places with
		// existing elem, and keep going to find another slot for that elem.
		if d := rhh.Dist(elemHash, pos, capacity); d < dist {
			// Insert current values.
			binary.BigEndian.PutUint64(elem[:8], id)
			binary.BigEndian.PutUint64(elem[8:], uint64(offset))

			// Swap with values in that position.
			_, id, offset = elemHash, elemID, elemOffset

			// Update current distance.
			dist = d
		}
	}
}
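
// Both maps use Robin Hood hashing: an incoming element that has probed
// further than a resident element steals its slot, and the displaced element
// continues probing from there, which keeps the variance of probe distances
// low. With illustrative numbers:
//
//	// resident hashed to slot 5, sits in slot 6 -> distance 1
//	// incoming hashed to slot 3, now at slot 6  -> distance 3
//	// 1 < 3, so the incoming element takes slot 6 and the resident
//	// element is re-inserted starting at slot 7.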

// pow2 returns the next highest power of 2 for v, or v itself if v is
// already a power of 2 (for v >= 2).
func pow2(v int64) int64 {
	for i := int64(2); i < 1<<62; i *= 2 {
		if i >= v {
			return i
		}
	}
	panic("unreachable")
}
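
// For example:
//
//	pow2(1)      // 2 (the loop starts at 2)
//	pow2(8)      // 8
//	pow2(9)      // 16
//	pow2(145635) // 262144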