mirror of https://github.com/milvus-io/milvus.git
fix set shard unservicable when sync target version (#25418)
Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/25424/head
parent
a47fc28023
commit
d4704ab9b6
|
@ -210,24 +210,25 @@ func (d *distribution) AddOfflines(segmentIDs ...int64) {
|
|||
func (d *distribution) SyncTargetVersion(newVersion int64, growingInTarget []int64, sealedInTarget []int64) {
|
||||
d.mut.Lock()
|
||||
defer d.mut.Unlock()
|
||||
|
||||
for _, segmentID := range growingInTarget {
|
||||
entry, ok := d.growingSegments[segmentID]
|
||||
if !ok {
|
||||
log.Error("readable growing segment lost, make it unserviceable",
|
||||
log.Warn("readable growing segment lost, consume from dml seems too slow",
|
||||
zap.Int64("segmentID", segmentID))
|
||||
d.serviceable.Store(false)
|
||||
continue
|
||||
}
|
||||
entry.TargetVersion = newVersion
|
||||
d.growingSegments[segmentID] = entry
|
||||
}
|
||||
|
||||
available := true
|
||||
for _, segmentID := range sealedInTarget {
|
||||
entry, ok := d.sealedSegments[segmentID]
|
||||
if !ok {
|
||||
log.Error("readable sealed segment lost, make it unserviceable",
|
||||
zap.Int64("segmentID", segmentID))
|
||||
d.serviceable.Store(false)
|
||||
available = false
|
||||
continue
|
||||
}
|
||||
entry.TargetVersion = newVersion
|
||||
|
@ -237,6 +238,8 @@ func (d *distribution) SyncTargetVersion(newVersion int64, growingInTarget []int
|
|||
oldValue := d.targetVersion.Load()
|
||||
d.targetVersion.Store(newVersion)
|
||||
d.genSnapshot()
|
||||
// if sealed segment in leader view is less than sealed segment in target, set delegator to unserviceable
|
||||
d.serviceable.Store(available)
|
||||
log.Info("Update readable segment version",
|
||||
zap.Int64("oldVersion", oldValue),
|
||||
zap.Int64("newVersion", newVersion),
|
||||
|
|
|
@ -631,6 +631,13 @@ func (s *DistributionSuite) Test_SyncTargetVersion() {
|
|||
s1, s2, _ = s.dist.GetSegments(false)
|
||||
s.Len(s1[0].Segments, 3)
|
||||
s.Len(s2, 3)
|
||||
|
||||
s.dist.serviceable.Store(true)
|
||||
s.dist.SyncTargetVersion(2, []int64{222}, []int64{})
|
||||
s.True(s.dist.Serviceable())
|
||||
|
||||
s.dist.SyncTargetVersion(2, []int64{}, []int64{333})
|
||||
s.False(s.dist.Serviceable())
|
||||
}
|
||||
|
||||
func TestDistributionSuite(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue