feat(tsm1): New APIs to decode an entire buffer of data

* APIs decode an entire byte slice of encoded data into the provided
  `dst` slice
* APIs are stateless and in almost all cases avoid any allocations
* Intended to be used future batch-oriented TSM block decode APIs
* duplicated tests from original iterator-based APIs
pull/10084/head
Stuart Carnie 2018-07-02 16:56:25 -07:00
parent 06257822c2
commit b3e53ae2dc
10 changed files with 2143 additions and 0 deletions

View File

@ -0,0 +1,43 @@
package tsm1
import (
"encoding/binary"
"fmt"
)
func BooleanBatchDecodeAll(b []byte, dst []bool) ([]bool, error) {
if len(b) == 0 {
return nil, nil
}
// First byte stores the encoding type, only have 1 bit-packet format
// currently ignore for now.
b = b[1:]
val, n := binary.Uvarint(b)
if n <= 0 {
return nil, fmt.Errorf("BooleanBatchDecoder: invalid count")
}
count := int(val)
b = b[n:]
if min := len(b) * 8; min < count {
// Shouldn't happen - TSM file was truncated/corrupted
count = min
}
if cap(dst) < count {
dst = make([]bool, count)
} else {
dst = dst[:count]
}
j := 0
for _, v := range b {
for i := byte(128); i > 0 && j < len(dst); i >>= 1 {
dst[j] = v&i != 0
j++
}
}
return dst, nil
}

View File

@ -0,0 +1,122 @@
package tsm1_test
import (
"fmt"
"math/rand"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
func Test_BooleanBatchDecodeAll_Single(t *testing.T) {
enc := tsm1.NewBooleanEncoder(1)
exp := true
enc.Write(exp)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
got, _ := tsm1.BooleanBatchDecodeAll(b, nil)
if len(got) != 1 {
t.Fatalf("expected 1 value")
}
if got := got[0]; got != exp {
t.Fatalf("unexpected value -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func Test_BooleanBatchDecodeAll_Multi_Compressed(t *testing.T) {
cases := []struct {
n int
p float64 // probability of a true value
}{
{10, 0.33},
{100, 0.55},
{1000, 0.68},
}
for _, tc := range cases {
t.Run(fmt.Sprintf("%d_%0.2f", tc.n, tc.p), func(t *testing.T) {
rand.Seed(int64(tc.n * tc.n))
enc := tsm1.NewBooleanEncoder(tc.n)
values := make([]bool, tc.n)
for i := range values {
values[i] = rand.Float64() < tc.p
enc.Write(values[i])
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
got, err := tsm1.BooleanBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected error %q", err.Error())
}
if !cmp.Equal(got, values) {
t.Fatalf("unexpected values, -got/+exp\n%s", cmp.Diff(got, values))
}
})
}
}
func Test_BooleanBatchDecoder_Corrupt(t *testing.T) {
cases := []struct {
name string
d string
}{
{"empty", ""},
{"invalid count", "\x10\x90"},
{"count greater than remaining bits, multiple bytes expected", "\x10\x7f"},
{"count greater than remaining bits, one byte expected", "\x10\x01"},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
dst, _ := tsm1.BooleanBatchDecodeAll([]byte(c.d), nil)
if len(dst) != 0 {
t.Fatalf("unexpected result -got/+want\n%s", cmp.Diff(dst, nil))
}
})
}
}
func BenchmarkBooleanBatchDecodeAll(b *testing.B) {
benchmarks := []struct {
n int
}{
{1},
{55},
{555},
{1000},
}
for _, bm := range benchmarks {
b.Run(fmt.Sprintf("%d", bm.n), func(b *testing.B) {
size := bm.n
e := tsm1.NewBooleanEncoder(size)
for i := 0; i < size; i++ {
e.Write(i&1 == 1)
}
bytes, err := e.Bytes()
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
b.SetBytes(int64(len(bytes)))
b.ResetTimer()
dst := make([]bool, size)
for i := 0; i < b.N; i++ {
res, _ := tsm1.BooleanBatchDecodeAll(bytes, dst)
if len(res) != size {
b.Fatalf("expected to read %d booleans, but read %d", size, len(res))
}
}
})
}
}

View File

@ -0,0 +1,222 @@
package tsm1
import (
"encoding/binary"
"io"
"math"
)
func FloatBatchDecodeAll(b []byte, dst []float64) ([]float64, error) {
if len(b) == 0 {
return []float64{}, nil
}
sz := cap(dst)
if sz == 0 {
sz = 64
dst = make([]float64, sz)
} else {
dst = dst[:sz]
}
var (
val uint64 // current value
leading uint64
trailing uint64
bit bool
br BatchBitReader
)
j := 0
// first byte is the compression type.
// we currently just have gorilla compression.
br.Reset(b[1:])
val = br.ReadBits(64)
dst[j] = math.Float64frombits(val)
j++
// The expected exit condition is for `uvnan` to be decoded.
// Any other error (EOF) indicates a truncated stream.
for br.Err() == nil {
// read compressed value
if br.CanReadBitFast() {
bit = br.ReadBitFast()
} else {
bit = br.ReadBit()
}
if bit {
if br.CanReadBitFast() {
bit = br.ReadBitFast()
} else {
bit = br.ReadBit()
}
if bit {
leading = br.ReadBits(5)
mbits := br.ReadBits(6)
if mbits == 0 {
mbits = 64
}
trailing = 64 - leading - mbits
}
mbits := uint(64 - leading - trailing)
bits := br.ReadBits(mbits)
val ^= bits << trailing
if val == uvnan { // IsNaN, eof
break
}
}
f := math.Float64frombits(val)
if j < len(dst) {
dst[j] = f
} else {
dst = append(dst, f) // force a resize
dst = dst[:cap(dst)]
}
j++
}
return dst[:j], br.Err()
}
// BatchBitReader reads bits from an io.Reader.
type BatchBitReader struct {
data []byte
buf struct {
v uint64 // bit buffer
n uint // available bits
}
err error
}
// NewBatchBitReader returns a new instance of BatchBitReader that reads from data.
func NewBatchBitReader(data []byte) *BatchBitReader {
b := new(BatchBitReader)
b.Reset(data)
return b
}
// Reset sets the underlying reader on b and reinitializes.
func (r *BatchBitReader) Reset(data []byte) {
r.data = data
r.buf.v, r.buf.n, r.err = 0, 0, nil
r.readBuf()
}
func (r *BatchBitReader) Err() error { return r.err }
// CanReadBitFast returns true if calling ReadBitFast() is allowed.
// Fast bit reads are allowed when at least 2 values are in the buffer.
// This is because it is not required to refilled the buffer and the caller
// can inline the calls.
func (r *BatchBitReader) CanReadBitFast() bool { return r.buf.n > 1 }
// ReadBitFast is an optimized bit read.
// IMPORTANT: Only allowed if CanReadFastBit() is true!
func (r *BatchBitReader) ReadBitFast() bool {
v := r.buf.v&(1<<63) != 0
r.buf.v <<= 1
r.buf.n -= 1
return v
}
// ReadBit returns the next bit from the underlying data.
func (r *BatchBitReader) ReadBit() bool {
return r.ReadBits(1) != 0
}
// ReadBits reads nbits from the underlying data into a uint64.
// nbits must be from 1 to 64, inclusive.
func (r *BatchBitReader) ReadBits(nbits uint) uint64 {
// Return EOF if there is no more data.
if r.buf.n == 0 {
r.err = io.EOF
return 0
}
// Return bits from buffer if less than available bits.
if nbits <= r.buf.n {
// Return all bits, if requested.
if nbits == 64 {
v := r.buf.v
r.buf.v, r.buf.n = 0, 0
r.readBuf()
return v
}
// Otherwise mask returned bits.
v := r.buf.v >> (64 - nbits)
r.buf.v <<= nbits
r.buf.n -= nbits
if r.buf.n == 0 {
r.readBuf()
}
return v
}
// Otherwise read all available bits in current buffer.
v, n := r.buf.v, r.buf.n
// Read new buffer.
r.buf.v, r.buf.n = 0, 0
r.readBuf()
// Append new buffer to previous buffer and shift to remove unnecessary bits.
v |= r.buf.v >> n
v >>= 64 - nbits
// Remove used bits from new buffer.
bufN := nbits - n
if bufN > r.buf.n {
bufN = r.buf.n
}
r.buf.v <<= bufN
r.buf.n -= bufN
if r.buf.n == 0 {
r.readBuf()
}
return v
}
func (r *BatchBitReader) readBuf() {
// Determine number of bytes to read to fill buffer.
byteN := 8 - (r.buf.n / 8)
// Limit to the length of our data.
if n := uint(len(r.data)); byteN > n {
byteN = n
}
// Optimized 8-byte read.
if byteN == 8 {
r.buf.v = binary.BigEndian.Uint64(r.data)
r.buf.n = 64
r.data = r.data[8:]
return
}
i := uint(0)
if byteN > 3 {
r.buf.n += 32
r.buf.v |= uint64(binary.BigEndian.Uint32(r.data)) << (64 - r.buf.n)
i += 4
}
// Otherwise append bytes to buffer.
for ; i < byteN; i++ {
r.buf.n += 8
r.buf.v |= uint64(r.data[i]) << (64 - r.buf.n)
}
// Move data forward.
r.data = r.data[byteN:]
}

View File

@ -0,0 +1,265 @@
package tsm1_test
import (
"bytes"
"fmt"
"io"
"math"
"math/rand"
"reflect"
"testing"
"testing/quick"
"github.com/dgryski/go-bitstream"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
func TestFloatBatchDecodeAll_Simple(t *testing.T) {
// Example from the paper
s := tsm1.NewFloatEncoder()
exp := []float64{
12,
12,
24,
// extra tests
// floating point masking/shifting bug
13,
24,
// delta-of-delta sizes
24,
24,
24,
}
for _, f := range exp {
s.Write(f)
}
s.Flush()
b, err := s.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
buf := make([]float64, 8)
got, err := tsm1.FloatBatchDecodeAll(b, buf)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected values -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestBatchBitStreamEOF(t *testing.T) {
br := tsm1.NewBatchBitReader([]byte("0"))
b := br.ReadBits(8)
if br.Err() != nil {
t.Fatal(br.Err())
}
if b != '0' {
t.Error("ReadBits(8) didn't return first byte")
}
br.ReadBits(8)
if br.Err() != io.EOF {
t.Error("ReadBits(8) on empty string didn't return EOF")
}
// 0 = 0b00110000
br = tsm1.NewBatchBitReader([]byte("0"))
buf := bytes.NewBuffer(nil)
bw := bitstream.NewWriter(buf)
for i := 0; i < 4; i++ {
bit := br.ReadBit()
if br.Err() == io.EOF {
break
}
if br.Err() != nil {
t.Error("GetBit returned error err=", br.Err().Error())
return
}
bw.WriteBit(bitstream.Bit(bit))
}
bw.Flush(bitstream.One)
err := bw.WriteByte(0xAA)
if err != nil {
t.Error("unable to WriteByte")
}
c := buf.Bytes()
if len(c) != 2 || c[1] != 0xAA || c[0] != 0x3f {
t.Error("bad return from 4 read bytes")
}
br = tsm1.NewBatchBitReader([]byte(""))
br.ReadBit()
if br.Err() != io.EOF {
t.Error("ReadBit on empty string didn't return EOF")
}
}
func TestBatchBitStream(t *testing.T) {
buf := bytes.NewBuffer(nil)
br := tsm1.NewBatchBitReader([]byte("hello"))
bw := bitstream.NewWriter(buf)
for {
bit := br.ReadBit()
if br.Err() == io.EOF {
break
}
if br.Err() != nil {
t.Error("GetBit returned error err=", br.Err().Error())
return
}
bw.WriteBit(bitstream.Bit(bit))
}
s := buf.String()
if s != "hello" {
t.Error("expected 'hello', got=", []byte(s))
}
}
func TestBatchByteStream(t *testing.T) {
buf := bytes.NewBuffer(nil)
br := tsm1.NewBatchBitReader([]byte("hello"))
bw := bitstream.NewWriter(buf)
for i := 0; i < 3; i++ {
bit := br.ReadBit()
if br.Err() == io.EOF {
break
}
if br.Err() != nil {
t.Error("GetBit returned error err=", br.Err().Error())
return
}
bw.WriteBit(bitstream.Bit(bit))
}
for i := 0; i < 3; i++ {
byt := br.ReadBits(8)
if br.Err() == io.EOF {
break
}
if br.Err() != nil {
t.Error("ReadBits(8) returned error err=", br.Err().Error())
return
}
bw.WriteByte(byte(byt))
}
u := br.ReadBits(13)
if br.Err() != nil {
t.Error("ReadBits returned error err=", br.Err().Error())
return
}
bw.WriteBits(u, 13)
bw.WriteBits(('!'<<12)|('.'<<4)|0x02, 20)
// 0x2f == '/'
bw.Flush(bitstream.One)
s := buf.String()
if s != "hello!./" {
t.Errorf("expected 'hello!./', got=%x", []byte(s))
}
}
// Ensure bit reader can read random bits written to a stream.
func TestBatchBitReader_Quick(t *testing.T) {
if err := quick.Check(func(values []uint64, nbits []uint) bool {
// Limit nbits to 64.
for i := 0; i < len(values) && i < len(nbits); i++ {
nbits[i] = (nbits[i] % 64) + 1
values[i] = values[i] & (math.MaxUint64 >> (64 - nbits[i]))
}
// Write bits to a buffer.
var buf bytes.Buffer
w := bitstream.NewWriter(&buf)
for i := 0; i < len(values) && i < len(nbits); i++ {
w.WriteBits(values[i], int(nbits[i]))
}
w.Flush(bitstream.Zero)
// Read bits from the buffer.
r := tsm1.NewBatchBitReader(buf.Bytes())
for i := 0; i < len(values) && i < len(nbits); i++ {
v := r.ReadBits(nbits[i])
if r.Err() != nil {
t.Errorf("unexpected error(%d): %s", i, r.Err())
return false
} else if v != values[i] {
t.Errorf("value mismatch(%d): got=%d, exp=%d (nbits=%d)", i, v, values[i], nbits[i])
return false
}
}
return true
}, &quick.Config{
Values: func(a []reflect.Value, rand *rand.Rand) {
a[0], _ = quick.Value(reflect.TypeOf([]uint64{}), rand)
a[1], _ = quick.Value(reflect.TypeOf([]uint{}), rand)
},
}); err != nil {
t.Fatal(err)
}
}
func BenchmarkFloatBatchDecodeAll(b *testing.B) {
benchmarks := []struct {
n int
}{
{1},
{55},
{550},
{1000},
}
for _, bm := range benchmarks {
b.Run(fmt.Sprintf("%d", bm.n), func(b *testing.B) {
s := tsm1.NewFloatEncoder()
for c := 0; c < bm.n; c++ {
s.Write(twoHoursData[c%len(twoHoursData)])
}
s.Flush()
bytes, err := s.Bytes()
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
dst := make([]float64, bm.n)
b.SetBytes(int64(len(bytes)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
got, err := tsm1.FloatBatchDecodeAll(bytes, dst)
if err != nil {
b.Fatalf("unexpected length -got/+exp\n%s", cmp.Diff(len(dst), bm.n))
}
if len(got) != bm.n {
b.Fatalf("unexpected length -got/+exp\n%s", cmp.Diff(len(dst), bm.n))
}
}
})
}
}

View File

@ -0,0 +1,170 @@
package tsm1
import (
"encoding/binary"
"fmt"
"unsafe"
"github.com/influxdata/influxdb/pkg/encoding/simple8b"
)
var (
integerBatchDecoderFunc = [...]func(b []byte, dst []int64) ([]int64, error){
integerBatchDecodeAllUncompressed,
integerBatchDecodeAllSimple,
integerBatchDecodeAllRLE,
integerBatchDecodeAllInvalid,
}
)
func IntegerBatchDecodeAll(b []byte, dst []int64) ([]int64, error) {
if len(b) == 0 {
return []int64{}, nil
}
encoding := b[0] >> 4
if encoding > intCompressedRLE {
encoding = 3 // integerBatchDecodeAllInvalid
}
return integerBatchDecoderFunc[encoding&3](b, dst)
}
func UnsignedBatchDecodeAll(b []byte, dst []uint64) ([]uint64, error) {
if len(b) == 0 {
return []uint64{}, nil
}
encoding := b[0] >> 4
if encoding > intCompressedRLE {
encoding = 3 // integerBatchDecodeAllInvalid
}
res, err := integerBatchDecoderFunc[encoding&3](b, reintepretUint64ToInt64Slice(dst))
return reintepretInt64ToUint64Slice(res), err
}
func integerBatchDecodeAllUncompressed(b []byte, dst []int64) ([]int64, error) {
b = b[1:]
if len(b)&0x7 != 0 {
return []int64{}, fmt.Errorf("IntegerBatchDecodeAll: expected multiple of 8 bytes")
}
count := len(b) / 8
if cap(dst) < count {
dst = make([]int64, count)
} else {
dst = dst[:count]
}
prev := int64(0)
for i := range dst {
prev += ZigZagDecode(binary.BigEndian.Uint64(b[i*8:]))
dst[i] = prev
}
return dst, nil
}
func integerBatchDecodeAllSimple(b []byte, dst []int64) ([]int64, error) {
b = b[1:]
if len(b) < 8 {
return []int64{}, fmt.Errorf("IntegerBatchDecodeAll: not enough data to decode packed value")
}
count, err := simple8b.CountBytes(b[8:])
if err != nil {
return []int64{}, err
}
count += 1
if cap(dst) < count {
dst = make([]int64, count)
} else {
dst = dst[:count]
}
// first value
dst[0] = ZigZagDecode(binary.BigEndian.Uint64(b))
// decode compressed values
buf := reintepretInt64ToUint64Slice(dst)
n, err := simple8b.DecodeBytesBigEndian(buf[1:], b[8:])
if err != nil {
return []int64{}, err
}
if n != count-1 {
return []int64{}, fmt.Errorf("IntegerBatchDecodeAll: unexpected number of values decoded; got=%d, exp=%d", n, count-1)
}
// calculate prefix sum
prev := dst[0]
for i := 1; i < len(dst); i++ {
prev += ZigZagDecode(uint64(dst[i]))
dst[i] = prev
}
return dst, nil
}
func integerBatchDecodeAllRLE(b []byte, dst []int64) ([]int64, error) {
b = b[1:]
if len(b) < 8 {
return []int64{}, fmt.Errorf("IntegerBatchDecodeAll: not enough data to decode RLE starting value")
}
var k, n int
// Next 8 bytes is the starting value
first := ZigZagDecode(binary.BigEndian.Uint64(b[k : k+8]))
k += 8
// Next 1-10 bytes is the delta value
value, n := binary.Uvarint(b[k:])
if n <= 0 {
return []int64{}, fmt.Errorf("IntegerBatchDecodeAll: invalid RLE delta value")
}
k += n
delta := ZigZagDecode(value)
// Last 1-10 bytes is how many times the value repeats
count, n := binary.Uvarint(b[k:])
if n <= 0 {
return []int64{}, fmt.Errorf("IntegerBatchDecodeAll: invalid RLE repeat value")
}
count += 1
if cap(dst) < int(count) {
dst = make([]int64, count)
} else {
dst = dst[:count]
}
if delta == 0 {
for i := range dst {
dst[i] = first
}
} else {
acc := first
for i := range dst {
dst[i] = acc
acc += delta
}
}
return dst, nil
}
func integerBatchDecodeAllInvalid(b []byte, _ []int64) ([]int64, error) {
return []int64{}, fmt.Errorf("unknown encoding %v", b[0]>>4)
}
func reintepretInt64ToUint64Slice(src []int64) []uint64 {
return *(*[]uint64)(unsafe.Pointer(&src))
}
func reintepretUint64ToInt64Slice(src []uint64) []int64 {
return *(*[]int64)(unsafe.Pointer(&src))
}

View File

@ -0,0 +1,393 @@
package tsm1
import (
"fmt"
"math"
"math/rand"
"testing"
"testing/quick"
"github.com/google/go-cmp/cmp"
)
func TestIntegerBatchDecodeAll_NegativeUncompressed(t *testing.T) {
exp := []int64{
-2352281900722994752, 1438442655375607923, -4110452567888190110,
-1221292455668011702, -1941700286034261841, -2836753127140407751,
1432686216250034552, 3663244026151507025, -3068113732684750258,
-1949953187327444488, 3713374280993588804, 3226153669854871355,
-2093273755080502606, 1006087192578600616, -2272122301622271655,
2533238229511593671, -4450454445568858273, 2647789901083530435,
2761419461769776844, -1324397441074946198, -680758138988210958,
94468846694902125, -2394093124890745254, -2682139311758778198,
}
enc := NewIntegerEncoder(256)
for _, v := range exp {
enc.Write(v)
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("expected error: %v", err)
}
if got := b[0] >> 4; intUncompressed != got {
t.Fatalf("encoding type mismatch: exp uncompressed, got %v", got)
}
got, err := IntegerBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected values: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestIntegerBatchDecodeAll_AllNegative(t *testing.T) {
enc := NewIntegerEncoder(3)
exp := []int64{
-10, -5, -1,
}
for _, v := range exp {
enc.Write(v)
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got := b[0] >> 4; intCompressedSimple != got {
t.Fatalf("encoding type mismatch: exp uncompressed, got %v", got)
}
got, err := IntegerBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected values: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestIntegerBatchDecodeAll_CounterPacked(t *testing.T) {
enc := NewIntegerEncoder(16)
exp := []int64{
1e15, 1e15 + 1, 1e15 + 2, 1e15 + 3, 1e15 + 4, 1e15 + 6,
}
for _, v := range exp {
enc.Write(v)
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if b[0]>>4 != intCompressedSimple {
t.Fatalf("unexpected encoding format: expected simple, got %v", b[0]>>4)
}
// Should use 1 header byte + 2, 8 byte words if delta-encoding is used based on
// values sizes. Without delta-encoding, we'd get 49 bytes.
if exp := 17; len(b) != exp {
t.Fatalf("encoded length mismatch: got %v, exp %v", len(b), exp)
}
got, err := IntegerBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected values: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestIntegerBatchDecodeAll_CounterRLE(t *testing.T) {
enc := NewIntegerEncoder(16)
exp := []int64{
1e15, 1e15 + 1, 1e15 + 2, 1e15 + 3, 1e15 + 4, 1e15 + 5,
}
for _, v := range exp {
enc.Write(v)
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if b[0]>>4 != intCompressedRLE {
t.Fatalf("unexpected encoding format: expected RLE, got %v", b[0]>>4)
}
// Should use 1 header byte, 8 byte first value, 1 var-byte for delta and 1 var-byte for
// count of deltas in this particular RLE.
if exp := 11; len(b) != exp {
t.Fatalf("encoded length mismatch: got %v, exp %v", len(b), exp)
}
got, err := IntegerBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected values: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestIntegerBatchDecodeAll_Descending(t *testing.T) {
enc := NewIntegerEncoder(16)
exp := []int64{
7094, 4472, 1850,
}
for _, v := range exp {
enc.Write(v)
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if b[0]>>4 != intCompressedRLE {
t.Fatalf("unexpected encoding format: expected simple, got %v", b[0]>>4)
}
// Should use 1 header byte, 8 byte first value, 1 var-byte for delta and 1 var-byte for
// count of deltas in this particular RLE.
if exp := 12; len(b) != exp {
t.Fatalf("encoded length mismatch: got %v, exp %v", len(b), exp)
}
got, err := IntegerBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected values: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestIntegerBatchDecodeAll_Flat(t *testing.T) {
enc := NewIntegerEncoder(16)
exp := []int64{
1, 1, 1, 1,
}
for _, v := range exp {
enc.Write(v)
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if b[0]>>4 != intCompressedRLE {
t.Fatalf("unexpected encoding format: expected simple, got %v", b[0]>>4)
}
// Should use 1 header byte, 8 byte first value, 1 var-byte for delta and 1 var-byte for
// count of deltas in this particular RLE.
if exp := 11; len(b) != exp {
t.Fatalf("encoded length mismatch: got %v, exp %v", len(b), exp)
}
got, err := IntegerBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected values: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestIntegerBatchDecodeAll_MinMax(t *testing.T) {
enc := NewIntegerEncoder(2)
exp := []int64{
math.MinInt64, math.MaxInt64,
}
for _, v := range exp {
enc.Write(v)
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if b[0]>>4 != intUncompressed {
t.Fatalf("unexpected encoding format: expected simple, got %v", b[0]>>4)
}
if exp := 17; len(b) != exp {
t.Fatalf("encoded length mismatch: got %v, exp %v", len(b), exp)
}
got, err := IntegerBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected values: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestIntegerBatchDecodeAll_Quick(t *testing.T) {
quick.Check(func(values []int64) bool {
exp := values
if values == nil {
exp = []int64{} // is this really expected?
}
// Write values to encoder.
enc := NewIntegerEncoder(1024)
for _, v := range values {
enc.Write(v)
}
// Retrieve encoded bytes from encoder.
buf, err := enc.Bytes()
if err != nil {
t.Fatal(err)
}
// Read values out of decoder.
got, err := IntegerBatchDecodeAll(buf, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected values: -got/+exp\n%s", cmp.Diff(got, exp))
}
return true
}, nil)
}
func BenchmarkIntegerBatchDecodeAllUncompressed(b *testing.B) {
benchmarks := []struct {
n int
}{
{5},
{55},
{555},
{1000},
}
values := []int64{
-2352281900722994752, 1438442655375607923, -4110452567888190110,
-1221292455668011702, -1941700286034261841, -2836753127140407751,
1432686216250034552, 3663244026151507025, -3068113732684750258,
-1949953187327444488, 3713374280993588804, 3226153669854871355,
-2093273755080502606, 1006087192578600616, -2272122301622271655,
2533238229511593671, -4450454445568858273, 2647789901083530435,
2761419461769776844, -1324397441074946198, -680758138988210958,
94468846694902125, -2394093124890745254, -2682139311758778198,
}
for _, bm := range benchmarks {
b.Run(fmt.Sprintf("%d", bm.n), func(b *testing.B) {
rand.Seed(int64(bm.n * 1e3))
enc := NewIntegerEncoder(bm.n)
for i := 0; i < bm.n; i++ {
enc.Write(values[rand.Int()%len(values)])
}
bytes, _ := enc.Bytes()
dst := make([]int64, bm.n)
b.SetBytes(int64(len(bytes)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
dst, _ = IntegerBatchDecodeAll(bytes, dst)
}
})
}
}
func BenchmarkIntegerBatchDecodeAllPackedSimple(b *testing.B) {
benchmarks := []struct {
n int
}{
{5},
{55},
{555},
{1000},
}
for _, bm := range benchmarks {
b.Run(fmt.Sprintf("%d", bm.n), func(b *testing.B) {
rand.Seed(int64(bm.n * 1e3))
enc := NewIntegerEncoder(bm.n)
for i := 0; i < bm.n; i++ {
// Small amount of randomness prevents RLE from being used
enc.Write(int64(i) + int64(rand.Intn(10)))
}
bytes, _ := enc.Bytes()
dst := make([]int64, bm.n)
b.SetBytes(int64(len(bytes)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
IntegerBatchDecodeAll(bytes, dst)
}
})
}
}
func BenchmarkIntegerBatchDecodeAllRLE(b *testing.B) {
benchmarks := []struct {
n int
delta int64
}{
{5, 1},
{55, 1},
{555, 1},
{1000, 1},
{1000, 0},
}
for _, bm := range benchmarks {
b.Run(fmt.Sprintf("%d_delta_%d", bm.n, bm.delta), func(b *testing.B) {
rand.Seed(int64(bm.n * 1e3))
enc := NewIntegerEncoder(bm.n)
acc := int64(0)
for i := 0; i < bm.n; i++ {
enc.Write(acc)
acc += bm.delta
}
bytes, _ := enc.Bytes()
b.SetBytes(int64(len(bytes)))
b.ReportAllocs()
b.ResetTimer()
dst := make([]int64, bm.n)
for i := 0; i < b.N; i++ {
IntegerBatchDecodeAll(bytes, dst)
}
})
}
}

View File

@ -0,0 +1,81 @@
package tsm1
import (
"encoding/binary"
"fmt"
"unsafe"
"github.com/golang/snappy"
)
var (
errStringBatchDecodeInvalidStringLength = fmt.Errorf("StringBatchDecodeAll: invalid encoded string length")
errStringBatchDecodeLengthOverflow = fmt.Errorf("StringBatchDecodeAll: length overflow")
errStringBatchDecodeShortBuffer = fmt.Errorf("StringBatchDecodeAll: short buffer")
)
func StringBatchDecodeAll(b []byte, dst []string) ([]string, error) {
// First byte stores the encoding type, only have snappy format
// currently so ignore for now.
if len(b) > 0 {
var err error
// it is important that to note that `snappy.Decode` always returns
// a newly allocated slice as the final strings reference this slice
// directly.
b, err = snappy.Decode(nil, b[1:])
if err != nil {
return []string{}, fmt.Errorf("failed to decode string block: %v", err.Error())
}
} else {
return []string{}, nil
}
var (
i, l int
)
sz := cap(dst)
if sz == 0 {
sz = 64
dst = make([]string, sz)
} else {
dst = dst[:sz]
}
j := 0
for i < len(b) {
length, n := binary.Uvarint(b[i:])
if n <= 0 {
return []string{}, errStringBatchDecodeInvalidStringLength
}
// The length of this string plus the length of the variable byte encoded length
l = int(length) + n
lower := i + n
upper := lower + int(length)
if upper < lower {
return []string{}, errStringBatchDecodeLengthOverflow
}
if upper > len(b) {
return []string{}, errStringBatchDecodeShortBuffer
}
// NOTE: this optimization is critical for performance and to reduce
// allocations. This is just as "safe" as string.Builder, which
// returns a string mapped to the original byte slice
s := b[lower:upper]
val := *(*string)(unsafe.Pointer(&s))
if j < len(dst) {
dst[j] = val
} else {
dst = append(dst, val) // force a resize
dst = dst[:cap(dst)]
}
i += l
j++
}
return dst[:j], nil
}

View File

@ -0,0 +1,192 @@
package tsm1
import (
"fmt"
"math/rand"
"testing"
"testing/quick"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/internal/testutil"
)
func TestStringBatchDecodeAll_NoValues(t *testing.T) {
enc := NewStringEncoder(1024)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
got, err := StringBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected error creating string decoder: %v", err)
}
exp := []string{}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected value: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestStringBatchDecodeAll_Single(t *testing.T) {
enc := NewStringEncoder(1024)
v1 := "v1"
enc.Write(v1)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
got, err := StringBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected error creating string decoder: %v", err)
}
exp := []string{"v1"}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected value: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestStringBatchDecodeAll_Multi_Compressed(t *testing.T) {
enc := NewStringEncoder(1024)
exp := make([]string, 10)
for i := range exp {
exp[i] = fmt.Sprintf("value %d", i)
enc.Write(exp[i])
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if b[0]>>4 != stringCompressedSnappy {
t.Fatalf("unexpected encoding: got %v, exp %v", b[0], stringCompressedSnappy)
}
if exp := 51; len(b) != exp {
t.Fatalf("unexpected length: got %v, exp %v", len(b), exp)
}
got, err := StringBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected error creating string decoder: %v", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected value: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestStringBatchDecodeAll_Quick(t *testing.T) {
quick.Check(func(values []string) bool {
exp := values
if values == nil {
exp = []string{}
}
// Write values to encoder.
enc := NewStringEncoder(1024)
for _, v := range values {
enc.Write(v)
}
// Retrieve encoded bytes from encoder.
buf, err := enc.Bytes()
if err != nil {
t.Fatal(err)
}
// Read values out of decoder.
got, err := StringBatchDecodeAll(buf, nil)
if err != nil {
t.Fatalf("unexpected error creating string decoder: %v", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected value: -got/+exp\n%s", cmp.Diff(got, exp))
}
return true
}, nil)
}
func TestStringBatchDecodeAll_Empty(t *testing.T) {
got, err := StringBatchDecodeAll([]byte{}, nil)
if err != nil {
t.Fatalf("unexpected error creating string decoder: %v", err)
}
exp := []string{}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected value: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestStringBatchDecodeAll_CorruptBytes(t *testing.T) {
cases := []string{
"\x10\x03\b\x03Hi", // Higher length than actual data
"\x10\x1dp\x9c\x90\x90\x90\x90\x90\x90\x90\x90\x90length overflow----",
"0t\x00\x01\x000\x00\x01\x000\x00\x01\x000\x00\x01\x000\x00\x01" +
"\x000\x00\x01\x000\x00\x01\x000\x00\x00\x00\xff:\x01\x00\x01\x00\x01" +
"\x00\x01\x00\x01\x00\x01\x00\x010\x010\x000\x010\x010\x010\x01" +
"0\x010\x010\x010\x010\x010\x010\x010\x010\x010\x010", // Upper slice bounds overflows negative
}
for _, c := range cases {
t.Run(fmt.Sprintf("%q", c), func(t *testing.T) {
got, err := StringBatchDecodeAll([]byte(c), nil)
if err == nil {
t.Fatal("exp an err, got nil")
}
exp := []string{}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected value: -got/+exp\n%s", cmp.Diff(got, exp))
}
})
}
}
func BenchmarkStringBatchDecodeAll(b *testing.B) {
benchmarks := []struct {
n int
w int
}{
{1, 10},
{55, 10},
{550, 10},
{1000, 10},
}
for _, bm := range benchmarks {
b.Run(fmt.Sprintf("%d", bm.n), func(b *testing.B) {
rand.Seed(int64(bm.n * 1e3))
s := NewStringEncoder(bm.n)
for c := 0; c < bm.n; c++ {
s.Write(testutil.MakeSentence(bm.w))
}
s.Flush()
bytes, err := s.Bytes()
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
dst := make([]string, bm.n)
b.ReportAllocs()
b.SetBytes(int64(len(bytes)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
got, err := StringBatchDecodeAll(bytes, dst)
if err != nil {
b.Fatalf("unexpected length -got/+exp\n%s", cmp.Diff(len(dst), bm.n))
}
if len(got) != bm.n {
b.Fatalf("unexpected length -got/+exp\n%s", cmp.Diff(len(dst), bm.n))
}
}
})
}
}

View File

@ -0,0 +1,154 @@
package tsm1
import (
"encoding/binary"
"fmt"
"math"
"unsafe"
"github.com/influxdata/influxdb/pkg/encoding/simple8b"
)
var (
timeBatchDecoderFunc = [...]func(b []byte, dst []int64) ([]int64, error){
timeBatchDecodeAllUncompressed,
timeBatchDecodeAllSimple,
timeBatchDecodeAllRLE,
timeBatchDecodeAllInvalid,
}
)
func TimeBatchDecodeAll(b []byte, dst []int64) ([]int64, error) {
if len(b) == 0 {
return []int64{}, nil
}
encoding := b[0] >> 4
if encoding > timeCompressedRLE {
encoding = 3 // timeBatchDecodeAllInvalid
}
return timeBatchDecoderFunc[encoding&3](b, dst)
}
func timeBatchDecodeAllUncompressed(b []byte, dst []int64) ([]int64, error) {
b = b[1:]
if len(b)&0x7 != 0 {
return []int64{}, fmt.Errorf("TimeBatchDecodeAll: expected multiple of 8 bytes")
}
count := len(b) / 8
if cap(dst) < count {
dst = make([]int64, count)
} else {
dst = dst[:count]
}
prev := uint64(0)
for i := range dst {
prev += binary.BigEndian.Uint64(b[i*8:])
dst[i] = int64(prev)
}
return dst, nil
}
func timeBatchDecodeAllSimple(b []byte, dst []int64) ([]int64, error) {
if len(b) < 9 {
return []int64{}, fmt.Errorf("TimeBatchDecodeAll: not enough data to decode packed timestamps")
}
div := uint64(math.Pow10(int(b[0] & 0xF))) // multiplier
count, err := simple8b.CountBytes(b[9:])
if err != nil {
return []int64{}, err
}
count += 1
if cap(dst) < count {
dst = make([]int64, count)
} else {
dst = dst[:count]
}
buf := *(*[]uint64)(unsafe.Pointer(&dst))
// first value
buf[0] = binary.BigEndian.Uint64(b[1:9])
n, err := simple8b.DecodeBytesBigEndian(buf[1:], b[9:])
if err != nil {
return []int64{}, err
}
if n != count-1 {
return []int64{}, fmt.Errorf("TimeBatchDecodeAll: unexpected number of values decoded; got=%d, exp=%d", n, count-1)
}
// Compute the prefix sum and scale the deltas back up
last := buf[0]
if div > 1 {
for i := 1; i < len(buf); i++ {
dgap := buf[i] * div
buf[i] = last + dgap
last = buf[i]
}
} else {
for i := 1; i < len(buf); i++ {
buf[i] += last
last = buf[i]
}
}
return dst, nil
}
func timeBatchDecodeAllRLE(b []byte, dst []int64) ([]int64, error) {
if len(b) < 9 {
return []int64{}, fmt.Errorf("TimeBatchDecodeAll: not enough data to decode RLE starting value")
}
var k, n int
// Lower 4 bits hold the 10 based exponent so we can scale the values back up
mod := int64(math.Pow10(int(b[k] & 0xF)))
k++
// Next 8 bytes is the starting timestamp
first := binary.BigEndian.Uint64(b[k:])
k += 8
// Next 1-10 bytes is our (scaled down by factor of 10) run length delta
delta, n := binary.Uvarint(b[k:])
if n <= 0 {
return []int64{}, fmt.Errorf("TimeBatchDecodeAll: invalid run length in decodeRLE")
}
k += n
// Scale the delta back up
delta *= uint64(mod)
// Last 1-10 bytes is how many times the value repeats
count, n := binary.Uvarint(b[k:])
if n <= 0 {
return []int64{}, fmt.Errorf("TimeDecoder: invalid repeat value in decodeRLE")
}
if cap(dst) < int(count) {
dst = make([]int64, count)
} else {
dst = dst[:count]
}
acc := first
for i := range dst {
dst[i] = int64(acc)
acc += delta
}
return dst, nil
}
func timeBatchDecodeAllInvalid(b []byte, _ []int64) ([]int64, error) {
return []int64{}, fmt.Errorf("unknown encoding %v", b[0]>>4)
}

View File

@ -0,0 +1,501 @@
package tsm1
import (
"fmt"
"math/rand"
"testing"
"testing/quick"
"time"
"github.com/google/go-cmp/cmp"
)
func TestTimeBatchDecodeAll_NoValues(t *testing.T) {
enc := NewTimeEncoder(0)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
got, err := TimeBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
exp := []int64{}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected values: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestTimeBatchDecodeAll_One(t *testing.T) {
enc := NewTimeEncoder(1)
exp := []int64{0}
for _, v := range exp {
enc.Write(v)
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got := b[0] >> 4; got != timeCompressedPackedSimple {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
got, err := TimeBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected values: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestTimeBatchDecodeAll_Two(t *testing.T) {
enc := NewTimeEncoder(2)
exp := []int64{0, 1}
for _, v := range exp {
enc.Write(v)
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got := b[0] >> 4; got != timeCompressedRLE {
t.Fatalf("Wrong encoding used: expected rle, got %v", got)
}
got, err := TimeBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected values: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestTimeBatchDecodeAll_Three(t *testing.T) {
enc := NewTimeEncoder(3)
exp := []int64{0, 1, 3}
for _, v := range exp {
enc.Write(v)
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got := b[0] >> 4; got != timeCompressedPackedSimple {
t.Fatalf("Wrong encoding used: expected rle, got %v", got)
}
got, err := TimeBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected values: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestTimeBatchDecodeAll_Large_Range(t *testing.T) {
enc := NewTimeEncoder(2)
exp := []int64{1442369134000000000, 1442369135000000000}
for _, v := range exp {
enc.Write(v)
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got := b[0] >> 4; got != timeCompressedRLE {
t.Fatalf("Wrong encoding used: expected rle, got %v", got)
}
got, err := TimeBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected values: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestTimeBatchDecodeAll_Uncompressed(t *testing.T) {
enc := NewTimeEncoder(3)
exp := []int64{
time.Unix(0, 0).UnixNano(),
time.Unix(1, 0).UnixNano(),
// about 36.5yrs in NS resolution is max range for compressed format
// This should cause the encoding to fallback to raw points
time.Unix(2, 2<<59).UnixNano(),
}
for _, v := range exp {
enc.Write(v)
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("expected error: %v", err)
}
if exp := 25; len(b) != exp {
t.Fatalf("length mismatch: got %v, exp %v", len(b), exp)
}
if got := b[0] >> 4; got != timeUncompressed {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
got, err := TimeBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected values: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestTimeBatchDecodeAll_RLE(t *testing.T) {
enc := NewTimeEncoder(512)
var exp []int64
for i := 0; i < 500; i++ {
exp = append(exp, int64(i))
}
for _, v := range exp {
enc.Write(v)
}
b, err := enc.Bytes()
if exp := 12; len(b) != exp {
t.Fatalf("length mismatch: got %v, exp %v", len(b), exp)
}
if got := b[0] >> 4; got != timeCompressedRLE {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
got, err := TimeBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected values: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestTimeBatchDecodeAll_Reverse(t *testing.T) {
enc := NewTimeEncoder(3)
exp := []int64{
int64(3),
int64(2),
int64(0),
}
for _, v := range exp {
enc.Write(v)
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got := b[0] >> 4; got != timeUncompressed {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
got, err := TimeBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected values: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestTimeBatchDecodeAll_Negative(t *testing.T) {
enc := NewTimeEncoder(3)
exp := []int64{
-2352281900722994752, 1438442655375607923, -4110452567888190110,
-1221292455668011702, -1941700286034261841, -2836753127140407751,
1432686216250034552, 3663244026151507025, -3068113732684750258,
-1949953187327444488, 3713374280993588804, 3226153669854871355,
-2093273755080502606, 1006087192578600616, -2272122301622271655,
2533238229511593671, -4450454445568858273, 2647789901083530435,
2761419461769776844, -1324397441074946198, -680758138988210958,
94468846694902125, -2394093124890745254, -2682139311758778198,
}
for _, v := range exp {
enc.Write(v)
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got := b[0] >> 4; got != timeUncompressed {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
got, err := TimeBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected values: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestTimeBatchDecodeAll_220SecondDelta(t *testing.T) {
enc := NewTimeEncoder(256)
var exp []int64
now := time.Now()
for i := 0; i < 220; i++ {
exp = append(exp, now.Add(time.Duration(i*60)*time.Second).UnixNano())
}
for _, v := range exp {
enc.Write(v)
}
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Using RLE, should get 12 bytes
if exp := 12; len(b) != exp {
t.Fatalf("unexpected length: got %v, exp %v", len(b), exp)
}
if got := b[0] >> 4; got != timeCompressedRLE {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
got, err := TimeBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected values: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestTimeBatchDecodeAll_Quick(t *testing.T) {
quick.Check(func(values []int64) bool {
// Write values to encoder.
enc := NewTimeEncoder(1024)
exp := make([]int64, len(values))
for i, v := range values {
exp[i] = int64(v)
enc.Write(exp[i])
}
// Retrieve encoded bytes from encoder.
buf, err := enc.Bytes()
if err != nil {
t.Fatal(err)
}
got, err := TimeBatchDecodeAll(buf, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected values: -got/+exp\n%s", cmp.Diff(got, exp))
}
return true
}, nil)
}
func TestTimeBatchDecodeAll_RLESeconds(t *testing.T) {
enc := NewTimeEncoder(6)
exp := make([]int64, 6)
exp[0] = int64(1444448158000000000)
exp[1] = int64(1444448168000000000)
exp[2] = int64(1444448178000000000)
exp[3] = int64(1444448188000000000)
exp[4] = int64(1444448198000000000)
exp[5] = int64(1444448208000000000)
for _, v := range exp {
enc.Write(v)
}
b, err := enc.Bytes()
if got := b[0] >> 4; got != timeCompressedRLE {
t.Fatalf("Wrong encoding used: expected rle, got %v", got)
}
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
got, err := TimeBatchDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected values: -got/+exp\n%s", cmp.Diff(got, exp))
}
}
func TestTimeBatchDecodeAll_Corrupt(t *testing.T) {
cases := []string{
"\x10\x14", // Packed: not enough data
"\x20\x00", // RLE: not enough data for starting timestamp
"\x2012345678\x90", // RLE: initial timestamp but invalid uvarint encoding
"\x2012345678\x7f", // RLE: timestamp, RLE but invalid repeat
"\x00123", // Raw: data length not multiple of 8
}
for _, c := range cases {
t.Run(fmt.Sprintf("%q", c), func(t *testing.T) {
got, err := TimeBatchDecodeAll([]byte(c), nil)
if err == nil {
t.Fatal("exp an err, got nil")
}
exp := []int64{}
if !cmp.Equal(got, exp) {
t.Fatalf("unexpected value: -got/+exp\n%s", cmp.Diff(got, exp))
}
})
}
}
func BenchmarkTimeBatchDecodeAllUncompressed(b *testing.B) {
benchmarks := []struct {
n int
}{
{5},
{55},
{555},
{1000},
}
values := []int64{
-2352281900722994752, 1438442655375607923, -4110452567888190110,
-1221292455668011702, -1941700286034261841, -2836753127140407751,
1432686216250034552, 3663244026151507025, -3068113732684750258,
-1949953187327444488, 3713374280993588804, 3226153669854871355,
-2093273755080502606, 1006087192578600616, -2272122301622271655,
2533238229511593671, -4450454445568858273, 2647789901083530435,
2761419461769776844, -1324397441074946198, -680758138988210958,
94468846694902125, -2394093124890745254, -2682139311758778198,
}
for _, bm := range benchmarks {
b.Run(fmt.Sprintf("%d", bm.n), func(b *testing.B) {
rand.Seed(int64(bm.n * 1e3))
enc := NewTimeEncoder(bm.n)
for i := 0; i < bm.n; i++ {
enc.Write(values[rand.Int()%len(values)])
}
bytes, _ := enc.Bytes()
dst := make([]int64, bm.n)
b.SetBytes(int64(len(bytes)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
dst, _ = TimeBatchDecodeAll(bytes, dst)
}
})
}
}
func BenchmarkTimeBatchDecodeAllPackedSimple(b *testing.B) {
benchmarks := []struct {
n int
}{
{5},
{55},
{555},
{1000},
}
for _, bm := range benchmarks {
b.Run(fmt.Sprintf("%d", bm.n), func(b *testing.B) {
rand.Seed(int64(bm.n * 1e3))
enc := NewTimeEncoder(bm.n)
for i := 0; i < bm.n; i++ {
// Small amount of randomness prevents RLE from being used
enc.Write(int64(i*1000) + int64(rand.Intn(10)))
}
bytes, _ := enc.Bytes()
dst := make([]int64, bm.n)
b.SetBytes(int64(len(bytes)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
dst, _ = TimeBatchDecodeAll(bytes, dst)
}
})
}
}
func BenchmarkTimeBatchDecodeAllRLE(b *testing.B) {
benchmarks := []struct {
n int
delta int64
}{
{5, 10},
{55, 10},
{555, 10},
{1000, 10},
}
for _, bm := range benchmarks {
b.Run(fmt.Sprintf("%d_delta_%d", bm.n, bm.delta), func(b *testing.B) {
enc := NewTimeEncoder(bm.n)
acc := int64(0)
for i := 0; i < bm.n; i++ {
enc.Write(acc)
acc += bm.delta
}
bytes, _ := enc.Bytes()
dst := make([]int64, bm.n)
b.SetBytes(int64(len(bytes)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
dst, _ = TimeBatchDecodeAll(bytes, dst)
}
})
}
}