Merge pull request #8348 from influxdata/jw-tsm-compaction-limit

Compaction limits
pull/8359/head
Jason Wilder 2017-05-04 11:08:11 -06:00 committed by GitHub
commit 23af70add4
17 changed files with 438 additions and 243 deletions

View File

@ -22,6 +22,7 @@ The admin UI is removed and unusable in this release. The `[admin]` configuratio
- [#8302](https://github.com/influxdata/influxdb/pull/8302): Write throughput/concurrency improvements
- [#8273](https://github.com/influxdata/influxdb/issues/8273): Remove the admin UI.
- [#8327](https://github.com/influxdata/influxdb/pull/8327): Update to go1.8.1
- [#8348](https://github.com/influxdata/influxdb/pull/8348): Add max concurrent compaction limits
### Bugfixes
@ -54,6 +55,8 @@ The admin UI is removed and unusable in this release. The `[admin]` configuratio
- [#8315](https://github.com/influxdata/influxdb/issues/8315): Remove default upper time bound on DELETE queries.
- [#8066](https://github.com/influxdata/influxdb/issues/8066): Fix LIMIT and OFFSET for certain aggregate queries.
- [#8045](https://github.com/influxdata/influxdb/issues/8045): Refactor the subquery code and fix outer condition queries.
- [#7425](https://github.com/influxdata/influxdb/issues/7425): Fix compaction aborted log messages
- [#8123](https://github.com/influxdata/influxdb/issues/8123): TSM compaction does not remove .tmp on error
## v1.2.3 [unreleased]

View File

@ -89,6 +89,11 @@
# write or delete
# compact-full-write-cold-duration = "4h"
# The maximum number of concurrent full and level compactions that can run at one time. A
# value of 0 results in runtime.GOMAXPROCS(0) used at runtime. This setting does not apply
# to cache snapshotting.
# max-concurrent-compactions = 0
# The maximum series allowed per database before writes are dropped. This limit can prevent
# high cardinality issues at the database level. This limit can be disabled by setting it to
# 0.

View File

@ -47,6 +47,10 @@ const (
// DefaultMaxValuesPerTag is the maximum number of values a tag can have within a measurement.
DefaultMaxValuesPerTag = 100000
// DefaultMaxConcurrentCompactions is the maximum number of concurrent full and level compactions
// that can run at one time. A value of 0 results in runtime.GOMAXPROCS(0) used at runtime.
DefaultMaxConcurrentCompactions = 0
)
// Config holds the configuration for the tsdb package.
@ -84,6 +88,12 @@ type Config struct {
// A value of 0 disables the limit.
MaxValuesPerTag int `toml:"max-values-per-tag"`
// MaxConcurrentCompactions is the maximum number of concurrent level and full compactions
// that can be running at one time across all shards. Compactions scheduled to run when the
// limit is reached are blocked until a running compaction completes. Snapshot compactions are
// not affected by this limit. A value of 0 limits compactions to runtime.GOMAXPROCS(0).
MaxConcurrentCompactions int `toml:"max-concurrent-compactions"`
TraceLoggingEnabled bool `toml:"trace-logging-enabled"`
}
@ -100,8 +110,9 @@ func NewConfig() Config {
CacheSnapshotWriteColdDuration: toml.Duration(DefaultCacheSnapshotWriteColdDuration),
CompactFullWriteColdDuration: toml.Duration(DefaultCompactFullWriteColdDuration),
MaxSeriesPerDatabase: DefaultMaxSeriesPerDatabase,
MaxValuesPerTag: DefaultMaxValuesPerTag,
MaxSeriesPerDatabase: DefaultMaxSeriesPerDatabase,
MaxValuesPerTag: DefaultMaxValuesPerTag,
MaxConcurrentCompactions: DefaultMaxConcurrentCompactions,
TraceLoggingEnabled: false,
}
@ -115,6 +126,10 @@ func (c *Config) Validate() error {
return errors.New("Data.WALDir must be specified")
}
if c.MaxConcurrentCompactions < 0 {
return errors.New("max-concurrent-compactions must be greater than 0")
}
valid := false
for _, e := range RegisteredEngines() {
if e == c.Engine {
@ -152,5 +167,6 @@ func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) {
"compact-full-write-cold-duration": c.CompactFullWriteColdDuration,
"max-series-per-database": c.MaxSeriesPerDatabase,
"max-values-per-tag": c.MaxValuesPerTag,
"max-concurrent-compactions": c.MaxConcurrentCompactions,
}), nil
}

View File

@ -12,6 +12,7 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/uber-go/zap"
)
@ -30,6 +31,8 @@ type Engine interface {
Open() error
Close() error
SetEnabled(enabled bool)
SetCompactionsEnabled(enabled bool)
WithLogger(zap.Logger)
LoadMetadataIndex(shardID uint64, index Index) error
@ -71,6 +74,8 @@ type Engine interface {
// Statistics will return statistics relevant to this engine.
Statistics(tags map[string]string) []models.Statistic
LastModified() time.Time
DiskSize() int64
IsIdle() bool
io.WriterTo
}
@ -136,10 +141,11 @@ func NewEngine(id uint64, i Index, path string, walPath string, options EngineOp
// EngineOptions represents the options used to initialize the engine.
type EngineOptions struct {
EngineVersion string
IndexVersion string
ShardID uint64
InmemIndex interface{} // shared in-memory index
EngineVersion string
IndexVersion string
ShardID uint64
InmemIndex interface{} // shared in-memory index
CompactionLimiter limiter.Fixed
Config Config
}

View File

@ -260,7 +260,7 @@ func (c *Cache) Write(key string, values []Value) error {
// Enough room in the cache?
limit := c.maxSize
n := c.Size() + atomic.LoadUint64(&c.snapshotSize) + addedSize
n := c.Size() + addedSize
if limit > 0 && n > limit {
atomic.AddInt64(&c.stats.WriteErr, 1)
@ -293,7 +293,7 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
// Enough room in the cache?
limit := c.maxSize // maxSize is safe for reading without a lock.
n := c.Size() + atomic.LoadUint64(&c.snapshotSize) + addedSize
n := c.Size() + addedSize
if limit > 0 && n > limit {
atomic.AddInt64(&c.stats.WriteErr, 1)
return ErrCacheMemorySizeLimitExceeded(n, limit)
@ -416,7 +416,7 @@ func (c *Cache) ClearSnapshot(success bool) {
// Size returns the number of point-calculated bytes the cache currently uses.
func (c *Cache) Size() uint64 {
return atomic.LoadUint64(&c.size)
return atomic.LoadUint64(&c.size) + atomic.LoadUint64(&c.snapshotSize)
}
// increaseSize increases size by delta.

View File

@ -448,7 +448,7 @@ func TestCache_Snapshot_Stats(t *testing.T) {
}
// Store size should have been reset.
if got, exp := c.Size(), uint64(0); got != exp {
if got, exp := c.Size(), uint64(16); got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}

View File

@ -53,6 +53,8 @@ type CompactionPlanner interface {
Plan(lastWrite time.Time) []CompactionGroup
PlanLevel(level int) []CompactionGroup
PlanOptimize() []CompactionGroup
Release(group []CompactionGroup)
FullyCompacted() bool
}
// DefaultPlanner implements CompactionPlanner using a strategy to roll up
@ -60,17 +62,13 @@ type CompactionPlanner interface {
// to minimize the number of TSM files on disk while rolling up a bounded number
// of files.
type DefaultPlanner struct {
FileStore interface {
Stats() []FileStat
LastModified() time.Time
BlockCount(path string, idx int) int
}
FileStore fileStore
// CompactFullWriteColdDuration specifies the length of time after
// compactFullWriteColdDuration specifies the length of time after
// which if no writes have been committed to the WAL, the engine will
// do a full compaction of the TSM files in this shard. This duration
// should always be greater than the CacheFlushWriteColdDuration
CompactFullWriteColdDuration time.Duration
compactFullWriteColdDuration time.Duration
// lastPlanCheck is the last time Plan was called
lastPlanCheck time.Time
@ -81,6 +79,24 @@ type DefaultPlanner struct {
// lastGenerations is the last set of generations found by findGenerations
lastGenerations tsmGenerations
// filesInUse is the set of files that have been returned as part of a plan and might
// be being compacted. Two plans should not return the same file at any given time.
filesInUse map[string]struct{}
}
// fileStore abstracts the subset of FileStore functionality the planner
// depends on, allowing tests to substitute a fake implementation.
type fileStore interface {
	// Stats returns per-file statistics for all TSM files in the store.
	Stats() []FileStat
	// LastModified returns the time the store was last modified.
	LastModified() time.Time
	// BlockCount reports the block count for the TSM file at path; see
	// FileStore.BlockCount for the semantics of idx.
	BlockCount(path string, idx int) int
}
// NewDefaultPlanner returns a DefaultPlanner that plans compactions for the
// TSM files reported by fs. writeColdDuration is how long the shard must go
// without writes before a full compaction of its TSM files is planned.
func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPlanner {
	return &DefaultPlanner{
		FileStore:                    fs,
		compactFullWriteColdDuration: writeColdDuration,
		// Track files handed out in plans so that two concurrently running
		// plans never reference the same TSM file.
		filesInUse: make(map[string]struct{}),
	}
}
// tsmGeneration represents the TSM files within a generation.
@ -129,6 +145,12 @@ func (t *tsmGeneration) hasTombstones() bool {
return false
}
// FullyCompacted returns true if the shard is fully compacted.
func (c *DefaultPlanner) FullyCompacted() bool {
	// At most one remaining generation with no tombstones means there is
	// nothing left to compact.
	gens := c.findGenerations()
	return len(gens) <= 1 && !gens.hasTombstones()
}
// PlanLevel returns a set of TSM files to rewrite for a specific level.
func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
// Determine the generations from all files on disk. We need to treat
@ -205,6 +227,10 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
}
}
if !c.acquire(cGroups) {
return nil
}
return cGroups
}
@ -270,6 +296,10 @@ func (c *DefaultPlanner) PlanOptimize() []CompactionGroup {
cGroups = append(cGroups, cGroup)
}
if !c.acquire(cGroups) {
return nil
}
return cGroups
}
@ -279,7 +309,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
generations := c.findGenerations()
// first check if we should be doing a full compaction because nothing has been written in a long time
if c.CompactFullWriteColdDuration > 0 && time.Since(lastWrite) > c.CompactFullWriteColdDuration && len(generations) > 1 {
if c.compactFullWriteColdDuration > 0 && time.Since(lastWrite) > c.compactFullWriteColdDuration && len(generations) > 1 {
var tsmFiles []string
var genCount int
for i, group := range generations {
@ -316,7 +346,11 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
return nil
}
return []CompactionGroup{tsmFiles}
group := []CompactionGroup{tsmFiles}
if !c.acquire(group) {
return nil
}
return group
}
// don't plan if nothing has changed in the filestore
@ -449,6 +483,9 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
tsmFiles = append(tsmFiles, cGroup)
}
if !c.acquire(tsmFiles) {
return nil
}
return tsmFiles
}
@ -496,6 +533,40 @@ func (c *DefaultPlanner) findGenerations() tsmGenerations {
return orderedGenerations
}
// acquire attempts to reserve every file in groups for exclusive use by a
// single compaction plan. If any file is already held by a previously
// returned plan, nothing is reserved and false is returned; otherwise all
// files are marked in use and true is returned.
func (c *DefaultPlanner) acquire(groups []CompactionGroup) bool {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Reject the whole request if any requested file is already reserved.
	for _, group := range groups {
		for _, file := range group {
			if _, reserved := c.filesInUse[file]; reserved {
				return false
			}
		}
	}

	// All clear — reserve every file for this plan.
	for _, group := range groups {
		for _, file := range group {
			c.filesInUse[file] = struct{}{}
		}
	}
	return true
}
// Release removes the file references held by each compaction group,
// allowing new plans to make use of those files again.
func (c *DefaultPlanner) Release(groups []CompactionGroup) {
	c.mu.Lock()
	for _, group := range groups {
		for _, file := range group {
			delete(c.filesInUse, file)
		}
	}
	c.mu.Unlock()
}
// Compactor merges multiple TSM files into new files or
// writes a Cache into 1 or more TSM files.
type Compactor struct {

View File

@ -8,6 +8,7 @@ import (
"testing"
"time"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
@ -1090,8 +1091,8 @@ func TestCacheKeyIterator_Chunked(t *testing.T) {
}
func TestDefaultPlanner_Plan_Min(t *testing.T) {
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return []tsm1.FileStat{
tsm1.FileStat{
@ -1108,8 +1109,8 @@ func TestDefaultPlanner_Plan_Min(t *testing.T) {
},
}
},
},
}
}, tsdb.DefaultCompactFullWriteColdDuration,
)
tsm := cp.Plan(time.Now())
if exp, got := 0, len(tsm); got != exp {
@ -1151,13 +1152,13 @@ func TestDefaultPlanner_Plan_CombineSequence(t *testing.T) {
},
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
},
}
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
tsm := cp.Plan(time.Now())
@ -1213,13 +1214,11 @@ func TestDefaultPlanner_Plan_MultipleGroups(t *testing.T) {
},
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
cp := tsm1.NewDefaultPlanner(&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
}
}, tsdb.DefaultCompactFullWriteColdDuration)
expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3],
data[4], data[5], data[6], data[7]}
@ -1280,13 +1279,13 @@ func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) {
},
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
},
}
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{data[4], data[5]}
tsm := cp.PlanLevel(1)
@ -1333,13 +1332,13 @@ func TestDefaultPlanner_PlanLevel_SplitFile(t *testing.T) {
},
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
},
}
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4]}
tsm := cp.PlanLevel(3)
@ -1382,13 +1381,13 @@ func TestDefaultPlanner_PlanLevel_IsolatedLowLevel(t *testing.T) {
},
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
},
}
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{data[2], data[3]}
tsm := cp.PlanLevel(1)
@ -1435,13 +1434,13 @@ func TestDefaultPlanner_PlanLevel_IsolatedHighLevel(t *testing.T) {
},
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
},
}
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{}
tsm := cp.PlanLevel(3)
@ -1478,13 +1477,13 @@ func TestDefaultPlanner_PlanLevel3_MinFiles(t *testing.T) {
},
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
},
}
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{}
tsm := cp.PlanLevel(3)
@ -1510,13 +1509,13 @@ func TestDefaultPlanner_PlanLevel2_MinFiles(t *testing.T) {
},
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
},
}
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{}
tsm := cp.PlanLevel(2)
@ -1554,13 +1553,13 @@ func TestDefaultPlanner_PlanLevel_Tombstone(t *testing.T) {
},
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
},
}
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{data[0], data[1]}
tsm := cp.PlanLevel(3)
@ -1603,13 +1602,13 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) {
},
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
},
}
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
expFiles2 := []tsm1.FileStat{data[4], data[5]}
@ -1652,13 +1651,13 @@ func TestDefaultPlanner_PlanOptimize_NoLevel4(t *testing.T) {
},
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
},
}
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{}
tsm := cp.PlanOptimize()
@ -1695,13 +1694,13 @@ func TestDefaultPlanner_PlanOptimize_Level4(t *testing.T) {
},
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
},
}
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
tsm := cp.PlanOptimize()
@ -1760,13 +1759,13 @@ func TestDefaultPlanner_PlanOptimize_Multiple(t *testing.T) {
},
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
},
}
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
expFiles2 := []tsm1.FileStat{data[5], data[6], data[7], data[8]}
@ -1813,13 +1812,13 @@ func TestDefaultPlanner_PlanOptimize_Optimized(t *testing.T) {
},
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
},
}
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{}
tsm := cp.PlanOptimize()
@ -1845,13 +1844,13 @@ func TestDefaultPlanner_PlanOptimize_Tombstones(t *testing.T) {
},
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
},
}
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{data[0], data[1], data[2]}
tsm := cp.PlanOptimize()
@ -1897,14 +1896,14 @@ func TestDefaultPlanner_Plan_FullOnCold(t *testing.T) {
},
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
},
CompactFullWriteColdDuration: time.Nanosecond,
}
time.Nanosecond,
)
tsm := cp.Plan(time.Now().Add(-time.Second))
if exp, got := len(data), len(tsm[0]); got != exp {
@ -1932,13 +1931,13 @@ func TestDefaultPlanner_Plan_SkipMaxSizeFiles(t *testing.T) {
},
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
},
}
}, tsdb.DefaultCompactFullWriteColdDuration,
)
tsm := cp.Plan(time.Now())
if exp, got := 0, len(tsm); got != exp {
@ -1975,15 +1974,13 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) {
blockCount: 1000,
}
cp := &tsm1.DefaultPlanner{
FileStore: fs,
CompactFullWriteColdDuration: time.Nanosecond,
}
cp := tsm1.NewDefaultPlanner(fs, time.Nanosecond)
plan := cp.Plan(time.Now().Add(-time.Second))
// first verify that our test set would return files
if exp, got := 4, len(cp.Plan(time.Now().Add(-time.Second))[0]); got != exp {
if exp, got := 4, len(plan[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
cp.Release(plan)
// skip planning if all files are over the limit
over := []tsm1.FileStat{
@ -2017,14 +2014,18 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) {
}
cp.FileStore = overFs
if exp, got := 0, len(cp.Plan(time.Now().Add(-time.Second))); got != exp {
plan = cp.Plan(time.Now().Add(-time.Second))
if exp, got := 0, len(plan); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
cp.Release(plan)
plan = cp.PlanOptimize()
// ensure the optimize planner would pick this up
if exp, got := 1, len(cp.PlanOptimize()); got != exp {
if exp, got := 1, len(plan); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
cp.Release(plan)
cp.FileStore = fs
// ensure that it will plan if last modified has changed
@ -2082,15 +2083,14 @@ func TestDefaultPlanner_Plan_TwoGenLevel3(t *testing.T) {
},
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
blockCount: 1000,
PathsFn: func() []tsm1.FileStat {
return data
},
},
CompactFullWriteColdDuration: time.Hour,
}
time.Hour)
tsm := cp.Plan(time.Now().Add(-24 * time.Hour))
if exp, got := 1, len(tsm); got != exp {
@ -2127,15 +2127,17 @@ func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) {
blockCount: 100,
}
cp := &tsm1.DefaultPlanner{
FileStore: fs,
CompactFullWriteColdDuration: time.Nanosecond,
}
cp := tsm1.NewDefaultPlanner(
fs,
time.Nanosecond,
)
plan := cp.Plan(time.Now().Add(-time.Second))
// first verify that our test set would return files
if exp, got := 4, len(cp.Plan(time.Now().Add(-time.Second))[0]); got != exp {
if exp, got := 4, len(plan[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
cp.Release(plan)
// skip planning if all files are over the limit
over := []tsm1.FileStat{
@ -2188,13 +2190,13 @@ func TestDefaultPlanner_Plan_CompactsMiddleSteps(t *testing.T) {
},
}
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
},
}
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
tsm := cp.Plan(time.Now())
@ -2210,8 +2212,8 @@ func TestDefaultPlanner_Plan_CompactsMiddleSteps(t *testing.T) {
}
func TestDefaultPlanner_Plan_LargeSets(t *testing.T) {
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return []tsm1.FileStat{
tsm1.FileStat{
@ -2236,8 +2238,8 @@ func TestDefaultPlanner_Plan_LargeSets(t *testing.T) {
},
}
},
},
}
}, tsdb.DefaultCompactFullWriteColdDuration,
)
tsm := cp.Plan(time.Now())
if exp, got := 0, len(tsm); got != exp {
@ -2246,8 +2248,8 @@ func TestDefaultPlanner_Plan_LargeSets(t *testing.T) {
}
func TestDefaultPlanner_Plan_LargeGeneration(t *testing.T) {
cp := &tsm1.DefaultPlanner{
FileStore: &fakeFileStore{
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return []tsm1.FileStat{
tsm1.FileStat{
@ -2272,8 +2274,8 @@ func TestDefaultPlanner_Plan_LargeGeneration(t *testing.T) {
},
}
},
},
}
}, tsdb.DefaultCompactFullWriteColdDuration,
)
tsm := cp.Plan(time.Now())
if exp, got := 0, len(tsm); got != exp {

View File

@ -23,6 +23,7 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/bytesutil"
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/influxdata/influxdb/tsdb"
_ "github.com/influxdata/influxdb/tsdb/index"
"github.com/uber-go/zap"
@ -132,6 +133,9 @@ type Engine struct {
enableCompactionsOnOpen bool
stats *EngineStatistics
// The limiter for concurrent compactions
compactionLimiter limiter.Fixed
}
// NewEngine returns a new instance of Engine.
@ -161,17 +165,15 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, opt tsdb.
WAL: w,
Cache: cache,
FileStore: fs,
Compactor: c,
CompactionPlan: &DefaultPlanner{
FileStore: fs,
CompactFullWriteColdDuration: time.Duration(opt.Config.CompactFullWriteColdDuration),
},
FileStore: fs,
Compactor: c,
CompactionPlan: NewDefaultPlanner(fs, time.Duration(opt.Config.CompactFullWriteColdDuration)),
CacheFlushMemorySizeThreshold: opt.Config.CacheSnapshotMemorySize,
CacheFlushWriteColdDuration: time.Duration(opt.Config.CacheSnapshotWriteColdDuration),
enableCompactionsOnOpen: true,
stats: &EngineStatistics{},
stats: &EngineStatistics{},
compactionLimiter: opt.CompactionLimiter,
}
// Attach fieldset to index.
@ -243,6 +245,7 @@ func (e *Engine) disableLevelCompactions(wait bool) {
e.levelWorkers += 1
}
var cleanup bool
if old == 0 && e.done != nil {
// Prevent new compactions from starting
e.Compactor.DisableCompactions()
@ -250,12 +253,13 @@ func (e *Engine) disableLevelCompactions(wait bool) {
// Stop all background compaction goroutines
close(e.done)
e.done = nil
cleanup = true
}
e.mu.Unlock()
e.wg.Wait()
if old == 0 { // first to disable should cleanup
if cleanup { // first to disable should cleanup
if err := e.cleanup(); err != nil {
e.logger.Info(fmt.Sprintf("error cleaning up temp file: %v", err))
}
@ -428,6 +432,11 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic {
return statistics
}
// DiskSize returns the total size in bytes of all TSM and WAL segments on disk.
func (e *Engine) DiskSize() int64 {
	tsmBytes := e.FileStore.DiskSizeBytes()
	walBytes := e.WAL.DiskSizeBytes()
	return tsmBytes + walBytes
}
// Open opens and initializes the engine.
func (e *Engine) Open() error {
if err := os.MkdirAll(e.path, 0777); err != nil {
@ -526,6 +535,21 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
return nil
}
// IsIdle returns true if the cache is empty, there are no running compactions and the
// shard is fully compacted.
func (e *Engine) IsIdle() bool {
	cacheEmpty := e.Cache.Size() == 0

	// Sum the active-compaction counters across every compaction type:
	// cache snapshots, the three TSM levels, full and optimize compactions.
	runningCompactions := atomic.LoadInt64(&e.stats.CacheCompactionsActive)
	runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[0])
	runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[1])
	runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[2])
	runningCompactions += atomic.LoadInt64(&e.stats.TSMFullCompactionsActive)
	runningCompactions += atomic.LoadInt64(&e.stats.TSMOptimizeCompactionsActive)

	return cacheEmpty && runningCompactions == 0 && e.CompactionPlan.FullyCompacted()
}
// Backup writes a tar archive of any TSM files modified since the passed
// in time to the passed in writer. The basePath will be prepended to the names
// of the files in the archive. It will force a snapshot of the WAL first
@ -1165,8 +1189,12 @@ func (e *Engine) compactTSMLevel(fast bool, level int, quit <-chan struct{}) {
case <-t.C:
s := e.levelCompactionStrategy(fast, level)
if s != nil {
// Release the files in the compaction plan
defer e.CompactionPlan.Release(s.compactionGroups)
s.Apply()
}
}
}
}
@ -1183,6 +1211,8 @@ func (e *Engine) compactTSMFull(quit <-chan struct{}) {
case <-t.C:
s := e.fullCompactionStrategy()
if s != nil {
// Release the files in the compaction plan
defer e.CompactionPlan.Release(s.compactionGroups)
s.Apply()
}
@ -1205,6 +1235,7 @@ type compactionStrategy struct {
logger zap.Logger
compactor *Compactor
fileStore *FileStore
limiter limiter.Fixed
}
// Apply concurrently compacts all the groups in a compaction strategy.
@ -1226,6 +1257,12 @@ func (s *compactionStrategy) Apply() {
// compactGroup executes the compaction strategy against a single CompactionGroup.
func (s *compactionStrategy) compactGroup(groupNum int) {
// Limit concurrent compactions if we have a limiter
if cap(s.limiter) > 0 {
s.limiter.Take()
defer s.limiter.Release()
}
group := s.compactionGroups[groupNum]
start := time.Now()
s.logger.Info(fmt.Sprintf("beginning %s compaction of group %d, %d TSM files", s.description, groupNum, len(group)))
@ -1290,6 +1327,7 @@ func (e *Engine) levelCompactionStrategy(fast bool, level int) *compactionStrate
fileStore: e.FileStore,
compactor: e.Compactor,
fast: fast,
limiter: e.compactionLimiter,
description: fmt.Sprintf("level %d", level),
activeStat: &e.stats.TSMCompactionsActive[level-1],
@ -1320,6 +1358,7 @@ func (e *Engine) fullCompactionStrategy() *compactionStrategy {
fileStore: e.FileStore,
compactor: e.Compactor,
fast: optimize,
limiter: e.compactionLimiter,
}
if optimize {

View File

@ -1059,6 +1059,8 @@ type mockPlanner struct{}
// The mockPlanner methods implement tsm1.CompactionPlanner as no-ops for tests.
func (m *mockPlanner) Plan(lastWrite time.Time) []tsm1.CompactionGroup { return nil }
func (m *mockPlanner) PlanLevel(level int) []tsm1.CompactionGroup     { return nil }
func (m *mockPlanner) PlanOptimize() []tsm1.CompactionGroup           { return nil }
func (m *mockPlanner) Release(groups []tsm1.CompactionGroup)          {}
func (m *mockPlanner) FullyCompacted() bool                           { return false }
// ParseTags returns an instance of Tags for a comma-delimited list of key/values.
func ParseTags(s string) influxql.Tags {

View File

@ -317,13 +317,17 @@ func (f *FileStore) Delete(keys []string) error {
// DeleteRange removes the values for keys between timestamps min and max.
func (f *FileStore) DeleteRange(keys []string, min, max int64) error {
if err := f.walkFiles(func(tsm TSMFile) error {
return tsm.DeleteRange(keys, min, max)
}); err != nil {
return err
}
f.mu.Lock()
f.lastModified = time.Now().UTC()
f.lastFileStats = nil
f.mu.Unlock()
return f.walkFiles(func(tsm TSMFile) error {
return tsm.DeleteRange(keys, min, max)
})
return nil
}
// Open loads all the TSM files in the configured directory.
@ -382,15 +386,6 @@ func (f *FileStore) Open() error {
return fmt.Errorf("error opening file %s: %v", fn, err)
}
// Accumulate file store size stat
fi, err := file.Stat()
if err == nil {
atomic.AddInt64(&f.stats.DiskBytes, fi.Size())
if fi.ModTime().UTC().After(f.lastModified) {
f.lastModified = fi.ModTime().UTC()
}
}
go func(idx int, file *os.File) {
start := time.Now()
df, err := NewTSMReader(file)
@ -404,6 +399,7 @@ func (f *FileStore) Open() error {
}(i, file)
}
var lm int64
for range files {
res := <-readerC
if res.err != nil {
@ -411,7 +407,19 @@ func (f *FileStore) Open() error {
return res.err
}
f.files = append(f.files, res.r)
// Accumulate file store size stats
atomic.AddInt64(&f.stats.DiskBytes, int64(res.r.Size()))
for _, ts := range res.r.TombstoneFiles() {
atomic.AddInt64(&f.stats.DiskBytes, int64(ts.Size))
}
// Re-initialize the lastModified time for the file store
if res.r.LastModified() > lm {
lm = res.r.LastModified()
}
}
f.lastModified = time.Unix(0, lm)
close(readerC)
sort.Sort(tsmReaders(f.files))
@ -434,6 +442,10 @@ func (f *FileStore) Close() error {
return nil
}
// DiskSizeBytes returns the total number of bytes consumed by the TSM files
// (including their tombstone files) in the store, as tracked by the
// DiskBytes statistic.
func (f *FileStore) DiskSizeBytes() int64 {
	return atomic.LoadInt64(&f.stats.DiskBytes)
}
// Read returns the slice of values for the given key and the given timestamp,
// if any file matches those constraints.
func (f *FileStore) Read(key string, t int64) ([]Value, error) {
@ -623,6 +635,10 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
var totalSize int64
for _, file := range f.files {
totalSize += int64(file.Size())
for _, ts := range file.TombstoneFiles() {
totalSize += int64(ts.Size)
}
}
atomic.StoreInt64(&f.stats.DiskBytes, totalSize)

View File

@ -465,6 +465,11 @@ func (t *TSMReader) Size() uint32 {
// LastModified returns the modification time of the TSM file or, if newer,
// of any of its tombstone files, in nanoseconds since the Unix epoch.
func (t *TSMReader) LastModified() int64 {
	t.mu.RLock()
	lm := t.lastModified
	// A tombstone written after the TSM file should advance the
	// last-modified time, so take the max across all tombstone files.
	for _, ts := range t.tombstoner.TombstoneFiles() {
		if ts.LastModified > lm {
			lm = ts.LastModified
		}
	}
	t.mu.RUnlock()
	return lm
}

View File

@ -375,6 +375,10 @@ func (l *WAL) LastWriteTime() time.Time {
return l.lastWriteTime
}
// DiskSizeBytes returns the total on-disk size of the WAL: closed (old)
// segments plus the currently active segment.
func (l *WAL) DiskSizeBytes() int64 {
	return atomic.LoadInt64(&l.stats.OldBytes) + atomic.LoadInt64(&l.stats.CurrentBytes)
}
func (l *WAL) writeToLog(entry WALEntry) (int, error) {
// limit how many concurrent encodings can be in flight. Since we can only
// write one at a time to disk, a slow disk can cause the allocations below

View File

@ -263,10 +263,13 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s
// ForEachMeasurementTagKey iterates over all tag keys for a measurement.
func (i *Index) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
// Ensure we do not hold a lock on the index while fn executes in case fn tries
// to acquire a lock on the index again. If another goroutine has Lock, this will
// deadlock.
i.mu.RLock()
defer i.mu.RUnlock()
mm := i.measurements[string(name)]
i.mu.RUnlock()
if mm == nil {
return nil
}
@ -537,9 +540,9 @@ func (i *Index) DropSeries(key []byte) error {
// ForEachMeasurementSeriesByExpr iterates over all series in a measurement filtered by an expression.
func (i *Index) ForEachMeasurementSeriesByExpr(name []byte, expr influxql.Expr, fn func(tags models.Tags) error) error {
i.mu.RLock()
defer i.mu.RUnlock()
mm := i.measurements[string(name)]
i.mu.RUnlock()
if mm == nil {
return nil
}
@ -731,7 +734,6 @@ type ShardIndex struct {
// CreateSeriesListIfNotExists creates a list of series if they doesn't exist in bulk.
func (idx *ShardIndex) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSlice []models.Tags) error {
keys, names, tagsSlice = idx.assignExistingSeries(idx.id, keys, names, tagsSlice)
if len(keys) == 0 {
return nil

View File

@ -287,12 +287,9 @@ func (m *Measurement) ForEachSeriesByExpr(condition influxql.Expr, fn func(tags
return err
}
m.mu.RLock()
defer m.mu.RUnlock()
// Iterate over each series.
for _, id := range ids {
s := m.seriesByID[id]
s := m.SeriesByID(id)
if err := fn(s.Tags()); err != nil {
return err
}

View File

@ -6,7 +6,6 @@ import (
"fmt"
"io"
"math"
"os"
"path/filepath"
"regexp"
"sort"
@ -110,6 +109,7 @@ type Shard struct {
path string
walPath string
id uint64
wg sync.WaitGroup
database string
retentionPolicy string
@ -206,14 +206,9 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
return nil
}
// TODO(edd): Should statSeriesCreate be the current number of series in the
// shard, or the total number of series ever created?
sSketch, tSketch, err := s.engine.SeriesSketches()
seriesN := int64(sSketch.Count() - tSketch.Count())
if err != nil {
s.logger.Error("cannot compute series sketch", zap.Error(err))
seriesN = 0
}
// Refresh our disk size stat
_, _ = s.DiskSize()
seriesN := s.engine.SeriesN()
tags = s.defaultTags.Merge(tags)
statistics := []models.Statistic{{
@ -288,8 +283,6 @@ func (s *Shard) Open() error {
}
s.engine = e
go s.monitor()
return nil
}(); err != nil {
s.close(true)
@ -335,6 +328,7 @@ func (s *Shard) close(clean bool) error {
default:
close(s.closing)
}
s.wg.Wait()
if clean {
// Don't leak our shard ID and series keys in the index
@ -352,6 +346,12 @@ func (s *Shard) close(clean bool) error {
return err
}
// IndexType reports the type name of the index backing this shard
// (e.g. "inmem"), taking the shard's read lock for the duration of the call.
func (s *Shard) IndexType() string {
	s.mu.RLock()
	defer s.mu.RUnlock()

	idx := s.index
	return idx.Type()
}
// ready determines if the Shard is ready for queries or writes.
// It returns nil if ready, otherwise ErrShardClosed or ErrShardDiabled
func (s *Shard) ready() error {
@ -380,35 +380,28 @@ func (s *Shard) UnloadIndex() {
s.index.RemoveShard(s.id)
}
// DiskSize returns the size on disk of this shard
func (s *Shard) DiskSize() (int64, error) {
var size int64
err := filepath.Walk(s.path, func(_ string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
if !fi.IsDir() {
size += fi.Size()
}
return err
})
if err != nil {
return 0, err
// IsIdle returns true if the shard is not receiving writes and is fully compacted.
func (s *Shard) IsIdle() bool {
if err := s.ready(); err != nil {
return true
}
err = filepath.Walk(s.walPath, func(_ string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
return s.engine.IsIdle()
}
if !fi.IsDir() {
size += fi.Size()
}
return err
})
// SetCompactionsEnabled enables or disables background compactions for the shard.
// It is a no-op when the shard is not ready (closed or disabled); otherwise the
// request is forwarded to the underlying engine.
func (s *Shard) SetCompactionsEnabled(enabled bool) {
	if s.ready() != nil {
		return
	}
	s.engine.SetCompactionsEnabled(enabled)
}
return size, err
// DiskSize returns the size on disk of this shard as reported by its engine,
// and refreshes the shard's DiskBytes statistic as a side effect.
// The returned error is always nil; the signature is kept for callers.
func (s *Shard) DiskSize() (int64, error) {
	n := s.engine.DiskSize()
	atomic.StoreInt64(&s.stats.DiskBytes, n)
	return n, nil
}
// FieldCreate holds information for a field to create on a measurement.
@ -964,62 +957,12 @@ func (s *Shard) CreateSnapshot() (string, error) {
return s.engine.CreateSnapshot()
}
func (s *Shard) monitor() {
t := time.NewTicker(monitorStatInterval)
defer t.Stop()
t2 := time.NewTicker(time.Minute)
defer t2.Stop()
var changed time.Time
// ForEachMeasurementTagKey invokes fn once per tag key of the named
// measurement, delegating directly to the shard's engine. The first error
// returned by fn (or by the engine) is propagated to the caller.
func (s *Shard) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
return s.engine.ForEachMeasurementTagKey(name, fn)
}
for {
select {
case <-s.closing:
return
case <-t.C:
// Checking DiskSize can be expensive with a lot of shards and TSM files, only
// check if something has changed.
lm := s.LastModified()
if lm.Equal(changed) {
continue
}
size, err := s.DiskSize()
if err != nil {
s.logger.Info(fmt.Sprintf("Error collecting shard size: %v", err))
continue
}
atomic.StoreInt64(&s.stats.DiskBytes, size)
changed = lm
case <-t2.C:
if s.options.Config.MaxValuesPerTag == 0 {
continue
}
names, err := s.MeasurementNamesByExpr(nil)
if err != nil {
s.logger.Warn("cannot retrieve measurement names", zap.Error(err))
continue
}
for _, name := range names {
s.engine.ForEachMeasurementTagKey(name, func(k []byte) error {
n := s.engine.TagKeyCardinality(name, k)
perc := int(float64(n) / float64(s.options.Config.MaxValuesPerTag) * 100)
if perc > 100 {
perc = 100
}
// Log at 80, 85, 90-100% levels
if perc == 80 || perc == 85 || perc >= 90 {
s.logger.Info(fmt.Sprintf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s",
perc, n, s.options.Config.MaxValuesPerTag, s.database, s.id, name, k))
}
return nil
})
}
}
}
// TagKeyCardinality returns the number of distinct values for the given tag
// key within the named measurement, delegating to the shard's engine.
func (s *Shard) TagKeyCardinality(name, key []byte) int {
return s.engine.TagKeyCardinality(name, key)
}
type ShardGroup interface {

View File

@ -145,6 +145,8 @@ func (s *Store) Open() error {
}
s.opened = true
s.wg.Add(1)
go s.monitorShards()
return nil
}
@ -158,6 +160,13 @@ func (s *Store) loadShards() error {
t := limiter.NewFixed(runtime.GOMAXPROCS(0))
// Setup a shared limiter for compactions
lim := s.EngineOptions.Config.MaxConcurrentCompactions
if lim == 0 {
lim = runtime.GOMAXPROCS(0)
}
s.EngineOptions.CompactionLimiter = limiter.NewFixed(lim)
resC := make(chan *res)
var n int
@ -224,6 +233,9 @@ func (s *Store) loadShards() error {
// Open engine.
shard := NewShard(shardID, path, walPath, opt)
// Disable compactions, writes and queries until all shards are loaded
shard.EnableOnOpen = false
shard.WithLogger(s.baseLogger)
err = shard.Open()
@ -251,6 +263,15 @@ func (s *Store) loadShards() error {
s.databases[res.s.database] = struct{}{}
}
close(resC)
// Enable all shards
for _, sh := range s.shards {
sh.SetEnabled(true)
if sh.IsIdle() {
sh.SetCompactionsEnabled(false)
}
}
return nil
}
@ -1030,6 +1051,69 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err
return tagValues, nil
}
// monitorShards runs as a background goroutine (accounted for by s.wg) until
// s.closing is closed. It performs two periodic duties:
//   - every 10 seconds, disables background compactions on idle shards and
//     re-enables them on active ones;
//   - every minute, for "inmem"-indexed shards, logs a warning when a tag's
//     value cardinality approaches the configured max-values-per-tag limit.
func (s *Store) monitorShards() {
defer s.wg.Done()
t := time.NewTicker(10 * time.Second)
defer t.Stop()
t2 := time.NewTicker(time.Minute)
defer t2.Stop()
for {
select {
case <-s.closing:
// Store is shutting down.
return
case <-t.C:
// Toggle compactions based on shard idleness. The read lock is held
// across the whole sweep of s.shards.
s.mu.RLock()
for _, sh := range s.shards {
if sh.IsIdle() {
sh.SetCompactionsEnabled(false)
} else {
sh.SetCompactionsEnabled(true)
}
}
s.mu.RUnlock()
case <-t2.C:
// Cardinality warnings are only meaningful when a limit is configured.
if s.EngineOptions.Config.MaxValuesPerTag == 0 {
continue
}
// Snapshot the inmem-indexed shards under the read lock, then release
// it before the (potentially slow) per-shard walk below.
s.mu.RLock()
shards := s.filterShards(func(sh *Shard) bool {
return sh.IndexType() == "inmem"
})
s.mu.RUnlock()
s.walkShards(shards, func(sh *Shard) error {
db := sh.database
id := sh.id
names, err := sh.MeasurementNamesByExpr(nil)
if err != nil {
// Best effort: log and keep walking the remaining shards.
s.Logger.Warn("cannot retrieve measurement names", zap.Error(err))
return nil
}
for _, name := range names {
sh.ForEachMeasurementTagKey(name, func(k []byte) error {
n := sh.TagKeyCardinality(name, k)
// Percentage of the configured limit, capped at 100.
perc := int(float64(n) / float64(s.EngineOptions.Config.MaxValuesPerTag) * 100)
if perc > 100 {
perc = 100
}
// Log at 80, 85, 90-100% levels
if perc == 80 || perc == 85 || perc >= 90 {
s.Logger.Info(fmt.Sprintf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s",
perc, n, s.EngineOptions.Config.MaxValuesPerTag, db, id, name, k))
}
return nil
})
}
return nil
})
}
}
}
// KeyValue holds a string key and a string value.
type KeyValue struct {
Key, Value string