fix(storage): Don't panic when length of source slice is too large

StringArrayEncodeAll will panic if the total length of strings
contained in the src slice is > 0xffffffff. This change adds a unit
test to replicate the issue and an associated fix to return an error.

This also raises an issue that compactions will be unable to make
progress under the following condition:

* multiple string blocks are to be merged to a single block and
* the total length of all strings exceeds the maximum block size that
  snappy will encode (0xffffffff)

The observable effect of this is errors in the logs indicating a
compaction failure.

Fixes #13687
pull/13701/head
Stuart Carnie 2019-04-29 11:59:09 -07:00
parent 7855147241
commit 369a4610e6
No known key found for this signature in database
GPG Key ID: 848D9C9718D78B4F
2 changed files with 27 additions and 1 deletions

View File

@ -2,6 +2,7 @@ package tsm1
import (
"encoding/binary"
"errors"
"fmt"
"unsafe"
@ -12,6 +13,9 @@ var (
errStringBatchDecodeInvalidStringLength = fmt.Errorf("stringArrayDecodeAll: invalid encoded string length")
errStringBatchDecodeLengthOverflow = fmt.Errorf("stringArrayDecodeAll: length overflow")
errStringBatchDecodeShortBuffer = fmt.Errorf("stringArrayDecodeAll: short buffer")
// ErrStringArrayEncodeTooLarge reports that the encoded length of a slice of strings is too large.
ErrStringArrayEncodeTooLarge = errors.New("StringArrayEncodeAll: source length too large")
)
// StringArrayEncodeAll encodes src into b, returning b and any error encountered.
@ -28,7 +32,11 @@ func StringArrayEncodeAll(src []string, b []byte) ([]byte, error) {
// includes the compressed size
var compressedSz = 0
if len(src) > 0 {
compressedSz = snappy.MaxEncodedLen(srcSz) + 1 /* header */
mle := snappy.MaxEncodedLen(srcSz)
if mle == -1 {
return b[:0], ErrStringArrayEncodeTooLarge
}
compressedSz = mle + 1 /* header */
}
totSz := srcSz + compressedSz

View File

@ -5,6 +5,7 @@ import (
"fmt"
"math/rand"
"reflect"
"strings"
"testing"
"testing/quick"
@ -13,6 +14,10 @@ import (
"github.com/influxdata/influxdb/uuid"
)
func equalError(a, b error) bool {
return a == nil && b == nil || a != nil && b != nil && a.Error() == b.Error()
}
func TestStringArrayEncodeAll_NoValues(t *testing.T) {
b, err := StringArrayEncodeAll(nil, nil)
if err != nil {
@ -28,6 +33,19 @@ func TestStringArrayEncodeAll_NoValues(t *testing.T) {
}
}
func TestStringArrayEncodeAll_ExceedsMaxEncodedLen(t *testing.T) {
str := strings.Repeat(" ", 1<<23) // 8MB string
var s []string
for i := 0; i < (1<<32)/(1<<23); i++ {
s = append(s, str)
}
_, got := StringArrayEncodeAll(s, nil)
if !cmp.Equal(got, ErrStringArrayEncodeTooLarge, cmp.Comparer(equalError)) {
t.Fatalf("expected error, got: %v", got)
}
}
func TestStringArrayEncodeAll_Single(t *testing.T) {
src := []string{"v1"}
b, err := StringArrayEncodeAll(src, nil)