mirror of https://github.com/milvus-io/milvus.git
Unsubscribe topic in DataCoord (#15353)
Signed-off-by: sunby <bingyi.sun@zilliz.com> Co-authored-by: sunby <bingyi.sun@zilliz.com>pull/15370/head
parent
1fbb70b318
commit
3987574448
|
@ -18,11 +18,13 @@ package datacoord
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/kv"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/msgstream"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"go.uber.org/zap"
|
||||
"stathat.com/c/consistent"
|
||||
|
@ -44,6 +46,7 @@ type ChannelManager struct {
|
|||
assignPolicy ChannelAssignPolicy
|
||||
reassignPolicy ChannelReassignPolicy
|
||||
bgChecker ChannelBGChecker
|
||||
msgstreamFactory msgstream.Factory
|
||||
}
|
||||
|
||||
type channel struct {
|
||||
|
@ -57,10 +60,15 @@ type ChannelManagerOpt func(c *ChannelManager)
|
|||
func withFactory(f ChannelPolicyFactory) ChannelManagerOpt {
|
||||
return func(c *ChannelManager) { c.factory = f }
|
||||
}
|
||||
|
||||
func defaultFactory(hash *consistent.Consistent) ChannelPolicyFactory {
|
||||
return NewConsistentHashChannelPolicyFactory(hash)
|
||||
}
|
||||
|
||||
func withMsgstreamFactory(f msgstream.Factory) ChannelManagerOpt {
|
||||
return func(c *ChannelManager) { c.msgstreamFactory = f }
|
||||
}
|
||||
|
||||
// NewChannelManager returns a new ChannelManager
|
||||
func NewChannelManager(
|
||||
kv kv.TxnKV,
|
||||
|
@ -226,6 +234,13 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error {
|
|||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
nodeChannelInfo := c.store.GetNode(nodeID)
|
||||
if nodeChannelInfo == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
c.tryToUnsubscribe(nodeChannelInfo)
|
||||
|
||||
updates := c.deregisterPolicy(c.store, nodeID)
|
||||
log.Debug("deregister node",
|
||||
zap.Int64("unregistered node", nodeID),
|
||||
|
@ -243,6 +258,41 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (c *ChannelManager) tryToUnsubscribe(nodeChannelInfo *NodeChannelInfo) {
|
||||
if nodeChannelInfo == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if c.msgstreamFactory == nil {
|
||||
log.Warn("msgstream factory is not set")
|
||||
return
|
||||
}
|
||||
|
||||
nodeID := nodeChannelInfo.NodeID
|
||||
for _, ch := range nodeChannelInfo.Channels {
|
||||
subscriptionName := subscriptionGenerator(ch.CollectionID, nodeID)
|
||||
err := c.unsubscribe(subscriptionName, ch.Name)
|
||||
if err != nil {
|
||||
log.Warn("failed to unsubcribe topic", zap.String("subscription name", subscriptionName), zap.String("channel name", ch.Name))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func subscriptionGenerator(collectionID int64, nodeID int64) string {
|
||||
return fmt.Sprintf("%s-%s-%d-%d", Params.DataNodeCfg.MsgChannelSubName, Params.DataNodeCfg.SubscriptionNamePrefix, nodeID, collectionID)
|
||||
}
|
||||
|
||||
func (c *ChannelManager) unsubscribe(subscriptionName string, channel string) error {
|
||||
msgStream, err := c.msgstreamFactory.NewMsgStream(context.TODO())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msgStream.AsConsumer([]string{channel}, subscriptionName)
|
||||
msgStream.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Watch try to add the channel to cluster. If the channel already exists, do nothing
|
||||
func (c *ChannelManager) Watch(ch *channel) error {
|
||||
c.mu.Lock()
|
||||
|
|
|
@ -307,7 +307,7 @@ func (s *Server) initCluster() error {
|
|||
}
|
||||
|
||||
var err error
|
||||
s.channelManager, err = NewChannelManager(s.kvClient, s.handler)
|
||||
s.channelManager, err = NewChannelManager(s.kvClient, s.handler, withMsgstreamFactory(s.msFactory))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -1246,6 +1246,8 @@ type dataNodeConfig struct {
|
|||
|
||||
CreatedTime time.Time
|
||||
UpdatedTime time.Time
|
||||
|
||||
SubscriptionNamePrefix string
|
||||
}
|
||||
|
||||
func (p *dataNodeConfig) init(bp *BaseParamTable) {
|
||||
|
@ -1267,6 +1269,7 @@ func (p *dataNodeConfig) init(bp *BaseParamTable) {
|
|||
|
||||
p.initDmlChannelName()
|
||||
p.initDeltaChannelName()
|
||||
p.initSubscriptionNamePrefix()
|
||||
}
|
||||
|
||||
// Refresh is called after session init
|
||||
|
@ -1364,6 +1367,13 @@ func (p *dataNodeConfig) initDeltaChannelName() {
|
|||
p.DeltaChannelName = strings.Join(s, "-")
|
||||
}
|
||||
|
||||
func (p *dataNodeConfig) initSubscriptionNamePrefix() {
|
||||
prefix, err := p.BaseParams.Load("msgChannel.subNamePrefix.dataNodeSubNamePrefix")
|
||||
if err != nil {
|
||||
p.SubscriptionNamePrefix = prefix
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// --- indexcoord ---
|
||||
type indexCoordConfig struct {
|
||||
|
|
Loading…
Reference in New Issue