mirror of https://github.com/milvus-io/milvus.git
Enable indexnode to build index asynchronously
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>pull/4973/head^2
parent
d3db2b0d82
commit
966cf42b54
|
@ -163,42 +163,25 @@ func (i *NodeImpl) BuildIndex(request *indexpb.BuildIndexCmd) (*commonpb.Status,
|
|||
t := &IndexBuildTask{
|
||||
BaseTask: BaseTask{
|
||||
ctx: ctx,
|
||||
done: make(chan error), // intend to do this
|
||||
done: make(chan error),
|
||||
},
|
||||
cmd: request,
|
||||
kv: i.kv,
|
||||
serviceClient: i.serviceClient,
|
||||
nodeID: Params.NodeID,
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
|
||||
defer cancel()
|
||||
|
||||
fn := func() error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return errors.New("Enqueue BuildQueue timeout")
|
||||
default:
|
||||
return i.sched.IndexBuildQueue.Enqueue(t)
|
||||
}
|
||||
}
|
||||
ret := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
||||
}
|
||||
|
||||
err := fn()
|
||||
if err != nil {
|
||||
ret.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
|
||||
ret.Reason = err.Error()
|
||||
return ret, nil
|
||||
}
|
||||
log.Println("index scheduler successfully with indexBuildID = ", request.IndexBuildID)
|
||||
err = t.WaitToFinish()
|
||||
log.Println("build index finish ...err = ", err)
|
||||
err := i.sched.IndexBuildQueue.Enqueue(t)
|
||||
if err != nil {
|
||||
ret.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
|
||||
ret.Reason = err.Error()
|
||||
return ret, nil
|
||||
}
|
||||
log.Println("indexnode successfully schedule with indexBuildID = ", request.IndexBuildID)
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -33,12 +33,18 @@ type task interface {
|
|||
WaitToFinish() error
|
||||
Notify(err error)
|
||||
OnEnqueue() error
|
||||
SetError(err error)
|
||||
}
|
||||
|
||||
type BaseTask struct {
|
||||
done chan error
|
||||
ctx context.Context
|
||||
id UniqueID
|
||||
err error
|
||||
}
|
||||
|
||||
func (bt *BaseTask) SetError(err error) {
|
||||
bt.err = err
|
||||
}
|
||||
|
||||
func (bt *BaseTask) ID() UniqueID {
|
||||
|
@ -101,15 +107,16 @@ func (it *IndexBuildTask) PreExecute(ctx context.Context) error {
|
|||
|
||||
func (it *IndexBuildTask) PostExecute(ctx context.Context) error {
|
||||
log.Println("PostExecute...")
|
||||
var err error
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
if it.err != nil {
|
||||
it.Rollback()
|
||||
}
|
||||
}()
|
||||
|
||||
if it.serviceClient == nil {
|
||||
err = errors.New("IndexBuildTask, serviceClient is nil")
|
||||
err := errors.New("IndexBuildTask, serviceClient is nil")
|
||||
log.Println("[IndexBuildTask][PostExecute] serviceClient is nil")
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -121,6 +128,9 @@ func (it *IndexBuildTask) PostExecute(ctx context.Context) error {
|
|||
NodeID: it.nodeID,
|
||||
IndexFilePaths: it.savePaths,
|
||||
}
|
||||
if it.err != nil {
|
||||
nty.Status.ErrorCode = commonpb.ErrorCode_BUILD_INDEX_ERROR
|
||||
}
|
||||
|
||||
resp, err := it.serviceClient.NotifyBuildIndex(ctx, nty)
|
||||
if err != nil {
|
||||
|
@ -131,6 +141,7 @@ func (it *IndexBuildTask) PostExecute(ctx context.Context) error {
|
|||
if resp.ErrorCode != commonpb.ErrorCode_SUCCESS {
|
||||
err = errors.New(resp.Reason)
|
||||
}
|
||||
log.Println("[IndexBuildTask][PostExecute] err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -81,7 +81,7 @@ func (queue *BaseTaskQueue) PopUnissuedTask() task {
|
|||
defer queue.utLock.Unlock()
|
||||
|
||||
if queue.unissuedTasks.Len() <= 0 {
|
||||
log.Println("PopUnissuedtask sorry, but the unissued task list is empty!")
|
||||
log.Println("PopUnissued task sorry, but the unissued task list is empty!")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -208,14 +208,18 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
|
|||
"Type": t.Name(),
|
||||
"ID": t.ID(),
|
||||
})
|
||||
|
||||
defer span.Finish()
|
||||
span.LogFields(oplog.Int64("scheduler process PreExecute", t.ID()))
|
||||
err := t.PreExecute(ctx)
|
||||
t.SetError(err)
|
||||
|
||||
defer func() {
|
||||
t.Notify(err)
|
||||
// log.Printf("notify with error: %v", err)
|
||||
span.LogFields(oplog.Int64("scheduler process PostExecute", t.ID()))
|
||||
err := t.PostExecute(ctx)
|
||||
t.SetError(err)
|
||||
}()
|
||||
|
||||
if err != nil {
|
||||
trace.LogError(span, err)
|
||||
return
|
||||
|
@ -223,6 +227,7 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
|
|||
|
||||
span.LogFields(oplog.Int64("scheduler process AddActiveTask", t.ID()))
|
||||
q.AddActiveTask(t)
|
||||
|
||||
// log.Printf("task add to active list ...")
|
||||
defer func() {
|
||||
span.LogFields(oplog.Int64("scheduler process PopActiveTask", t.ID()))
|
||||
|
@ -232,14 +237,7 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
|
|||
|
||||
span.LogFields(oplog.Int64("scheduler process Execute", t.ID()))
|
||||
err = t.Execute(ctx)
|
||||
if err != nil {
|
||||
log.Printf("execute definition task failed, error = %v", err)
|
||||
return
|
||||
}
|
||||
// log.Printf("task execution done ...")
|
||||
span.LogFields(oplog.Int64("scheduler process PostExecute", t.ID()))
|
||||
err = t.PostExecute(ctx)
|
||||
// log.Printf("post execute task done ...")
|
||||
t.SetError(err)
|
||||
}
|
||||
|
||||
func (sched *TaskScheduler) indexBuildLoop() {
|
||||
|
|
|
@ -315,9 +315,12 @@ func (i *ServiceImpl) NotifyBuildIndex(ctx context.Context, nty *indexpb.BuildIn
|
|||
ret := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
||||
}
|
||||
log.Println("[IndexService][NotifyBuildIndex]", nty.String())
|
||||
if err := i.metaTable.NotifyBuildIndex(nty); err != nil {
|
||||
ret.ErrorCode = commonpb.ErrorCode_BUILD_INDEX_ERROR
|
||||
ret.Reason = err.Error()
|
||||
log.Println("[IndexService][NotifyBuildIndex][metaTable][NotifyBuildIndex]", err)
|
||||
|
||||
}
|
||||
i.nodeClients.IncPriority(nty.NodeID, -1)
|
||||
return ret, nil
|
||||
|
|
Loading…
Reference in New Issue