From 2c92ce0c0e334df3b22e707c858533bc9e3f8445 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Mon, 10 Oct 2022 11:33:21 +0800 Subject: [PATCH] Prevent datacoord from syncing unflushed segments (#19659) See also: #19653 Signed-off-by: yangxuan Signed-off-by: yangxuan --- internal/datanode/segment_replica.go | 3 ++- internal/datanode/segment_replica_test.go | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/internal/datanode/segment_replica.go b/internal/datanode/segment_replica.go index f247658692..497f24299b 100644 --- a/internal/datanode/segment_replica.go +++ b/internal/datanode/segment_replica.go @@ -768,7 +768,8 @@ func (replica *SegmentReplica) mergeFlushedSegments(seg *Segment, planID UniqueI var inValidSegments []UniqueID for _, ID := range compactedFrom { - if !replica.hasSegment(ID, true) { + // no such segments in replica or the segments are unflushed. + if !replica.hasSegment(ID, true) || replica.hasSegment(ID, false) { inValidSegments = append(inValidSegments, ID) } } diff --git a/internal/datanode/segment_replica_test.go b/internal/datanode/segment_replica_test.go index dbb43aa51f..85ce901d5e 100644 --- a/internal/datanode/segment_replica_test.go +++ b/internal/datanode/segment_replica_test.go @@ -838,6 +838,11 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) { collectionID: 1, numRows: 15, }}, + {"segment exists but not flushed", false, false, []UniqueID{1, 4}, &Segment{ + segmentID: 3, + collectionID: 1, + numRows: 15, + }}, } for _, test := range tests { @@ -851,12 +856,23 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) { sr.addFlushedSegmentWithPKs(2, 1, 0, "channel", 10, primaryKeyData) } + if !sr.hasSegment(4, false) { + sr.removeSegments(4) + sr.addSegment(addSegmentReq{ + segType: datapb.SegmentType_Normal, + segID: 4, + collID: 1, + partitionID: 0, + }) + } + if sr.hasSegment(3, true) { sr.removeSegments(3) } require.True(t, sr.hasSegment(1, true)) require.True(t, sr.hasSegment(2, true)) + require.True(t, sr.hasSegment(4, false)) require.False(t, sr.hasSegment(3, true)) // tests start