enhance: [cherry-pick] refine clustering compaction configs and logs (#34818)

issue: #30633
pr: #34784

---------

Signed-off-by: wayblink <anyang.wang@zilliz.com>
wayblink 2024-07-21 19:27:41 +08:00 committed by GitHub
parent 33bbc614df
commit c0c3c5f528
4 changed files with 31 additions and 25 deletions


@@ -466,25 +466,28 @@ dataCoord:
gcInterval: 1800 # The time interval in seconds for compaction gc
dropTolerance: 86400 # Compaction task will be cleaned after finish longer than this time(in seconds)
clustering:
enable: true
autoEnable: false
triggerInterval: 600
minInterval: 3600
maxInterval: 259200
newDataRatioThreshold: 0.2
newDataSizeThreshold: 512m
timeout: 7200
enable: true # Enable clustering compaction
autoEnable: false # Enable auto background clustering compaction
triggerInterval: 600 # clustering compaction trigger interval in seconds
minInterval: 3600 # The minimum interval between clustering compaction executions of one collection, to avoid redundant compaction
maxInterval: 259200 # If a collection hasn't been clustering compacted for longer than maxInterval, force compact it
newDataSizeThreshold: 512m # If new data size is larger than newDataSizeThreshold, execute clustering compaction
timeout: 7200 # timeout in seconds for clustering compaction; the task will stop if it times out
dropTolerance: 86400
# clustering compaction will try its best to distribute data into segments with size in the range [preferSegmentSize, maxSegmentSize].
# data will be clustered by preferSegmentSize; if a cluster is larger than maxSegmentSize, it will be split into multiple segments
# a buffer between (preferSegmentSize, maxSegmentSize) is left for new data in the same cluster (range), to avoid redistributing globally too often
preferSegmentSize: 512m
maxSegmentSize: 1024m
maxTrainSizeRatio: 0.8 # max data size ratio in analyze, if data is larger than it, will down sampling to meet this limit
maxCentroidsNum: 10240
minCentroidsNum: 16
minClusterSizeRatio: 0.01
maxClusterSizeRatio: 10
maxClusterSize: 5g
# vector clustering related
maxTrainSizeRatio: 0.8 # max data size ratio in Kmeans training; if the data is larger, it will be down-sampled to meet this limit
maxCentroidsNum: 10240 # maximum number of centroids in Kmeans training
minCentroidsNum: 16 # minimum number of centroids in Kmeans training
minClusterSizeRatio: 0.01 # minimum cluster size / avg size in Kmeans training
maxClusterSizeRatio: 10 # maximum cluster size / avg size in Kmeans training
maxClusterSize: 5g # maximum cluster size in Kmeans training
levelzero:
forceTrigger:
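
To make these knobs concrete, here is a minimal sketch of the trigger decision and segment sizing that the comments above describe. It illustrates the documented semantics only; the names (shouldTrigger, targetSegmentCount, clusteringTriggerConfig) are hypothetical and this is not the actual Milvus trigger code.

```go
package main

import (
	"fmt"
	"time"
)

// clusteringTriggerConfig mirrors the knobs documented above (names are illustrative).
type clusteringTriggerConfig struct {
	MinInterval          time.Duration // e.g. 3600s: skip if compacted more recently than this
	MaxInterval          time.Duration // e.g. 259200s: force compaction beyond this age
	NewDataSizeThreshold int64         // e.g. 512m: compact once enough new data has accumulated
}

// shouldTrigger sketches the decision implied by minInterval/maxInterval/newDataSizeThreshold.
func shouldTrigger(cfg clusteringTriggerConfig, sinceLastCompaction time.Duration, newDataSize int64) bool {
	if sinceLastCompaction < cfg.MinInterval {
		return false // too soon, avoid redundant compaction
	}
	if sinceLastCompaction > cfg.MaxInterval {
		return true // collection has not been compacted for too long, force it
	}
	return newDataSize >= cfg.NewDataSizeThreshold // enough new data arrived
}

// targetSegmentCount sketches how a cluster larger than maxSegmentSize would be split,
// aiming at preferSegmentSize per resulting segment (ceiling division).
func targetSegmentCount(clusterSize, preferSegmentSize, maxSegmentSize int64) int64 {
	if clusterSize <= maxSegmentSize {
		return 1 // fits in one segment, leaving headroom for new data in the same cluster
	}
	return (clusterSize + preferSegmentSize - 1) / preferSegmentSize
}

func main() {
	cfg := clusteringTriggerConfig{
		MinInterval:          3600 * time.Second,
		MaxInterval:          259200 * time.Second,
		NewDataSizeThreshold: 512 << 20, // 512m
	}
	fmt.Println(shouldTrigger(cfg, 2*time.Hour, 600<<20))        // true: past minInterval and enough new data
	fmt.Println(targetSegmentCount(2048<<20, 512<<20, 1024<<20)) // 4: a 2g cluster split by preferSegmentSize
}
```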
@@ -577,8 +580,8 @@ dataNode:
slotCap: 16 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode.
clusteringCompaction:
memoryBufferRatio: 0.1 # The ratio of memory buffer of clustering compaction. Data larger than threshold will be spilled to storage.
workPoolSize: 8
memoryBufferRatio: 0.1 # The ratio of memory buffer of clustering compaction. Data larger than the threshold will be flushed to storage.
workPoolSize: 8 # worker pool size for one clustering compaction job
# Configures the system log output.
log:
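
As an illustration of the two datanode knobs above, the sketch below shows one plausible way memoryBufferRatio can be turned into a byte threshold and workPoolSize into a bounded worker pool. The names and the exact formula are assumptions for illustration, not the actual datanode implementation.

```go
package main

import "fmt"

// spillThreshold sketches how memoryBufferRatio could become a byte threshold:
// buffered data is flushed to storage once it exceeds ratio * available memory.
func spillThreshold(availableMemoryBytes int64, memoryBufferRatio float64) int64 {
	return int64(float64(availableMemoryBytes) * memoryBufferRatio)
}

func main() {
	available := int64(16) << 30 // assume 16 GiB usable on the datanode
	threshold := spillThreshold(available, 0.1)
	fmt.Printf("flush buffered data once it exceeds %d bytes\n", threshold)

	// workPoolSize: 8 bounds how many workers one clustering compaction job may use;
	// a buffered channel is a common way to model such a bound in Go.
	workPool := make(chan struct{}, 8)
	workPool <- struct{}{} // acquire a slot before doing work
	<-workPool             // release it when done
	fmt.Println("worker slots:", cap(workPool))
}
```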


@@ -53,6 +53,7 @@ func (policy *clusteringCompactionPolicy) Enable() bool {
}
func (policy *clusteringCompactionPolicy) Trigger() (map[CompactionTriggerType][]CompactionView, error) {
log.Info("start trigger clusteringCompactionPolicy...")
ctx := context.Background()
collections := policy.meta.GetCollections()
ts, err := policy.allocator.allocTimestamp(ctx)
@@ -97,7 +98,8 @@ func (policy *clusteringCompactionPolicy) checkAllL2SegmentsContains(ctx context
}
func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Context, collectionID int64, ts Timestamp, manual bool) ([]CompactionView, int64, error) {
log.Info("trigger collection clustering compaction", zap.Int64("collectionID", collectionID))
log := log.With(zap.Int64("collectionID", collectionID))
log.Info("trigger collection clustering compaction")
collection, err := policy.handler.GetCollection(ctx, collectionID)
if err != nil {
log.Warn("fail to get collection")
@@ -105,6 +107,7 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte
}
clusteringKeyField := clustering.GetClusteringKeyField(collection.Schema)
if clusteringKeyField == nil {
log.Info("the collection has no clustering key, skip tigger clustering compaction")
return nil, 0, nil
}
@@ -120,7 +123,7 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte
compacting, triggerID := policy.collectionIsClusteringCompacting(collection.ID)
if compacting {
log.Info("collection is clustering compacting", zap.Int64("collectionID", collection.ID), zap.Int64("triggerID", triggerID))
log.Info("collection is clustering compacting", zap.Int64("triggerID", triggerID))
return nil, triggerID, nil
}
@@ -142,10 +145,7 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte
views := make([]CompactionView, 0)
// partSegments is list of chanPartSegments, which is channel-partition organized segments
for _, group := range partSegments {
log := log.Ctx(ctx).With(zap.Int64("collectionID", group.collectionID),
zap.Int64("partitionID", group.partitionID),
zap.String("channel", group.channelName))
log := log.With(zap.Int64("partitionID", group.partitionID), zap.String("channel", group.channelName))
if !policy.checkAllL2SegmentsContains(ctx, group.collectionID, group.partitionID, group.channelName) {
log.Warn("clustering compaction cannot be done, otherwise the performance will fall back")
continue
@@ -184,7 +184,7 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte
views = append(views, view)
}
log.Info("trigger collection clustering compaction", zap.Int64("collectionID", collectionID), zap.Int("viewNum", len(views)))
log.Info("finish trigger collection clustering compaction", zap.Int("viewNum", len(views)))
return views, newTriggerID, nil
}
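
The logging changes above replace repeated zap fields with a derived logger built via log.With, so every subsequent line in the scope carries collectionID (and, per loop iteration, partitionID and channel). The self-contained sketch below shows the same pattern with go.uber.org/zap directly; the diff uses Milvus's log wrapper, so this is illustrative rather than a copy of that code, and the field values are made up.

```go
package main

import "go.uber.org/zap"

func main() {
	base, _ := zap.NewDevelopment()
	defer base.Sync()

	// Derive a child logger that carries collectionID on every subsequent line,
	// mirroring `log := log.With(zap.Int64("collectionID", collectionID))` above.
	collectionID := int64(451201)
	log := base.With(zap.Int64("collectionID", collectionID))

	log.Info("trigger collection clustering compaction")

	// Narrow the context further per partition/channel, as the loop in the diff does.
	partLog := log.With(zap.Int64("partitionID", 1), zap.String("channel", "by-dev-rootcoord-dml_0"))
	partLog.Info("finish trigger collection clustering compaction", zap.Int("viewNum", 3))
}
```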


@@ -30,6 +30,7 @@ func (policy *l0CompactionPolicy) Enable() bool {
}
func (policy *l0CompactionPolicy) Trigger() (map[CompactionTriggerType][]CompactionView, error) {
log.Info("start trigger l0CompactionPolicy...")
// support config hot refresh
events := policy.generateEventForLevelZeroViewChange()
if len(events) != 0 {


@@ -2803,7 +2803,7 @@ user-task-polling:
Key: "queryNode.enableSegmentPrune",
Version: "2.3.4",
DefaultValue: "false",
Doc: "use partition prune function on shard delegator",
Doc: "use partition stats to prune data in search/query on shard delegator",
Export: true,
}
p.EnableSegmentPrune.Init(base.mgr)
@@ -3354,6 +3354,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
Key: "dataCoord.compaction.clustering.triggerInterval",
Version: "2.4.6",
DefaultValue: "600",
Doc: "clustering compaction trigger interval in seconds",
}
p.ClusteringCompactionTriggerInterval.Init(base.mgr)
@@ -3385,6 +3386,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
Key: "dataCoord.compaction.clustering.timeout",
Version: "2.4.6",
DefaultValue: "3600",
Doc: "timeout in seconds for clustering compaction, the task will stop if timeout",
}
p.ClusteringCompactionTimeoutInSeconds.Init(base.mgr)
@@ -4145,7 +4147,7 @@ if this parameter <= 0, will set it as 10`,
p.ClusteringCompactionMemoryBufferRatio = ParamItem{
Key: "dataNode.clusteringCompaction.memoryBufferRatio",
Version: "2.4.6",
Doc: "The ratio of memory buffer of clustering compaction. Data larger than threshold will be spilled to storage.",
Doc: "The ratio of memory buffer of clustering compaction. Data larger than threshold will be flushed to storage.",
DefaultValue: "0.1",
PanicIfEmpty: false,
Export: true,
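
The last hunks register these settings through the ParamItem fields visible in the diff (Key, Version, DefaultValue, Doc, Export) followed by an Init call against the config manager. The sketch below is a deliberately simplified, self-contained mirror of that registration pattern so the shape is easy to follow; paramItem and manager here are toy stand-ins, not the real paramtable types.

```go
package main

import "fmt"

// paramItem is a stripped-down stand-in for the ParamItem fields shown in the diff.
type paramItem struct {
	Key          string
	Version      string
	DefaultValue string
	Doc          string
	Export       bool
}

// manager is a toy registry standing in for base.mgr; Init registers the item with it.
type manager struct{ items map[string]paramItem }

func (p *paramItem) Init(m *manager) { m.items[p.Key] = *p }

func main() {
	mgr := &manager{items: map[string]paramItem{}}

	// Mirrors the registration of dataCoord.compaction.clustering.triggerInterval above.
	triggerInterval := paramItem{
		Key:          "dataCoord.compaction.clustering.triggerInterval",
		Version:      "2.4.6",
		DefaultValue: "600",
		Doc:          "clustering compaction trigger interval in seconds",
	}
	triggerInterval.Init(mgr)

	fmt.Println(mgr.items["dataCoord.compaction.clustering.triggerInterval"].Doc)
}
```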