mirror of https://github.com/milvus-io/milvus.git
watch proxynode (#5424)
watch proxy node and, send InvalidataCollectionMetaCache to each proxynode Signed-off-by: yefu.chen <yefu.chen@zilliz.com> Resolves: #5423pull/5441/head^2
parent
5fd92ec80c
commit
f80dbab6ec
|
@ -36,6 +36,7 @@ type IDAllocator struct {
|
|||
Allocator
|
||||
|
||||
etcdAddr []string
|
||||
metaRoot string
|
||||
masterAddress string
|
||||
masterClient types.MasterService
|
||||
|
||||
|
@ -47,7 +48,7 @@ type IDAllocator struct {
|
|||
PeerID UniqueID
|
||||
}
|
||||
|
||||
func NewIDAllocator(ctx context.Context, masterAddr string, etcdAddr []string) (*IDAllocator, error) {
|
||||
func NewIDAllocator(ctx context.Context, masterAddr, metaRoot string, etcdAddr []string) (*IDAllocator, error) {
|
||||
|
||||
ctx1, cancel := context.WithCancel(ctx)
|
||||
a := &IDAllocator{
|
||||
|
@ -57,6 +58,7 @@ func NewIDAllocator(ctx context.Context, masterAddr string, etcdAddr []string) (
|
|||
Role: "IDAllocator",
|
||||
},
|
||||
countPerRPC: IDCountPerRPC,
|
||||
metaRoot: metaRoot,
|
||||
etcdAddr: etcdAddr,
|
||||
masterAddress: masterAddr,
|
||||
}
|
||||
|
@ -72,7 +74,7 @@ func NewIDAllocator(ctx context.Context, masterAddr string, etcdAddr []string) (
|
|||
func (ia *IDAllocator) Start() error {
|
||||
var err error
|
||||
|
||||
ia.masterClient, err = msc.NewClient(ia.masterAddress, ia.etcdAddr, 20*time.Second)
|
||||
ia.masterClient, err = msc.NewClient(ia.masterAddress, ia.metaRoot, ia.etcdAddr, 20*time.Second)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
|
@ -127,7 +127,7 @@ func (node *DataNode) SetDataServiceInterface(ds types.DataService) error {
|
|||
|
||||
// Register register data node at etcd
|
||||
func (node *DataNode) Register() error {
|
||||
node.session = sessionutil.NewSession(node.ctx, []string{Params.EtcdAddress})
|
||||
node.session = sessionutil.NewSession(node.ctx, Params.MetaRootPath, []string{Params.EtcdAddress})
|
||||
node.session.Init(typeutil.DataNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false)
|
||||
Params.NodeID = node.session.ServerID
|
||||
return nil
|
||||
|
|
|
@ -113,7 +113,7 @@ func (s *Server) SetMasterClient(masterClient types.MasterService) {
|
|||
|
||||
// Register register data service at etcd
|
||||
func (s *Server) Register() error {
|
||||
s.session = sessionutil.NewSession(s.ctx, []string{Params.EtcdAddress})
|
||||
s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, []string{Params.EtcdAddress})
|
||||
s.session.Init(typeutil.DataServiceRole, Params.IP, true)
|
||||
Params.NodeID = s.session.ServerID
|
||||
return nil
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"context"
|
||||
"math"
|
||||
"math/rand"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -789,7 +790,8 @@ func newTestServer(t *testing.T) *Server {
|
|||
|
||||
etcdCli, err := initEtcd(Params.EtcdAddress)
|
||||
assert.Nil(t, err)
|
||||
_, err = etcdCli.Delete(context.Background(), sessionutil.DefaultServiceRoot, clientv3.WithPrefix())
|
||||
sessKey := path.Join(Params.MetaRootPath, sessionutil.DefaultServiceRoot)
|
||||
_, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix())
|
||||
assert.Nil(t, err)
|
||||
|
||||
svr, err := CreateServer(context.TODO(), factory)
|
||||
|
|
|
@ -125,7 +125,7 @@ func TestRun(t *testing.T) {
|
|||
dnServer.newMasterServiceClient = func(s string) (types.MasterService, error) {
|
||||
return &mockMaster{}, nil
|
||||
}
|
||||
dnServer.newDataServiceClient = func(s, etcdAddress string, timeout time.Duration) types.DataService {
|
||||
dnServer.newDataServiceClient = func(s, etcdMetaRoot, etcdAddress string, timeout time.Duration) types.DataService {
|
||||
return &mockDataService{}
|
||||
}
|
||||
|
||||
|
|
|
@ -56,7 +56,7 @@ type Server struct {
|
|||
dataService types.DataService
|
||||
|
||||
newMasterServiceClient func(string) (types.MasterService, error)
|
||||
newDataServiceClient func(string, string, time.Duration) types.DataService
|
||||
newDataServiceClient func(string, string, string, time.Duration) types.DataService
|
||||
|
||||
closer io.Closer
|
||||
}
|
||||
|
@ -70,10 +70,10 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
|
|||
msFactory: factory,
|
||||
grpcErrChan: make(chan error),
|
||||
newMasterServiceClient: func(s string) (types.MasterService, error) {
|
||||
return msc.NewClient(s, []string{dn.Params.EtcdAddress}, 20*time.Second)
|
||||
return msc.NewClient(s, dn.Params.MetaRootPath, []string{dn.Params.EtcdAddress}, 20*time.Second)
|
||||
},
|
||||
newDataServiceClient: func(s, etcdAddress string, timeout time.Duration) types.DataService {
|
||||
return dsc.NewClient(Params.DataServiceAddress, []string{etcdAddress}, timeout)
|
||||
newDataServiceClient: func(s, etcdMetaRoot, etcdAddress string, timeout time.Duration) types.DataService {
|
||||
return dsc.NewClient(Params.DataServiceAddress, etcdMetaRoot, []string{etcdAddress}, timeout)
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -205,7 +205,7 @@ func (s *Server) init() error {
|
|||
if s.newDataServiceClient != nil {
|
||||
log.Debug("Data service address", zap.String("address", Params.DataServiceAddress))
|
||||
log.Debug("DataNode Init data service client ...")
|
||||
dataServiceClient := s.newDataServiceClient(Params.DataServiceAddress, dn.Params.EtcdAddress, 10)
|
||||
dataServiceClient := s.newDataServiceClient(Params.DataServiceAddress, dn.Params.MetaRootPath, dn.Params.EtcdAddress, 10)
|
||||
if err = dataServiceClient.Init(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
|
@ -56,8 +56,8 @@ func getDataServiceAddress(sess *sessionutil.Session) (string, error) {
|
|||
return ms.Address, nil
|
||||
}
|
||||
|
||||
func NewClient(address string, etcdAddr []string, timeout time.Duration) *Client {
|
||||
sess := sessionutil.NewSession(context.Background(), etcdAddr)
|
||||
func NewClient(address, metaRoot string, etcdAddr []string, timeout time.Duration) *Client {
|
||||
sess := sessionutil.NewSession(context.Background(), metaRoot, etcdAddr)
|
||||
return &Client{
|
||||
addr: address,
|
||||
ctx: context.Background(),
|
||||
|
|
|
@ -68,7 +68,7 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
|
|||
cancel: cancel,
|
||||
grpcErrChan: make(chan error),
|
||||
newMasterServiceClient: func(s string) (types.MasterService, error) {
|
||||
return msc.NewClient(s, []string{dataservice.Params.EtcdAddress}, 10*time.Second)
|
||||
return msc.NewClient(s, dataservice.Params.MetaRootPath, []string{dataservice.Params.EtcdAddress}, 10*time.Second)
|
||||
},
|
||||
}
|
||||
s.dataService, err = dataservice.CreateServer(s.ctx, factory)
|
||||
|
|
|
@ -54,8 +54,8 @@ func getMasterServiceAddr(sess *sessionutil.Session) (string, error) {
|
|||
return ms.Address, nil
|
||||
}
|
||||
|
||||
func NewClient(addr string, etcdAddr []string, timeout time.Duration) (*GrpcClient, error) {
|
||||
sess := sessionutil.NewSession(context.Background(), etcdAddr)
|
||||
func NewClient(addr string, metaRoot string, etcdAddr []string, timeout time.Duration) (*GrpcClient, error) {
|
||||
sess := sessionutil.NewSession(context.Background(), metaRoot, etcdAddr)
|
||||
if sess == nil {
|
||||
return nil, fmt.Errorf("new session error, maybe can not connect to etcd")
|
||||
}
|
||||
|
|
|
@ -13,8 +13,10 @@ package grpcmasterservice
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -32,6 +34,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/masterpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/proto/proxypb"
|
||||
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/internal/util/retry"
|
||||
|
@ -87,6 +90,14 @@ func GenFlushedSegMsgPack(segID typeutil.UniqueID) *msgstream.MsgPack {
|
|||
return &msgPack
|
||||
}
|
||||
|
||||
type proxyNodeMock struct {
|
||||
types.ProxyNode
|
||||
invalidateCollectionMetaCache func(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
|
||||
}
|
||||
|
||||
func (p *proxyNodeMock) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
|
||||
return p.invalidateCollectionMetaCache(ctx, request)
|
||||
}
|
||||
func TestGrpcService(t *testing.T) {
|
||||
const (
|
||||
dbName = "testDB"
|
||||
|
@ -141,7 +152,17 @@ func TestGrpcService(t *testing.T) {
|
|||
|
||||
etcdCli, err := initEtcd(cms.Params.EtcdAddress)
|
||||
assert.Nil(t, err)
|
||||
_, err = etcdCli.Delete(ctx, sessionutil.DefaultServiceRoot, clientv3.WithPrefix())
|
||||
sessKey := path.Join(cms.Params.MetaRootPath, sessionutil.DefaultServiceRoot)
|
||||
_, err = etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix())
|
||||
assert.Nil(t, err)
|
||||
|
||||
pnb, err := json.Marshal(
|
||||
&sessionutil.Session{
|
||||
ServerID: 100,
|
||||
},
|
||||
)
|
||||
assert.Nil(t, err)
|
||||
_, err = etcdCli.Put(ctx, path.Join(sessKey, typeutil.ProxyNodeRole+"-100"), string(pnb))
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = core.Init()
|
||||
|
@ -213,9 +234,15 @@ func TestGrpcService(t *testing.T) {
|
|||
}
|
||||
|
||||
collectionMetaCache := make([]string, 0, 16)
|
||||
core.CallInvalidateCollectionMetaCacheService = func(ctx context.Context, ts typeutil.Timestamp, dbName string, collectionName string) error {
|
||||
collectionMetaCache = append(collectionMetaCache, collectionName)
|
||||
return nil
|
||||
pnm := proxyNodeMock{}
|
||||
core.NewProxyClient = func(*sessionutil.Session) (types.ProxyNode, error) {
|
||||
return &pnm, nil
|
||||
}
|
||||
pnm.invalidateCollectionMetaCache = func(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
|
||||
collectionMetaCache = append(collectionMetaCache, request.CollectionName)
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
}, nil
|
||||
}
|
||||
|
||||
core.CallReleaseCollectionService = func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error {
|
||||
|
@ -227,7 +254,7 @@ func TestGrpcService(t *testing.T) {
|
|||
|
||||
svr.masterService.UpdateStateCode(internalpb.StateCode_Healthy)
|
||||
|
||||
cli, err := grpcmasterserviceclient.NewClient(Params.Address, []string{cms.Params.EtcdAddress}, 3*time.Second)
|
||||
cli, err := grpcmasterserviceclient.NewClient(Params.Address, cms.Params.MetaRootPath, []string{cms.Params.EtcdAddress}, 3*time.Second)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = cli.Init()
|
||||
|
@ -825,6 +852,9 @@ func (m *mockCore) Stop() error {
|
|||
return fmt.Errorf("stop error")
|
||||
}
|
||||
|
||||
func (m *mockCore) SetNewProxyClient(func(sess *sessionutil.Session) (types.ProxyNode, error)) {
|
||||
}
|
||||
|
||||
type mockProxy struct {
|
||||
types.ProxyService
|
||||
}
|
||||
|
@ -925,7 +955,7 @@ func TestRun(t *testing.T) {
|
|||
svr.newProxyServiceClient = func(s string) types.ProxyService {
|
||||
return &mockProxy{}
|
||||
}
|
||||
svr.newDataServiceClient = func(s, address string, timeout time.Duration) types.DataService {
|
||||
svr.newDataServiceClient = func(s, metaRoot, address string, timeout time.Duration) types.DataService {
|
||||
return &mockDataService{}
|
||||
}
|
||||
svr.newIndexServiceClient = func(s string) types.IndexService {
|
||||
|
@ -944,7 +974,8 @@ func TestRun(t *testing.T) {
|
|||
|
||||
etcdCli, err := initEtcd(cms.Params.EtcdAddress)
|
||||
assert.Nil(t, err)
|
||||
_, err = etcdCli.Delete(ctx, sessionutil.DefaultServiceRoot, clientv3.WithPrefix())
|
||||
sessKey := path.Join(cms.Params.MetaRootPath, sessionutil.DefaultServiceRoot)
|
||||
_, err = etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix())
|
||||
assert.Nil(t, err)
|
||||
err = svr.Run()
|
||||
assert.Nil(t, err)
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
|
||||
dsc "github.com/milvus-io/milvus/internal/distributed/dataservice/client"
|
||||
isc "github.com/milvus-io/milvus/internal/distributed/indexservice/client"
|
||||
pnc "github.com/milvus-io/milvus/internal/distributed/proxynode/client"
|
||||
psc "github.com/milvus-io/milvus/internal/distributed/proxyservice/client"
|
||||
qsc "github.com/milvus-io/milvus/internal/distributed/queryservice/client"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
|
@ -40,6 +41,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/masterpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
)
|
||||
|
||||
// grpc wrapper
|
||||
|
@ -59,7 +61,7 @@ type Server struct {
|
|||
queryService types.QueryService
|
||||
|
||||
newProxyServiceClient func(string) types.ProxyService
|
||||
newDataServiceClient func(string, string, time.Duration) types.DataService
|
||||
newDataServiceClient func(string, string, string, time.Duration) types.DataService
|
||||
newIndexServiceClient func(string) types.IndexService
|
||||
newQueryServiceClient func(string) (types.QueryService, error)
|
||||
|
||||
|
@ -98,8 +100,8 @@ func (s *Server) setClient() {
|
|||
}
|
||||
return psClient
|
||||
}
|
||||
s.newDataServiceClient = func(s, etcdAddress string, timeout time.Duration) types.DataService {
|
||||
dsClient := dsc.NewClient(s, []string{etcdAddress}, timeout)
|
||||
s.newDataServiceClient = func(s, etcdMetaRoot, etcdAddress string, timeout time.Duration) types.DataService {
|
||||
dsClient := dsc.NewClient(s, etcdMetaRoot, []string{etcdAddress}, timeout)
|
||||
if err := dsClient.Init(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -173,6 +175,19 @@ func (s *Server) init() error {
|
|||
|
||||
s.masterService.UpdateStateCode(internalpb.StateCode_Initializing)
|
||||
|
||||
s.masterService.SetNewProxyClient(
|
||||
func(s *sessionutil.Session) (types.ProxyNode, error) {
|
||||
cli := pnc.NewClient(ctx, s.Address)
|
||||
if err := cli.Init(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := cli.Start(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cli, nil
|
||||
},
|
||||
)
|
||||
|
||||
if s.newProxyServiceClient != nil {
|
||||
log.Debug("proxy service", zap.String("address", Params.ProxyServiceAddress))
|
||||
proxyService := s.newProxyServiceClient(Params.ProxyServiceAddress)
|
||||
|
@ -183,7 +198,7 @@ func (s *Server) init() error {
|
|||
}
|
||||
if s.newDataServiceClient != nil {
|
||||
log.Debug("data service", zap.String("address", Params.DataServiceAddress))
|
||||
dataService := s.newDataServiceClient(Params.DataServiceAddress, cms.Params.EtcdAddress, 10)
|
||||
dataService := s.newDataServiceClient(Params.DataServiceAddress, cms.Params.MetaRootPath, cms.Params.EtcdAddress, 10)
|
||||
if err := s.masterService.SetDataService(ctx, dataService); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
|
@ -189,7 +189,7 @@ func (s *Server) init() error {
|
|||
masterServiceAddr := Params.MasterAddress
|
||||
log.Debug("proxynode", zap.String("master address", masterServiceAddr))
|
||||
timeout := 3 * time.Second
|
||||
s.masterServiceClient, err = grpcmasterserviceclient.NewClient(masterServiceAddr, []string{proxynode.Params.EtcdAddress}, timeout)
|
||||
s.masterServiceClient, err = grpcmasterserviceclient.NewClient(masterServiceAddr, proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -207,7 +207,7 @@ func (s *Server) init() error {
|
|||
|
||||
dataServiceAddr := Params.DataServiceAddress
|
||||
log.Debug("proxynode", zap.String("data service address", dataServiceAddr))
|
||||
s.dataServiceClient = grpcdataserviceclient.NewClient(dataServiceAddr, []string{proxynode.Params.EtcdAddress}, 10)
|
||||
s.dataServiceClient = grpcdataserviceclient.NewClient(dataServiceAddr, proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, 10)
|
||||
err = s.dataServiceClient.Init()
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -134,7 +134,7 @@ func (s *Server) init() error {
|
|||
log.Debug("Master service", zap.String("address", addr))
|
||||
log.Debug("Init master service client ...")
|
||||
|
||||
masterService, err := msc.NewClient(addr, []string{qn.Params.EtcdAddress}, 20*time.Second)
|
||||
masterService, err := msc.NewClient(addr, qn.Params.MetaRootPath, []string{qn.Params.EtcdAddress}, 20*time.Second)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -181,7 +181,7 @@ func (s *Server) init() error {
|
|||
log.Debug("Data service", zap.String("address", Params.DataServiceAddress))
|
||||
log.Debug("QueryNode Init data service client ...")
|
||||
|
||||
dataService := dsc.NewClient(Params.DataServiceAddress, []string{qn.Params.EtcdAddress}, 10)
|
||||
dataService := dsc.NewClient(Params.DataServiceAddress, qn.Params.MetaRootPath, []string{qn.Params.EtcdAddress}, 10)
|
||||
if err = dataService.Init(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
|
@ -111,7 +111,7 @@ func (s *Server) init() error {
|
|||
log.Debug("Master service", zap.String("address", Params.MasterAddress))
|
||||
log.Debug("Init master service client ...")
|
||||
|
||||
masterService, err := msc.NewClient(Params.MasterAddress, []string{qs.Params.EtcdAddress}, 20*time.Second)
|
||||
masterService, err := msc.NewClient(Params.MasterAddress, qs.Params.MetaRootPath, []string{qs.Params.EtcdAddress}, 20*time.Second)
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
@ -138,7 +138,7 @@ func (s *Server) init() error {
|
|||
log.Debug("DataService", zap.String("Address", Params.DataServiceAddress))
|
||||
log.Debug("QueryService Init data service client ...")
|
||||
|
||||
dataService := dsc.NewClient(Params.DataServiceAddress, []string{qs.Params.EtcdAddress}, 10)
|
||||
dataService := dsc.NewClient(Params.DataServiceAddress, qs.Params.MetaRootPath, []string{qs.Params.EtcdAddress}, 10)
|
||||
if err = dataService.Init(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
|
@ -79,7 +79,7 @@ func NewIndexNode(ctx context.Context) (*IndexNode, error) {
|
|||
|
||||
// Register register index node at etcd
|
||||
func (i *IndexNode) Register() error {
|
||||
i.session = sessionutil.NewSession(i.loopCtx, []string{Params.EtcdAddress})
|
||||
i.session = sessionutil.NewSession(i.loopCtx, Params.MetaRootPath, []string{Params.EtcdAddress})
|
||||
i.session.Init(typeutil.IndexNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false)
|
||||
Params.NodeID = i.session.ServerID
|
||||
return nil
|
||||
|
|
|
@ -87,7 +87,7 @@ func NewIndexService(ctx context.Context) (*IndexService, error) {
|
|||
|
||||
// Register register index service at etcd
|
||||
func (i *IndexService) Register() error {
|
||||
i.session = sessionutil.NewSession(i.loopCtx, []string{Params.EtcdAddress})
|
||||
i.session = sessionutil.NewSession(i.loopCtx, Params.MetaRootPath, []string{Params.EtcdAddress})
|
||||
i.session.Init(typeutil.IndexServiceRole, Params.Address, true)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
@ -128,8 +129,7 @@ type Core struct {
|
|||
CallBuildIndexService func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error)
|
||||
CallDropIndexService func(ctx context.Context, indexID typeutil.UniqueID) error
|
||||
|
||||
//proxy service interface, notify proxy service to drop collection
|
||||
CallInvalidateCollectionMetaCacheService func(ctx context.Context, ts typeutil.Timestamp, dbName string, collectionName string) error
|
||||
NewProxyClient func(sess *sessionutil.Session) (types.ProxyNode, error)
|
||||
|
||||
//query service interface, notify query service to release collection
|
||||
CallReleaseCollectionService func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error
|
||||
|
@ -137,6 +137,12 @@ type Core struct {
|
|||
//dd request scheduler
|
||||
ddReqQueue chan reqTask //dd request will be push into this chan
|
||||
|
||||
//proxynode manager
|
||||
proxyNodeManager *proxyNodeManager
|
||||
|
||||
// proxy clients
|
||||
proxyClientManager *proxyClientManager
|
||||
|
||||
// channel timetick
|
||||
chanTimeTick *timetickSync
|
||||
|
||||
|
@ -151,7 +157,8 @@ type Core struct {
|
|||
startOnce sync.Once
|
||||
//isInit atomic.Value
|
||||
|
||||
session *sessionutil.Session
|
||||
session *sessionutil.Session
|
||||
sessCloseCh <-chan bool
|
||||
|
||||
msFactory ms.Factory
|
||||
}
|
||||
|
@ -226,8 +233,8 @@ func (c *Core) checkInit() error {
|
|||
if c.CallDropIndexService == nil {
|
||||
return fmt.Errorf("CallDropIndexService is nil")
|
||||
}
|
||||
if c.CallInvalidateCollectionMetaCacheService == nil {
|
||||
return fmt.Errorf("CallInvalidateCollectionMetaCacheService is nil")
|
||||
if c.NewProxyClient == nil {
|
||||
return fmt.Errorf("NewProxyNodeClient is nil")
|
||||
}
|
||||
if c.CallReleaseCollectionService == nil {
|
||||
return fmt.Errorf("CallReleaseCollectionService is nil")
|
||||
|
@ -238,6 +245,7 @@ func (c *Core) checkInit() error {
|
|||
if c.DataNodeFlushedSegmentChan == nil {
|
||||
return fmt.Errorf("DataNodeFlushedSegmentChan is nil")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -458,6 +466,27 @@ func (c *Core) tsLoop() {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *Core) sessionLoop() {
|
||||
for {
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
return
|
||||
case _, ok := <-c.sessCloseCh:
|
||||
if !ok {
|
||||
log.Error("master service disconnect with etcd, process will exit in 1 second")
|
||||
go func() {
|
||||
time.Sleep(time.Second)
|
||||
os.Exit(-1)
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Core) watchProxyNodeLoop() {
|
||||
|
||||
}
|
||||
|
||||
func (c *Core) setDdMsgSendFlag(b bool) error {
|
||||
flag, err := c.MetaTable.client.Load(DDMsgSendPrefix, 0)
|
||||
if err != nil {
|
||||
|
@ -672,28 +701,18 @@ func (c *Core) SetProxyService(ctx context.Context, s types.ProxyService) error
|
|||
Params.ProxyTimeTickChannel = rsp.Value
|
||||
log.Debug("proxy time tick", zap.String("channel name", Params.ProxyTimeTickChannel))
|
||||
|
||||
c.CallInvalidateCollectionMetaCacheService = func(ctx context.Context, ts typeutil.Timestamp, dbName string, collectionName string) error {
|
||||
status, _ := s.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: 0, //TODO,MsgType
|
||||
MsgID: 0,
|
||||
Timestamp: ts,
|
||||
SourceID: c.session.ServerID,
|
||||
},
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
})
|
||||
if status == nil {
|
||||
return fmt.Errorf("invalidate collection metacache resp is nil")
|
||||
}
|
||||
if status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return fmt.Errorf(status.Reason)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
//SetNewProxyClient create proxy node by this func
|
||||
func (c *Core) SetNewProxyClient(f func(sess *sessionutil.Session) (types.ProxyNode, error)) {
|
||||
if c.NewProxyClient == nil {
|
||||
c.NewProxyClient = f
|
||||
} else {
|
||||
log.Debug("NewProxyClient has alread set")
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Core) SetDataService(ctx context.Context, s types.DataService) error {
|
||||
rsp, err := s.GetSegmentInfoChannel(ctx)
|
||||
if err != nil {
|
||||
|
@ -702,10 +721,18 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error {
|
|||
Params.DataServiceSegmentChannel = rsp.Value
|
||||
log.Debug("data service segment", zap.String("channel name", Params.DataServiceSegmentChannel))
|
||||
|
||||
c.CallGetBinlogFilePathsService = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) {
|
||||
c.CallGetBinlogFilePathsService = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) (retFiles []string, retErr error) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
retFiles = nil
|
||||
retErr = fmt.Errorf("get bin log file paths panic, msg = %v", err)
|
||||
}
|
||||
}()
|
||||
ts, err := c.TSOAllocator(1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
retFiles = nil
|
||||
retErr = err
|
||||
return
|
||||
}
|
||||
binlog, err := s.GetInsertBinlogPaths(ctx, &datapb.GetInsertBinlogPathsRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
|
@ -717,23 +744,40 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error {
|
|||
SegmentID: segID,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
retFiles = nil
|
||||
retErr = err
|
||||
return
|
||||
}
|
||||
if binlog.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return nil, fmt.Errorf("GetInsertBinlogPaths from data service failed, error = %s", binlog.Status.Reason)
|
||||
retFiles = nil
|
||||
retErr = fmt.Errorf("GetInsertBinlogPaths from data service failed, error = %s", binlog.Status.Reason)
|
||||
return
|
||||
}
|
||||
for i := range binlog.FieldIDs {
|
||||
if binlog.FieldIDs[i] == fieldID {
|
||||
return binlog.Paths[i].Values, nil
|
||||
retFiles = binlog.Paths[i].Values
|
||||
retErr = nil
|
||||
return
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("binlog file not exist, segment id = %d, field id = %d", segID, fieldID)
|
||||
retFiles = nil
|
||||
retErr = fmt.Errorf("binlog file not exist, segment id = %d, field id = %d", segID, fieldID)
|
||||
return
|
||||
}
|
||||
|
||||
c.CallGetNumRowsService = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) {
|
||||
c.CallGetNumRowsService = func(segID typeutil.UniqueID, isFromFlushedChan bool) (retRows int64, retErr error) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
retRows = 0
|
||||
retErr = fmt.Errorf("get num rows panic, msg = %v", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
ts, err := c.TSOAllocator(1)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
retRows = 0
|
||||
retErr = err
|
||||
return
|
||||
}
|
||||
segInfo, err := s.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
|
@ -745,26 +789,41 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error {
|
|||
SegmentIDs: []typeutil.UniqueID{segID},
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
retRows = 0
|
||||
retErr = err
|
||||
return
|
||||
}
|
||||
if segInfo.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return 0, fmt.Errorf("GetSegmentInfo from data service failed, error = %s", segInfo.Status.Reason)
|
||||
}
|
||||
if len(segInfo.Infos) != 1 {
|
||||
log.Debug("get segment info empty")
|
||||
return 0, nil
|
||||
retRows = 0
|
||||
retErr = nil
|
||||
return
|
||||
}
|
||||
if !isFromFlushedChan && segInfo.Infos[0].State != commonpb.SegmentState_Flushed {
|
||||
log.Debug("segment id not flushed", zap.Int64("segment id", segID))
|
||||
return 0, nil
|
||||
retRows = 0
|
||||
retErr = nil
|
||||
return
|
||||
}
|
||||
return segInfo.Infos[0].NumRows, nil
|
||||
retRows = segInfo.Infos[0].NumRows
|
||||
retErr = nil
|
||||
return
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Core) SetIndexService(s types.IndexService) error {
|
||||
c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) {
|
||||
c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (retID typeutil.UniqueID, retErr error) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
retID = 0
|
||||
retErr = fmt.Errorf("build index panic, msg = %v", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
rsp, err := s.BuildIndex(ctx, &indexpb.BuildIndexRequest{
|
||||
DataPaths: binlog,
|
||||
TypeParams: field.TypeParams,
|
||||
|
@ -773,32 +832,53 @@ func (c *Core) SetIndexService(s types.IndexService) error {
|
|||
IndexName: idxInfo.IndexName,
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
retID = 0
|
||||
retErr = err
|
||||
return
|
||||
}
|
||||
if rsp.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return 0, fmt.Errorf("BuildIndex from index service failed, error = %s", rsp.Status.Reason)
|
||||
retID = 0
|
||||
retErr = fmt.Errorf("BuildIndex from index service failed, error = %s", rsp.Status.Reason)
|
||||
return
|
||||
}
|
||||
return rsp.IndexBuildID, nil
|
||||
retID = rsp.IndexBuildID
|
||||
retErr = nil
|
||||
return
|
||||
}
|
||||
|
||||
c.CallDropIndexService = func(ctx context.Context, indexID typeutil.UniqueID) error {
|
||||
c.CallDropIndexService = func(ctx context.Context, indexID typeutil.UniqueID) (retErr error) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
retErr = fmt.Errorf("drop index from index service panic, msg = %v", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
rsp, err := s.DropIndex(ctx, &indexpb.DropIndexRequest{
|
||||
IndexID: indexID,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
retErr = err
|
||||
return
|
||||
}
|
||||
if rsp.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return fmt.Errorf(rsp.Reason)
|
||||
retErr = fmt.Errorf(rsp.Reason)
|
||||
return
|
||||
}
|
||||
return nil
|
||||
retErr = nil
|
||||
return
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Core) SetQueryService(s types.QueryService) error {
|
||||
c.CallReleaseCollectionService = func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error {
|
||||
c.CallReleaseCollectionService = func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) (retErr error) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
retErr = fmt.Errorf("release collection from query service panic, msg = %v", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
req := &querypb.ReleaseCollectionRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_ReleaseCollection,
|
||||
|
@ -811,12 +891,15 @@ func (c *Core) SetQueryService(s types.QueryService) error {
|
|||
}
|
||||
rsp, err := s.ReleaseCollection(ctx, req)
|
||||
if err != nil {
|
||||
return err
|
||||
retErr = err
|
||||
return
|
||||
}
|
||||
if rsp.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return fmt.Errorf("ReleaseCollection from query service failed, error = %s", rsp.Reason)
|
||||
retErr = fmt.Errorf("ReleaseCollection from query service failed, error = %s", rsp.Reason)
|
||||
return
|
||||
}
|
||||
return nil
|
||||
retErr = nil
|
||||
return
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -851,8 +934,8 @@ func (c *Core) BuildIndex(segID typeutil.UniqueID, field *schemapb.FieldSchema,
|
|||
|
||||
// Register register master service at etcd
|
||||
func (c *Core) Register() error {
|
||||
c.session = sessionutil.NewSession(c.ctx, []string{Params.EtcdAddress})
|
||||
c.session.Init(typeutil.MasterServiceRole, Params.Address, true)
|
||||
c.session = sessionutil.NewSession(c.ctx, Params.MetaRootPath, []string{Params.EtcdAddress})
|
||||
c.sessCloseCh = c.session.Init(typeutil.MasterServiceRole, Params.Address, true)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -919,10 +1002,17 @@ func (c *Core) Init() error {
|
|||
if initError = c.msFactory.SetParams(m); initError != nil {
|
||||
return
|
||||
}
|
||||
c.chanTimeTick, initError = newTimeTickSync(c)
|
||||
if initError != nil {
|
||||
return
|
||||
}
|
||||
c.chanTimeTick = newTimeTickSync(c)
|
||||
c.proxyClientManager = newProxyClientManager(c)
|
||||
|
||||
c.proxyNodeManager, initError = newProxyNodeManager(
|
||||
c.ctx,
|
||||
[]string{Params.EtcdAddress},
|
||||
c.chanTimeTick.GetProxyNodes,
|
||||
c.proxyClientManager.GetProxyClients,
|
||||
)
|
||||
c.proxyNodeManager.AddSession(c.chanTimeTick.AddProxyNode, c.proxyClientManager.AddProxyClient)
|
||||
c.proxyNodeManager.DelSession(c.chanTimeTick.DelProxyNode, c.proxyClientManager.DelProxyClient)
|
||||
|
||||
c.ddReqQueue = make(chan reqTask, 1024)
|
||||
initError = c.setMsgStreams()
|
||||
|
@ -975,6 +1065,18 @@ func (c *Core) reSendDdMsg(ctx context.Context) error {
|
|||
if err = c.SendDdDropCollectionReq(ctx, &ddReq); err != nil {
|
||||
return err
|
||||
}
|
||||
req := proxypb.InvalidateCollMetaCacheRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: 0, //TODO, msg type
|
||||
MsgID: 0, //TODO, msg id
|
||||
Timestamp: ddReq.Base.Timestamp,
|
||||
SourceID: c.session.ServerID,
|
||||
},
|
||||
DbName: ddReq.DbName,
|
||||
CollectionName: ddReq.CollectionName,
|
||||
}
|
||||
c.proxyClientManager.InvalidateCollectionMetaCache(c.ctx, &req)
|
||||
|
||||
case CreatePartitionDDType:
|
||||
var ddReq = internalpb.CreatePartitionRequest{}
|
||||
if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil {
|
||||
|
@ -983,6 +1085,17 @@ func (c *Core) reSendDdMsg(ctx context.Context) error {
|
|||
if err = c.SendDdCreatePartitionReq(ctx, &ddReq); err != nil {
|
||||
return err
|
||||
}
|
||||
req := proxypb.InvalidateCollMetaCacheRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: 0, //TODO, msg type
|
||||
MsgID: 0, //TODO, msg id
|
||||
Timestamp: ddReq.Base.Timestamp,
|
||||
SourceID: c.session.ServerID,
|
||||
},
|
||||
DbName: ddReq.DbName,
|
||||
CollectionName: ddReq.CollectionName,
|
||||
}
|
||||
c.proxyClientManager.InvalidateCollectionMetaCache(c.ctx, &req)
|
||||
case DropPartitionDDType:
|
||||
var ddReq = internalpb.DropPartitionRequest{}
|
||||
if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil {
|
||||
|
@ -991,6 +1104,17 @@ func (c *Core) reSendDdMsg(ctx context.Context) error {
|
|||
if err = c.SendDdDropPartitionReq(ctx, &ddReq); err != nil {
|
||||
return err
|
||||
}
|
||||
req := proxypb.InvalidateCollMetaCacheRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: 0, //TODO, msg type
|
||||
MsgID: 0, //TODO, msg id
|
||||
Timestamp: ddReq.Base.Timestamp,
|
||||
SourceID: c.session.ServerID,
|
||||
},
|
||||
DbName: ddReq.DbName,
|
||||
CollectionName: ddReq.CollectionName,
|
||||
}
|
||||
c.proxyClientManager.InvalidateCollectionMetaCache(c.ctx, &req)
|
||||
default:
|
||||
return fmt.Errorf("Invalid DdOperation %s", ddOp.Type)
|
||||
}
|
||||
|
@ -1009,6 +1133,9 @@ func (c *Core) Start() error {
|
|||
log.Debug("master", zap.String("time tick channel name", Params.TimeTickChannel))
|
||||
|
||||
c.startOnce.Do(func() {
|
||||
if err := c.proxyNodeManager.WatchProxyNode(); err != nil {
|
||||
return
|
||||
}
|
||||
if err := c.reSendDdMsg(c.ctx); err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -1017,6 +1144,7 @@ func (c *Core) Start() error {
|
|||
go c.startDataServiceSegmentLoop()
|
||||
go c.startDataNodeFlushedSegmentLoop()
|
||||
go c.tsLoop()
|
||||
go c.sessionLoop()
|
||||
go c.chanTimeTick.StartWatch()
|
||||
c.stateCode.Store(internalpb.StateCode_Healthy)
|
||||
})
|
||||
|
@ -1649,6 +1777,17 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques
|
|||
}
|
||||
|
||||
func (c *Core) AllocTimestamp(ctx context.Context, in *masterpb.AllocTimestampRequest) (*masterpb.AllocTimestampResponse, error) {
|
||||
code := c.stateCode.Load().(internalpb.StateCode)
|
||||
if code != internalpb.StateCode_Healthy {
|
||||
return &masterpb.AllocTimestampResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
|
||||
},
|
||||
Timestamp: 0,
|
||||
Count: 0,
|
||||
}, nil
|
||||
}
|
||||
ts, err := c.TSOAllocator(in.Count)
|
||||
if err != nil {
|
||||
log.Debug("AllocTimestamp failed", zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
|
||||
|
@ -1673,6 +1812,17 @@ func (c *Core) AllocTimestamp(ctx context.Context, in *masterpb.AllocTimestampRe
|
|||
}
|
||||
|
||||
func (c *Core) AllocID(ctx context.Context, in *masterpb.AllocIDRequest) (*masterpb.AllocIDResponse, error) {
|
||||
code := c.stateCode.Load().(internalpb.StateCode)
|
||||
if code != internalpb.StateCode_Healthy {
|
||||
return &masterpb.AllocIDResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
|
||||
},
|
||||
ID: 0,
|
||||
Count: 0,
|
||||
}, nil
|
||||
}
|
||||
start, _, err := c.IDAllocator(in.Count)
|
||||
if err != nil {
|
||||
log.Debug("AllocID failed", zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
|
||||
|
@ -1698,6 +1848,13 @@ func (c *Core) AllocID(ctx context.Context, in *masterpb.AllocIDRequest) (*maste
|
|||
|
||||
// UpdateChannelTimeTick used to handle ChannelTimeTickMsg
|
||||
func (c *Core) UpdateChannelTimeTick(ctx context.Context, in *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) {
|
||||
code := c.stateCode.Load().(internalpb.StateCode)
|
||||
if code != internalpb.StateCode_Healthy {
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
|
||||
}, nil
|
||||
}
|
||||
status := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
|
|
|
@ -43,9 +43,7 @@ import (
|
|||
|
||||
type proxyMock struct {
|
||||
types.ProxyService
|
||||
randVal int
|
||||
collArray []string
|
||||
mutex sync.Mutex
|
||||
randVal int
|
||||
}
|
||||
|
||||
func (p *proxyMock) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
|
@ -56,7 +54,14 @@ func (p *proxyMock) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringRes
|
|||
Value: fmt.Sprintf("proxy-time-tick-%d", p.randVal),
|
||||
}, nil
|
||||
}
|
||||
func (p *proxyMock) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
|
||||
|
||||
type proxyNodeMock struct {
|
||||
types.ProxyNode
|
||||
collArray []string
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
func (p *proxyNodeMock) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
|
||||
p.mutex.Lock()
|
||||
defer p.mutex.Unlock()
|
||||
p.collArray = append(p.collArray, request.CollectionName)
|
||||
|
@ -64,7 +69,7 @@ func (p *proxyMock) InvalidateCollectionMetaCache(ctx context.Context, request *
|
|||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
}, nil
|
||||
}
|
||||
func (p *proxyMock) GetCollArray() []string {
|
||||
func (p *proxyNodeMock) GetCollArray() []string {
|
||||
p.mutex.Lock()
|
||||
defer p.mutex.Unlock()
|
||||
ret := make([]string, 0, len(p.collArray))
|
||||
|
@ -222,20 +227,36 @@ func TestMasterService(t *testing.T) {
|
|||
|
||||
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second})
|
||||
assert.Nil(t, err)
|
||||
_, err = etcdCli.Delete(ctx, sessionutil.DefaultServiceRoot, clientv3.WithPrefix())
|
||||
sessKey := path.Join(Params.MetaRootPath, sessionutil.DefaultServiceRoot)
|
||||
_, err = etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix())
|
||||
assert.Nil(t, err)
|
||||
defer func() {
|
||||
_, _ = etcdCli.Delete(ctx, sessionutil.DefaultServiceRoot, clientv3.WithPrefix())
|
||||
_, _ = etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix())
|
||||
}()
|
||||
|
||||
pnb, err := json.Marshal(
|
||||
&sessionutil.Session{
|
||||
ServerID: 100,
|
||||
},
|
||||
)
|
||||
assert.Nil(t, err)
|
||||
_, err = etcdCli.Put(ctx, path.Join(sessKey, typeutil.ProxyNodeRole+"-100"), string(pnb))
|
||||
assert.Nil(t, err)
|
||||
|
||||
pm := &proxyMock{
|
||||
randVal: randVal,
|
||||
collArray: make([]string, 0, 16),
|
||||
mutex: sync.Mutex{},
|
||||
randVal: randVal,
|
||||
}
|
||||
err = core.SetProxyService(ctx, pm)
|
||||
assert.Nil(t, err)
|
||||
|
||||
pnm := &proxyNodeMock{
|
||||
collArray: make([]string, 0, 16),
|
||||
mutex: sync.Mutex{},
|
||||
}
|
||||
core.NewProxyClient = func(*sessionutil.Session) (types.ProxyNode, error) {
|
||||
return pnm, nil
|
||||
}
|
||||
|
||||
dm := &dataMock{randVal: randVal}
|
||||
err = core.SetDataService(ctx, dm)
|
||||
assert.Nil(t, err)
|
||||
|
@ -571,8 +592,8 @@ func TestMasterService(t *testing.T) {
|
|||
assert.Equal(t, collMeta.ID, partMsg.CollectionID)
|
||||
assert.Equal(t, partMeta.PartitionID, partMsg.PartitionID)
|
||||
|
||||
assert.Equal(t, 1, len(pm.GetCollArray()))
|
||||
assert.Equal(t, collName, pm.GetCollArray()[0])
|
||||
assert.Equal(t, 1, len(pnm.GetCollArray()))
|
||||
assert.Equal(t, collName, pnm.GetCollArray()[0])
|
||||
|
||||
// check DD operation info
|
||||
flag, err := core.MetaTable.client.Load(DDMsgSendPrefix, 0)
|
||||
|
@ -976,8 +997,8 @@ func TestMasterService(t *testing.T) {
|
|||
assert.Equal(t, collMeta.ID, dmsg.CollectionID)
|
||||
assert.Equal(t, dropPartID, dmsg.PartitionID)
|
||||
|
||||
assert.Equal(t, 2, len(pm.GetCollArray()))
|
||||
assert.Equal(t, collName, pm.GetCollArray()[1])
|
||||
assert.Equal(t, 2, len(pnm.GetCollArray()))
|
||||
assert.Equal(t, collName, pnm.GetCollArray()[1])
|
||||
|
||||
// check DD operation info
|
||||
flag, err := core.MetaTable.client.Load(DDMsgSendPrefix, 0)
|
||||
|
@ -1024,7 +1045,7 @@ func TestMasterService(t *testing.T) {
|
|||
dmsg, ok := (msg.Msgs[0]).(*msgstream.DropCollectionMsg)
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, collMeta.ID, dmsg.CollectionID)
|
||||
collArray := pm.GetCollArray()
|
||||
collArray := pnm.GetCollArray()
|
||||
assert.Equal(t, 3, len(collArray))
|
||||
assert.Equal(t, collName, collArray[2])
|
||||
|
||||
|
@ -1049,7 +1070,7 @@ func TestMasterService(t *testing.T) {
|
|||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
|
||||
time.Sleep(time.Second)
|
||||
assert.Zero(t, len(ddStream.Chan()))
|
||||
collArray = pm.GetCollArray()
|
||||
collArray = pnm.GetCollArray()
|
||||
assert.Equal(t, 3, len(collArray))
|
||||
assert.Equal(t, collName, collArray[2])
|
||||
|
||||
|
@ -1454,9 +1475,9 @@ func TestMasterService(t *testing.T) {
|
|||
s2, err := json.Marshal(&p2)
|
||||
assert.Nil(t, err)
|
||||
|
||||
_, err = core.etcdCli.Put(ctx2, path.Join(sessionutil.DefaultServiceRoot, typeutil.ProxyNodeRole)+"-1", string(s1))
|
||||
_, err = core.etcdCli.Put(ctx2, path.Join(sessKey, typeutil.ProxyNodeRole)+"-1", string(s1))
|
||||
assert.Nil(t, err)
|
||||
_, err = core.etcdCli.Put(ctx2, path.Join(sessionutil.DefaultServiceRoot, typeutil.ProxyNodeRole)+"-2", string(s2))
|
||||
_, err = core.etcdCli.Put(ctx2, path.Join(sessKey, typeutil.ProxyNodeRole)+"-2", string(s2))
|
||||
assert.Nil(t, err)
|
||||
time.Sleep(time.Second)
|
||||
|
||||
|
@ -1726,9 +1747,7 @@ func TestMasterService2(t *testing.T) {
|
|||
Params.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal)
|
||||
|
||||
pm := &proxyMock{
|
||||
randVal: randVal,
|
||||
collArray: make([]string, 0, 16),
|
||||
mutex: sync.Mutex{},
|
||||
randVal: randVal,
|
||||
}
|
||||
err = core.SetProxyService(ctx, pm)
|
||||
assert.Nil(t, err)
|
||||
|
@ -1754,6 +1773,10 @@ func TestMasterService2(t *testing.T) {
|
|||
err = core.SetQueryService(qm)
|
||||
assert.Nil(t, err)
|
||||
|
||||
core.NewProxyClient = func(*sessionutil.Session) (types.ProxyNode, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
err = core.Init()
|
||||
assert.Nil(t, err)
|
||||
|
||||
|
@ -1949,8 +1972,8 @@ func TestCheckInit(t *testing.T) {
|
|||
err = c.checkInit()
|
||||
assert.NotNil(t, err)
|
||||
|
||||
c.CallInvalidateCollectionMetaCacheService = func(ctx context.Context, ts typeutil.Timestamp, dbName, collectionName string) error {
|
||||
return nil
|
||||
c.NewProxyClient = func(*sessionutil.Session) (types.ProxyNode, error) {
|
||||
return nil, nil
|
||||
}
|
||||
err = c.checkInit()
|
||||
assert.NotNil(t, err)
|
||||
|
|
|
@ -0,0 +1,113 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
package masterservice
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/proxypb"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type proxyClientManager struct {
|
||||
core *Core
|
||||
lock sync.Mutex
|
||||
proxyClient map[int64]types.ProxyNode
|
||||
}
|
||||
|
||||
func newProxyClientManager(c *Core) *proxyClientManager {
|
||||
return &proxyClientManager{
|
||||
core: c,
|
||||
lock: sync.Mutex{},
|
||||
proxyClient: make(map[int64]types.ProxyNode),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *proxyClientManager) GetProxyClients(sess []*sessionutil.Session) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
for _, s := range sess {
|
||||
if _, ok := p.proxyClient[s.ServerID]; ok {
|
||||
continue
|
||||
}
|
||||
pc, err := p.core.NewProxyClient(s)
|
||||
if err != nil {
|
||||
log.Debug("create proxy client failed", zap.String("proxy address", s.Address), zap.Int64("proxy id", s.ServerID), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
p.proxyClient[s.ServerID] = pc
|
||||
log.Debug("create proxy client", zap.String("proxy address", s.Address), zap.Int64("proxy id", s.ServerID))
|
||||
}
|
||||
}
|
||||
|
||||
func (p *proxyClientManager) AddProxyClient(s *sessionutil.Session) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
if _, ok := p.proxyClient[s.ServerID]; ok {
|
||||
return
|
||||
}
|
||||
pc, err := p.core.NewProxyClient(s)
|
||||
if err != nil {
|
||||
log.Debug("create proxy client", zap.String("proxy address", s.Address), zap.Int64("proxy id", s.ServerID), zap.Error(err))
|
||||
return
|
||||
}
|
||||
p.proxyClient[s.ServerID] = pc
|
||||
log.Debug("create proxy client", zap.String("proxy address", s.Address), zap.Int64("proxy id", s.ServerID))
|
||||
}
|
||||
|
||||
func (p *proxyClientManager) DelProxyClient(s *sessionutil.Session) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
delete(p.proxyClient, s.ServerID)
|
||||
log.Debug("remove proxy client", zap.String("proxy address", s.Address), zap.Int64("proxy id", s.ServerID))
|
||||
}
|
||||
|
||||
func (p *proxyClientManager) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
if len(p.proxyClient) == 0 {
|
||||
log.Debug("proxy client is empty,InvalidateCollectionMetaCache will not send to any client")
|
||||
return
|
||||
}
|
||||
|
||||
for k, f := range p.proxyClient {
|
||||
err := func() error {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
log.Debug("call InvalidateCollectionMetaCache panic", zap.Int64("proxy id", k), zap.Any("msg", err))
|
||||
}
|
||||
|
||||
}()
|
||||
sta, err := f.InvalidateCollectionMetaCache(ctx, request)
|
||||
if err != nil {
|
||||
return fmt.Errorf("grpc fail,error=%w", err)
|
||||
}
|
||||
if sta.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return fmt.Errorf("message = %s", sta.Reason)
|
||||
}
|
||||
return nil
|
||||
}()
|
||||
if err != nil {
|
||||
log.Error("call invalidate collection meta failed", zap.Int64("proxy id", k), zap.Error(err))
|
||||
} else {
|
||||
log.Debug("send invalidate collection meta cache to proxy node", zap.Int64("node id", k))
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,153 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
package masterservice
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"path"
|
||||
"sync"
|
||||
|
||||
"github.com/coreos/etcd/mvcc/mvccpb"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"go.etcd.io/etcd/clientv3"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type proxyNodeManager struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
lock sync.Mutex
|
||||
etcdCli *clientv3.Client
|
||||
getSessions []func([]*sessionutil.Session)
|
||||
addSessions []func(*sessionutil.Session)
|
||||
delSessions []func(*sessionutil.Session)
|
||||
}
|
||||
|
||||
func newProxyNodeManager(ctx context.Context, etcdAddr []string, fns ...func([]*sessionutil.Session)) (*proxyNodeManager, error) {
|
||||
cli, err := clientv3.New(clientv3.Config{Endpoints: etcdAddr})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx2, cancel2 := context.WithCancel(ctx)
|
||||
p := &proxyNodeManager{
|
||||
ctx: ctx2,
|
||||
cancel: cancel2,
|
||||
lock: sync.Mutex{},
|
||||
etcdCli: cli,
|
||||
}
|
||||
p.getSessions = append(p.getSessions, fns...)
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func (p *proxyNodeManager) AddSession(fns ...func(*sessionutil.Session)) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
p.addSessions = append(p.addSessions, fns...)
|
||||
}
|
||||
|
||||
func (p *proxyNodeManager) DelSession(fns ...func(*sessionutil.Session)) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
p.delSessions = append(p.delSessions, fns...)
|
||||
}
|
||||
|
||||
func (p *proxyNodeManager) WatchProxyNode() error {
|
||||
ctx2, cancel := context.WithTimeout(p.ctx, RequestTimeout)
|
||||
defer cancel()
|
||||
resp, err := p.etcdCli.Get(
|
||||
ctx2,
|
||||
path.Join(Params.MetaRootPath, sessionutil.DefaultServiceRoot, typeutil.ProxyNodeRole),
|
||||
clientv3.WithPrefix(),
|
||||
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("proxyNodeManager,watch proxy node failed, error = %w", err)
|
||||
}
|
||||
sessions := []*sessionutil.Session{}
|
||||
for _, v := range resp.Kvs {
|
||||
sess := new(sessionutil.Session)
|
||||
err := json.Unmarshal(v.Value, sess)
|
||||
if err != nil {
|
||||
log.Debug("unmarshal SvrSession failed", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
sessions = append(sessions, sess)
|
||||
}
|
||||
for _, f := range p.getSessions {
|
||||
f(sessions)
|
||||
}
|
||||
for _, s := range sessions {
|
||||
log.Debug("Get proxy node", zap.Int64("node id", s.ServerID), zap.String("node addr", s.Address), zap.String("node name", s.ServerName))
|
||||
}
|
||||
|
||||
rch := p.etcdCli.Watch(
|
||||
p.ctx,
|
||||
path.Join(Params.MetaRootPath, sessionutil.DefaultServiceRoot, typeutil.ProxyNodeRole),
|
||||
clientv3.WithPrefix(),
|
||||
clientv3.WithCreatedNotify(),
|
||||
clientv3.WithPrevKV(),
|
||||
clientv3.WithRev(resp.Header.Revision+1),
|
||||
)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-p.ctx.Done():
|
||||
log.Debug("context done", zap.Error(p.ctx.Err()))
|
||||
return
|
||||
case wresp, ok := <-rch:
|
||||
if !ok {
|
||||
log.Debug("watch proxy node failed")
|
||||
return
|
||||
}
|
||||
for _, ev := range wresp.Events {
|
||||
switch ev.Type {
|
||||
case mvccpb.PUT:
|
||||
sess := new(sessionutil.Session)
|
||||
err := json.Unmarshal(ev.Kv.Value, sess)
|
||||
if err != nil {
|
||||
log.Debug("watch proxy node, unmarshal failed", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
p.lock.Lock()
|
||||
for _, f := range p.addSessions {
|
||||
f(sess)
|
||||
}
|
||||
p.lock.Unlock()
|
||||
case mvccpb.DELETE:
|
||||
sess := new(sessionutil.Session)
|
||||
err := json.Unmarshal(ev.PrevKv.Value, sess)
|
||||
if err != nil {
|
||||
log.Debug("watch proxy node, unmarshal failed", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
p.lock.Lock()
|
||||
for _, f := range p.delSessions {
|
||||
f(sess)
|
||||
}
|
||||
p.lock.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *proxyNodeManager) Stop() {
|
||||
p.cancel()
|
||||
}
|
|
@ -0,0 +1,94 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
package masterservice
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.etcd.io/etcd/clientv3"
|
||||
)
|
||||
|
||||
func TestProxyNodeManager(t *testing.T) {
|
||||
Params.Init()
|
||||
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
|
||||
assert.Nil(t, err)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
sessKey := path.Join(Params.MetaRootPath, sessionutil.DefaultServiceRoot)
|
||||
cli.Delete(ctx, sessKey, clientv3.WithPrefix())
|
||||
defer cli.Delete(ctx, sessKey, clientv3.WithPrefix())
|
||||
s1 := sessionutil.Session{
|
||||
ServerID: 100,
|
||||
}
|
||||
b1, err := json.Marshal(&s1)
|
||||
assert.Nil(t, err)
|
||||
k1 := path.Join(sessKey, typeutil.ProxyNodeRole+"-100")
|
||||
_, err = cli.Put(ctx, k1, string(b1))
|
||||
assert.Nil(t, err)
|
||||
|
||||
s0 := sessionutil.Session{
|
||||
ServerID: 99,
|
||||
}
|
||||
b0, err := json.Marshal(&s0)
|
||||
assert.Nil(t, err)
|
||||
k0 := path.Join(sessKey, typeutil.ProxyNodeRole+"-99")
|
||||
_, err = cli.Put(ctx, k0, string(b0))
|
||||
assert.Nil(t, err)
|
||||
|
||||
f1 := func(sess []*sessionutil.Session) {
|
||||
assert.Equal(t, len(sess), 2)
|
||||
assert.Equal(t, int64(100), sess[0].ServerID)
|
||||
assert.Equal(t, int64(99), sess[1].ServerID)
|
||||
t.Log("get sessions", sess[0], sess[1])
|
||||
}
|
||||
|
||||
pm, err := newProxyNodeManager(ctx, []string{Params.EtcdAddress}, f1)
|
||||
assert.Nil(t, err)
|
||||
fa := func(sess *sessionutil.Session) {
|
||||
assert.Equal(t, int64(101), sess.ServerID)
|
||||
t.Log("add session", sess)
|
||||
}
|
||||
fd := func(sess *sessionutil.Session) {
|
||||
assert.Equal(t, int64(100), sess.ServerID)
|
||||
t.Log("del session", sess)
|
||||
}
|
||||
pm.AddSession(fa)
|
||||
pm.DelSession(fd)
|
||||
|
||||
err = pm.WatchProxyNode()
|
||||
assert.Nil(t, err)
|
||||
t.Log("======== start watch proxy node ==========")
|
||||
|
||||
s2 := sessionutil.Session{
|
||||
ServerID: 101,
|
||||
}
|
||||
b2, err := json.Marshal(&s2)
|
||||
assert.Nil(t, err)
|
||||
k2 := path.Join(sessKey, typeutil.ProxyNodeRole+"-101")
|
||||
_, err = cli.Put(ctx, k2, string(b2))
|
||||
assert.Nil(t, err)
|
||||
|
||||
_, err = cli.Delete(ctx, k1)
|
||||
assert.Nil(t, err)
|
||||
time.Sleep(time.Second)
|
||||
pm.Stop()
|
||||
time.Sleep(time.Second)
|
||||
}
|
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/etcdpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/proto/proxypb"
|
||||
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"go.uber.org/zap"
|
||||
|
@ -310,13 +311,23 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
|
|||
|
||||
//notify query service to release collection
|
||||
go func() {
|
||||
if err = t.core.CallReleaseCollectionService(t.core.ctx, t.Req.Base.Timestamp, 0, collMeta.ID); err != nil {
|
||||
if err = t.core.CallReleaseCollectionService(t.core.ctx, ts, 0, collMeta.ID); err != nil {
|
||||
log.Warn("CallReleaseCollectionService failed", zap.String("error", err.Error()))
|
||||
}
|
||||
}()
|
||||
|
||||
req := proxypb.InvalidateCollMetaCacheRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: 0, //TODO, msg type
|
||||
MsgID: 0, //TODO, msg id
|
||||
Timestamp: ts,
|
||||
SourceID: t.core.session.ServerID,
|
||||
},
|
||||
DbName: t.Req.DbName,
|
||||
CollectionName: t.Req.CollectionName,
|
||||
}
|
||||
// error doesn't matter here
|
||||
t.core.CallInvalidateCollectionMetaCacheService(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName)
|
||||
t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req)
|
||||
|
||||
// Update DDOperation in etcd
|
||||
return t.core.setDdMsgSendFlag(true)
|
||||
|
@ -482,8 +493,18 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
|
|||
t.core.SendTimeTick(ts)
|
||||
}
|
||||
|
||||
req := proxypb.InvalidateCollMetaCacheRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: 0, //TODO, msg type
|
||||
MsgID: 0, //TODO, msg id
|
||||
Timestamp: ts,
|
||||
SourceID: t.core.session.ServerID,
|
||||
},
|
||||
DbName: t.Req.DbName,
|
||||
CollectionName: t.Req.CollectionName,
|
||||
}
|
||||
// error doesn't matter here
|
||||
t.core.CallInvalidateCollectionMetaCacheService(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName)
|
||||
t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req)
|
||||
|
||||
// Update DDOperation in etcd
|
||||
return t.core.setDdMsgSendFlag(true)
|
||||
|
@ -548,8 +569,18 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
|
|||
t.core.SendTimeTick(ts)
|
||||
}
|
||||
|
||||
req := proxypb.InvalidateCollMetaCacheRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: 0, //TODO, msg type
|
||||
MsgID: 0, //TODO, msg id
|
||||
Timestamp: ts,
|
||||
SourceID: t.core.session.ServerID,
|
||||
},
|
||||
DbName: t.Req.DbName,
|
||||
CollectionName: t.Req.CollectionName,
|
||||
}
|
||||
// error doesn't matter here
|
||||
t.core.CallInvalidateCollectionMetaCacheService(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName)
|
||||
t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req)
|
||||
|
||||
// Update DDOperation in etcd
|
||||
return t.core.setDdMsgSendFlag(true)
|
||||
|
|
|
@ -12,20 +12,15 @@
|
|||
package masterservice
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"path"
|
||||
"sync"
|
||||
|
||||
"github.com/coreos/etcd/mvcc/mvccpb"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/msgstream"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"go.etcd.io/etcd/clientv3"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -37,35 +32,14 @@ type timetickSync struct {
|
|||
sendChan chan map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg
|
||||
}
|
||||
|
||||
func newTimeTickSync(core *Core) (*timetickSync, error) {
|
||||
tss := timetickSync{
|
||||
func newTimeTickSync(core *Core) *timetickSync {
|
||||
return &timetickSync{
|
||||
lock: sync.Mutex{},
|
||||
core: core,
|
||||
proxyTimeTick: make(map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg),
|
||||
chanStream: make(map[string]msgstream.MsgStream),
|
||||
sendChan: make(chan map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg, 16),
|
||||
}
|
||||
|
||||
ctx2, cancel := context.WithTimeout(core.ctx, RequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
resp, err := core.etcdCli.Get(ctx2, ProxyMetaPrefix, clientv3.WithPrefix())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, v := range resp.Kvs {
|
||||
var sess sessionutil.Session
|
||||
err := json.Unmarshal(v.Value, &sess)
|
||||
if err != nil {
|
||||
log.Debug("unmarshal SvrSession failed", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
if _, ok := tss.proxyTimeTick[sess.ServerID]; !ok {
|
||||
tss.proxyTimeTick[sess.ServerID] = nil
|
||||
}
|
||||
}
|
||||
|
||||
return &tss, nil
|
||||
}
|
||||
|
||||
// sendToChannel send all channels' timetick to sendChan
|
||||
|
@ -101,50 +75,36 @@ func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (t *timetickSync) AddProxyNode(sess *sessionutil.Session) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
t.proxyTimeTick[sess.ServerID] = nil
|
||||
}
|
||||
|
||||
func (t *timetickSync) DelProxyNode(sess *sessionutil.Session) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
if _, ok := t.proxyTimeTick[sess.ServerID]; ok {
|
||||
delete(t.proxyTimeTick, sess.ServerID)
|
||||
t.sendToChannel()
|
||||
}
|
||||
}
|
||||
|
||||
func (t *timetickSync) GetProxyNodes(sess []*sessionutil.Session) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
for _, s := range sess {
|
||||
t.proxyTimeTick[s.ServerID] = nil
|
||||
}
|
||||
}
|
||||
|
||||
// StartWatch watch proxy node change and process all channels' timetick msg
|
||||
func (t *timetickSync) StartWatch() {
|
||||
proxyNodePrefix := path.Join(sessionutil.DefaultServiceRoot, typeutil.ProxyNodeRole)
|
||||
rch := t.core.etcdCli.Watch(t.core.ctx, proxyNodePrefix, clientv3.WithPrefix(), clientv3.WithCreatedNotify())
|
||||
for {
|
||||
select {
|
||||
case <-t.core.ctx.Done():
|
||||
log.Debug("master service context done", zap.Error(t.core.ctx.Err()))
|
||||
return
|
||||
case wresp, ok := <-rch:
|
||||
if !ok {
|
||||
log.Debug("time tick sync watch etcd failed")
|
||||
}
|
||||
for _, ev := range wresp.Events {
|
||||
switch ev.Type {
|
||||
case mvccpb.PUT:
|
||||
var sess sessionutil.Session
|
||||
err := json.Unmarshal(ev.Kv.Value, &sess)
|
||||
if err != nil {
|
||||
log.Debug("watch proxy node, unmarshal failed", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
func() {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
t.proxyTimeTick[sess.ServerID] = nil
|
||||
}()
|
||||
case mvccpb.DELETE:
|
||||
var sess sessionutil.Session
|
||||
err := json.Unmarshal(ev.PrevKv.Value, &sess)
|
||||
if err != nil {
|
||||
log.Debug("watch proxy node, unmarshal failed", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
func() {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
if _, ok := t.proxyTimeTick[sess.ServerID]; ok {
|
||||
delete(t.proxyTimeTick, sess.ServerID)
|
||||
t.sendToChannel()
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
case ptt, ok := <-t.sendChan:
|
||||
if !ok {
|
||||
log.Debug("timetickSync sendChan closed")
|
||||
|
|
|
@ -87,7 +87,7 @@ func NewProxyNode(ctx context.Context, factory msgstream.Factory) (*ProxyNode, e
|
|||
|
||||
// Register register proxy node at etcd
|
||||
func (node *ProxyNode) Register() error {
|
||||
node.session = sessionutil.NewSession(node.ctx, []string{Params.EtcdAddress})
|
||||
node.session = sessionutil.NewSession(node.ctx, Params.MetaRootPath, []string{Params.EtcdAddress})
|
||||
node.session.Init(typeutil.ProxyNodeRole, Params.NetworkAddress, false)
|
||||
Params.ProxyID = node.session.ServerID
|
||||
return nil
|
||||
|
@ -181,7 +181,7 @@ func (node *ProxyNode) Init() error {
|
|||
log.Debug("create query message stream ...")
|
||||
|
||||
masterAddr := Params.MasterAddress
|
||||
idAllocator, err := allocator.NewIDAllocator(node.ctx, masterAddr, []string{Params.EtcdAddress})
|
||||
idAllocator, err := allocator.NewIDAllocator(node.ctx, masterAddr, Params.MetaRootPath, []string{Params.EtcdAddress})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -120,7 +120,7 @@ func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *Quer
|
|||
|
||||
// Register register query node at etcd
|
||||
func (node *QueryNode) Register() error {
|
||||
node.session = sessionutil.NewSession(node.queryNodeLoopCtx, []string{Params.EtcdAddress})
|
||||
node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.MetaRootPath, []string{Params.EtcdAddress})
|
||||
node.session.Init(typeutil.QueryNodeRole, Params.QueryNodeIP+":"+strconv.FormatInt(Params.QueryNodePort, 10), false)
|
||||
Params.QueryNodeID = node.session.ServerID
|
||||
return nil
|
||||
|
|
|
@ -58,7 +58,7 @@ type QueryService struct {
|
|||
|
||||
// Register register query service at etcd
|
||||
func (qs *QueryService) Register() error {
|
||||
qs.session = sessionutil.NewSession(qs.loopCtx, []string{Params.EtcdAddress})
|
||||
qs.session = sessionutil.NewSession(qs.loopCtx, Params.MetaRootPath, []string{Params.EtcdAddress})
|
||||
qs.session.Init(typeutil.QueryServiceRole, Params.Address, true)
|
||||
Params.NodeID = uint64(qs.session.ServerID)
|
||||
return nil
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/proto/proxypb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
)
|
||||
|
||||
type TimeTickProvider interface {
|
||||
|
@ -123,6 +124,7 @@ type MasterComponent interface {
|
|||
SetDataService(context.Context, DataService) error
|
||||
SetIndexService(IndexService) error
|
||||
SetQueryService(QueryService) error
|
||||
SetNewProxyClient(func(sess *sessionutil.Session) (ProxyNode, error))
|
||||
}
|
||||
|
||||
type ProxyNode interface {
|
||||
|
|
|
@ -17,7 +17,7 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
DefaultServiceRoot = "/session/"
|
||||
DefaultServiceRoot = "session/"
|
||||
DefaultIDKey = "id"
|
||||
DefaultRetryTimes = 30
|
||||
DefaultTTL = 10
|
||||
|
@ -41,9 +41,10 @@ type Session struct {
|
|||
Address string `json:"Address,omitempty"`
|
||||
Exclusive bool `json:"Exclusive,omitempty"`
|
||||
|
||||
etcdCli *clientv3.Client
|
||||
leaseID clientv3.LeaseID
|
||||
cancel context.CancelFunc
|
||||
etcdCli *clientv3.Client
|
||||
leaseID clientv3.LeaseID
|
||||
cancel context.CancelFunc
|
||||
metaRoot string
|
||||
}
|
||||
|
||||
type SessionEvent struct {
|
||||
|
@ -54,11 +55,12 @@ type SessionEvent struct {
|
|||
// NewSession is a helper to build Session object.
|
||||
// ServerID and LeaseID will be assigned after registeration.
|
||||
// etcdCli is initialized when NewSession
|
||||
func NewSession(ctx context.Context, etcdAddress []string) *Session {
|
||||
func NewSession(ctx context.Context, metaRoot string, etcdAddress []string) *Session {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
session := &Session{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
metaRoot: metaRoot,
|
||||
}
|
||||
|
||||
connectEtcdFn := func() error {
|
||||
|
@ -104,17 +106,17 @@ func (s *Session) getServerID() (int64, error) {
|
|||
func (s *Session) checkIDExist() {
|
||||
s.etcdCli.Txn(s.ctx).If(
|
||||
clientv3.Compare(
|
||||
clientv3.Version(path.Join(DefaultServiceRoot, DefaultIDKey)),
|
||||
clientv3.Version(path.Join(s.metaRoot, DefaultServiceRoot, DefaultIDKey)),
|
||||
"=",
|
||||
0)).
|
||||
Then(clientv3.OpPut(path.Join(DefaultServiceRoot, DefaultIDKey), "1")).Commit()
|
||||
Then(clientv3.OpPut(path.Join(s.metaRoot, DefaultServiceRoot, DefaultIDKey), "1")).Commit()
|
||||
|
||||
}
|
||||
|
||||
func (s *Session) getServerIDWithKey(key string, retryTimes int) (int64, error) {
|
||||
res := int64(0)
|
||||
getServerIDWithKeyFn := func() error {
|
||||
getResp, err := s.etcdCli.Get(s.ctx, path.Join(DefaultServiceRoot, key))
|
||||
getResp, err := s.etcdCli.Get(s.ctx, path.Join(s.metaRoot, DefaultServiceRoot, key))
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
@ -129,10 +131,10 @@ func (s *Session) getServerIDWithKey(key string, retryTimes int) (int64, error)
|
|||
}
|
||||
txnResp, err := s.etcdCli.Txn(s.ctx).If(
|
||||
clientv3.Compare(
|
||||
clientv3.Value(path.Join(DefaultServiceRoot, DefaultIDKey)),
|
||||
clientv3.Value(path.Join(s.metaRoot, DefaultServiceRoot, DefaultIDKey)),
|
||||
"=",
|
||||
value)).
|
||||
Then(clientv3.OpPut(path.Join(DefaultServiceRoot, DefaultIDKey), strconv.FormatInt(valueInt+1, 10))).Commit()
|
||||
Then(clientv3.OpPut(path.Join(s.metaRoot, DefaultServiceRoot, DefaultIDKey), strconv.FormatInt(valueInt+1, 10))).Commit()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -182,10 +184,10 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er
|
|||
}
|
||||
txnResp, err := s.etcdCli.Txn(s.ctx).If(
|
||||
clientv3.Compare(
|
||||
clientv3.Version(path.Join(DefaultServiceRoot, key)),
|
||||
clientv3.Version(path.Join(s.metaRoot, DefaultServiceRoot, key)),
|
||||
"=",
|
||||
0)).
|
||||
Then(clientv3.OpPut(path.Join(DefaultServiceRoot, key), string(sessionJSON), clientv3.WithLease(resp.ID))).Commit()
|
||||
Then(clientv3.OpPut(path.Join(s.metaRoot, DefaultServiceRoot, key), string(sessionJSON), clientv3.WithLease(resp.ID))).Commit()
|
||||
|
||||
if err != nil {
|
||||
fmt.Printf("compare and swap error %s\n. maybe the key has registered", err)
|
||||
|
@ -239,7 +241,7 @@ func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveRes
|
|||
// GetSessions will get all sessions registered in etcd.
|
||||
func (s *Session) GetSessions(prefix string) (map[string]*Session, int64, error) {
|
||||
res := make(map[string]*Session)
|
||||
key := path.Join(DefaultServiceRoot, prefix)
|
||||
key := path.Join(s.metaRoot, DefaultServiceRoot, prefix)
|
||||
resp, err := s.etcdCli.Get(s.ctx, key, clientv3.WithPrefix(),
|
||||
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
|
||||
if err != nil {
|
||||
|
@ -263,7 +265,7 @@ func (s *Session) GetSessions(prefix string) (map[string]*Session, int64, error)
|
|||
// If a server is offline, it will be add to delChannel.
|
||||
func (s *Session) WatchServices(prefix string, revision int64) (eventChannel <-chan *SessionEvent) {
|
||||
eventCh := make(chan *SessionEvent, 100)
|
||||
rch := s.etcdCli.Watch(s.ctx, path.Join(DefaultServiceRoot, prefix), clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision))
|
||||
rch := s.etcdCli.Watch(s.ctx, path.Join(s.metaRoot, DefaultServiceRoot, prefix), clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision))
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package sessionutil
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
|
@ -20,6 +22,7 @@ func TestGetServerIDConcurrently(t *testing.T) {
|
|||
Params.Init()
|
||||
|
||||
etcdAddr, err := Params.Load("_EtcdAddress")
|
||||
metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -27,7 +30,7 @@ func TestGetServerIDConcurrently(t *testing.T) {
|
|||
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
|
||||
assert.Nil(t, err)
|
||||
etcdKV := etcdkv.NewEtcdKV(cli, "")
|
||||
_, err = cli.Delete(ctx, DefaultServiceRoot, clientv3.WithPrefix())
|
||||
_, err = cli.Delete(ctx, metaRoot, clientv3.WithPrefix())
|
||||
assert.Nil(t, err)
|
||||
|
||||
defer etcdKV.Close()
|
||||
|
@ -36,7 +39,7 @@ func TestGetServerIDConcurrently(t *testing.T) {
|
|||
var wg sync.WaitGroup
|
||||
var muList sync.Mutex = sync.Mutex{}
|
||||
|
||||
s := NewSession(ctx, []string{etcdAddr})
|
||||
s := NewSession(ctx, metaRoot, []string{etcdAddr})
|
||||
res := make([]int64, 0)
|
||||
|
||||
getIDFunc := func() {
|
||||
|
@ -72,13 +75,14 @@ func TestInit(t *testing.T) {
|
|||
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
|
||||
assert.Nil(t, err)
|
||||
etcdKV := etcdkv.NewEtcdKV(cli, "")
|
||||
_, err = cli.Delete(ctx, DefaultServiceRoot, clientv3.WithPrefix())
|
||||
metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot)
|
||||
_, err = cli.Delete(ctx, metaRoot, clientv3.WithPrefix())
|
||||
assert.Nil(t, err)
|
||||
|
||||
defer etcdKV.Close()
|
||||
defer etcdKV.RemoveWithPrefix("")
|
||||
|
||||
s := NewSession(ctx, []string{etcdAddr})
|
||||
s := NewSession(ctx, metaRoot, []string{etcdAddr})
|
||||
s.Init("inittest", "testAddr", false)
|
||||
assert.NotEqual(t, int64(0), s.leaseID)
|
||||
assert.NotEqual(t, int64(0), s.ServerID)
|
||||
|
@ -99,7 +103,8 @@ func TestUpdateSessions(t *testing.T) {
|
|||
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
|
||||
assert.Nil(t, err)
|
||||
etcdKV := etcdkv.NewEtcdKV(cli, "")
|
||||
_, err = cli.Delete(ctx, DefaultServiceRoot, clientv3.WithPrefix())
|
||||
metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot)
|
||||
_, err = cli.Delete(ctx, metaRoot, clientv3.WithPrefix())
|
||||
assert.Nil(t, err)
|
||||
|
||||
defer etcdKV.Close()
|
||||
|
@ -108,7 +113,7 @@ func TestUpdateSessions(t *testing.T) {
|
|||
var wg sync.WaitGroup
|
||||
var muList sync.Mutex = sync.Mutex{}
|
||||
|
||||
s := NewSession(ctx, []string{etcdAddr})
|
||||
s := NewSession(ctx, metaRoot, []string{etcdAddr})
|
||||
|
||||
sessions, rev, err := s.GetSessions("test")
|
||||
assert.Nil(t, err)
|
||||
|
@ -118,7 +123,7 @@ func TestUpdateSessions(t *testing.T) {
|
|||
sList := []*Session{}
|
||||
|
||||
getIDFunc := func() {
|
||||
singleS := NewSession(ctx, []string{etcdAddr})
|
||||
singleS := NewSession(ctx, metaRoot, []string{etcdAddr})
|
||||
singleS.Init("test", "testAddr", false)
|
||||
muList.Lock()
|
||||
sList = append(sList, singleS)
|
||||
|
@ -139,7 +144,7 @@ func TestUpdateSessions(t *testing.T) {
|
|||
notExistSessions, _, _ := s.GetSessions("testt")
|
||||
assert.Equal(t, len(notExistSessions), 0)
|
||||
|
||||
etcdKV.RemoveWithPrefix(DefaultServiceRoot)
|
||||
etcdKV.RemoveWithPrefix(metaRoot)
|
||||
assert.Eventually(t, func() bool {
|
||||
sessions, _, _ := s.GetSessions("test")
|
||||
return len(sessions) == 0
|
||||
|
|
Loading…
Reference in New Issue