Add time and float compression
Time compression uses an adaptive approach using delta-encoding, frame-of-reference, run length encoding as well as compressed integer encoding. Float compression uses an implementation of the Gorilla paper encoding for timestamps based on XOR deltas and leading and trailing null suppression.pull/4308/head
parent
112a03f24c
commit
42e1babe7f
|
@ -4,7 +4,6 @@ import (
|
|||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/dgryski/go-tsz"
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
|
@ -127,23 +126,59 @@ func (f *FloatValue) Size() int {
|
|||
return 16
|
||||
}
|
||||
|
||||
// TODO: make this work with nanosecond timestamps
|
||||
func EncodeFloatBlock(buf []byte, values []*FloatValue) []byte {
|
||||
s := tsz.New(uint32(values[0].Time().Unix()))
|
||||
for _, v := range values {
|
||||
s.Push(uint32(v.Time().Unix()), v.value)
|
||||
if len(values) == 0 {
|
||||
return []byte{}
|
||||
}
|
||||
s.Finish()
|
||||
return append(u64tob(uint64(values[0].Time().UnixNano())), s.Bytes()...)
|
||||
|
||||
// A float block is encoded using different compression strategies
|
||||
// for timestamps and values.
|
||||
|
||||
// Encode values using Gorilla float compression
|
||||
venc := NewFloatEncoder()
|
||||
|
||||
// Encode timestamps using an adaptive encoder that uses delta-encoding,
|
||||
// frame-or-reference and run length encoding.
|
||||
tsenc := NewTimeEncoder()
|
||||
|
||||
for _, v := range values {
|
||||
tsenc.Write(v.Time())
|
||||
venc.Push(v.value)
|
||||
}
|
||||
venc.Finish()
|
||||
|
||||
// Encoded timestamp values
|
||||
tb, err := tsenc.Bytes()
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
// Encoded float values
|
||||
vb := venc.Bytes()
|
||||
|
||||
// Preprend the first timestamp of the block in the first 8 bytes
|
||||
return append(u64tob(uint64(values[0].Time().UnixNano())),
|
||||
packBlock(tb, vb)...)
|
||||
}
|
||||
|
||||
func DecodeFloatBlock(block []byte) ([]Value, error) {
|
||||
iter, _ := tsz.NewIterator(block[8:])
|
||||
a := make([]Value, 0)
|
||||
for iter.Next() {
|
||||
t, f := iter.Values()
|
||||
a = append(a, &FloatValue{time.Unix(int64(t), 0), f})
|
||||
// The first 8 bytes is the minimum timestamp of the block
|
||||
tb, vb := unpackBlock(block[8:])
|
||||
|
||||
// Setup our timestamp and value decoders
|
||||
dec := NewTimeDecoder(tb)
|
||||
iter, err := NewFloatDecoder(vb)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Decode both a timestamp and value
|
||||
var a []Value
|
||||
for dec.Next() && iter.Next() {
|
||||
ts := dec.Read()
|
||||
v := iter.Values()
|
||||
a = append(a, &FloatValue{ts, v})
|
||||
}
|
||||
|
||||
return a, nil
|
||||
}
|
||||
|
||||
|
@ -181,3 +216,29 @@ type StringValue struct {
|
|||
func EncodeStringBlock(buf []byte, values []StringValue) []byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
func packBlock(ts []byte, values []byte) []byte {
|
||||
// We encode the length of the timestamp block using a variable byte encoding.
|
||||
// This allows small byte slices to take up 1 byte while larger ones use 2 or more.
|
||||
b := make([]byte, 10)
|
||||
i := binary.PutUvarint(b, uint64(len(ts)))
|
||||
|
||||
// block is <len timestamp bytes>, <ts bytes>, <value bytes>
|
||||
block := append(b[:i], ts...)
|
||||
|
||||
// We don't encode the value length because we know it's the rest of the block after
|
||||
// the timestamp block.
|
||||
return append(block, values...)
|
||||
}
|
||||
|
||||
func unpackBlock(buf []byte) (ts, values []byte) {
|
||||
// Unpack the timestamp block length
|
||||
tsLen, i := binary.Uvarint(buf)
|
||||
|
||||
// Unpack the timestamp bytes
|
||||
ts = buf[int(i) : int(i)+int(tsLen)]
|
||||
|
||||
// Unpack the value bytes
|
||||
values = buf[int(i)+int(tsLen):]
|
||||
return
|
||||
}
|
||||
|
|
|
@ -0,0 +1,206 @@
|
|||
package pd1
|
||||
|
||||
/*
|
||||
This code is originally from: https://github.com/dgryski/go-tsz and has been modified to remove
|
||||
the timestamp compression fuctionality.
|
||||
|
||||
It implements the float compression as presented in: http://www.vldb.org/pvldb/vol8/p1816-teller.pdf.
|
||||
This implementation uses a sentinel value of NaN which means that float64 NaN cannot be stored using
|
||||
this version.
|
||||
*/
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"math"
|
||||
|
||||
"github.com/dgryski/go-bits"
|
||||
"github.com/dgryski/go-bitstream"
|
||||
)
|
||||
|
||||
type FloatEncoder struct {
|
||||
val float64
|
||||
|
||||
leading uint64
|
||||
trailing uint64
|
||||
|
||||
buf bytes.Buffer
|
||||
bw *bitstream.BitWriter
|
||||
|
||||
first bool
|
||||
finished bool
|
||||
}
|
||||
|
||||
func NewFloatEncoder() *FloatEncoder {
|
||||
s := FloatEncoder{
|
||||
first: true,
|
||||
leading: ^uint64(0),
|
||||
}
|
||||
|
||||
s.bw = bitstream.NewWriter(&s.buf)
|
||||
|
||||
return &s
|
||||
|
||||
}
|
||||
|
||||
func (s *FloatEncoder) Bytes() []byte {
|
||||
return s.buf.Bytes()
|
||||
}
|
||||
|
||||
func (s *FloatEncoder) Finish() {
|
||||
|
||||
if !s.finished {
|
||||
// // write an end-of-stream record
|
||||
s.Push(math.NaN())
|
||||
s.bw.Flush(bitstream.Zero)
|
||||
s.finished = true
|
||||
}
|
||||
}
|
||||
|
||||
func (s *FloatEncoder) Push(v float64) {
|
||||
|
||||
if s.first {
|
||||
// first point
|
||||
s.val = v
|
||||
s.first = false
|
||||
s.bw.WriteBits(math.Float64bits(v), 64)
|
||||
return
|
||||
}
|
||||
|
||||
vDelta := math.Float64bits(v) ^ math.Float64bits(s.val)
|
||||
|
||||
if vDelta == 0 {
|
||||
s.bw.WriteBit(bitstream.Zero)
|
||||
} else {
|
||||
s.bw.WriteBit(bitstream.One)
|
||||
|
||||
leading := bits.Clz(vDelta)
|
||||
trailing := bits.Ctz(vDelta)
|
||||
|
||||
// TODO(dgryski): check if it's 'cheaper' to reset the leading/trailing bits instead
|
||||
if s.leading != ^uint64(0) && leading >= s.leading && trailing >= s.trailing {
|
||||
s.bw.WriteBit(bitstream.Zero)
|
||||
s.bw.WriteBits(vDelta>>s.trailing, 64-int(s.leading)-int(s.trailing))
|
||||
} else {
|
||||
s.leading, s.trailing = leading, trailing
|
||||
|
||||
s.bw.WriteBit(bitstream.One)
|
||||
s.bw.WriteBits(leading, 5)
|
||||
|
||||
sigbits := 64 - leading - trailing
|
||||
s.bw.WriteBits(sigbits, 6)
|
||||
s.bw.WriteBits(vDelta>>trailing, int(sigbits))
|
||||
}
|
||||
}
|
||||
|
||||
s.val = v
|
||||
}
|
||||
|
||||
func (s *FloatEncoder) FloatDecoder() *FloatDecoder {
|
||||
iter, _ := NewFloatDecoder(s.buf.Bytes())
|
||||
return iter
|
||||
}
|
||||
|
||||
type FloatDecoder struct {
|
||||
val float64
|
||||
|
||||
leading uint64
|
||||
trailing uint64
|
||||
|
||||
br *bitstream.BitReader
|
||||
|
||||
b []byte
|
||||
|
||||
first bool
|
||||
finished bool
|
||||
|
||||
err error
|
||||
}
|
||||
|
||||
func NewFloatDecoder(b []byte) (*FloatDecoder, error) {
|
||||
br := bitstream.NewReader(bytes.NewReader(b))
|
||||
|
||||
v, err := br.ReadBits(64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &FloatDecoder{
|
||||
val: math.Float64frombits(v),
|
||||
first: true,
|
||||
br: br,
|
||||
b: b,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (it *FloatDecoder) Next() bool {
|
||||
if it.err != nil || it.finished {
|
||||
return false
|
||||
}
|
||||
|
||||
if it.first {
|
||||
it.first = false
|
||||
return true
|
||||
}
|
||||
|
||||
// read compressed value
|
||||
bit, err := it.br.ReadBit()
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
}
|
||||
|
||||
if bit == bitstream.Zero {
|
||||
// it.val = it.val
|
||||
} else {
|
||||
bit, err := it.br.ReadBit()
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
}
|
||||
if bit == bitstream.Zero {
|
||||
// reuse leading/trailing zero bits
|
||||
// it.leading, it.trailing = it.leading, it.trailing
|
||||
} else {
|
||||
bits, err := it.br.ReadBits(5)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
}
|
||||
it.leading = bits
|
||||
|
||||
bits, err = it.br.ReadBits(6)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
}
|
||||
mbits := bits
|
||||
it.trailing = 64 - it.leading - mbits
|
||||
}
|
||||
|
||||
mbits := int(64 - it.leading - it.trailing)
|
||||
bits, err := it.br.ReadBits(mbits)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
}
|
||||
vbits := math.Float64bits(it.val)
|
||||
vbits ^= (bits << it.trailing)
|
||||
|
||||
val := math.Float64frombits(vbits)
|
||||
if math.IsNaN(val) {
|
||||
it.finished = true
|
||||
return false
|
||||
}
|
||||
it.val = val
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (it *FloatDecoder) Values() float64 {
|
||||
return it.val
|
||||
}
|
||||
|
||||
func (it *FloatDecoder) Err() error {
|
||||
return it.err
|
||||
}
|
|
@ -0,0 +1,149 @@
|
|||
package pd1_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/influxdb/influxdb/tsdb/engine/pd1"
|
||||
)
|
||||
|
||||
func TestExampleEncoding(t *testing.T) {
|
||||
|
||||
// Example from the paper
|
||||
s := pd1.NewFloatEncoder()
|
||||
|
||||
s.Push(12)
|
||||
s.Push(12)
|
||||
s.Push(24)
|
||||
|
||||
// extra tests
|
||||
|
||||
// floating point masking/shifting bug
|
||||
s.Push(13)
|
||||
s.Push(24)
|
||||
|
||||
// delta-of-delta sizes
|
||||
s.Push(24)
|
||||
s.Push(24)
|
||||
s.Push(24)
|
||||
|
||||
s.Finish()
|
||||
|
||||
it := s.FloatDecoder()
|
||||
|
||||
want := []float64{
|
||||
12,
|
||||
12,
|
||||
24,
|
||||
|
||||
13,
|
||||
24,
|
||||
|
||||
24,
|
||||
24,
|
||||
24,
|
||||
}
|
||||
|
||||
for _, w := range want {
|
||||
if !it.Next() {
|
||||
t.Fatalf("Next()=false, want true")
|
||||
}
|
||||
vv := it.Values()
|
||||
if w != vv {
|
||||
t.Errorf("Values()=(%v), want (%v)\n", vv, w)
|
||||
}
|
||||
}
|
||||
|
||||
if it.Next() {
|
||||
t.Fatalf("Next()=true, want false")
|
||||
}
|
||||
|
||||
if err := it.Err(); err != nil {
|
||||
t.Errorf("it.Err()=%v, want nil", err)
|
||||
}
|
||||
}
|
||||
|
||||
var TwoHoursData = []struct {
|
||||
v float64
|
||||
}{
|
||||
// 2h of data
|
||||
{761}, {727}, {763}, {706}, {700},
|
||||
{679}, {757}, {708}, {739}, {707},
|
||||
{699}, {740}, {729}, {766}, {730},
|
||||
{715}, {705}, {693}, {765}, {724},
|
||||
{799}, {761}, {737}, {766}, {756},
|
||||
{719}, {722}, {801}, {747}, {731},
|
||||
{742}, {744}, {791}, {750}, {759},
|
||||
{809}, {751}, {705}, {770}, {792},
|
||||
{727}, {762}, {772}, {721}, {748},
|
||||
{753}, {744}, {716}, {776}, {659},
|
||||
{789}, {766}, {758}, {690}, {795},
|
||||
{770}, {758}, {723}, {767}, {765},
|
||||
{693}, {706}, {681}, {727}, {724},
|
||||
{780}, {678}, {696}, {758}, {740},
|
||||
{735}, {700}, {742}, {747}, {752},
|
||||
{734}, {743}, {732}, {746}, {770},
|
||||
{780}, {710}, {731}, {712}, {712},
|
||||
{741}, {770}, {770}, {754}, {718},
|
||||
{670}, {775}, {749}, {795}, {756},
|
||||
{741}, {787}, {721}, {745}, {782},
|
||||
{765}, {780}, {811}, {790}, {836},
|
||||
{743}, {858}, {739}, {762}, {770},
|
||||
{752}, {763}, {795}, {792}, {746},
|
||||
{786}, {785}, {774}, {786}, {718},
|
||||
}
|
||||
|
||||
func TestRoundtrip(t *testing.T) {
|
||||
|
||||
s := pd1.NewFloatEncoder()
|
||||
for _, p := range TwoHoursData {
|
||||
s.Push(p.v)
|
||||
}
|
||||
s.Finish()
|
||||
|
||||
it := s.FloatDecoder()
|
||||
for _, w := range TwoHoursData {
|
||||
if !it.Next() {
|
||||
t.Fatalf("Next()=false, want true")
|
||||
}
|
||||
vv := it.Values()
|
||||
// t.Logf("it.Values()=(%+v, %+v)\n", time.Unix(int64(tt), 0), vv)
|
||||
if w.v != vv {
|
||||
t.Errorf("Values()=(%v), want (%v)\n", vv, w.v)
|
||||
}
|
||||
}
|
||||
|
||||
if it.Next() {
|
||||
t.Fatalf("Next()=true, want false")
|
||||
}
|
||||
|
||||
if err := it.Err(); err != nil {
|
||||
t.Errorf("it.Err()=%v, want nil", err)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkFloatEncoder(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
s := pd1.NewFloatEncoder()
|
||||
for _, tt := range TwoHoursData {
|
||||
s.Push(tt.v)
|
||||
}
|
||||
s.Finish()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkFloatDecoder(b *testing.B) {
|
||||
s := pd1.NewFloatEncoder()
|
||||
for _, tt := range TwoHoursData {
|
||||
s.Push(tt.v)
|
||||
}
|
||||
s.Finish()
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
it := s.FloatDecoder()
|
||||
for j := 0; j < len(TwoHoursData); it.Next() {
|
||||
j++
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,286 @@
|
|||
// Package timestamp provides structs and functions for converting streams of timestamps
|
||||
// to byte slices.
|
||||
//
|
||||
// The encoding is adapative based on structure of the timestamps that are encoded. By default,
|
||||
// a bit-packed format that compresses multiple 64bit timestamps into a single 64bit word is used.
|
||||
// If the values are too large to be compressed using the bit-packed format, it will fall back to
|
||||
// a raw 8byte per timestamp format. If the the values can be run-length encoded, based on the
|
||||
// differences between consectutive values, a shorter, variable sized RLE format is used.
|
||||
package pd1
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/jwilder/encoding/simple8b"
|
||||
)
|
||||
|
||||
const (
|
||||
// EncodingPacked is a bit-packed format
|
||||
EncodingPacked = 0
|
||||
// EncodingRLE is a run-length encoded format
|
||||
EncodingRLE = 1
|
||||
// EncodingRAW is a non-compressed format
|
||||
EncodingRaw = 2
|
||||
)
|
||||
|
||||
// TimeEncoder encodes time.Time to byte slices.
|
||||
type TimeEncoder interface {
|
||||
Write(t time.Time)
|
||||
Bytes() ([]byte, error)
|
||||
}
|
||||
|
||||
// TimeEncoder decodes byte slices to time.Time values.
|
||||
type TimeDecoder interface {
|
||||
Next() bool
|
||||
Read() time.Time
|
||||
}
|
||||
|
||||
type encoder struct {
|
||||
ts []int64
|
||||
}
|
||||
|
||||
// NewTimeEncoder returns a TimeEncoder
|
||||
func NewTimeEncoder() TimeEncoder {
|
||||
return &encoder{}
|
||||
}
|
||||
|
||||
// Write adds a time.Time to the compressed stream.
|
||||
func (e *encoder) Write(t time.Time) {
|
||||
e.ts = append(e.ts, t.UnixNano())
|
||||
}
|
||||
|
||||
func (e *encoder) reduce() (min, max, divisor int64, rle bool, deltas []int64) {
|
||||
// We make a copy of the timestamps so that if we end up using using RAW encoding,
|
||||
// we still have the original values to encode.
|
||||
deltas = make([]int64, len(e.ts))
|
||||
copy(deltas, e.ts)
|
||||
|
||||
// Starting values for a min, max and divisor
|
||||
min, max, divisor = e.ts[0], 0, 1e12
|
||||
|
||||
// First differential encode the values in place
|
||||
for i := len(deltas) - 1; i > 0; i-- {
|
||||
deltas[i] = deltas[i] - deltas[i-1]
|
||||
|
||||
// We also want to keep track of the min, max and divisor so we don't
|
||||
// have to loop again
|
||||
v := deltas[i]
|
||||
if v < min {
|
||||
min = v
|
||||
}
|
||||
|
||||
if v > max {
|
||||
max = v
|
||||
}
|
||||
|
||||
for {
|
||||
// If our value is divisible by 10, break. Otherwise, try the next smallest divisor.
|
||||
if v%divisor == 0 {
|
||||
break
|
||||
}
|
||||
divisor /= 10
|
||||
}
|
||||
}
|
||||
|
||||
// Are the deltas able to be run-length encoded?
|
||||
rle = true
|
||||
for i := 1; i < len(deltas); i++ {
|
||||
deltas[i] = (deltas[i] - min) / divisor
|
||||
// Skip the first value || see if prev = curr. The deltas can be RLE if the are all equal.
|
||||
rle = i == 1 || rle && (deltas[i-1] == deltas[i])
|
||||
}
|
||||
|
||||
// No point RLE encoding 1 value
|
||||
rle = rle && len(deltas) > 1
|
||||
return
|
||||
}
|
||||
|
||||
// Bytes returns the encoded bytes of all written times.
|
||||
func (e *encoder) Bytes() ([]byte, error) {
|
||||
if len(e.ts) == 0 {
|
||||
return []byte{}, nil
|
||||
}
|
||||
|
||||
// Minimum, maxim and largest common divisor. rle is true if dts (the delta timestamps),
|
||||
// are all the same.
|
||||
min, max, div, rle, dts := e.reduce()
|
||||
|
||||
// The deltas are all the same, so we can run-length encode them
|
||||
if rle && len(e.ts) > 60 {
|
||||
return e.encodeRLE(e.ts[0], e.ts[1]-e.ts[0], div, len(e.ts))
|
||||
}
|
||||
|
||||
// We can't compress this time-range, the deltas exceed 1 << 60. That would mean that two
|
||||
// adjacent timestamps are nanosecond resolution and ~36.5yr apart.
|
||||
if max > simple8b.MaxValue {
|
||||
return e.encodeRaw()
|
||||
}
|
||||
|
||||
// Otherwise, encode them in a compressed format
|
||||
return e.encodePacked(min, div, dts)
|
||||
}
|
||||
|
||||
func (e *encoder) encodePacked(min, div int64, dts []int64) ([]byte, error) {
|
||||
enc := simple8b.NewEncoder()
|
||||
for _, v := range dts[1:] {
|
||||
enc.Write(uint64(v))
|
||||
}
|
||||
|
||||
b := make([]byte, 8*2+1)
|
||||
|
||||
// 4 high bits used for the encoding type
|
||||
b[0] = byte(EncodingPacked) << 4
|
||||
// 4 low bits are the log10 divisor
|
||||
b[0] |= byte(math.Log10(float64(div)))
|
||||
|
||||
// The minimum timestamp value
|
||||
binary.BigEndian.PutUint64(b[1:9], uint64(min))
|
||||
|
||||
// The first delta value
|
||||
binary.BigEndian.PutUint64(b[9:17], uint64(dts[0]))
|
||||
|
||||
// The compressed deltas
|
||||
deltas, err := enc.Bytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return append(b, deltas...), nil
|
||||
}
|
||||
|
||||
func (e *encoder) encodeRaw() ([]byte, error) {
|
||||
b := make([]byte, 1+len(e.ts)*8)
|
||||
b[0] = byte(EncodingRaw) << 4
|
||||
for i, v := range e.ts {
|
||||
binary.BigEndian.PutUint64(b[1+i*8:1+i*8+8], uint64(v))
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
|
||||
func (e *encoder) encodeRLE(first, delta, div int64, n int) ([]byte, error) {
|
||||
// Large varints can take up to 10 bytes
|
||||
b := make([]byte, 1+10*3)
|
||||
|
||||
// 4 high bits used for the encoding type
|
||||
b[0] = byte(EncodingRLE) << 4
|
||||
// 4 low bits are the log10 divisor
|
||||
b[0] |= byte(math.Log10(float64(div)))
|
||||
|
||||
i := 1
|
||||
// The first timestamp
|
||||
binary.BigEndian.PutUint64(b[i:], uint64(first))
|
||||
i += 8
|
||||
// The first delta
|
||||
i += binary.PutUvarint(b[i:], uint64(delta/div))
|
||||
// The number of times the delta is repeated
|
||||
i += binary.PutUvarint(b[i:], uint64(n))
|
||||
|
||||
return b[:i], nil
|
||||
}
|
||||
|
||||
type decoder struct {
|
||||
v time.Time
|
||||
ts []int64
|
||||
}
|
||||
|
||||
func NewTimeDecoder(b []byte) TimeDecoder {
|
||||
d := &decoder{}
|
||||
d.decode(b)
|
||||
return d
|
||||
}
|
||||
|
||||
func (d *decoder) Next() bool {
|
||||
if len(d.ts) == 0 {
|
||||
return false
|
||||
}
|
||||
d.v = time.Unix(0, d.ts[0])
|
||||
d.ts = d.ts[1:]
|
||||
return true
|
||||
}
|
||||
|
||||
func (d *decoder) Read() time.Time {
|
||||
return d.v
|
||||
}
|
||||
|
||||
func (d *decoder) decode(b []byte) {
|
||||
if len(b) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Encoding type is stored in the 4 high bits of the first byte
|
||||
encoding := b[0] >> 4
|
||||
switch encoding {
|
||||
case EncodingRaw:
|
||||
d.decodeRaw(b[1:])
|
||||
case EncodingRLE:
|
||||
d.decodeRLE(b)
|
||||
default:
|
||||
d.decodePacked(b)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *decoder) decodePacked(b []byte) {
|
||||
div := int64(math.Pow10(int(b[0] & 0xF)))
|
||||
min := int64(binary.BigEndian.Uint64(b[1:9]))
|
||||
first := int64(binary.BigEndian.Uint64(b[9:17]))
|
||||
|
||||
enc := simple8b.NewDecoder(b[17:])
|
||||
|
||||
deltas := []int64{first}
|
||||
for enc.Next() {
|
||||
deltas = append(deltas, int64(enc.Read()))
|
||||
}
|
||||
|
||||
// Compute the prefix sum and scale the deltas back up
|
||||
for i := 1; i < len(deltas); i++ {
|
||||
deltas[i] = (deltas[i] * div) + min
|
||||
deltas[i] = deltas[i-1] + deltas[i]
|
||||
}
|
||||
|
||||
d.ts = deltas
|
||||
}
|
||||
|
||||
func (d *decoder) decodeRLE(b []byte) {
|
||||
var i, n int
|
||||
|
||||
// Lower 4 bits hold the 10 based exponent so we can scale the values back up
|
||||
div := int64(math.Pow10(int(b[i] & 0xF)))
|
||||
i += 1
|
||||
|
||||
// Next 8 bytes is the starting timestamp
|
||||
first := binary.BigEndian.Uint64(b[i : i+8])
|
||||
i += 8
|
||||
|
||||
// Next 1-10 bytes is our (scaled down by factor of 10) run length values
|
||||
value, n := binary.Uvarint(b[i:])
|
||||
|
||||
// Scale the value back up
|
||||
value *= uint64(div)
|
||||
i += n
|
||||
|
||||
// Last 1-10 bytes is how many times the value repeats
|
||||
count, n := binary.Uvarint(b[i:])
|
||||
|
||||
// Rebuild construct the original values now
|
||||
deltas := make([]int64, count)
|
||||
for i := range deltas {
|
||||
deltas[i] = int64(value)
|
||||
}
|
||||
|
||||
// Reverse the delta-encoding
|
||||
deltas[0] = int64(first)
|
||||
for i := 1; i < len(deltas); i++ {
|
||||
deltas[i] = deltas[i-1] + deltas[i]
|
||||
}
|
||||
|
||||
d.ts = deltas
|
||||
}
|
||||
|
||||
func (d *decoder) decodeRaw(b []byte) {
|
||||
d.ts = make([]int64, len(b)/8)
|
||||
for i := range d.ts {
|
||||
d.ts[i] = int64(binary.BigEndian.Uint64(b[i*8 : i*8+8]))
|
||||
}
|
||||
}
|
|
@ -0,0 +1,353 @@
|
|||
package pd1_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/tsdb/engine/pd1"
|
||||
)
|
||||
|
||||
func Test_TimeEncoder(t *testing.T) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
|
||||
x := []time.Time{}
|
||||
now := time.Unix(0, 0)
|
||||
x = append(x, now)
|
||||
enc.Write(now)
|
||||
for i := 1; i < 4; i++ {
|
||||
x = append(x, now.Add(time.Duration(i)*time.Second))
|
||||
enc.Write(x[i])
|
||||
}
|
||||
|
||||
b, err := enc.Bytes()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewTimeDecoder(b)
|
||||
for i, v := range x {
|
||||
if !dec.Next() {
|
||||
t.Fatalf("Next == false, expected true")
|
||||
}
|
||||
|
||||
if v != dec.Read() {
|
||||
t.Fatalf("Item %d mismatch, got %v, exp %v", i, dec.Read(), v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func Test_TimeEncoder_NoValues(t *testing.T) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
b, err := enc.Bytes()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewTimeDecoder(b)
|
||||
if dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
}
|
||||
|
||||
func Test_TimeEncoder_One(t *testing.T) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
tm := time.Unix(0, 0)
|
||||
|
||||
enc.Write(tm)
|
||||
b, err := enc.Bytes()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewTimeDecoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if tm != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), tm)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_TimeEncoder_Two(t *testing.T) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
t1 := time.Unix(0, 0)
|
||||
t2 := time.Unix(0, 1)
|
||||
enc.Write(t1)
|
||||
enc.Write(t2)
|
||||
|
||||
b, err := enc.Bytes()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewTimeDecoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if t1 != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), t1)
|
||||
}
|
||||
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if t2 != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), t2)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_TimeEncoder_Three(t *testing.T) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
t1 := time.Unix(0, 0)
|
||||
t2 := time.Unix(0, 1)
|
||||
t3 := time.Unix(0, 2)
|
||||
|
||||
enc.Write(t1)
|
||||
enc.Write(t2)
|
||||
enc.Write(t3)
|
||||
|
||||
b, err := enc.Bytes()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewTimeDecoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if t1 != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), t1)
|
||||
}
|
||||
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if t2 != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), t2)
|
||||
}
|
||||
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if t3 != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), t3)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_TimeEncoder_Large_Range(t *testing.T) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
t1 := time.Unix(0, 1442369134000000000)
|
||||
t2 := time.Unix(0, 1442369135000000000)
|
||||
enc.Write(t1)
|
||||
enc.Write(t2)
|
||||
b, err := enc.Bytes()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewTimeDecoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if t1 != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), t1)
|
||||
}
|
||||
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if t2 != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), t2)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_TimeEncoder_Raw(t *testing.T) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
t1 := time.Unix(0, 0)
|
||||
t2 := time.Unix(1, 0)
|
||||
|
||||
// about 36.5yrs in NS resolution is max range for compressed format
|
||||
// This should cause the encoding to fallback to raw points
|
||||
t3 := time.Unix(2, (2 << 59))
|
||||
enc.Write(t1)
|
||||
enc.Write(t2)
|
||||
enc.Write(t3)
|
||||
|
||||
b, err := enc.Bytes()
|
||||
if err != nil {
|
||||
t.Fatalf("expected error: %v", err)
|
||||
}
|
||||
|
||||
if exp := 25; len(b) != exp {
|
||||
t.Fatalf("length mismatch: got %v, exp %v", len(b), exp)
|
||||
}
|
||||
|
||||
dec := pd1.NewTimeDecoder(b)
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if t1 != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), t1)
|
||||
}
|
||||
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if t2 != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), t2)
|
||||
}
|
||||
|
||||
if !dec.Next() {
|
||||
t.Fatalf("unexpected next value: got true, exp false")
|
||||
}
|
||||
|
||||
if t3 != dec.Read() {
|
||||
t.Fatalf("read value mismatch: got %v, exp %v", dec.Read(), t3)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_TimeEncoder_RLE(t *testing.T) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
var ts []time.Time
|
||||
for i := 0; i < 500; i++ {
|
||||
ts = append(ts, time.Unix(int64(i), 0))
|
||||
}
|
||||
|
||||
for _, v := range ts {
|
||||
enc.Write(v)
|
||||
}
|
||||
|
||||
b, err := enc.Bytes()
|
||||
if exp := 12; len(b) != exp {
|
||||
t.Fatalf("length mismatch: got %v, exp %v", len(b), exp)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewTimeDecoder(b)
|
||||
for i, v := range ts {
|
||||
if !dec.Next() {
|
||||
t.Fatalf("Next == false, expected true")
|
||||
}
|
||||
|
||||
if v != dec.Read() {
|
||||
t.Fatalf("Item %d mismatch, got %v, exp %v", i, dec.Read(), v)
|
||||
}
|
||||
}
|
||||
|
||||
if dec.Next() {
|
||||
t.Fatalf("unexpected extra values")
|
||||
}
|
||||
}
|
||||
|
||||
func Test_TimeEncoder_Reverse(t *testing.T) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
ts := []time.Time{
|
||||
time.Unix(0, 3),
|
||||
time.Unix(0, 2),
|
||||
time.Unix(0, 1),
|
||||
}
|
||||
|
||||
for _, v := range ts {
|
||||
enc.Write(v)
|
||||
}
|
||||
|
||||
b, err := enc.Bytes()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := pd1.NewTimeDecoder(b)
|
||||
i := 0
|
||||
for dec.Next() {
|
||||
if ts[i] != dec.Read() {
|
||||
t.Fatalf("read value %d mismatch: got %v, exp %v", i, dec.Read(), ts[i])
|
||||
}
|
||||
i += 1
|
||||
}
|
||||
}
|
||||
|
||||
func Test_TimeEncoder_220SecondDelta(t *testing.T) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
var ts []time.Time
|
||||
for i := 0; i < 220; i++ {
|
||||
ts = append(ts, time.Unix(int64(i), 0))
|
||||
}
|
||||
|
||||
for _, v := range ts {
|
||||
enc.Write(v)
|
||||
}
|
||||
|
||||
b, err := enc.Bytes()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Using RLE, should get 12 bytes
|
||||
if exp := 12; len(b) != exp {
|
||||
t.Fatalf("unexpected length: got %v, exp %v", len(b), exp)
|
||||
}
|
||||
|
||||
dec := pd1.NewTimeDecoder(b)
|
||||
i := 0
|
||||
for dec.Next() {
|
||||
if ts[i] != dec.Read() {
|
||||
t.Fatalf("read value %d mismatch: got %v, exp %v", i, dec.Read(), ts[i])
|
||||
}
|
||||
i += 1
|
||||
}
|
||||
|
||||
if i != len(ts) {
|
||||
t.Fatalf("Read too few values: exp %d, got %d", len(ts), i)
|
||||
}
|
||||
|
||||
if dec.Next() {
|
||||
t.Fatalf("expecte Next() = false, got true")
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkTimeEncoder(b *testing.B) {
|
||||
enc := pd1.NewTimeEncoder()
|
||||
x := make([]time.Time, 1024)
|
||||
for i := 0; i < len(x); i++ {
|
||||
x[i] = time.Now()
|
||||
enc.Write(x[i])
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
enc.Bytes()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkTimeDecoder(b *testing.B) {
|
||||
x := make([]time.Time, 1024)
|
||||
enc := pd1.NewTimeEncoder()
|
||||
for i := 0; i < len(x); i++ {
|
||||
x[i] = time.Now()
|
||||
enc.Write(x[i])
|
||||
}
|
||||
bytes, _ := enc.Bytes()
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
b.StopTimer()
|
||||
dec := pd1.NewTimeDecoder(bytes)
|
||||
b.StartTimer()
|
||||
for dec.Next() {
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue