mirror of https://github.com/milvus-io/milvus.git

enhance: [cherry-pick] Change the fixed value to a ratio for clustering segment size (#35075)

issue: #34495
master pr: #35076
Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
branch: pull/35129/head

parent 4bbb7b8219
commit c340f387cf
@@ -478,8 +478,8 @@ dataCoord:
       # clustering compaction will try best to distribute data into segments with size range in [preferSegmentSize, maxSegmentSize].
       # data will be clustered by preferSegmentSize, if a cluster is larger than maxSegmentSize, will spilt it into multi segment
       # buffer between (preferSegmentSize, maxSegmentSize) is left for new data in the same cluster(range), to avoid globally redistribute too often
-      preferSegmentSize: 512m
-      maxSegmentSize: 1024m
+      preferSegmentSizeRatio: 0.8
+      maxSegmentSizeRatio: 1

       # vector clustering related
       maxTrainSizeRatio: 0.8 # max data size ratio in Kmeans train, if larger than it, will down sampling to meet this limit
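This is the user-facing half of the change: the clustering segment bounds are no longer fixed byte sizes but ratios applied to dataCoord.segment.maxSize, so they scale with whatever segment size a deployment configures. A minimal sketch of the resulting byte budgets, assuming a segment.maxSize of 1024 MB (illustrative, not read from a real config):

package main

import "fmt"

func main() {
    // Assumed: dataCoord.segment.maxSize is 1024 (MB).
    segmentMaxSize := int64(1024) * 1024 * 1024 // bytes
    preferRatio, maxRatio := 0.8, 1.0           // new config defaults

    prefer := int64(float64(segmentMaxSize) * preferRatio)
    max := int64(float64(segmentMaxSize) * maxRatio)
    // Clustering compaction now targets segments in [prefer, max] bytes.
    fmt.Printf("prefer=%dMB max=%dMB\n", prefer>>20, max>>20)
    // Output: prefer=819MB max=1024MB
}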
@@ -196,16 +196,19 @@ func (policy *clusteringCompactionPolicy) collectionIsClusteringCompacting(colle
     return false, 0
 }

-func calculateClusteringCompactionConfig(view CompactionView) (segmentIDs []int64, totalRows, maxSegmentRows, preferSegmentRows int64) {
+func calculateClusteringCompactionConfig(coll *collectionInfo, view CompactionView) (segmentIDs []int64, totalRows, maxSegmentRows, preferSegmentRows int64, err error) {
     for _, s := range view.GetSegmentsView() {
         totalRows += s.NumOfRows
         segmentIDs = append(segmentIDs, s.ID)
     }
-    clusteringMaxSegmentSize := paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSize.GetAsSize()
-    clusteringPreferSegmentSize := paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSize.GetAsSize()
-    segmentMaxSize := paramtable.Get().DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024
-    maxSegmentRows = view.GetSegmentsView()[0].MaxRowNum * clusteringMaxSegmentSize / segmentMaxSize
-    preferSegmentRows = view.GetSegmentsView()[0].MaxRowNum * clusteringPreferSegmentSize / segmentMaxSize
+    clusteringMaxSegmentSizeRatio := paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSizeRatio.GetAsFloat()
+    clusteringPreferSegmentSizeRatio := paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat()
+    maxRows, err := calBySchemaPolicy(coll.Schema)
+    if err != nil {
+        return nil, 0, 0, 0, err
+    }
+    maxSegmentRows = int64(float64(maxRows) * clusteringMaxSegmentSizeRatio)
+    preferSegmentRows = int64(float64(maxRows) * clusteringPreferSegmentSizeRatio)
     return
 }
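calculateClusteringCompactionConfig previously scaled the first segment's MaxRowNum by byte-size ratios; it now asks calBySchemaPolicy how many rows fit a full segment for this collection's schema and multiplies by the configured ratios. A minimal sketch of that math, with a hypothetical average row size standing in for whatever calBySchemaPolicy derives from the schema:

package main

import "fmt"

// maxRowsPerSegment is a stand-in for calBySchemaPolicy: it assumes the policy
// reduces to segment byte capacity divided by an average row size.
func maxRowsPerSegment(segmentMaxSizeBytes, avgRowSizeBytes int64) int64 {
    return segmentMaxSizeBytes / avgRowSizeBytes
}

func main() {
    maxRows := maxRowsPerSegment(1024*1024*1024, 512)  // hypothetical 512-byte rows
    preferSegmentRows := int64(float64(maxRows) * 0.8) // preferSegmentSizeRatio
    maxSegmentRows := int64(float64(maxRows) * 1.0)    // maxSegmentSizeRatio
    fmt.Println(preferSegmentRows, maxSegmentRows)     // 1677721 2097152
}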
@@ -293,15 +293,23 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context,
 func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.Context, view CompactionView) {
     taskID, _, err := m.allocator.allocN(2)
     if err != nil {
-        log.Warn("Failed to submit compaction view to scheduler because allocate id fail", zap.String("view", view.String()))
+        log.Warn("Failed to submit compaction view to scheduler because allocate id fail", zap.String("view", view.String()),
+            zap.Error(err))
         return
     }
     collection, err := m.handler.GetCollection(ctx, view.GetGroupLabel().CollectionID)
     if err != nil {
-        log.Warn("Failed to submit compaction view to scheduler because get collection fail", zap.String("view", view.String()))
+        log.Warn("Failed to submit compaction view to scheduler because get collection fail", zap.String("view", view.String()),
+            zap.Error(err))
         return
     }
-    _, totalRows, maxSegmentRows, preferSegmentRows := calculateClusteringCompactionConfig(view)
+
+    _, totalRows, maxSegmentRows, preferSegmentRows, err := calculateClusteringCompactionConfig(collection, view)
+    if err != nil {
+        log.Warn("Failed to calculate cluster compaction config fail", zap.String("view", view.String()), zap.Error(err))
+        return
+    }
+
     task := &datapb.CompactionTask{
         PlanID:    taskID,
         TriggerID: view.(*ClusteringSegmentsView).triggerID,
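Both failure paths now attach the underlying error to the log entry instead of dropping it. A standalone zap sketch, with a hypothetical error, of what zap.Error(err) adds:

package main

import (
    "errors"

    "go.uber.org/zap"
)

func main() {
    log := zap.NewExample()
    err := errors.New("collection not found") // hypothetical failure
    // Before: the message says a lookup failed but loses the cause.
    log.Warn("Failed to submit compaction view to scheduler because get collection fail")
    // After: zap.Error(err) adds an "error" field, so the cause is searchable in logs.
    log.Warn("Failed to submit compaction view to scheduler because get collection fail", zap.Error(err))
}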
@@ -184,7 +184,7 @@ func (at *analyzeTask) PreCheck(ctx context.Context, dependency *taskScheduler)
     at.req.Dim = int64(dim)

     totalSegmentsRawDataSize := float64(totalSegmentsRows) * float64(dim) * typeutil.VectorTypeSize(t.FieldType) // Byte
-    numClusters := int64(math.Ceil(totalSegmentsRawDataSize / float64(Params.DataCoordCfg.ClusteringCompactionPreferSegmentSize.GetAsSize())))
+    numClusters := int64(math.Ceil(totalSegmentsRawDataSize / (Params.DataCoordCfg.SegmentMaxSize.GetAsFloat() * 1024 * 1024 * Params.DataCoordCfg.ClusteringCompactionMaxSegmentSizeRatio.GetAsFloat())))
     if numClusters < Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.GetAsInt64() {
         log.Ctx(ctx).Info("data size is too small, skip analyze task", zap.Float64("raw data size", totalSegmentsRawDataSize), zap.Int64("num clusters", numClusters), zap.Int64("minimum num clusters required", Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.GetAsInt64()))
         at.SetState(indexpb.JobState_JobStateFinished, "")
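The cluster-count estimate now divides raw vector bytes by segment.maxSize times maxSegmentSizeRatio instead of the removed preferSegmentSize. A worked sketch of the formula with assumed inputs (10M rows of 128-dim float32 vectors, segment.maxSize = 1024 MB, ratio 1.0):

package main

import (
    "fmt"
    "math"
)

func main() {
    // Assumed inputs, mirroring the formula in PreCheck above.
    totalRows := 10_000_000.0
    dim := 128.0
    bytesPerElem := 4.0 // float32 vector
    segmentMaxSizeMB := 1024.0
    maxSegmentSizeRatio := 1.0

    rawBytes := totalRows * dim * bytesPerElem // 5.12 GB
    numClusters := int64(math.Ceil(rawBytes / (segmentMaxSizeMB * 1024 * 1024 * maxSegmentSizeRatio)))
    fmt.Println(numClusters) // 5: ceil(5.12e9 / 1.0737e9)
}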
@@ -2971,21 +2971,21 @@ type dataCoordConfig struct {
     SyncSegmentsInterval ParamItem `refreshable:"false"`

     // Clustering Compaction
-    ClusteringCompactionEnable               ParamItem `refreshable:"true"`
-    ClusteringCompactionAutoEnable           ParamItem `refreshable:"true"`
-    ClusteringCompactionTriggerInterval      ParamItem `refreshable:"true"`
-    ClusteringCompactionMinInterval          ParamItem `refreshable:"true"`
-    ClusteringCompactionMaxInterval          ParamItem `refreshable:"true"`
-    ClusteringCompactionNewDataSizeThreshold ParamItem `refreshable:"true"`
-    ClusteringCompactionPreferSegmentSize    ParamItem `refreshable:"true"`
-    ClusteringCompactionMaxSegmentSize       ParamItem `refreshable:"true"`
-    ClusteringCompactionMaxTrainSizeRatio    ParamItem `refreshable:"true"`
-    ClusteringCompactionTimeoutInSeconds     ParamItem `refreshable:"true"`
-    ClusteringCompactionMaxCentroidsNum      ParamItem `refreshable:"true"`
-    ClusteringCompactionMinCentroidsNum      ParamItem `refreshable:"true"`
-    ClusteringCompactionMinClusterSizeRatio  ParamItem `refreshable:"true"`
-    ClusteringCompactionMaxClusterSizeRatio  ParamItem `refreshable:"true"`
-    ClusteringCompactionMaxClusterSize       ParamItem `refreshable:"true"`
+    ClusteringCompactionEnable                 ParamItem `refreshable:"true"`
+    ClusteringCompactionAutoEnable             ParamItem `refreshable:"true"`
+    ClusteringCompactionTriggerInterval        ParamItem `refreshable:"true"`
+    ClusteringCompactionMinInterval            ParamItem `refreshable:"true"`
+    ClusteringCompactionMaxInterval            ParamItem `refreshable:"true"`
+    ClusteringCompactionNewDataSizeThreshold   ParamItem `refreshable:"true"`
+    ClusteringCompactionPreferSegmentSizeRatio ParamItem `refreshable:"true"`
+    ClusteringCompactionMaxSegmentSizeRatio    ParamItem `refreshable:"true"`
+    ClusteringCompactionMaxTrainSizeRatio      ParamItem `refreshable:"true"`
+    ClusteringCompactionTimeoutInSeconds       ParamItem `refreshable:"true"`
+    ClusteringCompactionMaxCentroidsNum        ParamItem `refreshable:"true"`
+    ClusteringCompactionMinCentroidsNum        ParamItem `refreshable:"true"`
+    ClusteringCompactionMinClusterSizeRatio    ParamItem `refreshable:"true"`
+    ClusteringCompactionMaxClusterSizeRatio    ParamItem `refreshable:"true"`
+    ClusteringCompactionMaxClusterSize         ParamItem `refreshable:"true"`

     // LevelZero Segment
     EnableLevelZeroSegment ParamItem `refreshable:"false"`
@@ -3411,7 +3411,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #

     p.ClusteringCompactionEnable = ParamItem{
         Key:          "dataCoord.compaction.clustering.enable",
-        Version:      "2.4.6",
+        Version:      "2.4.7",
         DefaultValue: "false",
         Doc:          "Enable clustering compaction",
         Export:       true,
@@ -3420,7 +3420,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #

     p.ClusteringCompactionAutoEnable = ParamItem{
         Key:          "dataCoord.compaction.clustering.autoEnable",
-        Version:      "2.4.6",
+        Version:      "2.4.7",
         DefaultValue: "false",
         Doc:          "Enable auto clustering compaction",
         Export:       true,
@@ -3429,7 +3429,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #

     p.ClusteringCompactionTriggerInterval = ParamItem{
         Key:          "dataCoord.compaction.clustering.triggerInterval",
-        Version:      "2.4.6",
+        Version:      "2.4.7",
         DefaultValue: "600",
         Doc:          "clustering compaction trigger interval in seconds",
     }
@@ -3437,7 +3437,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #

     p.ClusteringCompactionMinInterval = ParamItem{
         Key:          "dataCoord.compaction.clustering.minInterval",
-        Version:      "2.4.6",
+        Version:      "2.4.7",
         Doc:          "The minimum interval between clustering compaction executions of one collection, to avoid redundant compaction",
         DefaultValue: "3600",
     }
@@ -3445,7 +3445,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #

     p.ClusteringCompactionMaxInterval = ParamItem{
         Key:          "dataCoord.compaction.clustering.maxInterval",
-        Version:      "2.4.6",
+        Version:      "2.4.7",
         Doc:          "If a collection haven't been clustering compacted for longer than maxInterval, force compact",
         DefaultValue: "86400",
     }
@@ -3453,7 +3453,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #

     p.ClusteringCompactionNewDataSizeThreshold = ParamItem{
         Key:          "dataCoord.compaction.clustering.newDataSizeThreshold",
-        Version:      "2.4.6",
+        Version:      "2.4.7",
         Doc:          "If new data size is large than newDataSizeThreshold, execute clustering compaction",
         DefaultValue: "512m",
     }
@@ -3461,33 +3461,33 @@ During compaction, the size of segment # of rows is able to exceed segment max #

     p.ClusteringCompactionTimeoutInSeconds = ParamItem{
         Key:          "dataCoord.compaction.clustering.timeout",
-        Version:      "2.4.6",
+        Version:      "2.4.7",
         DefaultValue: "3600",
         Doc:          "timeout in seconds for clustering compaction, the task will stop if timeout",
     }
     p.ClusteringCompactionTimeoutInSeconds.Init(base.mgr)

-    p.ClusteringCompactionPreferSegmentSize = ParamItem{
-        Key:          "dataCoord.compaction.clustering.preferSegmentSize",
-        Version:      "2.4.6",
-        DefaultValue: "512m",
+    p.ClusteringCompactionPreferSegmentSizeRatio = ParamItem{
+        Key:          "dataCoord.compaction.clustering.preferSegmentSizeRatio",
+        Version:      "2.4.7",
+        DefaultValue: "0.8",
         PanicIfEmpty: false,
         Export:       true,
     }
-    p.ClusteringCompactionPreferSegmentSize.Init(base.mgr)
+    p.ClusteringCompactionPreferSegmentSizeRatio.Init(base.mgr)

-    p.ClusteringCompactionMaxSegmentSize = ParamItem{
-        Key:          "dataCoord.compaction.clustering.maxSegmentSize",
-        Version:      "2.4.6",
-        DefaultValue: "1024m",
+    p.ClusteringCompactionMaxSegmentSizeRatio = ParamItem{
+        Key:          "dataCoord.compaction.clustering.maxSegmentSizeRatio",
+        Version:      "2.4.7",
+        DefaultValue: "1.0",
         PanicIfEmpty: false,
         Export:       true,
     }
-    p.ClusteringCompactionMaxSegmentSize.Init(base.mgr)
+    p.ClusteringCompactionMaxSegmentSizeRatio.Init(base.mgr)

     p.ClusteringCompactionMaxTrainSizeRatio = ParamItem{
         Key:          "dataCoord.compaction.clustering.maxTrainSizeRatio",
-        Version:      "2.4.6",
+        Version:      "2.4.7",
         DefaultValue: "0.8",
         Doc:          "max data size ratio in Kmeans train, if larger than it, will down sampling to meet this limit",
         Export:       true,
@@ -3496,7 +3496,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #

     p.ClusteringCompactionMaxCentroidsNum = ParamItem{
         Key:          "dataCoord.compaction.clustering.maxCentroidsNum",
-        Version:      "2.4.6",
+        Version:      "2.4.7",
         DefaultValue: "10240",
         Doc:          "maximum centroids number in Kmeans train",
         Export:       true,
@@ -3505,7 +3505,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #

     p.ClusteringCompactionMinCentroidsNum = ParamItem{
         Key:          "dataCoord.compaction.clustering.minCentroidsNum",
-        Version:      "2.4.6",
+        Version:      "2.4.7",
         DefaultValue: "16",
         Doc:          "minimum centroids number in Kmeans train",
         Export:       true,
@@ -3514,7 +3514,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #

     p.ClusteringCompactionMinClusterSizeRatio = ParamItem{
         Key:          "dataCoord.compaction.clustering.minClusterSizeRatio",
-        Version:      "2.4.6",
+        Version:      "2.4.7",
         DefaultValue: "0.01",
         Doc:          "minimum cluster size / avg size in Kmeans train",
         Export:       true,
@@ -3523,7 +3523,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #

     p.ClusteringCompactionMaxClusterSizeRatio = ParamItem{
         Key:          "dataCoord.compaction.clustering.maxClusterSizeRatio",
-        Version:      "2.4.6",
+        Version:      "2.4.7",
         DefaultValue: "10",
         Doc:          "maximum cluster size / avg size in Kmeans train",
         Export:       true,
@@ -3532,7 +3532,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #

     p.ClusteringCompactionMaxClusterSize = ParamItem{
         Key:          "dataCoord.compaction.clustering.maxClusterSize",
-        Version:      "2.4.6",
+        Version:      "2.4.7",
         DefaultValue: "5g",
         Doc:          "maximum cluster size in Kmeans train",
         Export:       true,
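For reference, a minimal sketch of reading and overriding the new ratio knobs through paramtable; paramtable.Init() is assumed to be the standard entry point, as used throughout the Milvus test suites:

package main

import (
    "fmt"

    "github.com/milvus-io/milvus/pkg/util/paramtable"
)

func main() {
    paramtable.Init() // assumed standard init, as in Milvus tests

    dc := &paramtable.Get().DataCoordCfg
    fmt.Println(dc.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat()) // 0.8 by default
    fmt.Println(dc.ClusteringCompactionMaxSegmentSizeRatio.GetAsFloat())    // 1.0 by default

    // Both items are refreshable:"true", so they can be overridden at runtime.
    paramtable.Get().Save(dc.ClusteringCompactionMaxSegmentSizeRatio.Key, "1.2")
    fmt.Println(dc.ClusteringCompactionMaxSegmentSizeRatio.GetAsFloat()) // 1.2
}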
@@ -481,10 +481,10 @@ func TestComponentParam(t *testing.T) {
         assert.Equal(t, int64(10*1024*1024), Params.ClusteringCompactionNewDataSizeThreshold.GetAsSize())
         params.Save("dataCoord.compaction.clustering.newDataSizeThreshold", "10g")
         assert.Equal(t, int64(10*1024*1024*1024), Params.ClusteringCompactionNewDataSizeThreshold.GetAsSize())
-        params.Save("dataCoord.compaction.clustering.maxSegmentSize", "100m")
-        assert.Equal(t, int64(100*1024*1024), Params.ClusteringCompactionMaxSegmentSize.GetAsSize())
-        params.Save("dataCoord.compaction.clustering.preferSegmentSize", "10m")
-        assert.Equal(t, int64(10*1024*1024), Params.ClusteringCompactionPreferSegmentSize.GetAsSize())
+        params.Save("dataCoord.compaction.clustering.maxSegmentSizeRatio", "1.2")
+        assert.Equal(t, 1.2, Params.ClusteringCompactionMaxSegmentSizeRatio.GetAsFloat())
+        params.Save("dataCoord.compaction.clustering.preferSegmentSizeRatio", "0.5")
+        assert.Equal(t, 0.5, Params.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat())
         params.Save("dataCoord.slot.clusteringCompactionUsage", "10")
         assert.Equal(t, 10, Params.ClusteringCompactionSlotUsage.GetAsInt())
         params.Save("dataCoord.slot.mixCompactionUsage", "5")
@@ -71,11 +71,13 @@ func (s *ClusteringCompactionSuite) TestClusteringCompaction() {
     paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableAutoCompaction.Key, "false")
     defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableAutoCompaction.Key)

-    paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSize.Key, "1m")
-    defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSize.Key)
+    paramtable.Get().Save(paramtable.Get().DataCoordCfg.SegmentMaxSize.Key, "1")
+    defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.SegmentMaxSize.Key)
+    paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSizeRatio.Key, "1.0")
+    defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSizeRatio.Key)

-    paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSize.Key, "1m")
-    defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSize.Key)
+    paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key, "1.0")
+    defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key)

     schema := ConstructScalarClusteringSchema(collectionName, dim, true)
     marshaledSchema, err := proto.Marshal(schema)
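The integration test keeps its old setup under the new knobs: pinning SegmentMaxSize to 1 (in MB, per the * 1024 * 1024 in the old code above) and both ratios to 1.0 reproduces the 1 MB cap that the fixed "1m" settings used to impose directly. A sketch of the equivalence:

package main

import "fmt"

func main() {
    // SegmentMaxSize = "1" is read in MB; ratios are dimensionless.
    segmentMaxSizeBytes := int64(1) * 1024 * 1024
    maxRatio, preferRatio := 1.0, 1.0 // both ratios pinned to 1.0 in the test

    fmt.Println(int64(float64(segmentMaxSizeBytes) * maxRatio))    // 1048576
    fmt.Println(int64(float64(segmentMaxSizeBytes) * preferRatio)) // 1048576
    // Both equal 1 MB, matching the old fixed "1m" test settings.
}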