From 758359accce083ac387742d559c19d595f3c21a2 Mon Sep 17 00:00:00 2001
From: Jason Wilder <mail@jasonwilder.com>
Date: Thu, 8 Oct 2015 08:59:26 -0600
Subject: [PATCH] Prevent panic in DecodeSameTypeBlock

If DecodeSameTypeBlock is called on on an empty Values slice, it would
panic with an index out of bounds error.  This func can actually be removed
because DecodeBlock can determine what type of values are encoded already.

This will still panic if the block cannot be decoded due to other reasons.

Fixes #4365
---
 CHANGELOG.md                      |  1 +
 tsdb/engine/tsm1/encoding.go      | 24 ++++++---------------
 tsdb/engine/tsm1/encoding_test.go | 35 ++++++++++++++++++++++++-------
 tsdb/engine/tsm1/tsm1.go          | 11 ++++++++--
 4 files changed, 44 insertions(+), 27 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 318e48f902..6d995014c2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -49,6 +49,7 @@
 - [#3429](https://github.com/influxdb/influxdb/issues/3429): Incorrect parsing of regex containing '/'
 - [#4374](https://github.com/influxdb/influxdb/issues/4374): Add tsm1 quickcheck tests
 - [#4377](https://github.com/influxdb/influxdb/pull/4377): Hinted handoff should not process dropped nodes
+- [#4365](https://github.com/influxdb/influxdb/issues/4365): Prevent panic in DecodeSameTypeBlock
 
 ## v0.9.4 [2015-09-14]
 
diff --git a/tsdb/engine/tsm1/encoding.go b/tsdb/engine/tsm1/encoding.go
index 3de8858632..4b5981c4ca 100644
--- a/tsdb/engine/tsm1/encoding.go
+++ b/tsdb/engine/tsm1/encoding.go
@@ -69,7 +69,13 @@ func (v Values) MaxTime() int64 {
 	return v[len(v)-1].Time().UnixNano()
 }
 
+// Encode converts the values to a byte slice.  If there are no values,
+// this function panics.
 func (v Values) Encode(buf []byte) ([]byte, error) {
+	if len(v) == 0 {
+		panic("unable to encode block type")
+	}
+
 	switch v[0].(type) {
 	case *FloatValue:
 		return encodeFloatBlock(buf, v)
@@ -84,24 +90,6 @@ func (v Values) Encode(buf []byte) ([]byte, error) {
 	return nil, fmt.Errorf("unsupported value type %T", v[0])
 }
 
-func (v Values) DecodeSameTypeBlock(block []byte) Values {
-	switch v[0].(type) {
-	case *FloatValue:
-		a, _ := decodeFloatBlock(block)
-		return a
-	case *Int64Value:
-		a, _ := decodeInt64Block(block)
-		return a
-	case *BoolValue:
-		a, _ := decodeBoolBlock(block)
-		return a
-	case *StringValue:
-		a, _ := decodeStringBlock(block)
-		return a
-	}
-	return nil
-}
-
 // DecodeBlock takes a byte array and will decode into values of the appropriate type
 // based on the block
 func DecodeBlock(block []byte) (Values, error) {
diff --git a/tsdb/engine/tsm1/encoding_test.go b/tsdb/engine/tsm1/encoding_test.go
index 3a7ff4b43f..26b9b0bd70 100644
--- a/tsdb/engine/tsm1/encoding_test.go
+++ b/tsdb/engine/tsm1/encoding_test.go
@@ -24,7 +24,10 @@ func TestEncoding_FloatBlock(t *testing.T) {
 		t.Fatalf("unexpected error: %v", err)
 	}
 
-	decodedValues := values.DecodeSameTypeBlock(b)
+	decodedValues, err := tsm1.DecodeBlock(b)
+	if err != nil {
+		t.Fatalf("unexpected error decoding block: %v", err)
+	}
 
 	if !reflect.DeepEqual(decodedValues, values) {
 		t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
@@ -42,7 +45,10 @@ func TestEncoding_FloatBlock_ZeroTime(t *testing.T) {
 		t.Fatalf("unexpected error: %v", err)
 	}
 
-	decodedValues := values.DecodeSameTypeBlock(b)
+	decodedValues, err := tsm1.DecodeBlock(b)
+	if err != nil {
+		t.Fatalf("unexpected error decoding block: %v", err)
+	}
 
 	if !reflect.DeepEqual(decodedValues, values) {
 		t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
@@ -62,7 +68,10 @@ func TestEncoding_FloatBlock_SimilarFloats(t *testing.T) {
 		t.Fatalf("unexpected error: %v", err)
 	}
 
-	decodedValues := values.DecodeSameTypeBlock(b)
+	decodedValues, err := tsm1.DecodeBlock(b)
+	if err != nil {
+		t.Fatalf("unexpected error decoding block: %v", err)
+	}
 
 	if !reflect.DeepEqual(decodedValues, values) {
 		t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
@@ -82,7 +91,10 @@ func TestEncoding_IntBlock_Basic(t *testing.T) {
 		t.Fatalf("unexpected error: %v", err)
 	}
 
-	decodedValues := values.DecodeSameTypeBlock(b)
+	decodedValues, err := tsm1.DecodeBlock(b)
+	if err != nil {
+		t.Fatalf("unexpected error decoding block: %v", err)
+	}
 
 	if len(decodedValues) != len(values) {
 		t.Fatalf("unexpected results length:\n\tgot: %v\n\texp: %v\n", len(decodedValues), len(values))
@@ -117,7 +129,10 @@ func TestEncoding_IntBlock_Negatives(t *testing.T) {
 		t.Fatalf("unexpected error: %v", err)
 	}
 
-	decodedValues := values.DecodeSameTypeBlock(b)
+	decodedValues, err := tsm1.DecodeBlock(b)
+	if err != nil {
+		t.Fatalf("unexpected error decoding block: %v", err)
+	}
 
 	if !reflect.DeepEqual(decodedValues, values) {
 		t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
@@ -141,7 +156,10 @@ func TestEncoding_BoolBlock_Basic(t *testing.T) {
 		t.Fatalf("unexpected error: %v", err)
 	}
 
-	decodedValues := values.DecodeSameTypeBlock(b)
+	decodedValues, err := tsm1.DecodeBlock(b)
+	if err != nil {
+		t.Fatalf("unexpected error decoding block: %v", err)
+	}
 
 	if !reflect.DeepEqual(decodedValues, values) {
 		t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
@@ -161,7 +179,10 @@ func TestEncoding_StringBlock_Basic(t *testing.T) {
 		t.Fatalf("unexpected error: %v", err)
 	}
 
-	decodedValues := values.DecodeSameTypeBlock(b)
+	decodedValues, err := tsm1.DecodeBlock(b)
+	if err != nil {
+		t.Fatalf("unexpected error decoding block: %v", err)
+	}
 
 	if !reflect.DeepEqual(decodedValues, values) {
 		t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
diff --git a/tsdb/engine/tsm1/tsm1.go b/tsdb/engine/tsm1/tsm1.go
index 59a0f3fe05..427406a6cf 100644
--- a/tsdb/engine/tsm1/tsm1.go
+++ b/tsdb/engine/tsm1/tsm1.go
@@ -614,7 +614,11 @@ func (e *Engine) Compact(fullCompaction bool) error {
 			for {
 				// write the values, the block or combine with previous
 				if len(previousValues) > 0 {
-					previousValues = append(previousValues, previousValues.DecodeSameTypeBlock(block)...)
+					decoded, err := DecodeBlock(block)
+					if err != nil {
+						panic(fmt.Sprintf("failure decoding block: %v", err))
+					}
+					previousValues = append(previousValues, decoded...)
 				} else if len(block) > e.RotateBlockSize {
 					if _, err := f.Write(df.mmap[pos:newPos]); err != nil {
 						return err
@@ -1645,7 +1649,10 @@ func (e *Engine) readSeries() (map[string]*tsdb.Series, error) {
 // has future encoded blocks so that this method can know how much of its values can be
 // combined and output in the resulting encoded block.
 func (e *Engine) DecodeAndCombine(newValues Values, block, buf []byte, nextTime int64, hasFutureBlock bool) (Values, []byte, error) {
-	values := newValues.DecodeSameTypeBlock(block)
+	values, err := DecodeBlock(block)
+	if err != nil {
+		panic(fmt.Sprintf("failure decoding block: %v", err))
+	}
 
 	var remainingValues Values