Add int64 compression
This is using zig zag encoding to convert int64 to uint64s and then using simple8b to compress them, falling back to uncompressed if the value exceeds 1 << 60. A patched encoding scheme would likely be better in general but this provides decent compression for integers that are not at the ends of the int64 range.pull/4308/head
parent
42e1babe7f
commit
4a37ba868d
|
@ -1,12 +1,23 @@
|
|||
package pd1
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
const (
|
||||
// EncodingPacked is a bit-packed format
|
||||
EncodingPacked = 0
|
||||
// EncodingRLE is a run-length encoded format
|
||||
EncodingRLE = 1
|
||||
// EncodingUncompressed is a non-compressed format
|
||||
EncodingUncompressed = 2
|
||||
)
|
||||
|
||||
type Value interface {
|
||||
Time() time.Time
|
||||
UnixNano() int64
|
||||
|
@ -16,8 +27,8 @@ type Value interface {
|
|||
|
||||
func NewValue(t time.Time, value interface{}) Value {
|
||||
switch v := value.(type) {
|
||||
// case int64:
|
||||
// return &Int64Value{time: t, value: v}
|
||||
case int64:
|
||||
return &Int64Value{time: t, value: v}
|
||||
case float64:
|
||||
return &FloatValue{time: t, value: v}
|
||||
// case bool:
|
||||
|
@ -58,6 +69,13 @@ func (v Values) Encode(buf []byte) []byte {
|
|||
}
|
||||
return EncodeFloatBlock(buf, a)
|
||||
|
||||
case *Int64Value:
|
||||
a := make([]*Int64Value, len(v))
|
||||
for i, vv := range v {
|
||||
a[i] = vv.(*Int64Value)
|
||||
}
|
||||
return EncodeInt64Block(buf, a)
|
||||
|
||||
// TODO: add support for other types
|
||||
}
|
||||
|
||||
|
@ -69,6 +87,9 @@ func (v Values) DecodeSameTypeBlock(block []byte) Values {
|
|||
case *FloatValue:
|
||||
a, _ := DecodeFloatBlock(block)
|
||||
return a
|
||||
case *Int64Value:
|
||||
a, _ := DecodeInt64Block(block)
|
||||
return a
|
||||
|
||||
// TODO: add support for other types
|
||||
}
|
||||
|
@ -200,12 +221,65 @@ type Int64Value struct {
|
|||
value int64
|
||||
}
|
||||
|
||||
func EncodeInt64Block(buf []byte, values []Int64Value) []byte {
|
||||
return nil
|
||||
func (v *Int64Value) Time() time.Time {
|
||||
return v.time
|
||||
}
|
||||
|
||||
func DecodeInt64Block(block []byte) ([]Int64Value, error) {
|
||||
return nil, nil
|
||||
func (v *Int64Value) Value() interface{} {
|
||||
return v.value
|
||||
}
|
||||
|
||||
func (f *Int64Value) UnixNano() int64 {
|
||||
return f.time.UnixNano()
|
||||
}
|
||||
|
||||
func (v *Int64Value) Size() int {
|
||||
return 16
|
||||
}
|
||||
|
||||
func (v *Int64Value) String() string { return fmt.Sprintf("%v", v.value) }
|
||||
|
||||
func EncodeInt64Block(buf []byte, values []*Int64Value) []byte {
|
||||
tsEnc := NewTimeEncoder()
|
||||
vEnc := NewInt64Encoder()
|
||||
for _, v := range values {
|
||||
tsEnc.Write(v.Time())
|
||||
vEnc.Write(v.value)
|
||||
}
|
||||
|
||||
// Encoded timestamp values
|
||||
tb, err := tsEnc.Bytes()
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
// Encoded int64 values
|
||||
vb, err := vEnc.Bytes()
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
|
||||
// Preprend the first timestamp of the block in the first 8 bytes
|
||||
return append(u64tob(uint64(values[0].Time().UnixNano())),
|
||||
packBlock(tb, vb)...)
|
||||
}
|
||||
|
||||
func DecodeInt64Block(block []byte) ([]Value, error) {
|
||||
// The first 8 bytes is the minimum timestamp of the block
|
||||
tb, vb := unpackBlock(block[8:])
|
||||
|
||||
// Setup our timestamp and value decoders
|
||||
tsDec := NewTimeDecoder(tb)
|
||||
vDec := NewInt64Decoder(vb)
|
||||
|
||||
// Decode both a timestamp and value
|
||||
var a []Value
|
||||
for tsDec.Next() && vDec.Next() {
|
||||
ts := tsDec.Read()
|
||||
v := vDec.Read()
|
||||
a = append(a, &Int64Value{ts, v})
|
||||
}
|
||||
|
||||
return a, nil
|
||||
}
|
||||
|
||||
type StringValue struct {
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package pd1_test
|
||||
|
||||
import (
|
||||
// "math/rand"
|
||||
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -25,6 +27,44 @@ func TestEncoding_FloatBlock(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestEncoding_IntBlock(t *testing.T) {
|
||||
valueCount := 1000
|
||||
times := getTimes(valueCount, 60, time.Second)
|
||||
values := make(pd1.Values, len(times))
|
||||
for i, t := range times {
|
||||
values[i] = pd1.NewValue(t, int64(i))
|
||||
}
|
||||
|
||||
b := values.Encode(nil)
|
||||
|
||||
decodedValues := values.DecodeSameTypeBlock(b)
|
||||
|
||||
if !reflect.DeepEqual(decodedValues, values) {
|
||||
t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncoding_IntBlock_Negatives(t *testing.T) {
|
||||
valueCount := 1000
|
||||
times := getTimes(valueCount, 60, time.Second)
|
||||
values := make(pd1.Values, len(times))
|
||||
for i, t := range times {
|
||||
v := int64(i)
|
||||
if i%2 == 0 {
|
||||
v = -v
|
||||
}
|
||||
values[i] = pd1.NewValue(t, int64(v))
|
||||
}
|
||||
|
||||
b := values.Encode(nil)
|
||||
|
||||
decodedValues := values.DecodeSameTypeBlock(b)
|
||||
|
||||
if !reflect.DeepEqual(decodedValues, values) {
|
||||
t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
|
||||
}
|
||||
}
|
||||
|
||||
func getTimes(n, step int, precision time.Duration) []time.Time {
|
||||
t := time.Now().Round(precision)
|
||||
a := make([]time.Time, n)
|
||||
|
|
|
@ -0,0 +1,113 @@
|
|||
package pd1
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
|
||||
"github.com/jwilder/encoding/simple8b"
|
||||
)
|
||||
|
||||
type int64Encoder struct {
|
||||
values []int64
|
||||
}
|
||||
|
||||
func NewInt64Encoder() *int64Encoder {
|
||||
return &int64Encoder{}
|
||||
}
|
||||
|
||||
func (e *int64Encoder) Write(v int64) {
|
||||
e.values = append(e.values, v)
|
||||
}
|
||||
|
||||
func (e *int64Encoder) zigZagEncode(x int64) uint64 {
|
||||
return uint64(uint64(x<<1) ^ uint64((int64(x) >> 63)))
|
||||
}
|
||||
|
||||
func (e *int64Encoder) Bytes() ([]byte, error) {
|
||||
enc := simple8b.NewEncoder()
|
||||
|
||||
for _, v := range e.values {
|
||||
n := e.zigZagEncode(v)
|
||||
// Value is too large to encode using packed format
|
||||
if n > simple8b.MaxValue {
|
||||
return e.encodeUncompressed()
|
||||
}
|
||||
enc.Write(n)
|
||||
}
|
||||
|
||||
b, err := enc.Bytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return append([]byte{EncodingPacked << 4}, b...), nil
|
||||
}
|
||||
|
||||
func (e *int64Encoder) encodeUncompressed() ([]byte, error) {
|
||||
b := make([]byte, 1+len(e.values)*8)
|
||||
// 4 high bits of first byte store the encoding type for the block
|
||||
b[0] = byte(EncodingUncompressed) << 4
|
||||
for i, v := range e.values {
|
||||
binary.BigEndian.PutUint64(b[1+i*8:1+i*8+8], uint64(v))
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
|
||||
type int64Decoder struct {
|
||||
values []int64
|
||||
v int64
|
||||
}
|
||||
|
||||
func NewInt64Decoder(b []byte) *int64Decoder {
|
||||
d := &int64Decoder{}
|
||||
d.decode(b)
|
||||
return d
|
||||
}
|
||||
|
||||
func (d *int64Decoder) zigZagDecode(v uint64) int64 {
|
||||
return int64((v >> 1) ^ uint64((int64(v&1)<<63)>>63))
|
||||
}
|
||||
|
||||
func (d *int64Decoder) Next() bool {
|
||||
if len(d.values) == 0 {
|
||||
return false
|
||||
}
|
||||
d.v = d.values[0]
|
||||
d.values = d.values[1:]
|
||||
return true
|
||||
}
|
||||
|
||||
func (d *int64Decoder) Read() int64 {
|
||||
return d.v
|
||||
}
|
||||
|
||||
func (d *int64Decoder) decode(b []byte) {
|
||||
if len(b) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Encoding type is stored in the 4 high bits of the first byte
|
||||
encoding := b[0] >> 4
|
||||
switch encoding {
|
||||
case EncodingUncompressed:
|
||||
d.decodeUncompressed(b[1:])
|
||||
case EncodingPacked:
|
||||
d.decodePacked(b[1:])
|
||||
default:
|
||||
panic(fmt.Sprintf("unknown encoding %v", encoding))
|
||||
}
|
||||
}
|
||||
|
||||
func (d *int64Decoder) decodePacked(b []byte) {
|
||||
dec := simple8b.NewDecoder(b)
|
||||
for dec.Next() {
|
||||
d.values = append(d.values, d.zigZagDecode(dec.Read()))
|
||||
}
|
||||
}
|
||||
|
||||
func (d *int64Decoder) decodeUncompressed(b []byte) {
|
||||
d.values = make([]int64, len(b)/8)
|
||||
for i := range d.values {
|
||||
d.values[i] = int64(binary.BigEndian.Uint64(b[i*8 : i*8+8]))
|
||||
}
|
||||
}
|
|
@ -0,0 +1,241 @@
|
|||
package pd1_test
|
||||
|
||||
import (
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdb/influxdb/tsdb/engine/pd1"
|
||||
)
|
||||
|
||||
func Test_Int64Encoder_NoValues(t *testing.T) {
|
||||
enc := pd1.NewInt64Encoder()
|
||||
b, err := enc.Bytes()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewInt64Decoder(b)
|
||||
if dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
}
|
||||
|
||||
func Test_Int64Encoder_One(t *testing.T) {
|
||||
enc := pd1.NewInt64Encoder()
|
||||
v1 := int64(1)
|
||||
|
||||
enc.Write(1)
|
||||
b, err := enc.Bytes()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewInt64Decoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if v1 != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v1)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_Int64Encoder_Two(t *testing.T) {
|
||||
enc := pd1.NewInt64Encoder()
|
||||
var v1, v2 int64 = 1, 2
|
||||
|
||||
enc.Write(v1)
|
||||
enc.Write(v2)
|
||||
|
||||
b, err := enc.Bytes()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewInt64Decoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if v1 != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v1)
|
||||
}
|
||||
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if v2 != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v2)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_Int64Encoder_Negative(t *testing.T) {
|
||||
enc := pd1.NewInt64Encoder()
|
||||
var v1, v2, v3 int64 = -2, 0, 1
|
||||
|
||||
enc.Write(v1)
|
||||
enc.Write(v2)
|
||||
enc.Write(v3)
|
||||
|
||||
b, err := enc.Bytes()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewInt64Decoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if v1 != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v1)
|
||||
}
|
||||
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if v2 != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v2)
|
||||
}
|
||||
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if v3 != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v3)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_Int64Encoder_Large_Range(t *testing.T) {
|
||||
enc := pd1.NewInt64Encoder()
|
||||
var v1, v2 int64 = math.MinInt64, math.MaxInt64
|
||||
enc.Write(v1)
|
||||
enc.Write(v2)
|
||||
b, err := enc.Bytes()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewInt64Decoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if v1 != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v1)
|
||||
}
|
||||
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if v2 != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v2)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_Int64Encoder_Uncompressed(t *testing.T) {
|
||||
enc := pd1.NewInt64Encoder()
|
||||
var v1, v2, v3 int64 = 0, 1, 1 << 60
|
||||
|
||||
enc.Write(v1)
|
||||
enc.Write(v2)
|
||||
enc.Write(v3)
|
||||
|
||||
b, err := enc.Bytes()
|
||||
if err != nil {
|
||||
t.Fatalf("expected error: %v", err)
|
||||
}
|
||||
|
||||
// 1 byte header + 3 * 8 byte values
|
||||
if exp := 25; len(b) != exp {
|
||||
t.Fatalf("length mismatch: got %v, exp %v", len(b), exp)
|
||||
}
|
||||
|
||||
dec := pd1.NewInt64Decoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if v1 != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v1)
|
||||
}
|
||||
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if v2 != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v2)
|
||||
}
|
||||
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if v3 != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v3)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_Int64Encoder_AllNegative(t *testing.T) {
|
||||
enc := pd1.NewInt64Encoder()
|
||||
values := []int64{
|
||||
-10, -5, -1,
|
||||
}
|
||||
|
||||
for _, v := range values {
|
||||
enc.Write(v)
|
||||
}
|
||||
|
||||
b, err := enc.Bytes()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewInt64Decoder(b)
|
||||
i := 0
|
||||
for dec.Next() {
|
||||
if values[i] != dec.Read() {
|
||||
t.Fatalf("read value %d mismatch: got %v, exp %v", i, dec.Read(), values[i])
|
||||
}
|
||||
i += 1
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkInt64Encoder(b *testing.B) {
|
||||
enc := pd1.NewInt64Encoder()
|
||||
x := make([]int64, 1024)
|
||||
for i := 0; i < len(x); i++ {
|
||||
x[i] = int64(i)
|
||||
enc.Write(x[i])
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
enc.Bytes()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkInt64Decoder(b *testing.B) {
|
||||
x := make([]int64, 1024)
|
||||
enc := pd1.NewInt64Encoder()
|
||||
for i := 0; i < len(x); i++ {
|
||||
x[i] = int64(i)
|
||||
enc.Write(x[i])
|
||||
}
|
||||
bytes, _ := enc.Bytes()
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
b.StopTimer()
|
||||
dec := pd1.NewInt64Decoder(bytes)
|
||||
b.StartTimer()
|
||||
for dec.Next() {
|
||||
}
|
||||
}
|
||||
}
|
|
@ -10,21 +10,13 @@ package pd1
|
|||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/jwilder/encoding/simple8b"
|
||||
)
|
||||
|
||||
const (
|
||||
// EncodingPacked is a bit-packed format
|
||||
EncodingPacked = 0
|
||||
// EncodingRLE is a run-length encoded format
|
||||
EncodingRLE = 1
|
||||
// EncodingRAW is a non-compressed format
|
||||
EncodingRaw = 2
|
||||
)
|
||||
|
||||
// TimeEncoder encodes time.Time to byte slices.
|
||||
type TimeEncoder interface {
|
||||
Write(t time.Time)
|
||||
|
@ -152,7 +144,7 @@ func (e *encoder) encodePacked(min, div int64, dts []int64) ([]byte, error) {
|
|||
|
||||
func (e *encoder) encodeRaw() ([]byte, error) {
|
||||
b := make([]byte, 1+len(e.ts)*8)
|
||||
b[0] = byte(EncodingRaw) << 4
|
||||
b[0] = byte(EncodingUncompressed) << 4
|
||||
for i, v := range e.ts {
|
||||
binary.BigEndian.PutUint64(b[1+i*8:1+i*8+8], uint64(v))
|
||||
}
|
||||
|
@ -212,12 +204,14 @@ func (d *decoder) decode(b []byte) {
|
|||
// Encoding type is stored in the 4 high bits of the first byte
|
||||
encoding := b[0] >> 4
|
||||
switch encoding {
|
||||
case EncodingRaw:
|
||||
case EncodingUncompressed:
|
||||
d.decodeRaw(b[1:])
|
||||
case EncodingRLE:
|
||||
d.decodeRLE(b)
|
||||
default:
|
||||
case EncodingPacked:
|
||||
d.decodePacked(b)
|
||||
default:
|
||||
panic(fmt.Sprintf("unknown encoding: %v", encoding))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue