mirror of https://github.com/milvus-io/milvus.git
Allow DataNode to clean up a vchannel on meta error without panicking (#17247)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/17255/head
parent
ad9276a440
commit
55f1e03ca7
|
@ -853,6 +853,27 @@ func TestSaveBinlogPaths(t *testing.T) {
|
|||
assert.EqualValues(t, segmentInfo.NumOfRows, 10)
|
||||
})
|
||||
|
||||
t.Run("with channel not matched", func(t *testing.T) {
|
||||
svr := newTestServer(t, nil)
|
||||
defer closeTestServer(t, svr)
|
||||
err := svr.channelManager.AddNode(0)
|
||||
require.Nil(t, err)
|
||||
err = svr.channelManager.Watch(&channel{"ch1", 0})
|
||||
require.Nil(t, err)
|
||||
s := &datapb.SegmentInfo{
|
||||
ID: 1,
|
||||
InsertChannel: "ch2",
|
||||
State: commonpb.SegmentState_Growing,
|
||||
}
|
||||
svr.meta.AddSegment(NewSegmentInfo(s))
|
||||
|
||||
resp, err := svr.SaveBinlogPaths(context.Background(), &datapb.SaveBinlogPathsRequest{
|
||||
SegmentID: 1,
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_MetaFailed, resp.GetErrorCode())
|
||||
})
|
||||
|
||||
t.Run("with closed server", func(t *testing.T) {
|
||||
svr := newTestServer(t, nil)
|
||||
closeTestServer(t, svr)
|
||||
|
@ -1038,7 +1059,7 @@ func TestDropVirtualChannel(t *testing.T) {
|
|||
ChannelName: "ch2",
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
|
||||
assert.Equal(t, commonpb.ErrorCode_MetaFailed, resp.GetStatus().GetErrorCode())
|
||||
})
|
||||
|
||||
t.Run("with closed server", func(t *testing.T) {
|
||||
|
|
|
@ -335,6 +335,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
|
|||
channel := segment.GetInsertChannel()
|
||||
if !s.channelManager.Match(nodeID, channel) {
|
||||
FailResponse(resp, fmt.Sprintf("channel %s is not watched on node %d", channel, nodeID))
|
||||
resp.ErrorCode = commonpb.ErrorCode_MetaFailed
|
||||
log.Warn("node is not matched with channel", zap.String("channel", channel), zap.Int64("nodeID", nodeID))
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -412,6 +413,7 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual
|
|||
nodeID := req.GetBase().GetSourceID()
|
||||
if !s.channelManager.Match(nodeID, channel) {
|
||||
FailResponse(resp.Status, fmt.Sprintf("channel %s is not watched on node %d", channel, nodeID))
|
||||
resp.Status.ErrorCode = commonpb.ErrorCode_MetaFailed
|
||||
log.Warn("node is not matched with channel", zap.String("channel", channel), zap.Int64("nodeID", nodeID))
|
||||
return resp, nil
|
||||
}
|
||||
|
|
|
@ -686,7 +686,13 @@ func dropVirtualChannelFunc(dsService *dataSyncService, opts ...retry.Option) fl
|
|||
return fmt.Errorf(err.Error())
|
||||
}
|
||||
|
||||
// TODO should retry only when datacoord status is unhealthy
|
||||
// meta error: this datanode is handling a virtual channel that does not belong to it
|
||||
if rsp.GetStatus().GetErrorCode() == commonpb.ErrorCode_MetaFailed {
|
||||
log.Warn("meta error found, skip sync and start to drop virtual channel", zap.String("channel", dsService.vchannelName))
|
||||
return nil
|
||||
}
|
||||
|
||||
// retry for other error
|
||||
if rsp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
|
||||
return fmt.Errorf("data service DropVirtualChannel failed, reason = %s", rsp.GetStatus().GetReason())
|
||||
}
|
||||
|
@ -772,7 +778,12 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
|
|||
return fmt.Errorf(err.Error())
|
||||
}
|
||||
|
||||
// TODO should retry only when datacoord status is unhealthy
|
||||
// meta error: this datanode is handling a virtual channel that does not belong to it
|
||||
if rsp.GetErrorCode() == commonpb.ErrorCode_MetaFailed {
|
||||
log.Warn("meta error found, skip sync and start to drop virtual channel", zap.String("channel", dsService.vchannelName))
|
||||
return nil
|
||||
}
|
||||
|
||||
if rsp.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return fmt.Errorf("data service save bin log path failed, reason = %s", rsp.Reason)
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
||||
|
@ -555,12 +556,21 @@ func TestFlushNotifyFunc(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("datacoord Save fails", func(t *testing.T) {
|
||||
dataCoord.SaveBinlogPathNotSuccess = true
|
||||
dataCoord.SaveBinlogPathStatus = commonpb.ErrorCode_UnexpectedError
|
||||
assert.Panics(t, func() {
|
||||
notifyFunc(&segmentFlushPack{})
|
||||
})
|
||||
})
|
||||
|
||||
// issue https://github.com/milvus-io/milvus/issues/17097
|
||||
// meta error, datanode shall not panic, just drop the virtual channel
|
||||
t.Run("datacoord found meta error", func(t *testing.T) {
|
||||
dataCoord.SaveBinlogPathStatus = commonpb.ErrorCode_MetaFailed
|
||||
assert.NotPanics(t, func() {
|
||||
notifyFunc(&segmentFlushPack{})
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("datacoord call error", func(t *testing.T) {
|
||||
dataCoord.SaveBinlogPathError = true
|
||||
assert.Panics(t, func() {
|
||||
|
@ -623,7 +633,7 @@ func TestDropVirtualChannelFunc(t *testing.T) {
|
|||
})
|
||||
})
|
||||
t.Run("datacoord drop fails", func(t *testing.T) {
|
||||
dataCoord.DropVirtualChannelNotSuccess = true
|
||||
dataCoord.DropVirtualChannelStatus = commonpb.ErrorCode_UnexpectedError
|
||||
assert.Panics(t, func() {
|
||||
dropFunc(nil)
|
||||
})
|
||||
|
@ -631,11 +641,21 @@ func TestDropVirtualChannelFunc(t *testing.T) {
|
|||
|
||||
t.Run("datacoord call error", func(t *testing.T) {
|
||||
|
||||
dataCoord.DropVirtualChannelNotSuccess = false
|
||||
dataCoord.DropVirtualChannelStatus = commonpb.ErrorCode_UnexpectedError
|
||||
dataCoord.DropVirtualChannelError = true
|
||||
assert.Panics(t, func() {
|
||||
dropFunc(nil)
|
||||
})
|
||||
})
|
||||
|
||||
// issue https://github.com/milvus-io/milvus/issues/17097
|
||||
// meta error, datanode shall not panic, just drop the virtual channel
|
||||
t.Run("datacoord found meta error", func(t *testing.T) {
|
||||
dataCoord.DropVirtualChannelStatus = commonpb.ErrorCode_MetaFailed
|
||||
dataCoord.DropVirtualChannelError = false
|
||||
assert.NotPanics(t, func() {
|
||||
dropFunc(nil)
|
||||
})
|
||||
})
|
||||
|
||||
}
|
||||
|
|
|
@ -164,14 +164,14 @@ type RootCoordFactory struct {
|
|||
type DataCoordFactory struct {
|
||||
types.DataCoord
|
||||
|
||||
SaveBinlogPathError bool
|
||||
SaveBinlogPathNotSuccess bool
|
||||
SaveBinlogPathError bool
|
||||
SaveBinlogPathStatus commonpb.ErrorCode
|
||||
|
||||
CompleteCompactionError bool
|
||||
CompleteCompactionNotSuccess bool
|
||||
|
||||
DropVirtualChannelError bool
|
||||
DropVirtualChannelNotSuccess bool
|
||||
DropVirtualChannelError bool
|
||||
DropVirtualChannelStatus commonpb.ErrorCode
|
||||
}
|
||||
|
||||
func (ds *DataCoordFactory) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
|
||||
|
@ -202,27 +202,16 @@ func (ds *DataCoordFactory) SaveBinlogPaths(ctx context.Context, req *datapb.Sav
|
|||
if ds.SaveBinlogPathError {
|
||||
return nil, errors.New("Error")
|
||||
}
|
||||
if ds.SaveBinlogPathNotSuccess {
|
||||
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil
|
||||
}
|
||||
|
||||
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
|
||||
return &commonpb.Status{ErrorCode: ds.SaveBinlogPathStatus}, nil
|
||||
}
|
||||
|
||||
func (ds *DataCoordFactory) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
|
||||
if ds.DropVirtualChannelError {
|
||||
return nil, errors.New("error")
|
||||
}
|
||||
if ds.DropVirtualChannelNotSuccess {
|
||||
return &datapb.DropVirtualChannelResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
return &datapb.DropVirtualChannelResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
ErrorCode: ds.DropVirtualChannelStatus,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue