From 966cf42b5448137e59e64905e2679e71287e4ca3 Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Sat, 27 Feb 2021 10:45:03 +0800 Subject: [PATCH] Enable indexnode to build index asynchronously Signed-off-by: zhenshan.cao --- internal/indexnode/indexnode.go | 23 +++-------------------- internal/indexnode/task.go | 17 ++++++++++++++--- internal/indexnode/task_scheduler.go | 20 +++++++++----------- internal/indexservice/indexservice.go | 3 +++ 4 files changed, 29 insertions(+), 34 deletions(-) diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 143fa661c4..61bbe3ce0c 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -163,42 +163,25 @@ func (i *NodeImpl) BuildIndex(request *indexpb.BuildIndexCmd) (*commonpb.Status, t := &IndexBuildTask{ BaseTask: BaseTask{ ctx: ctx, - done: make(chan error), // intend to do this + done: make(chan error), }, cmd: request, kv: i.kv, serviceClient: i.serviceClient, nodeID: Params.NodeID, } - ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) - defer cancel() - fn := func() error { - select { - case <-ctx.Done(): - return errors.New("Enqueue BuildQueue timeout") - default: - return i.sched.IndexBuildQueue.Enqueue(t) - } - } ret := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, } - err := fn() - if err != nil { - ret.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR - ret.Reason = err.Error() - return ret, nil - } - log.Println("index scheduler successfully with indexBuildID = ", request.IndexBuildID) - err = t.WaitToFinish() - log.Println("build index finish ...err = ", err) + err := i.sched.IndexBuildQueue.Enqueue(t) if err != nil { ret.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR ret.Reason = err.Error() return ret, nil } + log.Println("indexnode successfully schedule with indexBuildID = ", request.IndexBuildID) return ret, nil } diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index 65312cefac..89935945e4 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -33,12 +33,18 @@ type task interface { WaitToFinish() error Notify(err error) OnEnqueue() error + SetError(err error) } type BaseTask struct { done chan error ctx context.Context id UniqueID + err error +} + +func (bt *BaseTask) SetError(err error) { + bt.err = err } func (bt *BaseTask) ID() UniqueID { @@ -101,15 +107,16 @@ func (it *IndexBuildTask) PreExecute(ctx context.Context) error { func (it *IndexBuildTask) PostExecute(ctx context.Context) error { log.Println("PostExecute...") - var err error + defer func() { - if err != nil { + if it.err != nil { it.Rollback() } }() if it.serviceClient == nil { - err = errors.New("IndexBuildTask, serviceClient is nil") + err := errors.New("IndexBuildTask, serviceClient is nil") + log.Println("[IndexBuildTask][PostExecute] serviceClient is nil") return err } @@ -121,6 +128,9 @@ func (it *IndexBuildTask) PostExecute(ctx context.Context) error { NodeID: it.nodeID, IndexFilePaths: it.savePaths, } + if it.err != nil { + nty.Status.ErrorCode = commonpb.ErrorCode_BUILD_INDEX_ERROR + } resp, err := it.serviceClient.NotifyBuildIndex(ctx, nty) if err != nil { @@ -131,6 +141,7 @@ func (it *IndexBuildTask) PostExecute(ctx context.Context) error { if resp.ErrorCode != commonpb.ErrorCode_SUCCESS { err = errors.New(resp.Reason) } + log.Println("[IndexBuildTask][PostExecute] err", err) return err } diff --git a/internal/indexnode/task_scheduler.go b/internal/indexnode/task_scheduler.go index 400577aea5..91721082f0 100644 --- a/internal/indexnode/task_scheduler.go +++ b/internal/indexnode/task_scheduler.go @@ -81,7 +81,7 @@ func (queue *BaseTaskQueue) PopUnissuedTask() task { defer queue.utLock.Unlock() if queue.unissuedTasks.Len() <= 0 { - log.Println("PopUnissuedtask sorry, but the unissued task list is empty!") + log.Println("PopUnissued task sorry, but the unissued task list is empty!") return nil } @@ -208,14 +208,18 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) { "Type": t.Name(), "ID": t.ID(), }) + defer span.Finish() span.LogFields(oplog.Int64("scheduler process PreExecute", t.ID())) err := t.PreExecute(ctx) + t.SetError(err) defer func() { - t.Notify(err) - // log.Printf("notify with error: %v", err) + span.LogFields(oplog.Int64("scheduler process PostExecute", t.ID())) + err := t.PostExecute(ctx) + t.SetError(err) }() + if err != nil { trace.LogError(span, err) return @@ -223,6 +227,7 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) { span.LogFields(oplog.Int64("scheduler process AddActiveTask", t.ID())) q.AddActiveTask(t) + // log.Printf("task add to active list ...") defer func() { span.LogFields(oplog.Int64("scheduler process PopActiveTask", t.ID())) @@ -232,14 +237,7 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) { span.LogFields(oplog.Int64("scheduler process Execute", t.ID())) err = t.Execute(ctx) - if err != nil { - log.Printf("execute definition task failed, error = %v", err) - return - } - // log.Printf("task execution done ...") - span.LogFields(oplog.Int64("scheduler process PostExecute", t.ID())) - err = t.PostExecute(ctx) - // log.Printf("post execute task done ...") + t.SetError(err) } func (sched *TaskScheduler) indexBuildLoop() { diff --git a/internal/indexservice/indexservice.go b/internal/indexservice/indexservice.go index 6cab5a6c66..3206f8621b 100644 --- a/internal/indexservice/indexservice.go +++ b/internal/indexservice/indexservice.go @@ -315,9 +315,12 @@ func (i *ServiceImpl) NotifyBuildIndex(ctx context.Context, nty *indexpb.BuildIn ret := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, } + log.Println("[IndexService][NotifyBuildIndex]", nty.String()) if err := i.metaTable.NotifyBuildIndex(nty); err != nil { ret.ErrorCode = commonpb.ErrorCode_BUILD_INDEX_ERROR ret.Reason = err.Error() + log.Println("[IndexService][NotifyBuildIndex][metaTable][NotifyBuildIndex]", err) + } i.nodeClients.IncPriority(nty.NodeID, -1) return ret, nil