Merge pull request #8853 from influxdata/sgc-readentries-perf

Reduce allocations, improve readEntries performance by simplifying loop
pull/8855/head
Stuart Carnie 2017-09-19 13:28:43 -07:00 committed by GitHub
commit cf0a1d9d99
5 changed files with 345 additions and 278 deletions

View File

@ -28,6 +28,7 @@
- [#8784](https://github.com/influxdata/influxdb/pull/8784): Add support for the Prometheus remote read and write APIs.
- [#8851](https://github.com/influxdata/influxdb/pull/8851): Improve performance of `Include` and `Exclude` functions
- [#8854](https://github.com/influxdata/influxdb/pull/8854): Report the task status for a query.
- [#8853](https://github.com/influxdata/influxdb/pull/8853): Reduce allocations, improve `readEntries` performance by simplifying loop
### Bugfixes

View File

@ -38,7 +38,7 @@ type TSMFile interface {
// Entries returns the index entries for all blocks for the given key.
Entries(key []byte) []IndexEntry
ReadEntries(key []byte, entries *[]IndexEntry)
ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry
// Returns true if the TSMFile may contain a value with the specified
// key and time.
@ -769,7 +769,7 @@ func (f *FileStore) walkFiles(fn func(f TSMFile) error) error {
// We need to determine the possible files that may be accessed by this query given
// the time range.
func (f *FileStore) cost(key []byte, min, max int64) query.IteratorCost {
var entries []IndexEntry
var cache []IndexEntry
cost := query.IteratorCost{}
for _, fd := range f.files {
minTime, maxTime := fd.TimeRange()
@ -779,7 +779,7 @@ func (f *FileStore) cost(key []byte, min, max int64) query.IteratorCost {
skipped := true
tombstones := fd.TombstoneRange(key)
fd.ReadEntries(key, &entries)
entries := fd.ReadEntries(key, &cache)
ENTRIES:
for i := 0; i < len(entries); i++ {
ie := entries[i]
@ -811,7 +811,7 @@ func (f *FileStore) cost(key []byte, min, max int64) query.IteratorCost {
// whether the key will be scan in ascending time order or descenging time order.
// This function assumes the read-lock has been taken.
func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location {
var entries []IndexEntry
var cache []IndexEntry
locations := make([]*location, 0, len(f.files))
for _, fd := range f.files {
minTime, maxTime := fd.TimeRange()
@ -829,22 +829,18 @@ func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location {
// This file could potential contain points we are looking for so find the blocks for
// the given key.
fd.ReadEntries(key, &entries)
entries := fd.ReadEntries(key, &cache)
LOOP:
for i := 0; i < len(entries); i++ {
ie := entries[i]
// Skip any blocks only contain values that are tombstoned.
var skip bool
for _, t := range tombstones {
if t.Min <= ie.MinTime && t.Max >= ie.MaxTime {
skip = true
break
continue LOOP
}
}
if skip {
continue
}
// If we ascending and the max time of a block is before where we are looking, skip
// it since the data is out of our range
if ascending && ie.MaxTime < t {

View File

@ -61,14 +61,14 @@ type TSMIndex interface {
Entries(key []byte) []IndexEntry
// ReadEntries reads the index entries for key into entries.
ReadEntries(key []byte, entries *[]IndexEntry)
ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry
// Entry returns the index entry for the specified key and timestamp. If no entry
// matches the key and timestamp, nil is returned.
Entry(key []byte, timestamp int64) *IndexEntry
// Key returns the key in the index at the given position.
Key(index int) ([]byte, byte, []IndexEntry)
// Key returns the key in the index at the given position, using entries to avoid allocations.
Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry)
// KeyAt returns the key in the index at the given position.
KeyAt(index int) ([]byte, byte)
@ -119,6 +119,7 @@ type BlockIterator struct {
n int
key []byte
cache []IndexEntry
entries []IndexEntry
err error
typ byte
@ -149,7 +150,7 @@ func (b *BlockIterator) Next() bool {
}
if b.n-b.i > 0 {
b.key, b.typ, b.entries = b.r.Key(b.i)
b.key, b.typ, b.entries = b.r.Key(b.i, &b.cache)
b.i++
if len(b.entries) > 0 {
@ -265,8 +266,8 @@ func (t *TSMReader) Path() string {
}
// Key returns the key and the underlying entry at the numeric index.
func (t *TSMReader) Key(index int) ([]byte, byte, []IndexEntry) {
return t.index.Key(index)
func (t *TSMReader) Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) {
return t.index.Key(index, entries)
}
// KeyAt returns the key and key type at position idx in the index.
@ -487,8 +488,8 @@ func (t *TSMReader) Entries(key []byte) []IndexEntry {
}
// ReadEntries reads the index entries for key into entries.
func (t *TSMReader) ReadEntries(key []byte, entries *[]IndexEntry) {
t.index.ReadEntries(key, entries)
func (t *TSMReader) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry {
return t.index.ReadEntries(key, entries)
}
// IndexSize returns the size of the index in bytes.
@ -678,6 +679,11 @@ func (d *indirectIndex) search(key []byte) int {
// Entries returns all index entries for a key.
func (d *indirectIndex) Entries(key []byte) []IndexEntry {
return d.ReadEntries(key, nil)
}
// ReadEntries returns all index entries for a key.
func (d *indirectIndex) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry {
d.mu.RLock()
defer d.mu.RUnlock()
@ -697,22 +703,23 @@ func (d *indirectIndex) Entries(key []byte) []IndexEntry {
// Read and return all the entries
ofs += n
var entries indexEntries
if _, err := readEntries(d.b[ofs:], &entries); err != nil {
var ie indexEntries
if entries != nil {
ie.entries = *entries
}
if _, err = readEntries(d.b[ofs:], &ie); err != nil {
panic(fmt.Sprintf("error reading entries: %v", err))
}
return entries.entries
if entries != nil {
*entries = ie.entries
}
return ie.entries
}
// The key is not in the index. i is the index where it would be inserted.
return nil
}
// ReadEntries returns all index entries for a key.
func (d *indirectIndex) ReadEntries(key []byte, entries *[]IndexEntry) {
*entries = d.Entries(key)
}
// Entry returns the index entry for the specified key and timestamp. If no entry
// matches the key an timestamp, nil is returned.
func (d *indirectIndex) Entry(key []byte, timestamp int64) *IndexEntry {
@ -726,7 +733,7 @@ func (d *indirectIndex) Entry(key []byte, timestamp int64) *IndexEntry {
}
// Key returns the key in the index at the given position.
func (d *indirectIndex) Key(idx int) ([]byte, byte, []IndexEntry) {
func (d *indirectIndex) Key(idx int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) {
d.mu.RLock()
defer d.mu.RUnlock()
@ -741,11 +748,18 @@ func (d *indirectIndex) Key(idx int) ([]byte, byte, []IndexEntry) {
typ := d.b[int(ofs)+n]
var entries indexEntries
if _, err := readEntries(d.b[int(ofs)+n:], &entries); err != nil {
var ie indexEntries
if entries != nil {
ie.entries = *entries
}
if _, err = readEntries(d.b[int(ofs)+n:], &ie); err != nil {
return nil, 0, nil
}
return key, typ, entries.entries
if entries != nil {
*entries = ie.entries
}
return key, typ, ie.entries
}
// KeyAt returns the key in the index at the given position.
@ -1451,19 +1465,21 @@ func readEntries(b []byte, entries *indexEntries) (n int, err error) {
count := int(binary.BigEndian.Uint16(b[n : n+indexCountSize]))
n += indexCountSize
entries.entries = make([]IndexEntry, count)
for i := 0; i < count; i++ {
var ie IndexEntry
start := i*indexEntrySize + indexCountSize + indexTypeSize
end := start + indexEntrySize
if end > len(b) {
return 0, fmt.Errorf("readEntries: data too short for indexEntry %d", i)
}
if err := ie.UnmarshalBinary(b[start:end]); err != nil {
if cap(entries.entries) < count {
entries.entries = make([]IndexEntry, count)
} else {
entries.entries = entries.entries[:count]
}
b = b[indexCountSize+indexTypeSize:]
for i := 0; i < len(entries.entries); i++ {
if err = entries.entries[i].UnmarshalBinary(b); err != nil {
return 0, fmt.Errorf("readEntries: unmarshal error: %v", err)
}
entries.entries[i] = ie
n += indexEntrySize
b = b[indexEntrySize:]
}
n += count * indexEntrySize
return
}

File diff suppressed because it is too large Load Diff

View File

@ -184,8 +184,8 @@ type IndexEntry struct {
// UnmarshalBinary decodes an IndexEntry from a byte slice.
func (e *IndexEntry) UnmarshalBinary(b []byte) error {
if len(b) != indexEntrySize {
return fmt.Errorf("unmarshalBinary: short buf: %v != %v", indexEntrySize, len(b))
if len(b) < indexEntrySize {
return fmt.Errorf("unmarshalBinary: short buf: %v < %v", len(b), indexEntrySize)
}
e.MinTime = int64(binary.BigEndian.Uint64(b[:8]))
e.MaxTime = int64(binary.BigEndian.Uint64(b[8:16]))