mirror of https://github.com/milvus-io/milvus.git
Perform alter distribution under mutex protection (#26229)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/26119/head
parent
f33a89387f
commit
eea9197306
|
@ -364,14 +364,6 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
|
|||
return nil
|
||||
}
|
||||
|
||||
log.Info("load delete...")
|
||||
err = sd.loadStreamDelete(ctx, candidates, infos, req.GetDeltaPositions(), targetNodeID, worker)
|
||||
if err != nil {
|
||||
log.Warn("load stream delete failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Info("load delete done")
|
||||
// alter distribution
|
||||
entries := lo.Map(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) SegmentEntry {
|
||||
return SegmentEntry{
|
||||
SegmentID: info.GetSegmentID(),
|
||||
|
@ -380,7 +372,12 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
|
|||
Version: req.GetVersion(),
|
||||
}
|
||||
})
|
||||
sd.distribution.AddDistributions(entries...)
|
||||
log.Info("load delete...")
|
||||
err = sd.loadStreamDelete(ctx, candidates, infos, req.GetDeltaPositions(), targetNodeID, worker, entries)
|
||||
if err != nil {
|
||||
log.Warn("load stream delete failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -389,7 +386,10 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
|
|||
candidates []*pkoracle.BloomFilterSet,
|
||||
infos []*querypb.SegmentLoadInfo,
|
||||
deltaPositions []*msgpb.MsgPosition,
|
||||
targetNodeID int64, worker cluster.Worker) error {
|
||||
targetNodeID int64,
|
||||
worker cluster.Worker,
|
||||
entries []SegmentEntry,
|
||||
) error {
|
||||
log := sd.getLogger(ctx)
|
||||
|
||||
idCandidates := lo.SliceToMap(candidates, func(candidate *pkoracle.BloomFilterSet) (int64, *pkoracle.BloomFilterSet) {
|
||||
|
@ -462,6 +462,9 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
|
|||
)
|
||||
sd.pkOracle.Register(candidate, targetNodeID)
|
||||
}
|
||||
log.Info("load delete done")
|
||||
// alter distribution
|
||||
sd.distribution.AddDistributions(entries...)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue