mirror of https://github.com/milvus-io/milvus.git
fix: [cherry-pick] Remove flushed segment in segment manager generated through import (#34651)
issue: #34648 master pr: #34649 --------- Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>pull/34687/head
parent
6a3a14affb
commit
e1686d096f
|
@ -278,19 +278,28 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
|
|||
defer s.mu.Unlock()
|
||||
|
||||
// filter segments
|
||||
validSegments := make(map[UniqueID]struct{})
|
||||
invalidSegments := make(map[UniqueID]struct{})
|
||||
segments := make([]*SegmentInfo, 0)
|
||||
for _, segmentID := range s.segments {
|
||||
segment := s.meta.GetHealthySegment(segmentID)
|
||||
if segment == nil {
|
||||
log.Warn("Failed to get segment info from meta", zap.Int64("id", segmentID))
|
||||
invalidSegments[segmentID] = struct{}{}
|
||||
continue
|
||||
}
|
||||
validSegments[segmentID] = struct{}{}
|
||||
|
||||
if !satisfy(segment, collectionID, partitionID, channelName) || !isGrowing(segment) || segment.GetLevel() == datapb.SegmentLevel_L0 {
|
||||
continue
|
||||
}
|
||||
segments = append(segments, segment)
|
||||
}
|
||||
|
||||
if len(invalidSegments) > 0 {
|
||||
log.Warn("Failed to get segments infos from meta, clear them", zap.Int64s("segmentIDs", lo.Keys(invalidSegments)))
|
||||
}
|
||||
s.segments = lo.Keys(validSegments)
|
||||
|
||||
// Apply allocation policy.
|
||||
maxCountPerSegment, err := s.estimateMaxNumOfRows(collectionID)
|
||||
if err != nil {
|
||||
|
@ -500,11 +509,24 @@ func (s *SegmentManager) FlushImportSegments(ctx context.Context, collectionID U
|
|||
// We set the importing segment state directly to 'Flushed' rather than
|
||||
// 'Sealed' because all data has been imported, and there is no data
|
||||
// in the datanode flowgraph that needs to be synced.
|
||||
candidatesMap := make(map[UniqueID]struct{})
|
||||
for _, id := range candidates {
|
||||
if err := s.meta.SetState(id, commonpb.SegmentState_Flushed); err != nil {
|
||||
return err
|
||||
}
|
||||
candidatesMap[id] = struct{}{}
|
||||
}
|
||||
|
||||
validSegments := make(map[UniqueID]struct{})
|
||||
for _, id := range s.segments {
|
||||
if _, ok := candidatesMap[id]; !ok {
|
||||
validSegments[id] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// it is necessary for v2.4.x, import segments were no longer assigned by the segmentManager.
|
||||
s.segments = lo.Keys(validSegments)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -141,6 +141,23 @@ func TestAllocSegment(t *testing.T) {
|
|||
assert.Error(t, err)
|
||||
assert.Nil(t, segmentManager)
|
||||
})
|
||||
|
||||
t.Run("alloc clear unhealthy segment", func(t *testing.T) {
|
||||
allocations1, err := segmentManager.AllocSegment(ctx, collID, 100, "c1", 100)
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 1, len(allocations1))
|
||||
assert.EqualValues(t, 1, len(segmentManager.segments))
|
||||
|
||||
err = meta.SetState(allocations1[0].SegmentID, commonpb.SegmentState_Dropped)
|
||||
assert.NoError(t, err)
|
||||
|
||||
allocations2, err := segmentManager.AllocSegment(ctx, collID, 100, "c1", 100)
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 1, len(allocations2))
|
||||
// clear old healthy and alloc new
|
||||
assert.EqualValues(t, 1, len(segmentManager.segments))
|
||||
assert.NotEqual(t, allocations1[0].SegmentID, allocations2[0].SegmentID)
|
||||
})
|
||||
}
|
||||
|
||||
func TestLastExpireReset(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue