Merge pull request #4370 from influxdb/jw-tsm

Prevent panic in DecodeSameTypeBlock
pull/4389/head
Jason Wilder 2015-10-09 12:59:03 -06:00
commit f2e1dfed4f
4 changed files with 44 additions and 27 deletions

View File

@ -49,6 +49,7 @@
- [#3429](https://github.com/influxdb/influxdb/issues/3429): Incorrect parsing of regex containing '/'
- [#4374](https://github.com/influxdb/influxdb/issues/4374): Add tsm1 quickcheck tests
- [#4377](https://github.com/influxdb/influxdb/pull/4377): Hinted handoff should not process dropped nodes
- [#4365](https://github.com/influxdb/influxdb/issues/4365): Prevent panic in DecodeSameTypeBlock
## v0.9.4 [2015-09-14]

View File

@ -69,7 +69,13 @@ func (v Values) MaxTime() int64 {
return v[len(v)-1].Time().UnixNano()
}
// Encode converts the values to a byte slice. If there are no values,
// this function panics.
func (v Values) Encode(buf []byte) ([]byte, error) {
if len(v) == 0 {
panic("unable to encode block type")
}
switch v[0].(type) {
case *FloatValue:
return encodeFloatBlock(buf, v)
@ -84,24 +90,6 @@ func (v Values) Encode(buf []byte) ([]byte, error) {
return nil, fmt.Errorf("unsupported value type %T", v[0])
}
func (v Values) DecodeSameTypeBlock(block []byte) Values {
switch v[0].(type) {
case *FloatValue:
a, _ := decodeFloatBlock(block)
return a
case *Int64Value:
a, _ := decodeInt64Block(block)
return a
case *BoolValue:
a, _ := decodeBoolBlock(block)
return a
case *StringValue:
a, _ := decodeStringBlock(block)
return a
}
return nil
}
// DecodeBlock takes a byte array and will decode into values of the appropriate type
// based on the block
func DecodeBlock(block []byte) (Values, error) {

View File

@ -24,7 +24,10 @@ func TestEncoding_FloatBlock(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
decodedValues := values.DecodeSameTypeBlock(b)
decodedValues, err := tsm1.DecodeBlock(b)
if err != nil {
t.Fatalf("unexpected error decoding block: %v", err)
}
if !reflect.DeepEqual(decodedValues, values) {
t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
@ -42,7 +45,10 @@ func TestEncoding_FloatBlock_ZeroTime(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
decodedValues := values.DecodeSameTypeBlock(b)
decodedValues, err := tsm1.DecodeBlock(b)
if err != nil {
t.Fatalf("unexpected error decoding block: %v", err)
}
if !reflect.DeepEqual(decodedValues, values) {
t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
@ -62,7 +68,10 @@ func TestEncoding_FloatBlock_SimilarFloats(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
decodedValues := values.DecodeSameTypeBlock(b)
decodedValues, err := tsm1.DecodeBlock(b)
if err != nil {
t.Fatalf("unexpected error decoding block: %v", err)
}
if !reflect.DeepEqual(decodedValues, values) {
t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
@ -82,7 +91,10 @@ func TestEncoding_IntBlock_Basic(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
decodedValues := values.DecodeSameTypeBlock(b)
decodedValues, err := tsm1.DecodeBlock(b)
if err != nil {
t.Fatalf("unexpected error decoding block: %v", err)
}
if len(decodedValues) != len(values) {
t.Fatalf("unexpected results length:\n\tgot: %v\n\texp: %v\n", len(decodedValues), len(values))
@ -117,7 +129,10 @@ func TestEncoding_IntBlock_Negatives(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
decodedValues := values.DecodeSameTypeBlock(b)
decodedValues, err := tsm1.DecodeBlock(b)
if err != nil {
t.Fatalf("unexpected error decoding block: %v", err)
}
if !reflect.DeepEqual(decodedValues, values) {
t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
@ -141,7 +156,10 @@ func TestEncoding_BoolBlock_Basic(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
decodedValues := values.DecodeSameTypeBlock(b)
decodedValues, err := tsm1.DecodeBlock(b)
if err != nil {
t.Fatalf("unexpected error decoding block: %v", err)
}
if !reflect.DeepEqual(decodedValues, values) {
t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
@ -161,7 +179,10 @@ func TestEncoding_StringBlock_Basic(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
decodedValues := values.DecodeSameTypeBlock(b)
decodedValues, err := tsm1.DecodeBlock(b)
if err != nil {
t.Fatalf("unexpected error decoding block: %v", err)
}
if !reflect.DeepEqual(decodedValues, values) {
t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)

View File

@ -614,7 +614,11 @@ func (e *Engine) Compact(fullCompaction bool) error {
for {
// write the values, the block or combine with previous
if len(previousValues) > 0 {
previousValues = append(previousValues, previousValues.DecodeSameTypeBlock(block)...)
decoded, err := DecodeBlock(block)
if err != nil {
panic(fmt.Sprintf("failure decoding block: %v", err))
}
previousValues = append(previousValues, decoded...)
} else if len(block) > e.RotateBlockSize {
if _, err := f.Write(df.mmap[pos:newPos]); err != nil {
return err
@ -1645,7 +1649,10 @@ func (e *Engine) readSeries() (map[string]*tsdb.Series, error) {
// has future encoded blocks so that this method can know how much of its values can be
// combined and output in the resulting encoded block.
func (e *Engine) DecodeAndCombine(newValues Values, block, buf []byte, nextTime int64, hasFutureBlock bool) (Values, []byte, error) {
values := newValues.DecodeSameTypeBlock(block)
values, err := DecodeBlock(block)
if err != nil {
panic(fmt.Sprintf("failure decoding block: %v", err))
}
var remainingValues Values