From 4a37ba868d6a126c67569d8c3c42a55ed6c1d1fd Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 24 Sep 2015 14:29:51 -0600 Subject: [PATCH] Add int64 compression This is using zig zag encoding to convert int64 to uint64s and then using simple8b to compress them, falling back to uncompressed if the value exceeds 1 << 60. A patched encoding scheme would likely be better in general but this provides decent compression for integers that are not at the ends of the int64 range. --- tsdb/engine/pd1/encoding.go | 86 ++++++++++- tsdb/engine/pd1/encoding_test.go | 40 +++++ tsdb/engine/pd1/int.go | 113 +++++++++++++++ tsdb/engine/pd1/int_test.go | 241 +++++++++++++++++++++++++++++++ tsdb/engine/pd1/timestamp.go | 18 +-- 5 files changed, 480 insertions(+), 18 deletions(-) create mode 100644 tsdb/engine/pd1/int.go create mode 100644 tsdb/engine/pd1/int_test.go diff --git a/tsdb/engine/pd1/encoding.go b/tsdb/engine/pd1/encoding.go index 6b29913e0e..055048620c 100644 --- a/tsdb/engine/pd1/encoding.go +++ b/tsdb/engine/pd1/encoding.go @@ -1,12 +1,23 @@ package pd1 import ( + "encoding/binary" + "fmt" "sort" "time" "github.com/influxdb/influxdb/tsdb" ) +const ( + // EncodingPacked is a bit-packed format + EncodingPacked = 0 + // EncodingRLE is a run-length encoded format + EncodingRLE = 1 + // EncodingUncompressed is a non-compressed format + EncodingUncompressed = 2 +) + type Value interface { Time() time.Time UnixNano() int64 @@ -16,8 +27,8 @@ type Value interface { func NewValue(t time.Time, value interface{}) Value { switch v := value.(type) { - // case int64: - // return &Int64Value{time: t, value: v} + case int64: + return &Int64Value{time: t, value: v} case float64: return &FloatValue{time: t, value: v} // case bool: @@ -58,6 +69,13 @@ func (v Values) Encode(buf []byte) []byte { } return EncodeFloatBlock(buf, a) + case *Int64Value: + a := make([]*Int64Value, len(v)) + for i, vv := range v { + a[i] = vv.(*Int64Value) + } + return EncodeInt64Block(buf, a) + // TODO: add support for other types } @@ -69,6 +87,9 @@ func (v Values) DecodeSameTypeBlock(block []byte) Values { case *FloatValue: a, _ := DecodeFloatBlock(block) return a + case *Int64Value: + a, _ := DecodeInt64Block(block) + return a // TODO: add support for other types } @@ -200,12 +221,65 @@ type Int64Value struct { value int64 } -func EncodeInt64Block(buf []byte, values []Int64Value) []byte { - return nil +func (v *Int64Value) Time() time.Time { + return v.time } -func DecodeInt64Block(block []byte) ([]Int64Value, error) { - return nil, nil +func (v *Int64Value) Value() interface{} { + return v.value +} + +func (f *Int64Value) UnixNano() int64 { + return f.time.UnixNano() +} + +func (v *Int64Value) Size() int { + return 16 +} + +func (v *Int64Value) String() string { return fmt.Sprintf("%v", v.value) } + +func EncodeInt64Block(buf []byte, values []*Int64Value) []byte { + tsEnc := NewTimeEncoder() + vEnc := NewInt64Encoder() + for _, v := range values { + tsEnc.Write(v.Time()) + vEnc.Write(v.value) + } + + // Encoded timestamp values + tb, err := tsEnc.Bytes() + if err != nil { + panic(err.Error()) + } + // Encoded int64 values + vb, err := vEnc.Bytes() + if err != nil { + panic(err.Error()) + } + + // Preprend the first timestamp of the block in the first 8 bytes + return append(u64tob(uint64(values[0].Time().UnixNano())), + packBlock(tb, vb)...) +} + +func DecodeInt64Block(block []byte) ([]Value, error) { + // The first 8 bytes is the minimum timestamp of the block + tb, vb := unpackBlock(block[8:]) + + // Setup our timestamp and value decoders + tsDec := NewTimeDecoder(tb) + vDec := NewInt64Decoder(vb) + + // Decode both a timestamp and value + var a []Value + for tsDec.Next() && vDec.Next() { + ts := tsDec.Read() + v := vDec.Read() + a = append(a, &Int64Value{ts, v}) + } + + return a, nil } type StringValue struct { diff --git a/tsdb/engine/pd1/encoding_test.go b/tsdb/engine/pd1/encoding_test.go index 02598a764d..49006085d7 100644 --- a/tsdb/engine/pd1/encoding_test.go +++ b/tsdb/engine/pd1/encoding_test.go @@ -1,6 +1,8 @@ package pd1_test import ( + // "math/rand" + "reflect" "testing" "time" @@ -25,6 +27,44 @@ func TestEncoding_FloatBlock(t *testing.T) { } } +func TestEncoding_IntBlock(t *testing.T) { + valueCount := 1000 + times := getTimes(valueCount, 60, time.Second) + values := make(pd1.Values, len(times)) + for i, t := range times { + values[i] = pd1.NewValue(t, int64(i)) + } + + b := values.Encode(nil) + + decodedValues := values.DecodeSameTypeBlock(b) + + if !reflect.DeepEqual(decodedValues, values) { + t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values) + } +} + +func TestEncoding_IntBlock_Negatives(t *testing.T) { + valueCount := 1000 + times := getTimes(valueCount, 60, time.Second) + values := make(pd1.Values, len(times)) + for i, t := range times { + v := int64(i) + if i%2 == 0 { + v = -v + } + values[i] = pd1.NewValue(t, int64(v)) + } + + b := values.Encode(nil) + + decodedValues := values.DecodeSameTypeBlock(b) + + if !reflect.DeepEqual(decodedValues, values) { + t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values) + } +} + func getTimes(n, step int, precision time.Duration) []time.Time { t := time.Now().Round(precision) a := make([]time.Time, n) diff --git a/tsdb/engine/pd1/int.go b/tsdb/engine/pd1/int.go new file mode 100644 index 0000000000..04ddc1a0f4 --- /dev/null +++ b/tsdb/engine/pd1/int.go @@ -0,0 +1,113 @@ +package pd1 + +import ( + "encoding/binary" + "fmt" + + "github.com/jwilder/encoding/simple8b" +) + +type int64Encoder struct { + values []int64 +} + +func NewInt64Encoder() *int64Encoder { + return &int64Encoder{} +} + +func (e *int64Encoder) Write(v int64) { + e.values = append(e.values, v) +} + +func (e *int64Encoder) zigZagEncode(x int64) uint64 { + return uint64(uint64(x<<1) ^ uint64((int64(x) >> 63))) +} + +func (e *int64Encoder) Bytes() ([]byte, error) { + enc := simple8b.NewEncoder() + + for _, v := range e.values { + n := e.zigZagEncode(v) + // Value is too large to encode using packed format + if n > simple8b.MaxValue { + return e.encodeUncompressed() + } + enc.Write(n) + } + + b, err := enc.Bytes() + if err != nil { + return nil, err + } + + return append([]byte{EncodingPacked << 4}, b...), nil +} + +func (e *int64Encoder) encodeUncompressed() ([]byte, error) { + b := make([]byte, 1+len(e.values)*8) + // 4 high bits of first byte store the encoding type for the block + b[0] = byte(EncodingUncompressed) << 4 + for i, v := range e.values { + binary.BigEndian.PutUint64(b[1+i*8:1+i*8+8], uint64(v)) + } + return b, nil +} + +type int64Decoder struct { + values []int64 + v int64 +} + +func NewInt64Decoder(b []byte) *int64Decoder { + d := &int64Decoder{} + d.decode(b) + return d +} + +func (d *int64Decoder) zigZagDecode(v uint64) int64 { + return int64((v >> 1) ^ uint64((int64(v&1)<<63)>>63)) +} + +func (d *int64Decoder) Next() bool { + if len(d.values) == 0 { + return false + } + d.v = d.values[0] + d.values = d.values[1:] + return true +} + +func (d *int64Decoder) Read() int64 { + return d.v +} + +func (d *int64Decoder) decode(b []byte) { + if len(b) == 0 { + return + } + + // Encoding type is stored in the 4 high bits of the first byte + encoding := b[0] >> 4 + switch encoding { + case EncodingUncompressed: + d.decodeUncompressed(b[1:]) + case EncodingPacked: + d.decodePacked(b[1:]) + default: + panic(fmt.Sprintf("unknown encoding %v", encoding)) + } +} + +func (d *int64Decoder) decodePacked(b []byte) { + dec := simple8b.NewDecoder(b) + for dec.Next() { + d.values = append(d.values, d.zigZagDecode(dec.Read())) + } +} + +func (d *int64Decoder) decodeUncompressed(b []byte) { + d.values = make([]int64, len(b)/8) + for i := range d.values { + d.values[i] = int64(binary.BigEndian.Uint64(b[i*8 : i*8+8])) + } +} diff --git a/tsdb/engine/pd1/int_test.go b/tsdb/engine/pd1/int_test.go new file mode 100644 index 0000000000..82042f77bd --- /dev/null +++ b/tsdb/engine/pd1/int_test.go @@ -0,0 +1,241 @@ +package pd1_test + +import ( + "math" + "testing" + + "github.com/influxdb/influxdb/tsdb/engine/pd1" +) + +func Test_Int64Encoder_NoValues(t *testing.T) { + enc := pd1.NewInt64Encoder() + b, err := enc.Bytes() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + dec := pd1.NewInt64Decoder(b) + if dec.Next() { + t.Fatalf("unexpected next value: got true, exp false") + } +} + +func Test_Int64Encoder_One(t *testing.T) { + enc := pd1.NewInt64Encoder() + v1 := int64(1) + + enc.Write(1) + b, err := enc.Bytes() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + dec := pd1.NewInt64Decoder(b) + if !dec.Next() { + t.Fatalf("unexpected next value: got true, exp false") + } + + if v1 != dec.Read() { + t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v1) + } +} + +func Test_Int64Encoder_Two(t *testing.T) { + enc := pd1.NewInt64Encoder() + var v1, v2 int64 = 1, 2 + + enc.Write(v1) + enc.Write(v2) + + b, err := enc.Bytes() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + dec := pd1.NewInt64Decoder(b) + if !dec.Next() { + t.Fatalf("unexpected next value: got true, exp false") + } + + if v1 != dec.Read() { + t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v1) + } + + if !dec.Next() { + t.Fatalf("unexpected next value: got true, exp false") + } + + if v2 != dec.Read() { + t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v2) + } +} + +func Test_Int64Encoder_Negative(t *testing.T) { + enc := pd1.NewInt64Encoder() + var v1, v2, v3 int64 = -2, 0, 1 + + enc.Write(v1) + enc.Write(v2) + enc.Write(v3) + + b, err := enc.Bytes() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + dec := pd1.NewInt64Decoder(b) + if !dec.Next() { + t.Fatalf("unexpected next value: got true, exp false") + } + + if v1 != dec.Read() { + t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v1) + } + + if !dec.Next() { + t.Fatalf("unexpected next value: got true, exp false") + } + + if v2 != dec.Read() { + t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v2) + } + + if !dec.Next() { + t.Fatalf("unexpected next value: got true, exp false") + } + + if v3 != dec.Read() { + t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v3) + } +} + +func Test_Int64Encoder_Large_Range(t *testing.T) { + enc := pd1.NewInt64Encoder() + var v1, v2 int64 = math.MinInt64, math.MaxInt64 + enc.Write(v1) + enc.Write(v2) + b, err := enc.Bytes() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + dec := pd1.NewInt64Decoder(b) + if !dec.Next() { + t.Fatalf("unexpected next value: got true, exp false") + } + + if v1 != dec.Read() { + t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v1) + } + + if !dec.Next() { + t.Fatalf("unexpected next value: got true, exp false") + } + + if v2 != dec.Read() { + t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v2) + } +} + +func Test_Int64Encoder_Uncompressed(t *testing.T) { + enc := pd1.NewInt64Encoder() + var v1, v2, v3 int64 = 0, 1, 1 << 60 + + enc.Write(v1) + enc.Write(v2) + enc.Write(v3) + + b, err := enc.Bytes() + if err != nil { + t.Fatalf("expected error: %v", err) + } + + // 1 byte header + 3 * 8 byte values + if exp := 25; len(b) != exp { + t.Fatalf("length mismatch: got %v, exp %v", len(b), exp) + } + + dec := pd1.NewInt64Decoder(b) + if !dec.Next() { + t.Fatalf("unexpected next value: got true, exp false") + } + + if v1 != dec.Read() { + t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v1) + } + + if !dec.Next() { + t.Fatalf("unexpected next value: got true, exp false") + } + + if v2 != dec.Read() { + t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v2) + } + + if !dec.Next() { + t.Fatalf("unexpected next value: got true, exp false") + } + + if v3 != dec.Read() { + t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v3) + } +} + +func Test_Int64Encoder_AllNegative(t *testing.T) { + enc := pd1.NewInt64Encoder() + values := []int64{ + -10, -5, -1, + } + + for _, v := range values { + enc.Write(v) + } + + b, err := enc.Bytes() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + dec := pd1.NewInt64Decoder(b) + i := 0 + for dec.Next() { + if values[i] != dec.Read() { + t.Fatalf("read value %d mismatch: got %v, exp %v", i, dec.Read(), values[i]) + } + i += 1 + } +} + +func BenchmarkInt64Encoder(b *testing.B) { + enc := pd1.NewInt64Encoder() + x := make([]int64, 1024) + for i := 0; i < len(x); i++ { + x[i] = int64(i) + enc.Write(x[i]) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + enc.Bytes() + } +} + +func BenchmarkInt64Decoder(b *testing.B) { + x := make([]int64, 1024) + enc := pd1.NewInt64Encoder() + for i := 0; i < len(x); i++ { + x[i] = int64(i) + enc.Write(x[i]) + } + bytes, _ := enc.Bytes() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + b.StopTimer() + dec := pd1.NewInt64Decoder(bytes) + b.StartTimer() + for dec.Next() { + } + } +} diff --git a/tsdb/engine/pd1/timestamp.go b/tsdb/engine/pd1/timestamp.go index b83199ef04..64907bb760 100644 --- a/tsdb/engine/pd1/timestamp.go +++ b/tsdb/engine/pd1/timestamp.go @@ -10,21 +10,13 @@ package pd1 import ( "encoding/binary" + "fmt" "math" "time" "github.com/jwilder/encoding/simple8b" ) -const ( - // EncodingPacked is a bit-packed format - EncodingPacked = 0 - // EncodingRLE is a run-length encoded format - EncodingRLE = 1 - // EncodingRAW is a non-compressed format - EncodingRaw = 2 -) - // TimeEncoder encodes time.Time to byte slices. type TimeEncoder interface { Write(t time.Time) @@ -152,7 +144,7 @@ func (e *encoder) encodePacked(min, div int64, dts []int64) ([]byte, error) { func (e *encoder) encodeRaw() ([]byte, error) { b := make([]byte, 1+len(e.ts)*8) - b[0] = byte(EncodingRaw) << 4 + b[0] = byte(EncodingUncompressed) << 4 for i, v := range e.ts { binary.BigEndian.PutUint64(b[1+i*8:1+i*8+8], uint64(v)) } @@ -212,12 +204,14 @@ func (d *decoder) decode(b []byte) { // Encoding type is stored in the 4 high bits of the first byte encoding := b[0] >> 4 switch encoding { - case EncodingRaw: + case EncodingUncompressed: d.decodeRaw(b[1:]) case EncodingRLE: d.decodeRLE(b) - default: + case EncodingPacked: d.decodePacked(b) + default: + panic(fmt.Sprintf("unknown encoding: %v", encoding)) } }