diff --git a/tsdb/tsm1/batch_float.go b/tsdb/tsm1/batch_float.go index b844ee6ddf..eb1787473e 100644 --- a/tsdb/tsm1/batch_float.go +++ b/tsdb/tsm1/batch_float.go @@ -6,6 +6,7 @@ import ( "io" "math" "math/bits" + "unsafe" ) // FloatArrayEncodeAll encodes src into b, returning b and any error encountered. @@ -252,222 +253,262 @@ func FloatArrayEncodeAll(src []float64, b []byte) ([]byte, error) { return b[:length], nil } -func FloatArrayDecodeAll(b []byte, dst []float64) ([]float64, error) { - if len(b) == 0 { +// bitMask contains a lookup table where the index is the number of bits +// and the value is a mask. The table is always read by ANDing the index +// with 0x3f, such that if the index is 64, position 0 will be read, which +// is a 0xffffffffffffffff, thus returning all bits. +// +// 00 = 0xffffffffffffffff +// 01 = 0x0000000000000001 +// 02 = 0x0000000000000003 +// 03 = 0x0000000000000007 +// ... +// 62 = 0x3fffffffffffffff +// 63 = 0x7fffffffffffffff +var bitMask [64]uint64 + +func init() { + v := uint64(1) + for i := 1; i <= 64; i++ { + bitMask[i&0x3f] = v + v = v<<1 | 1 + } +} + +func FloatArrayDecodeAll(b []byte, buf []float64) ([]float64, error) { + if len(b) < 9 { return []float64{}, nil } - sz := cap(dst) - if sz == 0 { - sz = 64 - dst = make([]float64, sz) - } else { - dst = dst[:sz] - } - var ( - val uint64 // current value - leading uint64 - trailing uint64 - bit bool - br BatchBitReader + val uint64 // current value + trailingN uint8 // trailing zero count + meaningfulN uint8 = 64 // meaningful bit count ) - j := 0 + // first byte is the compression type; always Gorilla + b = b[1:] - // first byte is the compression type. - // we currently just have gorilla compression. - br.Reset(b[1:]) - val = br.ReadBits(64) + val = binary.BigEndian.Uint64(b) if val == uvnan { + if buf == nil { + var tmp [1]float64 + buf = tmp[:0] + } // special case: there were no values to decode - return dst[:0], nil + return buf[:0], nil } - dst[j] = math.Float64frombits(val) - j++ + buf = buf[:0] + // convert the []float64 to []uint64 to avoid calling math.Float64Frombits, + // which results in unnecessary moves between Xn registers before moving + // the value into the float64 slice. This change increased performance from + // 320 MB/s to 340 MB/s on an Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz + dst := *(*[]uint64)(unsafe.Pointer(&buf)) + dst = append(dst, val) - // The expected exit condition is for `uvnan` to be decoded. + b = b[8:] + + // The bit reader code uses brCachedVal to store up to the next 8 bytes + // of MSB data read from b. brValidBits stores the number of remaining unread + // bits starting from the MSB. Before N bits are read from brCachedVal, + // they are left-rotated N bits, such that they end up in the left-most position. + // Using bits.RotateLeft64 results in a single instruction on many CPU architectures. + // This approach permits simple tests, such as for the two control bits: + // + // brCachedVal&1 > 0 + // + // The alternative was to leave brCachedValue alone and perform shifts and + // masks to read specific bits. The original approach looked like the + // following: + // + // brCachedVal&(1<<(brValidBits&0x3f)) > 0 + // + var ( + brCachedVal = uint64(0) // a buffer of up to the next 8 bytes read from b in MSB order + brValidBits = uint8(0) // the number of unread bits remaining in brCachedVal + ) + + // Refill brCachedVal, reading up to 8 bytes from b + if len(b) >= 8 { + // fast path reads 8 bytes directly + brCachedVal = binary.BigEndian.Uint64(b) + brValidBits = 64 + b = b[8:] + } else if len(b) > 0 { + brCachedVal = 0 + brValidBits = uint8(len(b) * 8) + for i := range b { + brCachedVal = (brCachedVal << 8) | uint64(b[i]) + } + brCachedVal = bits.RotateLeft64(brCachedVal, -int(brValidBits)) + b = b[:0] + } else { + goto ERROR + } + + // The expected exit condition is for a uvnan to be decoded. // Any other error (EOF) indicates a truncated stream. - for br.Err() == nil { - // read compressed value - if br.CanReadBitFast() { - bit = br.ReadBitFast() - } else { - bit = br.ReadBit() + for { + if brValidBits > 0 { + // brValidBits > 0 is impossible to predict, so we place the + // most likely case inside the if and immediately jump, keeping + // the instruction pipeline consistently full. + // This is a similar approach to using the GCC __builtin_expect + // intrinsic, which modifies the order of branches such that the + // likely case follows the conditional jump. + // + // Written as if brValidBits == 0 and placing the Refill brCachedVal + // code inside reduces benchmarks from 318 MB/s to 260 MB/s on an + // Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz + goto READ0 } - if bit { - if br.CanReadBitFast() { - bit = br.ReadBitFast() - } else { - bit = br.ReadBit() + // Refill brCachedVal, reading up to 8 bytes from b + if len(b) >= 8 { + brCachedVal = binary.BigEndian.Uint64(b) + brValidBits = 64 + b = b[8:] + } else if len(b) > 0 { + brCachedVal = 0 + brValidBits = uint8(len(b) * 8) + for i := range b { + brCachedVal = (brCachedVal << 8) | uint64(b[i]) + } + brCachedVal = bits.RotateLeft64(brCachedVal, -int(brValidBits)) + b = b[:0] + } else { + goto ERROR + } + + READ0: + // read control bit 0 + brValidBits -= 1 + brCachedVal = bits.RotateLeft64(brCachedVal, 1) + if brCachedVal&1 > 0 { + if brValidBits > 0 { + goto READ1 } - if bit { - leading = br.ReadBits(5) - mbits := br.ReadBits(6) - if mbits == 0 { - mbits = 64 + // Refill brCachedVal, reading up to 8 bytes from b + if len(b) >= 8 { + brCachedVal = binary.BigEndian.Uint64(b) + brValidBits = 64 + b = b[8:] + } else if len(b) > 0 { + brCachedVal = 0 + brValidBits = uint8(len(b) * 8) + for i := range b { + brCachedVal = (brCachedVal << 8) | uint64(b[i]) } - trailing = 64 - leading - mbits + brCachedVal = bits.RotateLeft64(brCachedVal, -int(brValidBits)) + b = b[:0] + } else { + goto ERROR } - mbits := uint(64 - leading - trailing) - bits := br.ReadBits(mbits) - val ^= bits << trailing - if val == uvnan { // IsNaN, eof + READ1: + // read control bit 1 + brValidBits -= 1 + brCachedVal = bits.RotateLeft64(brCachedVal, 1) + if brCachedVal&1 > 0 { + // read 5 bits for leading zero count and 6 bits for the meaningful data count + const leadingTrailingBitCount = 11 + var lmBits uint64 // leading + meaningful data counts + if brValidBits >= leadingTrailingBitCount { + // decode 5 bits leading + 6 bits meaningful for a total of 11 bits + brValidBits -= leadingTrailingBitCount + brCachedVal = bits.RotateLeft64(brCachedVal, leadingTrailingBitCount) + lmBits = brCachedVal + } else { + bits01 := uint8(11) + if brValidBits > 0 { + bits01 -= brValidBits + lmBits = bits.RotateLeft64(brCachedVal, 11) + } + + // Refill brCachedVal, reading up to 8 bytes from b + if len(b) >= 8 { + brCachedVal = binary.BigEndian.Uint64(b) + brValidBits = 64 + b = b[8:] + } else if len(b) > 0 { + brCachedVal = 0 + brValidBits = uint8(len(b) * 8) + for i := range b { + brCachedVal = (brCachedVal << 8) | uint64(b[i]) + } + brCachedVal = bits.RotateLeft64(brCachedVal, -int(brValidBits)) + b = b[:0] + } else { + goto ERROR + } + brCachedVal = bits.RotateLeft64(brCachedVal, int(bits01)) + brValidBits -= bits01 + lmBits &^= bitMask[bits01&0x3f] + lmBits |= brCachedVal & bitMask[bits01&0x3f] + } + + lmBits &= 0x7ff + leadingN := uint8((lmBits >> 6) & 0x1f) // 5 bits leading + meaningfulN = uint8(lmBits & 0x3f) // 6 bits meaningful + if meaningfulN > 0 { + trailingN = 64 - leadingN - meaningfulN + } else { + // meaningfulN == 0 is a special case, such that all bits + // are meaningful + trailingN = 0 + meaningfulN = 64 + } + } + + var sBits uint64 // significant bits + if brValidBits >= meaningfulN { + brValidBits -= meaningfulN + brCachedVal = bits.RotateLeft64(brCachedVal, int(meaningfulN)) + sBits = brCachedVal + } else { + mBits := meaningfulN + if brValidBits > 0 { + mBits -= brValidBits + sBits = bits.RotateLeft64(brCachedVal, int(meaningfulN)) + } + + // Refill brCachedVal, reading up to 8 bytes from b + if len(b) >= 8 { + brCachedVal = binary.BigEndian.Uint64(b) + brValidBits = 64 + b = b[8:] + } else if len(b) > 0 { + brCachedVal = 0 + brValidBits = uint8(len(b) * 8) + for i := range b { + brCachedVal = (brCachedVal << 8) | uint64(b[i]) + } + brCachedVal = bits.RotateLeft64(brCachedVal, -int(brValidBits)) + b = b[:0] + } else { + goto ERROR + } + brCachedVal = bits.RotateLeft64(brCachedVal, int(mBits)) + brValidBits -= mBits + sBits &^= bitMask[mBits&0x3f] + sBits |= brCachedVal & bitMask[mBits&0x3f] + } + sBits &= bitMask[meaningfulN&0x3f] + + val ^= sBits << (trailingN & 0x3f) + if val == uvnan { + // IsNaN, eof break } } - f := math.Float64frombits(val) - if j < len(dst) { - dst[j] = f - } else { - dst = append(dst, f) // force a resize - dst = dst[:cap(dst)] - } - j++ + dst = append(dst, val) } - return dst[:j], br.Err() -} - -// BatchBitReader reads bits from an io.Reader. -type BatchBitReader struct { - data []byte - - buf struct { - v uint64 // bit buffer - n uint // available bits - } - err error -} - -// NewBatchBitReader returns a new instance of BatchBitReader that reads from data. -func NewBatchBitReader(data []byte) *BatchBitReader { - b := new(BatchBitReader) - b.Reset(data) - return b -} - -// Reset sets the underlying reader on b and reinitializes. -func (r *BatchBitReader) Reset(data []byte) { - r.data = data - r.buf.v, r.buf.n, r.err = 0, 0, nil - r.readBuf() -} - -func (r *BatchBitReader) Err() error { return r.err } - -// CanReadBitFast returns true if calling ReadBitFast() is allowed. -// Fast bit reads are allowed when at least 2 values are in the buffer. -// This is because it is not required to refilled the buffer and the caller -// can inline the calls. -func (r *BatchBitReader) CanReadBitFast() bool { return r.buf.n > 1 } - -// ReadBitFast is an optimized bit read. -// IMPORTANT: Only allowed if CanReadFastBit() is true! -func (r *BatchBitReader) ReadBitFast() bool { - v := r.buf.v&(1<<63) != 0 - r.buf.v <<= 1 - r.buf.n -= 1 - return v -} - -// ReadBit returns the next bit from the underlying data. -func (r *BatchBitReader) ReadBit() bool { - return r.ReadBits(1) != 0 -} - -// ReadBits reads nbits from the underlying data into a uint64. -// nbits must be from 1 to 64, inclusive. -func (r *BatchBitReader) ReadBits(nbits uint) uint64 { - // Return EOF if there is no more data. - if r.buf.n == 0 { - r.err = io.EOF - return 0 - } - - // Return bits from buffer if less than available bits. - if nbits <= r.buf.n { - // Return all bits, if requested. - if nbits == 64 { - v := r.buf.v - r.buf.v, r.buf.n = 0, 0 - r.readBuf() - return v - } - - // Otherwise mask returned bits. - v := r.buf.v >> (64 - nbits) - r.buf.v <<= nbits - r.buf.n -= nbits - - if r.buf.n == 0 { - r.readBuf() - } - return v - } - - // Otherwise read all available bits in current buffer. - v, n := r.buf.v, r.buf.n - - // Read new buffer. - r.buf.v, r.buf.n = 0, 0 - r.readBuf() - - // Append new buffer to previous buffer and shift to remove unnecessary bits. - v |= r.buf.v >> n - v >>= 64 - nbits - - // Remove used bits from new buffer. - bufN := nbits - n - if bufN > r.buf.n { - bufN = r.buf.n - } - r.buf.v <<= bufN - r.buf.n -= bufN - - if r.buf.n == 0 { - r.readBuf() - } - - return v -} - -func (r *BatchBitReader) readBuf() { - // Determine number of bytes to read to fill buffer. - byteN := 8 - (r.buf.n / 8) - - // Limit to the length of our data. - if n := uint(len(r.data)); byteN > n { - byteN = n - } - - // Optimized 8-byte read. - if byteN == 8 { - r.buf.v = binary.BigEndian.Uint64(r.data) - r.buf.n = 64 - r.data = r.data[8:] - return - } - - i := uint(0) - - if byteN > 3 { - r.buf.n += 32 - r.buf.v |= uint64(binary.BigEndian.Uint32(r.data)) << (64 - r.buf.n) - i += 4 - } - - // Otherwise append bytes to buffer. - for ; i < byteN; i++ { - r.buf.n += 8 - r.buf.v |= uint64(r.data[i]) << (64 - r.buf.n) - } - - // Move data forward. - r.data = r.data[byteN:] + return *(*[]float64)(unsafe.Pointer(&dst)), nil + +ERROR: + return (*(*[]float64)(unsafe.Pointer(&dst)))[:0], io.EOF } diff --git a/tsdb/tsm1/batch_float_test.go b/tsdb/tsm1/batch_float_test.go index 6b4a1aa9b4..470d908516 100644 --- a/tsdb/tsm1/batch_float_test.go +++ b/tsdb/tsm1/batch_float_test.go @@ -3,14 +3,12 @@ package tsm1_test import ( "bytes" "fmt" - "io" "math" "math/rand" "reflect" "testing" "testing/quick" - "github.com/dgryski/go-bitstream" "github.com/google/go-cmp/cmp" "github.com/influxdata/platform/tsdb/tsm1" ) @@ -266,174 +264,6 @@ func TestFloatArrayDecodeAll_Empty(t *testing.T) { } } -func TestBatchBitStreamEOF(t *testing.T) { - br := tsm1.NewBatchBitReader([]byte("0")) - - b := br.ReadBits(8) - if br.Err() != nil { - t.Fatal(br.Err()) - } - if b != '0' { - t.Error("ReadBits(8) didn't return first byte") - } - - br.ReadBits(8) - if br.Err() != io.EOF { - t.Error("ReadBits(8) on empty string didn't return EOF") - } - - // 0 = 0b00110000 - br = tsm1.NewBatchBitReader([]byte("0")) - - buf := bytes.NewBuffer(nil) - bw := bitstream.NewWriter(buf) - - for i := 0; i < 4; i++ { - bit := br.ReadBit() - if br.Err() == io.EOF { - break - } - if br.Err() != nil { - t.Error("GetBit returned error err=", br.Err().Error()) - return - } - bw.WriteBit(bitstream.Bit(bit)) - } - - bw.Flush(bitstream.One) - - err := bw.WriteByte(0xAA) - if err != nil { - t.Error("unable to WriteByte") - } - - c := buf.Bytes() - - if len(c) != 2 || c[1] != 0xAA || c[0] != 0x3f { - t.Error("bad return from 4 read bytes") - } - - br = tsm1.NewBatchBitReader([]byte("")) - br.ReadBit() - if br.Err() != io.EOF { - t.Error("ReadBit on empty string didn't return EOF") - } -} - -func TestBatchBitStream(t *testing.T) { - buf := bytes.NewBuffer(nil) - br := tsm1.NewBatchBitReader([]byte("hello")) - bw := bitstream.NewWriter(buf) - - for { - bit := br.ReadBit() - if br.Err() == io.EOF { - break - } - if br.Err() != nil { - t.Error("GetBit returned error err=", br.Err().Error()) - return - } - bw.WriteBit(bitstream.Bit(bit)) - } - - s := buf.String() - - if s != "hello" { - t.Error("expected 'hello', got=", []byte(s)) - } -} - -func TestBatchByteStream(t *testing.T) { - buf := bytes.NewBuffer(nil) - br := tsm1.NewBatchBitReader([]byte("hello")) - bw := bitstream.NewWriter(buf) - - for i := 0; i < 3; i++ { - bit := br.ReadBit() - if br.Err() == io.EOF { - break - } - if br.Err() != nil { - t.Error("GetBit returned error err=", br.Err().Error()) - return - } - bw.WriteBit(bitstream.Bit(bit)) - } - - for i := 0; i < 3; i++ { - byt := br.ReadBits(8) - if br.Err() == io.EOF { - break - } - if br.Err() != nil { - t.Error("ReadBits(8) returned error err=", br.Err().Error()) - return - } - bw.WriteByte(byte(byt)) - } - - u := br.ReadBits(13) - - if br.Err() != nil { - t.Error("ReadBits returned error err=", br.Err().Error()) - return - } - - bw.WriteBits(u, 13) - - bw.WriteBits(('!'<<12)|('.'<<4)|0x02, 20) - // 0x2f == '/' - bw.Flush(bitstream.One) - - s := buf.String() - - if s != "hello!./" { - t.Errorf("expected 'hello!./', got=%x", []byte(s)) - } -} - -// Ensure bit reader can read random bits written to a stream. -func TestBatchBitReader_Quick(t *testing.T) { - if err := quick.Check(func(values []uint64, nbits []uint) bool { - // Limit nbits to 64. - for i := 0; i < len(values) && i < len(nbits); i++ { - nbits[i] = (nbits[i] % 64) + 1 - values[i] = values[i] & (math.MaxUint64 >> (64 - nbits[i])) - } - - // Write bits to a buffer. - var buf bytes.Buffer - w := bitstream.NewWriter(&buf) - for i := 0; i < len(values) && i < len(nbits); i++ { - w.WriteBits(values[i], int(nbits[i])) - } - w.Flush(bitstream.Zero) - - // Read bits from the buffer. - r := tsm1.NewBatchBitReader(buf.Bytes()) - for i := 0; i < len(values) && i < len(nbits); i++ { - v := r.ReadBits(nbits[i]) - if r.Err() != nil { - t.Errorf("unexpected error(%d): %s", i, r.Err()) - return false - } else if v != values[i] { - t.Errorf("value mismatch(%d): got=%d, exp=%d (nbits=%d)", i, v, values[i], nbits[i]) - return false - } - } - - return true - }, &quick.Config{ - Values: func(a []reflect.Value, rand *rand.Rand) { - a[0], _ = quick.Value(reflect.TypeOf([]uint64{}), rand) - a[1], _ = quick.Value(reflect.TypeOf([]uint{}), rand) - }, - }); err != nil { - t.Fatal(err) - } -} - var bufResult []byte func BenchmarkEncodeFloats(b *testing.B) { @@ -460,6 +290,8 @@ func BenchmarkEncodeFloats(b *testing.B) { enc.Flush() if bufResult, err = enc.Bytes(); err != nil { b.Fatal(err) + } else { + b.SetBytes(int64(len(bufResult))) } } }) @@ -470,6 +302,8 @@ func BenchmarkEncodeFloats(b *testing.B) { for n := 0; n < b.N; n++ { if bufResult, err = tsm1.FloatArrayEncodeAll(input, bufResult); err != nil { b.Fatal(err) + } else { + b.SetBytes(int64(len(bufResult))) } } }) @@ -493,6 +327,8 @@ func BenchmarkEncodeFloats(b *testing.B) { enc.Flush() if bufResult, err = enc.Bytes(); err != nil { b.Fatal(err) + } else { + b.SetBytes(int64(len(bufResult))) } } }) @@ -503,6 +339,8 @@ func BenchmarkEncodeFloats(b *testing.B) { for n := 0; n < b.N; n++ { if bufResult, err = tsm1.FloatArrayEncodeAll(input, bufResult); err != nil { b.Fatal(err) + } else { + b.SetBytes(int64(len(bufResult))) } } }) @@ -510,6 +348,65 @@ func BenchmarkEncodeFloats(b *testing.B) { } } +func BenchmarkDecodeFloats(b *testing.B) { + cases := []int{1, 55, 550, 1000} + for _, n := range cases { + b.Run(fmt.Sprintf("%d_seq", n), func(b *testing.B) { + s := tsm1.NewFloatEncoder() + for i := 0; i < n; i++ { + s.Write(float64(i)) + } + s.Flush() + data, err := s.Bytes() + if err != nil { + b.Fatalf("unexpected error: %v", err) + } + + b.SetBytes(int64(len(data))) + b.ResetTimer() + + dst := make([]float64, n) + for i := 0; i < b.N; i++ { + + got, err := tsm1.FloatArrayDecodeAll(data, dst) + if err != nil { + b.Fatalf("unexpected error\n%s", err.Error()) + } + if len(got) != n { + b.Fatalf("unexpected length -got/+exp\n%s", cmp.Diff(len(got), n)) + } + } + }) + + b.Run(fmt.Sprintf("%d_ran", n), func(b *testing.B) { + s := tsm1.NewFloatEncoder() + for i := 0; i < n; i++ { + s.Write(rand.Float64() * 100.0) + } + s.Flush() + data, err := s.Bytes() + if err != nil { + b.Fatalf("unexpected error: %v", err) + } + + b.SetBytes(int64(len(data))) + b.ResetTimer() + + dst := make([]float64, n) + for i := 0; i < b.N; i++ { + + got, err := tsm1.FloatArrayDecodeAll(data, dst) + if err != nil { + b.Fatalf("unexpected error\n%s", err.Error()) + } + if len(got) != n { + b.Fatalf("unexpected length -got/+exp\n%s", cmp.Diff(len(got), n)) + } + } + }) + } +} + func BenchmarkFloatArrayDecodeAll(b *testing.B) { benchmarks := []int{ 1, @@ -537,10 +434,10 @@ func BenchmarkFloatArrayDecodeAll(b *testing.B) { got, err := tsm1.FloatArrayDecodeAll(bytes, dst) if err != nil { - b.Fatalf("unexpected length -got/+exp\n%s", cmp.Diff(len(dst), size)) + b.Fatalf("unexpected error\n%s", err.Error()) } if len(got) != size { - b.Fatalf("unexpected length -got/+exp\n%s", cmp.Diff(len(dst), size)) + b.Fatalf("unexpected length -got/+exp\n%s", cmp.Diff(len(got), size)) } } })