Parallel execution between segments (#12199)

Signed-off-by: Cai.Zhang <cai.zhang@zilliz.com>
pull/12300/head
cai.zhang 2021-11-27 20:05:16 +08:00 committed by GitHub
parent cbbeb2a383
commit 2c484c7c7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 77 additions and 42 deletions

View File

@ -276,20 +276,39 @@ func (h *historical) search(searchReqs []*searchRequest, collID UniqueID, partID
if err != nil {
return searchResults, searchSegmentIDs, err
}
var err2 error
var segmentLock sync.RWMutex
var wg sync.WaitGroup
for _, segID := range segIDs {
seg, err := h.replica.getSegmentByID(segID)
if err != nil {
return searchResults, searchSegmentIDs, err
}
if !seg.getOnService() {
continue
}
searchResult, err := seg.search(plan, searchReqs, []Timestamp{searchTs})
if err != nil {
return searchResults, searchSegmentIDs, err
}
searchResults = append(searchResults, searchResult)
searchSegmentIDs = append(searchSegmentIDs, seg.segmentID)
segID2 := segID
wg.Add(1)
go func() {
defer wg.Done()
seg, err := h.replica.getSegmentByID(segID2)
if err != nil {
err2 = err
return
}
if !seg.getOnService() {
return
}
searchResult, err := seg.search(plan, searchReqs, []Timestamp{searchTs})
if err != nil {
err2 = err
return
}
segmentLock.Lock()
searchResults = append(searchResults, searchResult)
searchSegmentIDs = append(searchSegmentIDs, seg.segmentID)
segmentLock.Unlock()
}()
}
wg.Wait()
if err2 != nil {
return searchResults, searchSegmentIDs, err2
}
}

View File

@ -15,6 +15,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"go.uber.org/zap"
@ -174,38 +175,53 @@ func (s *streaming) search(searchReqs []*searchRequest, collID UniqueID, partIDs
log.Warn(err.Error())
return searchResults, err
}
var err2 error
var wg sync.WaitGroup
for _, segID := range segIDs {
seg, err := s.replica.getSegmentByID(segID)
if err != nil {
log.Warn(err.Error())
return searchResults, err
}
segID2 := segID
wg.Add(1)
go func() {
defer wg.Done()
seg, err := s.replica.getSegmentByID(segID2)
if err != nil {
log.Warn(err.Error())
err2 = err
return
}
// TSafe less than searchTs means this vChannel is not available
//ts := s.tSafeReplica.getTSafe(seg.vChannelID)
//gracefulTimeInMilliSecond := Params.GracefulTime
//if gracefulTimeInMilliSecond > 0 {
// gracefulTime := tsoutil.ComposeTS(gracefulTimeInMilliSecond, 0)
// ts += gracefulTime
//}
//tsp, _ := tsoutil.ParseTS(ts)
//stp, _ := tsoutil.ParseTS(searchTs)
//log.Debug("timestamp check in streaming search",
// zap.Any("collectionID", collID),
// zap.Any("serviceTime_l", ts),
// zap.Any("searchTime_l", searchTs),
// zap.Any("serviceTime_p", tsp),
// zap.Any("searchTime_p", stp),
//)
//if ts < searchTs {
// continue
//}
// TSafe less than searchTs means this vChannel is not available
//ts := s.tSafeReplica.getTSafe(seg.vChannelID)
//gracefulTimeInMilliSecond := Params.GracefulTime
//if gracefulTimeInMilliSecond > 0 {
// gracefulTime := tsoutil.ComposeTS(gracefulTimeInMilliSecond, 0)
// ts += gracefulTime
//}
//tsp, _ := tsoutil.ParseTS(ts)
//stp, _ := tsoutil.ParseTS(searchTs)
//log.Debug("timestamp check in streaming search",
// zap.Any("collectionID", collID),
// zap.Any("serviceTime_l", ts),
// zap.Any("searchTime_l", searchTs),
// zap.Any("serviceTime_p", tsp),
// zap.Any("searchTime_p", stp),
//)
//if ts < searchTs {
// continue
//}
searchResult, err := seg.search(plan, searchReqs, []Timestamp{searchTs})
if err != nil {
return searchResults, err
}
searchResults = append(searchResults, searchResult)
searchResult, err := seg.search(plan, searchReqs, []Timestamp{searchTs})
if err != nil {
err2 = err
return
}
searchResults = append(searchResults, searchResult)
}()
}
wg.Wait()
if err2 != nil {
return searchResults, err2
}
}