diff --git a/tsdb/tsm1/reader.go b/tsdb/tsm1/reader.go index 08aabde2c0..fa8b7de095 100644 --- a/tsdb/tsm1/reader.go +++ b/tsdb/tsm1/reader.go @@ -2,16 +2,10 @@ package tsm1 import ( "bufio" - "bytes" "encoding/binary" - "errors" "fmt" "io" - "math" - "math/rand" "os" - "runtime" - "sort" "sync" "sync/atomic" @@ -51,76 +45,6 @@ type TSMReader struct { deleteMu sync.Mutex } -// TSMIndex represent the index section of a TSM file. The index records all -// blocks, their locations, sizes, min and max times. -type TSMIndex interface { - // Delete removes the given keys from the index. - Delete(keys [][]byte) - - // DeleteRange removes the given keys with data between minTime and maxTime from the index. - DeleteRange(keys [][]byte, minTime, maxTime int64) - - // DeletePrefix removes keys that begin with the given prefix with data between minTime and - // maxTime from the index. - DeletePrefix(prefix []byte, minTime, maxTime int64) - - // ContainsKey returns true if the given key may exist in the index. This func is faster than - // Contains but, may return false positives. - ContainsKey(key []byte) bool - - // Contains return true if the given key exists in the index. - Contains(key []byte) bool - - // ContainsValue returns true if key and time might exist in this file. This function could - // return true even though the actual point does not exists. For example, the key may - // exist in this file, but not have a point exactly at time t. - ContainsValue(key []byte, timestamp int64) bool - - // ReadEntries reads the index entries for key into entries. - ReadEntries(key []byte, entries []IndexEntry) ([]IndexEntry, error) - - // Entry returns the index entry for the specified key and timestamp. If no entry - // matches the key and timestamp, nil is returned. - Entry(key []byte, timestamp int64) *IndexEntry - - // KeyCount returns the count of unique keys in the index. - KeyCount() int - - // Iterator returns an iterator over the keys starting at the provided key. 
You must - // call Next before calling any of the accessors. - Iterator([]byte) *TSMIndexIterator - - // OverlapsTimeRange returns true if the time range of the file intersect min and max. - OverlapsTimeRange(min, max int64) bool - - // OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max. - OverlapsKeyRange(min, max []byte) bool - - // Size returns the size of the current index in bytes. - Size() uint32 - - // TimeRange returns the min and max time across all keys in the file. - TimeRange() (int64, int64) - - // TombstoneRange returns ranges of time that are deleted for the given key. - TombstoneRange(key []byte, buf []TimeRange) []TimeRange - - // KeyRange returns the min and max keys in the file. - KeyRange() ([]byte, []byte) - - // Type returns the block type of the values stored for the key. Returns one of - // BlockFloat64, BlockInt64, BlockBool, BlockString. If key does not exist, - // an error is returned. - Type(key []byte) (byte, error) - - // UnmarshalBinary populates an index from an encoded byte slice - // representation of an index. - UnmarshalBinary(b []byte) error - - // Close closes the index and releases any resources. - Close() error -} - type tsmReaderOption func(*TSMReader) // WithMadviseWillNeed is an option for specifying whether to provide a MADV_WILL need hint to the kernel. @@ -618,653 +542,6 @@ func (a BatchDeleters) Rollback() error { return err } -// indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This -// implementation can be used for indexes that may be MMAPed into memory. -type indirectIndex struct { - mu sync.RWMutex - logger *zap.Logger - - // indirectIndex works a follows. 
Assuming we have an index structure in memory as - // the diagram below: - // - // ┌────────────────────────────────────────────────────────────────────┐ - // │ Index │ - // ├─┬──────────────────────┬──┬───────────────────────┬───┬────────────┘ - // │0│ │62│ │145│ - // ├─┴───────┬─────────┬────┼──┴──────┬─────────┬──────┼───┴─────┬──────┐ - // │Key 1 Len│ Key │... │Key 2 Len│ Key 2 │ ... │ Key 3 │ ... │ - // │ 2 bytes │ N bytes │ │ 2 bytes │ N bytes │ │ 2 bytes │ │ - // └─────────┴─────────┴────┴─────────┴─────────┴──────┴─────────┴──────┘ - - // We would build an `offsets` slices where each element pointers to the byte location - // for the first key in the index slice. - - // ┌────────────────────────────────────────────────────────────────────┐ - // │ Offsets │ - // ├────┬────┬────┬─────────────────────────────────────────────────────┘ - // │ 0 │ 62 │145 │ - // └────┴────┴────┘ - - // Using this offset slice we can find `Key 2` by doing a binary search - // over the offsets slice. Instead of comparing the value in the offsets - // (e.g. `62`), we use that as an index into the underlying index to - // retrieve the key at postion `62` and perform our comparisons with that. - - // When we have identified the correct position in the index for a given - // key, we could perform another binary search or a linear scan. This - // should be fast as well since each index entry is 28 bytes and all - // contiguous in memory. The current implementation uses a linear scan since the - // number of block entries is expected to be < 100 per key. - - // b is the underlying index byte slice. This could be a copy on the heap or an MMAP - // slice reference - b faultBuffer - - // ro contains the positions in b for each key as well as the first bytes of each key - // to avoid disk seeks. 
- ro readerOffsets - - // minKey, maxKey are the minium and maximum (lexicographically sorted) contained in the - // file - minKey, maxKey []byte - - // minTime, maxTime are the minimum and maximum times contained in the file across all - // series. - minTime, maxTime int64 - - // tombstones contains only the tombstoned keys with subset of time values deleted. An - // entry would exist here if a subset of the points for a key were deleted and the file - // had not be re-compacted to remove the points on disk. - tombstones map[uint32][]TimeRange - - // prefixTombstones contains the tombestoned keys with a subset of the values deleted that - // all share the same prefix. - prefixTombstones *prefixTree -} - -// NewIndirectIndex returns a new indirect index. -func NewIndirectIndex() *indirectIndex { - return &indirectIndex{ - tombstones: make(map[uint32][]TimeRange), - prefixTombstones: newPrefixTree(), - } -} - -// ContainsKey returns true of key may exist in this index. -func (d *indirectIndex) ContainsKey(key []byte) bool { - return bytes.Compare(key, d.minKey) >= 0 && bytes.Compare(key, d.maxKey) <= 0 -} - -// ReadEntries returns all index entries for a key. -func (d *indirectIndex) ReadEntries(key []byte, entries []IndexEntry) ([]IndexEntry, error) { - d.mu.RLock() - defer d.mu.RUnlock() - - iter := d.ro.Iterator() - exact, _ := iter.Seek(key, &d.b) - if !exact { - return nil, nil - } - - entries, err := readEntries(d.b.access(iter.EntryOffset(&d.b), 0), entries) - if err != nil { - return nil, err - } - - return entries, nil -} - -// Entry returns the index entry for the specified key and timestamp. If no entry -// matches the key an timestamp, nil is returned. 
-func (d *indirectIndex) Entry(key []byte, timestamp int64) *IndexEntry { - entries, err := d.ReadEntries(key, nil) - if err != nil { - d.logger.Error("error reading tsm index key", zap.String("key", fmt.Sprintf("%q", key))) - return nil - } - for _, entry := range entries { - if entry.Contains(timestamp) { - return &entry - } - } - return nil -} - -// KeyCount returns the count of unique keys in the index. -func (d *indirectIndex) KeyCount() int { - d.mu.RLock() - n := len(d.ro.offsets) - d.mu.RUnlock() - return n -} - -// Iterator returns an iterator over the keys starting at the provided key. You must -// call Next before calling any of the accessors. -func (d *indirectIndex) Iterator(key []byte) *TSMIndexIterator { - d.mu.RLock() - iter := d.ro.Iterator() - _, ok := iter.Seek(key, &d.b) - ti := &TSMIndexIterator{ - d: d, - n: int(len(d.ro.offsets)), - b: &d.b, - iter: &iter, - first: true, - ok: ok, - } - d.mu.RUnlock() - - return ti -} - -// Delete removes the given keys from the index. -func (d *indirectIndex) Delete(keys [][]byte) { - if len(keys) == 0 { - return - } - - d.mu.RLock() - iter := d.ro.Iterator() - for _, key := range keys { - if !iter.Next() || !bytes.Equal(iter.Key(&d.b), key) { - if exact, _ := iter.Seek(key, &d.b); !exact { - continue - } - } - - delete(d.tombstones, iter.Offset()) - iter.Delete() - } - d.mu.RUnlock() - - if !iter.HasDeletes() { - return - } - - d.mu.Lock() - iter.Done() - d.mu.Unlock() -} - -// insertTimeRange adds a time range described by the minTime and maxTime into ts. -func insertTimeRange(ts []TimeRange, minTime, maxTime int64) []TimeRange { - n := sort.Search(len(ts), func(i int) bool { - if ts[i].Min == minTime { - return ts[i].Max >= maxTime - } - return ts[i].Min > minTime - }) - - ts = append(ts, TimeRange{}) - copy(ts[n+1:], ts[n:]) - ts[n] = TimeRange{Min: minTime, Max: maxTime} - return ts -} - -// pendingTombstone is a type that describes a pending insertion of a tombstone. 
-type pendingTombstone struct { - Key int - Index int - Offset uint32 - EntryOffset uint32 - Tombstones int -} - -// coversEntries checks if all of the stored tombstones including one for minTime and maxTime cover -// all of the index entries. It mutates the entries slice to do the work, so be sure to make a copy -// if you must. -func (d *indirectIndex) coversEntries(offset uint32, key []byte, buf []TimeRange, - entries []IndexEntry, minTime, maxTime int64) ([]TimeRange, bool) { - - // grab the tombstones from the prefixes. these come out unsorted, so we sort - // them and place them in the merger section named unsorted. - buf = d.prefixTombstones.Search(key, buf[:0]) - if len(buf) > 1 { - sort.Slice(buf, func(i, j int) bool { return buf[i].Less(buf[j]) }) - } - - // create the merger with the other tombstone entries: the ones for the specific - // key and the one we have proposed to add. - merger := timeRangeMerger{ - sorted: d.tombstones[offset], - unsorted: buf, - single: TimeRange{Min: minTime, Max: maxTime}, - used: false, - } - - return buf, timeRangesCoverEntries(merger, entries) -} - -// DeleteRange removes the given keys with data between minTime and maxTime from the index. -func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) { - // If we're deleting everything, we won't need to worry about partial deletes. - if minTime <= d.minTime && maxTime >= d.maxTime { - d.Delete(keys) - return - } - - // Is the range passed in outside of the time range for the file? - if minTime > d.maxTime || maxTime < d.minTime { - return - } - - // General outline: - // Under the read lock, determine the set of actions we need to - // take and on what keys to take them. Then, under the write - // lock, perform those actions. We keep track of some state - // during the read lock to make double checking under the - // write lock cheap. 
- - d.mu.RLock() - iter := d.ro.Iterator() - var ( - ok bool - trbuf []TimeRange - entries []IndexEntry - pending []pendingTombstone - err error - ) - - for i, key := range keys { - if !iter.Next() || !bytes.Equal(iter.Key(&d.b), key) { - if exact, _ := iter.Seek(key, &d.b); !exact { - continue - } - } - - entryOffset := iter.EntryOffset(&d.b) - entries, err = readEntriesTimes(d.b.access(entryOffset, 0), entries) - if err != nil { - // If we have an error reading the entries for a key, we should just pretend - // the whole key is deleted. Maybe a better idea is to report this up somehow - // but that's for another time. - iter.Delete() - continue - } - - // Is the time range passed outside of the time range we have stored for this key? - min, max := entries[0].MinTime, entries[len(entries)-1].MaxTime - if minTime > max || maxTime < min { - continue - } - - // Does the range passed in cover every value for the key? - if minTime <= min && maxTime >= max { - iter.Delete() - continue - } - - // Does adding the minTime and maxTime cover the entries? - offset := iter.Offset() - trbuf, ok = d.coversEntries(offset, key, trbuf, entries, minTime, maxTime) - if ok { - iter.Delete() - continue - } - - // Save that we should add a tombstone for this key, and how many tombstones - // already existed to avoid double checks. - pending = append(pending, pendingTombstone{ - Key: i, - Index: iter.Index(), - Offset: offset, - EntryOffset: entryOffset, - Tombstones: len(d.tombstones[offset]), - }) - } - - d.mu.RUnlock() - - if len(pending) == 0 && !iter.HasDeletes() { - return - } - - d.mu.Lock() - defer d.mu.Unlock() - - for _, p := range pending { - // Check the existing tombstones. If the length did not/ change, then we know - // that we don't need to double check coverage, since we only ever increase the - // number of tombstones for a key. 
- if trs := d.tombstones[p.Offset]; p.Tombstones == len(trs) { - d.tombstones[p.Offset] = insertTimeRange(trs, minTime, maxTime) - continue - } - - // Since the length changed, we have to do the expensive overlap check again. - // We re-read the entries again under the write lock because this should be - // rare and only during concurrent deletes to the same key. We could make - // a copy of the entries before getting here, but that penalizes the common - // no-concurrent case. - entries, err = readEntriesTimes(d.b.access(p.EntryOffset, 0), entries) - if err != nil { - // If we have an error reading the entries for a key, we should just pretend - // the whole key is deleted. Maybe a better idea is to report this up somehow - // but that's for another time. - delete(d.tombstones, p.Offset) - iter.SetIndex(p.Index) - if iter.Offset() == p.Offset { - iter.Delete() - } - continue - } - - trbuf, ok = d.coversEntries(p.Offset, keys[p.Key], trbuf, entries, minTime, maxTime) - if ok { - delete(d.tombstones, p.Offset) - iter.SetIndex(p.Index) - if iter.Offset() == p.Offset { - iter.Delete() - } - continue - } - - // Append the TimeRange into the tombstones. - trs := d.tombstones[p.Offset] - d.tombstones[p.Offset] = insertTimeRange(trs, minTime, maxTime) - } - - iter.Done() -} - -func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64) { - // If we're deleting everything, we won't need to worry about partial deletes. - partial := !(minTime <= d.minTime && maxTime >= d.maxTime) - - // Is the range passed in outside of the time range for the file? - if minTime > d.maxTime || maxTime < d.minTime { - return - } - - d.mu.RLock() - var ( - ok bool - trbuf []TimeRange - entries []IndexEntry - err error - mustTrack bool - ) - - // seek to the earliest key with the prefix, and start iterating. we can't call - // next until after we've checked the key, so keep a "first" flag. 
- first := true - iter := d.ro.Iterator() - iter.Seek(prefix, &d.b) - for { - if (!first && !iter.Next()) || !bytes.HasPrefix(iter.Key(&d.b), prefix) { - break - } - first = false - - // if we're not doing a partial delete, we don't need to read the entries and - // can just delete the key and move on. - if !partial { - iter.Delete() - continue - } - - entryOffset := iter.EntryOffset(&d.b) - entries, err = readEntriesTimes(d.b.access(entryOffset, 0), entries) - if err != nil { - // If we have an error reading the entries for a key, we should just pretend - // the whole key is deleted. Maybe a better idea is to report this up somehow - // but that's for another time. - iter.Delete() - continue - } - - // Is the time range passed outside the range we have stored for the key? - min, max := entries[0].MinTime, entries[len(entries)-1].MaxTime - if minTime > max || maxTime < min { - continue - } - - // Does the range passed cover every value for the key? - if minTime <= min && maxTime >= max { - iter.Delete() - continue - } - - // Does adding the minTime and maxTime cover the entries? - trbuf, ok = d.coversEntries(iter.Offset(), iter.Key(&d.b), trbuf, entries, minTime, maxTime) - if ok { - iter.Delete() - continue - } - - // Otherwise, we have to track it in the prefix tombstones list. - mustTrack = true - } - d.mu.RUnlock() - - // Check and abort if nothing needs to be done. - if !mustTrack && !iter.HasDeletes() { - return - } - - d.mu.Lock() - defer d.mu.Unlock() - - if mustTrack { - d.prefixTombstones.Append(prefix, TimeRange{Min: minTime, Max: maxTime}) - } - - if iter.HasDeletes() { - iter.Done() - } -} - -// TombstoneRange returns ranges of time that are deleted for the given key. -func (d *indirectIndex) TombstoneRange(key []byte, buf []TimeRange) []TimeRange { - d.mu.RLock() - rs := d.prefixTombstones.Search(key, buf[:0]) - iter := d.ro.Iterator() - exact, _ := iter.Seek(key, &d.b) - if exact { - rs = append(rs, d.tombstones[iter.Offset()]...) 
- } - d.mu.RUnlock() - return rs -} - -// Contains return true if the given key exists in the index. -func (d *indirectIndex) Contains(key []byte) bool { - d.mu.RLock() - iter := d.ro.Iterator() - exact, _ := iter.Seek(key, &d.b) - d.mu.RUnlock() - return exact -} - -// ContainsValue returns true if key and time might exist in this file. -func (d *indirectIndex) ContainsValue(key []byte, timestamp int64) bool { - d.mu.RLock() - defer d.mu.RUnlock() - - iter := d.ro.Iterator() - exact, _ := iter.Seek(key, &d.b) - if !exact { - return false - } - - for _, t := range d.tombstones[iter.Offset()] { - if t.Min <= timestamp && timestamp <= t.Max { - return false - } - } - - if d.prefixTombstones.checkOverlap(key, timestamp) { - return false - } - - entries, err := d.ReadEntries(key, nil) - if err != nil { - d.logger.Error("error reading tsm index key", zap.String("key", fmt.Sprintf("%q", key))) - return false - } - - for _, entry := range entries { - if entry.Contains(timestamp) { - return true - } - } - - return false -} - -// Type returns the block type of the values stored for the key. -func (d *indirectIndex) Type(key []byte) (byte, error) { - d.mu.RLock() - defer d.mu.RUnlock() - - iter := d.ro.Iterator() - exact, _ := iter.Seek(key, &d.b) - if !exact { - return 0, errors.New("key does not exist") - } - - return d.b.access(iter.EntryOffset(&d.b), 1)[0], nil -} - -// OverlapsTimeRange returns true if the time range of the file intersect min and max. -func (d *indirectIndex) OverlapsTimeRange(min, max int64) bool { - return d.minTime <= max && d.maxTime >= min -} - -// OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max. -func (d *indirectIndex) OverlapsKeyRange(min, max []byte) bool { - return bytes.Compare(d.minKey, max) <= 0 && bytes.Compare(d.maxKey, min) >= 0 -} - -// KeyRange returns the min and max keys in the index. 
-func (d *indirectIndex) KeyRange() ([]byte, []byte) { - return d.minKey, d.maxKey -} - -// TimeRange returns the min and max time across all keys in the index. -func (d *indirectIndex) TimeRange() (int64, int64) { - return d.minTime, d.maxTime -} - -// MarshalBinary returns a byte slice encoded version of the index. -func (d *indirectIndex) MarshalBinary() ([]byte, error) { - d.mu.RLock() - defer d.mu.RUnlock() - - return d.b.b, nil -} - -// UnmarshalBinary populates an index from an encoded byte slice -// representation of an index. -func (d *indirectIndex) UnmarshalBinary(b []byte) error { - d.mu.Lock() - defer d.mu.Unlock() - - // Keep a reference to the actual index bytes - d.b = faultBuffer{b: b} - if len(b) == 0 { - return nil - } - - // make sure a uint32 is sufficient to store any offset into the index. - if uint64(len(b)) != uint64(uint32(len(b))) { - return fmt.Errorf("indirectIndex: too large to open") - } - - var minTime, maxTime int64 = math.MaxInt64, math.MinInt64 - - // To create our "indirect" index, we need to find the location of all the keys in - // the raw byte slice. The keys are listed once each (in sorted order). Following - // each key is a time ordered list of index entry blocks for that key. The loop below - // basically skips across the slice keeping track of the counter when we are at a key - // field. 
- var i uint32 - var ro readerOffsets - - iMax := uint32(len(b)) - if iMax > math.MaxInt32 { - return fmt.Errorf("indirectIndex: too large to store offsets") - } - - for i < iMax { - offset := i // save for when we add to the data structure - - // Skip to the start of the values - // key length value (2) + type (1) + length of key - if i+2 >= iMax { - return fmt.Errorf("indirectIndex: not enough data for key length value") - } - keyLength := uint32(binary.BigEndian.Uint16(b[i : i+2])) - i += 2 - - if i+keyLength+indexTypeSize >= iMax { - return fmt.Errorf("indirectIndex: not enough data for key and type") - } - ro.AddKey(offset, b[i:i+keyLength]) - i += keyLength + indexTypeSize - - // count of index entries - if i+indexCountSize >= iMax { - return fmt.Errorf("indirectIndex: not enough data for index entries count") - } - count := uint32(binary.BigEndian.Uint16(b[i : i+indexCountSize])) - if count == 0 { - return fmt.Errorf("indirectIndex: key exits with no entries") - } - i += indexCountSize - - // Find the min time for the block - if i+8 >= iMax { - return fmt.Errorf("indirectIndex: not enough data for min time") - } - minT := int64(binary.BigEndian.Uint64(b[i : i+8])) - if minT < minTime { - minTime = minT - } - - i += (count - 1) * indexEntrySize - - // Find the max time for the block - if i+16 >= iMax { - return fmt.Errorf("indirectIndex: not enough data for max time") - } - maxT := int64(binary.BigEndian.Uint64(b[i+8 : i+16])) - if maxT > maxTime { - maxTime = maxT - } - - i += indexEntrySize - } - - ro.Done() - - firstOfs := ro.offsets[0] - key := readKey(b[firstOfs:]) - d.minKey = key - - lastOfs := ro.offsets[len(ro.offsets)-1] - key = readKey(b[lastOfs:]) - d.maxKey = key - - d.minTime = minTime - d.maxTime = maxTime - d.ro = ro - - return nil -} - -// Size returns the size of the current index in bytes. 
-func (d *indirectIndex) Size() uint32 { - d.mu.RLock() - defer d.mu.RUnlock() - - return d.b.len() -} - -func (d *indirectIndex) Close() error { - return nil -} - // mmapAccess is mmap based block accessor. It access blocks through an // MMAP file interface. type mmapAccessor struct { @@ -1563,94 +840,3 @@ func (a *indexEntries) WriteTo(w io.Writer) (total int64, err error) { return total, nil } - -func readKey(b []byte) (key []byte) { - size := binary.BigEndian.Uint16(b[:2]) - return b[2 : 2+size] -} - -func readEntries(b []byte, entries []IndexEntry) ([]IndexEntry, error) { - if len(b) < indexTypeSize+indexCountSize { - return entries[:0], errors.New("readEntries: data too short for headers") - } - - count := int(binary.BigEndian.Uint16(b[indexTypeSize : indexTypeSize+indexCountSize])) - if cap(entries) < count { - entries = make([]IndexEntry, count) - } else { - entries = entries[:count] - } - b = b[indexTypeSize+indexCountSize:] - - for i := range entries { - if err := entries[i].UnmarshalBinary(b); err != nil { - return entries[:0], err - } - b = b[indexEntrySize:] - } - - return entries, nil -} - -// readEntriesTimes is a helper function to read entries at the provided buffer but -// only reading in the min and max times. 
-func readEntriesTimes(b []byte, entries []IndexEntry) ([]IndexEntry, error) { - if len(b) < indexTypeSize+indexCountSize { - return entries[:0], errors.New("readEntries: data too short for headers") - } - - count := int(binary.BigEndian.Uint16(b[indexTypeSize : indexTypeSize+indexCountSize])) - if cap(entries) < count { - entries = make([]IndexEntry, count) - } else { - entries = entries[:count] - } - b = b[indexTypeSize+indexCountSize:] - - for i := range entries { - if len(b) < indexEntrySize { - return entries[:0], errors.New("readEntries: stream too short for entry") - } - entries[i].MinTime = int64(binary.BigEndian.Uint64(b[0:8])) - entries[i].MaxTime = int64(binary.BigEndian.Uint64(b[8:16])) - b = b[indexEntrySize:] - } - - return entries, nil -} - -const ( - faultBufferEnabled = false - faultBufferSampleStacks = false -) - -type faultBuffer struct { - faults uint64 - page uint64 - b []byte - samples [][]uintptr -} - -func (m *faultBuffer) len() uint32 { return uint32(len(m.b)) } - -func (m *faultBuffer) access(start, length uint32) []byte { - if faultBufferEnabled { - current, page := int64(atomic.LoadUint64(&m.page)), int64(start)/4096 - if page != current && page != current+1 { // assume kernel precaches next page - atomic.AddUint64(&m.faults, 1) - if faultBufferSampleStacks && rand.Intn(1000) == 0 { - var stack [256]uintptr - n := runtime.Callers(0, stack[:]) - m.samples = append(m.samples, stack[:n:n]) - } - } - atomic.StoreUint64(&m.page, uint64(page)) - } - - end := m.len() - if length > 0 { - end = start + length - } - - return m.b[start:end] -} diff --git a/tsdb/tsm1/reader_block_iterator_test.go b/tsdb/tsm1/reader_block_iterator_test.go new file mode 100644 index 0000000000..48893c4a7d --- /dev/null +++ b/tsdb/tsm1/reader_block_iterator_test.go @@ -0,0 +1,280 @@ +package tsm1 + +import ( + "os" + "sort" + "testing" +) + +func TestBlockIterator_Single(t *testing.T) { + dir := mustTempDir() + defer os.RemoveAll(dir) + f := mustTempFile(dir) + + w, 
err := NewTSMWriter(f) + if err != nil { + t.Fatalf("unexpected error creating writer: %v", err) + } + + values := []Value{NewValue(0, int64(1))} + if err := w.Write([]byte("cpu"), values); err != nil { + t.Fatalf("unexpected error writing: %v", err) + + } + if err := w.WriteIndex(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + + if err := w.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + + fd, err := os.Open(f.Name()) + if err != nil { + t.Fatalf("unexpected error opening: %v", err) + } + + r, err := NewTSMReader(fd) + if err != nil { + t.Fatalf("unexpected error created reader: %v", err) + } + + var count int + iter := r.BlockIterator() + for iter.Next() { + key, minTime, maxTime, typ, _, buf, err := iter.Read() + if err != nil { + t.Fatalf("unexpected error creating iterator: %v", err) + } + + if got, exp := string(key), "cpu"; got != exp { + t.Fatalf("key mismatch: got %v, exp %v", got, exp) + } + + if got, exp := minTime, int64(0); got != exp { + t.Fatalf("min time mismatch: got %v, exp %v", got, exp) + } + + if got, exp := maxTime, int64(0); got != exp { + t.Fatalf("max time mismatch: got %v, exp %v", got, exp) + } + + if got, exp := typ, BlockInteger; got != exp { + t.Fatalf("block type mismatch: got %v, exp %v", got, exp) + } + + if len(buf) == 0 { + t.Fatalf("buf length = 0") + } + + count++ + } + + if got, exp := count, len(values); got != exp { + t.Fatalf("value count mismatch: got %v, exp %v", got, exp) + } +} + +func TestBlockIterator_Tombstone(t *testing.T) { + dir := mustTempDir() + defer os.RemoveAll(dir) + f := mustTempFile(dir) + + w, err := NewTSMWriter(f) + if err != nil { + t.Fatalf("unexpected error creating writer: %v", err) + } + + values := []Value{NewValue(0, int64(1))} + if err := w.Write([]byte("cpu"), values); err != nil { + t.Fatalf("unexpected error writing: %v", err) + } + + if err := w.Write([]byte("mem"), values); err != nil { + t.Fatalf("unexpected error writing: %v", err) + } + + 
if err := w.WriteIndex(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + + if err := w.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + + fd, err := os.Open(f.Name()) + if err != nil { + t.Fatalf("unexpected error opening: %v", err) + } + + r, err := NewTSMReader(fd) + if err != nil { + t.Fatalf("unexpected error created reader: %v", err) + } + + iter := r.BlockIterator() + for iter.Next() { + // Trigger a delete during iteration. This should cause an error condition for + // the BlockIterator + r.Delete([][]byte{[]byte("cpu")}) + } + + if iter.Err() == nil { + t.Fatalf("expected error: got nil") + } +} + +func TestBlockIterator_MultipleBlocks(t *testing.T) { + dir := mustTempDir() + defer os.RemoveAll(dir) + f := mustTempFile(dir) + + w, err := NewTSMWriter(f) + if err != nil { + t.Fatalf("unexpected error creating writer: %v", err) + } + + values1 := []Value{NewValue(0, int64(1))} + if err := w.Write([]byte("cpu"), values1); err != nil { + t.Fatalf("unexpected error writing: %v", err) + } + + values2 := []Value{NewValue(1, int64(2))} + if err := w.Write([]byte("cpu"), values2); err != nil { + t.Fatalf("unexpected error writing: %v", err) + } + + if err := w.WriteIndex(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + + if err := w.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + + fd, err := os.Open(f.Name()) + if err != nil { + t.Fatalf("unexpected error opening: %v", err) + } + + r, err := NewTSMReader(fd) + if err != nil { + t.Fatalf("unexpected error created reader: %v", err) + } + + var count int + expData := []Values{values1, values2} + iter := r.BlockIterator() + var i int + for iter.Next() { + key, minTime, maxTime, typ, _, buf, err := iter.Read() + + if err != nil { + t.Fatalf("unexpected error creating iterator: %v", err) + } + + if got, exp := string(key), "cpu"; got != exp { + t.Fatalf("key mismatch: got %v, exp %v", got, exp) + } + + if got, exp := minTime, 
expData[i][0].UnixNano(); got != exp { + t.Fatalf("min time mismatch: got %v, exp %v", got, exp) + } + + if got, exp := maxTime, expData[i][0].UnixNano(); got != exp { + t.Fatalf("max time mismatch: got %v, exp %v", got, exp) + } + + if got, exp := typ, BlockInteger; got != exp { + t.Fatalf("block type mismatch: got %v, exp %v", got, exp) + } + + if len(buf) == 0 { + t.Fatalf("buf length = 0") + } + + count++ + i++ + } + + if got, exp := count, 2; got != exp { + t.Fatalf("value count mismatch: got %v, exp %v", got, exp) + } +} + +func TestBlockIterator_Sorted(t *testing.T) { + dir := mustTempDir() + defer os.RemoveAll(dir) + f := mustTempFile(dir) + + w, err := NewTSMWriter(f) + if err != nil { + t.Fatalf("unexpected error creating writer: %v", err) + } + + values := map[string][]Value{ + "mem": []Value{NewValue(0, int64(1))}, + "cycles": []Value{NewValue(0, ^uint64(0))}, + "cpu": []Value{NewValue(1, float64(2))}, + "disk": []Value{NewValue(1, true)}, + "load": []Value{NewValue(1, "string")}, + } + + keys := make([]string, 0, len(values)) + for k := range values { + keys = append(keys, k) + } + sort.Strings(keys) + + for _, k := range keys { + if err := w.Write([]byte(k), values[k]); err != nil { + t.Fatalf("unexpected error writing: %v", err) + + } + } + + if err := w.WriteIndex(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + + if err := w.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + + fd, err := os.Open(f.Name()) + if err != nil { + t.Fatalf("unexpected error opening: %v", err) + } + + r, err := NewTSMReader(fd) + if err != nil { + t.Fatalf("unexpected error created reader: %v", err) + } + + var count int + iter := r.BlockIterator() + var lastKey string + for iter.Next() { + key, _, _, _, _, buf, err := iter.Read() + + if string(key) < lastKey { + t.Fatalf("keys not sorted: got %v, last %v", key, lastKey) + } + + lastKey = string(key) + + if err != nil { + t.Fatalf("unexpected error creating iterator: %v", err) 
+ } + + if len(buf) == 0 { + t.Fatalf("buf length = 0") + } + + count++ + } + + if got, exp := count, len(values); got != exp { + t.Fatalf("value count mismatch: got %v, exp %v", got, exp) + } +} diff --git a/tsdb/tsm1/reader_fault_buffer.go b/tsdb/tsm1/reader_fault_buffer.go new file mode 100644 index 0000000000..fa484b92a0 --- /dev/null +++ b/tsdb/tsm1/reader_fault_buffer.go @@ -0,0 +1,47 @@ +package tsm1 + +import ( + "math/rand" + "runtime" + "sync/atomic" +) + +// fault buffer is a by-default disabled helper to keep track of estimates of page faults +// during accesses. use the constants below to turn it on or off and benchmarks will report +// their estimates. + +const ( + faultBufferEnabled = false + faultBufferSampleStacks = false +) + +type faultBuffer struct { + faults uint64 + page uint64 + b []byte + samples [][]uintptr +} + +func (m *faultBuffer) len() uint32 { return uint32(len(m.b)) } + +func (m *faultBuffer) access(start, length uint32) []byte { + if faultBufferEnabled { + current, page := int64(atomic.LoadUint64(&m.page)), int64(start)/4096 + if page != current && page != current+1 { // assume kernel precaches next page + atomic.AddUint64(&m.faults, 1) + if faultBufferSampleStacks && rand.Intn(1000) == 0 { + var stack [256]uintptr + n := runtime.Callers(0, stack[:]) + m.samples = append(m.samples, stack[:n:n]) + } + } + atomic.StoreUint64(&m.page, uint64(page)) + } + + end := m.len() + if length > 0 { + end = start + length + } + + return m.b[start:end] +} diff --git a/tsdb/tsm1/reader_index.go b/tsdb/tsm1/reader_index.go new file mode 100644 index 0000000000..0a88fc0017 --- /dev/null +++ b/tsdb/tsm1/reader_index.go @@ -0,0 +1,785 @@ +package tsm1 + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "math" + "sort" + "sync" + + "go.uber.org/zap" +) + +// TSMIndex represent the index section of a TSM file. The index records all +// blocks, their locations, sizes, min and max times. 
type TSMIndex interface {
	// Delete removes the given keys from the index.
	Delete(keys [][]byte)

	// DeleteRange removes the given keys with data between minTime and maxTime from the index.
	DeleteRange(keys [][]byte, minTime, maxTime int64)

	// DeletePrefix removes keys that begin with the given prefix with data between minTime and
	// maxTime from the index.
	DeletePrefix(prefix []byte, minTime, maxTime int64)

	// ContainsKey returns true if the given key may exist in the index. This func is faster than
	// Contains but may return false positives.
	ContainsKey(key []byte) bool

	// Contains returns true if the given key exists in the index.
	Contains(key []byte) bool

	// ContainsValue returns true if key and time might exist in this file. This function could
	// return true even though the actual point does not exist. For example, the key may
	// exist in this file, but not have a point exactly at time t.
	ContainsValue(key []byte, timestamp int64) bool

	// ReadEntries reads the index entries for key into entries.
	ReadEntries(key []byte, entries []IndexEntry) ([]IndexEntry, error)

	// Entry returns the index entry for the specified key and timestamp. If no entry
	// matches the key and timestamp, nil is returned.
	Entry(key []byte, timestamp int64) *IndexEntry

	// KeyCount returns the count of unique keys in the index.
	KeyCount() int

	// Iterator returns an iterator over the keys starting at the provided key. You must
	// call Next before calling any of the accessors.
	Iterator([]byte) *TSMIndexIterator

	// OverlapsTimeRange returns true if the time range of the file intersects min and max.
	OverlapsTimeRange(min, max int64) bool

	// OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max.
	OverlapsKeyRange(min, max []byte) bool

	// Size returns the size of the current index in bytes.
	Size() uint32

	// TimeRange returns the min and max time across all keys in the file.
	TimeRange() (int64, int64)

	// TombstoneRange returns ranges of time that are deleted for the given key.
	TombstoneRange(key []byte, buf []TimeRange) []TimeRange

	// KeyRange returns the min and max keys in the file.
	KeyRange() ([]byte, []byte)

	// Type returns the block type of the values stored for the key. Returns one of
	// the Block* constants (e.g. BlockFloat64, BlockInteger). If key does not exist,
	// an error is returned.
	Type(key []byte) (byte, error)

	// UnmarshalBinary populates an index from an encoded byte slice
	// representation of an index.
	UnmarshalBinary(b []byte) error

	// Close closes the index and releases any resources.
	Close() error
}

// indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This
// implementation can be used for indexes that may be MMAPed into memory.
type indirectIndex struct {
	mu     sync.RWMutex
	logger *zap.Logger

	// indirectIndex works as follows. Assuming we have an index structure in memory as
	// the diagram below:
	//
	// ┌────────────────────────────────────────────────────────────────────┐
	// │ Index │
	// ├─┬──────────────────────┬──┬───────────────────────┬───┬────────────┘
	// │0│ │62│ │145│
	// ├─┴───────┬─────────┬────┼──┴──────┬─────────┬──────┼───┴─────┬──────┐
	// │Key 1 Len│ Key │... │Key 2 Len│ Key 2 │ ... │ Key 3 │ ... │
	// │ 2 bytes │ N bytes │ │ 2 bytes │ N bytes │ │ 2 bytes │ │
	// └─────────┴─────────┴────┴─────────┴─────────┴──────┴─────────┴──────┘

	// We would build an `offsets` slice where each element points to the byte location
	// for the first key in the index slice.

	// ┌────────────────────────────────────────────────────────────────────┐
	// │ Offsets │
	// ├────┬────┬────┬─────────────────────────────────────────────────────┘
	// │ 0 │ 62 │145 │
	// └────┴────┴────┘

	// Using this offset slice we can find `Key 2` by doing a binary search
	// over the offsets slice. Instead of comparing the value in the offsets
	// (e.g. `62`), we use that as an index into the underlying index to
	// retrieve the key at position `62` and perform our comparisons with that.

	// When we have identified the correct position in the index for a given
	// key, we could perform another binary search or a linear scan. This
	// should be fast as well since each index entry is 28 bytes and all
	// contiguous in memory. The current implementation uses a linear scan since the
	// number of block entries is expected to be < 100 per key.

	// b is the underlying index byte slice. This could be a copy on the heap or an MMAP
	// slice reference.
	b faultBuffer

	// ro contains the positions in b for each key as well as the first bytes of each key
	// to avoid disk seeks.
	ro readerOffsets

	// minKey, maxKey are the minimum and maximum (lexicographically sorted) keys contained
	// in the file.
	minKey, maxKey []byte

	// minTime, maxTime are the minimum and maximum times contained in the file across all
	// series.
	minTime, maxTime int64

	// tombstones contains only the tombstoned keys with a subset of time values deleted. An
	// entry would exist here if a subset of the points for a key were deleted and the file
	// had not been re-compacted to remove the points on disk.
	tombstones map[uint32][]TimeRange

	// prefixTombstones contains the tombstoned keys with a subset of the values deleted that
	// all share the same prefix.
	prefixTombstones *prefixTree
}

// NewIndirectIndex returns a new indirect index.
+func NewIndirectIndex() *indirectIndex { + return &indirectIndex{ + tombstones: make(map[uint32][]TimeRange), + prefixTombstones: newPrefixTree(), + } +} + +// ContainsKey returns true of key may exist in this index. +func (d *indirectIndex) ContainsKey(key []byte) bool { + return bytes.Compare(key, d.minKey) >= 0 && bytes.Compare(key, d.maxKey) <= 0 +} + +// ReadEntries returns all index entries for a key. +func (d *indirectIndex) ReadEntries(key []byte, entries []IndexEntry) ([]IndexEntry, error) { + d.mu.RLock() + defer d.mu.RUnlock() + + iter := d.ro.Iterator() + exact, _ := iter.Seek(key, &d.b) + if !exact { + return nil, nil + } + + entries, err := readEntries(d.b.access(iter.EntryOffset(&d.b), 0), entries) + if err != nil { + return nil, err + } + + return entries, nil +} + +// Entry returns the index entry for the specified key and timestamp. If no entry +// matches the key an timestamp, nil is returned. +func (d *indirectIndex) Entry(key []byte, timestamp int64) *IndexEntry { + entries, err := d.ReadEntries(key, nil) + if err != nil { + d.logger.Error("error reading tsm index key", zap.String("key", fmt.Sprintf("%q", key))) + return nil + } + for _, entry := range entries { + if entry.Contains(timestamp) { + return &entry + } + } + return nil +} + +// KeyCount returns the count of unique keys in the index. +func (d *indirectIndex) KeyCount() int { + d.mu.RLock() + n := len(d.ro.offsets) + d.mu.RUnlock() + return n +} + +// Iterator returns an iterator over the keys starting at the provided key. You must +// call Next before calling any of the accessors. +func (d *indirectIndex) Iterator(key []byte) *TSMIndexIterator { + d.mu.RLock() + iter := d.ro.Iterator() + _, ok := iter.Seek(key, &d.b) + ti := &TSMIndexIterator{ + d: d, + n: int(len(d.ro.offsets)), + b: &d.b, + iter: &iter, + first: true, + ok: ok, + } + d.mu.RUnlock() + + return ti +} + +// Delete removes the given keys from the index. 
func (d *indirectIndex) Delete(keys [][]byte) {
	if len(keys) == 0 {
		return
	}

	// Phase 1 (read lock): mark matching keys deleted on the iterator and drop any
	// tombstones tracked for them. Deletion marks are local to the iterator here.
	d.mu.RLock()
	iter := d.ro.Iterator()
	for _, key := range keys {
		// Fast path: keys are often passed in sorted order, so try the next key first
		// before falling back to a full seek.
		if !iter.Next() || !bytes.Equal(iter.Key(&d.b), key) {
			if exact, _ := iter.Seek(key, &d.b); !exact {
				continue
			}
		}

		delete(d.tombstones, iter.Offset())
		iter.Delete()
	}
	d.mu.RUnlock()

	if !iter.HasDeletes() {
		return
	}

	// Phase 2 (write lock): Done applies the pending deletions to the shared
	// readerOffsets state. NOTE(review): the exact semantics of Delete/Done live in
	// the readerOffsets iterator — confirm there.
	d.mu.Lock()
	iter.Done()
	d.mu.Unlock()
}

// insertTimeRange adds a time range described by the minTime and maxTime into ts,
// keeping ts sorted by (Min, Max).
func insertTimeRange(ts []TimeRange, minTime, maxTime int64) []TimeRange {
	n := sort.Search(len(ts), func(i int) bool {
		if ts[i].Min == minTime {
			return ts[i].Max >= maxTime
		}
		return ts[i].Min > minTime
	})

	// Grow by one and shift the tail right to open a hole at n.
	ts = append(ts, TimeRange{})
	copy(ts[n+1:], ts[n:])
	ts[n] = TimeRange{Min: minTime, Max: maxTime}
	return ts
}

// pendingTombstone is a type that describes a pending insertion of a tombstone.
type pendingTombstone struct {
	Key         int    // index into the caller's keys slice
	Index       int    // iterator index at the time the key was found
	Offset      uint32 // key offset in the index buffer; also the tombstones map key
	EntryOffset uint32 // offset of the key's entry block in the index buffer
	Tombstones  int    // number of tombstones observed under the read lock
}

// coversEntries checks if all of the stored tombstones including one for minTime and maxTime cover
// all of the index entries. It mutates the entries slice to do the work, so be sure to make a copy
// if you must.
func (d *indirectIndex) coversEntries(offset uint32, key []byte, buf []TimeRange,
	entries []IndexEntry, minTime, maxTime int64) ([]TimeRange, bool) {

	// grab the tombstones from the prefixes. these come out unsorted, so we sort
	// them and place them in the merger section named unsorted.
	buf = d.prefixTombstones.Search(key, buf[:0])
	if len(buf) > 1 {
		sort.Slice(buf, func(i, j int) bool { return buf[i].Less(buf[j]) })
	}

	// create the merger with the other tombstone entries: the ones for the specific
	// key and the one we have proposed to add.
	merger := timeRangeMerger{
		sorted:   d.tombstones[offset],
		unsorted: buf,
		single:   TimeRange{Min: minTime, Max: maxTime},
		used:     false,
	}

	return buf, timeRangesCoverEntries(merger, entries)
}

// DeleteRange removes the given keys with data between minTime and maxTime from the index.
func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
	// If we're deleting everything, we won't need to worry about partial deletes.
	// NOTE(review): d.minTime/d.maxTime are read before taking the lock; they are
	// only written by UnmarshalBinary, presumably before concurrent use — confirm.
	if minTime <= d.minTime && maxTime >= d.maxTime {
		d.Delete(keys)
		return
	}

	// Is the range passed in outside of the time range for the file?
	if minTime > d.maxTime || maxTime < d.minTime {
		return
	}

	// General outline:
	// Under the read lock, determine the set of actions we need to
	// take and on what keys to take them. Then, under the write
	// lock, perform those actions. We keep track of some state
	// during the read lock to make double checking under the
	// write lock cheap.

	d.mu.RLock()
	iter := d.ro.Iterator()
	var (
		ok      bool
		trbuf   []TimeRange
		entries []IndexEntry
		pending []pendingTombstone
		err     error
	)

	for i, key := range keys {
		if !iter.Next() || !bytes.Equal(iter.Key(&d.b), key) {
			if exact, _ := iter.Seek(key, &d.b); !exact {
				continue
			}
		}

		entryOffset := iter.EntryOffset(&d.b)
		entries, err = readEntriesTimes(d.b.access(entryOffset, 0), entries)
		if err != nil {
			// If we have an error reading the entries for a key, we should just pretend
			// the whole key is deleted. Maybe a better idea is to report this up somehow
			// but that's for another time.
			iter.Delete()
			continue
		}

		// Is the time range passed outside of the time range we have stored for this key?
		min, max := entries[0].MinTime, entries[len(entries)-1].MaxTime
		if minTime > max || maxTime < min {
			continue
		}

		// Does the range passed in cover every value for the key?
		if minTime <= min && maxTime >= max {
			iter.Delete()
			continue
		}

		// Does adding the minTime and maxTime cover the entries?
		offset := iter.Offset()
		trbuf, ok = d.coversEntries(offset, key, trbuf, entries, minTime, maxTime)
		if ok {
			iter.Delete()
			continue
		}

		// Save that we should add a tombstone for this key, and how many tombstones
		// already existed to avoid double checks.
		pending = append(pending, pendingTombstone{
			Key:         i,
			Index:       iter.Index(),
			Offset:      offset,
			EntryOffset: entryOffset,
			Tombstones:  len(d.tombstones[offset]),
		})
	}

	d.mu.RUnlock()

	if len(pending) == 0 && !iter.HasDeletes() {
		return
	}

	d.mu.Lock()
	defer d.mu.Unlock()

	for _, p := range pending {
		// Check the existing tombstones. If the length did not change, then we know
		// that we don't need to double check coverage, since we only ever increase the
		// number of tombstones for a key.
		if trs := d.tombstones[p.Offset]; p.Tombstones == len(trs) {
			d.tombstones[p.Offset] = insertTimeRange(trs, minTime, maxTime)
			continue
		}

		// Since the length changed, we have to do the expensive overlap check again.
		// We re-read the entries again under the write lock because this should be
		// rare and only during concurrent deletes to the same key. We could make
		// a copy of the entries before getting here, but that penalizes the common
		// no-concurrent case.
		entries, err = readEntriesTimes(d.b.access(p.EntryOffset, 0), entries)
		if err != nil {
			// If we have an error reading the entries for a key, we should just pretend
			// the whole key is deleted. Maybe a better idea is to report this up somehow
			// but that's for another time.
			delete(d.tombstones, p.Offset)
			iter.SetIndex(p.Index)
			if iter.Offset() == p.Offset {
				iter.Delete()
			}
			continue
		}

		trbuf, ok = d.coversEntries(p.Offset, keys[p.Key], trbuf, entries, minTime, maxTime)
		if ok {
			delete(d.tombstones, p.Offset)
			iter.SetIndex(p.Index)
			if iter.Offset() == p.Offset {
				iter.Delete()
			}
			continue
		}

		// Append the TimeRange into the tombstones.
		trs := d.tombstones[p.Offset]
		d.tombstones[p.Offset] = insertTimeRange(trs, minTime, maxTime)
	}

	iter.Done()
}

// DeletePrefix removes keys that begin with the given prefix with data between minTime
// and maxTime from the index.
func (d *indirectIndex) DeletePrefix(prefix []byte, minTime, maxTime int64) {
	// If we're deleting everything, we won't need to worry about partial deletes.
	partial := !(minTime <= d.minTime && maxTime >= d.maxTime)

	// Is the range passed in outside of the time range for the file?
	if minTime > d.maxTime || maxTime < d.minTime {
		return
	}

	d.mu.RLock()
	var (
		ok        bool
		trbuf     []TimeRange
		entries   []IndexEntry
		err       error
		mustTrack bool
	)

	// seek to the earliest key with the prefix, and start iterating. we can't call
	// next until after we've checked the key, so keep a "first" flag.
	first := true
	iter := d.ro.Iterator()
	iter.Seek(prefix, &d.b)
	for {
		if (!first && !iter.Next()) || !bytes.HasPrefix(iter.Key(&d.b), prefix) {
			break
		}
		first = false

		// if we're not doing a partial delete, we don't need to read the entries and
		// can just delete the key and move on.
		if !partial {
			iter.Delete()
			continue
		}

		entryOffset := iter.EntryOffset(&d.b)
		entries, err = readEntriesTimes(d.b.access(entryOffset, 0), entries)
		if err != nil {
			// If we have an error reading the entries for a key, we should just pretend
			// the whole key is deleted. Maybe a better idea is to report this up somehow
			// but that's for another time.
			iter.Delete()
			continue
		}

		// Is the time range passed outside the range we have stored for the key?
		min, max := entries[0].MinTime, entries[len(entries)-1].MaxTime
		if minTime > max || maxTime < min {
			continue
		}

		// Does the range passed cover every value for the key?
		if minTime <= min && maxTime >= max {
			iter.Delete()
			continue
		}

		// Does adding the minTime and maxTime cover the entries?
		trbuf, ok = d.coversEntries(iter.Offset(), iter.Key(&d.b), trbuf, entries, minTime, maxTime)
		if ok {
			iter.Delete()
			continue
		}

		// Otherwise, we have to track it in the prefix tombstones list.
		mustTrack = true
	}
	d.mu.RUnlock()

	// Check and abort if nothing needs to be done.
	if !mustTrack && !iter.HasDeletes() {
		return
	}

	d.mu.Lock()
	defer d.mu.Unlock()

	if mustTrack {
		d.prefixTombstones.Append(prefix, TimeRange{Min: minTime, Max: maxTime})
	}

	if iter.HasDeletes() {
		iter.Done()
	}
}

// TombstoneRange returns ranges of time that are deleted for the given key, combining
// prefix tombstones with the per-key tombstones.
func (d *indirectIndex) TombstoneRange(key []byte, buf []TimeRange) []TimeRange {
	d.mu.RLock()
	rs := d.prefixTombstones.Search(key, buf[:0])
	iter := d.ro.Iterator()
	exact, _ := iter.Seek(key, &d.b)
	if exact {
		rs = append(rs, d.tombstones[iter.Offset()]...)
	}
	d.mu.RUnlock()
	return rs
}

// Contains returns true if the given key exists in the index (exact match, unlike
// ContainsKey which only range-checks).
func (d *indirectIndex) Contains(key []byte) bool {
	d.mu.RLock()
	iter := d.ro.Iterator()
	exact, _ := iter.Seek(key, &d.b)
	d.mu.RUnlock()
	return exact
}

// ContainsValue returns true if key and time might exist in this file.
+func (d *indirectIndex) ContainsValue(key []byte, timestamp int64) bool { + d.mu.RLock() + defer d.mu.RUnlock() + + iter := d.ro.Iterator() + exact, _ := iter.Seek(key, &d.b) + if !exact { + return false + } + + for _, t := range d.tombstones[iter.Offset()] { + if t.Min <= timestamp && timestamp <= t.Max { + return false + } + } + + if d.prefixTombstones.checkOverlap(key, timestamp) { + return false + } + + entries, err := d.ReadEntries(key, nil) + if err != nil { + d.logger.Error("error reading tsm index key", zap.String("key", fmt.Sprintf("%q", key))) + return false + } + + for _, entry := range entries { + if entry.Contains(timestamp) { + return true + } + } + + return false +} + +// Type returns the block type of the values stored for the key. +func (d *indirectIndex) Type(key []byte) (byte, error) { + d.mu.RLock() + defer d.mu.RUnlock() + + iter := d.ro.Iterator() + exact, _ := iter.Seek(key, &d.b) + if !exact { + return 0, errors.New("key does not exist") + } + + return d.b.access(iter.EntryOffset(&d.b), 1)[0], nil +} + +// OverlapsTimeRange returns true if the time range of the file intersect min and max. +func (d *indirectIndex) OverlapsTimeRange(min, max int64) bool { + return d.minTime <= max && d.maxTime >= min +} + +// OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max. +func (d *indirectIndex) OverlapsKeyRange(min, max []byte) bool { + return bytes.Compare(d.minKey, max) <= 0 && bytes.Compare(d.maxKey, min) >= 0 +} + +// KeyRange returns the min and max keys in the index. +func (d *indirectIndex) KeyRange() ([]byte, []byte) { + return d.minKey, d.maxKey +} + +// TimeRange returns the min and max time across all keys in the index. +func (d *indirectIndex) TimeRange() (int64, int64) { + return d.minTime, d.maxTime +} + +// MarshalBinary returns a byte slice encoded version of the index. 
+func (d *indirectIndex) MarshalBinary() ([]byte, error) { + d.mu.RLock() + defer d.mu.RUnlock() + + return d.b.b, nil +} + +// UnmarshalBinary populates an index from an encoded byte slice +// representation of an index. +func (d *indirectIndex) UnmarshalBinary(b []byte) error { + d.mu.Lock() + defer d.mu.Unlock() + + // Keep a reference to the actual index bytes + d.b = faultBuffer{b: b} + if len(b) == 0 { + return nil + } + + // make sure a uint32 is sufficient to store any offset into the index. + if uint64(len(b)) != uint64(uint32(len(b))) { + return fmt.Errorf("indirectIndex: too large to open") + } + + var minTime, maxTime int64 = math.MaxInt64, math.MinInt64 + + // To create our "indirect" index, we need to find the location of all the keys in + // the raw byte slice. The keys are listed once each (in sorted order). Following + // each key is a time ordered list of index entry blocks for that key. The loop below + // basically skips across the slice keeping track of the counter when we are at a key + // field. 
+ var i uint32 + var ro readerOffsets + + iMax := uint32(len(b)) + if iMax > math.MaxInt32 { + return fmt.Errorf("indirectIndex: too large to store offsets") + } + + for i < iMax { + offset := i // save for when we add to the data structure + + // Skip to the start of the values + // key length value (2) + type (1) + length of key + if i+2 >= iMax { + return fmt.Errorf("indirectIndex: not enough data for key length value") + } + keyLength := uint32(binary.BigEndian.Uint16(b[i : i+2])) + i += 2 + + if i+keyLength+indexTypeSize >= iMax { + return fmt.Errorf("indirectIndex: not enough data for key and type") + } + ro.AddKey(offset, b[i:i+keyLength]) + i += keyLength + indexTypeSize + + // count of index entries + if i+indexCountSize >= iMax { + return fmt.Errorf("indirectIndex: not enough data for index entries count") + } + count := uint32(binary.BigEndian.Uint16(b[i : i+indexCountSize])) + if count == 0 { + return fmt.Errorf("indirectIndex: key exits with no entries") + } + i += indexCountSize + + // Find the min time for the block + if i+8 >= iMax { + return fmt.Errorf("indirectIndex: not enough data for min time") + } + minT := int64(binary.BigEndian.Uint64(b[i : i+8])) + if minT < minTime { + minTime = minT + } + + i += (count - 1) * indexEntrySize + + // Find the max time for the block + if i+16 >= iMax { + return fmt.Errorf("indirectIndex: not enough data for max time") + } + maxT := int64(binary.BigEndian.Uint64(b[i+8 : i+16])) + if maxT > maxTime { + maxTime = maxT + } + + i += indexEntrySize + } + + ro.Done() + + firstOfs := ro.offsets[0] + key := readKey(b[firstOfs:]) + d.minKey = key + + lastOfs := ro.offsets[len(ro.offsets)-1] + key = readKey(b[lastOfs:]) + d.maxKey = key + + d.minTime = minTime + d.maxTime = maxTime + d.ro = ro + + return nil +} + +// Size returns the size of the current index in bytes. 
+func (d *indirectIndex) Size() uint32 { + d.mu.RLock() + defer d.mu.RUnlock() + + return d.b.len() +} + +func (d *indirectIndex) Close() error { + return nil +} + +func readKey(b []byte) (key []byte) { + size := binary.BigEndian.Uint16(b[:2]) + return b[2 : 2+size] +} + +func readEntries(b []byte, entries []IndexEntry) ([]IndexEntry, error) { + if len(b) < indexTypeSize+indexCountSize { + return entries[:0], errors.New("readEntries: data too short for headers") + } + + count := int(binary.BigEndian.Uint16(b[indexTypeSize : indexTypeSize+indexCountSize])) + if cap(entries) < count { + entries = make([]IndexEntry, count) + } else { + entries = entries[:count] + } + b = b[indexTypeSize+indexCountSize:] + + for i := range entries { + if err := entries[i].UnmarshalBinary(b); err != nil { + return entries[:0], err + } + b = b[indexEntrySize:] + } + + return entries, nil +} + +// readEntriesTimes is a helper function to read entries at the provided buffer but +// only reading in the min and max times. 
+func readEntriesTimes(b []byte, entries []IndexEntry) ([]IndexEntry, error) { + if len(b) < indexTypeSize+indexCountSize { + return entries[:0], errors.New("readEntries: data too short for headers") + } + + count := int(binary.BigEndian.Uint16(b[indexTypeSize : indexTypeSize+indexCountSize])) + if cap(entries) < count { + entries = make([]IndexEntry, count) + } else { + entries = entries[:count] + } + b = b[indexTypeSize+indexCountSize:] + + for i := range entries { + if len(b) < indexEntrySize { + return entries[:0], errors.New("readEntries: stream too short for entry") + } + entries[i].MinTime = int64(binary.BigEndian.Uint64(b[0:8])) + entries[i].MaxTime = int64(binary.BigEndian.Uint64(b[8:16])) + b = b[indexEntrySize:] + } + + return entries, nil +} diff --git a/tsdb/tsm1/reader_index_iterator_test.go b/tsdb/tsm1/reader_index_iterator_test.go new file mode 100644 index 0000000000..9d92aaede1 --- /dev/null +++ b/tsdb/tsm1/reader_index_iterator_test.go @@ -0,0 +1,86 @@ +package tsm1 + +import ( + "reflect" + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestIndirectIndexIterator(t *testing.T) { + checkEqual := func(t *testing.T, got, exp interface{}) { + t.Helper() + if !reflect.DeepEqual(got, exp) { + t.Fatalf("expected: %v but got: %v\n%v", exp, got, cmp.Diff(got, exp)) + } + } + + index := NewIndexWriter() + index.Add([]byte("cpu1"), BlockInteger, 0, 10, 10, 20) + index.Add([]byte("cpu1"), BlockInteger, 10, 20, 10, 20) + index.Add([]byte("cpu2"), BlockInteger, 0, 10, 10, 20) + index.Add([]byte("cpu2"), BlockInteger, 10, 20, 10, 20) + index.Add([]byte("mem"), BlockInteger, 0, 10, 10, 20) + ind := loadIndex(t, index) + + // check that the iterator walks the whole index + iter := ind.Iterator(nil) + checkEqual(t, iter.Next(), true) + checkEqual(t, iter.Peek(), []byte("cpu2")) + checkEqual(t, iter.Key(), []byte("cpu1")) + checkEqual(t, iter.Type(), BlockInteger) + checkEqual(t, iter.Entries(), []IndexEntry{ + {0, 10, 10, 20}, + {10, 20, 10, 20}, + }) + 
checkEqual(t, iter.Next(), true) + checkEqual(t, iter.Peek(), []byte("mem")) + checkEqual(t, iter.Key(), []byte("cpu2")) + checkEqual(t, iter.Type(), BlockInteger) + checkEqual(t, iter.Entries(), []IndexEntry{ + {0, 10, 10, 20}, + {10, 20, 10, 20}, + }) + checkEqual(t, iter.Next(), true) + checkEqual(t, iter.Peek(), []byte(nil)) + checkEqual(t, iter.Key(), []byte("mem")) + checkEqual(t, iter.Type(), BlockInteger) + checkEqual(t, iter.Entries(), []IndexEntry{ + {0, 10, 10, 20}, + }) + checkEqual(t, iter.Next(), false) + checkEqual(t, iter.Err(), error(nil)) + + // delete the cpu2 key and make sure it's skipped + ind.Delete([][]byte{[]byte("cpu2")}) + iter = ind.Iterator(nil) + checkEqual(t, iter.Next(), true) + checkEqual(t, iter.Peek(), []byte("mem")) + checkEqual(t, iter.Key(), []byte("cpu1")) + checkEqual(t, iter.Type(), BlockInteger) + checkEqual(t, iter.Entries(), []IndexEntry{ + {0, 10, 10, 20}, + {10, 20, 10, 20}, + }) + checkEqual(t, iter.Next(), true) + checkEqual(t, iter.Peek(), []byte(nil)) + checkEqual(t, iter.Key(), []byte("mem")) + checkEqual(t, iter.Type(), BlockInteger) + checkEqual(t, iter.Entries(), []IndexEntry{ + {0, 10, 10, 20}, + }) + checkEqual(t, iter.Next(), false) + checkEqual(t, iter.Err(), error(nil)) + + // check that seek works + iter = ind.Iterator([]byte("d")) + checkEqual(t, iter.Next(), true) + checkEqual(t, iter.Peek(), []byte(nil)) + checkEqual(t, iter.Key(), []byte("mem")) + checkEqual(t, iter.Type(), BlockInteger) + checkEqual(t, iter.Entries(), []IndexEntry{ + {0, 10, 10, 20}, + }) + checkEqual(t, iter.Next(), false) + checkEqual(t, iter.Err(), error(nil)) +} diff --git a/tsdb/tsm1/reader_index_test.go b/tsdb/tsm1/reader_index_test.go new file mode 100644 index 0000000000..918d80bb93 --- /dev/null +++ b/tsdb/tsm1/reader_index_test.go @@ -0,0 +1,532 @@ +package tsm1 + +import ( + "fmt" + "math" + "sync/atomic" + "testing" +) + +func loadIndex(tb testing.TB, w IndexWriter) *indirectIndex { + tb.Helper() + + b, err := 
w.MarshalBinary() + fatalIfErr(tb, "marshaling index", err) + + indir := NewIndirectIndex() + fatalIfErr(tb, "unmarshaling index", indir.UnmarshalBinary(b)) + + return indir +} + +func TestIndirectIndex_Entries_NonExistent(t *testing.T) { + index := NewIndexWriter() + index.Add([]byte("cpu"), BlockFloat64, 0, 1, 10, 100) + index.Add([]byte("cpu"), BlockFloat64, 2, 3, 20, 200) + ind := loadIndex(t, index) + + // mem has not been added to the index so we should get no entries back + // for both + exp := index.Entries([]byte("mem")) + entries, err := ind.ReadEntries([]byte("mem"), nil) + if err != nil { + t.Fatal(err) + } + + if got, exp := len(entries), len(exp); got != exp && exp != 0 { + t.Fatalf("entries length mismatch: got %v, exp %v", got, exp) + } +} + +func TestIndirectIndex_Type(t *testing.T) { + index := NewIndexWriter() + index.Add([]byte("cpu"), BlockInteger, 0, 1, 10, 20) + ind := loadIndex(t, index) + + typ, err := ind.Type([]byte("cpu")) + if err != nil { + fatal(t, "reading type", err) + } + + if got, exp := typ, BlockInteger; got != exp { + t.Fatalf("type mismatch: got %v, exp %v", got, exp) + } +} + +func TestIndirectIndex_Delete(t *testing.T) { + check := func(t *testing.T, got, exp bool) { + t.Helper() + if exp != got { + t.Fatalf("expected: %v but got: %v", exp, got) + } + } + + index := NewIndexWriter() + index.Add([]byte("cpu1"), BlockInteger, 0, 10, 10, 20) + index.Add([]byte("cpu1"), BlockInteger, 10, 20, 10, 20) + index.Add([]byte("cpu2"), BlockInteger, 0, 10, 10, 20) + index.Add([]byte("cpu2"), BlockInteger, 10, 20, 10, 20) + index.Add([]byte("mem"), BlockInteger, 0, 10, 10, 20) + ind := loadIndex(t, index) + + ind.Delete([][]byte{[]byte("cpu1")}) + + check(t, ind.Contains([]byte("mem")), true) + check(t, ind.Contains([]byte("cpu1")), false) + check(t, ind.Contains([]byte("cpu2")), true) + + ind.Delete([][]byte{[]byte("cpu1"), []byte("cpu2")}) + + check(t, ind.Contains([]byte("mem")), true) + check(t, ind.Contains([]byte("cpu1")), false) + 
check(t, ind.Contains([]byte("cpu2")), false) + + ind.Delete([][]byte{[]byte("mem")}) + + check(t, ind.Contains([]byte("mem")), false) + check(t, ind.Contains([]byte("cpu1")), false) + check(t, ind.Contains([]byte("cpu2")), false) +} + +func TestIndirectIndex_DeleteRange(t *testing.T) { + check := func(t *testing.T, got, exp bool) { + t.Helper() + if exp != got { + t.Fatalf("expected: %v but got: %v", exp, got) + } + } + + index := NewIndexWriter() + index.Add([]byte("cpu1"), BlockInteger, 0, 10, 10, 20) + index.Add([]byte("cpu1"), BlockInteger, 10, 20, 10, 20) + index.Add([]byte("cpu2"), BlockInteger, 0, 10, 10, 20) + index.Add([]byte("cpu2"), BlockInteger, 10, 20, 10, 20) + index.Add([]byte("mem"), BlockInteger, 0, 10, 10, 20) + ind := loadIndex(t, index) + + ind.DeleteRange([][]byte{[]byte("cpu1")}, 5, 15) + + check(t, ind.Contains([]byte("mem")), true) + check(t, ind.Contains([]byte("cpu1")), true) + check(t, ind.ContainsValue([]byte("cpu1"), 4), true) + check(t, ind.ContainsValue([]byte("cpu1"), 5), false) + check(t, ind.ContainsValue([]byte("cpu1"), 10), false) + check(t, ind.ContainsValue([]byte("cpu1"), 15), false) + check(t, ind.ContainsValue([]byte("cpu1"), 16), true) + check(t, ind.Contains([]byte("cpu2")), true) + check(t, ind.ContainsValue([]byte("cpu2"), 4), true) + check(t, ind.ContainsValue([]byte("cpu2"), 5), true) + check(t, ind.ContainsValue([]byte("cpu2"), 10), true) + check(t, ind.ContainsValue([]byte("cpu2"), 15), true) + check(t, ind.ContainsValue([]byte("cpu2"), 16), true) + + ind.DeleteRange([][]byte{[]byte("cpu1"), []byte("cpu2")}, 0, 5) + + check(t, ind.Contains([]byte("mem")), true) + check(t, ind.Contains([]byte("cpu1")), true) + check(t, ind.ContainsValue([]byte("cpu1"), 4), false) + check(t, ind.ContainsValue([]byte("cpu1"), 5), false) + check(t, ind.ContainsValue([]byte("cpu1"), 10), false) + check(t, ind.ContainsValue([]byte("cpu1"), 15), false) + check(t, ind.ContainsValue([]byte("cpu1"), 16), true) + check(t, 
ind.Contains([]byte("cpu2")), true) + check(t, ind.ContainsValue([]byte("cpu2"), 4), false) + check(t, ind.ContainsValue([]byte("cpu2"), 5), false) + check(t, ind.ContainsValue([]byte("cpu2"), 10), true) + check(t, ind.ContainsValue([]byte("cpu2"), 15), true) + check(t, ind.ContainsValue([]byte("cpu2"), 16), true) + + ind.DeleteRange([][]byte{[]byte("cpu1"), []byte("cpu2")}, 15, 20) + + check(t, ind.Contains([]byte("mem")), true) + check(t, ind.Contains([]byte("cpu1")), false) + check(t, ind.ContainsValue([]byte("cpu1"), 4), false) + check(t, ind.ContainsValue([]byte("cpu1"), 5), false) + check(t, ind.ContainsValue([]byte("cpu1"), 10), false) + check(t, ind.ContainsValue([]byte("cpu1"), 15), false) + check(t, ind.ContainsValue([]byte("cpu1"), 16), false) + check(t, ind.Contains([]byte("cpu2")), true) + check(t, ind.ContainsValue([]byte("cpu2"), 4), false) + check(t, ind.ContainsValue([]byte("cpu2"), 5), false) + check(t, ind.ContainsValue([]byte("cpu2"), 10), true) + check(t, ind.ContainsValue([]byte("cpu2"), 15), false) + check(t, ind.ContainsValue([]byte("cpu2"), 16), false) +} + +func TestIndirectIndex_DeletePrefix(t *testing.T) { + check := func(t *testing.T, got, exp bool) { + t.Helper() + if exp != got { + t.Fatalf("expected: %v but got: %v", exp, got) + } + } + + index := NewIndexWriter() + index.Add([]byte("cpu1"), BlockInteger, 0, 10, 10, 20) + index.Add([]byte("cpu1"), BlockInteger, 10, 20, 10, 20) + index.Add([]byte("cpu2"), BlockInteger, 0, 10, 10, 20) + index.Add([]byte("cpu2"), BlockInteger, 10, 20, 10, 20) + index.Add([]byte("mem"), BlockInteger, 0, 10, 10, 20) + ind := loadIndex(t, index) + + ind.DeletePrefix([]byte("c"), 5, 15) + + check(t, ind.Contains([]byte("mem")), true) + check(t, ind.Contains([]byte("cpu1")), true) + check(t, ind.ContainsValue([]byte("cpu1"), 4), true) + check(t, ind.ContainsValue([]byte("cpu1"), 5), false) + check(t, ind.ContainsValue([]byte("cpu1"), 10), false) + check(t, ind.ContainsValue([]byte("cpu1"), 15), false) + 
check(t, ind.ContainsValue([]byte("cpu1"), 16), true) + check(t, ind.Contains([]byte("cpu2")), true) + check(t, ind.ContainsValue([]byte("cpu2"), 4), true) + check(t, ind.ContainsValue([]byte("cpu2"), 5), false) + check(t, ind.ContainsValue([]byte("cpu2"), 10), false) + check(t, ind.ContainsValue([]byte("cpu2"), 15), false) + check(t, ind.ContainsValue([]byte("cpu2"), 16), true) + + ind.DeletePrefix([]byte("cp"), 0, 5) + + check(t, ind.Contains([]byte("mem")), true) + check(t, ind.Contains([]byte("cpu1")), true) + check(t, ind.ContainsValue([]byte("cpu1"), 4), false) + check(t, ind.ContainsValue([]byte("cpu1"), 5), false) + check(t, ind.ContainsValue([]byte("cpu1"), 10), false) + check(t, ind.ContainsValue([]byte("cpu1"), 15), false) + check(t, ind.ContainsValue([]byte("cpu1"), 16), true) + check(t, ind.Contains([]byte("cpu2")), true) + check(t, ind.ContainsValue([]byte("cpu2"), 4), false) + check(t, ind.ContainsValue([]byte("cpu2"), 5), false) + check(t, ind.ContainsValue([]byte("cpu2"), 10), false) + check(t, ind.ContainsValue([]byte("cpu2"), 15), false) + check(t, ind.ContainsValue([]byte("cpu2"), 16), true) + + ind.DeletePrefix([]byte("cpu"), 15, 20) + + check(t, ind.Contains([]byte("mem")), true) + check(t, ind.Contains([]byte("cpu1")), false) + check(t, ind.ContainsValue([]byte("cpu1"), 4), false) + check(t, ind.ContainsValue([]byte("cpu1"), 5), false) + check(t, ind.ContainsValue([]byte("cpu1"), 10), false) + check(t, ind.ContainsValue([]byte("cpu1"), 15), false) + check(t, ind.ContainsValue([]byte("cpu1"), 16), false) + check(t, ind.Contains([]byte("cpu2")), false) + check(t, ind.ContainsValue([]byte("cpu2"), 4), false) + check(t, ind.ContainsValue([]byte("cpu2"), 5), false) + check(t, ind.ContainsValue([]byte("cpu2"), 10), false) + check(t, ind.ContainsValue([]byte("cpu2"), 15), false) + check(t, ind.ContainsValue([]byte("cpu2"), 16), false) +} + +// +// indirectIndex benchmarks +// + +const ( + indexKeyCount = 500000 + indexBlockCount = 100 +) + +type 
indexCacheInfo struct { + index *indirectIndex + offsets []uint32 + prefixes []prefixEntry + allKeys [][]byte + bytes []byte +} + +func (i *indexCacheInfo) reset() { + i.index.ro.offsets = append([]uint32(nil), i.offsets...) + i.index.ro.prefixes = append([]prefixEntry(nil), i.prefixes...) + i.index.tombstones = make(map[uint32][]TimeRange) + i.index.prefixTombstones = newPrefixTree() + resetFaults(i.index) +} + +var ( + indexCache = map[string]*indexCacheInfo{} + indexSizes = map[string][2]int{ + "large": {500000, 100}, + "med": {1000, 1000}, + "small": {5000, 2}, + } +) + +func getFaults(indirect *indirectIndex) int64 { + return int64(atomic.LoadUint64(&indirect.b.faults)) +} + +func resetFaults(indirect *indirectIndex) { + if indirect != nil { + indirect.b = faultBuffer{b: indirect.b.b} + } +} + +func getIndex(tb testing.TB, name string) (*indirectIndex, *indexCacheInfo) { + info, ok := indexCache[name] + if ok { + info.reset() + return info.index, info + } + info = new(indexCacheInfo) + + sizes, ok := indexSizes[name] + if !ok { + sizes = [2]int{indexKeyCount, indexBlockCount} + } + keys, blocks := sizes[0], sizes[1] + + writer := NewIndexWriter() + + // add a ballast key that starts at -1 so that we don't trigger optimizations + // when deleting [0, MaxInt] + writer.Add([]byte("ballast"), BlockFloat64, -1, 1, 0, 100) + + for i := 0; i < keys; i++ { + key := []byte(fmt.Sprintf("cpu-%08d", i)) + info.allKeys = append(info.allKeys, key) + for j := 0; j < blocks; j++ { + writer.Add(key, BlockFloat64, 0, 100, 10, 100) + } + } + + var err error + info.bytes, err = writer.MarshalBinary() + if err != nil { + tb.Fatalf("unexpected error marshaling index: %v", err) + } + + info.index = NewIndirectIndex() + if err = info.index.UnmarshalBinary(info.bytes); err != nil { + tb.Fatalf("unexpected error unmarshaling index: %v", err) + } + info.offsets = append([]uint32(nil), info.index.ro.offsets...) + info.prefixes = append([]prefixEntry(nil), info.index.ro.prefixes...) 
+ + indexCache[name] = info + return info.index, info +} + +func BenchmarkIndirectIndex_UnmarshalBinary(b *testing.B) { + indirect, info := getIndex(b, "large") + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + if err := indirect.UnmarshalBinary(info.bytes); err != nil { + b.Fatalf("unexpected error unmarshaling index: %v", err) + } + } +} + +func BenchmarkIndirectIndex_Entries(b *testing.B) { + indirect, _ := getIndex(b, "med") + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + resetFaults(indirect) + indirect.ReadEntries([]byte("cpu-00000001"), nil) + } + + if faultBufferEnabled { + b.SetBytes(getFaults(indirect) * 4096) + b.Log("recorded faults:", getFaults(indirect)) + } +} + +func BenchmarkIndirectIndex_ReadEntries(b *testing.B) { + var entries []IndexEntry + indirect, _ := getIndex(b, "med") + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + resetFaults(indirect) + entries, _ = indirect.ReadEntries([]byte("cpu-00000001"), entries) + } + + if faultBufferEnabled { + b.SetBytes(getFaults(indirect) * 4096) + b.Log("recorded faults:", getFaults(indirect)) + } +} + +func BenchmarkBlockIterator_Next(b *testing.B) { + indirect, _ := getIndex(b, "med") + r := TSMReader{index: indirect} + b.ResetTimer() + + for i := 0; i < b.N; i++ { + resetFaults(indirect) + bi := r.BlockIterator() + for bi.Next() { + } + } + + if faultBufferEnabled { + b.SetBytes(getFaults(indirect) * 4096) + b.Log("recorded faults:", getFaults(indirect)) + } +} + +func BenchmarkIndirectIndex_DeleteRangeLast(b *testing.B) { + indirect, _ := getIndex(b, "large") + keys := [][]byte{[]byte("cpu-00999999")} + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + resetFaults(indirect) + indirect.DeleteRange(keys, 10, 50) + } + + if faultBufferEnabled { + b.SetBytes(getFaults(indirect) * 4096) + b.Log("recorded faults:", getFaults(indirect)) + } +} + +func BenchmarkIndirectIndex_DeleteRangeFull(b *testing.B) { + run := func(b *testing.B, 
name string) { + indirect, _ := getIndex(b, name) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + b.StopTimer() + var info *indexCacheInfo + indirect, info = getIndex(b, name) + b.StartTimer() + + for i := 0; i < len(info.allKeys); i += 4096 { + n := i + 4096 + if n > len(info.allKeys) { + n = len(info.allKeys) + } + indirect.DeleteRange(info.allKeys[i:n], 10, 50) + } + } + + if faultBufferEnabled { + b.SetBytes(getFaults(indirect) * 4096) + b.Log("recorded faults:", getFaults(indirect)) + } + } + + b.Run("Large", func(b *testing.B) { run(b, "large") }) + b.Run("Small", func(b *testing.B) { run(b, "small") }) +} + +func BenchmarkIndirectIndex_DeleteRangeFull_Covered(b *testing.B) { + run := func(b *testing.B, name string) { + indirect, _ := getIndex(b, name) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + b.StopTimer() + var info *indexCacheInfo + indirect, info = getIndex(b, name) + b.StartTimer() + + for i := 0; i < len(info.allKeys); i += 4096 { + n := i + 4096 + if n > len(info.allKeys) { + n = len(info.allKeys) + } + indirect.DeleteRange(info.allKeys[i:n], 0, math.MaxInt64) + } + } + + if faultBufferEnabled { + b.SetBytes(getFaults(indirect) * 4096) + b.Log("recorded faults:", getFaults(indirect)) + } + } + + b.Run("Large", func(b *testing.B) { run(b, "large") }) + b.Run("Small", func(b *testing.B) { run(b, "small") }) +} + +func BenchmarkIndirectIndex_Delete(b *testing.B) { + run := func(b *testing.B, name string) { + indirect, _ := getIndex(b, name) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + b.StopTimer() + var info *indexCacheInfo + indirect, info = getIndex(b, name) + b.StartTimer() + + for i := 0; i < len(info.allKeys); i += 4096 { + n := i + 4096 + if n > len(info.allKeys) { + n = len(info.allKeys) + } + indirect.Delete(info.allKeys[i:n]) + } + } + + if faultBufferEnabled { + b.SetBytes(getFaults(indirect) * 4096) + b.Log("recorded faults:", getFaults(indirect)) + } + } + + 
b.Run("Large", func(b *testing.B) { run(b, "large") }) + b.Run("Small", func(b *testing.B) { run(b, "small") }) +} + +func BenchmarkIndirectIndex_DeletePrefixFull(b *testing.B) { + prefix := []byte("cpu-") + run := func(b *testing.B, name string) { + indirect, _ := getIndex(b, name) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + b.StopTimer() + indirect, _ = getIndex(b, name) + b.StartTimer() + + indirect.DeletePrefix(prefix, 10, 50) + } + + if faultBufferEnabled { + b.SetBytes(getFaults(indirect) * 4096) + b.Log("recorded faults:", getFaults(indirect)) + } + } + + b.Run("Large", func(b *testing.B) { run(b, "large") }) + b.Run("Small", func(b *testing.B) { run(b, "small") }) +} + +func BenchmarkIndirectIndex_DeletePrefixFull_Covered(b *testing.B) { + prefix := []byte("cpu-") + run := func(b *testing.B, name string) { + indirect, _ := getIndex(b, name) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + b.StopTimer() + indirect, _ = getIndex(b, name) + b.StartTimer() + + indirect.DeletePrefix(prefix, 0, math.MaxInt64) + } + + if faultBufferEnabled { + b.SetBytes(getFaults(indirect) * 4096) + b.Log("recorded faults:", getFaults(indirect)) + } + } + + b.Run("Large", func(b *testing.B) { run(b, "large") }) + b.Run("Small", func(b *testing.B) { run(b, "small") }) +} diff --git a/tsdb/tsm1/reader_test.go b/tsdb/tsm1/reader_test.go index 97ce481b7a..1c1f4fe725 100644 --- a/tsdb/tsm1/reader_test.go +++ b/tsdb/tsm1/reader_test.go @@ -1,20 +1,26 @@ package tsm1 import ( - "fmt" "io/ioutil" "math" "os" "path/filepath" "sort" - "sync/atomic" "testing" ) -func fatal(t *testing.T, msg string, err error) { +func fatal(t testing.TB, msg string, err error) { + t.Helper() t.Fatalf("unexpected error %v: %v", msg, err) } +func fatalIfErr(t testing.TB, msg string, err error) { + t.Helper() + if err != nil { + fatal(t, msg, err) + } +} + func TestTSMReader_Type(t *testing.T) { dir := mustTempDir() defer os.RemoveAll(dir) @@ -57,6 +63,17 @@ func 
TestTSMReader_Type(t *testing.T) { } } +func TestIndexWriter_MaxBlocks(t *testing.T) { + index := NewIndexWriter() + for i := 0; i < 1<<16; i++ { + index.Add([]byte("cpu"), BlockFloat64, 0, 1, 10, 20) + } + + if _, err := index.MarshalBinary(); err == nil { + t.Fatalf("expected max block count error. got nil") + } +} + func TestTSMReader_MMAP_ReadAll(t *testing.T) { dir := mustTempDir() defer os.RemoveAll(dir) @@ -1118,71 +1135,6 @@ func TestIndirectIndex_Entries(t *testing.T) { } } -func TestIndirectIndex_Entries_NonExistent(t *testing.T) { - index := NewIndexWriter() - index.Add([]byte("cpu"), BlockFloat64, 0, 1, 10, 100) - index.Add([]byte("cpu"), BlockFloat64, 2, 3, 20, 200) - - b, err := index.MarshalBinary() - if err != nil { - t.Fatalf("unexpected error marshaling index: %v", err) - } - - indirect := NewIndirectIndex() - if err := indirect.UnmarshalBinary(b); err != nil { - t.Fatalf("unexpected error unmarshaling index: %v", err) - } - - // mem has not been added to the index so we should get no entries back - // for both - exp := index.Entries([]byte("mem")) - entries, err := indirect.ReadEntries([]byte("mem"), nil) - if err != nil { - t.Fatal(err) - } - - if got, exp := len(entries), len(exp); got != exp && exp != 0 { - t.Fatalf("entries length mismatch: got %v, exp %v", got, exp) - } -} - -func TestIndirectIndex_MaxBlocks(t *testing.T) { - index := NewIndexWriter() - for i := 0; i < 1<<16; i++ { - index.Add([]byte("cpu"), BlockFloat64, 0, 1, 10, 20) - } - - if _, err := index.MarshalBinary(); err == nil { - t.Fatalf("expected max block count error. 
got nil") - } else { - println(err.Error()) - } -} - -func TestIndirectIndex_Type(t *testing.T) { - index := NewIndexWriter() - index.Add([]byte("cpu"), BlockInteger, 0, 1, 10, 20) - - b, err := index.MarshalBinary() - if err != nil { - t.Fatal(err) - } - - ind := NewIndirectIndex() - if err := ind.UnmarshalBinary(b); err != nil { - fatal(t, "unmarshal binary", err) - } - - typ, err := ind.Type([]byte("cpu")) - if err != nil { - fatal(t, "reading type", err) - } - - if got, exp := typ, BlockInteger; got != exp { - t.Fatalf("type mismatch: got %v, exp %v", got, exp) - } -} - func TestDirectIndex_KeyCount(t *testing.T) { index := NewIndexWriter() index.Add([]byte("cpu"), BlockFloat64, 0, 1, 10, 20) @@ -1195,280 +1147,7 @@ func TestDirectIndex_KeyCount(t *testing.T) { } } -func TestBlockIterator_Single(t *testing.T) { - dir := mustTempDir() - defer os.RemoveAll(dir) - f := mustTempFile(dir) - - w, err := NewTSMWriter(f) - if err != nil { - t.Fatalf("unexpected error creating writer: %v", err) - } - - values := []Value{NewValue(0, int64(1))} - if err := w.Write([]byte("cpu"), values); err != nil { - t.Fatalf("unexpected error writing: %v", err) - - } - if err := w.WriteIndex(); err != nil { - t.Fatalf("unexpected error closing: %v", err) - } - - if err := w.Close(); err != nil { - t.Fatalf("unexpected error closing: %v", err) - } - - fd, err := os.Open(f.Name()) - if err != nil { - t.Fatalf("unexpected error opening: %v", err) - } - - r, err := NewTSMReader(fd) - if err != nil { - t.Fatalf("unexpected error created reader: %v", err) - } - - var count int - iter := r.BlockIterator() - for iter.Next() { - key, minTime, maxTime, typ, _, buf, err := iter.Read() - if err != nil { - t.Fatalf("unexpected error creating iterator: %v", err) - } - - if got, exp := string(key), "cpu"; got != exp { - t.Fatalf("key mismatch: got %v, exp %v", got, exp) - } - - if got, exp := minTime, int64(0); got != exp { - t.Fatalf("min time mismatch: got %v, exp %v", got, exp) - } - - if got, exp 
:= maxTime, int64(0); got != exp { - t.Fatalf("max time mismatch: got %v, exp %v", got, exp) - } - - if got, exp := typ, BlockInteger; got != exp { - t.Fatalf("block type mismatch: got %v, exp %v", got, exp) - } - - if len(buf) == 0 { - t.Fatalf("buf length = 0") - } - - count++ - } - - if got, exp := count, len(values); got != exp { - t.Fatalf("value count mismatch: got %v, exp %v", got, exp) - } -} - -func TestBlockIterator_Tombstone(t *testing.T) { - dir := mustTempDir() - defer os.RemoveAll(dir) - f := mustTempFile(dir) - - w, err := NewTSMWriter(f) - if err != nil { - t.Fatalf("unexpected error creating writer: %v", err) - } - - values := []Value{NewValue(0, int64(1))} - if err := w.Write([]byte("cpu"), values); err != nil { - t.Fatalf("unexpected error writing: %v", err) - } - - if err := w.Write([]byte("mem"), values); err != nil { - t.Fatalf("unexpected error writing: %v", err) - } - - if err := w.WriteIndex(); err != nil { - t.Fatalf("unexpected error closing: %v", err) - } - - if err := w.Close(); err != nil { - t.Fatalf("unexpected error closing: %v", err) - } - - fd, err := os.Open(f.Name()) - if err != nil { - t.Fatalf("unexpected error opening: %v", err) - } - - r, err := NewTSMReader(fd) - if err != nil { - t.Fatalf("unexpected error created reader: %v", err) - } - - iter := r.BlockIterator() - for iter.Next() { - // Trigger a delete during iteration. 
This should cause an error condition for - // the BlockIterator - r.Delete([][]byte{[]byte("cpu")}) - } - - if iter.Err() == nil { - t.Fatalf("expected error: got nil") - } -} - -func TestBlockIterator_MultipleBlocks(t *testing.T) { - dir := mustTempDir() - defer os.RemoveAll(dir) - f := mustTempFile(dir) - - w, err := NewTSMWriter(f) - if err != nil { - t.Fatalf("unexpected error creating writer: %v", err) - } - - values1 := []Value{NewValue(0, int64(1))} - if err := w.Write([]byte("cpu"), values1); err != nil { - t.Fatalf("unexpected error writing: %v", err) - } - - values2 := []Value{NewValue(1, int64(2))} - if err := w.Write([]byte("cpu"), values2); err != nil { - t.Fatalf("unexpected error writing: %v", err) - } - - if err := w.WriteIndex(); err != nil { - t.Fatalf("unexpected error closing: %v", err) - } - - if err := w.Close(); err != nil { - t.Fatalf("unexpected error closing: %v", err) - } - - fd, err := os.Open(f.Name()) - if err != nil { - t.Fatalf("unexpected error opening: %v", err) - } - - r, err := NewTSMReader(fd) - if err != nil { - t.Fatalf("unexpected error created reader: %v", err) - } - - var count int - expData := []Values{values1, values2} - iter := r.BlockIterator() - var i int - for iter.Next() { - key, minTime, maxTime, typ, _, buf, err := iter.Read() - - if err != nil { - t.Fatalf("unexpected error creating iterator: %v", err) - } - - if got, exp := string(key), "cpu"; got != exp { - t.Fatalf("key mismatch: got %v, exp %v", got, exp) - } - - if got, exp := minTime, expData[i][0].UnixNano(); got != exp { - t.Fatalf("min time mismatch: got %v, exp %v", got, exp) - } - - if got, exp := maxTime, expData[i][0].UnixNano(); got != exp { - t.Fatalf("max time mismatch: got %v, exp %v", got, exp) - } - - if got, exp := typ, BlockInteger; got != exp { - t.Fatalf("block type mismatch: got %v, exp %v", got, exp) - } - - if len(buf) == 0 { - t.Fatalf("buf length = 0") - } - - count++ - i++ - } - - if got, exp := count, 2; got != exp { - t.Fatalf("value 
count mismatch: got %v, exp %v", got, exp) - } -} - -func TestBlockIterator_Sorted(t *testing.T) { - dir := mustTempDir() - defer os.RemoveAll(dir) - f := mustTempFile(dir) - - w, err := NewTSMWriter(f) - if err != nil { - t.Fatalf("unexpected error creating writer: %v", err) - } - - values := map[string][]Value{ - "mem": []Value{NewValue(0, int64(1))}, - "cycles": []Value{NewValue(0, ^uint64(0))}, - "cpu": []Value{NewValue(1, float64(2))}, - "disk": []Value{NewValue(1, true)}, - "load": []Value{NewValue(1, "string")}, - } - - keys := make([]string, 0, len(values)) - for k := range values { - keys = append(keys, k) - } - sort.Strings(keys) - - for _, k := range keys { - if err := w.Write([]byte(k), values[k]); err != nil { - t.Fatalf("unexpected error writing: %v", err) - - } - } - - if err := w.WriteIndex(); err != nil { - t.Fatalf("unexpected error closing: %v", err) - } - - if err := w.Close(); err != nil { - t.Fatalf("unexpected error closing: %v", err) - } - - fd, err := os.Open(f.Name()) - if err != nil { - t.Fatalf("unexpected error opening: %v", err) - } - - r, err := NewTSMReader(fd) - if err != nil { - t.Fatalf("unexpected error created reader: %v", err) - } - - var count int - iter := r.BlockIterator() - var lastKey string - for iter.Next() { - key, _, _, _, _, buf, err := iter.Read() - - if string(key) < lastKey { - t.Fatalf("keys not sorted: got %v, last %v", key, lastKey) - } - - lastKey = string(key) - - if err != nil { - t.Fatalf("unexpected error creating iterator: %v", err) - } - - if len(buf) == 0 { - t.Fatalf("buf length = 0") - } - - count++ - } - - if got, exp := count, len(values); got != exp { - t.Fatalf("value count mismatch: got %v, exp %v", got, exp) - } -} - -func TestIndirectIndex_UnmarshalBinary_BlockCountOverflow(t *testing.T) { +func TestTSMReader_UnmarshalBinary_BlockCountOverflow(t *testing.T) { dir := mustTempDir() defer os.RemoveAll(dir) f := mustTempFile(dir) @@ -1503,6 +1182,7 @@ func 
TestIndirectIndex_UnmarshalBinary_BlockCountOverflow(t *testing.T) { defer r.Close() } + func TestCompacted_NotFull(t *testing.T) { dir := mustTempDir() defer os.RemoveAll(dir) @@ -1871,316 +1551,3 @@ func TestTSMReader_References(t *testing.T) { t.Fatalf("unexpected error removing reader: %v", err) } } - -// -// indirectIndex benchmarks -// - -const ( - indexKeyCount = 500000 - indexBlockCount = 100 -) - -type indexCacheInfo struct { - index *indirectIndex - offsets []uint32 - prefixes []prefixEntry - allKeys [][]byte - bytes []byte -} - -func (i *indexCacheInfo) reset() { - i.index.ro.offsets = append([]uint32(nil), i.offsets...) - i.index.ro.prefixes = append([]prefixEntry(nil), i.prefixes...) - i.index.tombstones = make(map[uint32][]TimeRange) - i.index.prefixTombstones = newPrefixTree() - resetFaults(i.index) -} - -var ( - indexCache = map[string]*indexCacheInfo{} - indexSizes = map[string][2]int{ - "large": {500000, 100}, - "med": {1000, 1000}, - "small": {5000, 2}, - } -) - -func getFaults(indirect *indirectIndex) int64 { - return int64(atomic.LoadUint64(&indirect.b.faults)) -} - -func resetFaults(indirect *indirectIndex) { - if indirect != nil { - indirect.b = faultBuffer{b: indirect.b.b} - } -} - -func getIndex(tb testing.TB, name string) (*indirectIndex, *indexCacheInfo) { - info, ok := indexCache[name] - if ok { - info.reset() - return info.index, info - } - info = new(indexCacheInfo) - - sizes, ok := indexSizes[name] - if !ok { - sizes = [2]int{indexKeyCount, indexBlockCount} - } - keys, blocks := sizes[0], sizes[1] - - writer := NewIndexWriter() - - // add a ballast key that starts at -1 so that we don't trigger optimizations - // when deleting [0, MaxInt] - writer.Add([]byte("ballast"), BlockFloat64, -1, 1, 0, 100) - - for i := 0; i < keys; i++ { - key := []byte(fmt.Sprintf("cpu-%08d", i)) - info.allKeys = append(info.allKeys, key) - for j := 0; j < blocks; j++ { - writer.Add(key, BlockFloat64, 0, 100, 10, 100) - } - } - - var err error - info.bytes, 
err = writer.MarshalBinary() - if err != nil { - tb.Fatalf("unexpected error marshaling index: %v", err) - } - - info.index = NewIndirectIndex() - if err = info.index.UnmarshalBinary(info.bytes); err != nil { - tb.Fatalf("unexpected error unmarshaling index: %v", err) - } - info.offsets = append([]uint32(nil), info.index.ro.offsets...) - info.prefixes = append([]prefixEntry(nil), info.index.ro.prefixes...) - - indexCache[name] = info - return info.index, info -} - -func BenchmarkIndirectIndex_UnmarshalBinary(b *testing.B) { - indirect, info := getIndex(b, "large") - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - if err := indirect.UnmarshalBinary(info.bytes); err != nil { - b.Fatalf("unexpected error unmarshaling index: %v", err) - } - } -} - -func BenchmarkIndirectIndex_Entries(b *testing.B) { - indirect, _ := getIndex(b, "med") - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - resetFaults(indirect) - indirect.ReadEntries([]byte("cpu-00000001"), nil) - } - - if faultBufferEnabled { - b.SetBytes(getFaults(indirect) * 4096) - b.Log("recorded faults:", getFaults(indirect)) - } -} - -func BenchmarkIndirectIndex_ReadEntries(b *testing.B) { - var entries []IndexEntry - indirect, _ := getIndex(b, "med") - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - resetFaults(indirect) - entries, _ = indirect.ReadEntries([]byte("cpu-00000001"), entries) - } - - if faultBufferEnabled { - b.SetBytes(getFaults(indirect) * 4096) - b.Log("recorded faults:", getFaults(indirect)) - } -} - -func BenchmarkBlockIterator_Next(b *testing.B) { - indirect, _ := getIndex(b, "med") - r := TSMReader{index: indirect} - b.ResetTimer() - - for i := 0; i < b.N; i++ { - resetFaults(indirect) - bi := r.BlockIterator() - for bi.Next() { - } - } - - if faultBufferEnabled { - b.SetBytes(getFaults(indirect) * 4096) - b.Log("recorded faults:", getFaults(indirect)) - } -} - -func BenchmarkIndirectIndex_DeleteRangeLast(b *testing.B) { - indirect, _ := 
getIndex(b, "large") - keys := [][]byte{[]byte("cpu-00999999")} - - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - resetFaults(indirect) - indirect.DeleteRange(keys, 10, 50) - } - - if faultBufferEnabled { - b.SetBytes(getFaults(indirect) * 4096) - b.Log("recorded faults:", getFaults(indirect)) - } -} - -func BenchmarkIndirectIndex_DeleteRangeFull(b *testing.B) { - run := func(b *testing.B, name string) { - indirect, _ := getIndex(b, name) - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - b.StopTimer() - var info *indexCacheInfo - indirect, info = getIndex(b, name) - b.StartTimer() - - for i := 0; i < len(info.allKeys); i += 4096 { - n := i + 4096 - if n > len(info.allKeys) { - n = len(info.allKeys) - } - indirect.DeleteRange(info.allKeys[i:n], 10, 50) - } - } - - if faultBufferEnabled { - b.SetBytes(getFaults(indirect) * 4096) - b.Log("recorded faults:", getFaults(indirect)) - } - } - - b.Run("Large", func(b *testing.B) { run(b, "large") }) - b.Run("Small", func(b *testing.B) { run(b, "small") }) -} - -func BenchmarkIndirectIndex_DeleteRangeFull_Covered(b *testing.B) { - run := func(b *testing.B, name string) { - indirect, _ := getIndex(b, name) - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - b.StopTimer() - var info *indexCacheInfo - indirect, info = getIndex(b, name) - b.StartTimer() - - for i := 0; i < len(info.allKeys); i += 4096 { - n := i + 4096 - if n > len(info.allKeys) { - n = len(info.allKeys) - } - indirect.DeleteRange(info.allKeys[i:n], 0, math.MaxInt64) - } - } - - if faultBufferEnabled { - b.SetBytes(getFaults(indirect) * 4096) - b.Log("recorded faults:", getFaults(indirect)) - } - } - - b.Run("Large", func(b *testing.B) { run(b, "large") }) - b.Run("Small", func(b *testing.B) { run(b, "small") }) -} - -func BenchmarkIndirectIndex_Delete(b *testing.B) { - run := func(b *testing.B, name string) { - indirect, _ := getIndex(b, name) - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; 
i++ { - b.StopTimer() - var info *indexCacheInfo - indirect, info = getIndex(b, name) - b.StartTimer() - - for i := 0; i < len(info.allKeys); i += 4096 { - n := i + 4096 - if n > len(info.allKeys) { - n = len(info.allKeys) - } - indirect.Delete(info.allKeys[i:n]) - } - } - - if faultBufferEnabled { - b.SetBytes(getFaults(indirect) * 4096) - b.Log("recorded faults:", getFaults(indirect)) - } - } - - b.Run("Large", func(b *testing.B) { run(b, "large") }) - b.Run("Small", func(b *testing.B) { run(b, "small") }) -} - -func BenchmarkIndirectIndex_DeletePrefixFull(b *testing.B) { - prefix := []byte("cpu-") - run := func(b *testing.B, name string) { - indirect, _ := getIndex(b, name) - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - b.StopTimer() - indirect, _ = getIndex(b, name) - b.StartTimer() - - indirect.DeletePrefix(prefix, 10, 50) - } - - if faultBufferEnabled { - b.SetBytes(getFaults(indirect) * 4096) - b.Log("recorded faults:", getFaults(indirect)) - } - } - - b.Run("Large", func(b *testing.B) { run(b, "large") }) - b.Run("Small", func(b *testing.B) { run(b, "small") }) -} - -func BenchmarkIndirectIndex_DeletePrefixFull_Covered(b *testing.B) { - prefix := []byte("cpu-") - run := func(b *testing.B, name string) { - indirect, _ := getIndex(b, name) - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - b.StopTimer() - indirect, _ = getIndex(b, name) - b.StartTimer() - - indirect.DeletePrefix(prefix, 0, math.MaxInt64) - } - - if faultBufferEnabled { - b.SetBytes(getFaults(indirect) * 4096) - b.Log("recorded faults:", getFaults(indirect)) - } - } - - b.Run("Large", func(b *testing.B) { run(b, "large") }) - b.Run("Small", func(b *testing.B) { run(b, "small") }) -} diff --git a/tsdb/tsm1/reader_time_range.go b/tsdb/tsm1/reader_time_range.go index a20d287b97..a588799de7 100644 --- a/tsdb/tsm1/reader_time_range.go +++ b/tsdb/tsm1/reader_time_range.go @@ -15,6 +15,10 @@ func (t TimeRange) Less(o TimeRange) bool { // timeRangesCoverEntries 
returns true if the time ranges fully cover the entries. func timeRangesCoverEntries(merger timeRangeMerger, entries []IndexEntry) (covers bool) { + if len(entries) == 0 { + return true + } + mustCover := entries[0].MinTime ts, ok := merger.Pop() diff --git a/tsdb/tsm1/reader_time_range_test.go b/tsdb/tsm1/reader_time_range_test.go new file mode 100644 index 0000000000..54bbc459c1 --- /dev/null +++ b/tsdb/tsm1/reader_time_range_test.go @@ -0,0 +1,100 @@ +package tsm1 + +import ( + "reflect" + "sort" + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestTimeRangeMerger(t *testing.T) { + ranges := func(ns ...int64) (out []TimeRange) { + for _, n := range ns { + out = append(out, TimeRange{n, n}) + } + return out + } + + check := func(t *testing.T, exp []TimeRange, merger timeRangeMerger) { + t.Helper() + + var got []TimeRange + for { + tr, ok := merger.Pop() + if !ok { + break + } + got = append(got, tr) + } + + if !reflect.DeepEqual(got, exp) { + t.Fatalf("bad merge:\n%v", cmp.Diff(got, exp)) + } + } + + check(t, ranges(0, 1, 2, 3, 4, 5, 6), timeRangeMerger{ + sorted: ranges(0, 2, 6), + unsorted: ranges(1, 3, 5), + single: TimeRange{4, 4}, + }) + + check(t, ranges(0, 1, 2), timeRangeMerger{ + sorted: ranges(0, 1, 2), + used: true, + }) + + check(t, ranges(0, 1, 2), timeRangeMerger{ + unsorted: ranges(0, 1, 2), + used: true, + }) + + check(t, ranges(0), timeRangeMerger{ + single: TimeRange{0, 0}, + }) + + check(t, ranges(0, 0, 0), timeRangeMerger{ + sorted: ranges(0), + unsorted: ranges(0), + single: TimeRange{0, 0}, + }) +} + +func TestTimeRangeCoverEntries(t *testing.T) { + ranges := func(ns ...int64) (out []TimeRange) { + for i := 0; i+1 < len(ns); i += 2 { + out = append(out, TimeRange{ns[i], ns[i+1]}) + } + return out + } + + entries := func(ns ...int64) (out []IndexEntry) { + for i := 0; i+1 < len(ns); i += 2 { + out = append(out, IndexEntry{MinTime: ns[i], MaxTime: ns[i+1]}) + } + return out + } + + check := func(t *testing.T, ranges []TimeRange, 
entries []IndexEntry, covers bool) { + t.Helper() + sort.Slice(ranges, func(i, j int) bool { return ranges[i].Less(ranges[j]) }) + got := timeRangesCoverEntries(timeRangeMerger{sorted: ranges, used: true}, entries) + if got != covers { + t.Fatalf("bad covers:\nranges: %v\nentries: %v\ncovers: %v\ngot: %v", + ranges, entries, covers, got) + } + } + + check(t, ranges(0, 0, 1, 1, 2, 2), entries(0, 0, 1, 1, 2, 2), true) + check(t, ranges(0, 0, 1, 1, 2, 2), entries(0, 0, 2, 2), true) + check(t, ranges(0, 0, 1, 1, 2, 2), entries(3, 3), false) + check(t, ranges(0, 0, 1, 1, 2, 2), entries(-1, -1), false) + check(t, ranges(0, 10), entries(1, 1, 2, 2), true) + check(t, ranges(0, 1, 1, 2), entries(0, 0, 1, 1, 2, 2), true) + check(t, ranges(0, 10), entries(0, 0, 2, 2), true) + check(t, ranges(0, 1, 1, 2), entries(0, 0, 2, 2), true) + check(t, ranges(0, 1, 4, 5), entries(0, 0, 5, 5), true) + check(t, ranges(), entries(), true) + check(t, ranges(), entries(0, 0), false) + check(t, ranges(0, 0), entries(), true) +}