From 14cf01911e4fcfc3fb33f7ca0457875291989b6d Mon Sep 17 00:00:00 2001 From: Jeff Wendling Date: Wed, 19 Dec 2018 18:37:00 -0700 Subject: [PATCH] tsm1: change TSMFile to use an iterator style api --- cmd/influx_inspect/buildtsi/buildtsi.go | 10 +- tsdb/tsm1/compact_test.go | 29 +- tsdb/tsm1/engine.go | 19 +- tsdb/tsm1/file_store.go | 42 ++- tsdb/tsm1/file_store_key_iterator.go | 32 +- tsdb/tsm1/file_store_key_iterator_test.go | 94 +---- tsdb/tsm1/reader.go | 430 ++++++---------------- tsdb/tsm1/reader_block_iterator.go | 52 +++ tsdb/tsm1/reader_index_iterator.go | 106 ++++++ tsdb/tsm1/reader_test.go | 16 +- 10 files changed, 382 insertions(+), 448 deletions(-) create mode 100644 tsdb/tsm1/reader_block_iterator.go create mode 100644 tsdb/tsm1/reader_index_iterator.go diff --git a/cmd/influx_inspect/buildtsi/buildtsi.go b/cmd/influx_inspect/buildtsi/buildtsi.go index d1cfe49df9..f3c54c9b45 100644 --- a/cmd/influx_inspect/buildtsi/buildtsi.go +++ b/cmd/influx_inspect/buildtsi/buildtsi.go @@ -350,12 +350,13 @@ func IndexTSMFile(index *tsi1.Index, path string, batchSize int, log *zap.Logger Types: make([]models.FieldType, 0, batchSize), } var ti int - for i := 0; i < r.KeyCount(); i++ { - key, _ := r.KeyAt(i) + iter := r.Iterator(nil) + for iter.Next() { + key := iter.Key() seriesKey, _ := tsm1.SeriesAndFieldFromCompositeKey(key) var name []byte name, collection.Tags[ti] = models.ParseKeyBytesWithTags(seriesKey, collection.Tags[ti]) - typ, _ := r.Type(key) + typ := iter.Type() if verboseLogging { log.Info("Series", zap.String("name", string(name)), zap.String("tags", collection.Tags[ti].String())) @@ -377,6 +378,9 @@ func IndexTSMFile(index *tsi1.Index, path string, batchSize int, log *zap.Logger ti = 0 // Reset tags. } } + if err := iter.Err(); err != nil { + return fmt.Errorf("problem creating series: (%s)", err) + } // Flush any remaining series in the batches if len(collection.Keys) > 0 { diff --git a/tsdb/tsm1/compact_test.go b/tsdb/tsm1/compact_test.go index a3f268fa8a..03a0df9cfb 100644 --- a/tsdb/tsm1/compact_test.go +++ b/tsdb/tsm1/compact_test.go @@ -126,7 +126,10 @@ func TestCompactor_CompactFullLastTimestamp(t *testing.T) { } r := MustOpenTSMReader(files[0]) - entries := r.Entries([]byte("cpu,host=A#!~#value")) + entries, err := r.ReadEntries([]byte("cpu,host=A#!~#value"), nil) + if err != nil { + t.Fatal(err) + } _, b, err := r.ReadBytes(&entries[0], nil) if err != nil { t.Fatalf("ReadBytes: unexpected error %v", err) @@ -661,7 +664,11 @@ func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) { } } - if got, exp := len(r.Entries([]byte("cpu,host=A#!~#value"))), 2; got != exp { + entries, err := r.ReadEntries([]byte("cpu,host=A#!~#value"), nil) + if err != nil { + t.Fatal(err) + } + if got, exp := len(entries), 2; got != exp { t.Fatalf("block count mismatch: got %v, exp %v", got, exp) } } @@ -774,7 +781,11 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) { } } - if got, exp := len(r.Entries([]byte("cpu,host=A#!~#value"))), 1; got != exp { + entries, err := r.ReadEntries([]byte("cpu,host=A#!~#value"), nil) + if err != nil { + t.Fatal(err) + } + if got, exp := len(entries), 1; got != exp { t.Fatalf("block count mismatch: got %v, exp %v", got, exp) } } @@ -888,7 +899,11 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) { } } - if got, exp := len(r.Entries([]byte("cpu,host=A#!~#value"))), 2; got != exp { + entries, err := r.ReadEntries([]byte("cpu,host=A#!~#value"), nil) + if err != nil { + t.Fatal(err) + } + if got, exp := len(entries), 2; got != exp { t.Fatalf("block count mismatch: got %v, exp %v", got, exp) } } @@ -996,7 +1011,11 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) { } } - if got, exp := len(r.Entries([]byte("cpu,host=A#!~#value"))), 2; got != exp { + entries, err := r.ReadEntries([]byte("cpu,host=A#!~#value"), nil) + if err != nil { + t.Fatal(err) + } + if got, exp := len(entries), 2; got != exp { t.Fatalf("block count mismatch: got %v, exp %v", got, exp) } } diff --git a/tsdb/tsm1/engine.go b/tsdb/tsm1/engine.go index 2cc9a3ff26..075878323f 100644 --- a/tsdb/tsm1/engine.go +++ b/tsdb/tsm1/engine.go @@ -818,10 +818,10 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error { // Delete each key we find in the file. We seek to the min key and walk from there. batch := r.BatchDelete() - n := r.KeyCount() + iter := r.Iterator(minKey) var j int - for i := r.Seek(minKey); i < n; i++ { - indexKey, _ := r.KeyAt(i) + for iter.Next() { + indexKey := iter.Key() seriesKey, _ := SeriesAndFieldFromCompositeKey(indexKey) for j < len(seriesKeys) && bytes.Compare(seriesKeys[j], seriesKey) < 0 { @@ -838,6 +838,10 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error { } } } + if err := iter.Err(); err != nil { + batch.Rollback() + return err + } return batch.Commit() }); err != nil { @@ -884,16 +888,16 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error { // Apply runs this func concurrently. The seriesKeys slice is mutated concurrently // by different goroutines setting positions to nil. if err := e.FileStore.Apply(func(r TSMFile) error { - n := r.KeyCount() var j int // Start from the min deleted key that exists in this file. - for i := r.Seek(minKey); i < n; i++ { + iter := r.Iterator(minKey) + for iter.Next() { if j >= len(seriesKeys) { return nil } - indexKey, _ := r.KeyAt(i) + indexKey := iter.Key() seriesKey, _ := SeriesAndFieldFromCompositeKey(indexKey) // Skip over any deleted keys that are less than our tsm key @@ -912,7 +916,8 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error { j++ } } - return nil + + return iter.Err() }); err != nil { return err } diff --git a/tsdb/tsm1/file_store.go b/tsdb/tsm1/file_store.go index 7a79cf97b9..d353717517 100644 --- a/tsdb/tsm1/file_store.go +++ b/tsdb/tsm1/file_store.go @@ -34,6 +34,15 @@ const ( BadTSMFileExtension = "bad" ) +type TSMIterator interface { + Next() bool + Peek() []byte + Key() []byte + Type() byte + Entries() []IndexEntry + Err() error +} + // TSMFile represents an on-disk TSM file. type TSMFile interface { // Path returns the underlying file path for the TSMFile. If the file @@ -57,8 +66,7 @@ type TSMFile interface { ReadBooleanArrayBlockAt(entry *IndexEntry, values *tsdb.BooleanArray) error // Entries returns the index entries for all blocks for the given key. - Entries(key []byte) []IndexEntry - ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry + ReadEntries(key []byte, entries []IndexEntry) ([]IndexEntry, error) // Returns true if the TSMFile may contain a value with the specified // key and time. @@ -86,11 +94,9 @@ type TSMFile interface { // KeyCount returns the number of distinct keys in the file. KeyCount() int - // Seek returns the position in the index with the key <= key. - Seek(key []byte) int - - // KeyAt returns the key located at index position idx. - KeyAt(idx int) ([]byte, byte) + // Iterator returns an iterator over the keys starting at the provided key. You must + // call Next before calling any of the accessors. + Iterator([]byte) TSMIterator // Type returns the block type of the values stored for the key. Returns one of // BlockFloat64, BlockInt64, BlockBoolean, BlockString. If key does not exist, @@ -981,7 +987,9 @@ func (f *FileStore) BlockCount(path string, idx int) int { // We need to determine the possible files that may be accessed by this query given // the time range. func (f *FileStore) cost(key []byte, min, max int64) query.IteratorCost { - var cache []IndexEntry + var entries []IndexEntry + var err error + cost := query.IteratorCost{} for _, fd := range f.files { minTime, maxTime := fd.TimeRange() @@ -991,7 +999,12 @@ func (f *FileStore) cost(key []byte, min, max int64) query.IteratorCost { skipped := true tombstones := fd.TombstoneRange(key) - entries := fd.ReadEntries(key, &cache) + entries, err = fd.ReadEntries(key, entries) + if err != nil { + // TODO(jeff): log this somehow? we have an invalid entry in the tsm index + continue + } + ENTRIES: for i := 0; i < len(entries); i++ { ie := entries[i] @@ -1023,7 +1036,9 @@ func (f *FileStore) cost(key []byte, min, max int64) query.IteratorCost { // whether the key will be scan in ascending time order or descenging time order. // This function assumes the read-lock has been taken. func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location { - var cache []IndexEntry + var entries []IndexEntry + var err error + locations := make([]*location, 0, len(f.files)) for _, fd := range f.files { minTime, maxTime := fd.TimeRange() @@ -1041,7 +1056,12 @@ func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location { // This file could potential contain points we are looking for so find the blocks for // the given key. - entries := fd.ReadEntries(key, &cache) + entries, err = fd.ReadEntries(key, entries) + if err != nil { + // TODO(jeff): log this somehow? we have an invalid entry in the tsm index + continue + } + LOOP: for i := 0; i < len(entries); i++ { ie := entries[i] diff --git a/tsdb/tsm1/file_store_key_iterator.go b/tsdb/tsm1/file_store_key_iterator.go index c3613b654c..bf4bfdc2ef 100644 --- a/tsdb/tsm1/file_store_key_iterator.go +++ b/tsdb/tsm1/file_store_key_iterator.go @@ -6,33 +6,20 @@ import ( ) type keyIterator struct { - f TSMFile - c int // current key index - n int // key count - key []byte - typ byte + iter TSMIterator + key []byte + typ byte } func newKeyIterator(f TSMFile, seek []byte) *keyIterator { - c, n := 0, f.KeyCount() - if len(seek) > 0 { - c = f.Seek(seek) - } - - if c >= n { - return nil - } - - k := &keyIterator{f: f, c: c, n: n} + k := &keyIterator{iter: f.Iterator(seek)} k.next() - return k } func (k *keyIterator) next() bool { - if k.c < k.n { - k.key, k.typ = k.f.KeyAt(k.c) - k.c++ + if k.iter.Next() { + k.key, k.typ = k.iter.Key(), k.iter.Type() return true } return false @@ -98,9 +85,10 @@ func (m *mergeKeyIterator) Read() ([]byte, byte) { return m.key, m.typ } type keyIterators []*keyIterator -func (k keyIterators) Len() int { return len(k) } -func (k keyIterators) Less(i, j int) bool { return bytes.Compare(k[i].key, k[j].key) == -1 } -func (k keyIterators) Swap(i, j int) { k[i], k[j] = k[j], k[i] } +func (k keyIterators) Len() int { return len(k) } +func (k keyIterators) Less(i, j int) bool { return bytes.Compare(k[i].key, k[j].key) == -1 } +func (k keyIterators) Swap(i, j int) { k[i], k[j] = k[j], k[i] } + func (k *keyIterators) Push(x interface{}) { *k = append(*k, x.(*keyIterator)) } func (k *keyIterators) Pop() interface{} { diff --git a/tsdb/tsm1/file_store_key_iterator_test.go b/tsdb/tsm1/file_store_key_iterator_test.go index a2678190db..6561064218 100644 --- a/tsdb/tsm1/file_store_key_iterator_test.go +++ b/tsdb/tsm1/file_store_key_iterator_test.go @@ -5,7 +5,6 @@ import ( "testing" "github.com/google/go-cmp/cmp" - "github.com/influxdata/platform/tsdb" ) func TestNewMergeKeyIterator(t *testing.T) { @@ -128,6 +127,7 @@ func newTSMFiles(keys ...[]string) []TSMFile { } type mockTSMFile struct { + TSMFile keys []string } @@ -136,85 +136,25 @@ func newMockTSMFile(keys ...string) *mockTSMFile { return &mockTSMFile{keys: keys} } -func (t *mockTSMFile) KeyCount() int { return len(t.keys) } - -func (t *mockTSMFile) Seek(key []byte) int { - k := string(key) - return sort.Search(len(t.keys), func(i int) bool { - return t.keys[i] >= k - }) +func (m *mockTSMFile) Iterator(seek []byte) TSMIterator { + skey := string(seek) + n := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= skey }) + return &mockTSMIterator{ + n: n - 1, + keys: m.keys, + } } -func (t *mockTSMFile) KeyAt(idx int) ([]byte, byte) { - return []byte(t.keys[idx]), BlockFloat64 +type mockTSMIterator struct { + TSMIndexIterator + n int + keys []string } -func (*mockTSMFile) Path() string { panic("implement me") } -func (*mockTSMFile) Read(key []byte, t int64) ([]Value, error) { panic("implement me") } -func (*mockTSMFile) ReadAt(entry *IndexEntry, values []Value) ([]Value, error) { panic("implement me") } -func (*mockTSMFile) Entries(key []byte) []IndexEntry { panic("implement me") } -func (*mockTSMFile) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry { panic("implement me") } -func (*mockTSMFile) ContainsValue(key []byte, t int64) bool { panic("implement me") } -func (*mockTSMFile) Contains(key []byte) bool { panic("implement me") } -func (*mockTSMFile) OverlapsTimeRange(min, max int64) bool { panic("implement me") } -func (*mockTSMFile) OverlapsKeyRange(min, max []byte) bool { panic("implement me") } -func (*mockTSMFile) TimeRange() (int64, int64) { panic("implement me") } -func (*mockTSMFile) TombstoneRange(key []byte) []TimeRange { panic("implement me") } -func (*mockTSMFile) KeyRange() ([]byte, []byte) { panic("implement me") } -func (*mockTSMFile) Type(key []byte) (byte, error) { panic("implement me") } -func (*mockTSMFile) BatchDelete() BatchDeleter { panic("implement me") } -func (*mockTSMFile) Delete(keys [][]byte) error { panic("implement me") } -func (*mockTSMFile) DeleteRange(keys [][]byte, min, max int64) error { panic("implement me") } -func (*mockTSMFile) HasTombstones() bool { panic("implement me") } -func (*mockTSMFile) TombstoneFiles() []FileStat { panic("implement me") } -func (*mockTSMFile) Close() error { panic("implement me") } -func (*mockTSMFile) Size() uint32 { panic("implement me") } -func (*mockTSMFile) Rename(path string) error { panic("implement me") } -func (*mockTSMFile) Remove() error { panic("implement me") } -func (*mockTSMFile) InUse() bool { panic("implement me") } -func (*mockTSMFile) Ref() { panic("implement me") } -func (*mockTSMFile) Unref() { panic("implement me") } -func (*mockTSMFile) Stats() FileStat { panic("implement me") } -func (*mockTSMFile) BlockIterator() *BlockIterator { panic("implement me") } -func (*mockTSMFile) Free() error { panic("implement me") } -func (*mockTSMFile) MeasurementStats() (MeasurementStats, error) { panic("implement me") } - -func (*mockTSMFile) ReadFloatBlockAt(*IndexEntry, *[]FloatValue) ([]FloatValue, error) { - panic("implement me") +func (m *mockTSMIterator) Next() bool { + m.n++ + return m.n < len(m.keys) } -func (*mockTSMFile) ReadIntegerBlockAt(*IndexEntry, *[]IntegerValue) ([]IntegerValue, error) { - panic("implement me") -} - -func (*mockTSMFile) ReadUnsignedBlockAt(*IndexEntry, *[]UnsignedValue) ([]UnsignedValue, error) { - panic("implement me") -} - -func (*mockTSMFile) ReadStringBlockAt(*IndexEntry, *[]StringValue) ([]StringValue, error) { - panic("implement me") -} - -func (*mockTSMFile) ReadBooleanBlockAt(*IndexEntry, *[]BooleanValue) ([]BooleanValue, error) { - panic("implement me") -} - -func (*mockTSMFile) ReadFloatArrayBlockAt(*IndexEntry, *tsdb.FloatArray) error { - panic("implement me") -} - -func (*mockTSMFile) ReadIntegerArrayBlockAt(*IndexEntry, *tsdb.IntegerArray) error { - panic("implement me") -} - -func (*mockTSMFile) ReadUnsignedArrayBlockAt(*IndexEntry, *tsdb.UnsignedArray) error { - panic("implement me") -} - -func (*mockTSMFile) ReadStringArrayBlockAt(*IndexEntry, *tsdb.StringArray) error { - panic("implement me") -} - -func (*mockTSMFile) ReadBooleanArrayBlockAt(*IndexEntry, *tsdb.BooleanArray) error { - panic("implement me") -} +func (m *mockTSMIterator) Key() []byte { return []byte(m.keys[m.n]) } +func (m *mockTSMIterator) Type() byte { return 0 } diff --git a/tsdb/tsm1/reader.go b/tsdb/tsm1/reader.go index cd2a024284..424341d350 100644 --- a/tsdb/tsm1/reader.go +++ b/tsdb/tsm1/reader.go @@ -71,27 +71,19 @@ type TSMIndex interface { // exist in this file, but not have a point exactly at time t. ContainsValue(key []byte, timestamp int64) bool - // Entries returns all index entries for a key. - Entries(key []byte) []IndexEntry - // ReadEntries reads the index entries for key into entries. - ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry + ReadEntries(key []byte, entries []IndexEntry) ([]IndexEntry, error) // Entry returns the index entry for the specified key and timestamp. If no entry // matches the key and timestamp, nil is returned. Entry(key []byte, timestamp int64) *IndexEntry - // Key returns the key in the index at the given position, using entries to avoid allocations. - Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) - - // KeyAt returns the key in the index at the given position. - KeyAt(index int) ([]byte, byte) - // KeyCount returns the count of unique keys in the index. KeyCount() int - // Seek returns the position in the index where key <= value in the index. - Seek(key []byte) int + // Iterator returns an iterator over the keys starting at the provided key. You must + // call Next before calling any of the accessors. + Iterator([]byte) *TSMIndexIterator // OverlapsTimeRange returns true if the time range of the file intersect min and max. OverlapsTimeRange(min, max int64) bool @@ -124,88 +116,6 @@ type TSMIndex interface { Close() error } -// BlockIterator allows iterating over each block in a TSM file in order. It provides -// raw access to the block bytes without decoding them. -type BlockIterator struct { - r *TSMReader - - // i is the current key index - i int - - // n is the total number of keys - n int - - key []byte - cache []IndexEntry - entries []IndexEntry - err error - typ byte -} - -// PeekNext returns the next key to be iterated or an empty string. -func (b *BlockIterator) PeekNext() []byte { - if len(b.entries) > 1 { - return b.key - } else if b.n-b.i > 1 { - key, _ := b.r.KeyAt(b.i + 1) - return key - } - return nil -} - -// Next returns true if there are more blocks to iterate through. -func (b *BlockIterator) Next() bool { - if b.err != nil { - return false - } - - if b.n-b.i == 0 && len(b.entries) == 0 { - return false - } - - if len(b.entries) > 0 { - b.entries = b.entries[1:] - if len(b.entries) > 0 { - return true - } - } - - if b.n-b.i > 0 { - b.key, b.typ, b.entries = b.r.Key(b.i, &b.cache) - b.i++ - - // If there were deletes on the TSMReader, then our index is now off and we - // can't proceed. What we just read may not actually the next block. - if b.n != b.r.KeyCount() { - b.err = fmt.Errorf("delete during iteration") - return false - } - - if len(b.entries) > 0 { - return true - } - } - - return false -} - -// Read reads information about the next block to be iterated. -func (b *BlockIterator) Read() (key []byte, minTime int64, maxTime int64, typ byte, checksum uint32, buf []byte, err error) { - if b.err != nil { - return nil, 0, 0, 0, 0, nil, b.err - } - checksum, buf, err = b.r.ReadBytes(&b.entries[0], nil) - if err != nil { - return nil, 0, 0, 0, 0, nil, err - } - return b.key, b.entries[0].MinTime, b.entries[0].MaxTime, b.typ, checksum, buf, err -} - -// Err returns any errors encounter during iteration. -func (b *BlockIterator) Err() error { - return b.err -} - type tsmReaderOption func(*TSMReader) // WithMadviseWillNeed is an option for specifying whether to provide a MADV_WILL need hint to the kernel. @@ -310,20 +220,6 @@ func (t *TSMReader) Path() string { return p } -// Key returns the key and the underlying entry at the numeric index. -func (t *TSMReader) Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) { - return t.index.Key(index, entries) -} - -// KeyAt returns the key and key type at position idx in the index. -func (t *TSMReader) KeyAt(idx int) ([]byte, byte) { - return t.index.KeyAt(idx) -} - -func (t *TSMReader) Seek(key []byte) int { - return t.index.Seek(key) -} - // ReadAt returns the values corresponding to the given index entry. func (t *TSMReader) ReadAt(entry *IndexEntry, vals []Value) ([]Value, error) { t.mu.RLock() @@ -490,6 +386,12 @@ func (t *TSMReader) Delete(keys [][]byte) error { return nil } +// Iterator returns an iterator over the keys starting at the provided key. You must +// call Next before calling any of the accessors. +func (t *TSMReader) Iterator(key []byte) TSMIterator { + return t.index.Iterator(key) +} + // OverlapsTimeRange returns true if the time range of the file intersect min and max. func (t *TSMReader) OverlapsTimeRange(min, max int64) bool { return t.index.OverlapsTimeRange(min, max) @@ -515,13 +417,8 @@ func (t *TSMReader) KeyCount() int { return t.index.KeyCount() } -// Entries returns all index entries for key. -func (t *TSMReader) Entries(key []byte) []IndexEntry { - return t.index.Entries(key) -} - // ReadEntries reads the index entries for key into entries. -func (t *TSMReader) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry { +func (t *TSMReader) ReadEntries(key []byte, entries []IndexEntry) ([]IndexEntry, error) { return t.index.ReadEntries(key, entries) } @@ -593,9 +490,13 @@ func (t *TSMReader) Stats() FileStat { // BlockIterator returns a BlockIterator for the underlying TSM file. func (t *TSMReader) BlockIterator() *BlockIterator { + t.mu.RLock() + iter := t.index.Iterator(nil) + t.mu.RUnlock() + return &BlockIterator{ - r: t, - n: t.index.KeyCount(), + r: t, + iter: iter, } } @@ -778,125 +679,38 @@ func NewIndirectIndex() *indirectIndex { } } -func (d *indirectIndex) Seek(key []byte) int { - d.mu.RLock() - defer d.mu.RUnlock() - return d.searchOffset(key) -} - -func searchPrefixesIndex(prefixes []prefixEntry, n int) int { - return sort.Search(len(prefixes), func(i int) bool { - return prefixes[i].total > n - }) -} - -func searchPrefixes(prefixes []prefixEntry, n int) (prefix, bool) { - i := searchPrefixesIndex(prefixes, n) - if i < len(prefixes) { - return prefixes[i].pre, true - } - return prefix{}, false -} - -// searchOffset searches the offsets slice for key and returns the position in -// offsets where key would exist. -func (d *indirectIndex) searchOffset(key []byte) (index int) { - pre := keyPrefix(key) - return sort.Search(len(d.ro.offsets), func(i int) bool { - if prei, ok := searchPrefixes(d.ro.prefixes, i); ok { - if cmp := comparePrefix(prei, pre); cmp == -1 { - return false - } else if cmp == 1 { - return true - } - } - _, k := readKey(d.b.access(d.ro.offsets[i], 0)) - return bytes.Compare(k, key) >= 0 - }) -} - -// search returns the byte position of key in the index. If key is not -// in the index, len(index) is returned. -func (d *indirectIndex) search(key []byte) uint32 { - if !d.ContainsKey(key) { - return d.b.len() - } - - // We use a binary search across our indirect offsets (pointers to all the keys - // in the index slice). We then check if we have found the right index. - if i := d.searchOffset(key); i < len(d.ro.offsets) { - offset := d.ro.offsets[i] - _, k := readKey(d.b.access(offset, 0)) - - // The search may have returned an i == 0 which could indicated that the value - // searched should be inserted at postion 0. Make sure the key in the index - // matches the search value. - if !bytes.Equal(key, k) { - return d.b.len() - } - - return offset - } - - // The key is not in the index. i is the index where it would be inserted so return - // a value outside our offset range. - return d.b.len() -} - // ContainsKey returns true of key may exist in this index. func (d *indirectIndex) ContainsKey(key []byte) bool { return bytes.Compare(key, d.minKey) >= 0 && bytes.Compare(key, d.maxKey) <= 0 } -// Entries returns all index entries for a key. -func (d *indirectIndex) Entries(key []byte) []IndexEntry { - return d.ReadEntries(key, nil) -} - -func (d *indirectIndex) readEntriesAt(offset uint32, entries *[]IndexEntry) ([]byte, []IndexEntry) { - n, k := readKey(d.b.access(offset, 0)) - - // Read and return all the entries - offset += n - var ie indexEntries - if entries != nil { - ie.entries = *entries - } - if _, err := readEntries(d.b.access(offset, 0), &ie); err != nil { - panic(fmt.Sprintf("error reading entries: %v", err)) - } - if entries != nil { - *entries = ie.entries - } - return k, ie.entries -} - // ReadEntries returns all index entries for a key. -func (d *indirectIndex) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry { +func (d *indirectIndex) ReadEntries(key []byte, entries []IndexEntry) ([]IndexEntry, error) { d.mu.RLock() defer d.mu.RUnlock() - offset := d.search(key) - if offset < d.b.len() { - k, entries := d.readEntriesAt(offset, entries) - // The search may have returned an i == 0 which could indicated that the value - // searched should be inserted at position 0. Make sure the key in the index - // matches the search value. - if !bytes.Equal(key, k) { - return nil - } - - return entries + iter := d.ro.Iterator() + exact, _ := iter.Seek(key, &d.b) + if !exact { + return nil, nil } - // The key is not in the index. i is the index where it would be inserted. - return nil + entries, err := readEntries(d.b.access(iter.EntryOffset(&d.b), 0), entries) + if err != nil { + return nil, err + } + + return entries, nil } // Entry returns the index entry for the specified key and timestamp. If no entry // matches the key an timestamp, nil is returned. func (d *indirectIndex) Entry(key []byte, timestamp int64) *IndexEntry { - entries := d.Entries(key) + entries, err := d.ReadEntries(key, nil) + if err != nil { + // TODO(jeff): log this somehow? we have an invalid entry in the tsm index + return nil + } for _, entry := range entries { if entry.Contains(timestamp) { return &entry @@ -905,49 +719,6 @@ func (d *indirectIndex) Entry(key []byte, timestamp int64) *IndexEntry { return nil } -// Key returns the key in the index at the given position. -func (d *indirectIndex) Key(idx int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) { - d.mu.RLock() - defer d.mu.RUnlock() - - if idx < 0 || idx >= len(d.ro.offsets) { - return nil, 0, nil - } - - offset := d.ro.offsets[idx] - n, key := readKey(d.b.access(offset, 0)) - typ := d.b.access(offset+n, 1)[0] - - var ie indexEntries - if entries != nil { - ie.entries = *entries - } - if _, err := readEntries(d.b.access(offset+n, 0), &ie); err != nil { - return nil, 0, nil - } - if entries != nil { - *entries = ie.entries - } - - return key, typ, ie.entries -} - -// KeyAt returns the key in the index at the given position. -func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) { - d.mu.RLock() - - if idx < 0 || idx >= len(d.ro.offsets) { - d.mu.RUnlock() - return nil, 0 - } - offset := d.ro.offsets[idx] - n, key := readKey(d.b.access(offset, 0)) - offset = offset + uint32(n) - typ := d.b.access(offset, 1)[0] - d.mu.RUnlock() - return key, typ -} - // KeyCount returns the count of unique keys in the index. func (d *indirectIndex) KeyCount() int { d.mu.RLock() @@ -956,6 +727,25 @@ func (d *indirectIndex) KeyCount() int { return n } +// Iterator returns an iterator over the keys starting at the provided key. You must +// call Next before calling any of the accessors. +func (d *indirectIndex) Iterator(key []byte) *TSMIndexIterator { + d.mu.RLock() + iter := d.ro.Iterator() + _, ok := iter.Seek(key, &d.b) + ti := &TSMIndexIterator{ + d: d, + n: int(len(d.ro.offsets)), + b: &d.b, + iter: &iter, + first: true, + ok: ok, + } + d.mu.RUnlock() + + return ti +} + // Delete removes the given keys from the index. func (d *indirectIndex) Delete(keys [][]byte) { if len(keys) == 0 { @@ -1163,36 +953,56 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) { } // TombstoneRange returns ranges of time that are deleted for the given key. -func (d *indirectIndex) TombstoneRange(key []byte) []TimeRange { +func (d *indirectIndex) TombstoneRange(key []byte) (r []TimeRange) { d.mu.RLock() - r := d.tombstones[d.search(key)] + iter := d.ro.Iterator() + exact, _ := iter.Seek(key, &d.b) + if exact { + r = d.tombstones[iter.Offset()] + } d.mu.RUnlock() return r } // Contains return true if the given key exists in the index. func (d *indirectIndex) Contains(key []byte) bool { - return len(d.Entries(key)) > 0 + d.mu.RLock() + iter := d.ro.Iterator() + exact, _ := iter.Seek(key, &d.b) + d.mu.RUnlock() + return exact } // ContainsValue returns true if key and time might exist in this file. func (d *indirectIndex) ContainsValue(key []byte, timestamp int64) bool { - entry := d.Entry(key, timestamp) - if entry == nil { + d.mu.RLock() + defer d.mu.RUnlock() + + iter := d.ro.Iterator() + exact, _ := iter.Seek(key, &d.b) + if !exact { return false } - d.mu.RLock() - // TODO(jeff): we already did the search when calling d.Entry - tombstones := d.tombstones[d.search(key)] - d.mu.RUnlock() - - for _, t := range tombstones { - if t.Min <= timestamp && t.Max >= timestamp { + for _, t := range d.tombstones[iter.Offset()] { + if t.Min <= timestamp && timestamp <= t.Max { return false } } - return true + + entries, err := d.ReadEntries(key, nil) + if err != nil { + // TODO(jeff): log this somehow? we have an invalid entry in the tsm index + return false + } + + for _, entry := range entries { + if entry.Contains(timestamp) { + return true + } + } + + return false } // Type returns the block type of the values stored for the key. @@ -1200,13 +1010,13 @@ func (d *indirectIndex) Type(key []byte) (byte, error) { d.mu.RLock() defer d.mu.RUnlock() - if offset := d.search(key); offset < d.b.len() { - n, _ := readKey(d.b.access(offset, 0)) - offset += n - return d.b.access(offset, 1)[0], nil + iter := d.ro.Iterator() + exact, _ := iter.Seek(key, &d.b) + if !exact { + return 0, errors.New("key does not exist") } - return 0, fmt.Errorf("key does not exist: %s", key) + return d.b.access(iter.EntryOffset(&d.b), 1)[0], nil } // OverlapsTimeRange returns true if the time range of the file intersect min and max. @@ -1318,11 +1128,11 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error { ro.Done() firstOfs := ro.offsets[0] - _, key := readKey(b[firstOfs:]) + key := readKey(b[firstOfs:]) d.minKey = key lastOfs := ro.offsets[len(ro.offsets)-1] - _, key = readKey(b[lastOfs:]) + key = readKey(b[lastOfs:]) d.maxKey = key d.minTime = minTime @@ -1536,9 +1346,9 @@ func (m *mmapAccessor) readBytes(entry *IndexEntry, b []byte) (uint32, []byte, e func (m *mmapAccessor) readAll(key []byte) ([]Value, error) { m.incAccess() - blocks := m.index.Entries(key) - if len(blocks) == 0 { - return nil, nil + blocks, err := m.index.ReadEntries(key, nil) + if len(blocks) == 0 || err != nil { + return nil, err } tombstones := m.index.TombstoneRange(key) @@ -1547,7 +1357,6 @@ func (m *mmapAccessor) readAll(key []byte) ([]Value, error) { defer m.mu.RUnlock() var temp []Value - var err error var values []Value for _, block := range blocks { var skip bool @@ -1642,54 +1451,39 @@ func (a *indexEntries) WriteTo(w io.Writer) (total int64, err error) { return total, nil } -func readKey(b []byte) (n uint32, key []byte) { - // 2 byte size of key - n, size := 2, uint32(binary.BigEndian.Uint16(b[:2])) - - // N byte key - key = b[n : n+size] - - n += uint32(len(key)) - return +func readKey(b []byte) (key []byte) { + size := binary.BigEndian.Uint16(b[:2]) + return b[2 : 2+size] } -func readEntries(b []byte, entries *indexEntries) (n int, err error) { - if len(b) < 1+indexCountSize { - return 0, fmt.Errorf("readEntries: data too short for headers") +func readEntries(b []byte, entries []IndexEntry) ([]IndexEntry, error) { + if len(b) < indexTypeSize+indexCountSize { + return entries[:0], errors.New("readEntries: data too short for headers") } - // 1 byte block type - entries.Type = b[n] - n++ - - // 2 byte count of index entries - count := int(binary.BigEndian.Uint16(b[n : n+indexCountSize])) - n += indexCountSize - - if cap(entries.entries) < count { - entries.entries = make([]IndexEntry, count) + count := int(binary.BigEndian.Uint16(b[indexTypeSize : indexTypeSize+indexCountSize])) + if cap(entries) < count { + entries = make([]IndexEntry, count) } else { - entries.entries = entries.entries[:count] + entries = entries[:count] } + b = b[indexTypeSize+indexCountSize:] - b = b[indexCountSize+indexTypeSize:] - for i := 0; i < len(entries.entries); i++ { - if err = entries.entries[i].UnmarshalBinary(b); err != nil { - return 0, fmt.Errorf("readEntries: unmarshal error: %v", err) + for i := range entries { + if err := entries[i].UnmarshalBinary(b); err != nil { + return entries[:0], err } b = b[indexEntrySize:] } - n += count * indexEntrySize - - return + return entries, nil } // readEntriesTimes is a helper function to read entries at the provided buffer but // only reading in the min and max times. func readEntriesTimes(b []byte, entries []IndexEntry) ([]IndexEntry, error) { if len(b) < indexTypeSize+indexCountSize { - return nil, errors.New("readEntries: data too short for headers") + return entries[:0], errors.New("readEntries: data too short for headers") } count := int(binary.BigEndian.Uint16(b[indexTypeSize : indexTypeSize+indexCountSize])) @@ -1702,7 +1496,7 @@ func readEntriesTimes(b []byte, entries []IndexEntry) ([]IndexEntry, error) { for i := range entries { if len(b) < indexEntrySize { - return nil, errors.New("readEntries: stream too short for entry") + return entries[:0], errors.New("readEntries: stream too short for entry") } entries[i].MinTime = int64(binary.BigEndian.Uint64(b[0:8])) entries[i].MaxTime = int64(binary.BigEndian.Uint64(b[8:16])) diff --git a/tsdb/tsm1/reader_block_iterator.go b/tsdb/tsm1/reader_block_iterator.go new file mode 100644 index 0000000000..9bf6b2626f --- /dev/null +++ b/tsdb/tsm1/reader_block_iterator.go @@ -0,0 +1,52 @@ +package tsm1 + +// BlockIterator allows iterating over each block in a TSM file in order. It provides +// raw access to the block bytes without decoding them. +type BlockIterator struct { + r *TSMReader + iter *TSMIndexIterator + entries []IndexEntry +} + +// PeekNext returns the next key to be iterated or an empty string. +func (b *BlockIterator) PeekNext() []byte { + return b.iter.Peek() +} + +// Next returns true if there are more blocks to iterate through. +func (b *BlockIterator) Next() bool { + if b.iter.Err() != nil { + return false + } + + if len(b.entries) > 0 { + b.entries = b.entries[1:] + if len(b.entries) > 0 { + return true + } + } + + if !b.iter.Next() { + return false + } + b.entries = b.iter.Entries() + + return len(b.entries) > 0 +} + +// Read reads information about the next block to be iterated. +func (b *BlockIterator) Read() (key []byte, minTime int64, maxTime int64, typ byte, checksum uint32, buf []byte, err error) { + if err := b.iter.Err(); err != nil { + return nil, 0, 0, 0, 0, nil, err + } + checksum, buf, err = b.r.ReadBytes(&b.entries[0], nil) + if err != nil { + return nil, 0, 0, 0, 0, nil, err + } + return b.iter.Key(), b.entries[0].MinTime, b.entries[0].MaxTime, b.iter.Type(), checksum, buf, err +} + +// Err returns any errors encounter during iteration. +func (b *BlockIterator) Err() error { + return b.iter.Err() +} diff --git a/tsdb/tsm1/reader_index_iterator.go b/tsdb/tsm1/reader_index_iterator.go new file mode 100644 index 0000000000..cb80d09f6a --- /dev/null +++ b/tsdb/tsm1/reader_index_iterator.go @@ -0,0 +1,106 @@ +package tsm1 + +import ( + "fmt" +) + +// TSMIndexIterator allows one to iterate over the TSM index. +type TSMIndexIterator struct { + b *faultBuffer + n int + d *indirectIndex + iter *readerOffsetsIterator + + // if true, don't need to advance iter on the call to Next + first bool + peeked bool + + ok bool + err error + + offset uint32 + eoffset uint32 + key []byte + typ byte + entries []IndexEntry +} + +// Next advances the iterator and reports if it is still valid. +func (t *TSMIndexIterator) Next() bool { + if t.n != t.d.KeyCount() { + t.err, t.ok = fmt.Errorf("Key count changed during iteration"), false + } + if !t.ok || t.err != nil { + return false + } + if !t.peeked && !t.first { + t.ok = t.iter.Next() + } + if !t.ok { + return false + } + + t.peeked = false + t.first = false + + t.offset = t.iter.Offset() + t.eoffset = t.iter.EntryOffset(t.b) + t.key = nil + t.typ = 0 + t.entries = t.entries[:0] + return true +} + +// Peek reports the next key or nil if there is not one or an error happened. +func (t *TSMIndexIterator) Peek() []byte { + if !t.ok || t.err != nil { + return nil + } + if !t.peeked { + t.ok = t.iter.Next() + t.peeked = true + } + + if !t.ok { + return nil + } + + return t.iter.Key(t.b) +} + +// Key reports the current key. +func (t *TSMIndexIterator) Key() []byte { + if t.key == nil { + buf := t.b.access(t.offset, 0) + t.key = readKey(buf) + t.typ = buf[2+len(t.key)] + } + return t.key +} + +// Type reports the current type. +func (t *TSMIndexIterator) Type() byte { + if t.key == nil { + buf := t.b.access(t.offset, 0) + t.key = readKey(buf) + t.typ = buf[2+len(t.key)] + } + return t.typ +} + +// Entries reports the current list of entries. +func (t *TSMIndexIterator) Entries() []IndexEntry { + if len(t.entries) == 0 { + buf := t.b.access(t.eoffset, 0) + t.entries, t.err = readEntries(buf, t.entries) + } + if t.err != nil { + return nil + } + return t.entries +} + +// Err reports if an error stopped the iteration. +func (t *TSMIndexIterator) Err() error { + return t.err +} diff --git a/tsdb/tsm1/reader_test.go b/tsdb/tsm1/reader_test.go index b7cf0d39c3..b811611797 100644 --- a/tsdb/tsm1/reader_test.go +++ b/tsdb/tsm1/reader_test.go @@ -1090,7 +1090,10 @@ func TestIndirectIndex_Entries(t *testing.T) { t.Fatalf("unexpected error unmarshaling index: %v", err) } - entries := indirect.Entries([]byte("cpu")) + entries, err := indirect.ReadEntries([]byte("cpu"), nil) + if err != nil { + t.Fatal(err) + } if got, exp := len(entries), len(exp); got != exp { t.Fatalf("entries length mismatch: got %v, exp %v", got, exp) @@ -1133,7 +1136,10 @@ func TestIndirectIndex_Entries_NonExistent(t *testing.T) { // mem has not been added to the index so we should get no entries back // for both exp := index.Entries([]byte("mem")) - entries := indirect.Entries([]byte("mem")) + entries, err := indirect.ReadEntries([]byte("mem"), nil) + if err != nil { + t.Fatal(err) + } if got, exp := len(entries), len(exp); got != exp && exp != 0 { t.Fatalf("entries length mismatch: got %v, exp %v", got, exp) @@ -1953,7 +1959,7 @@ func BenchmarkIndirectIndex_Entries(b *testing.B) { for i := 0; i < b.N; i++ { resetFaults(indirect) - indirect.Entries([]byte("cpu-00000001")) + indirect.ReadEntries([]byte("cpu-00000001"), nil) } b.SetBytes(getFaults(globalIndex) * 4096) @@ -1961,14 +1967,14 @@ func BenchmarkIndirectIndex_Entries(b *testing.B) { } func BenchmarkIndirectIndex_ReadEntries(b *testing.B) { - var cache []IndexEntry + var entries []IndexEntry indirect, _ := mustMakeIndex(b, 1000, 1000) b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { resetFaults(indirect) - indirect.ReadEntries([]byte("cpu-00000001"), &cache) + entries, _ = indirect.ReadEntries([]byte("cpu-00000001"), entries) } b.SetBytes(getFaults(globalIndex) * 4096)