From 1bcd8dd5deab14b2b9815e2eefc742bbe8d97f7e Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Mon, 19 Oct 2015 12:03:12 -0600
Subject: [PATCH 1/2] Handle reading partially written tsm files better

If a tsm file was partially written, we were not able to read the raw
block data because we panicked or exited when reading the corrupted
index. This allows us to read the raw blocks when we can.
---
 cmd/influx_inspect/tsm.go | 27 +++++++++++++++++++++------
 1 file changed, 21 insertions(+), 6 deletions(-)

diff --git a/cmd/influx_inspect/tsm.go b/cmd/influx_inspect/tsm.go
index 626de66d52..afc0aa370c 100644
--- a/cmd/influx_inspect/tsm.go
+++ b/cmd/influx_inspect/tsm.go
@@ -165,11 +165,11 @@ func readIds(path string) (map[string]uint64, error) {
     }
     return ids, err
 }
-func readIndex(f *os.File) *tsmIndex {
+func readIndex(f *os.File) (*tsmIndex, error) {
     // Get the file size
     stat, err := f.Stat()
     if err != nil {
-        panic(err.Error())
+        return nil, err
     }

     // Seek to the series count
@@ -177,8 +177,7 @@
     b := make([]byte, 8)
     _, err = f.Read(b[:4])
     if err != nil {
-        fmt.Printf("error: %v\n", err.Error())
-        os.Exit(1)
+        return nil, err
     }

     seriesCount := binary.BigEndian.Uint32(b)
@@ -206,6 +205,10 @@
         series: count,
     }

+    if indexStart < 0 {
+        return nil, fmt.Errorf("index corrupt: offset=%d", indexStart)
+    }
+
     // Read the index entries
     for i := 0; i < count; i++ {
         f.Read(b)
@@ -215,7 +218,7 @@
         index.blocks = append(index.blocks, &block{id: id, offset: int64(pos)})
     }

-    return index
+    return index, nil
 }

 func cmdDumpTsm1(opts *tsdmDumpOpts) {
@@ -254,7 +257,19 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
         invIds[v] = k
     }

-    index := readIndex(f)
+    index, err := readIndex(f)
+    if err != nil {
+        println("Failed to readIndex:", err.Error())
+
+        // Create a stubbed out index so we can still try and read the block data directly
+        // w/o panicing ourselves.
+        index = &tsmIndex{
+            minTime: time.Unix(0, 0),
+            maxTime: time.Unix(0, 0),
+            offset:  stat.Size(),
+        }
+    }
+
     blockStats := &blockStats{}

     println("Summary:")
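The dump-path change above boils down to one pattern: try to read the index,
and if it turns out to be corrupt, fall back to a stub index that still lets
the raw block data be scanned. A condensed sketch of that pattern follows;
readIndexOrStub is an illustrative name, not a function in influx_inspect, and
the sketch assumes the readIndex signature, the tsmIndex fields, and the
fmt/os/time imports shown in the diff above.

// readIndexOrStub is a hypothetical helper mirroring what cmdDumpTsm1 now does
// inline: if the TSM index cannot be read (for example, the file was only
// partially written), return a stub index pointing at EOF so the caller can
// still walk the raw block data instead of panicking.
func readIndexOrStub(f *os.File) *tsmIndex {
    index, err := readIndex(f)
    if err == nil {
        return index
    }
    fmt.Printf("failed to read index: %v\n", err)

    stat, statErr := f.Stat()
    if statErr != nil {
        // Without even a file size there is nothing left to scan.
        return &tsmIndex{}
    }
    // Stub index: no entries, zero time range, offset at EOF.
    return &tsmIndex{
        minTime: time.Unix(0, 0),
        maxTime: time.Unix(0, 0),
        offset:  stat.Size(),
    }
}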
From ba73b1fac6699e57efe615ac55e72a0304d69d41 Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Mon, 19 Oct 2015 13:31:43 -0600
Subject: [PATCH 2/2] Fix panic: runtime error: index out of range - Values.MinTime

When rewriting a tsm file, a panic on the Values slice could happen if
there were no values in the slice and the conditions of the rewrite
caused DecodeAndCombine to be called with an empty slice. This could
happen if the size of a point's new values was equal to the
MaxPointsInBlock config option and there were no future blocks after
the current one being written. When this happens, DecodeAndCombine
returns a zero-length remaining values slice, which is passed back into
DecodeAndCombine one last time. In this case, we now just return the
original block since there is nothing new to combine.

Fixes #4444 #4365
---
 tsdb/engine/tsm1/tsm1.go      |   5 ++
 tsdb/engine/tsm1/tsm1_test.go | 102 +++++++++++++++++++++++++++++++++-
 2 files changed, 106 insertions(+), 1 deletion(-)

diff --git a/tsdb/engine/tsm1/tsm1.go b/tsdb/engine/tsm1/tsm1.go
index 0d832717c1..a211f3f81c 100644
--- a/tsdb/engine/tsm1/tsm1.go
+++ b/tsdb/engine/tsm1/tsm1.go
@@ -1649,6 +1649,11 @@ func (e *Engine) readSeries() (map[string]*tsdb.Series, error) {
 // has future encoded blocks so that this method can know how much of its values can be
 // combined and output in the resulting encoded block.
 func (e *Engine) DecodeAndCombine(newValues Values, block, buf []byte, nextTime int64, hasFutureBlock bool) (Values, []byte, error) {
+    // No new values passed in, so nothing to combine. Just return the existing block.
+    if len(newValues) == 0 {
+        return newValues, block, nil
+    }
+
     values, err := DecodeBlock(block)
     if err != nil {
         panic(fmt.Sprintf("failure decoding block: %v", err))
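The failure described in the commit message is the usual empty-slice index
panic: when nothing is left to combine, the old code still went ahead and
ended up indexing into the empty Values slice (hence the Values.MinTime frame
in the panic subject). Below is a minimal, self-contained sketch of the guard
pattern using stand-in types; it illustrates the shape of the fix, not the
engine's actual types or control flow.

package main

import "fmt"

// value and values are stand-ins for tsm1.Value/tsm1.Values, used only to
// illustrate the bug and the guard.
type value struct{ unixNano int64 }
type values []value

// minTime mirrors a MinTime-style accessor: it indexes the first element and
// therefore panics with "index out of range" on an empty slice.
func (a values) minTime() int64 { return a[0].unixNano }

// combine sketches the fixed control flow: bail out before any indexing when
// there is nothing new to combine, handing back the original encoded block.
func combine(newValues values, block []byte) (values, []byte) {
    if len(newValues) == 0 {
        return newValues, block
    }
    _ = newValues.minTime() // safe here: the slice is non-empty
    // ... decode block, merge newValues into it, re-encode ...
    return nil, block
}

func main() {
    remaining, encoded := combine(nil, []byte{0x1, 0x2})
    fmt.Println(len(remaining), len(encoded)) // 0 2: original block returned untouched
}

With the guard in place, an empty input simply returns the original encoded
block unchanged, which is exactly what the new
TestEngine_DecodeAndCombine_NoNewValues test below asserts.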
diff --git a/tsdb/engine/tsm1/tsm1_test.go b/tsdb/engine/tsm1/tsm1_test.go
index a569639e0f..d9a851aa1e 100644
--- a/tsdb/engine/tsm1/tsm1_test.go
+++ b/tsdb/engine/tsm1/tsm1_test.go
@@ -1013,7 +1013,81 @@ func TestEngine_WriteIntoCompactedFile(t *testing.T) {
     }

     if count := e.DataFileCount(); count != 1 {
-        t.Fatalf("execpted 1 data file but got %d", count)
+        t.Fatalf("expected 1 data file but got %d", count)
+    }
+
+    tx, _ := e.Begin(false)
+    defer tx.Rollback()
+    c := tx.Cursor("cpu,host=A", fields, nil, true)
+    k, _ := c.SeekTo(0)
+    if k != 1000000000 {
+        t.Fatalf("wrong time: %d", k)
+    }
+    k, _ = c.Next()
+    if k != 2000000000 {
+        t.Fatalf("wrong time: %d", k)
+    }
+    k, _ = c.Next()
+    if k != 2500000000 {
+        t.Fatalf("wrong time: %d", k)
+    }
+    k, _ = c.Next()
+    if k != 3000000000 {
+        t.Fatalf("wrong time: %d", k)
+    }
+    k, _ = c.Next()
+    if k != 4000000000 {
+        t.Fatalf("wrong time: %d", k)
+    }
+}
+
+func TestEngine_WriteIntoCompactedFile_MaxPointsPerBlockZero(t *testing.T) {
+    e := OpenDefaultEngine()
+    defer e.Close()
+
+    fields := []string{"value"}
+
+    e.MaxPointsPerBlock = 4
+    e.RotateFileSize = 10
+
+    p1 := parsePoint("cpu,host=A value=1.1 1000000000")
+    p2 := parsePoint("cpu,host=A value=1.2 2000000000")
+    p3 := parsePoint("cpu,host=A value=1.3 3000000000")
+    p4 := parsePoint("cpu,host=A value=1.5 4000000000")
+    p5 := parsePoint("cpu,host=A value=1.6 2500000000")
+    p6 := parsePoint("cpu,host=A value=1.7 5000000000")
+    p7 := parsePoint("cpu,host=A value=1.8 6000000000")
+    p8 := parsePoint("cpu,host=A value=1.9 7000000000")
+
+    if err := e.WritePoints([]models.Point{p1, p2}, nil, nil); err != nil {
+        t.Fatalf("failed to write points: %s", err.Error())
+    }
+    if err := e.WritePoints([]models.Point{p3}, nil, nil); err != nil {
+        t.Fatalf("failed to write points: %s", err.Error())
+    }
+
+    if err := e.Compact(true); err != nil {
+        t.Fatalf("error compacting: %s", err.Error())
+    }
+
+    if err := e.WritePoints([]models.Point{p4}, nil, nil); err != nil {
+        t.Fatalf("failed to write points: %s", err.Error())
+    }
+
+    if err := e.WritePoints([]models.Point{p6, p7, p8}, nil, nil); err != nil {
+        t.Fatalf("failed to write points: %s", err.Error())
+    }
+
+    if err := e.Compact(true); err != nil {
+        t.Fatalf("error compacting: %s", err.Error())
+    }
+
+    if err := e.WritePoints([]models.Point{p5}, nil, nil); err != nil {
+        t.Fatalf("failed to write points: %s", err.Error())
+    }
+
+    if count := e.DataFileCount(); count != 1 {
+        t.Fatalf("expected 1 data file but got %d", count)
     }

     tx, _ := e.Begin(false)
     defer tx.Rollback()
     c := tx.Cursor("cpu,host=A", fields, nil, true)
     k, _ := c.SeekTo(0)
     if k != 1000000000 {
         t.Fatalf("wrong time: %d", k)
     }
     k, _ = c.Next()
     if k != 2000000000 {
         t.Fatalf("wrong time: %d", k)
     }
@@ -1353,6 +1427,32 @@ func TestEngine_RewriteFileAndCompact(t *testing.T) {
     }()
 }

+func TestEngine_DecodeAndCombine_NoNewValues(t *testing.T) {
+    var newValues tsm1.Values
+    e := OpenDefaultEngine()
+    defer e.Engine.Close()
+
+    values := make(tsm1.Values, 1)
+    values[0] = tsm1.NewValue(time.Unix(0, 0), float64(1))
+
+    block, err := values.Encode(nil)
+    if err != nil {
+        t.Fatalf("unexpected error: %v", err)
+    }
+    remaining, encoded, err := e.DecodeAndCombine(newValues, block, nil, time.Unix(1, 0).UnixNano(), false)
+    if len(remaining) != 0 {
+        t.Fatalf("unexpected remaining values: exp %v, got %v", 0, len(remaining))
+    }
+
+    if len(encoded) != len(block) {
+        t.Fatalf("unexpected encoded block length: exp %v, got %v", len(block), len(encoded))
+    }
+
+    if err != nil {
+        t.Fatalf("unexpected error: %v", err)
+    }
+}
+
 // Engine represents a test wrapper for tsm1.Engine.
 type Engine struct {
     *tsm1.Engine