diff --git a/tsdb/engine/tsm1/bool.go b/tsdb/engine/tsm1/bool.go index 8d9653d199..ac08555cfe 100644 --- a/tsdb/engine/tsm1/bool.go +++ b/tsdb/engine/tsm1/bool.go @@ -7,6 +7,13 @@ package tsm1 import "encoding/binary" +const ( + // boolUncompressed is an uncompressed boolean format + boolUncompressed = 0 + // boolCompressedBitPacked is an bit packed format using 1 bit per boolean + boolCompressedBitPacked = 1 +) + type BoolEncoder interface { Write(b bool) Bytes() ([]byte, error) @@ -75,7 +82,7 @@ func (e *boolEncoder) Bytes() ([]byte, error) { b := make([]byte, 10+1) // Store the encoding type in the 4 high bits of the first byte - b[0] = byte(EncodingBitPacked) << 4 + b[0] = byte(boolCompressedBitPacked) << 4 i := 1 // Encode the number of bools written diff --git a/tsdb/engine/tsm1/encoding.go b/tsdb/engine/tsm1/encoding.go index 4b6a112d56..eef2b1d23e 100644 --- a/tsdb/engine/tsm1/encoding.go +++ b/tsdb/engine/tsm1/encoding.go @@ -10,21 +10,6 @@ import ( ) const ( - // EncodingPackedSimple is a bit-packed format - EncodingPackedSimple = 0 - - // EncodingRLE is a run-length encoded format - EncodingRLE = 1 - - // EncodingUncompressed is a non-compressed format - EncodingUncompressed = 2 - - // EncodingBitPacked is a basic bit-packed format - EncodingBitPacked = 3 - - // EncodingSnappy is a snappy encoded format - EncodingSnappy = 4 - // BlockFloat64 designates a block encodes float64 values BlockFloat64 = 0 diff --git a/tsdb/engine/tsm1/float.go b/tsdb/engine/tsm1/float.go index ff6a61c505..7ac1d93b49 100644 --- a/tsdb/engine/tsm1/float.go +++ b/tsdb/engine/tsm1/float.go @@ -17,6 +17,13 @@ import ( "github.com/dgryski/go-bitstream" ) +const ( + // floatUncompressed is an uncompressed format using 8 bytes per value + floatUncompressed = 0 + // floatCompressedGorilla is a compressed format using the gorilla paper encoding + floatCompressedGorilla = 1 +) + type FloatEncoder struct { val float64 @@ -43,7 +50,7 @@ func NewFloatEncoder() *FloatEncoder { } func (s *FloatEncoder) Bytes() []byte { - return s.buf.Bytes() + return append([]byte{floatCompressedGorilla << 4}, s.buf.Bytes()...) } func (s *FloatEncoder) Finish() { @@ -95,11 +102,6 @@ func (s *FloatEncoder) Push(v float64) { s.val = v } -func (s *FloatEncoder) FloatDecoder() *FloatDecoder { - iter, _ := NewFloatDecoder(s.buf.Bytes()) - return iter -} - type FloatDecoder struct { val float64 @@ -117,7 +119,9 @@ type FloatDecoder struct { } func NewFloatDecoder(b []byte) (*FloatDecoder, error) { - br := bitstream.NewReader(bytes.NewReader(b)) + // first byte is the compression type but we currently just have gorilla + // compression + br := bitstream.NewReader(bytes.NewReader(b[1:])) v, err := br.ReadBits(64) if err != nil { diff --git a/tsdb/engine/tsm1/float_test.go b/tsdb/engine/tsm1/float_test.go index 00b259bf95..49e811a174 100644 --- a/tsdb/engine/tsm1/float_test.go +++ b/tsdb/engine/tsm1/float_test.go @@ -28,7 +28,12 @@ func TestFloatEncoder_Simple(t *testing.T) { s.Finish() - it := s.FloatDecoder() + b := s.Bytes() + + it, err := tsm1.NewFloatDecoder(b) + if err != nil { + t.Fatalf("unexpected error creating float decoder: %v", err) + } want := []float64{ 12, @@ -100,7 +105,13 @@ func TestFloatEncoder_Roundtrip(t *testing.T) { } s.Finish() - it := s.FloatDecoder() + b := s.Bytes() + + it, err := tsm1.NewFloatDecoder(b) + if err != nil { + t.Fatalf("unexpected error creating float decoder: %v", err) + } + for _, w := range TwoHoursData { if !it.Next() { t.Fatalf("Next()=false, want true") @@ -137,11 +148,16 @@ func BenchmarkFloatDecoder(b *testing.B) { s.Push(tt.v) } s.Finish() + bytes := s.Bytes() b.ResetTimer() for i := 0; i < b.N; i++ { - it := s.FloatDecoder() + it, err := tsm1.NewFloatDecoder(bytes) + if err != nil { + b.Fatalf("unexpected error creating float decoder: %v", err) + } + for j := 0; j < len(TwoHoursData); it.Next() { j++ } diff --git a/tsdb/engine/tsm1/int.go b/tsdb/engine/tsm1/int.go index b178c50375..40e615657d 100644 --- a/tsdb/engine/tsm1/int.go +++ b/tsdb/engine/tsm1/int.go @@ -27,6 +27,13 @@ import ( "github.com/jwilder/encoding/simple8b" ) +const ( + // intUncompressed is an uncompressed format using 8 bytes per point + intUncompressed = 0 + // intCompressedSimple is a bit-packed format using simple8b encoding + intCompressedSimple = 1 +) + type Int64Encoder interface { Write(v int64) Bytes() ([]byte, error) @@ -68,7 +75,7 @@ func (e *int64Encoder) encodePacked() ([]byte, error) { b := make([]byte, 1+len(encoded)*8) // 4 high bits of first byte store the encoding type for the block - b[0] = byte(EncodingPackedSimple) << 4 + b[0] = byte(intCompressedSimple) << 4 for i, v := range encoded { binary.BigEndian.PutUint64(b[1+i*8:1+i*8+8], v) @@ -79,7 +86,7 @@ func (e *int64Encoder) encodePacked() ([]byte, error) { func (e *int64Encoder) encodeUncompressed() ([]byte, error) { b := make([]byte, 1+len(e.values)*8) // 4 high bits of first byte store the encoding type for the block - b[0] = byte(EncodingUncompressed) << 4 + b[0] = byte(intUncompressed) << 4 for i, v := range e.values { binary.BigEndian.PutUint64(b[1+i*8:1+i*8+8], v) @@ -123,9 +130,9 @@ func (d *int64Decoder) Next() bool { if d.i >= d.n { switch d.encoding { - case EncodingUncompressed: + case intUncompressed: d.decodeUncompressed() - case EncodingPackedSimple: + case intCompressedSimple: d.decodePacked() default: panic(fmt.Sprintf("unknown encoding %v", d.encoding)) diff --git a/tsdb/engine/tsm1/string.go b/tsdb/engine/tsm1/string.go index 1b5dafac3f..a2025809de 100644 --- a/tsdb/engine/tsm1/string.go +++ b/tsdb/engine/tsm1/string.go @@ -12,6 +12,13 @@ import ( "github.com/golang/snappy" ) +const ( + // stringUncompressed is a an uncompressed format encoding strings as raw bytes + stringUncompressed = 0 + // stringCompressedSnappy is a compressed encoding using Snappy compression + stringCompressedSnappy = 1 +) + type StringEncoder interface { Write(s string) Bytes() ([]byte, error) @@ -45,7 +52,7 @@ func (e *stringEncoder) Bytes() ([]byte, error) { // Compress the currently appended bytes using snappy and prefix with // a 1 byte header for future extension data := snappy.Encode(nil, e.bytes) - return append([]byte{EncodingSnappy << 4}, data...), nil + return append([]byte{stringCompressedSnappy << 4}, data...), nil } type stringDecoder struct { diff --git a/tsdb/engine/tsm1/string_test.go b/tsdb/engine/tsm1/string_test.go index 8710a50b36..f1bf173815 100644 --- a/tsdb/engine/tsm1/string_test.go +++ b/tsdb/engine/tsm1/string_test.go @@ -1,27 +1,25 @@ -package tsm1_test +package tsm1 import ( "fmt" "testing" - - "github.com/influxdb/influxdb/tsdb/engine/tsm1" ) func Test_StringEncoder_NoValues(t *testing.T) { - enc := tsm1.NewStringEncoder() + enc := NewStringEncoder() b, err := enc.Bytes() if err != nil { t.Fatalf("unexpected error: %v", err) } - dec := tsm1.NewStringDecoder(b) + dec := NewStringDecoder(b) if dec.Next() { t.Fatalf("unexpected next value: got true, exp false") } } func Test_StringEncoder_Single(t *testing.T) { - enc := tsm1.NewStringEncoder() + enc := NewStringEncoder() v1 := "v1" enc.Write(v1) b, err := enc.Bytes() @@ -29,7 +27,7 @@ func Test_StringEncoder_Single(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - dec := tsm1.NewStringDecoder(b) + dec := NewStringDecoder(b) if !dec.Next() { t.Fatalf("unexpected next value: got false, exp true") } @@ -40,7 +38,7 @@ func Test_StringEncoder_Single(t *testing.T) { } func Test_StringEncoder_Multi_Compressed(t *testing.T) { - enc := tsm1.NewStringEncoder() + enc := NewStringEncoder() values := make([]string, 10) for i := range values { @@ -53,15 +51,15 @@ func Test_StringEncoder_Multi_Compressed(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - if b[0]>>4 != tsm1.EncodingSnappy { - t.Fatalf("unexpected encoding: got %v, exp %v", b[0], tsm1.EncodingSnappy) + if b[0]>>4 != stringCompressedSnappy { + t.Fatalf("unexpected encoding: got %v, exp %v", b[0], stringCompressedSnappy) } if exp := 47; len(b) != exp { t.Fatalf("unexpected length: got %v, exp %v", len(b), exp) } - dec := tsm1.NewStringDecoder(b) + dec := NewStringDecoder(b) for i, v := range values { if !dec.Next() { diff --git a/tsdb/engine/tsm1/timestamp.go b/tsdb/engine/tsm1/timestamp.go index 59990f5cb4..8fcb0b4ce2 100644 --- a/tsdb/engine/tsm1/timestamp.go +++ b/tsdb/engine/tsm1/timestamp.go @@ -41,6 +41,15 @@ import ( "github.com/jwilder/encoding/simple8b" ) +const ( + // timeUncompressed is a an uncompressed format using 8 bytes per timestamp + timeUncompressed = 0 + // timeCompressedPackedSimple is a bit-packed format using simple8b encoding + timeCompressedPackedSimple = 1 + // timeCompressedRLE is a run-length encoding format + timeCompressedRLE = 2 +) + // TimeEncoder encodes time.Time to byte slices. type TimeEncoder interface { Write(t time.Time) @@ -135,7 +144,7 @@ func (e *encoder) encodePacked(div uint64, dts []uint64) ([]byte, error) { b := make([]byte, 8+1) // 4 high bits used for the encoding type - b[0] = byte(EncodingPackedSimple) << 4 + b[0] = byte(timeCompressedPackedSimple) << 4 // 4 low bits are the log10 divisor b[0] |= byte(math.Log10(float64(div))) @@ -153,7 +162,7 @@ func (e *encoder) encodePacked(div uint64, dts []uint64) ([]byte, error) { func (e *encoder) encodeRaw() ([]byte, error) { b := make([]byte, 1+len(e.ts)*8) - b[0] = byte(EncodingUncompressed) << 4 + b[0] = byte(timeUncompressed) << 4 for i, v := range e.ts { binary.BigEndian.PutUint64(b[1+i*8:1+i*8+8], uint64(v)) } @@ -165,7 +174,7 @@ func (e *encoder) encodeRLE(first, delta, div uint64, n int) ([]byte, error) { b := make([]byte, 1+10*3) // 4 high bits used for the encoding type - b[0] = byte(EncodingRLE) << 4 + b[0] = byte(timeCompressedRLE) << 4 // 4 low bits are the log10 divisor b[0] |= byte(math.Log10(float64(div))) @@ -213,11 +222,11 @@ func (d *decoder) decode(b []byte) { // Encoding type is stored in the 4 high bits of the first byte encoding := b[0] >> 4 switch encoding { - case EncodingUncompressed: + case timeUncompressed: d.decodeRaw(b[1:]) - case EncodingRLE: + case timeCompressedRLE: d.decodeRLE(b) - case EncodingPackedSimple: + case timeCompressedPackedSimple: d.decodePacked(b) default: panic(fmt.Sprintf("unknown encoding: %v", encoding)) diff --git a/tsdb/engine/tsm1/timestamp_test.go b/tsdb/engine/tsm1/timestamp_test.go index dbb5a2341a..402a6578a1 100644 --- a/tsdb/engine/tsm1/timestamp_test.go +++ b/tsdb/engine/tsm1/timestamp_test.go @@ -1,14 +1,12 @@ -package tsm1_test +package tsm1 import ( "testing" "time" - - "github.com/influxdb/influxdb/tsdb/engine/tsm1" ) func Test_TimeEncoder(t *testing.T) { - enc := tsm1.NewTimeEncoder() + enc := NewTimeEncoder() x := []time.Time{} now := time.Unix(0, 0) @@ -24,11 +22,11 @@ func Test_TimeEncoder(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - if got := b[0] >> 4; got != tsm1.EncodingPackedSimple { + if got := b[0] >> 4; got != timeCompressedPackedSimple { t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got) } - dec := tsm1.NewTimeDecoder(b) + dec := NewTimeDecoder(b) for i, v := range x { if !dec.Next() { t.Fatalf("Next == false, expected true") @@ -41,20 +39,20 @@ func Test_TimeEncoder(t *testing.T) { } func Test_TimeEncoder_NoValues(t *testing.T) { - enc := tsm1.NewTimeEncoder() + enc := NewTimeEncoder() b, err := enc.Bytes() if err != nil { t.Fatalf("unexpected error: %v", err) } - dec := tsm1.NewTimeDecoder(b) + dec := NewTimeDecoder(b) if dec.Next() { t.Fatalf("unexpected next value: got true, exp false") } } func Test_TimeEncoder_One(t *testing.T) { - enc := tsm1.NewTimeEncoder() + enc := NewTimeEncoder() tm := time.Unix(0, 0) enc.Write(tm) @@ -63,11 +61,11 @@ func Test_TimeEncoder_One(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - if got := b[0] >> 4; got != tsm1.EncodingPackedSimple { + if got := b[0] >> 4; got != timeCompressedPackedSimple { t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got) } - dec := tsm1.NewTimeDecoder(b) + dec := NewTimeDecoder(b) if !dec.Next() { t.Fatalf("unexpected next value: got true, exp false") } @@ -78,7 +76,7 @@ func Test_TimeEncoder_One(t *testing.T) { } func Test_TimeEncoder_Two(t *testing.T) { - enc := tsm1.NewTimeEncoder() + enc := NewTimeEncoder() t1 := time.Unix(0, 0) t2 := time.Unix(0, 1) enc.Write(t1) @@ -89,11 +87,11 @@ func Test_TimeEncoder_Two(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - if got := b[0] >> 4; got != tsm1.EncodingPackedSimple { + if got := b[0] >> 4; got != timeCompressedPackedSimple { t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got) } - dec := tsm1.NewTimeDecoder(b) + dec := NewTimeDecoder(b) if !dec.Next() { t.Fatalf("unexpected next value: got true, exp false") } @@ -112,7 +110,7 @@ func Test_TimeEncoder_Two(t *testing.T) { } func Test_TimeEncoder_Three(t *testing.T) { - enc := tsm1.NewTimeEncoder() + enc := NewTimeEncoder() t1 := time.Unix(0, 0) t2 := time.Unix(0, 1) t3 := time.Unix(0, 2) @@ -126,11 +124,11 @@ func Test_TimeEncoder_Three(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - if got := b[0] >> 4; got != tsm1.EncodingPackedSimple { + if got := b[0] >> 4; got != timeCompressedPackedSimple { t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got) } - dec := tsm1.NewTimeDecoder(b) + dec := NewTimeDecoder(b) if !dec.Next() { t.Fatalf("unexpected next value: got true, exp false") } @@ -157,7 +155,7 @@ func Test_TimeEncoder_Three(t *testing.T) { } func Test_TimeEncoder_Large_Range(t *testing.T) { - enc := tsm1.NewTimeEncoder() + enc := NewTimeEncoder() t1 := time.Unix(0, 1442369134000000000) t2 := time.Unix(0, 1442369135000000000) enc.Write(t1) @@ -167,11 +165,11 @@ func Test_TimeEncoder_Large_Range(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - if got := b[0] >> 4; got != tsm1.EncodingPackedSimple { + if got := b[0] >> 4; got != timeCompressedPackedSimple { t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got) } - dec := tsm1.NewTimeDecoder(b) + dec := NewTimeDecoder(b) if !dec.Next() { t.Fatalf("unexpected next value: got true, exp false") } @@ -190,7 +188,7 @@ func Test_TimeEncoder_Large_Range(t *testing.T) { } func Test_TimeEncoder_Uncompressed(t *testing.T) { - enc := tsm1.NewTimeEncoder() + enc := NewTimeEncoder() t1 := time.Unix(0, 0) t2 := time.Unix(1, 0) @@ -210,11 +208,11 @@ func Test_TimeEncoder_Uncompressed(t *testing.T) { t.Fatalf("length mismatch: got %v, exp %v", len(b), exp) } - if got := b[0] >> 4; got != tsm1.EncodingUncompressed { + if got := b[0] >> 4; got != timeUncompressed { t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got) } - dec := tsm1.NewTimeDecoder(b) + dec := NewTimeDecoder(b) if !dec.Next() { t.Fatalf("unexpected next value: got true, exp false") } @@ -241,7 +239,7 @@ func Test_TimeEncoder_Uncompressed(t *testing.T) { } func Test_TimeEncoder_RLE(t *testing.T) { - enc := tsm1.NewTimeEncoder() + enc := NewTimeEncoder() var ts []time.Time for i := 0; i < 500; i++ { ts = append(ts, time.Unix(int64(i), 0)) @@ -256,7 +254,7 @@ func Test_TimeEncoder_RLE(t *testing.T) { t.Fatalf("length mismatch: got %v, exp %v", len(b), exp) } - if got := b[0] >> 4; got != tsm1.EncodingRLE { + if got := b[0] >> 4; got != timeCompressedRLE { t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got) } @@ -264,7 +262,7 @@ func Test_TimeEncoder_RLE(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - dec := tsm1.NewTimeDecoder(b) + dec := NewTimeDecoder(b) for i, v := range ts { if !dec.Next() { t.Fatalf("Next == false, expected true") @@ -281,7 +279,7 @@ func Test_TimeEncoder_RLE(t *testing.T) { } func Test_TimeEncoder_Reverse(t *testing.T) { - enc := tsm1.NewTimeEncoder() + enc := NewTimeEncoder() ts := []time.Time{ time.Unix(0, 3), time.Unix(0, 2), @@ -297,11 +295,11 @@ func Test_TimeEncoder_Reverse(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - if got := b[0] >> 4; got != tsm1.EncodingUncompressed { + if got := b[0] >> 4; got != timeUncompressed { t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got) } - dec := tsm1.NewTimeDecoder(b) + dec := NewTimeDecoder(b) i := 0 for dec.Next() { if ts[i] != dec.Read() { @@ -312,7 +310,7 @@ func Test_TimeEncoder_Reverse(t *testing.T) { } func Test_TimeEncoder_220SecondDelta(t *testing.T) { - enc := tsm1.NewTimeEncoder() + enc := NewTimeEncoder() var ts []time.Time now := time.Now() for i := 0; i < 220; i++ { @@ -333,11 +331,11 @@ func Test_TimeEncoder_220SecondDelta(t *testing.T) { t.Fatalf("unexpected length: got %v, exp %v", len(b), exp) } - if got := b[0] >> 4; got != tsm1.EncodingRLE { + if got := b[0] >> 4; got != timeCompressedRLE { t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got) } - dec := tsm1.NewTimeDecoder(b) + dec := NewTimeDecoder(b) i := 0 for dec.Next() { if ts[i] != dec.Read() { @@ -356,7 +354,7 @@ func Test_TimeEncoder_220SecondDelta(t *testing.T) { } func BenchmarkTimeEncoder(b *testing.B) { - enc := tsm1.NewTimeEncoder() + enc := NewTimeEncoder() x := make([]time.Time, 1024) for i := 0; i < len(x); i++ { x[i] = time.Now() @@ -371,7 +369,7 @@ func BenchmarkTimeEncoder(b *testing.B) { func BenchmarkTimeDecoder(b *testing.B) { x := make([]time.Time, 1024) - enc := tsm1.NewTimeEncoder() + enc := NewTimeEncoder() for i := 0; i < len(x); i++ { x[i] = time.Now() enc.Write(x[i]) @@ -382,7 +380,7 @@ func BenchmarkTimeDecoder(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() - dec := tsm1.NewTimeDecoder(bytes) + dec := NewTimeDecoder(bytes) b.StartTimer() for dec.Next() { }