From 196a7986b3196e4e9ddd88b20f935933abdf2dab Mon Sep 17 00:00:00 2001
From: "cai.zhang"
Date: Thu, 1 Aug 2024 22:04:14 +0800
Subject: [PATCH] enhance: Change the fixed value to a ratio for clustering
 segment size (#35076)

issue: #34495

---------

Signed-off-by: Cai Zhang
---
 configs/milvus.yaml                           |   4 +-
 .../datacoord/compaction_policy_clustering.go |  16 +-
 .../compaction_policy_clustering_test.go      |   4 +
 internal/datacoord/compaction_trigger_v2.go   |  45 ++++-
 .../datacoord/compaction_trigger_v2_test.go   | 174 ++++++++++++++++++
 .../datacoord/segment_allocation_policy.go    |  15 ++
 .../segment_allocation_policy_test.go         |  52 ++++++
 internal/datacoord/task_analyze.go            |   2 +-
 pkg/util/paramtable/component_param.go        |  76 ++++----
 pkg/util/paramtable/component_param_test.go   |   8 +-
 .../compaction/clustering_compaction_test.go  |  10 +-
 11 files changed, 347 insertions(+), 59 deletions(-)

diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 49e5b1ad04..5dc3fb6d78 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -540,8 +540,8 @@ dataCoord:
       minInterval: 3600 # The minimum interval between clustering compaction executions of one collection, to avoid redundant compaction
       maxInterval: 259200 # If a collection haven't been clustering compacted for longer than maxInterval, force compact
       newDataSizeThreshold: 512m # If new data size is large than newDataSizeThreshold, execute clustering compaction
-      preferSegmentSize: 512m
-      maxSegmentSize: 1024m
+      preferSegmentSizeRatio: 0.8 # The preferred size of segments generated by clustering compaction, as a ratio of the segment max size
+      maxSegmentSizeRatio: 1 # The maximum size of segments generated by clustering compaction, as a ratio of the segment max size
       maxTrainSizeRatio: 0.8 # max data size ratio in Kmeans train, if larger than it, will down sampling to meet this limit
       maxCentroidsNum: 10240 # maximum centroids number in Kmeans train
       minCentroidsNum: 16 # minimum centroids number in Kmeans train
diff --git a/internal/datacoord/compaction_policy_clustering.go b/internal/datacoord/compaction_policy_clustering.go
index 7694143135..f160563740 100644
--- a/internal/datacoord/compaction_policy_clustering.go
+++ b/internal/datacoord/compaction_policy_clustering.go
@@ -197,16 +197,20 @@ func (policy *clusteringCompactionPolicy) collectionIsClusteringCompacting(colle
 	return false, 0
 }
 
-func calculateClusteringCompactionConfig(view CompactionView) (segmentIDs []int64, totalRows, maxSegmentRows, preferSegmentRows int64) {
+func calculateClusteringCompactionConfig(coll *collectionInfo, view CompactionView, expectedSegmentSize int64) (segmentIDs []int64, totalRows, maxSegmentRows, preferSegmentRows int64, err error) {
 	for _, s := range view.GetSegmentsView() {
 		totalRows += s.NumOfRows
 		segmentIDs = append(segmentIDs, s.ID)
 	}
-	clusteringMaxSegmentSize := paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSize.GetAsSize()
-	clusteringPreferSegmentSize := paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSize.GetAsSize()
-	segmentMaxSize := paramtable.Get().DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024
-	maxSegmentRows = view.GetSegmentsView()[0].MaxRowNum * clusteringMaxSegmentSize / segmentMaxSize
-	preferSegmentRows = view.GetSegmentsView()[0].MaxRowNum * clusteringPreferSegmentSize / segmentMaxSize
+	clusteringMaxSegmentSizeRatio := paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSizeRatio.GetAsFloat()
+	clusteringPreferSegmentSizeRatio := paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat()
+
+	maxRows, err := calBySegmentSizePolicy(coll.Schema, expectedSegmentSize)
+	if err != nil {
+		return nil, 0, 0, 0, err
+	}
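+	// maxRows is the row capacity at the expected segment size, so scaling it
+	// by each ratio converts the byte-level ratios into row-count bounds.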
+	maxSegmentRows = int64(float64(maxRows) * clusteringMaxSegmentSizeRatio)
+	preferSegmentRows = int64(float64(maxRows) * clusteringPreferSegmentSizeRatio)
 	return
 }
diff --git a/internal/datacoord/compaction_policy_clustering_test.go b/internal/datacoord/compaction_policy_clustering_test.go
index f605f025a4..9014b81623 100644
--- a/internal/datacoord/compaction_policy_clustering_test.go
+++ b/internal/datacoord/compaction_policy_clustering_test.go
@@ -182,3 +182,7 @@ func (s *ClusteringCompactionPolicySuite) TestCollectionIsClusteringCompacting()
 		}
 	})
 }
+
+func (s *ClusteringCompactionPolicySuite) TestGetExpectedSegmentSize() {
+
+}
diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go
index 6ea9b103f8..bb1004e183 100644
--- a/internal/datacoord/compaction_trigger_v2.go
+++ b/internal/datacoord/compaction_trigger_v2.go
@@ -24,10 +24,14 @@ import (
 	"github.com/samber/lo"
 	"go.uber.org/zap"
 
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+	"github.com/milvus-io/milvus/internal/metastore/model"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
 	"github.com/milvus-io/milvus/pkg/util/lock"
 	"github.com/milvus-io/milvus/pkg/util/logutil"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 type CompactionTriggerType int8
@@ -296,19 +300,29 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context,
 func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.Context, view CompactionView) {
 	taskID, _, err := m.allocator.allocN(2)
 	if err != nil {
-		log.Warn("Failed to submit compaction view to scheduler because allocate id fail", zap.String("view", view.String()))
+		log.Warn("Failed to submit compaction view to scheduler because allocate id fail", zap.String("view", view.String()),
+			zap.Error(err))
 		return
 	}
 	collection, err := m.handler.GetCollection(ctx, view.GetGroupLabel().CollectionID)
 	if err != nil {
-		log.Warn("Failed to submit compaction view to scheduler because get collection fail", zap.String("view", view.String()))
+		log.Warn("Failed to submit compaction view to scheduler because get collection fail", zap.String("view", view.String()),
+			zap.Error(err))
 		return
 	}
-	_, totalRows, maxSegmentRows, preferSegmentRows := calculateClusteringCompactionConfig(view)
+
+	expectedSegmentSize := m.getExpectedSegmentSize(collection)
+
+	_, totalRows, maxSegmentRows, preferSegmentRows, err := calculateClusteringCompactionConfig(collection, view, expectedSegmentSize)
+	if err != nil {
+		log.Warn("Failed to calculate clustering compaction config", zap.String("view", view.String()), zap.Error(err))
+		return
+	}
+
 	resultSegmentNum := totalRows / preferSegmentRows * 2
 	start, end, err := m.allocator.allocN(resultSegmentNum)
 	if err != nil {
-		log.Warn("pre-allocate result segments failed", zap.String("view", view.String()))
+		log.Warn("pre-allocate result segments failed", zap.String("view", view.String()), zap.Error(err))
 		return
 	}
 	task := &datapb.CompactionTask{
@@ -397,6 +411,29 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte
 	)
 }
 
+func (m *CompactionTriggerManager) getExpectedSegmentSize(collection *collectionInfo) int64 {
+	indexInfos := m.meta.indexMeta.GetIndexesForCollection(collection.ID, "")
+
+	vectorFields := typeutil.GetVectorFieldSchemas(collection.Schema)
+	fieldIndexTypes := lo.SliceToMap(indexInfos, func(t *model.Index) (int64, indexparamcheck.IndexType) {
+		return t.FieldID, GetIndexType(t.IndexParams)
+	})
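+	// A vector field counts as disk-indexed only if it has an index entry whose
+	// type passes IsDiskIndex; vector fields with no index at all fall through
+	// to the default in-memory segment size below.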
+	vectorFieldsWithDiskIndex := lo.Filter(vectorFields, func(field *schemapb.FieldSchema, _ int) bool {
+		if indexType, ok := fieldIndexTypes[field.FieldID]; ok {
+			return indexparamcheck.IsDiskIndex(indexType)
+		}
+		return false
+	})
+
+	allDiskIndex := len(vectorFields) == len(vectorFieldsWithDiskIndex)
+	if allDiskIndex {
+		// Only when every vector field is indexed with DiskANN, size segments by the disk segment max size.
+		return Params.DataCoordCfg.DiskSegmentMaxSize.GetAsInt64() * 1024 * 1024
+	}
+	// If any vector field is not indexed with DiskANN, size segments by the default segment max size.
+	return Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024
+}
+
 // chanPartSegments is an internal result struct, which is aggregates of SegmentInfos with same collectionID, partitionID and channelName
 type chanPartSegments struct {
 	collectionID UniqueID
diff --git a/internal/datacoord/compaction_trigger_v2_test.go b/internal/datacoord/compaction_trigger_v2_test.go
index dabd84b621..2fd2ecb736 100644
--- a/internal/datacoord/compaction_trigger_v2_test.go
+++ b/internal/datacoord/compaction_trigger_v2_test.go
@@ -2,13 +2,22 @@ package datacoord
 
 import (
 	"context"
+	"strconv"
 	"testing"
 
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
+
+	"github.com/milvus-io/milvus/internal/metastore/model"
+	"github.com/milvus-io/milvus/pkg/common"
+
 	"github.com/samber/lo"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"
 	"go.uber.org/zap"
 
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/log"
 )
@@ -140,3 +149,168 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() {
 	s.mockAlloc.EXPECT().allocID(mock.Anything).Return(19530, nil).Maybe()
 	s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewChange, levelZeroView)
 }
+
+func (s *CompactionTriggerManagerSuite) TestGetExpectedSegmentSize() {
+	var (
+		collectionID = int64(1000)
+		fieldID      = int64(2000)
+		indexID      = int64(3000)
+	)
+	paramtable.Get().Save(paramtable.Get().DataCoordCfg.SegmentMaxSize.Key, strconv.Itoa(100))
+	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.SegmentMaxSize.Key)
+
+	paramtable.Get().Save(paramtable.Get().DataCoordCfg.DiskSegmentMaxSize.Key, strconv.Itoa(200))
+	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.DiskSegmentMaxSize.Key)
+
+	s.triggerManager.meta = &meta{
+		indexMeta: &indexMeta{
+			indexes: map[UniqueID]map[UniqueID]*model.Index{
+				collectionID: {
+					indexID + 1: &model.Index{
+						CollectionID: collectionID,
+						FieldID:      fieldID + 1,
+						IndexID:      indexID + 1,
+						IndexName:    "",
+						IsDeleted:    false,
+						CreateTime:   0,
+						TypeParams:   nil,
+						IndexParams: []*commonpb.KeyValuePair{
+							{Key: common.IndexTypeKey, Value: "DISKANN"},
+						},
+						IsAutoIndex:     false,
+						UserIndexParams: nil,
+					},
+					indexID + 2: &model.Index{
+						CollectionID: collectionID,
+						FieldID:      fieldID + 2,
+						IndexID:      indexID + 2,
+						IndexName:    "",
+						IsDeleted:    false,
+						CreateTime:   0,
+						TypeParams:   nil,
+						IndexParams: []*commonpb.KeyValuePair{
+							{Key: common.IndexTypeKey, Value: "DISKANN"},
+						},
+						IsAutoIndex:     false,
+						UserIndexParams: nil,
+					},
+				},
+			},
+		},
+	}
+
+	s.Run("all DISKANN", func() {
+		collection := &collectionInfo{
+			ID: collectionID,
+			Schema: &schemapb.CollectionSchema{
+				Name:        "coll1",
+				Description: "",
+				Fields: []*schemapb.FieldSchema{
+					{FieldID: fieldID, Name: "field0", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
+					{FieldID: fieldID +
1, Name: "field1", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}}, + {FieldID: fieldID + 2, Name: "field2", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}}, + }, + EnableDynamicField: false, + Properties: nil, + }, + } + + s.Equal(int64(200*1024*1024), s.triggerManager.getExpectedSegmentSize(collection)) + }) + + s.Run("HNSW & DISKANN", func() { + s.triggerManager.meta = &meta{ + indexMeta: &indexMeta{ + indexes: map[UniqueID]map[UniqueID]*model.Index{ + collectionID: { + indexID + 1: &model.Index{ + CollectionID: collectionID, + FieldID: fieldID + 1, + IndexID: indexID + 1, + IndexName: "", + IsDeleted: false, + CreateTime: 0, + TypeParams: nil, + IndexParams: []*commonpb.KeyValuePair{ + {Key: common.IndexTypeKey, Value: "HNSW"}, + }, + IsAutoIndex: false, + UserIndexParams: nil, + }, + indexID + 2: &model.Index{ + CollectionID: collectionID, + FieldID: fieldID + 2, + IndexID: indexID + 2, + IndexName: "", + IsDeleted: false, + CreateTime: 0, + TypeParams: nil, + IndexParams: []*commonpb.KeyValuePair{ + {Key: common.IndexTypeKey, Value: "DISKANN"}, + }, + IsAutoIndex: false, + UserIndexParams: nil, + }, + }, + }, + }, + } + collection := &collectionInfo{ + ID: collectionID, + Schema: &schemapb.CollectionSchema{ + Name: "coll1", + Description: "", + Fields: []*schemapb.FieldSchema{ + {FieldID: fieldID, Name: "field0", DataType: schemapb.DataType_Int64, IsPrimaryKey: true}, + {FieldID: fieldID + 1, Name: "field1", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}}, + {FieldID: fieldID + 2, Name: "field2", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}}, + }, + EnableDynamicField: false, + Properties: nil, + }, + } + + s.Equal(int64(100*1024*1024), s.triggerManager.getExpectedSegmentSize(collection)) + }) + + s.Run("some vector has no index", func() { + s.triggerManager.meta = &meta{ + indexMeta: &indexMeta{ + indexes: map[UniqueID]map[UniqueID]*model.Index{ + collectionID: { + indexID + 1: &model.Index{ + CollectionID: collectionID, + FieldID: fieldID + 1, + IndexID: indexID + 1, + IndexName: "", + IsDeleted: false, + CreateTime: 0, + TypeParams: nil, + IndexParams: []*commonpb.KeyValuePair{ + {Key: common.IndexTypeKey, Value: "HNSW"}, + }, + IsAutoIndex: false, + UserIndexParams: nil, + }, + }, + }, + }, + } + collection := &collectionInfo{ + ID: collectionID, + Schema: &schemapb.CollectionSchema{ + Name: "coll1", + Description: "", + Fields: []*schemapb.FieldSchema{ + {FieldID: fieldID, Name: "field0", DataType: schemapb.DataType_Int64, IsPrimaryKey: true}, + {FieldID: fieldID + 1, Name: "field1", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}}, + {FieldID: fieldID + 2, Name: "field2", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}}, + }, + EnableDynamicField: false, + Properties: nil, + }, + } + + s.Equal(int64(100*1024*1024), s.triggerManager.getExpectedSegmentSize(collection)) + }) +} diff --git a/internal/datacoord/segment_allocation_policy.go b/internal/datacoord/segment_allocation_policy.go index 4d9dda6c30..cf467ffa09 100644 --- a/internal/datacoord/segment_allocation_policy.go +++ b/internal/datacoord/segment_allocation_policy.go @@ -67,6 +67,21 @@ func calBySchemaPolicyWithDiskIndex(schema *schemapb.CollectionSchema) (int, err return 
int(threshold / float64(sizePerRecord)), nil
 }
 
+func calBySegmentSizePolicy(schema *schemapb.CollectionSchema, segmentSize int64) (int, error) {
+	if schema == nil {
+		return -1, errors.New("nil schema")
+	}
+	sizePerRecord, err := typeutil.EstimateSizePerRecord(schema)
+	if err != nil {
+		return -1, err
+	}
+	// guard against a zero-size record, which would panic with a division by zero
+	if sizePerRecord == 0 {
+		return -1, errors.New("zero size record schema found")
+	}
+	return int(segmentSize) / sizePerRecord, nil
+}
+
 // AllocatePolicy helper function definition to allocate Segment space
 type AllocatePolicy func(segments []*SegmentInfo, count int64, maxCountPerL1Segment int64, level datapb.SegmentLevel) ([]*Allocation, []*Allocation)
 
diff --git a/internal/datacoord/segment_allocation_policy_test.go b/internal/datacoord/segment_allocation_policy_test.go
index fc6f77ddc8..fa803b227e 100644
--- a/internal/datacoord/segment_allocation_policy_test.go
+++ b/internal/datacoord/segment_allocation_policy_test.go
@@ -140,6 +140,58 @@ func TestGetChannelOpenSegCapacityPolicy(t *testing.T) {
 	}
 }
 
+func TestCalBySegmentSizePolicy(t *testing.T) {
+	t.Run("nil schema", func(t *testing.T) {
+		rows, err := calBySegmentSizePolicy(nil, 1024)
+
+		assert.Error(t, err)
+		assert.Equal(t, -1, rows)
+	})
+
+	t.Run("get dim failed", func(t *testing.T) {
+		schema := &schemapb.CollectionSchema{
+			Name:        "coll1",
+			Description: "",
+			Fields: []*schemapb.FieldSchema{
+				{FieldID: fieldID, Name: "field0", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
+				{FieldID: fieldID + 1, Name: "field1", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "fake"}}},
+			},
+			EnableDynamicField: false,
+			Properties:         nil,
+		}
+
+		rows, err := calBySegmentSizePolicy(schema, 1024)
+		assert.Error(t, err)
+		assert.Equal(t, -1, rows)
+	})
+
+	t.Run("sizePerRecord is zero", func(t *testing.T) {
+		schema := &schemapb.CollectionSchema{Fields: nil}
+		rows, err := calBySegmentSizePolicy(schema, 1024)
+
+		assert.Error(t, err)
+		assert.Equal(t, -1, rows)
+	})
+
+	t.Run("normal case", func(t *testing.T) {
+		schema := &schemapb.CollectionSchema{
+			Name:        "coll1",
+			Description: "",
+			Fields: []*schemapb.FieldSchema{
+				{FieldID: fieldID, Name: "field0", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
+				{FieldID: fieldID + 1, Name: "field1", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
+			},
+			EnableDynamicField: false,
+			Properties:         nil,
+		}
+
+		rows, err := calBySegmentSizePolicy(schema, 1200)
+		assert.NoError(t, err)
+		// 1200 bytes / (4*8 bytes for the 8-dim float vector + 8 bytes for the int64 PK) = 30 rows
+		assert.Equal(t, 30, rows)
+	})
+}
+
 func TestSortSegmentsByLastExpires(t *testing.T) {
 	segs := make([]*SegmentInfo, 0, 10)
 	for i := 0; i < 10; i++ {
diff --git a/internal/datacoord/task_analyze.go b/internal/datacoord/task_analyze.go
index d2532a23b8..c29888099c 100644
--- a/internal/datacoord/task_analyze.go
+++ b/internal/datacoord/task_analyze.go
@@ -184,7 +184,7 @@ func (at *analyzeTask) PreCheck(ctx context.Context, dependency *taskScheduler)
 	at.req.Dim = int64(dim)
 
 	totalSegmentsRawDataSize := float64(totalSegmentsRows) * float64(dim) * typeutil.VectorTypeSize(t.FieldType) // Byte
-	numClusters := int64(math.Ceil(totalSegmentsRawDataSize / float64(Params.DataCoordCfg.ClusteringCompactionPreferSegmentSize.GetAsSize())))
+	numClusters := int64(math.Ceil(totalSegmentsRawDataSize / (Params.DataCoordCfg.SegmentMaxSize.GetAsFloat() * 1024 * 1024 * Params.DataCoordCfg.ClusteringCompactionMaxSegmentSizeRatio.GetAsFloat())))
 	if
numClusters < Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.GetAsInt64() { log.Ctx(ctx).Info("data size is too small, skip analyze task", zap.Float64("raw data size", totalSegmentsRawDataSize), zap.Int64("num clusters", numClusters), zap.Int64("minimum num clusters required", Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.GetAsInt64())) at.SetState(indexpb.JobState_JobStateFinished, "") diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 28d267c4b4..e91eb883e3 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3002,21 +3002,21 @@ type dataCoordConfig struct { SyncSegmentsInterval ParamItem `refreshable:"false"` // Clustering Compaction - ClusteringCompactionEnable ParamItem `refreshable:"true"` - ClusteringCompactionAutoEnable ParamItem `refreshable:"true"` - ClusteringCompactionTriggerInterval ParamItem `refreshable:"true"` - ClusteringCompactionMinInterval ParamItem `refreshable:"true"` - ClusteringCompactionMaxInterval ParamItem `refreshable:"true"` - ClusteringCompactionNewDataSizeThreshold ParamItem `refreshable:"true"` - ClusteringCompactionPreferSegmentSize ParamItem `refreshable:"true"` - ClusteringCompactionMaxSegmentSize ParamItem `refreshable:"true"` - ClusteringCompactionMaxTrainSizeRatio ParamItem `refreshable:"true"` - ClusteringCompactionTimeoutInSeconds ParamItem `refreshable:"true"` - ClusteringCompactionMaxCentroidsNum ParamItem `refreshable:"true"` - ClusteringCompactionMinCentroidsNum ParamItem `refreshable:"true"` - ClusteringCompactionMinClusterSizeRatio ParamItem `refreshable:"true"` - ClusteringCompactionMaxClusterSizeRatio ParamItem `refreshable:"true"` - ClusteringCompactionMaxClusterSize ParamItem `refreshable:"true"` + ClusteringCompactionEnable ParamItem `refreshable:"true"` + ClusteringCompactionAutoEnable ParamItem `refreshable:"true"` + ClusteringCompactionTriggerInterval ParamItem `refreshable:"true"` + ClusteringCompactionMinInterval ParamItem `refreshable:"true"` + ClusteringCompactionMaxInterval ParamItem `refreshable:"true"` + ClusteringCompactionNewDataSizeThreshold ParamItem `refreshable:"true"` + ClusteringCompactionPreferSegmentSizeRatio ParamItem `refreshable:"true"` + ClusteringCompactionMaxSegmentSizeRatio ParamItem `refreshable:"true"` + ClusteringCompactionMaxTrainSizeRatio ParamItem `refreshable:"true"` + ClusteringCompactionTimeoutInSeconds ParamItem `refreshable:"true"` + ClusteringCompactionMaxCentroidsNum ParamItem `refreshable:"true"` + ClusteringCompactionMinCentroidsNum ParamItem `refreshable:"true"` + ClusteringCompactionMinClusterSizeRatio ParamItem `refreshable:"true"` + ClusteringCompactionMaxClusterSizeRatio ParamItem `refreshable:"true"` + ClusteringCompactionMaxClusterSize ParamItem `refreshable:"true"` // LevelZero Segment EnableLevelZeroSegment ParamItem `refreshable:"false"` @@ -3449,7 +3449,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionEnable = ParamItem{ Key: "dataCoord.compaction.clustering.enable", - Version: "2.4.6", + Version: "2.4.7", DefaultValue: "false", Doc: "Enable clustering compaction", Export: true, @@ -3458,7 +3458,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionAutoEnable = ParamItem{ Key: "dataCoord.compaction.clustering.autoEnable", - Version: "2.4.6", + Version: "2.4.7", DefaultValue: "false", Doc: "Enable auto clustering compaction", Export: true, @@ -3467,7 +3467,7 @@ 
During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionTriggerInterval = ParamItem{ Key: "dataCoord.compaction.clustering.triggerInterval", - Version: "2.4.6", + Version: "2.4.7", DefaultValue: "600", Doc: "clustering compaction trigger interval in seconds", Export: true, @@ -3476,7 +3476,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMinInterval = ParamItem{ Key: "dataCoord.compaction.clustering.minInterval", - Version: "2.4.6", + Version: "2.4.7", Doc: "The minimum interval between clustering compaction executions of one collection, to avoid redundant compaction", DefaultValue: "3600", Export: true, @@ -3485,7 +3485,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxInterval = ParamItem{ Key: "dataCoord.compaction.clustering.maxInterval", - Version: "2.4.6", + Version: "2.4.7", Doc: "If a collection haven't been clustering compacted for longer than maxInterval, force compact", DefaultValue: "86400", Export: true, @@ -3494,7 +3494,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionNewDataSizeThreshold = ParamItem{ Key: "dataCoord.compaction.clustering.newDataSizeThreshold", - Version: "2.4.6", + Version: "2.4.7", Doc: "If new data size is large than newDataSizeThreshold, execute clustering compaction", DefaultValue: "512m", Export: true, @@ -3503,32 +3503,32 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionTimeoutInSeconds = ParamItem{ Key: "dataCoord.compaction.clustering.timeout", - Version: "2.4.6", + Version: "2.4.7", DefaultValue: "3600", } p.ClusteringCompactionTimeoutInSeconds.Init(base.mgr) - p.ClusteringCompactionPreferSegmentSize = ParamItem{ - Key: "dataCoord.compaction.clustering.preferSegmentSize", - Version: "2.4.6", - DefaultValue: "512m", + p.ClusteringCompactionPreferSegmentSizeRatio = ParamItem{ + Key: "dataCoord.compaction.clustering.preferSegmentSizeRatio", + Version: "2.4.7", + DefaultValue: "0.8", PanicIfEmpty: false, Export: true, } - p.ClusteringCompactionPreferSegmentSize.Init(base.mgr) + p.ClusteringCompactionPreferSegmentSizeRatio.Init(base.mgr) - p.ClusteringCompactionMaxSegmentSize = ParamItem{ - Key: "dataCoord.compaction.clustering.maxSegmentSize", - Version: "2.4.6", - DefaultValue: "1024m", + p.ClusteringCompactionMaxSegmentSizeRatio = ParamItem{ + Key: "dataCoord.compaction.clustering.maxSegmentSizeRatio", + Version: "2.4.7", + DefaultValue: "1.0", PanicIfEmpty: false, Export: true, } - p.ClusteringCompactionMaxSegmentSize.Init(base.mgr) + p.ClusteringCompactionMaxSegmentSizeRatio.Init(base.mgr) p.ClusteringCompactionMaxTrainSizeRatio = ParamItem{ Key: "dataCoord.compaction.clustering.maxTrainSizeRatio", - Version: "2.4.6", + Version: "2.4.7", DefaultValue: "0.8", Doc: "max data size ratio in Kmeans train, if larger than it, will down sampling to meet this limit", Export: true, @@ -3537,7 +3537,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxCentroidsNum = ParamItem{ Key: "dataCoord.compaction.clustering.maxCentroidsNum", - Version: "2.4.6", + Version: "2.4.7", DefaultValue: "10240", Doc: "maximum centroids number in Kmeans train", Export: true, @@ -3546,7 +3546,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMinCentroidsNum = ParamItem{ Key: 
"dataCoord.compaction.clustering.minCentroidsNum", - Version: "2.4.6", + Version: "2.4.7", DefaultValue: "16", Doc: "minimum centroids number in Kmeans train", Export: true, @@ -3555,7 +3555,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMinClusterSizeRatio = ParamItem{ Key: "dataCoord.compaction.clustering.minClusterSizeRatio", - Version: "2.4.6", + Version: "2.4.7", DefaultValue: "0.01", Doc: "minimum cluster size / avg size in Kmeans train", Export: true, @@ -3564,7 +3564,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxClusterSizeRatio = ParamItem{ Key: "dataCoord.compaction.clustering.maxClusterSizeRatio", - Version: "2.4.6", + Version: "2.4.7", DefaultValue: "10", Doc: "maximum cluster size / avg size in Kmeans train", Export: true, @@ -3573,7 +3573,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxClusterSize = ParamItem{ Key: "dataCoord.compaction.clustering.maxClusterSize", - Version: "2.4.6", + Version: "2.4.7", DefaultValue: "5g", Doc: "maximum cluster size in Kmeans train", Export: true, diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 83b0e27510..8204c87182 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -483,10 +483,10 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, int64(10*1024*1024), Params.ClusteringCompactionNewDataSizeThreshold.GetAsSize()) params.Save("dataCoord.compaction.clustering.newDataSizeThreshold", "10g") assert.Equal(t, int64(10*1024*1024*1024), Params.ClusteringCompactionNewDataSizeThreshold.GetAsSize()) - params.Save("dataCoord.compaction.clustering.maxSegmentSize", "100m") - assert.Equal(t, int64(100*1024*1024), Params.ClusteringCompactionMaxSegmentSize.GetAsSize()) - params.Save("dataCoord.compaction.clustering.preferSegmentSize", "10m") - assert.Equal(t, int64(10*1024*1024), Params.ClusteringCompactionPreferSegmentSize.GetAsSize()) + params.Save("dataCoord.compaction.clustering.maxSegmentSizeRatio", "1.2") + assert.Equal(t, 1.2, Params.ClusteringCompactionMaxSegmentSizeRatio.GetAsFloat()) + params.Save("dataCoord.compaction.clustering.preferSegmentSizeRatio", "0.5") + assert.Equal(t, 0.5, Params.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat()) params.Save("dataCoord.slot.clusteringCompactionUsage", "10") assert.Equal(t, 10, Params.ClusteringCompactionSlotUsage.GetAsInt()) params.Save("dataCoord.slot.mixCompactionUsage", "5") diff --git a/tests/integration/compaction/clustering_compaction_test.go b/tests/integration/compaction/clustering_compaction_test.go index 6ca5ad09de..c8c65eaac7 100644 --- a/tests/integration/compaction/clustering_compaction_test.go +++ b/tests/integration/compaction/clustering_compaction_test.go @@ -71,11 +71,13 @@ func (s *ClusteringCompactionSuite) TestClusteringCompaction() { paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableAutoCompaction.Key, "false") defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableAutoCompaction.Key) - paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSize.Key, "1m") - defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSize.Key) + paramtable.Get().Save(paramtable.Get().DataCoordCfg.SegmentMaxSize.Key, "1") + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.SegmentMaxSize.Key) + 
paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSizeRatio.Key, "1.0") + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSizeRatio.Key) - paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSize.Key, "1m") - defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSize.Key) + paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key, "1.0") + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key) schema := ConstructScalarClusteringSchema(collectionName, dim, true) marshaledSchema, err := proto.Marshal(schema)
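
// Below is a minimal, self-contained sketch (not part of the patch above) of how
// the new ratio-based sizing composes: rowsForSegment mirrors calBySegmentSizePolicy
// from this patch, while the 40-byte record and 1 GiB segment size are illustrative
// assumptions, not values the patch prescribes.
package main

import "fmt"

// rowsForSegment mirrors calBySegmentSizePolicy: row capacity = segment bytes / bytes per row.
func rowsForSegment(segmentSizeBytes int64, sizePerRecord int) int {
	return int(segmentSizeBytes) / sizePerRecord
}

func main() {
	// Illustrative record: an int64 primary key (8 B) plus an 8-dim float vector (32 B).
	const sizePerRecord = 40
	// The expected segment size is SegmentMaxSize, or DiskSegmentMaxSize when every
	// vector field is indexed with DiskANN (see getExpectedSegmentSize above).
	const expectedSegmentSize int64 = 1024 * 1024 * 1024

	maxRows := rowsForSegment(expectedSegmentSize, sizePerRecord)

	// The two ratio knobs introduced by this patch replace the old fixed byte sizes.
	maxSegmentSizeRatio := 1.0    // dataCoord.compaction.clustering.maxSegmentSizeRatio
	preferSegmentSizeRatio := 0.8 // dataCoord.compaction.clustering.preferSegmentSizeRatio

	maxSegmentRows := int64(float64(maxRows) * maxSegmentSizeRatio)
	preferSegmentRows := int64(float64(maxRows) * preferSegmentSizeRatio)

	fmt.Printf("maxSegmentRows=%d preferSegmentRows=%d\n", maxSegmentRows, preferSegmentRows)
}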