mirror of https://github.com/milvus-io/milvus.git
Fix travel timestamp set unique leads to disable task merge (#23605)
Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/23725/head
parent c5f4a47eed
commit f8ff97fe29
@@ -332,7 +332,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
     travelTimestamp := t.request.TravelTimestamp
     if travelTimestamp == 0 {
-        travelTimestamp = t.BeginTs()
+        travelTimestamp = typeutil.MaxTimestamp
     }
     err = validateTravelTimestamp(travelTimestamp, t.BeginTs())
     if err != nil {
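The hunk above changes the default travel timestamp from the per-request t.BeginTs() to typeutil.MaxTimestamp. Because the query node only merges search tasks whose travel timestamps are equal, a default that is unique to each request effectively disabled task merging. A minimal sketch of that idea, using hypothetical names (searchReq, canMerge) rather than the actual Milvus types:

package main

import "fmt"

// searchReq is a hypothetical, trimmed-down request; only the fields
// relevant to the merge pre-check are shown.
type searchReq struct {
    collectionID    int64
    travelTimestamp uint64
}

// canMerge sketches the pre-check: two requests can only be grouped when
// they target the same collection and share the same travel timestamp
// (other conditions are omitted here).
func canMerge(a, b searchReq) bool {
    return a.collectionID == b.collectionID &&
        a.travelTimestamp == b.travelTimestamp
}

func main() {
    const maxTimestamp = ^uint64(0) // illustrative stand-in for a "max" timestamp

    // Before the fix: each request defaulted to its own BeginTs, so even two
    // requests against the same collection were never mergeable.
    a := searchReq{collectionID: 1, travelTimestamp: 1001}
    b := searchReq{collectionID: 1, travelTimestamp: 1002}
    fmt.Println(canMerge(a, b)) // false

    // After the fix: both default to the same maximum timestamp.
    a.travelTimestamp, b.travelTimestamp = maxTimestamp, maxTimestamp
    fmt.Println(canMerge(a, b)) // true
}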
@@ -429,7 +429,7 @@ func (suite *ServiceSuite) genSegmentLoadInfos(schema *schemapb.CollectionSchema
             suite.collectionID,
             suite.partitionIDs[i%partNum],
             suite.validSegmentIDs[i],
-            100,
+            1000,
             schema,
             suite.node.vectorStorage,
         )
@@ -441,7 +441,7 @@ func (suite *ServiceSuite) genSegmentLoadInfos(schema *schemapb.CollectionSchema
             suite.partitionIDs[i%partNum],
             suite.validSegmentIDs[i],
             vecFieldIDs[0],
-            100,
+            1000,
             segments.IndexFaissIVFFlat,
             segments.L2,
             suite.node.vectorStorage,
@@ -904,11 +904,11 @@ func (suite *ServiceSuite) TestSearch_Concurrent() {
     // data
     schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64)
 
-    concurrency := 8
+    concurrency := 16
     futures := make([]*conc.Future[*internalpb.SearchResults], 0, concurrency)
     for i := 0; i < concurrency; i++ {
         future := conc.Go(func() (*internalpb.SearchResults, error) {
-            creq, err := suite.genCSearchRequest(1, IndexFaissIDMap, schema)
+            creq, err := suite.genCSearchRequest(30, IndexFaissIDMap, schema)
             req := &querypb.SearchRequest{
                 Req: creq,
                 FromShardLeader: false,
@@ -35,7 +35,7 @@ func NewScheduler() *Scheduler {
         searchProcessNum: atomic.NewInt32(0),
         searchWaitQueue: make(chan *SearchTask, maxWaitTaskNum),
         mergingSearchTasks: make([]*SearchTask, 0),
-        mergedSearchTasks: make(chan *SearchTask, maxReadConcurrency),
+        mergedSearchTasks: make(chan *SearchTask),
         // queryProcessQueue: make(chan),
 
         pool: conc.NewPool[any](maxReadConcurrency, ants.WithPreAlloc(true)),
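The hunk above drops the buffer from mergedSearchTasks. With an unbuffered channel, a merged group is handed over only when a worker is actually ready to receive it; until then it stays on the scheduler side, where later requests can still be merged into it. A small self-contained sketch of the behavioral difference (names are illustrative, not the Milvus scheduler):

package main

import "fmt"

func main() {
    // Buffered: a send succeeds immediately while capacity remains,
    // even though no receiver is running yet.
    buffered := make(chan int, 2)
    buffered <- 1
    fmt.Println("buffered send accepted without a receiver")

    // Unbuffered: a send only completes when a receiver is ready, so the
    // sender can use select to keep doing other work (e.g. merging more
    // tasks) until someone can actually take the value.
    unbuffered := make(chan int)
    select {
    case unbuffered <- 1:
        fmt.Println("handed off to a waiting receiver")
    default:
        fmt.Println("no receiver ready; keep the task and retry later")
    }
}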
@@ -64,39 +64,72 @@ func (s *Scheduler) Schedule(ctx context.Context) {
     go s.processAll(ctx)
 
     for {
-        select {
-        case <-ctx.Done():
-            return
-
-        case t := <-s.searchWaitQueue:
-            if err := t.Canceled(); err != nil {
-                t.Done(err)
-                continue
-            }
-
-            mergeCount := 0
-            mergeLimit := paramtable.Get().QueryNodeCfg.MaxGroupNQ.GetAsInt()
-        outer:
-            for i := 0; i < mergeLimit; i++ {
-                s.mergeTasks(t)
-                mergeCount++
-                metrics.QueryNodeReadTaskUnsolveLen.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()
-
-                select {
-                case t = <-s.searchWaitQueue:
-                    // Continue the loop to merge task
-                default:
-                    break outer
-                }
-            }
-
-            for i := range s.mergingSearchTasks {
-                s.mergedSearchTasks <- s.mergingSearchTasks[i]
-            }
-            s.mergingSearchTasks = s.mergingSearchTasks[:0]
-            metrics.QueryNodeReadTaskReadyLen.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(mergeCount))
+        if len(s.mergingSearchTasks) > 0 { // wait for an idle worker or a new task
+            task := s.mergingSearchTasks[0]
+            select {
+            case <-ctx.Done():
+                return
+
+            case task = <-s.searchWaitQueue:
+                s.schedule(task)
+
+            case s.mergedSearchTasks <- task:
+                s.mergingSearchTasks = s.mergingSearchTasks[1:]
+            }
+        } else { // wait for a new task if no task
+            select {
+            case <-ctx.Done():
+                return
+
+            case task := <-s.searchWaitQueue:
+                s.schedule(task)
+            }
         }
     }
 }
 
+func (s *Scheduler) schedule(task Task) {
+    // add this task
+    if err := task.Canceled(); err != nil {
+        task.Done(err)
+        return
+    }
+    s.mergeTasks(task)
+    metrics.QueryNodeReadTaskUnsolveLen.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()
+
+    mergeLimit := paramtable.Get().QueryNodeCfg.MaxGroupNQ.GetAsInt()
+    mergeCount := 1
+
+    // try to merge the coming tasks
+outer:
+    for mergeCount < mergeLimit {
+        select {
+        case t := <-s.searchWaitQueue:
+            if err := t.Canceled(); err != nil {
+                t.Done(err)
+                continue
+            }
+            s.mergeTasks(t)
+            mergeCount++
+            metrics.QueryNodeReadTaskUnsolveLen.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()
+        default:
+            break outer
+        }
+    }
+
+    // submit existing tasks to the pool
+    processedCount := 0
+processOuter:
+    for i := range s.mergingSearchTasks {
+        select {
+        case s.mergedSearchTasks <- s.mergingSearchTasks[i]:
+            processedCount++
+        default:
+            break processOuter
+        }
+    }
+    s.mergingSearchTasks = s.mergingSearchTasks[processedCount:]
+    metrics.QueryNodeReadTaskReadyLen.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(processedCount))
+}
+
 func (s *Scheduler) processAll(ctx context.Context) {
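In the rewritten loop above, Schedule selects between receiving a new request and sending the head of the already-merged queue to a worker, and schedule() merges whatever is already waiting (up to the NQ limit) before handing over only as many groups as workers can accept right now. A minimal sketch of the same receive-or-send select pattern, with hypothetical names (dispatch, incoming, toWorkers), not the Milvus scheduler itself:

package main

import (
    "context"
    "fmt"
    "time"
)

// dispatch mimics the shape of the scheduler loop: while there are pending
// (already merged) items, wait for either a new incoming item or a worker
// that is ready to take the head of the pending queue, whichever comes first.
func dispatch(ctx context.Context, incoming <-chan string, toWorkers chan<- string) {
    var pending []string
    for {
        if len(pending) > 0 {
            select {
            case <-ctx.Done():
                return
            case item := <-incoming:
                pending = append(pending, item) // a real scheduler would try to merge here
            case toWorkers <- pending[0]:
                pending = pending[1:]
            }
        } else {
            select {
            case <-ctx.Done():
                return
            case item := <-incoming:
                pending = append(pending, item)
            }
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()

    incoming := make(chan string)
    toWorkers := make(chan string) // unbuffered: hand off only when a worker is ready

    go dispatch(ctx, incoming, toWorkers)
    go func() {
        for _, name := range []string{"task-1", "task-2", "task-3"} {
            incoming <- name
        }
    }()

    for i := 0; i < 3; i++ {
        fmt.Println("worker got", <-toWorkers)
    }
}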
@@ -207,7 +207,6 @@ func (t *SearchTask) Merge(other *SearchTask) bool {
     t.originTopks = append(t.originTopks, other.originTopks...)
     t.originNqs = append(t.originNqs, other.originNqs...)
     t.others = append(t.others, other)
-    t.others = append(t.others, other.others...)
 
     return true
 }
@@ -218,7 +217,10 @@ func (t *SearchTask) Done(err error) {
         metrics.QueryNodeSearchGroupNQ.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(t.originNqs[0]))
         metrics.QueryNodeSearchGroupTopK.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(t.originTopks[0]))
     }
-    t.notifier <- err
+    select {
+    case t.notifier <- err:
+    default:
+    }
     for _, other := range t.others {
         other.Done(err)
     }
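The last two hunks work together: Merge now records only the directly merged task, and Done replaces the unconditional notifier send with a select/default followed by a recursive fan-out to the merged tasks. A minimal hypothetical sketch of that pattern (task, done, notifier are illustrative names, not the Milvus types):

package main

import (
    "errors"
    "fmt"
)

// task is a hypothetical stand-in: each task has a one-slot notifier channel
// and a list of tasks that were merged into it.
type task struct {
    name     string
    notifier chan error
    others   []*task
}

func newTask(name string) *task {
    return &task{name: name, notifier: make(chan error, 1)}
}

// done notifies this task without ever blocking (a repeated call simply finds
// the slot occupied and skips it), then recursively notifies the tasks that
// were merged into it.
func (t *task) done(err error) {
    select {
    case t.notifier <- err:
    default: // already notified; don't block the caller
    }
    for _, other := range t.others {
        other.done(err)
    }
}

func main() {
    root := newTask("root")
    child := newTask("child")
    grandchild := newTask("grandchild")

    // Only directly merged tasks are recorded; deeper ones are reached recursively.
    child.others = append(child.others, grandchild)
    root.others = append(root.others, child)

    root.done(errors.New("search failed"))
    root.done(errors.New("search failed")) // safe: non-blocking second notification

    for _, t := range []*task{root, child, grandchild} {
        fmt.Println(t.name, "<-", <-t.notifier)
    }
}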