Don't rewrite TSM files while WAL segments exist

This approach is not working and needs to be reworked.
pull/4956/head
Jason Wilder 2015-12-02 09:15:27 -07:00
parent 5744f5ba02
commit 751d1dd467
2 changed files with 11 additions and 135 deletions

View File

@ -103,47 +103,32 @@ func (c *DefaultPlanner) Plan() (tsmFiles, walSegments []string, err error) {
var walPaths []string
var tsmPaths []string
// Since TSM files can be added from different conditions, this is used to ensure
// they are added only once.
uniqueTsmPaths := map[string]struct{}{}
// We need to rewrite any TSM files that could contain points for any keys in the
// WAL segments.
for _, w := range wal {
for _, t := range tsmStats {
if t.OverlapsTimeRange(w.MinTime, w.MaxTime) && t.OverlapsKeyRange(w.MinKey, w.MaxKey) {
uniqueTsmPaths[t.Path] = struct{}{}
}
}
walPaths = append(walPaths, w.Path)
}
if len(wal) < maxCompactionSegments && len(tsmPaths) == 0 {
var hasDeletes bool
// Only look to rollup TSM files we don't have any WAL files to compact
if len(wal) == 0 && len(tsmStats) > 1 {
for _, tsm := range tsmStats {
if tsm.HasTombstone {
uniqueTsmPaths[tsm.Path] = struct{}{}
}
}
tsmPaths = append(tsmPaths, tsm.Path)
hasDeletes = true
continue
}
// Only look to rollup TSM files if we don't have any already identified to be
// re-written and we have less than the max WAL segments.
if len(wal) < maxCompactionSegments && len(tsmPaths) == 0 && len(tsmStats) > 1 {
for _, tsm := range tsmStats {
if tsm.Size > maxTSMFileSize {
continue
}
uniqueTsmPaths[tsm.Path] = struct{}{}
tsmPaths = append(tsmPaths, tsm.Path)
}
}
for k := range uniqueTsmPaths {
tsmPaths = append(tsmPaths, k)
}
sort.Strings(tsmPaths)
if len(tsmPaths) == 1 && len(walPaths) == 0 {
if !hasDeletes && len(tsmPaths) == 1 && len(walPaths) == 0 {
return nil, nil, nil
}
return tsmPaths, walPaths, nil

View File

@ -1760,123 +1760,13 @@ func TestDefaultCompactionPlanner_NoRewrite_MaxWAL(t *testing.T) {
}
}
func TestDefaultCompactionPlanner_Rewrite_MixWAL(t *testing.T) {
c := tsm1.NewCache(0)
cp := &tsm1.DefaultPlanner{
WAL: &fakeWAL{
ClosedSegmentsFn: func() ([]tsm1.SegmentStat, error) {
return []tsm1.SegmentStat{
tsm1.SegmentStat{Path: "00001.wal"},
tsm1.SegmentStat{Path: "00002.wal"},
tsm1.SegmentStat{Path: "00003.wal"},
tsm1.SegmentStat{Path: "00004.wal"},
tsm1.SegmentStat{Path: "00005.wal"},
}, nil
},
},
FileStore: &fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return []tsm1.FileStat{
tsm1.FileStat{
Path: "0001.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "0002.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Size: 251 * 1024 * 1024,
},
}
},
},
Cache: c,
}
tsm, wal, err := cp.Plan()
if err != nil {
t.Fatalf("unexpected error running plan: %v", err)
}
if exp, got := 2, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
if exp, got := 5, len(wal); got != exp {
t.Fatalf("wal file length mismatch: got %v, exp %v", got, exp)
}
}
func TestDefaultCompactionPlanner_Rewrite_WALOverlap(t *testing.T) {
c := tsm1.NewCache(0)
cp := &tsm1.DefaultPlanner{
WAL: &fakeWAL{
ClosedSegmentsFn: func() ([]tsm1.SegmentStat, error) {
return []tsm1.SegmentStat{
tsm1.SegmentStat{Path: "00001.tsm1",
MinTime: time.Unix(1, 0),
MaxTime: time.Unix(10, 0),
MinKey: "cpu",
MaxKey: "cpu"},
tsm1.SegmentStat{Path: "00002.tsm1"},
tsm1.SegmentStat{Path: "00003.tsm1"},
tsm1.SegmentStat{Path: "00004.tsm1"},
tsm1.SegmentStat{Path: "00005.tsm1"},
}, nil
},
},
FileStore: &fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return []tsm1.FileStat{
tsm1.FileStat{
MinKey: "cpu",
MaxKey: "mem",
MinTime: time.Unix(0, 0),
MaxTime: time.Unix(5, 0),
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Size: 51 * 1024 * 1024,
},
}
},
},
Cache: c,
}
tsm, wal, err := cp.Plan()
if err != nil {
t.Fatalf("unexpected error running plan: %v", err)
}
if exp, got := 1, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
if exp, got := 5, len(wal); got != exp {
t.Fatalf("wal file length mismatch: got %v, exp %v", got, exp)
}
}
func TestDefaultCompactionPlanner_Rewrite_Deletes(t *testing.T) {
c := tsm1.NewCache(0)
cp := &tsm1.DefaultPlanner{
WAL: &fakeWAL{
ClosedSegmentsFn: func() ([]tsm1.SegmentStat, error) {
return []tsm1.SegmentStat{
tsm1.SegmentStat{Path: "00001.wal"},
tsm1.SegmentStat{Path: "00002.wal"},
tsm1.SegmentStat{Path: "00003.wal"},
tsm1.SegmentStat{Path: "00004.wal"},
tsm1.SegmentStat{Path: "00005.wal"},
}, nil
return []tsm1.SegmentStat{}, nil
},
},
FileStore: &fakeFileStore{
@ -1904,10 +1794,11 @@ func TestDefaultCompactionPlanner_Rewrite_Deletes(t *testing.T) {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
if exp, got := 5, len(wal); got != exp {
if exp, got := 0, len(wal); got != exp {
t.Fatalf("wal file length mismatch: got %v, exp %v", got, exp)
}
}
func assertValueEqual(t *testing.T, a, b tsm1.Value) {
if got, exp := a.Time(), b.Time(); !got.Equal(exp) {
t.Fatalf("time mismatch: got %v, exp %v", got, exp)