diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index 1f749eb8fa..f22f7db9c4 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -277,7 +277,7 @@ func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker if errors.Is(err, merr.ErrNodeNotFound) { log.Warn("try to delete data on non-exist node") return retry.Unrecoverable(err) - } else if errors.Is(err, merr.ErrSegmentNotFound) { + } else if errors.IsAny(err, merr.ErrSegmentNotFound, merr.ErrSegmentNotLoaded) { log.Warn("try to delete data of released segment") return nil } else if err != nil { diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index 41257c71bd..bd57fcf70c 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -58,7 +58,7 @@ type DelegatorDataSuite struct { loader *segments.MockLoader mq *msgstream.MockMsgStream - delegator ShardDelegator + delegator *shardDelegator } func (s *DelegatorDataSuite) SetupSuite() { @@ -131,13 +131,15 @@ func (s *DelegatorDataSuite) SetupTest() { s.mq = &msgstream.MockMsgStream{} - var err error - s.delegator, err = NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{ + delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{ NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) { return s.mq, nil }, }, 10000, nil) s.Require().NoError(err) + sd, ok := delegator.(*shardDelegator) + s.Require().True(ok) + s.delegator = sd } func (s *DelegatorDataSuite) TestProcessInsert() { @@ -330,6 +332,20 @@ func (s *DelegatorDataSuite) TestProcessDelete() { RowCount: 1, }, }, 10) + s.True(s.delegator.distribution.Serviceable()) + + // test worker return segment not loaded + worker1.ExpectedCalls = nil + worker1.EXPECT().Delete(mock.Anything, mock.Anything).Return(merr.ErrSegmentNotLoaded) + s.delegator.ProcessDelete([]*DeleteData{ + { + PartitionID: 500, + PrimaryKeys: []storage.PrimaryKey{storage.NewInt64PrimaryKey(10)}, + Timestamps: []uint64{10}, + RowCount: 1, + }, + }, 10) + s.True(s.delegator.distribution.Serviceable(), "segment not loaded shall not trigger offline") // test worker offline worker1.ExpectedCalls = nil @@ -342,6 +358,8 @@ func (s *DelegatorDataSuite) TestProcessDelete() { RowCount: 1, }, }, 10) + + s.False(s.delegator.distribution.Serviceable()) } func (s *DelegatorDataSuite) TestLoadSegments() { @@ -901,7 +919,7 @@ func (s *DelegatorDataSuite) TestSyncTargetVersion() { } func (s *DelegatorDataSuite) TestLevel0Deletions() { - delegator := s.delegator.(*shardDelegator) + delegator := s.delegator partitionID := int64(10) partitionDeleteData := storage.NewDeleteData([]storage.PrimaryKey{storage.NewInt64PrimaryKey(1)}, []storage.Timestamp{100}) allPartitionDeleteData := storage.NewDeleteData([]storage.PrimaryKey{storage.NewInt64PrimaryKey(2)}, []storage.Timestamp{101})