diff --git a/cmd/influx_inspect/tsm.go b/cmd/influx_inspect/tsm.go index b70b42d76f..c142d5a14b 100644 --- a/cmd/influx_inspect/tsm.go +++ b/cmd/influx_inspect/tsm.go @@ -369,14 +369,8 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) { encoded := buf[9:] - var v []tsm1.Value - v, err := tsm1.DecodeBlock(buf, v) - if err != nil { - fmt.Printf("error: %v\n", err.Error()) - os.Exit(1) - } - - pointCount += int64(len(v)) + cnt := tsm1.BlockCount(buf) + pointCount += int64(cnt) // Length of the timestamp block tsLen, j := binary.Uvarint(encoded) @@ -409,7 +403,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) { strconv.FormatUint(id, 10), typeDesc, startTime.UTC().Format(time.RFC3339Nano), - strconv.FormatInt(int64(len(v)), 10), + strconv.FormatInt(int64(cnt), 10), fmt.Sprintf("%s/%s", tsEncoding, vEncoding), fmt.Sprintf("%d/%d", len(ts), len(values)), }, "\t")) diff --git a/tsdb/config.go b/tsdb/config.go index 0f64b6d331..252e1ebd4a 100644 --- a/tsdb/config.go +++ b/tsdb/config.go @@ -63,10 +63,6 @@ const ( // the shard hasn't received writes or deletes DefaultCacheSnapshotWriteColdDuration = time.Duration(time.Hour) - // DefaultMinCompactionFileCount is the minimum number of TSM files - // that need to exist before a compaction cycle will run - DefaultCompactMinFileCount = 3 - // DefaultCompactFullWriteColdDuration is the duration at which the engine // will compact all TSM files in a shard if it hasn't received a write or delete DefaultCompactFullWriteColdDuration = time.Duration(24 * time.Hour) @@ -101,7 +97,6 @@ type Config struct { CacheMaxMemorySize uint64 `toml:"cache-max-memory-size"` CacheSnapshotMemorySize uint64 `toml:"cache-snapshot-memory-size"` CacheSnapshotWriteColdDuration toml.Duration `toml:"cache-snapshot-write-cold-duration"` - CompactMinFileCount int `toml:"compact-min-file-count"` CompactFullWriteColdDuration toml.Duration `toml:"compact-full-write-cold-duration"` MaxPointsPerBlock int `toml:"max-points-per-block"` @@ -133,7 +128,6 @@ func NewConfig() Config { CacheMaxMemorySize: DefaultCacheMaxMemorySize, CacheSnapshotMemorySize: DefaultCacheSnapshotMemorySize, CacheSnapshotWriteColdDuration: toml.Duration(DefaultCacheSnapshotWriteColdDuration), - CompactMinFileCount: DefaultCompactMinFileCount, CompactFullWriteColdDuration: toml.Duration(DefaultCompactFullWriteColdDuration), DataLoggingEnabled: true, diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index edc4a9af15..ad8af8bcc3 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -37,32 +37,13 @@ var ( MinTime = time.Unix(0, 0) ) -// compactionSteps are the sizes of files to roll up into before combining. -var compactionSteps = []uint32{ - 32 * 1024 * 1024, - 128 * 1024 * 1024, - 512 * 1024 * 1024, - 2048 * 1024 * 1024, -} - -// compactionLevel takes a size and returns the index of the compaction step -// that the size falls into -func compactionLevel(size uint64) int { - for i, step := range compactionSteps { - if size < uint64(step) { - return i - } - } - - return len(compactionSteps) -} - type CompactionGroup []string // CompactionPlanner determines what TSM files and WAL segments to include in a // given compaction run. 
type CompactionPlanner interface { Plan(lastWrite time.Time) []CompactionGroup + PlanLevel(level int) []CompactionGroup } // DefaultPlanner implements CompactionPlanner using a strategy to roll up @@ -75,8 +56,6 @@ type DefaultPlanner struct { LastModified() time.Time } - MinCompactionFileCount int - // CompactFullWriteColdDuration specifies the length of time after // which if no writes have been committed to the WAL, the engine will // do a full compaction of the TSM files in this shard. This duration @@ -109,6 +88,22 @@ func (t *tsmGeneration) size() uint64 { return n } +// level returns the level of the files in this generation +func (t *tsmGeneration) level() int { + // Level 0 is always created from the result of a cache compaction. It generates + // 1 file with a sequence num of 1. Level 2 is generated by compacting multiple + // level 1 files. Level 3 is generated by compacting multiple level 2 files. Level + // 4 is for anything else. + if len(t.files) == 1 { + _, seq, _ := ParseTSMFileName(t.files[0].Path) + if seq < 4 { + return seq + } + } + + return 4 +} + func (t *tsmGeneration) lastModified() time.Time { var max time.Time for _, f := range t.files { @@ -124,7 +119,78 @@ func (t *tsmGeneration) count() int { return len(t.files) } -// Plan returns a set of TSM files to rewrite +// PlanLevel returns a set of TSM files to rewrite for a specific level +func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup { + // Determine the generations from all files on disk. We need to treat + // a generation conceptually as a single file even though it may be + // split across several files in sequence. + generations := c.findGenerations() + + if len(generations) <= 1 { + return nil + } + + // Loop through the generations and find the generations matching the requested + // level + var cGroup CompactionGroup + for i := 0; i < len(generations)-1; i++ { + cur := generations[i] + next := generations[i+1] + + // If the current and next generation match the specified level, then add the + // current generation's files to the group + if level == cur.level() && next.level() == level { + for _, f := range cur.files { + cGroup = append(cGroup, f.Path) + } + continue + } + + // // Special case that has occurred experimentally where a generation matching the + // // level we are looking for is bordered on each side by a higher level. If this happens, + // // higher level compactions will stop for these files and this specific file will get stuck. + // // In this case, add + // if i > 0 { + // prev := generations[i-1] + // if level == cur.level() && prev.level() > level && next.level() > level { + // for _, f := range cur.files { + // cGroup = append(cGroup, f.Path) + // } + // } + // } + } + + // Add the last generation if it matches the level + if len(generations) > 0 { + last := generations[len(generations)-1] + if last.level() == level { + for _, f := range last.files { + cGroup = append(cGroup, f.Path) + } + } + } + + // Ensure we have at least 2 generations. For higher levels, we want to use more files to maximize + // the compression, but we don't want it unbounded since that can cause backups of compactions at that + // level. + // Level 1 -> 2 + // Level 2 -> 2 + // Level 3 -> 4 + // Level 4 and up is handled by Plan, not PlanLevel + limit := 2 + if level%2 != 0 { + limit = level + 1 + } + + if len(cGroup) < limit { + return nil + } + return []CompactionGroup{cGroup[:limit]} + +}
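To make the level scheme concrete, here is a small, runnable sketch of how a generation's level falls out of the TSM file name. It mirrors tsmGeneration.level() above under the <generation>-<sequence> naming the tests in this diff use (e.g. 02-04.tsm1); parseName is a hypothetical stand-in for ParseTSMFileName, whose real signature lives in the tsm1 package.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strconv"
	"strings"
)

// parseName is a hypothetical stand-in for tsm1.ParseTSMFileName. It splits
// a name like "02-04.tsm1" into generation 2 and sequence 4.
func parseName(name string) (gen, seq int, err error) {
	base := filepath.Base(name)
	base = strings.TrimSuffix(base, filepath.Ext(base))
	parts := strings.SplitN(base, "-", 2)
	if len(parts) != 2 {
		return 0, 0, fmt.Errorf("unexpected TSM file name %q", name)
	}
	if gen, err = strconv.Atoi(parts[0]); err != nil {
		return 0, 0, err
	}
	if seq, err = strconv.Atoi(parts[1]); err != nil {
		return 0, 0, err
	}
	return gen, seq, nil
}

// level mirrors tsmGeneration.level() for a single-file generation: the
// sequence number is the level, clamped to 4 for fully compacted files.
func level(name string) int {
	if _, seq, err := parseName(name); err == nil && seq < 4 {
		return seq
	}
	return 4
}

func main() {
	fmt.Println(level("01-01.tsm1")) // 1: fresh cache snapshot output
	fmt.Println(level("01-03.tsm1")) // 3: produced by compacting level 2 files
	fmt.Println(level("01-09.tsm1")) // 4: clamped; treated as fully compacted
}
```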
+ +// Plan returns a set of TSM files to rewrite for level 4 or higher. The planning returns +// multiple groups if possible to allow compactions to run concurrently. func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup { // first check if we should be doing a full compaction because nothing has been written in a long time if !c.lastPlanCompactedFull && c.CompactFullWriteColdDuration > 0 && time.Now().Sub(lastWrite) > c.CompactFullWriteColdDuration { @@ -160,60 +226,119 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup { c.lastPlanCheck = time.Now() + // If there is only one generation, return early to avoid re-compacting the same file + // over and over again. if len(generations) <= 1 { return nil } - // Loop through the generations (they're in decending order) and find the newest generations - // that have the min compaction file count in the same compaction step size - startIndex := 0 - endIndex := len(generations) - currentLevel := compactionLevel(generations[0].size()) - count := 0 + // Need to find the ending point for level 4 files. They will be the oldest files. We scan + // each generation in descending order and break once we see a generation below level 4. + end := 0 + start := 0 for i, g := range generations { - level := compactionLevel(g.size()) - count += 1 + if g.level() <= 3 { + break + } + end = i + 1 + } - if level != currentLevel { - if count >= c.MinCompactionFileCount { - endIndex = i + // As compactions run, the oldest files get bigger. We don't want to re-compact them during + // this planning if they are maxed out so skip over any we see. + for i, g := range generations[:end] { + if g.size() > uint64(maxTSMFileSize) { + start = i + 1 + } + + // This is an edge case that can happen after multiple compactions run. The files at the beginning + // can become larger faster than the ones after them. We want to skip those really big ones and just + // compact the smaller ones until they are closer in size. + if i > 0 { + if g.size()*2 < generations[i-1].size() { + start = i break } - currentLevel = level - startIndex = i - count = 0 + } + } + + // step is how many files to compact in a group. We want to clamp it at 4 but also still + // return groups smaller than 4. + step := 4 + if step > end { + step = end + } + + // slice off the generations that we'll examine + generations = generations[start:end] + + // Loop through the generations in groups of size step and see if we can compact all (or + // some of them as a group) + groups := []tsmGenerations{} + for i := 0; i < len(generations); i += step { + var skipGroup bool + startIndex := i + + for j := i; j < i+step && j < len(generations); j++ { + lvl := generations[j].level() + + // Skip compacting this group if there happen to be any lower level files in the + // middle. These will get picked up by the level compactors. + if lvl <= 3 { + skipGroup = true + break + } + + if generations[j].size() >= uint64(maxTSMFileSize) { + startIndex++ + continue + } + + } + + if skipGroup { continue } - } - if currentLevel == len(compactionSteps) { - return nil - } - - generations = generations[startIndex:endIndex] - - // if we don't have enough generations to compact, return - if len(generations) < c.MinCompactionFileCount { - return nil - } - - // All the files to be compacted must be compacted in order - var tsmFiles []string - for _, group := range generations { - for _, f := range group.files { - tsmFiles = append(tsmFiles, f.Path) + endIndex := i + step + if endIndex > len(generations) { + endIndex = len(generations) + } + if endIndex-startIndex > 0 { + groups = append(groups, generations[startIndex:endIndex]) } } - sort.Strings(tsmFiles) - // Only one, we can't improve on that so nothing to do - if len(tsmFiles) == 1 { + if len(groups) == 0 { return nil } + // With the groups, we need to evaluate whether the group as a whole can be compacted + compactable := []tsmGenerations{} + for _, group := range groups { + // if we don't have enough generations to compact, skip it + if len(group) < 2 { + continue + } + compactable = append(compactable, group) + } + + // All the files to be compacted must be compacted in order. We need to convert each + // group to the actual set of files in that group to be compacted. + var tsmFiles []CompactionGroup + for _, c := range compactable { + var cGroup CompactionGroup + for _, group := range c { + for _, f := range group.files { + cGroup = append(cGroup, f.Path) + } + } + sort.Strings(cGroup) + tsmFiles = append(tsmFiles, cGroup) + } + c.lastPlanCompactedFull = false - return []CompactionGroup{tsmFiles} + return tsmFiles } // findGenerations groups all the TSM files by they generation based
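The grouping logic in Plan is easier to see in isolation. The sketch below reproduces its shape under two assumptions: the input generations are already the level 4 candidates, and maxTSMFileSize is 250 MB (an assumption consistent with the 251 MB files the tests below treat as over the limit). It skips maxed-out files at the front of each chunk and drops chunks with fewer than two members; the real code additionally skips whole groups that contain lower level files.

```go
package main

import "fmt"

// maxTSMFileSize is assumed to be 250 MB here, consistent with the 251 MB
// files the tests in this diff treat as over the limit.
const maxTSMFileSize = 250 * 1024 * 1024

type gen struct {
	name string
	size uint64
}

// groupForCompaction chunks level 4 generations into groups of up to four,
// skipping generations already at the max file size and dropping any group
// with fewer than two members -- the same shape as Plan above.
func groupForCompaction(gens []gen) [][]gen {
	const step = 4
	var groups [][]gen
	for i := 0; i < len(gens); i += step {
		start, end := i, i+step
		if end > len(gens) {
			end = len(gens)
		}
		// Skip over maxed-out files at the front of the chunk.
		for start < end && gens[start].size >= maxTSMFileSize {
			start++
		}
		if end-start >= 2 {
			groups = append(groups, gens[start:end])
		}
	}
	return groups
}

func main() {
	gens := []gen{
		{"01-04.tsm1", 251 * 1024 * 1024}, // skipped: already at max size
		{"02-04.tsm1", 64 * 1024 * 1024},
		{"03-04.tsm1", 64 * 1024 * 1024},
		{"04-04.tsm1", 64 * 1024 * 1024},
		{"05-04.tsm1", 64 * 1024 * 1024},
		{"06-04.tsm1", 64 * 1024 * 1024},
	}
	for i, g := range groupForCompaction(gens) {
		names := make([]string, 0, len(g))
		for _, m := range g {
			names = append(names, m.name)
		}
		fmt.Printf("group %d: %v\n", i, names)
	}
}
```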
@@ -239,7 +364,7 @@ for _, g := range generations { orderedGenerations = append(orderedGenerations, g) } - sort.Sort(sort.Reverse(orderedGenerations)) + sort.Sort(orderedGenerations) return orderedGenerations } @@ -262,7 +387,7 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) { } // Compact will write multiple smaller TSM files into 1 or more larger files -func (c *Compactor) Compact(tsmFiles []string) ([]string, error) { +func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) { size := c.Size if size <= 0 { size = tsdb.DefaultMaxPointsPerBlock } @@ -310,7 +435,7 @@ func (c *Compactor) Compact(tsmFiles []string) ([]string, error) { return nil, nil } - tsm, err := NewTSMKeyIterator(size, trs...) + tsm, err := NewTSMKeyIterator(size, fast, trs...) if err != nil { return nil, err } @@ -318,6 +443,16 @@ func (c *Compactor) Compact(tsmFiles []string) ([]string, error) { return c.writeNewFiles(maxGeneration, maxSequence, tsm) } +// CompactFull writes multiple smaller TSM files into 1 or more larger files +func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) { + return c.compact(false, tsmFiles) +} + +// CompactFast writes multiple smaller TSM files into 1 or more larger files, +// favoring speed over optimal compression +func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) { + return c.compact(true, tsmFiles) +} + // Clone will return a new compactor that can be used even if the engine is closed func (c *Compactor) Clone() *Compactor { return &Compactor{
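CompactFull and CompactFast are thin wrappers that thread a flag into the shared compact helper: the fast path concatenates blocks mostly as-is, while the full path decodes, dedups, and re-chunks them. A hedged, in-package sketch of the calling pattern (types elided; the level-to-fast mapping shown matches the engine wiring later in this diff):

```go
// Sketch: pick the compaction entry point per level, matching the engine's
// compactTSMLevel(true, 1), (true, 2), (false, 3) wiring below. Lower levels
// favor speed; level 3 and full compactions favor compression.
func compactGroup(c *Compactor, group CompactionGroup, level int) ([]string, error) {
	if level <= 2 {
		// Fast: append blocks mostly as-is; cheaper CPU, less optimal encoding.
		return c.CompactFast(group)
	}
	// Full: decode, dedup, and re-chunk into maximally sized blocks.
	return c.CompactFull(group)
}
```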
@@ -452,7 +587,17 @@ type tsmKeyIterator struct { // err is any error we received while iterating values. err error + // indicates whether the iterator should choose a faster merging strategy over a more + // optimally compressed one. If fast is true, multiple blocks will just be added as is + // and not combined. In some cases, a slower path will need to be utilized even when + // fast is true to prevent overlapping blocks of time for the same key. + // If false, the blocks will be decoded and deduplicated (if needed) and + // then chunked into the maximally sized blocks. + fast bool + + // size is the maximum number of values to encode in a single block size int + // key is the current key lowest key across all readers that has not be fully exhausted // of values. key string @@ -482,7 +627,7 @@ func (a blocks) Less(i, j int) bool { func (a blocks) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func NewTSMKeyIterator(size int, readers ...*TSMReader) (KeyIterator, error) { +func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator, error) { var iter []*BlockIterator for _, r := range readers { iter = append(iter, r.BlockIterator()) @@ -495,6 +640,7 @@ func NewTSMKeyIterator(size int, readers ...*TSMReader) (KeyIterator, error) { keys: make([]string, len(readers)), size: size, iterators: iter, + fast: fast, buf: make([]blocks, len(iter)), }, nil } @@ -582,7 +728,7 @@ func (k *tsmKeyIterator) Next() bool { // Quickly scan each block to see if any overlap with the first block, if they overlap then // we need to dedup as there may be duplicate points now for i := 1; i < len(k.blocks); i++ { - if k.blocks[i].minTime.Equal(k.blocks[0].maxTime) || k.blocks[i].minTime.Before(k.blocks[0].maxTime) { + if k.blocks[i].minTime.Equal(k.blocks[i-1].maxTime) || k.blocks[i].minTime.Before(k.blocks[i-1].maxTime) { dedup = true break } @@ -627,13 +773,19 @@ func (k *tsmKeyIterator) combine(dedup bool) blocks { i++ } + if k.fast { + for i < len(k.blocks) { + chunked = append(chunked, k.blocks[i]) + i++ + } + } + // If we only have 1 blocks left, just append it as is and avoid decoding/recoding if i == len(k.blocks)-1 { chunked = append(chunked, k.blocks[i]) i++ } - var needSort bool // The remaining blocks can be combined and we know that they do not overlap and // so we can just append each, sort and re-encode. for i < len(k.blocks) { @@ -642,17 +794,12 @@ func (k *tsmKeyIterator) combine(dedup bool) blocks { k.err = err return nil } - if len(decoded) > 0 && decoded[len(decoded)-1].Time().After(v[0].Time()) { - needSort = true - } decoded = append(decoded, v...)
i++ } - if needSort { - sort.Sort(Values(decoded)) - } + sort.Sort(Values(decoded)) return k.chunk(chunked, decoded) } } diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index d151722001..9fa1724203 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -78,7 +78,7 @@ func TestCompactor_Snapshot(t *testing.T) { } // Ensures that a compaction will properly merge multiple TSM files -func TestCompactor_Compact(t *testing.T) { +func TestCompactor_CompactFull(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir) @@ -110,7 +110,7 @@ func TestCompactor_Compact(t *testing.T) { FileStore: &fakeFileStore{}, } - files, err := compactor.Compact([]string{f1, f2, f3}) + files, err := compactor.CompactFull([]string{f1, f2, f3}) if err != nil { t.Fatalf("unexpected error writing snapshot: %v", err) } @@ -171,7 +171,7 @@ func TestCompactor_Compact(t *testing.T) { } // Ensures that a compaction will properly merge multiple TSM files -func TestCompactor_Compact_SkipFullBlocks(t *testing.T) { +func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir) @@ -201,7 +201,7 @@ func TestCompactor_Compact_SkipFullBlocks(t *testing.T) { Size: 2, } - files, err := compactor.Compact([]string{f1, f2, f3}) + files, err := compactor.CompactFull([]string{f1, f2, f3}) if err != nil { t.Fatalf("unexpected error writing snapshot: %v", err) } @@ -275,7 +275,7 @@ func TestTSMKeyIterator_Single(t *testing.T) { r := MustTSMReader(dir, 1, writes) - iter, err := tsm1.NewTSMKeyIterator(1, r) + iter, err := tsm1.NewTSMKeyIterator(1, false, r) if err != nil { t.Fatalf("unexpected error creating WALKeyIterator: %v", err) } @@ -325,7 +325,7 @@ func TestTSMKeyIterator_Chunked(t *testing.T) { r := MustTSMReader(dir, 1, writes) - iter, err := tsm1.NewTSMKeyIterator(1, r) + iter, err := tsm1.NewTSMKeyIterator(1, false, r) if err != nil { t.Fatalf("unexpected error creating WALKeyIterator: %v", err) } @@ -387,7 +387,7 @@ func TestTSMKeyIterator_Duplicate(t *testing.T) { r2 := MustTSMReader(dir, 2, writes2) - iter, err := tsm1.NewTSMKeyIterator(1, r1, r2) + iter, err := tsm1.NewTSMKeyIterator(1, false, r1, r2) if err != nil { t.Fatalf("unexpected error creating WALKeyIterator: %v", err) } @@ -446,7 +446,7 @@ func TestTSMKeyIterator_MultipleKeysDeleted(t *testing.T) { r2 := MustTSMReader(dir, 2, points2) r2.Delete([]string{"cpu,host=A#!~#count"}) - iter, err := tsm1.NewTSMKeyIterator(1, r1, r2) + iter, err := tsm1.NewTSMKeyIterator(1, false, r1, r2) if err != nil { t.Fatalf("unexpected error creating WALKeyIterator: %v", err) } @@ -585,7 +585,7 @@ func TestCacheKeyIterator_Chunked(t *testing.T) { } } -func TestDefaultCompactionPlanner_Min(t *testing.T) { +func TestDefaultPlanner_Plan_Min(t *testing.T) { cp := &tsm1.DefaultPlanner{ FileStore: &fakeFileStore{ PathsFn: func() []tsm1.FileStat { @@ -605,7 +605,6 @@ func TestDefaultCompactionPlanner_Min(t *testing.T) { } }, }, - MinCompactionFileCount: 5, } tsm := cp.Plan(time.Now()) @@ -616,19 +615,23 @@ func TestDefaultCompactionPlanner_Min(t *testing.T) { // Ensure that if there are older files that can be compacted together but a newer // file that is in a larger step, the older ones will get compacted. 
-func TestDefaultCompactionPlanner_CombineSequence(t *testing.T) { +func TestDefaultPlanner_Plan_CombineSequence(t *testing.T) { data := []tsm1.FileStat{ tsm1.FileStat{ - Path: "01-01.tsm1", - Size: 1 * 1024 * 1024, + Path: "01-04.tsm1", + Size: 128 * 1024 * 1024, }, tsm1.FileStat{ - Path: "02-02.tsm1", - Size: 1 * 1024 * 1024, + Path: "02-04.tsm1", + Size: 128 * 1024 * 1024, }, tsm1.FileStat{ - Path: "03-03.tsm1", - Size: 1 * 1024 * 1024, + Path: "03-04.tsm1", + Size: 128 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "04-04.tsm1", + Size: 128 * 1024 * 1024, }, tsm1.FileStat{ Path: "06-02.tsm1", @@ -639,7 +642,7 @@ func TestDefaultCompactionPlanner_CombineSequence(t *testing.T) { Size: 128 * 1024 * 1024, }, tsm1.FileStat{ - Path: "04-1.tsm1", + Path: "08-01.tsm1", Size: 251 * 1024 * 1024, }, } @@ -650,10 +653,9 @@ func TestDefaultCompactionPlanner_CombineSequence(t *testing.T) { return data }, }, - MinCompactionFileCount: 3, } - expFiles := []tsm1.FileStat{data[0], data[1], data[2]} + expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3]} tsm := cp.Plan(time.Now()) if exp, got := len(expFiles), len(tsm[0]); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) @@ -667,26 +669,105 @@ func TestDefaultCompactionPlanner_CombineSequence(t *testing.T) { } // Ensure that the planner grabs the smallest compaction step -func TestDefaultCompactionPlanner_SmallestCompactionStep(t *testing.T) { +func TestDefaultPlanner_Plan_MultipleGroups(t *testing.T) { data := []tsm1.FileStat{ tsm1.FileStat{ - Path: "01-01.tsm1", + Path: "01-04.tsm1", + Size: 64 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "02-04.tsm1", + Size: 64 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "03-04.tsm1", + Size: 64 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "04-04.tsm1", + Size: 129 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "05-04.tsm1", + Size: 129 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "06-04.tsm1", + Size: 129 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "07-04.tsm1", + Size: 129 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "08-04.tsm1", + Size: 129 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "09-04.tsm1", // should be skipped + Size: 129 * 1024 * 1024, + }, + } + + cp := &tsm1.DefaultPlanner{ + FileStore: &fakeFileStore{ + PathsFn: func() []tsm1.FileStat { + return data + }, + }, + } + + expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3], + data[4], data[5], data[6], data[7]} + tsm := cp.Plan(time.Now()) + + if got, exp := len(tsm), 2; got != exp { + t.Fatalf("compaction group length mismatch: got %v, exp %v", got, exp) + } + + if exp, got := len(expFiles[:4]), len(tsm[0]); got != exp { + t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) + } + + if exp, got := len(expFiles[4:]), len(tsm[1]); got != exp { + t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) + } + + for i, p := range expFiles[:4] { + if got, exp := tsm[0][i], p.Path; got != exp { + t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp) + } + } + + for i, p := range expFiles[4:] { + if got, exp := tsm[1][i], p.Path; got != exp { + t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp) + } + } + +} + +// Ensure that the planner grabs the smallest compaction step +func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) { + data := []tsm1.FileStat{ + tsm1.FileStat{ + Path: "01-03.tsm1", Size: 251 * 1024 * 1024, }, tsm1.FileStat{ - Path: "02-02.tsm1", + Path: "02-03.tsm1", Size: 1 * 1024 * 1024, }, tsm1.FileStat{ - Path: "03-02.tsm1", + Path: "03-03.tsm1", Size: 1 * 1024 * 
1024, }, tsm1.FileStat{ - Path: "04-02.tsm1", + Path: "04-03.tsm1", Size: 1 * 1024 * 1024, }, tsm1.FileStat{ - Path: "05-02.tsm1", + Path: "05-01.tsm1", Size: 1 * 1024 * 1024, }, tsm1.FileStat{ @@ -701,11 +782,10 @@ func TestDefaultCompactionPlanner_SmallestCompactionStep(t *testing.T) { return data }, }, - MinCompactionFileCount: 3, } - expFiles := []tsm1.FileStat{data[1], data[2], data[3], data[4], data[5]} - tsm := cp.Plan(time.Now()) + expFiles := []tsm1.FileStat{data[4], data[5]} + tsm := cp.PlanLevel(1) if exp, got := len(expFiles), len(tsm[0]); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) } @@ -719,7 +799,7 @@ func TestDefaultCompactionPlanner_SmallestCompactionStep(t *testing.T) { // Ensure that the planner will compact all files if no writes // have happened in some interval -func TestDefaultCompactionPlanner_FullOnCold(t *testing.T) { +func TestDefaultPlanner_Plan_FullOnCold(t *testing.T) { data := []tsm1.FileStat{ tsm1.FileStat{ Path: "01-01.tsm1", @@ -770,7 +850,7 @@ func TestDefaultCompactionPlanner_FullOnCold(t *testing.T) { // Ensure that the planner will compact all files if no writes // have happened in some interval but skip files already over the limit -func TestDefaultCompactionPlanner_FullSkipMaxSize(t *testing.T) { +func TestDefaultPlanner_Plan_FullSkipMaxSize(t *testing.T) { data := []tsm1.FileStat{ tsm1.FileStat{ Path: "01-01.tsm1", @@ -810,7 +890,7 @@ func TestDefaultCompactionPlanner_FullSkipMaxSize(t *testing.T) { // Ensure that the planner will not return files that are over the max // allowable size -func TestDefaultCompactionPlanner_SkipMaxSizeFiles(t *testing.T) { +func TestDefaultPlanner_Plan_SkipMaxSizeFiles(t *testing.T) { data := []tsm1.FileStat{ tsm1.FileStat{ Path: "01-01.tsm1", @@ -838,35 +918,39 @@ func TestDefaultCompactionPlanner_SkipMaxSizeFiles(t *testing.T) { // Ensure that the planner will not return files that are over the max // allowable size -func TestDefaultCompactionPlanner_SkipPlanningAfterFull(t *testing.T) { +func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) { testSet := []tsm1.FileStat{ tsm1.FileStat{ - Path: "01-01.tsm1", - Size: 1024 * 1024, + Path: "01-05.tsm1", + Size: 256 * 1024 * 1024, }, tsm1.FileStat{ - Path: "02-02.tsm1", - Size: 1024 * 1024, + Path: "02-05.tsm1", + Size: 256 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "03-05.tsm1", + Size: 256 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "04-04.tsm1", + Size: 256 * 1024 * 1024, }, } - getFS := func(set []tsm1.FileStat) *fakeFileStore { - return &fakeFileStore{ - PathsFn: func() []tsm1.FileStat { - return set - }, - } + fs := &fakeFileStore{ + PathsFn: func() []tsm1.FileStat { + return testSet + }, } - testFileStore := getFS(testSet) - cp := &tsm1.DefaultPlanner{ - FileStore: testFileStore, + FileStore: fs, CompactFullWriteColdDuration: time.Nanosecond, } // first verify that our test set would return files - if exp, got := 2, len(cp.Plan(time.Now().Add(-time.Second))[0]); got != exp { + if exp, got := 4, len(cp.Plan(time.Now().Add(-time.Second))[0]); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) } @@ -882,42 +966,52 @@ func TestDefaultCompactionPlanner_SkipPlanningAfterFull(t *testing.T) { }, } - cp.FileStore = getFS(over) + overFs := &fakeFileStore{ + PathsFn: func() []tsm1.FileStat { + return over + }, + } + + cp.FileStore = overFs if exp, got := 0, len(cp.Plan(time.Now().Add(-time.Second))); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) } // even though we do this, 
the planner should remember that last time we were over - cp.FileStore = testFileStore + cp.FileStore = fs if exp, got := 0, len(cp.Plan(time.Now().Add(-time.Second))); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) } // ensure that it will plan if last modified has changed - testFileStore.lastModified = time.Now() + fs.lastModified = time.Now() - if exp, got := 2, len(cp.Plan(time.Now())[0]); got != exp { + if exp, got := 4, len(cp.Plan(time.Now())[0]); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) } } // Ensure that the planner will compact files that are past the smallest step // size even if there is a single file in the smaller step size -func TestDefaultCompactionPlanner_CompactsMiddleSteps(t *testing.T) { +func TestDefaultPlanner_Plan_CompactsMiddleSteps(t *testing.T) { data := []tsm1.FileStat{ tsm1.FileStat{ - Path: "01-01.tsm1", + Path: "01-04.tsm1", Size: 64 * 1024 * 1024, }, tsm1.FileStat{ - Path: "02-02.tsm1", + Path: "02-04.tsm1", Size: 64 * 1024 * 1024, }, tsm1.FileStat{ - Path: "03-02.tsm1", + Path: "03-04.tsm1", Size: 64 * 1024 * 1024, }, tsm1.FileStat{ - Path: "04-02.tsm1", + Path: "04-04.tsm1", + Size: 64 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "05-02.tsm1", Size: 2 * 1024 * 1024, }, } @@ -928,10 +1022,9 @@ func TestDefaultCompactionPlanner_CompactsMiddleSteps(t *testing.T) { return data }, }, - MinCompactionFileCount: 3, } - expFiles := []tsm1.FileStat{data[0], data[1], data[2]} + expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3]} tsm := cp.Plan(time.Now()) if exp, got := len(expFiles), len(tsm[0]); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index e6e7353b5f..4dc40f87d6 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -82,7 +82,6 @@ func NewDevEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engi Compactor: c, CompactionPlan: &DefaultPlanner{ FileStore: fs, - MinCompactionFileCount: opt.Config.CompactMinFileCount, CompactFullWriteColdDuration: time.Duration(opt.Config.CompactFullWriteColdDuration), }, MaxPointsPerBlock: opt.Config.MaxPointsPerBlock, @@ -131,9 +130,12 @@ func (e *DevEngine) Open() error { return err } - e.wg.Add(2) + e.wg.Add(5) go e.compactCache() - go e.compactTSM() + go e.compactTSMFull() + go e.compactTSMLevel(true, 1) + go e.compactTSMLevel(true, 2) + go e.compactTSMLevel(false, 3) return nil } @@ -409,7 +411,71 @@ func (e *DevEngine) ShouldCompactCache(lastWriteTime time.Time) bool { time.Now().Sub(lastWriteTime) > e.CacheFlushWriteColdDuration } -func (e *DevEngine) compactTSM() { +func (e *DevEngine) compactTSMLevel(fast bool, level int) { + defer e.wg.Done() + + for { + select { + case <-e.done: + return + + default: + tsmFiles := e.CompactionPlan.PlanLevel(level) + + if len(tsmFiles) == 0 { + time.Sleep(time.Second) + continue + } + + var wg sync.WaitGroup + for i, group := range tsmFiles { + wg.Add(1) + go func(groupNum int, group CompactionGroup) { + defer wg.Done() + start := time.Now() + e.logger.Printf("beginning level %d compaction of group %d, %d TSM files", level, groupNum, len(group)) + for i, f := range group { + e.logger.Printf("compacting level %d group (%d) %s (#%d)", level, groupNum, f, i) + } + + var files []string + var err error + + if fast { + files, err = e.Compactor.CompactFast(group) + if err != nil { + e.logger.Printf("error compacting TSM files: %v", err) + time.Sleep(time.Second) + return + } + } else { 
+ files, err = e.Compactor.CompactFull(group) + if err != nil { + e.logger.Printf("error compacting TSM files: %v", err) + time.Sleep(time.Second) + return + } + } + + if err := e.FileStore.Replace(group, files); err != nil { + e.logger.Printf("error replacing new TSM files: %v", err) + time.Sleep(time.Second) + return + } + + for i, f := range files { + e.logger.Printf("compacted level %d group (%d) into %s (#%d)", level, groupNum, f, i) + } + e.logger.Printf("compacted level %d group %d of %d files into %d files in %s", + level, groupNum, len(group), len(files), time.Since(start)) + }(i, group) + } + wg.Wait() + } + } +} + +func (e *DevEngine) compactTSMFull() { defer e.wg.Done() for { @@ -425,24 +491,38 @@ continue } - start := time.Now() - e.logger.Printf("beginning compaction of %d TSM files", len(tsmFiles)) + var wg sync.WaitGroup + for i, group := range tsmFiles { + wg.Add(1) + go func(groupNum int, group CompactionGroup) { + defer wg.Done() + start := time.Now() + e.logger.Printf("beginning full compaction of group %d, %d TSM files", groupNum, len(group)) + for i, f := range group { + e.logger.Printf("compacting full group (%d) %s (#%d)", groupNum, f, i) + } - files, err := e.Compactor.Compact(tsmFiles[0]) - if err != nil { - e.logger.Printf("error compacting TSM files: %v", err) - time.Sleep(time.Second) - continue + files, err := e.Compactor.CompactFull(group) + if err != nil { + e.logger.Printf("error compacting TSM files: %v", err) + time.Sleep(time.Second) + return + } + + if err := e.FileStore.Replace(group, files); err != nil { + e.logger.Printf("error replacing new TSM files: %v", err) + time.Sleep(time.Second) + return + } + + for i, f := range files { + e.logger.Printf("compacted full group (%d) into %s (#%d)", groupNum, f, i) + } + e.logger.Printf("compacted full %d files into %d files in %s", + len(group), len(files), time.Since(start)) + }(i, group) } - - if err := e.FileStore.Replace(tsmFiles[0], files); err != nil { - e.logger.Printf("error replacing new TSM files: %v", err) - time.Sleep(time.Second) - continue - } - - e.logger.Printf("compacted %d TSM into %d files in %s", - len(tsmFiles), len(files), time.Since(start)) + wg.Wait() } } } diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 057a63d9be..c645aaa74c 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -391,6 +391,10 @@ func newFiles(values ...keyValues) ([]TSMFile, error) { return nil, err } + if err := w.Close(); err != nil { + return nil, err + } + r, err := NewTSMReader(bytes.NewReader(b.Bytes())) if err != nil { return nil, err } diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index 247f721fa2..c7d8dedcd7 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -505,10 +505,18 @@ type KeyCursor struct { type location struct { r TSMFile entry *IndexEntry + + // Has this location been read before + read bool } func (c *KeyCursor) init(t time.Time, ascending bool) { if c.ready { + // Re-set the read status of each block in case a cursor is Seeked to + // multiple times + for _, c := range c.seeks { + c.read = false + } return } c.ascending = ascending @@ -592,6 +600,7 @@ func (c *KeyCursor) readAt() ([]Value, error) { // First block is the oldest block containing the points we're search for.
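The read flag is the crux of this file_store.go change: once overlapping blocks are consumed while deduplicating, Next must not return them again. A self-contained sketch of that skip-already-read loop, using a simplified stand-in for the location type:

```go
package main

import "fmt"

// loc is a simplified stand-in for the location type above: just a block
// ID plus the read flag this diff introduces.
type loc struct {
	id   int
	read bool
}

// next advances pos to the next unread location, marks it read, and returns
// its id -- the same skip-already-read loop KeyCursor.Next uses above.
func next(seeks []*loc, pos *int) (int, bool) {
	for {
		*pos++
		if *pos >= len(seeks) {
			return 0, false
		}
		if !seeks[*pos].read {
			seeks[*pos].read = true
			return seeks[*pos].id, true
		}
	}
}

func main() {
	seeks := []*loc{{id: 10}, {id: 11}, {id: 12}}
	seeks[1].read = true // consumed early while deduping an overlap
	pos := -1
	for id, ok := next(seeks, &pos); ok; id, ok = next(seeks, &pos) {
		fmt.Println("block", id) // prints 10, then 12; 11 is skipped
	}
}
```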
first := c.current[0] values, err := first.r.ReadAt(first.entry, c.buf[:0]) + first.read = true // Only one block with this key and time range so return it if len(c.current) == 1 { @@ -602,15 +611,16 @@ func (c *KeyCursor) readAt() ([]Value, error) { // dedup them. for i := 1; i < len(c.current); i++ { cur := c.current[i] - if c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) { + if c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read { + cur.read = true c.pos++ v, err := cur.r.ReadAt(cur.entry, nil) if err != nil { return nil, err } values = append(values, v...) - - } else if !c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) { + } else if !c.ascending && cur.entry.OverlapsTimeRange(first.entry.MinTime, first.entry.MaxTime) && !cur.read { + cur.read = true c.pos-- v, err := cur.r.ReadAt(cur.entry, nil) @@ -628,9 +638,15 @@ func (c *KeyCursor) Next(ascending bool) ([]Value, error) { c.current = c.current[:0] if ascending { - c.pos++ - if c.pos >= len(c.seeks) { - return nil, nil + for { + c.pos++ + if c.pos >= len(c.seeks) { + return nil, nil + } + + if !c.seeks[c.pos].read { + break + } } // Append the first matching block @@ -639,7 +655,11 @@ func (c *KeyCursor) Next(ascending bool) ([]Value, error) { // If we have ovelapping blocks, append all their values so we can dedup if c.duplicates { first := c.seeks[c.pos] - for i := c.pos; i < len(c.seeks); i++ { + for i := c.pos + 1; i < len(c.seeks); i++ { + if c.seeks[i].read { + continue + } + if c.seeks[i].entry.MinTime.Before(first.entry.MaxTime) || c.seeks[i].entry.MinTime.Equal(first.entry.MaxTime) { c.current = append(c.current, c.seeks[i]) } @@ -649,9 +669,15 @@ func (c *KeyCursor) Next(ascending bool) ([]Value, error) { return c.readAt() } else { - c.pos-- - if c.pos < 0 { - return nil, nil + for { + c.pos-- + if c.pos < 0 { + return nil, nil + } + + if !c.seeks[c.pos].read { + break + } } // Append the first matching block @@ -661,6 +687,9 @@ func (c *KeyCursor) Next(ascending bool) ([]Value, error) { if c.duplicates { first := c.seeks[c.pos] for i := c.pos; i >= 0; i-- { + if c.seeks[i].read { + continue + } if c.seeks[i].entry.MaxTime.After(first.entry.MinTime) || c.seeks[i].entry.MaxTime.Equal(first.entry.MinTime) { c.current = append(c.current, c.seeks[i]) } diff --git a/tsdb/engine/tsm1/file_store_test.go b/tsdb/engine/tsm1/file_store_test.go index eb81bab8f1..1a3f8397ef 100644 --- a/tsdb/engine/tsm1/file_store_test.go +++ b/tsdb/engine/tsm1/file_store_test.go @@ -603,7 +603,7 @@ func newFileDir(dir string, values ...keyValues) ([]string, error) { return nil, err } - if err := f.Close(); err != nil { + if err := w.Close(); err != nil { return nil, err } newName := filepath.Join(filepath.Dir(f.Name()), tsmFileName(id)) @@ -636,6 +636,10 @@ func newFiles(values ...keyValues) ([]tsm1.TSMFile, error) { return nil, err } + if err := w.Close(); err != nil { + return nil, err + } + r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes())) if err != nil { return nil, err diff --git a/tsdb/engine/tsm1/mmap_unix.go b/tsdb/engine/tsm1/mmap_unix.go index e178232aa1..5ef7699194 100644 --- a/tsdb/engine/tsm1/mmap_unix.go +++ b/tsdb/engine/tsm1/mmap_unix.go @@ -8,13 +8,11 @@ import ( "unsafe" ) -func mmap(f *os.File, offset int64, length int) ([]byte, error) { - mmap, err := syscall.Mmap(int(f.Fd()), 0, length, syscall.PROT_READ, syscall.MAP_SHARED) - if err != nil { - return nil, err - } +const MAP_POPULATE = 0x8000 - if err := 
madvise(mmap, syscall.MADV_RANDOM); err != nil { +func mmap(f *os.File, offset int64, length int) ([]byte, error) { + mmap, err := syscall.Mmap(int(f.Fd()), 0, length, syscall.PROT_READ, syscall.MAP_SHARED|MAP_POPULATE) + if err != nil { return nil, err } diff --git a/tsdb/engine/tsm1/reader_test.go b/tsdb/engine/tsm1/reader_test.go index 4380157270..227f52f3a0 100644 --- a/tsdb/engine/tsm1/reader_test.go +++ b/tsdb/engine/tsm1/reader_test.go @@ -26,6 +26,10 @@ func TestTSMReader_Type(t *testing.T) { t.Fatalf("unexpected error closing: %v", err) } + if err := w.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes())) if err != nil { t.Fatalf("unexpected error created reader: %v", err) @@ -566,6 +570,10 @@ func TestBlockIterator_Single(t *testing.T) { t.Fatalf("unexpected error closing: %v", err) } + if err := w.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes())) if err != nil { t.Fatalf("unexpected error created reader: %v", err) @@ -625,6 +633,10 @@ func TestBlockIterator_MultipleBlocks(t *testing.T) { t.Fatalf("unexpected error closing: %v", err) } + if err := w.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes())) if err != nil { t.Fatalf("unexpected error created reader: %v", err) @@ -690,6 +702,10 @@ func TestBlockIterator_Sorted(t *testing.T) { t.Fatalf("unexpected error closing: %v", err) } + if err := w.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes())) if err != nil { t.Fatalf("unexpected error created reader: %v", err) diff --git a/tsdb/engine/tsm1/writer.go b/tsdb/engine/tsm1/writer.go index d4aec14c91..5be32b9a03 100644 --- a/tsdb/engine/tsm1/writer.go +++ b/tsdb/engine/tsm1/writer.go @@ -63,6 +63,7 @@ The last section is the footer that stores the offset of the start of the index. 
*/ import ( + "bufio" "bytes" "encoding/binary" "fmt" @@ -456,9 +457,10 @@ func (d *directIndex) Size() uint32 { // tsmWriter writes keys and values in the TSM format type tsmWriter struct { - w io.Writer - index TSMIndex - n int64 + wrapped io.Writer + w *bufio.Writer + index TSMIndex + n int64 } func NewTSMWriter(w io.Writer) (TSMWriter, error) { @@ -466,7 +468,7 @@ func NewTSMWriter(w io.Writer) (TSMWriter, error) { blocks: map[string]*indexEntries{}, } - return &tsmWriter{w: w, index: index}, nil + return &tsmWriter{wrapped: w, w: bufio.NewWriterSize(w, 4*1024*1024), index: index}, nil } func (t *tsmWriter) writeHeader() error { @@ -574,7 +576,11 @@ func (t *tsmWriter) WriteIndex() error { } func (t *tsmWriter) Close() error { - if c, ok := t.w.(io.Closer); ok { + if err := t.w.Flush(); err != nil { + return err + } + + if c, ok := t.wrapped.(io.Closer); ok { return c.Close() } return nil diff --git a/tsdb/engine/tsm1/writer_test.go b/tsdb/engine/tsm1/writer_test.go index 4185bcb1e6..88dd6e687f 100644 --- a/tsdb/engine/tsm1/writer_test.go +++ b/tsdb/engine/tsm1/writer_test.go @@ -58,6 +58,10 @@ func TestTSMWriter_Write_Single(t *testing.T) { } if err := w.WriteIndex(); err != nil { + t.Fatalf("unexpected error writing index: %v", err) + } + + if err := w.Close(); err != nil { t.Fatalf("unexpected error closing: %v", err) } @@ -114,6 +118,10 @@ func TestTSMWriter_Write_Multiple(t *testing.T) { t.Fatalf("unexpected error closing: %v", err) } + if err := w.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes())) if err != nil { t.Fatalf("unexpected error created reader: %v", err) @@ -168,6 +176,10 @@ func TestTSMWriter_Write_MultipleKeyValues(t *testing.T) { t.Fatalf("unexpected error closing: %v", err) } + if err := w.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes())) if err != nil { t.Fatalf("unexpected error created reader: %v", err) @@ -223,6 +235,10 @@ func TestTSMWriter_Write_ReverseKeys(t *testing.T) { t.Fatalf("unexpected error closing: %v", err) } + if err := w.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes())) if err != nil { t.Fatalf("unexpected error created reader: %v", err) @@ -278,6 +294,10 @@ func TestTSMWriter_Write_SameKey(t *testing.T) { t.Fatalf("unexpected error closing: %v", err) } + if err := w.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes())) if err != nil { t.Fatalf("unexpected error created reader: %v", err) @@ -334,6 +354,10 @@ func TestTSMWriter_Read_Multiple(t *testing.T) { t.Fatalf("unexpected error closing: %v", err) } + if err := w.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes())) if err != nil { t.Fatalf("unexpected error created reader: %v", err) @@ -421,6 +445,10 @@ func TestTSMWriter_WriteBlock_Multiple(t *testing.T) { t.Fatalf("unexpected error closing: %v", err) } + if err := w.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + if got, exp := len(b.Bytes()), 5; got < exp { t.Fatalf("file size mismatch: got %v, exp %v", got, exp) } @@ -455,6 +483,10 @@ func TestTSMWriter_WriteBlock_Multiple(t *testing.T) { t.Fatalf("unexpected error closing: %v", err) } + if err := w.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", 
err) + } + // Now create a reader to verify the written blocks matches the originally // written file using Write r, err = tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
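A closing note on the writer change: wrapping the target in a 4 MB bufio.Writer means nothing is guaranteed to reach the underlying writer until Flush, which is why Close now flushes first and why every test above gained a w.Close() before handing the bytes to NewTSMReader. A minimal sketch of the flush-on-close pattern, independent of the TSM types:

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
)

// bufferedWriter mirrors the tsmWriter change: it keeps the wrapped writer
// so Close can flush the buffer first, then close the target if possible.
type bufferedWriter struct {
	wrapped io.Writer
	w       *bufio.Writer
}

func newBufferedWriter(w io.Writer) *bufferedWriter {
	return &bufferedWriter{wrapped: w, w: bufio.NewWriterSize(w, 4*1024*1024)}
}

func (b *bufferedWriter) Write(p []byte) (int, error) { return b.w.Write(p) }

func (b *bufferedWriter) Close() error {
	if err := b.w.Flush(); err != nil {
		return err
	}
	if c, ok := b.wrapped.(io.Closer); ok {
		return c.Close()
	}
	return nil
}

func main() {
	var buf bytes.Buffer
	w := newBufferedWriter(&buf)
	fmt.Fprint(w, "header|blocks|index|footer")
	fmt.Println(buf.Len()) // 0: bytes still sitting in the 4 MB buffer
	w.Close()
	fmt.Println(buf.Len()) // 26: visible only after the flush in Close
}
```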