mirror of https://github.com/milvus-io/milvus.git
Not convert legacy error code to new merr (#28232)
Signed-off-by: yah01 <yah2er0ne@outlook.com>pull/28271/head
parent
1b90630633
commit
f4341f254d
|
@ -48,7 +48,7 @@ type DataCoord interface {
|
|||
GetSegmentInfo(ctx context.Context, segmentIDs []int64) ([]*datapb.SegmentInfo, error)
|
||||
UpdateChannelCheckpoint(ctx context.Context, channelName string, cp *msgpb.MsgPosition) error
|
||||
SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) error
|
||||
DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) error
|
||||
DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error)
|
||||
UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) error
|
||||
SaveImportSegment(ctx context.Context, req *datapb.SaveImportSegmentRequest) error
|
||||
}
|
||||
|
|
|
@ -117,19 +117,16 @@ func (dc *dataCoordBroker) SaveBinlogPaths(ctx context.Context, req *datapb.Save
|
|||
return nil
|
||||
}
|
||||
|
||||
func (dc *dataCoordBroker) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) error {
|
||||
func (dc *dataCoordBroker) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
|
||||
log := log.Ctx(ctx)
|
||||
|
||||
resp, err := dc.client.DropVirtualChannel(ctx, req)
|
||||
if err := merr.CheckRPCCall(resp, err); err != nil {
|
||||
if resp.GetStatus().GetErrorCode() == commonpb.ErrorCode_MetaFailed {
|
||||
err = merr.WrapErrChannelNotFound(req.GetChannelName())
|
||||
}
|
||||
log.Warn("failed to SaveBinlogPaths", zap.Error(err))
|
||||
return err
|
||||
return resp, err
|
||||
}
|
||||
|
||||
return nil
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (dc *dataCoordBroker) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) error {
|
||||
|
|
|
@ -260,7 +260,7 @@ func (s *dataCoordSuite) TestDropVirtualChannel() {
|
|||
s.Equal("dml_0", req.GetChannelName())
|
||||
}).
|
||||
Return(&datapb.DropVirtualChannelResponse{Status: merr.Status(nil)}, nil)
|
||||
err := s.broker.DropVirtualChannel(ctx, req)
|
||||
_, err := s.broker.DropVirtualChannel(ctx, req)
|
||||
s.NoError(err)
|
||||
s.resetMock()
|
||||
})
|
||||
|
@ -268,7 +268,7 @@ func (s *dataCoordSuite) TestDropVirtualChannel() {
|
|||
s.Run("datacoord_return_error", func() {
|
||||
s.dc.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).
|
||||
Return(nil, errors.New("mock"))
|
||||
err := s.broker.DropVirtualChannel(ctx, req)
|
||||
_, err := s.broker.DropVirtualChannel(ctx, req)
|
||||
s.Error(err)
|
||||
s.resetMock()
|
||||
})
|
||||
|
@ -276,7 +276,7 @@ func (s *dataCoordSuite) TestDropVirtualChannel() {
|
|||
s.Run("datacoord_return_failure_status", func() {
|
||||
s.dc.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).
|
||||
Return(&datapb.DropVirtualChannelResponse{Status: merr.Status(errors.New("mock"))}, nil)
|
||||
err := s.broker.DropVirtualChannel(ctx, req)
|
||||
_, err := s.broker.DropVirtualChannel(ctx, req)
|
||||
s.Error(err)
|
||||
s.resetMock()
|
||||
})
|
||||
|
@ -284,7 +284,7 @@ func (s *dataCoordSuite) TestDropVirtualChannel() {
|
|||
s.Run("datacoord_return_legacy_MetaFailed", func() {
|
||||
s.dc.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).
|
||||
Return(&datapb.DropVirtualChannelResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_MetaFailed}}, nil)
|
||||
err := s.broker.DropVirtualChannel(ctx, req)
|
||||
_, err := s.broker.DropVirtualChannel(ctx, req)
|
||||
s.Error(err)
|
||||
s.ErrorIs(err, merr.ErrChannelNotFound)
|
||||
s.resetMock()
|
||||
|
|
|
@ -214,17 +214,29 @@ func (_c *MockBroker_DescribeCollection_Call) RunAndReturn(run func(context.Cont
|
|||
}
|
||||
|
||||
// DropVirtualChannel provides a mock function with given fields: ctx, req
|
||||
func (_m *MockBroker) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) error {
|
||||
func (_m *MockBroker) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
|
||||
ret := _m.Called(ctx, req)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *datapb.DropVirtualChannelRequest) error); ok {
|
||||
var r0 *datapb.DropVirtualChannelResponse
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error)); ok {
|
||||
return rf(ctx, req)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *datapb.DropVirtualChannelRequest) *datapb.DropVirtualChannelResponse); ok {
|
||||
r0 = rf(ctx, req)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*datapb.DropVirtualChannelResponse)
|
||||
}
|
||||
}
|
||||
|
||||
return r0
|
||||
if rf, ok := ret.Get(1).(func(context.Context, *datapb.DropVirtualChannelRequest) error); ok {
|
||||
r1 = rf(ctx, req)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// MockBroker_DropVirtualChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropVirtualChannel'
|
||||
|
@ -246,12 +258,12 @@ func (_c *MockBroker_DropVirtualChannel_Call) Run(run func(ctx context.Context,
|
|||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockBroker_DropVirtualChannel_Call) Return(_a0 error) *MockBroker_DropVirtualChannel_Call {
|
||||
_c.Call.Return(_a0)
|
||||
func (_c *MockBroker_DropVirtualChannel_Call) Return(_a0 *datapb.DropVirtualChannelResponse, _a1 error) *MockBroker_DropVirtualChannel_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockBroker_DropVirtualChannel_Call) RunAndReturn(run func(context.Context, *datapb.DropVirtualChannelRequest) error) *MockBroker_DropVirtualChannel_Call {
|
||||
func (_c *MockBroker_DropVirtualChannel_Call) RunAndReturn(run func(context.Context, *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error)) *MockBroker_DropVirtualChannel_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
|
|
@ -198,7 +198,7 @@ func TestDataSyncService_Start(t *testing.T) {
|
|||
broker := broker.NewMockBroker(t)
|
||||
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
|
||||
broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
|
||||
node.broker = broker
|
||||
|
@ -376,7 +376,7 @@ func TestDataSyncService_Close(t *testing.T) {
|
|||
broker := broker.NewMockBroker(t)
|
||||
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
|
||||
broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
|
||||
node.broker = broker
|
||||
|
|
|
@ -66,7 +66,7 @@ func TestWatchChannel(t *testing.T) {
|
|||
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe()
|
||||
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
|
||||
broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
|
||||
node.broker = broker
|
||||
|
|
|
@ -60,7 +60,7 @@ func TestFlowGraphManager(t *testing.T) {
|
|||
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe()
|
||||
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
|
||||
broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
|
||||
Return(&milvuspb.DescribeCollectionResponse{
|
||||
|
|
|
@ -29,6 +29,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||
"github.com/milvus-io/milvus/internal/datanode/allocator"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
|
@ -819,10 +820,12 @@ func dropVirtualChannelFunc(dsService *dataSyncService, opts ...retry.Option) fl
|
|||
req.Segments = segments
|
||||
|
||||
err := retry.Do(context.Background(), func() error {
|
||||
err := dsService.broker.DropVirtualChannel(context.Background(), req)
|
||||
if err != nil {
|
||||
resp, err := dsService.broker.DropVirtualChannel(context.Background(), req)
|
||||
if err != nil ||
|
||||
resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
|
||||
// meta error, datanode handles a virtual channel does not belong here
|
||||
if errors.Is(err, merr.ErrChannelNotFound) {
|
||||
if errors.Is(err, merr.ErrChannelNotFound) ||
|
||||
resp.GetStatus().GetErrorCode() == commonpb.ErrorCode_MetaFailed {
|
||||
log.Warn("meta error found, skip sync and start to drop virtual channel", zap.String("channel", dsService.vchannelName))
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -663,7 +663,7 @@ func TestFlushNotifyFunc(t *testing.T) {
|
|||
Schema: meta.GetSchema(),
|
||||
}, nil).Maybe()
|
||||
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
@ -748,7 +748,7 @@ func TestDropVirtualChannelFunc(t *testing.T) {
|
|||
Schema: meta.GetSchema(),
|
||||
}, nil)
|
||||
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
|
||||
vchanName := "vchan_01"
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
@ -811,7 +811,7 @@ func TestDropVirtualChannelFunc(t *testing.T) {
|
|||
t.Run("datacoord_return_error", func(t *testing.T) {
|
||||
broker.ExpectedCalls = nil
|
||||
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).
|
||||
Return(errors.New("mock"))
|
||||
Return(nil, errors.New("mock"))
|
||||
assert.Panics(t, func() {
|
||||
dropFunc(nil)
|
||||
})
|
||||
|
@ -822,7 +822,7 @@ func TestDropVirtualChannelFunc(t *testing.T) {
|
|||
t.Run("datacoord_return_channel_not_found", func(t *testing.T) {
|
||||
broker.ExpectedCalls = nil
|
||||
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).
|
||||
Return(merr.WrapErrChannelNotFound("channel"))
|
||||
Return(nil, merr.WrapErrChannelNotFound("channel"))
|
||||
assert.NotPanics(t, func() {
|
||||
dropFunc(nil)
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue