Batch oriented float encoders

This commit adds a tsm1 function for encoding a batch of floats into a
buffer. Further, it replaces the `bitstream` library used in the
existing encoders (and all the current decoders) with inlined bit
expressions within the encoder, significantly reducing the function call
overhead for larger batches.
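
As a rough illustration of the inlined approach (a minimal, hypothetical sketch, not code from this commit): the encoder addresses the output buffer directly, with n>>3 selecting the byte and n&7 the bit within it, rather than making a call into a bit writer for every bit.

    package main

    import "fmt"

    // setBit sets bit i of the stream held in b (bit 0 is the most
    // significant bit of b[0]), growing b as required. It uses the same
    // expressions the batch encoder inlines: i>>3 selects the byte,
    // i&7 the bit within it.
    func setBit(b []byte, i uint64) []byte {
        for i>>3 >= uint64(len(b)) {
            b = append(b, 0)
        }
        b[i>>3] |= 128 >> (i & 7)
        return b
    }

    func main() {
        var b []byte
        for _, i := range []uint64{0, 3, 9} {
            b = setBit(b, i)
        }
        fmt.Printf("%08b\n", b) // [10010000 01000000]
    }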

The following benchmarks compare the performance of the existing
iterator-based encoder with the new batch-oriented encoder, using both a
sequential input slice and a randomly generated input slice.

name                   old time/op    new time/op    delta
EncodeFloats/10_seq      1.14µs ± 3%    0.24µs ± 3%  -78.94%  (p=0.000 n=10+10)
EncodeFloats/10_ran      1.69µs ± 2%    0.21µs ± 3%  -87.43%  (p=0.000 n=10+10)
EncodeFloats/100_seq     7.07µs ± 1%    1.72µs ± 1%  -75.62%  (p=0.000 n=7+9)
EncodeFloats/100_ran     15.8µs ± 4%     1.8µs ± 1%  -88.60%  (p=0.000 n=10+9)
EncodeFloats/1000_seq    50.2µs ± 3%    16.2µs ± 2%  -67.66%  (p=0.000 n=10+10)
EncodeFloats/1000_ran     174µs ± 2%      16µs ± 2%  -90.77%  (p=0.000 n=10+10)

name                   old alloc/op   new alloc/op   delta
EncodeFloats/10_seq       0.00B          0.00B          ~     (all equal)
EncodeFloats/10_ran       0.00B          0.00B          ~     (all equal)
EncodeFloats/100_seq      0.00B          0.00B          ~     (all equal)
EncodeFloats/100_ran      0.00B          0.00B          ~     (all equal)
EncodeFloats/1000_seq     0.00B          0.00B          ~     (all equal)
EncodeFloats/1000_ran     0.00B          0.00B          ~     (all equal)

name                   old allocs/op  new allocs/op  delta
EncodeFloats/10_seq        0.00           0.00          ~     (all equal)
EncodeFloats/10_ran        0.00           0.00          ~     (all equal)
EncodeFloats/100_seq       0.00           0.00          ~     (all equal)
EncodeFloats/100_ran       0.00           0.00          ~     (all equal)
EncodeFloats/1000_seq      0.00           0.00          ~     (all equal)
EncodeFloats/1000_ran      0.00           0.00          ~     (all equal)
pull/10300/head
Edd Robinson 2018-09-11 13:04:52 +01:00
parent 9ecadd1a9c
commit 6b52231a37
3 changed files with 524 additions and 4 deletions

@@ -2,10 +2,251 @@ package tsm1
import (
"encoding/binary"
"fmt"
"io"
"math"
"math/bits"
)
// FloatArrayEncodeAll encodes src into b, returning b and any error encountered.
// The returned slice may be of a different length and capacity to b.
//
// Currently only the float compression scheme used in Facebook's Gorilla is
// supported, so this method implements a batch-oriented version of that.
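//
// Illustrative usage only (names here are hypothetical): the returned slice
// can be passed back in on the next call to reuse its allocation, as the
// benchmarks added in this commit do, e.g.
//
//	buf, err = FloatArrayEncodeAll(values, buf)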
func FloatArrayEncodeAll(src []float64, b []byte) ([]byte, error) {
if cap(b) < 9 {
b = make([]byte, 0, 9) // Enough room for the header and one value.
}
b = b[:1]
b[0] = floatCompressedGorilla << 4
var first float64
var finished bool
if len(src) > 0 && math.IsNaN(src[0]) {
return nil, fmt.Errorf("unsupported value: NaN")
} else if len(src) == 0 {
first = math.NaN() // Write sentinel value to terminate batch.
finished = true
} else {
first = src[0]
src = src[1:]
}
b = b[:9]
n := uint64(8 + 64) // Number of bits written.
prev := math.Float64bits(first)
// Write first value.
binary.BigEndian.PutUint64(b[1:], prev)
prevLeading, prevTrailing := ^uint64(0), uint64(0)
var leading, trailing uint64
var mask uint64
encode := func(x float64) {
cur := math.Float64bits(x)
vDelta := cur ^ prev
if vDelta == 0 {
n++ // Write a zero bit. Nothing else to do.
prev = cur
return
}
// First the current bit of the current byte is set to indicate we're
// writing a delta value to the stream.
if n>>3 >= uint64(len(b)) { // Grow b — no room in current byte.
b = append(b, byte(0))
}
// n&7 - current bit in current byte.
// n>>3 - the current byte.
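// For example: with n = 75, n>>3 = 9 addresses b[9] and n&7 = 3, so the
// write below lands on the fourth most significant bit of that byte
// (128 >> 3 == 0b00010000).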
b[n>>3] |= 128 >> (n & 7) // Sets the current bit of the current byte.
n++
// Write the delta to b.
// Determine the leading and trailing zeros.
leading = uint64(bits.LeadingZeros64(vDelta))
trailing = uint64(bits.TrailingZeros64(vDelta))
// Clamp number of leading zeros to avoid overflow when encoding
leading &= 0x1F
if leading >= 32 {
leading = 31
}
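// The leading-zero count is written later using only 5 bits, so it must
// fit in the range 0 to 31.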
// At least 2 further bits will be required.
if (n+2)>>3 >= uint64(len(b)) {
b = append(b, byte(0))
}
if prevLeading != ^uint64(0) && leading >= prevLeading && trailing >= prevTrailing {
n++ // Write a zero bit.
// Write the l least significant bits of vDelta to b, most significant
// bit first.
l := uint64(64 - prevLeading - prevTrailing)
for (n+l)>>3 >= uint64(len(b)) { // Keep growing b until we can fit all bits in.
b = append(b, byte(0))
}
// Full value to write.
v := (vDelta >> prevTrailing) << (64 - l) // l least significant bits of v.
var m = n & 7 // Current bit in current byte.
var written uint64
if m > 0 { // In this case the current byte is not full.
written = 8 - m
if l < written {
written = l
}
mask = v >> 56 // Move 8 MSB to 8 LSB
b[n>>3] |= byte(mask >> m)
n += written
if l-written == 0 {
prev = cur
return
}
}
vv := v << written // Move written bits out of the way.
// TODO(edd): Optimise this. It's unlikely we actually have 8 bytes to write.
if (n>>3)+8 >= uint64(len(b)) {
b = append(b, make([]byte, 8)...)
}
binary.BigEndian.PutUint64(b[n>>3:], vv)
n += (l - written)
} else {
prevLeading, prevTrailing = leading, trailing
// Set a single bit to indicate a value will follow.
b[n>>3] |= 128 >> (n & 7) // Set current bit on current byte
n++
// Write 5 bits of leading.
if (n+5)>>3 >= uint64(len(b)) {
b = append(b, byte(0))
}
// Enough room to write the 5 bits in the current byte?
var m = n & 7
l := uint64(5)
v := leading << 59 // 5 LSB of leading.
mask = v >> 56 // Move 5 MSB to 8 LSB
if m <= 3 { // 5 bits fit into current byte.
b[n>>3] |= byte(mask >> m)
n += l
} else { // In this case there are fewer than 5 bits available in current byte.
// First step is to fill current byte
written := 8 - m
b[n>>3] |= byte(mask >> m) // Some of mask will get lost.
n += written
// Second step is to write the lost part of mask into the next byte.
mask = v << written // Shift the bits already written to the previous byte out of the way.
mask >>= 56
m = n & 7 // Recompute current bit.
b[n>>3] |= byte(mask >> m)
n += (l - written)
}
// Note that if leading == trailing == 0, then sigbits == 64. But that
// value doesn't actually fit into the 6 bits we have.
// Luckily, we never need to encode 0 significant bits, since that would
// put us in the other case (vdelta == 0). So instead we write out a 0 and
// adjust it back to 64 on unpacking.
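// For example: a delta with both its top and bottom bits set has
// leading == trailing == 0 and therefore 64 significant bits; the sigbits
// field is written as 0 and read back as 64 by the decoder.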
sigbits := 64 - leading - trailing
if (n+6)>>3 >= uint64(len(b)) {
b = append(b, byte(0))
}
m = n & 7
l = uint64(6)
v = sigbits << 58 // Move 6 LSB of sigbits to MSB
mask = v >> 56 // Move 6 MSB to 8 LSB
if m <= 2 {
// The 6 bits fit into the current byte.
b[n>>3] |= byte(mask >> m)
n += l
} else { // In this case there are fewer than 6 bits available in current byte.
// First step is to fill the current byte.
written := 8 - m
b[n>>3] |= byte(mask >> m) // Write to the current bit.
n += written
// Second step is to write the lost part of mask into the next byte.
// Write the remaining l - written bits into that byte.
mask = v << written // Shift the bits already written out of the way.
mask >>= 56
m = n & 7 // Recompute current bit.
b[n>>3] |= byte(mask >> m)
n += l - written
}
// Write final value.
m = n & 7
l = sigbits
v = (vDelta >> trailing) << (64 - l) // Move l LSB into MSB
for (n+l)>>3 >= uint64(len(b)) { // Keep growing b until we can fit all bits in.
b = append(b, byte(0))
}
var written uint64
if m > 0 { // In this case the current byte is not full.
written = 8 - m
if l < written {
written = l
}
mask = v >> 56 // Move 8 MSB to 8 LSB
b[n>>3] |= byte(mask >> m)
n += written
if l-written == 0 {
prev = cur
return
}
}
// Shift remaining bits and write out in one go.
vv := v << written // Drop the bits already written to the previous byte.
// TODO(edd): Optimise this.
if (n>>3)+8 >= uint64(len(b)) {
b = append(b, make([]byte, 8)...)
}
binary.BigEndian.PutUint64(b[n>>3:], vv)
n += (l - written)
}
prev = cur
}
// Encode remaining values.
for _, v := range src {
if math.IsNaN(v) {
return nil, fmt.Errorf("unsupported value: NaN")
}
encode(v)
}
// Encode sentinel value to terminate the batch.
if !finished {
encode(math.NaN())
}
length := n >> 3
if n&7 > 0 {
length++ // Add an extra byte to capture overflowing bits.
}
return b[:length], nil
}
func FloatArrayDecodeAll(b []byte, dst []float64) ([]float64, error) {
if len(b) == 0 {
return []float64{}, nil

@@ -15,6 +15,188 @@ import (
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
func TestFloatArrayEncodeAll(t *testing.T) {
examples := [][]float64{
{12, 12, 24, 13, 24, 24, 24, 24}, // From example paper.
{-3.8970913068231994e+307, -9.036931257783943e+307, 1.7173073833490201e+308,
-9.312369166661538e+307, -2.2435523083555231e+307, 1.4779121287289644e+307,
1.771273431601434e+308, 8.140360378221364e+307, 4.783405048208089e+307,
-2.8044680049605344e+307, 4.412915337205696e+307, -1.2779380602005046e+308,
1.6235802318921885e+308, -1.3402901846299688e+307, 1.6961015582104055e+308,
-1.067980796435633e+308, -3.02868987458268e+307, 1.7641793640790284e+308,
1.6587191845856813e+307, -1.786073304985983e+308, 1.0694549382051123e+308,
3.5635180996210295e+307}, // Failed during early development
{6.00065e+06, 6.000656e+06, 6.000657e+06, 6.000659e+06, 6.000661e+06}, // Similar values.
twoHoursData,
{},
}
for _, example := range examples {
src := example
var buf []byte
buf, err := tsm1.FloatArrayEncodeAll(src, buf)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
result, err := tsm1.FloatArrayDecodeAll(buf, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got, exp := result, src; !reflect.DeepEqual(got, exp) {
t.Fatalf("got result %v, expected %v", got, exp)
}
}
}
func TestFloatArrayEncode_Compare(t *testing.T) {
// generate random values
input := make([]float64, 1000)
for i := 0; i < len(input); i++ {
input[i] = (rand.Float64() * math.MaxFloat64) - math.MaxFloat32
}
// Encode the random values with the existing iterator-based encoder.
s := tsm1.NewFloatEncoder()
for _, v := range input {
s.Write(v)
}
s.Flush()
buf1, err := s.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
var buf2 []byte
buf2, err = tsm1.FloatArrayEncodeAll(input, buf2)
if err != nil {
t.Fatalf("unexpected error: %v\nbuf: %db %x", err, len(buf2), buf2)
}
result, err := tsm1.FloatArrayDecodeAll(buf2, nil)
if err != nil {
dumpBufs(buf1, buf2)
t.Fatalf("unexpected error: %v\nbuf: %db %x", err, len(buf2), buf2)
}
if got, exp := result, input; !reflect.DeepEqual(got, exp) {
t.Fatalf("got result %v, expected %v", got, exp)
}
// Check that the encoders are byte for byte the same...
if !bytes.Equal(buf1, buf2) {
dumpBufs(buf1, buf2)
t.Fatalf("Raw bytes differ for encoders")
}
}
func dumpBufs(a, b []byte) {
longest := len(a)
if len(b) > longest {
longest = len(b)
}
for i := 0; i < longest; i++ {
var as, bs string
if i < len(a) {
as = fmt.Sprintf("%08b", a[i])
}
if i < len(b) {
bs = fmt.Sprintf("%08b", b[i])
}
same := as == bs
fmt.Printf("%d (%d) %s - %s :: %v\n", i, i*8, as, bs, same)
}
fmt.Println()
}
func dumpBuf(b []byte) {
for i, v := range b {
fmt.Printf("%d %08b\n", i, v)
}
fmt.Println()
}
func TestFloatArrayEncodeAll_NaN(t *testing.T) {
examples := [][]float64{
{1.0, math.NaN(), 2.0},
{1.22, math.NaN()},
{math.NaN(), math.NaN()},
{math.NaN()},
}
for _, example := range examples {
var buf []byte
_, err := tsm1.FloatArrayEncodeAll(example, buf)
if err == nil {
t.Fatalf("expected error. got nil")
}
}
}
func Test_FloatArrayEncodeAll_Quick(t *testing.T) {
quick.Check(func(values []float64) bool {
src := values
if src == nil {
src = []float64{}
}
for i, v := range src {
if math.IsNaN(v) {
src[i] = 1.0 // Remove invalid values
}
}
s := tsm1.NewFloatEncoder()
for _, p := range src {
s.Write(p)
}
s.Flush()
buf1, err := s.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
var buf2 []byte
buf2, err = tsm1.FloatArrayEncodeAll(src, buf2)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
result, err := tsm1.FloatArrayDecodeAll(buf2, nil)
if err != nil {
dumpBufs(buf1, buf2)
fmt.Println(src)
t.Fatalf("unexpected error: %v", err)
}
if got, exp := result, src[:len(src)]; !reflect.DeepEqual(got, exp) {
t.Fatalf("got result %v, expected %v", got, exp)
}
return true
}, nil)
}
func TestDecodeFloatArrayAll_Empty(t *testing.T) {
s := tsm1.NewFloatEncoder()
s.Flush()
b, err := s.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
var got []float64
if _, err := tsm1.FloatArrayDecodeAll(b, got); err != nil {
t.Fatal(err)
}
}
func TestFloatArrayDecodeAll_Simple(t *testing.T) {
// Example from the paper
s := tsm1.NewFloatEncoder()
@@ -57,7 +239,7 @@ func TestFloatArrayDecodeAll_Simple(t *testing.T) {
}
}
func TestFloatBatchDecodeAll_Empty(t *testing.T) {
func TestFloatArrayDecodeAll_Empty(t *testing.T) {
s := tsm1.NewFloatEncoder()
s.Flush()
@@ -67,7 +249,7 @@ func TestFloatBatchDecodeAll_Empty(t *testing.T) {
}
buf := make([]float64, 8)
got, err := tsm1.FloatBatchDecodeAll(b, buf)
got, err := tsm1.FloatArrayDecodeAll(b, buf)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@@ -245,6 +427,81 @@ func TestBatchBitReader_Quick(t *testing.T) {
}
}
var bufResult []byte
func BenchmarkEncodeFloats(b *testing.B) {
var err error
cases := []int{10, 100, 1000}
enc := tsm1.NewFloatEncoder()
for _, n := range cases {
b.Run(fmt.Sprintf("%d_seq", n), func(b *testing.B) {
input := make([]float64, n)
for i := 0; i < n; i++ {
input[i] = float64(i)
}
b.Run("itr", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
enc.Reset()
for _, x := range input {
enc.Write(x)
}
enc.Flush()
if bufResult, err = enc.Bytes(); err != nil {
b.Fatal(err)
}
}
})
b.Run("batch", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
if bufResult, err = tsm1.FloatArrayEncodeAll(input, bufResult); err != nil {
b.Fatal(err)
}
}
})
})
b.Run(fmt.Sprintf("%d_ran", n), func(b *testing.B) {
input := make([]float64, n)
for i := 0; i < n; i++ {
input[i] = rand.Float64() * 100.0
}
b.Run("itr", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
enc.Reset()
for _, x := range input {
enc.Write(x)
}
enc.Flush()
if bufResult, err = enc.Bytes(); err != nil {
b.Fatal(err)
}
}
})
b.Run("batch", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
if bufResult, err = tsm1.FloatArrayEncodeAll(input, bufResult); err != nil {
b.Fatal(err)
}
}
})
})
}
}
func BenchmarkFloatArrayDecodeAll(b *testing.B) {
benchmarks := []int{
1,

@@ -174,7 +174,6 @@ func TestFloatEncoder_Roundtrip(t *testing.T) {
}
func TestFloatEncoder_Roundtrip_NaN(t *testing.T) {
s := tsm1.NewFloatEncoder()
s.Write(1.0)
s.Write(math.NaN())
@@ -182,12 +181,35 @@ func TestFloatEncoder_Roundtrip_NaN(t *testing.T) {
s.Flush()
_, err := s.Bytes()
if err == nil {
t.Fatalf("expected error. got nil")
}
}
func TestFloatEncoder_Empty(t *testing.T) {
s := tsm1.NewFloatEncoder()
s.Flush()
b, err := s.Bytes()
if err != nil {
t.Fatal(err)
}
var dec tsm1.FloatDecoder
if err := dec.SetBytes(b); err != nil {
t.Fatal(err)
}
var got []float64
for dec.Next() {
got = append(got, dec.Values())
}
if len(got) != 0 {
t.Fatalf("got len %d, expected 0", len(got))
}
}
func Test_FloatEncoder_Quick(t *testing.T) {
quick.Check(func(values []float64) bool {