Fix channelmanager ut (#21140) (#21903)

See also: #17199

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/21900/head
XuanYang-cn 2023-02-01 16:33:52 +08:00 committed by GitHub
parent b7a97fe8b7
commit 1ba8337d42
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 69 additions and 51 deletions

View File

@ -96,13 +96,17 @@ func TestChannelManager_StateTransfer(t *testing.T) {
prefix := Params.DataCoordCfg.ChannelWatchSubPath
var (
collectionID = UniqueID(9)
nodeID = UniqueID(119)
channel1 = "channel1"
collectionID = UniqueID(9)
nodeID = UniqueID(119)
channelNamePrefix = t.Name()
waitFor = time.Second
tick = time.Millisecond * 10
)
t.Run("ToWatch-WatchSuccess", func(t *testing.T) {
metakv.RemoveWithPrefix("")
cName := channelNamePrefix + "ToWatch-WatchSuccess"
ctx, cancel := context.WithCancel(context.TODO())
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
@ -115,21 +119,24 @@ func TestChannelManager_StateTransfer(t *testing.T) {
}()
chManager.AddNode(nodeID)
chManager.Watch(&channel{Name: channel1, CollectionID: collectionID})
chManager.Watch(&channel{Name: cName, CollectionID: collectionID})
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), cName)
waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_WatchSuccess, nodeID, channel1, collectionID)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_WatchSuccess, nodeID, cName, collectionID)
assert.Eventually(t, func() bool {
_, loaded := chManager.stateTimer.runningTimers.Load(cName)
return !loaded
}, waitFor, tick)
cancel()
wg.Wait()
_, loaded := chManager.stateTimer.runningTimers.Load(channel1)
assert.False(t, loaded)
})
t.Run("ToWatch-WatchFail-ToRelease", func(t *testing.T) {
metakv.RemoveWithPrefix("")
cName := channelNamePrefix + "ToWatch-WatchFail-ToRelease"
ctx, cancel := context.WithCancel(context.TODO())
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
@ -142,22 +149,25 @@ func TestChannelManager_StateTransfer(t *testing.T) {
}()
chManager.AddNode(nodeID)
chManager.Watch(&channel{Name: channel1, CollectionID: collectionID})
chManager.Watch(&channel{Name: cName, CollectionID: collectionID})
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), cName)
waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchFailure)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, cName, collectionID)
assert.Eventually(t, func() bool {
_, loaded := chManager.stateTimer.runningTimers.Load(cName)
return loaded
}, waitFor, tick)
cancel()
wg.Wait()
_, loaded := chManager.stateTimer.runningTimers.Load(channel1)
assert.True(t, loaded)
chManager.stateTimer.removeTimers([]string{channel1})
chManager.stateTimer.removeTimers([]string{cName})
})
t.Run("ToWatch-Timeout", func(t *testing.T) {
metakv.RemoveWithPrefix("")
cName := channelNamePrefix + "ToWatch-Timeout"
ctx, cancel := context.WithCancel(context.TODO())
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
@ -170,30 +180,33 @@ func TestChannelManager_StateTransfer(t *testing.T) {
}()
chManager.AddNode(nodeID)
chManager.Watch(&channel{Name: channel1, CollectionID: collectionID})
chManager.Watch(&channel{Name: cName, CollectionID: collectionID})
// simulating timeout behavior of startOne, cuz 20s is a long wait
e := &ackEvent{
ackType: watchTimeoutAck,
channelName: channel1,
channelName: cName,
nodeID: nodeID,
}
chManager.stateTimer.notifyTimeoutWatcher(e)
chManager.stateTimer.stopIfExist(e)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, cName, collectionID)
assert.Eventually(t, func() bool {
_, loaded := chManager.stateTimer.runningTimers.Load(cName)
return loaded
}, waitFor, tick)
cancel()
wg.Wait()
_, loaded := chManager.stateTimer.runningTimers.Load(channel1)
assert.True(t, loaded)
chManager.stateTimer.removeTimers([]string{channel1})
chManager.stateTimer.removeTimers([]string{cName})
})
t.Run("ToRelease-ReleaseSuccess-Reassign-ToWatch-2-DN", func(t *testing.T) {
var oldNode = UniqueID(120)
metakv.RemoveWithPrefix("")
cName := channelNamePrefix + "ToRelease-ReleaseSuccess-Reassign-ToWatch-2-DN"
ctx, cancel := context.WithCancel(context.TODO())
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
@ -209,18 +222,18 @@ func TestChannelManager_StateTransfer(t *testing.T) {
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{
{Name: channel1, CollectionID: collectionID},
{Name: cName, CollectionID: collectionID},
}},
oldNode: {oldNode, []*channel{}},
},
}
err = chManager.Release(nodeID, channel1)
err = chManager.Release(nodeID, cName)
assert.NoError(t, err)
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), cName)
waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, oldNode, channel1, collectionID)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, oldNode, cName, collectionID)
cancel()
wg.Wait()
@ -229,13 +242,14 @@ func TestChannelManager_StateTransfer(t *testing.T) {
assert.Error(t, err)
assert.Empty(t, w)
_, loaded := chManager.stateTimer.runningTimers.Load(channel1)
_, loaded := chManager.stateTimer.runningTimers.Load(cName)
assert.True(t, loaded)
chManager.stateTimer.removeTimers([]string{channel1})
chManager.stateTimer.removeTimers([]string{cName})
})
t.Run("ToRelease-ReleaseSuccess-Reassign-ToWatch-1-DN", func(t *testing.T) {
metakv.RemoveWithPrefix("")
cName := channelNamePrefix + "ToRelease-ReleaseSuccess-Reassign-ToWatch-1-DN"
ctx, cancel := context.WithCancel(context.TODO())
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
@ -251,31 +265,34 @@ func TestChannelManager_StateTransfer(t *testing.T) {
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{
{Name: channel1, CollectionID: collectionID},
{Name: cName, CollectionID: collectionID},
}},
},
}
err = chManager.Release(nodeID, channel1)
err = chManager.Release(nodeID, cName)
assert.NoError(t, err)
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), cName)
waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, channel1, collectionID)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, cName, collectionID)
assert.Eventually(t, func() bool {
_, loaded := chManager.stateTimer.runningTimers.Load(cName)
return loaded
}, waitFor, tick)
cancel()
wg.Wait()
_, loaded := chManager.stateTimer.runningTimers.Load(channel1)
assert.True(t, loaded)
chManager.stateTimer.removeTimers([]string{channel1})
chManager.stateTimer.removeTimers([]string{cName})
})
t.Run("ToRelease-ReleaseFail-CleanUpAndDelete-Reassign-ToWatch-2-DN", func(t *testing.T) {
var oldNode = UniqueID(121)
metakv.RemoveWithPrefix("")
cName := channelNamePrefix + "ToRelease-ReleaseFail-CleanUpAndDelete-Reassign-ToWatch-2-DN"
ctx, cancel := context.WithCancel(context.TODO())
factory := dependency.NewDefaultFactory(true)
_, err := factory.NewMsgStream(context.TODO())
@ -294,18 +311,18 @@ func TestChannelManager_StateTransfer(t *testing.T) {
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{
{Name: channel1, CollectionID: collectionID},
{Name: cName, CollectionID: collectionID},
}},
oldNode: {oldNode, []*channel{}},
},
}
err = chManager.Release(nodeID, channel1)
err = chManager.Release(nodeID, cName)
assert.NoError(t, err)
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), cName)
waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseFailure)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, oldNode, channel1, collectionID)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, oldNode, cName, collectionID)
cancel()
wg.Wait()
@ -314,13 +331,14 @@ func TestChannelManager_StateTransfer(t *testing.T) {
assert.Error(t, err)
assert.Empty(t, w)
_, loaded := chManager.stateTimer.runningTimers.Load(channel1)
_, loaded := chManager.stateTimer.runningTimers.Load(cName)
assert.True(t, loaded)
chManager.stateTimer.removeTimers([]string{channel1})
chManager.stateTimer.removeTimers([]string{cName})
})
t.Run("ToRelease-ReleaseFail-CleanUpAndDelete-Reassign-ToWatch-1-DN", func(t *testing.T) {
metakv.RemoveWithPrefix("")
cName := channelNamePrefix + "ToRelease-ReleaseFail-CleanUpAndDelete-Reassign-ToWatch-1-DN"
ctx, cancel := context.WithCancel(context.TODO())
factory := dependency.NewDefaultFactory(true)
_, err := factory.NewMsgStream(context.TODO())
@ -339,25 +357,25 @@ func TestChannelManager_StateTransfer(t *testing.T) {
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{
{Name: channel1, CollectionID: collectionID},
{Name: cName, CollectionID: collectionID},
}},
},
}
err = chManager.Release(nodeID, channel1)
err = chManager.Release(nodeID, cName)
assert.NoError(t, err)
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), cName)
waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseFailure)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, channel1, collectionID)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, cName, collectionID)
assert.Eventually(t, func() bool {
_, loaded := chManager.stateTimer.runningTimers.Load(cName)
return loaded
}, waitFor, tick)
cancel()
wg.Wait()
_, loaded := chManager.stateTimer.runningTimers.Load(channel1)
assert.True(t, loaded)
chManager.stateTimer.removeTimers([]string{channel1})
chManager.stateTimer.removeTimers([]string{cName})
})
}