Merge pull request #10300 from influxdata/er-batch-encoders

Improve Compaction Performance
pull/10379/head
Edd Robinson 2018-10-16 14:34:26 +01:00 committed by GitHub
commit 0857934774
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 4786 additions and 195 deletions

View File

@ -347,6 +347,31 @@ func Encode(src []uint64) (value uint64, n int, err error) {
}
}
// S8B_BIT_SIZE is the number of low-order bits of an encoded word that hold
// packed values. EncodeAll shifts the selector left by this amount, and
// DecodeAll reads the selector back from the bits above it.
const (
S8B_BIT_SIZE = 60
)
var (
// numBits lists the packing layouts EncodeAll tries, in order. The layout at
// index i is written with selector i+2.
numBits = [...][2]byte{
// { number of values, max bits per value }
{60, 1},
{30, 2},
{20, 3},
{15, 4},
{12, 5},
{10, 6},
{8, 7},
{7, 8},
{6, 10},
{5, 12},
{4, 15},
{3, 20},
{2, 30},
{1, 60},
}
// ErrValueOutOfBounds is returned by EncodeAll when none of the layouts in
// numBits can hold the next value.
ErrValueOutOfBounds = errors.New("value out of bounds")
)
// EncodeAll returns a packed slice of the values from src. If a value is
// 1 << 60 or greater, an error is returned. The input src is modified to avoid
// extra allocations. If you need to re-use it, pass a copy.
@ -357,64 +382,69 @@ func EncodeAll(src []uint64) ([]uint64, error) {
dst := src
j := 0
for {
if i >= len(src) {
break
}
NEXTVALUE:
for i < len(src) {
remaining := src[i:]
if canPack(remaining, 240, 0) {
dst[j] = 0
i += 240
} else if canPack(remaining, 120, 0) {
dst[j] = 1 << 60
i += 120
} else if canPack(remaining, 60, 1) {
dst[j] = pack60(src[i : i+60])
i += 60
} else if canPack(remaining, 30, 2) {
dst[j] = pack30(src[i : i+30])
i += 30
} else if canPack(remaining, 20, 3) {
dst[j] = pack20(src[i : i+20])
i += 20
} else if canPack(remaining, 15, 4) {
dst[j] = pack15(src[i : i+15])
i += 15
} else if canPack(remaining, 12, 5) {
dst[j] = pack12(src[i : i+12])
i += 12
} else if canPack(remaining, 10, 6) {
dst[j] = pack10(src[i : i+10])
i += 10
} else if canPack(remaining, 8, 7) {
dst[j] = pack8(src[i : i+8])
i += 8
} else if canPack(remaining, 7, 8) {
dst[j] = pack7(src[i : i+7])
i += 7
} else if canPack(remaining, 6, 10) {
dst[j] = pack6(src[i : i+6])
i += 6
} else if canPack(remaining, 5, 12) {
dst[j] = pack5(src[i : i+5])
i += 5
} else if canPack(remaining, 4, 15) {
dst[j] = pack4(src[i : i+4])
i += 4
} else if canPack(remaining, 3, 20) {
dst[j] = pack3(src[i : i+3])
i += 3
} else if canPack(remaining, 2, 30) {
dst[j] = pack2(src[i : i+2])
i += 2
} else if canPack(remaining, 1, 60) {
dst[j] = pack1(src[i : i+1])
i += 1
} else {
return nil, fmt.Errorf("value out of bounds")
// try to pack run of 240 or 120 1s
if len(remaining) >= 120 {
var a []uint64
if len(remaining) >= 240 {
a = remaining[:240]
} else {
a = remaining[:120]
}
k := 0
for k = range a {
if a[k] != 1 {
break
}
}
v := uint64(0)
switch {
case k >= 239:
i += 240
case k >= 119:
v = 1 << 60
i += 120
default:
goto CODES
}
dst[j] = v
j++
continue
}
j += 1
CODES:
for code := range numBits {
intN := int(numBits[code][0])
bitN := numBits[code][1]
if intN > len(remaining) {
continue
}
maxVal := uint64(1 << (bitN & 0x3f))
val := uint64(code+2) << S8B_BIT_SIZE
for k, inV := range remaining {
if k < intN {
if inV >= maxVal {
continue CODES
}
val |= inV << ((byte(k) * bitN) & 0x3f)
} else {
break
}
}
dst[j] = val
j += 1
i += intN
continue NEXTVALUE
}
return nil, ErrValueOutOfBounds
}
return dst[:j], nil
}
@ -433,10 +463,7 @@ func Decode(dst *[240]uint64, v uint64) (n int, err error) {
func DecodeAll(dst, src []uint64) (value int, err error) {
j := 0
for _, v := range src {
sel := v >> 60
if sel >= 16 {
return 0, fmt.Errorf("invalid selector value: %b", sel)
}
sel := (v >> 60) & 0xf
selector[sel].unpack(v, (*[240]uint64)(unsafe.Pointer(&dst[j])))
j += selector[sel].n
}

View File

@ -1,8 +1,10 @@
package simple8b_test
import (
"math/rand"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/pkg/encoding/simple8b"
)
@ -18,15 +20,135 @@ func Test_Encode_NoValues(t *testing.T) {
}
}
func Test_TooBig(t *testing.T) {
values := 1
in := make([]uint64, values)
for i := 0; i < values; i++ {
in[i] = 2<<61 - 1
// ones returns a generator that produces a fresh slice of n values, each
// equal to 1, every time it is called.
func ones(n int) func() []uint64 {
	generate := func() []uint64 {
		vals := make([]uint64, n)
		for idx := range vals {
			vals[idx] = 1
		}
		return vals
	}
	return generate
}
_, err := simple8b.EncodeAll(in)
if err == nil {
t.Fatalf("expected error, got nil")
}
// bitsN fixes the bit width b and returns a factory that, given a count n,
// yields the generator bits(n, b).
func bitsN(b int) func(n int) func() []uint64 {
	factory := func(n int) func() []uint64 {
		gen := bits(n, b)
		return gen
	}
	return factory
}
// combineN returns a factory that instantiates every factory in fns with the
// same count n and combines the resulting generators, in order, via combine.
func combineN(fns ...func(n int) func() []uint64) func(n int) func() []uint64 {
	return func(n int) func() []uint64 {
		var gens []func() []uint64
		for idx := 0; idx < len(fns); idx++ {
			gen := fns[idx](n)
			gens = append(gens, gen)
		}
		return combine(gens...)
	}
}
// bits returns a generator of n random values, each strictly below 1<<bits.
// Values at odd indexes always have the highest of those bits set, so the
// full width is exercised by half of the sequence.
func bits(n, bits int) func() []uint64 {
	return func() []uint64 {
		limit := uint64(1) << uint8(bits)
		high := uint64(1) << uint8(bits-1)

		vals := make([]uint64, n)
		for idx := 0; idx < len(vals); idx++ {
			v := uint64(rand.Int63n(int64(limit)))
			if idx&1 == 1 {
				v |= high
			}
			vals[idx] = v
			if v >= limit {
				panic("max")
			}
		}
		return vals
	}
}
// combine returns a generator that calls each generator in fns in order and
// concatenates their results into a single slice.
func combine(fns ...func() []uint64) func() []uint64 {
	merged := func() []uint64 {
		var all []uint64
		for idx := 0; idx < len(fns); idx++ {
			part := fns[idx]()
			all = append(all, part...)
		}
		return all
	}
	return merged
}
// TestEncodeAll runs simple8b.EncodeAll over fixed and generated inputs and
// verifies the output by decoding it with simple8b.DecodeAll and comparing the
// result with the original input. Cases with err set only check that EncodeAll
// returns exactly that error.
func TestEncodeAll(t *testing.T) {
	rand.Seed(0)

	tests := []struct {
		name string
		in   []uint64
		fn   func() []uint64
		err  error
	}{
		{name: "no values", in: []uint64{}},
		{name: "mixed sizes", in: []uint64{7, 6, 256, 4, 3, 2, 1}},
		{name: "too big", in: []uint64{7, 6, 2<<61 - 1, 4, 3, 2, 1}, err: simple8b.ErrValueOutOfBounds},
		{name: "1 bit", fn: bits(100, 1)},
		{name: "2 bits", fn: bits(100, 2)},
		{name: "3 bits", fn: bits(100, 3)},
		{name: "4 bits", fn: bits(100, 4)},
		{name: "5 bits", fn: bits(100, 5)},
		{name: "6 bits", fn: bits(100, 6)},
		{name: "7 bits", fn: bits(100, 7)},
		{name: "8 bits", fn: bits(100, 8)},
		{name: "10 bits", fn: bits(100, 10)},
		{name: "12 bits", fn: bits(100, 12)},
		{name: "15 bits", fn: bits(100, 15)},
		{name: "20 bits", fn: bits(100, 20)},
		{name: "30 bits", fn: bits(100, 30)},
		{name: "60 bits", fn: bits(100, 60)},
		{name: "combination", fn: combine(
			bits(100, 1),
			bits(100, 2),
			bits(100, 3),
			bits(100, 4),
			bits(100, 5),
			bits(100, 6),
			bits(100, 7),
			bits(100, 8),
			bits(100, 10),
			bits(100, 12),
			bits(100, 15),
			bits(100, 20),
			bits(100, 30),
			bits(100, 60),
		)},
		{name: "240 ones", fn: ones(240)},
		{name: "120 ones", fn: func() []uint64 {
			in := ones(240)()
			in[120] = 5
			return in
		}},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			if test.fn != nil {
				test.in = test.fn()
			}

			// EncodeAll packs in place, so hand it a private copy of the input.
			encoded, err := simple8b.EncodeAll(append(make([]uint64, 0, len(test.in)), test.in...))
			if test.err != nil {
				if err != test.err {
					t.Fatalf("expected encode error %v, got %v", test.err, err)
				}
				return
			}
			if err != nil {
				t.Fatalf("unexpected encode error\n%s", err)
			}

			decoded := make([]uint64, len(test.in))
			n, err := simple8b.DecodeAll(decoded, encoded)
			if err != nil {
				t.Fatalf("unexpected decode error\n%s", err)
			}

			// Diff exactly what was compared; cmp.Diff marks its first
			// argument with "-" and its second with "+".
			if got := decoded[:n]; !cmp.Equal(got, test.in) {
				t.Fatalf("unexpected values; -exp/+got\n%s", cmp.Diff(test.in, got))
			}
		})
	}
}
@ -272,18 +394,65 @@ func TestCountBytesBetween_SkipMin(t *testing.T) {
}
}
// BenchmarkEncodeAll measures simple8b.EncodeAll on 1000 values for each bit
// width and for a combination of all widths.
//
// The input is generated once per sub-benchmark, outside the timed region.
// EncodeAll overwrites its argument, so each iteration copies the input into
// a reusable scratch slice before encoding.
func BenchmarkEncodeAll(b *testing.B) {
	benchmarks := []struct {
		name string
		fn   func(n int) func() []uint64
	}{
		{name: "1 bit", fn: bitsN(1)},
		{name: "2 bits", fn: bitsN(2)},
		{name: "3 bits", fn: bitsN(3)},
		{name: "4 bits", fn: bitsN(4)},
		{name: "5 bits", fn: bitsN(5)},
		{name: "6 bits", fn: bitsN(6)},
		{name: "7 bits", fn: bitsN(7)},
		{name: "8 bits", fn: bitsN(8)},
		{name: "10 bits", fn: bitsN(10)},
		{name: "12 bits", fn: bitsN(12)},
		{name: "15 bits", fn: bitsN(15)},
		{name: "20 bits", fn: bitsN(20)},
		{name: "30 bits", fn: bitsN(30)},
		{name: "60 bits", fn: bitsN(60)},
		{name: "combination", fn: combineN(
			bitsN(1),
			bitsN(2),
			bitsN(3),
			bitsN(4),
			bitsN(5),
			bitsN(6),
			bitsN(7),
			bitsN(8),
			bitsN(10),
			bitsN(12),
			bitsN(15),
			bitsN(20),
			bitsN(30),
			bitsN(60),
		)},
	}

	for _, bm := range benchmarks {
		b.Run(bm.name, func(b *testing.B) {
			in := bm.fn(1000)()
			scratch := make([]uint64, len(in))

			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				copy(scratch, in)
				if _, err := simple8b.EncodeAll(scratch); err != nil {
					b.Fatal(err)
				}
			}
		})
	}
}
func BenchmarkEncode(b *testing.B) {
total := 0
x := make([]uint64, 1024)
for i := 0; i < len(x); i++ {
x[i] = uint64(15)
}
in := make([]uint64, 1024)
b.SetBytes(int64(len(x) * 8))
b.ResetTimer()
for i := 0; i < b.N; i++ {
simple8b.EncodeAll(x)
b.SetBytes(int64(len(x) * 8))
total += len(x)
copy(in, x)
simple8b.EncodeAll(in)
}
}
@ -312,11 +481,11 @@ func BenchmarkDecode(b *testing.B) {
decoded := make([]uint64, len(x))
b.SetBytes(int64(len(decoded) * 8))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _ = simple8b.DecodeAll(decoded, y)
b.SetBytes(int64(len(decoded) * 8))
total += len(decoded)
}
}

View File

@ -1,7 +0,0 @@
/*
Package tsdb implements a durable time series database.
*/
package tsdb
//go:generate tmpl -data=@arrayvalues.gen.go.tmpldata arrayvalues.gen.go.tmpl

View File

@ -19,11 +19,11 @@ func DecodeBooleanArrayBlock(block []byte, a *tsdb.BooleanArray) error {
return err
}
a.Timestamps, err = TimeBatchDecodeAll(tb, a.Timestamps)
a.Timestamps, err = TimeArrayDecodeAll(tb, a.Timestamps)
if err != nil {
return err
}
a.Values, err = BooleanBatchDecodeAll(vb, a.Values)
a.Values, err = BooleanArrayDecodeAll(vb, a.Values)
return err
}
@ -40,11 +40,11 @@ func DecodeFloatArrayBlock(block []byte, a *tsdb.FloatArray) error {
return err
}
a.Timestamps, err = TimeBatchDecodeAll(tb, a.Timestamps)
a.Timestamps, err = TimeArrayDecodeAll(tb, a.Timestamps)
if err != nil {
return err
}
a.Values, err = FloatBatchDecodeAll(vb, a.Values)
a.Values, err = FloatArrayDecodeAll(vb, a.Values)
return err
}
@ -61,11 +61,11 @@ func DecodeIntegerArrayBlock(block []byte, a *tsdb.IntegerArray) error {
return err
}
a.Timestamps, err = TimeBatchDecodeAll(tb, a.Timestamps)
a.Timestamps, err = TimeArrayDecodeAll(tb, a.Timestamps)
if err != nil {
return err
}
a.Values, err = IntegerBatchDecodeAll(vb, a.Values)
a.Values, err = IntegerArrayDecodeAll(vb, a.Values)
return err
}
@ -82,11 +82,11 @@ func DecodeUnsignedArrayBlock(block []byte, a *tsdb.UnsignedArray) error {
return err
}
a.Timestamps, err = TimeBatchDecodeAll(tb, a.Timestamps)
a.Timestamps, err = TimeArrayDecodeAll(tb, a.Timestamps)
if err != nil {
return err
}
a.Values, err = UnsignedBatchDecodeAll(vb, a.Values)
a.Values, err = UnsignedArrayDecodeAll(vb, a.Values)
return err
}
@ -103,10 +103,10 @@ func DecodeStringArrayBlock(block []byte, a *tsdb.StringArray) error {
return err
}
a.Timestamps, err = TimeBatchDecodeAll(tb, a.Timestamps)
a.Timestamps, err = TimeArrayDecodeAll(tb, a.Timestamps)
if err != nil {
return err
}
a.Values, err = StringBatchDecodeAll(vb, a.Values)
a.Values, err = StringArrayDecodeAll(vb, a.Values)
return err
}

View File

@ -5,7 +5,41 @@ import (
"fmt"
)
func BooleanBatchDecodeAll(b []byte, dst []bool) ([]bool, error) {
// BooleanArrayEncodeAll encodes src into b, returning b and any error encountered.
// The returned slice may be of a different length and capacity to b.
//
// The output is one header byte whose 4 high bits hold the encoding type,
// followed by the number of values as a uvarint, followed by one bit per
// value, most significant bit first. Unused bits in the final byte are always
// cleared, so the result does not depend on the previous contents of b.
func BooleanArrayEncodeAll(src []bool, b []byte) ([]byte, error) {
	sz := 1 + 8 + ((len(src) + 7) / 8) // Header + Num bools + bool data.
	if len(b) < sz {
		if cap(b) >= sz {
			b = b[:sz]
		} else {
			// Grow by the shortfall only.
			b = append(b, make([]byte, sz-len(b))...)
		}
	}

	// Store the encoding type in the 4 high bits of the first byte
	b[0] = byte(booleanCompressedBitPacked) << 4
	n := uint64(8) // Current bit in current byte.

	// Encode the number of booleans written.
	i := binary.PutUvarint(b[n>>3:], uint64(len(src)))
	n += uint64(i * 8)

	// Every bit is explicitly set or cleared because b may hold stale data.
	for _, v := range src {
		if v {
			b[n>>3] |= 128 >> (n & 7) // Set current bit on current byte.
		} else {
			b[n>>3] &^= 128 >> (n & 7) // Clear current bit on current byte.
		}
		n++
	}

	length := n >> 3
	if n&7 > 0 {
		// The last byte is only partially used: clear the trailing padding
		// bits so stale buffer contents never reach the output.
		b[length] &^= 0xFF >> (n & 7)
		length++ // Add an extra byte to capture overflowing bits.
	}
	return b[:length], nil
}
func BooleanArrayDecodeAll(b []byte, dst []bool) ([]bool, error) {
if len(b) == 0 {
return nil, nil
}

View File

@ -1,15 +1,156 @@
package tsm1_test
import (
"bytes"
"fmt"
"math/rand"
"reflect"
"testing"
"testing/quick"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
func Test_BooleanBatchDecodeAll_Single(t *testing.T) {
// TestBooleanArrayEncodeAll_NoValues checks that encoding a nil input yields
// a block from which the iterator decoder reads no values.
func TestBooleanArrayEncodeAll_NoValues(t *testing.T) {
	encoded, err := tsm1.BooleanArrayEncodeAll(nil, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	var decoder tsm1.BooleanDecoder
	decoder.SetBytes(encoded)
	if hasNext := decoder.Next(); hasNext {
		t.Fatalf("unexpected next value: got true, exp false")
	}
}
// TestBooleanArrayEncodeAll_Single encodes a single true value and checks
// that the iterator decoder reads it back.
func TestBooleanArrayEncodeAll_Single(t *testing.T) {
	values := []bool{true}

	encoded, err := tsm1.BooleanArrayEncodeAll(values, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	var decoder tsm1.BooleanDecoder
	decoder.SetBytes(encoded)
	if hasNext := decoder.Next(); !hasNext {
		t.Fatalf("unexpected next value: got false, exp true")
	}

	if values[0] != decoder.Read() {
		t.Fatalf("unexpected value: got %v, exp %v", decoder.Read(), values[0])
	}
}
// TestBooleanArrayEncodeAll_Compare encodes 1000 random booleans with both
// the iterator encoder and BooleanArrayEncodeAll (the latter into a buffer
// that already holds unrelated bytes), then checks that the batch output
// decodes to the input and matches the iterator output byte for byte.
func TestBooleanArrayEncodeAll_Compare(t *testing.T) {
	// generate random values
	input := make([]bool, 1000)
	for idx := range input {
		input[idx] = rand.Int63n(2) == 1
	}

	// Reference encoding from the iterator encoder.
	itr := tsm1.NewBooleanEncoder(1000)
	for idx := 0; idx < len(input); idx++ {
		itr.Write(input[idx])
	}
	itr.Flush()
	buf1, err := itr.Bytes()
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// Batch encoding into a dirty, pre-sized buffer.
	dirty := append([]byte("this is some jibberish"), make([]byte, 100, 200)...)
	buf2, err := tsm1.BooleanArrayEncodeAll(input, dirty)
	if err != nil {
		t.Fatalf("unexpected error: %v\nbuf: %db %x", err, len(buf2), buf2)
	}

	result, err := tsm1.BooleanArrayDecodeAll(buf2, nil)
	if err != nil {
		dumpBufs(buf1, buf2)
		t.Fatalf("unexpected error: %v\nbuf: %db %x", err, len(buf2), buf2)
	}

	if !reflect.DeepEqual(result, input) {
		dumpBufs(buf1, buf2)
		t.Fatalf("got result %v, expected %v", result, input)
	}

	// Check that the encoders are byte for byte the same...
	if !bytes.Equal(buf1, buf2) {
		dumpBufs(buf1, buf2)
		t.Fatalf("Raw bytes differ for encoders")
	}
}
// TestBooleanArrayEncodeAll_Multi_Compressed encodes ten alternating values,
// checks the encoded length is 4 bytes, and checks the iterator decoder
// returns exactly those ten values.
func TestBooleanArrayEncodeAll_Multi_Compressed(t *testing.T) {
	values := make([]bool, 10)
	for idx := 0; idx < len(values); idx++ {
		values[idx] = idx%2 == 0
	}

	encoded, err := tsm1.BooleanArrayEncodeAll(values, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	const exp = 4
	if len(encoded) != exp {
		t.Fatalf("unexpected length: got %v, exp %v", len(encoded), exp)
	}

	var decoder tsm1.BooleanDecoder
	decoder.SetBytes(encoded)

	for pos, want := range values {
		if !decoder.Next() {
			t.Fatalf("unexpected next value: got false, exp true")
		}
		if want != decoder.Read() {
			t.Fatalf("unexpected value at pos %d: got %v, exp %v", pos, decoder.Read(), want)
		}
	}

	if decoder.Next() {
		t.Fatalf("unexpected next value: got true, exp false")
	}
}
// TestBooleanArrayEncodeAll_Quick round-trips randomly generated inputs
// through BooleanArrayEncodeAll and the iterator decoder.
func TestBooleanArrayEncodeAll_Quick(t *testing.T) {
	roundTrip := func(values []bool) bool {
		// A nil input is compared as an empty, non-nil slice.
		src := values
		if values == nil {
			src = []bool{}
		}

		// Retrieve compressed bytes.
		encoded, err := tsm1.BooleanArrayEncodeAll(src, nil)
		if err != nil {
			t.Fatal(err)
		}

		// Read values out of decoder.
		var decoder tsm1.BooleanDecoder
		decoder.SetBytes(encoded)
		got := make([]bool, 0, len(values))
		for decoder.Next() {
			got = append(got, decoder.Read())
		}

		// Verify that input and output values match.
		if !reflect.DeepEqual(src, got) {
			t.Fatalf("mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", src, got)
		}

		return true
	}

	if err := quick.Check(roundTrip, nil); err != nil {
		t.Fatal(err)
	}
}
func Test_BooleanArrayDecodeAll_Single(t *testing.T) {
enc := tsm1.NewBooleanEncoder(1)
exp := true
enc.Write(exp)
@ -18,7 +159,7 @@ func Test_BooleanBatchDecodeAll_Single(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
got, _ := tsm1.BooleanBatchDecodeAll(b, nil)
got, _ := tsm1.BooleanArrayDecodeAll(b, nil)
if len(got) != 1 {
t.Fatalf("expected 1 value")
}
@ -27,7 +168,7 @@ func Test_BooleanBatchDecodeAll_Single(t *testing.T) {
}
}
func Test_BooleanBatchDecodeAll_Multi_Compressed(t *testing.T) {
func Test_BooleanArrayDecodeAll_Multi_Compressed(t *testing.T) {
cases := []struct {
n int
p float64 // probability of a true value
@ -53,7 +194,7 @@ func Test_BooleanBatchDecodeAll_Multi_Compressed(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
got, err := tsm1.BooleanBatchDecodeAll(b, nil)
got, err := tsm1.BooleanArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected error %q", err.Error())
}
@ -78,7 +219,7 @@ func Test_BooleanBatchDecoder_Corrupt(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
dst, _ := tsm1.BooleanBatchDecodeAll([]byte(c.d), nil)
dst, _ := tsm1.BooleanArrayDecodeAll([]byte(c.d), nil)
if len(dst) != 0 {
t.Fatalf("unexpected result -got/+want\n%s", cmp.Diff(dst, nil))
}
@ -86,7 +227,49 @@ func Test_BooleanBatchDecoder_Corrupt(t *testing.T) {
}
}
func BenchmarkBooleanBatchDecodeAll(b *testing.B) {
// BenchmarkEncodeBooleans compares the iterator encoder ("itr") with
// BooleanArrayEncodeAll ("batch") on random inputs of 10, 100 and 1000 values.
func BenchmarkEncodeBooleans(b *testing.B) {
	var err error

	for _, size := range []int{10, 100, 1000} {
		enc := tsm1.NewBooleanEncoder(size)

		b.Run(fmt.Sprintf("%d_ran", size), func(b *testing.B) {
			input := make([]bool, size)
			for idx := range input {
				input[idx] = rand.Int63n(2) == 1
			}

			b.Run("itr", func(b *testing.B) {
				b.ReportAllocs()
				enc.Reset()
				b.ResetTimer()
				for iter := 0; iter < b.N; iter++ {
					enc.Reset()
					for idx := 0; idx < len(input); idx++ {
						enc.Write(input[idx])
					}
					enc.Flush()
					bufResult, err = enc.Bytes()
					if err != nil {
						b.Fatal(err)
					}
				}
			})

			b.Run("batch", func(b *testing.B) {
				b.ReportAllocs()
				b.ResetTimer()
				for iter := 0; iter < b.N; iter++ {
					bufResult, err = tsm1.BooleanArrayEncodeAll(input, bufResult)
					if err != nil {
						b.Fatal(err)
					}
				}
			})
		})
	}
}
func BenchmarkBooleanArrayDecodeAll(b *testing.B) {
benchmarks := []struct {
n int
}{
@ -112,7 +295,7 @@ func BenchmarkBooleanBatchDecodeAll(b *testing.B) {
dst := make([]bool, size)
for i := 0; i < b.N; i++ {
res, _ := tsm1.BooleanBatchDecodeAll(bytes, dst)
res, _ := tsm1.BooleanArrayDecodeAll(bytes, dst)
if len(res) != size {
b.Fatalf("expected to read %d booleans, but read %d", size, len(res))
}

View File

@ -2,11 +2,257 @@ package tsm1
import (
"encoding/binary"
"fmt"
"io"
"math"
"math/bits"
)
func FloatBatchDecodeAll(b []byte, dst []float64) ([]float64, error) {
// FloatArrayEncodeAll encodes src into b, returning b and any error encountered.
// The returned slice may be of a different length and capacity to b.
//
// Currently only the float compression scheme used in Facebook's Gorilla is
// supported, so this method implements a batch oriented version of that.
//
// NaN is reserved as the sentinel that terminates a batch. If src contains a
// NaN, a nil slice and an error are returned. Each value is checked
// individually, so infinite values (including a mix of +Inf and -Inf) are
// accepted and encoded like any other value.
func FloatArrayEncodeAll(src []float64, b []byte) ([]byte, error) {
	if cap(b) < 9 {
		b = make([]byte, 0, 9) // Enough room for the header and one value.
	}

	b = b[:1]
	b[0] = floatCompressedGorilla << 4

	var first float64
	var finished bool
	if len(src) > 0 && math.IsNaN(src[0]) {
		return nil, fmt.Errorf("unsupported value: NaN")
	} else if len(src) == 0 {
		first = math.NaN() // Write sentinel value to terminate batch.
		finished = true
	} else {
		first = src[0]
		src = src[1:]
	}

	b = b[:9]
	n := uint64(8 + 64) // Number of bits written.
	prev := math.Float64bits(first)

	// Write first value.
	binary.BigEndian.PutUint64(b[1:], prev)

	prevLeading, prevTrailing := ^uint64(0), uint64(0)
	var leading, trailing uint64
	var mask uint64

	// Encode remaining values.
	for i := 0; !finished; i++ {
		var x float64
		if i < len(src) {
			x = src[i]
			// Reject NaN per value. A running sum would also turn NaN for
			// legitimate inputs such as +Inf followed by -Inf.
			if math.IsNaN(x) {
				return nil, fmt.Errorf("unsupported value: NaN")
			}
		} else {
			// Encode sentinel value to terminate batch
			x = math.NaN()
			finished = true
		}

		{
			cur := math.Float64bits(x)
			vDelta := cur ^ prev
			if vDelta == 0 {
				n++ // Write a zero bit. Nothing else to do.
				prev = cur
				continue
			}

			// First the current bit of the current byte is set to indicate we're
			// writing a delta value to the stream.
			for n>>3 >= uint64(len(b)) { // Keep growing b until we can fit all bits in.
				b = append(b, byte(0))
			}

			// n&7 - current bit in current byte.
			// n>>3 - the current byte.
			b[n>>3] |= 128 >> (n & 7) // Sets the current bit of the current byte.
			n++

			// Write the delta to b.

			// Determine the leading and trailing zeros.
			leading = uint64(bits.LeadingZeros64(vDelta))
			trailing = uint64(bits.TrailingZeros64(vDelta))

			// Clamp number of leading zeros to avoid overflow when encoding
			// NOTE(review): after the mask, leading is at most 31, so the
			// branch below cannot be taken. Left untouched because changing
			// it would change the bytes produced.
			leading &= 0x1F
			if leading >= 32 {
				leading = 31
			}

			// At least 2 further bits will be required.
			if (n+2)>>3 >= uint64(len(b)) {
				b = append(b, byte(0))
			}

			if prevLeading != ^uint64(0) && leading >= prevLeading && trailing >= prevTrailing {
				n++ // Write a zero bit.

				// Write the l least significant bits of vDelta to b, most significant
				// bit first.
				l := uint64(64 - prevLeading - prevTrailing)
				for (n+l)>>3 >= uint64(len(b)) { // Keep growing b until we can fit all bits in.
					b = append(b, byte(0))
				}

				// Full value to write.
				v := (vDelta >> prevTrailing) << (64 - l) // l least significant bits of v.

				var m = n & 7 // Current bit in current byte.
				var written uint64
				if m > 0 { // In this case the current byte is not full.
					written = 8 - m
					if l < written {
						written = l
					}
					mask = v >> 56 // Move 8 MSB to 8 LSB
					b[n>>3] |= byte(mask >> m)
					n += written

					if l-written == 0 {
						prev = cur
						continue
					}
				}

				vv := v << written // Move written bits out of the way.

				// TODO(edd): Optimise this. It's unlikely we actually have 8 bytes to write.
				if (n>>3)+8 >= uint64(len(b)) {
					b = append(b, 0, 0, 0, 0, 0, 0, 0, 0)
				}
				binary.BigEndian.PutUint64(b[n>>3:], vv)
				n += (l - written)
			} else {
				prevLeading, prevTrailing = leading, trailing

				// Set a single bit to indicate a value will follow.
				b[n>>3] |= 128 >> (n & 7) // Set current bit on current byte
				n++

				// Write 5 bits of leading.
				if (n+5)>>3 >= uint64(len(b)) {
					b = append(b, byte(0))
				}

				// Enough room to write the 5 bits in the current byte?
				var m = n & 7
				l := uint64(5)
				v := leading << 59 // 5 LSB of leading.
				mask = v >> 56     // Move 5 MSB to 8 LSB

				if m <= 3 { // 5 bits fit into current byte.
					b[n>>3] |= byte(mask >> m)
					n += l
				} else { // In this case there are fewer than 5 bits available in current byte.
					// First step is to fill current byte
					written := 8 - m
					b[n>>3] |= byte(mask >> m) // Some of mask will get lost.
					n += written

					// Second step is to write the lost part of mask into the next byte.
					mask = v << written // Move written bits in previous byte out of way.
					mask >>= 56

					m = n & 7 // Recompute current bit.
					b[n>>3] |= byte(mask >> m)
					n += (l - written)
				}

				// Note that if leading == trailing == 0, then sigbits == 64. But that
				// value doesn't actually fit into the 6 bits we have.
				// Luckily, we never need to encode 0 significant bits, since that would
				// put us in the other case (vdelta == 0). So instead we write out a 0 and
				// adjust it back to 64 on unpacking.
				sigbits := 64 - leading - trailing

				if (n+6)>>3 >= uint64(len(b)) {
					b = append(b, byte(0))
				}

				m = n & 7
				l = uint64(6)
				v = sigbits << 58 // Move 6 LSB of sigbits to MSB
				mask = v >> 56    // Move 6 MSB to 8 LSB
				if m <= 2 {
					// The 6 bits fit into the current byte.
					b[n>>3] |= byte(mask >> m)
					n += l
				} else { // In this case there are fewer than 6 bits available in current byte.
					// First step is to fill the current byte.
					written := 8 - m
					b[n>>3] |= byte(mask >> m) // Write to the current bit.
					n += written

					// Second step is to write the lost part of mask into the next byte.
					// Write l remaining bits into current byte.
					mask = v << written // Remove bits written in previous byte out of way.
					mask >>= 56

					m = n & 7 // Recompute current bit.
					b[n>>3] |= byte(mask >> m)
					n += l - written
				}

				// Write final value.
				m = n & 7
				l = sigbits
				v = (vDelta >> trailing) << (64 - l) // Move l LSB into MSB
				for (n+l)>>3 >= uint64(len(b)) {     // Keep growing b until we can fit all bits in.
					b = append(b, byte(0))
				}

				var written uint64
				if m > 0 { // In this case the current byte is not full.
					written = 8 - m
					if l < written {
						written = l
					}
					mask = v >> 56 // Move 8 MSB to 8 LSB
					b[n>>3] |= byte(mask >> m)
					n += written

					if l-written == 0 {
						prev = cur
						continue
					}
				}

				// Shift remaining bits and write out in one go.
				vv := v << written // Remove bits written in previous byte.
				// TODO(edd): Optimise this.
				if (n>>3)+8 >= uint64(len(b)) {
					b = append(b, 0, 0, 0, 0, 0, 0, 0, 0)
				}

				binary.BigEndian.PutUint64(b[n>>3:], vv)
				n += (l - written)
			}
			prev = cur
		}
	}

	length := n >> 3
	if n&7 > 0 {
		length++ // Add an extra byte to capture overflowing bits.
	}
	return b[:length], nil
}
func FloatArrayDecodeAll(b []byte, dst []float64) ([]float64, error) {
if len(b) == 0 {
return []float64{}, nil
}

View File

@ -15,7 +15,196 @@ import (
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
func TestFloatBatchDecodeAll_Simple(t *testing.T) {
// fullBlockFloat64Ones holds 1000 copies of 1.0; it is filled in by init.
var fullBlockFloat64Ones []float64

func init() {
	ones := make([]float64, 1000)
	for idx := range ones {
		ones[idx] = 1.0
	}
	fullBlockFloat64Ones = append(fullBlockFloat64Ones, ones...)
}
// TestFloatArrayEncodeAll round-trips a set of fixed inputs through
// FloatArrayEncodeAll and FloatArrayDecodeAll and expects the decoded values
// to equal the input.
func TestFloatArrayEncodeAll(t *testing.T) {
	examples := [][]float64{
		{12, 12, 24, 13, 24, 24, 24, 24}, // From example paper.
		{-3.8970913068231994e+307, -9.036931257783943e+307, 1.7173073833490201e+308,
			-9.312369166661538e+307, -2.2435523083555231e+307, 1.4779121287289644e+307,
			1.771273431601434e+308, 8.140360378221364e+307, 4.783405048208089e+307,
			-2.8044680049605344e+307, 4.412915337205696e+307, -1.2779380602005046e+308,
			1.6235802318921885e+308, -1.3402901846299688e+307, 1.6961015582104055e+308,
			-1.067980796435633e+308, -3.02868987458268e+307, 1.7641793640790284e+308,
			1.6587191845856813e+307, -1.786073304985983e+308, 1.0694549382051123e+308,
			3.5635180996210295e+307}, // Failed during early development
		{6.00065e+06, 6.000656e+06, 6.000657e+06, 6.000659e+06, 6.000661e+06}, // Similar values.
		twoHoursData,
		fullBlockFloat64Ones,
		{},
	}

	for idx := 0; idx < len(examples); idx++ {
		want := examples[idx]

		encoded, err := tsm1.FloatArrayEncodeAll(want, nil)
		if err != nil {
			t.Fatalf("unexpected error: %v", err)
		}

		got, err := tsm1.FloatArrayDecodeAll(encoded, nil)
		if err != nil {
			t.Fatalf("unexpected error: %v", err)
		}

		if !reflect.DeepEqual(got, want) {
			t.Fatalf("got result %v, expected %v", got, want)
		}
	}
}
// TestFloatArrayEncode_Compare encodes 1000 random floats with both the
// iterator encoder and FloatArrayEncodeAll, checks the batch output decodes
// to the input, and checks the two encodings are byte for byte identical.
func TestFloatArrayEncode_Compare(t *testing.T) {
	// generate random values
	input := make([]float64, 1000)
	for idx := range input {
		input[idx] = (rand.Float64() * math.MaxFloat64) - math.MaxFloat32
	}

	// Reference encoding from the iterator encoder.
	itr := tsm1.NewFloatEncoder()
	for idx := 0; idx < len(input); idx++ {
		itr.Write(input[idx])
	}
	itr.Flush()

	buf1, err := itr.Bytes()
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	buf2, err := tsm1.FloatArrayEncodeAll(input, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v\nbuf: %db %x", err, len(buf2), buf2)
	}

	result, err := tsm1.FloatArrayDecodeAll(buf2, nil)
	if err != nil {
		dumpBufs(buf1, buf2)
		t.Fatalf("unexpected error: %v\nbuf: %db %x", err, len(buf2), buf2)
	}

	if !reflect.DeepEqual(result, input) {
		t.Fatalf("got result %v, expected %v", result, input)
	}

	// Check that the encoders are byte for byte the same...
	if !bytes.Equal(buf1, buf2) {
		dumpBufs(buf1, buf2)
		t.Fatalf("Raw bytes differ for encoders")
	}
}
// dumpBufs prints a and b side by side to standard output, one byte per line
// in binary, with the byte index, the bit offset, and whether the two
// rendered columns are equal. A buffer that is shorter than the other leaves
// its column empty. A blank line follows the listing.
func dumpBufs(a, b []byte) {
	// binaryAt renders buf[i] as 8 binary digits, or "" when i is out of range.
	binaryAt := func(buf []byte, i int) string {
		if i >= len(buf) {
			return ""
		}
		return fmt.Sprintf("%08b", buf[i])
	}

	rows := len(b)
	if len(a) >= rows {
		rows = len(a)
	}

	for row := 0; row < rows; row++ {
		left, right := binaryAt(a, row), binaryAt(b, row)
		fmt.Printf("%d (%d) %s - %s :: %v\n", row, row*8, left, right, left == right)
	}
	fmt.Println()
}
// dumpBuf prints each byte of b to standard output as its index followed by
// the byte in binary, one per line, followed by a blank line.
func dumpBuf(b []byte) {
	for idx := 0; idx < len(b); idx++ {
		fmt.Printf("%d %08b\n", idx, b[idx])
	}
	fmt.Println()
}
// TestFloatArrayEncodeAll_NaN checks that FloatArrayEncodeAll returns an
// error whenever the input contains a NaN, wherever it appears.
func TestFloatArrayEncodeAll_NaN(t *testing.T) {
	examples := [][]float64{
		{1.0, math.NaN(), 2.0},
		{1.22, math.NaN()},
		{math.NaN(), math.NaN()},
		{math.NaN()},
	}

	for idx := 0; idx < len(examples); idx++ {
		if _, err := tsm1.FloatArrayEncodeAll(examples[idx], nil); err == nil {
			t.Fatalf("expected error. got nil")
		}
	}
}
// Test_FloatArrayEncodeAll_Quick round-trips randomly generated inputs
// through FloatArrayEncodeAll and FloatArrayDecodeAll. NaN values in the
// generated input are replaced with 1.0 because the encoder rejects them.
// Any error reported by quick.Check itself fails the test.
func Test_FloatArrayEncodeAll_Quick(t *testing.T) {
	if err := quick.Check(func(values []float64) bool {
		src := values
		if src == nil {
			src = []float64{}
		}

		for i, v := range src {
			if math.IsNaN(v) {
				src[i] = 1.0 // Remove invalid values
			}
		}

		// The iterator encoding is only used for the diagnostic dump below.
		s := tsm1.NewFloatEncoder()
		for _, p := range src {
			s.Write(p)
		}
		s.Flush()

		buf1, err := s.Bytes()
		if err != nil {
			t.Fatalf("unexpected error: %v", err)
		}

		var buf2 []byte
		buf2, err = tsm1.FloatArrayEncodeAll(src, buf2)
		if err != nil {
			t.Fatalf("unexpected error: %v", err)
		}

		result, err := tsm1.FloatArrayDecodeAll(buf2, nil)
		if err != nil {
			dumpBufs(buf1, buf2)
			fmt.Println(src)
			t.Fatalf("unexpected error: %v", err)
		}

		if got, exp := result, src; !reflect.DeepEqual(got, exp) {
			t.Fatalf("got result %v, expected %v", got, exp)
		}
		return true
	}, nil); err != nil {
		t.Fatal(err)
	}
}
// TestDecodeFloatArrayAll_Empty checks that FloatArrayDecodeAll accepts the
// output of an iterator encoder that was flushed without any values written.
func TestDecodeFloatArrayAll_Empty(t *testing.T) {
	enc := tsm1.NewFloatEncoder()
	enc.Flush()

	encoded, err := enc.Bytes()
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	var dst []float64
	_, err = tsm1.FloatArrayDecodeAll(encoded, dst)
	if err != nil {
		t.Fatal(err)
	}
}
func TestFloatArrayDecodeAll_Simple(t *testing.T) {
// Example from the paper
s := tsm1.NewFloatEncoder()
@ -47,7 +236,7 @@ func TestFloatBatchDecodeAll_Simple(t *testing.T) {
}
buf := make([]float64, 8)
got, err := tsm1.FloatBatchDecodeAll(b, buf)
got, err := tsm1.FloatArrayDecodeAll(b, buf)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -57,7 +246,7 @@ func TestFloatBatchDecodeAll_Simple(t *testing.T) {
}
}
func TestFloatBatchDecodeAll_Empty(t *testing.T) {
func TestFloatArrayDecodeAll_Empty(t *testing.T) {
s := tsm1.NewFloatEncoder()
s.Flush()
@ -67,7 +256,7 @@ func TestFloatBatchDecodeAll_Empty(t *testing.T) {
}
buf := make([]float64, 8)
got, err := tsm1.FloatBatchDecodeAll(b, buf)
got, err := tsm1.FloatArrayDecodeAll(b, buf)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -245,7 +434,83 @@ func TestBatchBitReader_Quick(t *testing.T) {
}
}
func BenchmarkFloatBatchDecodeAll(b *testing.B) {
// bufResult receives the output of the encoders under benchmark. The batch
// benchmarks also pass it back in as the destination buffer.
var bufResult []byte

// BenchmarkEncodeFloats compares the iterator encoder ("itr") with
// FloatArrayEncodeAll ("batch") on sequential ("seq") and random ("ran")
// inputs of 10, 100 and 1000 values.
func BenchmarkEncodeFloats(b *testing.B) {
	var err error
	cases := []int{10, 100, 1000}
	enc := tsm1.NewFloatEncoder()

	// run registers the "itr" and "batch" sub-benchmarks for one input.
	run := func(b *testing.B, input []float64) {
		b.Run("itr", func(b *testing.B) {
			b.ReportAllocs()
			enc.Reset()

			b.ResetTimer()
			for n := 0; n < b.N; n++ {
				enc.Reset()
				for _, x := range input {
					enc.Write(x)
				}
				enc.Flush()
				if bufResult, err = enc.Bytes(); err != nil {
					b.Fatal(err)
				}
			}
		})

		b.Run("batch", func(b *testing.B) {
			b.ReportAllocs()
			b.ResetTimer()
			for n := 0; n < b.N; n++ {
				if bufResult, err = tsm1.FloatArrayEncodeAll(input, bufResult); err != nil {
					b.Fatal(err)
				}
			}
		})
	}

	for _, n := range cases {
		b.Run(fmt.Sprintf("%d_seq", n), func(b *testing.B) {
			input := make([]float64, n)
			for i := range input {
				input[i] = float64(i)
			}
			run(b, input)
		})

		b.Run(fmt.Sprintf("%d_ran", n), func(b *testing.B) {
			input := make([]float64, n)
			for i := range input {
				input[i] = rand.Float64() * 100.0
			}
			run(b, input)
		})
	}
}
func BenchmarkFloatArrayDecodeAll(b *testing.B) {
benchmarks := []int{
1,
55,
@ -270,7 +535,7 @@ func BenchmarkFloatBatchDecodeAll(b *testing.B) {
dst := make([]float64, size)
for i := 0; i < b.N; i++ {
got, err := tsm1.FloatBatchDecodeAll(bytes, dst)
got, err := tsm1.FloatArrayDecodeAll(bytes, dst)
if err != nil {
b.Fatalf("unexpected length -got/+exp\n%s", cmp.Diff(len(dst), size))
}

View File

@ -8,6 +8,126 @@ import (
"github.com/influxdata/influxdb/pkg/encoding/simple8b"
)
// IntegerArrayEncodeAll encodes src into b, returning b and any error encountered.
// The returned slice may be of a different length and capacity to b.
//
// IntegerArrayEncodeAll implements batch oriented versions of the three integer
// encoding types we support: uncompressed, simple8b and RLE.
//
// The returned slice always ends at the last encoded byte, even when b was
// longer than the encoded block on entry. An empty src yields a nil slice and
// a nil error.
//
// Important: IntegerArrayEncodeAll modifies the contents of src by using it as
// scratch space for delta encoded values. It is NOT SAFE to use src after
// passing it into IntegerArrayEncodeAll.
func IntegerArrayEncodeAll(src []int64, b []byte) ([]byte, error) {
	if len(src) == 0 {
		return nil, nil // Nothing to do
	}

	var maxDelta = uint64(0)

	// To prevent an allocation of the entire block we're encoding reuse the
	// src slice to store the encoded deltas.
	deltas := reintepretInt64ToUint64Slice(src)
	// Walk backwards so each delta is computed from a still-unmodified
	// predecessor.
	for i := len(deltas) - 1; i > 0; i-- {
		deltas[i] = deltas[i] - deltas[i-1]
		deltas[i] = ZigZagEncode(int64(deltas[i]))
		if deltas[i] > maxDelta {
			maxDelta = deltas[i]
		}
	}

	deltas[0] = ZigZagEncode(int64(deltas[0]))

	if len(deltas) > 2 {
		var rle = true
		for i := 2; i < len(deltas); i++ {
			if deltas[1] != deltas[i] {
				rle = false
				break
			}
		}

		if rle {
			// Large varints can take up to 10 bytes. We're storing 3 + 1
			// type byte.
			if len(b) < 31 && cap(b) >= 31 {
				b = b[:31]
			} else if len(b) < 31 {
				b = append(b, make([]byte, 31-len(b))...)
			}

			// 4 high bits used for the encoding type
			b[0] = byte(intCompressedRLE) << 4

			i := 1
			// The first value
			binary.BigEndian.PutUint64(b[i:], deltas[0])
			i += 8
			// The first delta
			i += binary.PutUvarint(b[i:], deltas[1])
			// The number of times the delta is repeated
			i += binary.PutUvarint(b[i:], uint64(len(deltas)-1))

			return b[:i], nil
		}
	}

	if maxDelta > simple8b.MaxValue { // There is an encoded value that's too big to simple8b encode.
		// Encode uncompressed.
		sz := 1 + len(deltas)*8
		if len(b) < sz && cap(b) >= sz {
			b = b[:sz]
		} else if len(b) < sz {
			b = append(b, make([]byte, sz-len(b))...)
		}

		// 4 high bits of first byte store the encoding type for the block
		b[0] = byte(intUncompressed) << 4
		for i, v := range deltas {
			binary.BigEndian.PutUint64(b[1+i*8:1+i*8+8], v)
		}
		return b[:sz], nil
	}

	// Encode with simple8b - first value is written unencoded using 8 bytes.
	encoded, err := simple8b.EncodeAll(deltas[1:])
	if err != nil {
		return nil, err
	}

	sz := 1 + (len(encoded)+1)*8
	if len(b) < sz && cap(b) >= sz {
		b = b[:sz]
	} else if len(b) < sz {
		b = append(b, make([]byte, sz-len(b))...)
	}

	// 4 high bits of first byte store the encoding type for the block
	b[0] = byte(intCompressedSimple) << 4

	// Write the first value since it's not part of the encoded values
	binary.BigEndian.PutUint64(b[1:9], deltas[0])

	// Write the encoded values
	for i, v := range encoded {
		binary.BigEndian.PutUint64(b[9+i*8:9+i*8+8], v)
	}

	// Trim to the encoded size: b may have been longer than sz on entry, and
	// any trailing bytes would otherwise be returned as part of the block.
	return b[:sz], nil
}
// UnsignedArrayEncodeAll encodes src into b, returning b and any error encountered.
// The returned slice may be of a different length and capactity to b.
//
// UnsignedArrayEncodeAll implements batch oriented versions of the three integer
// encoding types we support: uncompressed, simple8b and RLE.
//
// Important: IntegerArrayEncodeAll modifies the contents of src by using it as
// scratch space for delta encoded values. It is NOT SAFE to use src after
// passing it into IntegerArrayEncodeAll.
func UnsignedArrayEncodeAll(src []uint64, b []byte) ([]byte, error) {
srcint := reintepretUint64ToInt64Slice(src)
return IntegerArrayEncodeAll(srcint, b)
}
var (
integerBatchDecoderFunc = [...]func(b []byte, dst []int64) ([]int64, error){
integerBatchDecodeAllUncompressed,
@ -17,7 +137,7 @@ var (
}
)
func IntegerBatchDecodeAll(b []byte, dst []int64) ([]int64, error) {
func IntegerArrayDecodeAll(b []byte, dst []int64) ([]int64, error) {
if len(b) == 0 {
return []int64{}, nil
}
@ -30,7 +150,7 @@ func IntegerBatchDecodeAll(b []byte, dst []int64) ([]int64, error) {
return integerBatchDecoderFunc[encoding&3](b, dst)
}
func UnsignedBatchDecodeAll(b []byte, dst []uint64) ([]uint64, error) {
func UnsignedArrayDecodeAll(b []byte, dst []uint64) ([]uint64, error) {
if len(b) == 0 {
return []uint64{}, nil
}
@ -47,7 +167,7 @@ func UnsignedBatchDecodeAll(b []byte, dst []uint64) ([]uint64, error) {
func integerBatchDecodeAllUncompressed(b []byte, dst []int64) ([]int64, error) {
b = b[1:]
if len(b)&0x7 != 0 {
return []int64{}, fmt.Errorf("IntegerBatchDecodeAll: expected multiple of 8 bytes")
return []int64{}, fmt.Errorf("IntegerArrayDecodeAll: expected multiple of 8 bytes")
}
count := len(b) / 8
@ -69,7 +189,7 @@ func integerBatchDecodeAllUncompressed(b []byte, dst []int64) ([]int64, error) {
func integerBatchDecodeAllSimple(b []byte, dst []int64) ([]int64, error) {
b = b[1:]
if len(b) < 8 {
return []int64{}, fmt.Errorf("IntegerBatchDecodeAll: not enough data to decode packed value")
return []int64{}, fmt.Errorf("IntegerArrayDecodeAll: not enough data to decode packed value")
}
count, err := simple8b.CountBytes(b[8:])
@ -94,7 +214,7 @@ func integerBatchDecodeAllSimple(b []byte, dst []int64) ([]int64, error) {
return []int64{}, err
}
if n != count-1 {
return []int64{}, fmt.Errorf("IntegerBatchDecodeAll: unexpected number of values decoded; got=%d, exp=%d", n, count-1)
return []int64{}, fmt.Errorf("IntegerArrayDecodeAll: unexpected number of values decoded; got=%d, exp=%d", n, count-1)
}
// calculate prefix sum
@ -110,7 +230,7 @@ func integerBatchDecodeAllSimple(b []byte, dst []int64) ([]int64, error) {
func integerBatchDecodeAllRLE(b []byte, dst []int64) ([]int64, error) {
b = b[1:]
if len(b) < 8 {
return []int64{}, fmt.Errorf("IntegerBatchDecodeAll: not enough data to decode RLE starting value")
return []int64{}, fmt.Errorf("IntegerArrayDecodeAll: not enough data to decode RLE starting value")
}
var k, n int
@ -122,7 +242,7 @@ func integerBatchDecodeAllRLE(b []byte, dst []int64) ([]int64, error) {
// Next 1-10 bytes is the delta value
value, n := binary.Uvarint(b[k:])
if n <= 0 {
return []int64{}, fmt.Errorf("IntegerBatchDecodeAll: invalid RLE delta value")
return []int64{}, fmt.Errorf("IntegerArrayDecodeAll: invalid RLE delta value")
}
k += n
@ -131,7 +251,7 @@ func integerBatchDecodeAllRLE(b []byte, dst []int64) ([]int64, error) {
// Last 1-10 bytes is how many times the value repeats
count, n := binary.Uvarint(b[k:])
if n <= 0 {
return []int64{}, fmt.Errorf("IntegerBatchDecodeAll: invalid RLE repeat value")
return []int64{}, fmt.Errorf("IntegerArrayDecodeAll: invalid RLE repeat value")
}
count += 1

View File

@ -1,16 +1,686 @@
package tsm1
import (
"bytes"
"fmt"
"math"
"math/rand"
"reflect"
"sort"
"testing"
"testing/quick"
"github.com/google/go-cmp/cmp"
)
func TestIntegerBatchDecodeAll_NegativeUncompressed(t *testing.T) {
// dumpBufs prints a and b side by side to standard output for debugging. Each
// line shows the byte index, the bit offset (index*8), the byte from each
// buffer in binary and decimal, and whether the two rendered columns match.
// A buffer that is shorter than the other contributes an empty column for the
// missing positions. A blank line is printed at the end.
func dumpBufs(a, b []byte) {
	rows := len(b)
	if len(a) >= rows {
		rows = len(a)
	}

	// render formats buf[idx], or returns "" when idx is past the end of buf.
	render := func(buf []byte, idx int) string {
		if idx >= len(buf) {
			return ""
		}
		return fmt.Sprintf("%08[1]b (%[1]d)", buf[idx])
	}

	for idx := 0; idx < rows; idx++ {
		left, right := render(a, idx), render(b, idx)
		fmt.Printf("%d (%d) %s - %s :: %v\n", idx, idx*8, left, right, left == right)
	}
	fmt.Println()
}
// dumpBuf prints every byte of b to standard output, one per line, as its
// index followed by the value in 8-digit binary and in decimal. A blank line
// is printed at the end.
func dumpBuf(b []byte) {
	for idx := 0; idx < len(b); idx++ {
		fmt.Printf("%[1]d %08[2]b (%[2]d)\n", idx, b[idx])
	}
	fmt.Println()
}
// TestIntegerArrayEncodeAll_NoValues checks that encoding a nil slice returns
// no bytes and no error, and that an IntegerDecoder fed that output reports
// no values.
func TestIntegerArrayEncodeAll_NoValues(t *testing.T) {
	encoded, err := IntegerArrayEncodeAll(nil, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if n := len(encoded); n > 0 {
		t.Fatalf("unexpected lenght: exp 0, got %v", n)
	}

	var decoder IntegerDecoder
	decoder.SetBytes(encoded)
	if hasNext := decoder.Next(); hasNext {
		t.Fatalf("unexpected next value: got true, exp false")
	}
}
// TestIntegerArrayEncodeAll_Compare runs the iterator/batch encoder comparison
// over three data sets, each expected to select a different block encoding.
// The same backing slice is refilled for every data set.
func TestIntegerArrayEncodeAll_Compare(t *testing.T) {
	values := make([]int64, 1000)

	// Sorted random values in a small range: expected to use simple8b.
	for i := range values {
		values[i] = rand.Int63n(100000) - 50000
	}
	sort.Slice(values, func(a, b int) bool { return values[a] < values[b] })
	testIntegerArrayEncodeAll_Compare(t, values, intCompressedSimple)

	// One value repeated throughout: expected to use RLE.
	for i := range values {
		values[i] = 1232342341234
	}
	testIntegerArrayEncodeAll_Compare(t, values, intCompressedRLE)

	// Large unsorted random values: the deltas will be large, so the values
	// are expected to be stored uncompressed.
	for i := range values {
		values[i] = int64(rand.Uint64())
	}
	testIntegerArrayEncodeAll_Compare(t, values, intUncompressed)
}
// testIntegerArrayEncodeAll_Compare encodes input with both the iterator based
// IntegerEncoder and IntegerArrayEncodeAll, then verifies that:
//   - both outputs carry the expected encoding type in the high 4 bits of the
//     first byte,
//   - the batch output decodes back to the original values, and
//   - the two outputs are byte for byte identical.
//
// input is handed to IntegerArrayEncodeAll, so a copy is taken up front for
// the later comparison.
func testIntegerArrayEncodeAll_Compare(t *testing.T, input []int64, encoding byte) {
	want := make([]int64, len(input))
	copy(want, input)

	// Reference output from the iterator based encoder.
	enc := NewIntegerEncoder(1000)
	for i := range input {
		enc.Write(input[i])
	}

	iterBuf, err := enc.Bytes()
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if got := iterBuf[0] >> 4; got != encoding {
		t.Fatalf("got encoding %v, expected %v", got, encoding)
	}

	// Output from the batch encoder.
	var batchBuf []byte
	batchBuf, err = IntegerArrayEncodeAll(input, batchBuf)
	if err != nil {
		t.Fatalf("unexpected error: %v\nbuf: %db %x", err, len(batchBuf), batchBuf)
	}

	if got := batchBuf[0] >> 4; got != encoding {
		t.Fatalf("got encoding %v, expected %v", got, encoding)
	}

	decoded, err := IntegerArrayDecodeAll(batchBuf, nil)
	if err != nil {
		dumpBufs(iterBuf, batchBuf)
		t.Fatalf("unexpected error: %v\nbuf: %db %x", err, len(batchBuf), batchBuf)
	}

	if !reflect.DeepEqual(decoded, want) {
		t.Fatalf("-got/+exp\n%s", cmp.Diff(decoded, want))
	}

	// Check that the encoders are byte for byte the same...
	if !bytes.Equal(iterBuf, batchBuf) {
		dumpBufs(iterBuf, batchBuf)
		t.Fatalf("Raw bytes differ for encoders")
	}
}
// TestUnsignedArrayEncodeAll_Compare runs the iterator/batch encoder
// comparison for unsigned input over three data sets, each expected to select
// a different block encoding. The same backing slice is refilled for every
// data set.
func TestUnsignedArrayEncodeAll_Compare(t *testing.T) {
	values := make([]uint64, 1000)

	// Sorted random values in a small range: expected to use simple8b.
	for i := range values {
		values[i] = uint64(rand.Int63n(100000))
	}
	sort.Slice(values, func(a, b int) bool { return values[a] < values[b] })
	testUnsignedArrayEncodeAll_Compare(t, values, intCompressedSimple)

	// One value repeated throughout: expected to use RLE.
	for i := range values {
		values[i] = 1232342341234
	}
	testUnsignedArrayEncodeAll_Compare(t, values, intCompressedRLE)

	// Large unsorted random values: the deltas will be large, so the values
	// are expected to be stored uncompressed.
	for i := range values {
		values[i] = rand.Uint64()
	}
	testUnsignedArrayEncodeAll_Compare(t, values, intUncompressed)
}
// testUnsignedArrayEncodeAll_Compare encodes input with both the iterator
// based IntegerEncoder (each value converted to int64) and
// UnsignedArrayEncodeAll, then verifies that:
//   - both outputs carry the expected encoding type in the high 4 bits of the
//     first byte,
//   - the batch output decodes back to the original values, and
//   - the two outputs are byte for byte identical.
//
// input is handed to UnsignedArrayEncodeAll, so a copy is taken up front for
// the later comparison.
func testUnsignedArrayEncodeAll_Compare(t *testing.T, input []uint64, encoding byte) {
	want := make([]uint64, len(input))
	copy(want, input)

	// Reference output from the iterator based encoder.
	enc := NewIntegerEncoder(1000)
	for i := range input {
		enc.Write(int64(input[i]))
	}

	iterBuf, err := enc.Bytes()
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if got := iterBuf[0] >> 4; got != encoding {
		t.Fatalf("got encoding %v, expected %v", got, encoding)
	}

	// Output from the batch encoder.
	var batchBuf []byte
	batchBuf, err = UnsignedArrayEncodeAll(input, batchBuf)
	if err != nil {
		t.Fatalf("unexpected error: %v\nbuf: %db %x", err, len(batchBuf), batchBuf)
	}

	if got := batchBuf[0] >> 4; got != encoding {
		t.Fatalf("got encoding %v, expected %v", got, encoding)
	}

	decoded, err := UnsignedArrayDecodeAll(batchBuf, nil)
	if err != nil {
		dumpBufs(iterBuf, batchBuf)
		t.Fatalf("unexpected error: %v\nbuf: %db %x", err, len(batchBuf), batchBuf)
	}

	if !reflect.DeepEqual(decoded, want) {
		t.Fatalf("got result %v, expected %v", decoded, want)
	}

	// Check that the encoders are byte for byte the same...
	if !bytes.Equal(iterBuf, batchBuf) {
		dumpBufs(iterBuf, batchBuf)
		t.Fatalf("Raw bytes differ for encoders")
	}
}
// TestIntegerArrayEncodeAll_One checks that a single value is written as a
// simple8b block and can be read back with IntegerDecoder.
func TestIntegerArrayEncodeAll_One(t *testing.T) {
	v1 := int64(1)

	src := []int64{1}
	b, err := IntegerArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if got := b[0] >> 4; intCompressedSimple != got {
		t.Fatalf("encoding type mismatch: exp simple8b, got %v", got)
	}

	var dec IntegerDecoder
	dec.SetBytes(b)
	if !dec.Next() {
		t.Fatalf("unexpected next value: got false, exp true")
	}

	if got := dec.Read(); v1 != got {
		t.Fatalf("read value mismatch: got %v, exp %v", got, v1)
	}
}
// TestIntegerArrayEncodeAll_Two checks that two values are written as a
// simple8b block and can be read back, in order, with IntegerDecoder.
func TestIntegerArrayEncodeAll_Two(t *testing.T) {
	exp := []int64{1, 2}

	// The encoder uses its input as scratch space, so encode a copy.
	src := make([]int64, len(exp))
	copy(src, exp)

	b, err := IntegerArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if got := b[0] >> 4; intCompressedSimple != got {
		t.Fatalf("encoding type mismatch: exp simple8b, got %v", got)
	}

	var dec IntegerDecoder
	dec.SetBytes(b)
	for i, want := range exp {
		if !dec.Next() {
			t.Fatalf("unexpected next value at pos %d: got false, exp true", i)
		}

		if got := dec.Read(); want != got {
			t.Fatalf("read value mismatch at pos %d: got %v, exp %v", i, got, want)
		}
	}
}
// TestIntegerArrayEncodeAll_Negative checks that a short series starting with
// a negative value is written as a simple8b block and can be read back, in
// order, with IntegerDecoder.
func TestIntegerArrayEncodeAll_Negative(t *testing.T) {
	exp := []int64{-2, 0, 1}

	// The encoder uses its input as scratch space, so encode a copy.
	src := make([]int64, len(exp))
	copy(src, exp)

	b, err := IntegerArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if got := b[0] >> 4; intCompressedSimple != got {
		t.Fatalf("encoding type mismatch: exp simple8b, got %v", got)
	}

	var dec IntegerDecoder
	dec.SetBytes(b)
	for i, want := range exp {
		if !dec.Next() {
			t.Fatalf("unexpected next value at pos %d: got false, exp true", i)
		}

		if got := dec.Read(); want != got {
			t.Fatalf("read value mismatch at pos %d: got %v, exp %v", i, got, want)
		}
	}
}
// TestIntegerArrayEncodeAll_Large_Range checks that values swinging between 0
// and math.MaxInt64 are stored uncompressed and decode back unchanged.
func TestIntegerArrayEncodeAll_Large_Range(t *testing.T) {
	exp := []int64{math.MaxInt64, 0, math.MaxInt64}

	// Encode a copy so exp is still intact for the comparison below.
	encoded, err := IntegerArrayEncodeAll(append([]int64{}, exp...), nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if encType := encoded[0] >> 4; intUncompressed != encType {
		t.Fatalf("encoding type mismatch: exp uncompressed, got %v", encType)
	}

	var decoder IntegerDecoder
	decoder.SetBytes(encoded)

	var decoded []int64
	for decoder.Next() {
		decoded = append(decoded, decoder.Read())
	}

	if !cmp.Equal(decoded, exp) {
		t.Fatalf("unxpected result, -got/+exp\n%s", cmp.Diff(decoded, exp))
	}
}
// TestIntegerArrayEncodeAll_Uncompressed checks that a series containing a
// delta too large for simple8b is stored uncompressed, has the expected
// encoded size, and can be read back, in order, with IntegerDecoder.
func TestIntegerArrayEncodeAll_Uncompressed(t *testing.T) {
	exp := []int64{0, 1, 1 << 60}

	// The encoder uses its input as scratch space, so encode a copy.
	src := make([]int64, len(exp))
	copy(src, exp)

	b, err := IntegerArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// 1 byte header + 3 * 8 byte values
	if want := 25; len(b) != want {
		t.Fatalf("length mismatch: got %v, exp %v", len(b), want)
	}

	if got := b[0] >> 4; intUncompressed != got {
		t.Fatalf("encoding type mismatch: exp uncompressed, got %v", got)
	}

	var dec IntegerDecoder
	dec.SetBytes(b)
	for i, want := range exp {
		if !dec.Next() {
			t.Fatalf("unexpected next value at pos %d: got false, exp true", i)
		}

		if got := dec.Read(); want != got {
			t.Fatalf("read value mismatch at pos %d: got %v, exp %v", i, got, want)
		}
	}
}
// TestIntegerArrayEncodeAll_NegativeUncompressed checks that a series of large
// positive and negative values is stored uncompressed and decodes back
// unchanged through IntegerDecoder.
func TestIntegerArrayEncodeAll_NegativeUncompressed(t *testing.T) {
	src := []int64{
		-2352281900722994752, 1438442655375607923, -4110452567888190110,
		-1221292455668011702, -1941700286034261841, -2836753127140407751,
		1432686216250034552, 3663244026151507025, -3068113732684750258,
		-1949953187327444488, 3713374280993588804, 3226153669854871355,
		-2093273755080502606, 1006087192578600616, -2272122301622271655,
		2533238229511593671, -4450454445568858273, 2647789901083530435,
		2761419461769776844, -1324397441074946198, -680758138988210958,
		94468846694902125, -2394093124890745254, -2682139311758778198,
	}

	// src is clobbered by the encoder, so keep a pristine copy to compare with.
	exp := make([]int64, len(src))
	copy(exp, src)

	b, err := IntegerArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if got := b[0] >> 4; intUncompressed != got {
		t.Fatalf("encoding type mismatch: exp uncompressed, got %v", got)
	}

	var dec IntegerDecoder
	dec.SetBytes(b)

	i := 0
	for dec.Next() {
		// Guard before indexing exp so a surplus value fails the test
		// instead of panicking with an index out of range.
		if i >= len(exp) {
			t.Fatalf("read too many values: got %v, exp %v", i+1, len(exp))
		}

		if got := dec.Read(); exp[i] != got {
			t.Fatalf("read value %d mismatch: got %v, exp %v", i, got, exp[i])
		}
		i += 1
	}

	if i != len(exp) {
		t.Fatalf("failed to read enough values: got %v, exp %v", i, len(exp))
	}
}
// TestIntegerArrayEncodeAll_AllNegative checks that a short series of negative
// values is written as a simple8b block and decodes back unchanged through
// IntegerDecoder.
func TestIntegerArrayEncodeAll_AllNegative(t *testing.T) {
	src := []int64{
		-10, -5, -1,
	}

	// src is clobbered by the encoder, so keep a pristine copy to compare with.
	exp := make([]int64, len(src))
	copy(exp, src)

	b, err := IntegerArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if got := b[0] >> 4; intCompressedSimple != got {
		t.Fatalf("encoding type mismatch: exp simple8b, got %v", got)
	}

	var dec IntegerDecoder
	dec.SetBytes(b)

	i := 0
	for dec.Next() {
		// Guard before indexing exp so a surplus value fails the test
		// instead of panicking with an index out of range.
		if i >= len(exp) {
			t.Fatalf("read too many values: got %v, exp %v", i+1, len(exp))
		}

		if got := dec.Read(); exp[i] != got {
			t.Fatalf("read value %d mismatch: got %v, exp %v", i, got, exp[i])
		}
		i += 1
	}

	if i != len(exp) {
		t.Fatalf("failed to read enough values: got %v, exp %v", i, len(exp))
	}
}
// TestIntegerArrayEncodeAll_CounterPacked checks that a counter-like series
// whose deltas are not all equal is written as a simple8b block of the
// expected size and decodes back unchanged through IntegerDecoder.
func TestIntegerArrayEncodeAll_CounterPacked(t *testing.T) {
	src := []int64{
		1e15, 1e15 + 1, 1e15 + 2, 1e15 + 3, 1e15 + 4, 1e15 + 6,
	}

	// src is clobbered by the encoder, so keep a pristine copy to compare with.
	exp := make([]int64, len(src))
	copy(exp, src)

	b, err := IntegerArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if b[0]>>4 != intCompressedSimple {
		t.Fatalf("unexpected encoding format: expected simple, got %v", b[0]>>4)
	}

	// Should use 1 header byte + 2, 8 byte words if delta-encoding is used based on
	// values sizes. Without delta-encoding, we'd get 49 bytes.
	if want := 17; len(b) != want {
		t.Fatalf("encoded length mismatch: got %v, exp %v", len(b), want)
	}

	var dec IntegerDecoder
	dec.SetBytes(b)

	i := 0
	for dec.Next() {
		// Guard before indexing exp so a surplus value fails the test
		// instead of panicking with an index out of range.
		if i >= len(exp) {
			t.Fatalf("read too many values: got %v, exp %v", i+1, len(exp))
		}

		if got := dec.Read(); exp[i] != got {
			t.Fatalf("read value %d mismatch: got %v, exp %v", i, got, exp[i])
		}
		i += 1
	}

	if i != len(exp) {
		t.Fatalf("failed to read enough values: got %v, exp %v", i, len(exp))
	}
}
// TestIntegerArrayEncodeAll_CounterRLE checks that a series with a constant
// delta is written as an RLE block of the expected size and decodes back
// unchanged through IntegerDecoder.
func TestIntegerArrayEncodeAll_CounterRLE(t *testing.T) {
	src := []int64{
		1e15, 1e15 + 1, 1e15 + 2, 1e15 + 3, 1e15 + 4, 1e15 + 5,
	}

	// src is clobbered by the encoder, so keep a pristine copy to compare with.
	exp := make([]int64, len(src))
	copy(exp, src)

	b, err := IntegerArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if b[0]>>4 != intCompressedRLE {
		t.Fatalf("unexpected encoding format: expected RLE, got %v", b[0]>>4)
	}

	// Should use 1 header byte, 8 byte first value, 1 var-byte for delta and 1 var-byte for
	// count of deltas in this particular RLE.
	if want := 11; len(b) != want {
		t.Fatalf("encoded length mismatch: got %v, exp %v", len(b), want)
	}

	var dec IntegerDecoder
	dec.SetBytes(b)

	i := 0
	for dec.Next() {
		// Guard before indexing exp so a surplus value fails the test
		// instead of panicking with an index out of range.
		if i >= len(exp) {
			t.Fatalf("read too many values: got %v, exp %v", i+1, len(exp))
		}

		if got := dec.Read(); exp[i] != got {
			t.Fatalf("read value %d mismatch: got %v, exp %v", i, got, exp[i])
		}
		i += 1
	}

	if i != len(exp) {
		t.Fatalf("failed to read enough values: got %v, exp %v", i, len(exp))
	}
}
// TestIntegerArrayEncodeAll_Descending checks that a descending series with a
// constant step is written as an RLE block of the expected size and decodes
// back unchanged through IntegerDecoder.
func TestIntegerArrayEncodeAll_Descending(t *testing.T) {
	src := []int64{
		7094, 4472, 1850,
	}

	// src is clobbered by the encoder, so keep a pristine copy to compare with.
	exp := make([]int64, len(src))
	copy(exp, src)

	b, err := IntegerArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if b[0]>>4 != intCompressedRLE {
		t.Fatalf("unexpected encoding format: expected RLE, got %v", b[0]>>4)
	}

	// Should use 1 header byte, 8 byte first value, 2 var-bytes for the delta
	// (the encoded step of -2622 does not fit in a single var-byte) and
	// 1 var-byte for the count of deltas in this particular RLE.
	if want := 12; len(b) != want {
		t.Fatalf("encoded length mismatch: got %v, exp %v", len(b), want)
	}

	var dec IntegerDecoder
	dec.SetBytes(b)

	i := 0
	for dec.Next() {
		// Guard before indexing exp so a surplus value fails the test
		// instead of panicking with an index out of range.
		if i >= len(exp) {
			t.Fatalf("read too many values: got %v, exp %v", i+1, len(exp))
		}

		if got := dec.Read(); exp[i] != got {
			t.Fatalf("read value %d mismatch: got %v, exp %v", i, got, exp[i])
		}
		i += 1
	}

	if i != len(exp) {
		t.Fatalf("failed to read enough values: got %v, exp %v", i, len(exp))
	}
}
// TestIntegerArrayEncodeAll_Flat checks that a series of identical values is
// written as an RLE block of the expected size and decodes back unchanged
// through IntegerDecoder.
func TestIntegerArrayEncodeAll_Flat(t *testing.T) {
	src := []int64{
		1, 1, 1, 1,
	}

	// src is clobbered by the encoder, so keep a pristine copy to compare with.
	exp := make([]int64, len(src))
	copy(exp, src)

	b, err := IntegerArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if b[0]>>4 != intCompressedRLE {
		t.Fatalf("unexpected encoding format: expected RLE, got %v", b[0]>>4)
	}

	// Should use 1 header byte, 8 byte first value, 1 var-byte for delta and 1 var-byte for
	// count of deltas in this particular RLE.
	if want := 11; len(b) != want {
		t.Fatalf("encoded length mismatch: got %v, exp %v", len(b), want)
	}

	var dec IntegerDecoder
	dec.SetBytes(b)

	i := 0
	for dec.Next() {
		// Guard before indexing exp so a surplus value fails the test
		// instead of panicking with an index out of range.
		if i >= len(exp) {
			t.Fatalf("read too many values: got %v, exp %v", i+1, len(exp))
		}

		if got := dec.Read(); exp[i] != got {
			t.Fatalf("read value %d mismatch: got %v, exp %v", i, got, exp[i])
		}
		i += 1
	}

	if i != len(exp) {
		t.Fatalf("failed to read enough values: got %v, exp %v", i, len(exp))
	}
}
// TestIntegerArrayEncodeAll_MinMax checks that the pair (math.MinInt64,
// math.MaxInt64) is written as a simple8b block of the expected size and
// decodes back unchanged through IntegerDecoder.
func TestIntegerArrayEncodeAll_MinMax(t *testing.T) {
	src := []int64{
		math.MinInt64, math.MaxInt64,
	}

	// src is clobbered by the encoder, so keep a pristine copy to compare with.
	exp := make([]int64, len(src))
	copy(exp, src)

	b, err := IntegerArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if b[0]>>4 != intCompressedSimple {
		t.Fatalf("unexpected encoding format: expected simple, got %v", b[0]>>4)
	}

	// 1 header byte + 8 byte first value + one 8 byte simple8b word.
	if want := 17; len(b) != want {
		t.Fatalf("encoded length mismatch: got %v, exp %v", len(b), want)
	}

	var dec IntegerDecoder
	dec.SetBytes(b)

	i := 0
	for dec.Next() {
		// Guard before indexing exp so a surplus value fails the test
		// instead of panicking with an index out of range.
		if i >= len(exp) {
			t.Fatalf("read too many values: got %v, exp %v", i+1, len(exp))
		}

		if got := dec.Read(); exp[i] != got {
			t.Fatalf("read value %d mismatch: got %v, exp %v", i, got, exp[i])
		}
		i += 1
	}

	if i != len(exp) {
		t.Fatalf("failed to read enough values: got %v, exp %v", i, len(exp))
	}
}
// TestIntegerArrayEncodeAll_Quick uses testing/quick to generate random int64
// slices and checks that each one survives a round trip through
// IntegerArrayEncodeAll and IntegerDecoder.
func TestIntegerArrayEncodeAll_Quick(t *testing.T) {
	err := quick.Check(func(values []int64) bool {
		src := values
		if values == nil {
			src = []int64{} // is this really expected?
		}

		// Copy over values to compare result—src is modified...
		exp := make([]int64, 0, len(src))
		for _, v := range src {
			exp = append(exp, v)
		}

		// Retrieve encoded bytes from encoder.
		b, err := IntegerArrayEncodeAll(src, nil)
		if err != nil {
			t.Fatal(err)
		}

		// Read values out of decoder.
		got := make([]int64, 0, len(src))
		var dec IntegerDecoder
		dec.SetBytes(b)
		for dec.Next() {
			if err := dec.Error(); err != nil {
				t.Fatal(err)
			}
			got = append(got, dec.Read())
		}

		// The loop also ends when Next reports false because of a decode
		// failure, so the error has to be checked once more afterwards.
		if err := dec.Error(); err != nil {
			t.Fatal(err)
		}

		// Verify that input and output values match. Report exp rather than
		// src, which has been overwritten by the encoder.
		if !reflect.DeepEqual(exp, got) {
			t.Fatalf("mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", exp, got)
		}

		return true
	}, nil)
	if err != nil {
		t.Fatal(err)
	}
}
func TestIntegerArrayDecodeAll_NegativeUncompressed(t *testing.T) {
exp := []int64{
-2352281900722994752, 1438442655375607923, -4110452567888190110,
-1221292455668011702, -1941700286034261841, -2836753127140407751,
@ -35,7 +705,7 @@ func TestIntegerBatchDecodeAll_NegativeUncompressed(t *testing.T) {
t.Fatalf("encoding type mismatch: exp uncompressed, got %v", got)
}
got, err := IntegerBatchDecodeAll(b, nil)
got, err := IntegerArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -45,7 +715,7 @@ func TestIntegerBatchDecodeAll_NegativeUncompressed(t *testing.T) {
}
}
func TestIntegerBatchDecodeAll_AllNegative(t *testing.T) {
func TestIntegerArrayDecodeAll_AllNegative(t *testing.T) {
enc := NewIntegerEncoder(3)
exp := []int64{
-10, -5, -1,
@ -64,7 +734,7 @@ func TestIntegerBatchDecodeAll_AllNegative(t *testing.T) {
t.Fatalf("encoding type mismatch: exp uncompressed, got %v", got)
}
got, err := IntegerBatchDecodeAll(b, nil)
got, err := IntegerArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -74,7 +744,7 @@ func TestIntegerBatchDecodeAll_AllNegative(t *testing.T) {
}
}
func TestIntegerBatchDecodeAll_CounterPacked(t *testing.T) {
func TestIntegerArrayDecodeAll_CounterPacked(t *testing.T) {
enc := NewIntegerEncoder(16)
exp := []int64{
1e15, 1e15 + 1, 1e15 + 2, 1e15 + 3, 1e15 + 4, 1e15 + 6,
@ -99,7 +769,7 @@ func TestIntegerBatchDecodeAll_CounterPacked(t *testing.T) {
t.Fatalf("encoded length mismatch: got %v, exp %v", len(b), exp)
}
got, err := IntegerBatchDecodeAll(b, nil)
got, err := IntegerArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -109,7 +779,7 @@ func TestIntegerBatchDecodeAll_CounterPacked(t *testing.T) {
}
}
func TestIntegerBatchDecodeAll_CounterRLE(t *testing.T) {
func TestIntegerArrayDecodeAll_CounterRLE(t *testing.T) {
enc := NewIntegerEncoder(16)
exp := []int64{
1e15, 1e15 + 1, 1e15 + 2, 1e15 + 3, 1e15 + 4, 1e15 + 5,
@ -134,7 +804,7 @@ func TestIntegerBatchDecodeAll_CounterRLE(t *testing.T) {
t.Fatalf("encoded length mismatch: got %v, exp %v", len(b), exp)
}
got, err := IntegerBatchDecodeAll(b, nil)
got, err := IntegerArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -144,7 +814,7 @@ func TestIntegerBatchDecodeAll_CounterRLE(t *testing.T) {
}
}
func TestIntegerBatchDecodeAll_Descending(t *testing.T) {
func TestIntegerArrayDecodeAll_Descending(t *testing.T) {
enc := NewIntegerEncoder(16)
exp := []int64{
7094, 4472, 1850,
@ -169,7 +839,7 @@ func TestIntegerBatchDecodeAll_Descending(t *testing.T) {
t.Fatalf("encoded length mismatch: got %v, exp %v", len(b), exp)
}
got, err := IntegerBatchDecodeAll(b, nil)
got, err := IntegerArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -179,7 +849,7 @@ func TestIntegerBatchDecodeAll_Descending(t *testing.T) {
}
}
func TestIntegerBatchDecodeAll_Flat(t *testing.T) {
func TestIntegerArrayDecodeAll_Flat(t *testing.T) {
enc := NewIntegerEncoder(16)
exp := []int64{
1, 1, 1, 1,
@ -204,7 +874,7 @@ func TestIntegerBatchDecodeAll_Flat(t *testing.T) {
t.Fatalf("encoded length mismatch: got %v, exp %v", len(b), exp)
}
got, err := IntegerBatchDecodeAll(b, nil)
got, err := IntegerArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -214,7 +884,7 @@ func TestIntegerBatchDecodeAll_Flat(t *testing.T) {
}
}
func TestIntegerBatchDecodeAll_MinMax(t *testing.T) {
func TestIntegerArrayDecodeAll_MinMax(t *testing.T) {
enc := NewIntegerEncoder(2)
exp := []int64{
math.MinInt64, math.MaxInt64,
@ -237,7 +907,7 @@ func TestIntegerBatchDecodeAll_MinMax(t *testing.T) {
t.Fatalf("encoded length mismatch: got %v, exp %v", len(b), exp)
}
got, err := IntegerBatchDecodeAll(b, nil)
got, err := IntegerArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -247,7 +917,7 @@ func TestIntegerBatchDecodeAll_MinMax(t *testing.T) {
}
}
func TestIntegerBatchDecodeAll_Quick(t *testing.T) {
func TestIntegerArrayDecodeAll_Quick(t *testing.T) {
quick.Check(func(values []int64) bool {
exp := values
if values == nil {
@ -267,7 +937,7 @@ func TestIntegerBatchDecodeAll_Quick(t *testing.T) {
}
// Read values out of decoder.
got, err := IntegerBatchDecodeAll(buf, nil)
got, err := IntegerArrayDecodeAll(buf, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -280,7 +950,139 @@ func TestIntegerBatchDecodeAll_Quick(t *testing.T) {
}, nil)
}
func BenchmarkIntegerBatchDecodeAllUncompressed(b *testing.B) {
var bufResult []byte
// BenchmarkEncodeIntegers compares the iterator based IntegerEncoder ("itr")
// with IntegerArrayEncodeAll ("batch") for several input sizes and three
// value distributions:
//
//	seq - sequential values 0..n-1
//	ran - random values in [0, 100)
//	dup - a single repeated value
//
// Sub-benchmarks are named "<n>_<distribution>/<itr|batch>". Results are
// stored in the package level bufResult.
func BenchmarkEncodeIntegers(b *testing.B) {
	var err error

	// Each generator produces the i-th value of its distribution.
	generators := []struct {
		name string
		gen  func(i int) int64
	}{
		{"seq", func(i int) int64 { return int64(i) }},
		{"ran", func(int) int64 { return rand.Int63n(100) }},
		{"dup", func(int) int64 { return 1233242 }},
	}

	cases := []int{10, 100, 1000}
	for _, n := range cases {
		enc := NewIntegerEncoder(n)

		for _, g := range generators {
			b.Run(fmt.Sprintf("%d_%s", n, g.name), func(b *testing.B) {
				src := make([]int64, n)
				for i := range src {
					src[i] = g.gen(i)
				}

				// input is what the batch encoder consumes (and clobbers);
				// src remains the pristine copy used to restore it.
				input := make([]int64, len(src))
				copy(input, src)

				b.Run("itr", func(b *testing.B) {
					b.ReportAllocs()
					b.ResetTimer()
					for n := 0; n < b.N; n++ {
						enc.Reset()
						for _, x := range src {
							enc.Write(x)
						}
						enc.Flush()
						if bufResult, err = enc.Bytes(); err != nil {
							b.Fatal(err)
						}

						// Since the batch encoder needs to do a copy to reset the
						// input, we will add a copy here too.
						copy(input, src)
					}
				})

				b.Run("batch", func(b *testing.B) {
					b.ReportAllocs()
					b.ResetTimer()
					for n := 0; n < b.N; n++ {
						if bufResult, err = IntegerArrayEncodeAll(input, bufResult); err != nil {
							b.Fatal(err)
						}
						copy(input, src) // Reset input that gets modified in IntegerArrayEncodeAll
					}
				})
			})
		}
	}
}
func BenchmarkIntegerArrayDecodeAllUncompressed(b *testing.B) {
benchmarks := []int{
5,
55,
@ -314,13 +1116,13 @@ func BenchmarkIntegerBatchDecodeAllUncompressed(b *testing.B) {
dst := make([]int64, size)
for i := 0; i < b.N; i++ {
dst, _ = IntegerBatchDecodeAll(bytes, dst)
dst, _ = IntegerArrayDecodeAll(bytes, dst)
}
})
}
}
func BenchmarkIntegerBatchDecodeAllPackedSimple(b *testing.B) {
func BenchmarkIntegerArrayDecodeAllPackedSimple(b *testing.B) {
benchmarks := []int{
5,
55,
@ -343,13 +1145,13 @@ func BenchmarkIntegerBatchDecodeAllPackedSimple(b *testing.B) {
dst := make([]int64, size)
for i := 0; i < b.N; i++ {
IntegerBatchDecodeAll(bytes, dst)
IntegerArrayDecodeAll(bytes, dst)
}
})
}
}
func BenchmarkIntegerBatchDecodeAllRLE(b *testing.B) {
func BenchmarkIntegerArrayDecodeAllRLE(b *testing.B) {
benchmarks := []struct {
n int
delta int64
@ -377,7 +1179,7 @@ func BenchmarkIntegerBatchDecodeAllRLE(b *testing.B) {
dst := make([]int64, bm.n)
for i := 0; i < b.N; i++ {
IntegerBatchDecodeAll(bytes, dst)
IntegerArrayDecodeAll(bytes, dst)
}
})
}

View File

@ -9,12 +9,59 @@ import (
)
var (
errStringBatchDecodeInvalidStringLength = fmt.Errorf("StringBatchDecodeAll: invalid encoded string length")
errStringBatchDecodeLengthOverflow = fmt.Errorf("StringBatchDecodeAll: length overflow")
errStringBatchDecodeShortBuffer = fmt.Errorf("StringBatchDecodeAll: short buffer")
errStringBatchDecodeInvalidStringLength = fmt.Errorf("StringArrayDecodeAll: invalid encoded string length")
errStringBatchDecodeLengthOverflow = fmt.Errorf("StringArrayDecodeAll: length overflow")
errStringBatchDecodeShortBuffer = fmt.Errorf("StringArrayDecodeAll: short buffer")
)
func StringBatchDecodeAll(b []byte, dst []string) ([]string, error) {
// StringArrayEncodeAll encodes src into b, returning b and any error encountered.
// The returned slice may be of a different length and capacity to b.
//
// Each string is written as a uvarint length prefix followed by its bytes, and
// the resulting buffer is compressed. Currently the only string compression
// scheme used is snappy. The first byte of the result carries the encoding
// type in its 4 high bits.
//
// b is used as working space when its capacity is large enough; otherwise a
// new buffer is allocated.
func StringArrayEncodeAll(src []string, b []byte) ([]byte, error) {
	srcSz := 2 + len(src)*binary.MaxVarintLen32 // strings shouldn't be longer than 64kb
	for i := range src {
		srcSz += len(src[i])
	}

	// determine the maximum possible length needed for the buffer, which
	// includes the compressed size
	var compressedSz = 0
	if len(src) > 0 {
		compressedSz = snappy.MaxEncodedLen(srcSz) + 1 /* header */
	}

	totSz := srcSz + compressedSz
	if cap(b) < totSz {
		b = make([]byte, totSz)
	} else {
		b = b[:totSz]
	}

	// Shortcut to snappy encoding nothing.
	if len(src) == 0 {
		b[0] = stringCompressedSnappy << 4
		// b may be a reused buffer holding stale data, so the payload byte
		// must be cleared explicitly: a single zero byte is the snappy
		// encoding of empty input.
		b[1] = 0
		return b[:2], nil
	}

	// write the data to be compressed *after* the space needed for snappy
	// compression. The compressed data is at the start of the allocated buffer,
	// ensuring the entire capacity is returned and available for subsequent use.
	dta := b[compressedSz:]
	n := 0
	for i := range src {
		n += binary.PutUvarint(dta[n:], uint64(len(src[i])))
		n += copy(dta[n:], src[i])
	}
	dta = dta[:n]

	dst := b[:compressedSz]
	dst[0] = stringCompressedSnappy << 4
	res := snappy.Encode(dst[1:], dta)
	return dst[:len(res)+1], nil
}
func StringArrayDecodeAll(b []byte, dst []string) ([]string, error) {
// First byte stores the encoding type, only have snappy format
// currently so ignore for now.
if len(b) > 0 {

View File

@ -1,23 +1,177 @@
package tsm1
import (
"bytes"
"fmt"
"math/rand"
"reflect"
"testing"
"testing/quick"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/internal/testutil"
"github.com/influxdata/influxdb/uuid"
)
func TestStringBatchDecodeAll_NoValues(t *testing.T) {
// TestStringArrayEncodeAll_NoValues checks that encoding a nil slice succeeds
// and that a StringDecoder accepts the output and reports no values.
func TestStringArrayEncodeAll_NoValues(t *testing.T) {
	encoded, encErr := StringArrayEncodeAll(nil, nil)
	if encErr != nil {
		t.Fatalf("unexpected error: %v", encErr)
	}

	var decoder StringDecoder
	decErr := decoder.SetBytes(encoded)
	if decErr != nil {
		t.Fatalf("unexpected error creating string decoder: %v", decErr)
	}

	if hasNext := decoder.Next(); hasNext {
		t.Fatalf("unexpected next value: got true, exp false")
	}
}
// TestStringArrayEncodeAll_Single checks that a single string survives a round
// trip through StringArrayEncodeAll and StringDecoder.
func TestStringArrayEncodeAll_Single(t *testing.T) {
	src := []string{"v1"}
	b, err := StringArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	var dec StringDecoder
	// Capture the error returned by SetBytes; checking the encoder's err
	// here would silently ignore a decoder setup failure.
	if err := dec.SetBytes(b); err != nil {
		t.Fatalf("unexpected error creating string decoder: %v", err)
	}
	if !dec.Next() {
		t.Fatalf("unexpected next value: got false, exp true")
	}

	if got := dec.Read(); src[0] != got {
		t.Fatalf("unexpected value: got %v, exp %v", got, src[0])
	}
}
// TestStringArrayEncode_Compare encodes the same random strings with the
// iterator based StringEncoder and with StringArrayEncodeAll, then verifies
// that the batch output decodes back to the input and that both outputs are
// byte for byte identical. The batch encoder is handed a non-empty buffer
// containing unrelated bytes.
func TestStringArrayEncode_Compare(t *testing.T) {
	// generate random values
	values := make([]string, 1000)
	for i := range values {
		values[i] = uuid.TimeUUID().String()
	}

	// Reference output from the iterator based encoder.
	enc := NewStringEncoder(1000)
	for i := range values {
		enc.Write(values[i])
	}
	enc.Flush()

	iterBuf, err := enc.Bytes()
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// Output from the batch encoder, starting from a dirty buffer.
	batchBuf := append([]byte("this is some jibberish"), make([]byte, 100, 200)...)
	batchBuf, err = StringArrayEncodeAll(values, batchBuf)
	if err != nil {
		t.Fatalf("unexpected error: %v\nbuf: %db %x", err, len(batchBuf), batchBuf)
	}

	decoded, err := StringArrayDecodeAll(batchBuf, nil)
	if err != nil {
		dumpBufs(iterBuf, batchBuf)
		t.Fatalf("unexpected error: %v\nbuf: %db %x", err, len(batchBuf), batchBuf)
	}

	if !reflect.DeepEqual(decoded, values) {
		t.Fatalf("got result %v, expected %v", decoded, values)
	}

	// Check that the encoders are byte for byte the same...
	if !bytes.Equal(iterBuf, batchBuf) {
		dumpBufs(iterBuf, batchBuf)
		t.Fatalf("Raw bytes differ for encoders")
	}
}
// TestStringArrayEncodeAll_Multi_Compressed encodes ten short strings and
// verifies the encoding type nibble, the encoded length and that a
// StringDecoder returns every value in order and nothing more.
func TestStringArrayEncodeAll_Multi_Compressed(t *testing.T) {
	src := make([]string, 10)
	for i := range src {
		src[i] = fmt.Sprintf("value %d", i)
	}

	b, err := StringArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// The encoding type lives in the 4 high bits of the first byte; report
	// the shifted value so the message matches what is being compared.
	if got := b[0] >> 4; got != stringCompressedSnappy {
		t.Fatalf("unexpected encoding: got %v, exp %v", got, stringCompressedSnappy)
	}

	if exp := 51; len(b) != exp {
		t.Fatalf("unexpected length: got %v, exp %v", len(b), exp)
	}

	var dec StringDecoder
	if err := dec.SetBytes(b); err != nil {
		t.Fatalf("unexpected error creating string decoder: %v", err)
	}

	for i, v := range src {
		if !dec.Next() {
			t.Fatalf("unexpected next value at pos %d: got false, exp true", i)
		}
		if got := dec.Read(); got != v {
			t.Fatalf("unexpected value at pos %d: got %v, exp %v", i, got, v)
		}
	}

	if dec.Next() {
		t.Fatalf("unexpected next value: got true, exp false")
	}
}
// TestStringArrayEncodeAll_Quick is a property test: any slice of strings
// encoded with StringArrayEncodeAll must be read back unchanged by a
// StringDecoder.
func TestStringArrayEncodeAll_Quick(t *testing.T) {
	var base []byte

	roundTrips := func(values []string) bool {
		// Normalise a nil input to an empty slice so that DeepEqual can
		// compare it against the (non-nil) decoded slice.
		src := values
		if values == nil {
			src = []string{}
		}

		// Encode the values.
		encoded, encErr := StringArrayEncodeAll(src, base)
		if encErr != nil {
			t.Fatal(encErr)
		}

		// Decode them again, one value at a time.
		var dec StringDecoder
		got := make([]string, 0, len(src))
		if setErr := dec.SetBytes(encoded); setErr != nil {
			t.Fatal(setErr)
		}
		for dec.Next() {
			if decErr := dec.Error(); decErr != nil {
				t.Fatal(decErr)
			}
			got = append(got, dec.Read())
		}

		// The decoded values must equal the input.
		if !reflect.DeepEqual(src, got) {
			t.Fatalf("mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", src, got)
		}
		return true
	}

	quick.Check(roundTrips, nil)
}
func TestStringArrayDecodeAll_NoValues(t *testing.T) {
enc := NewStringEncoder(1024)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
got, err := StringBatchDecodeAll(b, nil)
got, err := StringArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected error creating string decoder: %v", err)
}
@ -28,7 +182,7 @@ func TestStringBatchDecodeAll_NoValues(t *testing.T) {
}
}
func TestStringBatchDecodeAll_Single(t *testing.T) {
func TestStringArrayDecodeAll_Single(t *testing.T) {
enc := NewStringEncoder(1024)
v1 := "v1"
enc.Write(v1)
@ -37,7 +191,7 @@ func TestStringBatchDecodeAll_Single(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
got, err := StringBatchDecodeAll(b, nil)
got, err := StringArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected error creating string decoder: %v", err)
}
@ -48,7 +202,7 @@ func TestStringBatchDecodeAll_Single(t *testing.T) {
}
}
func TestStringBatchDecodeAll_Multi_Compressed(t *testing.T) {
func TestStringArrayDecodeAll_Multi_Compressed(t *testing.T) {
enc := NewStringEncoder(1024)
exp := make([]string, 10)
@ -70,7 +224,7 @@ func TestStringBatchDecodeAll_Multi_Compressed(t *testing.T) {
t.Fatalf("unexpected length: got %v, exp %v", len(b), exp)
}
got, err := StringBatchDecodeAll(b, nil)
got, err := StringArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected error creating string decoder: %v", err)
}
@ -80,7 +234,7 @@ func TestStringBatchDecodeAll_Multi_Compressed(t *testing.T) {
}
}
func TestStringBatchDecodeAll_Quick(t *testing.T) {
func TestStringArrayDecodeAll_Quick(t *testing.T) {
quick.Check(func(values []string) bool {
exp := values
if values == nil {
@ -99,7 +253,7 @@ func TestStringBatchDecodeAll_Quick(t *testing.T) {
}
// Read values out of decoder.
got, err := StringBatchDecodeAll(buf, nil)
got, err := StringArrayDecodeAll(buf, nil)
if err != nil {
t.Fatalf("unexpected error creating string decoder: %v", err)
}
@ -112,8 +266,8 @@ func TestStringBatchDecodeAll_Quick(t *testing.T) {
}, nil)
}
func TestStringBatchDecodeAll_Empty(t *testing.T) {
got, err := StringBatchDecodeAll([]byte{}, nil)
func TestStringArrayDecodeAll_Empty(t *testing.T) {
got, err := StringArrayDecodeAll([]byte{}, nil)
if err != nil {
t.Fatalf("unexpected error creating string decoder: %v", err)
}
@ -124,7 +278,7 @@ func TestStringBatchDecodeAll_Empty(t *testing.T) {
}
}
func TestStringBatchDecodeAll_CorruptBytes(t *testing.T) {
func TestStringArrayDecodeAll_CorruptBytes(t *testing.T) {
cases := []string{
"\x10\x03\b\x03Hi", // Higher length than actual data
"\x10\x1dp\x9c\x90\x90\x90\x90\x90\x90\x90\x90\x90length overflow----",
@ -136,7 +290,7 @@ func TestStringBatchDecodeAll_CorruptBytes(t *testing.T) {
for _, c := range cases {
t.Run(fmt.Sprintf("%q", c), func(t *testing.T) {
got, err := StringBatchDecodeAll([]byte(c), nil)
got, err := StringArrayDecodeAll([]byte(c), nil)
if err == nil {
t.Fatal("exp an err, got nil")
}
@ -149,7 +303,49 @@ func TestStringBatchDecodeAll_CorruptBytes(t *testing.T) {
}
}
func BenchmarkStringBatchDecodeAll(b *testing.B) {
// BenchmarkEncodeStrings compares the iterator-based StringEncoder ("itr")
// with StringArrayEncodeAll ("batch") for inputs of 10, 100 and 1000 UUID
// strings. Results are stored in bufResult.
func BenchmarkEncodeStrings(b *testing.B) {
	var err error

	for _, size := range []int{10, 100, 1000} {
		enc := NewStringEncoder(size)

		b.Run(fmt.Sprintf("%d", size), func(b *testing.B) {
			// Shared input for both sub-benchmarks.
			input := make([]string, size)
			for i := range input {
				input[i] = uuid.TimeUUID().String()
			}

			b.Run("itr", func(b *testing.B) {
				b.ReportAllocs()
				enc.Reset()
				b.ResetTimer()
				for iter := 0; iter < b.N; iter++ {
					enc.Reset()
					for _, s := range input {
						enc.Write(s)
					}
					enc.Flush()
					bufResult, err = enc.Bytes()
					if err != nil {
						b.Fatal(err)
					}
				}
			})

			b.Run("batch", func(b *testing.B) {
				b.ReportAllocs()
				b.ResetTimer()
				for iter := 0; iter < b.N; iter++ {
					// The previous result buffer is handed back in as the
					// destination on every iteration.
					bufResult, err = StringArrayEncodeAll(input, bufResult)
					if err != nil {
						b.Fatal(err)
					}
				}
			})
		})
	}
}
func BenchmarkStringArrayDecodeAll(b *testing.B) {
benchmarks := []struct {
n int
w int
@ -178,7 +374,7 @@ func BenchmarkStringBatchDecodeAll(b *testing.B) {
dst := make([]string, bm.n)
for i := 0; i < b.N; i++ {
got, err := StringBatchDecodeAll(bytes, dst)
got, err := StringArrayDecodeAll(bytes, dst)
if err != nil {
b.Fatalf("unexpected length -got/+exp\n%s", cmp.Diff(len(dst), bm.n))
}

View File

@ -9,6 +9,148 @@ import (
"github.com/influxdata/influxdb/pkg/encoding/simple8b"
)
// TimeArrayEncodeAll encodes src into b, returning b and any error encountered.
// The returned slice may be of a different length and capacity to b. When src
// is empty, a nil slice and a nil error are returned.
//
// TimeArrayEncodeAll implements batch oriented versions of the three integer
// encoding types we support: uncompressed, simple8b and RLE.
//
// Timestamp values to be encoded should be sorted before encoding. When encoded,
// the values are first delta-encoded. The first value is the starting timestamp,
// subsequent values are the difference from the prior value.
//
// The 4 high bits of the first output byte identify the encoding that was
// used. For the RLE and simple8b encodings the 4 low bits hold the log10 of
// the divisor that was applied to the deltas.
//
// Important: TimeArrayEncodeAll modifies the contents of src by using it as
// scratch space for delta encoded values. It is NOT SAFE to use src after
// passing it into TimeArrayEncodeAll.
func TimeArrayEncodeAll(src []int64, b []byte) ([]byte, error) {
if len(src) == 0 {
return nil, nil // Nothing to do
}
// max tracks the largest delta seen. div is the candidate common divisor of
// the deltas; it starts at 1e12 and is reduced by factors of 10 as needed.
var max, div = uint64(0), uint64(1e12)
// To prevent an allocation of the entire block we're encoding reuse the
// src slice to store the encoded deltas.
deltas := reintepretInt64ToUint64Slice(src)
if len(deltas) > 1 {
// Walk backwards so that each delta is computed from a preceding value
// that has not been overwritten yet. deltas[0] keeps the first value and
// does not contribute to max.
for i := len(deltas) - 1; i > 0; i-- {
deltas[i] = deltas[i] - deltas[i-1]
if deltas[i] > max {
max = deltas[i]
}
}
// RLE applies when every delta equals the first delta. With exactly two
// values the loop below does not execute and rle stays true.
var rle = true
for i := 2; i < len(deltas); i++ {
if deltas[1] != deltas[i] {
rle = false
break
}
}
// Deltas are the same - encode with RLE
if rle {
// The RLE block holds 1 type byte, the 8 byte starting value and two
// uvarints, each of which can take up to 10 bytes (29 bytes at most).
// Ensure at least 31 bytes are available.
if len(b) < 31 && cap(b) >= 31 {
b = b[:31]
} else if len(b) < 31 {
b = append(b, make([]byte, 31-len(b))...)
}
// 4 high bits used for the encoding type
b[0] = byte(timeCompressedRLE) << 4
i := 1
// The first value
binary.BigEndian.PutUint64(b[i:], deltas[0])
i += 8
// The first delta, checking the divisor
// given all deltas are the same, we can do a single check for the divisor
v := deltas[1]
for div > 1 && v%div != 0 {
div /= 10
}
if div > 1 {
// 4 low bits are the log10 divisor
b[0] |= byte(math.Log10(float64(div)))
i += binary.PutUvarint(b[i:], deltas[1]/div)
} else {
i += binary.PutUvarint(b[i:], deltas[1])
}
// The number of values in the block (len(deltas)), written as a uvarint
i += binary.PutUvarint(b[i:], uint64(len(deltas)))
return b[:i], nil
}
}
// We can't compress this time-range, the deltas exceed simple8b.MaxValue
if max > simple8b.MaxValue {
// Encode uncompressed: 1 type byte followed by 8 bytes per entry.
sz := 1 + len(deltas)*8
if len(b) < sz && cap(b) >= sz {
b = b[:sz]
} else if len(b) < sz {
b = append(b, make([]byte, sz-len(b))...)
}
// 4 high bits of first byte store the encoding type for the block
b[0] = byte(timeUncompressed) << 4
// Each entry of deltas (the starting value followed by the deltas) is
// written as 8 big-endian bytes.
for i, v := range deltas {
binary.BigEndian.PutUint64(b[1+i*8:1+i*8+8], v)
}
return b[:sz], nil
}
// find divisor only if we're compressing with simple8b
for i := 1; i < len(deltas) && div > 1; i++ {
// Reduce div by factors of 10 until it evenly divides this delta. The
// outer loop stops early once div has dropped to 1.
v := deltas[i]
for div > 1 && v%div != 0 {
div /= 10
}
}
// Only apply the divisor if it's greater than 1 since division is expensive.
if div > 1 {
for i := 1; i < len(deltas); i++ {
deltas[i] /= div
}
}
// Encode with simple8b - first value is written unencoded using 8 bytes.
encoded, err := simple8b.EncodeAll(deltas[1:])
if err != nil {
return nil, err
}
// 1 type byte + 8 bytes for the first value + 8 bytes per packed word.
sz := 1 + (len(encoded)+1)*8
if len(b) < sz && cap(b) >= sz {
b = b[:sz]
} else if len(b) < sz {
b = append(b, make([]byte, sz-len(b))...)
}
// 4 high bits of first byte store the encoding type for the block
b[0] = byte(timeCompressedPackedSimple) << 4
// 4 low bits are the log10 divisor
b[0] |= byte(math.Log10(float64(div)))
// Write the first value since it's not part of the encoded values
binary.BigEndian.PutUint64(b[1:9], deltas[0])
// Write the encoded values
for i, v := range encoded {
binary.BigEndian.PutUint64(b[9+i*8:9+i*8+8], v)
}
return b[:sz], nil
}
var (
timeBatchDecoderFunc = [...]func(b []byte, dst []int64) ([]int64, error){
timeBatchDecodeAllUncompressed,
@ -18,7 +160,7 @@ var (
}
)
func TimeBatchDecodeAll(b []byte, dst []int64) ([]int64, error) {
func TimeArrayDecodeAll(b []byte, dst []int64) ([]int64, error) {
if len(b) == 0 {
return []int64{}, nil
}
@ -34,7 +176,7 @@ func TimeBatchDecodeAll(b []byte, dst []int64) ([]int64, error) {
func timeBatchDecodeAllUncompressed(b []byte, dst []int64) ([]int64, error) {
b = b[1:]
if len(b)&0x7 != 0 {
return []int64{}, fmt.Errorf("TimeBatchDecodeAll: expected multiple of 8 bytes")
return []int64{}, fmt.Errorf("TimeArrayDecodeAll: expected multiple of 8 bytes")
}
count := len(b) / 8
@ -55,7 +197,7 @@ func timeBatchDecodeAllUncompressed(b []byte, dst []int64) ([]int64, error) {
func timeBatchDecodeAllSimple(b []byte, dst []int64) ([]int64, error) {
if len(b) < 9 {
return []int64{}, fmt.Errorf("TimeBatchDecodeAll: not enough data to decode packed timestamps")
return []int64{}, fmt.Errorf("TimeArrayDecodeAll: not enough data to decode packed timestamps")
}
div := uint64(math.Pow10(int(b[0] & 0xF))) // multiplier
@ -82,7 +224,7 @@ func timeBatchDecodeAllSimple(b []byte, dst []int64) ([]int64, error) {
return []int64{}, err
}
if n != count-1 {
return []int64{}, fmt.Errorf("TimeBatchDecodeAll: unexpected number of values decoded; got=%d, exp=%d", n, count-1)
return []int64{}, fmt.Errorf("TimeArrayDecodeAll: unexpected number of values decoded; got=%d, exp=%d", n, count-1)
}
// Compute the prefix sum and scale the deltas back up
@ -105,7 +247,7 @@ func timeBatchDecodeAllSimple(b []byte, dst []int64) ([]int64, error) {
func timeBatchDecodeAllRLE(b []byte, dst []int64) ([]int64, error) {
if len(b) < 9 {
return []int64{}, fmt.Errorf("TimeBatchDecodeAll: not enough data to decode RLE starting value")
return []int64{}, fmt.Errorf("TimeArrayDecodeAll: not enough data to decode RLE starting value")
}
var k, n int
@ -121,7 +263,7 @@ func timeBatchDecodeAllRLE(b []byte, dst []int64) ([]int64, error) {
// Next 1-10 bytes is our (scaled down by factor of 10) run length delta
delta, n := binary.Uvarint(b[k:])
if n <= 0 {
return []int64{}, fmt.Errorf("TimeBatchDecodeAll: invalid run length in decodeRLE")
return []int64{}, fmt.Errorf("TimeArrayDecodeAll: invalid run length in decodeRLE")
}
k += n

View File

@ -1,8 +1,11 @@
package tsm1
import (
"bytes"
"fmt"
"math/rand"
"reflect"
"sort"
"testing"
"testing/quick"
"time"
@ -10,14 +13,562 @@ import (
"github.com/google/go-cmp/cmp"
)
func TestTimeBatchDecodeAll_NoValues(t *testing.T) {
// TestTimeArrayEncodeAll encodes four timestamps spaced one second apart,
// expects the RLE encoding to be chosen and checks that a TimeDecoder returns
// the original values.
func TestTimeArrayEncodeAll(t *testing.T) {
	epoch := time.Unix(0, 0)

	// Timestamps at 0s, 1s, 2s and 3s after the epoch.
	src := make([]int64, 0, 4)
	for i := 0; i < 4; i++ {
		src = append(src, epoch.Add(time.Duration(i)*time.Second).UnixNano())
	}

	// Keep a copy of the input: it is used to verify the decoded values
	// after src has been handed to the encoder.
	exp := make([]int64, len(src))
	copy(exp, src)

	b, err := TimeArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	encoding := b[0] >> 4
	if encoding != timeCompressedRLE {
		t.Fatalf("Wrong encoding used: expected rle, got %v", encoding)
	}

	var dec TimeDecoder
	dec.Init(b)
	for i := range exp {
		if !dec.Next() {
			t.Fatalf("Next == false, expected true")
		}

		if exp[i] != dec.Read() {
			t.Fatalf("Item %d mismatch, got %v, exp %v", i, dec.Read(), exp[i])
		}
	}
}
// TestTimeArrayEncodeAll_Compare checks the array encoder against the
// original iterator encoder, byte for byte, for three inputs, each expected
// to select a different encoding.
func TestTimeArrayEncodeAll_Compare(t *testing.T) {
	input := make([]int64, 1000)

	// Sorted random values (should use simple8b).
	for i := range input {
		input[i] = rand.Int63n(100000) - 50000
	}
	sort.Slice(input, func(a, b int) bool { return input[a] < input[b] })
	testTimeArrayEncodeAll_Compare(t, input, timeCompressedPackedSimple)

	// Identical values (should use RLE).
	for i := range input {
		input[i] = 1232342341234
	}
	testTimeArrayEncodeAll_Compare(t, input, timeCompressedRLE)

	// Large random values that are not sorted. The deltas will be large and
	// the values should be stored uncompressed.
	for i := range input {
		input[i] = int64(rand.Uint64())
	}
	testTimeArrayEncodeAll_Compare(t, input, timeUncompressed)
}
// testTimeArrayEncodeAll_Compare encodes input with both the iterator-based
// TimeEncoder and TimeArrayEncodeAll. It fails the test unless both use the
// given encoding, the array output decodes back to the input and the two
// outputs are byte for byte identical.
func testTimeArrayEncodeAll_Compare(t *testing.T, input []int64, encoding byte) {
	// Snapshot the input before it is passed to the array encoder.
	exp := make([]int64, len(input))
	copy(exp, input)

	// Reference output from the iterator encoder.
	enc := NewTimeEncoder(1000)
	for i := range input {
		enc.Write(input[i])
	}
	buf1, err := enc.Bytes()
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if got := buf1[0] >> 4; got != encoding {
		t.Fatalf("got encoding %v, expected %v", got, encoding)
	}

	// Output from the array encoder, starting from a nil destination.
	buf2, err := TimeArrayEncodeAll(input, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v\nbuf: %db %x", err, len(buf2), buf2)
	}
	if got := buf2[0] >> 4; got != encoding {
		t.Fatalf("got encoding %v, expected %v", got, encoding)
	}

	result, err := TimeArrayDecodeAll(buf2, nil)
	if err != nil {
		dumpBufs(buf1, buf2)
		t.Fatalf("unexpected error: %v\nbuf: %db %x", err, len(buf2), buf2)
	}
	if !reflect.DeepEqual(result, exp) {
		t.Fatalf("-got/+exp\n%s", cmp.Diff(result, exp))
	}

	// Both encoders must emit exactly the same bytes.
	if !bytes.Equal(buf1, buf2) {
		dumpBufs(buf1, buf2)
		t.Fatalf("Raw bytes differ for encoders")
	}
}
// TestTimeArrayEncodeAll_NoValues checks that encoding a nil input succeeds
// and that a TimeDecoder initialised with the result yields no values.
func TestTimeArrayEncodeAll_NoValues(t *testing.T) {
	encoded, encErr := TimeArrayEncodeAll(nil, nil)
	if encErr != nil {
		t.Fatalf("unexpected error: %v", encErr)
	}

	var decoder TimeDecoder
	decoder.Init(encoded)

	// Nothing was encoded, so the decoder must not advance.
	if hasNext := decoder.Next(); hasNext {
		t.Fatalf("unexpected next value: got true, exp false")
	}
}
// TestTimeArrayEncodeAll_One encodes a single timestamp, expects the simple8b
// packed encoding to be selected and checks that a TimeDecoder returns the
// value.
func TestTimeArrayEncodeAll_One(t *testing.T) {
	src := []int64{0}
	// Keep a copy of the input for verification after src has been handed
	// to the encoder.
	exp := make([]int64, len(src))
	copy(exp, src)

	b, err := TimeArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if got := b[0] >> 4; got != timeCompressedPackedSimple {
		t.Fatalf("Wrong encoding used: expected simple8b, got %v", got)
	}

	var dec TimeDecoder
	dec.Init(b)
	if !dec.Next() {
		t.Fatalf("unexpected next value: got false, exp true")
	}

	if got := dec.Read(); got != exp[0] {
		t.Fatalf("read value mismatch: got %v, exp %v", got, exp[0])
	}
}
// TestTimeArrayEncodeAll_Two encodes two timestamps, expects the RLE encoding
// to be selected and checks that a TimeDecoder returns both values in order.
func TestTimeArrayEncodeAll_Two(t *testing.T) {
	src := []int64{0, 1}
	// Keep a copy of the input for verification after src has been handed
	// to the encoder.
	exp := make([]int64, len(src))
	copy(exp, src)

	b, err := TimeArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if got := b[0] >> 4; got != timeCompressedRLE {
		t.Fatalf("Wrong encoding used: expected rle, got %v", got)
	}

	var dec TimeDecoder
	dec.Init(b)
	for i, want := range exp {
		if !dec.Next() {
			t.Fatalf("unexpected next value at pos %d: got false, exp true", i)
		}
		if got := dec.Read(); got != want {
			t.Fatalf("read value mismatch at pos %d: got %v, exp %v", i, got, want)
		}
	}
}
// TestTimeArrayEncodeAll_Three encodes three timestamps with differing
// deltas, expects the simple8b packed encoding to be selected and checks that
// a TimeDecoder returns all values in order.
func TestTimeArrayEncodeAll_Three(t *testing.T) {
	src := []int64{0, 1, 3}
	// Keep a copy of the input for verification after src has been handed
	// to the encoder.
	exp := make([]int64, len(src))
	copy(exp, src)

	b, err := TimeArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if got := b[0] >> 4; got != timeCompressedPackedSimple {
		t.Fatalf("Wrong encoding used: expected simple8b, got %v", got)
	}

	var dec TimeDecoder
	dec.Init(b)
	for i, want := range exp {
		if !dec.Next() {
			t.Fatalf("unexpected next value at pos %d: got false, exp true", i)
		}
		if got := dec.Read(); got != want {
			t.Fatalf("read value mismatch at pos %d: got %v, exp %v", i, got, want)
		}
	}
}
// TestTimeArrayEncodeAll_Large_Range encodes two large nanosecond timestamps
// one second apart, expects the RLE encoding to be selected and checks that a
// TimeDecoder returns both values in order.
func TestTimeArrayEncodeAll_Large_Range(t *testing.T) {
	src := []int64{1442369134000000000, 1442369135000000000}
	// Keep a copy of the input for verification after src has been handed
	// to the encoder.
	exp := make([]int64, len(src))
	copy(exp, src)

	b, err := TimeArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if got := b[0] >> 4; got != timeCompressedRLE {
		t.Fatalf("Wrong encoding used: expected rle, got %v", got)
	}

	var dec TimeDecoder
	dec.Init(b)
	// Each failure message reports the element being compared; indexing past
	// the two-element slice here would panic instead of reporting.
	for i, want := range exp {
		if !dec.Next() {
			t.Fatalf("unexpected next value at pos %d: got false, exp true", i)
		}
		if got := dec.Read(); got != want {
			t.Fatalf("read value mismatch at pos %d: got %v, exp %v", i, got, want)
		}
	}
}
// TestTimeArrayEncodeAll_Uncompressed encodes three timestamps whose last
// delta is too large for the compressed formats, expects the uncompressed
// encoding (1 type byte + 3*8 value bytes) and checks that a TimeDecoder
// returns all values in order.
func TestTimeArrayEncodeAll_Uncompressed(t *testing.T) {
	src := []int64{time.Unix(0, 0).UnixNano(), time.Unix(1, 0).UnixNano()}

	// about 36.5yrs in NS resolution is max range for compressed format
	// This should cause the encoding to fallback to raw points
	src = append(src, time.Unix(2, (2<<59)).UnixNano())
	// Keep a copy of the input for verification after src has been handed
	// to the encoder.
	exp := make([]int64, len(src))
	copy(exp, src)

	b, err := TimeArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if exp := 25; len(b) != exp {
		t.Fatalf("length mismatch: got %v, exp %v", len(b), exp)
	}

	if got := b[0] >> 4; got != timeUncompressed {
		t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
	}

	var dec TimeDecoder
	dec.Init(b)
	for i, want := range exp {
		if !dec.Next() {
			t.Fatalf("unexpected next value at pos %d: got false, exp true", i)
		}
		if got := dec.Read(); got != want {
			t.Fatalf("read value mismatch at pos %d: got %v, exp %v", i, got, want)
		}
	}
}
// TestTimeArrayEncodeAll_RLE encodes 500 consecutive timestamps, expects a
// 12 byte RLE block and checks that a TimeDecoder returns exactly the
// original values.
func TestTimeArrayEncodeAll_RLE(t *testing.T) {
	var src []int64
	for i := 0; i < 500; i++ {
		src = append(src, int64(i))
	}
	// Keep a copy of the input for verification after src has been handed
	// to the encoder.
	exp := make([]int64, len(src))
	copy(exp, src)

	b, err := TimeArrayEncodeAll(src, nil)
	// Check the error before touching b so that an encode failure is
	// reported as such rather than as a length mismatch or an index panic.
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if exp := 12; len(b) != exp {
		t.Fatalf("length mismatch: got %v, exp %v", len(b), exp)
	}

	if got := b[0] >> 4; got != timeCompressedRLE {
		t.Fatalf("Wrong encoding used: expected rle, got %v", got)
	}

	var dec TimeDecoder
	dec.Init(b)
	for i, v := range exp {
		if !dec.Next() {
			t.Fatalf("Next == false, expected true")
		}
		if got := dec.Read(); got != v {
			t.Fatalf("Item %d mismatch, got %v, exp %v", i, got, v)
		}
	}

	if dec.Next() {
		t.Fatalf("unexpected extra values")
	}
}
// TestTimeArrayEncodeAll_Reverse encodes timestamps in descending order,
// expects the uncompressed encoding to be selected and checks that a
// TimeDecoder returns exactly the original values.
func TestTimeArrayEncodeAll_Reverse(t *testing.T) {
	src := []int64{3, 2, 0}
	// Keep a copy of the input for verification after src has been handed
	// to the encoder.
	exp := make([]int64, len(src))
	copy(exp, src)

	b, err := TimeArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if got := b[0] >> 4; got != timeUncompressed {
		t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
	}

	var dec TimeDecoder
	dec.Init(b)
	i := 0
	for dec.Next() {
		// Guard the index so surplus decoded values are reported as a test
		// failure rather than an out-of-range panic.
		if i >= len(exp) {
			t.Fatalf("read too many values: exp %d", len(exp))
		}
		if got := dec.Read(); got != exp[i] {
			t.Fatalf("read value %d mismatch: got %v, exp %v", i, got, exp[i])
		}
		i++
	}

	// Without this check a decoder that yields nothing would pass the test.
	if i != len(exp) {
		t.Fatalf("read too few values: exp %d, got %d", len(exp), i)
	}
}
// TestTimeArrayEncodeAll_220SecondDelta encodes 220 timestamps spaced 60
// seconds apart, expects a 12 byte RLE block and checks that a TimeDecoder
// returns exactly the original values.
func TestTimeArrayEncodeAll_220SecondDelta(t *testing.T) {
	var src []int64
	now := time.Now()
	for i := 0; i < 220; i++ {
		src = append(src, now.Add(time.Duration(i*60)*time.Second).UnixNano())
	}
	// Keep a copy of the input for verification after src has been handed
	// to the encoder.
	exp := make([]int64, len(src))
	copy(exp, src)

	b, err := TimeArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// Using RLE, should get 12 bytes
	if exp := 12; len(b) != exp {
		t.Fatalf("unexpected length: got %v, exp %v", len(b), exp)
	}

	if got := b[0] >> 4; got != timeCompressedRLE {
		t.Fatalf("Wrong encoding used: expected rle, got %v", got)
	}

	var dec TimeDecoder
	dec.Init(b)
	i := 0
	for dec.Next() {
		// Guard the index so surplus decoded values are reported as a test
		// failure rather than an out-of-range panic.
		if i >= len(exp) {
			t.Fatalf("Read too many values: exp %d", len(exp))
		}
		if got := dec.Read(); got != exp[i] {
			t.Fatalf("read value %d mismatch: got %v, exp %v", i, got, exp[i])
		}
		i++
	}

	if i != len(exp) {
		t.Fatalf("Read too few values: exp %d, got %d", len(exp), i)
	}

	if dec.Next() {
		t.Fatalf("expected Next() = false, got true")
	}
}
// TestTimeArrayEncodeAll_Quick is a property test: any slice of int64 values
// encoded with TimeArrayEncodeAll must be read back unchanged by a
// TimeDecoder.
func TestTimeArrayEncodeAll_Quick(t *testing.T) {
	roundTrips := func(values []int64) bool {
		// Snapshot the input before it is handed to the encoder.
		exp := make([]int64, len(values))
		copy(exp, values)

		// Encode the values.
		encoded, encErr := TimeArrayEncodeAll(values, nil)
		if encErr != nil {
			t.Fatal(encErr)
		}

		// Decode them again, one value at a time.
		var dec TimeDecoder
		got := make([]int64, 0, len(values))
		dec.Init(encoded)
		for dec.Next() {
			if decErr := dec.Error(); decErr != nil {
				t.Fatal(decErr)
			}
			got = append(got, dec.Read())
		}

		// The decoded values must equal the snapshot.
		if !reflect.DeepEqual(exp, got) {
			t.Fatalf("mismatch:\n\nexp=%+v\n\ngot=%+v\n\n", exp, got)
		}
		return true
	}

	quick.Check(roundTrips, nil)
}
// TestTimeArrayEncodeAll_RLESeconds encodes six nanosecond timestamps spaced
// ten seconds apart, expects the RLE encoding to be selected and checks that
// a TimeDecoder returns exactly the original values.
func TestTimeArrayEncodeAll_RLESeconds(t *testing.T) {
	src := []int64{
		1444448158000000000,
		1444448168000000000,
		1444448178000000000,
		1444448188000000000,
		1444448198000000000,
		1444448208000000000,
	}
	// Keep a copy of the input for verification after src has been handed
	// to the encoder.
	exp := make([]int64, len(src))
	copy(exp, src)

	b, err := TimeArrayEncodeAll(src, nil)
	// Check the error before indexing b so that an encode failure is
	// reported as such rather than as an index panic.
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if got := b[0] >> 4; got != timeCompressedRLE {
		t.Fatalf("Wrong encoding used: expected rle, got %v", got)
	}

	var dec TimeDecoder
	dec.Init(b)
	for i, v := range exp {
		if !dec.Next() {
			t.Fatalf("Next == false, expected true")
		}
		if got := dec.Read(); got != v {
			t.Fatalf("Item %d mismatch, got %v, exp %v", i, got, v)
		}
	}

	if dec.Next() {
		t.Fatalf("unexpected extra values")
	}
}
// TestTimeArrayEncodeAll_Count_Uncompressed encodes three timestamps that
// force the uncompressed encoding and checks that CountTimestamps reports
// three values for the resulting block.
func TestTimeArrayEncodeAll_Count_Uncompressed(t *testing.T) {
	src := []int64{
		time.Unix(0, 0).UnixNano(),
		time.Unix(1, 0).UnixNano(),
	}

	// about 36.5yrs in NS resolution is max range for compressed format
	// This should cause the encoding to fallback to raw points
	src = append(src, time.Unix(2, (2<<59)).UnixNano())

	b, err := TimeArrayEncodeAll(src, nil)
	// Check the error before indexing b so that an encode failure is
	// reported as such rather than as an index panic.
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if got := b[0] >> 4; got != timeUncompressed {
		t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
	}

	if got, exp := CountTimestamps(b), 3; got != exp {
		t.Fatalf("count mismatch: got %v, exp %v", got, exp)
	}
}
// TestTimeArrayEncodeAll_Count_RLE encodes six evenly spaced timestamps,
// expects the RLE encoding to be selected and checks that CountTimestamps
// reports the number of input values for the resulting block.
func TestTimeArrayEncodeAll_Count_RLE(t *testing.T) {
	src := []int64{
		1444448158000000000,
		1444448168000000000,
		1444448178000000000,
		1444448188000000000,
		1444448198000000000,
		1444448208000000000,
	}
	// Record the expected count up front; src is handed to the encoder below.
	want := len(src)

	b, err := TimeArrayEncodeAll(src, nil)
	// Check the error before indexing b so that an encode failure is
	// reported as such rather than as an index panic.
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if got := b[0] >> 4; got != timeCompressedRLE {
		t.Fatalf("Wrong encoding used: expected rle, got %v", got)
	}

	if got := CountTimestamps(b); got != want {
		t.Fatalf("count mismatch: got %v, exp %v", got, want)
	}
}
// TestTimeArrayEncodeAll_Count_Simple8 encodes three timestamps with
// differing deltas, expects the simple8b packed encoding to be selected and
// checks that CountTimestamps reports three values for the resulting block.
func TestTimeArrayEncodeAll_Count_Simple8(t *testing.T) {
	src := []int64{0, 1, 3}

	b, err := TimeArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if got := b[0] >> 4; got != timeCompressedPackedSimple {
		t.Fatalf("Wrong encoding used: expected simple8b, got %v", got)
	}

	if got, exp := CountTimestamps(b), 3; got != exp {
		t.Fatalf("count mismatch: got %v, exp %v", got, exp)
	}
}
func TestTimeArrayDecodeAll_NoValues(t *testing.T) {
enc := NewTimeEncoder(0)
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
got, err := TimeBatchDecodeAll(b, nil)
got, err := TimeArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -28,7 +579,7 @@ func TestTimeBatchDecodeAll_NoValues(t *testing.T) {
}
}
func TestTimeBatchDecodeAll_One(t *testing.T) {
func TestTimeArrayDecodeAll_One(t *testing.T) {
enc := NewTimeEncoder(1)
exp := []int64{0}
for _, v := range exp {
@ -43,7 +594,7 @@ func TestTimeBatchDecodeAll_One(t *testing.T) {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
got, err := TimeBatchDecodeAll(b, nil)
got, err := TimeArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -53,7 +604,7 @@ func TestTimeBatchDecodeAll_One(t *testing.T) {
}
}
func TestTimeBatchDecodeAll_Two(t *testing.T) {
func TestTimeArrayDecodeAll_Two(t *testing.T) {
enc := NewTimeEncoder(2)
exp := []int64{0, 1}
for _, v := range exp {
@ -69,7 +620,7 @@ func TestTimeBatchDecodeAll_Two(t *testing.T) {
t.Fatalf("Wrong encoding used: expected rle, got %v", got)
}
got, err := TimeBatchDecodeAll(b, nil)
got, err := TimeArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -79,7 +630,7 @@ func TestTimeBatchDecodeAll_Two(t *testing.T) {
}
}
func TestTimeBatchDecodeAll_Three(t *testing.T) {
func TestTimeArrayDecodeAll_Three(t *testing.T) {
enc := NewTimeEncoder(3)
exp := []int64{0, 1, 3}
for _, v := range exp {
@ -95,7 +646,7 @@ func TestTimeBatchDecodeAll_Three(t *testing.T) {
t.Fatalf("Wrong encoding used: expected rle, got %v", got)
}
got, err := TimeBatchDecodeAll(b, nil)
got, err := TimeArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -105,7 +656,7 @@ func TestTimeBatchDecodeAll_Three(t *testing.T) {
}
}
func TestTimeBatchDecodeAll_Large_Range(t *testing.T) {
func TestTimeArrayDecodeAll_Large_Range(t *testing.T) {
enc := NewTimeEncoder(2)
exp := []int64{1442369134000000000, 1442369135000000000}
for _, v := range exp {
@ -120,7 +671,7 @@ func TestTimeBatchDecodeAll_Large_Range(t *testing.T) {
t.Fatalf("Wrong encoding used: expected rle, got %v", got)
}
got, err := TimeBatchDecodeAll(b, nil)
got, err := TimeArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -130,7 +681,7 @@ func TestTimeBatchDecodeAll_Large_Range(t *testing.T) {
}
}
func TestTimeBatchDecodeAll_Uncompressed(t *testing.T) {
func TestTimeArrayDecodeAll_Uncompressed(t *testing.T) {
enc := NewTimeEncoder(3)
exp := []int64{
time.Unix(0, 0).UnixNano(),
@ -156,7 +707,7 @@ func TestTimeBatchDecodeAll_Uncompressed(t *testing.T) {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
got, err := TimeBatchDecodeAll(b, nil)
got, err := TimeArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -166,7 +717,7 @@ func TestTimeBatchDecodeAll_Uncompressed(t *testing.T) {
}
}
func TestTimeBatchDecodeAll_RLE(t *testing.T) {
func TestTimeArrayDecodeAll_RLE(t *testing.T) {
enc := NewTimeEncoder(512)
var exp []int64
for i := 0; i < 500; i++ {
@ -190,7 +741,7 @@ func TestTimeBatchDecodeAll_RLE(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
got, err := TimeBatchDecodeAll(b, nil)
got, err := TimeArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -200,7 +751,7 @@ func TestTimeBatchDecodeAll_RLE(t *testing.T) {
}
}
func TestTimeBatchDecodeAll_Reverse(t *testing.T) {
func TestTimeArrayDecodeAll_Reverse(t *testing.T) {
enc := NewTimeEncoder(3)
exp := []int64{
int64(3),
@ -221,7 +772,7 @@ func TestTimeBatchDecodeAll_Reverse(t *testing.T) {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
got, err := TimeBatchDecodeAll(b, nil)
got, err := TimeArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -231,7 +782,7 @@ func TestTimeBatchDecodeAll_Reverse(t *testing.T) {
}
}
func TestTimeBatchDecodeAll_Negative(t *testing.T) {
func TestTimeArrayDecodeAll_Negative(t *testing.T) {
enc := NewTimeEncoder(3)
exp := []int64{
-2352281900722994752, 1438442655375607923, -4110452567888190110,
@ -257,7 +808,7 @@ func TestTimeBatchDecodeAll_Negative(t *testing.T) {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
got, err := TimeBatchDecodeAll(b, nil)
got, err := TimeArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -267,7 +818,7 @@ func TestTimeBatchDecodeAll_Negative(t *testing.T) {
}
}
func TestTimeBatchDecodeAll_220SecondDelta(t *testing.T) {
func TestTimeArrayDecodeAll_220SecondDelta(t *testing.T) {
enc := NewTimeEncoder(256)
var exp []int64
now := time.Now()
@ -293,7 +844,7 @@ func TestTimeBatchDecodeAll_220SecondDelta(t *testing.T) {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
got, err := TimeBatchDecodeAll(b, nil)
got, err := TimeArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -303,7 +854,7 @@ func TestTimeBatchDecodeAll_220SecondDelta(t *testing.T) {
}
}
func TestTimeBatchDecodeAll_Quick(t *testing.T) {
func TestTimeArrayDecodeAll_Quick(t *testing.T) {
quick.Check(func(values []int64) bool {
// Write values to encoder.
enc := NewTimeEncoder(1024)
@ -319,7 +870,7 @@ func TestTimeBatchDecodeAll_Quick(t *testing.T) {
t.Fatal(err)
}
got, err := TimeBatchDecodeAll(buf, nil)
got, err := TimeArrayDecodeAll(buf, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -332,7 +883,7 @@ func TestTimeBatchDecodeAll_Quick(t *testing.T) {
}, nil)
}
func TestTimeBatchDecodeAll_RLESeconds(t *testing.T) {
func TestTimeArrayDecodeAll_RLESeconds(t *testing.T) {
enc := NewTimeEncoder(6)
exp := make([]int64, 6)
@ -356,7 +907,7 @@ func TestTimeBatchDecodeAll_RLESeconds(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
got, err := TimeBatchDecodeAll(b, nil)
got, err := TimeArrayDecodeAll(b, nil)
if err != nil {
t.Fatalf("unexpected decode error %q", err)
}
@ -366,7 +917,7 @@ func TestTimeBatchDecodeAll_RLESeconds(t *testing.T) {
}
}
func TestTimeBatchDecodeAll_Corrupt(t *testing.T) {
func TestTimeArrayDecodeAll_Corrupt(t *testing.T) {
cases := []string{
"\x10\x14", // Packed: not enough data
"\x20\x00", // RLE: not enough data for starting timestamp
@ -377,7 +928,7 @@ func TestTimeBatchDecodeAll_Corrupt(t *testing.T) {
for _, c := range cases {
t.Run(fmt.Sprintf("%q", c), func(t *testing.T) {
got, err := TimeBatchDecodeAll([]byte(c), nil)
got, err := TimeArrayDecodeAll([]byte(c), nil)
if err == nil {
t.Fatal("exp an err, got nil")
}
@ -390,7 +941,136 @@ func TestTimeBatchDecodeAll_Corrupt(t *testing.T) {
}
}
func BenchmarkTimeBatchDecodeAllUncompressed(b *testing.B) {
// BenchmarkEncodeTimestamps compares the iterator-style TimeEncoder ("itr")
// with the batch encoder TimeArrayEncodeAll ("batch") on sequential, random
// and constant timestamp inputs of 10, 100 and 1000 values.
func BenchmarkEncodeTimestamps(b *testing.B) {
	var err error

	// Each entry describes how to build one kind of source slice. Only the
	// inputs that were sorted before are sorted here, so the benchmark set-up
	// is unchanged.
	inputs := []struct {
		suffix string
		value  func(i int) int64
		sorted bool
	}{
		{suffix: "seq", value: func(i int) int64 { return int64(i) }, sorted: true},
		{suffix: "ran", value: func(int) int64 { return int64(rand.Uint64()) }, sorted: true},
		{suffix: "dup", value: func(int) int64 { return 1233242 }, sorted: false},
	}

	for _, n := range []int{10, 100, 1000} {
		// One encoder per size, shared by every "itr" sub-benchmark of that size.
		enc := NewTimeEncoder(n)

		for _, in := range inputs {
			b.Run(fmt.Sprintf("%d_%s", n, in.suffix), func(b *testing.B) {
				src := make([]int64, n)
				for i := range src {
					src[i] = in.value(i)
				}
				if in.sorted {
					sort.Slice(src, func(i int, j int) bool { return src[i] < src[j] })
				}

				input := make([]int64, len(src))
				copy(input, src)

				b.Run("itr", func(b *testing.B) {
					b.ReportAllocs()
					b.ResetTimer()
					for iter := 0; iter < b.N; iter++ {
						enc.Reset()
						for _, x := range src {
							enc.Write(x)
						}
						if bufResult, err = enc.Bytes(); err != nil {
							b.Fatal(err)
						}

						// Since the batch encoder needs to do a copy to reset the
						// input, we will add a copy here too.
						copy(input, src)
					}
				})

				b.Run("batch", func(b *testing.B) {
					b.ReportAllocs()
					b.ResetTimer()
					for iter := 0; iter < b.N; iter++ {
						if bufResult, err = TimeArrayEncodeAll(input, bufResult); err != nil {
							b.Fatal(err)
						}
						copy(input, src) // Reset input that gets modified in TimeArrayEncodeAll
					}
				})
			})
		}
	}
}
func BenchmarkTimeArrayDecodeAllUncompressed(b *testing.B) {
benchmarks := []int{
5,
55,
@ -424,13 +1104,13 @@ func BenchmarkTimeBatchDecodeAllUncompressed(b *testing.B) {
dst := make([]int64, size)
for i := 0; i < b.N; i++ {
dst, _ = TimeBatchDecodeAll(bytes, dst)
dst, _ = TimeArrayDecodeAll(bytes, dst)
}
})
}
}
func BenchmarkTimeBatchDecodeAllPackedSimple(b *testing.B) {
func BenchmarkTimeArrayDecodeAllPackedSimple(b *testing.B) {
benchmarks := []int{
5,
55,
@ -453,13 +1133,13 @@ func BenchmarkTimeBatchDecodeAllPackedSimple(b *testing.B) {
dst := make([]int64, size)
for i := 0; i < b.N; i++ {
dst, _ = TimeBatchDecodeAll(bytes, dst)
dst, _ = TimeArrayDecodeAll(bytes, dst)
}
})
}
}
func BenchmarkTimeBatchDecodeAllRLE(b *testing.B) {
func BenchmarkTimeArrayDecodeAllRLE(b *testing.B) {
benchmarks := []struct {
n int
delta int64
@ -484,7 +1164,7 @@ func BenchmarkTimeBatchDecodeAllRLE(b *testing.B) {
dst := make([]int64, bm.n)
for i := 0; i < b.N; i++ {
dst, _ = TimeBatchDecodeAll(bytes, dst)
dst, _ = TimeArrayDecodeAll(bytes, dst)
}
})
}

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,10 @@
package tsm1
import "sort"
import (
"sort"
"github.com/influxdata/influxdb/tsdb"
)
{{range .}}
@ -203,4 +207,212 @@ func (k *tsmKeyIterator) chunk{{.Name}}(dst blocks) blocks {
return dst
}
{{ end }}
{{range .}}
// merge{{.Name}} sorts the pending blocks, decides whether they have to be
// decoded and deduplicated, and stores the combined result in k.merged.
func (k *tsmBatchKeyIterator) merge{{.Name}}() {
	pending := k.merged{{.Name}}Values.Len()

	// Nothing buffered, nothing already merged and no decoded values waiting.
	if len(k.blocks) == 0 && len(k.merged) == 0 && pending == 0 {
		return
	}

	sort.Stable(k.blocks)

	// Left-over decoded values always force the dedup path.
	dedup := pending != 0
	if !dedup && len(k.blocks) > 0 {
		// A tombstoned or partially read first block may need deduplication.
		head := k.blocks[0]
		dedup = len(head.tombstones) > 0 || head.partiallyRead()

		// Scan the remaining blocks: a partially read block, a block that
		// overlaps its predecessor, or a tombstoned block means there may be
		// duplicate points, so the scan stops at the first such block.
		for i := 1; i < len(k.blocks) && !dedup; i++ {
			prev, cur := k.blocks[i-1], k.blocks[i]
			dedup = cur.partiallyRead() ||
				cur.overlapsTimeRange(prev.minTime, prev.maxTime) ||
				len(cur.tombstones) > 0
		}
	}

	k.merged = k.combine{{.Name}}(dedup)
}
// combine{{.Name}} returns a new set of blocks built from the blocks currently buffered in
// k.blocks. If dedup is true, overlapping blocks are decoded, merged into
// k.merged{{.Name}}Values and re-chunked. If dedup is false, blocks that are already full
// (or all unread blocks when k.fast is set) are passed through as is, and only the
// remaining blocks are decoded and combined.
// On a decode error k.err is set and nil is returned.
func (k *tsmBatchKeyIterator) combine{{.Name}}(dedup bool) blocks {
if dedup {
for k.merged{{.Name}}Values.Len() < k.size && len(k.blocks) > 0 {
// Drop leading blocks that have already been fully read.
for len(k.blocks) > 0 && k.blocks[0].read() {
k.blocks = k.blocks[1:]
}
if len(k.blocks) == 0 {
break
}
first := k.blocks[0]
minTime := first.minTime
maxTime := first.maxTime
// Adjust the time window to cover any unread blocks that overlap it: the lower
// bound is extended downwards, the upper bound is narrowed to the smallest
// overlapping maxTime.
for i := 0; i < len(k.blocks); i++ {
if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() {
if k.blocks[i].minTime < minTime {
minTime = k.blocks[i].minTime
}
if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime {
maxTime = k.blocks[i].maxTime
}
}
}
// We have some overlapping blocks so decode all, append in order and then dedup
for i := 0; i < len(k.blocks); i++ {
if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() {
continue
}
var v tsdb.{{.Name}}Array
var err error
if err = Decode{{.Name}}ArrayBlock(k.blocks[i].b, &v); err != nil {
k.err = err
return nil
}
// Remove values we already read
v.Exclude(k.blocks[i].readMin, k.blocks[i].readMax)
// Keep only the values that fall inside the current time window
v.Include(minTime, maxTime)
if v.Len() > 0 {
// Record that we read a subset of the block
k.blocks[i].markRead(v.MinTime(), v.MaxTime())
}
// Apply each tombstone to the block
for _, ts := range k.blocks[i].tombstones {
v.Exclude(ts.Min, ts.Max)
}
k.merged{{.Name}}Values.Merge(&v)
}
}
// Since we combined multiple blocks, we could have more values than we should put into
// a single block. We need to chunk them up into groups and re-encode them.
return k.chunk{{.Name}}(nil)
}
var i int
for i < len(k.blocks) {
// Skip this block if its values were already read
if k.blocks[i].read() {
i++
continue
}
// If this block is already full, just add it as is; stop at the first block that is not
if BlockCount(k.blocks[i].b) >= k.size {
k.merged = append(k.merged, k.blocks[i])
} else {
break
}
i++
}
// In fast mode every remaining unread block is passed through without decoding.
if k.fast {
for i < len(k.blocks) {
// Skip this block if its values were already read
if k.blocks[i].read() {
i++
continue
}
k.merged = append(k.merged, k.blocks[i])
i++
}
}
// If we only have 1 block left, just append it as is and avoid decoding/recoding
if i == len(k.blocks)-1 {
if !k.blocks[i].read() {
k.merged = append(k.merged, k.blocks[i])
}
i++
}
// The remaining blocks can be combined and we know that they do not overlap and
// so we can just append each, sort and re-encode.
for i < len(k.blocks) && k.merged{{.Name}}Values.Len() < k.size {
if k.blocks[i].read() {
i++
continue
}
var v tsdb.{{.Name}}Array
if err := Decode{{.Name}}ArrayBlock(k.blocks[i].b, &v); err != nil {
k.err = err
return nil
}
// Apply each tombstone to the block
for _, ts := range k.blocks[i].tombstones {
v.Exclude(ts.Min, ts.Max)
}
// The whole block has been consumed, so mark its full time range as read.
k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
k.merged{{.Name}}Values.Merge(&v)
i++
}
// Everything before index i has been handled; keep only what is left for the next call.
k.blocks = k.blocks[i:]
return k.chunk{{.Name}}(k.merged)
}
// chunk{{.Name}} encodes at most one block from the pending merged values and
// appends it to dst, which is returned.
//
// If more than k.size values are pending, exactly k.size values are encoded
// and removed from the front of k.merged{{.Name}}Values. Otherwise all remaining
// values (if any) are encoded into a final block and the pending values are
// emptied. On an encode error k.err is set and nil is returned.
func (k *tsmBatchKeyIterator) chunk{{.Name}}(dst blocks) blocks {
	if k.merged{{.Name}}Values.Len() > k.size {
		var values tsdb.{{.Name}}Array
		values.Timestamps = k.merged{{.Name}}Values.Timestamps[:k.size]
		values.Values = k.merged{{.Name}}Values.Values[:k.size]

		// Capture the time range before encoding: the batch encoders are
		// allowed to rewrite their input slices in place, so the timestamps
		// can no longer be trusted once the block has been encoded.
		minTime, maxTime := values.MinTime(), values.MaxTime()

		cb, err := Encode{{.Name}}ArrayBlock(&values, nil) // TODO(edd): pool this buffer
		if err != nil {
			k.err = err
			return nil
		}

		dst = append(dst, &block{
			minTime: minTime,
			maxTime: maxTime,
			key:     k.key,
			b:       cb,
		})

		// Drop the values that were just encoded.
		k.merged{{.Name}}Values.Timestamps = k.merged{{.Name}}Values.Timestamps[k.size:]
		k.merged{{.Name}}Values.Values = k.merged{{.Name}}Values.Values[k.size:]
		return dst
	}

	// Re-encode the remaining values into the last block
	if k.merged{{.Name}}Values.Len() > 0 {
		// As above, read the time range before the encoder touches the data.
		minTime, maxTime := k.merged{{.Name}}Values.MinTime(), k.merged{{.Name}}Values.MaxTime()

		cb, err := Encode{{.Name}}ArrayBlock(k.merged{{.Name}}Values, nil) // TODO(edd): pool this buffer
		if err != nil {
			k.err = err
			return nil
		}

		dst = append(dst, &block{
			minTime: minTime,
			maxTime: maxTime,
			key:     k.key,
			b:       cb,
		})

		// Keep the backing arrays for reuse but mark them empty.
		k.merged{{.Name}}Values.Timestamps = k.merged{{.Name}}Values.Timestamps[:0]
		k.merged{{.Name}}Values.Values = k.merged{{.Name}}Values.Values[:0]
	}
	return dst
}
{{ end }}

View File

@ -932,7 +932,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
return nil, nil
}
tsm, err := NewTSMKeyIterator(size, fast, intC, trs...)
tsm, err := NewTSMBatchKeyIterator(size, fast, intC, trs...)
if err != nil {
return nil, err
}
@ -1556,6 +1556,297 @@ func (k *tsmKeyIterator) Err() error {
return k.err
}
// tsmBatchKeyIterator implements the KeyIterator for a set of TSMReaders. Iteration produces
// keys in sorted order and the values between the keys sorted and deduped. If any of
// the readers have associated tombstone entries, they are returned as part of iteration.
type tsmBatchKeyIterator struct {
// readers is the set of readers used to produce a sorted key run
readers []*TSMReader
// values is the temporary buffers for each key that is returned by a reader
values map[string][]Value
// pos is the current key position within the corresponding readers slice. A value of
// pos[0] = 1, means the reader[0] is currently at key 1 in its ordered index.
pos []int
// err is any error we received while iterating values.
err error
// fast indicates whether the iterator should choose a faster merging strategy over a more
// optimally compressed one. If fast is true, multiple blocks will just be added as is
// and not combined. In some cases, a slower path will need to be utilized even when
// fast is true to prevent overlapping blocks of time for the same key.
// If false, the blocks will be decoded and deduplicated (if needed) and
// then chunked into the maximally sized blocks.
fast bool
// size is the maximum number of values to encode in a single block
size int
// key is the current lowest key across all readers that has not been fully exhausted
// of values.
key []byte
// typ is the block type of the current key; merge dispatches on it.
typ byte
// iterators holds one BlockIterator per reader, in the same order as readers.
iterators []*BlockIterator
// blocks are the blocks for the current key that are waiting to be merged.
blocks blocks
// buf holds, per iterator, the blocks read for that iterator's current key.
buf []blocks
// merged*Values are decoded blocks that have been combined
mergedFloatValues *tsdb.FloatArray
mergedIntegerValues *tsdb.IntegerArray
mergedUnsignedValues *tsdb.UnsignedArray
mergedBooleanValues *tsdb.BooleanArray
mergedStringValues *tsdb.StringArray
// merged are encoded blocks that have been combined or used as is
// without decode
merged blocks
// interrupt is polled by Read; when it is ready, Read returns errCompactionAborted.
interrupt chan struct{}
}
// NewTSMBatchKeyIterator builds a batch key iterator over the given readers.
// size is the maximum number of values to encode in a single block, fast
// selects the faster merge strategy, and interrupt is polled by Read to abort
// a running compaction.
func NewTSMBatchKeyIterator(size int, fast bool, interrupt chan struct{}, readers ...*TSMReader) (KeyIterator, error) {
	// One block iterator per reader, in reader order.
	var iterators []*BlockIterator
	for i := range readers {
		iterators = append(iterators, readers[i].BlockIterator())
	}

	k := &tsmBatchKeyIterator{
		readers:   readers,
		iterators: iterators,
		size:      size,
		fast:      fast,
		interrupt: interrupt,
	}

	// Per-reader scratch state.
	k.values = map[string][]Value{}
	k.pos = make([]int, len(readers))
	k.buf = make([]blocks, len(iterators))

	// Accumulators for decoded values, one per block type.
	k.mergedFloatValues = &tsdb.FloatArray{}
	k.mergedIntegerValues = &tsdb.IntegerArray{}
	k.mergedUnsignedValues = &tsdb.UnsignedArray{}
	k.mergedBooleanValues = &tsdb.BooleanArray{}
	k.mergedStringValues = &tsdb.StringArray{}

	return k, nil
}
// hasMergedValues reports whether any of the per-type accumulators still
// holds decoded values that have not been chunked into blocks.
func (k *tsmBatchKeyIterator) hasMergedValues() bool {
	switch {
	case k.mergedFloatValues.Len() > 0,
		k.mergedIntegerValues.Len() > 0,
		k.mergedUnsignedValues.Len() > 0,
		k.mergedStringValues.Len() > 0,
		k.mergedBooleanValues.Len() > 0:
		return true
	}
	return false
}
// EstimatedIndexSize returns the mean of IndexSize over all readers, or 0
// when the iterator has no readers.
func (k *tsmBatchKeyIterator) EstimatedIndexSize() int {
	// Guard against dividing by zero when no readers were supplied.
	if len(k.readers) == 0 {
		return 0
	}

	// Accumulate in 64 bits so that summing many uint32 sizes cannot wrap.
	var total int64
	for _, r := range k.readers {
		total += int64(r.IndexSize())
	}
	return int(total / int64(len(k.readers)))
}
// Next returns true if there are any values remaining in the iterator.
//
// It first drains blocks and values left over from the previous call. Once
// those are exhausted it refills the per-reader buffers, selects the smallest
// key across all readers, gathers every buffered block for that key and
// merges them. Keys whose merge produces no blocks are skipped.
func (k *tsmBatchKeyIterator) Next() bool {
RETRY:
	// Any merged blocks pending?
	if len(k.merged) > 0 {
		k.merged = k.merged[1:]
		if len(k.merged) > 0 {
			return true
		}
	}

	// Any merged values pending?
	if k.hasMergedValues() {
		k.merge()
		if len(k.merged) > 0 || k.hasMergedValues() {
			return true
		}
	}

	// If we still have blocks from the last read, merge them
	if len(k.blocks) > 0 {
		k.merge()
		if len(k.merged) > 0 || k.hasMergedValues() {
			return true
		}
	}

	// Read the next run of blocks from each TSM iterator whose buffer is empty
	for i, v := range k.buf {
		if len(v) != 0 {
			continue
		}

		iter := k.iterators[i]
		if iter.Next() {
			blockKey := k.bufferBlock(i, iter)

			// Keep reading while the following block belongs to the same key.
			for bytes.Equal(iter.PeekNext(), blockKey) {
				iter.Next()
				k.bufferBlock(i, iter)
			}
		}

		if iter.Err() != nil {
			k.err = iter.Err()
		}
	}

	// Each reader could have a different key that it's currently at, need to find
	// the next smallest one to keep the sort ordering.
	var minKey []byte
	var minType byte
	for _, b := range k.buf {
		// The buffer is empty if the iterator has been exhausted for that file
		if len(b) == 0 {
			continue
		}
		if len(minKey) == 0 || bytes.Compare(b[0].key, minKey) < 0 {
			minKey = b[0].key
			minType = b[0].typ
		}
	}
	k.key = minKey
	k.typ = minType

	// Now we need to find all blocks that match the min key so we can combine and dedupe
	// the blocks if necessary
	for i, b := range k.buf {
		if len(b) == 0 {
			continue
		}
		if bytes.Equal(b[0].key, k.key) {
			k.blocks = append(k.blocks, b...)
			// Truncate but keep the backing array so block structs can be reused.
			k.buf[i] = k.buf[i][:0]
		}
	}

	if len(k.blocks) == 0 {
		return false
	}

	k.merge()

	// After merging all the values for this key, we might not have any. (e.g. they were all deleted
	// through many tombstones). In this case, move on to the next key instead of ending iteration.
	if len(k.merged) == 0 {
		goto RETRY
	}

	return true
}

// bufferBlock reads the block iter is currently positioned at, stores it at
// the end of k.buf[i] (reusing a previously allocated block struct when the
// slice has spare capacity) and returns the block's key. A read error is
// recorded in k.err.
func (k *tsmBatchKeyIterator) bufferBlock(i int, iter *BlockIterator) []byte {
	key, minTime, maxTime, typ, _, b, err := iter.Read()
	if err != nil {
		k.err = err
	}

	// This block may have ranges of time removed from it that would
	// reduce the block min and max time.
	tombstones := iter.r.TombstoneRange(key)

	var blk *block
	if n := len(k.buf[i]); cap(k.buf[i]) > n {
		// Re-extend into spare capacity and reuse the old struct if present.
		k.buf[i] = k.buf[i][:n+1]
		blk = k.buf[i][n]
		if blk == nil {
			blk = &block{}
			k.buf[i][n] = blk
		}
	} else {
		blk = &block{}
		k.buf[i] = append(k.buf[i], blk)
	}

	blk.minTime = minTime
	blk.maxTime = maxTime
	blk.key = key
	blk.typ = typ
	blk.b = b
	blk.tombstones = tombstones
	// Start with an empty "already read" range.
	blk.readMin = math.MaxInt64
	blk.readMax = math.MinInt64

	return key
}
// merge dispatches to the type-specific merge routine for the current key's
// block type and records an error in k.err when the type is not recognised.
func (k *tsmBatchKeyIterator) merge() {
	switch k.typ {
	case BlockFloat64:
		k.mergeFloat()
		return
	case BlockInteger:
		k.mergeInteger()
		return
	case BlockUnsigned:
		k.mergeUnsigned()
		return
	case BlockBoolean:
		k.mergeBoolean()
		return
	case BlockString:
		k.mergeString()
		return
	}

	k.err = fmt.Errorf("unknown block type: %v", k.typ)
}
// Read returns the key, minimum time, maximum time and encoded bytes of the
// first merged block, together with any error recorded during iteration.
// If the interrupt channel is ready it returns errCompactionAborted instead.
func (k *tsmBatchKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
	// See if compactions were disabled while we were running.
	select {
	case <-k.interrupt:
		return nil, 0, 0, nil, errCompactionAborted{}
	default:
	}

	if len(k.merged) > 0 {
		head := k.merged[0]
		return head.key, head.minTime, head.maxTime, head.b, k.err
	}

	// Nothing merged: report only the iteration error, if any.
	return nil, 0, 0, nil, k.err
}
// Close drops the iterator's scratch state and closes every underlying
// reader. All readers are closed even if one of them fails; the first error
// encountered is returned.
func (k *tsmBatchKeyIterator) Close() error {
	k.values = nil
	k.pos = nil
	k.iterators = nil

	var firstErr error
	for _, r := range k.readers {
		// Keep going after a failure so the remaining readers are not leaked.
		if err := r.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
// Err returns the most recent error recorded during iteration, or nil if
// none has been recorded.
func (k *tsmBatchKeyIterator) Err() error {
return k.err
}
type cacheKeyIterator struct {
cache *Cache
size int

View File

@ -455,6 +455,29 @@ func (a FloatValues) Encode(buf []byte) ([]byte, error) {
return encodeFloatValuesBlock(buf, a)
}
// EncodeFloatArrayBlock encodes the values and timestamps of a and packs them
// into a float block using b as the destination buffer passed to packBlock.
// It returns nil, nil when a is empty.
//
// NOTE(review): the array encoders appear to modify their input slices in
// place, so a's contents should presumably not be relied on afterwards —
// confirm against FloatArrayEncodeAll / TimeArrayEncodeAll.
func EncodeFloatArrayBlock(a *tsdb.FloatArray, b []byte) ([]byte, error) {
	if a.Len() == 0 {
		return nil, nil
	}

	// TODO(edd): These need to be pooled.
	vb, err := FloatArrayEncodeAll(a.Values, nil)
	if err != nil {
		return nil, err
	}

	tb, err := TimeArrayEncodeAll(a.Timestamps, nil)
	if err != nil {
		return nil, err
	}

	// Combine the block type with the encoded timestamps and values.
	return packBlock(b, BlockFloat64, tb, vb), nil
}
func encodeFloatValuesBlock(buf []byte, values []FloatValue) ([]byte, error) {
if len(values) == 0 {
return nil, nil
@ -725,6 +748,29 @@ func (a IntegerValues) Encode(buf []byte) ([]byte, error) {
return encodeIntegerValuesBlock(buf, a)
}
// EncodeIntegerArrayBlock encodes the values and timestamps of a and packs
// them into an integer block using b as the destination buffer passed to
// packBlock. It returns nil, nil when a is empty.
//
// NOTE(review): the array encoders appear to modify their input slices in
// place, so a's contents should presumably not be relied on afterwards —
// confirm against IntegerArrayEncodeAll / TimeArrayEncodeAll.
func EncodeIntegerArrayBlock(a *tsdb.IntegerArray, b []byte) ([]byte, error) {
	if a.Len() == 0 {
		return nil, nil
	}

	// TODO(edd): These need to be pooled.
	vb, err := IntegerArrayEncodeAll(a.Values, nil)
	if err != nil {
		return nil, err
	}

	tb, err := TimeArrayEncodeAll(a.Timestamps, nil)
	if err != nil {
		return nil, err
	}

	// Combine the block type with the encoded timestamps and values.
	return packBlock(b, BlockInteger, tb, vb), nil
}
func encodeIntegerValuesBlock(buf []byte, values []IntegerValue) ([]byte, error) {
if len(values) == 0 {
return nil, nil
@ -995,6 +1041,29 @@ func (a UnsignedValues) Encode(buf []byte) ([]byte, error) {
return encodeUnsignedValuesBlock(buf, a)
}
// EncodeUnsignedArrayBlock encodes the values and timestamps of a and packs
// them into an unsigned block using b as the destination buffer passed to
// packBlock. It returns nil, nil when a is empty.
//
// NOTE(review): the array encoders appear to modify their input slices in
// place, so a's contents should presumably not be relied on afterwards —
// confirm against UnsignedArrayEncodeAll / TimeArrayEncodeAll.
func EncodeUnsignedArrayBlock(a *tsdb.UnsignedArray, b []byte) ([]byte, error) {
	if a.Len() == 0 {
		return nil, nil
	}

	// TODO(edd): These need to be pooled.
	vb, err := UnsignedArrayEncodeAll(a.Values, nil)
	if err != nil {
		return nil, err
	}

	tb, err := TimeArrayEncodeAll(a.Timestamps, nil)
	if err != nil {
		return nil, err
	}

	// Combine the block type with the encoded timestamps and values.
	return packBlock(b, BlockUnsigned, tb, vb), nil
}
func encodeUnsignedValuesBlock(buf []byte, values []UnsignedValue) ([]byte, error) {
if len(values) == 0 {
return nil, nil
@ -1265,6 +1334,29 @@ func (a StringValues) Encode(buf []byte) ([]byte, error) {
return encodeStringValuesBlock(buf, a)
}
// EncodeStringArrayBlock encodes the values and timestamps of a and packs
// them into a string block using b as the destination buffer passed to
// packBlock. It returns nil, nil when a is empty.
//
// NOTE(review): the timestamp encoder appears to modify its input slice in
// place, so a's contents should presumably not be relied on afterwards —
// confirm against TimeArrayEncodeAll.
func EncodeStringArrayBlock(a *tsdb.StringArray, b []byte) ([]byte, error) {
	if a.Len() == 0 {
		return nil, nil
	}

	// TODO(edd): These need to be pooled.
	vb, err := StringArrayEncodeAll(a.Values, nil)
	if err != nil {
		return nil, err
	}

	tb, err := TimeArrayEncodeAll(a.Timestamps, nil)
	if err != nil {
		return nil, err
	}

	// Combine the block type with the encoded timestamps and values.
	return packBlock(b, BlockString, tb, vb), nil
}
func encodeStringValuesBlock(buf []byte, values []StringValue) ([]byte, error) {
if len(values) == 0 {
return nil, nil
@ -1535,6 +1627,29 @@ func (a BooleanValues) Encode(buf []byte) ([]byte, error) {
return encodeBooleanValuesBlock(buf, a)
}
// EncodeBooleanArrayBlock encodes the values and timestamps of a and packs
// them into a boolean block using b as the destination buffer passed to
// packBlock. It returns nil, nil when a is empty.
//
// NOTE(review): the timestamp encoder appears to modify its input slice in
// place, so a's contents should presumably not be relied on afterwards —
// confirm against TimeArrayEncodeAll.
func EncodeBooleanArrayBlock(a *tsdb.BooleanArray, b []byte) ([]byte, error) {
	if a.Len() == 0 {
		return nil, nil
	}

	// TODO(edd): These need to be pooled.
	vb, err := BooleanArrayEncodeAll(a.Values, nil)
	if err != nil {
		return nil, err
	}

	tb, err := TimeArrayEncodeAll(a.Timestamps, nil)
	if err != nil {
		return nil, err
	}

	// Combine the block type with the encoded timestamps and values.
	return packBlock(b, BlockBoolean, tb, vb), nil
}
func encodeBooleanValuesBlock(buf []byte, values []BooleanValue) ([]byte, error) {
if len(values) == 0 {
return nil, nil

View File

@ -238,6 +238,29 @@ func (a {{.Name}}Values) Encode(buf []byte) ([]byte, error) {
return encode{{.Name}}ValuesBlock(buf, a)
}
// Encode{{ .Name }}ArrayBlock encodes the values and timestamps of a and packs
// them into a block of type {{ .Type }} using b as the destination buffer
// passed to packBlock. It returns nil, nil when a is empty.
//
// NOTE(review): the array encoders appear to modify their input slices in
// place, so a's contents should presumably not be relied on afterwards —
// confirm against {{ .Name }}ArrayEncodeAll / TimeArrayEncodeAll.
func Encode{{ .Name }}ArrayBlock(a *tsdb.{{ .Name }}Array, b []byte) ([]byte, error) {
	if a.Len() == 0 {
		return nil, nil
	}

	// TODO(edd): These need to be pooled.
	vb, err := {{ .Name }}ArrayEncodeAll(a.Values, nil)
	if err != nil {
		return nil, err
	}

	tb, err := TimeArrayEncodeAll(a.Timestamps, nil)
	if err != nil {
		return nil, err
	}

	// Combine the block type with the encoded timestamps and values.
	return packBlock(b, {{ .Type }}, tb, vb), nil
}
func encode{{ .Name }}ValuesBlock(buf []byte, values []{{.Name}}Value) ([]byte, error) {
if len(values) == 0 {
return nil, nil

View File

@ -174,7 +174,6 @@ func TestFloatEncoder_Roundtrip(t *testing.T) {
}
func TestFloatEncoder_Roundtrip_NaN(t *testing.T) {
s := tsm1.NewFloatEncoder()
s.Write(1.0)
s.Write(math.NaN())
@ -182,12 +181,35 @@ func TestFloatEncoder_Roundtrip_NaN(t *testing.T) {
s.Flush()
_, err := s.Bytes()
if err == nil {
t.Fatalf("expected error. got nil")
}
}
// TestFloatEncoder_Empty checks that an encoder flushed without any writes
// produces bytes that decode to zero values.
func TestFloatEncoder_Empty(t *testing.T) {
	enc := tsm1.NewFloatEncoder()
	enc.Flush()

	encoded, err := enc.Bytes()
	if err != nil {
		t.Fatal(err)
	}

	var dec tsm1.FloatDecoder
	if err := dec.SetBytes(encoded); err != nil {
		t.Fatal(err)
	}

	// Drain the decoder and collect whatever it yields.
	var decoded []float64
	for dec.Next() {
		decoded = append(decoded, dec.Values())
	}

	if n := len(decoded); n != 0 {
		t.Fatalf("got len %d, expected 0", n)
	}
}
func Test_FloatEncoder_Quick(t *testing.T) {
quick.Check(func(values []float64) bool {