Fix datanode ut using register bug (#10885)

Resolves: #10881
See also: #8058

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/10624/head
XuanYang-cn 2021-11-01 11:01:49 +08:00 committed by GitHub
parent ed630432f1
commit 225b2c66a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 23 additions and 157 deletions

View File

@ -436,46 +436,14 @@ func (node *DataNode) isHealthy() bool {
return code == internalpb.StateCode_Healthy
}
// WatchDmChannels create a new dataSyncService for every unique dmlVchannel name, ignore if dmlVchannel existed.
// WatchDmChannels is not in use
func (node *DataNode) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelsRequest) (*commonpb.Status, error) {
metrics.DataNodeWatchDmChannelsCounter.WithLabelValues(MetricRequestsTotal).Inc()
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}
log.Warn("DataNode WatchDmChannels is not in use")
switch {
case !node.isHealthy():
status.Reason = msgDataNodeIsUnhealthy(node.NodeID)
return status, nil
case len(in.GetVchannels()) == 0:
status.Reason = illegalRequestErrStr
return status, nil
default:
for _, chanInfo := range in.GetVchannels() {
log.Info("DataNode new dataSyncService",
zap.Int64("collectionID", chanInfo.GetCollectionID()),
zap.String("channel name", chanInfo.ChannelName),
zap.Any("channal Info", chanInfo),
)
if err := node.NewDataSyncService(chanInfo); err != nil {
log.Warn("Failed to new data sync service",
zap.Any("channel", chanInfo),
zap.Error(err))
// return error even partial success
// TODO Goose: release partial success resources?
status.Reason = err.Error()
return status, nil
}
}
status.ErrorCode = commonpb.ErrorCode_Success
log.Debug("DataNode WatchDmChannels Done")
metrics.DataNodeWatchDmChannelsCounter.WithLabelValues(MetricRequestsSuccess).Inc()
return status, nil
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "watchDmChannels do nothing",
}, nil
}
// GetComponentStates will return current state of DataNode

View File

@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
@ -64,110 +65,13 @@ func TestDataNode(t *testing.T) {
assert.Nil(t, err)
err = node.Start()
assert.Nil(t, err)
err = node.Register()
assert.Nil(t, err)
t.Run("Test WatchDmChannels", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Run("Test WatchDmChannels ", func(t *testing.T) {
emptyNode := &DataNode{}
node1 := newIDLEDataNodeMock(ctx)
err = node1.Init()
assert.Nil(t, err)
err = node1.Start()
assert.Nil(t, err)
err = node1.Register()
assert.Nil(t, err)
defer func() {
err := node1.Stop()
assert.Nil(t, err)
}()
cases := []struct {
desc string
channels []string
expect bool
failReason string
}{
{"test watch channel normally", []string{"datanode-01-test-WatchDmChannel", "datanode-02-test-WatchDmChannels"}, true, ""},
{"test send empty request", []string{}, false, illegalRequestErrStr},
}
for _, testcase := range cases {
vchannels := []*datapb.VchannelInfo{}
for _, ch := range testcase.channels {
vchan := &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: ch,
UnflushedSegments: []*datapb.SegmentInfo{},
}
vchannels = append(vchannels, vchan)
}
req := &datapb.WatchDmChannelsRequest{
Base: &commonpb.MsgBase{
MsgType: 0,
MsgID: 0,
Timestamp: 0,
SourceID: Params.NodeID,
},
Vchannels: vchannels,
}
resp, err := node1.WatchDmChannels(context.TODO(), req)
assert.NoError(t, err)
if testcase.expect {
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
assert.NotNil(t, node1.vchan2FlushChs)
assert.NotNil(t, node1.vchan2SyncService)
sync, ok := node1.vchan2SyncService[testcase.channels[0]]
assert.True(t, ok)
assert.NotNil(t, sync)
assert.Equal(t, UniqueID(1), sync.collectionID)
assert.Equal(t, len(testcase.channels), len(node1.vchan2SyncService))
assert.Equal(t, len(node1.vchan2FlushChs), len(node1.vchan2SyncService))
} else {
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
assert.Equal(t, testcase.failReason, resp.Reason)
}
}
})
t.Run("Test WatchDmChannels fails", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node := newIDLEDataNodeMock(ctx)
// before healthy
status, err := node.WatchDmChannels(ctx, &datapb.WatchDmChannelsRequest{})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
err = node.Init()
assert.Nil(t, err)
err = node.Start()
assert.Nil(t, err)
err = node.Register()
assert.Nil(t, err)
defer func() {
err := node.Stop()
assert.Nil(t, err)
}()
node.msFactory = &FailMessageStreamFactory{
Factory: node.msFactory,
}
status, err = node.WatchDmChannels(ctx, &datapb.WatchDmChannelsRequest{
Vchannels: []*datapb.VchannelInfo{
{
CollectionID: collectionID0,
ChannelName: "test_channel_name",
},
},
})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
status, err := emptyNode.WatchDmChannels(ctx, &datapb.WatchDmChannelsRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
})
t.Run("Test SetRootCoord", func(t *testing.T) {
@ -256,15 +160,13 @@ func TestDataNode(t *testing.T) {
})
t.Run("Test FlushSegments", func(t *testing.T) {
dmChannelName := "fake-dm-channel-test-HEALTHDataNodeMock"
dmChannelName := "fake-dm-channel-test-FlushSegments"
node1 := newIDLEDataNodeMock(context.TODO())
err = node1.Init()
assert.Nil(t, err)
err = node1.Start()
assert.Nil(t, err)
err = node1.Register()
assert.Nil(t, err)
defer func() {
err := node1.Stop()
assert.Nil(t, err)
@ -405,9 +307,12 @@ func TestDataNode(t *testing.T) {
})
t.Run("Test getSystemInfoMetrics", func(t *testing.T) {
emptyNode := &DataNode{}
emptyNode.session = &sessionutil.Session{ServerID: 1}
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
assert.NoError(t, err)
resp, err := node.getSystemInfoMetrics(node.ctx, req)
resp, err := emptyNode.getSystemInfoMetrics(context.TODO(), req)
assert.NoError(t, err)
log.Info("Test DataNode.getSystemInfoMetrics",
zap.String("name", resp.ComponentName),
@ -415,13 +320,14 @@ func TestDataNode(t *testing.T) {
})
t.Run("Test GetMetrics", func(t *testing.T) {
node := &DataNode{}
node.session = &sessionutil.Session{ServerID: 1}
// server is closed
stateSave := node.State.Load().(internalpb.StateCode)
node.State.Store(internalpb.StateCode_Abnormal)
resp, err := node.GetMetrics(ctx, &milvuspb.GetMetricsRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
node.State.Store(stateSave)
node.State.Store(internalpb.StateCode_Healthy)
// failed to parse metric type
invalidRequest := "invalid request"
@ -451,7 +357,6 @@ func TestDataNode(t *testing.T) {
})
t.Run("Test BackGroundGC", func(te *testing.T) {
te.Skipf("issue #6574")
ctx, cancel := context.WithCancel(context.Background())
node := newIDLEDataNodeMock(ctx)
@ -510,7 +415,6 @@ func TestDataNode(t *testing.T) {
s, ok := node.vchan2SyncService[dmChannelName]
assert.False(t, ok)
assert.Nil(t, s)
})
t.Run("Test GetChannelName", func(t *testing.T) {
@ -563,7 +467,7 @@ func TestDataNode(t *testing.T) {
cancel()
<-node.ctx.Done()
err = node.Stop()
assert.Nil(t, err)
require.Nil(t, err)
}
func TestWatchChannel(t *testing.T) {
@ -641,12 +545,6 @@ func TestWatchChannel(t *testing.T) {
assert.False(t, has)
})
t.Run("watch dm channel fails", func(t *testing.T) {
s, err := node.WatchDmChannels(context.Background(), &datapb.WatchDmChannelsRequest{})
assert.Nil(t, err)
assert.Equal(t, s.ErrorCode, commonpb.ErrorCode_UnexpectedError)
})
t.Run("handle watch info failed", func(t *testing.T) {
node.handleWatchInfo("test1", []byte{23})

View File

@ -165,7 +165,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
fgMsg, ok := in[0].(*flowGraphMsg)
if !ok {
log.Error("type assertion failed for flowGraphMsg")
log.Warn("type assertion failed for flowGraphMsg")
return nil
}

View File

@ -145,7 +145,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
fgMsg, ok := in[0].(*flowGraphMsg)
if !ok {
log.Error("type assertion failed for flowGraphMsg")
log.Warn("type assertion failed for flowGraphMsg")
ibNode.Close()
return []Msg{}
}