Merge pull request #9609 from influxdata/jw-compaction-filter
Add capability change compaction plannerpull/9621/head
commit
477de23e35
|
@ -153,6 +153,7 @@ type EngineOptions struct {
|
|||
ShardID uint64
|
||||
InmemIndex interface{} // shared in-memory index
|
||||
|
||||
CompactionPlannerCreator CompactionPlannerCreator
|
||||
CompactionLimiter limiter.Fixed
|
||||
CompactionThroughputLimiter limiter.Rate
|
||||
WALEnabled bool
|
||||
|
@ -173,3 +174,5 @@ func NewEngineOptions() EngineOptions {
|
|||
|
||||
// NewInmemIndex returns a new "inmem" index type.
|
||||
var NewInmemIndex func(name string, sfile *SeriesFile) (interface{}, error)
|
||||
|
||||
type CompactionPlannerCreator func(cfg Config) interface{}
|
||||
|
|
|
@ -83,6 +83,8 @@ type CompactionPlanner interface {
|
|||
// ForceFull causes the planner to return a full compaction plan the next
|
||||
// time Plan() is called if there are files that could be compacted.
|
||||
ForceFull()
|
||||
|
||||
SetFileStore(fs *FileStore)
|
||||
}
|
||||
|
||||
// DefaultPlanner implements CompactionPlanner using a strategy to roll up
|
||||
|
@ -178,6 +180,10 @@ func (t *tsmGeneration) hasTombstones() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (c *DefaultPlanner) SetFileStore(fs *FileStore) {
|
||||
c.FileStore = fs
|
||||
}
|
||||
|
||||
// FullyCompacted returns true if the shard is fully compacted.
|
||||
func (c *DefaultPlanner) FullyCompacted() bool {
|
||||
gens := c.findGenerations(false)
|
||||
|
|
|
@ -203,6 +203,12 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
|
|||
RateLimit: opt.CompactionThroughputLimiter,
|
||||
}
|
||||
|
||||
var planner CompactionPlanner = NewDefaultPlanner(fs, time.Duration(opt.Config.CompactFullWriteColdDuration))
|
||||
if opt.CompactionPlannerCreator != nil {
|
||||
planner = opt.CompactionPlannerCreator(opt.Config).(CompactionPlanner)
|
||||
planner.SetFileStore(fs)
|
||||
}
|
||||
|
||||
logger := zap.NewNop()
|
||||
stats := &EngineStatistics{}
|
||||
e := &Engine{
|
||||
|
@ -220,7 +226,7 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
|
|||
|
||||
FileStore: fs,
|
||||
Compactor: c,
|
||||
CompactionPlan: NewDefaultPlanner(fs, time.Duration(opt.Config.CompactFullWriteColdDuration)),
|
||||
CompactionPlan: planner,
|
||||
|
||||
CacheFlushMemorySizeThreshold: uint64(opt.Config.CacheSnapshotMemorySize),
|
||||
CacheFlushWriteColdDuration: time.Duration(opt.Config.CacheSnapshotWriteColdDuration),
|
||||
|
|
|
@ -2020,6 +2020,7 @@ func (m *mockPlanner) PlanOptimize() []tsm1.CompactionGroup { return
|
|||
func (m *mockPlanner) Release(groups []tsm1.CompactionGroup) {}
|
||||
func (m *mockPlanner) FullyCompacted() bool { return false }
|
||||
func (m *mockPlanner) ForceFull() {}
|
||||
func (m *mockPlanner) SetFileStore(fs *tsm1.FileStore) {}
|
||||
|
||||
// ParseTags returns an instance of Tags for a comma-delimited list of key/values.
|
||||
func ParseTags(s string) query.Tags {
|
||||
|
|
Loading…
Reference in New Issue