Merge pull request #4504 from influxdb/jw-4444

Fix panic: runtime error: index out of range - Values.MinTime
pull/4491/head
Jason Wilder 2015-10-19 13:44:28 -06:00
commit efea0edb82
3 changed files with 127 additions and 7 deletions

View File

@ -165,11 +165,11 @@ func readIds(path string) (map[string]uint64, error) {
}
return ids, err
}
func readIndex(f *os.File) *tsmIndex {
func readIndex(f *os.File) (*tsmIndex, error) {
// Get the file size
stat, err := f.Stat()
if err != nil {
panic(err.Error())
return nil, err
}
// Seek to the series count
@ -177,8 +177,7 @@ func readIndex(f *os.File) *tsmIndex {
b := make([]byte, 8)
_, err = f.Read(b[:4])
if err != nil {
fmt.Printf("error: %v\n", err.Error())
os.Exit(1)
return nil, err
}
seriesCount := binary.BigEndian.Uint32(b)
@ -206,6 +205,10 @@ func readIndex(f *os.File) *tsmIndex {
series: count,
}
if indexStart < 0 {
return nil, fmt.Errorf("index corrupt: offset=%d", indexStart)
}
// Read the index entries
for i := 0; i < count; i++ {
f.Read(b)
@ -215,7 +218,7 @@ func readIndex(f *os.File) *tsmIndex {
index.blocks = append(index.blocks, &block{id: id, offset: int64(pos)})
}
return index
return index, nil
}
func cmdDumpTsm1(opts *tsdmDumpOpts) {
@ -254,7 +257,19 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
invIds[v] = k
}
index := readIndex(f)
index, err := readIndex(f)
if err != nil {
println("Failed to readIndex:", err.Error())
// Create a stubbed-out index so we can still try to read the block data directly
// without panicking.
index = &tsmIndex{
minTime: time.Unix(0, 0),
maxTime: time.Unix(0, 0),
offset: stat.Size(),
}
}
blockStats := &blockStats{}
println("Summary:")

View File

@ -1649,6 +1649,11 @@ func (e *Engine) readSeries() (map[string]*tsdb.Series, error) {
// has future encoded blocks so that this method can know how much of its values can be
// combined and output in the resulting encoded block.
func (e *Engine) DecodeAndCombine(newValues Values, block, buf []byte, nextTime int64, hasFutureBlock bool) (Values, []byte, error) {
// No new values passed in, so nothing to combine. Just return the existing block.
if len(newValues) == 0 {
return newValues, block, nil
}
values, err := DecodeBlock(block)
if err != nil {
panic(fmt.Sprintf("failure decoding block: %v", err))

View File

@ -1013,7 +1013,81 @@ func TestEngine_WriteIntoCompactedFile(t *testing.T) {
}
if count := e.DataFileCount(); count != 1 {
t.Fatalf("execpted 1 data file but got %d", count)
t.Fatalf("expected 1 data file but got %d", count)
}
tx, _ := e.Begin(false)
defer tx.Rollback()
c := tx.Cursor("cpu,host=A", fields, nil, true)
k, _ := c.SeekTo(0)
if k != 1000000000 {
t.Fatalf("wrong time: %d", k)
}
k, _ = c.Next()
if k != 2000000000 {
t.Fatalf("wrong time: %d", k)
}
k, _ = c.Next()
if k != 2500000000 {
t.Fatalf("wrong time: %d", k)
}
k, _ = c.Next()
if k != 3000000000 {
t.Fatalf("wrong time: %d", k)
}
k, _ = c.Next()
if k != 4000000000 {
t.Fatalf("wrong time: %d", k)
}
}
func TestEngine_WriteIntoCompactedFile_MaxPointsPerBlockZero(t *testing.T) {
e := OpenDefaultEngine()
defer e.Close()
fields := []string{"value"}
e.MaxPointsPerBlock = 4
e.RotateFileSize = 10
p1 := parsePoint("cpu,host=A value=1.1 1000000000")
p2 := parsePoint("cpu,host=A value=1.2 2000000000")
p3 := parsePoint("cpu,host=A value=1.3 3000000000")
p4 := parsePoint("cpu,host=A value=1.5 4000000000")
p5 := parsePoint("cpu,host=A value=1.6 2500000000")
p6 := parsePoint("cpu,host=A value=1.7 5000000000")
p7 := parsePoint("cpu,host=A value=1.8 6000000000")
p8 := parsePoint("cpu,host=A value=1.9 7000000000")
if err := e.WritePoints([]models.Point{p1, p2}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.WritePoints([]models.Point{p3}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.Compact(true); err != nil {
t.Fatalf("error compacting: %s", err.Error())
}
if err := e.WritePoints([]models.Point{p4}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.WritePoints([]models.Point{p6, p7, p8}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.Compact(true); err != nil {
t.Fatalf("error compacting: %s", err.Error())
}
if err := e.WritePoints([]models.Point{p5}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if count := e.DataFileCount(); count != 1 {
t.Fatalf("expected 1 data file but got %d", count)
}
tx, _ := e.Begin(false)
@ -1353,6 +1427,32 @@ func TestEngine_RewriteFileAndCompact(t *testing.T) {
}()
}
// TestEngine_DecodeAndCombine_NoNewValues verifies that DecodeAndCombine
// returns no remaining values and the existing encoded block unchanged when
// an empty set of new values is passed in (regression test for the
// index-out-of-range panic in Values.MinTime).
func TestEngine_DecodeAndCombine_NoNewValues(t *testing.T) {
	var newValues tsm1.Values
	e := OpenDefaultEngine()
	defer e.Engine.Close()

	// Encode a single existing value into a block for DecodeAndCombine to
	// (not) combine with the empty newValues.
	values := make(tsm1.Values, 1)
	values[0] = tsm1.NewValue(time.Unix(0, 0), float64(1))

	block, err := values.Encode(nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	remaining, encoded, err := e.DecodeAndCombine(newValues, block, nil, time.Unix(1, 0).UnixNano(), false)
	// Check the error before inspecting the results: if the call failed,
	// remaining/encoded are meaningless and the length assertions below
	// would produce a misleading failure message.
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if len(remaining) != 0 {
		t.Fatalf("unexpected remaining values: exp %v, got %v", 0, len(remaining))
	}
	if len(encoded) != len(block) {
		t.Fatalf("unexpected encoded block length: exp %v, got %v", len(block), len(encoded))
	}
}
// Engine represents a test wrapper for tsm1.Engine.
type Engine struct {
*tsm1.Engine