diff --git a/cmd/influx_inspect/tsm.go b/cmd/influx_inspect/tsm.go index 7cef5fe3ee..626de66d52 100644 --- a/cmd/influx_inspect/tsm.go +++ b/cmd/influx_inspect/tsm.go @@ -75,7 +75,7 @@ var ( "none", "gor", } intEnc = []string{ - "none", "s8b", + "none", "s8b", "rle", } boolEnc = []string{ "none", "bp", diff --git a/tsdb/engine/tsm1/int.go b/tsdb/engine/tsm1/int.go index a3c262d3c7..25c16a9a32 100644 --- a/tsdb/engine/tsm1/int.go +++ b/tsdb/engine/tsm1/int.go @@ -32,6 +32,8 @@ const ( intUncompressed = 0 // intCompressedSimple is a bit-packed format using simple8b encoding intCompressedSimple = 1 + // intCompressedRLE is a run-length encoding format + intCompressedRLE = 2 ) // Int64Encoder encoders int64 into byte slices @@ -48,18 +50,34 @@ type Int64Decoder interface { } type int64Encoder struct { + prev int64 + rle bool values []uint64 } func NewInt64Encoder() Int64Encoder { - return &int64Encoder{} + return &int64Encoder{rle: true} } func (e *int64Encoder) Write(v int64) { - e.values = append(e.values, ZigZagEncode(v)) + // Delta-encode each value as it's written. This happens before + // ZigZagEncoding because the deltas could be negative. + delta := v - e.prev + e.prev = v + enc := ZigZagEncode(delta) + if len(e.values) > 1 { + e.rle = e.rle && e.values[len(e.values)-1] == enc + } + + e.values = append(e.values, enc) } func (e *int64Encoder) Bytes() ([]byte, error) { + // Only run-length encode if it could be reduce storage size + if e.rle && len(e.values) > 2 { + return e.encodeRLE() + } + for _, v := range e.values { // Value is too large to encode using packed format if v > simple8b.MaxValue { @@ -70,23 +88,56 @@ func (e *int64Encoder) Bytes() ([]byte, error) { return e.encodePacked() } +func (e *int64Encoder) encodeRLE() ([]byte, error) { + // Large varints can take up to 10 bytes + b := make([]byte, 1+10*3) + + // 4 high bits used for the encoding type + b[0] = byte(intCompressedRLE) << 4 + + i := 1 + // The first value + binary.BigEndian.PutUint64(b[i:], e.values[0]) + i += 8 + // The first delta + i += binary.PutUvarint(b[i:], e.values[1]) + // The number of times the delta is repeated + i += binary.PutUvarint(b[i:], uint64(len(e.values)-1)) + + return b[:i], nil +} + func (e *int64Encoder) encodePacked() ([]byte, error) { - encoded, err := simple8b.EncodeAll(e.values) + if len(e.values) == 0 { + return nil, nil + } + + // Encode all but the first value. Fist value is written unencoded + // using 8 bytes. + encoded, err := simple8b.EncodeAll(e.values[1:]) if err != nil { return nil, err } - b := make([]byte, 1+len(encoded)*8) + b := make([]byte, 1+(len(encoded)+1)*8) // 4 high bits of first byte store the encoding type for the block b[0] = byte(intCompressedSimple) << 4 + // Write the first value since it's not part of the encoded values + binary.BigEndian.PutUint64(b[1:9], e.values[0]) + + // Write the encoded values for i, v := range encoded { - binary.BigEndian.PutUint64(b[1+i*8:1+i*8+8], v) + binary.BigEndian.PutUint64(b[9+i*8:9+i*8+8], v) } return b, nil } func (e *int64Encoder) encodeUncompressed() ([]byte, error) { + if len(e.values) == 0 { + return nil, nil + } + b := make([]byte, 1+len(e.values)*8) // 4 high bits of first byte store the encoding type for the block b[0] = byte(intUncompressed) << 4 @@ -102,7 +153,14 @@ type int64Decoder struct { bytes []byte i int n int + prev int64 + first bool + // The first value for a run-length encoded byte slice + rleFirst uint64 + + // The delta value for a run-length encoded byte slice + rleDelta uint64 encoding byte err error } @@ -122,6 +180,7 @@ func (d *int64Decoder) SetBytes(b []byte) { d.encoding = b[0] >> 4 d.bytes = b[1:] } + d.first = true d.i = 0 d.n = 0 } @@ -139,6 +198,8 @@ func (d *int64Decoder) Next() bool { d.decodeUncompressed() case intCompressedSimple: d.decodePacked() + case intCompressedRLE: + d.decodeRLE() default: d.err = fmt.Errorf("unknown encoding %v", d.encoding) } @@ -151,7 +212,48 @@ func (d *int64Decoder) Error() error { } func (d *int64Decoder) Read() int64 { - return ZigZagDecode(d.values[d.i]) + switch d.encoding { + case intCompressedRLE: + return ZigZagDecode(d.rleFirst + uint64(d.i)*d.rleDelta) + default: + v := ZigZagDecode(d.values[d.i]) + // v is the delta encoded value, we need to add the prior value to get the original + v = v + d.prev + d.prev = v + return v + + } +} + +func (d *int64Decoder) decodeRLE() { + if len(d.bytes) == 0 { + return + } + + var i, n int + + // Next 8 bytes is the starting value + first := binary.BigEndian.Uint64(d.bytes[i : i+8]) + i += 8 + + // Next 1-10 bytes is the delta value + value, n := binary.Uvarint(d.bytes[i:]) + + i += n + + // Last 1-10 bytes is how many times the value repeats + count, n := binary.Uvarint(d.bytes[i:]) + + // Store the first value and delta value so we do not need to allocate + // a large values slice. We can compute the value at position d.i on + // demand. + d.rleFirst = first + d.rleDelta = value + d.n = int(count) + 1 + d.i = 0 + + // We've process all the bytes + d.bytes = nil } func (d *int64Decoder) decodePacked() { @@ -160,14 +262,21 @@ func (d *int64Decoder) decodePacked() { } v := binary.BigEndian.Uint64(d.bytes[0:8]) - n, err := simple8b.Decode(d.values, v) - if err != nil { - // Should never happen, only error that could be returned is if the the value to be decoded was not - // actually encoded by simple8b encoder. - d.err = fmt.Errorf("failed to decode value %v: %v", v, err) - } + // The first value is always unencoded + if d.first { + d.first = false + d.n = 1 + d.values[0] = v + } else { + n, err := simple8b.Decode(d.values, v) + if err != nil { + // Should never happen, only error that could be returned is if the the value to be decoded was not + // actually encoded by simple8b encoder. + d.err = fmt.Errorf("failed to decode value %v: %v", v, err) + } - d.n = n + d.n = n + } d.i = 0 d.bytes = d.bytes[8:] } diff --git a/tsdb/engine/tsm1/int_test.go b/tsdb/engine/tsm1/int_test.go index a74d27e082..dde37ab599 100644 --- a/tsdb/engine/tsm1/int_test.go +++ b/tsdb/engine/tsm1/int_test.go @@ -1,29 +1,32 @@ -package tsm1_test +package tsm1 import ( "math" + "math/rand" "reflect" "testing" "testing/quick" - - "github.com/influxdb/influxdb/tsdb/engine/tsm1" ) func Test_Int64Encoder_NoValues(t *testing.T) { - enc := tsm1.NewInt64Encoder() + enc := NewInt64Encoder() b, err := enc.Bytes() if err != nil { t.Fatalf("unexpected error: %v", err) } - dec := tsm1.NewInt64Decoder(b) + if len(b) > 0 { + t.Fatalf("unexpected lenght: exp 0, got %v", len(b)) + } + + dec := NewInt64Decoder(b) if dec.Next() { t.Fatalf("unexpected next value: got true, exp false") } } func Test_Int64Encoder_One(t *testing.T) { - enc := tsm1.NewInt64Encoder() + enc := NewInt64Encoder() v1 := int64(1) enc.Write(1) @@ -32,7 +35,11 @@ func Test_Int64Encoder_One(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - dec := tsm1.NewInt64Decoder(b) + if got := b[0] >> 4; intCompressedSimple != got { + t.Fatalf("encoding type mismatch: exp uncompressed, got %v", got) + } + + dec := NewInt64Decoder(b) if !dec.Next() { t.Fatalf("unexpected next value: got true, exp false") } @@ -43,7 +50,7 @@ func Test_Int64Encoder_One(t *testing.T) { } func Test_Int64Encoder_Two(t *testing.T) { - enc := tsm1.NewInt64Encoder() + enc := NewInt64Encoder() var v1, v2 int64 = 1, 2 enc.Write(v1) @@ -54,7 +61,11 @@ func Test_Int64Encoder_Two(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - dec := tsm1.NewInt64Decoder(b) + if got := b[0] >> 4; intCompressedSimple != got { + t.Fatalf("encoding type mismatch: exp uncompressed, got %v", got) + } + + dec := NewInt64Decoder(b) if !dec.Next() { t.Fatalf("unexpected next value: got true, exp false") } @@ -73,7 +84,7 @@ func Test_Int64Encoder_Two(t *testing.T) { } func Test_Int64Encoder_Negative(t *testing.T) { - enc := tsm1.NewInt64Encoder() + enc := NewInt64Encoder() var v1, v2, v3 int64 = -2, 0, 1 enc.Write(v1) @@ -85,7 +96,11 @@ func Test_Int64Encoder_Negative(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - dec := tsm1.NewInt64Decoder(b) + if got := b[0] >> 4; intCompressedSimple != got { + t.Fatalf("encoding type mismatch: exp uncompressed, got %v", got) + } + + dec := NewInt64Decoder(b) if !dec.Next() { t.Fatalf("unexpected next value: got true, exp false") } @@ -112,7 +127,7 @@ func Test_Int64Encoder_Negative(t *testing.T) { } func Test_Int64Encoder_Large_Range(t *testing.T) { - enc := tsm1.NewInt64Encoder() + enc := NewInt64Encoder() var v1, v2 int64 = math.MinInt64, math.MaxInt64 enc.Write(v1) enc.Write(v2) @@ -121,7 +136,11 @@ func Test_Int64Encoder_Large_Range(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - dec := tsm1.NewInt64Decoder(b) + if got := b[0] >> 4; intUncompressed != got { + t.Fatalf("encoding type mismatch: exp uncompressed, got %v", got) + } + + dec := NewInt64Decoder(b) if !dec.Next() { t.Fatalf("unexpected next value: got true, exp false") } @@ -140,7 +159,7 @@ func Test_Int64Encoder_Large_Range(t *testing.T) { } func Test_Int64Encoder_Uncompressed(t *testing.T) { - enc := tsm1.NewInt64Encoder() + enc := NewInt64Encoder() var v1, v2, v3 int64 = 0, 1, 1 << 60 enc.Write(v1) @@ -157,7 +176,11 @@ func Test_Int64Encoder_Uncompressed(t *testing.T) { t.Fatalf("length mismatch: got %v, exp %v", len(b), exp) } - dec := tsm1.NewInt64Decoder(b) + if got := b[0] >> 4; intUncompressed != got { + t.Fatalf("encoding type mismatch: exp uncompressed, got %v", got) + } + + dec := NewInt64Decoder(b) if !dec.Next() { t.Fatalf("unexpected next value: got true, exp false") } @@ -194,7 +217,7 @@ func Test_Int64Encoder_NegativeUncompressed(t *testing.T) { 2761419461769776844, -1324397441074946198, -680758138988210958, 94468846694902125, -2394093124890745254, -2682139311758778198, } - enc := tsm1.NewInt64Encoder() + enc := NewInt64Encoder() for _, v := range values { enc.Write(v) } @@ -204,7 +227,11 @@ func Test_Int64Encoder_NegativeUncompressed(t *testing.T) { t.Fatalf("expected error: %v", err) } - dec := tsm1.NewInt64Decoder(b) + if got := b[0] >> 4; intUncompressed != got { + t.Fatalf("encoding type mismatch: exp uncompressed, got %v", got) + } + + dec := NewInt64Decoder(b) i := 0 for dec.Next() { @@ -224,7 +251,7 @@ func Test_Int64Encoder_NegativeUncompressed(t *testing.T) { } func Test_Int64Encoder_AllNegative(t *testing.T) { - enc := tsm1.NewInt64Encoder() + enc := NewInt64Encoder() values := []int64{ -10, -5, -1, } @@ -238,7 +265,11 @@ func Test_Int64Encoder_AllNegative(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - dec := tsm1.NewInt64Decoder(b) + if got := b[0] >> 4; intCompressedSimple != got { + t.Fatalf("encoding type mismatch: exp uncompressed, got %v", got) + } + + dec := NewInt64Decoder(b) i := 0 for dec.Next() { if i > len(values) { @@ -254,13 +285,139 @@ func Test_Int64Encoder_AllNegative(t *testing.T) { if i != len(values) { t.Fatalf("failed to read enough values: got %v, exp %v", i, len(values)) } +} +func Test_Int64Encoder_CounterPacked(t *testing.T) { + enc := NewInt64Encoder() + values := []int64{ + 1e15, 1e15 + 1, 1e15 + 2, 1e15 + 3, 1e15 + 4, 1e15 + 6, + } + + for _, v := range values { + enc.Write(v) + } + + b, err := enc.Bytes() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if b[0]>>4 != intCompressedSimple { + t.Fatalf("unexpected encoding format: expected simple, got %v", b[0]>>4) + } + + // Should use 1 header byte + 2, 8 byte words if delta-encoding is used based on + // values sizes. Without delta-encoding, we'd get 49 bytes. + if exp := 17; len(b) != exp { + t.Fatalf("encoded length mismatch: got %v, exp %v", len(b), exp) + } + + dec := NewInt64Decoder(b) + i := 0 + for dec.Next() { + if i > len(values) { + t.Fatalf("read too many values: got %v, exp %v", i, len(values)) + } + + if values[i] != dec.Read() { + t.Fatalf("read value %d mismatch: got %v, exp %v", i, dec.Read(), values[i]) + } + i += 1 + } + + if i != len(values) { + t.Fatalf("failed to read enough values: got %v, exp %v", i, len(values)) + } +} + +func Test_Int64Encoder_CounterRLE(t *testing.T) { + enc := NewInt64Encoder() + values := []int64{ + 1e15, 1e15 + 1, 1e15 + 2, 1e15 + 3, 1e15 + 4, 1e15 + 5, + } + + for _, v := range values { + enc.Write(v) + } + + b, err := enc.Bytes() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if b[0]>>4 != intCompressedRLE { + t.Fatalf("unexpected encoding format: expected simple, got %v", b[0]>>4) + } + + // Should use 1 header byte, 8 byte first value, 1 var-byte for delta and 1 var-byte for + // count of deltas in this particular RLE. + if exp := 11; len(b) != exp { + t.Fatalf("encoded length mismatch: got %v, exp %v", len(b), exp) + } + + dec := NewInt64Decoder(b) + i := 0 + for dec.Next() { + if i > len(values) { + t.Fatalf("read too many values: got %v, exp %v", i, len(values)) + } + + if values[i] != dec.Read() { + t.Fatalf("read value %d mismatch: got %v, exp %v", i, dec.Read(), values[i]) + } + i += 1 + } + + if i != len(values) { + t.Fatalf("failed to read enough values: got %v, exp %v", i, len(values)) + } +} + +func Test_Int64Encoder_MinMax(t *testing.T) { + enc := NewInt64Encoder() + values := []int64{ + math.MinInt64, math.MaxInt64, + } + + for _, v := range values { + enc.Write(v) + } + + b, err := enc.Bytes() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if b[0]>>4 != intUncompressed { + t.Fatalf("unexpected encoding format: expected simple, got %v", b[0]>>4) + } + + if exp := 17; len(b) != exp { + t.Fatalf("encoded length mismatch: got %v, exp %v", len(b), exp) + } + + dec := NewInt64Decoder(b) + i := 0 + for dec.Next() { + if i > len(values) { + t.Fatalf("read too many values: got %v, exp %v", i, len(values)) + } + + if values[i] != dec.Read() { + t.Fatalf("read value %d mismatch: got %v, exp %v", i, dec.Read(), values[i]) + } + i += 1 + } + + if i != len(values) { + t.Fatalf("failed to read enough values: got %v, exp %v", i, len(values)) + } } func Test_Int64Encoder_Quick(t *testing.T) { quick.Check(func(values []int64) bool { // Write values to encoder. - enc := tsm1.NewInt64Encoder() + enc := NewInt64Encoder() for _, v := range values { enc.Write(v) } @@ -273,7 +430,7 @@ func Test_Int64Encoder_Quick(t *testing.T) { // Read values out of decoder. got := make([]int64, 0, len(values)) - dec := tsm1.NewInt64Decoder(buf) + dec := NewInt64Decoder(buf) for dec.Next() { if err := dec.Error(); err != nil { t.Fatal(err) @@ -290,8 +447,8 @@ func Test_Int64Encoder_Quick(t *testing.T) { }, nil) } -func BenchmarkInt64Encoder(b *testing.B) { - enc := tsm1.NewInt64Encoder() +func BenchmarkInt64EncoderRLE(b *testing.B) { + enc := NewInt64Encoder() x := make([]int64, 1024) for i := 0; i < len(x); i++ { x[i] = int64(i) @@ -304,13 +461,49 @@ func BenchmarkInt64Encoder(b *testing.B) { } } +func BenchmarkInt64EncoderPackedSimple(b *testing.B) { + enc := NewInt64Encoder() + x := make([]int64, 1024) + for i := 0; i < len(x); i++ { + // Small amount of randomness prevents RLE from being used + x[i] = int64(i) + int64(rand.Intn(10)) + enc.Write(x[i]) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + enc.Bytes() + } +} + type byteSetter interface { SetBytes(b []byte) } -func BenchmarkInt64Decoder(b *testing.B) { +func BenchmarkInt64DecoderPackedSimple(b *testing.B) { x := make([]int64, 1024) - enc := tsm1.NewInt64Encoder() + enc := NewInt64Encoder() + for i := 0; i < len(x); i++ { + // Small amount of randomness prevents RLE from being used + x[i] = int64(i) + int64(rand.Intn(10)) + enc.Write(x[i]) + } + bytes, _ := enc.Bytes() + + b.ResetTimer() + + dec := NewInt64Decoder(bytes) + + for i := 0; i < b.N; i++ { + dec.(byteSetter).SetBytes(bytes) + for dec.Next() { + } + } +} + +func BenchmarkInt64DecoderRLE(b *testing.B) { + x := make([]int64, 1024) + enc := NewInt64Encoder() for i := 0; i < len(x); i++ { x[i] = int64(i) enc.Write(x[i]) @@ -319,7 +512,7 @@ func BenchmarkInt64Decoder(b *testing.B) { b.ResetTimer() - dec := tsm1.NewInt64Decoder(bytes) + dec := NewInt64Decoder(bytes) for i := 0; i < b.N; i++ { dec.(byteSetter).SetBytes(bytes)