Generate encode*Values funcs

pull/8193/head
Jason Wilder 2017-03-14 11:54:53 -06:00
parent ced953ae89
commit 890ffb4ce8
9 changed files with 253 additions and 194 deletions

View File

@ -84,6 +84,9 @@ func (e *BooleanEncoder) flush() {
}
}
// Flush is no-op
func (e *BooleanEncoder) Flush() {}
// Bytes returns a new byte slice containing the encoded booleans from previous calls to Write.
func (e *BooleanEncoder) Bytes() ([]byte, error) {
// Ensure the current byte is flushed

View File

@ -374,6 +374,46 @@ func (a FloatValues) Encode(buf []byte) ([]byte, error) {
return encodeFloatValuesBlock(buf, a)
}
func encodeFloatValuesBlock(buf []byte, values []FloatValue) ([]byte, error) {
if len(values) == 0 {
return nil, nil
}
venc := getFloatEncoder(len(values))
tsenc := getTimeEncoder(len(values))
var b []byte
err := func() error {
for _, v := range values {
tsenc.Write(v.unixnano)
venc.Write(v.value)
}
venc.Flush()
// Encoded timestamp values
tb, err := tsenc.Bytes()
if err != nil {
return err
}
// Encoded values
vb, err := venc.Bytes()
if err != nil {
return err
}
// Prepend the first timestamp of the block in the first 8 bytes and the block
// in the next byte, followed by the block
b = packBlock(buf, BlockFloat64, tb, vb)
return nil
}()
putTimeEncoder(tsenc)
putFloatEncoder(venc)
return b, err
}
// Sort methods
func (a FloatValues) Len() int { return len(a) }
func (a FloatValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
@ -560,6 +600,46 @@ func (a IntegerValues) Encode(buf []byte) ([]byte, error) {
return encodeIntegerValuesBlock(buf, a)
}
func encodeIntegerValuesBlock(buf []byte, values []IntegerValue) ([]byte, error) {
if len(values) == 0 {
return nil, nil
}
venc := getIntegerEncoder(len(values))
tsenc := getTimeEncoder(len(values))
var b []byte
err := func() error {
for _, v := range values {
tsenc.Write(v.unixnano)
venc.Write(v.value)
}
venc.Flush()
// Encoded timestamp values
tb, err := tsenc.Bytes()
if err != nil {
return err
}
// Encoded values
vb, err := venc.Bytes()
if err != nil {
return err
}
// Prepend the first timestamp of the block in the first 8 bytes and the block
// in the next byte, followed by the block
b = packBlock(buf, BlockInteger, tb, vb)
return nil
}()
putTimeEncoder(tsenc)
putIntegerEncoder(venc)
return b, err
}
// Sort methods
func (a IntegerValues) Len() int { return len(a) }
func (a IntegerValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
@ -746,6 +826,46 @@ func (a StringValues) Encode(buf []byte) ([]byte, error) {
return encodeStringValuesBlock(buf, a)
}
func encodeStringValuesBlock(buf []byte, values []StringValue) ([]byte, error) {
if len(values) == 0 {
return nil, nil
}
venc := getStringEncoder(len(values))
tsenc := getTimeEncoder(len(values))
var b []byte
err := func() error {
for _, v := range values {
tsenc.Write(v.unixnano)
venc.Write(v.value)
}
venc.Flush()
// Encoded timestamp values
tb, err := tsenc.Bytes()
if err != nil {
return err
}
// Encoded values
vb, err := venc.Bytes()
if err != nil {
return err
}
// Prepend the first timestamp of the block in the first 8 bytes and the block
// in the next byte, followed by the block
b = packBlock(buf, BlockString, tb, vb)
return nil
}()
putTimeEncoder(tsenc)
putStringEncoder(venc)
return b, err
}
// Sort methods
func (a StringValues) Len() int { return len(a) }
func (a StringValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
@ -932,6 +1052,46 @@ func (a BooleanValues) Encode(buf []byte) ([]byte, error) {
return encodeBooleanValuesBlock(buf, a)
}
func encodeBooleanValuesBlock(buf []byte, values []BooleanValue) ([]byte, error) {
if len(values) == 0 {
return nil, nil
}
venc := getBooleanEncoder(len(values))
tsenc := getTimeEncoder(len(values))
var b []byte
err := func() error {
for _, v := range values {
tsenc.Write(v.unixnano)
venc.Write(v.value)
}
venc.Flush()
// Encoded timestamp values
tb, err := tsenc.Bytes()
if err != nil {
return err
}
// Encoded values
vb, err := venc.Bytes()
if err != nil {
return err
}
// Prepend the first timestamp of the block in the first 8 bytes and the block
// in the next byte, followed by the block
b = packBlock(buf, BlockBoolean, tb, vb)
return nil
}()
putTimeEncoder(tsenc)
putBooleanEncoder(venc)
return b, err
}
// Sort methods
func (a BooleanValues) Len() int { return len(a) }
func (a BooleanValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

View File

@ -189,6 +189,47 @@ func (a {{.Name}}Values) Merge(b {{.Name}}Values) {{.Name}}Values {
func (a {{.Name}}Values) Encode(buf []byte) ([]byte, error) {
return encode{{.Name}}ValuesBlock(buf, a)
}
func encode{{ .Name }}ValuesBlock(buf []byte, values []{{.Name}}Value) ([]byte, error) {
if len(values) == 0 {
return nil, nil
}
venc := get{{ .Name }}Encoder(len(values))
tsenc := getTimeEncoder(len(values))
var b []byte
err := func() error {
for _, v := range values {
tsenc.Write(v.unixnano)
venc.Write(v.value)
}
venc.Flush()
// Encoded timestamp values
tb, err := tsenc.Bytes()
if err != nil {
return err
}
// Encoded values
vb, err := venc.Bytes()
if err != nil {
return err
}
// Prepend the first timestamp of the block in the first 8 bytes and the block
// in the next byte, followed by the block
b = packBlock(buf, {{ .Type }}, tb, vb)
return nil
}()
putTimeEncoder(tsenc)
put{{.Name}}Encoder(venc)
return b, err
}
{{ end }}
// Sort methods

View File

@ -1,22 +1,27 @@
[
{
"Name":"",
"name":""
"name":"",
"Type":""
},
{
"Name":"Float",
"name":"float"
"name":"float",
"Type":"BlockFloat64"
},
{
"Name":"Integer",
"name":"integer"
"name":"integer",
"Type":"BlockInteger"
},
{
"Name":"String",
"name":"string"
"name":"string",
"Type":"BlockString"
},
{
"Name":"Boolean",
"name":"boolean"
"name":"boolean",
"Type":"BlockBoolean"
}
]

View File

@ -327,7 +327,7 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) {
// for timestamps and values.
// Encode values using Gorilla float compression
venc := getFloatEncoder()
venc := getFloatEncoder(len(values))
// Encode timestamps using an adaptive encoder that uses delta-encoding,
// frame-or-reference and run length encoding.
@ -338,56 +338,9 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) {
for _, v := range values {
vv := v.(FloatValue)
tsenc.Write(vv.unixnano)
venc.Push(vv.value)
venc.Write(vv.value)
}
venc.Finish()
// Encoded timestamp values
tb, err := tsenc.Bytes()
if err != nil {
return err
}
// Encoded float values
vb, err := venc.Bytes()
if err != nil {
return err
}
// Prepend the first timestamp of the block in the first 8 bytes and the block
// in the next byte, followed by the block
b = packBlock(buf, BlockFloat64, tb, vb)
return nil
}()
putTimeEncoder(tsenc)
putFloatEncoder(venc)
return b, err
}
func encodeFloatValuesBlock(buf []byte, values []FloatValue) ([]byte, error) {
if len(values) == 0 {
return nil, nil
}
// A float block is encoded using different compression strategies
// for timestamps and values.
// Encode values using Gorilla float compression
venc := getFloatEncoder()
// Encode timestamps using an adaptive encoder that uses delta-encoding,
// frame-or-reference and run length encoding.
tsenc := getTimeEncoder(len(values))
var b []byte
err := func() error {
for _, v := range values {
tsenc.Write(v.unixnano)
venc.Push(v.value)
}
venc.Finish()
venc.Flush()
// Encoded timestamp values
tb, err := tsenc.Bytes()
@ -543,48 +496,6 @@ func encodeBooleanBlock(buf []byte, values []Value) ([]byte, error) {
return b, err
}
func encodeBooleanValuesBlock(buf []byte, values []BooleanValue) ([]byte, error) {
if len(values) == 0 {
return nil, nil
}
// A boolean block is encoded using different compression strategies
// for timestamps and values.
venc := getBooleanEncoder(len(values))
// Encode timestamps using an adaptive encoder
tsenc := getTimeEncoder(len(values))
var b []byte
err := func() error {
for _, v := range values {
tsenc.Write(v.unixnano)
venc.Write(v.value)
}
// Encoded timestamp values
tb, err := tsenc.Bytes()
if err != nil {
return err
}
// Encoded float values
vb, err := venc.Bytes()
if err != nil {
return err
}
// Prepend the first timestamp of the block in the first 8 bytes and the block
// in the next byte, followed by the block
b = packBlock(buf, BlockBoolean, tb, vb)
return nil
}()
putTimeEncoder(tsenc)
putBooleanEncoder(venc)
return b, err
}
// DecodeBooleanBlock decodes the boolean block from the byte slice
// and appends the boolean values to a.
func DecodeBooleanBlock(block []byte, a *[]BooleanValue) ([]BooleanValue, error) {
@ -702,39 +613,6 @@ func encodeIntegerBlock(buf []byte, values []Value) ([]byte, error) {
return b, err
}
func encodeIntegerValuesBlock(buf []byte, values []IntegerValue) ([]byte, error) {
tsEnc := getTimeEncoder(len(values))
vEnc := getIntegerEncoder(len(values))
var b []byte
err := func() error {
for _, v := range values {
tsEnc.Write(v.unixnano)
vEnc.Write(v.value)
}
// Encoded timestamp values
tb, err := tsEnc.Bytes()
if err != nil {
return err
}
// Encoded int64 values
vb, err := vEnc.Bytes()
if err != nil {
return err
}
// Prepend the first timestamp of the block in the first 8 bytes
b = packBlock(buf, BlockInteger, tb, vb)
return nil
}()
putTimeEncoder(tsEnc)
putIntegerEncoder(vEnc)
return b, err
}
// DecodeIntegerBlock decodes the integer block from the byte slice
// and appends the integer values to a.
func DecodeIntegerBlock(block []byte, a *[]IntegerValue) ([]IntegerValue, error) {
@ -854,40 +732,6 @@ func encodeStringBlock(buf []byte, values []Value) ([]byte, error) {
return b, err
}
func encodeStringValuesBlock(buf []byte, values []StringValue) ([]byte, error) {
tsEnc := getTimeEncoder(len(values))
vEnc := getStringEncoder(len(values) * len(values[0].value))
var b []byte
err := func() error {
for _, v := range values {
tsEnc.Write(v.unixnano)
vEnc.Write(v.value)
}
// Encoded timestamp values
tb, err := tsEnc.Bytes()
if err != nil {
return err
}
// Encoded string values
vb, err := vEnc.Bytes()
if err != nil {
return err
}
// Prepend the first timestamp of the block in the first 8 bytes
b = packBlock(buf, BlockString, tb, vb)
return nil
}()
putTimeEncoder(tsEnc)
putStringEncoder(vEnc)
return b, err
}
// DecodeStringBlock decodes the string block from the byte slice
// and appends the string values to a.
func DecodeStringBlock(block []byte, a *[]StringValue) ([]StringValue, error) {
@ -1014,8 +858,8 @@ func getIntegerEncoder(sz int) IntegerEncoder {
}
func putIntegerEncoder(enc IntegerEncoder) { integerEncoderPool.Put(enc) }
func getFloatEncoder() *FloatEncoder {
x := floatEncoderPool.Get(1024).(*FloatEncoder)
func getFloatEncoder(sz int) *FloatEncoder {
x := floatEncoderPool.Get(sz).(*FloatEncoder)
x.Reset()
return x
}

View File

@ -78,18 +78,18 @@ func (s *FloatEncoder) Bytes() ([]byte, error) {
return s.buf.Bytes(), s.err
}
// Finish indicates there are no more values to encode.
func (s *FloatEncoder) Finish() {
// Flush indicates there are no more values to encode.
func (s *FloatEncoder) Flush() {
if !s.finished {
// write an end-of-stream record
s.finished = true
s.Push(math.NaN())
s.Write(math.NaN())
s.bw.Flush(bitstream.Zero)
}
}
// Push encodes v to the underlying buffer.
func (s *FloatEncoder) Push(v float64) {
// Write encodes v to the underlying buffer.
func (s *FloatEncoder) Write(v float64) {
// Only allow NaN as a sentinel value
if math.IsNaN(v) && !s.finished {
s.err = fmt.Errorf("unsupported value: NaN")

View File

@ -13,22 +13,22 @@ func TestFloatEncoder_Simple(t *testing.T) {
// Example from the paper
s := tsm1.NewFloatEncoder()
s.Push(12)
s.Push(12)
s.Push(24)
s.Write(12)
s.Write(12)
s.Write(24)
// extra tests
// floating point masking/shifting bug
s.Push(13)
s.Push(24)
s.Write(13)
s.Write(24)
// delta-of-delta sizes
s.Push(24)
s.Push(24)
s.Push(24)
s.Write(24)
s.Write(24)
s.Write(24)
s.Finish()
s.Flush()
b, err := s.Bytes()
if err != nil {
@ -84,10 +84,10 @@ func TestFloatEncoder_SimilarFloats(t *testing.T) {
}
for _, v := range want {
s.Push(v)
s.Write(v)
}
s.Finish()
s.Flush()
b, err := s.Bytes()
if err != nil {
@ -151,9 +151,9 @@ var TwoHoursData = []struct {
func TestFloatEncoder_Roundtrip(t *testing.T) {
s := tsm1.NewFloatEncoder()
for _, p := range TwoHoursData {
s.Push(p.v)
s.Write(p.v)
}
s.Finish()
s.Flush()
b, err := s.Bytes()
if err != nil {
@ -188,10 +188,10 @@ func TestFloatEncoder_Roundtrip(t *testing.T) {
func TestFloatEncoder_Roundtrip_NaN(t *testing.T) {
s := tsm1.NewFloatEncoder()
s.Push(1.0)
s.Push(math.NaN())
s.Push(2.0)
s.Finish()
s.Write(1.0)
s.Write(math.NaN())
s.Write(2.0)
s.Flush()
_, err := s.Bytes()
@ -211,9 +211,9 @@ func Test_FloatEncoder_Quick(t *testing.T) {
// Write values to encoder.
enc := tsm1.NewFloatEncoder()
for _, v := range values {
enc.Push(v)
enc.Write(v)
}
enc.Finish()
enc.Flush()
// Read values out of decoder.
got := make([]float64, 0, len(values))
@ -254,18 +254,18 @@ func BenchmarkFloatEncoder(b *testing.B) {
for i := 0; i < b.N; i++ {
s := tsm1.NewFloatEncoder()
for _, tt := range TwoHoursData {
s.Push(tt.v)
s.Write(tt.v)
}
s.Finish()
s.Flush()
}
}
func BenchmarkFloatDecoder(b *testing.B) {
s := tsm1.NewFloatEncoder()
for _, tt := range TwoHoursData {
s.Push(tt.v)
s.Write(tt.v)
}
s.Finish()
s.Flush()
bytes, err := s.Bytes()
if err != nil {
b.Fatalf("unexpected error: %v", err)

View File

@ -51,6 +51,9 @@ func NewIntegerEncoder(sz int) IntegerEncoder {
}
}
// Flush is no-op
func (e *IntegerEncoder) Flush() {}
// Reset sets the encoder back to its initial state.
func (e *IntegerEncoder) Reset() {
e.prev = 0

View File

@ -34,6 +34,9 @@ func NewStringEncoder(sz int) StringEncoder {
}
}
// Flush is no-op
func (e *StringEncoder) Flush() {}
// Reset sets the encoder back to its initial state.
func (e *StringEncoder) Reset() {
e.bytes = e.bytes[:0]