Index coord should not get all node from etcd each time (#12668)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/12744/head
Xiaofan 2021-12-04 11:39:34 +08:00 committed by GitHub
parent 1b1911791e
commit 0f5776e5fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 16 additions and 18 deletions

View File

@ -800,18 +800,7 @@ func (i *IndexCoord) assignTaskLoop() {
log.Debug("IndexCoord assignTaskLoop ctx Done")
return
case <-timeTicker.C:
sessions, _, err := i.session.GetSessions(typeutil.IndexNodeRole)
if err != nil {
log.Error("IndexCoord assignTaskLoop", zap.Any("GetSessions error", err))
}
if len(sessions) <= 0 {
log.Warn("There is no IndexNode available as this time.")
break
}
var serverIDs []int64
for _, session := range sessions {
serverIDs = append(serverIDs, session.ServerID)
}
serverIDs := i.nodeManager.ListNode()
metas := i.metaTable.GetUnassignedTasks(serverIDs)
sort.Slice(metas, func(i, j int) bool {
return metas[i].indexMeta.Version <= metas[j].indexMeta.Version
@ -822,7 +811,7 @@ func (i *IndexCoord) assignTaskLoop() {
}
for index, meta := range metas {
indexBuildID := meta.indexMeta.IndexBuildID
if err = i.metaTable.UpdateVersion(indexBuildID); err != nil {
if err := i.metaTable.UpdateVersion(indexBuildID); err != nil {
log.Warn("IndexCoord assignmentTasksLoop metaTable.UpdateVersion failed", zap.Error(err))
continue
}
@ -847,12 +836,11 @@ func (i *IndexCoord) assignTaskLoop() {
log.Warn("IndexCoord assignTask assign task to IndexNode failed")
continue
}
if err = i.metaTable.BuildIndex(indexBuildID, nodeID); err != nil {
if err := i.metaTable.BuildIndex(indexBuildID, nodeID); err != nil {
log.Error("IndexCoord assignmentTasksLoop metaTable.BuildIndex failed", zap.Error(err))
break
}
log.Debug("This task has been assigned", zap.Int64("indexBuildID", indexBuildID),
zap.Int64("The IndexNode execute this task", nodeID))
log.Debug("This task has been assigned successfully", zap.Int64("indexBuildID", indexBuildID), zap.Int64("nodeID", nodeID))
i.nodeManager.pq.IncPriority(nodeID, 1)
if index > i.taskLimit {
break

View File

@ -397,6 +397,7 @@ func (mt *metaTable) GetUnassignedTasks(onlineNodeIDs []int64) []Meta {
}
}
if !alive {
log.Info("Reassign because node no longer alive", zap.Any("onlineID", onlineNodeIDs), zap.Int64("nodeID", meta.indexMeta.NodeID))
metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), revision: meta.revision})
}
}

View File

@ -97,17 +97,26 @@ func (nm *NodeManager) PeekClient() (UniqueID, types.IndexNode) {
nm.lock.Lock()
defer nm.lock.Unlock()
log.Debug("IndexCoord NodeManager PeekClient")
nodeID := nm.pq.Peek()
client, ok := nm.nodeClients[nodeID]
if !ok {
log.Error("IndexCoord NodeManager PeekClient", zap.Any("There is no IndexNode client corresponding to NodeID", nodeID))
return nodeID, nil
}
log.Debug("IndexCoord NodeManager PeekClient ", zap.Int64("node", nodeID))
return nodeID, client
}
func (nm *NodeManager) ListNode() []UniqueID {
nm.lock.Lock()
defer nm.lock.Unlock()
clients := []UniqueID{}
for id := range nm.nodeClients {
clients = append(clients, id)
}
return clients
}
type indexNodeGetMetricsResponse struct {
resp *milvuspb.GetMetricsResponse
err error