diff --git a/tsdb/engine/tsm1/compact.gen.go b/tsdb/engine/tsm1/compact.gen.go index 6861d85412..c469cd1823 100644 --- a/tsdb/engine/tsm1/compact.gen.go +++ b/tsdb/engine/tsm1/compact.gen.go @@ -1072,7 +1072,7 @@ func (k *tsmBatchKeyIterator) combineFloat(dedup bool) blocks { var v tsdb.FloatArray var err error if err = DecodeFloatArrayBlock(k.blocks[i].b, &v); err != nil { - k.err = err + k.handleDecodeError(err, "float") return nil } @@ -1156,7 +1156,7 @@ func (k *tsmBatchKeyIterator) combineFloat(dedup bool) blocks { var v tsdb.FloatArray if err := DecodeFloatArrayBlock(k.blocks[i].b, &v); err != nil { - k.err = err + k.handleDecodeError(err, "float") return nil } @@ -1190,7 +1190,7 @@ func (k *tsmBatchKeyIterator) chunkFloat(dst blocks) blocks { cb, err := EncodeFloatArrayBlock(&values, nil) // TODO(edd): pool this buffer if err != nil { - k.err = err + k.handleEncodeError(err, "float") return nil } @@ -1210,7 +1210,7 @@ func (k *tsmBatchKeyIterator) chunkFloat(dst blocks) blocks { minTime, maxTime := k.mergedFloatValues.Timestamps[0], k.mergedFloatValues.Timestamps[len(k.mergedFloatValues.Timestamps)-1] cb, err := EncodeFloatArrayBlock(k.mergedFloatValues, nil) // TODO(edd): pool this buffer if err != nil { - k.err = err + k.handleEncodeError(err, "float") return nil } @@ -1291,7 +1291,7 @@ func (k *tsmBatchKeyIterator) combineInteger(dedup bool) blocks { var v tsdb.IntegerArray var err error if err = DecodeIntegerArrayBlock(k.blocks[i].b, &v); err != nil { - k.err = err + k.handleDecodeError(err, "integer") return nil } @@ -1375,7 +1375,7 @@ func (k *tsmBatchKeyIterator) combineInteger(dedup bool) blocks { var v tsdb.IntegerArray if err := DecodeIntegerArrayBlock(k.blocks[i].b, &v); err != nil { - k.err = err + k.handleDecodeError(err, "integer") return nil } @@ -1409,7 +1409,7 @@ func (k *tsmBatchKeyIterator) chunkInteger(dst blocks) blocks { cb, err := EncodeIntegerArrayBlock(&values, nil) // TODO(edd): pool this buffer if err != nil { - k.err = err + k.handleEncodeError(err, "integer") return nil } @@ -1429,7 +1429,7 @@ func (k *tsmBatchKeyIterator) chunkInteger(dst blocks) blocks { minTime, maxTime := k.mergedIntegerValues.Timestamps[0], k.mergedIntegerValues.Timestamps[len(k.mergedIntegerValues.Timestamps)-1] cb, err := EncodeIntegerArrayBlock(k.mergedIntegerValues, nil) // TODO(edd): pool this buffer if err != nil { - k.err = err + k.handleEncodeError(err, "integer") return nil } @@ -1510,7 +1510,7 @@ func (k *tsmBatchKeyIterator) combineUnsigned(dedup bool) blocks { var v tsdb.UnsignedArray var err error if err = DecodeUnsignedArrayBlock(k.blocks[i].b, &v); err != nil { - k.err = err + k.handleDecodeError(err, "unsigned") return nil } @@ -1594,7 +1594,7 @@ func (k *tsmBatchKeyIterator) combineUnsigned(dedup bool) blocks { var v tsdb.UnsignedArray if err := DecodeUnsignedArrayBlock(k.blocks[i].b, &v); err != nil { - k.err = err + k.handleDecodeError(err, "unsigned") return nil } @@ -1628,7 +1628,7 @@ func (k *tsmBatchKeyIterator) chunkUnsigned(dst blocks) blocks { cb, err := EncodeUnsignedArrayBlock(&values, nil) // TODO(edd): pool this buffer if err != nil { - k.err = err + k.handleEncodeError(err, "unsigned") return nil } @@ -1648,7 +1648,7 @@ func (k *tsmBatchKeyIterator) chunkUnsigned(dst blocks) blocks { minTime, maxTime := k.mergedUnsignedValues.Timestamps[0], k.mergedUnsignedValues.Timestamps[len(k.mergedUnsignedValues.Timestamps)-1] cb, err := EncodeUnsignedArrayBlock(k.mergedUnsignedValues, nil) // TODO(edd): pool this buffer if err != nil { - k.err = err + k.handleEncodeError(err, "unsigned") return nil } @@ -1729,7 +1729,7 @@ func (k *tsmBatchKeyIterator) combineString(dedup bool) blocks { var v tsdb.StringArray var err error if err = DecodeStringArrayBlock(k.blocks[i].b, &v); err != nil { - k.err = err + k.handleDecodeError(err, "string") return nil } @@ -1813,7 +1813,7 @@ func (k *tsmBatchKeyIterator) combineString(dedup bool) blocks { var v tsdb.StringArray if err := DecodeStringArrayBlock(k.blocks[i].b, &v); err != nil { - k.err = err + k.handleDecodeError(err, "string") return nil } @@ -1847,7 +1847,7 @@ func (k *tsmBatchKeyIterator) chunkString(dst blocks) blocks { cb, err := EncodeStringArrayBlock(&values, nil) // TODO(edd): pool this buffer if err != nil { - k.err = err + k.handleEncodeError(err, "string") return nil } @@ -1867,7 +1867,7 @@ func (k *tsmBatchKeyIterator) chunkString(dst blocks) blocks { minTime, maxTime := k.mergedStringValues.Timestamps[0], k.mergedStringValues.Timestamps[len(k.mergedStringValues.Timestamps)-1] cb, err := EncodeStringArrayBlock(k.mergedStringValues, nil) // TODO(edd): pool this buffer if err != nil { - k.err = err + k.handleEncodeError(err, "string") return nil } @@ -1948,7 +1948,7 @@ func (k *tsmBatchKeyIterator) combineBoolean(dedup bool) blocks { var v tsdb.BooleanArray var err error if err = DecodeBooleanArrayBlock(k.blocks[i].b, &v); err != nil { - k.err = err + k.handleDecodeError(err, "boolean") return nil } @@ -2032,7 +2032,7 @@ func (k *tsmBatchKeyIterator) combineBoolean(dedup bool) blocks { var v tsdb.BooleanArray if err := DecodeBooleanArrayBlock(k.blocks[i].b, &v); err != nil { - k.err = err + k.handleDecodeError(err, "boolean") return nil } @@ -2066,7 +2066,7 @@ func (k *tsmBatchKeyIterator) chunkBoolean(dst blocks) blocks { cb, err := EncodeBooleanArrayBlock(&values, nil) // TODO(edd): pool this buffer if err != nil { - k.err = err + k.handleEncodeError(err, "boolean") return nil } @@ -2086,7 +2086,7 @@ func (k *tsmBatchKeyIterator) chunkBoolean(dst blocks) blocks { minTime, maxTime := k.mergedBooleanValues.Timestamps[0], k.mergedBooleanValues.Timestamps[len(k.mergedBooleanValues.Timestamps)-1] cb, err := EncodeBooleanArrayBlock(k.mergedBooleanValues, nil) // TODO(edd): pool this buffer if err != nil { - k.err = err + k.handleEncodeError(err, "boolean") return nil } diff --git a/tsdb/engine/tsm1/compact.gen.go.tmpl b/tsdb/engine/tsm1/compact.gen.go.tmpl index 96c81dcd2f..d5bafac944 100644 --- a/tsdb/engine/tsm1/compact.gen.go.tmpl +++ b/tsdb/engine/tsm1/compact.gen.go.tmpl @@ -275,7 +275,7 @@ func (k *tsmBatchKeyIterator) combine{{.Name}}(dedup bool) blocks { var v tsdb.{{.Name}}Array var err error if err = Decode{{.Name}}ArrayBlock(k.blocks[i].b, &v); err != nil { - k.err = err + k.handleDecodeError(err, "{{.name}}") return nil } @@ -359,7 +359,7 @@ func (k *tsmBatchKeyIterator) combine{{.Name}}(dedup bool) blocks { var v tsdb.{{.Name}}Array if err := Decode{{.Name}}ArrayBlock(k.blocks[i].b, &v); err != nil { - k.err = err + k.handleDecodeError(err, "{{.name}}") return nil } @@ -393,7 +393,7 @@ func (k *tsmBatchKeyIterator) chunk{{.Name}}(dst blocks) blocks { cb, err := Encode{{.Name}}ArrayBlock(&values, nil) // TODO(edd): pool this buffer if err != nil { - k.err = err + k.handleEncodeError(err, "{{.name}}") return nil } @@ -413,7 +413,7 @@ func (k *tsmBatchKeyIterator) chunk{{.Name}}(dst blocks) blocks { minTime, maxTime := k.merged{{.Name}}Values.Timestamps[0], k.merged{{.Name}}Values.Timestamps[len(k.merged{{.Name}}Values.Timestamps)-1] cb, err := Encode{{.Name}}ArrayBlock(k.merged{{.Name}}Values, nil) // TODO(edd): pool this buffer if err != nil { - k.err = err + k.handleEncodeError(err, "{{.name}}") return nil } diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index d40a93f2cb..b142717eec 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -1818,6 +1818,14 @@ func (k *tsmBatchKeyIterator) merge() { } } +func (k *tsmBatchKeyIterator) handleEncodeError(err error, typ string) { + k.err = fmt.Errorf("encode error: unable to compress block type %s for key '%s': %v", typ, k.key, err) +} + +func (k *tsmBatchKeyIterator) handleDecodeError(err error, typ string) { + k.err = fmt.Errorf("decode error: unable to decompress block type %s for key '%s': %v", typ, k.key, err) +} + func (k *tsmBatchKeyIterator) Read() ([]byte, int64, int64, []byte, error) { // See if compactions were disabled while we were running. select { diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index 158d5346ce..7ef4c461ba 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -244,6 +244,63 @@ func TestCompactor_CompactFull(t *testing.T) { } } +// Ensures that a compaction will properly merge multiple TSM files +func TestCompactor_DecodeError(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + + // write 3 TSM files with different data and one new point + a1 := tsm1.NewValue(1, 1.1) + writes := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": {a1}, + } + f1 := MustWriteTSM(dir, 1, writes) + + a2 := tsm1.NewValue(2, 1.2) + b1 := tsm1.NewValue(1, 2.1) + writes = map[string][]tsm1.Value{ + "cpu,host=A#!~#value": {a2}, + "cpu,host=B#!~#value": {b1}, + } + f2 := MustWriteTSM(dir, 2, writes) + + a3 := tsm1.NewValue(1, 1.3) + c1 := tsm1.NewValue(1, 3.1) + writes = map[string][]tsm1.Value{ + "cpu,host=A#!~#value": {a3}, + "cpu,host=C#!~#value": {c1}, + } + f3 := MustWriteTSM(dir, 3, writes) + f, err := os.OpenFile(f3, os.O_RDWR, os.ModePerm) + if err != nil { + panic(err) + } + f.WriteAt([]byte("ffff"), 10) // skip over header + f.Close() + + fs := &fakeFileStore{} + defer fs.Close() + compactor := tsm1.NewCompactor() + compactor.Dir = dir + compactor.FileStore = fs + + files, err := compactor.CompactFull([]string{f1, f2, f3}) + if err == nil { + t.Fatalf("expected error writing snapshot: %v", err) + } + if len(files) > 0 { + t.Fatalf("no files should be compacted: got %v", len(files)) + + } + + compactor.Open() + + files, err = compactor.CompactFull([]string{f1, f2, f3}) + if err == nil || err.Error() != "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp" { + t.Fatalf("expected error writing snapshot: %v", err) + } +} + // Ensures that a compaction will properly merge multiple TSM files func TestCompactor_Compact_OverlappingBlocks(t *testing.T) { dir := MustTempDir()