feat(tsm1): Improve performance of Gorilla float block decoding

```
name                        old time/op   new time/op    delta
FloatArrayDecodeAll/1-8      45.9ns ± 1%    13.8ns ± 1%   -70.00%  (p=0.000 n=9+9)
FloatArrayDecodeAll/55-8      686ns ± 0%     232ns ± 1%   -66.10%  (p=0.000 n=9+8)
FloatArrayDecodeAll/550-8    5.78µs ± 0%    2.22µs ± 1%   -61.61%  (p=0.000 n=9+9)
FloatArrayDecodeAll/1000-8   10.2µs ± 2%     4.0µs ± 5%   -60.47%  (p=0.000 n=10+10)

name                        old speed     new speed      delta
FloatArrayDecodeAll/1-8     414MB/s ± 1%  1383MB/s ± 1%  +233.76%  (p=0.000 n=9+9)
FloatArrayDecodeAll/55-8    144MB/s ± 0%   424MB/s ± 1%  +194.19%  (p=0.000 n=9+9)
FloatArrayDecodeAll/550-8   133MB/s ± 0%   346MB/s ± 1%  +160.09%  (p=0.000 n=9+10)
FloatArrayDecodeAll/1000-8  135MB/s ± 2%   340MB/s ± 5%  +153.03%  (p=0.000 n=10+10)
```
pull/10616/head
Stuart Carnie 2018-10-17 09:05:36 -07:00 committed by Edd Robinson
parent 353df7edca
commit a0300064df
2 changed files with 302 additions and 364 deletions


@@ -6,6 +6,7 @@ import (
"io"
"math"
"math/bits"
"unsafe"
)
// FloatArrayEncodeAll encodes src into b, returning b and any error encountered.
@@ -252,222 +253,262 @@ func FloatArrayEncodeAll(src []float64, b []byte) ([]byte, error) {
return b[:length], nil
}
func FloatArrayDecodeAll(b []byte, dst []float64) ([]float64, error) {
if len(b) == 0 {
// bitMask contains a lookup table where the index is the number of bits
// and the value is a mask. The table is always read by ANDing the index
// with 0x3f, such that if the index is 64, position 0 will be read, which
// is 0xffffffffffffffff, thus returning all bits.
//
// 00 = 0xffffffffffffffff
// 01 = 0x0000000000000001
// 02 = 0x0000000000000003
// 03 = 0x0000000000000007
// ...
// 62 = 0x3fffffffffffffff
// 63 = 0x7fffffffffffffff
var bitMask [64]uint64
func init() {
v := uint64(1)
for i := 1; i <= 64; i++ {
bitMask[i&0x3f] = v
v = v<<1 | 1
}
}
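To make the wrap-around behaviour concrete, here is a minimal, self-contained sketch (illustrative only, not part of the patch) showing that an index of 64, ANDed with 0x3f, lands on position 0 and yields the all-ones mask:

```
package main

import "fmt"

func main() {
	// Build the same table as the init above.
	var bitMask [64]uint64
	v := uint64(1)
	for i := 1; i <= 64; i++ {
		bitMask[i&0x3f] = v
		v = v<<1 | 1
	}
	for _, n := range []uint{1, 3, 63, 64} {
		// n&0x3f wraps 64 to index 0, whose entry is all ones.
		fmt.Printf("mask(%2d) = %#x\n", n, bitMask[n&0x3f])
	}
}
```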
func FloatArrayDecodeAll(b []byte, buf []float64) ([]float64, error) {
if len(b) < 9 {
return []float64{}, nil
}
sz := cap(dst)
if sz == 0 {
sz = 64
dst = make([]float64, sz)
} else {
dst = dst[:sz]
}
var (
val uint64 // current value
leading uint64
trailing uint64
bit bool
br BatchBitReader
val uint64 // current value
trailingN uint8 // trailing zero count
meaningfulN uint8 = 64 // meaningful bit count
)
j := 0
// first byte is the compression type; always Gorilla
b = b[1:]
// first byte is the compression type.
// we currently just have gorilla compression.
br.Reset(b[1:])
val = br.ReadBits(64)
val = binary.BigEndian.Uint64(b)
if val == uvnan {
if buf == nil {
var tmp [1]float64
buf = tmp[:0]
}
// special case: there were no values to decode
return dst[:0], nil
return buf[:0], nil
}
dst[j] = math.Float64frombits(val)
j++
buf = buf[:0]
// convert the []float64 to []uint64 to avoid calling math.Float64frombits,
// which results in unnecessary moves between Xn registers before moving
// the value into the float64 slice. This change increased performance from
// 320 MB/s to 340 MB/s on an Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
dst := *(*[]uint64)(unsafe.Pointer(&buf))
dst = append(dst, val)
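The reinterpretation works because float64 and uint64 share size and alignment, so the same slice header can be viewed under either element type. A minimal sketch of the trick in isolation (variable names are illustrative):

```
package main

import (
	"fmt"
	"math"
	"unsafe"
)

func main() {
	buf := make([]float64, 0, 2)
	// View the same slice header as []uint64; no data is copied.
	dst := *(*[]uint64)(unsafe.Pointer(&buf))
	dst = append(dst, math.Float64bits(1.5)) // stays within cap, no realloc
	// View the backing array as []float64 again.
	out := *(*[]float64)(unsafe.Pointer(&dst))
	fmt.Println(out[0]) // 1.5
}
```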
// The expected exit condition is for `uvnan` to be decoded.
b = b[8:]
// The bit reader code uses brCachedVal to store up to the next 8 bytes
// of MSB data read from b. brValidBits stores the number of remaining unread
// bits starting from the MSB. Before N bits are read from brCachedVal,
// they are left-rotated N bits, such that they end up in the left-most position.
// Using bits.RotateLeft64 results in a single instruction on many CPU architectures.
// This approach permits simple tests, such as for the two control bits:
//
// brCachedVal&1 > 0
//
// The alternative was to leave brCachedValue alone and perform shifts and
// masks to read specific bits. The original approach looked like the
// following:
//
// brCachedVal&(1<<(brValidBits&0x3f)) > 0
//
var (
brCachedVal = uint64(0) // a buffer of up to the next 8 bytes read from b in MSB order
brValidBits = uint8(0) // the number of unread bits remaining in brCachedVal
)
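A sketch of the rotate-left read described above, pulled out into a hypothetical helper (no such function exists in the patch; it assumes n bits are already buffered and n < 64):

```
package main

import (
	"fmt"
	"math/bits"
)

// readCached consumes the next n MSB-first bits from cache by rotating them
// into the low end and masking (assumes n bits are buffered and n < 64).
func readCached(cache uint64, valid uint8, n uint8) (uint64, uint64, uint8) {
	cache = bits.RotateLeft64(cache, int(n))
	return cache & (1<<n - 1), cache, valid - n
}

func main() {
	cache := uint64(0b101) << 61 // three buffered bits: 1, 0, 1
	v, cache, valid := readCached(cache, 3, 2)
	fmt.Println(v, valid) // 2 1 (consumed the bits "10")
}
```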
// Refill brCachedVal, reading up to 8 bytes from b
if len(b) >= 8 {
// fast path reads 8 bytes directly
brCachedVal = binary.BigEndian.Uint64(b)
brValidBits = 64
b = b[8:]
} else if len(b) > 0 {
brCachedVal = 0
brValidBits = uint8(len(b) * 8)
for i := range b {
brCachedVal = (brCachedVal << 8) | uint64(b[i])
}
brCachedVal = bits.RotateLeft64(brCachedVal, -int(brValidBits))
b = b[:0]
} else {
goto ERROR
}
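This refill step recurs at every read site in the loop below. Written once as a standalone helper, purely for exposition, it would look like:

```
package main

import (
	"encoding/binary"
	"fmt"
	"math/bits"
)

// refill loads up to 8 bytes of b MSB-first into a cache word and reports
// how many bits are now valid; ok is false when the stream is exhausted.
func refill(b []byte) (cache uint64, valid uint8, rest []byte, ok bool) {
	switch {
	case len(b) >= 8: // fast path reads 8 bytes directly
		return binary.BigEndian.Uint64(b), 64, b[8:], true
	case len(b) > 0:
		valid = uint8(len(b) * 8)
		for _, c := range b {
			cache = cache<<8 | uint64(c)
		}
		// Park the loaded bits at the MSB end, as the rotating reads expect.
		return bits.RotateLeft64(cache, -int(valid)), valid, b[:0], true
	default:
		return 0, 0, b, false // truncated stream
	}
}

func main() {
	cache, valid, _, ok := refill([]byte{0xab, 0xcd})
	fmt.Printf("%#x %d %v\n", cache, valid, ok) // 0xabcd000000000000 16 true
}
```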
// The expected exit condition is for a uvnan to be decoded.
// Any other error (EOF) indicates a truncated stream.
for br.Err() == nil {
// read compressed value
if br.CanReadBitFast() {
bit = br.ReadBitFast()
} else {
bit = br.ReadBit()
for {
if brValidBits > 0 {
// brValidBits > 0 is impossible to predict, so we place the
// most likely case inside the if and immediately jump, keeping
// the instruction pipeline consistently full.
// This is a similar approach to using the GCC __builtin_expect
// intrinsic, which modifies the order of branches such that the
// likely case follows the conditional jump.
//
// Written instead as `if brValidBits == 0` with the refill code placed
// inside the block, throughput drops from 318 MB/s to 260 MB/s on an
// Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
goto READ0
}
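Reduced to a toy (hypothetical names; nothing here is from the patch), the likely/unlikely shaping looks like this:

```
package main

import "fmt"

// nextBit sketches the layout: the common case (bits still buffered) takes
// the forward jump, keeping the refill slow path out of the fall-through
// instruction stream.
func nextBit(valid *int) int {
	if *valid > 0 {
		goto read
	}
	*valid = 8 // slow path: pretend to refill an 8-bit buffer
read:
	(*valid)--
	return 1
}

func main() {
	valid := 0
	fmt.Println(nextBit(&valid), valid) // slow path: 1 7
	fmt.Println(nextBit(&valid), valid) // fast path: 1 6
}
```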
if bit {
if br.CanReadBitFast() {
bit = br.ReadBitFast()
} else {
bit = br.ReadBit()
// Refill brCachedVal, reading up to 8 bytes from b
if len(b) >= 8 {
brCachedVal = binary.BigEndian.Uint64(b)
brValidBits = 64
b = b[8:]
} else if len(b) > 0 {
brCachedVal = 0
brValidBits = uint8(len(b) * 8)
for i := range b {
brCachedVal = (brCachedVal << 8) | uint64(b[i])
}
brCachedVal = bits.RotateLeft64(brCachedVal, -int(brValidBits))
b = b[:0]
} else {
goto ERROR
}
READ0:
// read control bit 0
brValidBits -= 1
brCachedVal = bits.RotateLeft64(brCachedVal, 1)
if brCachedVal&1 > 0 {
if brValidBits > 0 {
goto READ1
}
if bit {
leading = br.ReadBits(5)
mbits := br.ReadBits(6)
if mbits == 0 {
mbits = 64
// Refill brCachedVal, reading up to 8 bytes from b
if len(b) >= 8 {
brCachedVal = binary.BigEndian.Uint64(b)
brValidBits = 64
b = b[8:]
} else if len(b) > 0 {
brCachedVal = 0
brValidBits = uint8(len(b) * 8)
for i := range b {
brCachedVal = (brCachedVal << 8) | uint64(b[i])
}
trailing = 64 - leading - mbits
brCachedVal = bits.RotateLeft64(brCachedVal, -int(brValidBits))
b = b[:0]
} else {
goto ERROR
}
mbits := uint(64 - leading - trailing)
bits := br.ReadBits(mbits)
val ^= bits << trailing
if val == uvnan { // IsNaN, eof
READ1:
// read control bit 1
brValidBits -= 1
brCachedVal = bits.RotateLeft64(brCachedVal, 1)
if brCachedVal&1 > 0 {
// read 5 bits for leading zero count and 6 bits for the meaningful data count
const leadingTrailingBitCount = 11
var lmBits uint64 // leading + meaningful data counts
if brValidBits >= leadingTrailingBitCount {
// decode 5 bits leading + 6 bits meaningful for a total of 11 bits
brValidBits -= leadingTrailingBitCount
brCachedVal = bits.RotateLeft64(brCachedVal, leadingTrailingBitCount)
lmBits = brCachedVal
} else {
bits01 := uint8(11)
if brValidBits > 0 {
bits01 -= brValidBits
lmBits = bits.RotateLeft64(brCachedVal, 11)
}
// Refill brCachedVal, reading up to 8 bytes from b
if len(b) >= 8 {
brCachedVal = binary.BigEndian.Uint64(b)
brValidBits = 64
b = b[8:]
} else if len(b) > 0 {
brCachedVal = 0
brValidBits = uint8(len(b) * 8)
for i := range b {
brCachedVal = (brCachedVal << 8) | uint64(b[i])
}
brCachedVal = bits.RotateLeft64(brCachedVal, -int(brValidBits))
b = b[:0]
} else {
goto ERROR
}
brCachedVal = bits.RotateLeft64(brCachedVal, int(bits01))
brValidBits -= bits01
lmBits &^= bitMask[bits01&0x3f]
lmBits |= brCachedVal & bitMask[bits01&0x3f]
}
lmBits &= 0x7ff
leadingN := uint8((lmBits >> 6) & 0x1f) // 5 bits leading
meaningfulN = uint8(lmBits & 0x3f) // 6 bits meaningful
if meaningfulN > 0 {
trailingN = 64 - leadingN - meaningfulN
} else {
// meaningfulN == 0 is a special case, such that all bits
// are meaningful
trailingN = 0
meaningfulN = 64
}
}
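A worked example of unpacking the 11-bit field (values chosen arbitrarily; 0x1f and 0x3f select the 5- and 6-bit subfields):

```
package main

import "fmt"

func main() {
	// 5 bits of leading-zero count followed by 6 bits of meaningful count.
	lmBits := uint64(0b01010_001100) // leading=10, meaningful=12
	leadingN := uint8((lmBits >> 6) & 0x1f)
	meaningfulN := uint8(lmBits & 0x3f)
	var trailingN uint8
	if meaningfulN == 0 {
		meaningfulN = 64 // special case: all 64 bits are meaningful
	} else {
		trailingN = 64 - leadingN - meaningfulN
	}
	fmt.Println(leadingN, meaningfulN, trailingN) // 10 12 42
}
```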
var sBits uint64 // significant bits
if brValidBits >= meaningfulN {
brValidBits -= meaningfulN
brCachedVal = bits.RotateLeft64(brCachedVal, int(meaningfulN))
sBits = brCachedVal
} else {
mBits := meaningfulN
if brValidBits > 0 {
mBits -= brValidBits
sBits = bits.RotateLeft64(brCachedVal, int(meaningfulN))
}
// Refill brCachedVal, reading up to 8 bytes from b
if len(b) >= 8 {
brCachedVal = binary.BigEndian.Uint64(b)
brValidBits = 64
b = b[8:]
} else if len(b) > 0 {
brCachedVal = 0
brValidBits = uint8(len(b) * 8)
for i := range b {
brCachedVal = (brCachedVal << 8) | uint64(b[i])
}
brCachedVal = bits.RotateLeft64(brCachedVal, -int(brValidBits))
b = b[:0]
} else {
goto ERROR
}
brCachedVal = bits.RotateLeft64(brCachedVal, int(mBits))
brValidBits -= mBits
sBits &^= bitMask[mBits&0x3f]
sBits |= brCachedVal & bitMask[mBits&0x3f]
}
sBits &= bitMask[meaningfulN&0x3f]
val ^= sBits << (trailingN & 0x3f)
if val == uvnan {
// IsNaN, eof
break
}
}
f := math.Float64frombits(val)
if j < len(dst) {
dst[j] = f
} else {
dst = append(dst, f) // force a resize
dst = dst[:cap(dst)]
}
j++
dst = append(dst, val)
}
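For reference, a self-contained round trip of the XOR step the loop performs: the encoder stores only the meaningful bits of prev ^ cur, and the decoder XORs them back in at the recorded trailing offset (the two values here are arbitrary):

```
package main

import (
	"fmt"
	"math"
	"math/bits"
)

func main() {
	prev := math.Float64bits(15.5)
	cur := math.Float64bits(14.0625)
	xor := prev ^ cur
	trailingN := uint(bits.TrailingZeros64(xor))
	sBits := xor >> trailingN // the meaningful bits carried by the stream
	// Decoder side, as in `val ^= sBits << (trailingN & 0x3f)` above:
	got := prev ^ (sBits << trailingN)
	fmt.Println(math.Float64frombits(got) == 14.0625) // true
}
```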
return dst[:j], br.Err()
}
// BatchBitReader reads bits from an io.Reader.
type BatchBitReader struct {
data []byte
buf struct {
v uint64 // bit buffer
n uint // available bits
}
err error
}
// NewBatchBitReader returns a new instance of BatchBitReader that reads from data.
func NewBatchBitReader(data []byte) *BatchBitReader {
b := new(BatchBitReader)
b.Reset(data)
return b
}
// Reset sets the underlying reader on b and reinitializes.
func (r *BatchBitReader) Reset(data []byte) {
r.data = data
r.buf.v, r.buf.n, r.err = 0, 0, nil
r.readBuf()
}
func (r *BatchBitReader) Err() error { return r.err }
// CanReadBitFast returns true if calling ReadBitFast() is allowed.
// Fast bit reads are allowed when at least 2 bits remain in the buffer.
// This is because the buffer does not need to be refilled and the caller
// can inline the calls.
func (r *BatchBitReader) CanReadBitFast() bool { return r.buf.n > 1 }
// ReadBitFast is an optimized bit read.
// IMPORTANT: Only allowed if CanReadBitFast() is true!
func (r *BatchBitReader) ReadBitFast() bool {
v := r.buf.v&(1<<63) != 0
r.buf.v <<= 1
r.buf.n -= 1
return v
}
// ReadBit returns the next bit from the underlying data.
func (r *BatchBitReader) ReadBit() bool {
return r.ReadBits(1) != 0
}
// ReadBits reads nbits from the underlying data into a uint64.
// nbits must be from 1 to 64, inclusive.
func (r *BatchBitReader) ReadBits(nbits uint) uint64 {
// Return EOF if there is no more data.
if r.buf.n == 0 {
r.err = io.EOF
return 0
}
// Return bits from buffer if less than available bits.
if nbits <= r.buf.n {
// Return all bits, if requested.
if nbits == 64 {
v := r.buf.v
r.buf.v, r.buf.n = 0, 0
r.readBuf()
return v
}
// Otherwise mask returned bits.
v := r.buf.v >> (64 - nbits)
r.buf.v <<= nbits
r.buf.n -= nbits
if r.buf.n == 0 {
r.readBuf()
}
return v
}
// Otherwise read all available bits in current buffer.
v, n := r.buf.v, r.buf.n
// Read new buffer.
r.buf.v, r.buf.n = 0, 0
r.readBuf()
// Append new buffer to previous buffer and shift to remove unnecessary bits.
v |= r.buf.v >> n
v >>= 64 - nbits
// Remove used bits from new buffer.
bufN := nbits - n
if bufN > r.buf.n {
bufN = r.buf.n
}
r.buf.v <<= bufN
r.buf.n -= bufN
if r.buf.n == 0 {
r.readBuf()
}
return v
}
func (r *BatchBitReader) readBuf() {
// Determine number of bytes to read to fill buffer.
byteN := 8 - (r.buf.n / 8)
// Limit to the length of our data.
if n := uint(len(r.data)); byteN > n {
byteN = n
}
// Optimized 8-byte read.
if byteN == 8 {
r.buf.v = binary.BigEndian.Uint64(r.data)
r.buf.n = 64
r.data = r.data[8:]
return
}
i := uint(0)
if byteN > 3 {
r.buf.n += 32
r.buf.v |= uint64(binary.BigEndian.Uint32(r.data)) << (64 - r.buf.n)
i += 4
}
// Otherwise append bytes to buffer.
for ; i < byteN; i++ {
r.buf.n += 8
r.buf.v |= uint64(r.data[i]) << (64 - r.buf.n)
}
// Move data forward.
r.data = r.data[byteN:]
return *(*[]float64)(unsafe.Pointer(&dst)), nil
ERROR:
return (*(*[]float64)(unsafe.Pointer(&dst)))[:0], io.EOF
}
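Taken together, the exported round trip looks like the following sketch (assuming this repository's import path; error handling reduced to panics):

```
package main

import (
	"fmt"

	"github.com/influxdata/platform/tsdb/tsm1"
)

func main() {
	src := []float64{1.5, 1.5, 2.5, 15.5}
	block, err := tsm1.FloatArrayEncodeAll(src, nil)
	if err != nil {
		panic(err)
	}
	// Passing a reusable dst slice avoids allocation on repeated decodes.
	got, err := tsm1.FloatArrayDecodeAll(block, make([]float64, 0, len(src)))
	if err != nil {
		panic(err)
	}
	fmt.Println(got) // [1.5 1.5 2.5 15.5]
}
```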


@@ -3,14 +3,12 @@ package tsm1_test
import (
"bytes"
"fmt"
"io"
"math"
"math/rand"
"reflect"
"testing"
"testing/quick"
"github.com/dgryski/go-bitstream"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/platform/tsdb/tsm1"
)
@@ -266,174 +264,6 @@ func TestFloatArrayDecodeAll_Empty(t *testing.T) {
}
}
func TestBatchBitStreamEOF(t *testing.T) {
br := tsm1.NewBatchBitReader([]byte("0"))
b := br.ReadBits(8)
if br.Err() != nil {
t.Fatal(br.Err())
}
if b != '0' {
t.Error("ReadBits(8) didn't return first byte")
}
br.ReadBits(8)
if br.Err() != io.EOF {
t.Error("ReadBits(8) on empty string didn't return EOF")
}
// 0 = 0b00110000
br = tsm1.NewBatchBitReader([]byte("0"))
buf := bytes.NewBuffer(nil)
bw := bitstream.NewWriter(buf)
for i := 0; i < 4; i++ {
bit := br.ReadBit()
if br.Err() == io.EOF {
break
}
if br.Err() != nil {
t.Error("GetBit returned error err=", br.Err().Error())
return
}
bw.WriteBit(bitstream.Bit(bit))
}
bw.Flush(bitstream.One)
err := bw.WriteByte(0xAA)
if err != nil {
t.Error("unable to WriteByte")
}
c := buf.Bytes()
if len(c) != 2 || c[1] != 0xAA || c[0] != 0x3f {
t.Error("bad return from 4 read bytes")
}
br = tsm1.NewBatchBitReader([]byte(""))
br.ReadBit()
if br.Err() != io.EOF {
t.Error("ReadBit on empty string didn't return EOF")
}
}
func TestBatchBitStream(t *testing.T) {
buf := bytes.NewBuffer(nil)
br := tsm1.NewBatchBitReader([]byte("hello"))
bw := bitstream.NewWriter(buf)
for {
bit := br.ReadBit()
if br.Err() == io.EOF {
break
}
if br.Err() != nil {
t.Error("GetBit returned error err=", br.Err().Error())
return
}
bw.WriteBit(bitstream.Bit(bit))
}
s := buf.String()
if s != "hello" {
t.Error("expected 'hello', got=", []byte(s))
}
}
func TestBatchByteStream(t *testing.T) {
buf := bytes.NewBuffer(nil)
br := tsm1.NewBatchBitReader([]byte("hello"))
bw := bitstream.NewWriter(buf)
for i := 0; i < 3; i++ {
bit := br.ReadBit()
if br.Err() == io.EOF {
break
}
if br.Err() != nil {
t.Error("GetBit returned error err=", br.Err().Error())
return
}
bw.WriteBit(bitstream.Bit(bit))
}
for i := 0; i < 3; i++ {
byt := br.ReadBits(8)
if br.Err() == io.EOF {
break
}
if br.Err() != nil {
t.Error("ReadBits(8) returned error err=", br.Err().Error())
return
}
bw.WriteByte(byte(byt))
}
u := br.ReadBits(13)
if br.Err() != nil {
t.Error("ReadBits returned error err=", br.Err().Error())
return
}
bw.WriteBits(u, 13)
bw.WriteBits(('!'<<12)|('.'<<4)|0x02, 20)
// 0x2f == '/'
bw.Flush(bitstream.One)
s := buf.String()
if s != "hello!./" {
t.Errorf("expected 'hello!./', got=%x", []byte(s))
}
}
// Ensure bit reader can read random bits written to a stream.
func TestBatchBitReader_Quick(t *testing.T) {
if err := quick.Check(func(values []uint64, nbits []uint) bool {
// Limit nbits to 64.
for i := 0; i < len(values) && i < len(nbits); i++ {
nbits[i] = (nbits[i] % 64) + 1
values[i] = values[i] & (math.MaxUint64 >> (64 - nbits[i]))
}
// Write bits to a buffer.
var buf bytes.Buffer
w := bitstream.NewWriter(&buf)
for i := 0; i < len(values) && i < len(nbits); i++ {
w.WriteBits(values[i], int(nbits[i]))
}
w.Flush(bitstream.Zero)
// Read bits from the buffer.
r := tsm1.NewBatchBitReader(buf.Bytes())
for i := 0; i < len(values) && i < len(nbits); i++ {
v := r.ReadBits(nbits[i])
if r.Err() != nil {
t.Errorf("unexpected error(%d): %s", i, r.Err())
return false
} else if v != values[i] {
t.Errorf("value mismatch(%d): got=%d, exp=%d (nbits=%d)", i, v, values[i], nbits[i])
return false
}
}
return true
}, &quick.Config{
Values: func(a []reflect.Value, rand *rand.Rand) {
a[0], _ = quick.Value(reflect.TypeOf([]uint64{}), rand)
a[1], _ = quick.Value(reflect.TypeOf([]uint{}), rand)
},
}); err != nil {
t.Fatal(err)
}
}
var bufResult []byte
func BenchmarkEncodeFloats(b *testing.B) {
@@ -460,6 +290,8 @@ func BenchmarkEncodeFloats(b *testing.B) {
enc.Flush()
if bufResult, err = enc.Bytes(); err != nil {
b.Fatal(err)
} else {
b.SetBytes(int64(len(bufResult)))
}
}
})
@@ -470,6 +302,8 @@ func BenchmarkEncodeFloats(b *testing.B) {
for n := 0; n < b.N; n++ {
if bufResult, err = tsm1.FloatArrayEncodeAll(input, bufResult); err != nil {
b.Fatal(err)
} else {
b.SetBytes(int64(len(bufResult)))
}
}
})
@@ -493,6 +327,8 @@ func BenchmarkEncodeFloats(b *testing.B) {
enc.Flush()
if bufResult, err = enc.Bytes(); err != nil {
b.Fatal(err)
} else {
b.SetBytes(int64(len(bufResult)))
}
}
})
@@ -503,6 +339,8 @@ func BenchmarkEncodeFloats(b *testing.B) {
for n := 0; n < b.N; n++ {
if bufResult, err = tsm1.FloatArrayEncodeAll(input, bufResult); err != nil {
b.Fatal(err)
} else {
b.SetBytes(int64(len(bufResult)))
}
}
})
@@ -510,6 +348,65 @@ func BenchmarkEncodeFloats(b *testing.B) {
}
}
func BenchmarkDecodeFloats(b *testing.B) {
cases := []int{1, 55, 550, 1000}
for _, n := range cases {
b.Run(fmt.Sprintf("%d_seq", n), func(b *testing.B) {
s := tsm1.NewFloatEncoder()
for i := 0; i < n; i++ {
s.Write(float64(i))
}
s.Flush()
data, err := s.Bytes()
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
b.SetBytes(int64(len(data)))
b.ResetTimer()
dst := make([]float64, n)
for i := 0; i < b.N; i++ {
got, err := tsm1.FloatArrayDecodeAll(data, dst)
if err != nil {
b.Fatalf("unexpected error\n%s", err.Error())
}
if len(got) != n {
b.Fatalf("unexpected length -got/+exp\n%s", cmp.Diff(len(got), n))
}
}
})
b.Run(fmt.Sprintf("%d_ran", n), func(b *testing.B) {
s := tsm1.NewFloatEncoder()
for i := 0; i < n; i++ {
s.Write(rand.Float64() * 100.0)
}
s.Flush()
data, err := s.Bytes()
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
b.SetBytes(int64(len(data)))
b.ResetTimer()
dst := make([]float64, n)
for i := 0; i < b.N; i++ {
got, err := tsm1.FloatArrayDecodeAll(data, dst)
if err != nil {
b.Fatalf("unexpected error\n%s", err.Error())
}
if len(got) != n {
b.Fatalf("unexpected length -got/+exp\n%s", cmp.Diff(len(got), n))
}
}
})
}
}
func BenchmarkFloatArrayDecodeAll(b *testing.B) {
benchmarks := []int{
1,
@@ -537,10 +434,10 @@ func BenchmarkFloatArrayDecodeAll(b *testing.B) {
got, err := tsm1.FloatArrayDecodeAll(bytes, dst)
if err != nil {
b.Fatalf("unexpected length -got/+exp\n%s", cmp.Diff(len(dst), size))
b.Fatalf("unexpected error\n%s", err.Error())
}
if len(got) != size {
b.Fatalf("unexpected length -got/+exp\n%s", cmp.Diff(len(dst), size))
b.Fatalf("unexpected length -got/+exp\n%s", cmp.Diff(len(got), size))
}
}
})