Refactor the logic of assign tasks in IndexCoord (#6328)

* Refactor the logic of assign tasks in IndexCoord

Signed-off-by: xiaocai2333 <cai.zhang@zilliz.com>

* Fix bugs

Signed-off-by: xiaocai2333 <cai.zhang@zilliz.com>

* Fix bug for unittest

Signed-off-by: xiaocai2333 <cai.zhang@zilliz.com>

* Add lock for map

Signed-off-by: xiaocai2333 <cai.zhang@zilliz.com>

* Improve code

Signed-off-by: xiaocai2333 <cai.zhang@zilliz.com>

* Fix unittest bug

Signed-off-by: xiaocai2333 <cai.zhang@zilliz.com>

* Reduce duriation for timetick

Signed-off-by: xiaocai2333 <cai.zhang@zilliz.com>

* Update orm version

Signed-off-by: xiaocai2333 <cai.zhang@zilliz.com>

* Reset sdk version

Signed-off-by: xiaocai2333 <cai.zhang@zilliz.com>

* Fix bug

Signed-off-by: xiaocai2333 <cai.zhang@zilliz.com>

* Reset orm version

Signed-off-by: xiaocai2333 <cai.zhang@zilliz.com>

* Reset test ip

Signed-off-by: xiaocai2333 <cai.zhang@zilliz.com>

* Fix bug

Signed-off-by: xiaocai2333 <cai.zhang@zilliz.com>

* Fix bug for unissued

Signed-off-by: xiaocai2333 <cai.zhang@zilliz.com>

* Rename some variables

Signed-off-by: xiaocai2333 <cai.zhang@zilliz.com>

* Fix bug

Signed-off-by: xiaocai2333 <cai.zhang@zilliz.com>

* Use break instead of continue in select::case

Signed-off-by: xiaocai2333 <cai.zhang@zilliz.com>
pull/6545/head
cai.zhang 2021-07-14 14:15:55 +08:00 committed by GitHub
parent 5ba96dc96f
commit f469a315d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 306 additions and 537 deletions

View File

@ -173,13 +173,6 @@ func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp
return ret.(*milvuspb.StringResponse), err
}
func (c *Client) RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.RegisterNode(ctx, req)
})
return ret.(*indexpb.RegisterNodeResponse), err
}
func (c *Client) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.BuildIndex(ctx, req)

View File

@ -128,10 +128,6 @@ func (s *Server) GetStatisticsChannel(ctx context.Context, req *internalpb.GetSt
return s.indexcoord.GetStatisticsChannel(ctx)
}
func (s *Server) RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) {
return s.indexcoord.RegisterNode(ctx, req)
}
func (s *Server) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
return s.indexcoord.BuildIndex(ctx, req)
}

View File

@ -15,8 +15,8 @@ import (
"context"
"errors"
"math/rand"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@ -45,12 +45,11 @@ import (
const (
reqTimeoutInterval = time.Second * 10
durationInterval = time.Second * 10
recycleIndexLimit = 20
assignTaskInterval = time.Second * 3
taskLimit = 20
)
type IndexCoord struct {
nodeClients *PriorityQueue
nodeStates map[UniqueID]*internalpb.ComponentStates
stateCode atomic.Value
ID UniqueID
@ -64,15 +63,12 @@ type IndexCoord struct {
eventChan <-chan *sessionutil.SessionEvent
assignChan chan []UniqueID
idAllocator *allocator.GlobalIDAllocator
kv kv.BaseKV
metaTable *metaTable
nodeTasks *nodeTasks
nodeManager *NodeManager
nodeLock sync.RWMutex
@ -90,8 +86,6 @@ func NewIndexCoord(ctx context.Context) (*IndexCoord, error) {
i := &IndexCoord{
loopCtx: ctx1,
loopCancel: cancel,
nodeClients: &PriorityQueue{},
nodeTasks: &nodeTasks{},
}
i.UpdateStateCode(internalpb.StateCode_Abnormal)
return i, nil
@ -101,14 +95,12 @@ func NewIndexCoord(ctx context.Context) (*IndexCoord, error) {
func (i *IndexCoord) Register() error {
i.session = sessionutil.NewSession(i.loopCtx, Params.MetaRootPath, Params.EtcdEndpoints)
i.session.Init(typeutil.IndexCoordRole, Params.Address, true)
i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, 0)
return nil
}
func (i *IndexCoord) Init() error {
log.Debug("IndexCoord", zap.Any("etcd endpoints", Params.EtcdEndpoints))
i.assignChan = make(chan []UniqueID, 1024)
connectEtcdFn := func() error {
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints})
if err != nil {
@ -129,6 +121,25 @@ func (i *IndexCoord) Init() error {
return err
}
log.Debug("IndexCoord try to connect etcd success")
i.nodeManager = NewNodeManager()
sessions, revision, err := i.session.GetSessions(typeutil.IndexNodeRole)
log.Debug("IndexCoord", zap.Any("session number", len(sessions)), zap.Any("revision", revision))
if err != nil {
log.Debug("IndexCoord", zap.Any("Get IndexNode Sessions error", err))
}
for _, session := range sessions {
if err = i.nodeManager.AddNode(session.ServerID, session.Address); err != nil {
log.Debug("IndexCoord", zap.Any("ServerID", session.ServerID),
zap.Any("Add IndexNode error", err))
}
}
log.Debug("IndexCoord", zap.Any("IndexNode number", len(i.nodeManager.nodeClients)))
i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, revision+1)
nodeTasks := i.metaTable.GetNodeTaskStats()
for nodeID, taskNum := range nodeTasks {
i.nodeManager.pq.UpdatePriority(nodeID, taskNum)
}
//init idAllocator
kvRootPath := Params.KvRootPath
@ -168,13 +179,6 @@ func (i *IndexCoord) Init() error {
i.UpdateStateCode(internalpb.StateCode_Healthy)
log.Debug("IndexCoord", zap.Any("State", i.stateCode.Load()))
i.nodeTasks = NewNodeTasks()
err = i.assignTasksServerStart()
if err != nil {
log.Debug("IndexCoord assign tasks server start failed", zap.Error(err))
return err
}
log.Debug("IndexCoord assign tasks server success", zap.Error(err))
return nil
}
@ -187,7 +191,7 @@ func (i *IndexCoord) Start() error {
go i.recycleUnusedIndexFiles()
i.loopWg.Add(1)
go i.assignmentTasksLoop()
go i.assignTaskLoop()
i.loopWg.Add(1)
go i.watchNodeLoop()
@ -233,6 +237,7 @@ func (i *IndexCoord) GetComponentStates(ctx context.Context) (*internalpb.Compon
ErrorCode: commonpb.ErrorCode_Success,
},
}
log.Debug("IndexCoord GetComponentStates", zap.Any("IndexCoord component state", stateInfo))
return ret, nil
}
@ -290,7 +295,6 @@ func (i *IndexCoord) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequ
},
req: req,
idAllocator: i.idAllocator,
kv: i.kv,
}
var cancel func()
@ -320,7 +324,6 @@ func (i *IndexCoord) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequ
ret.Status.Reason = err.Error()
return ret, nil
}
i.assignChan <- []UniqueID{t.indexBuildID}
ret.Status.ErrorCode = commonpb.ErrorCode_Success
ret.IndexBuildID = t.indexBuildID
return ret, nil
@ -447,7 +450,7 @@ func (i *IndexCoord) recycleUnusedIndexFiles() {
case <-ctx.Done():
return
case <-timeTicker.C:
metas := i.metaTable.GetUnusedIndexFiles(recycleIndexLimit)
metas := i.metaTable.GetUnusedIndexFiles(taskLimit)
for _, meta := range metas {
if meta.indexMeta.MarkDeleted {
unusedIndexFilePathPrefix := strconv.Itoa(int(meta.indexMeta.IndexBuildID))
@ -473,63 +476,6 @@ func (i *IndexCoord) recycleUnusedIndexFiles() {
}
}
func (i *IndexCoord) assignmentTasksLoop() {
ctx, cancel := context.WithCancel(i.loopCtx)
defer cancel()
defer i.loopWg.Done()
log.Debug("IndexCoord start assignmentTasksLoop start")
for {
select {
case <-ctx.Done():
return
case indexBuildIDs := <-i.assignChan:
for _, indexBuildID := range indexBuildIDs {
meta := i.metaTable.GetIndexMeta(indexBuildID)
log.Debug("IndexCoord assignmentTasksLoop ", zap.Any("Meta", meta))
if meta.indexMeta.State == commonpb.IndexState_Finished {
continue
}
if err := i.metaTable.UpdateVersion(indexBuildID); err != nil {
log.Debug("IndexCoord assignmentTasksLoop metaTable.UpdateVersion failed", zap.Error(err))
}
nodeID, builderClient := i.nodeClients.PeekClient()
if builderClient == nil {
log.Debug("IndexCoord assignmentTasksLoop can not find available IndexNode")
i.assignChan <- []UniqueID{indexBuildID}
continue
}
i.nodeTasks.assignTask(nodeID, indexBuildID)
req := &indexpb.CreateIndexRequest{
IndexBuildID: indexBuildID,
IndexName: meta.indexMeta.Req.IndexName,
IndexID: meta.indexMeta.Req.IndexID,
Version: meta.indexMeta.Version + 1,
MetaPath: "/indexes/" + strconv.FormatInt(indexBuildID, 10),
DataPaths: meta.indexMeta.Req.DataPaths,
TypeParams: meta.indexMeta.Req.TypeParams,
IndexParams: meta.indexMeta.Req.IndexParams,
}
resp, err := builderClient.CreateIndex(ctx, req)
if err != nil {
log.Debug("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.Error(err))
continue
}
if resp.ErrorCode != commonpb.ErrorCode_Success {
log.Debug("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.String("Reason", resp.Reason))
continue
}
if err = i.metaTable.BuildIndex(indexBuildID, nodeID); err != nil {
log.Debug("IndexCoord assignmentTasksLoop metaTable.BuildIndex failed", zap.Error(err))
}
i.nodeClients.IncPriority(nodeID, 1)
}
}
}
}
func (i *IndexCoord) watchNodeLoop() {
ctx, cancel := context.WithCancel(i.loopCtx)
@ -542,18 +488,20 @@ func (i *IndexCoord) watchNodeLoop() {
case <-ctx.Done():
return
case event := <-i.eventChan:
log.Debug("IndexCoord watchNodeLoop event updated")
switch event.EventType {
case sessionutil.SessionAddEvent:
serverID := event.Session.ServerID
log.Debug("IndexCoord watchNodeLoop SessionAddEvent", zap.Any("serverID", serverID))
log.Debug("IndexCoord watchNodeLoop SessionAddEvent", zap.Any("serverID", serverID), zap.Any("address", event.Session.Address))
err := i.nodeManager.AddNode(serverID, event.Session.Address)
if err != nil {
log.Debug("IndexCoord", zap.Any("Add IndexNode err", err))
}
log.Debug("IndexCoord", zap.Any("IndexNode number", len(i.nodeManager.nodeClients)))
case sessionutil.SessionDelEvent:
serverID := event.Session.ServerID
i.removeNode(serverID)
log.Debug("IndexCoord watchNodeLoop SessionDelEvent", zap.Any("serverID", serverID))
indexBuildIDs := i.nodeTasks.getTasksByNodeID(serverID)
log.Debug("IndexNode crashed", zap.Any("IndexNode ID", serverID), zap.Any("task IDs", indexBuildIDs))
i.assignChan <- indexBuildIDs
i.nodeTasks.delete(serverID)
i.nodeManager.RemoveNode(serverID)
}
}
}
@ -587,7 +535,7 @@ func (i *IndexCoord) watchMetaLoop() {
reload := i.metaTable.LoadMetaFromETCD(indexBuildID, eventRevision)
log.Debug("IndexCoord watchMetaLoop PUT", zap.Any("IndexBuildID", indexBuildID), zap.Any("reload", reload))
if reload {
i.nodeTasks.finishTask(indexBuildID)
i.nodeManager.pq.IncPriority(indexMeta.NodeID, -1)
}
case mvccpb.DELETE:
}
@ -596,38 +544,74 @@ func (i *IndexCoord) watchMetaLoop() {
}
}
func (i *IndexCoord) assignTasksServerStart() error {
func (i *IndexCoord) assignTaskLoop() {
ctx, cancel := context.WithCancel(i.loopCtx)
defer cancel()
defer i.loopWg.Done()
timeTicker := time.NewTicker(assignTaskInterval)
log.Debug("IndexCoord start assignTask loop")
for {
select {
case <-ctx.Done():
log.Debug("IndexCoord assignTaskLoop ctx Done")
return
case <-timeTicker.C:
sessions, _, err := i.session.GetSessions(typeutil.IndexNodeRole)
if err != nil {
return err
}
for _, session := range sessions {
addrs := strings.Split(session.Address, ":")
ip := addrs[0]
port, err := strconv.ParseInt(addrs[1], 10, 64)
if err != nil {
return err
}
req := &indexpb.RegisterNodeRequest{
Address: &commonpb.Address{
Ip: ip,
Port: port,
},
NodeID: session.ServerID,
}
if err = i.addNode(session.ServerID, req); err != nil {
log.Debug("IndexCoord", zap.Any("IndexCoord start find node fatal, err = ", err))
log.Debug("IndexCoord assignTaskLoop", zap.Any("GetSessions error", err))
}
if len(sessions) <= 0 {
break
}
var serverIDs []int64
for _, session := range sessions {
serverIDs = append(serverIDs, session.ServerID)
}
tasks := i.metaTable.GetUnassignedTasks(serverIDs)
for _, taskQueue := range tasks {
i.assignChan <- taskQueue
metas := i.metaTable.GetUnassignedTasks(serverIDs)
sort.Slice(metas, func(i, j int) bool {
return metas[i].indexMeta.Version <= metas[j].indexMeta.Version
})
log.Debug("IndexCoord assignTaskLoop", zap.Any("Unassign tasks number", len(metas)))
for index, meta := range metas {
indexBuildID := meta.indexMeta.IndexBuildID
if err = i.metaTable.UpdateVersion(indexBuildID); err != nil {
log.Debug("IndexCoord assignmentTasksLoop metaTable.UpdateVersion failed", zap.Error(err))
}
nodeID, builderClient := i.nodeManager.PeekClient()
if builderClient == nil {
log.Debug("IndexCoord assignmentTasksLoop can not find available IndexNode")
break
}
req := &indexpb.CreateIndexRequest{
IndexBuildID: indexBuildID,
IndexName: meta.indexMeta.Req.IndexName,
IndexID: meta.indexMeta.Req.IndexID,
Version: meta.indexMeta.Version + 1,
MetaPath: "/indexes/" + strconv.FormatInt(indexBuildID, 10),
DataPaths: meta.indexMeta.Req.DataPaths,
TypeParams: meta.indexMeta.Req.TypeParams,
IndexParams: meta.indexMeta.Req.IndexParams,
}
resp, err := builderClient.CreateIndex(ctx, req)
if err != nil {
log.Debug("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.Error(err))
continue
}
if resp.ErrorCode != commonpb.ErrorCode_Success {
log.Debug("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.String("Reason", resp.Reason))
continue
}
if err = i.metaTable.BuildIndex(indexBuildID, nodeID); err != nil {
log.Debug("IndexCoord assignmentTasksLoop metaTable.BuildIndex failed", zap.Error(err))
}
i.nodeManager.pq.IncPriority(nodeID, 1)
if index > taskLimit {
break
}
}
}
}
return nil
}

View File

@ -361,28 +361,33 @@ func (mt *metaTable) GetIndexMeta(indexBuildID UniqueID) Meta {
return meta
}
func (mt *metaTable) GetUnassignedTasks(nodeIDs []int64) [][]UniqueID {
var tasks [][]UniqueID
var indexBuildIDs []UniqueID
func (mt *metaTable) GetUnassignedTasks(onlineNodeIDs []int64) []Meta {
mt.lock.RLock()
defer mt.lock.RUnlock()
for indexBuildID, meta := range mt.indexBuildID2Meta {
var metas []Meta
for _, meta := range mt.indexBuildID2Meta {
if meta.indexMeta.State == commonpb.IndexState_Unissued {
metas = append(metas, meta)
continue
}
if meta.indexMeta.State == commonpb.IndexState_Finished || meta.indexMeta.State == commonpb.IndexState_Failed {
continue
}
alive := false
for _, serverID := range nodeIDs {
for _, serverID := range onlineNodeIDs {
if meta.indexMeta.NodeID == serverID {
alive = true
break
}
}
if !alive {
indexBuildIDs = append(indexBuildIDs, indexBuildID)
}
if len(indexBuildIDs) >= 10 {
tasks = append(tasks, indexBuildIDs)
indexBuildIDs = []UniqueID{}
metas = append(metas, meta)
}
}
tasks = append(tasks, indexBuildIDs)
return tasks
return metas
}
func (mt *metaTable) HasSameReq(req *indexpb.BuildIndexRequest) (bool, UniqueID) {
@ -477,67 +482,16 @@ func (mt *metaTable) LoadMetaFromETCD(indexBuildID int64, revision int64) bool {
return true
}
type nodeTasks struct {
nodeID2Tasks map[int64][]UniqueID
func (mt *metaTable) GetNodeTaskStats() map[UniqueID]int {
mt.lock.RLock()
defer mt.lock.RUnlock()
lock sync.RWMutex
}
func NewNodeTasks() *nodeTasks {
return &nodeTasks{
nodeID2Tasks: map[int64][]UniqueID{},
log.Debug("IndexCoord MetaTable GetPriorityForNodeID")
nodePriority := make(map[UniqueID]int)
for _, meta := range mt.indexBuildID2Meta {
if meta.indexMeta.State == commonpb.IndexState_InProgress {
nodePriority[meta.indexMeta.NodeID]++
}
}
func (nt *nodeTasks) getTasksByNodeID(nodeID int64) []UniqueID {
nt.lock.Lock()
defer nt.lock.Unlock()
indexBuildIDs, ok := nt.nodeID2Tasks[nodeID]
if !ok {
return nil
}
return indexBuildIDs
}
func (nt *nodeTasks) assignTask(serverID int64, indexBuildID UniqueID) {
nt.lock.Lock()
defer nt.lock.Unlock()
indexBuildIDs, ok := nt.nodeID2Tasks[serverID]
if !ok {
var IDs []UniqueID
IDs = append(IDs, indexBuildID)
nt.nodeID2Tasks[serverID] = IDs
return
}
indexBuildIDs = append(indexBuildIDs, indexBuildID)
nt.nodeID2Tasks[serverID] = indexBuildIDs
}
func (nt *nodeTasks) finishTask(indexBuildID UniqueID) {
nt.lock.Lock()
defer nt.lock.Unlock()
removed := false
for serverID, taskIDs := range nt.nodeID2Tasks {
for i := 0; i < len(taskIDs); i++ {
if indexBuildID == taskIDs[i] {
taskIDs = append(taskIDs[:i], taskIDs[i+1:]...)
removed = true
break
}
}
if removed {
nt.nodeID2Tasks[serverID] = taskIDs
break
}
}
}
func (nt *nodeTasks) delete(serverID int64) {
nt.lock.Lock()
defer nt.lock.Unlock()
delete(nt.nodeID2Tasks, serverID)
return nodePriority
}

View File

@ -0,0 +1,88 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package indexcoord
import (
"context"
"sync"
grpcindexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/types"
"go.uber.org/zap"
)
type NodeManager struct {
nodeClients map[UniqueID]types.IndexNode
pq *PriorityQueue
lock sync.RWMutex
}
func NewNodeManager() *NodeManager {
return &NodeManager{
nodeClients: make(map[UniqueID]types.IndexNode),
pq: &PriorityQueue{},
lock: sync.RWMutex{},
}
}
func (nm *NodeManager) RemoveNode(nodeID UniqueID) {
nm.lock.Lock()
defer nm.lock.Unlock()
log.Debug("IndexCoord", zap.Any("Remove node with ID", nodeID))
delete(nm.nodeClients, nodeID)
nm.pq.Remove(nodeID)
}
func (nm *NodeManager) AddNode(nodeID UniqueID, address string) error {
nm.lock.Lock()
defer nm.lock.Unlock()
log.Debug("IndexCoord addNode", zap.Any("nodeID", nodeID), zap.Any("node address", address))
if nm.pq.CheckExist(nodeID) {
log.Debug("IndexCoord", zap.Any("Node client already exist with ID:", nodeID))
return nil
}
nodeClient, err := grpcindexnodeclient.NewClient(context.TODO(), address)
if err != nil {
return err
}
err = nodeClient.Init()
if err != nil {
return err
}
item := &PQItem{
key: nodeID,
priority: 0,
}
nm.nodeClients[nodeID] = nodeClient
nm.pq.Push(item)
return nil
}
func (nm *NodeManager) PeekClient() (UniqueID, types.IndexNode) {
nm.lock.Lock()
defer nm.lock.Unlock()
log.Debug("IndexCoord NodeManager PeekClient")
nodeID := nm.pq.Peek()
client, ok := nm.nodeClients[nodeID]
if !ok {
log.Error("IndexCoord NodeManager PeekClient", zap.Any("There is no IndexNode client corresponding to NodeID", nodeID))
return nodeID, nil
}
return nodeID, client
}

View File

@ -1,94 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package indexcoord
import (
"context"
"strconv"
"go.uber.org/zap"
grpcindexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
)
func (i *IndexCoord) removeNode(nodeID UniqueID) {
i.nodeLock.Lock()
defer i.nodeLock.Unlock()
log.Debug("IndexCoord", zap.Any("Remove node with ID", nodeID))
i.nodeClients.Remove(nodeID)
}
func (i *IndexCoord) addNode(nodeID UniqueID, req *indexpb.RegisterNodeRequest) error {
i.nodeLock.Lock()
defer i.nodeLock.Unlock()
log.Debug("IndexCoord addNode", zap.Any("nodeID", nodeID), zap.Any("node address", req.Address))
if i.nodeClients.CheckAddressExist(req.Address) {
log.Debug("IndexCoord", zap.Any("Node client already exist with ID:", nodeID))
return nil
}
nodeAddress := req.Address.Ip + ":" + strconv.FormatInt(req.Address.Port, 10)
nodeClient, err := grpcindexnodeclient.NewClient(context.TODO(), nodeAddress)
if err != nil {
return err
}
err = nodeClient.Init()
if err != nil {
return err
}
item := &PQItem{
value: nodeClient,
key: nodeID,
addr: req.Address,
priority: 0,
}
i.nodeClients.Push(item)
return nil
}
func (i *IndexCoord) prepareNodeInitParams() []*commonpb.KeyValuePair {
var params []*commonpb.KeyValuePair
params = append(params, &commonpb.KeyValuePair{Key: "minio.address", Value: Params.MinIOAddress})
params = append(params, &commonpb.KeyValuePair{Key: "minio.accessKeyID", Value: Params.MinIOAccessKeyID})
params = append(params, &commonpb.KeyValuePair{Key: "minio.secretAccessKey", Value: Params.MinIOSecretAccessKey})
params = append(params, &commonpb.KeyValuePair{Key: "minio.useSSL", Value: strconv.FormatBool(Params.MinIOUseSSL)})
params = append(params, &commonpb.KeyValuePair{Key: "minio.bucketName", Value: Params.MinioBucketName})
return params
}
func (i *IndexCoord) RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) {
log.Debug("indexcoord", zap.Any("register index node, node address = ", req.Address), zap.Any("node ID = ", req.NodeID))
ret := &indexpb.RegisterNodeResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}
err := i.addNode(req.NodeID, req)
if err != nil {
ret.Status.Reason = err.Error()
return ret, nil
}
ret.Status.ErrorCode = commonpb.ErrorCode_Success
params := i.prepareNodeInitParams()
ret.InitParams = &internalpb.InitParams{
NodeID: req.NodeID,
StartParams: params,
}
return ret, nil
}

View File

@ -14,16 +14,11 @@ package indexcoord
import (
"container/heap"
"sync"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/types"
)
// An Item is something we manage in a priority queue.
type PQItem struct {
value types.IndexNode // The value of the item; arbitrary.
key UniqueID
addr *commonpb.Address
priority int // The priority of the item in the queue.
// The index is needed by update and is maintained by the heap.Interface methods.
@ -71,12 +66,12 @@ func (pq *PriorityQueue) Pop() interface{} {
return item
}
func (pq *PriorityQueue) CheckAddressExist(addr *commonpb.Address) bool {
func (pq *PriorityQueue) CheckExist(nodeID UniqueID) bool {
pq.lock.RLock()
defer pq.lock.RUnlock()
for _, item := range pq.items {
if CompareAddress(addr, item.addr) {
if nodeID == item.key {
return true
}
}
@ -125,33 +120,24 @@ func (pq *PriorityQueue) Remove(key UniqueID) {
}
}
func (pq *PriorityQueue) Peek() interface{} {
// PeekClient picks an key with the lowest load.
func (pq *PriorityQueue) Peek() UniqueID {
pq.lock.RLock()
defer pq.lock.RUnlock()
if pq.Len() == 0 {
return nil
return UniqueID(-1)
}
return pq.items[0]
//item := pq.items[0]
//return item.value
return pq.items[0].key
}
// PeekClient picks an IndexNode with the lowest load.
func (pq *PriorityQueue) PeekClient() (UniqueID, types.IndexNode) {
item := pq.Peek()
if item == nil {
return UniqueID(-1), nil
}
return item.(*PQItem).key, item.(*PQItem).value
}
func (pq *PriorityQueue) PeekAllClients() []types.IndexNode {
func (pq *PriorityQueue) PeekAll() []UniqueID {
pq.lock.RLock()
defer pq.lock.RUnlock()
var ret []types.IndexNode
var ret []UniqueID
for _, item := range pq.items {
ret = append(ret, item.value)
ret = append(ret, item.key)
}
return ret

View File

@ -24,7 +24,6 @@ func newPriorityQueue() *PriorityQueue {
ret := &PriorityQueue{}
for i := 0; i < QueueLen; i++ {
item := &PQItem{
value: nil,
key: UniqueID(i),
priority: i,
index: i,
@ -72,14 +71,14 @@ func TestPriorityQueue_UpdatePriority(t *testing.T) {
pq := newPriorityQueue()
key := UniqueID(pq.Len() / 2)
pq.UpdatePriority(key, -pq.Len())
item := pq.Peek()
assert.Equal(t, key, item.(*PQItem).key)
peekKey := pq.Peek()
assert.Equal(t, key, peekKey)
}
func TestPriorityQueue_IncPriority(t *testing.T) {
pq := newPriorityQueue()
key := UniqueID(pq.Len() / 2)
pq.IncPriority(key, -pq.Len())
item := pq.Peek()
assert.Equal(t, key, item.(*PQItem).key)
peekKey := pq.Peek()
assert.Equal(t, key, peekKey)
}

View File

@ -19,9 +19,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/types"
)
const (
@ -74,10 +72,6 @@ type IndexAddTask struct {
req *indexpb.BuildIndexRequest
indexBuildID UniqueID
idAllocator *allocator.GlobalIDAllocator
buildQueue TaskQueue
kv kv.BaseKV
builderClient types.IndexNode
buildClientNodeID UniqueID
}
func (it *IndexAddTask) Ctx() context.Context {

View File

@ -13,7 +13,6 @@ package indexnode
import (
"context"
"errors"
"io"
"math/rand"
"strconv"
@ -34,7 +33,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
@ -91,8 +89,6 @@ func (i *IndexNode) Register() error {
}
func (i *IndexNode) Init() error {
ctx := context.Background()
connectEtcdFn := func() error {
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints})
i.etcdKV = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
@ -104,40 +100,6 @@ func (i *IndexNode) Init() error {
return err
}
log.Debug("IndexNode try connect etcd success")
log.Debug("IndexNode start to wait for IndexCoord ready")
err = funcutil.WaitForComponentHealthy(ctx, i.serviceClient, "IndexCoord", 1000000, time.Millisecond*200)
if err != nil {
log.Debug("IndexNode wait for IndexCoord ready failed", zap.Error(err))
return err
}
log.Debug("IndexNode report IndexCoord is ready")
request := &indexpb.RegisterNodeRequest{
Base: nil,
Address: &commonpb.Address{
Ip: Params.IP,
Port: int64(Params.Port),
},
NodeID: i.session.ServerID,
}
resp, err2 := i.serviceClient.RegisterNode(ctx, request)
if err2 != nil {
log.Debug("IndexNode RegisterNode failed", zap.Error(err2))
return err2
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
log.Debug("IndexNode RegisterNode failed", zap.String("Reason", resp.Status.Reason))
return errors.New(resp.Status.Reason)
}
err = Params.LoadConfigFromInitParams(resp.InitParams)
if err != nil {
log.Debug("IndexNode LoadConfigFromInitParams failed", zap.Error(err))
return err
}
log.Debug("IndexNode LoadConfigFromInitParams success")
option := &miniokv.Option{
Address: Params.MinIOAddress,

View File

@ -12,20 +12,14 @@
package indexnode
import (
"bytes"
"fmt"
"path"
"strconv"
"strings"
"sync"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/spf13/cast"
"github.com/spf13/viper"
)
const (
@ -81,55 +75,6 @@ func (pt *ParamTable) initParams() {
pt.initMetaRootPath()
}
func (pt *ParamTable) LoadConfigFromInitParams(initParams *internalpb.InitParams) error {
pt.NodeID = initParams.NodeID
config := viper.New()
config.SetConfigType("yaml")
for _, pair := range initParams.StartParams {
if pair.Key == StartParamsKey {
err := config.ReadConfig(bytes.NewBuffer([]byte(pair.Value)))
if err != nil {
return err
}
break
}
}
for _, key := range config.AllKeys() {
val := config.Get(key)
str, err := cast.ToStringE(val)
if err != nil {
switch val := val.(type) {
case []interface{}:
str = str[:0]
for _, v := range val {
ss, err := cast.ToStringE(v)
if err != nil {
log.Debug("indexnode", zap.String("error", err.Error()))
}
if len(str) == 0 {
str = ss
} else {
str = str + "," + ss
}
}
default:
log.Debug("indexnode", zap.String("undefine config type, key=", key))
}
}
err = pt.Save(key, str)
if err != nil {
panic(err)
}
}
pt.initParams()
return nil
}
func (pt *ParamTable) initMinIOAddress() {
ret, err := pt.Load("_MinioAddress")
if err != nil {

View File

@ -12,7 +12,6 @@ service IndexCoord {
rpc GetComponentStates(internal.GetComponentStatesRequest) returns (internal.ComponentStates) {}
rpc GetTimeTickChannel(internal.GetTimeTickChannelRequest) returns(milvus.StringResponse) {}
rpc GetStatisticsChannel(internal.GetStatisticsChannelRequest) returns(milvus.StringResponse){}
rpc RegisterNode(RegisterNodeRequest) returns (RegisterNodeResponse) {}
rpc BuildIndex(BuildIndexRequest) returns (BuildIndexResponse){}
rpc GetIndexStates(GetIndexStatesRequest) returns (GetIndexStatesResponse) {}
rpc GetIndexFilePaths(GetIndexFilePathsRequest) returns (GetIndexFilePathsResponse){}

View File

@ -809,68 +809,67 @@ func init() {
func init() { proto.RegisterFile("index_coord.proto", fileDescriptor_f9e019eb3fda53c2) }
var fileDescriptor_f9e019eb3fda53c2 = []byte{
// 970 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x56, 0x4f, 0x6f, 0x1b, 0x45,
0x14, 0xef, 0x7a, 0x1b, 0xff, 0x79, 0x36, 0x51, 0x33, 0x94, 0x6a, 0x71, 0xa9, 0xea, 0x2c, 0x05,
0x0c, 0x6a, 0x9d, 0xca, 0xa5, 0x70, 0x42, 0x82, 0xc4, 0x22, 0xb2, 0x50, 0xab, 0x68, 0x1a, 0x71,
0x40, 0x42, 0xd6, 0xc4, 0xfb, 0x92, 0x8c, 0xba, 0xff, 0xb2, 0x33, 0xae, 0xc8, 0x9d, 0x3b, 0x37,
0x10, 0x1f, 0x04, 0xf1, 0x39, 0x38, 0x70, 0xe2, 0xcb, 0xa0, 0x99, 0x9d, 0xdd, 0xee, 0xae, 0xd7,
0x89, 0x43, 0x0a, 0x27, 0x6e, 0xfb, 0xde, 0xbc, 0x37, 0xbf, 0x79, 0xbf, 0xf7, 0xe6, 0xb7, 0x03,
0x5b, 0x3c, 0xf4, 0xf0, 0x87, 0xd9, 0x3c, 0x8a, 0x12, 0x6f, 0x14, 0x27, 0x91, 0x8c, 0x08, 0x09,
0xb8, 0xff, 0x6a, 0x21, 0x52, 0x6b, 0xa4, 0xd7, 0xfb, 0xbd, 0x79, 0x14, 0x04, 0x51, 0x98, 0xfa,
0xfa, 0x9b, 0x3c, 0x94, 0x98, 0x84, 0xcc, 0x37, 0x76, 0xaf, 0x98, 0xe1, 0xfe, 0x62, 0xc1, 0xdb,
0x14, 0x4f, 0xb8, 0x90, 0x98, 0x3c, 0x8f, 0x3c, 0xa4, 0x78, 0xb6, 0x40, 0x21, 0xc9, 0x63, 0xb8,
0x79, 0xc4, 0x04, 0x3a, 0xd6, 0xc0, 0x1a, 0x76, 0xc7, 0xef, 0x8d, 0x4a, 0x30, 0x66, 0xff, 0x67,
0xe2, 0x64, 0x97, 0x09, 0xa4, 0x3a, 0x92, 0x7c, 0x06, 0x2d, 0xe6, 0x79, 0x09, 0x0a, 0xe1, 0x34,
0x2e, 0x48, 0xfa, 0x2a, 0x8d, 0xa1, 0x59, 0x30, 0xb9, 0x03, 0xcd, 0x30, 0xf2, 0x70, 0x3a, 0x71,
0xec, 0x81, 0x35, 0xb4, 0xa9, 0xb1, 0xdc, 0x9f, 0x2c, 0xb8, 0x5d, 0x3e, 0x99, 0x88, 0xa3, 0x50,
0x20, 0x79, 0x02, 0x4d, 0x21, 0x99, 0x5c, 0x08, 0x73, 0xb8, 0xbb, 0xb5, 0x38, 0x2f, 0x74, 0x08,
0x35, 0xa1, 0x64, 0x17, 0xba, 0x3c, 0xe4, 0x72, 0x16, 0xb3, 0x84, 0x05, 0xd9, 0x09, 0xb7, 0x47,
0x15, 0xf6, 0x0c, 0x51, 0xd3, 0x90, 0xcb, 0x03, 0x1d, 0x48, 0x81, 0xe7, 0xdf, 0xee, 0x17, 0xf0,
0xce, 0x3e, 0xca, 0xa9, 0xe2, 0x58, 0xed, 0x8e, 0x22, 0x23, 0xeb, 0x01, 0xbc, 0xa5, 0x99, 0xdf,
0x5d, 0x70, 0xdf, 0x9b, 0x4e, 0xd4, 0xc1, 0xec, 0xa1, 0x4d, 0xcb, 0x4e, 0xf7, 0x77, 0x0b, 0x3a,
0x3a, 0x79, 0x1a, 0x1e, 0x47, 0xe4, 0x29, 0x6c, 0xa8, 0xa3, 0xa5, 0x0c, 0x6f, 0x8e, 0xef, 0xd7,
0x16, 0xf1, 0x1a, 0x8b, 0xa6, 0xd1, 0xc4, 0x85, 0x5e, 0x71, 0x57, 0x5d, 0x88, 0x4d, 0x4b, 0x3e,
0xe2, 0x40, 0x4b, 0xdb, 0x39, 0xa5, 0x99, 0x49, 0xee, 0x01, 0xa4, 0x23, 0x14, 0xb2, 0x00, 0x9d,
0x9b, 0x03, 0x6b, 0xd8, 0xa1, 0x1d, 0xed, 0x79, 0xce, 0x02, 0x54, 0xad, 0x48, 0x90, 0x89, 0x28,
0x74, 0x36, 0xf4, 0x92, 0xb1, 0xdc, 0x1f, 0x2d, 0xb8, 0x53, 0xad, 0xfc, 0x3a, 0xcd, 0x78, 0x9a,
0x26, 0xa1, 0xea, 0x83, 0x3d, 0xec, 0x8e, 0xef, 0x8d, 0x96, 0xa7, 0x78, 0x94, 0x53, 0x45, 0x4d,
0xb0, 0xfb, 0x47, 0x03, 0xc8, 0x5e, 0x82, 0x4c, 0xa2, 0x5e, 0xcb, 0xd8, 0xaf, 0x52, 0x62, 0xd5,
0x50, 0x52, 0x2e, 0xbc, 0x51, 0x2d, 0x7c, 0x35, 0x63, 0x0e, 0xb4, 0x5e, 0x61, 0x22, 0x78, 0x14,
0x6a, 0xba, 0x6c, 0x9a, 0x99, 0xe4, 0x2e, 0x74, 0x02, 0x94, 0x6c, 0x16, 0x33, 0x79, 0x6a, 0xf8,
0x6a, 0x2b, 0xc7, 0x01, 0x93, 0xa7, 0x0a, 0xcf, 0x63, 0x66, 0x51, 0x38, 0xcd, 0x81, 0xad, 0xf0,
0x94, 0x47, 0xad, 0xea, 0x69, 0x94, 0xe7, 0x31, 0x66, 0xd3, 0xd8, 0xd2, 0x2c, 0x6c, 0xd7, 0x52,
0xf7, 0x0d, 0x9e, 0x7f, 0xcb, 0xfc, 0x05, 0x1e, 0x30, 0x9e, 0x50, 0x50, 0x59, 0xe9, 0x34, 0x92,
0x89, 0x29, 0x3b, 0xdb, 0xa4, 0xbd, 0xee, 0x26, 0x5d, 0x9d, 0x66, 0x66, 0xfa, 0xd7, 0x06, 0x6c,
0xa5, 0x24, 0xfd, 0x67, 0x94, 0x96, 0xb9, 0xd9, 0xb8, 0x84, 0x9b, 0xe6, 0x9b, 0xe0, 0xa6, 0xf5,
0x8f, 0xb8, 0x09, 0x80, 0x14, 0xa9, 0xb9, 0xce, 0xc4, 0xaf, 0x71, 0x6d, 0xdd, 0x2f, 0xc1, 0xc9,
0x2e, 0xd9, 0xd7, 0xdc, 0x47, 0xcd, 0xc6, 0xd5, 0x14, 0xe6, 0x67, 0x0b, 0xb6, 0x4a, 0xf9, 0x5a,
0x69, 0xfe, 0xad, 0x03, 0x93, 0x21, 0xdc, 0x4a, 0x59, 0x3e, 0xe6, 0x3e, 0x9a, 0x76, 0xda, 0xba,
0x9d, 0x9b, 0xbc, 0x54, 0x85, 0x3a, 0xd8, 0xbb, 0x35, 0xb5, 0x5d, 0x87, 0xd1, 0x09, 0x40, 0x01,
0x36, 0xd5, 0x91, 0x0f, 0x56, 0xea, 0x48, 0x91, 0x10, 0xda, 0x39, 0xce, 0x0f, 0xf6, 0x57, 0xc3,
0x68, 0xf2, 0x33, 0x94, 0x6c, 0xad, 0xb1, 0xcf, 0x75, 0xbb, 0x71, 0x25, 0xdd, 0xbe, 0x0f, 0xdd,
0x63, 0xc6, 0xfd, 0x99, 0xd1, 0x57, 0x5b, 0x5f, 0x17, 0x50, 0x2e, 0xaa, 0x3d, 0xe4, 0x73, 0xb0,
0x13, 0x3c, 0xd3, 0x22, 0xb3, 0xa2, 0x90, 0xa5, 0x6b, 0x4a, 0x55, 0x46, 0x6d, 0x17, 0x36, 0xea,
0xba, 0x40, 0xb6, 0xa1, 0x17, 0xb0, 0xe4, 0xe5, 0xcc, 0x43, 0x1f, 0x25, 0x7a, 0x4e, 0x73, 0x60,
0x0d, 0xdb, 0xb4, 0xab, 0x7c, 0x93, 0xd4, 0x55, 0xf8, 0x19, 0xb7, 0x8a, 0x3f, 0xe3, 0xa2, 0x0c,
0xb6, 0xcb, 0x32, 0xd8, 0x87, 0x76, 0x82, 0xf3, 0xf3, 0xb9, 0x8f, 0x9e, 0xd3, 0xd1, 0x1b, 0xe6,
0xb6, 0xfb, 0x10, 0x6e, 0x4d, 0x92, 0x28, 0x2e, 0x49, 0x4b, 0x41, 0x17, 0xac, 0x92, 0x2e, 0x8c,
0xff, 0x6c, 0x02, 0xe8, 0xd0, 0x3d, 0xf5, 0xbe, 0x21, 0x31, 0x90, 0x7d, 0x94, 0x7b, 0x51, 0x10,
0x47, 0x21, 0x86, 0x32, 0xfd, 0xef, 0x90, 0xc7, 0x2b, 0x7e, 0xd9, 0xcb, 0xa1, 0x06, 0xb0, 0xff,
0xe1, 0x8a, 0x8c, 0x4a, 0xb8, 0x7b, 0x83, 0x04, 0x1a, 0xf1, 0x90, 0x07, 0x78, 0xc8, 0xe7, 0x2f,
0xf7, 0x4e, 0x59, 0x18, 0xa2, 0x7f, 0x11, 0x62, 0x25, 0x34, 0x43, 0x7c, 0xbf, 0x9c, 0x61, 0x8c,
0x17, 0x32, 0xe1, 0xe1, 0x49, 0x36, 0xf4, 0xee, 0x0d, 0x72, 0x06, 0xb7, 0xf7, 0x51, 0xa3, 0x73,
0x21, 0xf9, 0x5c, 0x64, 0x80, 0xe3, 0xd5, 0x80, 0x4b, 0xc1, 0x57, 0x84, 0x9c, 0x43, 0xaf, 0xf8,
0xa4, 0x22, 0x1f, 0xd5, 0xcd, 0x59, 0xcd, 0x73, 0xb0, 0x3f, 0xbc, 0x3c, 0x30, 0x07, 0xf9, 0x1e,
0xe0, 0xf5, 0xa8, 0x92, 0xf5, 0x46, 0x79, 0xb9, 0x4b, 0xd5, 0xb0, 0x7c, 0x7b, 0x0e, 0x9b, 0xe5,
0xb7, 0x08, 0xf9, 0xb8, 0x2e, 0xb7, 0xf6, 0xa5, 0xd6, 0xff, 0x64, 0x9d, 0xd0, 0x1c, 0x2a, 0x81,
0xad, 0x25, 0xd5, 0x22, 0x0f, 0x2f, 0xda, 0xa2, 0x2a, 0xdc, 0xfd, 0x47, 0x6b, 0x46, 0xe7, 0x98,
0x07, 0xd0, 0xc9, 0xef, 0x0c, 0x79, 0x50, 0x97, 0x5d, 0xbd, 0x52, 0xfd, 0x8b, 0xf4, 0xd2, 0xbd,
0x31, 0xfe, 0xcd, 0x36, 0x1a, 0xa7, 0x5b, 0xfe, 0xff, 0xb5, 0x7a, 0xf3, 0xd7, 0xea, 0x10, 0xba,
0x85, 0x77, 0x29, 0xa9, 0x9d, 0xe5, 0xe5, 0x87, 0xeb, 0x25, 0x7d, 0xdb, 0xfd, 0xf4, 0xbb, 0xf1,
0x09, 0x97, 0xa7, 0x8b, 0x23, 0xb5, 0xb2, 0x93, 0x86, 0x3e, 0xe2, 0x91, 0xf9, 0xda, 0xc9, 0x0a,
0xd8, 0xd1, 0xd9, 0x3b, 0x1a, 0x25, 0x3e, 0x3a, 0x6a, 0x6a, 0xf3, 0xc9, 0xdf, 0x01, 0x00, 0x00,
0xff, 0xff, 0x85, 0x95, 0xb2, 0x56, 0x2c, 0x0e, 0x00, 0x00,
// 952 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x56, 0xcd, 0x6e, 0x1c, 0x45,
0x10, 0xf6, 0xec, 0x64, 0xff, 0x6a, 0x8d, 0x15, 0x37, 0x21, 0x1a, 0x36, 0x44, 0x59, 0x0f, 0x01,
0x2d, 0x28, 0x59, 0x47, 0x1b, 0x02, 0x27, 0x24, 0xb0, 0x57, 0x58, 0x2b, 0x94, 0xc8, 0xea, 0x58,
0x1c, 0x90, 0xd0, 0xaa, 0xbd, 0x53, 0xb6, 0x5b, 0x99, 0x3f, 0x4f, 0xf7, 0x46, 0xf8, 0xce, 0x9d,
0x1b, 0x88, 0x07, 0x41, 0x3c, 0x07, 0x67, 0x5e, 0x82, 0x47, 0x40, 0xdd, 0xd3, 0x33, 0x99, 0x99,
0x9d, 0x75, 0xd6, 0x98, 0x70, 0xca, 0x6d, 0xaa, 0xba, 0xaa, 0xbf, 0xae, 0xaf, 0x6a, 0xbe, 0x6e,
0xd8, 0xe6, 0xa1, 0x87, 0x3f, 0xce, 0xe6, 0x51, 0x94, 0x78, 0xa3, 0x38, 0x89, 0x64, 0x44, 0x48,
0xc0, 0xfd, 0x97, 0x0b, 0x91, 0x5a, 0x23, 0xbd, 0xde, 0xdf, 0x9c, 0x47, 0x41, 0x10, 0x85, 0xa9,
0xaf, 0xbf, 0xc5, 0x43, 0x89, 0x49, 0xc8, 0x7c, 0x63, 0x6f, 0x16, 0x33, 0xdc, 0x5f, 0x2d, 0x78,
0x97, 0xe2, 0x29, 0x17, 0x12, 0x93, 0x67, 0x91, 0x87, 0x14, 0xcf, 0x17, 0x28, 0x24, 0x79, 0x04,
0x37, 0x8e, 0x99, 0x40, 0xc7, 0x1a, 0x58, 0xc3, 0xde, 0xf8, 0x83, 0x51, 0x09, 0xc6, 0xec, 0xff,
0x54, 0x9c, 0xee, 0x31, 0x81, 0x54, 0x47, 0x92, 0xcf, 0xa1, 0xcd, 0x3c, 0x2f, 0x41, 0x21, 0x9c,
0xc6, 0x25, 0x49, 0x5f, 0xa7, 0x31, 0x34, 0x0b, 0x26, 0xb7, 0xa1, 0x15, 0x46, 0x1e, 0x4e, 0x27,
0x8e, 0x3d, 0xb0, 0x86, 0x36, 0x35, 0x96, 0xfb, 0xb3, 0x05, 0xb7, 0xca, 0x27, 0x13, 0x71, 0x14,
0x0a, 0x24, 0x8f, 0xa1, 0x25, 0x24, 0x93, 0x0b, 0x61, 0x0e, 0x77, 0xa7, 0x16, 0xe7, 0xb9, 0x0e,
0xa1, 0x26, 0x94, 0xec, 0x41, 0x8f, 0x87, 0x5c, 0xce, 0x62, 0x96, 0xb0, 0x20, 0x3b, 0xe1, 0xce,
0xa8, 0xc2, 0x9e, 0x21, 0x6a, 0x1a, 0x72, 0x79, 0xa8, 0x03, 0x29, 0xf0, 0xfc, 0xdb, 0xfd, 0x12,
0xde, 0x3b, 0x40, 0x39, 0x55, 0x1c, 0xab, 0xdd, 0x51, 0x64, 0x64, 0xdd, 0x87, 0x77, 0x34, 0xf3,
0x7b, 0x0b, 0xee, 0x7b, 0xd3, 0x89, 0x3a, 0x98, 0x3d, 0xb4, 0x69, 0xd9, 0xe9, 0xfe, 0x61, 0x41,
0x57, 0x27, 0x4f, 0xc3, 0x93, 0x88, 0x3c, 0x81, 0xa6, 0x3a, 0x5a, 0xca, 0xf0, 0xd6, 0xf8, 0x5e,
0x6d, 0x11, 0xaf, 0xb0, 0x68, 0x1a, 0x4d, 0x5c, 0xd8, 0x2c, 0xee, 0xaa, 0x0b, 0xb1, 0x69, 0xc9,
0x47, 0x1c, 0x68, 0x6b, 0x3b, 0xa7, 0x34, 0x33, 0xc9, 0x5d, 0x80, 0x74, 0x84, 0x42, 0x16, 0xa0,
0x73, 0x63, 0x60, 0x0d, 0xbb, 0xb4, 0xab, 0x3d, 0xcf, 0x58, 0x80, 0xaa, 0x15, 0x09, 0x32, 0x11,
0x85, 0x4e, 0x53, 0x2f, 0x19, 0xcb, 0xfd, 0xc9, 0x82, 0xdb, 0xd5, 0xca, 0xaf, 0xd3, 0x8c, 0x27,
0x69, 0x12, 0xaa, 0x3e, 0xd8, 0xc3, 0xde, 0xf8, 0xee, 0x68, 0x79, 0x8a, 0x47, 0x39, 0x55, 0xd4,
0x04, 0xbb, 0x7f, 0x36, 0x80, 0xec, 0x27, 0xc8, 0x24, 0xea, 0xb5, 0x8c, 0xfd, 0x2a, 0x25, 0x56,
0x0d, 0x25, 0xe5, 0xc2, 0x1b, 0xd5, 0xc2, 0x57, 0x33, 0xe6, 0x40, 0xfb, 0x25, 0x26, 0x82, 0x47,
0xa1, 0xa6, 0xcb, 0xa6, 0x99, 0x49, 0xee, 0x40, 0x37, 0x40, 0xc9, 0x66, 0x31, 0x93, 0x67, 0x86,
0xaf, 0x8e, 0x72, 0x1c, 0x32, 0x79, 0xa6, 0xf0, 0x3c, 0x66, 0x16, 0x85, 0xd3, 0x1a, 0xd8, 0x0a,
0x4f, 0x79, 0xd4, 0xaa, 0x9e, 0x46, 0x79, 0x11, 0x63, 0x36, 0x8d, 0x6d, 0xcd, 0xc2, 0x4e, 0x2d,
0x75, 0xdf, 0xe2, 0xc5, 0x77, 0xcc, 0x5f, 0xe0, 0x21, 0xe3, 0x09, 0x05, 0x95, 0x95, 0x4e, 0x23,
0x99, 0x98, 0xb2, 0xb3, 0x4d, 0x3a, 0xeb, 0x6e, 0xd2, 0xd3, 0x69, 0x66, 0xa6, 0x7f, 0x6b, 0xc0,
0x76, 0x4a, 0xd2, 0xff, 0x46, 0x69, 0x99, 0x9b, 0xe6, 0x6b, 0xb8, 0x69, 0xfd, 0x17, 0xdc, 0xb4,
0xff, 0x15, 0x37, 0x01, 0x90, 0x22, 0x35, 0xd7, 0x99, 0xf8, 0x35, 0x7e, 0x5b, 0xf7, 0x2b, 0x70,
0xb2, 0x9f, 0xec, 0x1b, 0xee, 0xa3, 0x66, 0xe3, 0x6a, 0x0a, 0xf3, 0x8b, 0x05, 0xdb, 0xa5, 0x7c,
0xad, 0x34, 0x6f, 0xea, 0xc0, 0x64, 0x08, 0x37, 0x53, 0x96, 0x4f, 0xb8, 0x8f, 0xa6, 0x9d, 0xb6,
0x6e, 0xe7, 0x16, 0x2f, 0x55, 0xa1, 0x0e, 0xf6, 0x7e, 0x4d, 0x6d, 0xd7, 0x61, 0x74, 0x02, 0x50,
0x80, 0x4d, 0x75, 0xe4, 0xa3, 0x95, 0x3a, 0x52, 0x24, 0x84, 0x76, 0x4f, 0xf2, 0x83, 0xfd, 0xd5,
0x30, 0x9a, 0xfc, 0x14, 0x25, 0x5b, 0x6b, 0xec, 0x73, 0xdd, 0x6e, 0x5c, 0x49, 0xb7, 0xef, 0x41,
0xef, 0x84, 0x71, 0x7f, 0x66, 0xf4, 0xd5, 0xd6, 0xbf, 0x0b, 0x28, 0x17, 0xd5, 0x1e, 0xf2, 0x05,
0xd8, 0x09, 0x9e, 0x6b, 0x91, 0x59, 0x51, 0xc8, 0xd2, 0x6f, 0x4a, 0x55, 0x46, 0x6d, 0x17, 0x9a,
0x75, 0x5d, 0x20, 0x3b, 0xb0, 0x19, 0xb0, 0xe4, 0xc5, 0xcc, 0x43, 0x1f, 0x25, 0x7a, 0x4e, 0x6b,
0x60, 0x0d, 0x3b, 0xb4, 0xa7, 0x7c, 0x93, 0xd4, 0x55, 0xb8, 0x8c, 0xdb, 0xc5, 0xcb, 0xb8, 0x28,
0x83, 0x9d, 0xb2, 0x0c, 0xf6, 0xa1, 0x93, 0xe0, 0xfc, 0x62, 0xee, 0xa3, 0xe7, 0x74, 0xf5, 0x86,
0xb9, 0xed, 0x3e, 0x80, 0x9b, 0x93, 0x24, 0x8a, 0x4b, 0xd2, 0x52, 0xd0, 0x05, 0xab, 0xa4, 0x0b,
0xe3, 0xbf, 0x9b, 0x00, 0x3a, 0x74, 0x5f, 0xbd, 0x6f, 0x48, 0x0c, 0xe4, 0x00, 0xe5, 0x7e, 0x14,
0xc4, 0x51, 0x88, 0xa1, 0x4c, 0xef, 0x1d, 0xf2, 0x68, 0xc5, 0x95, 0xbd, 0x1c, 0x6a, 0x00, 0xfb,
0x1f, 0xaf, 0xc8, 0xa8, 0x84, 0xbb, 0x1b, 0x24, 0xd0, 0x88, 0x47, 0x3c, 0xc0, 0x23, 0x3e, 0x7f,
0xb1, 0x7f, 0xc6, 0xc2, 0x10, 0xfd, 0xcb, 0x10, 0x2b, 0xa1, 0x19, 0xe2, 0x87, 0xe5, 0x0c, 0x63,
0x3c, 0x97, 0x09, 0x0f, 0x4f, 0xb3, 0xa1, 0x77, 0x37, 0xc8, 0x39, 0xdc, 0x3a, 0x40, 0x8d, 0xce,
0x85, 0xe4, 0x73, 0x91, 0x01, 0x8e, 0x57, 0x03, 0x2e, 0x05, 0x5f, 0x11, 0xf2, 0x07, 0x80, 0x57,
0x53, 0x44, 0xd6, 0x9b, 0xb2, 0x65, 0x02, 0xab, 0x61, 0xf9, 0xf6, 0x1c, 0xb6, 0xca, 0xcf, 0x04,
0xf2, 0x49, 0x5d, 0x6e, 0xed, 0x23, 0xaa, 0xff, 0xe9, 0x3a, 0xa1, 0x39, 0x54, 0x02, 0xdb, 0x4b,
0x82, 0x42, 0x1e, 0x5c, 0xb6, 0x45, 0x55, 0x53, 0xfb, 0x0f, 0xd7, 0x8c, 0xce, 0x31, 0x0f, 0xa1,
0x9b, 0x8f, 0x33, 0xb9, 0x5f, 0x97, 0x5d, 0x9d, 0xf6, 0xfe, 0x65, 0x52, 0xe6, 0x6e, 0x8c, 0x7f,
0xb7, 0x8d, 0xfc, 0xa8, 0x07, 0xee, 0xdb, 0x89, 0x7f, 0x03, 0x13, 0x7f, 0x04, 0xbd, 0xc2, 0x93,
0x91, 0xd4, 0xce, 0xf2, 0xf2, 0x9b, 0xf2, 0x35, 0x7d, 0xdb, 0xfb, 0xec, 0xfb, 0xf1, 0x29, 0x97,
0x67, 0x8b, 0x63, 0xb5, 0xb2, 0x9b, 0x86, 0x3e, 0xe4, 0x91, 0xf9, 0xda, 0xcd, 0x0a, 0xd8, 0xd5,
0xd9, 0xbb, 0x1a, 0x25, 0x3e, 0x3e, 0x6e, 0x69, 0xf3, 0xf1, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff,
0xfc, 0x30, 0x24, 0x6f, 0xc7, 0x0d, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@ -888,7 +887,6 @@ type IndexCoordClient interface {
GetComponentStates(ctx context.Context, in *internalpb.GetComponentStatesRequest, opts ...grpc.CallOption) (*internalpb.ComponentStates, error)
GetTimeTickChannel(ctx context.Context, in *internalpb.GetTimeTickChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error)
GetStatisticsChannel(ctx context.Context, in *internalpb.GetStatisticsChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error)
RegisterNode(ctx context.Context, in *RegisterNodeRequest, opts ...grpc.CallOption) (*RegisterNodeResponse, error)
BuildIndex(ctx context.Context, in *BuildIndexRequest, opts ...grpc.CallOption) (*BuildIndexResponse, error)
GetIndexStates(ctx context.Context, in *GetIndexStatesRequest, opts ...grpc.CallOption) (*GetIndexStatesResponse, error)
GetIndexFilePaths(ctx context.Context, in *GetIndexFilePathsRequest, opts ...grpc.CallOption) (*GetIndexFilePathsResponse, error)
@ -930,15 +928,6 @@ func (c *indexCoordClient) GetStatisticsChannel(ctx context.Context, in *interna
return out, nil
}
func (c *indexCoordClient) RegisterNode(ctx context.Context, in *RegisterNodeRequest, opts ...grpc.CallOption) (*RegisterNodeResponse, error) {
out := new(RegisterNodeResponse)
err := c.cc.Invoke(ctx, "/milvus.proto.index.IndexCoord/RegisterNode", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *indexCoordClient) BuildIndex(ctx context.Context, in *BuildIndexRequest, opts ...grpc.CallOption) (*BuildIndexResponse, error) {
out := new(BuildIndexResponse)
err := c.cc.Invoke(ctx, "/milvus.proto.index.IndexCoord/BuildIndex", in, out, opts...)
@ -980,7 +969,6 @@ type IndexCoordServer interface {
GetComponentStates(context.Context, *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error)
GetTimeTickChannel(context.Context, *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error)
GetStatisticsChannel(context.Context, *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error)
RegisterNode(context.Context, *RegisterNodeRequest) (*RegisterNodeResponse, error)
BuildIndex(context.Context, *BuildIndexRequest) (*BuildIndexResponse, error)
GetIndexStates(context.Context, *GetIndexStatesRequest) (*GetIndexStatesResponse, error)
GetIndexFilePaths(context.Context, *GetIndexFilePathsRequest) (*GetIndexFilePathsResponse, error)
@ -1000,9 +988,6 @@ func (*UnimplementedIndexCoordServer) GetTimeTickChannel(ctx context.Context, re
func (*UnimplementedIndexCoordServer) GetStatisticsChannel(ctx context.Context, req *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetStatisticsChannel not implemented")
}
func (*UnimplementedIndexCoordServer) RegisterNode(ctx context.Context, req *RegisterNodeRequest) (*RegisterNodeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RegisterNode not implemented")
}
func (*UnimplementedIndexCoordServer) BuildIndex(ctx context.Context, req *BuildIndexRequest) (*BuildIndexResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method BuildIndex not implemented")
}
@ -1074,24 +1059,6 @@ func _IndexCoord_GetStatisticsChannel_Handler(srv interface{}, ctx context.Conte
return interceptor(ctx, in, info, handler)
}
func _IndexCoord_RegisterNode_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RegisterNodeRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(IndexCoordServer).RegisterNode(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/milvus.proto.index.IndexCoord/RegisterNode",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(IndexCoordServer).RegisterNode(ctx, req.(*RegisterNodeRequest))
}
return interceptor(ctx, in, info, handler)
}
func _IndexCoord_BuildIndex_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BuildIndexRequest)
if err := dec(in); err != nil {
@ -1180,10 +1147,6 @@ var _IndexCoord_serviceDesc = grpc.ServiceDesc{
MethodName: "GetStatisticsChannel",
Handler: _IndexCoord_GetStatisticsChannel_Handler,
},
{
MethodName: "RegisterNode",
Handler: _IndexCoord_RegisterNode_Handler,
},
{
MethodName: "BuildIndex",
Handler: _IndexCoord_BuildIndex_Handler,

View File

@ -175,8 +175,8 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
}
log.Debug("ReleaseCollectionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID))
qc.meta.printMeta()
qc.cluster.printMeta()
//qc.meta.printMeta()
//qc.cluster.printMeta()
return status, nil
}
@ -336,8 +336,8 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
return status, err
}
log.Debug("ReleasePartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
qc.meta.printMeta()
qc.cluster.printMeta()
//qc.meta.printMeta()
//qc.cluster.printMeta()
return status, nil
}

View File

@ -74,7 +74,6 @@ type IndexCoord interface {
Component
TimeTickProvider
RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error)
BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error)
DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error)
GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error)

View File

@ -321,6 +321,7 @@ func (s *Session) WatchServices(prefix string, revision int64) (eventChannel <-c
}
eventType = SessionDelEvent
}
log.Debug("WatchService", zap.Any("event type", eventType))
eventCh <- &SessionEvent{
EventType: eventType,
Session: session,