Add unit test for mq_msgstream.go (#8548)

Signed-off-by: Xiangyu Wang <xiangyu.wang@zilliz.com>
pull/8585/head
Xiangyu Wang 2021-09-26 13:09:56 +08:00 committed by GitHub
parent ca262b5016
commit cd980dd13b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 45 additions and 0 deletions

View File

@ -42,6 +42,51 @@ func TestMain(m *testing.M) {
os.Exit(exitCode)
}
func Test_NewMqMsgStream(t *testing.T) {
pulsarAddress, _ := Params.Load("_PulsarAddress")
factory := &ProtoUDFactory{}
pulsarClient, err := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
assert.Nil(t, err)
rocksdbName := "/tmp/rocksmq_unittest_" + t.Name()
endpoints := os.Getenv("ETCD_ENDPOINTS")
if endpoints == "" {
endpoints = "localhost:2379"
}
etcdEndpoints := strings.Split(endpoints, ",")
etcdKV, err := etcdkv.NewEtcdKV(etcdEndpoints, "/etcd/test/root")
if err != nil {
log.Fatalf("New clientv3 error = %v", err)
}
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
_ = idAllocator.Initialize()
err = rocksmq.InitRmq(rocksdbName, idAllocator)
if err != nil {
log.Fatalf("InitRmq error = %v", err)
}
rmqClient, _ := mqclient.NewRmqClient(client.ClientOptions{Server: rocksmq.Rmq})
parameters := []struct {
client mqclient.Client
}{
{pulsarClient}, {rmqClient},
}
for i := range parameters {
func(client mqclient.Client) {
_, err := NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher())
assert.Nil(t, err)
}(parameters[i].client)
}
rocksmq.CloseRocksMQ()
etcdKV.Close()
_ = os.RemoveAll(rocksdbName)
_ = os.RemoveAll(rocksdbName + "_meta_kv")
}
/* ========================== Pulsar & RocksMQ Tests ========================== */
func TestStream_PulsarMsgStream_Insert(t *testing.T) {
pulsarAddress, _ := Params.Load("_PulsarAddress")
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)