mirror of https://github.com/milvus-io/milvus.git
enhance: [Cherry-pick] Use different interval for gc scan (#31364)
Cherry-pick from master pr: #31363 See also #31362 This PR make datacoord garbage collection scan operation using differet interval than other opeartion. This interval is a newly added param item, which default value is 7*24 hours. Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/31372/head
parent
7ee7d484cc
commit
89cff29b6a
|
@ -441,6 +441,7 @@ dataCoord:
|
|||
enableGarbageCollection: true
|
||||
gc:
|
||||
interval: 3600 # gc interval in seconds
|
||||
scanInterval: 168 #gc residual file scan interval in hours
|
||||
missingTolerance: 3600 # file meta missing tolerance duration in seconds, 3600
|
||||
dropTolerance: 10800 # file belongs to dropped entity tolerance duration in seconds. 10800
|
||||
enableActiveStandby: false
|
||||
|
|
|
@ -51,6 +51,7 @@ type GcOption struct {
|
|||
checkInterval time.Duration // each interval
|
||||
missingTolerance time.Duration // key missing in meta tolerance time
|
||||
dropTolerance time.Duration // dropped segment related key tolerance time
|
||||
scanInterval time.Duration // interval for scan residue for interupted log wrttien
|
||||
|
||||
removeLogPool *conc.Pool[struct{}]
|
||||
}
|
||||
|
@ -77,8 +78,12 @@ type gcCmd struct {
|
|||
|
||||
// newGarbageCollector create garbage collector with meta and option
|
||||
func newGarbageCollector(meta *meta, handler Handler, opt GcOption) *garbageCollector {
|
||||
log.Info("GC with option", zap.Bool("enabled", opt.enabled), zap.Duration("interval", opt.checkInterval),
|
||||
zap.Duration("missingTolerance", opt.missingTolerance), zap.Duration("dropTolerance", opt.dropTolerance))
|
||||
log.Info("GC with option",
|
||||
zap.Bool("enabled", opt.enabled),
|
||||
zap.Duration("interval", opt.checkInterval),
|
||||
zap.Duration("scanInterval", opt.scanInterval),
|
||||
zap.Duration("missingTolerance", opt.missingTolerance),
|
||||
zap.Duration("dropTolerance", opt.dropTolerance))
|
||||
opt.removeLogPool = conc.NewPool[struct{}](Params.DataCoordCfg.GCRemoveConcurrent.GetAsInt(), conc.WithExpiryDuration(time.Minute))
|
||||
return &garbageCollector{
|
||||
meta: meta,
|
||||
|
@ -145,6 +150,8 @@ func (gc *garbageCollector) work() {
|
|||
defer gc.wg.Done()
|
||||
ticker := time.NewTicker(gc.option.checkInterval)
|
||||
defer ticker.Stop()
|
||||
scanTicker := time.NewTicker(gc.option.scanInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
|
@ -155,8 +162,10 @@ func (gc *garbageCollector) work() {
|
|||
gc.clearEtcd()
|
||||
gc.recycleUnusedIndexes()
|
||||
gc.recycleUnusedSegIndexes()
|
||||
gc.scan()
|
||||
gc.recycleUnusedIndexFiles()
|
||||
case <-scanTicker.C:
|
||||
log.Info("Garbage collector start to scan interrupted write residue")
|
||||
gc.scan()
|
||||
case cmd := <-gc.cmdCh:
|
||||
switch cmd.cmdType {
|
||||
case datapb.GcCommand_Pause:
|
||||
|
|
|
@ -68,6 +68,7 @@ func Test_garbageCollector_basic(t *testing.T) {
|
|||
cli: cli,
|
||||
enabled: true,
|
||||
checkInterval: time.Millisecond * 10,
|
||||
scanInterval: time.Hour * 7 * 24,
|
||||
missingTolerance: time.Hour * 24,
|
||||
dropTolerance: time.Hour * 24,
|
||||
})
|
||||
|
@ -84,6 +85,7 @@ func Test_garbageCollector_basic(t *testing.T) {
|
|||
cli: nil,
|
||||
enabled: true,
|
||||
checkInterval: time.Millisecond * 10,
|
||||
scanInterval: time.Hour * 7 * 24,
|
||||
missingTolerance: time.Hour * 24,
|
||||
dropTolerance: time.Hour * 24,
|
||||
})
|
||||
|
@ -120,6 +122,7 @@ func Test_garbageCollector_scan(t *testing.T) {
|
|||
cli: cli,
|
||||
enabled: true,
|
||||
checkInterval: time.Minute * 30,
|
||||
scanInterval: time.Hour * 7 * 24,
|
||||
missingTolerance: time.Hour * 24,
|
||||
dropTolerance: time.Hour * 24,
|
||||
})
|
||||
|
@ -137,6 +140,7 @@ func Test_garbageCollector_scan(t *testing.T) {
|
|||
cli: cli,
|
||||
enabled: true,
|
||||
checkInterval: time.Minute * 30,
|
||||
scanInterval: time.Hour * 7 * 24,
|
||||
missingTolerance: time.Hour * 24,
|
||||
dropTolerance: time.Hour * 24,
|
||||
})
|
||||
|
@ -162,6 +166,7 @@ func Test_garbageCollector_scan(t *testing.T) {
|
|||
cli: cli,
|
||||
enabled: true,
|
||||
checkInterval: time.Minute * 30,
|
||||
scanInterval: time.Hour * 7 * 24,
|
||||
missingTolerance: time.Hour * 24,
|
||||
dropTolerance: time.Hour * 24,
|
||||
})
|
||||
|
@ -190,6 +195,7 @@ func Test_garbageCollector_scan(t *testing.T) {
|
|||
cli: cli,
|
||||
enabled: true,
|
||||
checkInterval: time.Minute * 30,
|
||||
scanInterval: time.Hour * 7 * 24,
|
||||
missingTolerance: time.Hour * 24,
|
||||
dropTolerance: 0,
|
||||
})
|
||||
|
@ -206,6 +212,7 @@ func Test_garbageCollector_scan(t *testing.T) {
|
|||
cli: cli,
|
||||
enabled: true,
|
||||
checkInterval: time.Minute * 30,
|
||||
scanInterval: time.Hour * 7 * 24,
|
||||
missingTolerance: 0,
|
||||
dropTolerance: 0,
|
||||
})
|
||||
|
@ -227,6 +234,7 @@ func Test_garbageCollector_scan(t *testing.T) {
|
|||
cli: cli,
|
||||
enabled: true,
|
||||
checkInterval: time.Minute * 30,
|
||||
scanInterval: time.Hour * 7 * 24,
|
||||
missingTolerance: 0,
|
||||
dropTolerance: 0,
|
||||
})
|
||||
|
@ -1473,6 +1481,7 @@ func (s *GarbageCollectorSuite) TestPauseResume() {
|
|||
cli: s.cli,
|
||||
enabled: false,
|
||||
checkInterval: time.Millisecond * 10,
|
||||
scanInterval: time.Hour * 24 * 7,
|
||||
missingTolerance: time.Hour * 24,
|
||||
dropTolerance: time.Hour * 24,
|
||||
})
|
||||
|
@ -1494,6 +1503,7 @@ func (s *GarbageCollectorSuite) TestPauseResume() {
|
|||
cli: s.cli,
|
||||
enabled: true,
|
||||
checkInterval: time.Millisecond * 10,
|
||||
scanInterval: time.Hour * 7 * 24,
|
||||
missingTolerance: time.Hour * 24,
|
||||
dropTolerance: time.Hour * 24,
|
||||
})
|
||||
|
@ -1518,6 +1528,7 @@ func (s *GarbageCollectorSuite) TestPauseResume() {
|
|||
cli: s.cli,
|
||||
enabled: true,
|
||||
checkInterval: time.Millisecond * 10,
|
||||
scanInterval: time.Hour * 7 * 24,
|
||||
missingTolerance: time.Hour * 24,
|
||||
dropTolerance: time.Hour * 24,
|
||||
})
|
||||
|
@ -1545,6 +1556,7 @@ func (s *GarbageCollectorSuite) TestPauseResume() {
|
|||
cli: s.cli,
|
||||
enabled: true,
|
||||
checkInterval: time.Millisecond * 10,
|
||||
scanInterval: time.Hour * 7 * 24,
|
||||
missingTolerance: time.Hour * 24,
|
||||
dropTolerance: time.Hour * 24,
|
||||
})
|
||||
|
|
|
@ -531,6 +531,7 @@ func (s *Server) initGarbageCollection(cli storage.ChunkManager) {
|
|||
cli: cli,
|
||||
enabled: Params.DataCoordCfg.EnableGarbageCollection.GetAsBool(),
|
||||
checkInterval: Params.DataCoordCfg.GCInterval.GetAsDuration(time.Second),
|
||||
scanInterval: Params.DataCoordCfg.GCScanIntervalInHour.GetAsDuration(time.Hour),
|
||||
missingTolerance: Params.DataCoordCfg.GCMissingTolerance.GetAsDuration(time.Second),
|
||||
dropTolerance: Params.DataCoordCfg.GCDropTolerance.GetAsDuration(time.Second),
|
||||
})
|
||||
|
|
|
@ -2549,6 +2549,7 @@ type dataCoordConfig struct {
|
|||
GCMissingTolerance ParamItem `refreshable:"false"`
|
||||
GCDropTolerance ParamItem `refreshable:"false"`
|
||||
GCRemoveConcurrent ParamItem `refreshable:"false"`
|
||||
GCScanIntervalInHour ParamItem `refreshable:"false"`
|
||||
EnableActiveStandby ParamItem `refreshable:"false"`
|
||||
|
||||
BindIndexNodeMode ParamItem `refreshable:"false"`
|
||||
|
@ -2913,6 +2914,15 @@ During compaction, the size of segment # of rows is able to exceed segment max #
|
|||
}
|
||||
p.GCInterval.Init(base.mgr)
|
||||
|
||||
p.GCScanIntervalInHour = ParamItem{
|
||||
Key: "dataCoord.gc.scanInterval",
|
||||
Version: "2.4.0",
|
||||
DefaultValue: "168", // hours, default 7 * 24 hours
|
||||
Doc: "garbage collection scan residue interval in hours",
|
||||
Export: true,
|
||||
}
|
||||
p.GCScanIntervalInHour.Init(base.mgr)
|
||||
|
||||
// Do not set this to incredible small value, make sure this to be more than 10 minutes at least
|
||||
p.GCMissingTolerance = ParamItem{
|
||||
Key: "dataCoord.gc.missingTolerance",
|
||||
|
|
Loading…
Reference in New Issue