tsm1: refactor and rename some methods

pull/10616/head
Jeff Wendling 2019-01-08 14:50:47 -07:00
parent 8744a82665
commit f712828016
7 changed files with 445 additions and 427 deletions

View File

@ -228,7 +228,7 @@ func (f FileStat) OverlapsKeyRange(min, max []byte) bool {
}
// MaybeContainsKey returns true if the key may fall within the min and max keys of the file.
func (f FileStat) ContainsKey(key []byte) bool {
func (f FileStat) MaybeContainsKey(key []byte) bool {
return bytes.Compare(f.MinKey, key) >= 0 || bytes.Compare(key, f.MaxKey) <= 0
}

View File

@ -2,14 +2,11 @@ package tsm1
import (
"bufio"
"encoding/binary"
"fmt"
"io"
"os"
"sync"
"sync/atomic"
"github.com/influxdata/platform/pkg/file"
"go.uber.org/zap"
)
@ -87,7 +84,7 @@ func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
}
t.index = index
t.tombstoner = NewTombstoner(t.Path(), index.ContainsKey)
t.tombstoner = NewTombstoner(t.Path(), index.MaybeContainsKey)
if err := t.applyTombstones(); err != nil {
return nil, err
@ -294,53 +291,53 @@ func (t *TSMReader) Contains(key []byte) bool {
return t.index.Contains(key)
}
// ContainsValue returns true if key and time might exists in this file. This function could
// return true even though the actual point does not exist. For example, the key may
// MaybeContainsValue returns true if key and time might exist in this file. This function
// could return true even though the actual point does not exist. For example, the key may
// exist in this file, but not have a point exactly at time t.
func (t *TSMReader) ContainsValue(key []byte, ts int64) bool {
return t.index.ContainsValue(key, ts)
func (t *TSMReader) MaybeContainsValue(key []byte, ts int64) bool {
return t.index.MaybeContainsValue(key, ts)
}
// DeleteRange removes the given points for keys between minTime and maxTime. The series
// DeleteRange removes the given points for keys between minTime and maxTime. The series
// keys passed in must be sorted.
func (t *TSMReader) DeleteRange(keys [][]byte, minTime, maxTime int64) error {
if len(keys) == 0 {
if !t.index.DeleteRange(keys, minTime, maxTime) {
return nil
}
batch := t.BatchDelete()
if err := batch.DeleteRange(keys, minTime, maxTime); err != nil {
batch.Rollback()
if err := t.tombstoner.AddRange(keys, minTime, maxTime); err != nil {
return err
}
return batch.Commit()
if err := t.tombstoner.Flush(); err != nil {
return err
}
return nil
}
// DeletePrefix removes the given points for keys beginning with prefix.
func (t *TSMReader) DeletePrefix(prefix []byte, minTime, maxTime int64) error {
if !t.index.DeletePrefix(prefix, minTime, maxTime) {
return nil
}
if err := t.tombstoner.AddPrefixRange(prefix, minTime, maxTime); err != nil {
return err
}
if err := t.tombstoner.Flush(); err != nil {
return err
}
t.index.DeletePrefix(prefix, minTime, maxTime)
return nil
}
// Delete deletes blocks indicated by keys.
func (t *TSMReader) Delete(keys [][]byte) error {
if !t.index.Delete(keys) {
return nil
}
if err := t.tombstoner.Add(keys); err != nil {
return err
}
if err := t.tombstoner.Flush(); err != nil {
return err
}
t.index.Delete(keys)
return nil
}
@ -561,302 +558,3 @@ func (a BatchDeleters) Rollback() error {
}
return err
}
// mmapAccess is mmap based block accessor. It access blocks through an
// MMAP file interface.
type mmapAccessor struct {
accessCount uint64 // Counter incremented everytime the mmapAccessor is accessed
freeCount uint64 // Counter to determine whether the accessor can free its resources
logger *zap.Logger
mmapWillNeed bool // If true then mmap advise value MADV_WILLNEED will be provided the kernel for b.
mu sync.RWMutex
b []byte
f *os.File
index *indirectIndex
}
func (m *mmapAccessor) init() (*indirectIndex, error) {
m.mu.Lock()
defer m.mu.Unlock()
if err := verifyVersion(m.f); err != nil {
return nil, err
}
var err error
if _, err := m.f.Seek(0, 0); err != nil {
return nil, err
}
stat, err := m.f.Stat()
if err != nil {
return nil, err
}
m.b, err = mmap(m.f, 0, int(stat.Size()))
if err != nil {
return nil, err
}
if len(m.b) < 8 {
return nil, fmt.Errorf("mmapAccessor: byte slice too small for indirectIndex")
}
// Hint to the kernel that we will be reading the file. It would be better to hint
// that we will be reading the index section, but that's not been
// implemented as yet.
if m.mmapWillNeed {
if err := madviseWillNeed(m.b); err != nil {
return nil, err
}
}
indexOfsPos := len(m.b) - 8
indexStart := binary.BigEndian.Uint64(m.b[indexOfsPos : indexOfsPos+8])
if indexStart >= uint64(indexOfsPos) {
return nil, fmt.Errorf("mmapAccessor: invalid indexStart")
}
m.index = NewIndirectIndex()
if err := m.index.UnmarshalBinary(m.b[indexStart:indexOfsPos]); err != nil {
return nil, err
}
m.index.logger = m.logger
// Allow resources to be freed immediately if requested
m.incAccess()
atomic.StoreUint64(&m.freeCount, 1)
return m.index, nil
}
func (m *mmapAccessor) free() error {
accessCount := atomic.LoadUint64(&m.accessCount)
freeCount := atomic.LoadUint64(&m.freeCount)
// Already freed everything.
if freeCount == 0 && accessCount == 0 {
return nil
}
// Were there accesses after the last time we tried to free?
// If so, don't free anything and record the access count that we
// see now for the next check.
if accessCount != freeCount {
atomic.StoreUint64(&m.freeCount, accessCount)
return nil
}
// Reset both counters to zero to indicate that we have freed everything.
atomic.StoreUint64(&m.accessCount, 0)
atomic.StoreUint64(&m.freeCount, 0)
m.mu.RLock()
defer m.mu.RUnlock()
return madviseDontNeed(m.b)
}
func (m *mmapAccessor) incAccess() {
atomic.AddUint64(&m.accessCount, 1)
}
func (m *mmapAccessor) rename(path string) error {
m.incAccess()
m.mu.Lock()
defer m.mu.Unlock()
err := munmap(m.b)
if err != nil {
return err
}
if err := m.f.Close(); err != nil {
return err
}
if err := file.RenameFile(m.f.Name(), path); err != nil {
return err
}
m.f, err = os.Open(path)
if err != nil {
return err
}
if _, err := m.f.Seek(0, 0); err != nil {
return err
}
stat, err := m.f.Stat()
if err != nil {
return err
}
m.b, err = mmap(m.f, 0, int(stat.Size()))
if err != nil {
return err
}
if m.mmapWillNeed {
return madviseWillNeed(m.b)
}
return nil
}
func (m *mmapAccessor) read(key []byte, timestamp int64) ([]Value, error) {
entry := m.index.Entry(key, timestamp)
if entry == nil {
return nil, nil
}
return m.readBlock(entry, nil)
}
func (m *mmapAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, error) {
m.incAccess()
m.mu.RLock()
defer m.mu.RUnlock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
return nil, ErrTSMClosed
}
//TODO: Validate checksum
var err error
values, err = DecodeBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
if err != nil {
return nil, err
}
return values, nil
}
func (m *mmapAccessor) readBytes(entry *IndexEntry, b []byte) (uint32, []byte, error) {
m.incAccess()
m.mu.RLock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return 0, nil, ErrTSMClosed
}
// return the bytes after the 4 byte checksum
crc, block := binary.BigEndian.Uint32(m.b[entry.Offset:entry.Offset+4]), m.b[entry.Offset+4:entry.Offset+int64(entry.Size)]
m.mu.RUnlock()
return crc, block, nil
}
// readAll returns all values for a key in all blocks.
func (m *mmapAccessor) readAll(key []byte) ([]Value, error) {
m.incAccess()
blocks, err := m.index.ReadEntries(key, nil)
if len(blocks) == 0 || err != nil {
return nil, err
}
tombstones := m.index.TombstoneRange(key, nil)
m.mu.RLock()
defer m.mu.RUnlock()
var temp []Value
var values []Value
for _, block := range blocks {
var skip bool
for _, t := range tombstones {
// Should we skip this block because it contains points that have been deleted
if t.Min <= block.MinTime && t.Max >= block.MaxTime {
skip = true
break
}
}
if skip {
continue
}
//TODO: Validate checksum
temp = temp[:0]
// The +4 is the 4 byte checksum length
temp, err = DecodeBlock(m.b[block.Offset+4:block.Offset+int64(block.Size)], temp)
if err != nil {
return nil, err
}
// Filter out any values that were deleted
for _, t := range tombstones {
temp = Values(temp).Exclude(t.Min, t.Max)
}
values = append(values, temp...)
}
return values, nil
}
func (m *mmapAccessor) path() string {
m.mu.RLock()
path := m.f.Name()
m.mu.RUnlock()
return path
}
func (m *mmapAccessor) close() error {
m.mu.Lock()
defer m.mu.Unlock()
if m.b == nil {
return nil
}
err := munmap(m.b)
if err != nil {
return err
}
m.b = nil
return m.f.Close()
}
type indexEntries struct {
Type byte
entries []IndexEntry
}
func (a *indexEntries) Len() int { return len(a.entries) }
func (a *indexEntries) Swap(i, j int) { a.entries[i], a.entries[j] = a.entries[j], a.entries[i] }
func (a *indexEntries) Less(i, j int) bool {
return a.entries[i].MinTime < a.entries[j].MinTime
}
func (a *indexEntries) MarshalBinary() ([]byte, error) {
buf := make([]byte, len(a.entries)*indexEntrySize)
for i, entry := range a.entries {
entry.AppendTo(buf[indexEntrySize*i:])
}
return buf, nil
}
func (a *indexEntries) WriteTo(w io.Writer) (total int64, err error) {
var buf [indexEntrySize]byte
var n int
for _, entry := range a.entries {
entry.AppendTo(buf[:])
n, err = w.Write(buf[:])
total += int64(n)
if err != nil {
return total, err
}
}
return total, nil
}

View File

@ -15,27 +15,28 @@ import (
// TSMIndex represent the index section of a TSM file. The index records all
// blocks, their locations, sizes, min and max times.
type TSMIndex interface {
// Delete removes the given keys from the index.
Delete(keys [][]byte)
// Delete removes the given keys from the index. Returns true if there were any changes.
Delete(keys [][]byte) bool
// DeleteRange removes the given keys with data between minTime and maxTime from the index.
DeleteRange(keys [][]byte, minTime, maxTime int64)
// Returns true if there were any changes.
DeleteRange(keys [][]byte, minTime, maxTime int64) bool
// DeletePrefix removes keys that begin with the given prefix with data between minTime and
// maxTime from the index.
DeletePrefix(prefix []byte, minTime, maxTime int64)
// maxTime from the index. Returns true if there were any changes.
DeletePrefix(prefix []byte, minTime, maxTime int64) bool
// ContainsKey returns true if the given key may exist in the index. This func is faster than
// MaybeContainsKey returns true if the given key may exist in the index. This is faster than
// Contains, but may return false positives.
ContainsKey(key []byte) bool
MaybeContainsKey(key []byte) bool
// Contains return true if the given key exists in the index.
Contains(key []byte) bool
// ContainsValue returns true if key and time might exist in this file. This function could
// return true even though the actual point does not exists. For example, the key may
// MaybeContainsValue returns true if key and time might exist in this file. This function
// could return true even though the actual point does not exist. For example, the key may
// exist in this file, but not have a point exactly at time t.
ContainsValue(key []byte, timestamp int64) bool
MaybeContainsValue(key []byte, timestamp int64) bool
// ReadEntries reads the index entries for key into entries.
ReadEntries(key []byte, entries []IndexEntry) ([]IndexEntry, error)
@ -154,8 +155,8 @@ func NewIndirectIndex() *indirectIndex {
}
}
// ContainsKey returns true of key may exist in this index.
func (d *indirectIndex) ContainsKey(key []byte) bool {
// MaybeContainsKey returns true if key may exist in this index.
func (d *indirectIndex) MaybeContainsKey(key []byte) bool {
return bytes.Compare(key, d.minKey) >= 0 && bytes.Compare(key, d.maxKey) <= 0
}
@ -222,9 +223,9 @@ func (d *indirectIndex) Iterator(key []byte) *TSMIndexIterator {
}
// Delete removes the given keys from the index.
func (d *indirectIndex) Delete(keys [][]byte) {
func (d *indirectIndex) Delete(keys [][]byte) bool {
if len(keys) == 0 {
return
return false
}
d.mu.RLock()
@ -242,12 +243,14 @@ func (d *indirectIndex) Delete(keys [][]byte) {
d.mu.RUnlock()
if !iter.HasDeletes() {
return
return false
}
d.mu.Lock()
iter.Done()
d.mu.Unlock()
return true
}
// insertTimeRange adds a time range described by the minTime and maxTime into ts.
@ -300,16 +303,15 @@ func (d *indirectIndex) coversEntries(offset uint32, key []byte, buf []TimeRange
}
// DeleteRange removes the given keys with data between minTime and maxTime from the index.
func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) bool {
// If we're deleting everything, we won't need to worry about partial deletes.
if minTime <= d.minTime && maxTime >= d.maxTime {
d.Delete(keys)
return
return d.Delete(keys)
}
// Is the range passed in outside of the time range for the file?
if minTime > d.maxTime || maxTime < d.minTime {
return
return false
}
// General outline:
@ -380,7 +382,7 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
d.mu.RUnlock()
if len(pending) == 0 && !iter.HasDeletes() {
return
return false
}
d.mu.Lock()
@ -429,15 +431,18 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
}
iter.Done()
return true
}
func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64) {
// DeletePrefix removes keys that begin with the given prefix with data between minTime and
// maxTime from the index.
func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64) bool {
// If we're deleting everything, we won't need to worry about partial deletes.
partial := !(minTime <= d.minTime && maxTime >= d.maxTime)
// Is the range passed in outside of the time range for the file?
if minTime > d.maxTime || maxTime < d.minTime {
return
return false
}
d.mu.RLock()
@ -461,10 +466,12 @@ func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64) {
} else if !iter.Next() {
break
}
if !bytes.HasPrefix(iter.Key(&d.b), prefix) {
first = false
key := iter.Key(&d.b)
if !bytes.HasPrefix(key, prefix) {
break
}
first = false
// if we're not doing a partial delete, we don't need to read the entries and
// can just delete the key and move on.
@ -509,7 +516,7 @@ func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64) {
// Check and abort if nothing needs to be done.
if !mustTrack && !iter.HasDeletes() {
return
return false
}
d.mu.Lock()
@ -522,6 +529,8 @@ func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64) {
if iter.HasDeletes() {
iter.Done()
}
return true
}
// TombstoneRange returns ranges of time that are deleted for the given key.
@ -546,8 +555,8 @@ func (d *indirectIndex) Contains(key []byte) bool {
return exact
}
// ContainsValue returns true if key and time might exist in this file.
func (d *indirectIndex) ContainsValue(key []byte, timestamp int64) bool {
// MaybeContainsValue returns true if key and time might exist in this file.
func (d *indirectIndex) MaybeContainsValue(key []byte, timestamp int64) bool {
d.mu.RLock()
defer d.mu.RUnlock()

View File

@ -108,49 +108,49 @@ func TestIndirectIndex_DeleteRange(t *testing.T) {
check(t, ind.Contains([]byte("mem")), true)
check(t, ind.Contains([]byte("cpu1")), true)
check(t, ind.ContainsValue([]byte("cpu1"), 4), true)
check(t, ind.ContainsValue([]byte("cpu1"), 5), false)
check(t, ind.ContainsValue([]byte("cpu1"), 10), false)
check(t, ind.ContainsValue([]byte("cpu1"), 15), false)
check(t, ind.ContainsValue([]byte("cpu1"), 16), true)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 4), true)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 5), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 10), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 15), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 16), true)
check(t, ind.Contains([]byte("cpu2")), true)
check(t, ind.ContainsValue([]byte("cpu2"), 4), true)
check(t, ind.ContainsValue([]byte("cpu2"), 5), true)
check(t, ind.ContainsValue([]byte("cpu2"), 10), true)
check(t, ind.ContainsValue([]byte("cpu2"), 15), true)
check(t, ind.ContainsValue([]byte("cpu2"), 16), true)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 4), true)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 5), true)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 10), true)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 15), true)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 16), true)
ind.DeleteRange([][]byte{[]byte("cpu1"), []byte("cpu2")}, 0, 5)
check(t, ind.Contains([]byte("mem")), true)
check(t, ind.Contains([]byte("cpu1")), true)
check(t, ind.ContainsValue([]byte("cpu1"), 4), false)
check(t, ind.ContainsValue([]byte("cpu1"), 5), false)
check(t, ind.ContainsValue([]byte("cpu1"), 10), false)
check(t, ind.ContainsValue([]byte("cpu1"), 15), false)
check(t, ind.ContainsValue([]byte("cpu1"), 16), true)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 4), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 5), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 10), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 15), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 16), true)
check(t, ind.Contains([]byte("cpu2")), true)
check(t, ind.ContainsValue([]byte("cpu2"), 4), false)
check(t, ind.ContainsValue([]byte("cpu2"), 5), false)
check(t, ind.ContainsValue([]byte("cpu2"), 10), true)
check(t, ind.ContainsValue([]byte("cpu2"), 15), true)
check(t, ind.ContainsValue([]byte("cpu2"), 16), true)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 4), false)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 5), false)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 10), true)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 15), true)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 16), true)
ind.DeleteRange([][]byte{[]byte("cpu1"), []byte("cpu2")}, 15, 20)
check(t, ind.Contains([]byte("mem")), true)
check(t, ind.Contains([]byte("cpu1")), false)
check(t, ind.ContainsValue([]byte("cpu1"), 4), false)
check(t, ind.ContainsValue([]byte("cpu1"), 5), false)
check(t, ind.ContainsValue([]byte("cpu1"), 10), false)
check(t, ind.ContainsValue([]byte("cpu1"), 15), false)
check(t, ind.ContainsValue([]byte("cpu1"), 16), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 4), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 5), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 10), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 15), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 16), false)
check(t, ind.Contains([]byte("cpu2")), true)
check(t, ind.ContainsValue([]byte("cpu2"), 4), false)
check(t, ind.ContainsValue([]byte("cpu2"), 5), false)
check(t, ind.ContainsValue([]byte("cpu2"), 10), true)
check(t, ind.ContainsValue([]byte("cpu2"), 15), false)
check(t, ind.ContainsValue([]byte("cpu2"), 16), false)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 4), false)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 5), false)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 10), true)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 15), false)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 16), false)
}
func TestIndirectIndex_DeletePrefix(t *testing.T) {
@ -173,49 +173,49 @@ func TestIndirectIndex_DeletePrefix(t *testing.T) {
check(t, ind.Contains([]byte("mem")), true)
check(t, ind.Contains([]byte("cpu1")), true)
check(t, ind.ContainsValue([]byte("cpu1"), 4), true)
check(t, ind.ContainsValue([]byte("cpu1"), 5), false)
check(t, ind.ContainsValue([]byte("cpu1"), 10), false)
check(t, ind.ContainsValue([]byte("cpu1"), 15), false)
check(t, ind.ContainsValue([]byte("cpu1"), 16), true)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 4), true)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 5), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 10), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 15), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 16), true)
check(t, ind.Contains([]byte("cpu2")), true)
check(t, ind.ContainsValue([]byte("cpu2"), 4), true)
check(t, ind.ContainsValue([]byte("cpu2"), 5), false)
check(t, ind.ContainsValue([]byte("cpu2"), 10), false)
check(t, ind.ContainsValue([]byte("cpu2"), 15), false)
check(t, ind.ContainsValue([]byte("cpu2"), 16), true)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 4), true)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 5), false)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 10), false)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 15), false)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 16), true)
ind.DeletePrefix([]byte("cp"), 0, 5)
check(t, ind.Contains([]byte("mem")), true)
check(t, ind.Contains([]byte("cpu1")), true)
check(t, ind.ContainsValue([]byte("cpu1"), 4), false)
check(t, ind.ContainsValue([]byte("cpu1"), 5), false)
check(t, ind.ContainsValue([]byte("cpu1"), 10), false)
check(t, ind.ContainsValue([]byte("cpu1"), 15), false)
check(t, ind.ContainsValue([]byte("cpu1"), 16), true)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 4), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 5), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 10), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 15), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 16), true)
check(t, ind.Contains([]byte("cpu2")), true)
check(t, ind.ContainsValue([]byte("cpu2"), 4), false)
check(t, ind.ContainsValue([]byte("cpu2"), 5), false)
check(t, ind.ContainsValue([]byte("cpu2"), 10), false)
check(t, ind.ContainsValue([]byte("cpu2"), 15), false)
check(t, ind.ContainsValue([]byte("cpu2"), 16), true)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 4), false)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 5), false)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 10), false)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 15), false)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 16), true)
ind.DeletePrefix([]byte("cpu"), 15, 20)
check(t, ind.Contains([]byte("mem")), true)
check(t, ind.Contains([]byte("cpu1")), false)
check(t, ind.ContainsValue([]byte("cpu1"), 4), false)
check(t, ind.ContainsValue([]byte("cpu1"), 5), false)
check(t, ind.ContainsValue([]byte("cpu1"), 10), false)
check(t, ind.ContainsValue([]byte("cpu1"), 15), false)
check(t, ind.ContainsValue([]byte("cpu1"), 16), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 4), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 5), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 10), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 15), false)
check(t, ind.MaybeContainsValue([]byte("cpu1"), 16), false)
check(t, ind.Contains([]byte("cpu2")), false)
check(t, ind.ContainsValue([]byte("cpu2"), 4), false)
check(t, ind.ContainsValue([]byte("cpu2"), 5), false)
check(t, ind.ContainsValue([]byte("cpu2"), 10), false)
check(t, ind.ContainsValue([]byte("cpu2"), 15), false)
check(t, ind.ContainsValue([]byte("cpu2"), 16), false)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 4), false)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 5), false)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 10), false)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 15), false)
check(t, ind.MaybeContainsValue([]byte("cpu2"), 16), false)
}
func TestIndirectIndex_DeletePrefix_NoMatch(t *testing.T) {
@ -234,7 +234,7 @@ func TestIndirectIndex_DeletePrefix_NoMatch(t *testing.T) {
ind.DeletePrefix([]byte("d"), 5, 5)
check(t, ind.Contains([]byte("cpu")), true)
check(t, ind.ContainsValue([]byte("cpu"), 5), true)
check(t, ind.MaybeContainsValue([]byte("cpu"), 5), true)
}
//

274
tsdb/tsm1/reader_mmap.go Normal file
View File

@ -0,0 +1,274 @@
package tsm1
import (
"encoding/binary"
"fmt"
"os"
"sync"
"sync/atomic"
"github.com/influxdata/platform/pkg/file"
"go.uber.org/zap"
)
// mmapAccessor is an mmap-based block accessor. It accesses blocks through a
// memory-mapped view of the underlying file.
//
// accessCount and freeCount are read and written with sync/atomic; b and f are
// read and replaced while holding mu.
type mmapAccessor struct {
accessCount uint64 // Counter incremented every time the mmapAccessor is accessed (see incAccess)
freeCount uint64 // Counter compared against accessCount by free to decide whether resources can be released
logger *zap.Logger // Handed to the index built by init
mmapWillNeed bool // If true then mmap advise value MADV_WILLNEED will be provided to the kernel for b.
mu sync.RWMutex // Guards b and f
b []byte // The mapped file contents; set to nil by close
f *os.File // The file backing b
index *indirectIndex // Index decoded from b by init
}
// init maps the accessor's file into memory and decodes its index.
//
// The file version is checked with verifyVersion, the file is rewound, and its
// full size is mapped into m.b. The final 8 bytes of the mapping hold the
// big-endian offset at which the index section starts; the bytes between that
// offset and the trailer are unmarshaled into a fresh indirectIndex, which is
// stored on the accessor and returned.
//
// An error is returned if any step fails, if the mapping is shorter than the
// 8-byte trailer, or if the recorded index offset does not lie before the
// trailer.
func (m *mmapAccessor) init() (*indirectIndex, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if err := verifyVersion(m.f); err != nil {
		return nil, err
	}

	if _, err := m.f.Seek(0, 0); err != nil {
		return nil, err
	}

	info, err := m.f.Stat()
	if err != nil {
		return nil, err
	}

	if m.b, err = mmap(m.f, 0, int(info.Size())); err != nil {
		return nil, err
	}
	if len(m.b) < 8 {
		return nil, fmt.Errorf("mmapAccessor: byte slice too small for indirectIndex")
	}

	// Hint to the kernel that we will be reading the file. It would be better to hint
	// that we will be reading the index section, but that's not been
	// implemented as yet.
	if m.mmapWillNeed {
		if err := madviseWillNeed(m.b); err != nil {
			return nil, err
		}
	}

	// The trailer is the last 8 bytes and stores where the index begins.
	trailerPos := len(m.b) - 8
	indexBegin := binary.BigEndian.Uint64(m.b[trailerPos:])
	if indexBegin >= uint64(trailerPos) {
		return nil, fmt.Errorf("mmapAccessor: invalid indexStart")
	}

	m.index = NewIndirectIndex()
	if err := m.index.UnmarshalBinary(m.b[indexBegin:trailerPos]); err != nil {
		return nil, err
	}
	m.index.logger = m.logger

	// Allow resources to be freed immediately if requested: record one access
	// and set freeCount to 1 so that a following free() can see matching counters.
	m.incAccess()
	atomic.StoreUint64(&m.freeCount, 1)

	return m.index, nil
}
// free advises the kernel that the mapping is no longer needed, but only when
// no access has been recorded since the previous call.
//
// It compares accessCount with freeCount:
//   - both zero: everything was already freed, nothing to do;
//   - different: there were accesses since the last attempt, so the current
//     access count is remembered in freeCount and nothing is freed;
//   - equal: both counters are reset to zero and madviseDontNeed is applied to
//     the mapping under the read lock.
func (m *mmapAccessor) free() error {
	seen := atomic.LoadUint64(&m.accessCount)
	last := atomic.LoadUint64(&m.freeCount)

	switch {
	case last == 0 && seen == 0:
		// Already freed everything.
		return nil
	case seen != last:
		// Accessed since the last attempt; record what we saw for the next check.
		atomic.StoreUint64(&m.freeCount, seen)
		return nil
	}

	// Reset both counters to zero to indicate that we have freed everything.
	atomic.StoreUint64(&m.accessCount, 0)
	atomic.StoreUint64(&m.freeCount, 0)

	m.mu.RLock()
	defer m.mu.RUnlock()

	return madviseDontNeed(m.b)
}
// incAccess atomically records one more access to the accessor. free compares
// this counter against freeCount to decide whether the mapping was used since
// its last call.
func (m *mmapAccessor) incAccess() {
	_ = atomic.AddUint64(&m.accessCount, 1)
}
// rename moves the underlying file to path and re-establishes the mapping on
// the renamed file.
//
// Under the write lock it unmaps the current mapping, closes the file, renames
// it with file.RenameFile, reopens it at path, and maps it again (applying
// madviseWillNeed when mmapWillNeed is set). The first error encountered is
// returned.
//
// m.b is cleared as soon as the old mapping has been unmapped, mirroring
// close. If a later step fails the accessor is then left without a mapping,
// so readBlock/readBytes report ErrTSMClosed through their length guard and
// close does not unmap a second time, instead of m.b continuing to reference
// memory that is no longer mapped.
func (m *mmapAccessor) rename(path string) error {
	m.incAccess()

	m.mu.Lock()
	defer m.mu.Unlock()

	err := munmap(m.b)
	if err != nil {
		return err
	}
	// The old mapping is gone; never leave a slice pointing at it.
	m.b = nil

	if err := m.f.Close(); err != nil {
		return err
	}

	if err := file.RenameFile(m.f.Name(), path); err != nil {
		return err
	}

	m.f, err = os.Open(path)
	if err != nil {
		return err
	}

	if _, err := m.f.Seek(0, 0); err != nil {
		return err
	}

	stat, err := m.f.Stat()
	if err != nil {
		return err
	}

	m.b, err = mmap(m.f, 0, int(stat.Size()))
	if err != nil {
		return err
	}

	if m.mmapWillNeed {
		return madviseWillNeed(m.b)
	}
	return nil
}
// read looks up the index entry for key at timestamp and decodes the block it
// points to. When the index has no matching entry it returns (nil, nil).
func (m *mmapAccessor) read(key []byte, timestamp int64) ([]Value, error) {
	if entry := m.index.Entry(key, timestamp); entry != nil {
		return m.readBlock(entry, nil)
	}
	return nil, nil
}
// readBlock decodes the block described by entry from the mapping, passing
// values to DecodeBlock as the destination slice, and returns the decoded values.
//
// ErrTSMClosed is returned when the mapping is shorter than the end of the
// block, which is the case once close has cleared m.b. On a decode error the
// returned slice is nil.
func (m *mmapAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, error) {
	m.incAccess()

	m.mu.RLock()
	defer m.mu.RUnlock()

	blockEnd := entry.Offset + int64(entry.Size)
	if int64(len(m.b)) < blockEnd {
		return nil, ErrTSMClosed
	}

	//TODO: Validate checksum
	// Skip the 4 byte checksum that precedes the block data.
	decoded, err := DecodeBlock(m.b[entry.Offset+4:blockEnd], values)
	if err != nil {
		return nil, err
	}
	return decoded, nil
}
// readBytes returns the stored checksum and the raw, still-encoded bytes of
// the block described by entry. The parameter b is not used by this
// implementation.
//
// The first 4 bytes at entry.Offset are read as a big-endian checksum; the
// returned slice is the remainder of the block and aliases the mapping rather
// than copying it. ErrTSMClosed is returned when the mapping is shorter than
// the end of the block.
func (m *mmapAccessor) readBytes(entry *IndexEntry, b []byte) (uint32, []byte, error) {
	m.incAccess()

	m.mu.RLock()
	defer m.mu.RUnlock()

	start := entry.Offset
	end := start + int64(entry.Size)
	if int64(len(m.b)) < end {
		return 0, nil, ErrTSMClosed
	}

	// return the bytes after the 4 byte checksum
	checksum := binary.BigEndian.Uint32(m.b[start : start+4])
	payload := m.b[start+4 : end]
	return checksum, payload, nil
}
// readAll returns all values for a key in all blocks.
//
// The index entries and tombstone ranges for key are fetched first. Each block
// is then handled under the read lock: a block whose time range is entirely
// covered by a single tombstone range is skipped; otherwise it is decoded, the
// decoded values are passed through Values.Exclude for every tombstone range,
// and what remains is appended to the result.
//
// It returns (nil, err) when the index lookup or a block decode fails, and
// (nil, nil) when the key has no blocks. ErrTSMClosed is returned when the
// mapping is shorter than a block's end offset (for example after close has
// cleared m.b); readBlock and readBytes apply the same guard.
func (m *mmapAccessor) readAll(key []byte) ([]Value, error) {
	m.incAccess()

	blocks, err := m.index.ReadEntries(key, nil)
	if len(blocks) == 0 || err != nil {
		return nil, err
	}

	tombstones := m.index.TombstoneRange(key, nil)

	m.mu.RLock()
	defer m.mu.RUnlock()

	var temp []Value
	var values []Value
	for _, block := range blocks {
		var skip bool
		for _, t := range tombstones {
			// Should we skip this block because it contains points that have been deleted
			if t.Min <= block.MinTime && t.Max >= block.MaxTime {
				skip = true
				break
			}
		}

		if skip {
			continue
		}

		// Refuse to slice past the end of the mapping rather than panic.
		blockEnd := block.Offset + int64(block.Size)
		if int64(len(m.b)) < blockEnd {
			return nil, ErrTSMClosed
		}

		//TODO: Validate checksum
		temp = temp[:0]
		// The +4 is the 4 byte checksum length
		temp, err = DecodeBlock(m.b[block.Offset+4:blockEnd], temp)
		if err != nil {
			return nil, err
		}

		// Filter out any values that were deleted
		for _, t := range tombstones {
			temp = Values(temp).Exclude(t.Min, t.Max)
		}

		values = append(values, temp...)
	}

	return values, nil
}
// path returns the name of the file currently backing the accessor, read
// under the read lock because rename replaces m.f.
func (m *mmapAccessor) path() string {
	m.mu.RLock()
	defer m.mu.RUnlock()

	return m.f.Name()
}
// close unmaps the mapping and closes the underlying file.
//
// It is a no-op returning nil when m.b is already nil. If munmap fails its
// error is returned and the accessor is left untouched; otherwise m.b is
// cleared and the result of closing the file is returned.
func (m *mmapAccessor) close() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.b == nil {
		return nil
	}

	if err := munmap(m.b); err != nil {
		return err
	}
	m.b = nil

	return m.f.Close()
}

View File

@ -402,12 +402,12 @@ func TestTSMReader_MMAP_TombstoneRange(t *testing.T) {
}
defer r.Close()
if got, exp := r.ContainsValue([]byte("cpu"), 1), true; got != exp {
t.Fatalf("ContainsValue mismatch: got %v, exp %v", got, exp)
if got, exp := r.MaybeContainsValue([]byte("cpu"), 1), true; got != exp {
t.Fatalf("MaybeContainsValue mismatch: got %v, exp %v", got, exp)
}
if got, exp := r.ContainsValue([]byte("cpu"), 3), false; got != exp {
t.Fatalf("ContainsValue mismatch: got %v, exp %v", got, exp)
if got, exp := r.MaybeContainsValue([]byte("cpu"), 3), false; got != exp {
t.Fatalf("MaybeContainsValue mismatch: got %v, exp %v", got, exp)
}
values, err := r.ReadAll([]byte("cpu"))
@ -467,16 +467,16 @@ func TestTSMReader_MMAP_TombstoneOutsideTimeRange(t *testing.T) {
}
defer r.Close()
if got, exp := r.ContainsValue([]byte("cpu"), 1), true; got != exp {
t.Fatalf("ContainsValue mismatch: got %v, exp %v", got, exp)
if got, exp := r.MaybeContainsValue([]byte("cpu"), 1), true; got != exp {
t.Fatalf("MaybeContainsValue mismatch: got %v, exp %v", got, exp)
}
if got, exp := r.ContainsValue([]byte("cpu"), 2), true; got != exp {
t.Fatalf("ContainsValue mismatch: got %v, exp %v", got, exp)
if got, exp := r.MaybeContainsValue([]byte("cpu"), 2), true; got != exp {
t.Fatalf("MaybeContainsValue mismatch: got %v, exp %v", got, exp)
}
if got, exp := r.ContainsValue([]byte("cpu"), 3), true; got != exp {
t.Fatalf("ContainsValue mismatch: got %v, exp %v", got, exp)
if got, exp := r.MaybeContainsValue([]byte("cpu"), 3), true; got != exp {
t.Fatalf("MaybeContainsValue mismatch: got %v, exp %v", got, exp)
}
if got, exp := r.HasTombstones(), false; got != exp {
@ -531,16 +531,16 @@ func TestTSMReader_MMAP_TombstoneOutsideKeyRange(t *testing.T) {
}
defer r.Close()
if got, exp := r.ContainsValue([]byte("cpu"), 1), true; got != exp {
t.Fatalf("ContainsValue mismatch: got %v, exp %v", got, exp)
if got, exp := r.MaybeContainsValue([]byte("cpu"), 1), true; got != exp {
t.Fatalf("MaybeContainsValue mismatch: got %v, exp %v", got, exp)
}
if got, exp := r.ContainsValue([]byte("cpu"), 2), true; got != exp {
t.Fatalf("ContainsValue mismatch: got %v, exp %v", got, exp)
if got, exp := r.MaybeContainsValue([]byte("cpu"), 2), true; got != exp {
t.Fatalf("MaybeContainsValue mismatch: got %v, exp %v", got, exp)
}
if got, exp := r.ContainsValue([]byte("cpu"), 3), true; got != exp {
t.Fatalf("ContainsValue mismatch: got %v, exp %v", got, exp)
if got, exp := r.MaybeContainsValue([]byte("cpu"), 3), true; got != exp {
t.Fatalf("MaybeContainsValue mismatch: got %v, exp %v", got, exp)
}
if got, exp := r.HasTombstones(), false; got != exp {
@ -605,11 +605,11 @@ func TestTSMReader_MMAP_TombstoneOverlapKeyRange(t *testing.T) {
defer r.Close()
if got, exp := r.Contains([]byte("cpu,app=foo,host=server-0#!~#value")), false; got != exp {
t.Fatalf("ContainsValue mismatch: got %v, exp %v", got, exp)
t.Fatalf("MaybeContainsValue mismatch: got %v, exp %v", got, exp)
}
if got, exp := r.Contains([]byte("cpu,app=foo,host=server-73379#!~#value")), false; got != exp {
t.Fatalf("ContainsValue mismatch: got %v, exp %v", got, exp)
t.Fatalf("MaybeContainsValue mismatch: got %v, exp %v", got, exp)
}
if got, exp := r.HasTombstones(), true; got != exp {

View File

@ -275,6 +275,43 @@ type directIndex struct {
indexEntries *indexEntries
}
// indexEntries is a list of index entries tagged with a single type byte. Its
// Len, Swap and Less methods order the entries by MinTime.
type indexEntries struct {
// Type is a one-byte tag stored alongside the entries.
// NOTE(review): presumably the block type shared by all entries — confirm against callers.
Type byte
// entries holds the index entries themselves.
entries []IndexEntry
}
// Len returns the number of entries.
func (a *indexEntries) Len() int {
	return len(a.entries)
}
// Swap exchanges the entries at positions i and j.
func (a *indexEntries) Swap(i, j int) {
	es := a.entries
	es[i], es[j] = es[j], es[i]
}
// Less reports whether the entry at i has an earlier MinTime than the entry at j.
func (a *indexEntries) Less(i, j int) bool {
	left, right := &a.entries[i], &a.entries[j]
	return left.MinTime < right.MinTime
}
// MarshalBinary encodes every entry, in order, into one buffer of
// len(entries)*indexEntrySize bytes by handing each entry its own
// indexEntrySize-aligned region via AppendTo. The returned error is always nil.
func (a *indexEntries) MarshalBinary() ([]byte, error) {
	out := make([]byte, len(a.entries)*indexEntrySize)
	pos := 0
	for _, entry := range a.entries {
		entry.AppendTo(out[pos:])
		pos += indexEntrySize
	}
	return out, nil
}
// WriteTo encodes each entry into a fixed indexEntrySize scratch buffer and
// writes that buffer to w, one entry at a time. It returns the total number of
// bytes written and stops at the first write error, returning it together
// with the count so far.
func (a *indexEntries) WriteTo(w io.Writer) (total int64, err error) {
	var scratch [indexEntrySize]byte

	for _, entry := range a.entries {
		entry.AppendTo(scratch[:])

		var written int
		written, err = w.Write(scratch[:])
		total += int64(written)
		if err != nil {
			return total, err
		}
	}

	return total, nil
}
func (d *directIndex) Add(key []byte, blockType byte, minTime, maxTime int64, offset int64, size uint32) {
// Is this the first block being added?
if len(d.key) == 0 {