From c340f387cf55483173fbcf3d8bd523d0fd625034 Mon Sep 17 00:00:00 2001
From: "cai.zhang"
Date: Wed, 31 Jul 2024 10:32:00 +0800
Subject: [PATCH] enhance: [cherry-pick] Change the fixed value to a ratio for clustering segment size (#35075)

issue: #34495
master pr: #35076

Signed-off-by: Cai Zhang

---
 configs/milvus.yaml                           |  4 +-
 .../datacoord/compaction_policy_clustering.go | 15 ++--
 internal/datacoord/compaction_trigger_v2.go   | 14 +++-
 internal/datacoord/task_analyze.go            |  2 +-
 pkg/util/paramtable/component_param.go        | 76 +++++++++----------
 pkg/util/paramtable/component_param_test.go   |  8 +-
 .../compaction/clustering_compaction_test.go  | 10 ++-
 7 files changed, 71 insertions(+), 58 deletions(-)

diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index c24dd7eb2d..1238b79b3a 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -478,8 +478,8 @@ dataCoord:
       # clustering compaction will try its best to distribute data into segments with size range in [preferSegmentSize, maxSegmentSize].
       # data will be clustered by preferSegmentSize; if a cluster is larger than maxSegmentSize, it will be split into multiple segments
       # buffer between (preferSegmentSize, maxSegmentSize) is left for new data in the same cluster (range), to avoid redistributing globally too often
-      preferSegmentSize: 512m
-      maxSegmentSize: 1024m
+      preferSegmentSizeRatio: 0.8
+      maxSegmentSizeRatio: 1
       # vector clustering related
       maxTrainSizeRatio: 0.8 # max data size ratio in Kmeans train; if larger than it, will down-sample to meet this limit
diff --git a/internal/datacoord/compaction_policy_clustering.go b/internal/datacoord/compaction_policy_clustering.go
index b8c06c049d..b26b481813 100644
--- a/internal/datacoord/compaction_policy_clustering.go
+++ b/internal/datacoord/compaction_policy_clustering.go
@@ -196,16 +196,19 @@ func (policy *clusteringCompactionPolicy) collectionIsClusteringCompacting(colle
 	return false, 0
 }
 
-func calculateClusteringCompactionConfig(view CompactionView) (segmentIDs []int64, totalRows, maxSegmentRows, preferSegmentRows int64) {
+func calculateClusteringCompactionConfig(coll *collectionInfo, view CompactionView) (segmentIDs []int64, totalRows, maxSegmentRows, preferSegmentRows int64, err error) {
 	for _, s := range view.GetSegmentsView() {
 		totalRows += s.NumOfRows
 		segmentIDs = append(segmentIDs, s.ID)
 	}
-	clusteringMaxSegmentSize := paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSize.GetAsSize()
-	clusteringPreferSegmentSize := paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSize.GetAsSize()
-	segmentMaxSize := paramtable.Get().DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024
-	maxSegmentRows = view.GetSegmentsView()[0].MaxRowNum * clusteringMaxSegmentSize / segmentMaxSize
-	preferSegmentRows = view.GetSegmentsView()[0].MaxRowNum * clusteringPreferSegmentSize / segmentMaxSize
+	clusteringMaxSegmentSizeRatio := paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSizeRatio.GetAsFloat()
+	clusteringPreferSegmentSizeRatio := paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat()
+	maxRows, err := calBySchemaPolicy(coll.Schema)
+	if err != nil {
+		return nil, 0, 0, 0, err
+	}
+	maxSegmentRows = int64(float64(maxRows) * clusteringMaxSegmentSizeRatio)
+	preferSegmentRows = int64(float64(maxRows) * clusteringPreferSegmentSizeRatio)
 	return
 }
diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go
index 53a8a12ff4..4c5c92242c 100644
--- a/internal/datacoord/compaction_trigger_v2.go
+++ b/internal/datacoord/compaction_trigger_v2.go
@@ -293,15 +293,23 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context,
 func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.Context, view CompactionView) {
 	taskID, _, err := m.allocator.allocN(2)
 	if err != nil {
-		log.Warn("Failed to submit compaction view to scheduler because allocate id fail", zap.String("view", view.String()))
+		log.Warn("Failed to submit compaction view to scheduler because allocate id fail", zap.String("view", view.String()),
+			zap.Error(err))
 		return
 	}
 	collection, err := m.handler.GetCollection(ctx, view.GetGroupLabel().CollectionID)
 	if err != nil {
-		log.Warn("Failed to submit compaction view to scheduler because get collection fail", zap.String("view", view.String()))
+		log.Warn("Failed to submit compaction view to scheduler because get collection fail", zap.String("view", view.String()),
+			zap.Error(err))
 		return
 	}
-	_, totalRows, maxSegmentRows, preferSegmentRows := calculateClusteringCompactionConfig(view)
+
+	_, totalRows, maxSegmentRows, preferSegmentRows, err := calculateClusteringCompactionConfig(collection, view)
+	if err != nil {
+		log.Warn("Failed to calculate clustering compaction config", zap.String("view", view.String()), zap.Error(err))
+		return
+	}
+
 	task := &datapb.CompactionTask{
 		PlanID:    taskID,
 		TriggerID: view.(*ClusteringSegmentsView).triggerID,
diff --git a/internal/datacoord/task_analyze.go b/internal/datacoord/task_analyze.go
index d2532a23b8..c29888099c 100644
--- a/internal/datacoord/task_analyze.go
+++ b/internal/datacoord/task_analyze.go
@@ -184,7 +184,7 @@ func (at *analyzeTask) PreCheck(ctx context.Context, dependency *taskScheduler)
 	at.req.Dim = int64(dim)
 
 	totalSegmentsRawDataSize := float64(totalSegmentsRows) * float64(dim) * typeutil.VectorTypeSize(t.FieldType) // Byte
-	numClusters := int64(math.Ceil(totalSegmentsRawDataSize / float64(Params.DataCoordCfg.ClusteringCompactionPreferSegmentSize.GetAsSize())))
+	numClusters := int64(math.Ceil(totalSegmentsRawDataSize / (Params.DataCoordCfg.SegmentMaxSize.GetAsFloat() * 1024 * 1024 * Params.DataCoordCfg.ClusteringCompactionMaxSegmentSizeRatio.GetAsFloat())))
 	if numClusters < Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.GetAsInt64() {
 		log.Ctx(ctx).Info("data size is too small, skip analyze task", zap.Float64("raw data size", totalSegmentsRawDataSize), zap.Int64("num clusters", numClusters), zap.Int64("minimum num clusters required", Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.GetAsInt64()))
 		at.SetState(indexpb.JobState_JobStateFinished, "")
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 9670fa4a81..78d95990a9 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -2971,21 +2971,21 @@ type dataCoordConfig struct {
 	SyncSegmentsInterval ParamItem `refreshable:"false"`
 
 	// Clustering Compaction
-	ClusteringCompactionEnable               ParamItem `refreshable:"true"`
-	ClusteringCompactionAutoEnable           ParamItem `refreshable:"true"`
-	ClusteringCompactionTriggerInterval      ParamItem `refreshable:"true"`
-	ClusteringCompactionMinInterval          ParamItem `refreshable:"true"`
-	ClusteringCompactionMaxInterval          ParamItem `refreshable:"true"`
-	ClusteringCompactionNewDataSizeThreshold ParamItem `refreshable:"true"`
-	ClusteringCompactionPreferSegmentSize    ParamItem `refreshable:"true"`
-	ClusteringCompactionMaxSegmentSize       ParamItem `refreshable:"true"`
-	ClusteringCompactionMaxTrainSizeRatio    ParamItem `refreshable:"true"`
-	ClusteringCompactionTimeoutInSeconds     ParamItem `refreshable:"true"`
-	ClusteringCompactionMaxCentroidsNum      ParamItem `refreshable:"true"`
-	ClusteringCompactionMinCentroidsNum      ParamItem `refreshable:"true"`
-	ClusteringCompactionMinClusterSizeRatio  ParamItem `refreshable:"true"`
-	ClusteringCompactionMaxClusterSizeRatio  ParamItem `refreshable:"true"`
-	ClusteringCompactionMaxClusterSize       ParamItem `refreshable:"true"`
+	ClusteringCompactionEnable                 ParamItem `refreshable:"true"`
+	ClusteringCompactionAutoEnable             ParamItem `refreshable:"true"`
+	ClusteringCompactionTriggerInterval        ParamItem `refreshable:"true"`
+	ClusteringCompactionMinInterval            ParamItem `refreshable:"true"`
+	ClusteringCompactionMaxInterval            ParamItem `refreshable:"true"`
+	ClusteringCompactionNewDataSizeThreshold   ParamItem `refreshable:"true"`
+	ClusteringCompactionPreferSegmentSizeRatio ParamItem `refreshable:"true"`
+	ClusteringCompactionMaxSegmentSizeRatio    ParamItem `refreshable:"true"`
+	ClusteringCompactionMaxTrainSizeRatio      ParamItem `refreshable:"true"`
+	ClusteringCompactionTimeoutInSeconds       ParamItem `refreshable:"true"`
+	ClusteringCompactionMaxCentroidsNum        ParamItem `refreshable:"true"`
+	ClusteringCompactionMinCentroidsNum        ParamItem `refreshable:"true"`
+	ClusteringCompactionMinClusterSizeRatio    ParamItem `refreshable:"true"`
+	ClusteringCompactionMaxClusterSizeRatio    ParamItem `refreshable:"true"`
+	ClusteringCompactionMaxClusterSize         ParamItem `refreshable:"true"`
 
 	// LevelZero Segment
 	EnableLevelZeroSegment ParamItem `refreshable:"false"`
@@ -3411,7 +3411,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
 	p.ClusteringCompactionEnable = ParamItem{
 		Key:          "dataCoord.compaction.clustering.enable",
-		Version:      "2.4.6",
+		Version:      "2.4.7",
 		DefaultValue: "false",
 		Doc:          "Enable clustering compaction",
 		Export:       true,
 	}
@@ -3420,7 +3420,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
 	p.ClusteringCompactionAutoEnable = ParamItem{
 		Key:          "dataCoord.compaction.clustering.autoEnable",
-		Version:      "2.4.6",
+		Version:      "2.4.7",
 		DefaultValue: "false",
 		Doc:          "Enable auto clustering compaction",
 		Export:       true,
 	}
@@ -3429,7 +3429,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
 	p.ClusteringCompactionTriggerInterval = ParamItem{
 		Key:          "dataCoord.compaction.clustering.triggerInterval",
-		Version:      "2.4.6",
+		Version:      "2.4.7",
 		DefaultValue: "600",
 		Doc:          "clustering compaction trigger interval in seconds",
 	}
@@ -3437,7 +3437,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
 	p.ClusteringCompactionMinInterval = ParamItem{
 		Key:          "dataCoord.compaction.clustering.minInterval",
-		Version:      "2.4.6",
+		Version:      "2.4.7",
 		Doc:          "The minimum interval between clustering compaction executions of one collection, to avoid redundant compaction",
 		DefaultValue: "3600",
 	}
@@ -3445,7 +3445,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
 	p.ClusteringCompactionMaxInterval = ParamItem{
 		Key:          "dataCoord.compaction.clustering.maxInterval",
-		Version:      "2.4.6",
+		Version:      "2.4.7",
 		Doc:          "If a collection hasn't been clustering compacted for longer than maxInterval, force compact",
 		DefaultValue: "86400",
 	}
@@ -3453,7 +3453,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
 	p.ClusteringCompactionNewDataSizeThreshold = ParamItem{
 		Key:          "dataCoord.compaction.clustering.newDataSizeThreshold",
-		Version:      "2.4.6",
+		Version:      "2.4.7",
 		Doc:          "If new data size is larger than newDataSizeThreshold, execute clustering compaction",
 		DefaultValue: "512m",
 	}
@@ -3461,33 +3461,33 @@ During compaction, the size of segment # of rows is able to exceed segment max #
 	p.ClusteringCompactionTimeoutInSeconds = ParamItem{
 		Key:          "dataCoord.compaction.clustering.timeout",
-		Version:      "2.4.6",
+		Version:      "2.4.7",
 		DefaultValue: "3600",
 		Doc:          "timeout in seconds for clustering compaction; the task will stop if it times out",
 	}
 	p.ClusteringCompactionTimeoutInSeconds.Init(base.mgr)
 
-	p.ClusteringCompactionPreferSegmentSize = ParamItem{
-		Key:          "dataCoord.compaction.clustering.preferSegmentSize",
-		Version:      "2.4.6",
-		DefaultValue: "512m",
+	p.ClusteringCompactionPreferSegmentSizeRatio = ParamItem{
+		Key:          "dataCoord.compaction.clustering.preferSegmentSizeRatio",
+		Version:      "2.4.7",
+		DefaultValue: "0.8",
 		PanicIfEmpty: false,
 		Export:       true,
 	}
-	p.ClusteringCompactionPreferSegmentSize.Init(base.mgr)
+	p.ClusteringCompactionPreferSegmentSizeRatio.Init(base.mgr)
 
-	p.ClusteringCompactionMaxSegmentSize = ParamItem{
-		Key:          "dataCoord.compaction.clustering.maxSegmentSize",
-		Version:      "2.4.6",
-		DefaultValue: "1024m",
+	p.ClusteringCompactionMaxSegmentSizeRatio = ParamItem{
+		Key:          "dataCoord.compaction.clustering.maxSegmentSizeRatio",
+		Version:      "2.4.7",
+		DefaultValue: "1.0",
 		PanicIfEmpty: false,
 		Export:       true,
 	}
-	p.ClusteringCompactionMaxSegmentSize.Init(base.mgr)
+	p.ClusteringCompactionMaxSegmentSizeRatio.Init(base.mgr)
 
 	p.ClusteringCompactionMaxTrainSizeRatio = ParamItem{
 		Key:          "dataCoord.compaction.clustering.maxTrainSizeRatio",
-		Version:      "2.4.6",
+		Version:      "2.4.7",
 		DefaultValue: "0.8",
 		Doc:          "max data size ratio in Kmeans train; if larger than it, will down-sample to meet this limit",
 		Export:       true,
@@ -3496,7 +3496,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
 	p.ClusteringCompactionMaxCentroidsNum = ParamItem{
 		Key:          "dataCoord.compaction.clustering.maxCentroidsNum",
-		Version:      "2.4.6",
+		Version:      "2.4.7",
 		DefaultValue: "10240",
 		Doc:          "maximum centroids number in Kmeans train",
 		Export:       true,
@@ -3505,7 +3505,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
 	p.ClusteringCompactionMinCentroidsNum = ParamItem{
 		Key:          "dataCoord.compaction.clustering.minCentroidsNum",
-		Version:      "2.4.6",
+		Version:      "2.4.7",
 		DefaultValue: "16",
 		Doc:          "minimum centroids number in Kmeans train",
 		Export:       true,
@@ -3514,7 +3514,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
 	p.ClusteringCompactionMinClusterSizeRatio = ParamItem{
 		Key:          "dataCoord.compaction.clustering.minClusterSizeRatio",
-		Version:      "2.4.6",
+		Version:      "2.4.7",
 		DefaultValue: "0.01",
 		Doc:          "minimum cluster size / avg size in Kmeans train",
 		Export:       true,
@@ -3523,7 +3523,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
 	p.ClusteringCompactionMaxClusterSizeRatio = ParamItem{
 		Key:          "dataCoord.compaction.clustering.maxClusterSizeRatio",
-		Version:      "2.4.6",
+		Version:      "2.4.7",
 		DefaultValue: "10",
 		Doc:          "maximum cluster size / avg size in Kmeans train",
 		Export:       true,
@@ -3532,7 +3532,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
 	p.ClusteringCompactionMaxClusterSize = ParamItem{
 		Key:          "dataCoord.compaction.clustering.maxClusterSize",
-		Version:      "2.4.6",
+		Version:      "2.4.7",
 		DefaultValue: "5g",
 		Doc:          "maximum cluster size in Kmeans train",
 		Export:       true,
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index 70af23bbc8..8ef52c56c0 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -481,10 +481,10 @@ func TestComponentParam(t *testing.T) {
 		assert.Equal(t, int64(10*1024*1024), Params.ClusteringCompactionNewDataSizeThreshold.GetAsSize())
 		params.Save("dataCoord.compaction.clustering.newDataSizeThreshold", "10g")
 		assert.Equal(t, int64(10*1024*1024*1024), Params.ClusteringCompactionNewDataSizeThreshold.GetAsSize())
-		params.Save("dataCoord.compaction.clustering.maxSegmentSize", "100m")
-		assert.Equal(t, int64(100*1024*1024), Params.ClusteringCompactionMaxSegmentSize.GetAsSize())
-		params.Save("dataCoord.compaction.clustering.preferSegmentSize", "10m")
-		assert.Equal(t, int64(10*1024*1024), Params.ClusteringCompactionPreferSegmentSize.GetAsSize())
+		params.Save("dataCoord.compaction.clustering.maxSegmentSizeRatio", "1.2")
+		assert.Equal(t, 1.2, Params.ClusteringCompactionMaxSegmentSizeRatio.GetAsFloat())
+		params.Save("dataCoord.compaction.clustering.preferSegmentSizeRatio", "0.5")
+		assert.Equal(t, 0.5, Params.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat())
 		params.Save("dataCoord.slot.clusteringCompactionUsage", "10")
 		assert.Equal(t, 10, Params.ClusteringCompactionSlotUsage.GetAsInt())
 		params.Save("dataCoord.slot.mixCompactionUsage", "5")
diff --git a/tests/integration/compaction/clustering_compaction_test.go b/tests/integration/compaction/clustering_compaction_test.go
index 17c5e73965..adae93d5ac 100644
--- a/tests/integration/compaction/clustering_compaction_test.go
+++ b/tests/integration/compaction/clustering_compaction_test.go
@@ -71,11 +71,13 @@ func (s *ClusteringCompactionSuite) TestClusteringCompaction() {
 	paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableAutoCompaction.Key, "false")
 	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableAutoCompaction.Key)
 
-	paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSize.Key, "1m")
-	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSize.Key)
+	paramtable.Get().Save(paramtable.Get().DataCoordCfg.SegmentMaxSize.Key, "1")
+	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.SegmentMaxSize.Key)
+	paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSizeRatio.Key, "1.0")
+	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSizeRatio.Key)
 
-	paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSize.Key, "1m")
-	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSize.Key)
+	paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key, "1.0")
+	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key)
 
 	schema := ConstructScalarClusteringSchema(collectionName, dim, true)
 	marshaledSchema, err := proto.Marshal(schema)
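
Note: the core of this patch is that clustering segment limits are no longer read as absolute sizes (preferSegmentSize: 512m, maxSegmentSize: 1024m) but derived from the schema-based per-segment row capacity scaled by the new ratio parameters, as shown in calculateClusteringCompactionConfig above. Below is a minimal, self-contained Go sketch of that arithmetic; the helper name and its inputs are illustrative stand-ins, not the Milvus API (maxRowsBySchema plays the role of calBySchemaPolicy(coll.Schema)).

package main

import "fmt"

// clusteringRowLimits mirrors the patched calculation: scale the
// schema-derived max row count of a segment by the configured ratios,
// truncating to int64 exactly as the patch does.
func clusteringRowLimits(maxRowsBySchema int64, maxRatio, preferRatio float64) (maxSegmentRows, preferSegmentRows int64) {
	maxSegmentRows = int64(float64(maxRowsBySchema) * maxRatio)
	preferSegmentRows = int64(float64(maxRowsBySchema) * preferRatio)
	return
}

func main() {
	// With the new defaults (maxSegmentSizeRatio=1, preferSegmentSizeRatio=0.8),
	// a schema that allows 1,000,000 rows per segment yields:
	maxRows, preferRows := clusteringRowLimits(1_000_000, 1.0, 0.8)
	fmt.Println(maxRows, preferRows) // 1000000 800000
}

One consequence of this design: segment targets now track the collection schema automatically (e.g., wider vectors lower the row capacity), so clustering compaction stays consistent with SegmentMaxSize without a second absolute-size knob to keep in sync.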