fix(storage): Don't panic when length of source slice is too large
StringArrayEncodeAll will panic if the total length of strings contained in the src slice is > 0xffffffff. This change adds a unit test to replicate the issue and an associated fix to return an error. This also raises an issue that compactions will be unable to make progress under the following condition: * multiple string blocks are to be merged to a single block and * the total length of all strings exceeds the maximum block size that snappy will encode (0xffffffff) The observable effect of this is errors in the logs indicating a compaction failure. Fixes #13687pull/13699/head
parent
f1e1164e96
commit
86734e7fcd
|
@ -1072,7 +1072,7 @@ func (k *tsmBatchKeyIterator) combineFloat(dedup bool) blocks {
|
|||
var v tsdb.FloatArray
|
||||
var err error
|
||||
if err = DecodeFloatArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
k.handleDecodeError(err, "float")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1156,7 +1156,7 @@ func (k *tsmBatchKeyIterator) combineFloat(dedup bool) blocks {
|
|||
|
||||
var v tsdb.FloatArray
|
||||
if err := DecodeFloatArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
k.handleDecodeError(err, "float")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1190,7 +1190,7 @@ func (k *tsmBatchKeyIterator) chunkFloat(dst blocks) blocks {
|
|||
|
||||
cb, err := EncodeFloatArrayBlock(&values, nil) // TODO(edd): pool this buffer
|
||||
if err != nil {
|
||||
k.err = err
|
||||
k.handleEncodeError(err, "float")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1210,7 +1210,7 @@ func (k *tsmBatchKeyIterator) chunkFloat(dst blocks) blocks {
|
|||
minTime, maxTime := k.mergedFloatValues.Timestamps[0], k.mergedFloatValues.Timestamps[len(k.mergedFloatValues.Timestamps)-1]
|
||||
cb, err := EncodeFloatArrayBlock(k.mergedFloatValues, nil) // TODO(edd): pool this buffer
|
||||
if err != nil {
|
||||
k.err = err
|
||||
k.handleEncodeError(err, "float")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1291,7 +1291,7 @@ func (k *tsmBatchKeyIterator) combineInteger(dedup bool) blocks {
|
|||
var v tsdb.IntegerArray
|
||||
var err error
|
||||
if err = DecodeIntegerArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
k.handleDecodeError(err, "integer")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1375,7 +1375,7 @@ func (k *tsmBatchKeyIterator) combineInteger(dedup bool) blocks {
|
|||
|
||||
var v tsdb.IntegerArray
|
||||
if err := DecodeIntegerArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
k.handleDecodeError(err, "integer")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1409,7 +1409,7 @@ func (k *tsmBatchKeyIterator) chunkInteger(dst blocks) blocks {
|
|||
|
||||
cb, err := EncodeIntegerArrayBlock(&values, nil) // TODO(edd): pool this buffer
|
||||
if err != nil {
|
||||
k.err = err
|
||||
k.handleEncodeError(err, "integer")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1429,7 +1429,7 @@ func (k *tsmBatchKeyIterator) chunkInteger(dst blocks) blocks {
|
|||
minTime, maxTime := k.mergedIntegerValues.Timestamps[0], k.mergedIntegerValues.Timestamps[len(k.mergedIntegerValues.Timestamps)-1]
|
||||
cb, err := EncodeIntegerArrayBlock(k.mergedIntegerValues, nil) // TODO(edd): pool this buffer
|
||||
if err != nil {
|
||||
k.err = err
|
||||
k.handleEncodeError(err, "integer")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1510,7 +1510,7 @@ func (k *tsmBatchKeyIterator) combineUnsigned(dedup bool) blocks {
|
|||
var v tsdb.UnsignedArray
|
||||
var err error
|
||||
if err = DecodeUnsignedArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
k.handleDecodeError(err, "unsigned")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1594,7 +1594,7 @@ func (k *tsmBatchKeyIterator) combineUnsigned(dedup bool) blocks {
|
|||
|
||||
var v tsdb.UnsignedArray
|
||||
if err := DecodeUnsignedArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
k.handleDecodeError(err, "unsigned")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1628,7 +1628,7 @@ func (k *tsmBatchKeyIterator) chunkUnsigned(dst blocks) blocks {
|
|||
|
||||
cb, err := EncodeUnsignedArrayBlock(&values, nil) // TODO(edd): pool this buffer
|
||||
if err != nil {
|
||||
k.err = err
|
||||
k.handleEncodeError(err, "unsigned")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1648,7 +1648,7 @@ func (k *tsmBatchKeyIterator) chunkUnsigned(dst blocks) blocks {
|
|||
minTime, maxTime := k.mergedUnsignedValues.Timestamps[0], k.mergedUnsignedValues.Timestamps[len(k.mergedUnsignedValues.Timestamps)-1]
|
||||
cb, err := EncodeUnsignedArrayBlock(k.mergedUnsignedValues, nil) // TODO(edd): pool this buffer
|
||||
if err != nil {
|
||||
k.err = err
|
||||
k.handleEncodeError(err, "unsigned")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1729,7 +1729,7 @@ func (k *tsmBatchKeyIterator) combineString(dedup bool) blocks {
|
|||
var v tsdb.StringArray
|
||||
var err error
|
||||
if err = DecodeStringArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
k.handleDecodeError(err, "string")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1813,7 +1813,7 @@ func (k *tsmBatchKeyIterator) combineString(dedup bool) blocks {
|
|||
|
||||
var v tsdb.StringArray
|
||||
if err := DecodeStringArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
k.handleDecodeError(err, "string")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1847,7 +1847,7 @@ func (k *tsmBatchKeyIterator) chunkString(dst blocks) blocks {
|
|||
|
||||
cb, err := EncodeStringArrayBlock(&values, nil) // TODO(edd): pool this buffer
|
||||
if err != nil {
|
||||
k.err = err
|
||||
k.handleEncodeError(err, "string")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1867,7 +1867,7 @@ func (k *tsmBatchKeyIterator) chunkString(dst blocks) blocks {
|
|||
minTime, maxTime := k.mergedStringValues.Timestamps[0], k.mergedStringValues.Timestamps[len(k.mergedStringValues.Timestamps)-1]
|
||||
cb, err := EncodeStringArrayBlock(k.mergedStringValues, nil) // TODO(edd): pool this buffer
|
||||
if err != nil {
|
||||
k.err = err
|
||||
k.handleEncodeError(err, "string")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1948,7 +1948,7 @@ func (k *tsmBatchKeyIterator) combineBoolean(dedup bool) blocks {
|
|||
var v tsdb.BooleanArray
|
||||
var err error
|
||||
if err = DecodeBooleanArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
k.handleDecodeError(err, "boolean")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -2032,7 +2032,7 @@ func (k *tsmBatchKeyIterator) combineBoolean(dedup bool) blocks {
|
|||
|
||||
var v tsdb.BooleanArray
|
||||
if err := DecodeBooleanArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
k.handleDecodeError(err, "boolean")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -2066,7 +2066,7 @@ func (k *tsmBatchKeyIterator) chunkBoolean(dst blocks) blocks {
|
|||
|
||||
cb, err := EncodeBooleanArrayBlock(&values, nil) // TODO(edd): pool this buffer
|
||||
if err != nil {
|
||||
k.err = err
|
||||
k.handleEncodeError(err, "boolean")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -2086,7 +2086,7 @@ func (k *tsmBatchKeyIterator) chunkBoolean(dst blocks) blocks {
|
|||
minTime, maxTime := k.mergedBooleanValues.Timestamps[0], k.mergedBooleanValues.Timestamps[len(k.mergedBooleanValues.Timestamps)-1]
|
||||
cb, err := EncodeBooleanArrayBlock(k.mergedBooleanValues, nil) // TODO(edd): pool this buffer
|
||||
if err != nil {
|
||||
k.err = err
|
||||
k.handleEncodeError(err, "boolean")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -275,7 +275,7 @@ func (k *tsmBatchKeyIterator) combine{{.Name}}(dedup bool) blocks {
|
|||
var v tsdb.{{.Name}}Array
|
||||
var err error
|
||||
if err = Decode{{.Name}}ArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
k.handleDecodeError(err, "{{.name}}")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -359,7 +359,7 @@ func (k *tsmBatchKeyIterator) combine{{.Name}}(dedup bool) blocks {
|
|||
|
||||
var v tsdb.{{.Name}}Array
|
||||
if err := Decode{{.Name}}ArrayBlock(k.blocks[i].b, &v); err != nil {
|
||||
k.err = err
|
||||
k.handleDecodeError(err, "{{.name}}")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -393,7 +393,7 @@ func (k *tsmBatchKeyIterator) chunk{{.Name}}(dst blocks) blocks {
|
|||
|
||||
cb, err := Encode{{.Name}}ArrayBlock(&values, nil) // TODO(edd): pool this buffer
|
||||
if err != nil {
|
||||
k.err = err
|
||||
k.handleEncodeError(err, "{{.name}}")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -413,7 +413,7 @@ func (k *tsmBatchKeyIterator) chunk{{.Name}}(dst blocks) blocks {
|
|||
minTime, maxTime := k.merged{{.Name}}Values.Timestamps[0], k.merged{{.Name}}Values.Timestamps[len(k.merged{{.Name}}Values.Timestamps)-1]
|
||||
cb, err := Encode{{.Name}}ArrayBlock(k.merged{{.Name}}Values, nil) // TODO(edd): pool this buffer
|
||||
if err != nil {
|
||||
k.err = err
|
||||
k.handleEncodeError(err, "{{.name}}")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1818,6 +1818,14 @@ func (k *tsmBatchKeyIterator) merge() {
|
|||
}
|
||||
}
|
||||
|
||||
func (k *tsmBatchKeyIterator) handleEncodeError(err error, typ string) {
|
||||
k.err = fmt.Errorf("encode error: unable to compress block type %s for key '%s': %v", typ, k.key, err)
|
||||
}
|
||||
|
||||
func (k *tsmBatchKeyIterator) handleDecodeError(err error, typ string) {
|
||||
k.err = fmt.Errorf("decode error: unable to decompress block type %s for key '%s': %v", typ, k.key, err)
|
||||
}
|
||||
|
||||
func (k *tsmBatchKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
|
||||
// See if compactions were disabled while we were running.
|
||||
select {
|
||||
|
|
|
@ -244,6 +244,63 @@ func TestCompactor_CompactFull(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensures that a compaction will properly merge multiple TSM files
|
||||
func TestCompactor_DecodeError(t *testing.T) {
|
||||
dir := MustTempDir()
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
// write 3 TSM files with different data and one new point
|
||||
a1 := tsm1.NewValue(1, 1.1)
|
||||
writes := map[string][]tsm1.Value{
|
||||
"cpu,host=A#!~#value": {a1},
|
||||
}
|
||||
f1 := MustWriteTSM(dir, 1, writes)
|
||||
|
||||
a2 := tsm1.NewValue(2, 1.2)
|
||||
b1 := tsm1.NewValue(1, 2.1)
|
||||
writes = map[string][]tsm1.Value{
|
||||
"cpu,host=A#!~#value": {a2},
|
||||
"cpu,host=B#!~#value": {b1},
|
||||
}
|
||||
f2 := MustWriteTSM(dir, 2, writes)
|
||||
|
||||
a3 := tsm1.NewValue(1, 1.3)
|
||||
c1 := tsm1.NewValue(1, 3.1)
|
||||
writes = map[string][]tsm1.Value{
|
||||
"cpu,host=A#!~#value": {a3},
|
||||
"cpu,host=C#!~#value": {c1},
|
||||
}
|
||||
f3 := MustWriteTSM(dir, 3, writes)
|
||||
f, err := os.OpenFile(f3, os.O_RDWR, os.ModePerm)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
f.WriteAt([]byte("ffff"), 10) // skip over header
|
||||
f.Close()
|
||||
|
||||
fs := &fakeFileStore{}
|
||||
defer fs.Close()
|
||||
compactor := tsm1.NewCompactor()
|
||||
compactor.Dir = dir
|
||||
compactor.FileStore = fs
|
||||
|
||||
files, err := compactor.CompactFull([]string{f1, f2, f3})
|
||||
if err == nil {
|
||||
t.Fatalf("expected error writing snapshot: %v", err)
|
||||
}
|
||||
if len(files) > 0 {
|
||||
t.Fatalf("no files should be compacted: got %v", len(files))
|
||||
|
||||
}
|
||||
|
||||
compactor.Open()
|
||||
|
||||
files, err = compactor.CompactFull([]string{f1, f2, f3})
|
||||
if err == nil || err.Error() != "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp" {
|
||||
t.Fatalf("expected error writing snapshot: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensures that a compaction will properly merge multiple TSM files
|
||||
func TestCompactor_Compact_OverlappingBlocks(t *testing.T) {
|
||||
dir := MustTempDir()
|
||||
|
|
Loading…
Reference in New Issue