tsm1: change TSMFile to use an iterator style api

pull/10616/head
Jeff Wendling 2018-12-19 18:37:00 -07:00
parent 917584b054
commit 14cf01911e
10 changed files with 382 additions and 448 deletions

View File

@ -350,12 +350,13 @@ func IndexTSMFile(index *tsi1.Index, path string, batchSize int, log *zap.Logger
Types: make([]models.FieldType, 0, batchSize),
}
var ti int
for i := 0; i < r.KeyCount(); i++ {
key, _ := r.KeyAt(i)
iter := r.Iterator(nil)
for iter.Next() {
key := iter.Key()
seriesKey, _ := tsm1.SeriesAndFieldFromCompositeKey(key)
var name []byte
name, collection.Tags[ti] = models.ParseKeyBytesWithTags(seriesKey, collection.Tags[ti])
typ, _ := r.Type(key)
typ := iter.Type()
if verboseLogging {
log.Info("Series", zap.String("name", string(name)), zap.String("tags", collection.Tags[ti].String()))
@ -377,6 +378,9 @@ func IndexTSMFile(index *tsi1.Index, path string, batchSize int, log *zap.Logger
ti = 0 // Reset tags.
}
}
if err := iter.Err(); err != nil {
return fmt.Errorf("problem creating series: (%s)", err)
}
// Flush any remaining series in the batches
if len(collection.Keys) > 0 {

View File

@ -126,7 +126,10 @@ func TestCompactor_CompactFullLastTimestamp(t *testing.T) {
}
r := MustOpenTSMReader(files[0])
entries := r.Entries([]byte("cpu,host=A#!~#value"))
entries, err := r.ReadEntries([]byte("cpu,host=A#!~#value"), nil)
if err != nil {
t.Fatal(err)
}
_, b, err := r.ReadBytes(&entries[0], nil)
if err != nil {
t.Fatalf("ReadBytes: unexpected error %v", err)
@ -661,7 +664,11 @@ func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) {
}
}
if got, exp := len(r.Entries([]byte("cpu,host=A#!~#value"))), 2; got != exp {
entries, err := r.ReadEntries([]byte("cpu,host=A#!~#value"), nil)
if err != nil {
t.Fatal(err)
}
if got, exp := len(entries), 2; got != exp {
t.Fatalf("block count mismatch: got %v, exp %v", got, exp)
}
}
@ -774,7 +781,11 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) {
}
}
if got, exp := len(r.Entries([]byte("cpu,host=A#!~#value"))), 1; got != exp {
entries, err := r.ReadEntries([]byte("cpu,host=A#!~#value"), nil)
if err != nil {
t.Fatal(err)
}
if got, exp := len(entries), 1; got != exp {
t.Fatalf("block count mismatch: got %v, exp %v", got, exp)
}
}
@ -888,7 +899,11 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) {
}
}
if got, exp := len(r.Entries([]byte("cpu,host=A#!~#value"))), 2; got != exp {
entries, err := r.ReadEntries([]byte("cpu,host=A#!~#value"), nil)
if err != nil {
t.Fatal(err)
}
if got, exp := len(entries), 2; got != exp {
t.Fatalf("block count mismatch: got %v, exp %v", got, exp)
}
}
@ -996,7 +1011,11 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) {
}
}
if got, exp := len(r.Entries([]byte("cpu,host=A#!~#value"))), 2; got != exp {
entries, err := r.ReadEntries([]byte("cpu,host=A#!~#value"), nil)
if err != nil {
t.Fatal(err)
}
if got, exp := len(entries), 2; got != exp {
t.Fatalf("block count mismatch: got %v, exp %v", got, exp)
}
}

View File

@ -818,10 +818,10 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
// Delete each key we find in the file. We seek to the min key and walk from there.
batch := r.BatchDelete()
n := r.KeyCount()
iter := r.Iterator(minKey)
var j int
for i := r.Seek(minKey); i < n; i++ {
indexKey, _ := r.KeyAt(i)
for iter.Next() {
indexKey := iter.Key()
seriesKey, _ := SeriesAndFieldFromCompositeKey(indexKey)
for j < len(seriesKeys) && bytes.Compare(seriesKeys[j], seriesKey) < 0 {
@ -838,6 +838,10 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
}
}
}
if err := iter.Err(); err != nil {
batch.Rollback()
return err
}
return batch.Commit()
}); err != nil {
@ -884,16 +888,16 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
// Apply runs this func concurrently. The seriesKeys slice is mutated concurrently
// by different goroutines setting positions to nil.
if err := e.FileStore.Apply(func(r TSMFile) error {
n := r.KeyCount()
var j int
// Start from the min deleted key that exists in this file.
for i := r.Seek(minKey); i < n; i++ {
iter := r.Iterator(minKey)
for iter.Next() {
if j >= len(seriesKeys) {
return nil
}
indexKey, _ := r.KeyAt(i)
indexKey := iter.Key()
seriesKey, _ := SeriesAndFieldFromCompositeKey(indexKey)
// Skip over any deleted keys that are less than our tsm key
@ -912,7 +916,8 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
j++
}
}
return nil
return iter.Err()
}); err != nil {
return err
}

View File

@ -34,6 +34,15 @@ const (
BadTSMFileExtension = "bad"
)
type TSMIterator interface {
Next() bool
Peek() []byte
Key() []byte
Type() byte
Entries() []IndexEntry
Err() error
}
// TSMFile represents an on-disk TSM file.
type TSMFile interface {
// Path returns the underlying file path for the TSMFile. If the file
@ -57,8 +66,7 @@ type TSMFile interface {
ReadBooleanArrayBlockAt(entry *IndexEntry, values *tsdb.BooleanArray) error
// Entries returns the index entries for all blocks for the given key.
Entries(key []byte) []IndexEntry
ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry
ReadEntries(key []byte, entries []IndexEntry) ([]IndexEntry, error)
// Returns true if the TSMFile may contain a value with the specified
// key and time.
@ -86,11 +94,9 @@ type TSMFile interface {
// KeyCount returns the number of distinct keys in the file.
KeyCount() int
// Seek returns the position in the index with the key <= key.
Seek(key []byte) int
// KeyAt returns the key located at index position idx.
KeyAt(idx int) ([]byte, byte)
// Iterator returns an iterator over the keys starting at the provided key. You must
// call Next before calling any of the accessors.
Iterator([]byte) TSMIterator
// Type returns the block type of the values stored for the key. Returns one of
// BlockFloat64, BlockInt64, BlockBoolean, BlockString. If key does not exist,
@ -981,7 +987,9 @@ func (f *FileStore) BlockCount(path string, idx int) int {
// We need to determine the possible files that may be accessed by this query given
// the time range.
func (f *FileStore) cost(key []byte, min, max int64) query.IteratorCost {
var cache []IndexEntry
var entries []IndexEntry
var err error
cost := query.IteratorCost{}
for _, fd := range f.files {
minTime, maxTime := fd.TimeRange()
@ -991,7 +999,12 @@ func (f *FileStore) cost(key []byte, min, max int64) query.IteratorCost {
skipped := true
tombstones := fd.TombstoneRange(key)
entries := fd.ReadEntries(key, &cache)
entries, err = fd.ReadEntries(key, entries)
if err != nil {
// TODO(jeff): log this somehow? we have an invalid entry in the tsm index
continue
}
ENTRIES:
for i := 0; i < len(entries); i++ {
ie := entries[i]
@ -1023,7 +1036,9 @@ func (f *FileStore) cost(key []byte, min, max int64) query.IteratorCost {
// whether the key will be scan in ascending time order or descenging time order.
// This function assumes the read-lock has been taken.
func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location {
var cache []IndexEntry
var entries []IndexEntry
var err error
locations := make([]*location, 0, len(f.files))
for _, fd := range f.files {
minTime, maxTime := fd.TimeRange()
@ -1041,7 +1056,12 @@ func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location {
// This file could potential contain points we are looking for so find the blocks for
// the given key.
entries := fd.ReadEntries(key, &cache)
entries, err = fd.ReadEntries(key, entries)
if err != nil {
// TODO(jeff): log this somehow? we have an invalid entry in the tsm index
continue
}
LOOP:
for i := 0; i < len(entries); i++ {
ie := entries[i]

View File

@ -6,33 +6,20 @@ import (
)
type keyIterator struct {
f TSMFile
c int // current key index
n int // key count
key []byte
typ byte
iter TSMIterator
key []byte
typ byte
}
func newKeyIterator(f TSMFile, seek []byte) *keyIterator {
c, n := 0, f.KeyCount()
if len(seek) > 0 {
c = f.Seek(seek)
}
if c >= n {
return nil
}
k := &keyIterator{f: f, c: c, n: n}
k := &keyIterator{iter: f.Iterator(seek)}
k.next()
return k
}
func (k *keyIterator) next() bool {
if k.c < k.n {
k.key, k.typ = k.f.KeyAt(k.c)
k.c++
if k.iter.Next() {
k.key, k.typ = k.iter.Key(), k.iter.Type()
return true
}
return false
@ -98,9 +85,10 @@ func (m *mergeKeyIterator) Read() ([]byte, byte) { return m.key, m.typ }
type keyIterators []*keyIterator
func (k keyIterators) Len() int { return len(k) }
func (k keyIterators) Less(i, j int) bool { return bytes.Compare(k[i].key, k[j].key) == -1 }
func (k keyIterators) Swap(i, j int) { k[i], k[j] = k[j], k[i] }
func (k keyIterators) Len() int { return len(k) }
func (k keyIterators) Less(i, j int) bool { return bytes.Compare(k[i].key, k[j].key) == -1 }
func (k keyIterators) Swap(i, j int) { k[i], k[j] = k[j], k[i] }
func (k *keyIterators) Push(x interface{}) { *k = append(*k, x.(*keyIterator)) }
func (k *keyIterators) Pop() interface{} {

View File

@ -5,7 +5,6 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/platform/tsdb"
)
func TestNewMergeKeyIterator(t *testing.T) {
@ -128,6 +127,7 @@ func newTSMFiles(keys ...[]string) []TSMFile {
}
type mockTSMFile struct {
TSMFile
keys []string
}
@ -136,85 +136,25 @@ func newMockTSMFile(keys ...string) *mockTSMFile {
return &mockTSMFile{keys: keys}
}
func (t *mockTSMFile) KeyCount() int { return len(t.keys) }
func (t *mockTSMFile) Seek(key []byte) int {
k := string(key)
return sort.Search(len(t.keys), func(i int) bool {
return t.keys[i] >= k
})
func (m *mockTSMFile) Iterator(seek []byte) TSMIterator {
skey := string(seek)
n := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= skey })
return &mockTSMIterator{
n: n - 1,
keys: m.keys,
}
}
func (t *mockTSMFile) KeyAt(idx int) ([]byte, byte) {
return []byte(t.keys[idx]), BlockFloat64
type mockTSMIterator struct {
TSMIndexIterator
n int
keys []string
}
func (*mockTSMFile) Path() string { panic("implement me") }
func (*mockTSMFile) Read(key []byte, t int64) ([]Value, error) { panic("implement me") }
func (*mockTSMFile) ReadAt(entry *IndexEntry, values []Value) ([]Value, error) { panic("implement me") }
func (*mockTSMFile) Entries(key []byte) []IndexEntry { panic("implement me") }
func (*mockTSMFile) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry { panic("implement me") }
func (*mockTSMFile) ContainsValue(key []byte, t int64) bool { panic("implement me") }
func (*mockTSMFile) Contains(key []byte) bool { panic("implement me") }
func (*mockTSMFile) OverlapsTimeRange(min, max int64) bool { panic("implement me") }
func (*mockTSMFile) OverlapsKeyRange(min, max []byte) bool { panic("implement me") }
func (*mockTSMFile) TimeRange() (int64, int64) { panic("implement me") }
func (*mockTSMFile) TombstoneRange(key []byte) []TimeRange { panic("implement me") }
func (*mockTSMFile) KeyRange() ([]byte, []byte) { panic("implement me") }
func (*mockTSMFile) Type(key []byte) (byte, error) { panic("implement me") }
func (*mockTSMFile) BatchDelete() BatchDeleter { panic("implement me") }
func (*mockTSMFile) Delete(keys [][]byte) error { panic("implement me") }
func (*mockTSMFile) DeleteRange(keys [][]byte, min, max int64) error { panic("implement me") }
func (*mockTSMFile) HasTombstones() bool { panic("implement me") }
func (*mockTSMFile) TombstoneFiles() []FileStat { panic("implement me") }
func (*mockTSMFile) Close() error { panic("implement me") }
func (*mockTSMFile) Size() uint32 { panic("implement me") }
func (*mockTSMFile) Rename(path string) error { panic("implement me") }
func (*mockTSMFile) Remove() error { panic("implement me") }
func (*mockTSMFile) InUse() bool { panic("implement me") }
func (*mockTSMFile) Ref() { panic("implement me") }
func (*mockTSMFile) Unref() { panic("implement me") }
func (*mockTSMFile) Stats() FileStat { panic("implement me") }
func (*mockTSMFile) BlockIterator() *BlockIterator { panic("implement me") }
func (*mockTSMFile) Free() error { panic("implement me") }
func (*mockTSMFile) MeasurementStats() (MeasurementStats, error) { panic("implement me") }
func (*mockTSMFile) ReadFloatBlockAt(*IndexEntry, *[]FloatValue) ([]FloatValue, error) {
panic("implement me")
func (m *mockTSMIterator) Next() bool {
m.n++
return m.n < len(m.keys)
}
func (*mockTSMFile) ReadIntegerBlockAt(*IndexEntry, *[]IntegerValue) ([]IntegerValue, error) {
panic("implement me")
}
func (*mockTSMFile) ReadUnsignedBlockAt(*IndexEntry, *[]UnsignedValue) ([]UnsignedValue, error) {
panic("implement me")
}
func (*mockTSMFile) ReadStringBlockAt(*IndexEntry, *[]StringValue) ([]StringValue, error) {
panic("implement me")
}
func (*mockTSMFile) ReadBooleanBlockAt(*IndexEntry, *[]BooleanValue) ([]BooleanValue, error) {
panic("implement me")
}
func (*mockTSMFile) ReadFloatArrayBlockAt(*IndexEntry, *tsdb.FloatArray) error {
panic("implement me")
}
func (*mockTSMFile) ReadIntegerArrayBlockAt(*IndexEntry, *tsdb.IntegerArray) error {
panic("implement me")
}
func (*mockTSMFile) ReadUnsignedArrayBlockAt(*IndexEntry, *tsdb.UnsignedArray) error {
panic("implement me")
}
func (*mockTSMFile) ReadStringArrayBlockAt(*IndexEntry, *tsdb.StringArray) error {
panic("implement me")
}
func (*mockTSMFile) ReadBooleanArrayBlockAt(*IndexEntry, *tsdb.BooleanArray) error {
panic("implement me")
}
func (m *mockTSMIterator) Key() []byte { return []byte(m.keys[m.n]) }
func (m *mockTSMIterator) Type() byte { return 0 }

View File

@ -71,27 +71,19 @@ type TSMIndex interface {
// exist in this file, but not have a point exactly at time t.
ContainsValue(key []byte, timestamp int64) bool
// Entries returns all index entries for a key.
Entries(key []byte) []IndexEntry
// ReadEntries reads the index entries for key into entries.
ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry
ReadEntries(key []byte, entries []IndexEntry) ([]IndexEntry, error)
// Entry returns the index entry for the specified key and timestamp. If no entry
// matches the key and timestamp, nil is returned.
Entry(key []byte, timestamp int64) *IndexEntry
// Key returns the key in the index at the given position, using entries to avoid allocations.
Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry)
// KeyAt returns the key in the index at the given position.
KeyAt(index int) ([]byte, byte)
// KeyCount returns the count of unique keys in the index.
KeyCount() int
// Seek returns the position in the index where key <= value in the index.
Seek(key []byte) int
// Iterator returns an iterator over the keys starting at the provided key. You must
// call Next before calling any of the accessors.
Iterator([]byte) *TSMIndexIterator
// OverlapsTimeRange returns true if the time range of the file intersect min and max.
OverlapsTimeRange(min, max int64) bool
@ -124,88 +116,6 @@ type TSMIndex interface {
Close() error
}
// BlockIterator allows iterating over each block in a TSM file in order. It provides
// raw access to the block bytes without decoding them.
type BlockIterator struct {
r *TSMReader
// i is the current key index
i int
// n is the total number of keys
n int
key []byte
cache []IndexEntry
entries []IndexEntry
err error
typ byte
}
// PeekNext returns the next key to be iterated or an empty string.
func (b *BlockIterator) PeekNext() []byte {
if len(b.entries) > 1 {
return b.key
} else if b.n-b.i > 1 {
key, _ := b.r.KeyAt(b.i + 1)
return key
}
return nil
}
// Next returns true if there are more blocks to iterate through.
func (b *BlockIterator) Next() bool {
if b.err != nil {
return false
}
if b.n-b.i == 0 && len(b.entries) == 0 {
return false
}
if len(b.entries) > 0 {
b.entries = b.entries[1:]
if len(b.entries) > 0 {
return true
}
}
if b.n-b.i > 0 {
b.key, b.typ, b.entries = b.r.Key(b.i, &b.cache)
b.i++
// If there were deletes on the TSMReader, then our index is now off and we
// can't proceed. What we just read may not actually the next block.
if b.n != b.r.KeyCount() {
b.err = fmt.Errorf("delete during iteration")
return false
}
if len(b.entries) > 0 {
return true
}
}
return false
}
// Read reads information about the next block to be iterated.
func (b *BlockIterator) Read() (key []byte, minTime int64, maxTime int64, typ byte, checksum uint32, buf []byte, err error) {
if b.err != nil {
return nil, 0, 0, 0, 0, nil, b.err
}
checksum, buf, err = b.r.ReadBytes(&b.entries[0], nil)
if err != nil {
return nil, 0, 0, 0, 0, nil, err
}
return b.key, b.entries[0].MinTime, b.entries[0].MaxTime, b.typ, checksum, buf, err
}
// Err returns any errors encounter during iteration.
func (b *BlockIterator) Err() error {
return b.err
}
type tsmReaderOption func(*TSMReader)
// WithMadviseWillNeed is an option for specifying whether to provide a MADV_WILL need hint to the kernel.
@ -310,20 +220,6 @@ func (t *TSMReader) Path() string {
return p
}
// Key returns the key and the underlying entry at the numeric index.
func (t *TSMReader) Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) {
return t.index.Key(index, entries)
}
// KeyAt returns the key and key type at position idx in the index.
func (t *TSMReader) KeyAt(idx int) ([]byte, byte) {
return t.index.KeyAt(idx)
}
func (t *TSMReader) Seek(key []byte) int {
return t.index.Seek(key)
}
// ReadAt returns the values corresponding to the given index entry.
func (t *TSMReader) ReadAt(entry *IndexEntry, vals []Value) ([]Value, error) {
t.mu.RLock()
@ -490,6 +386,12 @@ func (t *TSMReader) Delete(keys [][]byte) error {
return nil
}
// Iterator returns an iterator over the keys starting at the provided key. You must
// call Next before calling any of the accessors.
func (t *TSMReader) Iterator(key []byte) TSMIterator {
return t.index.Iterator(key)
}
// OverlapsTimeRange returns true if the time range of the file intersect min and max.
func (t *TSMReader) OverlapsTimeRange(min, max int64) bool {
return t.index.OverlapsTimeRange(min, max)
@ -515,13 +417,8 @@ func (t *TSMReader) KeyCount() int {
return t.index.KeyCount()
}
// Entries returns all index entries for key.
func (t *TSMReader) Entries(key []byte) []IndexEntry {
return t.index.Entries(key)
}
// ReadEntries reads the index entries for key into entries.
func (t *TSMReader) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry {
func (t *TSMReader) ReadEntries(key []byte, entries []IndexEntry) ([]IndexEntry, error) {
return t.index.ReadEntries(key, entries)
}
@ -593,9 +490,13 @@ func (t *TSMReader) Stats() FileStat {
// BlockIterator returns a BlockIterator for the underlying TSM file.
func (t *TSMReader) BlockIterator() *BlockIterator {
t.mu.RLock()
iter := t.index.Iterator(nil)
t.mu.RUnlock()
return &BlockIterator{
r: t,
n: t.index.KeyCount(),
r: t,
iter: iter,
}
}
@ -778,125 +679,38 @@ func NewIndirectIndex() *indirectIndex {
}
}
func (d *indirectIndex) Seek(key []byte) int {
d.mu.RLock()
defer d.mu.RUnlock()
return d.searchOffset(key)
}
func searchPrefixesIndex(prefixes []prefixEntry, n int) int {
return sort.Search(len(prefixes), func(i int) bool {
return prefixes[i].total > n
})
}
func searchPrefixes(prefixes []prefixEntry, n int) (prefix, bool) {
i := searchPrefixesIndex(prefixes, n)
if i < len(prefixes) {
return prefixes[i].pre, true
}
return prefix{}, false
}
// searchOffset searches the offsets slice for key and returns the position in
// offsets where key would exist.
func (d *indirectIndex) searchOffset(key []byte) (index int) {
pre := keyPrefix(key)
return sort.Search(len(d.ro.offsets), func(i int) bool {
if prei, ok := searchPrefixes(d.ro.prefixes, i); ok {
if cmp := comparePrefix(prei, pre); cmp == -1 {
return false
} else if cmp == 1 {
return true
}
}
_, k := readKey(d.b.access(d.ro.offsets[i], 0))
return bytes.Compare(k, key) >= 0
})
}
// search returns the byte position of key in the index. If key is not
// in the index, len(index) is returned.
func (d *indirectIndex) search(key []byte) uint32 {
if !d.ContainsKey(key) {
return d.b.len()
}
// We use a binary search across our indirect offsets (pointers to all the keys
// in the index slice). We then check if we have found the right index.
if i := d.searchOffset(key); i < len(d.ro.offsets) {
offset := d.ro.offsets[i]
_, k := readKey(d.b.access(offset, 0))
// The search may have returned an i == 0 which could indicated that the value
// searched should be inserted at postion 0. Make sure the key in the index
// matches the search value.
if !bytes.Equal(key, k) {
return d.b.len()
}
return offset
}
// The key is not in the index. i is the index where it would be inserted so return
// a value outside our offset range.
return d.b.len()
}
// ContainsKey returns true of key may exist in this index.
func (d *indirectIndex) ContainsKey(key []byte) bool {
return bytes.Compare(key, d.minKey) >= 0 && bytes.Compare(key, d.maxKey) <= 0
}
// Entries returns all index entries for a key.
func (d *indirectIndex) Entries(key []byte) []IndexEntry {
return d.ReadEntries(key, nil)
}
func (d *indirectIndex) readEntriesAt(offset uint32, entries *[]IndexEntry) ([]byte, []IndexEntry) {
n, k := readKey(d.b.access(offset, 0))
// Read and return all the entries
offset += n
var ie indexEntries
if entries != nil {
ie.entries = *entries
}
if _, err := readEntries(d.b.access(offset, 0), &ie); err != nil {
panic(fmt.Sprintf("error reading entries: %v", err))
}
if entries != nil {
*entries = ie.entries
}
return k, ie.entries
}
// ReadEntries returns all index entries for a key.
func (d *indirectIndex) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry {
func (d *indirectIndex) ReadEntries(key []byte, entries []IndexEntry) ([]IndexEntry, error) {
d.mu.RLock()
defer d.mu.RUnlock()
offset := d.search(key)
if offset < d.b.len() {
k, entries := d.readEntriesAt(offset, entries)
// The search may have returned an i == 0 which could indicated that the value
// searched should be inserted at position 0. Make sure the key in the index
// matches the search value.
if !bytes.Equal(key, k) {
return nil
}
return entries
iter := d.ro.Iterator()
exact, _ := iter.Seek(key, &d.b)
if !exact {
return nil, nil
}
// The key is not in the index. i is the index where it would be inserted.
return nil
entries, err := readEntries(d.b.access(iter.EntryOffset(&d.b), 0), entries)
if err != nil {
return nil, err
}
return entries, nil
}
// Entry returns the index entry for the specified key and timestamp. If no entry
// matches the key an timestamp, nil is returned.
func (d *indirectIndex) Entry(key []byte, timestamp int64) *IndexEntry {
entries := d.Entries(key)
entries, err := d.ReadEntries(key, nil)
if err != nil {
// TODO(jeff): log this somehow? we have an invalid entry in the tsm index
return nil
}
for _, entry := range entries {
if entry.Contains(timestamp) {
return &entry
@ -905,49 +719,6 @@ func (d *indirectIndex) Entry(key []byte, timestamp int64) *IndexEntry {
return nil
}
// Key returns the key in the index at the given position.
func (d *indirectIndex) Key(idx int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) {
d.mu.RLock()
defer d.mu.RUnlock()
if idx < 0 || idx >= len(d.ro.offsets) {
return nil, 0, nil
}
offset := d.ro.offsets[idx]
n, key := readKey(d.b.access(offset, 0))
typ := d.b.access(offset+n, 1)[0]
var ie indexEntries
if entries != nil {
ie.entries = *entries
}
if _, err := readEntries(d.b.access(offset+n, 0), &ie); err != nil {
return nil, 0, nil
}
if entries != nil {
*entries = ie.entries
}
return key, typ, ie.entries
}
// KeyAt returns the key in the index at the given position.
func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) {
d.mu.RLock()
if idx < 0 || idx >= len(d.ro.offsets) {
d.mu.RUnlock()
return nil, 0
}
offset := d.ro.offsets[idx]
n, key := readKey(d.b.access(offset, 0))
offset = offset + uint32(n)
typ := d.b.access(offset, 1)[0]
d.mu.RUnlock()
return key, typ
}
// KeyCount returns the count of unique keys in the index.
func (d *indirectIndex) KeyCount() int {
d.mu.RLock()
@ -956,6 +727,25 @@ func (d *indirectIndex) KeyCount() int {
return n
}
// Iterator returns an iterator over the keys starting at the provided key. You must
// call Next before calling any of the accessors.
func (d *indirectIndex) Iterator(key []byte) *TSMIndexIterator {
d.mu.RLock()
iter := d.ro.Iterator()
_, ok := iter.Seek(key, &d.b)
ti := &TSMIndexIterator{
d: d,
n: int(len(d.ro.offsets)),
b: &d.b,
iter: &iter,
first: true,
ok: ok,
}
d.mu.RUnlock()
return ti
}
// Delete removes the given keys from the index.
func (d *indirectIndex) Delete(keys [][]byte) {
if len(keys) == 0 {
@ -1163,36 +953,56 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
}
// TombstoneRange returns ranges of time that are deleted for the given key.
func (d *indirectIndex) TombstoneRange(key []byte) []TimeRange {
func (d *indirectIndex) TombstoneRange(key []byte) (r []TimeRange) {
d.mu.RLock()
r := d.tombstones[d.search(key)]
iter := d.ro.Iterator()
exact, _ := iter.Seek(key, &d.b)
if exact {
r = d.tombstones[iter.Offset()]
}
d.mu.RUnlock()
return r
}
// Contains return true if the given key exists in the index.
func (d *indirectIndex) Contains(key []byte) bool {
return len(d.Entries(key)) > 0
d.mu.RLock()
iter := d.ro.Iterator()
exact, _ := iter.Seek(key, &d.b)
d.mu.RUnlock()
return exact
}
// ContainsValue returns true if key and time might exist in this file.
func (d *indirectIndex) ContainsValue(key []byte, timestamp int64) bool {
entry := d.Entry(key, timestamp)
if entry == nil {
d.mu.RLock()
defer d.mu.RUnlock()
iter := d.ro.Iterator()
exact, _ := iter.Seek(key, &d.b)
if !exact {
return false
}
d.mu.RLock()
// TODO(jeff): we already did the search when calling d.Entry
tombstones := d.tombstones[d.search(key)]
d.mu.RUnlock()
for _, t := range tombstones {
if t.Min <= timestamp && t.Max >= timestamp {
for _, t := range d.tombstones[iter.Offset()] {
if t.Min <= timestamp && timestamp <= t.Max {
return false
}
}
return true
entries, err := d.ReadEntries(key, nil)
if err != nil {
// TODO(jeff): log this somehow? we have an invalid entry in the tsm index
return false
}
for _, entry := range entries {
if entry.Contains(timestamp) {
return true
}
}
return false
}
// Type returns the block type of the values stored for the key.
@ -1200,13 +1010,13 @@ func (d *indirectIndex) Type(key []byte) (byte, error) {
d.mu.RLock()
defer d.mu.RUnlock()
if offset := d.search(key); offset < d.b.len() {
n, _ := readKey(d.b.access(offset, 0))
offset += n
return d.b.access(offset, 1)[0], nil
iter := d.ro.Iterator()
exact, _ := iter.Seek(key, &d.b)
if !exact {
return 0, errors.New("key does not exist")
}
return 0, fmt.Errorf("key does not exist: %s", key)
return d.b.access(iter.EntryOffset(&d.b), 1)[0], nil
}
// OverlapsTimeRange returns true if the time range of the file intersect min and max.
@ -1318,11 +1128,11 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
ro.Done()
firstOfs := ro.offsets[0]
_, key := readKey(b[firstOfs:])
key := readKey(b[firstOfs:])
d.minKey = key
lastOfs := ro.offsets[len(ro.offsets)-1]
_, key = readKey(b[lastOfs:])
key = readKey(b[lastOfs:])
d.maxKey = key
d.minTime = minTime
@ -1536,9 +1346,9 @@ func (m *mmapAccessor) readBytes(entry *IndexEntry, b []byte) (uint32, []byte, e
func (m *mmapAccessor) readAll(key []byte) ([]Value, error) {
m.incAccess()
blocks := m.index.Entries(key)
if len(blocks) == 0 {
return nil, nil
blocks, err := m.index.ReadEntries(key, nil)
if len(blocks) == 0 || err != nil {
return nil, err
}
tombstones := m.index.TombstoneRange(key)
@ -1547,7 +1357,6 @@ func (m *mmapAccessor) readAll(key []byte) ([]Value, error) {
defer m.mu.RUnlock()
var temp []Value
var err error
var values []Value
for _, block := range blocks {
var skip bool
@ -1642,54 +1451,39 @@ func (a *indexEntries) WriteTo(w io.Writer) (total int64, err error) {
return total, nil
}
func readKey(b []byte) (n uint32, key []byte) {
// 2 byte size of key
n, size := 2, uint32(binary.BigEndian.Uint16(b[:2]))
// N byte key
key = b[n : n+size]
n += uint32(len(key))
return
func readKey(b []byte) (key []byte) {
size := binary.BigEndian.Uint16(b[:2])
return b[2 : 2+size]
}
func readEntries(b []byte, entries *indexEntries) (n int, err error) {
if len(b) < 1+indexCountSize {
return 0, fmt.Errorf("readEntries: data too short for headers")
func readEntries(b []byte, entries []IndexEntry) ([]IndexEntry, error) {
if len(b) < indexTypeSize+indexCountSize {
return entries[:0], errors.New("readEntries: data too short for headers")
}
// 1 byte block type
entries.Type = b[n]
n++
// 2 byte count of index entries
count := int(binary.BigEndian.Uint16(b[n : n+indexCountSize]))
n += indexCountSize
if cap(entries.entries) < count {
entries.entries = make([]IndexEntry, count)
count := int(binary.BigEndian.Uint16(b[indexTypeSize : indexTypeSize+indexCountSize]))
if cap(entries) < count {
entries = make([]IndexEntry, count)
} else {
entries.entries = entries.entries[:count]
entries = entries[:count]
}
b = b[indexTypeSize+indexCountSize:]
b = b[indexCountSize+indexTypeSize:]
for i := 0; i < len(entries.entries); i++ {
if err = entries.entries[i].UnmarshalBinary(b); err != nil {
return 0, fmt.Errorf("readEntries: unmarshal error: %v", err)
for i := range entries {
if err := entries[i].UnmarshalBinary(b); err != nil {
return entries[:0], err
}
b = b[indexEntrySize:]
}
n += count * indexEntrySize
return
return entries, nil
}
// readEntriesTimes is a helper function to read entries at the provided buffer but
// only reading in the min and max times.
func readEntriesTimes(b []byte, entries []IndexEntry) ([]IndexEntry, error) {
if len(b) < indexTypeSize+indexCountSize {
return nil, errors.New("readEntries: data too short for headers")
return entries[:0], errors.New("readEntries: data too short for headers")
}
count := int(binary.BigEndian.Uint16(b[indexTypeSize : indexTypeSize+indexCountSize]))
@ -1702,7 +1496,7 @@ func readEntriesTimes(b []byte, entries []IndexEntry) ([]IndexEntry, error) {
for i := range entries {
if len(b) < indexEntrySize {
return nil, errors.New("readEntries: stream too short for entry")
return entries[:0], errors.New("readEntries: stream too short for entry")
}
entries[i].MinTime = int64(binary.BigEndian.Uint64(b[0:8]))
entries[i].MaxTime = int64(binary.BigEndian.Uint64(b[8:16]))

View File

@ -0,0 +1,52 @@
package tsm1
// BlockIterator allows iterating over each block in a TSM file in order. It provides
// raw access to the block bytes without decoding them.
type BlockIterator struct {
r *TSMReader
iter *TSMIndexIterator
entries []IndexEntry
}
// PeekNext returns the next key to be iterated or an empty string.
func (b *BlockIterator) PeekNext() []byte {
return b.iter.Peek()
}
// Next returns true if there are more blocks to iterate through.
func (b *BlockIterator) Next() bool {
if b.iter.Err() != nil {
return false
}
if len(b.entries) > 0 {
b.entries = b.entries[1:]
if len(b.entries) > 0 {
return true
}
}
if !b.iter.Next() {
return false
}
b.entries = b.iter.Entries()
return len(b.entries) > 0
}
// Read reads information about the next block to be iterated.
func (b *BlockIterator) Read() (key []byte, minTime int64, maxTime int64, typ byte, checksum uint32, buf []byte, err error) {
if err := b.iter.Err(); err != nil {
return nil, 0, 0, 0, 0, nil, err
}
checksum, buf, err = b.r.ReadBytes(&b.entries[0], nil)
if err != nil {
return nil, 0, 0, 0, 0, nil, err
}
return b.iter.Key(), b.entries[0].MinTime, b.entries[0].MaxTime, b.iter.Type(), checksum, buf, err
}
// Err returns any errors encounter during iteration.
func (b *BlockIterator) Err() error {
return b.iter.Err()
}

View File

@ -0,0 +1,106 @@
package tsm1
import (
"fmt"
)
// TSMIndexIterator allows one to iterate over the TSM index.
type TSMIndexIterator struct {
b *faultBuffer
n int
d *indirectIndex
iter *readerOffsetsIterator
// if true, don't need to advance iter on the call to Next
first bool
peeked bool
ok bool
err error
offset uint32
eoffset uint32
key []byte
typ byte
entries []IndexEntry
}
// Next advances the iterator and reports if it is still valid.
func (t *TSMIndexIterator) Next() bool {
if t.n != t.d.KeyCount() {
t.err, t.ok = fmt.Errorf("Key count changed during iteration"), false
}
if !t.ok || t.err != nil {
return false
}
if !t.peeked && !t.first {
t.ok = t.iter.Next()
}
if !t.ok {
return false
}
t.peeked = false
t.first = false
t.offset = t.iter.Offset()
t.eoffset = t.iter.EntryOffset(t.b)
t.key = nil
t.typ = 0
t.entries = t.entries[:0]
return true
}
// Peek reports the next key or nil if there is not one or an error happened.
func (t *TSMIndexIterator) Peek() []byte {
if !t.ok || t.err != nil {
return nil
}
if !t.peeked {
t.ok = t.iter.Next()
t.peeked = true
}
if !t.ok {
return nil
}
return t.iter.Key(t.b)
}
// Key reports the current key.
func (t *TSMIndexIterator) Key() []byte {
if t.key == nil {
buf := t.b.access(t.offset, 0)
t.key = readKey(buf)
t.typ = buf[2+len(t.key)]
}
return t.key
}
// Type reports the current type.
func (t *TSMIndexIterator) Type() byte {
if t.key == nil {
buf := t.b.access(t.offset, 0)
t.key = readKey(buf)
t.typ = buf[2+len(t.key)]
}
return t.typ
}
// Entries reports the current list of entries.
func (t *TSMIndexIterator) Entries() []IndexEntry {
if len(t.entries) == 0 {
buf := t.b.access(t.eoffset, 0)
t.entries, t.err = readEntries(buf, t.entries)
}
if t.err != nil {
return nil
}
return t.entries
}
// Err reports if an error stopped the iteration.
func (t *TSMIndexIterator) Err() error {
return t.err
}

View File

@ -1090,7 +1090,10 @@ func TestIndirectIndex_Entries(t *testing.T) {
t.Fatalf("unexpected error unmarshaling index: %v", err)
}
entries := indirect.Entries([]byte("cpu"))
entries, err := indirect.ReadEntries([]byte("cpu"), nil)
if err != nil {
t.Fatal(err)
}
if got, exp := len(entries), len(exp); got != exp {
t.Fatalf("entries length mismatch: got %v, exp %v", got, exp)
@ -1133,7 +1136,10 @@ func TestIndirectIndex_Entries_NonExistent(t *testing.T) {
// mem has not been added to the index so we should get no entries back
// for both
exp := index.Entries([]byte("mem"))
entries := indirect.Entries([]byte("mem"))
entries, err := indirect.ReadEntries([]byte("mem"), nil)
if err != nil {
t.Fatal(err)
}
if got, exp := len(entries), len(exp); got != exp && exp != 0 {
t.Fatalf("entries length mismatch: got %v, exp %v", got, exp)
@ -1953,7 +1959,7 @@ func BenchmarkIndirectIndex_Entries(b *testing.B) {
for i := 0; i < b.N; i++ {
resetFaults(indirect)
indirect.Entries([]byte("cpu-00000001"))
indirect.ReadEntries([]byte("cpu-00000001"), nil)
}
b.SetBytes(getFaults(globalIndex) * 4096)
@ -1961,14 +1967,14 @@ func BenchmarkIndirectIndex_Entries(b *testing.B) {
}
func BenchmarkIndirectIndex_ReadEntries(b *testing.B) {
var cache []IndexEntry
var entries []IndexEntry
indirect, _ := mustMakeIndex(b, 1000, 1000)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
resetFaults(indirect)
indirect.ReadEntries([]byte("cpu-00000001"), &cache)
entries, _ = indirect.ReadEntries([]byte("cpu-00000001"), entries)
}
b.SetBytes(getFaults(globalIndex) * 4096)