diff --git a/tsdb/engine/tsm1/batch_integer.go b/tsdb/engine/tsm1/batch_integer.go index 9111589226..6ca074b587 100644 --- a/tsdb/engine/tsm1/batch_integer.go +++ b/tsdb/engine/tsm1/batch_integer.go @@ -22,64 +22,56 @@ func IntegerArrayEncodeAll(src []int64, b []byte) ([]byte, error) { return nil, nil // Nothing to do } - // Zigzag encode deltas of all provided values. - var prev int64 - var rle = true - var canpack = true + var max = uint64(0) // To prevent an allocation of the entire block we're encoding reuse the // src slice to store the encoded deltas. deltas := reintepretInt64ToUint64Slice(src) - - prev = src[0] - enc := ZigZagEncode(prev) - src[0] = int64(enc) - canpack = enc <= simple8b.MaxValue - - if len(src) > 1 { - delta := src[1] - prev - prev = src[1] - enc = ZigZagEncode(delta) - d0 := enc - src[1] = int64(enc) - canpack = canpack && enc <= simple8b.MaxValue - - for i := 2; i < len(src); i++ { - delta := src[i] - prev - prev = src[i] - enc = ZigZagEncode(delta) - src[i] = int64(enc) - rle = rle && d0 == enc - canpack = canpack && enc <= simple8b.MaxValue + for i := len(deltas) - 1; i > 0; i-- { + deltas[i] = deltas[i] - deltas[i-1] + deltas[i] = ZigZagEncode(int64(deltas[i])) + if deltas[i] > max { + max = deltas[i] } } - // Encode with RLE - if rle && len(deltas) > 2 { - // Large varints can take up to 10 bytes. We're storing 3 + 1 - // type byte. - if len(b) < 31 && cap(b) >= 31 { - b = b[:31] - } else if len(b) < 31 { - b = append(b, make([]byte, 31-len(b))...) + deltas[0] = ZigZagEncode(int64(deltas[0])) + + if len(deltas) > 2 { + var rle = true + for i := 2; i < len(deltas); i++ { + if deltas[1] != deltas[i] { + rle = false + break + } } - // 4 high bits used for the encoding type - b[0] = byte(intCompressedRLE) << 4 + if rle { + // Large varints can take up to 10 bytes. We're storing 3 + 1 + // type byte. + if len(b) < 31 && cap(b) >= 31 { + b = b[:31] + } else if len(b) < 31 { + b = append(b, make([]byte, 31-len(b))...) + } - i := 1 - // The first value - binary.BigEndian.PutUint64(b[i:], deltas[0]) - i += 8 - // The first delta - i += binary.PutUvarint(b[i:], deltas[1]) - // The number of times the delta is repeated - i += binary.PutUvarint(b[i:], uint64(len(deltas)-1)) + // 4 high bits used for the encoding type + b[0] = byte(intCompressedRLE) << 4 - return b[:i], nil + i := 1 + // The first value + binary.BigEndian.PutUint64(b[i:], deltas[0]) + i += 8 + // The first delta + i += binary.PutUvarint(b[i:], deltas[1]) + // The number of times the delta is repeated + i += binary.PutUvarint(b[i:], uint64(len(deltas)-1)) + + return b[:i], nil + } } - if !canpack { // There is an encoded value that's too big to simple8b encode. + if max > simple8b.MaxValue { // There is an encoded value that's too big to simple8b encode. // Encode uncompressed. sz := 1 + len(deltas)*8 if len(b) < sz && cap(b) >= sz { diff --git a/tsdb/engine/tsm1/batch_integer_test.go b/tsdb/engine/tsm1/batch_integer_test.go index 99d49ace0e..01089e2bd8 100644 --- a/tsdb/engine/tsm1/batch_integer_test.go +++ b/tsdb/engine/tsm1/batch_integer_test.go @@ -116,7 +116,7 @@ func testIntegerArrayEncodeAll_Compare(t *testing.T, input []int64, encoding byt } if got := result; !reflect.DeepEqual(got, exp) { - t.Fatalf("got result %v, expected %v", got, exp) + t.Fatalf("-got/+exp\n%s", cmp.Diff(got, exp)) } // Check that the encoders are byte for byte the same... @@ -291,10 +291,9 @@ func TestIntegerArrayEncodeAll_Negative(t *testing.T) { } func TestIntegerArrayEncodeAll_Large_Range(t *testing.T) { - var v1, v2 int64 = math.MinInt64, math.MaxInt64 + exp := []int64{math.MaxInt64, 0, math.MaxInt64} - src := []int64{v1, v2} - b, err := IntegerArrayEncodeAll(src, nil) + b, err := IntegerArrayEncodeAll(append([]int64{}, exp...), nil) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -305,20 +304,14 @@ func TestIntegerArrayEncodeAll_Large_Range(t *testing.T) { var dec IntegerDecoder dec.SetBytes(b) - if !dec.Next() { - t.Fatalf("unexpected next value: got true, exp false") + + var got []int64 + for dec.Next() { + got = append(got, dec.Read()) } - if v1 != dec.Read() { - t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v1) - } - - if !dec.Next() { - t.Fatalf("unexpected next value: got true, exp false") - } - - if v2 != dec.Read() { - t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), v2) + if !cmp.Equal(got, exp) { + t.Fatalf("unxpected result, -got/+exp\n%s", cmp.Diff(got, exp)) } } @@ -621,7 +614,7 @@ func TestIntegerArrayEncodeAll_MinMax(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - if b[0]>>4 != intUncompressed { + if b[0]>>4 != intCompressedSimple { t.Fatalf("unexpected encoding format: expected simple, got %v", b[0]>>4) }