Check data node test error (#9532)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/9562/head
godchen 2021-10-09 15:45:15 +08:00 committed by GitHub
parent c2a79e765c
commit 5e7648fc7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 47 additions and 23 deletions

View File

@ -54,18 +54,24 @@ func TestMain(t *testing.M) {
func TestDataNode(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
node := newIDLEDataNodeMock(ctx)
node.Init()
node.Start()
node.Register()
err := node.Init()
assert.Nil(t, err)
err = node.Start()
assert.Nil(t, err)
err = node.Register()
assert.Nil(t, err)
t.Run("Test WatchDmChannels", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node1 := newIDLEDataNodeMock(ctx)
node1.Init()
node1.Start()
node1.Register()
err = node1.Init()
assert.Nil(t, err)
err = node1.Start()
assert.Nil(t, err)
err = node1.Register()
assert.Nil(t, err)
defer func() {
err := node1.Stop()
assert.Nil(t, err)
@ -131,9 +137,12 @@ func TestDataNode(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
node.Init()
node.Start()
node.Register()
err = node.Init()
assert.Nil(t, err)
err = node.Start()
assert.Nil(t, err)
err = node.Register()
assert.Nil(t, err)
defer func() {
err := node.Stop()
assert.Nil(t, err)
@ -165,7 +174,8 @@ func TestDataNode(t *testing.T) {
t.Skip()
ctx, cancel := context.WithCancel(context.Background())
node2 := newIDLEDataNodeMock(ctx)
node2.Start()
err = node2.Start()
assert.Nil(t, err)
dmChannelName := "fake-dm-channel-test-NewDataSyncService"
vchan := &datapb.VchannelInfo{
@ -189,16 +199,20 @@ func TestDataNode(t *testing.T) {
cancel()
<-node2.ctx.Done()
node2.Stop()
err = node2.Stop()
assert.Nil(t, err)
})
t.Run("Test FlushSegments", func(t *testing.T) {
dmChannelName := "fake-dm-channel-test-HEALTHDataNodeMock"
node1 := newIDLEDataNodeMock(context.TODO())
node1.Init()
node1.Start()
node1.Register()
err = node1.Init()
assert.Nil(t, err)
err = node1.Start()
assert.Nil(t, err)
err = node1.Register()
assert.Nil(t, err)
defer func() {
err := node1.Stop()
assert.Nil(t, err)
@ -215,7 +229,8 @@ func TestDataNode(t *testing.T) {
service, ok := node1.vchan2SyncService[dmChannelName]
assert.True(t, ok)
service.replica.addNewSegment(0, 1, 1, dmChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
err = service.replica.addNewSegment(0, 1, 1, dmChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
assert.Nil(t, err)
req := &datapb.FlushSegmentsRequest{
Base: &commonpb.MsgBase{},
@ -358,7 +373,8 @@ func TestDataNode(t *testing.T) {
for i, t := range testDataSyncs {
if i <= 2 {
node.NewDataSyncService(&datapb.VchannelInfo{CollectionID: t.collID, ChannelName: t.dmChannelName})
err = node.NewDataSyncService(&datapb.VchannelInfo{CollectionID: t.collID, ChannelName: t.dmChannelName})
assert.Nil(te, err)
}
collIDCh <- t.collID
@ -413,7 +429,8 @@ func TestDataNode(t *testing.T) {
newSegments: make(map[UniqueID]*Segment),
}
replica.addNewSegment(testSegIDs[i], testCollIDs[i], 0, name, &internalpb.MsgPosition{}, nil)
err = replica.addNewSegment(testSegIDs[i], testCollIDs[i], 0, name, &internalpb.MsgPosition{}, nil)
assert.Nil(t, err)
node.vchan2SyncService[name] = &dataSyncService{collectionID: testCollIDs[i], replica: replica}
}
node.chanMut.Unlock()
@ -446,15 +463,19 @@ func TestDataNode(t *testing.T) {
cancel()
<-node.ctx.Done()
node.Stop()
err = node.Stop()
assert.Nil(t, err)
}
func TestWatchChannel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
node := newIDLEDataNodeMock(ctx)
node.Init()
node.Start()
node.Register()
err := node.Init()
assert.Nil(t, err)
err = node.Start()
assert.Nil(t, err)
err = node.Register()
assert.Nil(t, err)
defer cancel()
@ -506,7 +527,8 @@ func TestWatchChannel(t *testing.T) {
node.chanMut.RUnlock()
assert.True(t, has)
kv.RemoveWithPrefix(fmt.Sprintf("channel/%d", node.NodeID))
err = kv.RemoveWithPrefix(fmt.Sprintf("channel/%d", node.NodeID))
assert.Nil(t, err)
//TODO there is not way to sync Release done, use sleep for now
time.Sleep(100 * time.Millisecond)
@ -517,6 +539,8 @@ func TestWatchChannel(t *testing.T) {
})
t.Run("watch dm channel fails", func(t *testing.T) {
node.WatchDmChannels(context.Background(), &datapb.WatchDmChannelsRequest{})
s, err := node.WatchDmChannels(context.Background(), &datapb.WatchDmChannelsRequest{})
assert.Nil(t, err)
assert.Equal(t, s.ErrorCode, commonpb.ErrorCode_UnexpectedError)
})
}