mirror of https://github.com/milvus-io/milvus.git
fix: [cherry-pick] compaction task not be cleaned correctly (#34766)
1.fix compaction task not be cleaned correctly 2.add a new parameter to control compaction gc loop interval 3.remove some useless configs of clustering compaction bug: #34764 pr: #34765 Signed-off-by: wayblink <anyang.wang@zilliz.com>pull/34750/head
parent
3cf2e1831a
commit
83fc26c31a
|
@ -458,18 +458,17 @@ dataCoord:
|
|||
rpcTimeout: 10
|
||||
maxParallelTaskNum: 10
|
||||
workerMaxParallelTaskNum: 2
|
||||
gcInterval: 1800 # The time interval in seconds for compaction gc
|
||||
dropTolerance: 86400 # Compaction task will be cleaned after it has been finished for longer than this time (in seconds)
|
||||
clustering:
|
||||
enable: true
|
||||
autoEnable: false
|
||||
triggerInterval: 600
|
||||
stateCheckInterval: 10
|
||||
gcInterval: 600
|
||||
minInterval: 3600
|
||||
maxInterval: 259200
|
||||
newDataRatioThreshold: 0.2
|
||||
newDataSizeThreshold: 512m
|
||||
timeout: 7200
|
||||
dropTolerance: 86400
|
||||
# clustering compaction will try best to distribute data into segments with size range in [preferSegmentSize, maxSegmentSize].
|
||||
# data will be clustered by preferSegmentSize; if a cluster is larger than maxSegmentSize, it will be split into multiple segments
|
||||
# buffer between (preferSegmentSize, maxSegmentSize) is left for new data in the same cluster (range), to avoid global redistribution happening too often
|
||||
|
|
|
@ -376,8 +376,10 @@ func (c *compactionPlanHandler) loopCheck() {
|
|||
}
|
||||
|
||||
func (c *compactionPlanHandler) loopClean() {
|
||||
interval := Params.DataCoordCfg.CompactionGCIntervalInSeconds.GetAsDuration(time.Second)
|
||||
log.Info("compactionPlanHandler start clean check loop", zap.Any("gc interval", interval))
|
||||
defer c.stopWg.Done()
|
||||
cleanTicker := time.NewTicker(30 * time.Minute)
|
||||
cleanTicker := time.NewTicker(interval)
|
||||
defer cleanTicker.Stop()
|
||||
for {
|
||||
select {
|
||||
|
@ -401,10 +403,11 @@ func (c *compactionPlanHandler) cleanCompactionTaskMeta() {
|
|||
for _, tasks := range triggers {
|
||||
for _, task := range tasks {
|
||||
if task.State == datapb.CompactionTaskState_completed || task.State == datapb.CompactionTaskState_cleaned {
|
||||
duration := time.Since(time.Unix(task.StartTime, 0)).Seconds()
|
||||
if duration > float64(Params.DataCoordCfg.CompactionDropToleranceInSeconds.GetAsDuration(time.Second)) {
|
||||
duration := time.Since(time.UnixMilli(task.StartTime)).Seconds()
|
||||
if duration > float64(Params.DataCoordCfg.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds()) {
|
||||
// try best to delete meta
|
||||
err := c.meta.DropCompactionTask(task)
|
||||
log.Debug("drop compaction task meta", zap.Int64("planID", task.PlanID))
|
||||
if err != nil {
|
||||
log.Warn("fail to drop task", zap.Int64("planID", task.PlanID), zap.Error(err))
|
||||
}
|
||||
|
|
|
@ -17,7 +17,9 @@
|
|||
package datacoord
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/samber/lo"
|
||||
|
@ -25,6 +27,7 @@ import (
|
|||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
|
||||
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/util/metautil"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
|
@ -703,6 +706,43 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {
|
|||
s.Equal(datapb.CompactionTaskState_executing, t.GetState())
|
||||
}
|
||||
|
||||
func (s *CompactionPlanHandlerSuite) TestCompactionGC() {
|
||||
s.SetupTest()
|
||||
inTasks := []*datapb.CompactionTask{
|
||||
{
|
||||
PlanID: 1,
|
||||
Type: datapb.CompactionType_MixCompaction,
|
||||
State: datapb.CompactionTaskState_completed,
|
||||
StartTime: time.Now().UnixMilli() - (time.Second * 100000).Milliseconds(),
|
||||
},
|
||||
{
|
||||
PlanID: 2,
|
||||
Type: datapb.CompactionType_MixCompaction,
|
||||
State: datapb.CompactionTaskState_cleaned,
|
||||
StartTime: time.Now().UnixMilli() - (time.Second * 100000).Milliseconds(),
|
||||
},
|
||||
{
|
||||
PlanID: 3,
|
||||
Type: datapb.CompactionType_MixCompaction,
|
||||
State: datapb.CompactionTaskState_cleaned,
|
||||
StartTime: time.Now().UnixMilli(),
|
||||
},
|
||||
}
|
||||
|
||||
catalog := &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}
|
||||
compactionTaskMeta, err := newCompactionTaskMeta(context.TODO(), catalog)
|
||||
s.NoError(err)
|
||||
s.handler.meta = &meta{compactionTaskMeta: compactionTaskMeta}
|
||||
for _, t := range inTasks {
|
||||
s.handler.meta.SaveCompactionTask(t)
|
||||
}
|
||||
|
||||
s.handler.cleanCompactionTaskMeta()
|
||||
// two task should be cleaned, one remains
|
||||
tasks := s.handler.meta.GetCompactionTaskMeta().GetCompactionTasks()
|
||||
s.Equal(1, len(tasks))
|
||||
}
|
||||
|
||||
func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() {
|
||||
s.SetupTest()
|
||||
|
||||
|
|
|
@ -2872,6 +2872,7 @@ type dataCoordConfig struct {
|
|||
SegmentExpansionRate ParamItem `refreshable:"true"`
|
||||
CompactionTimeoutInSeconds ParamItem `refreshable:"true"`
|
||||
CompactionDropToleranceInSeconds ParamItem `refreshable:"true"`
|
||||
CompactionGCIntervalInSeconds ParamItem `refreshable:"true"`
|
||||
CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"`
|
||||
SingleCompactionRatioThreshold ParamItem `refreshable:"true"`
|
||||
SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"`
|
||||
|
@ -2885,12 +2886,9 @@ type dataCoordConfig struct {
|
|||
ClusteringCompactionEnable ParamItem `refreshable:"true"`
|
||||
ClusteringCompactionAutoEnable ParamItem `refreshable:"true"`
|
||||
ClusteringCompactionTriggerInterval ParamItem `refreshable:"true"`
|
||||
ClusteringCompactionStateCheckInterval ParamItem `refreshable:"true"`
|
||||
ClusteringCompactionGCInterval ParamItem `refreshable:"true"`
|
||||
ClusteringCompactionMinInterval ParamItem `refreshable:"true"`
|
||||
ClusteringCompactionMaxInterval ParamItem `refreshable:"true"`
|
||||
ClusteringCompactionNewDataSizeThreshold ParamItem `refreshable:"true"`
|
||||
ClusteringCompactionDropTolerance ParamItem `refreshable:"true"`
|
||||
ClusteringCompactionPreferSegmentSize ParamItem `refreshable:"true"`
|
||||
ClusteringCompactionMaxSegmentSize ParamItem `refreshable:"true"`
|
||||
ClusteringCompactionMaxTrainSizeRatio ParamItem `refreshable:"true"`
|
||||
|
@ -3193,11 +3191,19 @@ During compaction, the size of segment # of rows is able to exceed segment max #
|
|||
p.CompactionDropToleranceInSeconds = ParamItem{
|
||||
Key: "dataCoord.compaction.dropTolerance",
|
||||
Version: "2.4.2",
|
||||
Doc: "If compaction job is finished for a long time, gc it",
|
||||
Doc: "Compaction task will be cleaned after finish longer than this time(in seconds)",
|
||||
DefaultValue: "86400",
|
||||
}
|
||||
p.CompactionDropToleranceInSeconds.Init(base.mgr)
|
||||
|
||||
p.CompactionGCIntervalInSeconds = ParamItem{
|
||||
Key: "dataCoord.compaction.gcInterval",
|
||||
Version: "2.4.7",
|
||||
Doc: "The time interval in seconds for compaction gc",
|
||||
DefaultValue: "1800",
|
||||
}
|
||||
p.CompactionGCIntervalInSeconds.Init(base.mgr)
|
||||
|
||||
p.CompactionCheckIntervalInSeconds = ParamItem{
|
||||
Key: "dataCoord.compaction.check.interval",
|
||||
Version: "2.0.0",
|
||||
|
@ -3326,20 +3332,6 @@ During compaction, the size of segment # of rows is able to exceed segment max #
|
|||
}
|
||||
p.ClusteringCompactionTriggerInterval.Init(base.mgr)
|
||||
|
||||
p.ClusteringCompactionStateCheckInterval = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.stateCheckInterval",
|
||||
Version: "2.4.6",
|
||||
DefaultValue: "10",
|
||||
}
|
||||
p.ClusteringCompactionStateCheckInterval.Init(base.mgr)
|
||||
|
||||
p.ClusteringCompactionGCInterval = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.gcInterval",
|
||||
Version: "2.4.6",
|
||||
DefaultValue: "600",
|
||||
}
|
||||
p.ClusteringCompactionGCInterval.Init(base.mgr)
|
||||
|
||||
p.ClusteringCompactionMinInterval = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.minInterval",
|
||||
Version: "2.4.6",
|
||||
|
@ -3371,14 +3363,6 @@ During compaction, the size of segment # of rows is able to exceed segment max #
|
|||
}
|
||||
p.ClusteringCompactionTimeoutInSeconds.Init(base.mgr)
|
||||
|
||||
p.ClusteringCompactionDropTolerance = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.dropTolerance",
|
||||
Version: "2.4.6",
|
||||
Doc: "If clustering compaction job is finished for a long time, gc it",
|
||||
DefaultValue: "259200",
|
||||
}
|
||||
p.ClusteringCompactionDropTolerance.Init(base.mgr)
|
||||
|
||||
p.ClusteringCompactionPreferSegmentSize = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.preferSegmentSize",
|
||||
Version: "2.4.6",
|
||||
|
|
|
@ -446,6 +446,11 @@ func TestComponentParam(t *testing.T) {
|
|||
params.Save("datacoord.gracefulStopTimeout", "100")
|
||||
assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
|
||||
|
||||
params.Save("dataCoord.compaction.gcInterval", "100")
|
||||
assert.Equal(t, float64(100), Params.CompactionGCIntervalInSeconds.GetAsDuration(time.Second).Seconds())
|
||||
params.Save("dataCoord.compaction.dropTolerance", "100")
|
||||
assert.Equal(t, float64(100), Params.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds())
|
||||
|
||||
params.Save("dataCoord.compaction.clustering.enable", "true")
|
||||
assert.Equal(t, true, Params.ClusteringCompactionEnable.GetAsBool())
|
||||
params.Save("dataCoord.compaction.clustering.newDataSizeThreshold", "10")
|
||||
|
@ -456,8 +461,6 @@ func TestComponentParam(t *testing.T) {
|
|||
assert.Equal(t, int64(10*1024*1024), Params.ClusteringCompactionNewDataSizeThreshold.GetAsSize())
|
||||
params.Save("dataCoord.compaction.clustering.newDataSizeThreshold", "10g")
|
||||
assert.Equal(t, int64(10*1024*1024*1024), Params.ClusteringCompactionNewDataSizeThreshold.GetAsSize())
|
||||
params.Save("dataCoord.compaction.clustering.dropTolerance", "86400")
|
||||
assert.Equal(t, int64(86400), Params.ClusteringCompactionDropTolerance.GetAsInt64())
|
||||
params.Save("dataCoord.compaction.clustering.maxSegmentSize", "100m")
|
||||
assert.Equal(t, int64(100*1024*1024), Params.ClusteringCompactionMaxSegmentSize.GetAsSize())
|
||||
params.Save("dataCoord.compaction.clustering.preferSegmentSize", "10m")
|
||||
|
|
Loading…
Reference in New Issue