Move compression encoding constants to encoders

Will make it less error-prone to add new encodings int the future
since each encoder has it's set of constants.  There are some placeholder
contants for uncompressed encodings which are not in all encoder currently.
pull/4317/merge
Jason Wilder 2015-10-02 10:03:20 -06:00
parent 1a174de9e5
commit c2e8922533
9 changed files with 114 additions and 83 deletions

View File

@ -7,6 +7,13 @@ package tsm1
import "encoding/binary"
const (
// boolUncompressed is an uncompressed boolean format
boolUncompressed = 0
// boolCompressedBitPacked is an bit packed format using 1 bit per boolean
boolCompressedBitPacked = 1
)
type BoolEncoder interface {
Write(b bool)
Bytes() ([]byte, error)
@ -75,7 +82,7 @@ func (e *boolEncoder) Bytes() ([]byte, error) {
b := make([]byte, 10+1)
// Store the encoding type in the 4 high bits of the first byte
b[0] = byte(EncodingBitPacked) << 4
b[0] = byte(boolCompressedBitPacked) << 4
i := 1
// Encode the number of bools written

View File

@ -10,21 +10,6 @@ import (
)
const (
// EncodingPackedSimple is a bit-packed format
EncodingPackedSimple = 0
// EncodingRLE is a run-length encoded format
EncodingRLE = 1
// EncodingUncompressed is a non-compressed format
EncodingUncompressed = 2
// EncodingBitPacked is a basic bit-packed format
EncodingBitPacked = 3
// EncodingSnappy is a snappy encoded format
EncodingSnappy = 4
// BlockFloat64 designates a block encodes float64 values
BlockFloat64 = 0

View File

@ -17,6 +17,13 @@ import (
"github.com/dgryski/go-bitstream"
)
const (
// floatUncompressed is an uncompressed format using 8 bytes per value
floatUncompressed = 0
// floatCompressedGorilla is a compressed format using the gorilla paper encoding
floatCompressedGorilla = 1
)
type FloatEncoder struct {
val float64
@ -43,7 +50,7 @@ func NewFloatEncoder() *FloatEncoder {
}
func (s *FloatEncoder) Bytes() []byte {
return s.buf.Bytes()
return append([]byte{floatCompressedGorilla << 4}, s.buf.Bytes()...)
}
func (s *FloatEncoder) Finish() {
@ -95,11 +102,6 @@ func (s *FloatEncoder) Push(v float64) {
s.val = v
}
func (s *FloatEncoder) FloatDecoder() *FloatDecoder {
iter, _ := NewFloatDecoder(s.buf.Bytes())
return iter
}
type FloatDecoder struct {
val float64
@ -117,7 +119,9 @@ type FloatDecoder struct {
}
func NewFloatDecoder(b []byte) (*FloatDecoder, error) {
br := bitstream.NewReader(bytes.NewReader(b))
// first byte is the compression type but we currently just have gorilla
// compression
br := bitstream.NewReader(bytes.NewReader(b[1:]))
v, err := br.ReadBits(64)
if err != nil {

View File

@ -28,7 +28,12 @@ func TestFloatEncoder_Simple(t *testing.T) {
s.Finish()
it := s.FloatDecoder()
b := s.Bytes()
it, err := tsm1.NewFloatDecoder(b)
if err != nil {
t.Fatalf("unexpected error creating float decoder: %v", err)
}
want := []float64{
12,
@ -100,7 +105,13 @@ func TestFloatEncoder_Roundtrip(t *testing.T) {
}
s.Finish()
it := s.FloatDecoder()
b := s.Bytes()
it, err := tsm1.NewFloatDecoder(b)
if err != nil {
t.Fatalf("unexpected error creating float decoder: %v", err)
}
for _, w := range TwoHoursData {
if !it.Next() {
t.Fatalf("Next()=false, want true")
@ -137,11 +148,16 @@ func BenchmarkFloatDecoder(b *testing.B) {
s.Push(tt.v)
}
s.Finish()
bytes := s.Bytes()
b.ResetTimer()
for i := 0; i < b.N; i++ {
it := s.FloatDecoder()
it, err := tsm1.NewFloatDecoder(bytes)
if err != nil {
b.Fatalf("unexpected error creating float decoder: %v", err)
}
for j := 0; j < len(TwoHoursData); it.Next() {
j++
}

View File

@ -27,6 +27,13 @@ import (
"github.com/jwilder/encoding/simple8b"
)
const (
// intUncompressed is an uncompressed format using 8 bytes per point
intUncompressed = 0
// intCompressedSimple is a bit-packed format using simple8b encoding
intCompressedSimple = 1
)
type Int64Encoder interface {
Write(v int64)
Bytes() ([]byte, error)
@ -68,7 +75,7 @@ func (e *int64Encoder) encodePacked() ([]byte, error) {
b := make([]byte, 1+len(encoded)*8)
// 4 high bits of first byte store the encoding type for the block
b[0] = byte(EncodingPackedSimple) << 4
b[0] = byte(intCompressedSimple) << 4
for i, v := range encoded {
binary.BigEndian.PutUint64(b[1+i*8:1+i*8+8], v)
@ -79,7 +86,7 @@ func (e *int64Encoder) encodePacked() ([]byte, error) {
func (e *int64Encoder) encodeUncompressed() ([]byte, error) {
b := make([]byte, 1+len(e.values)*8)
// 4 high bits of first byte store the encoding type for the block
b[0] = byte(EncodingUncompressed) << 4
b[0] = byte(intUncompressed) << 4
for i, v := range e.values {
binary.BigEndian.PutUint64(b[1+i*8:1+i*8+8], v)
@ -123,9 +130,9 @@ func (d *int64Decoder) Next() bool {
if d.i >= d.n {
switch d.encoding {
case EncodingUncompressed:
case intUncompressed:
d.decodeUncompressed()
case EncodingPackedSimple:
case intCompressedSimple:
d.decodePacked()
default:
panic(fmt.Sprintf("unknown encoding %v", d.encoding))

View File

@ -12,6 +12,13 @@ import (
"github.com/golang/snappy"
)
const (
// stringUncompressed is a an uncompressed format encoding strings as raw bytes
stringUncompressed = 0
// stringCompressedSnappy is a compressed encoding using Snappy compression
stringCompressedSnappy = 1
)
type StringEncoder interface {
Write(s string)
Bytes() ([]byte, error)
@ -45,7 +52,7 @@ func (e *stringEncoder) Bytes() ([]byte, error) {
// Compress the currently appended bytes using snappy and prefix with
// a 1 byte header for future extension
data := snappy.Encode(nil, e.bytes)
return append([]byte{EncodingSnappy << 4}, data...), nil
return append([]byte{stringCompressedSnappy << 4}, data...), nil
}
type stringDecoder struct {

View File

@ -1,27 +1,25 @@
package tsm1_test
package tsm1
import (
"fmt"
"testing"
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
)
func Test_StringEncoder_NoValues(t *testing.T) {
enc := tsm1.NewStringEncoder()
enc := NewStringEncoder()
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
dec := tsm1.NewStringDecoder(b)
dec := NewStringDecoder(b)
if dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
}
func Test_StringEncoder_Single(t *testing.T) {
enc := tsm1.NewStringEncoder()
enc := NewStringEncoder()
v1 := "v1"
enc.Write(v1)
b, err := enc.Bytes()
@ -29,7 +27,7 @@ func Test_StringEncoder_Single(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
dec := tsm1.NewStringDecoder(b)
dec := NewStringDecoder(b)
if !dec.Next() {
t.Fatalf("unexpected next value: got false, exp true")
}
@ -40,7 +38,7 @@ func Test_StringEncoder_Single(t *testing.T) {
}
func Test_StringEncoder_Multi_Compressed(t *testing.T) {
enc := tsm1.NewStringEncoder()
enc := NewStringEncoder()
values := make([]string, 10)
for i := range values {
@ -53,15 +51,15 @@ func Test_StringEncoder_Multi_Compressed(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
if b[0]>>4 != tsm1.EncodingSnappy {
t.Fatalf("unexpected encoding: got %v, exp %v", b[0], tsm1.EncodingSnappy)
if b[0]>>4 != stringCompressedSnappy {
t.Fatalf("unexpected encoding: got %v, exp %v", b[0], stringCompressedSnappy)
}
if exp := 47; len(b) != exp {
t.Fatalf("unexpected length: got %v, exp %v", len(b), exp)
}
dec := tsm1.NewStringDecoder(b)
dec := NewStringDecoder(b)
for i, v := range values {
if !dec.Next() {

View File

@ -41,6 +41,15 @@ import (
"github.com/jwilder/encoding/simple8b"
)
const (
// timeUncompressed is a an uncompressed format using 8 bytes per timestamp
timeUncompressed = 0
// timeCompressedPackedSimple is a bit-packed format using simple8b encoding
timeCompressedPackedSimple = 1
// timeCompressedRLE is a run-length encoding format
timeCompressedRLE = 2
)
// TimeEncoder encodes time.Time to byte slices.
type TimeEncoder interface {
Write(t time.Time)
@ -135,7 +144,7 @@ func (e *encoder) encodePacked(div uint64, dts []uint64) ([]byte, error) {
b := make([]byte, 8+1)
// 4 high bits used for the encoding type
b[0] = byte(EncodingPackedSimple) << 4
b[0] = byte(timeCompressedPackedSimple) << 4
// 4 low bits are the log10 divisor
b[0] |= byte(math.Log10(float64(div)))
@ -153,7 +162,7 @@ func (e *encoder) encodePacked(div uint64, dts []uint64) ([]byte, error) {
func (e *encoder) encodeRaw() ([]byte, error) {
b := make([]byte, 1+len(e.ts)*8)
b[0] = byte(EncodingUncompressed) << 4
b[0] = byte(timeUncompressed) << 4
for i, v := range e.ts {
binary.BigEndian.PutUint64(b[1+i*8:1+i*8+8], uint64(v))
}
@ -165,7 +174,7 @@ func (e *encoder) encodeRLE(first, delta, div uint64, n int) ([]byte, error) {
b := make([]byte, 1+10*3)
// 4 high bits used for the encoding type
b[0] = byte(EncodingRLE) << 4
b[0] = byte(timeCompressedRLE) << 4
// 4 low bits are the log10 divisor
b[0] |= byte(math.Log10(float64(div)))
@ -213,11 +222,11 @@ func (d *decoder) decode(b []byte) {
// Encoding type is stored in the 4 high bits of the first byte
encoding := b[0] >> 4
switch encoding {
case EncodingUncompressed:
case timeUncompressed:
d.decodeRaw(b[1:])
case EncodingRLE:
case timeCompressedRLE:
d.decodeRLE(b)
case EncodingPackedSimple:
case timeCompressedPackedSimple:
d.decodePacked(b)
default:
panic(fmt.Sprintf("unknown encoding: %v", encoding))

View File

@ -1,14 +1,12 @@
package tsm1_test
package tsm1
import (
"testing"
"time"
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
)
func Test_TimeEncoder(t *testing.T) {
enc := tsm1.NewTimeEncoder()
enc := NewTimeEncoder()
x := []time.Time{}
now := time.Unix(0, 0)
@ -24,11 +22,11 @@ func Test_TimeEncoder(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
if got := b[0] >> 4; got != tsm1.EncodingPackedSimple {
if got := b[0] >> 4; got != timeCompressedPackedSimple {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
dec := tsm1.NewTimeDecoder(b)
dec := NewTimeDecoder(b)
for i, v := range x {
if !dec.Next() {
t.Fatalf("Next == false, expected true")
@ -41,20 +39,20 @@ func Test_TimeEncoder(t *testing.T) {
}
func Test_TimeEncoder_NoValues(t *testing.T) {
enc := tsm1.NewTimeEncoder()
enc := NewTimeEncoder()
b, err := enc.Bytes()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
dec := tsm1.NewTimeDecoder(b)
dec := NewTimeDecoder(b)
if dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
}
func Test_TimeEncoder_One(t *testing.T) {
enc := tsm1.NewTimeEncoder()
enc := NewTimeEncoder()
tm := time.Unix(0, 0)
enc.Write(tm)
@ -63,11 +61,11 @@ func Test_TimeEncoder_One(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
if got := b[0] >> 4; got != tsm1.EncodingPackedSimple {
if got := b[0] >> 4; got != timeCompressedPackedSimple {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
dec := tsm1.NewTimeDecoder(b)
dec := NewTimeDecoder(b)
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
@ -78,7 +76,7 @@ func Test_TimeEncoder_One(t *testing.T) {
}
func Test_TimeEncoder_Two(t *testing.T) {
enc := tsm1.NewTimeEncoder()
enc := NewTimeEncoder()
t1 := time.Unix(0, 0)
t2 := time.Unix(0, 1)
enc.Write(t1)
@ -89,11 +87,11 @@ func Test_TimeEncoder_Two(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
if got := b[0] >> 4; got != tsm1.EncodingPackedSimple {
if got := b[0] >> 4; got != timeCompressedPackedSimple {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
dec := tsm1.NewTimeDecoder(b)
dec := NewTimeDecoder(b)
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
@ -112,7 +110,7 @@ func Test_TimeEncoder_Two(t *testing.T) {
}
func Test_TimeEncoder_Three(t *testing.T) {
enc := tsm1.NewTimeEncoder()
enc := NewTimeEncoder()
t1 := time.Unix(0, 0)
t2 := time.Unix(0, 1)
t3 := time.Unix(0, 2)
@ -126,11 +124,11 @@ func Test_TimeEncoder_Three(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
if got := b[0] >> 4; got != tsm1.EncodingPackedSimple {
if got := b[0] >> 4; got != timeCompressedPackedSimple {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
dec := tsm1.NewTimeDecoder(b)
dec := NewTimeDecoder(b)
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
@ -157,7 +155,7 @@ func Test_TimeEncoder_Three(t *testing.T) {
}
func Test_TimeEncoder_Large_Range(t *testing.T) {
enc := tsm1.NewTimeEncoder()
enc := NewTimeEncoder()
t1 := time.Unix(0, 1442369134000000000)
t2 := time.Unix(0, 1442369135000000000)
enc.Write(t1)
@ -167,11 +165,11 @@ func Test_TimeEncoder_Large_Range(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
if got := b[0] >> 4; got != tsm1.EncodingPackedSimple {
if got := b[0] >> 4; got != timeCompressedPackedSimple {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
dec := tsm1.NewTimeDecoder(b)
dec := NewTimeDecoder(b)
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
@ -190,7 +188,7 @@ func Test_TimeEncoder_Large_Range(t *testing.T) {
}
func Test_TimeEncoder_Uncompressed(t *testing.T) {
enc := tsm1.NewTimeEncoder()
enc := NewTimeEncoder()
t1 := time.Unix(0, 0)
t2 := time.Unix(1, 0)
@ -210,11 +208,11 @@ func Test_TimeEncoder_Uncompressed(t *testing.T) {
t.Fatalf("length mismatch: got %v, exp %v", len(b), exp)
}
if got := b[0] >> 4; got != tsm1.EncodingUncompressed {
if got := b[0] >> 4; got != timeUncompressed {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
dec := tsm1.NewTimeDecoder(b)
dec := NewTimeDecoder(b)
if !dec.Next() {
t.Fatalf("unexpected next value: got true, exp false")
}
@ -241,7 +239,7 @@ func Test_TimeEncoder_Uncompressed(t *testing.T) {
}
func Test_TimeEncoder_RLE(t *testing.T) {
enc := tsm1.NewTimeEncoder()
enc := NewTimeEncoder()
var ts []time.Time
for i := 0; i < 500; i++ {
ts = append(ts, time.Unix(int64(i), 0))
@ -256,7 +254,7 @@ func Test_TimeEncoder_RLE(t *testing.T) {
t.Fatalf("length mismatch: got %v, exp %v", len(b), exp)
}
if got := b[0] >> 4; got != tsm1.EncodingRLE {
if got := b[0] >> 4; got != timeCompressedRLE {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
@ -264,7 +262,7 @@ func Test_TimeEncoder_RLE(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
dec := tsm1.NewTimeDecoder(b)
dec := NewTimeDecoder(b)
for i, v := range ts {
if !dec.Next() {
t.Fatalf("Next == false, expected true")
@ -281,7 +279,7 @@ func Test_TimeEncoder_RLE(t *testing.T) {
}
func Test_TimeEncoder_Reverse(t *testing.T) {
enc := tsm1.NewTimeEncoder()
enc := NewTimeEncoder()
ts := []time.Time{
time.Unix(0, 3),
time.Unix(0, 2),
@ -297,11 +295,11 @@ func Test_TimeEncoder_Reverse(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
if got := b[0] >> 4; got != tsm1.EncodingUncompressed {
if got := b[0] >> 4; got != timeUncompressed {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
dec := tsm1.NewTimeDecoder(b)
dec := NewTimeDecoder(b)
i := 0
for dec.Next() {
if ts[i] != dec.Read() {
@ -312,7 +310,7 @@ func Test_TimeEncoder_Reverse(t *testing.T) {
}
func Test_TimeEncoder_220SecondDelta(t *testing.T) {
enc := tsm1.NewTimeEncoder()
enc := NewTimeEncoder()
var ts []time.Time
now := time.Now()
for i := 0; i < 220; i++ {
@ -333,11 +331,11 @@ func Test_TimeEncoder_220SecondDelta(t *testing.T) {
t.Fatalf("unexpected length: got %v, exp %v", len(b), exp)
}
if got := b[0] >> 4; got != tsm1.EncodingRLE {
if got := b[0] >> 4; got != timeCompressedRLE {
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
}
dec := tsm1.NewTimeDecoder(b)
dec := NewTimeDecoder(b)
i := 0
for dec.Next() {
if ts[i] != dec.Read() {
@ -356,7 +354,7 @@ func Test_TimeEncoder_220SecondDelta(t *testing.T) {
}
func BenchmarkTimeEncoder(b *testing.B) {
enc := tsm1.NewTimeEncoder()
enc := NewTimeEncoder()
x := make([]time.Time, 1024)
for i := 0; i < len(x); i++ {
x[i] = time.Now()
@ -371,7 +369,7 @@ func BenchmarkTimeEncoder(b *testing.B) {
func BenchmarkTimeDecoder(b *testing.B) {
x := make([]time.Time, 1024)
enc := tsm1.NewTimeEncoder()
enc := NewTimeEncoder()
for i := 0; i < len(x); i++ {
x[i] = time.Now()
enc.Write(x[i])
@ -382,7 +380,7 @@ func BenchmarkTimeDecoder(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
dec := tsm1.NewTimeDecoder(bytes)
dec := NewTimeDecoder(bytes)
b.StartTimer()
for dec.Next() {
}