Merge pull request #1922 from influxdata/jmw-tombstone-perf

tsm1: tombstone performance
pull/10616/head
Jeff Wendling 2018-12-14 12:17:12 -07:00 committed by GitHub
commit 2190ebde59
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 103 additions and 32 deletions

View File

@ -1033,7 +1033,7 @@ func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
tombstones := map[string][]TimeRange{} tombstones := map[string][]TimeRange{}
var ie []IndexEntry var ie []IndexEntry
for i := 0; len(keys) > 0 && i < d.KeyCount(); i++ { for i := d.searchOffset(keys[0]); len(keys) > 0 && i < d.KeyCount(); i++ {
k, entries := d.readEntriesAt(d.offset(i), &ie) k, entries := d.readEntriesAt(d.offset(i), &ie)
// Skip any keys that don't exist. These are less than the current key. // Skip any keys that don't exist. These are less than the current key.

View File

@ -1865,32 +1865,20 @@ func TestTSMReader_References(t *testing.T) {
} }
} }
func BenchmarkIndirectIndex_UnmarshalBinary(b *testing.B) { //
// indirectIndex benchmarks
//
const (
indexKeyCount = 500000
indexBlockCount = 100
)
func mustMakeIndex(tb testing.TB, keys, blocks int) (*indirectIndex, []byte) {
index := NewIndexWriter() index := NewIndexWriter()
for i := 0; i < 100000; i++ {
index.Add([]byte(fmt.Sprintf("cpu-%d", i)), BlockFloat64, int64(i*2), int64(i*2+1), 10, 100)
}
bytes, err := index.MarshalBinary()
if err != nil {
b.Fatalf("unexpected error marshaling index: %v", err)
}
indirect := NewIndirectIndex()
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := indirect.UnmarshalBinary(bytes); err != nil {
b.Fatalf("unexpected error unmarshaling index: %v", err)
}
}
}
func mustMakeIndex(tb testing.TB, keys, blocks int) *indirectIndex {
index := NewIndexWriter()
// add 1000 keys and 1000 blocks per key
for i := 0; i < keys; i++ { for i := 0; i < keys; i++ {
for j := 0; j < blocks; j++ { for j := 0; j < blocks; j++ {
index.Add([]byte(fmt.Sprintf("cpu-%03d", i)), BlockFloat64, int64(i*j*2), int64(i*j*2+1), 10, 100) index.Add([]byte(fmt.Sprintf("cpu-%08d", i)), BlockFloat64, int64(i*j*2), int64(i*j*2+1), 10, 100)
} }
} }
@ -1904,30 +1892,45 @@ func mustMakeIndex(tb testing.TB, keys, blocks int) *indirectIndex {
tb.Fatalf("unexpected error unmarshaling index: %v", err) tb.Fatalf("unexpected error unmarshaling index: %v", err)
} }
return indirect return indirect, bytes
} }
func BenchmarkIndirectIndex_Entries(b *testing.B) { func BenchmarkIndirectIndex_UnmarshalBinary(b *testing.B) {
indirect := mustMakeIndex(b, 1000, 1000) indirect, bytes := mustMakeIndex(b, indexKeyCount, indexBlockCount)
b.ReportAllocs()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
indirect.Entries([]byte("cpu-001")) if err := indirect.UnmarshalBinary(bytes); err != nil {
b.Fatalf("unexpected error unmarshaling index: %v", err)
}
}
}
func BenchmarkIndirectIndex_Entries(b *testing.B) {
indirect, _ := mustMakeIndex(b, 1000, 1000)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
indirect.Entries([]byte("cpu-00000001"))
} }
} }
func BenchmarkIndirectIndex_ReadEntries(b *testing.B) { func BenchmarkIndirectIndex_ReadEntries(b *testing.B) {
var cache []IndexEntry var cache []IndexEntry
indirect := mustMakeIndex(b, 1000, 1000) indirect, _ := mustMakeIndex(b, 1000, 1000)
b.ReportAllocs()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
indirect.ReadEntries([]byte("cpu-001"), &cache) indirect.ReadEntries([]byte("cpu-00000001"), &cache)
} }
} }
func BenchmarkBlockIterator_Next(b *testing.B) { func BenchmarkBlockIterator_Next(b *testing.B) {
r := TSMReader{index: mustMakeIndex(b, 1000, 1000)} indirect, _ := mustMakeIndex(b, 1000, 1000)
r := TSMReader{index: indirect}
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
@ -1936,3 +1939,71 @@ func BenchmarkBlockIterator_Next(b *testing.B) {
} }
} }
} }
func BenchmarkIndirectIndex_DeleteRangeLast(b *testing.B) {
indirect, _ := mustMakeIndex(b, indexKeyCount, indexBlockCount)
keys := [][]byte{[]byte("cpu-00999999")}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
indirect.DeleteRange(keys, 10, 50)
// do a cheap reset of the state.
for key := range indirect.tombstones {
delete(indirect.tombstones, key)
}
}
}
func BenchmarkIndirectIndex_DeleteRangeFull(b *testing.B) {
indirect, _ := mustMakeIndex(b, indexKeyCount, indexBlockCount)
var keys [][]byte
for i := 0; i < indexKeyCount; i++ {
keys = append(keys, []byte(fmt.Sprintf("cpu-%08d", i)))
}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
for i := 0; i < len(keys); i += 4096 {
n := i + 4096
if n > len(keys) {
n = len(keys)
}
indirect.DeleteRange(keys[i:n], 10, 50)
}
// do a cheap reset of the state.
for key := range indirect.tombstones {
delete(indirect.tombstones, key)
}
}
}
func BenchmarkIndirectIndex_Delete(b *testing.B) {
indirect, bytes := mustMakeIndex(b, indexKeyCount, indexBlockCount)
var keys [][]byte
for i := 0; i < indexKeyCount; i++ {
keys = append(keys, []byte(fmt.Sprintf("cpu-%08d", i)))
}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
indirect.UnmarshalBinary(bytes)
b.StartTimer()
for i := 0; i < len(keys); i += 4096 {
n := i + 4096
if n > len(keys) {
n = len(keys)
}
indirect.Delete(keys[i:n])
}
}
}

View File

@ -390,7 +390,7 @@ func copyBuffer(f syncer, dst io.Writer, src io.Reader, buf []byte) (written int
written += int64(nw) written += int64(nw)
} }
if written-lastSync > fsyncEvery { if f != nil && written-lastSync > fsyncEvery {
if err := f.Sync(); err != nil { if err := f.Sync(); err != nil {
return 0, err return 0, err
} }