influxdb/tsdb/series_index.go

package tsdb

import (
	"bytes"
	"encoding/binary"
	"errors"
	"io"
	"os"

	"github.com/influxdata/influxdb/v2/models"
	"github.com/influxdata/influxdb/v2/pkg/mmap"
	"github.com/influxdata/influxdb/v2/pkg/rhh"
)

const (
	SeriesIndexVersion = 1
	SeriesIndexMagic   = "SIDX"
)

const (
	SeriesIndexElemSize   = 16 // offset + id
	SeriesIndexLoadFactor = 90 // rhh load factor

	SeriesIndexHeaderSize = 0 +
		4 + 1 + // magic + version
		8 + 8 + // max series + max offset
		8 + 8 + // count + capacity
		8 + 8 + // key/id map offset & size
		8 + 8 + // id/offset map offset & size
		0
)

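// ErrInvalidSeriesIndex is returned when an index file fails validation,
// e.g. when the magic header does not match SeriesIndexMagic.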
var ErrInvalidSeriesIndex = errors.New("invalid series index")

// SeriesIndex represents an index of key-to-id & id-to-offset mappings.
type SeriesIndex struct {
	path string

	count    uint64
	capacity int64
	mask     int64

	maxSeriesID uint64
	maxOffset   int64

	data         []byte // mmap data
	keyIDData    []byte // key/id mmap data
	idOffsetData []byte // id/offset mmap data

	// In-memory data since rebuild.
	keyIDMap    *rhh.HashMap
	idOffsetMap map[uint64]int64
	tombstones  map[uint64]struct{}
}

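// NewSeriesIndex returns a new instance of SeriesIndex for the given path.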
func NewSeriesIndex(path string) *SeriesIndex {
	return &SeriesIndex{
		path: path,
	}
}

// Open memory-maps the index file.
func (idx *SeriesIndex) Open() (err error) {
	// Map data file, if it exists.
	if err := func() error {
		if _, err := os.Stat(idx.path); err != nil && !os.IsNotExist(err) {
			return err
		} else if err == nil {
			if idx.data, err = mmap.Map(idx.path, 0); err != nil {
				return err
			}

			hdr, err := ReadSeriesIndexHeader(idx.data)
			if err != nil {
				return err
			}
			idx.count, idx.capacity, idx.mask = hdr.Count, hdr.Capacity, hdr.Capacity-1
			idx.maxSeriesID, idx.maxOffset = hdr.MaxSeriesID, hdr.MaxOffset

			idx.keyIDData = idx.data[hdr.KeyIDMap.Offset : hdr.KeyIDMap.Offset+hdr.KeyIDMap.Size]
			idx.idOffsetData = idx.data[hdr.IDOffsetMap.Offset : hdr.IDOffsetMap.Offset+hdr.IDOffsetMap.Size]
		}
		return nil
	}(); err != nil {
		idx.Close()
		return err
	}

	idx.keyIDMap = rhh.NewHashMap(rhh.DefaultOptions)
	idx.idOffsetMap = make(map[uint64]int64)
	idx.tombstones = make(map[uint64]struct{})
	return nil
}

// Close unmaps the index file.
func (idx *SeriesIndex) Close() (err error) {
	if idx.data != nil {
		err = mmap.Unmap(idx.data)
	}
	idx.keyIDData = nil
	idx.idOffsetData = nil

	idx.keyIDMap = nil
	idx.idOffsetMap = nil
	idx.tombstones = nil
	return err
}

// Recover rebuilds the in-memory index for all new entries.
func (idx *SeriesIndex) Recover(segments []*SeriesSegment) error {
	// Allocate new in-memory maps.
	idx.keyIDMap = rhh.NewHashMap(rhh.DefaultOptions)
	idx.idOffsetMap = make(map[uint64]int64)
	idx.tombstones = make(map[uint64]struct{})

	// Process all entries since the maximum offset in the on-disk index.
	minSegmentID, _ := SplitSeriesOffset(idx.maxOffset)
	for _, segment := range segments {
		// Skip segments that are entirely covered by the on-disk index.
		if segment.ID() < minSegmentID {
			continue
		}

		if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
			// Ignore entries already reflected in the on-disk index.
			if offset <= idx.maxOffset {
				return nil
			}
			idx.execEntry(flag, id, offset, key)
			return nil
		}); err != nil {
			return err
		}
	}
	return nil
}

// Count returns the number of series in the index.
func (idx *SeriesIndex) Count() uint64 {
	return idx.OnDiskCount() + idx.InMemCount()
}

// OnDiskCount returns the number of series in the on-disk index.
func (idx *SeriesIndex) OnDiskCount() uint64 { return idx.count }

// InMemCount returns the number of series in the in-memory index.
func (idx *SeriesIndex) InMemCount() uint64 { return uint64(len(idx.idOffsetMap)) }

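// Insert adds the series key/id/offset mapping to the in-memory index.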
func (idx *SeriesIndex) Insert(key []byte, id uint64, offset int64) {
	idx.execEntry(SeriesEntryInsertFlag, id, offset, key)
}

// Delete marks the series id as deleted.
func (idx *SeriesIndex) Delete(id uint64) {
	idx.execEntry(SeriesEntryTombstoneFlag, id, 0, nil)
}

// IsDeleted returns true if series id has been deleted.
func (idx *SeriesIndex) IsDeleted(id uint64) bool {
	if _, ok := idx.tombstones[id]; ok {
		return true
	}
	return idx.FindOffsetByID(id) == 0
}

func (idx *SeriesIndex) execEntry(flag uint8, id uint64, offset int64, key []byte) {
	switch flag {
	case SeriesEntryInsertFlag:
		idx.keyIDMap.Put(key, id)
		idx.idOffsetMap[id] = offset

		if id > idx.maxSeriesID {
			idx.maxSeriesID = id
		}
		if offset > idx.maxOffset {
			idx.maxOffset = offset
		}

	case SeriesEntryTombstoneFlag:
		idx.tombstones[id] = struct{}{}

	default:
		panic("unreachable")
	}
}

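// FindIDBySeriesKey returns the series id for a series key, or 0 if the key
// does not exist or the series has been deleted. The in-memory map is
// consulted first, then the on-disk robin hood hash map is probed.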
func (idx *SeriesIndex) FindIDBySeriesKey(segments []*SeriesSegment, key []byte) uint64 {
	if v := idx.keyIDMap.Get(key); v != nil {
		if id, _ := v.(uint64); id != 0 && !idx.IsDeleted(id) {
			return id
		}
	}
	if len(idx.data) == 0 {
		return 0
	}

	hash := rhh.HashKey(key)
	for d, pos := int64(0), hash&idx.mask; ; d, pos = d+1, (pos+1)&idx.mask {
		elem := idx.keyIDData[(pos * SeriesIndexElemSize):]
		elemOffset := int64(binary.BigEndian.Uint64(elem[:8]))

		if elemOffset == 0 {
			return 0
		}

		elemKey := ReadSeriesKeyFromSegments(segments, elemOffset+SeriesEntryHeaderSize)
		elemHash := rhh.HashKey(elemKey)
		if d > rhh.Dist(elemHash, pos, idx.capacity) {
			return 0
		} else if elemHash == hash && bytes.Equal(elemKey, key) {
			id := binary.BigEndian.Uint64(elem[8:])
			if idx.IsDeleted(id) {
				return 0
			}
			return id
		}
	}
}

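// FindIDByNameTags returns the series id for a measurement name and tag set,
// or 0 if the series does not exist or has been tombstoned. buf is reused as
// scratch space when building the series key.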
func (idx *SeriesIndex) FindIDByNameTags(segments []*SeriesSegment, name []byte, tags models.Tags, buf []byte) uint64 {
	id := idx.FindIDBySeriesKey(segments, AppendSeriesKey(buf[:0], name, tags))
	if _, ok := idx.tombstones[id]; ok {
		return 0
	}
	return id
}

func (idx *SeriesIndex) FindIDListByNameTags(segments []*SeriesSegment, names [][]byte, tagsSlice []models.Tags, buf []byte) (ids []uint64, ok bool) {
	ids, ok = make([]uint64, len(names)), true
	for i := range names {
		id := idx.FindIDByNameTags(segments, names[i], tagsSlice[i], buf)
		if id == 0 {
			ok = false
			continue
		}
		ids[i] = id
	}
	return ids, ok
}

func (idx *SeriesIndex) FindOffsetByID(id uint64) int64 {
	if offset := idx.idOffsetMap[id]; offset != 0 {
		return offset
	} else if len(idx.data) == 0 {
		return 0
	}

	hash := rhh.HashUint64(id)
	for d, pos := int64(0), hash&idx.mask; ; d, pos = d+1, (pos+1)&idx.mask {
		elem := idx.idOffsetData[(pos * SeriesIndexElemSize):]
		elemID := binary.BigEndian.Uint64(elem[:8])

		if elemID == id {
			return int64(binary.BigEndian.Uint64(elem[8:]))
		} else if elemID == 0 || d > rhh.Dist(rhh.HashUint64(elemID), pos, idx.capacity) {
			return 0
		}
	}
}

// Clone returns a copy of idx for use during compaction. The tombstone and
// id/offset maps are copied; the in-memory key/id map is not cloned.
func (idx *SeriesIndex) Clone() *SeriesIndex {
	tombstones := make(map[uint64]struct{}, len(idx.tombstones))
	for id := range idx.tombstones {
		tombstones[id] = struct{}{}
	}

	idOffsetMap := make(map[uint64]int64)
	for k, v := range idx.idOffsetMap {
		idOffsetMap[k] = v
	}

	return &SeriesIndex{
		path:         idx.path,
		count:        idx.count,
		capacity:     idx.capacity,
		mask:         idx.mask,
		maxSeriesID:  idx.maxSeriesID,
		maxOffset:    idx.maxOffset,
		data:         idx.data,
		keyIDData:    idx.keyIDData,
		idOffsetData: idx.idOffsetData,
		tombstones:   tombstones,
		idOffsetMap:  idOffsetMap,
	}
}

// SeriesIndexHeader represents the header of a series index.
type SeriesIndexHeader struct {
	Version uint8

	MaxSeriesID uint64
	MaxOffset   int64

	Count    uint64
	Capacity int64

	KeyIDMap struct {
		Offset int64
		Size   int64
	}

	IDOffsetMap struct {
		Offset int64
		Size   int64
	}
}

// NewSeriesIndexHeader returns a new instance of SeriesIndexHeader.
func NewSeriesIndexHeader() SeriesIndexHeader {
	return SeriesIndexHeader{Version: SeriesIndexVersion}
}

// ReadSeriesIndexHeader returns the header from data.
func ReadSeriesIndexHeader(data []byte) (hdr SeriesIndexHeader, err error) {
	r := bytes.NewReader(data)

	// Read magic number.
	magic := make([]byte, len(SeriesIndexMagic))
	if _, err := io.ReadFull(r, magic); err != nil {
		return hdr, err
	} else if !bytes.Equal([]byte(SeriesIndexMagic), magic) {
		return hdr, ErrInvalidSeriesIndex
	}

	// Read version.
	if err := binary.Read(r, binary.BigEndian, &hdr.Version); err != nil {
		return hdr, err
	}

	// Read max series id & max offset.
	if err := binary.Read(r, binary.BigEndian, &hdr.MaxSeriesID); err != nil {
		return hdr, err
	} else if err := binary.Read(r, binary.BigEndian, &hdr.MaxOffset); err != nil {
		return hdr, err
	}

	// Read count & capacity.
	if err := binary.Read(r, binary.BigEndian, &hdr.Count); err != nil {
		return hdr, err
	} else if err := binary.Read(r, binary.BigEndian, &hdr.Capacity); err != nil {
		return hdr, err
	}

	// Read key/id map position.
	if err := binary.Read(r, binary.BigEndian, &hdr.KeyIDMap.Offset); err != nil {
		return hdr, err
	} else if err := binary.Read(r, binary.BigEndian, &hdr.KeyIDMap.Size); err != nil {
		return hdr, err
	}

	// Read id/offset map position.
	if err := binary.Read(r, binary.BigEndian, &hdr.IDOffsetMap.Offset); err != nil {
		return hdr, err
	} else if err := binary.Read(r, binary.BigEndian, &hdr.IDOffsetMap.Size); err != nil {
		return hdr, err
	}

	return hdr, nil
}

// WriteTo writes the header to w.
func (hdr *SeriesIndexHeader) WriteTo(w io.Writer) (n int64, err error) {
	// Buffer the header first; writes to a bytes.Buffer of fixed-size values
	// cannot fail, so the binary.Write errors are safe to ignore here.
	var buf bytes.Buffer
	buf.WriteString(SeriesIndexMagic)
	binary.Write(&buf, binary.BigEndian, hdr.Version)
	binary.Write(&buf, binary.BigEndian, hdr.MaxSeriesID)
	binary.Write(&buf, binary.BigEndian, hdr.MaxOffset)
	binary.Write(&buf, binary.BigEndian, hdr.Count)
	binary.Write(&buf, binary.BigEndian, hdr.Capacity)
	binary.Write(&buf, binary.BigEndian, hdr.KeyIDMap.Offset)
	binary.Write(&buf, binary.BigEndian, hdr.KeyIDMap.Size)
	binary.Write(&buf, binary.BigEndian, hdr.IDOffsetMap.Offset)
	binary.Write(&buf, binary.BigEndian, hdr.IDOffsetMap.Size)
	return buf.WriteTo(w)
}