mirror of https://github.com/milvus-io/milvus.git
Refactor querynode historical replica (#15992)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/16030/head
parent
c51c8c0a74
commit
b61e8201e7
|
@ -59,28 +59,16 @@ func (h *historical) close() {
|
|||
|
||||
// // retrieve will retrieve from the segments in historical
|
||||
func (h *historical) retrieve(collID UniqueID, partIDs []UniqueID, vcm storage.ChunkManager,
|
||||
plan *RetrievePlan) ([]*segcorepb.RetrieveResults, []UniqueID, []UniqueID, error) {
|
||||
|
||||
retrieveResults := make([]*segcorepb.RetrieveResults, 0)
|
||||
retrieveSegmentIDs := make([]UniqueID, 0)
|
||||
plan *RetrievePlan) (retrieveResults []*segcorepb.RetrieveResults, retrieveSegmentIDs []UniqueID, retrievePartIDs []UniqueID, err error) {
|
||||
|
||||
// get historical partition ids
|
||||
var retrievePartIDs []UniqueID
|
||||
if len(partIDs) == 0 {
|
||||
hisPartIDs, err := h.replica.getPartitionIDs(collID)
|
||||
if err != nil {
|
||||
return retrieveResults, retrieveSegmentIDs, retrievePartIDs, err
|
||||
}
|
||||
retrievePartIDs = hisPartIDs
|
||||
} else {
|
||||
for _, id := range partIDs {
|
||||
_, err := h.replica.getPartitionByID(id)
|
||||
if err == nil {
|
||||
retrievePartIDs = append(retrievePartIDs, id)
|
||||
}
|
||||
}
|
||||
retrievePartIDs, err = h.getTargetPartIDs(collID, partIDs)
|
||||
if err != nil {
|
||||
return retrieveResults, retrieveSegmentIDs, retrievePartIDs, err
|
||||
}
|
||||
|
||||
log.Debug("retrieve target partitions", zap.Int64("collectionID", collID), zap.Int64s("partitionIDs", retrievePartIDs))
|
||||
|
||||
for _, partID := range retrievePartIDs {
|
||||
segIDs, err := h.replica.getSegmentIDs(partID)
|
||||
if err != nil {
|
||||
|
@ -109,36 +97,15 @@ func (h *historical) retrieve(collID UniqueID, partIDs []UniqueID, vcm storage.C
|
|||
|
||||
// search will search all the target segments in historical
|
||||
func (h *historical) search(searchReqs []*searchRequest, collID UniqueID, partIDs []UniqueID, plan *SearchPlan,
|
||||
searchTs Timestamp) ([]*SearchResult, []UniqueID, []UniqueID, error) {
|
||||
searchTs Timestamp) (searchResults []*SearchResult, searchSegmentIDs []UniqueID, searchPartIDs []UniqueID, err error) {
|
||||
|
||||
searchResults := make([]*SearchResult, 0)
|
||||
searchSegmentIDs := make([]UniqueID, 0)
|
||||
|
||||
// get historical partition ids
|
||||
var searchPartIDs []UniqueID
|
||||
if len(partIDs) == 0 {
|
||||
hisPartIDs, err := h.replica.getPartitionIDs(collID)
|
||||
if err != nil {
|
||||
return searchResults, searchSegmentIDs, searchPartIDs, err
|
||||
}
|
||||
log.Debug("no partition specified, search all partitions",
|
||||
zap.Any("collectionID", collID),
|
||||
zap.Any("all partitions", hisPartIDs),
|
||||
)
|
||||
searchPartIDs = hisPartIDs
|
||||
} else {
|
||||
for _, id := range partIDs {
|
||||
_, err := h.replica.getPartitionByID(id)
|
||||
if err == nil {
|
||||
log.Debug("append search partition id",
|
||||
zap.Any("collectionID", collID),
|
||||
zap.Any("partitionID", id),
|
||||
)
|
||||
searchPartIDs = append(searchPartIDs, id)
|
||||
}
|
||||
}
|
||||
searchPartIDs, err = h.getTargetPartIDs(collID, partIDs)
|
||||
if err != nil {
|
||||
return searchResults, searchSegmentIDs, searchPartIDs, err
|
||||
}
|
||||
|
||||
log.Debug("search target partitions", zap.Int64("collectionID", collID), zap.Int64s("partitionIDs", searchPartIDs))
|
||||
|
||||
col, err := h.replica.getCollectionByID(collID)
|
||||
if err != nil {
|
||||
return searchResults, searchSegmentIDs, searchPartIDs, err
|
||||
|
@ -146,7 +113,7 @@ func (h *historical) search(searchReqs []*searchRequest, collID UniqueID, partID
|
|||
|
||||
// all partitions have been released
|
||||
if len(searchPartIDs) == 0 && col.getLoadType() == loadTypePartition {
|
||||
return searchResults, searchSegmentIDs, searchPartIDs, errors.New("partitions have been released , collectionID = " +
|
||||
return nil, nil, searchPartIDs, errors.New("partitions have been released , collectionID = " +
|
||||
fmt.Sprintln(collID) + "target partitionIDs = " + fmt.Sprintln(partIDs))
|
||||
}
|
||||
|
||||
|
@ -154,50 +121,92 @@ func (h *historical) search(searchReqs []*searchRequest, collID UniqueID, partID
|
|||
return searchResults, searchSegmentIDs, searchPartIDs, nil
|
||||
}
|
||||
|
||||
var segmentLock sync.RWMutex
|
||||
var segmentIDs []UniqueID
|
||||
for _, partID := range searchPartIDs {
|
||||
segIDs, err := h.replica.getSegmentIDs(partID)
|
||||
if err != nil {
|
||||
return searchResults, searchSegmentIDs, searchPartIDs, err
|
||||
}
|
||||
|
||||
var err2 error
|
||||
var wg sync.WaitGroup
|
||||
for _, segID := range segIDs {
|
||||
segID2 := segID
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
seg, err := h.replica.getSegmentByID(segID2)
|
||||
if err != nil {
|
||||
err2 = err
|
||||
return
|
||||
}
|
||||
if !seg.getOnService() {
|
||||
return
|
||||
}
|
||||
tr := timerecord.NewTimeRecorder("searchOnSealed")
|
||||
searchResult, err := seg.search(plan, searchReqs, []Timestamp{searchTs})
|
||||
if err != nil {
|
||||
err2 = err
|
||||
return
|
||||
}
|
||||
metrics.QueryNodeSQSegmentLatency.WithLabelValues(metrics.SearchLabel,
|
||||
metrics.SealedSegmentLabel,
|
||||
fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
|
||||
segmentLock.Lock()
|
||||
searchResults = append(searchResults, searchResult)
|
||||
searchSegmentIDs = append(searchSegmentIDs, seg.segmentID)
|
||||
segmentLock.Unlock()
|
||||
}()
|
||||
|
||||
}
|
||||
wg.Wait()
|
||||
if err2 != nil {
|
||||
return searchResults, searchSegmentIDs, searchPartIDs, err2
|
||||
}
|
||||
segmentIDs = append(segmentIDs, segIDs...)
|
||||
}
|
||||
|
||||
return searchResults, searchSegmentIDs, searchPartIDs, nil
|
||||
searchResults, searchSegmentIDs, err = h.searchSegments(segmentIDs, searchReqs, plan, searchTs)
|
||||
|
||||
return searchResults, searchSegmentIDs, searchPartIDs, err
|
||||
}
|
||||
|
||||
// getSearchPartIDs fetchs the partition ids to search from the request ids
|
||||
func (h *historical) getTargetPartIDs(collID UniqueID, partIDs []UniqueID) ([]UniqueID, error) {
|
||||
// no partition id specified, get all partition ids in collection
|
||||
if len(partIDs) == 0 {
|
||||
return h.replica.getPartitionIDs(collID)
|
||||
}
|
||||
|
||||
var targetPartIDs []UniqueID
|
||||
for _, id := range partIDs {
|
||||
_, err := h.replica.getPartitionByID(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
targetPartIDs = append(targetPartIDs, id)
|
||||
}
|
||||
return targetPartIDs, nil
|
||||
}
|
||||
|
||||
// searchSegments performs search on listed segments
|
||||
// all segment ids are validated before calling this function
|
||||
func (h *historical) searchSegments(segIDs []UniqueID, searchReqs []*searchRequest, plan *SearchPlan, searchTs Timestamp) ([]*SearchResult, []UniqueID, error) {
|
||||
// pre-fetch all the segment
|
||||
// if error found, return before executing segment search
|
||||
segments := make([]*Segment, 0, len(segIDs))
|
||||
for _, segID := range segIDs {
|
||||
seg, err := h.replica.getSegmentByID(segID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
segments = append(segments, seg)
|
||||
}
|
||||
|
||||
// results variables
|
||||
var searchResults []*SearchResult
|
||||
var searchSegmentIDs []UniqueID
|
||||
var lock sync.Mutex
|
||||
var serr error
|
||||
|
||||
// calling segment search in goroutines
|
||||
var wg sync.WaitGroup
|
||||
for _, seg := range segments {
|
||||
wg.Add(1)
|
||||
go func(seg *Segment) {
|
||||
defer wg.Done()
|
||||
if !seg.getOnService() {
|
||||
log.Warn("segment no on service", zap.Int64("segmentID", seg.segmentID))
|
||||
return
|
||||
}
|
||||
// record search time
|
||||
tr := timerecord.NewTimeRecorder("searchOnSealed")
|
||||
searchResult, err := seg.search(plan, searchReqs, []Timestamp{searchTs})
|
||||
|
||||
// update metrics
|
||||
metrics.QueryNodeSQSegmentLatency.WithLabelValues(metrics.SearchLabel,
|
||||
metrics.SealedSegmentLabel,
|
||||
fmt.Sprint(Params.QueryNodeCfg.QueryNodeID)).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
|
||||
// write back result into list
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
if err != nil {
|
||||
serr = err
|
||||
return
|
||||
}
|
||||
searchResults = append(searchResults, searchResult)
|
||||
searchSegmentIDs = append(searchSegmentIDs, seg.segmentID)
|
||||
}(seg)
|
||||
}
|
||||
wg.Wait()
|
||||
if serr != nil {
|
||||
return nil, nil, serr
|
||||
}
|
||||
return searchResults, searchSegmentIDs, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue