Merge pull request #4402 from influxdb/pd-tsm-file-size-limit

TSM1 fixes
pull/4474/head
Paul Dix 2015-10-16 07:46:08 -04:00
commit f4d8271006
2 changed files with 49 additions and 1 deletions

View File

@ -1069,7 +1069,7 @@ func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) erro
currentPosition += (12 + length)
// make sure we're not at the end of the file
if fpos >= oldDF.size {
if fpos >= oldDF.indexPosition() {
break
}
}

View File

@ -1305,6 +1305,54 @@ func TestEngine_IndexGoodAfterFlush(t *testing.T) {
verify()
}
// Ensure that when rewriting an index file with values in a
// series not in the file doesn't cause corruption on compaction
func TestEngine_RewriteFileAndCompact(t *testing.T) {
e := OpenDefaultEngine()
defer e.Cleanup()
fields := []string{"value"}
e.RotateFileSize = 10
p1 := parsePoint("cpu,host=A value=1.1 1000000000")
p2 := parsePoint("cpu,host=A value=1.2 2000000000")
p3 := parsePoint("cpu,host=A value=1.3 3000000000")
p4 := parsePoint("cpu,host=A value=1.5 4000000000")
p5 := parsePoint("cpu,host=A value=1.6 5000000000")
p6 := parsePoint("cpu,host=B value=2.1 2000000000")
if err := e.WritePoints([]models.Point{p1, p2}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.WritePoints([]models.Point{p3}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.WritePoints([]models.Point{p4, p5, p6}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.Compact(true); err != nil {
t.Fatalf("error compacting: %s", err.Error())
}
func() {
tx, _ := e.Begin(false)
defer tx.Rollback()
c := tx.Cursor("cpu,host=A", fields, nil, true)
k, _ := c.SeekTo(0)
if k != p1.UnixNano() {
t.Fatalf("wrong time %d", k)
}
c = tx.Cursor("cpu,host=B", fields, nil, true)
k, _ = c.SeekTo(0)
if k != p6.UnixNano() {
t.Fatalf("wrong time %d", k)
}
}()
}
// Engine represents a test wrapper for tsm1.Engine.
type Engine struct {
*tsm1.Engine