mirror of https://github.com/milvus-io/milvus.git
Cherry-pick from master PR #37082. Related to #35303. Delta data is not needed when using the `RemoteLoad` L0 forward policy. By skipping the loading of delta data, memory pressure can be eased when L0 segment size/number is large. Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> (branch: pull/37132/head)
parent
59b2563029
commit
79891f047d
|
@ -134,6 +134,9 @@ type shardDelegator struct {
|
|||
// in order to make add/remove growing be atomic, need lock before modify these meta info
|
||||
growingSegmentLock sync.RWMutex
|
||||
partitionStatsMut sync.RWMutex
|
||||
|
||||
// current forward policy
|
||||
l0ForwardPolicy string
|
||||
}
|
||||
|
||||
// getLogger returns the zap logger with pre-defined shard attributes.
|
||||
|
@ -866,6 +869,9 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
|
|||
|
||||
excludedSegments := NewExcludedSegments(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.GetAsDuration(time.Second))
|
||||
|
||||
policy := paramtable.Get().QueryNodeCfg.LevelZeroForwardPolicy.GetValue()
|
||||
log.Info("shard delegator setup l0 forward policy", zap.String("policy", policy))
|
||||
|
||||
sd := &shardDelegator{
|
||||
collectionID: collectionID,
|
||||
replicaID: replicaID,
|
||||
|
@ -887,6 +893,8 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
|
|||
chunkManager: chunkManager,
|
||||
partitionStats: make(map[UniqueID]*storage.PartitionStatsSnapshot),
|
||||
excludedSegments: excludedSegments,
|
||||
|
||||
l0ForwardPolicy: policy,
|
||||
}
|
||||
m := sync.Mutex{}
|
||||
sd.tsCond = sync.NewCond(&m)
|
||||
|
|
|
@ -422,6 +422,15 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
|
|||
if len(req.GetInfos()) > 1 {
|
||||
var reqs []*querypb.LoadSegmentsRequest
|
||||
for _, info := range req.GetInfos() {
|
||||
// put meta l0, instead of load actual delta data
|
||||
if info.GetLevel() == datapb.SegmentLevel_L0 && sd.l0ForwardPolicy == L0ForwardPolicyRemoteLoad {
|
||||
l0Seg, err := segments.NewL0Segment(sd.collection, segments.SegmentTypeSealed, req.GetVersion(), info)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sd.segmentManager.Put(ctx, segments.SegmentTypeSealed, l0Seg)
|
||||
continue
|
||||
}
|
||||
newReq := typeutil.Clone(req)
|
||||
newReq.Infos = []*querypb.SegmentLoadInfo{info}
|
||||
reqs = append(reqs, newReq)
|
||||
|
|
|
@ -56,13 +56,13 @@ func (sd *shardDelegator) forwardL0Deletion(ctx context.Context,
|
|||
targetNodeID int64,
|
||||
worker cluster.Worker,
|
||||
) error {
|
||||
switch policy := paramtable.Get().QueryNodeCfg.LevelZeroForwardPolicy.GetValue(); policy {
|
||||
switch sd.l0ForwardPolicy {
|
||||
case ForwardPolicyDefault, L0ForwardPolicyBF:
|
||||
return sd.forwardL0ByBF(ctx, info, candidate, targetNodeID, worker)
|
||||
case L0ForwardPolicyRemoteLoad:
|
||||
return sd.forwardL0RemoteLoad(ctx, info, req, targetNodeID, worker)
|
||||
default:
|
||||
return merr.WrapErrServiceInternal("Unknown l0 forward policy: %s", policy)
|
||||
return merr.WrapErrServiceInternal("Unknown l0 forward policy: %s", sd.l0ForwardPolicy)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue