Use multiple search channels (#5018)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/5024/head
bigsheeper 2021-04-24 10:44:28 +08:00 committed by GitHub
parent 4495cf84f0
commit 2b5bedf736
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 17 additions and 17 deletions

View File

@ -46,8 +46,8 @@ type ParamTable struct {
MinioBucketName string
// search
SearchChannelName string
SearchResultChannelName string
SearchChannelNames []string
SearchResultChannelNames []string
SearchReceiveBufSize int64
SearchPulsarBufSize int64
SearchResultReceiveBufSize int64

View File

@ -141,9 +141,9 @@ func (node *QueryNode) Init() error {
case "TimeTickChannelName":
Params.QueryTimeTickChannelName = kv.Value
case "SearchChannelName":
Params.SearchChannelName = kv.Value
Params.SearchChannelNames = append(Params.SearchChannelNames, kv.Value)
case "SearchResultChannelName":
Params.SearchResultChannelName = kv.Value
Params.SearchResultChannelNames = append(Params.SearchResultChannelNames, kv.Value)
default:
return fmt.Errorf("Invalid key: %v", kv.Key)
}

View File

@ -40,13 +40,13 @@ func newSearchService(ctx context.Context, replica ReplicaInterface, factory msg
searchStream, _ := factory.NewQueryMsgStream(ctx)
searchResultStream, _ := factory.NewQueryMsgStream(ctx)
if Params.SearchChannelName != "" && Params.SearchResultChannelName != "" {
if len(Params.SearchChannelNames) > 0 && len(Params.SearchResultChannelNames) > 0 {
// query node need to consumer search channels and produce search result channels when init.
consumeChannels := []string{Params.SearchChannelName}
consumeChannels := Params.SearchChannelNames
consumeSubName := Params.MsgChannelSubName
searchStream.AsConsumer(consumeChannels, consumeSubName)
log.Debug("query node AsConsumer", zap.Any("searchChannels", consumeChannels), zap.Any("consumeSubName", consumeSubName))
producerChannels := []string{Params.SearchResultChannelName}
producerChannels := Params.SearchResultChannelNames
searchResultStream.AsProducer(producerChannels)
log.Debug("query node AsProducer", zap.Any("searchResultChannels", producerChannels))
}

View File

@ -402,8 +402,8 @@ func (qs *QueryService) ReleasePartitions(ctx context.Context, req *querypb.Rele
func (qs *QueryService) CreateQueryChannel(ctx context.Context) (*querypb.CreateQueryChannelResponse, error) {
channelID := len(qs.queryChannels)
searchPrefix := Params.SearchChannelName
searchResultPrefix := Params.SearchResultChannelName
searchPrefix := Params.SearchChannelPrefix
searchResultPrefix := Params.SearchResultChannelPrefix
allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(int64(channelID), 10)
allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(int64(channelID), 10)

View File

@ -42,8 +42,8 @@ type ParamTable struct {
RoleName string
// search
SearchChannelName string
SearchResultChannelName string
SearchChannelPrefix string
SearchResultChannelPrefix string
}
var Params ParamTable
@ -74,8 +74,8 @@ func (p *ParamTable) Init() {
p.initTimeTickChannelName()
p.initQueryServiceAddress()
p.initRoleName()
p.initSearchChannelName()
p.initSearchResultChannelName()
p.initSearchChannelPrefix()
p.initSearchResultChannelPrefix()
})
}
@ -147,20 +147,20 @@ func (p *ParamTable) initRoleName() {
p.RoleName = fmt.Sprintf("%s-%d", "QueryService", p.NodeID)
}
func (p *ParamTable) initSearchChannelName() {
func (p *ParamTable) initSearchChannelPrefix() {
channelName, err := p.Load("msgChannel.chanNamePrefix.search")
if err != nil {
log.Error(err.Error())
}
p.SearchChannelName = channelName
p.SearchChannelPrefix = channelName
}
func (p *ParamTable) initSearchResultChannelName() {
func (p *ParamTable) initSearchResultChannelPrefix() {
channelName, err := p.Load("msgChannel.chanNamePrefix.searchResult")
if err != nil {
log.Error(err.Error())
}
p.SearchResultChannelName = channelName
p.SearchResultChannelPrefix = channelName
}