Support master get segment info channel name from config file (#5606)

* optimize msgstream unittest

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* get data service segment info channel name from config file directly

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/5604/head^2
Cai Yudong 2021-06-04 16:28:34 +08:00 committed by GitHub
parent 4aefdaeabe
commit 2ba93deb5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 41 additions and 22 deletions

View File

@ -108,9 +108,6 @@ func (s *Server) setClient() {
if err := dsClient.Start(); err != nil {
panic(err)
}
if err := funcutil.WaitForComponentInitOrHealthy(ctx, dsClient, "DataService", 1000000, 200*time.Millisecond); err != nil {
panic(err)
}
return dsClient
}
s.newIndexServiceClient = func(s, metaRootPath, etcdAddress string, timeout time.Duration) types.IndexService {

View File

@ -709,10 +709,6 @@ func (c *Core) setMsgStreams() error {
}
}()
if Params.DataServiceSegmentChannel == "" {
return fmt.Errorf("DataServiceSegmentChannel is empty")
}
// data service will put msg into this channel when create segment
dsChanName := Params.DataServiceSegmentChannel
dsSubName := Params.MsgChannelSubName + "ds"
@ -755,13 +751,6 @@ func (c *Core) SetNewProxyClient(f func(sess *sessionutil.Session) (types.ProxyN
}
func (c *Core) SetDataService(ctx context.Context, s types.DataService) error {
rsp, err := s.GetSegmentInfoChannel(ctx)
if err != nil {
return err
}
Params.DataServiceSegmentChannel = rsp.Value
log.Debug("data service segment", zap.String("channel name", Params.DataServiceSegmentChannel))
c.CallGetBinlogFilePathsService = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) (retFiles []string, retErr error) {
defer func() {
if err := recover(); err != nil {

View File

@ -38,7 +38,7 @@ type ParamTable struct {
TimeTickChannel string
DdChannel string
StatisticsChannel string
DataServiceSegmentChannel string // get from data service, data service create segment, or data node flush segment
DataServiceSegmentChannel string // data service create segment, or data node flush segment
MaxPartitionNum int64
DefaultPartitionName string
@ -71,6 +71,7 @@ func (p *ParamTable) Init() {
p.initTimeTickChannel()
p.initDdChannelName()
p.initStatisticsChannelName()
p.initSegmentInfoChannelName()
p.initMaxPartitionNum()
p.initMinSegmentSizeToEnableIndex()
@ -157,6 +158,14 @@ func (p *ParamTable) initStatisticsChannelName() {
p.StatisticsChannel = channel
}
func (p *ParamTable) initSegmentInfoChannelName() {
channel, err := p.Load("msgChannel.chanNamePrefix.dataServiceSegmentInfo")
if err != nil {
panic(err)
}
p.DataServiceSegmentChannel = channel
}
func (p *ParamTable) initMaxPartitionNum() {
p.MaxPartitionNum = p.ParseInt64("master.maxPartitionNum")
}

View File

@ -191,7 +191,7 @@ func getTimeTickMsg(reqID UniqueID) TsMsg {
}
// Generate MsgPack contains 'num' msgs, with timestamp in (start, end)
func getInsertMsgPack(num int, start int, end int) *MsgPack {
func getRandInsertMsgPack(num int, start int, end int) *MsgPack {
Rand := rand.New(rand.NewSource(time.Now().UnixNano()))
set := make(map[int]bool)
msgPack := MsgPack{}
@ -206,6 +206,14 @@ func getInsertMsgPack(num int, start int, end int) *MsgPack {
return &msgPack
}
func getInsertMsgPack(ts []int) *MsgPack {
msgPack := MsgPack{}
for i := 0; i < len(ts); i++ {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, int64(ts[i])))
}
return &msgPack
}
func getTimeTickMsgPack(reqID UniqueID) *MsgPack {
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTimeTickMsg(reqID))
@ -712,13 +720,29 @@ func TestStream_PulsarTtMsgStream_UnMarshalHeader(t *testing.T) {
outputStream.Close()
}
func createMsgPacks(msgsInPack int, numOfMsgPack int, deltaTs int) []*MsgPack {
func createRandMsgPacks(msgsInPack int, numOfMsgPack int, deltaTs int) []*MsgPack {
msgPacks := make([]*MsgPack, numOfMsgPack)
// generate MsgPack
for i := 0; i < numOfMsgPack; i++ {
if i%2 == 0 {
msgPacks[i] = getInsertMsgPack(msgsInPack, i/2*deltaTs, (i/2+2)*deltaTs+2)
msgPacks[i] = getRandInsertMsgPack(msgsInPack, i/2*deltaTs, (i/2+2)*deltaTs+2)
} else {
msgPacks[i] = getTimeTickMsgPack(int64((i + 1) / 2 * deltaTs))
}
}
msgPacks = append(msgPacks, nil)
msgPacks = append(msgPacks, getTimeTickMsgPack(int64(numOfMsgPack*deltaTs)))
return msgPacks
}
func createMsgPacks(ts [][]int, numOfMsgPack int, deltaTs int) []*MsgPack {
msgPacks := make([]*MsgPack, numOfMsgPack)
// generate MsgPack
for i := 0; i < numOfMsgPack; i++ {
if i%2 == 0 {
msgPacks[i] = getInsertMsgPack(ts[i/2])
} else {
msgPacks[i] = getTimeTickMsgPack(int64((i + 1) / 2 * deltaTs))
}
@ -774,11 +798,11 @@ func TestStream_PulsarTtMsgStream_1(t *testing.T) {
consumerSubName := funcutil.RandomString(8)
inputStream1 := getPulsarInputStream(pulsarAddr, p1Channels)
msgPacks1 := createMsgPacks(3, 10, 10)
msgPacks1 := createRandMsgPacks(3, 10, 10)
assert.Nil(t, sendMsgPacks(inputStream1, msgPacks1))
inputStream2 := getPulsarInputStream(pulsarAddr, p2Channels)
msgPacks2 := createMsgPacks(5, 10, 10)
msgPacks2 := createRandMsgPacks(5, 10, 10)
assert.Nil(t, sendMsgPacks(inputStream2, msgPacks2))
// consume msg
@ -836,11 +860,11 @@ func TestStream_PulsarTtMsgStream_2(t *testing.T) {
consumerSubName := funcutil.RandomString(8)
inputStream1 := getPulsarInputStream(pulsarAddr, p1Channels)
msgPacks1 := createMsgPacks(3, 10, 10)
msgPacks1 := createRandMsgPacks(3, 10, 10)
assert.Nil(t, sendMsgPacks(inputStream1, msgPacks1))
inputStream2 := getPulsarInputStream(pulsarAddr, p2Channels)
msgPacks2 := createMsgPacks(5, 10, 10)
msgPacks2 := createRandMsgPacks(5, 10, 10)
assert.Nil(t, sendMsgPacks(inputStream2, msgPacks2))
// consume msg