mirror of https://github.com/milvus-io/milvus.git
Refine log of ready to reduce (#13448)
Signed-off-by: dragondriver <jiquan.long@zilliz.com>pull/13448/merge
parent
bdb001b20d
commit
f6e23458aa
|
@ -24,6 +24,8 @@ import (
|
|||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
@ -523,6 +525,7 @@ func (sched *taskScheduler) queryLoop() {
|
|||
}
|
||||
|
||||
type resultBufHeader struct {
|
||||
msgID UniqueID
|
||||
usedVChans map[interface{}]struct{} // set of vChan
|
||||
receivedVChansSet map[interface{}]struct{} // set of vChan
|
||||
receivedSealedSegmentIDsSet map[interface{}]struct{} // set of UniqueID
|
||||
|
@ -540,7 +543,7 @@ type queryResultBuf struct {
|
|||
resultBuf []*internalpb.RetrieveResults
|
||||
}
|
||||
|
||||
func newSearchResultBuf() *searchResultBuf {
|
||||
func newSearchResultBuf(msgID UniqueID) *searchResultBuf {
|
||||
return &searchResultBuf{
|
||||
resultBufHeader: resultBufHeader{
|
||||
usedVChans: make(map[interface{}]struct{}),
|
||||
|
@ -553,7 +556,7 @@ func newSearchResultBuf() *searchResultBuf {
|
|||
}
|
||||
}
|
||||
|
||||
func newQueryResultBuf() *queryResultBuf {
|
||||
func newQueryResultBuf(msgID UniqueID) *queryResultBuf {
|
||||
return &queryResultBuf{
|
||||
resultBufHeader: resultBufHeader{
|
||||
usedVChans: make(map[interface{}]struct{}),
|
||||
|
@ -572,41 +575,20 @@ func (sr *resultBufHeader) readyToReduce() bool {
|
|||
return true
|
||||
}
|
||||
|
||||
receivedVChansSetStrMap := make(map[string]int)
|
||||
|
||||
for x := range sr.receivedVChansSet {
|
||||
receivedVChansSetStrMap[x.(string)] = 1
|
||||
}
|
||||
|
||||
usedVChansSetStrMap := make(map[string]int)
|
||||
for x := range sr.usedVChans {
|
||||
usedVChansSetStrMap[x.(string)] = 1
|
||||
}
|
||||
|
||||
sealedSegmentIDsStrMap := make(map[int64]int)
|
||||
|
||||
for x := range sr.receivedSealedSegmentIDsSet {
|
||||
sealedSegmentIDsStrMap[x.(int64)] = 1
|
||||
}
|
||||
|
||||
sealedGlobalSegmentIDsStrMap := make(map[int64]int)
|
||||
for x := range sr.receivedGlobalSegmentIDsSet {
|
||||
sealedGlobalSegmentIDsStrMap[x.(int64)] = 1
|
||||
}
|
||||
log.Debug("check if result buf is ready to reduce",
|
||||
zap.String("role", typeutil.ProxyRole),
|
||||
zap.Int64("MsgID", sr.msgID),
|
||||
zap.Any("receivedVChansSet", funcutil.SetToSlice(sr.receivedVChansSet)),
|
||||
zap.Any("usedVChans", funcutil.SetToSlice(sr.usedVChans)),
|
||||
zap.Any("receivedSealedSegmentIDsSet", funcutil.SetToSlice(sr.receivedSealedSegmentIDsSet)),
|
||||
zap.Any("receivedGlobalSegmentIDsSet", funcutil.SetToSlice(sr.receivedGlobalSegmentIDsSet)))
|
||||
|
||||
ret1 := funcutil.SetContain(sr.receivedVChansSet, sr.usedVChans)
|
||||
log.Debug("Proxy searchResultBuf readyToReduce", zap.Any("receivedVChansSet", receivedVChansSetStrMap),
|
||||
zap.Any("usedVChans", usedVChansSetStrMap),
|
||||
zap.Any("receivedSealedSegmentIDsSet", sealedSegmentIDsStrMap),
|
||||
zap.Any("receivedGlobalSegmentIDsSet", sealedGlobalSegmentIDsStrMap),
|
||||
zap.Any("ret1", ret1),
|
||||
)
|
||||
if !ret1 {
|
||||
return false
|
||||
}
|
||||
ret := funcutil.SetContain(sr.receivedSealedSegmentIDsSet, sr.receivedGlobalSegmentIDsSet)
|
||||
log.Debug("Proxy searchResultBuf readyToReduce", zap.Any("ret", ret))
|
||||
return ret
|
||||
|
||||
return funcutil.SetContain(sr.receivedSealedSegmentIDsSet, sr.receivedGlobalSegmentIDsSet)
|
||||
}
|
||||
|
||||
func (sr *resultBufHeader) addPartialResult(vchans []vChan, searchSegIDs, globalSegIDs []UniqueID) {
|
||||
|
@ -706,7 +688,7 @@ func (sched *taskScheduler) collectResultLoop() {
|
|||
|
||||
resultBuf, ok := searchResultBufs[reqID]
|
||||
if !ok {
|
||||
resultBuf = newSearchResultBuf()
|
||||
resultBuf = newSearchResultBuf(reqID)
|
||||
vchans, err := st.getVChannels()
|
||||
log.Debug("Proxy collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("vchans", vchans),
|
||||
zap.Error(err))
|
||||
|
@ -805,7 +787,7 @@ func (sched *taskScheduler) collectResultLoop() {
|
|||
|
||||
resultBuf, ok := queryResultBufs[reqID]
|
||||
if !ok {
|
||||
resultBuf = newQueryResultBuf()
|
||||
resultBuf = newQueryResultBuf(reqID)
|
||||
vchans, err := st.getVChannels()
|
||||
log.Debug("Proxy collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("vchans", vchans),
|
||||
zap.Error(err))
|
||||
|
|
|
@ -26,3 +26,14 @@ func SetContain(m1, m2 map[interface{}]struct{}) bool {
|
|||
|
||||
return true
|
||||
}
|
||||
|
||||
// SetToSlice transform the set to a slice.
|
||||
func SetToSlice(m map[interface{}]struct{}) []interface{} {
|
||||
ret := make([]interface{}, 0, len(m))
|
||||
|
||||
for k := range m {
|
||||
ret = append(ret, k)
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
|
|
@ -38,3 +38,21 @@ func Test_SetContain(t *testing.T) {
|
|||
m1[key2] = struct{}{}
|
||||
assert.True(t, SetContain(m1, m2))
|
||||
}
|
||||
|
||||
func TestSetToSlice(t *testing.T) {
|
||||
m := map[interface{}]struct{}{
|
||||
1: {},
|
||||
2.1: {},
|
||||
"string": {},
|
||||
}
|
||||
|
||||
histogram := make(map[interface{}]struct{})
|
||||
s := SetToSlice(m)
|
||||
for _, k := range s {
|
||||
_, exist := m[k]
|
||||
assert.True(t, exist)
|
||||
_, twice := histogram[k]
|
||||
assert.False(t, twice)
|
||||
histogram[k] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue