From 5c8978e5d61d4bee641f61452a731dc78d6473fd Mon Sep 17 00:00:00 2001 From: Bingyi Sun Date: Fri, 12 Nov 2021 17:31:11 +0800 Subject: [PATCH] Fix drop segment bug (#11709) issue: #11708 Signed-off-by: sunby Co-authored-by: sunby --- internal/datacoord/segment_manager.go | 6 +++- internal/datacoord/segment_manager_test.go | 29 +++++++++++++++++++ internal/datacoord/server_test.go | 1 - internal/datacoord/services.go | 13 +++++++-- internal/datanode/data_sync_service.go | 4 +++ .../datanode/flow_graph_insert_buffer_node.go | 3 -- .../flow_graph_insert_buffer_node_test.go | 3 ++ 7 files changed, 51 insertions(+), 8 deletions(-) diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index 4199091b88..3110df4f27 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -493,8 +493,12 @@ func (s *SegmentManager) DropSegmentsOfChannel(ctx context.Context, channel stri validSegments := make([]int64, 0, len(s.segments)) for _, sid := range s.segments { segment := s.meta.GetSegment(sid) - if segment != nil && segment.GetInsertChannel() != channel { + if segment == nil { + continue + } + if segment.GetInsertChannel() != channel { validSegments = append(validSegments, sid) + continue } s.meta.SetAllocations(sid, nil) for _, allocation := range segment.allocations { diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go index 844c8d1b49..ff7108a6be 100644 --- a/internal/datacoord/segment_manager_test.go +++ b/internal/datacoord/segment_manager_test.go @@ -538,6 +538,35 @@ func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) { }, []UniqueID{2}, }, + { + "test drop segments with dropped segment", + fields{ + meta: &meta{ + segments: &SegmentsInfo{ + segments: map[int64]*SegmentInfo{ + 1: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + }, + }, + 2: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 2, + InsertChannel: "ch2", + }, + }, + }, + }, + }, + segments: []UniqueID{1, 2, 3}, + }, + args{ + "ch1", + }, + []UniqueID{2}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index c5acd04bb2..83b5fc667b 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -745,7 +745,6 @@ func (s *spySegmentManager) AllocSegment(ctx context.Context, collectionID Uniqu // DropSegment drops the segment from manager. func (s *spySegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) { - panic("not implemented") // TODO: Implement } // SealAllSegments seals all segments of collection with collectionID and return sealed segments diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index e65f165a9b..c8e62b17b0 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -300,6 +300,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("segmentID", req.GetSegmentID()), zap.Bool("isFlush", req.GetFlushed()), + zap.Bool("isDropped", req.GetDropped()), zap.Any("checkpoints", req.GetCheckPoints())) // validate @@ -320,6 +321,10 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath return resp, nil } + if req.GetDropped() { + s.segmentManager.DropSegment(ctx, segment.GetID()) + } + // set segment to SegmentState_Flushing and save binlogs and checkpoints err := s.meta.UpdateFlushSegmentsInfo( req.GetSegmentID(), @@ -342,6 +347,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath zap.Any("meta", req.GetField2BinlogPaths())) if req.GetDropped() && s.checkShouldDropChannel(channel) { + log.Debug("remove channel", zap.String("channel", channel)) err = s.channelManager.RemoveChannel(channel) if err != nil { log.Warn("failed to remove channel", zap.String("channel", channel), zap.Error(err)) @@ -359,8 +365,9 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath tt, err := getTimetravelReverseTime(cctx, s.allocator) if err == nil { - if err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(), segment.GetPartitionID(), - segmentID, segment.GetInsertChannel(), tt); err != nil { + err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(), + segment.GetPartitionID(), segmentID, segment.GetInsertChannel(), tt) + if err != nil { log.Warn("failed to trigger single compaction", zap.Int64("segmentID", segmentID)) } } @@ -377,7 +384,7 @@ func (s *Server) checkShouldDropChannel(channel string) bool { // FIXME: we filter compaction generated segments // because datanode may not know the segment due to the network lag or // datacoord crash when handling CompleteCompaction. - len(segment.CompactionFrom) != 0 && + len(segment.CompactionFrom) == 0 && segment.GetState() != commonpb.SegmentState_Dropped { return false } diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 5f028215dd..bfbc8b6367 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -202,6 +202,10 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro if rsp.ErrorCode != commonpb.ErrorCode_Success { return fmt.Errorf("data service save bin log path failed, reason = %s", rsp.Reason) } + + if pack.flushed || pack.dropped { + dsService.replica.segmentFlushed(pack.segmentID) + } dsService.flushingSegCache.Remove(req.GetSegmentID()) return nil }) diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 5c100b8451..e3a83e4408 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -343,9 +343,6 @@ flush: log.Warn("failed to invoke flushBufferData", zap.Error(err)) } else { segmentsToFlush = append(segmentsToFlush, task.segmentID) - if task.flushed { - ibNode.replica.segmentFlushed(task.segmentID) - } ibNode.insertBuffer.Delete(task.segmentID) } } diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index c225d1dd76..cf9df1d26b 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -395,6 +395,9 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { fpMut.Unlock() colRep.listNewSegmentsStartPositions() colRep.listSegmentsCheckPoints() + if pack.flushed || pack.dropped { + colRep.segmentFlushed(pack.segmentID) + } wg.Done() return nil })