Add vChannels in proxy for query results (#5802)

* Fix proxynode for new retrieve logic

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* Remove querynodenum from proxynode and querynode

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* Remove QueryNodeIDList from proxy

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

Co-authored-by: zhenshan.cao <zhenshan.cao@zilliz.com>
recovery2-backup
yukun 2021-06-16 20:15:59 +08:00 committed by GitHub
parent 18eb27aa7a
commit e9a8d1c404
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 206 additions and 178 deletions

View File

@ -10,9 +10,6 @@
# or implied. See the License for the specific language governing permissions and limitations under the License.
nodeID: # will be deprecated later
queryNodeIDList: [1]
etcd:
endpoints:
- localhost:2379

View File

@ -383,8 +383,6 @@ type GlobalParamsTable struct {
MasterAddress string
PulsarAddress string
QueryNodeNum int
QueryNodeIDList []UniqueID
ProxyID UniqueID
TimeTickInterval time.Duration
InsertChannelNames []string

View File

@ -1355,6 +1355,7 @@ func (node *ProxyNode) Query(ctx context.Context, request *milvuspb.QueryRequest
queryMsgStream: node.queryMsgStream,
resultBuf: make(chan []*internalpb.RetrieveResults),
retrieve: retrieveRequest,
chMgr: node.chMgr,
}
err := node.sched.DqQueue.Enqueue(rt)

View File

@ -19,8 +19,6 @@ import (
"sync"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
@ -43,8 +41,6 @@ type ParamTable struct {
MasterAddress string
PulsarAddress string
QueryNodeNum int
QueryNodeIDList []UniqueID
ProxyID UniqueID
TimeTickInterval time.Duration
K2SChannelNames []string
@ -85,8 +81,6 @@ func (pt *ParamTable) initParams() {
pt.initEtcdEndpoints()
pt.initMetaRootPath()
pt.initPulsarAddress()
pt.initQueryNodeIDList()
pt.initQueryNodeNum()
pt.initTimeTickInterval()
pt.initK2SChannelNames()
pt.initProxySubName()
@ -110,28 +104,6 @@ func (pt *ParamTable) initPulsarAddress() {
pt.PulsarAddress = ret
}
func (pt *ParamTable) initQueryNodeNum() {
pt.QueryNodeNum = len(pt.QueryNodeIDList)
}
func (pt *ParamTable) initQueryNodeIDList() []UniqueID {
queryNodeIDStr, err := pt.Load("nodeID.queryNodeIDList")
if err != nil {
panic(err)
}
var ret []UniqueID
queryNodeIDs := strings.Split(queryNodeIDStr, ",")
for _, i := range queryNodeIDs {
v, err := strconv.Atoi(i)
if err != nil {
log.Error("ProxyNode ParamsTable", zap.String("load QueryNodeID list error", err.Error()))
}
ret = append(ret, UniqueID(v))
}
pt.QueryNodeIDList = ret
return ret
}
func (pt *ParamTable) initTimeTickInterval() {
intervalStr, err := pt.Load("proxyNode.timeTickInterval")
if err != nil {

View File

@ -1466,6 +1466,7 @@ type RetrieveTask struct {
resultBuf chan []*internalpb.RetrieveResults
result *milvuspb.RetrieveResults
retrieve *milvuspb.RetrieveRequest
chMgr channelsMgr
}
func (rt *RetrieveTask) TraceCtx() context.Context {
@ -1505,6 +1506,32 @@ func (rt *RetrieveTask) OnEnqueue() error {
return nil
}
// getChannels resolves the collection named in the retrieve request to its
// collection ID via the global meta cache and returns the physical (DML)
// channels the channel manager has registered for that collection.
func (rt *RetrieveTask) getChannels() ([]pChan, error) {
	collectionID, err := globalMetaCache.GetCollectionID(rt.ctx, rt.retrieve.CollectionName)
	if err != nil {
		return nil, err
	}
	return rt.chMgr.getChannels(collectionID)
}
// getVChannels returns the virtual channels of the collection targeted by
// the retrieve request, lazily registering the collection with the channel
// manager (by creating its DML message stream) when it is not known yet.
func (rt *RetrieveTask) getVChannels() ([]vChan, error) {
	collectionID, err := globalMetaCache.GetCollectionID(rt.ctx, rt.retrieve.CollectionName)
	if err != nil {
		return nil, err
	}
	// A failed physical-channel lookup is taken to mean the channel manager
	// has no entry for this collection; create its DML stream to register it.
	if _, lookupErr := rt.chMgr.getChannels(collectionID); lookupErr != nil {
		if createErr := rt.chMgr.createDMLMsgStream(collectionID); createErr != nil {
			return nil, createErr
		}
	}
	return rt.chMgr.getVChannels(collectionID)
}
func (rt *RetrieveTask) PreExecute(ctx context.Context) error {
rt.Base.MsgType = commonpb.MsgType_Retrieve
rt.Base.SourceID = Params.ProxyID

View File

@ -522,25 +522,50 @@ func (sched *TaskScheduler) queryLoop() {
}
}
type searchResultBuf struct {
type resultBufHeader struct {
usedVChans map[interface{}]struct{} // set of vChan
usedChans map[interface{}]struct{} // set of Chan todo
receivedVChansSet map[interface{}]struct{} // set of vChan
receivedSealedSegmentIDsSet map[interface{}]struct{} // set of UniqueID
receivedGlobalSegmentIDsSet map[interface{}]struct{} // set of UniqueID
resultBuf []*internalpb.SearchResults
haveError bool
}
type searchResultBuf struct {
resultBufHeader
resultBuf []*internalpb.SearchResults
}
type queryResultBuf struct {
resultBufHeader
resultBuf []*internalpb.RetrieveResults
}
func newSearchResultBuf() *searchResultBuf {
return &searchResultBuf{
usedVChans: make(map[interface{}]struct{}),
usedChans: make(map[interface{}]struct{}),
receivedVChansSet: make(map[interface{}]struct{}),
receivedSealedSegmentIDsSet: make(map[interface{}]struct{}),
receivedGlobalSegmentIDsSet: make(map[interface{}]struct{}),
resultBuf: make([]*internalpb.SearchResults, 0),
haveError: false,
resultBufHeader: resultBufHeader{
usedVChans: make(map[interface{}]struct{}),
usedChans: make(map[interface{}]struct{}),
receivedVChansSet: make(map[interface{}]struct{}),
receivedSealedSegmentIDsSet: make(map[interface{}]struct{}),
receivedGlobalSegmentIDsSet: make(map[interface{}]struct{}),
haveError: false,
},
resultBuf: make([]*internalpb.SearchResults, 0),
}
}
// newQueryResultBuf constructs an empty queryResultBuf whose bookkeeping
// sets are all initialized, ready to accumulate partial retrieve results
// coming back from the query nodes.
func newQueryResultBuf() *queryResultBuf {
	header := resultBufHeader{
		usedVChans:                  make(map[interface{}]struct{}),
		usedChans:                   make(map[interface{}]struct{}),
		receivedVChansSet:           make(map[interface{}]struct{}),
		receivedSealedSegmentIDsSet: make(map[interface{}]struct{}),
		receivedGlobalSegmentIDsSet: make(map[interface{}]struct{}),
		haveError:                   false,
	}
	return &queryResultBuf{
		resultBufHeader: header,
		resultBuf:       make([]*internalpb.RetrieveResults, 0),
	}
}
@ -563,7 +588,7 @@ func setContain(m1, m2 map[interface{}]struct{}) bool {
return true
}
func (sr *searchResultBuf) readyToReduce() bool {
func (sr *resultBufHeader) readyToReduce() bool {
if sr.haveError {
log.Debug("ProxyNode searchResultBuf readyToReduce", zap.Any("haveError", true))
return true
@ -613,27 +638,42 @@ func (sr *searchResultBuf) readyToReduce() bool {
return ret
}
// addPartialResult records which virtual channels and which sealed/global
// segments a partial result has covered, so that readiness for reduction
// can later be decided by comparing these received-sets against the
// expected ones.
func (sr *resultBufHeader) addPartialResult(vchans []vChan, searchSegIDs, globalSegIDs []UniqueID) {
	record := func(set map[interface{}]struct{}, key interface{}) {
		set[key] = struct{}{}
	}
	for _, ch := range vchans {
		record(sr.receivedVChansSet, ch)
	}
	for _, segID := range searchSegIDs {
		record(sr.receivedSealedSegmentIDsSet, segID)
	}
	for _, segID := range globalSegIDs {
		record(sr.receivedGlobalSegmentIDsSet, segID)
	}
}
func (sr *searchResultBuf) addPartialResult(result *internalpb.SearchResults) {
sr.resultBuf = append(sr.resultBuf, result)
if result.Status.ErrorCode != commonpb.ErrorCode_Success {
sr.haveError = true
return
}
for _, vchan := range result.ChannelIDsSearched {
sr.receivedVChansSet[vchan] = struct{}{}
}
for _, sealedSegment := range result.SealedSegmentIDsSearched {
sr.receivedSealedSegmentIDsSet[sealedSegment] = struct{}{}
}
for _, globalSegment := range result.GlobalSealedSegmentIDs {
sr.receivedGlobalSegmentIDsSet[globalSegment] = struct{}{}
}
sr.resultBufHeader.addPartialResult(result.ChannelIDsSearched, result.SealedSegmentIDsSearched,
result.GlobalSealedSegmentIDs)
}
func (sched *TaskScheduler) queryResultLoop() {
// addPartialResult appends one retrieve result from a query node to the
// buffer. A successful status feeds the covered channels and segments into
// the embedded header's bookkeeping sets; an error status merely flags the
// buffer so readyToReduce can short-circuit.
func (qr *queryResultBuf) addPartialResult(result *internalpb.RetrieveResults) {
	qr.resultBuf = append(qr.resultBuf, result)
	if result.Status.ErrorCode == commonpb.ErrorCode_Success {
		qr.resultBufHeader.addPartialResult(
			result.ChannelIDsRetrieved,
			result.SealedSegmentIDsRetrieved,
			result.GlobalSealedSegmentIDs,
		)
		return
	}
	qr.haveError = true
}
func (sched *TaskScheduler) collectResultLoop() {
defer sched.wg.Done()
queryResultMsgStream, _ := sched.msFactory.NewQueryMsgStream(sched.ctx)
@ -641,20 +681,19 @@ func (sched *TaskScheduler) queryResultLoop() {
log.Debug("ProxyNode", zap.Strings("SearchResultChannelNames", Params.SearchResultChannelNames),
zap.Any("ProxySubName", Params.ProxySubName))
queryNodeNum := Params.QueryNodeNum
queryResultMsgStream.Start()
defer queryResultMsgStream.Close()
queryResultBuf := make(map[UniqueID]*searchResultBuf)
queryResultBufFlag := make(map[UniqueID]bool) // if value is true, we can ignore queryResult
retrieveResultBuf := make(map[UniqueID][]*internalpb.RetrieveResults)
searchResultBufs := make(map[UniqueID]*searchResultBuf)
searchResultBufFlags := make(map[UniqueID]bool) // if value is true, we can ignore queryResult
queryResultBufs := make(map[UniqueID]*queryResultBuf)
queryResultBufFlags := make(map[UniqueID]bool) // if value is true, we can ignore queryResult
for {
select {
case msgPack, ok := <-queryResultMsgStream.Chan():
if !ok {
log.Debug("ProxyNode queryResultLoop exit Chan closed")
log.Debug("ProxyNode collectResultLoop exit Chan closed")
return
}
if msgPack == nil {
@ -666,110 +705,178 @@ func (sched *TaskScheduler) queryResultLoop() {
if searchResultMsg, srOk := tsMsg.(*msgstream.SearchResultMsg); srOk {
reqID := searchResultMsg.Base.MsgID
reqIDStr := strconv.FormatInt(reqID, 10)
ignoreThisResult, ok := queryResultBufFlag[reqID]
ignoreThisResult, ok := searchResultBufFlags[reqID]
if !ok {
queryResultBufFlag[reqID] = false
searchResultBufFlags[reqID] = false
ignoreThisResult = false
}
if ignoreThisResult {
log.Debug("ProxyNode queryResultLoop Got a SearchResultMsg, but we should ignore", zap.Any("ReqID", reqID))
log.Debug("ProxyNode collectResultLoop Got a SearchResultMsg, but we should ignore", zap.Any("ReqID", reqID))
continue
}
t := sched.getTaskByReqID(reqID)
log.Debug("ProxyNode queryResultLoop Got a SearchResultMsg", zap.Any("ReqID", reqID), zap.Any("t", t))
log.Debug("ProxyNode collectResultLoop Got a SearchResultMsg", zap.Any("ReqID", reqID), zap.Any("t", t))
if t == nil {
log.Debug("ProxyNode queryResultLoop GetTaskByReqID failed", zap.String("reqID", reqIDStr))
delete(queryResultBuf, reqID)
queryResultBufFlag[reqID] = true
log.Debug("ProxyNode collectResultLoop GetTaskByReqID failed", zap.String("reqID", reqIDStr))
delete(searchResultBufs, reqID)
searchResultBufFlags[reqID] = true
continue
}
st, ok := t.(*SearchTask)
if !ok {
log.Debug("ProxyNode queryResultLoop type assert t as SearchTask failed", zap.Any("t", t))
delete(queryResultBuf, reqID)
queryResultBufFlag[reqID] = true
log.Debug("ProxyNode collectResultLoop type assert t as SearchTask failed", zap.Any("t", t))
delete(searchResultBufs, reqID)
searchResultBufFlags[reqID] = true
continue
}
resultBuf, ok := queryResultBuf[reqID]
resultBuf, ok := searchResultBufs[reqID]
if !ok {
resultBuf = newSearchResultBuf()
vchans, err := st.getVChannels()
log.Debug("ProxyNode queryResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("vchans", vchans),
log.Debug("ProxyNode collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("vchans", vchans),
zap.Error(err))
if err != nil {
delete(queryResultBuf, reqID)
delete(searchResultBufs, reqID)
continue
}
for _, vchan := range vchans {
resultBuf.usedVChans[vchan] = struct{}{}
}
pchans, err := st.getChannels()
log.Debug("ProxyNode queryResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("pchans", pchans),
log.Debug("ProxyNode collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("pchans", pchans),
zap.Error(err))
if err != nil {
delete(queryResultBuf, reqID)
delete(searchResultBufs, reqID)
continue
}
for _, pchan := range pchans {
resultBuf.usedChans[pchan] = struct{}{}
}
queryResultBuf[reqID] = resultBuf
searchResultBufs[reqID] = resultBuf
}
resultBuf.addPartialResult(&searchResultMsg.SearchResults)
//t := sched.getTaskByReqID(reqID)
{
colName := t.(*SearchTask).query.CollectionName
log.Debug("ProxyNode queryResultLoop", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(queryResultBuf[reqID].resultBuf)))
log.Debug("ProxyNode collectResultLoop", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(searchResultBufs[reqID].resultBuf)))
}
if resultBuf.readyToReduce() {
log.Debug("ProxyNode queryResultLoop readyToReduce and assign to reduce")
queryResultBufFlag[reqID] = true
log.Debug("ProxyNode collectResultLoop readyToReduce and assign to reduce")
searchResultBufFlags[reqID] = true
st.resultBuf <- resultBuf.resultBuf
delete(queryResultBuf, reqID)
delete(searchResultBufs, reqID)
}
sp.Finish()
}
if retrieveResultMsg, rtOk := tsMsg.(*msgstream.RetrieveResultMsg); rtOk {
reqID := retrieveResultMsg.Base.MsgID
if queryResultMsg, rtOk := tsMsg.(*msgstream.RetrieveResultMsg); rtOk {
//reqID := retrieveResultMsg.Base.MsgID
//reqIDStr := strconv.FormatInt(reqID, 10)
//t := sched.getTaskByReqID(reqID)
//if t == nil {
// log.Debug("proxynode", zap.String("RetrieveResult GetTaskByReqID failed, reqID = ", reqIDStr))
// delete(queryResultBufs, reqID)
// continue
//}
//
//_, ok = queryResultBufs[reqID]
//if !ok {
// queryResultBufs[reqID] = make([]*internalpb.RetrieveResults, 0)
//}
//queryResultBufs[reqID] = append(queryResultBufs[reqID], &retrieveResultMsg.RetrieveResults)
//
//{
// colName := t.(*RetrieveTask).retrieve.CollectionName
// log.Debug("Getcollection", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(queryResultBufs[reqID])))
//}
//if len(queryResultBufs[reqID]) == queryNodeNum {
// t := sched.getTaskByReqID(reqID)
// if t != nil {
// rt, ok := t.(*RetrieveTask)
// if ok {
// rt.resultBuf <- queryResultBufs[reqID]
// delete(queryResultBufs, reqID)
// }
// } else {
// }
//}
reqID := queryResultMsg.Base.MsgID
reqIDStr := strconv.FormatInt(reqID, 10)
ignoreThisResult, ok := queryResultBufFlags[reqID]
if !ok {
queryResultBufFlags[reqID] = false
ignoreThisResult = false
}
if ignoreThisResult {
log.Debug("ProxyNode collectResultLoop Got a queryResultMsg, but we should ignore", zap.Any("ReqID", reqID))
continue
}
t := sched.getTaskByReqID(reqID)
log.Debug("ProxyNode collectResultLoop Got a queryResultMsg", zap.Any("ReqID", reqID), zap.Any("t", t))
if t == nil {
log.Debug("proxynode", zap.String("RetrieveResult GetTaskByReqID failed, reqID = ", reqIDStr))
delete(retrieveResultBuf, reqID)
log.Debug("ProxyNode collectResultLoop GetTaskByReqID failed", zap.String("reqID", reqIDStr))
delete(queryResultBufs, reqID)
queryResultBufFlags[reqID] = true
continue
}
_, ok = retrieveResultBuf[reqID]
st, ok := t.(*RetrieveTask)
if !ok {
retrieveResultBuf[reqID] = make([]*internalpb.RetrieveResults, 0)
log.Debug("ProxyNode collectResultLoop type assert t as RetrieveTask failed", zap.Any("t", t))
delete(queryResultBufs, reqID)
queryResultBufFlags[reqID] = true
continue
}
retrieveResultBuf[reqID] = append(retrieveResultBuf[reqID], &retrieveResultMsg.RetrieveResults)
resultBuf, ok := queryResultBufs[reqID]
if !ok {
resultBuf = newQueryResultBuf()
vchans, err := st.getVChannels()
log.Debug("ProxyNode collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("vchans", vchans),
zap.Error(err))
if err != nil {
delete(queryResultBufs, reqID)
continue
}
for _, vchan := range vchans {
resultBuf.usedVChans[vchan] = struct{}{}
}
pchans, err := st.getChannels()
log.Debug("ProxyNode collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("pchans", pchans),
zap.Error(err))
if err != nil {
delete(queryResultBufs, reqID)
continue
}
for _, pchan := range pchans {
resultBuf.usedChans[pchan] = struct{}{}
}
queryResultBufs[reqID] = resultBuf
}
resultBuf.addPartialResult(&queryResultMsg.RetrieveResults)
//t := sched.getTaskByReqID(reqID)
{
colName := t.(*RetrieveTask).retrieve.CollectionName
log.Debug("Getcollection", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(retrieveResultBuf[reqID])))
log.Debug("ProxyNode collectResultLoop", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(queryResultBufs[reqID].resultBuf)))
}
if len(retrieveResultBuf[reqID]) == queryNodeNum {
t := sched.getTaskByReqID(reqID)
if t != nil {
rt, ok := t.(*RetrieveTask)
if ok {
rt.resultBuf <- retrieveResultBuf[reqID]
delete(retrieveResultBuf, reqID)
}
} else {
}
if resultBuf.readyToReduce() {
log.Debug("ProxyNode collectResultLoop readyToReduce and assign to reduce")
queryResultBufFlags[reqID] = true
st.resultBuf <- resultBuf.resultBuf
delete(queryResultBufs, reqID)
}
sp.Finish()
}
}
case <-sched.ctx.Done():
log.Debug("proxynode server is closed ...")
log.Debug("ProxyNode collectResultLoop is closed ...")
return
}
}
@ -786,7 +893,7 @@ func (sched *TaskScheduler) Start() error {
go sched.queryLoop()
sched.wg.Add(1)
go sched.queryResultLoop()
go sched.collectResultLoop()
return nil
}

View File

@ -13,7 +13,6 @@ package querynode
import (
"fmt"
"os"
"path"
"strconv"
"strings"
@ -33,7 +32,6 @@ type ParamTable struct {
QueryNodeIP string
QueryNodePort int64
QueryNodeID UniqueID
QueryNodeNum int
QueryTimeTickChannelName string
FlowGraphMaxQueueLength int32
@ -82,23 +80,6 @@ func (p *ParamTable) Init() {
panic(err)
}
queryNodeIDStr := os.Getenv("QUERY_NODE_ID")
if queryNodeIDStr == "" {
queryNodeIDList := p.QueryNodeIDList()
if len(queryNodeIDList) <= 0 {
queryNodeIDStr = "0"
} else {
queryNodeIDStr = strconv.Itoa(int(queryNodeIDList[0]))
}
}
err = p.Save("_queryNodeID", queryNodeIDStr)
if err != nil {
panic(err)
}
p.initQueryNodeID()
p.initQueryNodeNum()
//p.initQueryTimeTickChannelName()
p.initMinioEndPoint()
@ -113,7 +94,6 @@ func (p *ParamTable) Init() {
p.initGracefulTime()
p.initMsgChannelSubName()
p.initSliceIndex()
p.initFlowGraphMaxQueueLength()
p.initFlowGraphMaxParallelism()
@ -130,22 +110,6 @@ func (p *ParamTable) Init() {
}
// ---------------------------------------------------------- query node
func (p *ParamTable) initQueryNodeID() {
queryNodeID, err := p.Load("_queryNodeID")
if err != nil {
panic(err)
}
id, err := strconv.Atoi(queryNodeID)
if err != nil {
panic(err)
}
p.QueryNodeID = UniqueID(id)
}
func (p *ParamTable) initQueryNodeNum() {
p.QueryNodeNum = len(p.QueryNodeIDList())
}
func (p *ParamTable) initQueryTimeTickChannelName() {
ch, err := p.Load("msgChannel.chanNamePrefix.queryTimeTick")
if err != nil {
@ -265,11 +229,7 @@ func (p *ParamTable) initMsgChannelSubName() {
if err != nil {
log.Error(err.Error())
}
queryNodeIDStr, err := p.Load("_QueryNodeID")
if err != nil {
panic(err)
}
p.MsgChannelSubName = name + "-" + queryNodeIDStr
p.MsgChannelSubName = name
}
func (p *ParamTable) initStatsChannelName() {
@ -280,18 +240,6 @@ func (p *ParamTable) initStatsChannelName() {
p.StatsChannelName = channels
}
func (p *ParamTable) initSliceIndex() {
queryNodeID := p.QueryNodeID
queryNodeIDList := p.QueryNodeIDList()
for i := 0; i < len(queryNodeIDList); i++ {
if queryNodeID == queryNodeIDList[i] {
p.SliceIndex = i
return
}
}
p.SliceIndex = -1
}
func (p *ParamTable) initLogCfg() {
p.Log = log.Config{}
format, err := p.Load("log.format")

View File

@ -27,11 +27,6 @@ func TestParamTable_PulsarAddress(t *testing.T) {
}
func TestParamTable_QueryNode(t *testing.T) {
t.Run("Test id", func(t *testing.T) {
id := Params.QueryNodeID
assert.Contains(t, Params.QueryNodeIDList(), id)
})
t.Run("Test time tick channel", func(t *testing.T) {
ch := Params.QueryTimeTickChannelName
assert.Equal(t, ch, "queryTimeTick")
@ -93,7 +88,7 @@ func TestParamTable_flowGraphMaxParallelism(t *testing.T) {
func TestParamTable_msgChannelSubName(t *testing.T) {
name := Params.MsgChannelSubName
expectName := fmt.Sprintf("queryNode-%d", Params.QueryNodeID)
expectName := "queryNode"
assert.Equal(t, expectName, name)
}

View File

@ -303,23 +303,6 @@ func (gp *BaseTable) ParseInt(key string) int {
return value
}
func (gp *BaseTable) QueryNodeIDList() []UniqueID {
queryNodeIDStr, err := gp.Load("nodeID.queryNodeIDList")
if err != nil {
panic(err)
}
var ret []UniqueID
queryNodeIDs := strings.Split(queryNodeIDStr, ",")
for _, i := range queryNodeIDs {
v, err := strconv.Atoi(i)
if err != nil {
log.Panicf("load proxy id list error, %s", err.Error())
}
ret = append(ret, UniqueID(v))
}
return ret
}
// package methods
func ConvertRangeToIntRange(rangeStr, sep string) []int {