From f469a315d643cc5af24cd713294505f82f3ab0ff Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Wed, 14 Jul 2021 14:15:55 +0800 Subject: [PATCH] Refactor the logic of assign tasks in IndexCoord (#6328) * Refactor the logic of assign tasks in IndexCoord Signed-off-by: xiaocai2333 * Fix bugs Signed-off-by: xiaocai2333 * Fix bug for unittest Signed-off-by: xiaocai2333 * Add lock for map Signed-off-by: xiaocai2333 * Improve code Signed-off-by: xiaocai2333 * Fix unittest bug Signed-off-by: xiaocai2333 * Reduce duriation for timetick Signed-off-by: xiaocai2333 * Update orm version Signed-off-by: xiaocai2333 * Reset sdk version Signed-off-by: xiaocai2333 * Fix bug Signed-off-by: xiaocai2333 * Reset orm version Signed-off-by: xiaocai2333 * Reset test ip Signed-off-by: xiaocai2333 * Fix bug Signed-off-by: xiaocai2333 * Fix bug for unissued Signed-off-by: xiaocai2333 * Rename some variables Signed-off-by: xiaocai2333 * Fix bug Signed-off-by: xiaocai2333 * Use break instead of continue in select::case Signed-off-by: xiaocai2333 --- .../distributed/indexcoord/client/client.go | 7 - internal/distributed/indexcoord/service.go | 4 - internal/indexcoord/index_coord.go | 232 ++++++++---------- internal/indexcoord/meta_table.go | 98 ++------ internal/indexcoord/node_manager.go | 88 +++++++ internal/indexcoord/node_mgr.go | 94 ------- internal/indexcoord/priority_queue.go | 36 +-- internal/indexcoord/priority_queue_test.go | 9 +- internal/indexcoord/task.go | 12 +- internal/indexnode/indexnode.go | 38 --- internal/indexnode/paramtable.go | 55 ----- internal/proto/index_coord.proto | 1 - internal/proto/indexpb/index_coord.pb.go | 159 +++++------- internal/querycoord/impl.go | 8 +- internal/types/types.go | 1 - internal/util/sessionutil/session_util.go | 1 + 16 files changed, 306 insertions(+), 537 deletions(-) create mode 100644 internal/indexcoord/node_manager.go delete mode 100644 internal/indexcoord/node_mgr.go diff --git a/internal/distributed/indexcoord/client/client.go b/internal/distributed/indexcoord/client/client.go index 2a29f75207..c0d606c797 100644 --- a/internal/distributed/indexcoord/client/client.go +++ b/internal/distributed/indexcoord/client/client.go @@ -173,13 +173,6 @@ func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp return ret.(*milvuspb.StringResponse), err } -func (c *Client) RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) { - ret, err := c.recall(func() (interface{}, error) { - return c.grpcClient.RegisterNode(ctx, req) - }) - return ret.(*indexpb.RegisterNodeResponse), err -} - func (c *Client) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) { ret, err := c.recall(func() (interface{}, error) { return c.grpcClient.BuildIndex(ctx, req) diff --git a/internal/distributed/indexcoord/service.go b/internal/distributed/indexcoord/service.go index 29b0654cec..e726284e40 100644 --- a/internal/distributed/indexcoord/service.go +++ b/internal/distributed/indexcoord/service.go @@ -128,10 +128,6 @@ func (s *Server) GetStatisticsChannel(ctx context.Context, req *internalpb.GetSt return s.indexcoord.GetStatisticsChannel(ctx) } -func (s *Server) RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) { - return s.indexcoord.RegisterNode(ctx, req) -} - func (s *Server) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) { return s.indexcoord.BuildIndex(ctx, req) } diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index f22c386e23..e847c0004c 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -15,8 +15,8 @@ import ( "context" "errors" "math/rand" + "sort" "strconv" - "strings" "sync" "sync/atomic" "time" @@ -45,13 +45,12 @@ import ( const ( reqTimeoutInterval = time.Second * 10 durationInterval = time.Second * 10 - recycleIndexLimit = 20 + assignTaskInterval = time.Second * 3 + taskLimit = 20 ) type IndexCoord struct { - nodeClients *PriorityQueue - nodeStates map[UniqueID]*internalpb.ComponentStates - stateCode atomic.Value + stateCode atomic.Value ID UniqueID @@ -64,15 +63,12 @@ type IndexCoord struct { eventChan <-chan *sessionutil.SessionEvent - assignChan chan []UniqueID - idAllocator *allocator.GlobalIDAllocator kv kv.BaseKV - metaTable *metaTable - - nodeTasks *nodeTasks + metaTable *metaTable + nodeManager *NodeManager nodeLock sync.RWMutex @@ -88,10 +84,8 @@ func NewIndexCoord(ctx context.Context) (*IndexCoord, error) { rand.Seed(time.Now().UnixNano()) ctx1, cancel := context.WithCancel(ctx) i := &IndexCoord{ - loopCtx: ctx1, - loopCancel: cancel, - nodeClients: &PriorityQueue{}, - nodeTasks: &nodeTasks{}, + loopCtx: ctx1, + loopCancel: cancel, } i.UpdateStateCode(internalpb.StateCode_Abnormal) return i, nil @@ -101,14 +95,12 @@ func NewIndexCoord(ctx context.Context) (*IndexCoord, error) { func (i *IndexCoord) Register() error { i.session = sessionutil.NewSession(i.loopCtx, Params.MetaRootPath, Params.EtcdEndpoints) i.session.Init(typeutil.IndexCoordRole, Params.Address, true) - i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, 0) return nil } func (i *IndexCoord) Init() error { log.Debug("IndexCoord", zap.Any("etcd endpoints", Params.EtcdEndpoints)) - i.assignChan = make(chan []UniqueID, 1024) connectEtcdFn := func() error { etcdClient, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints}) if err != nil { @@ -129,6 +121,25 @@ func (i *IndexCoord) Init() error { return err } log.Debug("IndexCoord try to connect etcd success") + i.nodeManager = NewNodeManager() + + sessions, revision, err := i.session.GetSessions(typeutil.IndexNodeRole) + log.Debug("IndexCoord", zap.Any("session number", len(sessions)), zap.Any("revision", revision)) + if err != nil { + log.Debug("IndexCoord", zap.Any("Get IndexNode Sessions error", err)) + } + for _, session := range sessions { + if err = i.nodeManager.AddNode(session.ServerID, session.Address); err != nil { + log.Debug("IndexCoord", zap.Any("ServerID", session.ServerID), + zap.Any("Add IndexNode error", err)) + } + } + log.Debug("IndexCoord", zap.Any("IndexNode number", len(i.nodeManager.nodeClients))) + i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, revision+1) + nodeTasks := i.metaTable.GetNodeTaskStats() + for nodeID, taskNum := range nodeTasks { + i.nodeManager.pq.UpdatePriority(nodeID, taskNum) + } //init idAllocator kvRootPath := Params.KvRootPath @@ -168,13 +179,6 @@ func (i *IndexCoord) Init() error { i.UpdateStateCode(internalpb.StateCode_Healthy) log.Debug("IndexCoord", zap.Any("State", i.stateCode.Load())) - i.nodeTasks = NewNodeTasks() - - err = i.assignTasksServerStart() - if err != nil { - log.Debug("IndexCoord assign tasks server start failed", zap.Error(err)) - return err - } log.Debug("IndexCoord assign tasks server success", zap.Error(err)) return nil } @@ -187,7 +191,7 @@ func (i *IndexCoord) Start() error { go i.recycleUnusedIndexFiles() i.loopWg.Add(1) - go i.assignmentTasksLoop() + go i.assignTaskLoop() i.loopWg.Add(1) go i.watchNodeLoop() @@ -233,6 +237,7 @@ func (i *IndexCoord) GetComponentStates(ctx context.Context) (*internalpb.Compon ErrorCode: commonpb.ErrorCode_Success, }, } + log.Debug("IndexCoord GetComponentStates", zap.Any("IndexCoord component state", stateInfo)) return ret, nil } @@ -290,7 +295,6 @@ func (i *IndexCoord) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequ }, req: req, idAllocator: i.idAllocator, - kv: i.kv, } var cancel func() @@ -320,7 +324,6 @@ func (i *IndexCoord) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequ ret.Status.Reason = err.Error() return ret, nil } - i.assignChan <- []UniqueID{t.indexBuildID} ret.Status.ErrorCode = commonpb.ErrorCode_Success ret.IndexBuildID = t.indexBuildID return ret, nil @@ -447,7 +450,7 @@ func (i *IndexCoord) recycleUnusedIndexFiles() { case <-ctx.Done(): return case <-timeTicker.C: - metas := i.metaTable.GetUnusedIndexFiles(recycleIndexLimit) + metas := i.metaTable.GetUnusedIndexFiles(taskLimit) for _, meta := range metas { if meta.indexMeta.MarkDeleted { unusedIndexFilePathPrefix := strconv.Itoa(int(meta.indexMeta.IndexBuildID)) @@ -473,63 +476,6 @@ func (i *IndexCoord) recycleUnusedIndexFiles() { } } -func (i *IndexCoord) assignmentTasksLoop() { - ctx, cancel := context.WithCancel(i.loopCtx) - - defer cancel() - defer i.loopWg.Done() - - log.Debug("IndexCoord start assignmentTasksLoop start") - - for { - select { - case <-ctx.Done(): - return - case indexBuildIDs := <-i.assignChan: - for _, indexBuildID := range indexBuildIDs { - meta := i.metaTable.GetIndexMeta(indexBuildID) - log.Debug("IndexCoord assignmentTasksLoop ", zap.Any("Meta", meta)) - if meta.indexMeta.State == commonpb.IndexState_Finished { - continue - } - if err := i.metaTable.UpdateVersion(indexBuildID); err != nil { - log.Debug("IndexCoord assignmentTasksLoop metaTable.UpdateVersion failed", zap.Error(err)) - } - nodeID, builderClient := i.nodeClients.PeekClient() - if builderClient == nil { - log.Debug("IndexCoord assignmentTasksLoop can not find available IndexNode") - i.assignChan <- []UniqueID{indexBuildID} - continue - } - i.nodeTasks.assignTask(nodeID, indexBuildID) - req := &indexpb.CreateIndexRequest{ - IndexBuildID: indexBuildID, - IndexName: meta.indexMeta.Req.IndexName, - IndexID: meta.indexMeta.Req.IndexID, - Version: meta.indexMeta.Version + 1, - MetaPath: "/indexes/" + strconv.FormatInt(indexBuildID, 10), - DataPaths: meta.indexMeta.Req.DataPaths, - TypeParams: meta.indexMeta.Req.TypeParams, - IndexParams: meta.indexMeta.Req.IndexParams, - } - resp, err := builderClient.CreateIndex(ctx, req) - if err != nil { - log.Debug("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.Error(err)) - continue - } - if resp.ErrorCode != commonpb.ErrorCode_Success { - log.Debug("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.String("Reason", resp.Reason)) - continue - } - if err = i.metaTable.BuildIndex(indexBuildID, nodeID); err != nil { - log.Debug("IndexCoord assignmentTasksLoop metaTable.BuildIndex failed", zap.Error(err)) - } - i.nodeClients.IncPriority(nodeID, 1) - } - } - } -} - func (i *IndexCoord) watchNodeLoop() { ctx, cancel := context.WithCancel(i.loopCtx) @@ -542,18 +488,20 @@ func (i *IndexCoord) watchNodeLoop() { case <-ctx.Done(): return case event := <-i.eventChan: + log.Debug("IndexCoord watchNodeLoop event updated") switch event.EventType { case sessionutil.SessionAddEvent: serverID := event.Session.ServerID - log.Debug("IndexCoord watchNodeLoop SessionAddEvent", zap.Any("serverID", serverID)) + log.Debug("IndexCoord watchNodeLoop SessionAddEvent", zap.Any("serverID", serverID), zap.Any("address", event.Session.Address)) + err := i.nodeManager.AddNode(serverID, event.Session.Address) + if err != nil { + log.Debug("IndexCoord", zap.Any("Add IndexNode err", err)) + } + log.Debug("IndexCoord", zap.Any("IndexNode number", len(i.nodeManager.nodeClients))) case sessionutil.SessionDelEvent: serverID := event.Session.ServerID - i.removeNode(serverID) - log.Debug("IndexCoord watchNodeLoop SessionDelEvent ", zap.Any("serverID", serverID)) - indexBuildIDs := i.nodeTasks.getTasksByNodeID(serverID) - log.Debug("IndexNode crashed", zap.Any("IndexNode ID", serverID), zap.Any("task IDs", indexBuildIDs)) - i.assignChan <- indexBuildIDs - i.nodeTasks.delete(serverID) + log.Debug("IndexCoord watchNodeLoop SessionDelEvent", zap.Any("serverID", serverID)) + i.nodeManager.RemoveNode(serverID) } } } @@ -587,7 +535,7 @@ func (i *IndexCoord) watchMetaLoop() { reload := i.metaTable.LoadMetaFromETCD(indexBuildID, eventRevision) log.Debug("IndexCoord watchMetaLoop PUT", zap.Any("IndexBuildID", indexBuildID), zap.Any("reload", reload)) if reload { - i.nodeTasks.finishTask(indexBuildID) + i.nodeManager.pq.IncPriority(indexMeta.NodeID, -1) } case mvccpb.DELETE: } @@ -596,38 +544,74 @@ func (i *IndexCoord) watchMetaLoop() { } } -func (i *IndexCoord) assignTasksServerStart() error { - sessions, _, err := i.session.GetSessions(typeutil.IndexNodeRole) - if err != nil { - return err - } - for _, session := range sessions { - addrs := strings.Split(session.Address, ":") - ip := addrs[0] - port, err := strconv.ParseInt(addrs[1], 10, 64) - if err != nil { - return err - } +func (i *IndexCoord) assignTaskLoop() { + ctx, cancel := context.WithCancel(i.loopCtx) - req := &indexpb.RegisterNodeRequest{ - Address: &commonpb.Address{ - Ip: ip, - Port: port, - }, - NodeID: session.ServerID, - } - if err = i.addNode(session.ServerID, req); err != nil { - log.Debug("IndexCoord", zap.Any("IndexCoord start find node fatal, err = ", err)) - } - } - var serverIDs []int64 - for _, session := range sessions { - serverIDs = append(serverIDs, session.ServerID) - } - tasks := i.metaTable.GetUnassignedTasks(serverIDs) - for _, taskQueue := range tasks { - i.assignChan <- taskQueue - } + defer cancel() + defer i.loopWg.Done() - return nil + timeTicker := time.NewTicker(assignTaskInterval) + log.Debug("IndexCoord start assignTask loop") + + for { + select { + case <-ctx.Done(): + log.Debug("IndexCoord assignTaskLoop ctx Done") + return + case <-timeTicker.C: + sessions, _, err := i.session.GetSessions(typeutil.IndexNodeRole) + if err != nil { + log.Debug("IndexCoord assignTaskLoop", zap.Any("GetSessions error", err)) + } + if len(sessions) <= 0 { + break + } + var serverIDs []int64 + for _, session := range sessions { + serverIDs = append(serverIDs, session.ServerID) + } + metas := i.metaTable.GetUnassignedTasks(serverIDs) + sort.Slice(metas, func(i, j int) bool { + return metas[i].indexMeta.Version <= metas[j].indexMeta.Version + }) + log.Debug("IndexCoord assignTaskLoop", zap.Any("Unassign tasks number", len(metas))) + for index, meta := range metas { + indexBuildID := meta.indexMeta.IndexBuildID + if err = i.metaTable.UpdateVersion(indexBuildID); err != nil { + log.Debug("IndexCoord assignmentTasksLoop metaTable.UpdateVersion failed", zap.Error(err)) + } + nodeID, builderClient := i.nodeManager.PeekClient() + if builderClient == nil { + log.Debug("IndexCoord assignmentTasksLoop can not find available IndexNode") + break + } + req := &indexpb.CreateIndexRequest{ + IndexBuildID: indexBuildID, + IndexName: meta.indexMeta.Req.IndexName, + IndexID: meta.indexMeta.Req.IndexID, + Version: meta.indexMeta.Version + 1, + MetaPath: "/indexes/" + strconv.FormatInt(indexBuildID, 10), + DataPaths: meta.indexMeta.Req.DataPaths, + TypeParams: meta.indexMeta.Req.TypeParams, + IndexParams: meta.indexMeta.Req.IndexParams, + } + resp, err := builderClient.CreateIndex(ctx, req) + if err != nil { + log.Debug("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.Error(err)) + continue + } + if resp.ErrorCode != commonpb.ErrorCode_Success { + log.Debug("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.String("Reason", resp.Reason)) + continue + } + if err = i.metaTable.BuildIndex(indexBuildID, nodeID); err != nil { + log.Debug("IndexCoord assignmentTasksLoop metaTable.BuildIndex failed", zap.Error(err)) + } + i.nodeManager.pq.IncPriority(nodeID, 1) + if index > taskLimit { + break + } + } + } + } } diff --git a/internal/indexcoord/meta_table.go b/internal/indexcoord/meta_table.go index 87dfa31fa1..f356a9872b 100644 --- a/internal/indexcoord/meta_table.go +++ b/internal/indexcoord/meta_table.go @@ -361,28 +361,33 @@ func (mt *metaTable) GetIndexMeta(indexBuildID UniqueID) Meta { return meta } -func (mt *metaTable) GetUnassignedTasks(nodeIDs []int64) [][]UniqueID { - var tasks [][]UniqueID - var indexBuildIDs []UniqueID +func (mt *metaTable) GetUnassignedTasks(onlineNodeIDs []int64) []Meta { + mt.lock.RLock() + defer mt.lock.RUnlock() - for indexBuildID, meta := range mt.indexBuildID2Meta { + var metas []Meta + + for _, meta := range mt.indexBuildID2Meta { + if meta.indexMeta.State == commonpb.IndexState_Unissued { + metas = append(metas, meta) + continue + } + if meta.indexMeta.State == commonpb.IndexState_Finished || meta.indexMeta.State == commonpb.IndexState_Failed { + continue + } alive := false - for _, serverID := range nodeIDs { + for _, serverID := range onlineNodeIDs { if meta.indexMeta.NodeID == serverID { alive = true + break } } if !alive { - indexBuildIDs = append(indexBuildIDs, indexBuildID) - } - if len(indexBuildIDs) >= 10 { - tasks = append(tasks, indexBuildIDs) - indexBuildIDs = []UniqueID{} + metas = append(metas, meta) } } - tasks = append(tasks, indexBuildIDs) - return tasks + return metas } func (mt *metaTable) HasSameReq(req *indexpb.BuildIndexRequest) (bool, UniqueID) { @@ -477,67 +482,16 @@ func (mt *metaTable) LoadMetaFromETCD(indexBuildID int64, revision int64) bool { return true } -type nodeTasks struct { - nodeID2Tasks map[int64][]UniqueID +func (mt *metaTable) GetNodeTaskStats() map[UniqueID]int { + mt.lock.RLock() + defer mt.lock.RUnlock() - lock sync.RWMutex -} - -func NewNodeTasks() *nodeTasks { - return &nodeTasks{ - nodeID2Tasks: map[int64][]UniqueID{}, - } -} - -func (nt *nodeTasks) getTasksByNodeID(nodeID int64) []UniqueID { - nt.lock.Lock() - defer nt.lock.Unlock() - - indexBuildIDs, ok := nt.nodeID2Tasks[nodeID] - if !ok { - return nil - } - return indexBuildIDs -} - -func (nt *nodeTasks) assignTask(serverID int64, indexBuildID UniqueID) { - nt.lock.Lock() - defer nt.lock.Unlock() - - indexBuildIDs, ok := nt.nodeID2Tasks[serverID] - if !ok { - var IDs []UniqueID - IDs = append(IDs, indexBuildID) - nt.nodeID2Tasks[serverID] = IDs - return - } - indexBuildIDs = append(indexBuildIDs, indexBuildID) - nt.nodeID2Tasks[serverID] = indexBuildIDs -} - -func (nt *nodeTasks) finishTask(indexBuildID UniqueID) { - nt.lock.Lock() - defer nt.lock.Unlock() - - removed := false - for serverID, taskIDs := range nt.nodeID2Tasks { - for i := 0; i < len(taskIDs); i++ { - if indexBuildID == taskIDs[i] { - taskIDs = append(taskIDs[:i], taskIDs[i+1:]...) - removed = true - break - } - } - if removed { - nt.nodeID2Tasks[serverID] = taskIDs - break + log.Debug("IndexCoord MetaTable GetPriorityForNodeID") + nodePriority := make(map[UniqueID]int) + for _, meta := range mt.indexBuildID2Meta { + if meta.indexMeta.State == commonpb.IndexState_InProgress { + nodePriority[meta.indexMeta.NodeID]++ } } -} - -func (nt *nodeTasks) delete(serverID int64) { - nt.lock.Lock() - defer nt.lock.Unlock() - - delete(nt.nodeID2Tasks, serverID) + return nodePriority } diff --git a/internal/indexcoord/node_manager.go b/internal/indexcoord/node_manager.go new file mode 100644 index 0000000000..9a678d132d --- /dev/null +++ b/internal/indexcoord/node_manager.go @@ -0,0 +1,88 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package indexcoord + +import ( + "context" + "sync" + + grpcindexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client" + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/types" + "go.uber.org/zap" +) + +type NodeManager struct { + nodeClients map[UniqueID]types.IndexNode + pq *PriorityQueue + + lock sync.RWMutex +} + +func NewNodeManager() *NodeManager { + return &NodeManager{ + nodeClients: make(map[UniqueID]types.IndexNode), + pq: &PriorityQueue{}, + lock: sync.RWMutex{}, + } +} + +func (nm *NodeManager) RemoveNode(nodeID UniqueID) { + nm.lock.Lock() + defer nm.lock.Unlock() + + log.Debug("IndexCoord", zap.Any("Remove node with ID", nodeID)) + delete(nm.nodeClients, nodeID) + nm.pq.Remove(nodeID) +} + +func (nm *NodeManager) AddNode(nodeID UniqueID, address string) error { + nm.lock.Lock() + defer nm.lock.Unlock() + + log.Debug("IndexCoord addNode", zap.Any("nodeID", nodeID), zap.Any("node address", address)) + if nm.pq.CheckExist(nodeID) { + log.Debug("IndexCoord", zap.Any("Node client already exist with ID:", nodeID)) + return nil + } + + nodeClient, err := grpcindexnodeclient.NewClient(context.TODO(), address) + if err != nil { + return err + } + err = nodeClient.Init() + if err != nil { + return err + } + item := &PQItem{ + key: nodeID, + priority: 0, + } + nm.nodeClients[nodeID] = nodeClient + nm.pq.Push(item) + return nil +} + +func (nm *NodeManager) PeekClient() (UniqueID, types.IndexNode) { + nm.lock.Lock() + defer nm.lock.Unlock() + + log.Debug("IndexCoord NodeManager PeekClient") + + nodeID := nm.pq.Peek() + client, ok := nm.nodeClients[nodeID] + if !ok { + log.Error("IndexCoord NodeManager PeekClient", zap.Any("There is no IndexNode client corresponding to NodeID", nodeID)) + return nodeID, nil + } + return nodeID, client +} diff --git a/internal/indexcoord/node_mgr.go b/internal/indexcoord/node_mgr.go deleted file mode 100644 index 880950405b..0000000000 --- a/internal/indexcoord/node_mgr.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package indexcoord - -import ( - "context" - "strconv" - - "go.uber.org/zap" - - grpcindexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client" - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/proto/commonpb" - "github.com/milvus-io/milvus/internal/proto/indexpb" - "github.com/milvus-io/milvus/internal/proto/internalpb" -) - -func (i *IndexCoord) removeNode(nodeID UniqueID) { - i.nodeLock.Lock() - defer i.nodeLock.Unlock() - log.Debug("IndexCoord", zap.Any("Remove node with ID", nodeID)) - i.nodeClients.Remove(nodeID) -} - -func (i *IndexCoord) addNode(nodeID UniqueID, req *indexpb.RegisterNodeRequest) error { - i.nodeLock.Lock() - defer i.nodeLock.Unlock() - - log.Debug("IndexCoord addNode", zap.Any("nodeID", nodeID), zap.Any("node address", req.Address)) - - if i.nodeClients.CheckAddressExist(req.Address) { - log.Debug("IndexCoord", zap.Any("Node client already exist with ID:", nodeID)) - return nil - } - - nodeAddress := req.Address.Ip + ":" + strconv.FormatInt(req.Address.Port, 10) - nodeClient, err := grpcindexnodeclient.NewClient(context.TODO(), nodeAddress) - if err != nil { - return err - } - err = nodeClient.Init() - if err != nil { - return err - } - item := &PQItem{ - value: nodeClient, - key: nodeID, - addr: req.Address, - priority: 0, - } - i.nodeClients.Push(item) - return nil -} - -func (i *IndexCoord) prepareNodeInitParams() []*commonpb.KeyValuePair { - var params []*commonpb.KeyValuePair - params = append(params, &commonpb.KeyValuePair{Key: "minio.address", Value: Params.MinIOAddress}) - params = append(params, &commonpb.KeyValuePair{Key: "minio.accessKeyID", Value: Params.MinIOAccessKeyID}) - params = append(params, &commonpb.KeyValuePair{Key: "minio.secretAccessKey", Value: Params.MinIOSecretAccessKey}) - params = append(params, &commonpb.KeyValuePair{Key: "minio.useSSL", Value: strconv.FormatBool(Params.MinIOUseSSL)}) - params = append(params, &commonpb.KeyValuePair{Key: "minio.bucketName", Value: Params.MinioBucketName}) - return params -} - -func (i *IndexCoord) RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) { - log.Debug("indexcoord", zap.Any("register index node, node address = ", req.Address), zap.Any("node ID = ", req.NodeID)) - ret := &indexpb.RegisterNodeResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, - } - err := i.addNode(req.NodeID, req) - if err != nil { - ret.Status.Reason = err.Error() - return ret, nil - } - - ret.Status.ErrorCode = commonpb.ErrorCode_Success - params := i.prepareNodeInitParams() - ret.InitParams = &internalpb.InitParams{ - NodeID: req.NodeID, - StartParams: params, - } - return ret, nil -} diff --git a/internal/indexcoord/priority_queue.go b/internal/indexcoord/priority_queue.go index 85e3ca4d61..ef7112d1a8 100644 --- a/internal/indexcoord/priority_queue.go +++ b/internal/indexcoord/priority_queue.go @@ -14,16 +14,11 @@ package indexcoord import ( "container/heap" "sync" - - "github.com/milvus-io/milvus/internal/proto/commonpb" - "github.com/milvus-io/milvus/internal/types" ) // An Item is something we manage in a priority queue. type PQItem struct { - value types.IndexNode // The value of the item; arbitrary. - key UniqueID - addr *commonpb.Address + key UniqueID priority int // The priority of the item in the queue. // The index is needed by update and is maintained by the heap.Interface methods. @@ -71,12 +66,12 @@ func (pq *PriorityQueue) Pop() interface{} { return item } -func (pq *PriorityQueue) CheckAddressExist(addr *commonpb.Address) bool { +func (pq *PriorityQueue) CheckExist(nodeID UniqueID) bool { pq.lock.RLock() defer pq.lock.RUnlock() for _, item := range pq.items { - if CompareAddress(addr, item.addr) { + if nodeID == item.key { return true } } @@ -125,33 +120,24 @@ func (pq *PriorityQueue) Remove(key UniqueID) { } } -func (pq *PriorityQueue) Peek() interface{} { +// PeekClient picks an key with the lowest load. +func (pq *PriorityQueue) Peek() UniqueID { pq.lock.RLock() defer pq.lock.RUnlock() + if pq.Len() == 0 { - return nil + return UniqueID(-1) } - return pq.items[0] - //item := pq.items[0] - //return item.value + return pq.items[0].key } -// PeekClient picks an IndexNode with the lowest load. -func (pq *PriorityQueue) PeekClient() (UniqueID, types.IndexNode) { - item := pq.Peek() - if item == nil { - return UniqueID(-1), nil - } - return item.(*PQItem).key, item.(*PQItem).value -} - -func (pq *PriorityQueue) PeekAllClients() []types.IndexNode { +func (pq *PriorityQueue) PeekAll() []UniqueID { pq.lock.RLock() defer pq.lock.RUnlock() - var ret []types.IndexNode + var ret []UniqueID for _, item := range pq.items { - ret = append(ret, item.value) + ret = append(ret, item.key) } return ret diff --git a/internal/indexcoord/priority_queue_test.go b/internal/indexcoord/priority_queue_test.go index 0de9b29073..ad092fb1cc 100644 --- a/internal/indexcoord/priority_queue_test.go +++ b/internal/indexcoord/priority_queue_test.go @@ -24,7 +24,6 @@ func newPriorityQueue() *PriorityQueue { ret := &PriorityQueue{} for i := 0; i < QueueLen; i++ { item := &PQItem{ - value: nil, key: UniqueID(i), priority: i, index: i, @@ -72,14 +71,14 @@ func TestPriorityQueue_UpdatePriority(t *testing.T) { pq := newPriorityQueue() key := UniqueID(pq.Len() / 2) pq.UpdatePriority(key, -pq.Len()) - item := pq.Peek() - assert.Equal(t, key, item.(*PQItem).key) + peekKey := pq.Peek() + assert.Equal(t, key, peekKey) } func TestPriorityQueue_IncPriority(t *testing.T) { pq := newPriorityQueue() key := UniqueID(pq.Len() / 2) pq.IncPriority(key, -pq.Len()) - item := pq.Peek() - assert.Equal(t, key, item.(*PQItem).key) + peekKey := pq.Peek() + assert.Equal(t, key, peekKey) } diff --git a/internal/indexcoord/task.go b/internal/indexcoord/task.go index 72984c96a4..290b204a32 100644 --- a/internal/indexcoord/task.go +++ b/internal/indexcoord/task.go @@ -19,9 +19,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/allocator" - "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/proto/indexpb" - "github.com/milvus-io/milvus/internal/types" ) const ( @@ -71,13 +69,9 @@ func (bt *BaseTask) Notify(err error) { type IndexAddTask struct { BaseTask - req *indexpb.BuildIndexRequest - indexBuildID UniqueID - idAllocator *allocator.GlobalIDAllocator - buildQueue TaskQueue - kv kv.BaseKV - builderClient types.IndexNode - buildClientNodeID UniqueID + req *indexpb.BuildIndexRequest + indexBuildID UniqueID + idAllocator *allocator.GlobalIDAllocator } func (it *IndexAddTask) Ctx() context.Context { diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 995d839210..7a5a11adf3 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -13,7 +13,6 @@ package indexnode import ( "context" - "errors" "io" "math/rand" "strconv" @@ -34,7 +33,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/types" - "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/typeutil" ) @@ -91,8 +89,6 @@ func (i *IndexNode) Register() error { } func (i *IndexNode) Init() error { - ctx := context.Background() - connectEtcdFn := func() error { etcdClient, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints}) i.etcdKV = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath) @@ -104,40 +100,6 @@ func (i *IndexNode) Init() error { return err } log.Debug("IndexNode try connect etcd success") - log.Debug("IndexNode start to wait for IndexCoord ready") - - err = funcutil.WaitForComponentHealthy(ctx, i.serviceClient, "IndexCoord", 1000000, time.Millisecond*200) - if err != nil { - log.Debug("IndexNode wait for IndexCoord ready failed", zap.Error(err)) - return err - } - log.Debug("IndexNode report IndexCoord is ready") - request := &indexpb.RegisterNodeRequest{ - Base: nil, - Address: &commonpb.Address{ - Ip: Params.IP, - Port: int64(Params.Port), - }, - NodeID: i.session.ServerID, - } - - resp, err2 := i.serviceClient.RegisterNode(ctx, request) - if err2 != nil { - log.Debug("IndexNode RegisterNode failed", zap.Error(err2)) - return err2 - } - - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { - log.Debug("IndexNode RegisterNode failed", zap.String("Reason", resp.Status.Reason)) - return errors.New(resp.Status.Reason) - } - - err = Params.LoadConfigFromInitParams(resp.InitParams) - if err != nil { - log.Debug("IndexNode LoadConfigFromInitParams failed", zap.Error(err)) - return err - } - log.Debug("IndexNode LoadConfigFromInitParams success") option := &miniokv.Option{ Address: Params.MinIOAddress, diff --git a/internal/indexnode/paramtable.go b/internal/indexnode/paramtable.go index 3ac0f7fbac..8c927ea280 100644 --- a/internal/indexnode/paramtable.go +++ b/internal/indexnode/paramtable.go @@ -12,20 +12,14 @@ package indexnode import ( - "bytes" "fmt" "path" "strconv" "strings" "sync" - "go.uber.org/zap" - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/paramtable" - "github.com/spf13/cast" - "github.com/spf13/viper" ) const ( @@ -81,55 +75,6 @@ func (pt *ParamTable) initParams() { pt.initMetaRootPath() } -func (pt *ParamTable) LoadConfigFromInitParams(initParams *internalpb.InitParams) error { - pt.NodeID = initParams.NodeID - - config := viper.New() - config.SetConfigType("yaml") - for _, pair := range initParams.StartParams { - if pair.Key == StartParamsKey { - err := config.ReadConfig(bytes.NewBuffer([]byte(pair.Value))) - if err != nil { - return err - } - break - } - } - - for _, key := range config.AllKeys() { - val := config.Get(key) - str, err := cast.ToStringE(val) - if err != nil { - switch val := val.(type) { - case []interface{}: - str = str[:0] - for _, v := range val { - ss, err := cast.ToStringE(v) - if err != nil { - log.Debug("indexnode", zap.String("error", err.Error())) - } - if len(str) == 0 { - str = ss - } else { - str = str + "," + ss - } - } - - default: - log.Debug("indexnode", zap.String("undefine config type, key=", key)) - } - } - err = pt.Save(key, str) - if err != nil { - panic(err) - } - - } - - pt.initParams() - return nil -} - func (pt *ParamTable) initMinIOAddress() { ret, err := pt.Load("_MinioAddress") if err != nil { diff --git a/internal/proto/index_coord.proto b/internal/proto/index_coord.proto index b8536de301..6481fdeba4 100644 --- a/internal/proto/index_coord.proto +++ b/internal/proto/index_coord.proto @@ -12,7 +12,6 @@ service IndexCoord { rpc GetComponentStates(internal.GetComponentStatesRequest) returns (internal.ComponentStates) {} rpc GetTimeTickChannel(internal.GetTimeTickChannelRequest) returns(milvus.StringResponse) {} rpc GetStatisticsChannel(internal.GetStatisticsChannelRequest) returns(milvus.StringResponse){} - rpc RegisterNode(RegisterNodeRequest) returns (RegisterNodeResponse) {} rpc BuildIndex(BuildIndexRequest) returns (BuildIndexResponse){} rpc GetIndexStates(GetIndexStatesRequest) returns (GetIndexStatesResponse) {} rpc GetIndexFilePaths(GetIndexFilePathsRequest) returns (GetIndexFilePathsResponse){} diff --git a/internal/proto/indexpb/index_coord.pb.go b/internal/proto/indexpb/index_coord.pb.go index 7acd58590c..df24ee1617 100644 --- a/internal/proto/indexpb/index_coord.pb.go +++ b/internal/proto/indexpb/index_coord.pb.go @@ -809,68 +809,67 @@ func init() { func init() { proto.RegisterFile("index_coord.proto", fileDescriptor_f9e019eb3fda53c2) } var fileDescriptor_f9e019eb3fda53c2 = []byte{ - // 970 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x56, 0x4f, 0x6f, 0x1b, 0x45, - 0x14, 0xef, 0x7a, 0x1b, 0xff, 0x79, 0x36, 0x51, 0x33, 0x94, 0x6a, 0x71, 0xa9, 0xea, 0x2c, 0x05, - 0x0c, 0x6a, 0x9d, 0xca, 0xa5, 0x70, 0x42, 0x82, 0xc4, 0x22, 0xb2, 0x50, 0xab, 0x68, 0x1a, 0x71, - 0x40, 0x42, 0xd6, 0xc4, 0xfb, 0x92, 0x8c, 0xba, 0xff, 0xb2, 0x33, 0xae, 0xc8, 0x9d, 0x3b, 0x37, - 0x10, 0x1f, 0x04, 0xf1, 0x39, 0x38, 0x70, 0xe2, 0xcb, 0xa0, 0x99, 0x9d, 0xdd, 0xee, 0xae, 0xd7, - 0x89, 0x43, 0x0a, 0x27, 0x6e, 0xfb, 0xde, 0xbc, 0x37, 0xbf, 0x79, 0xbf, 0xf7, 0xe6, 0xb7, 0x03, - 0x5b, 0x3c, 0xf4, 0xf0, 0x87, 0xd9, 0x3c, 0x8a, 0x12, 0x6f, 0x14, 0x27, 0x91, 0x8c, 0x08, 0x09, - 0xb8, 0xff, 0x6a, 0x21, 0x52, 0x6b, 0xa4, 0xd7, 0xfb, 0xbd, 0x79, 0x14, 0x04, 0x51, 0x98, 0xfa, - 0xfa, 0x9b, 0x3c, 0x94, 0x98, 0x84, 0xcc, 0x37, 0x76, 0xaf, 0x98, 0xe1, 0xfe, 0x62, 0xc1, 0xdb, - 0x14, 0x4f, 0xb8, 0x90, 0x98, 0x3c, 0x8f, 0x3c, 0xa4, 0x78, 0xb6, 0x40, 0x21, 0xc9, 0x63, 0xb8, - 0x79, 0xc4, 0x04, 0x3a, 0xd6, 0xc0, 0x1a, 0x76, 0xc7, 0xef, 0x8d, 0x4a, 0x30, 0x66, 0xff, 0x67, - 0xe2, 0x64, 0x97, 0x09, 0xa4, 0x3a, 0x92, 0x7c, 0x06, 0x2d, 0xe6, 0x79, 0x09, 0x0a, 0xe1, 0x34, - 0x2e, 0x48, 0xfa, 0x2a, 0x8d, 0xa1, 0x59, 0x30, 0xb9, 0x03, 0xcd, 0x30, 0xf2, 0x70, 0x3a, 0x71, - 0xec, 0x81, 0x35, 0xb4, 0xa9, 0xb1, 0xdc, 0x9f, 0x2c, 0xb8, 0x5d, 0x3e, 0x99, 0x88, 0xa3, 0x50, - 0x20, 0x79, 0x02, 0x4d, 0x21, 0x99, 0x5c, 0x08, 0x73, 0xb8, 0xbb, 0xb5, 0x38, 0x2f, 0x74, 0x08, - 0x35, 0xa1, 0x64, 0x17, 0xba, 0x3c, 0xe4, 0x72, 0x16, 0xb3, 0x84, 0x05, 0xd9, 0x09, 0xb7, 0x47, - 0x15, 0xf6, 0x0c, 0x51, 0xd3, 0x90, 0xcb, 0x03, 0x1d, 0x48, 0x81, 0xe7, 0xdf, 0xee, 0x17, 0xf0, - 0xce, 0x3e, 0xca, 0xa9, 0xe2, 0x58, 0xed, 0x8e, 0x22, 0x23, 0xeb, 0x01, 0xbc, 0xa5, 0x99, 0xdf, - 0x5d, 0x70, 0xdf, 0x9b, 0x4e, 0xd4, 0xc1, 0xec, 0xa1, 0x4d, 0xcb, 0x4e, 0xf7, 0x77, 0x0b, 0x3a, - 0x3a, 0x79, 0x1a, 0x1e, 0x47, 0xe4, 0x29, 0x6c, 0xa8, 0xa3, 0xa5, 0x0c, 0x6f, 0x8e, 0xef, 0xd7, - 0x16, 0xf1, 0x1a, 0x8b, 0xa6, 0xd1, 0xc4, 0x85, 0x5e, 0x71, 0x57, 0x5d, 0x88, 0x4d, 0x4b, 0x3e, - 0xe2, 0x40, 0x4b, 0xdb, 0x39, 0xa5, 0x99, 0x49, 0xee, 0x01, 0xa4, 0x23, 0x14, 0xb2, 0x00, 0x9d, - 0x9b, 0x03, 0x6b, 0xd8, 0xa1, 0x1d, 0xed, 0x79, 0xce, 0x02, 0x54, 0xad, 0x48, 0x90, 0x89, 0x28, - 0x74, 0x36, 0xf4, 0x92, 0xb1, 0xdc, 0x1f, 0x2d, 0xb8, 0x53, 0xad, 0xfc, 0x3a, 0xcd, 0x78, 0x9a, - 0x26, 0xa1, 0xea, 0x83, 0x3d, 0xec, 0x8e, 0xef, 0x8d, 0x96, 0xa7, 0x78, 0x94, 0x53, 0x45, 0x4d, - 0xb0, 0xfb, 0x47, 0x03, 0xc8, 0x5e, 0x82, 0x4c, 0xa2, 0x5e, 0xcb, 0xd8, 0xaf, 0x52, 0x62, 0xd5, - 0x50, 0x52, 0x2e, 0xbc, 0x51, 0x2d, 0x7c, 0x35, 0x63, 0x0e, 0xb4, 0x5e, 0x61, 0x22, 0x78, 0x14, - 0x6a, 0xba, 0x6c, 0x9a, 0x99, 0xe4, 0x2e, 0x74, 0x02, 0x94, 0x6c, 0x16, 0x33, 0x79, 0x6a, 0xf8, - 0x6a, 0x2b, 0xc7, 0x01, 0x93, 0xa7, 0x0a, 0xcf, 0x63, 0x66, 0x51, 0x38, 0xcd, 0x81, 0xad, 0xf0, - 0x94, 0x47, 0xad, 0xea, 0x69, 0x94, 0xe7, 0x31, 0x66, 0xd3, 0xd8, 0xd2, 0x2c, 0x6c, 0xd7, 0x52, - 0xf7, 0x0d, 0x9e, 0x7f, 0xcb, 0xfc, 0x05, 0x1e, 0x30, 0x9e, 0x50, 0x50, 0x59, 0xe9, 0x34, 0x92, - 0x89, 0x29, 0x3b, 0xdb, 0xa4, 0xbd, 0xee, 0x26, 0x5d, 0x9d, 0x66, 0x66, 0xfa, 0xd7, 0x06, 0x6c, - 0xa5, 0x24, 0xfd, 0x67, 0x94, 0x96, 0xb9, 0xd9, 0xb8, 0x84, 0x9b, 0xe6, 0x9b, 0xe0, 0xa6, 0xf5, - 0x8f, 0xb8, 0x09, 0x80, 0x14, 0xa9, 0xb9, 0xce, 0xc4, 0xaf, 0x71, 0x6d, 0xdd, 0x2f, 0xc1, 0xc9, - 0x2e, 0xd9, 0xd7, 0xdc, 0x47, 0xcd, 0xc6, 0xd5, 0x14, 0xe6, 0x67, 0x0b, 0xb6, 0x4a, 0xf9, 0x5a, - 0x69, 0xfe, 0xad, 0x03, 0x93, 0x21, 0xdc, 0x4a, 0x59, 0x3e, 0xe6, 0x3e, 0x9a, 0x76, 0xda, 0xba, - 0x9d, 0x9b, 0xbc, 0x54, 0x85, 0x3a, 0xd8, 0xbb, 0x35, 0xb5, 0x5d, 0x87, 0xd1, 0x09, 0x40, 0x01, - 0x36, 0xd5, 0x91, 0x0f, 0x56, 0xea, 0x48, 0x91, 0x10, 0xda, 0x39, 0xce, 0x0f, 0xf6, 0x57, 0xc3, - 0x68, 0xf2, 0x33, 0x94, 0x6c, 0xad, 0xb1, 0xcf, 0x75, 0xbb, 0x71, 0x25, 0xdd, 0xbe, 0x0f, 0xdd, - 0x63, 0xc6, 0xfd, 0x99, 0xd1, 0x57, 0x5b, 0x5f, 0x17, 0x50, 0x2e, 0xaa, 0x3d, 0xe4, 0x73, 0xb0, - 0x13, 0x3c, 0xd3, 0x22, 0xb3, 0xa2, 0x90, 0xa5, 0x6b, 0x4a, 0x55, 0x46, 0x6d, 0x17, 0x36, 0xea, - 0xba, 0x40, 0xb6, 0xa1, 0x17, 0xb0, 0xe4, 0xe5, 0xcc, 0x43, 0x1f, 0x25, 0x7a, 0x4e, 0x73, 0x60, - 0x0d, 0xdb, 0xb4, 0xab, 0x7c, 0x93, 0xd4, 0x55, 0xf8, 0x19, 0xb7, 0x8a, 0x3f, 0xe3, 0xa2, 0x0c, - 0xb6, 0xcb, 0x32, 0xd8, 0x87, 0x76, 0x82, 0xf3, 0xf3, 0xb9, 0x8f, 0x9e, 0xd3, 0xd1, 0x1b, 0xe6, - 0xb6, 0xfb, 0x10, 0x6e, 0x4d, 0x92, 0x28, 0x2e, 0x49, 0x4b, 0x41, 0x17, 0xac, 0x92, 0x2e, 0x8c, - 0xff, 0x6c, 0x02, 0xe8, 0xd0, 0x3d, 0xf5, 0xbe, 0x21, 0x31, 0x90, 0x7d, 0x94, 0x7b, 0x51, 0x10, - 0x47, 0x21, 0x86, 0x32, 0xfd, 0xef, 0x90, 0xc7, 0x2b, 0x7e, 0xd9, 0xcb, 0xa1, 0x06, 0xb0, 0xff, - 0xe1, 0x8a, 0x8c, 0x4a, 0xb8, 0x7b, 0x83, 0x04, 0x1a, 0xf1, 0x90, 0x07, 0x78, 0xc8, 0xe7, 0x2f, - 0xf7, 0x4e, 0x59, 0x18, 0xa2, 0x7f, 0x11, 0x62, 0x25, 0x34, 0x43, 0x7c, 0xbf, 0x9c, 0x61, 0x8c, - 0x17, 0x32, 0xe1, 0xe1, 0x49, 0x36, 0xf4, 0xee, 0x0d, 0x72, 0x06, 0xb7, 0xf7, 0x51, 0xa3, 0x73, - 0x21, 0xf9, 0x5c, 0x64, 0x80, 0xe3, 0xd5, 0x80, 0x4b, 0xc1, 0x57, 0x84, 0x9c, 0x43, 0xaf, 0xf8, - 0xa4, 0x22, 0x1f, 0xd5, 0xcd, 0x59, 0xcd, 0x73, 0xb0, 0x3f, 0xbc, 0x3c, 0x30, 0x07, 0xf9, 0x1e, - 0xe0, 0xf5, 0xa8, 0x92, 0xf5, 0x46, 0x79, 0xb9, 0x4b, 0xd5, 0xb0, 0x7c, 0x7b, 0x0e, 0x9b, 0xe5, - 0xb7, 0x08, 0xf9, 0xb8, 0x2e, 0xb7, 0xf6, 0xa5, 0xd6, 0xff, 0x64, 0x9d, 0xd0, 0x1c, 0x2a, 0x81, - 0xad, 0x25, 0xd5, 0x22, 0x0f, 0x2f, 0xda, 0xa2, 0x2a, 0xdc, 0xfd, 0x47, 0x6b, 0x46, 0xe7, 0x98, - 0x07, 0xd0, 0xc9, 0xef, 0x0c, 0x79, 0x50, 0x97, 0x5d, 0xbd, 0x52, 0xfd, 0x8b, 0xf4, 0xd2, 0xbd, - 0x31, 0xfe, 0xcd, 0x36, 0x1a, 0xa7, 0x5b, 0xfe, 0xff, 0xb5, 0x7a, 0xf3, 0xd7, 0xea, 0x10, 0xba, - 0x85, 0x77, 0x29, 0xa9, 0x9d, 0xe5, 0xe5, 0x87, 0xeb, 0x25, 0x7d, 0xdb, 0xfd, 0xf4, 0xbb, 0xf1, - 0x09, 0x97, 0xa7, 0x8b, 0x23, 0xb5, 0xb2, 0x93, 0x86, 0x3e, 0xe2, 0x91, 0xf9, 0xda, 0xc9, 0x0a, - 0xd8, 0xd1, 0xd9, 0x3b, 0x1a, 0x25, 0x3e, 0x3a, 0x6a, 0x6a, 0xf3, 0xc9, 0xdf, 0x01, 0x00, 0x00, - 0xff, 0xff, 0x85, 0x95, 0xb2, 0x56, 0x2c, 0x0e, 0x00, 0x00, + // 952 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x56, 0xcd, 0x6e, 0x1c, 0x45, + 0x10, 0xf6, 0xec, 0x64, 0xff, 0x6a, 0x8d, 0x15, 0x37, 0x21, 0x1a, 0x36, 0x44, 0x59, 0x0f, 0x01, + 0x2d, 0x28, 0x59, 0x47, 0x1b, 0x02, 0x27, 0x24, 0xb0, 0x57, 0x58, 0x2b, 0x94, 0xc8, 0xea, 0x58, + 0x1c, 0x90, 0xd0, 0xaa, 0xbd, 0x53, 0xb6, 0x5b, 0x99, 0x3f, 0x4f, 0xf7, 0x46, 0xf8, 0xce, 0x9d, + 0x1b, 0x88, 0x07, 0x41, 0x3c, 0x07, 0x67, 0x5e, 0x82, 0x47, 0x40, 0xdd, 0xd3, 0x33, 0x99, 0x99, + 0x9d, 0x75, 0xd6, 0x98, 0x70, 0xca, 0x6d, 0xaa, 0xba, 0xaa, 0xbf, 0xae, 0xaf, 0x6a, 0xbe, 0x6e, + 0xd8, 0xe6, 0xa1, 0x87, 0x3f, 0xce, 0xe6, 0x51, 0x94, 0x78, 0xa3, 0x38, 0x89, 0x64, 0x44, 0x48, + 0xc0, 0xfd, 0x97, 0x0b, 0x91, 0x5a, 0x23, 0xbd, 0xde, 0xdf, 0x9c, 0x47, 0x41, 0x10, 0x85, 0xa9, + 0xaf, 0xbf, 0xc5, 0x43, 0x89, 0x49, 0xc8, 0x7c, 0x63, 0x6f, 0x16, 0x33, 0xdc, 0x5f, 0x2d, 0x78, + 0x97, 0xe2, 0x29, 0x17, 0x12, 0x93, 0x67, 0x91, 0x87, 0x14, 0xcf, 0x17, 0x28, 0x24, 0x79, 0x04, + 0x37, 0x8e, 0x99, 0x40, 0xc7, 0x1a, 0x58, 0xc3, 0xde, 0xf8, 0x83, 0x51, 0x09, 0xc6, 0xec, 0xff, + 0x54, 0x9c, 0xee, 0x31, 0x81, 0x54, 0x47, 0x92, 0xcf, 0xa1, 0xcd, 0x3c, 0x2f, 0x41, 0x21, 0x9c, + 0xc6, 0x25, 0x49, 0x5f, 0xa7, 0x31, 0x34, 0x0b, 0x26, 0xb7, 0xa1, 0x15, 0x46, 0x1e, 0x4e, 0x27, + 0x8e, 0x3d, 0xb0, 0x86, 0x36, 0x35, 0x96, 0xfb, 0xb3, 0x05, 0xb7, 0xca, 0x27, 0x13, 0x71, 0x14, + 0x0a, 0x24, 0x8f, 0xa1, 0x25, 0x24, 0x93, 0x0b, 0x61, 0x0e, 0x77, 0xa7, 0x16, 0xe7, 0xb9, 0x0e, + 0xa1, 0x26, 0x94, 0xec, 0x41, 0x8f, 0x87, 0x5c, 0xce, 0x62, 0x96, 0xb0, 0x20, 0x3b, 0xe1, 0xce, + 0xa8, 0xc2, 0x9e, 0x21, 0x6a, 0x1a, 0x72, 0x79, 0xa8, 0x03, 0x29, 0xf0, 0xfc, 0xdb, 0xfd, 0x12, + 0xde, 0x3b, 0x40, 0x39, 0x55, 0x1c, 0xab, 0xdd, 0x51, 0x64, 0x64, 0xdd, 0x87, 0x77, 0x34, 0xf3, + 0x7b, 0x0b, 0xee, 0x7b, 0xd3, 0x89, 0x3a, 0x98, 0x3d, 0xb4, 0x69, 0xd9, 0xe9, 0xfe, 0x61, 0x41, + 0x57, 0x27, 0x4f, 0xc3, 0x93, 0x88, 0x3c, 0x81, 0xa6, 0x3a, 0x5a, 0xca, 0xf0, 0xd6, 0xf8, 0x5e, + 0x6d, 0x11, 0xaf, 0xb0, 0x68, 0x1a, 0x4d, 0x5c, 0xd8, 0x2c, 0xee, 0xaa, 0x0b, 0xb1, 0x69, 0xc9, + 0x47, 0x1c, 0x68, 0x6b, 0x3b, 0xa7, 0x34, 0x33, 0xc9, 0x5d, 0x80, 0x74, 0x84, 0x42, 0x16, 0xa0, + 0x73, 0x63, 0x60, 0x0d, 0xbb, 0xb4, 0xab, 0x3d, 0xcf, 0x58, 0x80, 0xaa, 0x15, 0x09, 0x32, 0x11, + 0x85, 0x4e, 0x53, 0x2f, 0x19, 0xcb, 0xfd, 0xc9, 0x82, 0xdb, 0xd5, 0xca, 0xaf, 0xd3, 0x8c, 0x27, + 0x69, 0x12, 0xaa, 0x3e, 0xd8, 0xc3, 0xde, 0xf8, 0xee, 0x68, 0x79, 0x8a, 0x47, 0x39, 0x55, 0xd4, + 0x04, 0xbb, 0x7f, 0x36, 0x80, 0xec, 0x27, 0xc8, 0x24, 0xea, 0xb5, 0x8c, 0xfd, 0x2a, 0x25, 0x56, + 0x0d, 0x25, 0xe5, 0xc2, 0x1b, 0xd5, 0xc2, 0x57, 0x33, 0xe6, 0x40, 0xfb, 0x25, 0x26, 0x82, 0x47, + 0xa1, 0xa6, 0xcb, 0xa6, 0x99, 0x49, 0xee, 0x40, 0x37, 0x40, 0xc9, 0x66, 0x31, 0x93, 0x67, 0x86, + 0xaf, 0x8e, 0x72, 0x1c, 0x32, 0x79, 0xa6, 0xf0, 0x3c, 0x66, 0x16, 0x85, 0xd3, 0x1a, 0xd8, 0x0a, + 0x4f, 0x79, 0xd4, 0xaa, 0x9e, 0x46, 0x79, 0x11, 0x63, 0x36, 0x8d, 0x6d, 0xcd, 0xc2, 0x4e, 0x2d, + 0x75, 0xdf, 0xe2, 0xc5, 0x77, 0xcc, 0x5f, 0xe0, 0x21, 0xe3, 0x09, 0x05, 0x95, 0x95, 0x4e, 0x23, + 0x99, 0x98, 0xb2, 0xb3, 0x4d, 0x3a, 0xeb, 0x6e, 0xd2, 0xd3, 0x69, 0x66, 0xa6, 0x7f, 0x6b, 0xc0, + 0x76, 0x4a, 0xd2, 0xff, 0x46, 0x69, 0x99, 0x9b, 0xe6, 0x6b, 0xb8, 0x69, 0xfd, 0x17, 0xdc, 0xb4, + 0xff, 0x15, 0x37, 0x01, 0x90, 0x22, 0x35, 0xd7, 0x99, 0xf8, 0x35, 0x7e, 0x5b, 0xf7, 0x2b, 0x70, + 0xb2, 0x9f, 0xec, 0x1b, 0xee, 0xa3, 0x66, 0xe3, 0x6a, 0x0a, 0xf3, 0x8b, 0x05, 0xdb, 0xa5, 0x7c, + 0xad, 0x34, 0x6f, 0xea, 0xc0, 0x64, 0x08, 0x37, 0x53, 0x96, 0x4f, 0xb8, 0x8f, 0xa6, 0x9d, 0xb6, + 0x6e, 0xe7, 0x16, 0x2f, 0x55, 0xa1, 0x0e, 0xf6, 0x7e, 0x4d, 0x6d, 0xd7, 0x61, 0x74, 0x02, 0x50, + 0x80, 0x4d, 0x75, 0xe4, 0xa3, 0x95, 0x3a, 0x52, 0x24, 0x84, 0x76, 0x4f, 0xf2, 0x83, 0xfd, 0xd5, + 0x30, 0x9a, 0xfc, 0x14, 0x25, 0x5b, 0x6b, 0xec, 0x73, 0xdd, 0x6e, 0x5c, 0x49, 0xb7, 0xef, 0x41, + 0xef, 0x84, 0x71, 0x7f, 0x66, 0xf4, 0xd5, 0xd6, 0xbf, 0x0b, 0x28, 0x17, 0xd5, 0x1e, 0xf2, 0x05, + 0xd8, 0x09, 0x9e, 0x6b, 0x91, 0x59, 0x51, 0xc8, 0xd2, 0x6f, 0x4a, 0x55, 0x46, 0x6d, 0x17, 0x9a, + 0x75, 0x5d, 0x20, 0x3b, 0xb0, 0x19, 0xb0, 0xe4, 0xc5, 0xcc, 0x43, 0x1f, 0x25, 0x7a, 0x4e, 0x6b, + 0x60, 0x0d, 0x3b, 0xb4, 0xa7, 0x7c, 0x93, 0xd4, 0x55, 0xb8, 0x8c, 0xdb, 0xc5, 0xcb, 0xb8, 0x28, + 0x83, 0x9d, 0xb2, 0x0c, 0xf6, 0xa1, 0x93, 0xe0, 0xfc, 0x62, 0xee, 0xa3, 0xe7, 0x74, 0xf5, 0x86, + 0xb9, 0xed, 0x3e, 0x80, 0x9b, 0x93, 0x24, 0x8a, 0x4b, 0xd2, 0x52, 0xd0, 0x05, 0xab, 0xa4, 0x0b, + 0xe3, 0xbf, 0x9b, 0x00, 0x3a, 0x74, 0x5f, 0xbd, 0x6f, 0x48, 0x0c, 0xe4, 0x00, 0xe5, 0x7e, 0x14, + 0xc4, 0x51, 0x88, 0xa1, 0x4c, 0xef, 0x1d, 0xf2, 0x68, 0xc5, 0x95, 0xbd, 0x1c, 0x6a, 0x00, 0xfb, + 0x1f, 0xaf, 0xc8, 0xa8, 0x84, 0xbb, 0x1b, 0x24, 0xd0, 0x88, 0x47, 0x3c, 0xc0, 0x23, 0x3e, 0x7f, + 0xb1, 0x7f, 0xc6, 0xc2, 0x10, 0xfd, 0xcb, 0x10, 0x2b, 0xa1, 0x19, 0xe2, 0x87, 0xe5, 0x0c, 0x63, + 0x3c, 0x97, 0x09, 0x0f, 0x4f, 0xb3, 0xa1, 0x77, 0x37, 0xc8, 0x39, 0xdc, 0x3a, 0x40, 0x8d, 0xce, + 0x85, 0xe4, 0x73, 0x91, 0x01, 0x8e, 0x57, 0x03, 0x2e, 0x05, 0x5f, 0x11, 0xf2, 0x07, 0x80, 0x57, + 0x53, 0x44, 0xd6, 0x9b, 0xb2, 0x65, 0x02, 0xab, 0x61, 0xf9, 0xf6, 0x1c, 0xb6, 0xca, 0xcf, 0x04, + 0xf2, 0x49, 0x5d, 0x6e, 0xed, 0x23, 0xaa, 0xff, 0xe9, 0x3a, 0xa1, 0x39, 0x54, 0x02, 0xdb, 0x4b, + 0x82, 0x42, 0x1e, 0x5c, 0xb6, 0x45, 0x55, 0x53, 0xfb, 0x0f, 0xd7, 0x8c, 0xce, 0x31, 0x0f, 0xa1, + 0x9b, 0x8f, 0x33, 0xb9, 0x5f, 0x97, 0x5d, 0x9d, 0xf6, 0xfe, 0x65, 0x52, 0xe6, 0x6e, 0x8c, 0x7f, + 0xb7, 0x8d, 0xfc, 0xa8, 0x07, 0xee, 0xdb, 0x89, 0x7f, 0x03, 0x13, 0x7f, 0x04, 0xbd, 0xc2, 0x93, + 0x91, 0xd4, 0xce, 0xf2, 0xf2, 0x9b, 0xf2, 0x35, 0x7d, 0xdb, 0xfb, 0xec, 0xfb, 0xf1, 0x29, 0x97, + 0x67, 0x8b, 0x63, 0xb5, 0xb2, 0x9b, 0x86, 0x3e, 0xe4, 0x91, 0xf9, 0xda, 0xcd, 0x0a, 0xd8, 0xd5, + 0xd9, 0xbb, 0x1a, 0x25, 0x3e, 0x3e, 0x6e, 0x69, 0xf3, 0xf1, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, + 0xfc, 0x30, 0x24, 0x6f, 0xc7, 0x0d, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -888,7 +887,6 @@ type IndexCoordClient interface { GetComponentStates(ctx context.Context, in *internalpb.GetComponentStatesRequest, opts ...grpc.CallOption) (*internalpb.ComponentStates, error) GetTimeTickChannel(ctx context.Context, in *internalpb.GetTimeTickChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) GetStatisticsChannel(ctx context.Context, in *internalpb.GetStatisticsChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) - RegisterNode(ctx context.Context, in *RegisterNodeRequest, opts ...grpc.CallOption) (*RegisterNodeResponse, error) BuildIndex(ctx context.Context, in *BuildIndexRequest, opts ...grpc.CallOption) (*BuildIndexResponse, error) GetIndexStates(ctx context.Context, in *GetIndexStatesRequest, opts ...grpc.CallOption) (*GetIndexStatesResponse, error) GetIndexFilePaths(ctx context.Context, in *GetIndexFilePathsRequest, opts ...grpc.CallOption) (*GetIndexFilePathsResponse, error) @@ -930,15 +928,6 @@ func (c *indexCoordClient) GetStatisticsChannel(ctx context.Context, in *interna return out, nil } -func (c *indexCoordClient) RegisterNode(ctx context.Context, in *RegisterNodeRequest, opts ...grpc.CallOption) (*RegisterNodeResponse, error) { - out := new(RegisterNodeResponse) - err := c.cc.Invoke(ctx, "/milvus.proto.index.IndexCoord/RegisterNode", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - func (c *indexCoordClient) BuildIndex(ctx context.Context, in *BuildIndexRequest, opts ...grpc.CallOption) (*BuildIndexResponse, error) { out := new(BuildIndexResponse) err := c.cc.Invoke(ctx, "/milvus.proto.index.IndexCoord/BuildIndex", in, out, opts...) @@ -980,7 +969,6 @@ type IndexCoordServer interface { GetComponentStates(context.Context, *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) GetTimeTickChannel(context.Context, *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) GetStatisticsChannel(context.Context, *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) - RegisterNode(context.Context, *RegisterNodeRequest) (*RegisterNodeResponse, error) BuildIndex(context.Context, *BuildIndexRequest) (*BuildIndexResponse, error) GetIndexStates(context.Context, *GetIndexStatesRequest) (*GetIndexStatesResponse, error) GetIndexFilePaths(context.Context, *GetIndexFilePathsRequest) (*GetIndexFilePathsResponse, error) @@ -1000,9 +988,6 @@ func (*UnimplementedIndexCoordServer) GetTimeTickChannel(ctx context.Context, re func (*UnimplementedIndexCoordServer) GetStatisticsChannel(ctx context.Context, req *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetStatisticsChannel not implemented") } -func (*UnimplementedIndexCoordServer) RegisterNode(ctx context.Context, req *RegisterNodeRequest) (*RegisterNodeResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method RegisterNode not implemented") -} func (*UnimplementedIndexCoordServer) BuildIndex(ctx context.Context, req *BuildIndexRequest) (*BuildIndexResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method BuildIndex not implemented") } @@ -1074,24 +1059,6 @@ func _IndexCoord_GetStatisticsChannel_Handler(srv interface{}, ctx context.Conte return interceptor(ctx, in, info, handler) } -func _IndexCoord_RegisterNode_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(RegisterNodeRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(IndexCoordServer).RegisterNode(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/milvus.proto.index.IndexCoord/RegisterNode", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(IndexCoordServer).RegisterNode(ctx, req.(*RegisterNodeRequest)) - } - return interceptor(ctx, in, info, handler) -} - func _IndexCoord_BuildIndex_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(BuildIndexRequest) if err := dec(in); err != nil { @@ -1180,10 +1147,6 @@ var _IndexCoord_serviceDesc = grpc.ServiceDesc{ MethodName: "GetStatisticsChannel", Handler: _IndexCoord_GetStatisticsChannel_Handler, }, - { - MethodName: "RegisterNode", - Handler: _IndexCoord_RegisterNode_Handler, - }, { MethodName: "BuildIndex", Handler: _IndexCoord_BuildIndex_Handler, diff --git a/internal/querycoord/impl.go b/internal/querycoord/impl.go index d80a3eae16..4fac61079d 100644 --- a/internal/querycoord/impl.go +++ b/internal/querycoord/impl.go @@ -175,8 +175,8 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas } log.Debug("ReleaseCollectionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID)) - qc.meta.printMeta() - qc.cluster.printMeta() + //qc.meta.printMeta() + //qc.cluster.printMeta() return status, nil } @@ -336,8 +336,8 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas return status, err } log.Debug("ReleasePartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs)) - qc.meta.printMeta() - qc.cluster.printMeta() + //qc.meta.printMeta() + //qc.cluster.printMeta() return status, nil } diff --git a/internal/types/types.go b/internal/types/types.go index 165896b87a..fe707bfa3d 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -74,7 +74,6 @@ type IndexCoord interface { Component TimeTickProvider - RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error) diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index 46c09bfaba..d35bd43b40 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -321,6 +321,7 @@ func (s *Session) WatchServices(prefix string, revision int64) (eventChannel <-c } eventType = SessionDelEvent } + log.Debug("WatchService", zap.Any("event type", eventType)) eventCh <- &SessionEvent{ EventType: eventType, Session: session,