mirror of https://github.com/milvus-io/milvus.git
Fix test_watch_channel unit test unstable (#23507)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/23526/head
parent
3933080511
commit
4508786819
|
@ -249,26 +249,6 @@ func TestWatchChannel(t *testing.T) {
|
|||
|
||||
ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31())
|
||||
path = fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), ch)
|
||||
c := make(chan struct{})
|
||||
go func() {
|
||||
ec := kv.WatchWithPrefix(fmt.Sprintf("%s/%d", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID()))
|
||||
c <- struct{}{}
|
||||
cnt := 0
|
||||
for {
|
||||
evt := <-ec
|
||||
for _, event := range evt.Events {
|
||||
if strings.Contains(string(event.Kv.Key), ch) {
|
||||
cnt++
|
||||
}
|
||||
}
|
||||
if cnt >= 2 {
|
||||
break
|
||||
}
|
||||
}
|
||||
c <- struct{}{}
|
||||
}()
|
||||
// wait for check goroutine start Watch
|
||||
<-c
|
||||
|
||||
vchan := &datapb.VchannelInfo{
|
||||
CollectionID: 1,
|
||||
|
@ -284,16 +264,28 @@ func TestWatchChannel(t *testing.T) {
|
|||
err = kv.Save(path, string(val))
|
||||
assert.Nil(t, err)
|
||||
|
||||
// wait for check goroutine received 2 events
|
||||
<-c
|
||||
exist := node.flowgraphManager.exist(ch)
|
||||
assert.True(t, exist)
|
||||
assert.Eventually(t, func() bool {
|
||||
exist := node.flowgraphManager.exist(ch)
|
||||
if !exist {
|
||||
return false
|
||||
}
|
||||
bs, err := kv.LoadBytes(fmt.Sprintf("%s/%d/%s", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID(), ch))
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
watchInfo := &datapb.ChannelWatchInfo{}
|
||||
err = proto.Unmarshal(bs, watchInfo)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return watchInfo.GetState() == datapb.ChannelWatchState_WatchSuccess
|
||||
}, 3*time.Second, 100*time.Millisecond)
|
||||
|
||||
err = kv.RemoveWithPrefix(fmt.Sprintf("%s/%d", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID()))
|
||||
assert.Nil(t, err)
|
||||
|
||||
assert.Eventually(t, func() bool {
|
||||
exist = node.flowgraphManager.exist(ch)
|
||||
exist := node.flowgraphManager.exist(ch)
|
||||
return !exist
|
||||
}, 3*time.Second, 100*time.Millisecond)
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue