Fix master branch unittest error (#7083)

Resolves: #7080

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/7087/head
XuanYang-cn 2021-08-13 14:00:14 +08:00 committed by GitHub
parent 6c5be83ea1
commit 86ca96a02e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 47 additions and 51 deletions

View File

@ -320,11 +320,11 @@ func (node *DataNode) Start() error {
} }
connectEtcdFn := func() error { connectEtcdFn := func() error {
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints}) etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
if err != nil { if err != nil {
return err return err
} }
node.kvClient = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath) node.kvClient = etcdKV
return nil return nil
} }
err = retry.Do(node.ctx, connectEtcdFn, retry.Attempts(ConnectEtcdMaxRetryTime)) err = retry.Do(node.ctx, connectEtcdFn, retry.Attempts(ConnectEtcdMaxRetryTime))

View File

@ -24,7 +24,6 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap" "go.uber.org/zap"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
@ -333,59 +332,56 @@ func TestWatchChannel(t *testing.T) {
defer cancel() defer cancel()
t.Run("test watch channel", func(t *testing.T) { t.Run("test watch channel", func(t *testing.T) {
client, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints})
assert.Nil(t, err)
if assert.NotNil(t, client) {
kv := etcdkv.NewEtcdKV(client, Params.MetaRootPath) kv, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
ch := fmt.Sprintf("datanode-etcd-test-channel_%d", rand.Int31()) require.NoError(t, err)
path := fmt.Sprintf("channel/%d/%s", node.NodeID, ch) ch := fmt.Sprintf("datanode-etcd-test-channel_%d", rand.Int31())
c := make(chan struct{}) path := fmt.Sprintf("channel/%d/%s", node.NodeID, ch)
go func() { c := make(chan struct{})
ec := kv.WatchWithPrefix(fmt.Sprintf("channel/%d", node.NodeID)) go func() {
cnt := 0 ec := kv.WatchWithPrefix(fmt.Sprintf("channel/%d", node.NodeID))
for { cnt := 0
evt := <-ec for {
for _, event := range evt.Events { evt := <-ec
if strings.Contains(string(event.Kv.Key), ch) { for _, event := range evt.Events {
cnt++ if strings.Contains(string(event.Kv.Key), ch) {
} cnt++
}
if cnt >= 2 {
break
} }
} }
c <- struct{}{} if cnt >= 2 {
}() break
}
vchan := &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: ch,
UnflushedSegments: []*datapb.SegmentInfo{},
} }
info := &datapb.ChannelWatchInfo{ c <- struct{}{}
State: datapb.ChannelWatchState_Uncomplete, }()
Vchan: vchan,
}
val, err := proto.Marshal(info)
assert.Nil(t, err)
err = kv.Save(path, string(val))
assert.Nil(t, err)
<-c vchan := &datapb.VchannelInfo{
node.chanMut.RLock() CollectionID: 1,
_, has := node.vchan2SyncService[ch] ChannelName: ch,
node.chanMut.RUnlock() UnflushedSegments: []*datapb.SegmentInfo{},
assert.True(t, has)
kv.RemoveWithPrefix(fmt.Sprintf("channel/%d", node.NodeID))
//TODO there is not way to sync Release done, use sleep for now
time.Sleep(100 * time.Millisecond)
node.chanMut.RLock()
_, has = node.vchan2SyncService[ch]
node.chanMut.RUnlock()
assert.False(t, has)
} }
info := &datapb.ChannelWatchInfo{
State: datapb.ChannelWatchState_Uncomplete,
Vchan: vchan,
}
val, err := proto.Marshal(info)
assert.Nil(t, err)
err = kv.Save(path, string(val))
assert.Nil(t, err)
<-c
node.chanMut.RLock()
_, has := node.vchan2SyncService[ch]
node.chanMut.RUnlock()
assert.True(t, has)
kv.RemoveWithPrefix(fmt.Sprintf("channel/%d", node.NodeID))
//TODO there is not way to sync Release done, use sleep for now
time.Sleep(100 * time.Millisecond)
node.chanMut.RLock()
_, has = node.vchan2SyncService[ch]
node.chanMut.RUnlock()
assert.False(t, has)
}) })
} }