tsm1: move code around into smaller files and add tests

pull/10616/head
Jeff Wendling 2019-01-02 19:43:29 -07:00
parent fed3154506
commit f65b0933f6
9 changed files with 1856 additions and 1469 deletions

View File

@ -2,16 +2,10 @@ package tsm1
import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"math"
"math/rand"
"os"
"runtime"
"sort"
"sync"
"sync/atomic"
@ -51,76 +45,6 @@ type TSMReader struct {
deleteMu sync.Mutex
}
// TSMIndex represent the index section of a TSM file. The index records all
// blocks, their locations, sizes, min and max times.
type TSMIndex interface {
// Delete removes the given keys from the index.
Delete(keys [][]byte)
// DeleteRange removes the given keys with data between minTime and maxTime from the index.
DeleteRange(keys [][]byte, minTime, maxTime int64)
// DeletePrefix removes keys that begin with the given prefix with data between minTime and
// maxTime from the index.
DeletePrefix(prefix []byte, minTime, maxTime int64)
// ContainsKey returns true if the given key may exist in the index. This func is faster than
// Contains but, may return false positives.
ContainsKey(key []byte) bool
// Contains return true if the given key exists in the index.
Contains(key []byte) bool
// ContainsValue returns true if key and time might exist in this file. This function could
// return true even though the actual point does not exists. For example, the key may
// exist in this file, but not have a point exactly at time t.
ContainsValue(key []byte, timestamp int64) bool
// ReadEntries reads the index entries for key into entries.
ReadEntries(key []byte, entries []IndexEntry) ([]IndexEntry, error)
// Entry returns the index entry for the specified key and timestamp. If no entry
// matches the key and timestamp, nil is returned.
Entry(key []byte, timestamp int64) *IndexEntry
// KeyCount returns the count of unique keys in the index.
KeyCount() int
// Iterator returns an iterator over the keys starting at the provided key. You must
// call Next before calling any of the accessors.
Iterator([]byte) *TSMIndexIterator
// OverlapsTimeRange returns true if the time range of the file intersect min and max.
OverlapsTimeRange(min, max int64) bool
// OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max.
OverlapsKeyRange(min, max []byte) bool
// Size returns the size of the current index in bytes.
Size() uint32
// TimeRange returns the min and max time across all keys in the file.
TimeRange() (int64, int64)
// TombstoneRange returns ranges of time that are deleted for the given key.
TombstoneRange(key []byte, buf []TimeRange) []TimeRange
// KeyRange returns the min and max keys in the file.
KeyRange() ([]byte, []byte)
// Type returns the block type of the values stored for the key. Returns one of
// BlockFloat64, BlockInt64, BlockBool, BlockString. If key does not exist,
// an error is returned.
Type(key []byte) (byte, error)
// UnmarshalBinary populates an index from an encoded byte slice
// representation of an index.
UnmarshalBinary(b []byte) error
// Close closes the index and releases any resources.
Close() error
}
type tsmReaderOption func(*TSMReader)
// WithMadviseWillNeed is an option for specifying whether to provide a MADV_WILL need hint to the kernel.
@ -618,653 +542,6 @@ func (a BatchDeleters) Rollback() error {
return err
}
// indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This
// implementation can be used for indexes that may be MMAPed into memory.
type indirectIndex struct {
mu sync.RWMutex
logger *zap.Logger
// indirectIndex works a follows. Assuming we have an index structure in memory as
// the diagram below:
//
// ┌────────────────────────────────────────────────────────────────────┐
// │ Index │
// ├─┬──────────────────────┬──┬───────────────────────┬───┬────────────┘
// │0│ │62│ │145│
// ├─┴───────┬─────────┬────┼──┴──────┬─────────┬──────┼───┴─────┬──────┐
// │Key 1 Len│ Key │... │Key 2 Len│ Key 2 │ ... │ Key 3 │ ... │
// │ 2 bytes │ N bytes │ │ 2 bytes │ N bytes │ │ 2 bytes │ │
// └─────────┴─────────┴────┴─────────┴─────────┴──────┴─────────┴──────┘
// We would build an `offsets` slices where each element pointers to the byte location
// for the first key in the index slice.
// ┌────────────────────────────────────────────────────────────────────┐
// │ Offsets │
// ├────┬────┬────┬─────────────────────────────────────────────────────┘
// │ 0 │ 62 │145 │
// └────┴────┴────┘
// Using this offset slice we can find `Key 2` by doing a binary search
// over the offsets slice. Instead of comparing the value in the offsets
// (e.g. `62`), we use that as an index into the underlying index to
// retrieve the key at postion `62` and perform our comparisons with that.
// When we have identified the correct position in the index for a given
// key, we could perform another binary search or a linear scan. This
// should be fast as well since each index entry is 28 bytes and all
// contiguous in memory. The current implementation uses a linear scan since the
// number of block entries is expected to be < 100 per key.
// b is the underlying index byte slice. This could be a copy on the heap or an MMAP
// slice reference
b faultBuffer
// ro contains the positions in b for each key as well as the first bytes of each key
// to avoid disk seeks.
ro readerOffsets
// minKey, maxKey are the minium and maximum (lexicographically sorted) contained in the
// file
minKey, maxKey []byte
// minTime, maxTime are the minimum and maximum times contained in the file across all
// series.
minTime, maxTime int64
// tombstones contains only the tombstoned keys with subset of time values deleted. An
// entry would exist here if a subset of the points for a key were deleted and the file
// had not be re-compacted to remove the points on disk.
tombstones map[uint32][]TimeRange
// prefixTombstones contains the tombestoned keys with a subset of the values deleted that
// all share the same prefix.
prefixTombstones *prefixTree
}
// NewIndirectIndex returns a new indirect index.
func NewIndirectIndex() *indirectIndex {
return &indirectIndex{
tombstones: make(map[uint32][]TimeRange),
prefixTombstones: newPrefixTree(),
}
}
// ContainsKey returns true of key may exist in this index.
func (d *indirectIndex) ContainsKey(key []byte) bool {
return bytes.Compare(key, d.minKey) >= 0 && bytes.Compare(key, d.maxKey) <= 0
}
// ReadEntries returns all index entries for a key.
func (d *indirectIndex) ReadEntries(key []byte, entries []IndexEntry) ([]IndexEntry, error) {
d.mu.RLock()
defer d.mu.RUnlock()
iter := d.ro.Iterator()
exact, _ := iter.Seek(key, &d.b)
if !exact {
return nil, nil
}
entries, err := readEntries(d.b.access(iter.EntryOffset(&d.b), 0), entries)
if err != nil {
return nil, err
}
return entries, nil
}
// Entry returns the index entry for the specified key and timestamp. If no entry
// matches the key an timestamp, nil is returned.
func (d *indirectIndex) Entry(key []byte, timestamp int64) *IndexEntry {
entries, err := d.ReadEntries(key, nil)
if err != nil {
d.logger.Error("error reading tsm index key", zap.String("key", fmt.Sprintf("%q", key)))
return nil
}
for _, entry := range entries {
if entry.Contains(timestamp) {
return &entry
}
}
return nil
}
// KeyCount returns the count of unique keys in the index.
func (d *indirectIndex) KeyCount() int {
d.mu.RLock()
n := len(d.ro.offsets)
d.mu.RUnlock()
return n
}
// Iterator returns an iterator over the keys starting at the provided key. You must
// call Next before calling any of the accessors.
func (d *indirectIndex) Iterator(key []byte) *TSMIndexIterator {
d.mu.RLock()
iter := d.ro.Iterator()
_, ok := iter.Seek(key, &d.b)
ti := &TSMIndexIterator{
d: d,
n: int(len(d.ro.offsets)),
b: &d.b,
iter: &iter,
first: true,
ok: ok,
}
d.mu.RUnlock()
return ti
}
// Delete removes the given keys from the index.
func (d *indirectIndex) Delete(keys [][]byte) {
if len(keys) == 0 {
return
}
d.mu.RLock()
iter := d.ro.Iterator()
for _, key := range keys {
if !iter.Next() || !bytes.Equal(iter.Key(&d.b), key) {
if exact, _ := iter.Seek(key, &d.b); !exact {
continue
}
}
delete(d.tombstones, iter.Offset())
iter.Delete()
}
d.mu.RUnlock()
if !iter.HasDeletes() {
return
}
d.mu.Lock()
iter.Done()
d.mu.Unlock()
}
// insertTimeRange adds a time range described by the minTime and maxTime into ts.
func insertTimeRange(ts []TimeRange, minTime, maxTime int64) []TimeRange {
n := sort.Search(len(ts), func(i int) bool {
if ts[i].Min == minTime {
return ts[i].Max >= maxTime
}
return ts[i].Min > minTime
})
ts = append(ts, TimeRange{})
copy(ts[n+1:], ts[n:])
ts[n] = TimeRange{Min: minTime, Max: maxTime}
return ts
}
// pendingTombstone is a type that describes a pending insertion of a tombstone.
type pendingTombstone struct {
Key int
Index int
Offset uint32
EntryOffset uint32
Tombstones int
}
// coversEntries checks if all of the stored tombstones including one for minTime and maxTime cover
// all of the index entries. It mutates the entries slice to do the work, so be sure to make a copy
// if you must.
func (d *indirectIndex) coversEntries(offset uint32, key []byte, buf []TimeRange,
entries []IndexEntry, minTime, maxTime int64) ([]TimeRange, bool) {
// grab the tombstones from the prefixes. these come out unsorted, so we sort
// them and place them in the merger section named unsorted.
buf = d.prefixTombstones.Search(key, buf[:0])
if len(buf) > 1 {
sort.Slice(buf, func(i, j int) bool { return buf[i].Less(buf[j]) })
}
// create the merger with the other tombstone entries: the ones for the specific
// key and the one we have proposed to add.
merger := timeRangeMerger{
sorted: d.tombstones[offset],
unsorted: buf,
single: TimeRange{Min: minTime, Max: maxTime},
used: false,
}
return buf, timeRangesCoverEntries(merger, entries)
}
// DeleteRange removes the given keys with data between minTime and maxTime from the index.
func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
// If we're deleting everything, we won't need to worry about partial deletes.
if minTime <= d.minTime && maxTime >= d.maxTime {
d.Delete(keys)
return
}
// Is the range passed in outside of the time range for the file?
if minTime > d.maxTime || maxTime < d.minTime {
return
}
// General outline:
// Under the read lock, determine the set of actions we need to
// take and on what keys to take them. Then, under the write
// lock, perform those actions. We keep track of some state
// during the read lock to make double checking under the
// write lock cheap.
d.mu.RLock()
iter := d.ro.Iterator()
var (
ok bool
trbuf []TimeRange
entries []IndexEntry
pending []pendingTombstone
err error
)
for i, key := range keys {
if !iter.Next() || !bytes.Equal(iter.Key(&d.b), key) {
if exact, _ := iter.Seek(key, &d.b); !exact {
continue
}
}
entryOffset := iter.EntryOffset(&d.b)
entries, err = readEntriesTimes(d.b.access(entryOffset, 0), entries)
if err != nil {
// If we have an error reading the entries for a key, we should just pretend
// the whole key is deleted. Maybe a better idea is to report this up somehow
// but that's for another time.
iter.Delete()
continue
}
// Is the time range passed outside of the time range we have stored for this key?
min, max := entries[0].MinTime, entries[len(entries)-1].MaxTime
if minTime > max || maxTime < min {
continue
}
// Does the range passed in cover every value for the key?
if minTime <= min && maxTime >= max {
iter.Delete()
continue
}
// Does adding the minTime and maxTime cover the entries?
offset := iter.Offset()
trbuf, ok = d.coversEntries(offset, key, trbuf, entries, minTime, maxTime)
if ok {
iter.Delete()
continue
}
// Save that we should add a tombstone for this key, and how many tombstones
// already existed to avoid double checks.
pending = append(pending, pendingTombstone{
Key: i,
Index: iter.Index(),
Offset: offset,
EntryOffset: entryOffset,
Tombstones: len(d.tombstones[offset]),
})
}
d.mu.RUnlock()
if len(pending) == 0 && !iter.HasDeletes() {
return
}
d.mu.Lock()
defer d.mu.Unlock()
for _, p := range pending {
// Check the existing tombstones. If the length did not/ change, then we know
// that we don't need to double check coverage, since we only ever increase the
// number of tombstones for a key.
if trs := d.tombstones[p.Offset]; p.Tombstones == len(trs) {
d.tombstones[p.Offset] = insertTimeRange(trs, minTime, maxTime)
continue
}
// Since the length changed, we have to do the expensive overlap check again.
// We re-read the entries again under the write lock because this should be
// rare and only during concurrent deletes to the same key. We could make
// a copy of the entries before getting here, but that penalizes the common
// no-concurrent case.
entries, err = readEntriesTimes(d.b.access(p.EntryOffset, 0), entries)
if err != nil {
// If we have an error reading the entries for a key, we should just pretend
// the whole key is deleted. Maybe a better idea is to report this up somehow
// but that's for another time.
delete(d.tombstones, p.Offset)
iter.SetIndex(p.Index)
if iter.Offset() == p.Offset {
iter.Delete()
}
continue
}
trbuf, ok = d.coversEntries(p.Offset, keys[p.Key], trbuf, entries, minTime, maxTime)
if ok {
delete(d.tombstones, p.Offset)
iter.SetIndex(p.Index)
if iter.Offset() == p.Offset {
iter.Delete()
}
continue
}
// Append the TimeRange into the tombstones.
trs := d.tombstones[p.Offset]
d.tombstones[p.Offset] = insertTimeRange(trs, minTime, maxTime)
}
iter.Done()
}
func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64) {
// If we're deleting everything, we won't need to worry about partial deletes.
partial := !(minTime <= d.minTime && maxTime >= d.maxTime)
// Is the range passed in outside of the time range for the file?
if minTime > d.maxTime || maxTime < d.minTime {
return
}
d.mu.RLock()
var (
ok bool
trbuf []TimeRange
entries []IndexEntry
err error
mustTrack bool
)
// seek to the earliest key with the prefix, and start iterating. we can't call
// next until after we've checked the key, so keep a "first" flag.
first := true
iter := d.ro.Iterator()
iter.Seek(prefix, &d.b)
for {
if (!first && !iter.Next()) || !bytes.HasPrefix(iter.Key(&d.b), prefix) {
break
}
first = false
// if we're not doing a partial delete, we don't need to read the entries and
// can just delete the key and move on.
if !partial {
iter.Delete()
continue
}
entryOffset := iter.EntryOffset(&d.b)
entries, err = readEntriesTimes(d.b.access(entryOffset, 0), entries)
if err != nil {
// If we have an error reading the entries for a key, we should just pretend
// the whole key is deleted. Maybe a better idea is to report this up somehow
// but that's for another time.
iter.Delete()
continue
}
// Is the time range passed outside the range we have stored for the key?
min, max := entries[0].MinTime, entries[len(entries)-1].MaxTime
if minTime > max || maxTime < min {
continue
}
// Does the range passed cover every value for the key?
if minTime <= min && maxTime >= max {
iter.Delete()
continue
}
// Does adding the minTime and maxTime cover the entries?
trbuf, ok = d.coversEntries(iter.Offset(), iter.Key(&d.b), trbuf, entries, minTime, maxTime)
if ok {
iter.Delete()
continue
}
// Otherwise, we have to track it in the prefix tombstones list.
mustTrack = true
}
d.mu.RUnlock()
// Check and abort if nothing needs to be done.
if !mustTrack && !iter.HasDeletes() {
return
}
d.mu.Lock()
defer d.mu.Unlock()
if mustTrack {
d.prefixTombstones.Append(prefix, TimeRange{Min: minTime, Max: maxTime})
}
if iter.HasDeletes() {
iter.Done()
}
}
// TombstoneRange returns ranges of time that are deleted for the given key.
func (d *indirectIndex) TombstoneRange(key []byte, buf []TimeRange) []TimeRange {
d.mu.RLock()
rs := d.prefixTombstones.Search(key, buf[:0])
iter := d.ro.Iterator()
exact, _ := iter.Seek(key, &d.b)
if exact {
rs = append(rs, d.tombstones[iter.Offset()]...)
}
d.mu.RUnlock()
return rs
}
// Contains return true if the given key exists in the index.
func (d *indirectIndex) Contains(key []byte) bool {
d.mu.RLock()
iter := d.ro.Iterator()
exact, _ := iter.Seek(key, &d.b)
d.mu.RUnlock()
return exact
}
// ContainsValue returns true if key and time might exist in this file.
func (d *indirectIndex) ContainsValue(key []byte, timestamp int64) bool {
d.mu.RLock()
defer d.mu.RUnlock()
iter := d.ro.Iterator()
exact, _ := iter.Seek(key, &d.b)
if !exact {
return false
}
for _, t := range d.tombstones[iter.Offset()] {
if t.Min <= timestamp && timestamp <= t.Max {
return false
}
}
if d.prefixTombstones.checkOverlap(key, timestamp) {
return false
}
entries, err := d.ReadEntries(key, nil)
if err != nil {
d.logger.Error("error reading tsm index key", zap.String("key", fmt.Sprintf("%q", key)))
return false
}
for _, entry := range entries {
if entry.Contains(timestamp) {
return true
}
}
return false
}
// Type returns the block type of the values stored for the key.
func (d *indirectIndex) Type(key []byte) (byte, error) {
d.mu.RLock()
defer d.mu.RUnlock()
iter := d.ro.Iterator()
exact, _ := iter.Seek(key, &d.b)
if !exact {
return 0, errors.New("key does not exist")
}
return d.b.access(iter.EntryOffset(&d.b), 1)[0], nil
}
// OverlapsTimeRange returns true if the time range of the file intersect min and max.
func (d *indirectIndex) OverlapsTimeRange(min, max int64) bool {
return d.minTime <= max && d.maxTime >= min
}
// OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max.
func (d *indirectIndex) OverlapsKeyRange(min, max []byte) bool {
return bytes.Compare(d.minKey, max) <= 0 && bytes.Compare(d.maxKey, min) >= 0
}
// KeyRange returns the min and max keys in the index.
func (d *indirectIndex) KeyRange() ([]byte, []byte) {
return d.minKey, d.maxKey
}
// TimeRange returns the min and max time across all keys in the index.
func (d *indirectIndex) TimeRange() (int64, int64) {
return d.minTime, d.maxTime
}
// MarshalBinary returns a byte slice encoded version of the index.
func (d *indirectIndex) MarshalBinary() ([]byte, error) {
d.mu.RLock()
defer d.mu.RUnlock()
return d.b.b, nil
}
// UnmarshalBinary populates an index from an encoded byte slice
// representation of an index.
func (d *indirectIndex) UnmarshalBinary(b []byte) error {
d.mu.Lock()
defer d.mu.Unlock()
// Keep a reference to the actual index bytes
d.b = faultBuffer{b: b}
if len(b) == 0 {
return nil
}
// make sure a uint32 is sufficient to store any offset into the index.
if uint64(len(b)) != uint64(uint32(len(b))) {
return fmt.Errorf("indirectIndex: too large to open")
}
var minTime, maxTime int64 = math.MaxInt64, math.MinInt64
// To create our "indirect" index, we need to find the location of all the keys in
// the raw byte slice. The keys are listed once each (in sorted order). Following
// each key is a time ordered list of index entry blocks for that key. The loop below
// basically skips across the slice keeping track of the counter when we are at a key
// field.
var i uint32
var ro readerOffsets
iMax := uint32(len(b))
if iMax > math.MaxInt32 {
return fmt.Errorf("indirectIndex: too large to store offsets")
}
for i < iMax {
offset := i // save for when we add to the data structure
// Skip to the start of the values
// key length value (2) + type (1) + length of key
if i+2 >= iMax {
return fmt.Errorf("indirectIndex: not enough data for key length value")
}
keyLength := uint32(binary.BigEndian.Uint16(b[i : i+2]))
i += 2
if i+keyLength+indexTypeSize >= iMax {
return fmt.Errorf("indirectIndex: not enough data for key and type")
}
ro.AddKey(offset, b[i:i+keyLength])
i += keyLength + indexTypeSize
// count of index entries
if i+indexCountSize >= iMax {
return fmt.Errorf("indirectIndex: not enough data for index entries count")
}
count := uint32(binary.BigEndian.Uint16(b[i : i+indexCountSize]))
if count == 0 {
return fmt.Errorf("indirectIndex: key exits with no entries")
}
i += indexCountSize
// Find the min time for the block
if i+8 >= iMax {
return fmt.Errorf("indirectIndex: not enough data for min time")
}
minT := int64(binary.BigEndian.Uint64(b[i : i+8]))
if minT < minTime {
minTime = minT
}
i += (count - 1) * indexEntrySize
// Find the max time for the block
if i+16 >= iMax {
return fmt.Errorf("indirectIndex: not enough data for max time")
}
maxT := int64(binary.BigEndian.Uint64(b[i+8 : i+16]))
if maxT > maxTime {
maxTime = maxT
}
i += indexEntrySize
}
ro.Done()
firstOfs := ro.offsets[0]
key := readKey(b[firstOfs:])
d.minKey = key
lastOfs := ro.offsets[len(ro.offsets)-1]
key = readKey(b[lastOfs:])
d.maxKey = key
d.minTime = minTime
d.maxTime = maxTime
d.ro = ro
return nil
}
// Size returns the size of the current index in bytes.
func (d *indirectIndex) Size() uint32 {
d.mu.RLock()
defer d.mu.RUnlock()
return d.b.len()
}
func (d *indirectIndex) Close() error {
return nil
}
// mmapAccess is mmap based block accessor. It access blocks through an
// MMAP file interface.
type mmapAccessor struct {
@ -1563,94 +840,3 @@ func (a *indexEntries) WriteTo(w io.Writer) (total int64, err error) {
return total, nil
}
func readKey(b []byte) (key []byte) {
size := binary.BigEndian.Uint16(b[:2])
return b[2 : 2+size]
}
func readEntries(b []byte, entries []IndexEntry) ([]IndexEntry, error) {
if len(b) < indexTypeSize+indexCountSize {
return entries[:0], errors.New("readEntries: data too short for headers")
}
count := int(binary.BigEndian.Uint16(b[indexTypeSize : indexTypeSize+indexCountSize]))
if cap(entries) < count {
entries = make([]IndexEntry, count)
} else {
entries = entries[:count]
}
b = b[indexTypeSize+indexCountSize:]
for i := range entries {
if err := entries[i].UnmarshalBinary(b); err != nil {
return entries[:0], err
}
b = b[indexEntrySize:]
}
return entries, nil
}
// readEntriesTimes is a helper function to read entries at the provided buffer but
// only reading in the min and max times.
func readEntriesTimes(b []byte, entries []IndexEntry) ([]IndexEntry, error) {
if len(b) < indexTypeSize+indexCountSize {
return entries[:0], errors.New("readEntries: data too short for headers")
}
count := int(binary.BigEndian.Uint16(b[indexTypeSize : indexTypeSize+indexCountSize]))
if cap(entries) < count {
entries = make([]IndexEntry, count)
} else {
entries = entries[:count]
}
b = b[indexTypeSize+indexCountSize:]
for i := range entries {
if len(b) < indexEntrySize {
return entries[:0], errors.New("readEntries: stream too short for entry")
}
entries[i].MinTime = int64(binary.BigEndian.Uint64(b[0:8]))
entries[i].MaxTime = int64(binary.BigEndian.Uint64(b[8:16]))
b = b[indexEntrySize:]
}
return entries, nil
}
const (
faultBufferEnabled = false
faultBufferSampleStacks = false
)
type faultBuffer struct {
faults uint64
page uint64
b []byte
samples [][]uintptr
}
func (m *faultBuffer) len() uint32 { return uint32(len(m.b)) }
func (m *faultBuffer) access(start, length uint32) []byte {
if faultBufferEnabled {
current, page := int64(atomic.LoadUint64(&m.page)), int64(start)/4096
if page != current && page != current+1 { // assume kernel precaches next page
atomic.AddUint64(&m.faults, 1)
if faultBufferSampleStacks && rand.Intn(1000) == 0 {
var stack [256]uintptr
n := runtime.Callers(0, stack[:])
m.samples = append(m.samples, stack[:n:n])
}
}
atomic.StoreUint64(&m.page, uint64(page))
}
end := m.len()
if length > 0 {
end = start + length
}
return m.b[start:end]
}

View File

@ -0,0 +1,280 @@
package tsm1
import (
"os"
"sort"
"testing"
)
func TestBlockIterator_Single(t *testing.T) {
dir := mustTempDir()
defer os.RemoveAll(dir)
f := mustTempFile(dir)
w, err := NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
values := []Value{NewValue(0, int64(1))}
if err := w.Write([]byte("cpu"), values); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
if err := w.WriteIndex(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
fd, err := os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error opening: %v", err)
}
r, err := NewTSMReader(fd)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
var count int
iter := r.BlockIterator()
for iter.Next() {
key, minTime, maxTime, typ, _, buf, err := iter.Read()
if err != nil {
t.Fatalf("unexpected error creating iterator: %v", err)
}
if got, exp := string(key), "cpu"; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
if got, exp := minTime, int64(0); got != exp {
t.Fatalf("min time mismatch: got %v, exp %v", got, exp)
}
if got, exp := maxTime, int64(0); got != exp {
t.Fatalf("max time mismatch: got %v, exp %v", got, exp)
}
if got, exp := typ, BlockInteger; got != exp {
t.Fatalf("block type mismatch: got %v, exp %v", got, exp)
}
if len(buf) == 0 {
t.Fatalf("buf length = 0")
}
count++
}
if got, exp := count, len(values); got != exp {
t.Fatalf("value count mismatch: got %v, exp %v", got, exp)
}
}
func TestBlockIterator_Tombstone(t *testing.T) {
dir := mustTempDir()
defer os.RemoveAll(dir)
f := mustTempFile(dir)
w, err := NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
values := []Value{NewValue(0, int64(1))}
if err := w.Write([]byte("cpu"), values); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
if err := w.Write([]byte("mem"), values); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
if err := w.WriteIndex(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
fd, err := os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error opening: %v", err)
}
r, err := NewTSMReader(fd)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
iter := r.BlockIterator()
for iter.Next() {
// Trigger a delete during iteration. This should cause an error condition for
// the BlockIterator
r.Delete([][]byte{[]byte("cpu")})
}
if iter.Err() == nil {
t.Fatalf("expected error: got nil")
}
}
func TestBlockIterator_MultipleBlocks(t *testing.T) {
dir := mustTempDir()
defer os.RemoveAll(dir)
f := mustTempFile(dir)
w, err := NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
values1 := []Value{NewValue(0, int64(1))}
if err := w.Write([]byte("cpu"), values1); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
values2 := []Value{NewValue(1, int64(2))}
if err := w.Write([]byte("cpu"), values2); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
if err := w.WriteIndex(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
fd, err := os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error opening: %v", err)
}
r, err := NewTSMReader(fd)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
var count int
expData := []Values{values1, values2}
iter := r.BlockIterator()
var i int
for iter.Next() {
key, minTime, maxTime, typ, _, buf, err := iter.Read()
if err != nil {
t.Fatalf("unexpected error creating iterator: %v", err)
}
if got, exp := string(key), "cpu"; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
if got, exp := minTime, expData[i][0].UnixNano(); got != exp {
t.Fatalf("min time mismatch: got %v, exp %v", got, exp)
}
if got, exp := maxTime, expData[i][0].UnixNano(); got != exp {
t.Fatalf("max time mismatch: got %v, exp %v", got, exp)
}
if got, exp := typ, BlockInteger; got != exp {
t.Fatalf("block type mismatch: got %v, exp %v", got, exp)
}
if len(buf) == 0 {
t.Fatalf("buf length = 0")
}
count++
i++
}
if got, exp := count, 2; got != exp {
t.Fatalf("value count mismatch: got %v, exp %v", got, exp)
}
}
func TestBlockIterator_Sorted(t *testing.T) {
dir := mustTempDir()
defer os.RemoveAll(dir)
f := mustTempFile(dir)
w, err := NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
values := map[string][]Value{
"mem": []Value{NewValue(0, int64(1))},
"cycles": []Value{NewValue(0, ^uint64(0))},
"cpu": []Value{NewValue(1, float64(2))},
"disk": []Value{NewValue(1, true)},
"load": []Value{NewValue(1, "string")},
}
keys := make([]string, 0, len(values))
for k := range values {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
if err := w.Write([]byte(k), values[k]); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
}
if err := w.WriteIndex(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
fd, err := os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error opening: %v", err)
}
r, err := NewTSMReader(fd)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
var count int
iter := r.BlockIterator()
var lastKey string
for iter.Next() {
key, _, _, _, _, buf, err := iter.Read()
if string(key) < lastKey {
t.Fatalf("keys not sorted: got %v, last %v", key, lastKey)
}
lastKey = string(key)
if err != nil {
t.Fatalf("unexpected error creating iterator: %v", err)
}
if len(buf) == 0 {
t.Fatalf("buf length = 0")
}
count++
}
if got, exp := count, len(values); got != exp {
t.Fatalf("value count mismatch: got %v, exp %v", got, exp)
}
}

View File

@ -0,0 +1,47 @@
package tsm1
import (
"math/rand"
"runtime"
"sync/atomic"
)
// fault buffer is a by-default disabled helper to keep track of estimates of page faults
// during accesses. use the constants below to turn it on or off and benchmarks will report
// their estimates.
const (
faultBufferEnabled = false
faultBufferSampleStacks = false
)
type faultBuffer struct {
faults uint64
page uint64
b []byte
samples [][]uintptr
}
func (m *faultBuffer) len() uint32 { return uint32(len(m.b)) }
func (m *faultBuffer) access(start, length uint32) []byte {
if faultBufferEnabled {
current, page := int64(atomic.LoadUint64(&m.page)), int64(start)/4096
if page != current && page != current+1 { // assume kernel precaches next page
atomic.AddUint64(&m.faults, 1)
if faultBufferSampleStacks && rand.Intn(1000) == 0 {
var stack [256]uintptr
n := runtime.Callers(0, stack[:])
m.samples = append(m.samples, stack[:n:n])
}
}
atomic.StoreUint64(&m.page, uint64(page))
}
end := m.len()
if length > 0 {
end = start + length
}
return m.b[start:end]
}

785
tsdb/tsm1/reader_index.go Normal file
View File

@ -0,0 +1,785 @@
package tsm1
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"math"
"sort"
"sync"
"go.uber.org/zap"
)
// TSMIndex represent the index section of a TSM file. The index records all
// blocks, their locations, sizes, min and max times.
type TSMIndex interface {
// Delete removes the given keys from the index.
Delete(keys [][]byte)
// DeleteRange removes the given keys with data between minTime and maxTime from the index.
DeleteRange(keys [][]byte, minTime, maxTime int64)
// DeletePrefix removes keys that begin with the given prefix with data between minTime and
// maxTime from the index.
DeletePrefix(prefix []byte, minTime, maxTime int64)
// ContainsKey returns true if the given key may exist in the index. This func is faster than
// Contains but, may return false positives.
ContainsKey(key []byte) bool
// Contains return true if the given key exists in the index.
Contains(key []byte) bool
// ContainsValue returns true if key and time might exist in this file. This function could
// return true even though the actual point does not exists. For example, the key may
// exist in this file, but not have a point exactly at time t.
ContainsValue(key []byte, timestamp int64) bool
// ReadEntries reads the index entries for key into entries.
ReadEntries(key []byte, entries []IndexEntry) ([]IndexEntry, error)
// Entry returns the index entry for the specified key and timestamp. If no entry
// matches the key and timestamp, nil is returned.
Entry(key []byte, timestamp int64) *IndexEntry
// KeyCount returns the count of unique keys in the index.
KeyCount() int
// Iterator returns an iterator over the keys starting at the provided key. You must
// call Next before calling any of the accessors.
Iterator([]byte) *TSMIndexIterator
// OverlapsTimeRange returns true if the time range of the file intersect min and max.
OverlapsTimeRange(min, max int64) bool
// OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max.
OverlapsKeyRange(min, max []byte) bool
// Size returns the size of the current index in bytes.
Size() uint32
// TimeRange returns the min and max time across all keys in the file.
TimeRange() (int64, int64)
// TombstoneRange returns ranges of time that are deleted for the given key.
TombstoneRange(key []byte, buf []TimeRange) []TimeRange
// KeyRange returns the min and max keys in the file.
KeyRange() ([]byte, []byte)
// Type returns the block type of the values stored for the key. Returns one of
// BlockFloat64, BlockInt64, BlockBool, BlockString. If key does not exist,
// an error is returned.
Type(key []byte) (byte, error)
// UnmarshalBinary populates an index from an encoded byte slice
// representation of an index.
UnmarshalBinary(b []byte) error
// Close closes the index and releases any resources.
Close() error
}
// indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This
// implementation can be used for indexes that may be MMAPed into memory.
type indirectIndex struct {
mu sync.RWMutex
logger *zap.Logger
// indirectIndex works a follows. Assuming we have an index structure in memory as
// the diagram below:
//
// ┌────────────────────────────────────────────────────────────────────┐
// │ Index │
// ├─┬──────────────────────┬──┬───────────────────────┬───┬────────────┘
// │0│ │62│ │145│
// ├─┴───────┬─────────┬────┼──┴──────┬─────────┬──────┼───┴─────┬──────┐
// │Key 1 Len│ Key │... │Key 2 Len│ Key 2 │ ... │ Key 3 │ ... │
// │ 2 bytes │ N bytes │ │ 2 bytes │ N bytes │ │ 2 bytes │ │
// └─────────┴─────────┴────┴─────────┴─────────┴──────┴─────────┴──────┘
// We would build an `offsets` slices where each element pointers to the byte location
// for the first key in the index slice.
// ┌────────────────────────────────────────────────────────────────────┐
// │ Offsets │
// ├────┬────┬────┬─────────────────────────────────────────────────────┘
// │ 0 │ 62 │145 │
// └────┴────┴────┘
// Using this offset slice we can find `Key 2` by doing a binary search
// over the offsets slice. Instead of comparing the value in the offsets
// (e.g. `62`), we use that as an index into the underlying index to
// retrieve the key at postion `62` and perform our comparisons with that.
// When we have identified the correct position in the index for a given
// key, we could perform another binary search or a linear scan. This
// should be fast as well since each index entry is 28 bytes and all
// contiguous in memory. The current implementation uses a linear scan since the
// number of block entries is expected to be < 100 per key.
// b is the underlying index byte slice. This could be a copy on the heap or an MMAP
// slice reference
b faultBuffer
// ro contains the positions in b for each key as well as the first bytes of each key
// to avoid disk seeks.
ro readerOffsets
// minKey, maxKey are the minium and maximum (lexicographically sorted) contained in the
// file
minKey, maxKey []byte
// minTime, maxTime are the minimum and maximum times contained in the file across all
// series.
minTime, maxTime int64
// tombstones contains only the tombstoned keys with subset of time values deleted. An
// entry would exist here if a subset of the points for a key were deleted and the file
// had not be re-compacted to remove the points on disk.
tombstones map[uint32][]TimeRange
// prefixTombstones contains the tombestoned keys with a subset of the values deleted that
// all share the same prefix.
prefixTombstones *prefixTree
}
// NewIndirectIndex returns a new indirect index.
func NewIndirectIndex() *indirectIndex {
return &indirectIndex{
tombstones: make(map[uint32][]TimeRange),
prefixTombstones: newPrefixTree(),
}
}
// ContainsKey returns true of key may exist in this index.
func (d *indirectIndex) ContainsKey(key []byte) bool {
return bytes.Compare(key, d.minKey) >= 0 && bytes.Compare(key, d.maxKey) <= 0
}
// ReadEntries returns all index entries for a key.
func (d *indirectIndex) ReadEntries(key []byte, entries []IndexEntry) ([]IndexEntry, error) {
d.mu.RLock()
defer d.mu.RUnlock()
iter := d.ro.Iterator()
exact, _ := iter.Seek(key, &d.b)
if !exact {
return nil, nil
}
entries, err := readEntries(d.b.access(iter.EntryOffset(&d.b), 0), entries)
if err != nil {
return nil, err
}
return entries, nil
}
// Entry returns the index entry for the specified key and timestamp. If no entry
// matches the key an timestamp, nil is returned.
func (d *indirectIndex) Entry(key []byte, timestamp int64) *IndexEntry {
entries, err := d.ReadEntries(key, nil)
if err != nil {
d.logger.Error("error reading tsm index key", zap.String("key", fmt.Sprintf("%q", key)))
return nil
}
for _, entry := range entries {
if entry.Contains(timestamp) {
return &entry
}
}
return nil
}
// KeyCount returns the count of unique keys in the index.
func (d *indirectIndex) KeyCount() int {
d.mu.RLock()
n := len(d.ro.offsets)
d.mu.RUnlock()
return n
}
// Iterator returns an iterator over the keys starting at the provided key. You must
// call Next before calling any of the accessors.
func (d *indirectIndex) Iterator(key []byte) *TSMIndexIterator {
d.mu.RLock()
iter := d.ro.Iterator()
_, ok := iter.Seek(key, &d.b)
ti := &TSMIndexIterator{
d: d,
n: int(len(d.ro.offsets)),
b: &d.b,
iter: &iter,
first: true,
ok: ok,
}
d.mu.RUnlock()
return ti
}
// Delete removes the given keys from the index.
func (d *indirectIndex) Delete(keys [][]byte) {
if len(keys) == 0 {
return
}
d.mu.RLock()
iter := d.ro.Iterator()
for _, key := range keys {
if !iter.Next() || !bytes.Equal(iter.Key(&d.b), key) {
if exact, _ := iter.Seek(key, &d.b); !exact {
continue
}
}
delete(d.tombstones, iter.Offset())
iter.Delete()
}
d.mu.RUnlock()
if !iter.HasDeletes() {
return
}
d.mu.Lock()
iter.Done()
d.mu.Unlock()
}
// insertTimeRange adds a time range described by the minTime and maxTime into ts.
func insertTimeRange(ts []TimeRange, minTime, maxTime int64) []TimeRange {
n := sort.Search(len(ts), func(i int) bool {
if ts[i].Min == minTime {
return ts[i].Max >= maxTime
}
return ts[i].Min > minTime
})
ts = append(ts, TimeRange{})
copy(ts[n+1:], ts[n:])
ts[n] = TimeRange{Min: minTime, Max: maxTime}
return ts
}
// pendingTombstone is a type that describes a pending insertion of a tombstone.
type pendingTombstone struct {
Key int
Index int
Offset uint32
EntryOffset uint32
Tombstones int
}
// coversEntries checks if all of the stored tombstones including one for minTime and maxTime cover
// all of the index entries. It mutates the entries slice to do the work, so be sure to make a copy
// if you must.
func (d *indirectIndex) coversEntries(offset uint32, key []byte, buf []TimeRange,
entries []IndexEntry, minTime, maxTime int64) ([]TimeRange, bool) {
// grab the tombstones from the prefixes. these come out unsorted, so we sort
// them and place them in the merger section named unsorted.
buf = d.prefixTombstones.Search(key, buf[:0])
if len(buf) > 1 {
sort.Slice(buf, func(i, j int) bool { return buf[i].Less(buf[j]) })
}
// create the merger with the other tombstone entries: the ones for the specific
// key and the one we have proposed to add.
merger := timeRangeMerger{
sorted: d.tombstones[offset],
unsorted: buf,
single: TimeRange{Min: minTime, Max: maxTime},
used: false,
}
return buf, timeRangesCoverEntries(merger, entries)
}
// DeleteRange removes the given keys with data between minTime and maxTime from the index.
func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
// If we're deleting everything, we won't need to worry about partial deletes.
if minTime <= d.minTime && maxTime >= d.maxTime {
d.Delete(keys)
return
}
// Is the range passed in outside of the time range for the file?
if minTime > d.maxTime || maxTime < d.minTime {
return
}
// General outline:
// Under the read lock, determine the set of actions we need to
// take and on what keys to take them. Then, under the write
// lock, perform those actions. We keep track of some state
// during the read lock to make double checking under the
// write lock cheap.
d.mu.RLock()
iter := d.ro.Iterator()
var (
ok bool
trbuf []TimeRange
entries []IndexEntry
pending []pendingTombstone
err error
)
for i, key := range keys {
if !iter.Next() || !bytes.Equal(iter.Key(&d.b), key) {
if exact, _ := iter.Seek(key, &d.b); !exact {
continue
}
}
entryOffset := iter.EntryOffset(&d.b)
entries, err = readEntriesTimes(d.b.access(entryOffset, 0), entries)
if err != nil {
// If we have an error reading the entries for a key, we should just pretend
// the whole key is deleted. Maybe a better idea is to report this up somehow
// but that's for another time.
iter.Delete()
continue
}
// Is the time range passed outside of the time range we have stored for this key?
min, max := entries[0].MinTime, entries[len(entries)-1].MaxTime
if minTime > max || maxTime < min {
continue
}
// Does the range passed in cover every value for the key?
if minTime <= min && maxTime >= max {
iter.Delete()
continue
}
// Does adding the minTime and maxTime cover the entries?
offset := iter.Offset()
trbuf, ok = d.coversEntries(offset, key, trbuf, entries, minTime, maxTime)
if ok {
iter.Delete()
continue
}
// Save that we should add a tombstone for this key, and how many tombstones
// already existed to avoid double checks.
pending = append(pending, pendingTombstone{
Key: i,
Index: iter.Index(),
Offset: offset,
EntryOffset: entryOffset,
Tombstones: len(d.tombstones[offset]),
})
}
d.mu.RUnlock()
if len(pending) == 0 && !iter.HasDeletes() {
return
}
d.mu.Lock()
defer d.mu.Unlock()
for _, p := range pending {
// Check the existing tombstones. If the length did not/ change, then we know
// that we don't need to double check coverage, since we only ever increase the
// number of tombstones for a key.
if trs := d.tombstones[p.Offset]; p.Tombstones == len(trs) {
d.tombstones[p.Offset] = insertTimeRange(trs, minTime, maxTime)
continue
}
// Since the length changed, we have to do the expensive overlap check again.
// We re-read the entries again under the write lock because this should be
// rare and only during concurrent deletes to the same key. We could make
// a copy of the entries before getting here, but that penalizes the common
// no-concurrent case.
entries, err = readEntriesTimes(d.b.access(p.EntryOffset, 0), entries)
if err != nil {
// If we have an error reading the entries for a key, we should just pretend
// the whole key is deleted. Maybe a better idea is to report this up somehow
// but that's for another time.
delete(d.tombstones, p.Offset)
iter.SetIndex(p.Index)
if iter.Offset() == p.Offset {
iter.Delete()
}
continue
}
trbuf, ok = d.coversEntries(p.Offset, keys[p.Key], trbuf, entries, minTime, maxTime)
if ok {
delete(d.tombstones, p.Offset)
iter.SetIndex(p.Index)
if iter.Offset() == p.Offset {
iter.Delete()
}
continue
}
// Append the TimeRange into the tombstones.
trs := d.tombstones[p.Offset]
d.tombstones[p.Offset] = insertTimeRange(trs, minTime, maxTime)
}
iter.Done()
}
func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64) {
// If we're deleting everything, we won't need to worry about partial deletes.
partial := !(minTime <= d.minTime && maxTime >= d.maxTime)
// Is the range passed in outside of the time range for the file?
if minTime > d.maxTime || maxTime < d.minTime {
return
}
d.mu.RLock()
var (
ok bool
trbuf []TimeRange
entries []IndexEntry
err error
mustTrack bool
)
// seek to the earliest key with the prefix, and start iterating. we can't call
// next until after we've checked the key, so keep a "first" flag.
first := true
iter := d.ro.Iterator()
iter.Seek(prefix, &d.b)
for {
if (!first && !iter.Next()) || !bytes.HasPrefix(iter.Key(&d.b), prefix) {
break
}
first = false
// if we're not doing a partial delete, we don't need to read the entries and
// can just delete the key and move on.
if !partial {
iter.Delete()
continue
}
entryOffset := iter.EntryOffset(&d.b)
entries, err = readEntriesTimes(d.b.access(entryOffset, 0), entries)
if err != nil {
// If we have an error reading the entries for a key, we should just pretend
// the whole key is deleted. Maybe a better idea is to report this up somehow
// but that's for another time.
iter.Delete()
continue
}
// Is the time range passed outside the range we have stored for the key?
min, max := entries[0].MinTime, entries[len(entries)-1].MaxTime
if minTime > max || maxTime < min {
continue
}
// Does the range passed cover every value for the key?
if minTime <= min && maxTime >= max {
iter.Delete()
continue
}
// Does adding the minTime and maxTime cover the entries?
trbuf, ok = d.coversEntries(iter.Offset(), iter.Key(&d.b), trbuf, entries, minTime, maxTime)
if ok {
iter.Delete()
continue
}
// Otherwise, we have to track it in the prefix tombstones list.
mustTrack = true
}
d.mu.RUnlock()
// Check and abort if nothing needs to be done.
if !mustTrack && !iter.HasDeletes() {
return
}
d.mu.Lock()
defer d.mu.Unlock()
if mustTrack {
d.prefixTombstones.Append(prefix, TimeRange{Min: minTime, Max: maxTime})
}
if iter.HasDeletes() {
iter.Done()
}
}
// TombstoneRange returns ranges of time that are deleted for the given key.
func (d *indirectIndex) TombstoneRange(key []byte, buf []TimeRange) []TimeRange {
d.mu.RLock()
rs := d.prefixTombstones.Search(key, buf[:0])
iter := d.ro.Iterator()
exact, _ := iter.Seek(key, &d.b)
if exact {
rs = append(rs, d.tombstones[iter.Offset()]...)
}
d.mu.RUnlock()
return rs
}
// Contains return true if the given key exists in the index.
func (d *indirectIndex) Contains(key []byte) bool {
d.mu.RLock()
iter := d.ro.Iterator()
exact, _ := iter.Seek(key, &d.b)
d.mu.RUnlock()
return exact
}
// ContainsValue returns true if key and time might exist in this file.
func (d *indirectIndex) ContainsValue(key []byte, timestamp int64) bool {
d.mu.RLock()
defer d.mu.RUnlock()
iter := d.ro.Iterator()
exact, _ := iter.Seek(key, &d.b)
if !exact {
return false
}
for _, t := range d.tombstones[iter.Offset()] {
if t.Min <= timestamp && timestamp <= t.Max {
return false
}
}
if d.prefixTombstones.checkOverlap(key, timestamp) {
return false
}
entries, err := d.ReadEntries(key, nil)
if err != nil {
d.logger.Error("error reading tsm index key", zap.String("key", fmt.Sprintf("%q", key)))
return false
}
for _, entry := range entries {
if entry.Contains(timestamp) {
return true
}
}
return false
}
// Type returns the block type of the values stored for the key.
func (d *indirectIndex) Type(key []byte) (byte, error) {
d.mu.RLock()
defer d.mu.RUnlock()
iter := d.ro.Iterator()
exact, _ := iter.Seek(key, &d.b)
if !exact {
return 0, errors.New("key does not exist")
}
return d.b.access(iter.EntryOffset(&d.b), 1)[0], nil
}
// OverlapsTimeRange returns true if the time range of the file intersect min and max.
func (d *indirectIndex) OverlapsTimeRange(min, max int64) bool {
return d.minTime <= max && d.maxTime >= min
}
// OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max.
func (d *indirectIndex) OverlapsKeyRange(min, max []byte) bool {
return bytes.Compare(d.minKey, max) <= 0 && bytes.Compare(d.maxKey, min) >= 0
}
// KeyRange returns the min and max keys in the index.
func (d *indirectIndex) KeyRange() ([]byte, []byte) {
return d.minKey, d.maxKey
}
// TimeRange returns the min and max time across all keys in the index.
func (d *indirectIndex) TimeRange() (int64, int64) {
return d.minTime, d.maxTime
}
// MarshalBinary returns a byte slice encoded version of the index.
func (d *indirectIndex) MarshalBinary() ([]byte, error) {
d.mu.RLock()
defer d.mu.RUnlock()
return d.b.b, nil
}
// UnmarshalBinary populates an index from an encoded byte slice
// representation of an index.
func (d *indirectIndex) UnmarshalBinary(b []byte) error {
d.mu.Lock()
defer d.mu.Unlock()
// Keep a reference to the actual index bytes
d.b = faultBuffer{b: b}
if len(b) == 0 {
return nil
}
// make sure a uint32 is sufficient to store any offset into the index.
if uint64(len(b)) != uint64(uint32(len(b))) {
return fmt.Errorf("indirectIndex: too large to open")
}
var minTime, maxTime int64 = math.MaxInt64, math.MinInt64
// To create our "indirect" index, we need to find the location of all the keys in
// the raw byte slice. The keys are listed once each (in sorted order). Following
// each key is a time ordered list of index entry blocks for that key. The loop below
// basically skips across the slice keeping track of the counter when we are at a key
// field.
var i uint32
var ro readerOffsets
iMax := uint32(len(b))
if iMax > math.MaxInt32 {
return fmt.Errorf("indirectIndex: too large to store offsets")
}
for i < iMax {
offset := i // save for when we add to the data structure
// Skip to the start of the values
// key length value (2) + type (1) + length of key
if i+2 >= iMax {
return fmt.Errorf("indirectIndex: not enough data for key length value")
}
keyLength := uint32(binary.BigEndian.Uint16(b[i : i+2]))
i += 2
if i+keyLength+indexTypeSize >= iMax {
return fmt.Errorf("indirectIndex: not enough data for key and type")
}
ro.AddKey(offset, b[i:i+keyLength])
i += keyLength + indexTypeSize
// count of index entries
if i+indexCountSize >= iMax {
return fmt.Errorf("indirectIndex: not enough data for index entries count")
}
count := uint32(binary.BigEndian.Uint16(b[i : i+indexCountSize]))
if count == 0 {
return fmt.Errorf("indirectIndex: key exits with no entries")
}
i += indexCountSize
// Find the min time for the block
if i+8 >= iMax {
return fmt.Errorf("indirectIndex: not enough data for min time")
}
minT := int64(binary.BigEndian.Uint64(b[i : i+8]))
if minT < minTime {
minTime = minT
}
i += (count - 1) * indexEntrySize
// Find the max time for the block
if i+16 >= iMax {
return fmt.Errorf("indirectIndex: not enough data for max time")
}
maxT := int64(binary.BigEndian.Uint64(b[i+8 : i+16]))
if maxT > maxTime {
maxTime = maxT
}
i += indexEntrySize
}
ro.Done()
firstOfs := ro.offsets[0]
key := readKey(b[firstOfs:])
d.minKey = key
lastOfs := ro.offsets[len(ro.offsets)-1]
key = readKey(b[lastOfs:])
d.maxKey = key
d.minTime = minTime
d.maxTime = maxTime
d.ro = ro
return nil
}
// Size returns the size of the current index in bytes.
func (d *indirectIndex) Size() uint32 {
d.mu.RLock()
defer d.mu.RUnlock()
return d.b.len()
}
func (d *indirectIndex) Close() error {
return nil
}
func readKey(b []byte) (key []byte) {
size := binary.BigEndian.Uint16(b[:2])
return b[2 : 2+size]
}
func readEntries(b []byte, entries []IndexEntry) ([]IndexEntry, error) {
if len(b) < indexTypeSize+indexCountSize {
return entries[:0], errors.New("readEntries: data too short for headers")
}
count := int(binary.BigEndian.Uint16(b[indexTypeSize : indexTypeSize+indexCountSize]))
if cap(entries) < count {
entries = make([]IndexEntry, count)
} else {
entries = entries[:count]
}
b = b[indexTypeSize+indexCountSize:]
for i := range entries {
if err := entries[i].UnmarshalBinary(b); err != nil {
return entries[:0], err
}
b = b[indexEntrySize:]
}
return entries, nil
}
// readEntriesTimes is a helper function to read entries at the provided buffer but
// only reading in the min and max times.
func readEntriesTimes(b []byte, entries []IndexEntry) ([]IndexEntry, error) {
if len(b) < indexTypeSize+indexCountSize {
return entries[:0], errors.New("readEntries: data too short for headers")
}
count := int(binary.BigEndian.Uint16(b[indexTypeSize : indexTypeSize+indexCountSize]))
if cap(entries) < count {
entries = make([]IndexEntry, count)
} else {
entries = entries[:count]
}
b = b[indexTypeSize+indexCountSize:]
for i := range entries {
if len(b) < indexEntrySize {
return entries[:0], errors.New("readEntries: stream too short for entry")
}
entries[i].MinTime = int64(binary.BigEndian.Uint64(b[0:8]))
entries[i].MaxTime = int64(binary.BigEndian.Uint64(b[8:16]))
b = b[indexEntrySize:]
}
return entries, nil
}

View File

@ -0,0 +1,86 @@
package tsm1
import (
"reflect"
"testing"
"github.com/google/go-cmp/cmp"
)
func TestIndirectIndexIterator(t *testing.T) {
checkEqual := func(t *testing.T, got, exp interface{}) {
t.Helper()
if !reflect.DeepEqual(got, exp) {
t.Fatalf("expected: %v but got: %v\n%v", exp, got, cmp.Diff(got, exp))
}
}
index := NewIndexWriter()
index.Add([]byte("cpu1"), BlockInteger, 0, 10, 10, 20)
index.Add([]byte("cpu1"), BlockInteger, 10, 20, 10, 20)
index.Add([]byte("cpu2"), BlockInteger, 0, 10, 10, 20)
index.Add([]byte("cpu2"), BlockInteger, 10, 20, 10, 20)
index.Add([]byte("mem"), BlockInteger, 0, 10, 10, 20)
ind := loadIndex(t, index)
// check that the iterator walks the whole index
iter := ind.Iterator(nil)
checkEqual(t, iter.Next(), true)
checkEqual(t, iter.Peek(), []byte("cpu2"))
checkEqual(t, iter.Key(), []byte("cpu1"))
checkEqual(t, iter.Type(), BlockInteger)
checkEqual(t, iter.Entries(), []IndexEntry{
{0, 10, 10, 20},
{10, 20, 10, 20},
})
checkEqual(t, iter.Next(), true)
checkEqual(t, iter.Peek(), []byte("mem"))
checkEqual(t, iter.Key(), []byte("cpu2"))
checkEqual(t, iter.Type(), BlockInteger)
checkEqual(t, iter.Entries(), []IndexEntry{
{0, 10, 10, 20},
{10, 20, 10, 20},
})
checkEqual(t, iter.Next(), true)
checkEqual(t, iter.Peek(), []byte(nil))
checkEqual(t, iter.Key(), []byte("mem"))
checkEqual(t, iter.Type(), BlockInteger)
checkEqual(t, iter.Entries(), []IndexEntry{
{0, 10, 10, 20},
})
checkEqual(t, iter.Next(), false)
checkEqual(t, iter.Err(), error(nil))
// delete the cpu2 key and make sure it's skipped
ind.Delete([][]byte{[]byte("cpu2")})
iter = ind.Iterator(nil)
checkEqual(t, iter.Next(), true)
checkEqual(t, iter.Peek(), []byte("mem"))
checkEqual(t, iter.Key(), []byte("cpu1"))
checkEqual(t, iter.Type(), BlockInteger)
checkEqual(t, iter.Entries(), []IndexEntry{
{0, 10, 10, 20},
{10, 20, 10, 20},
})
checkEqual(t, iter.Next(), true)
checkEqual(t, iter.Peek(), []byte(nil))
checkEqual(t, iter.Key(), []byte("mem"))
checkEqual(t, iter.Type(), BlockInteger)
checkEqual(t, iter.Entries(), []IndexEntry{
{0, 10, 10, 20},
})
checkEqual(t, iter.Next(), false)
checkEqual(t, iter.Err(), error(nil))
// check that seek works
iter = ind.Iterator([]byte("d"))
checkEqual(t, iter.Next(), true)
checkEqual(t, iter.Peek(), []byte(nil))
checkEqual(t, iter.Key(), []byte("mem"))
checkEqual(t, iter.Type(), BlockInteger)
checkEqual(t, iter.Entries(), []IndexEntry{
{0, 10, 10, 20},
})
checkEqual(t, iter.Next(), false)
checkEqual(t, iter.Err(), error(nil))
}

View File

@ -0,0 +1,532 @@
package tsm1
import (
"fmt"
"math"
"sync/atomic"
"testing"
)
func loadIndex(tb testing.TB, w IndexWriter) *indirectIndex {
tb.Helper()
b, err := w.MarshalBinary()
fatalIfErr(tb, "marshaling index", err)
indir := NewIndirectIndex()
fatalIfErr(tb, "unmarshaling index", indir.UnmarshalBinary(b))
return indir
}
func TestIndirectIndex_Entries_NonExistent(t *testing.T) {
index := NewIndexWriter()
index.Add([]byte("cpu"), BlockFloat64, 0, 1, 10, 100)
index.Add([]byte("cpu"), BlockFloat64, 2, 3, 20, 200)
ind := loadIndex(t, index)
// mem has not been added to the index so we should get no entries back
// for both
exp := index.Entries([]byte("mem"))
entries, err := ind.ReadEntries([]byte("mem"), nil)
if err != nil {
t.Fatal(err)
}
if got, exp := len(entries), len(exp); got != exp && exp != 0 {
t.Fatalf("entries length mismatch: got %v, exp %v", got, exp)
}
}
func TestIndirectIndex_Type(t *testing.T) {
index := NewIndexWriter()
index.Add([]byte("cpu"), BlockInteger, 0, 1, 10, 20)
ind := loadIndex(t, index)
typ, err := ind.Type([]byte("cpu"))
if err != nil {
fatal(t, "reading type", err)
}
if got, exp := typ, BlockInteger; got != exp {
t.Fatalf("type mismatch: got %v, exp %v", got, exp)
}
}
func TestIndirectIndex_Delete(t *testing.T) {
check := func(t *testing.T, got, exp bool) {
t.Helper()
if exp != got {
t.Fatalf("expected: %v but got: %v", exp, got)
}
}
index := NewIndexWriter()
index.Add([]byte("cpu1"), BlockInteger, 0, 10, 10, 20)
index.Add([]byte("cpu1"), BlockInteger, 10, 20, 10, 20)
index.Add([]byte("cpu2"), BlockInteger, 0, 10, 10, 20)
index.Add([]byte("cpu2"), BlockInteger, 10, 20, 10, 20)
index.Add([]byte("mem"), BlockInteger, 0, 10, 10, 20)
ind := loadIndex(t, index)
ind.Delete([][]byte{[]byte("cpu1")})
check(t, ind.Contains([]byte("mem")), true)
check(t, ind.Contains([]byte("cpu1")), false)
check(t, ind.Contains([]byte("cpu2")), true)
ind.Delete([][]byte{[]byte("cpu1"), []byte("cpu2")})
check(t, ind.Contains([]byte("mem")), true)
check(t, ind.Contains([]byte("cpu1")), false)
check(t, ind.Contains([]byte("cpu2")), false)
ind.Delete([][]byte{[]byte("mem")})
check(t, ind.Contains([]byte("mem")), false)
check(t, ind.Contains([]byte("cpu1")), false)
check(t, ind.Contains([]byte("cpu2")), false)
}
func TestIndirectIndex_DeleteRange(t *testing.T) {
check := func(t *testing.T, got, exp bool) {
t.Helper()
if exp != got {
t.Fatalf("expected: %v but got: %v", exp, got)
}
}
index := NewIndexWriter()
index.Add([]byte("cpu1"), BlockInteger, 0, 10, 10, 20)
index.Add([]byte("cpu1"), BlockInteger, 10, 20, 10, 20)
index.Add([]byte("cpu2"), BlockInteger, 0, 10, 10, 20)
index.Add([]byte("cpu2"), BlockInteger, 10, 20, 10, 20)
index.Add([]byte("mem"), BlockInteger, 0, 10, 10, 20)
ind := loadIndex(t, index)
ind.DeleteRange([][]byte{[]byte("cpu1")}, 5, 15)
check(t, ind.Contains([]byte("mem")), true)
check(t, ind.Contains([]byte("cpu1")), true)
check(t, ind.ContainsValue([]byte("cpu1"), 4), true)
check(t, ind.ContainsValue([]byte("cpu1"), 5), false)
check(t, ind.ContainsValue([]byte("cpu1"), 10), false)
check(t, ind.ContainsValue([]byte("cpu1"), 15), false)
check(t, ind.ContainsValue([]byte("cpu1"), 16), true)
check(t, ind.Contains([]byte("cpu2")), true)
check(t, ind.ContainsValue([]byte("cpu2"), 4), true)
check(t, ind.ContainsValue([]byte("cpu2"), 5), true)
check(t, ind.ContainsValue([]byte("cpu2"), 10), true)
check(t, ind.ContainsValue([]byte("cpu2"), 15), true)
check(t, ind.ContainsValue([]byte("cpu2"), 16), true)
ind.DeleteRange([][]byte{[]byte("cpu1"), []byte("cpu2")}, 0, 5)
check(t, ind.Contains([]byte("mem")), true)
check(t, ind.Contains([]byte("cpu1")), true)
check(t, ind.ContainsValue([]byte("cpu1"), 4), false)
check(t, ind.ContainsValue([]byte("cpu1"), 5), false)
check(t, ind.ContainsValue([]byte("cpu1"), 10), false)
check(t, ind.ContainsValue([]byte("cpu1"), 15), false)
check(t, ind.ContainsValue([]byte("cpu1"), 16), true)
check(t, ind.Contains([]byte("cpu2")), true)
check(t, ind.ContainsValue([]byte("cpu2"), 4), false)
check(t, ind.ContainsValue([]byte("cpu2"), 5), false)
check(t, ind.ContainsValue([]byte("cpu2"), 10), true)
check(t, ind.ContainsValue([]byte("cpu2"), 15), true)
check(t, ind.ContainsValue([]byte("cpu2"), 16), true)
ind.DeleteRange([][]byte{[]byte("cpu1"), []byte("cpu2")}, 15, 20)
check(t, ind.Contains([]byte("mem")), true)
check(t, ind.Contains([]byte("cpu1")), false)
check(t, ind.ContainsValue([]byte("cpu1"), 4), false)
check(t, ind.ContainsValue([]byte("cpu1"), 5), false)
check(t, ind.ContainsValue([]byte("cpu1"), 10), false)
check(t, ind.ContainsValue([]byte("cpu1"), 15), false)
check(t, ind.ContainsValue([]byte("cpu1"), 16), false)
check(t, ind.Contains([]byte("cpu2")), true)
check(t, ind.ContainsValue([]byte("cpu2"), 4), false)
check(t, ind.ContainsValue([]byte("cpu2"), 5), false)
check(t, ind.ContainsValue([]byte("cpu2"), 10), true)
check(t, ind.ContainsValue([]byte("cpu2"), 15), false)
check(t, ind.ContainsValue([]byte("cpu2"), 16), false)
}
func TestIndirectIndex_DeletePrefix(t *testing.T) {
check := func(t *testing.T, got, exp bool) {
t.Helper()
if exp != got {
t.Fatalf("expected: %v but got: %v", exp, got)
}
}
index := NewIndexWriter()
index.Add([]byte("cpu1"), BlockInteger, 0, 10, 10, 20)
index.Add([]byte("cpu1"), BlockInteger, 10, 20, 10, 20)
index.Add([]byte("cpu2"), BlockInteger, 0, 10, 10, 20)
index.Add([]byte("cpu2"), BlockInteger, 10, 20, 10, 20)
index.Add([]byte("mem"), BlockInteger, 0, 10, 10, 20)
ind := loadIndex(t, index)
ind.DeletePrefix([]byte("c"), 5, 15)
check(t, ind.Contains([]byte("mem")), true)
check(t, ind.Contains([]byte("cpu1")), true)
check(t, ind.ContainsValue([]byte("cpu1"), 4), true)
check(t, ind.ContainsValue([]byte("cpu1"), 5), false)
check(t, ind.ContainsValue([]byte("cpu1"), 10), false)
check(t, ind.ContainsValue([]byte("cpu1"), 15), false)
check(t, ind.ContainsValue([]byte("cpu1"), 16), true)
check(t, ind.Contains([]byte("cpu2")), true)
check(t, ind.ContainsValue([]byte("cpu2"), 4), true)
check(t, ind.ContainsValue([]byte("cpu2"), 5), false)
check(t, ind.ContainsValue([]byte("cpu2"), 10), false)
check(t, ind.ContainsValue([]byte("cpu2"), 15), false)
check(t, ind.ContainsValue([]byte("cpu2"), 16), true)
ind.DeletePrefix([]byte("cp"), 0, 5)
check(t, ind.Contains([]byte("mem")), true)
check(t, ind.Contains([]byte("cpu1")), true)
check(t, ind.ContainsValue([]byte("cpu1"), 4), false)
check(t, ind.ContainsValue([]byte("cpu1"), 5), false)
check(t, ind.ContainsValue([]byte("cpu1"), 10), false)
check(t, ind.ContainsValue([]byte("cpu1"), 15), false)
check(t, ind.ContainsValue([]byte("cpu1"), 16), true)
check(t, ind.Contains([]byte("cpu2")), true)
check(t, ind.ContainsValue([]byte("cpu2"), 4), false)
check(t, ind.ContainsValue([]byte("cpu2"), 5), false)
check(t, ind.ContainsValue([]byte("cpu2"), 10), false)
check(t, ind.ContainsValue([]byte("cpu2"), 15), false)
check(t, ind.ContainsValue([]byte("cpu2"), 16), true)
ind.DeletePrefix([]byte("cpu"), 15, 20)
check(t, ind.Contains([]byte("mem")), true)
check(t, ind.Contains([]byte("cpu1")), false)
check(t, ind.ContainsValue([]byte("cpu1"), 4), false)
check(t, ind.ContainsValue([]byte("cpu1"), 5), false)
check(t, ind.ContainsValue([]byte("cpu1"), 10), false)
check(t, ind.ContainsValue([]byte("cpu1"), 15), false)
check(t, ind.ContainsValue([]byte("cpu1"), 16), false)
check(t, ind.Contains([]byte("cpu2")), false)
check(t, ind.ContainsValue([]byte("cpu2"), 4), false)
check(t, ind.ContainsValue([]byte("cpu2"), 5), false)
check(t, ind.ContainsValue([]byte("cpu2"), 10), false)
check(t, ind.ContainsValue([]byte("cpu2"), 15), false)
check(t, ind.ContainsValue([]byte("cpu2"), 16), false)
}
//
// indirectIndex benchmarks
//
const (
indexKeyCount = 500000
indexBlockCount = 100
)
type indexCacheInfo struct {
index *indirectIndex
offsets []uint32
prefixes []prefixEntry
allKeys [][]byte
bytes []byte
}
func (i *indexCacheInfo) reset() {
i.index.ro.offsets = append([]uint32(nil), i.offsets...)
i.index.ro.prefixes = append([]prefixEntry(nil), i.prefixes...)
i.index.tombstones = make(map[uint32][]TimeRange)
i.index.prefixTombstones = newPrefixTree()
resetFaults(i.index)
}
var (
indexCache = map[string]*indexCacheInfo{}
indexSizes = map[string][2]int{
"large": {500000, 100},
"med": {1000, 1000},
"small": {5000, 2},
}
)
func getFaults(indirect *indirectIndex) int64 {
return int64(atomic.LoadUint64(&indirect.b.faults))
}
func resetFaults(indirect *indirectIndex) {
if indirect != nil {
indirect.b = faultBuffer{b: indirect.b.b}
}
}
func getIndex(tb testing.TB, name string) (*indirectIndex, *indexCacheInfo) {
info, ok := indexCache[name]
if ok {
info.reset()
return info.index, info
}
info = new(indexCacheInfo)
sizes, ok := indexSizes[name]
if !ok {
sizes = [2]int{indexKeyCount, indexBlockCount}
}
keys, blocks := sizes[0], sizes[1]
writer := NewIndexWriter()
// add a ballast key that starts at -1 so that we don't trigger optimizations
// when deleting [0, MaxInt]
writer.Add([]byte("ballast"), BlockFloat64, -1, 1, 0, 100)
for i := 0; i < keys; i++ {
key := []byte(fmt.Sprintf("cpu-%08d", i))
info.allKeys = append(info.allKeys, key)
for j := 0; j < blocks; j++ {
writer.Add(key, BlockFloat64, 0, 100, 10, 100)
}
}
var err error
info.bytes, err = writer.MarshalBinary()
if err != nil {
tb.Fatalf("unexpected error marshaling index: %v", err)
}
info.index = NewIndirectIndex()
if err = info.index.UnmarshalBinary(info.bytes); err != nil {
tb.Fatalf("unexpected error unmarshaling index: %v", err)
}
info.offsets = append([]uint32(nil), info.index.ro.offsets...)
info.prefixes = append([]prefixEntry(nil), info.index.ro.prefixes...)
indexCache[name] = info
return info.index, info
}
func BenchmarkIndirectIndex_UnmarshalBinary(b *testing.B) {
indirect, info := getIndex(b, "large")
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := indirect.UnmarshalBinary(info.bytes); err != nil {
b.Fatalf("unexpected error unmarshaling index: %v", err)
}
}
}
func BenchmarkIndirectIndex_Entries(b *testing.B) {
indirect, _ := getIndex(b, "med")
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
resetFaults(indirect)
indirect.ReadEntries([]byte("cpu-00000001"), nil)
}
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
func BenchmarkIndirectIndex_ReadEntries(b *testing.B) {
var entries []IndexEntry
indirect, _ := getIndex(b, "med")
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
resetFaults(indirect)
entries, _ = indirect.ReadEntries([]byte("cpu-00000001"), entries)
}
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
func BenchmarkBlockIterator_Next(b *testing.B) {
indirect, _ := getIndex(b, "med")
r := TSMReader{index: indirect}
b.ResetTimer()
for i := 0; i < b.N; i++ {
resetFaults(indirect)
bi := r.BlockIterator()
for bi.Next() {
}
}
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
func BenchmarkIndirectIndex_DeleteRangeLast(b *testing.B) {
indirect, _ := getIndex(b, "large")
keys := [][]byte{[]byte("cpu-00999999")}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
resetFaults(indirect)
indirect.DeleteRange(keys, 10, 50)
}
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
func BenchmarkIndirectIndex_DeleteRangeFull(b *testing.B) {
run := func(b *testing.B, name string) {
indirect, _ := getIndex(b, name)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
var info *indexCacheInfo
indirect, info = getIndex(b, name)
b.StartTimer()
for i := 0; i < len(info.allKeys); i += 4096 {
n := i + 4096
if n > len(info.allKeys) {
n = len(info.allKeys)
}
indirect.DeleteRange(info.allKeys[i:n], 10, 50)
}
}
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
b.Run("Large", func(b *testing.B) { run(b, "large") })
b.Run("Small", func(b *testing.B) { run(b, "small") })
}
func BenchmarkIndirectIndex_DeleteRangeFull_Covered(b *testing.B) {
run := func(b *testing.B, name string) {
indirect, _ := getIndex(b, name)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
var info *indexCacheInfo
indirect, info = getIndex(b, name)
b.StartTimer()
for i := 0; i < len(info.allKeys); i += 4096 {
n := i + 4096
if n > len(info.allKeys) {
n = len(info.allKeys)
}
indirect.DeleteRange(info.allKeys[i:n], 0, math.MaxInt64)
}
}
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
b.Run("Large", func(b *testing.B) { run(b, "large") })
b.Run("Small", func(b *testing.B) { run(b, "small") })
}
func BenchmarkIndirectIndex_Delete(b *testing.B) {
run := func(b *testing.B, name string) {
indirect, _ := getIndex(b, name)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
var info *indexCacheInfo
indirect, info = getIndex(b, name)
b.StartTimer()
for i := 0; i < len(info.allKeys); i += 4096 {
n := i + 4096
if n > len(info.allKeys) {
n = len(info.allKeys)
}
indirect.Delete(info.allKeys[i:n])
}
}
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
b.Run("Large", func(b *testing.B) { run(b, "large") })
b.Run("Small", func(b *testing.B) { run(b, "small") })
}
func BenchmarkIndirectIndex_DeletePrefixFull(b *testing.B) {
prefix := []byte("cpu-")
run := func(b *testing.B, name string) {
indirect, _ := getIndex(b, name)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
indirect, _ = getIndex(b, name)
b.StartTimer()
indirect.DeletePrefix(prefix, 10, 50)
}
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
b.Run("Large", func(b *testing.B) { run(b, "large") })
b.Run("Small", func(b *testing.B) { run(b, "small") })
}
func BenchmarkIndirectIndex_DeletePrefixFull_Covered(b *testing.B) {
prefix := []byte("cpu-")
run := func(b *testing.B, name string) {
indirect, _ := getIndex(b, name)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
indirect, _ = getIndex(b, name)
b.StartTimer()
indirect.DeletePrefix(prefix, 0, math.MaxInt64)
}
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
b.Run("Large", func(b *testing.B) { run(b, "large") })
b.Run("Small", func(b *testing.B) { run(b, "small") })
}

View File

@ -1,20 +1,26 @@
package tsm1
import (
"fmt"
"io/ioutil"
"math"
"os"
"path/filepath"
"sort"
"sync/atomic"
"testing"
)
func fatal(t *testing.T, msg string, err error) {
func fatal(t testing.TB, msg string, err error) {
t.Helper()
t.Fatalf("unexpected error %v: %v", msg, err)
}
func fatalIfErr(t testing.TB, msg string, err error) {
t.Helper()
if err != nil {
fatal(t, msg, err)
}
}
func TestTSMReader_Type(t *testing.T) {
dir := mustTempDir()
defer os.RemoveAll(dir)
@ -57,6 +63,17 @@ func TestTSMReader_Type(t *testing.T) {
}
}
func TestIndexWriter_MaxBlocks(t *testing.T) {
index := NewIndexWriter()
for i := 0; i < 1<<16; i++ {
index.Add([]byte("cpu"), BlockFloat64, 0, 1, 10, 20)
}
if _, err := index.MarshalBinary(); err == nil {
t.Fatalf("expected max block count error. got nil")
}
}
func TestTSMReader_MMAP_ReadAll(t *testing.T) {
dir := mustTempDir()
defer os.RemoveAll(dir)
@ -1118,71 +1135,6 @@ func TestIndirectIndex_Entries(t *testing.T) {
}
}
func TestIndirectIndex_Entries_NonExistent(t *testing.T) {
index := NewIndexWriter()
index.Add([]byte("cpu"), BlockFloat64, 0, 1, 10, 100)
index.Add([]byte("cpu"), BlockFloat64, 2, 3, 20, 200)
b, err := index.MarshalBinary()
if err != nil {
t.Fatalf("unexpected error marshaling index: %v", err)
}
indirect := NewIndirectIndex()
if err := indirect.UnmarshalBinary(b); err != nil {
t.Fatalf("unexpected error unmarshaling index: %v", err)
}
// mem has not been added to the index so we should get no entries back
// for both
exp := index.Entries([]byte("mem"))
entries, err := indirect.ReadEntries([]byte("mem"), nil)
if err != nil {
t.Fatal(err)
}
if got, exp := len(entries), len(exp); got != exp && exp != 0 {
t.Fatalf("entries length mismatch: got %v, exp %v", got, exp)
}
}
func TestIndirectIndex_MaxBlocks(t *testing.T) {
index := NewIndexWriter()
for i := 0; i < 1<<16; i++ {
index.Add([]byte("cpu"), BlockFloat64, 0, 1, 10, 20)
}
if _, err := index.MarshalBinary(); err == nil {
t.Fatalf("expected max block count error. got nil")
} else {
println(err.Error())
}
}
func TestIndirectIndex_Type(t *testing.T) {
index := NewIndexWriter()
index.Add([]byte("cpu"), BlockInteger, 0, 1, 10, 20)
b, err := index.MarshalBinary()
if err != nil {
t.Fatal(err)
}
ind := NewIndirectIndex()
if err := ind.UnmarshalBinary(b); err != nil {
fatal(t, "unmarshal binary", err)
}
typ, err := ind.Type([]byte("cpu"))
if err != nil {
fatal(t, "reading type", err)
}
if got, exp := typ, BlockInteger; got != exp {
t.Fatalf("type mismatch: got %v, exp %v", got, exp)
}
}
func TestDirectIndex_KeyCount(t *testing.T) {
index := NewIndexWriter()
index.Add([]byte("cpu"), BlockFloat64, 0, 1, 10, 20)
@ -1195,280 +1147,7 @@ func TestDirectIndex_KeyCount(t *testing.T) {
}
}
func TestBlockIterator_Single(t *testing.T) {
dir := mustTempDir()
defer os.RemoveAll(dir)
f := mustTempFile(dir)
w, err := NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
values := []Value{NewValue(0, int64(1))}
if err := w.Write([]byte("cpu"), values); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
if err := w.WriteIndex(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
fd, err := os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error opening: %v", err)
}
r, err := NewTSMReader(fd)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
var count int
iter := r.BlockIterator()
for iter.Next() {
key, minTime, maxTime, typ, _, buf, err := iter.Read()
if err != nil {
t.Fatalf("unexpected error creating iterator: %v", err)
}
if got, exp := string(key), "cpu"; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
if got, exp := minTime, int64(0); got != exp {
t.Fatalf("min time mismatch: got %v, exp %v", got, exp)
}
if got, exp := maxTime, int64(0); got != exp {
t.Fatalf("max time mismatch: got %v, exp %v", got, exp)
}
if got, exp := typ, BlockInteger; got != exp {
t.Fatalf("block type mismatch: got %v, exp %v", got, exp)
}
if len(buf) == 0 {
t.Fatalf("buf length = 0")
}
count++
}
if got, exp := count, len(values); got != exp {
t.Fatalf("value count mismatch: got %v, exp %v", got, exp)
}
}
func TestBlockIterator_Tombstone(t *testing.T) {
dir := mustTempDir()
defer os.RemoveAll(dir)
f := mustTempFile(dir)
w, err := NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
values := []Value{NewValue(0, int64(1))}
if err := w.Write([]byte("cpu"), values); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
if err := w.Write([]byte("mem"), values); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
if err := w.WriteIndex(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
fd, err := os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error opening: %v", err)
}
r, err := NewTSMReader(fd)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
iter := r.BlockIterator()
for iter.Next() {
// Trigger a delete during iteration. This should cause an error condition for
// the BlockIterator
r.Delete([][]byte{[]byte("cpu")})
}
if iter.Err() == nil {
t.Fatalf("expected error: got nil")
}
}
func TestBlockIterator_MultipleBlocks(t *testing.T) {
dir := mustTempDir()
defer os.RemoveAll(dir)
f := mustTempFile(dir)
w, err := NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
values1 := []Value{NewValue(0, int64(1))}
if err := w.Write([]byte("cpu"), values1); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
values2 := []Value{NewValue(1, int64(2))}
if err := w.Write([]byte("cpu"), values2); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
if err := w.WriteIndex(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
fd, err := os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error opening: %v", err)
}
r, err := NewTSMReader(fd)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
var count int
expData := []Values{values1, values2}
iter := r.BlockIterator()
var i int
for iter.Next() {
key, minTime, maxTime, typ, _, buf, err := iter.Read()
if err != nil {
t.Fatalf("unexpected error creating iterator: %v", err)
}
if got, exp := string(key), "cpu"; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
if got, exp := minTime, expData[i][0].UnixNano(); got != exp {
t.Fatalf("min time mismatch: got %v, exp %v", got, exp)
}
if got, exp := maxTime, expData[i][0].UnixNano(); got != exp {
t.Fatalf("max time mismatch: got %v, exp %v", got, exp)
}
if got, exp := typ, BlockInteger; got != exp {
t.Fatalf("block type mismatch: got %v, exp %v", got, exp)
}
if len(buf) == 0 {
t.Fatalf("buf length = 0")
}
count++
i++
}
if got, exp := count, 2; got != exp {
t.Fatalf("value count mismatch: got %v, exp %v", got, exp)
}
}
func TestBlockIterator_Sorted(t *testing.T) {
dir := mustTempDir()
defer os.RemoveAll(dir)
f := mustTempFile(dir)
w, err := NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}
values := map[string][]Value{
"mem": []Value{NewValue(0, int64(1))},
"cycles": []Value{NewValue(0, ^uint64(0))},
"cpu": []Value{NewValue(1, float64(2))},
"disk": []Value{NewValue(1, true)},
"load": []Value{NewValue(1, "string")},
}
keys := make([]string, 0, len(values))
for k := range values {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
if err := w.Write([]byte(k), values[k]); err != nil {
t.Fatalf("unexpected error writing: %v", err)
}
}
if err := w.WriteIndex(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
fd, err := os.Open(f.Name())
if err != nil {
t.Fatalf("unexpected error opening: %v", err)
}
r, err := NewTSMReader(fd)
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
}
var count int
iter := r.BlockIterator()
var lastKey string
for iter.Next() {
key, _, _, _, _, buf, err := iter.Read()
if string(key) < lastKey {
t.Fatalf("keys not sorted: got %v, last %v", key, lastKey)
}
lastKey = string(key)
if err != nil {
t.Fatalf("unexpected error creating iterator: %v", err)
}
if len(buf) == 0 {
t.Fatalf("buf length = 0")
}
count++
}
if got, exp := count, len(values); got != exp {
t.Fatalf("value count mismatch: got %v, exp %v", got, exp)
}
}
func TestIndirectIndex_UnmarshalBinary_BlockCountOverflow(t *testing.T) {
func TestTSMReader_UnmarshalBinary_BlockCountOverflow(t *testing.T) {
dir := mustTempDir()
defer os.RemoveAll(dir)
f := mustTempFile(dir)
@ -1503,6 +1182,7 @@ func TestIndirectIndex_UnmarshalBinary_BlockCountOverflow(t *testing.T) {
defer r.Close()
}
func TestCompacted_NotFull(t *testing.T) {
dir := mustTempDir()
defer os.RemoveAll(dir)
@ -1871,316 +1551,3 @@ func TestTSMReader_References(t *testing.T) {
t.Fatalf("unexpected error removing reader: %v", err)
}
}
//
// indirectIndex benchmarks
//
const (
indexKeyCount = 500000
indexBlockCount = 100
)
type indexCacheInfo struct {
index *indirectIndex
offsets []uint32
prefixes []prefixEntry
allKeys [][]byte
bytes []byte
}
func (i *indexCacheInfo) reset() {
i.index.ro.offsets = append([]uint32(nil), i.offsets...)
i.index.ro.prefixes = append([]prefixEntry(nil), i.prefixes...)
i.index.tombstones = make(map[uint32][]TimeRange)
i.index.prefixTombstones = newPrefixTree()
resetFaults(i.index)
}
var (
indexCache = map[string]*indexCacheInfo{}
indexSizes = map[string][2]int{
"large": {500000, 100},
"med": {1000, 1000},
"small": {5000, 2},
}
)
func getFaults(indirect *indirectIndex) int64 {
return int64(atomic.LoadUint64(&indirect.b.faults))
}
func resetFaults(indirect *indirectIndex) {
if indirect != nil {
indirect.b = faultBuffer{b: indirect.b.b}
}
}
func getIndex(tb testing.TB, name string) (*indirectIndex, *indexCacheInfo) {
info, ok := indexCache[name]
if ok {
info.reset()
return info.index, info
}
info = new(indexCacheInfo)
sizes, ok := indexSizes[name]
if !ok {
sizes = [2]int{indexKeyCount, indexBlockCount}
}
keys, blocks := sizes[0], sizes[1]
writer := NewIndexWriter()
// add a ballast key that starts at -1 so that we don't trigger optimizations
// when deleting [0, MaxInt]
writer.Add([]byte("ballast"), BlockFloat64, -1, 1, 0, 100)
for i := 0; i < keys; i++ {
key := []byte(fmt.Sprintf("cpu-%08d", i))
info.allKeys = append(info.allKeys, key)
for j := 0; j < blocks; j++ {
writer.Add(key, BlockFloat64, 0, 100, 10, 100)
}
}
var err error
info.bytes, err = writer.MarshalBinary()
if err != nil {
tb.Fatalf("unexpected error marshaling index: %v", err)
}
info.index = NewIndirectIndex()
if err = info.index.UnmarshalBinary(info.bytes); err != nil {
tb.Fatalf("unexpected error unmarshaling index: %v", err)
}
info.offsets = append([]uint32(nil), info.index.ro.offsets...)
info.prefixes = append([]prefixEntry(nil), info.index.ro.prefixes...)
indexCache[name] = info
return info.index, info
}
func BenchmarkIndirectIndex_UnmarshalBinary(b *testing.B) {
indirect, info := getIndex(b, "large")
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := indirect.UnmarshalBinary(info.bytes); err != nil {
b.Fatalf("unexpected error unmarshaling index: %v", err)
}
}
}
func BenchmarkIndirectIndex_Entries(b *testing.B) {
indirect, _ := getIndex(b, "med")
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
resetFaults(indirect)
indirect.ReadEntries([]byte("cpu-00000001"), nil)
}
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
func BenchmarkIndirectIndex_ReadEntries(b *testing.B) {
var entries []IndexEntry
indirect, _ := getIndex(b, "med")
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
resetFaults(indirect)
entries, _ = indirect.ReadEntries([]byte("cpu-00000001"), entries)
}
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
func BenchmarkBlockIterator_Next(b *testing.B) {
indirect, _ := getIndex(b, "med")
r := TSMReader{index: indirect}
b.ResetTimer()
for i := 0; i < b.N; i++ {
resetFaults(indirect)
bi := r.BlockIterator()
for bi.Next() {
}
}
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
func BenchmarkIndirectIndex_DeleteRangeLast(b *testing.B) {
indirect, _ := getIndex(b, "large")
keys := [][]byte{[]byte("cpu-00999999")}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
resetFaults(indirect)
indirect.DeleteRange(keys, 10, 50)
}
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
func BenchmarkIndirectIndex_DeleteRangeFull(b *testing.B) {
run := func(b *testing.B, name string) {
indirect, _ := getIndex(b, name)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
var info *indexCacheInfo
indirect, info = getIndex(b, name)
b.StartTimer()
for i := 0; i < len(info.allKeys); i += 4096 {
n := i + 4096
if n > len(info.allKeys) {
n = len(info.allKeys)
}
indirect.DeleteRange(info.allKeys[i:n], 10, 50)
}
}
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
b.Run("Large", func(b *testing.B) { run(b, "large") })
b.Run("Small", func(b *testing.B) { run(b, "small") })
}
func BenchmarkIndirectIndex_DeleteRangeFull_Covered(b *testing.B) {
run := func(b *testing.B, name string) {
indirect, _ := getIndex(b, name)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
var info *indexCacheInfo
indirect, info = getIndex(b, name)
b.StartTimer()
for i := 0; i < len(info.allKeys); i += 4096 {
n := i + 4096
if n > len(info.allKeys) {
n = len(info.allKeys)
}
indirect.DeleteRange(info.allKeys[i:n], 0, math.MaxInt64)
}
}
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
b.Run("Large", func(b *testing.B) { run(b, "large") })
b.Run("Small", func(b *testing.B) { run(b, "small") })
}
func BenchmarkIndirectIndex_Delete(b *testing.B) {
run := func(b *testing.B, name string) {
indirect, _ := getIndex(b, name)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
var info *indexCacheInfo
indirect, info = getIndex(b, name)
b.StartTimer()
for i := 0; i < len(info.allKeys); i += 4096 {
n := i + 4096
if n > len(info.allKeys) {
n = len(info.allKeys)
}
indirect.Delete(info.allKeys[i:n])
}
}
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
b.Run("Large", func(b *testing.B) { run(b, "large") })
b.Run("Small", func(b *testing.B) { run(b, "small") })
}
func BenchmarkIndirectIndex_DeletePrefixFull(b *testing.B) {
prefix := []byte("cpu-")
run := func(b *testing.B, name string) {
indirect, _ := getIndex(b, name)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
indirect, _ = getIndex(b, name)
b.StartTimer()
indirect.DeletePrefix(prefix, 10, 50)
}
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
b.Run("Large", func(b *testing.B) { run(b, "large") })
b.Run("Small", func(b *testing.B) { run(b, "small") })
}
func BenchmarkIndirectIndex_DeletePrefixFull_Covered(b *testing.B) {
prefix := []byte("cpu-")
run := func(b *testing.B, name string) {
indirect, _ := getIndex(b, name)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
indirect, _ = getIndex(b, name)
b.StartTimer()
indirect.DeletePrefix(prefix, 0, math.MaxInt64)
}
if faultBufferEnabled {
b.SetBytes(getFaults(indirect) * 4096)
b.Log("recorded faults:", getFaults(indirect))
}
}
b.Run("Large", func(b *testing.B) { run(b, "large") })
b.Run("Small", func(b *testing.B) { run(b, "small") })
}

View File

@ -15,6 +15,10 @@ func (t TimeRange) Less(o TimeRange) bool {
// timeRangesCoverEntries returns true if the time ranges fully cover the entries.
func timeRangesCoverEntries(merger timeRangeMerger, entries []IndexEntry) (covers bool) {
if len(entries) == 0 {
return true
}
mustCover := entries[0].MinTime
ts, ok := merger.Pop()

View File

@ -0,0 +1,100 @@
package tsm1
import (
"reflect"
"sort"
"testing"
"github.com/google/go-cmp/cmp"
)
func TestTimeRangeMerger(t *testing.T) {
ranges := func(ns ...int64) (out []TimeRange) {
for _, n := range ns {
out = append(out, TimeRange{n, n})
}
return out
}
check := func(t *testing.T, exp []TimeRange, merger timeRangeMerger) {
t.Helper()
var got []TimeRange
for {
tr, ok := merger.Pop()
if !ok {
break
}
got = append(got, tr)
}
if !reflect.DeepEqual(got, exp) {
t.Fatalf("bad merge:\n%v", cmp.Diff(got, exp))
}
}
check(t, ranges(0, 1, 2, 3, 4, 5, 6), timeRangeMerger{
sorted: ranges(0, 2, 6),
unsorted: ranges(1, 3, 5),
single: TimeRange{4, 4},
})
check(t, ranges(0, 1, 2), timeRangeMerger{
sorted: ranges(0, 1, 2),
used: true,
})
check(t, ranges(0, 1, 2), timeRangeMerger{
unsorted: ranges(0, 1, 2),
used: true,
})
check(t, ranges(0), timeRangeMerger{
single: TimeRange{0, 0},
})
check(t, ranges(0, 0, 0), timeRangeMerger{
sorted: ranges(0),
unsorted: ranges(0),
single: TimeRange{0, 0},
})
}
func TestTimeRangeCoverEntries(t *testing.T) {
ranges := func(ns ...int64) (out []TimeRange) {
for i := 0; i+1 < len(ns); i += 2 {
out = append(out, TimeRange{ns[i], ns[i+1]})
}
return out
}
entries := func(ns ...int64) (out []IndexEntry) {
for i := 0; i+1 < len(ns); i += 2 {
out = append(out, IndexEntry{MinTime: ns[i], MaxTime: ns[i+1]})
}
return out
}
check := func(t *testing.T, ranges []TimeRange, entries []IndexEntry, covers bool) {
t.Helper()
sort.Slice(ranges, func(i, j int) bool { return ranges[i].Less(ranges[j]) })
got := timeRangesCoverEntries(timeRangeMerger{sorted: ranges, used: true}, entries)
if got != covers {
t.Fatalf("bad covers:\nranges: %v\nentries: %v\ncovers: %v\ngot: %v",
ranges, entries, covers, got)
}
}
check(t, ranges(0, 0, 1, 1, 2, 2), entries(0, 0, 1, 1, 2, 2), true)
check(t, ranges(0, 0, 1, 1, 2, 2), entries(0, 0, 2, 2), true)
check(t, ranges(0, 0, 1, 1, 2, 2), entries(3, 3), false)
check(t, ranges(0, 0, 1, 1, 2, 2), entries(-1, -1), false)
check(t, ranges(0, 10), entries(1, 1, 2, 2), true)
check(t, ranges(0, 1, 1, 2), entries(0, 0, 1, 1, 2, 2), true)
check(t, ranges(0, 10), entries(0, 0, 2, 2), true)
check(t, ranges(0, 1, 1, 2), entries(0, 0, 2, 2), true)
check(t, ranges(0, 1, 4, 5), entries(0, 0, 5, 5), true)
check(t, ranges(), entries(), true)
check(t, ranges(), entries(0, 0), false)
check(t, ranges(0, 0), entries(), true)
}