2021-04-19 05:47:10 +00:00
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
2021-06-22 08:44:09 +00:00
package querycoord
2021-04-15 07:15:46 +00:00
import (
"container/list"
"context"
2021-06-19 03:45:09 +00:00
"errors"
"fmt"
"path/filepath"
"strconv"
2021-04-15 07:15:46 +00:00
"sync"
2021-06-19 03:45:09 +00:00
"github.com/golang/protobuf/proto"
2021-04-15 07:15:46 +00:00
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
2021-06-19 03:45:09 +00:00
"github.com/milvus-io/milvus/internal/allocator"
2021-06-15 04:41:40 +00:00
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
2021-04-22 06:45:57 +00:00
"github.com/milvus-io/milvus/internal/log"
2021-06-19 03:45:09 +00:00
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types"
2021-04-22 06:45:57 +00:00
"github.com/milvus-io/milvus/internal/util/trace"
2021-06-19 03:45:09 +00:00
"github.com/milvus-io/milvus/internal/util/tsoutil"
2021-04-15 07:15:46 +00:00
oplog "github.com/opentracing/opentracing-go/log"
)
2021-06-15 04:41:40 +00:00
type TaskQueue struct {
tasks * list . List
2021-04-15 07:15:46 +00:00
2021-06-15 04:41:40 +00:00
maxTask int64
taskChan chan int // to block scheduler
2021-04-15 07:15:46 +00:00
2021-06-15 04:41:40 +00:00
sync . Mutex
2021-04-15 07:15:46 +00:00
}
2021-06-15 04:41:40 +00:00
func ( queue * TaskQueue ) Chan ( ) <- chan int {
return queue . taskChan
2021-04-15 07:15:46 +00:00
}
2021-06-15 04:41:40 +00:00
func ( queue * TaskQueue ) taskEmpty ( ) bool {
queue . Lock ( )
defer queue . Unlock ( )
return queue . tasks . Len ( ) == 0
2021-04-15 07:15:46 +00:00
}
2021-06-15 04:41:40 +00:00
func ( queue * TaskQueue ) taskFull ( ) bool {
return int64 ( queue . tasks . Len ( ) ) >= queue . maxTask
2021-04-15 07:15:46 +00:00
}
2021-06-15 04:41:40 +00:00
func ( queue * TaskQueue ) addTask ( tasks [ ] task ) {
queue . Lock ( )
defer queue . Unlock ( )
2021-04-15 07:15:46 +00:00
2021-06-15 04:41:40 +00:00
for _ , t := range tasks {
if queue . tasks . Len ( ) == 0 {
queue . taskChan <- 1
queue . tasks . PushBack ( t )
continue
2021-04-15 07:15:46 +00:00
}
2021-06-15 04:41:40 +00:00
for e := queue . tasks . Back ( ) ; e != nil ; e = e . Prev ( ) {
if t . TaskPriority ( ) > e . Value . ( task ) . TaskPriority ( ) {
if e . Prev ( ) == nil {
queue . taskChan <- 1
queue . tasks . InsertBefore ( t , e )
break
}
continue
}
//TODO:: take care of timestamp
queue . taskChan <- 1
queue . tasks . InsertAfter ( t , e )
break
}
2021-04-15 07:15:46 +00:00
}
}
2021-06-19 03:45:09 +00:00
func ( queue * TaskQueue ) addTaskToFront ( t task ) {
queue . taskChan <- 1
if queue . tasks . Len ( ) == 0 {
queue . tasks . PushBack ( t )
} else {
queue . tasks . PushFront ( t )
}
}
2021-06-15 04:41:40 +00:00
func ( queue * TaskQueue ) PopTask ( ) task {
queue . Lock ( )
defer queue . Unlock ( )
2021-04-15 07:15:46 +00:00
2021-06-15 04:41:40 +00:00
if queue . tasks . Len ( ) <= 0 {
2021-04-15 07:15:46 +00:00
log . Warn ( "sorry, but the unissued task list is empty!" )
return nil
}
2021-06-15 04:41:40 +00:00
ft := queue . tasks . Front ( )
queue . tasks . Remove ( ft )
2021-04-15 07:15:46 +00:00
return ft . Value . ( task )
}
2021-06-19 03:45:09 +00:00
func NewTaskQueue ( ) * TaskQueue {
2021-06-15 04:41:40 +00:00
return & TaskQueue {
2021-06-19 03:45:09 +00:00
tasks : list . New ( ) ,
maxTask : 1024 ,
taskChan : make ( chan int , 1024 ) ,
2021-04-15 07:15:46 +00:00
}
}
type TaskScheduler struct {
2021-06-15 04:41:40 +00:00
triggerTaskQueue * TaskQueue
activateTaskChan chan task
meta * meta
2021-06-19 03:45:09 +00:00
cluster * queryNodeCluster
2021-06-15 04:41:40 +00:00
taskIDAllocator func ( ) ( UniqueID , error )
2021-06-19 03:45:09 +00:00
client * etcdkv . EtcdKV
2021-06-21 10:22:13 +00:00
rootCoord types . RootCoord
dataCoord types . DataCoord
2021-04-15 07:15:46 +00:00
wg sync . WaitGroup
ctx context . Context
cancel context . CancelFunc
}
2021-06-21 10:22:13 +00:00
func NewTaskScheduler ( ctx context . Context , meta * meta , cluster * queryNodeCluster , kv * etcdkv . EtcdKV , rootCoord types . RootCoord , dataCoord types . DataCoord ) ( * TaskScheduler , error ) {
2021-04-15 07:15:46 +00:00
ctx1 , cancel := context . WithCancel ( ctx )
2021-06-15 04:41:40 +00:00
taskChan := make ( chan task , 1024 )
2021-04-15 07:15:46 +00:00
s := & TaskScheduler {
2021-06-15 04:41:40 +00:00
ctx : ctx1 ,
cancel : cancel ,
meta : meta ,
2021-06-19 03:45:09 +00:00
cluster : cluster ,
2021-06-15 04:41:40 +00:00
activateTaskChan : taskChan ,
2021-06-19 03:45:09 +00:00
client : kv ,
2021-06-21 09:28:03 +00:00
rootCoord : rootCoord ,
2021-06-21 10:22:13 +00:00
dataCoord : dataCoord ,
2021-06-19 03:45:09 +00:00
}
s . triggerTaskQueue = NewTaskQueue ( )
2021-06-22 08:44:09 +00:00
idAllocator := allocator . NewGlobalIDAllocator ( "idTimestamp" , tsoutil . NewTSOKVBase ( Params . EtcdEndpoints , Params . KvRootPath , "query coordinator task id" ) )
2021-06-19 03:45:09 +00:00
if err := idAllocator . Initialize ( ) ; err != nil {
2021-06-22 08:44:09 +00:00
log . Debug ( "query coordinator idAllocator initialize failed" , zap . Error ( err ) )
2021-06-19 03:45:09 +00:00
return nil , err
}
s . taskIDAllocator = func ( ) ( UniqueID , error ) {
return idAllocator . AllocOne ( )
}
err := s . reloadFromKV ( )
if err != nil {
return nil , err
}
return s , nil
}
func ( scheduler * TaskScheduler ) reloadFromKV ( ) error {
triggerTaskIDKeys , triggerTaskValues , err := scheduler . client . LoadWithPrefix ( triggerTaskPrefix )
if err != nil {
return err
}
activeTaskIDKeys , activeTaskValues , err := scheduler . client . LoadWithPrefix ( activeTaskPrefix )
if err != nil {
return err
}
taskInfoKeys , taskInfoValues , err := scheduler . client . LoadWithPrefix ( taskInfoPrefix )
if err != nil {
return err
}
triggerTasks := make ( map [ int64 ] task )
for index := range triggerTaskIDKeys {
taskID , err := strconv . ParseInt ( filepath . Base ( triggerTaskIDKeys [ index ] ) , 10 , 64 )
if err != nil {
return err
}
t , err := scheduler . unmarshalTask ( triggerTaskValues [ index ] )
if err != nil {
return err
}
triggerTasks [ taskID ] = t
}
activeTasks := make ( map [ int64 ] task )
for index := range activeTaskIDKeys {
taskID , err := strconv . ParseInt ( filepath . Base ( activeTaskIDKeys [ index ] ) , 10 , 64 )
if err != nil {
return err
}
t , err := scheduler . unmarshalTask ( activeTaskValues [ index ] )
if err != nil {
return err
}
activeTasks [ taskID ] = t
}
taskInfos := make ( map [ int64 ] taskState )
for index := range taskInfoKeys {
taskID , err := strconv . ParseInt ( filepath . Base ( taskInfoKeys [ index ] ) , 10 , 64 )
if err != nil {
return err
}
value , err := strconv . ParseInt ( taskInfoValues [ index ] , 10 , 64 )
if err != nil {
return err
}
state := taskState ( value )
taskInfos [ taskID ] = state
}
var doneTriggerTask task = nil
for id , t := range triggerTasks {
if taskInfos [ id ] == taskDone {
doneTriggerTask = t
for _ , childTask := range activeTasks {
t . AddChildTask ( childTask )
}
continue
}
scheduler . triggerTaskQueue . addTask ( [ ] task { t } )
}
if doneTriggerTask != nil {
scheduler . triggerTaskQueue . addTaskToFront ( doneTriggerTask )
}
return nil
}
func ( scheduler * TaskScheduler ) unmarshalTask ( t string ) ( task , error ) {
header := commonpb . MsgHeader { }
err := proto . UnmarshalText ( t , & header )
if err != nil {
return nil , fmt . Errorf ( "Failed to unmarshal message header, err %s " , err . Error ( ) )
}
var newTask task
switch header . Base . MsgType {
case commonpb . MsgType_LoadCollection :
loadReq := querypb . LoadCollectionRequest { }
err = proto . UnmarshalText ( t , & loadReq )
if err != nil {
log . Error ( err . Error ( ) )
}
loadCollectionTask := & LoadCollectionTask {
BaseTask : BaseTask {
ctx : scheduler . ctx ,
Condition : NewTaskCondition ( scheduler . ctx ) ,
triggerCondition : querypb . TriggerCondition_grpcRequest ,
} ,
LoadCollectionRequest : & loadReq ,
2021-06-21 09:28:03 +00:00
rootCoord : scheduler . rootCoord ,
2021-06-21 10:22:13 +00:00
dataCoord : scheduler . dataCoord ,
2021-06-19 03:45:09 +00:00
cluster : scheduler . cluster ,
meta : scheduler . meta ,
}
newTask = loadCollectionTask
case commonpb . MsgType_LoadPartitions :
loadReq := querypb . LoadPartitionsRequest { }
err = proto . UnmarshalText ( t , & loadReq )
if err != nil {
log . Error ( err . Error ( ) )
}
loadPartitionTask := & LoadPartitionTask {
BaseTask : BaseTask {
ctx : scheduler . ctx ,
Condition : NewTaskCondition ( scheduler . ctx ) ,
triggerCondition : querypb . TriggerCondition_grpcRequest ,
} ,
LoadPartitionsRequest : & loadReq ,
2021-06-21 10:22:13 +00:00
dataCoord : scheduler . dataCoord ,
2021-06-19 03:45:09 +00:00
cluster : scheduler . cluster ,
meta : scheduler . meta ,
}
newTask = loadPartitionTask
case commonpb . MsgType_ReleaseCollection :
loadReq := querypb . ReleaseCollectionRequest { }
err = proto . UnmarshalText ( t , & loadReq )
if err != nil {
log . Error ( err . Error ( ) )
}
releaseCollectionTask := & ReleaseCollectionTask {
BaseTask : BaseTask {
ctx : scheduler . ctx ,
Condition : NewTaskCondition ( scheduler . ctx ) ,
triggerCondition : querypb . TriggerCondition_grpcRequest ,
} ,
ReleaseCollectionRequest : & loadReq ,
cluster : scheduler . cluster ,
2021-07-02 02:40:13 +00:00
meta : scheduler . meta ,
rootCoord : scheduler . rootCoord ,
2021-06-19 03:45:09 +00:00
}
newTask = releaseCollectionTask
case commonpb . MsgType_ReleasePartitions :
loadReq := querypb . ReleasePartitionsRequest { }
err = proto . UnmarshalText ( t , & loadReq )
if err != nil {
log . Error ( err . Error ( ) )
}
releasePartitionTask := & ReleasePartitionTask {
BaseTask : BaseTask {
ctx : scheduler . ctx ,
Condition : NewTaskCondition ( scheduler . ctx ) ,
triggerCondition : querypb . TriggerCondition_grpcRequest ,
} ,
ReleasePartitionsRequest : & loadReq ,
cluster : scheduler . cluster ,
}
newTask = releasePartitionTask
case commonpb . MsgType_LoadSegments :
loadReq := querypb . LoadSegmentsRequest { }
err = proto . UnmarshalText ( t , & loadReq )
if err != nil {
log . Error ( err . Error ( ) )
}
loadSegmentTask := & LoadSegmentTask {
BaseTask : BaseTask {
ctx : scheduler . ctx ,
Condition : NewTaskCondition ( scheduler . ctx ) ,
triggerCondition : querypb . TriggerCondition_grpcRequest ,
} ,
LoadSegmentsRequest : & loadReq ,
cluster : scheduler . cluster ,
meta : scheduler . meta ,
}
newTask = loadSegmentTask
case commonpb . MsgType_ReleaseSegments :
loadReq := querypb . ReleaseSegmentsRequest { }
err = proto . UnmarshalText ( t , & loadReq )
if err != nil {
log . Error ( err . Error ( ) )
}
releaseSegmentTask := & ReleaseSegmentTask {
BaseTask : BaseTask {
ctx : scheduler . ctx ,
Condition : NewTaskCondition ( scheduler . ctx ) ,
triggerCondition : querypb . TriggerCondition_grpcRequest ,
} ,
ReleaseSegmentsRequest : & loadReq ,
cluster : scheduler . cluster ,
}
newTask = releaseSegmentTask
case commonpb . MsgType_WatchDmChannels :
loadReq := querypb . WatchDmChannelsRequest { }
err = proto . UnmarshalText ( t , & loadReq )
if err != nil {
log . Error ( err . Error ( ) )
}
watchDmChannelTask := & WatchDmChannelTask {
BaseTask : BaseTask {
ctx : scheduler . ctx ,
Condition : NewTaskCondition ( scheduler . ctx ) ,
triggerCondition : querypb . TriggerCondition_grpcRequest ,
} ,
WatchDmChannelsRequest : & loadReq ,
cluster : scheduler . cluster ,
meta : scheduler . meta ,
}
newTask = watchDmChannelTask
case commonpb . MsgType_WatchQueryChannels :
loadReq := querypb . AddQueryChannelRequest { }
err = proto . UnmarshalText ( t , & loadReq )
if err != nil {
log . Error ( err . Error ( ) )
}
watchQueryChannelTask := & WatchQueryChannelTask {
BaseTask : BaseTask {
ctx : scheduler . ctx ,
Condition : NewTaskCondition ( scheduler . ctx ) ,
triggerCondition : querypb . TriggerCondition_grpcRequest ,
} ,
AddQueryChannelRequest : & loadReq ,
cluster : scheduler . cluster ,
}
newTask = watchQueryChannelTask
case commonpb . MsgType_LoadBalanceSegments :
loadReq := querypb . LoadBalanceRequest { }
err = proto . UnmarshalText ( t , & loadReq )
if err != nil {
log . Error ( err . Error ( ) )
}
loadBalanceTask := & LoadBalanceTask {
BaseTask : BaseTask {
ctx : scheduler . ctx ,
Condition : NewTaskCondition ( scheduler . ctx ) ,
triggerCondition : loadReq . BalanceReason ,
} ,
LoadBalanceRequest : & loadReq ,
2021-06-21 09:28:03 +00:00
rootCoord : scheduler . rootCoord ,
2021-06-21 10:22:13 +00:00
dataCoord : scheduler . dataCoord ,
2021-06-19 03:45:09 +00:00
cluster : scheduler . cluster ,
meta : scheduler . meta ,
}
newTask = loadBalanceTask
default :
err = errors . New ( "inValid msg type when unMarshal task" )
log . Error ( err . Error ( ) )
return nil , err
2021-04-15 07:15:46 +00:00
}
2021-06-19 03:45:09 +00:00
return newTask , nil
2021-04-15 07:15:46 +00:00
}
2021-06-15 04:41:40 +00:00
func ( scheduler * TaskScheduler ) Enqueue ( tasks [ ] task ) {
2021-06-19 03:45:09 +00:00
for _ , t := range tasks {
id , err := scheduler . taskIDAllocator ( )
if err != nil {
log . Error ( err . Error ( ) )
}
t . SetID ( id )
kvs := make ( map [ string ] string )
taskKey := fmt . Sprintf ( "%s/%d" , triggerTaskPrefix , t . ID ( ) )
kvs [ taskKey ] = t . Marshal ( )
stateKey := fmt . Sprintf ( "%s/%d" , taskInfoPrefix , t . ID ( ) )
2021-06-21 09:28:03 +00:00
kvs [ stateKey ] = strconv . Itoa ( int ( taskUndo ) )
2021-06-19 03:45:09 +00:00
err = scheduler . client . MultiSave ( kvs )
if err != nil {
log . Error ( "error when save trigger task to etcd" , zap . Int64 ( "taskID" , t . ID ( ) ) )
}
log . Debug ( "EnQueue a triggerTask and save to etcd" , zap . Int64 ( "taskID" , t . ID ( ) ) )
}
2021-06-15 04:41:40 +00:00
scheduler . triggerTaskQueue . addTask ( tasks )
2021-04-15 07:15:46 +00:00
}
2021-06-19 03:45:09 +00:00
func ( scheduler * TaskScheduler ) processTask ( t task ) error {
2021-04-15 07:15:46 +00:00
span , ctx := trace . StartSpanFromContext ( t . TraceCtx ( ) ,
opentracing . Tags {
2021-06-15 04:41:40 +00:00
"Type" : t . Type ( ) ,
2021-04-15 07:15:46 +00:00
"ID" : t . ID ( ) ,
} )
defer span . Finish ( )
2021-06-19 03:45:09 +00:00
span . LogFields ( oplog . Int64 ( "processTask: scheduler process PreExecute" , t . ID ( ) ) )
2021-06-30 08:18:13 +00:00
t . PreExecute ( ctx )
2021-06-23 09:44:12 +00:00
2021-06-19 03:45:09 +00:00
key := fmt . Sprintf ( "%s/%d" , taskInfoPrefix , t . ID ( ) )
2021-06-21 09:28:03 +00:00
err := scheduler . client . Save ( key , strconv . Itoa ( int ( taskDoing ) ) )
2021-04-15 07:15:46 +00:00
if err != nil {
2021-06-19 03:45:09 +00:00
log . Debug ( "processTask: update task state err" , zap . String ( "reason" , err . Error ( ) ) )
2021-04-15 07:15:46 +00:00
trace . LogError ( span , err )
2021-06-19 03:45:09 +00:00
return err
2021-04-15 07:15:46 +00:00
}
2021-06-19 03:45:09 +00:00
span . LogFields ( oplog . Int64 ( "processTask: scheduler process Execute" , t . ID ( ) ) )
2021-04-15 07:15:46 +00:00
err = t . Execute ( ctx )
if err != nil {
2021-06-19 03:45:09 +00:00
log . Debug ( "processTask: execute err" , zap . String ( "reason" , err . Error ( ) ) )
2021-04-15 07:15:46 +00:00
trace . LogError ( span , err )
2021-06-19 03:45:09 +00:00
return err
2021-04-15 07:15:46 +00:00
}
2021-06-19 03:45:09 +00:00
for _ , childTask := range t . GetChildTask ( ) {
if childTask == nil {
log . Error ( "processTask: child task equal nil" )
continue
}
id , err := scheduler . taskIDAllocator ( )
if err != nil {
return err
}
childTask . SetID ( id )
kvs := make ( map [ string ] string )
taskKey := fmt . Sprintf ( "%s/%d" , activeTaskPrefix , childTask . ID ( ) )
kvs [ taskKey ] = t . Marshal ( )
stateKey := fmt . Sprintf ( "%s/%d" , taskInfoPrefix , childTask . ID ( ) )
2021-06-21 09:28:03 +00:00
kvs [ stateKey ] = strconv . Itoa ( int ( taskUndo ) )
2021-06-19 03:45:09 +00:00
err = scheduler . client . MultiSave ( kvs )
if err != nil {
2021-06-23 09:44:12 +00:00
log . Debug ( "processTask: save active task info err" , zap . String ( "reason" , err . Error ( ) ) )
trace . LogError ( span , err )
2021-06-19 03:45:09 +00:00
return err
}
log . Debug ( "processTask: save active task to etcd" , zap . Int64 ( "parent taskID" , t . ID ( ) ) , zap . Int64 ( "child taskID" , childTask . ID ( ) ) )
}
2021-06-21 09:28:03 +00:00
err = scheduler . client . Save ( key , strconv . Itoa ( int ( taskDone ) ) )
2021-06-19 03:45:09 +00:00
if err != nil {
log . Debug ( "processTask: update task state err" , zap . String ( "reason" , err . Error ( ) ) )
trace . LogError ( span , err )
return err
}
2021-06-23 09:44:12 +00:00
span . LogFields ( oplog . Int64 ( "processTask: scheduler process PostExecute" , t . ID ( ) ) )
2021-06-30 08:18:13 +00:00
t . PostExecute ( ctx )
2021-06-23 09:44:12 +00:00
return nil
2021-04-15 07:15:46 +00:00
}
2021-06-15 04:41:40 +00:00
func ( scheduler * TaskScheduler ) scheduleLoop ( ) {
defer scheduler . wg . Done ( )
2021-06-19 03:45:09 +00:00
activeTaskWg := & sync . WaitGroup { }
2021-06-22 06:10:09 +00:00
2021-04-15 07:15:46 +00:00
for {
2021-06-22 06:10:09 +00:00
var err error = nil
2021-04-15 07:15:46 +00:00
select {
2021-06-15 04:41:40 +00:00
case <- scheduler . ctx . Done ( ) :
2021-04-15 07:15:46 +00:00
return
2021-06-15 04:41:40 +00:00
case <- scheduler . triggerTaskQueue . Chan ( ) :
t := scheduler . triggerTaskQueue . PopTask ( )
2021-06-19 03:45:09 +00:00
log . Debug ( "scheduleLoop: pop a triggerTask from triggerTaskQueue" , zap . Int64 ( "taskID" , t . ID ( ) ) )
if t . State ( ) < taskDone {
err = scheduler . processTask ( t )
if err != nil {
log . Error ( "scheduleLoop: process task error" , zap . Any ( "error" , err . Error ( ) ) )
2021-06-22 06:10:09 +00:00
t . Notify ( err )
2021-06-30 08:18:13 +00:00
t . PostExecute ( scheduler . ctx )
2021-06-19 03:45:09 +00:00
}
2021-06-21 11:20:31 +00:00
if t . Type ( ) == commonpb . MsgType_LoadCollection || t . Type ( ) == commonpb . MsgType_LoadPartitions {
t . Notify ( err )
}
2021-06-19 03:45:09 +00:00
}
log . Debug ( "scheduleLoop: num of child task" , zap . Int ( "num child task" , len ( t . GetChildTask ( ) ) ) )
for _ , childTask := range t . GetChildTask ( ) {
if childTask != nil {
log . Debug ( "scheduleLoop: add a activate task to activateChan" , zap . Int64 ( "taskID" , childTask . ID ( ) ) )
scheduler . activateTaskChan <- childTask
activeTaskWg . Add ( 1 )
go scheduler . waitActivateTaskDone ( activeTaskWg , childTask )
}
}
activeTaskWg . Wait ( )
keys := make ( [ ] string , 0 )
taskKey := fmt . Sprintf ( "%s/%d" , triggerTaskPrefix , t . ID ( ) )
stateKey := fmt . Sprintf ( "%s/%d" , taskInfoPrefix , t . ID ( ) )
keys = append ( keys , taskKey )
keys = append ( keys , stateKey )
err = scheduler . client . MultiRemove ( keys )
if err != nil {
log . Error ( "scheduleLoop: error when remove trigger task to etcd" , zap . Int64 ( "taskID" , t . ID ( ) ) )
2021-06-22 06:10:09 +00:00
t . Notify ( err )
continue
2021-06-19 03:45:09 +00:00
}
log . Debug ( "scheduleLoop: trigger task done and delete from etcd" , zap . Int64 ( "taskID" , t . ID ( ) ) )
2021-06-22 06:10:09 +00:00
t . Notify ( err )
2021-06-15 04:41:40 +00:00
}
}
}
func ( scheduler * TaskScheduler ) waitActivateTaskDone ( wg * sync . WaitGroup , t task ) {
defer wg . Done ( )
err := t . WaitToFinish ( )
if err != nil {
2021-06-19 03:45:09 +00:00
log . Debug ( "waitActivateTaskDone: activate task return err" , zap . Any ( "error" , err . Error ( ) ) , zap . Int64 ( "taskID" , t . ID ( ) ) )
redoFunc1 := func ( ) {
if ! t . IsValid ( ) {
reScheduledTasks , err := t . Reschedule ( )
if err != nil {
log . Error ( err . Error ( ) )
return
}
removes := make ( [ ] string , 0 )
taskKey := fmt . Sprintf ( "%s/%d" , activeTaskPrefix , t . ID ( ) )
removes = append ( removes , taskKey )
stateKey := fmt . Sprintf ( "%s/%d" , taskInfoPrefix , t . ID ( ) )
removes = append ( removes , stateKey )
saves := make ( map [ string ] string )
reSchedID := make ( [ ] int64 , 0 )
for _ , rt := range reScheduledTasks {
if rt != nil {
id , err := scheduler . taskIDAllocator ( )
if err != nil {
log . Error ( err . Error ( ) )
continue
}
rt . SetID ( id )
taskKey := fmt . Sprintf ( "%s/%d" , activeTaskPrefix , rt . ID ( ) )
saves [ taskKey ] = rt . Marshal ( )
stateKey := fmt . Sprintf ( "%s/%d" , taskInfoPrefix , rt . ID ( ) )
2021-06-21 09:28:03 +00:00
saves [ stateKey ] = strconv . Itoa ( int ( taskUndo ) )
2021-06-19 03:45:09 +00:00
reSchedID = append ( reSchedID , rt . ID ( ) )
}
}
err = scheduler . client . MultiSaveAndRemove ( saves , removes )
if err != nil {
log . Error ( "waitActivateTaskDone: error when save and remove task from etcd" )
}
log . Debug ( "waitActivateTaskDone: delete failed active task and save reScheduled task to etcd" , zap . Int64 ( "failed taskID" , t . ID ( ) ) , zap . Int64s ( "reScheduled taskIDs" , reSchedID ) )
for _ , rt := range reScheduledTasks {
if rt != nil {
log . Debug ( "waitActivateTaskDone: add a reScheduled active task to activateChan" , zap . Int64 ( "taskID" , rt . ID ( ) ) )
scheduler . activateTaskChan <- rt
wg . Add ( 1 )
go scheduler . waitActivateTaskDone ( wg , rt )
}
}
//delete task from etcd
} else {
log . Debug ( "waitActivateTaskDone: retry the active task" , zap . Int64 ( "taskID" , t . ID ( ) ) )
scheduler . activateTaskChan <- t
wg . Add ( 1 )
go scheduler . waitActivateTaskDone ( wg , t )
}
}
redoFunc2 := func ( ) {
if t . IsValid ( ) {
scheduler . activateTaskChan <- t
wg . Add ( 1 )
go scheduler . waitActivateTaskDone ( wg , t )
}
}
switch t . Type ( ) {
case commonpb . MsgType_LoadSegments :
redoFunc1 ( )
case commonpb . MsgType_WatchDmChannels :
redoFunc1 ( )
case commonpb . MsgType_WatchQueryChannels :
redoFunc2 ( )
case commonpb . MsgType_ReleaseSegments :
redoFunc2 ( )
case commonpb . MsgType_ReleaseCollection :
redoFunc2 ( )
case commonpb . MsgType_ReleasePartitions :
redoFunc2 ( )
default :
//TODO:: case commonpb.MsgType_RemoveDmChannels:
}
} else {
keys := make ( [ ] string , 0 )
taskKey := fmt . Sprintf ( "%s/%d" , activeTaskPrefix , t . ID ( ) )
stateKey := fmt . Sprintf ( "%s/%d" , taskInfoPrefix , t . ID ( ) )
keys = append ( keys , taskKey )
keys = append ( keys , stateKey )
err = scheduler . client . MultiRemove ( keys )
if err != nil {
log . Error ( "waitActivateTaskDone: error when remove task from etcd" , zap . Int64 ( "taskID" , t . ID ( ) ) )
}
log . Debug ( "waitActivateTaskDone: delete activate task from etcd" , zap . Int64 ( "taskID" , t . ID ( ) ) )
2021-06-15 04:41:40 +00:00
}
2021-06-19 03:45:09 +00:00
log . Debug ( "waitActivateTaskDone: one activate task done" , zap . Int64 ( "taskID" , t . ID ( ) ) )
2021-06-15 04:41:40 +00:00
}
2021-06-19 03:45:09 +00:00
func ( scheduler * TaskScheduler ) processActivateTaskLoop ( ) {
defer scheduler . wg . Done ( )
2021-06-15 04:41:40 +00:00
for {
select {
case <- scheduler . ctx . Done ( ) :
return
case t := <- scheduler . activateTaskChan :
if t == nil {
2021-06-19 03:45:09 +00:00
log . Error ( "processActivateTaskLoop: pop a nil active task" , zap . Int64 ( "taskID" , t . ID ( ) ) )
continue
}
stateKey := fmt . Sprintf ( "%s/%d" , taskInfoPrefix , t . ID ( ) )
2021-06-21 09:28:03 +00:00
err := scheduler . client . Save ( stateKey , strconv . Itoa ( int ( taskDoing ) ) )
2021-06-19 03:45:09 +00:00
if err != nil {
t . Notify ( err )
continue
2021-06-15 04:41:40 +00:00
}
2021-06-19 03:45:09 +00:00
log . Debug ( "processActivateTaskLoop: pop a active task from activateChan" , zap . Int64 ( "taskID" , t . ID ( ) ) )
go func ( ) {
err := scheduler . processTask ( t )
t . Notify ( err )
} ( )
2021-06-15 04:41:40 +00:00
}
}
}
2021-04-15 07:15:46 +00:00
2021-06-15 04:41:40 +00:00
func ( scheduler * TaskScheduler ) Start ( ) error {
2021-06-19 03:45:09 +00:00
scheduler . wg . Add ( 2 )
2021-06-15 04:41:40 +00:00
go scheduler . scheduleLoop ( )
2021-06-19 03:45:09 +00:00
go scheduler . processActivateTaskLoop ( )
2021-04-15 07:15:46 +00:00
return nil
}
2021-06-15 04:41:40 +00:00
func ( scheduler * TaskScheduler ) Close ( ) {
scheduler . cancel ( )
scheduler . wg . Wait ( )
2021-04-15 07:15:46 +00:00
}