diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index f9bd06bc40..00348e45e3 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -21,10 +21,6 @@ import ( "sync/atomic" "time" - "github.com/milvus-io/milvus/internal/util/metricsinfo" - - "github.com/milvus-io/milvus/internal/util/trace" - "github.com/coreos/etcd/mvcc/mvccpb" "github.com/golang/protobuf/proto" "go.uber.org/zap" @@ -39,8 +35,11 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/tso" + "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" ) @@ -649,6 +648,22 @@ func (i *IndexCoord) watchMetaLoop() { } } +func (i *IndexCoord) assignTask(builderClient types.IndexNode, req *indexpb.CreateIndexRequest) bool { + ctx, cancel := context.WithTimeout(i.loopCtx, reqTimeoutInterval) + defer cancel() + resp, err := builderClient.CreateIndex(ctx, req) + if err != nil { + log.Debug("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.Error(err)) + return false + } + + if resp.ErrorCode != commonpb.ErrorCode_Success { + log.Debug("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.String("Reason", resp.Reason)) + return false + } + return true +} + func (i *IndexCoord) assignTaskLoop() { ctx, cancel := context.WithCancel(i.loopCtx) @@ -693,6 +708,7 @@ func (i *IndexCoord) assignTaskLoop() { log.Debug("IndexCoord assignmentTasksLoop can not find available IndexNode") break } + log.Debug("IndexCoord PeekClient success", zap.Int64("nodeID", nodeID)) req := &indexpb.CreateIndexRequest{ IndexBuildID: indexBuildID, IndexName: meta.indexMeta.Req.IndexName, @@ -703,13 +719,8 @@ func (i *IndexCoord) assignTaskLoop() { TypeParams: meta.indexMeta.Req.TypeParams, IndexParams: meta.indexMeta.Req.IndexParams, } - resp, err := builderClient.CreateIndex(ctx, req) - if err != nil { - log.Debug("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.Error(err)) - continue - } - if resp.ErrorCode != commonpb.ErrorCode_Success { - log.Debug("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.String("Reason", resp.Reason)) + if !i.assignTask(builderClient, req) { + log.Debug("IndexCoord assignTask assign task to IndexNode failed") continue } if err = i.metaTable.BuildIndex(indexBuildID, nodeID); err != nil {