fix master service ut (#5555)

* fix master unitttest

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>

* tt

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>

* fix master ut

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>
pull/5779/head
neza2017 2021-06-03 18:03:33 +08:00 committed by zhenshan.cao
parent 339a71aa0a
commit cb6ae3678a
2 changed files with 47 additions and 46 deletions

View File

@ -243,15 +243,12 @@ func TestMasterService(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
msFactory := msgstream.NewPmsFactory()
coreFactory := msgstream.NewPmsFactory()
Params.Init()
core, err := NewCore(ctx, msFactory)
core, err := NewCore(ctx, coreFactory)
assert.Nil(t, err)
randVal := rand.Int()
err = core.Register()
assert.Nil(t, err)
Params.TimeTickChannel = fmt.Sprintf("master-time-tick-%d", randVal)
Params.DdChannel = fmt.Sprintf("master-dd-%d", randVal)
Params.StatisticsChannel = fmt.Sprintf("master-statistics-%d", randVal)
@ -259,6 +256,9 @@ func TestMasterService(t *testing.T) {
Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath)
Params.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal)
err = core.Register()
assert.Nil(t, err)
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second})
assert.Nil(t, err)
sessKey := path.Join(Params.MetaRootPath, sessionutil.DefaultServiceRoot)
@ -306,6 +306,43 @@ func TestMasterService(t *testing.T) {
err = core.SetQueryService(qm)
assert.Nil(t, err)
tmpFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"pulsarAddress": Params.PulsarAddress,
"receiveBufSize": 1024,
"pulsarBufSize": 1024}
err = tmpFactory.SetParams(m)
assert.Nil(t, err)
dataServiceSegmentStream, _ := tmpFactory.NewMsgStream(ctx)
dataServiceSegmentStream.AsProducer([]string{Params.DataServiceSegmentChannel})
timeTickStream, _ := tmpFactory.NewMsgStream(ctx)
timeTickStream.AsConsumer([]string{Params.TimeTickChannel}, Params.MsgChannelSubName)
timeTickStream.Start()
ddStream, _ := tmpFactory.NewMsgStream(ctx)
ddStream.AsConsumer([]string{Params.DdChannel}, Params.MsgChannelSubName)
ddStream.Start()
// test dataServiceSegmentStream seek
dataNodeSubName := Params.MsgChannelSubName + "dn"
flushedSegStream, _ := tmpFactory.NewMsgStream(ctx)
flushedSegStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, dataNodeSubName)
flushedSegStream.Start()
msgPack := GenFlushedSegMsgPack(9999)
err = dataServiceSegmentStream.Produce(msgPack)
assert.Nil(t, err)
flushedSegMsgPack := flushedSegStream.Consume()
flushedSegStream.Close()
flushedSegPosStr, _ := EncodeMsgPositions(flushedSegMsgPack.EndPositions)
_, err = etcdCli.Put(ctx, path.Join(Params.MetaRootPath, FlushedSegMsgEndPosPrefix), flushedSegPosStr)
assert.Nil(t, err)
err = core.Init()
assert.Nil(t, err)
@ -321,43 +358,6 @@ func TestMasterService(t *testing.T) {
err = core.Start()
assert.Nil(t, err)
m := map[string]interface{}{
"pulsarAddress": Params.PulsarAddress,
"receiveBufSize": 1024,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err)
dataServiceSegmentStream, _ := msFactory.NewMsgStream(ctx)
dataServiceSegmentStream.AsProducer([]string{Params.DataServiceSegmentChannel})
timeTickStream, _ := msFactory.NewMsgStream(ctx)
timeTickStream.AsConsumer([]string{Params.TimeTickChannel}, Params.MsgChannelSubName)
timeTickStream.Start()
ddStream, _ := msFactory.NewMsgStream(ctx)
ddStream.AsConsumer([]string{Params.DdChannel}, Params.MsgChannelSubName)
ddStream.Start()
// test dataServiceSegmentStream seek
dataNodeSubName := Params.MsgChannelSubName + "dn"
flushedSegStream, _ := msFactory.NewMsgStream(ctx)
flushedSegStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, dataNodeSubName)
flushedSegStream.Start()
msgPack := GenFlushedSegMsgPack(9999)
err = dataServiceSegmentStream.Produce(msgPack)
assert.Nil(t, err)
flushedSegMsgPack := flushedSegStream.Consume()
flushedSegPosStr, _ := EncodeMsgPositions(flushedSegMsgPack.EndPositions)
_, err = etcdCli.Put(ctx, path.Join(Params.MetaRootPath, FlushedSegMsgEndPosPrefix), flushedSegPosStr)
assert.Nil(t, err)
err = core.Init()
assert.Nil(t, err)
err = core.Start()
assert.Nil(t, err)
time.Sleep(time.Second)
getNotTtMsg := func(n int, ch <-chan *msgstream.MsgPack) []msgstream.TsMsg {
@ -1733,9 +1733,6 @@ func TestMasterService2(t *testing.T) {
assert.Nil(t, err)
randVal := rand.Int()
err = core.Register()
assert.Nil(t, err)
Params.TimeTickChannel = fmt.Sprintf("master-time-tick-%d", randVal)
Params.DdChannel = fmt.Sprintf("master-dd-%d", randVal)
Params.StatisticsChannel = fmt.Sprintf("master-statistics-%d", randVal)
@ -1743,6 +1740,9 @@ func TestMasterService2(t *testing.T) {
Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath)
Params.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal)
err = core.Register()
assert.Nil(t, err)
dm := &dataMock{randVal: randVal}
err = core.SetDataService(ctx, dm)
assert.Nil(t, err)

View File

@ -58,7 +58,7 @@ func (pc *pulsarClient) Subscribe(options ConsumerOptions) (Consumer, error) {
if err != nil {
return nil, err
}
msgChannel := make(chan ConsumerMessage, 1)
msgChannel := make(chan ConsumerMessage)
pConsumer := &pulsarConsumer{c: consumer, msgChannel: msgChannel}
go func() {
@ -67,6 +67,7 @@ func (pc *pulsarClient) Subscribe(options ConsumerOptions) (Consumer, error) {
case msg, ok := <-pConsumer.c.Chan():
if !ok {
close(msgChannel)
log.Debug("pulsar consumer channel closed")
return
}
msgChannel <- &pulsarMessage{msg: msg}