Add extension point to swap out the compaction planner
parent
c720b3b40c
commit
0eb6564e79
|
@ -153,6 +153,7 @@ type EngineOptions struct {
|
||||||
ShardID uint64
|
ShardID uint64
|
||||||
InmemIndex interface{} // shared in-memory index
|
InmemIndex interface{} // shared in-memory index
|
||||||
|
|
||||||
|
CompactionPlannerCreator CompactionPlannerCreator
|
||||||
CompactionLimiter limiter.Fixed
|
CompactionLimiter limiter.Fixed
|
||||||
CompactionThroughputLimiter limiter.Rate
|
CompactionThroughputLimiter limiter.Rate
|
||||||
WALEnabled bool
|
WALEnabled bool
|
||||||
|
@ -173,3 +174,5 @@ func NewEngineOptions() EngineOptions {
|
||||||
|
|
||||||
// NewInmemIndex returns a new "inmem" index type.
|
// NewInmemIndex returns a new "inmem" index type.
|
||||||
var NewInmemIndex func(name string, sfile *SeriesFile) (interface{}, error)
|
var NewInmemIndex func(name string, sfile *SeriesFile) (interface{}, error)
|
||||||
|
|
||||||
|
type CompactionPlannerCreator func(cfg Config) interface{}
|
||||||
|
|
|
@ -83,6 +83,8 @@ type CompactionPlanner interface {
|
||||||
// ForceFull causes the planner to return a full compaction plan the next
|
// ForceFull causes the planner to return a full compaction plan the next
|
||||||
// time Plan() is called if there are files that could be compacted.
|
// time Plan() is called if there are files that could be compacted.
|
||||||
ForceFull()
|
ForceFull()
|
||||||
|
|
||||||
|
SetFileStore(fs *FileStore)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultPlanner implements CompactionPlanner using a strategy to roll up
|
// DefaultPlanner implements CompactionPlanner using a strategy to roll up
|
||||||
|
@ -178,6 +180,10 @@ func (t *tsmGeneration) hasTombstones() bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *DefaultPlanner) SetFileStore(fs *FileStore) {
|
||||||
|
c.FileStore = fs
|
||||||
|
}
|
||||||
|
|
||||||
// FullyCompacted returns true if the shard is fully compacted.
|
// FullyCompacted returns true if the shard is fully compacted.
|
||||||
func (c *DefaultPlanner) FullyCompacted() bool {
|
func (c *DefaultPlanner) FullyCompacted() bool {
|
||||||
gens := c.findGenerations(false)
|
gens := c.findGenerations(false)
|
||||||
|
|
|
@ -204,6 +204,12 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
|
||||||
RateLimit: opt.CompactionThroughputLimiter,
|
RateLimit: opt.CompactionThroughputLimiter,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var planner CompactionPlanner = NewDefaultPlanner(fs, time.Duration(opt.Config.CompactFullWriteColdDuration))
|
||||||
|
if opt.CompactionPlannerCreator != nil {
|
||||||
|
planner = opt.CompactionPlannerCreator(opt.Config).(CompactionPlanner)
|
||||||
|
planner.SetFileStore(fs)
|
||||||
|
}
|
||||||
|
|
||||||
logger := zap.NewNop()
|
logger := zap.NewNop()
|
||||||
stats := &EngineStatistics{}
|
stats := &EngineStatistics{}
|
||||||
e := &Engine{
|
e := &Engine{
|
||||||
|
@ -221,7 +227,7 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
|
||||||
|
|
||||||
FileStore: fs,
|
FileStore: fs,
|
||||||
Compactor: c,
|
Compactor: c,
|
||||||
CompactionPlan: NewDefaultPlanner(fs, time.Duration(opt.Config.CompactFullWriteColdDuration)),
|
CompactionPlan: planner,
|
||||||
|
|
||||||
CacheFlushMemorySizeThreshold: uint64(opt.Config.CacheSnapshotMemorySize),
|
CacheFlushMemorySizeThreshold: uint64(opt.Config.CacheSnapshotMemorySize),
|
||||||
CacheFlushWriteColdDuration: time.Duration(opt.Config.CacheSnapshotWriteColdDuration),
|
CacheFlushWriteColdDuration: time.Duration(opt.Config.CacheSnapshotWriteColdDuration),
|
||||||
|
|
|
@ -2010,6 +2010,7 @@ func (m *mockPlanner) PlanOptimize() []tsm1.CompactionGroup { return
|
||||||
func (m *mockPlanner) Release(groups []tsm1.CompactionGroup) {}
|
func (m *mockPlanner) Release(groups []tsm1.CompactionGroup) {}
|
||||||
func (m *mockPlanner) FullyCompacted() bool { return false }
|
func (m *mockPlanner) FullyCompacted() bool { return false }
|
||||||
func (m *mockPlanner) ForceFull() {}
|
func (m *mockPlanner) ForceFull() {}
|
||||||
|
func (m *mockPlanner) SetFileStore(fs *tsm1.FileStore) {}
|
||||||
|
|
||||||
// ParseTags returns an instance of Tags for a comma-delimited list of key/values.
|
// ParseTags returns an instance of Tags for a comma-delimited list of key/values.
|
||||||
func ParseTags(s string) query.Tags {
|
func ParseTags(s string) query.Tags {
|
||||||
|
|
Loading…
Reference in New Issue