diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index c182200547..1e2c7be880 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -44,6 +44,10 @@ import ( "github.com/milvus-io/milvus/internal/util/typeutil" ) +// IndexCoord is a component responsible for scheduling index construction tasks and maintaining index status. +// IndexCoord accepts requests from rootcoord to build indexes, delete indexes, and query index information. +// IndexCoord is responsible for assigning IndexBuildID to the request to build the index, and forwarding the +// request to build the index to IndexNode. IndexCoord records the status of the index, and the index file. type IndexCoord struct { stateCode atomic.Value @@ -86,6 +90,7 @@ type IndexCoord struct { type UniqueID = typeutil.UniqueID type Timestamp = typeutil.Timestamp +// NewIndexCoord creates a new IndexCoord component. func NewIndexCoord(ctx context.Context) (*IndexCoord, error) { rand.Seed(time.Now().UnixNano()) ctx1, cancel := context.WithCancel(ctx) @@ -101,7 +106,7 @@ func NewIndexCoord(ctx context.Context) (*IndexCoord, error) { return i, nil } -// Register register index service at etcd +// Register register IndexCoord role at etcd. func (i *IndexCoord) Register() error { i.session = sessionutil.NewSession(i.loopCtx, Params.MetaRootPath, Params.EtcdEndpoints) if i.session == nil { @@ -111,6 +116,7 @@ func (i *IndexCoord) Register() error { return nil } +// Init initializes the IndexCoord component. func (i *IndexCoord) Init() error { var initErr error = nil Params.InitOnce() @@ -219,6 +225,7 @@ func (i *IndexCoord) Init() error { return initErr } +// Start starts the IndexCoord component. func (i *IndexCoord) Start() error { var startErr error = nil i.startOnce.Do(func() { @@ -259,6 +266,7 @@ func (i *IndexCoord) Start() error { return startErr } +// Stop stops the IndexCoord component. func (i *IndexCoord) Stop() error { i.loopCancel() i.sched.Close() @@ -269,6 +277,7 @@ func (i *IndexCoord) Stop() error { return nil } +// UpdateStateCode updates the component state of IndexCoord. func (i *IndexCoord) UpdateStateCode(code internalpb.StateCode) { i.stateCode.Store(code) } @@ -278,6 +287,7 @@ func (i *IndexCoord) isHealthy() bool { return code == internalpb.StateCode_Healthy } +// GetComponentStates gets the component states of IndexCoord. func (i *IndexCoord) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { log.Debug("get IndexCoord component states ...") stateInfo := &internalpb.ComponentInfo{ @@ -297,6 +307,7 @@ func (i *IndexCoord) GetComponentStates(ctx context.Context) (*internalpb.Compon return ret, nil } +// GetTimeTickChannel gets the time tick channel of IndexCoord. func (i *IndexCoord) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { log.Debug("get IndexCoord time tick channel ...") return &milvuspb.StringResponse{ @@ -308,6 +319,7 @@ func (i *IndexCoord) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringRe }, nil } +// GetStatisticsChannel gets the statistics channel of IndexCoord. func (i *IndexCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { log.Debug("get IndexCoord statistics channel ...") return &milvuspb.StringResponse{ @@ -391,6 +403,7 @@ func (i *IndexCoord) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequ return ret, nil } +// GetIndexStates gets the index states from IndexCoord. func (i *IndexCoord) GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error) { sp, _ := trace.StartSpanFromContextWithOperationName(ctx, "IndexCoord-BuildIndex") defer sp.Finish() @@ -429,6 +442,9 @@ func (i *IndexCoord) GetIndexStates(ctx context.Context, req *indexpb.GetIndexSt return ret, nil } +// DropIndex deletes indexes based on IndexID. One IndexID corresponds to the index of an entire column. A column is +// divided into many segments, and each segment corresponds to an IndexBuildID. IndexCoord uses IndexBuildID to record +// index tasks. Therefore, when DropIndex, delete all tasks corresponding to IndexBuildID corresponding to IndexID. func (i *IndexCoord) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) { log.Debug("IndexCoord DropIndex", zap.Int64("IndexID", req.IndexID)) sp, _ := trace.StartSpanFromContextWithOperationName(ctx, "IndexCoord-BuildIndex") @@ -457,6 +473,7 @@ func (i *IndexCoord) DropIndex(ctx context.Context, req *indexpb.DropIndexReques return ret, nil } +// GetIndexFilePaths gets the index file paths from IndexCoord. func (i *IndexCoord) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) { log.Debug("IndexCoord GetIndexFilePaths", zap.Int64s("IndexBuildIds", req.IndexBuildIDs)) sp, _ := trace.StartSpanFromContextWithOperationName(ctx, "IndexCoord-BuildIndex") @@ -483,6 +500,7 @@ func (i *IndexCoord) GetIndexFilePaths(ctx context.Context, req *indexpb.GetInde return ret, nil } +// GetMetrics gets the metrics info of IndexCoord. func (i *IndexCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { log.Debug("IndexCoord.GetMetrics", zap.Int64("node_id", i.ID), @@ -708,7 +726,6 @@ func (i *IndexCoord) watchMetaLoop() { } } -// assignTaskLoop is used to assign index construction tasks. func (i *IndexCoord) assignTask(builderClient types.IndexNode, req *indexpb.CreateIndexRequest) bool { ctx, cancel := context.WithTimeout(i.loopCtx, i.reqTimeoutInterval) defer cancel() @@ -725,6 +742,7 @@ func (i *IndexCoord) assignTask(builderClient types.IndexNode, req *indexpb.Crea return true } +// assignTaskLoop is used to assign index construction tasks. func (i *IndexCoord) assignTaskLoop() { ctx, cancel := context.WithCancel(i.loopCtx)