mirror of https://github.com/milvus-io/milvus.git
fix: fix crash when enable standby and streaming (#38239)
issue: #38125 - connect kv at standby mode. - make balancer initialization lazy. Signed-off-by: chyezh <chyezh@outlook.com>pull/38267/head
parent
d7a5ad4eca
commit
18bef5e062
|
@ -110,6 +110,7 @@ type Server struct {
|
|||
address string
|
||||
watchClient kv.WatchKV
|
||||
kv kv.MetaKv
|
||||
metaRootPath string
|
||||
meta *meta
|
||||
segmentManager Manager
|
||||
allocator allocator.Allocator
|
||||
|
@ -306,6 +307,15 @@ func (s *Server) Init() error {
|
|||
if err = s.initSession(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.initKV(); err != nil {
|
||||
return err
|
||||
}
|
||||
if streamingutil.IsStreamingServiceEnabled() {
|
||||
s.streamingCoord = streamingcoord.NewServerBuilder().
|
||||
WithETCD(s.etcdCli).
|
||||
WithMetaKV(s.kv).
|
||||
WithSession(s.session).Build()
|
||||
}
|
||||
if s.enableActiveStandBy {
|
||||
s.activateFunc = func() error {
|
||||
log.Info("DataCoord switch from standby to active, activating")
|
||||
|
@ -365,10 +375,7 @@ func (s *Server) initDataCoord() error {
|
|||
|
||||
// Initialize streaming coordinator.
|
||||
if streamingutil.IsStreamingServiceEnabled() {
|
||||
s.streamingCoord = streamingcoord.NewServerBuilder().
|
||||
WithETCD(s.etcdCli).
|
||||
WithMetaKV(s.kv).
|
||||
WithSession(s.session).Build()
|
||||
|
||||
if err = s.streamingCoord.Init(context.TODO()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -648,31 +655,36 @@ func (s *Server) initSegmentManager() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) initMeta(chunkManager storage.ChunkManager) error {
|
||||
if s.meta != nil {
|
||||
func (s *Server) initKV() error {
|
||||
if s.kv != nil {
|
||||
return nil
|
||||
}
|
||||
s.watchClient = etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue(),
|
||||
etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
|
||||
metaType := Params.MetaStoreCfg.MetaStoreType.GetValue()
|
||||
log.Info("data coordinator connecting to metadata store", zap.String("metaType", metaType))
|
||||
metaRootPath := ""
|
||||
if metaType == util.MetaStoreTypeTiKV {
|
||||
metaRootPath = Params.TiKVCfg.MetaRootPath.GetValue()
|
||||
s.kv = tikv.NewTiKV(s.tikvCli, metaRootPath,
|
||||
s.metaRootPath = Params.TiKVCfg.MetaRootPath.GetValue()
|
||||
s.kv = tikv.NewTiKV(s.tikvCli, s.metaRootPath,
|
||||
tikv.WithRequestTimeout(paramtable.Get().ServiceParam.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
|
||||
} else if metaType == util.MetaStoreTypeEtcd {
|
||||
metaRootPath = Params.EtcdCfg.MetaRootPath.GetValue()
|
||||
s.kv = etcdkv.NewEtcdKV(s.etcdCli, metaRootPath,
|
||||
s.metaRootPath = Params.EtcdCfg.MetaRootPath.GetValue()
|
||||
s.kv = etcdkv.NewEtcdKV(s.etcdCli, s.metaRootPath,
|
||||
etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
|
||||
} else {
|
||||
return retry.Unrecoverable(fmt.Errorf("not supported meta store: %s", metaType))
|
||||
}
|
||||
log.Info("data coordinator successfully connected to metadata store", zap.String("metaType", metaType))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) initMeta(chunkManager storage.ChunkManager) error {
|
||||
if s.meta != nil {
|
||||
return nil
|
||||
}
|
||||
reloadEtcdFn := func() error {
|
||||
var err error
|
||||
catalog := datacoord.NewCatalog(s.kv, chunkManager.RootPath(), metaRootPath)
|
||||
catalog := datacoord.NewCatalog(s.kv, chunkManager.RootPath(), s.metaRootPath)
|
||||
s.meta, err = newMeta(s.ctx, catalog, chunkManager)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -4,10 +4,13 @@ import (
|
|||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/metastore/kv/streamingcoord"
|
||||
"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer"
|
||||
"github.com/milvus-io/milvus/internal/streamingcoord/server/resource"
|
||||
"github.com/milvus-io/milvus/internal/streamingcoord/server/service"
|
||||
"github.com/milvus-io/milvus/internal/util/componentutil"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
"github.com/milvus-io/milvus/pkg/kv"
|
||||
"github.com/milvus-io/milvus/pkg/util/syncutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
|
@ -41,8 +44,11 @@ func (s *ServerBuilder) Build() *Server {
|
|||
resource.OptETCD(s.etcdClient),
|
||||
resource.OptStreamingCatalog(streamingcoord.NewCataLog(s.metaKV)),
|
||||
)
|
||||
balancer := syncutil.NewFuture[balancer.Balancer]()
|
||||
return &Server{
|
||||
session: s.session,
|
||||
componentStateService: componentutil.NewComponentStateService(typeutil.StreamingCoordRole),
|
||||
assignmentService: service.NewAssignmentService(balancer),
|
||||
balancer: balancer,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/util/streamingutil/util"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
|
||||
"github.com/milvus-io/milvus/pkg/util/syncutil"
|
||||
)
|
||||
|
||||
// Server is the streamingcoord server.
|
||||
|
@ -26,7 +27,7 @@ type Server struct {
|
|||
componentStateService *componentutil.ComponentStateService // state.
|
||||
|
||||
// basic component variables can be used at service level.
|
||||
balancer balancer.Balancer
|
||||
balancer *syncutil.Future[balancer.Balancer]
|
||||
}
|
||||
|
||||
// Init initializes the streamingcoord server.
|
||||
|
@ -39,7 +40,6 @@ func (s *Server) Init(ctx context.Context) (err error) {
|
|||
return err
|
||||
}
|
||||
// Init all grpc service of streamingcoord server.
|
||||
s.initService()
|
||||
s.componentStateService.OnInitialized(s.session.GetServerID())
|
||||
log.Info("streamingcoord server initialized")
|
||||
return nil
|
||||
|
@ -51,15 +51,14 @@ func (s *Server) initBasicComponent(ctx context.Context) error {
|
|||
var err error
|
||||
// Read new incoming topics from configuration, and register it into balancer.
|
||||
newIncomingTopics := util.GetAllTopicsFromConfiguration()
|
||||
s.balancer, err = balancer.RecoverBalancer(ctx, "pchannel_count_fair", newIncomingTopics.Collect()...)
|
||||
balancer, err := balancer.RecoverBalancer(ctx, "pchannel_count_fair", newIncomingTopics.Collect()...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.balancer.Set(balancer)
|
||||
return err
|
||||
}
|
||||
|
||||
// initService initializes the grpc service.
|
||||
func (s *Server) initService() {
|
||||
s.assignmentService = service.NewAssignmentService(s.balancer)
|
||||
}
|
||||
|
||||
// registerGRPCService register all grpc service to grpc server.
|
||||
func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) {
|
||||
streamingpb.RegisterStreamingCoordAssignmentServiceServer(grpcServer, s.assignmentService)
|
||||
|
@ -76,6 +75,6 @@ func (s *Server) Start() {
|
|||
func (s *Server) Stop() {
|
||||
s.componentStateService.OnStopping()
|
||||
log.Info("close balancer...")
|
||||
s.balancer.Close()
|
||||
s.balancer.Get().Close()
|
||||
log.Info("streamingcoord server stopped")
|
||||
}
|
||||
|
|
|
@ -8,13 +8,14 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/syncutil"
|
||||
)
|
||||
|
||||
var _ streamingpb.StreamingCoordAssignmentServiceServer = (*assignmentServiceImpl)(nil)
|
||||
|
||||
// NewAssignmentService returns a new assignment service.
|
||||
func NewAssignmentService(
|
||||
balancer balancer.Balancer,
|
||||
balancer *syncutil.Future[balancer.Balancer],
|
||||
) streamingpb.StreamingCoordAssignmentServiceServer {
|
||||
return &assignmentServiceImpl{
|
||||
balancer: balancer,
|
||||
|
@ -28,7 +29,7 @@ type AssignmentService interface {
|
|||
|
||||
// assignmentServiceImpl is the implementation of the assignment service.
|
||||
type assignmentServiceImpl struct {
|
||||
balancer balancer.Balancer
|
||||
balancer *syncutil.Future[balancer.Balancer]
|
||||
listenerTotal prometheus.Gauge
|
||||
}
|
||||
|
||||
|
@ -37,5 +38,9 @@ func (s *assignmentServiceImpl) AssignmentDiscover(server streamingpb.StreamingC
|
|||
s.listenerTotal.Inc()
|
||||
defer s.listenerTotal.Dec()
|
||||
|
||||
return discover.NewAssignmentDiscoverServer(s.balancer, server).Execute()
|
||||
balancer, err := s.balancer.GetWithContext(server.Context())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return discover.NewAssignmentDiscoverServer(balancer, server).Execute()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue