Update compactions to run concurrently

This has a few changes in it (unfortunately).  The main change is to run compactions
concurrently.  While implementing this, a few query and performance bugs showed up that
are also fixed by this commit.
pull/5221/head
Jason Wilder 2015-12-23 15:19:12 -07:00
parent 48d4156eac
commit a38c95ec85
12 changed files with 582 additions and 185 deletions

View File

@ -369,14 +369,8 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
encoded := buf[9:]
var v []tsm1.Value
v, err := tsm1.DecodeBlock(buf, v)
if err != nil {
fmt.Printf("error: %v\n", err.Error())
os.Exit(1)
}
pointCount += int64(len(v))
cnt := tsm1.BlockCount(buf)
pointCount += int64(cnt)
// Length of the timestamp block
tsLen, j := binary.Uvarint(encoded)
@ -409,7 +403,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
strconv.FormatUint(id, 10),
typeDesc,
startTime.UTC().Format(time.RFC3339Nano),
strconv.FormatInt(int64(len(v)), 10),
strconv.FormatInt(int64(cnt), 10),
fmt.Sprintf("%s/%s", tsEncoding, vEncoding),
fmt.Sprintf("%d/%d", len(ts), len(values)),
}, "\t"))

View File

@ -63,10 +63,6 @@ const (
// the shard hasn't received writes or deletes
DefaultCacheSnapshotWriteColdDuration = time.Duration(time.Hour)
// DefaultMinCompactionFileCount is the minimum number of TSM files
// that need to exist before a compaction cycle will run
DefaultCompactMinFileCount = 3
// DefaultCompactFullWriteColdDuration is the duration at which the engine
// will compact all TSM files in a shard if it hasn't received a write or delete
DefaultCompactFullWriteColdDuration = time.Duration(24 * time.Hour)
@ -101,7 +97,6 @@ type Config struct {
CacheMaxMemorySize uint64 `toml:"cache-max-memory-size"`
CacheSnapshotMemorySize uint64 `toml:"cache-snapshot-memory-size"`
CacheSnapshotWriteColdDuration toml.Duration `toml:"cache-snapshot-write-cold-duration"`
CompactMinFileCount int `toml:"compact-min-file-count"`
CompactFullWriteColdDuration toml.Duration `toml:"compact-full-write-cold-duration"`
MaxPointsPerBlock int `toml:"max-points-per-block"`
@ -133,7 +128,6 @@ func NewConfig() Config {
CacheMaxMemorySize: DefaultCacheMaxMemorySize,
CacheSnapshotMemorySize: DefaultCacheSnapshotMemorySize,
CacheSnapshotWriteColdDuration: toml.Duration(DefaultCacheSnapshotWriteColdDuration),
CompactMinFileCount: DefaultCompactMinFileCount,
CompactFullWriteColdDuration: toml.Duration(DefaultCompactFullWriteColdDuration),
DataLoggingEnabled: true,

View File

@ -37,32 +37,13 @@ var (
MinTime = time.Unix(0, 0)
)
// compactionSteps are the file-size thresholds used to bucket TSM files
// into compaction levels before combining.
var compactionSteps = []uint32{
	32 * 1024 * 1024,
	128 * 1024 * 1024,
	512 * 1024 * 1024,
	2048 * 1024 * 1024,
}

// compactionLevel returns the index of the first compaction step whose
// threshold is larger than size. Sizes at or beyond every step map to
// len(compactionSteps).
func compactionLevel(size uint64) int {
	level := 0
	for level < len(compactionSteps) && size >= uint64(compactionSteps[level]) {
		level++
	}
	return level
}
// CompactionGroup represents a set of TSM file paths that should be compacted together.
type CompactionGroup []string
// CompactionPlanner determines what TSM files and WAL segments to include in a
// given compaction run.
type CompactionPlanner interface {
	// Plan returns the groups of TSM files to compact for a full (level 4)
	// compaction, taking the time of the last write into account.
	Plan(lastWrite time.Time) []CompactionGroup

	// PlanLevel returns the groups of TSM files to compact for the given
	// compaction level.
	PlanLevel(level int) []CompactionGroup
}
// DefaultPlanner implements CompactionPlanner using a strategy to roll up
@ -75,8 +56,6 @@ type DefaultPlanner struct {
LastModified() time.Time
}
MinCompactionFileCount int
// CompactFullWriteColdDuration specifies the length of time after
// which if no writes have been committed to the WAL, the engine will
// do a full compaction of the TSM files in this shard. This duration
@ -109,6 +88,22 @@ func (t *tsmGeneration) size() uint64 {
return n
}
// level returns the compaction level of the files in this generation.
// Level 0 is always created from the result of a cache compaction. It
// generates 1 file with a sequence num of 1. Level 2 is generated by
// compacting multiple level 1 files. Level 3 is generated by compacting
// multiple level 2 files. Level 4 is for anything else.
func (t *tsmGeneration) level() int {
	// A generation split across multiple files is always considered fully
	// compacted (level 4).
	if len(t.files) != 1 {
		return 4
	}
	_, seq, _ := ParseTSMFileName(t.files[0].Path)
	if seq >= 4 {
		return 4
	}
	return seq
}
func (t *tsmGeneration) lastModified() time.Time {
var max time.Time
for _, f := range t.files {
@ -124,7 +119,78 @@ func (t *tsmGeneration) count() int {
return len(t.files)
}
// Plan returns a set of TSM files to rewrite
// PlanLevel returns the group of TSM files to rewrite for the requested
// compaction level.
func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
	// Determine the generations from all files on disk. A generation is
	// conceptually a single file even though it may be split across several
	// files in sequence.
	gens := c.findGenerations()

	// With one generation (or none) there is nothing to combine; planning it
	// would just re-compact the same file repeatedly.
	if len(gens) <= 1 {
		return nil
	}

	// Collect the files from every generation at the requested level. An
	// interior generation only qualifies when the generation after it is at
	// the same level; the final generation qualifies on its own.
	//
	// NOTE(review): a generation at the requested level bordered on both
	// sides by higher-level generations is intentionally left out here; per
	// an earlier experiment, such a file can get stuck until a higher-level
	// compaction eventually picks it up.
	var cGroup CompactionGroup
	for i, cur := range gens {
		if cur.level() != level {
			continue
		}
		if i < len(gens)-1 && gens[i+1].level() != level {
			continue
		}
		for _, f := range cur.files {
			cGroup = append(cGroup, f.Path)
		}
	}

	// Require at least two files and cap the group size so one run cannot
	// back up compactions at this level: a limit of 2 for even levels and
	// level+1 for odd levels (e.g. level 3 compacts 4 files at a time).
	limit := 2
	if level%2 != 0 {
		limit = level + 1
	}
	if len(cGroup) < limit {
		return nil
	}
	return []CompactionGroup{cGroup[:limit]}
}
// Plan returns a set of TSM files to rewrite for level 4 or higher. The planning returns
// multiple groups if possible to allow compactions to run concurrently.
func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
// first check if we should be doing a full compaction because nothing has been written in a long time
if !c.lastPlanCompactedFull && c.CompactFullWriteColdDuration > 0 && time.Now().Sub(lastWrite) > c.CompactFullWriteColdDuration {
@ -160,60 +226,119 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
c.lastPlanCheck = time.Now()
// If there is only one generation, return early to avoid re-compacting the same file
// over and over again.
if len(generations) <= 1 {
return nil
}
// Loop through the generations (they're in decending order) and find the newest generations
// that have the min compaction file count in the same compaction step size
startIndex := 0
endIndex := len(generations)
currentLevel := compactionLevel(generations[0].size())
count := 0
// Need to find the ending point for level 4 files. They will be the oldest files. We scan
// each generation and break once we see a generation with a level less than 4.
end := 0
start := 0
for i, g := range generations {
level := compactionLevel(g.size())
count += 1
if g.level() <= 3 {
break
}
end = i + 1
}
if level != currentLevel {
if count >= c.MinCompactionFileCount {
endIndex = i
// As compactions run, the oldest files get bigger. We don't want to re-compact them during
// this planning if they are maxed out so skip over any we see.
for i, g := range generations[:end] {
if g.size() > uint64(maxTSMFileSize) {
start = i + 1
}
// This is an edge case that can happen after multiple compactions run. The files at the beginning
// can become larger faster than the ones after them. We want to skip those really big ones and just
// compact the smaller ones until they are closer in size.
if i > 0 {
if g.size()*2 < generations[i-1].size() {
start = i
break
}
currentLevel = level
startIndex = i
count = 0
}
}
// step is how many files to compact in a group. We want to clamp it at 4 but also still
// return groups smaller than 4.
step := 4
if step > end {
step = end
}
// slice off the generations that we'll examine
generations = generations[start:end]
// Loop through the generations in groups of size step and see if we can compact all (or
// some of them as group)
groups := []tsmGenerations{}
for i := 0; i < len(generations); i += step {
var skipGroup bool
startIndex := i
for j := i; j < i+step && j < len(generations); j++ {
lvl := generations[j].level()
// Skip compacting this group if there happens to be any lower level files in the
// middle. These will get picked up by the level compactors.
if lvl <= 3 {
skipGroup = true
break
}
if generations[j].size() >= uint64(maxTSMFileSize) {
startIndex++
continue
}
}
if skipGroup {
continue
}
}
if currentLevel == len(compactionSteps) {
return nil
}
generations = generations[startIndex:endIndex]
// if we don't have enough generations to compact, return
if len(generations) < c.MinCompactionFileCount {
return nil
}
// All the files to be compacted must be compacted in order
var tsmFiles []string
for _, group := range generations {
for _, f := range group.files {
tsmFiles = append(tsmFiles, f.Path)
endIndex := i + step
if endIndex > len(generations) {
endIndex = len(generations)
}
if endIndex-startIndex > 0 {
groups = append(groups, generations[startIndex:endIndex])
}
}
sort.Strings(tsmFiles)
// Only one, we can't improve on that so nothing to do
if len(tsmFiles) == 1 {
if len(groups) == 0 {
return nil
}
// With the groups, we need to evaluate whether the group as a whole can be compacted
compactable := []tsmGenerations{}
for _, group := range groups {
//if we don't have enough generations to compact, skip it
if len(group) < 2 {
continue
}
compactable = append(compactable, group)
}
// All the files to be compacted must be compacted in order. We need to convert each
// group to the actual set of files in that group to be compacted.
var tsmFiles []CompactionGroup
for _, c := range compactable {
var cGroup CompactionGroup
for _, group := range c {
for _, f := range group.files {
cGroup = append(cGroup, f.Path)
}
}
sort.Strings(cGroup)
tsmFiles = append(tsmFiles, cGroup)
}
c.lastPlanCompactedFull = false
return []CompactionGroup{tsmFiles}
return tsmFiles
}
// findGenerations groups all the TSM files by they generation based
@ -239,7 +364,7 @@ func (c *DefaultPlanner) findGenerations() tsmGenerations {
for _, g := range generations {
orderedGenerations = append(orderedGenerations, g)
}
sort.Sort(sort.Reverse(orderedGenerations))
sort.Sort(orderedGenerations)
return orderedGenerations
}
@ -262,7 +387,7 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
}
// Compact will write multiple smaller TSM files into 1 or more larger files
func (c *Compactor) Compact(tsmFiles []string) ([]string, error) {
func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
size := c.Size
if size <= 0 {
size = tsdb.DefaultMaxPointsPerBlock
@ -310,7 +435,7 @@ func (c *Compactor) Compact(tsmFiles []string) ([]string, error) {
return nil, nil
}
tsm, err := NewTSMKeyIterator(size, trs...)
tsm, err := NewTSMKeyIterator(size, fast, trs...)
if err != nil {
return nil, err
}
@ -318,6 +443,16 @@ func (c *Compactor) Compact(tsmFiles []string) ([]string, error) {
return c.writeNewFiles(maxGeneration, maxSequence, tsm)
}
// CompactFull writes multiple smaller TSM files into 1 or more larger files.
// It uses the slower merge strategy (fast=false), so blocks are decoded and
// re-combined into maximally sized blocks for optimal compression.
func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) {
	return c.compact(false, tsmFiles)
}
// CompactFast writes multiple smaller TSM files into 1 or more larger files.
// It uses the faster merge strategy (fast=true): where possible, existing
// blocks are appended as-is rather than decoded and re-combined.
func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) {
	return c.compact(true, tsmFiles)
}
// Clone will return a new compactor that can be used even if the engine is closed
func (c *Compactor) Clone() *Compactor {
return &Compactor{
@ -452,7 +587,17 @@ type tsmKeyIterator struct {
// err is any error we received while iterating values.
err error
// indicates whether the iterator should choose a faster merging strategy over a more
// optimally compressed one. If fast is true, multiple blocks will just be added as is
// and not combined. In some cases, a slower path will need to be utilized even when
// fast is true to prevent overlapping blocks of time for the same key.
// If false, the blocks will be decoded and duplicated (if needed) and
// then chunked into the maximally sized blocks.
fast bool
// size is the maximum number of values to encode in a single block
size int
// key is the current lowest key across all readers that has not been fully exhausted
// of values.
key string
@ -482,7 +627,7 @@ func (a blocks) Less(i, j int) bool {
func (a blocks) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func NewTSMKeyIterator(size int, readers ...*TSMReader) (KeyIterator, error) {
func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator, error) {
var iter []*BlockIterator
for _, r := range readers {
iter = append(iter, r.BlockIterator())
@ -495,6 +640,7 @@ func NewTSMKeyIterator(size int, readers ...*TSMReader) (KeyIterator, error) {
keys: make([]string, len(readers)),
size: size,
iterators: iter,
fast: fast,
buf: make([]blocks, len(iter)),
}, nil
}
@ -582,7 +728,7 @@ func (k *tsmKeyIterator) Next() bool {
// Quickly scan each block to see if any overlap with the first block, if they overlap then
// we need to dedup as there may be duplicate points now
for i := 1; i < len(k.blocks); i++ {
if k.blocks[i].minTime.Equal(k.blocks[0].maxTime) || k.blocks[i].minTime.Before(k.blocks[0].maxTime) {
if k.blocks[i].minTime.Equal(k.blocks[i-1].maxTime) || k.blocks[i].minTime.Before(k.blocks[i-1].maxTime) {
dedup = true
break
}
@ -627,13 +773,19 @@ func (k *tsmKeyIterator) combine(dedup bool) blocks {
i++
}
if k.fast {
for i < len(k.blocks) {
chunked = append(chunked, k.blocks[i])
i++
}
}
// If we only have 1 blocks left, just append it as is and avoid decoding/recoding
if i == len(k.blocks)-1 {
chunked = append(chunked, k.blocks[i])
i++
}
var needSort bool
// The remaining blocks can be combined and we know that they do not overlap and
// so we can just append each, sort and re-encode.
for i < len(k.blocks) {
@ -642,17 +794,12 @@ func (k *tsmKeyIterator) combine(dedup bool) blocks {
k.err = err
return nil
}
if len(decoded) > 0 && decoded[len(decoded)-1].Time().After(v[0].Time()) {
needSort = true
}
decoded = append(decoded, v...)
i++
}
if needSort {
sort.Sort(Values(decoded))
}
sort.Sort(Values(decoded))
return k.chunk(chunked, decoded)
}
}

View File

@ -78,7 +78,7 @@ func TestCompactor_Snapshot(t *testing.T) {
}
// Ensures that a compaction will properly merge multiple TSM files
func TestCompactor_Compact(t *testing.T) {
func TestCompactor_CompactFull(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
@ -110,7 +110,7 @@ func TestCompactor_Compact(t *testing.T) {
FileStore: &fakeFileStore{},
}
files, err := compactor.Compact([]string{f1, f2, f3})
files, err := compactor.CompactFull([]string{f1, f2, f3})
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -171,7 +171,7 @@ func TestCompactor_Compact(t *testing.T) {
}
// Ensures that a compaction will properly merge multiple TSM files
func TestCompactor_Compact_SkipFullBlocks(t *testing.T) {
func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
@ -201,7 +201,7 @@ func TestCompactor_Compact_SkipFullBlocks(t *testing.T) {
Size: 2,
}
files, err := compactor.Compact([]string{f1, f2, f3})
files, err := compactor.CompactFull([]string{f1, f2, f3})
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
@ -275,7 +275,7 @@ func TestTSMKeyIterator_Single(t *testing.T) {
r := MustTSMReader(dir, 1, writes)
iter, err := tsm1.NewTSMKeyIterator(1, r)
iter, err := tsm1.NewTSMKeyIterator(1, false, r)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
@ -325,7 +325,7 @@ func TestTSMKeyIterator_Chunked(t *testing.T) {
r := MustTSMReader(dir, 1, writes)
iter, err := tsm1.NewTSMKeyIterator(1, r)
iter, err := tsm1.NewTSMKeyIterator(1, false, r)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
@ -387,7 +387,7 @@ func TestTSMKeyIterator_Duplicate(t *testing.T) {
r2 := MustTSMReader(dir, 2, writes2)
iter, err := tsm1.NewTSMKeyIterator(1, r1, r2)
iter, err := tsm1.NewTSMKeyIterator(1, false, r1, r2)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
@ -446,7 +446,7 @@ func TestTSMKeyIterator_MultipleKeysDeleted(t *testing.T) {
r2 := MustTSMReader(dir, 2, points2)
r2.Delete([]string{"cpu,host=A#!~#count"})
iter, err := tsm1.NewTSMKeyIterator(1, r1, r2)
iter, err := tsm1.NewTSMKeyIterator(1, false, r1, r2)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
@ -585,7 +585,7 @@ func TestCacheKeyIterator_Chunked(t *testing.T) {
}
}
func TestDefaultCompactionPlanner_Min(t *testing.T) {
func TestDefaultPlanner_Plan_Min(t *testing.T) {
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
PathsFn: func() []tsm1.FileStat {
@ -605,7 +605,6 @@ func TestDefaultCompactionPlanner_Min(t *testing.T) {
}
},
},
MinCompactionFileCount: 5,
}
tsm := cp.Plan(time.Now())
@ -616,19 +615,23 @@ func TestDefaultCompactionPlanner_Min(t *testing.T) {
// Ensure that if there are older files that can be compacted together but a newer
// file that is in a larger step, the older ones will get compacted.
func TestDefaultCompactionPlanner_CombineSequence(t *testing.T) {
func TestDefaultPlanner_Plan_CombineSequence(t *testing.T) {
data := []tsm1.FileStat{
tsm1.FileStat{
Path: "01-01.tsm1",
Size: 1 * 1024 * 1024,
Path: "01-04.tsm1",
Size: 128 * 1024 * 1024,
},
tsm1.FileStat{
Path: "02-02.tsm1",
Size: 1 * 1024 * 1024,
Path: "02-04.tsm1",
Size: 128 * 1024 * 1024,
},
tsm1.FileStat{
Path: "03-03.tsm1",
Size: 1 * 1024 * 1024,
Path: "03-04.tsm1",
Size: 128 * 1024 * 1024,
},
tsm1.FileStat{
Path: "04-04.tsm1",
Size: 128 * 1024 * 1024,
},
tsm1.FileStat{
Path: "06-02.tsm1",
@ -639,7 +642,7 @@ func TestDefaultCompactionPlanner_CombineSequence(t *testing.T) {
Size: 128 * 1024 * 1024,
},
tsm1.FileStat{
Path: "04-1.tsm1",
Path: "08-01.tsm1",
Size: 251 * 1024 * 1024,
},
}
@ -650,10 +653,9 @@ func TestDefaultCompactionPlanner_CombineSequence(t *testing.T) {
return data
},
},
MinCompactionFileCount: 3,
}
expFiles := []tsm1.FileStat{data[0], data[1], data[2]}
expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
tsm := cp.Plan(time.Now())
if exp, got := len(expFiles), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
@ -667,26 +669,105 @@ func TestDefaultCompactionPlanner_CombineSequence(t *testing.T) {
}
// Ensure that the planner grabs the smallest compaction step
func TestDefaultCompactionPlanner_SmallestCompactionStep(t *testing.T) {
// Ensure that the planner can return more than one compaction group so that
// groups may be compacted concurrently.
func TestDefaultPlanner_Plan_MultipleGroups(t *testing.T) {
	data := []tsm1.FileStat{
		{Path: "01-04.tsm1", Size: 64 * 1024 * 1024},
		{Path: "02-04.tsm1", Size: 64 * 1024 * 1024},
		{Path: "03-04.tsm1", Size: 64 * 1024 * 1024},
		{Path: "04-04.tsm1", Size: 129 * 1024 * 1024},
		{Path: "05-04.tsm1", Size: 129 * 1024 * 1024},
		{Path: "06-04.tsm1", Size: 129 * 1024 * 1024},
		{Path: "07-04.tsm1", Size: 129 * 1024 * 1024},
		{Path: "08-04.tsm1", Size: 129 * 1024 * 1024},
		{Path: "09-04.tsm1", Size: 129 * 1024 * 1024}, // should be skipped
	}

	cp := &tsm1.DefaultPlanner{
		FileStore: &fakeFileStore{
			PathsFn: func() []tsm1.FileStat { return data },
		},
	}

	tsm := cp.Plan(time.Now())
	if got, exp := len(tsm), 2; got != exp {
		t.Fatalf("compaction group length mismatch: got %v, exp %v", got, exp)
	}

	// The first 8 files should be split evenly across the two groups; the
	// 9th has no partner and is left for a later plan.
	expGroups := [][]tsm1.FileStat{data[0:4], data[4:8]}
	for g, expFiles := range expGroups {
		if exp, got := len(expFiles), len(tsm[g]); got != exp {
			t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
		}
		for i, p := range expFiles {
			if got, exp := tsm[g][i], p.Path; got != exp {
				t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
			}
		}
	}
}
// Ensure that the planner grabs the smallest compaction step
func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) {
data := []tsm1.FileStat{
tsm1.FileStat{
Path: "01-03.tsm1",
Size: 251 * 1024 * 1024,
},
tsm1.FileStat{
Path: "02-02.tsm1",
Path: "02-03.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "03-02.tsm1",
Path: "03-03.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "04-02.tsm1",
Path: "04-03.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "05-02.tsm1",
Path: "05-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
@ -701,11 +782,10 @@ func TestDefaultCompactionPlanner_SmallestCompactionStep(t *testing.T) {
return data
},
},
MinCompactionFileCount: 3,
}
expFiles := []tsm1.FileStat{data[1], data[2], data[3], data[4], data[5]}
tsm := cp.Plan(time.Now())
expFiles := []tsm1.FileStat{data[4], data[5]}
tsm := cp.PlanLevel(1)
if exp, got := len(expFiles), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
@ -719,7 +799,7 @@ func TestDefaultCompactionPlanner_SmallestCompactionStep(t *testing.T) {
// Ensure that the planner will compact all files if no writes
// have happened in some interval
func TestDefaultCompactionPlanner_FullOnCold(t *testing.T) {
func TestDefaultPlanner_Plan_FullOnCold(t *testing.T) {
data := []tsm1.FileStat{
tsm1.FileStat{
Path: "01-01.tsm1",
@ -770,7 +850,7 @@ func TestDefaultCompactionPlanner_FullOnCold(t *testing.T) {
// Ensure that the planner will compact all files if no writes
// have happened in some interval but skip files already over the limit
func TestDefaultCompactionPlanner_FullSkipMaxSize(t *testing.T) {
func TestDefaultPlanner_Plan_FullSkipMaxSize(t *testing.T) {
data := []tsm1.FileStat{
tsm1.FileStat{
Path: "01-01.tsm1",
@ -810,7 +890,7 @@ func TestDefaultCompactionPlanner_FullSkipMaxSize(t *testing.T) {
// Ensure that the planner will not return files that are over the max
// allowable size
func TestDefaultCompactionPlanner_SkipMaxSizeFiles(t *testing.T) {
func TestDefaultPlanner_Plan_SkipMaxSizeFiles(t *testing.T) {
data := []tsm1.FileStat{
tsm1.FileStat{
Path: "01-01.tsm1",
@ -838,35 +918,39 @@ func TestDefaultCompactionPlanner_SkipMaxSizeFiles(t *testing.T) {
// Ensure that the planner will not return files that are over the max
// allowable size
func TestDefaultCompactionPlanner_SkipPlanningAfterFull(t *testing.T) {
func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) {
testSet := []tsm1.FileStat{
tsm1.FileStat{
Path: "01-01.tsm1",
Size: 1024 * 1024,
Path: "01-05.tsm1",
Size: 256 * 1024 * 1024,
},
tsm1.FileStat{
Path: "02-02.tsm1",
Size: 1024 * 1024,
Path: "02-05.tsm1",
Size: 256 * 1024 * 1024,
},
tsm1.FileStat{
Path: "03-05.tsm1",
Size: 256 * 1024 * 1024,
},
tsm1.FileStat{
Path: "04-04.tsm1",
Size: 256 * 1024 * 1024,
},
}
getFS := func(set []tsm1.FileStat) *fakeFileStore {
return &fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return set
},
}
fs := &fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return testSet
},
}
testFileStore := getFS(testSet)
cp := &tsm1.DefaultPlanner{
FileStore: testFileStore,
FileStore: fs,
CompactFullWriteColdDuration: time.Nanosecond,
}
// first verify that our test set would return files
if exp, got := 2, len(cp.Plan(time.Now().Add(-time.Second))[0]); got != exp {
if exp, got := 4, len(cp.Plan(time.Now().Add(-time.Second))[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
@ -882,42 +966,52 @@ func TestDefaultCompactionPlanner_SkipPlanningAfterFull(t *testing.T) {
},
}
cp.FileStore = getFS(over)
overFs := &fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return over
},
}
cp.FileStore = overFs
if exp, got := 0, len(cp.Plan(time.Now().Add(-time.Second))); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
// even though we do this, the planner should remember that last time we were over
cp.FileStore = testFileStore
cp.FileStore = fs
if exp, got := 0, len(cp.Plan(time.Now().Add(-time.Second))); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
// ensure that it will plan if last modified has changed
testFileStore.lastModified = time.Now()
fs.lastModified = time.Now()
if exp, got := 2, len(cp.Plan(time.Now())[0]); got != exp {
if exp, got := 4, len(cp.Plan(time.Now())[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
}
// Ensure that the planner will compact files that are past the smallest step
// size even if there is a single file in the smaller step size
func TestDefaultCompactionPlanner_CompactsMiddleSteps(t *testing.T) {
func TestDefaultPlanner_Plan_CompactsMiddleSteps(t *testing.T) {
data := []tsm1.FileStat{
tsm1.FileStat{
Path: "01-01.tsm1",
Path: "01-04.tsm1",
Size: 64 * 1024 * 1024,
},
tsm1.FileStat{
Path: "02-02.tsm1",
Path: "02-04.tsm1",
Size: 64 * 1024 * 1024,
},
tsm1.FileStat{
Path: "03-02.tsm1",
Path: "03-04.tsm1",
Size: 64 * 1024 * 1024,
},
tsm1.FileStat{
Path: "04-02.tsm1",
Path: "04-04.tsm1",
Size: 64 * 1024 * 1024,
},
tsm1.FileStat{
Path: "05-02.tsm1",
Size: 2 * 1024 * 1024,
},
}
@ -928,10 +1022,9 @@ func TestDefaultCompactionPlanner_CompactsMiddleSteps(t *testing.T) {
return data
},
},
MinCompactionFileCount: 3,
}
expFiles := []tsm1.FileStat{data[0], data[1], data[2]}
expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
tsm := cp.Plan(time.Now())
if exp, got := len(expFiles), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)

View File

@ -82,7 +82,6 @@ func NewDevEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engi
Compactor: c,
CompactionPlan: &DefaultPlanner{
FileStore: fs,
MinCompactionFileCount: opt.Config.CompactMinFileCount,
CompactFullWriteColdDuration: time.Duration(opt.Config.CompactFullWriteColdDuration),
},
MaxPointsPerBlock: opt.Config.MaxPointsPerBlock,
@ -131,9 +130,12 @@ func (e *DevEngine) Open() error {
return err
}
e.wg.Add(2)
e.wg.Add(5)
go e.compactCache()
go e.compactTSM()
go e.compactTSMFull()
go e.compactTSMLevel(true, 1)
go e.compactTSMLevel(true, 2)
go e.compactTSMLevel(false, 3)
return nil
}
@ -409,7 +411,71 @@ func (e *DevEngine) ShouldCompactCache(lastWriteTime time.Time) bool {
time.Now().Sub(lastWriteTime) > e.CacheFlushWriteColdDuration
}
func (e *DevEngine) compactTSM() {
// compactTSMLevel runs the planning/compaction loop for a single TSM
// compaction level, compacting each planned group in its own goroutine so
// groups proceed concurrently. When fast is true the quicker (less optimally
// compressed) merge strategy is used. The loop exits when e.done is closed.
func (e *DevEngine) compactTSMLevel(fast bool, level int) {
	defer e.wg.Done()

	for {
		select {
		case <-e.done:
			return
		default:
		}

		tsmFiles := e.CompactionPlan.PlanLevel(level)
		if len(tsmFiles) == 0 {
			time.Sleep(time.Second)
			continue
		}

		var wg sync.WaitGroup
		for i, group := range tsmFiles {
			wg.Add(1)
			go func(groupNum int, group CompactionGroup) {
				defer wg.Done()

				start := time.Now()
				e.logger.Printf("beginning level %d compaction of group %d, %d TSM files", level, groupNum, len(group))
				for i, f := range group {
					e.logger.Printf("compacting level %d group (%d) %s (#%d)", level, groupNum, f, i)
				}

				// Both strategies share identical error handling; only the
				// compaction entry point differs.
				compactFn := e.Compactor.CompactFull
				if fast {
					compactFn = e.Compactor.CompactFast
				}
				files, err := compactFn(group)
				if err != nil {
					e.logger.Printf("error compacting TSM files: %v", err)
					time.Sleep(time.Second)
					return
				}

				if err := e.FileStore.Replace(group, files); err != nil {
					e.logger.Printf("error replacing new TSM files: %v", err)
					time.Sleep(time.Second)
					return
				}

				for i, f := range files {
					e.logger.Printf("compacted level %d group (%d) into %s (#%d)", level, groupNum, f, i)
				}
				e.logger.Printf("compacted level %d group %d of %d files into %d files in %s",
					level, groupNum, len(group), len(files), time.Since(start))
			}(i, group)
		}
		wg.Wait()
	}
}
func (e *DevEngine) compactTSMFull() {
defer e.wg.Done()
for {
@ -425,24 +491,38 @@ func (e *DevEngine) compactTSM() {
continue
}
start := time.Now()
e.logger.Printf("beginning compaction of %d TSM files", len(tsmFiles))
var wg sync.WaitGroup
for i, group := range tsmFiles {
wg.Add(1)
go func(groupNum int, group CompactionGroup) {
defer wg.Done()
start := time.Now()
e.logger.Printf("beginning full compaction of group %d, %d TSM files", groupNum, len(group))
for i, f := range group {
e.logger.Printf("compacting full group (%d) %s (#%d)", groupNum, f, i)
}
files, err := e.Compactor.Compact(tsmFiles[0])
if err != nil {
e.logger.Printf("error compacting TSM files: %v", err)
time.Sleep(time.Second)
continue
files, err := e.Compactor.CompactFull(group)
if err != nil {
e.logger.Printf("error compacting TSM files: %v", err)
time.Sleep(time.Second)
return
}
if err := e.FileStore.Replace(group, files); err != nil {
e.logger.Printf("error replacing new TSM files: %v", err)
time.Sleep(time.Second)
return
}
for i, f := range files {
e.logger.Printf("compacted full group (%d) into %s (#%d)", groupNum, f, i)
}
e.logger.Printf("compacted full %d files into %d files in %s",
len(group), len(files), time.Since(start))
}(i, group)
}
if err := e.FileStore.Replace(tsmFiles[0], files); err != nil {
e.logger.Printf("error replacing new TSM files: %v", err)
time.Sleep(time.Second)
continue
}
e.logger.Printf("compacted %d TSM into %d files in %s",
len(tsmFiles), len(files), time.Since(start))
wg.Wait()
}
}
}

View File

@ -391,6 +391,10 @@ func newFiles(values ...keyValues) ([]TSMFile, error) {
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
r, err := NewTSMReader(bytes.NewReader(b.Bytes()))
if err != nil {
return nil, err

View File

@ -505,10 +505,18 @@ type KeyCursor struct {
// location identifies one block entry within a TSM file that a KeyCursor can
// read values from.
type location struct {
	r     TSMFile
	entry *IndexEntry

	// read is set once this block has been read by the cursor, so it will
	// not be returned again.
	read bool
}
func (c *KeyCursor) init(t time.Time, ascending bool) {
if c.ready {
// Re-set the read status of each blocks in case a cursor is Seeked to
// multiple times
for _, c := range c.seeks {
c.read = false
}
return
}
c.ascending = ascending
@ -592,6 +600,7 @@ func (c *KeyCursor) readAt() ([]Value, error) {
// First block is the oldest block containing the points we're search for.
first := c.current[0]
values, err := first.r.ReadAt(first.entry, c.buf[:0])
first.read = true
// Only one block with this key and time range so return it
if len(c.current) == 1 {
@ -602,15 +611,16 @@ func (c *KeyCursor) readAt() ([]Value, error) {
// dedup them.
for i := 1; i < len(c.current); i++ {
cur := c.current[i]
if c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) {
if c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read {
cur.read = true
c.pos++
v, err := cur.r.ReadAt(cur.entry, nil)
if err != nil {
return nil, err
}
values = append(values, v...)
} else if !c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) {
} else if !c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read {
cur.read = true
c.pos--
v, err := cur.r.ReadAt(cur.entry, nil)
@ -628,9 +638,15 @@ func (c *KeyCursor) Next(ascending bool) ([]Value, error) {
c.current = c.current[:0]
if ascending {
c.pos++
if c.pos >= len(c.seeks) {
return nil, nil
for {
c.pos++
if c.pos >= len(c.seeks) {
return nil, nil
}
if !c.seeks[c.pos].read {
break
}
}
// Append the first matching block
@ -639,7 +655,11 @@ func (c *KeyCursor) Next(ascending bool) ([]Value, error) {
// If we have overlapping blocks, append all their values so we can dedup
if c.duplicates {
first := c.seeks[c.pos]
for i := c.pos; i < len(c.seeks); i++ {
for i := c.pos + 1; i < len(c.seeks); i++ {
if c.seeks[i].read {
continue
}
if c.seeks[i].entry.MinTime.Before(first.entry.MaxTime) || c.seeks[i].entry.MinTime.Equal(first.entry.MaxTime) {
c.current = append(c.current, c.seeks[i])
}
@ -649,9 +669,15 @@ func (c *KeyCursor) Next(ascending bool) ([]Value, error) {
return c.readAt()
} else {
c.pos--
if c.pos < 0 {
return nil, nil
for {
c.pos--
if c.pos < 0 {
return nil, nil
}
if !c.seeks[c.pos].read {
break
}
}
// Append the first matching block
@ -661,6 +687,9 @@ func (c *KeyCursor) Next(ascending bool) ([]Value, error) {
if c.duplicates {
first := c.seeks[c.pos]
for i := c.pos; i >= 0; i-- {
if c.seeks[i].read {
continue
}
if c.seeks[i].entry.MaxTime.After(first.entry.MinTime) || c.seeks[i].entry.MaxTime.Equal(first.entry.MinTime) {
c.current = append(c.current, c.seeks[i])
}

View File

@ -603,7 +603,7 @@ func newFileDir(dir string, values ...keyValues) ([]string, error) {
return nil, err
}
if err := f.Close(); err != nil {
if err := w.Close(); err != nil {
return nil, err
}
newName := filepath.Join(filepath.Dir(f.Name()), tsmFileName(id))
@ -636,6 +636,10 @@ func newFiles(values ...keyValues) ([]tsm1.TSMFile, error) {
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
if err != nil {
return nil, err

View File

@ -8,13 +8,11 @@ import (
"unsafe"
)
func mmap(f *os.File, offset int64, length int) ([]byte, error) {
mmap, err := syscall.Mmap(int(f.Fd()), 0, length, syscall.PROT_READ, syscall.MAP_SHARED)
if err != nil {
return nil, err
}
const MAP_POPULATE = 0x8000
if err := madvise(mmap, syscall.MADV_RANDOM); err != nil {
func mmap(f *os.File, offset int64, length int) ([]byte, error) {
mmap, err := syscall.Mmap(int(f.Fd()), 0, length, syscall.PROT_READ, syscall.MAP_SHARED|MAP_POPULATE)
if err != nil {
return nil, err
}

View File

@ -26,6 +26,10 @@ func TestTSMReader_Type(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
@ -566,6 +570,10 @@ func TestBlockIterator_Single(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
@ -625,6 +633,10 @@ func TestBlockIterator_MultipleBlocks(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
@ -690,6 +702,10 @@ func TestBlockIterator_Sorted(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)

View File

@ -63,6 +63,7 @@ The last section is the footer that stores the offset of the start of the index.
*/
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
@ -456,9 +457,10 @@ func (d *directIndex) Size() uint32 {
// tsmWriter writes keys and values in the TSM format
type tsmWriter struct {
w io.Writer
index TSMIndex
n int64
wrapped io.Writer
w *bufio.Writer
index TSMIndex
n int64
}
func NewTSMWriter(w io.Writer) (TSMWriter, error) {
@ -466,7 +468,7 @@ func NewTSMWriter(w io.Writer) (TSMWriter, error) {
blocks: map[string]*indexEntries{},
}
return &tsmWriter{w: w, index: index}, nil
return &tsmWriter{wrapped: w, w: bufio.NewWriterSize(w, 4*1024*1024), index: index}, nil
}
func (t *tsmWriter) writeHeader() error {
@ -574,7 +576,11 @@ func (t *tsmWriter) WriteIndex() error {
}
func (t *tsmWriter) Close() error {
if c, ok := t.w.(io.Closer); ok {
if err := t.w.Flush(); err != nil {
return err
}
if c, ok := t.wrapped.(io.Closer); ok {
return c.Close()
}
return nil

View File

@ -58,6 +58,10 @@ func TestTSMWriter_Write_Single(t *testing.T) {
}
if err := w.WriteIndex(); err != nil {
t.Fatalf("unexpected error writing index: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
@ -114,6 +118,10 @@ func TestTSMWriter_Write_Multiple(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
@ -168,6 +176,10 @@ func TestTSMWriter_Write_MultipleKeyValues(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
@ -223,6 +235,10 @@ func TestTSMWriter_Write_ReverseKeys(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
@ -278,6 +294,10 @@ func TestTSMWriter_Write_SameKey(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
@ -334,6 +354,10 @@ func TestTSMWriter_Read_Multiple(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
if err != nil {
t.Fatalf("unexpected error created reader: %v", err)
@ -421,6 +445,10 @@ func TestTSMWriter_WriteBlock_Multiple(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
if got, exp := len(b.Bytes()), 5; got < exp {
t.Fatalf("file size mismatch: got %v, exp %v", got, exp)
}
@ -455,6 +483,10 @@ func TestTSMWriter_WriteBlock_Multiple(t *testing.T) {
t.Fatalf("unexpected error closing: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
// Now create a reader to verify the written blocks matches the originally
// written file using Write
r, err = tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))