Fix drop segment bug (#11709)

issue: #11708
Signed-off-by: sunby <bingyi.sun@zilliz.com>

Co-authored-by: sunby <bingyi.sun@zilliz.com>
pull/11662/head
Bingyi Sun 2021-11-12 17:31:11 +08:00 committed by GitHub
parent 94b31e6bf9
commit 5c8978e5d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 51 additions and 8 deletions

View File

@ -493,8 +493,12 @@ func (s *SegmentManager) DropSegmentsOfChannel(ctx context.Context, channel stri
validSegments := make([]int64, 0, len(s.segments))
for _, sid := range s.segments {
segment := s.meta.GetSegment(sid)
if segment != nil && segment.GetInsertChannel() != channel {
if segment == nil {
continue
}
if segment.GetInsertChannel() != channel {
validSegments = append(validSegments, sid)
continue
}
s.meta.SetAllocations(sid, nil)
for _, allocation := range segment.allocations {

View File

@ -538,6 +538,35 @@ func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) {
},
[]UniqueID{2},
},
{
"test drop segments with dropped segment",
fields{
meta: &meta{
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
},
},
2: {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
InsertChannel: "ch2",
},
},
},
},
},
segments: []UniqueID{1, 2, 3},
},
args{
"ch1",
},
[]UniqueID{2},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

View File

@ -745,7 +745,6 @@ func (s *spySegmentManager) AllocSegment(ctx context.Context, collectionID Uniqu
// DropSegment drops the segment from manager.
func (s *spySegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) {
panic("not implemented") // TODO: Implement
}
// SealAllSegments seals all segments of collection with collectionID and return sealed segments

View File

@ -300,6 +300,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
zap.Int64("collectionID", req.GetCollectionID()),
zap.Int64("segmentID", req.GetSegmentID()),
zap.Bool("isFlush", req.GetFlushed()),
zap.Bool("isDropped", req.GetDropped()),
zap.Any("checkpoints", req.GetCheckPoints()))
// validate
@ -320,6 +321,10 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
return resp, nil
}
if req.GetDropped() {
s.segmentManager.DropSegment(ctx, segment.GetID())
}
// set segment to SegmentState_Flushing and save binlogs and checkpoints
err := s.meta.UpdateFlushSegmentsInfo(
req.GetSegmentID(),
@ -342,6 +347,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
zap.Any("meta", req.GetField2BinlogPaths()))
if req.GetDropped() && s.checkShouldDropChannel(channel) {
log.Debug("remove channel", zap.String("channel", channel))
err = s.channelManager.RemoveChannel(channel)
if err != nil {
log.Warn("failed to remove channel", zap.String("channel", channel), zap.Error(err))
@ -359,8 +365,9 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
tt, err := getTimetravelReverseTime(cctx, s.allocator)
if err == nil {
if err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(), segment.GetPartitionID(),
segmentID, segment.GetInsertChannel(), tt); err != nil {
err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(),
segment.GetPartitionID(), segmentID, segment.GetInsertChannel(), tt)
if err != nil {
log.Warn("failed to trigger single compaction", zap.Int64("segmentID", segmentID))
}
}
@ -377,7 +384,7 @@ func (s *Server) checkShouldDropChannel(channel string) bool {
// FIXME: we filter compaction generated segments
// because datanode may not know the segment due to the network lag or
// datacoord crash when handling CompleteCompaction.
len(segment.CompactionFrom) != 0 &&
len(segment.CompactionFrom) == 0 &&
segment.GetState() != commonpb.SegmentState_Dropped {
return false
}

View File

@ -202,6 +202,10 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
if rsp.ErrorCode != commonpb.ErrorCode_Success {
return fmt.Errorf("data service save bin log path failed, reason = %s", rsp.Reason)
}
if pack.flushed || pack.dropped {
dsService.replica.segmentFlushed(pack.segmentID)
}
dsService.flushingSegCache.Remove(req.GetSegmentID())
return nil
})

View File

@ -343,9 +343,6 @@ flush:
log.Warn("failed to invoke flushBufferData", zap.Error(err))
} else {
segmentsToFlush = append(segmentsToFlush, task.segmentID)
if task.flushed {
ibNode.replica.segmentFlushed(task.segmentID)
}
ibNode.insertBuffer.Delete(task.segmentID)
}
}

View File

@ -395,6 +395,9 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
fpMut.Unlock()
colRep.listNewSegmentsStartPositions()
colRep.listSegmentsCheckPoints()
if pack.flushed || pack.dropped {
colRep.segmentFlushed(pack.segmentID)
}
wg.Done()
return nil
})