mirror of https://github.com/milvus-io/milvus.git
enhance: Skip loading delta data in delegator when using RemoteLoad (#37082)
Related to #35303. Delta data is not needed when using the `RemoteLoad` L0 forward policy. By skipping the loading of delta data, memory pressure can be eased when the L0 segment size or number is large. Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> pull/37120/head
parent
39a91eb100
commit
b086ef6b19
|
@ -144,6 +144,9 @@ type shardDelegator struct {
|
|||
// fieldId -> functionRunner map for search function field
|
||||
functionRunners map[UniqueID]function.FunctionRunner
|
||||
isBM25Field map[UniqueID]bool
|
||||
|
||||
// current forward policy
|
||||
l0ForwardPolicy string
|
||||
}
|
||||
|
||||
// getLogger returns the zap logger with pre-defined shard attributes.
|
||||
|
@ -917,6 +920,9 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
|
|||
|
||||
excludedSegments := NewExcludedSegments(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.GetAsDuration(time.Second))
|
||||
|
||||
policy := paramtable.Get().QueryNodeCfg.LevelZeroForwardPolicy.GetValue()
|
||||
log.Info("shard delegator setup l0 forward policy", zap.String("policy", policy))
|
||||
|
||||
sd := &shardDelegator{
|
||||
collectionID: collectionID,
|
||||
replicaID: replicaID,
|
||||
|
@ -940,6 +946,7 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
|
|||
excludedSegments: excludedSegments,
|
||||
functionRunners: make(map[int64]function.FunctionRunner),
|
||||
isBM25Field: make(map[int64]bool),
|
||||
l0ForwardPolicy: policy,
|
||||
}
|
||||
|
||||
for _, tf := range collection.Schema().GetFunctions() {
|
||||
|
|
|
@ -441,6 +441,15 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
|
|||
if len(req.GetInfos()) > 1 {
|
||||
var reqs []*querypb.LoadSegmentsRequest
|
||||
for _, info := range req.GetInfos() {
|
||||
// put meta l0, instead of load actual delta data
|
||||
if info.GetLevel() == datapb.SegmentLevel_L0 && sd.l0ForwardPolicy == L0ForwardPolicyRemoteLoad {
|
||||
l0Seg, err := segments.NewL0Segment(sd.collection, segments.SegmentTypeSealed, req.GetVersion(), info)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sd.segmentManager.Put(ctx, segments.SegmentTypeSealed, l0Seg)
|
||||
continue
|
||||
}
|
||||
newReq := typeutil.Clone(req)
|
||||
newReq.Infos = []*querypb.SegmentLoadInfo{info}
|
||||
reqs = append(reqs, newReq)
|
||||
|
|
|
@ -60,13 +60,13 @@ func (sd *shardDelegator) forwardL0Deletion(ctx context.Context,
|
|||
targetNodeID int64,
|
||||
worker cluster.Worker,
|
||||
) error {
|
||||
switch policy := paramtable.Get().QueryNodeCfg.LevelZeroForwardPolicy.GetValue(); policy {
|
||||
switch sd.l0ForwardPolicy {
|
||||
case ForwardPolicyDefault, L0ForwardPolicyBF:
|
||||
return sd.forwardL0ByBF(ctx, info, candidate, targetNodeID, worker)
|
||||
case L0ForwardPolicyRemoteLoad:
|
||||
return sd.forwardL0RemoteLoad(ctx, info, req, targetNodeID, worker)
|
||||
default:
|
||||
return merr.WrapErrServiceInternal("Unknown l0 forward policy: %s", policy)
|
||||
return merr.WrapErrServiceInternal("Unknown l0 forward policy: %s", sd.l0ForwardPolicy)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue