mirror of https://github.com/milvus-io/milvus.git
Use virtual channel in search result and retrieve result (#6030)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/6036/head
parent
5417fee45f
commit
0f89f40488
|
@ -524,7 +524,6 @@ func (sched *TaskScheduler) queryLoop() {
|
|||
|
||||
type resultBufHeader struct {
|
||||
usedVChans map[interface{}]struct{} // set of vChan
|
||||
usedChans map[interface{}]struct{} // set of pChan todo
|
||||
receivedVChansSet map[interface{}]struct{} // set of vChan
|
||||
receivedSealedSegmentIDsSet map[interface{}]struct{} // set of UniqueID
|
||||
receivedGlobalSegmentIDsSet map[interface{}]struct{} // set of UniqueID
|
||||
|
@ -545,7 +544,6 @@ func newSearchResultBuf() *searchResultBuf {
|
|||
return &searchResultBuf{
|
||||
resultBufHeader: resultBufHeader{
|
||||
usedVChans: make(map[interface{}]struct{}),
|
||||
usedChans: make(map[interface{}]struct{}),
|
||||
receivedVChansSet: make(map[interface{}]struct{}),
|
||||
receivedSealedSegmentIDsSet: make(map[interface{}]struct{}),
|
||||
receivedGlobalSegmentIDsSet: make(map[interface{}]struct{}),
|
||||
|
@ -559,7 +557,6 @@ func newQueryResultBuf() *queryResultBuf {
|
|||
return &queryResultBuf{
|
||||
resultBufHeader: resultBufHeader{
|
||||
usedVChans: make(map[interface{}]struct{}),
|
||||
usedChans: make(map[interface{}]struct{}),
|
||||
receivedVChansSet: make(map[interface{}]struct{}),
|
||||
receivedSealedSegmentIDsSet: make(map[interface{}]struct{}),
|
||||
receivedGlobalSegmentIDsSet: make(map[interface{}]struct{}),
|
||||
|
@ -594,11 +591,6 @@ func (sr *resultBufHeader) readyToReduce() bool {
|
|||
return true
|
||||
}
|
||||
|
||||
usedChansSetStrMap := make(map[string]int)
|
||||
for x := range sr.usedChans {
|
||||
usedChansSetStrMap[x.(string)] = 1
|
||||
}
|
||||
|
||||
receivedVChansSetStrMap := make(map[string]int)
|
||||
|
||||
for x := range sr.receivedVChansSet {
|
||||
|
@ -622,15 +614,13 @@ func (sr *resultBufHeader) readyToReduce() bool {
|
|||
}
|
||||
|
||||
ret1 := setContain(sr.receivedVChansSet, sr.usedVChans)
|
||||
ret2 := setContain(sr.receivedVChansSet, sr.usedChans)
|
||||
log.Debug("Proxy searchResultBuf readyToReduce", zap.Any("receivedVChansSet", receivedVChansSetStrMap),
|
||||
zap.Any("usedVChans", usedVChansSetStrMap),
|
||||
zap.Any("usedChans", usedChansSetStrMap),
|
||||
zap.Any("receivedSealedSegmentIDsSet", sealedSegmentIDsStrMap),
|
||||
zap.Any("receivedGlobalSegmentIDsSet", sealedGlobalSegmentIDsStrMap),
|
||||
zap.Any("ret1", ret1),
|
||||
zap.Any("ret2", ret2))
|
||||
if !ret1 && !ret2 {
|
||||
)
|
||||
if !ret1 {
|
||||
return false
|
||||
}
|
||||
ret := setContain(sr.receivedSealedSegmentIDsSet, sr.receivedGlobalSegmentIDsSet)
|
||||
|
@ -752,9 +742,6 @@ func (sched *TaskScheduler) collectResultLoop() {
|
|||
delete(searchResultBufs, reqID)
|
||||
continue
|
||||
}
|
||||
for _, pchan := range pchans {
|
||||
resultBuf.usedChans[pchan] = struct{}{}
|
||||
}
|
||||
searchResultBufs[reqID] = resultBuf
|
||||
}
|
||||
resultBuf.addPartialResult(&searchResultMsg.SearchResults)
|
||||
|
@ -854,9 +841,6 @@ func (sched *TaskScheduler) collectResultLoop() {
|
|||
delete(queryResultBufs, reqID)
|
||||
continue
|
||||
}
|
||||
for _, pchan := range pchans {
|
||||
resultBuf.usedChans[pchan] = struct{}{}
|
||||
}
|
||||
queryResultBufs[reqID] = resultBuf
|
||||
}
|
||||
resultBuf.addPartialResult(&queryResultMsg.RetrieveResults)
|
||||
|
|
|
@ -459,13 +459,13 @@ func (rc *retrieveCollection) retrieve(retrieveMsg *msgstream.RetrieveMsg) error
|
|||
FieldsData: result.FieldsData,
|
||||
ResultChannelID: retrieveMsg.ResultChannelID,
|
||||
SealedSegmentIDsRetrieved: sealedSegmentRetrieved,
|
||||
ChannelIDsRetrieved: collection.getPChannels(),
|
||||
ChannelIDsRetrieved: collection.getVChannels(),
|
||||
//TODO(yukun):: get global sealed segment from etcd
|
||||
GlobalSealedSegmentIDs: sealedSegmentRetrieved,
|
||||
},
|
||||
}
|
||||
log.Debug("QueryNode RetrieveResultMsg",
|
||||
zap.Any("pChannels", collection.getPChannels()),
|
||||
zap.Any("vChannels", collection.getVChannels()),
|
||||
zap.Any("collectionID", collection.ID()),
|
||||
zap.Any("sealedSegmentRetrieved", sealedSegmentRetrieved),
|
||||
)
|
||||
|
|
|
@ -761,7 +761,7 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error {
|
|||
SlicedNumCount: 1,
|
||||
MetricType: plan.getMetricType(),
|
||||
SealedSegmentIDsSearched: sealedSegmentSearched,
|
||||
ChannelIDsSearched: collection.getPChannels(),
|
||||
ChannelIDsSearched: collection.getVChannels(),
|
||||
//TODO:: get global sealed segment from etcd
|
||||
GlobalSealedSegmentIDs: sealedSegmentSearched,
|
||||
},
|
||||
|
@ -769,7 +769,7 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error {
|
|||
log.Debug("QueryNode Empty SearchResultMsg",
|
||||
zap.Any("collectionID", collection.ID()),
|
||||
zap.Any("msgID", searchMsg.ID()),
|
||||
zap.Any("pChannels", collection.getPChannels()),
|
||||
zap.Any("vChannels", collection.getVChannels()),
|
||||
zap.Any("sealedSegmentSearched", sealedSegmentSearched),
|
||||
)
|
||||
err = s.publishSearchResult(searchResultMsg, searchMsg.CollectionID)
|
||||
|
@ -869,7 +869,7 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error {
|
|||
SlicedNumCount: 1,
|
||||
MetricType: plan.getMetricType(),
|
||||
SealedSegmentIDsSearched: sealedSegmentSearched,
|
||||
ChannelIDsSearched: collection.getPChannels(),
|
||||
ChannelIDsSearched: collection.getVChannels(),
|
||||
//TODO:: get global sealed segment from etcd
|
||||
GlobalSealedSegmentIDs: sealedSegmentSearched,
|
||||
},
|
||||
|
@ -877,7 +877,7 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error {
|
|||
log.Debug("QueryNode SearchResultMsg",
|
||||
zap.Any("collectionID", collection.ID()),
|
||||
zap.Any("msgID", searchMsg.ID()),
|
||||
zap.Any("pChannels", collection.getPChannels()),
|
||||
zap.Any("vChannels", collection.getVChannels()),
|
||||
zap.Any("sealedSegmentSearched", sealedSegmentSearched),
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in New Issue