mirror of https://github.com/milvus-io/milvus.git
Fix panic while forwarding empty deletions to growing segment (#28213)
Signed-off-by: yah01 <yah2er0ne@outlook.com>pull/28281/head
parent
ec0d9d8fd4
commit
d20ea061d6
|
@ -323,19 +323,22 @@ func (sd *shardDelegator) LoadGrowing(ctx context.Context, infos []*querypb.Segm
|
|||
}
|
||||
|
||||
deletedPks, deletedTss := sd.segmentManager.GetL0DeleteRecords()
|
||||
for _, segment := range loaded {
|
||||
err = segment.Delete(deletedPks, deletedTss)
|
||||
if err != nil {
|
||||
log.Warn("failed to forward L0 deletions to growing segment",
|
||||
zap.Int64("segmentID", segment.ID()),
|
||||
zap.Error(err),
|
||||
)
|
||||
if len(deletedPks) > 0 {
|
||||
log.Info("forwarding L0 delete records...", zap.Int("deleteNum", len(deletedPks)))
|
||||
for _, segment := range loaded {
|
||||
err = segment.Delete(deletedPks, deletedTss)
|
||||
if err != nil {
|
||||
log.Warn("failed to forward L0 deletions to growing segment",
|
||||
zap.Int64("segmentID", segment.ID()),
|
||||
zap.Error(err),
|
||||
)
|
||||
|
||||
// clear loaded growing segments
|
||||
for _, segment := range loaded {
|
||||
segment.Release()
|
||||
// clear loaded growing segments
|
||||
for _, segment := range loaded {
|
||||
segment.Release()
|
||||
}
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -501,6 +501,62 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
|
|||
}, sealed[0].Segments)
|
||||
})
|
||||
|
||||
s.Run("load_segments_with_l0_delete_failed", func() {
|
||||
defer func() {
|
||||
s.workerManager.ExpectedCalls = nil
|
||||
s.loader.ExpectedCalls = nil
|
||||
}()
|
||||
|
||||
mockMgr := segments.NewMockSegmentManager(s.T())
|
||||
delegator, err := NewShardDelegator(
|
||||
context.Background(),
|
||||
s.collectionID,
|
||||
s.replicaID,
|
||||
s.vchannelName,
|
||||
s.version,
|
||||
s.workerManager,
|
||||
&segments.Manager{
|
||||
Collection: s.manager.Collection,
|
||||
Segment: mockMgr,
|
||||
},
|
||||
s.tsafeManager,
|
||||
s.loader,
|
||||
&msgstream.MockMqFactory{
|
||||
NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) {
|
||||
return s.mq, nil
|
||||
},
|
||||
}, 10000)
|
||||
s.NoError(err)
|
||||
|
||||
growing0 := segments.NewMockSegment(s.T())
|
||||
growing1 := segments.NewMockSegment(s.T())
|
||||
growing1.EXPECT().ID().Return(2)
|
||||
growing0.EXPECT().Release()
|
||||
growing1.EXPECT().Release()
|
||||
|
||||
mockErr := merr.WrapErrServiceInternal("mock")
|
||||
|
||||
growing0.EXPECT().Delete(mock.Anything, mock.Anything).Return(nil)
|
||||
growing1.EXPECT().Delete(mock.Anything, mock.Anything).Return(mockErr)
|
||||
|
||||
s.loader.EXPECT().Load(
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
segments.SegmentTypeGrowing,
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
).Return([]segments.Segment{growing0, growing1}, nil)
|
||||
|
||||
mockMgr.EXPECT().GetL0DeleteRecords().Return(
|
||||
[]storage.PrimaryKey{storage.NewInt64PrimaryKey(1)},
|
||||
[]uint64{100},
|
||||
)
|
||||
|
||||
err = delegator.LoadGrowing(context.Background(), []*querypb.SegmentLoadInfo{{}, {}}, 100)
|
||||
s.ErrorIs(err, mockErr)
|
||||
})
|
||||
|
||||
s.Run("load_segments_with_streaming_delete_failed", func() {
|
||||
defer func() {
|
||||
s.workerManager.ExpectedCalls = nil
|
||||
|
|
|
@ -545,6 +545,10 @@ func (s *LocalSegment) Delete(primaryKeys []storage.PrimaryKey, timestamps []typ
|
|||
const unsigned long* timestamps);
|
||||
*/
|
||||
|
||||
if len(primaryKeys) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.ptrLock.RLock()
|
||||
defer s.ptrLock.RUnlock()
|
||||
|
||||
|
|
|
@ -254,8 +254,19 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
|
|||
node.composeIndexMeta(req.GetIndexInfoList(), req.Schema), req.GetLoadMeta())
|
||||
collection := node.manager.Collection.Get(req.GetCollectionID())
|
||||
collection.SetMetricType(req.GetLoadMeta().GetMetricType())
|
||||
delegator, err := delegator.NewShardDelegator(ctx, req.GetCollectionID(), req.GetReplicaID(), channel.GetChannelName(), req.GetVersion(),
|
||||
node.clusterManager, node.manager, node.tSafeManager, node.loader, node.factory, channel.GetSeekPosition().GetTimestamp())
|
||||
delegator, err := delegator.NewShardDelegator(
|
||||
ctx,
|
||||
req.GetCollectionID(),
|
||||
req.GetReplicaID(),
|
||||
channel.GetChannelName(),
|
||||
req.GetVersion(),
|
||||
node.clusterManager,
|
||||
node.manager,
|
||||
node.tSafeManager,
|
||||
node.loader,
|
||||
node.factory,
|
||||
channel.GetSeekPosition().GetTimestamp(),
|
||||
)
|
||||
if err != nil {
|
||||
log.Warn("failed to create shard delegator", zap.Error(err))
|
||||
return merr.Status(err), nil
|
||||
|
|
Loading…
Reference in New Issue