diff --git a/cmd/influx_inspect/tsm.go b/cmd/influx_inspect/tsm.go index 626de66d52..afc0aa370c 100644 --- a/cmd/influx_inspect/tsm.go +++ b/cmd/influx_inspect/tsm.go @@ -165,11 +165,11 @@ func readIds(path string) (map[string]uint64, error) { } return ids, err } -func readIndex(f *os.File) *tsmIndex { +func readIndex(f *os.File) (*tsmIndex, error) { // Get the file size stat, err := f.Stat() if err != nil { - panic(err.Error()) + return nil, err } // Seek to the series count @@ -177,8 +177,7 @@ func readIndex(f *os.File) *tsmIndex { b := make([]byte, 8) _, err = f.Read(b[:4]) if err != nil { - fmt.Printf("error: %v\n", err.Error()) - os.Exit(1) + return nil, err } seriesCount := binary.BigEndian.Uint32(b) @@ -206,6 +205,10 @@ func readIndex(f *os.File) *tsmIndex { series: count, } + if indexStart < 0 { + return nil, fmt.Errorf("index corrupt: offset=%d", indexStart) + } + // Read the index entries for i := 0; i < count; i++ { f.Read(b) @@ -215,7 +218,7 @@ func readIndex(f *os.File) *tsmIndex { index.blocks = append(index.blocks, &block{id: id, offset: int64(pos)}) } - return index + return index, nil } func cmdDumpTsm1(opts *tsdmDumpOpts) { @@ -254,7 +257,19 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) { invIds[v] = k } - index := readIndex(f) + index, err := readIndex(f) + if err != nil { + println("Failed to readIndex:", err.Error()) + + // Create a stubbed out index so we can still try and read the block data directly + // w/o panicing ourselves. + index = &tsmIndex{ + minTime: time.Unix(0, 0), + maxTime: time.Unix(0, 0), + offset: stat.Size(), + } + } + blockStats := &blockStats{} println("Summary:") diff --git a/tsdb/engine/tsm1/tsm1.go b/tsdb/engine/tsm1/tsm1.go index 0d832717c1..a211f3f81c 100644 --- a/tsdb/engine/tsm1/tsm1.go +++ b/tsdb/engine/tsm1/tsm1.go @@ -1649,6 +1649,11 @@ func (e *Engine) readSeries() (map[string]*tsdb.Series, error) { // has future encoded blocks so that this method can know how much of its values can be // combined and output in the resulting encoded block. func (e *Engine) DecodeAndCombine(newValues Values, block, buf []byte, nextTime int64, hasFutureBlock bool) (Values, []byte, error) { + // No new values passed in, so nothing to combine. Just return the existing block. + if len(newValues) == 0 { + return newValues, block, nil + } + values, err := DecodeBlock(block) if err != nil { panic(fmt.Sprintf("failure decoding block: %v", err)) diff --git a/tsdb/engine/tsm1/tsm1_test.go b/tsdb/engine/tsm1/tsm1_test.go index a569639e0f..d9a851aa1e 100644 --- a/tsdb/engine/tsm1/tsm1_test.go +++ b/tsdb/engine/tsm1/tsm1_test.go @@ -1013,7 +1013,81 @@ func TestEngine_WriteIntoCompactedFile(t *testing.T) { } if count := e.DataFileCount(); count != 1 { - t.Fatalf("execpted 1 data file but got %d", count) + t.Fatalf("expected 1 data file but got %d", count) + } + + tx, _ := e.Begin(false) + defer tx.Rollback() + c := tx.Cursor("cpu,host=A", fields, nil, true) + k, _ := c.SeekTo(0) + if k != 1000000000 { + t.Fatalf("wrong time: %d", k) + } + k, _ = c.Next() + if k != 2000000000 { + t.Fatalf("wrong time: %d", k) + } + k, _ = c.Next() + if k != 2500000000 { + t.Fatalf("wrong time: %d", k) + } + k, _ = c.Next() + if k != 3000000000 { + t.Fatalf("wrong time: %d", k) + } + k, _ = c.Next() + if k != 4000000000 { + t.Fatalf("wrong time: %d", k) + } +} + +func TestEngine_WriteIntoCompactedFile_MaxPointsPerBlockZero(t *testing.T) { + e := OpenDefaultEngine() + defer e.Close() + + fields := []string{"value"} + + e.MaxPointsPerBlock = 4 + e.RotateFileSize = 10 + + p1 := parsePoint("cpu,host=A value=1.1 1000000000") + p2 := parsePoint("cpu,host=A value=1.2 2000000000") + p3 := parsePoint("cpu,host=A value=1.3 3000000000") + p4 := parsePoint("cpu,host=A value=1.5 4000000000") + p5 := parsePoint("cpu,host=A value=1.6 2500000000") + p6 := parsePoint("cpu,host=A value=1.7 5000000000") + p7 := parsePoint("cpu,host=A value=1.8 6000000000") + p8 := parsePoint("cpu,host=A value=1.9 7000000000") + + if err := e.WritePoints([]models.Point{p1, p2}, nil, nil); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + if err := e.WritePoints([]models.Point{p3}, nil, nil); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + + if err := e.Compact(true); err != nil { + t.Fatalf("error compacting: %s", err.Error()) + } + + if err := e.WritePoints([]models.Point{p4}, nil, nil); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + + if err := e.WritePoints([]models.Point{p6, p7, p8}, nil, nil); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + + if err := e.Compact(true); err != nil { + t.Fatalf("error compacting: %s", err.Error()) + } + + if err := e.WritePoints([]models.Point{p5}, nil, nil); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + + if count := e.DataFileCount(); count != 1 { + t.Fatalf("expected 1 data file but got %d", count) } tx, _ := e.Begin(false) @@ -1353,6 +1427,32 @@ func TestEngine_RewriteFileAndCompact(t *testing.T) { }() } +func TestEngine_DecodeAndCombine_NoNewValues(t *testing.T) { + var newValues tsm1.Values + e := OpenDefaultEngine() + defer e.Engine.Close() + + values := make(tsm1.Values, 1) + values[0] = tsm1.NewValue(time.Unix(0, 0), float64(1)) + + block, err := values.Encode(nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + remaining, encoded, err := e.DecodeAndCombine(newValues, block, nil, time.Unix(1, 0).UnixNano(), false) + if len(remaining) != 0 { + t.Fatalf("unexpected remaining values: exp %v, got %v", 0, len(remaining)) + } + + if len(encoded) != len(block) { + t.Fatalf("unexpected encoded block length: exp %v, got %v", len(block), len(encoded)) + } + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + // Engine represents a test wrapper for tsm1.Engine. type Engine struct { *tsm1.Engine