mirror of https://github.com/milvus-io/milvus.git
Improve sync target version logic (#26630)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/26891/head
parent
073ac8350b
commit
89fc9aad82
|
@ -625,12 +625,15 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele
|
|||
|
||||
func (sd *shardDelegator) SyncTargetVersion(newVersion int64, growingInTarget []int64,
|
||||
sealedInTarget []int64, droppedInTarget []int64) {
|
||||
growings := sd.segmentManager.GetBy(segments.WithType(segments.SegmentTypeGrowing))
|
||||
growings := sd.segmentManager.GetBy(
|
||||
segments.WithType(segments.SegmentTypeGrowing),
|
||||
segments.WithChannel(sd.vchannelName),
|
||||
)
|
||||
|
||||
sealedSet := typeutil.NewUniqueSet(sealedInTarget...)
|
||||
growingSet := typeutil.NewUniqueSet(growingInTarget...)
|
||||
droppedSet := typeutil.NewUniqueSet(droppedInTarget...)
|
||||
redundantGrowing := make([]int64, 0)
|
||||
redundantGrowing := typeutil.NewUniqueSet()
|
||||
for _, s := range growings {
|
||||
if growingSet.Contain(s.ID()) {
|
||||
continue
|
||||
|
@ -638,19 +641,20 @@ func (sd *shardDelegator) SyncTargetVersion(newVersion int64, growingInTarget []
|
|||
|
||||
// sealed segment already exists, make growing segment redundant
|
||||
if sealedSet.Contain(s.ID()) {
|
||||
redundantGrowing = append(redundantGrowing, s.ID())
|
||||
redundantGrowing.Insert(s.ID())
|
||||
}
|
||||
|
||||
// sealed segment already dropped, make growing segment redundant
|
||||
if droppedSet.Contain(s.ID()) {
|
||||
redundantGrowing = append(redundantGrowing, s.ID())
|
||||
redundantGrowing.Insert(s.ID())
|
||||
}
|
||||
}
|
||||
redundantGrowingIDs := redundantGrowing.Collect()
|
||||
if len(redundantGrowing) > 0 {
|
||||
log.Warn("found redundant growing segments",
|
||||
zap.Int64s("growingSegments", redundantGrowing))
|
||||
zap.Int64s("growingSegments", redundantGrowingIDs))
|
||||
}
|
||||
sd.distribution.SyncTargetVersion(newVersion, growingInTarget, sealedInTarget, redundantGrowing)
|
||||
sd.distribution.SyncTargetVersion(newVersion, growingInTarget, sealedInTarget, redundantGrowingIDs)
|
||||
}
|
||||
|
||||
func (sd *shardDelegator) GetTargetVersion() int64 {
|
||||
|
|
|
@ -730,6 +730,7 @@ func (s *DelegatorDataSuite) TestSyncTargetVersion() {
|
|||
ms.EXPECT().Partition().Return(1)
|
||||
ms.EXPECT().RowNum().Return(0)
|
||||
ms.EXPECT().Indexes().Return(nil)
|
||||
ms.EXPECT().Shard().Return(s.vchannelName)
|
||||
s.manager.Segment.Put(segments.SegmentTypeGrowing, ms)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue