mirror of https://github.com/milvus-io/milvus.git
Fix pulsar unsubscribe (#16877)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
Branch: pull/17018/head
parent 898533c5e4
commit 7d6387c74d
@@ -42,6 +42,7 @@ const (
 // ChannelManager manages the allocation and the balance between channels and data nodes.
 type ChannelManager struct {
+    ctx context.Context
     mu sync.RWMutex
     h Handler
     store RWChannelStore
@@ -89,6 +90,7 @@ func NewChannelManager(
     options ...ChannelManagerOpt,
 ) (*ChannelManager, error) {
     c := &ChannelManager{
+        ctx: context.TODO(),
         h: h,
         factory: NewChannelPolicyFactoryV1(kv),
         store: NewChannelStore(kv),
@@ -113,6 +115,7 @@ func NewChannelManager(
 // Startup adjusts the channel store according to current cluster states.
 func (c *ChannelManager) Startup(ctx context.Context, nodes []int64) error {
+    c.ctx = ctx
     channels := c.store.GetNodesChannels()
     // Retrieve the current old nodes.
     oNodes := make([]int64, 0, len(channels))
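
The three hunks above plumb a context through ChannelManager: the new ctx field is seeded with context.TODO() in NewChannelManager and overwritten with the caller's context in Startup, so unsubscribe (further down) can use c.ctx instead of a throwaway context. A minimal sketch of the same pattern, with hypothetical names (manager, doWork) that are not part of the Milvus code:

package main

import "context"

// manager is a hypothetical long-lived component that mirrors how
// ChannelManager stores a context instead of calling context.TODO() everywhere.
type manager struct {
	ctx context.Context
}

// newManager seeds the field so it is never nil before Startup runs.
func newManager() *manager {
	return &manager{ctx: context.TODO()}
}

// Startup replaces the placeholder with the caller's context.
func (m *manager) Startup(ctx context.Context) {
	m.ctx = ctx
}

// doWork derives downstream contexts from the stored one.
func (m *manager) doWork() error {
	_, cancel := context.WithCancel(m.ctx)
	defer cancel()
	return nil
}

func main() {
	m := newManager()
	m.Startup(context.Background())
	_ = m.doWork()
}

Keeping the field non-nil from construction onward means code paths that run before Startup still see a usable, if inert, context.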
@@ -346,10 +349,7 @@ func (c *ChannelManager) unsubAttempt(ncInfo *NodeChannelInfo) {
     nodeID := ncInfo.NodeID
     for _, ch := range ncInfo.Channels {
         subName := buildSubName(ch.CollectionID, nodeID)
-        err := c.unsubscribe(subName, ch.Name)
-        if err != nil {
-            log.Warn("failed to unsubscribe topic", zap.String("subscription name", subName), zap.String("channel name", ch.Name))
-        }
+        c.unsubscribe(subName, ch.Name)
     }
 }
@@ -358,17 +358,17 @@ func buildSubName(collectionID int64, nodeID int64) string {
     return fmt.Sprintf("%s-%d-%d", Params.MsgChannelCfg.DataNodeSubName, nodeID, collectionID)
 }

-func (c *ChannelManager) unsubscribe(subName string, channel string) error {
-    msgStream, err := c.msgstreamFactory.NewMsgStream(context.TODO())
+func (c *ChannelManager) unsubscribe(subName string, channel string) {
+    log.Info("unsubscribe channel", zap.String("subname", subName), zap.Any("channel", channel))
+    msgStream, err := c.msgstreamFactory.NewMsgStream(c.ctx)
     if err != nil {
-        return err
+        log.Error("unsubscribe channels failed", zap.String("subname", subName), zap.Any("channel", channel))
+        panic(err)
     }

     pchannelName := funcutil.ToPhysicalChannel(channel)

     msgStream.AsConsumer([]string{pchannelName}, subName)
     msgStream.Close()
-    return nil
 }

 // Watch tries to add the channel to cluster. Watch is a no op if the channel already exists.
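
The rewritten unsubscribe leans on the pattern this whole patch is about: attach a consumer to the physical channel under the node's subscription name, then close the stream, and the underlying Pulsar consumer (see the last hunk) unsubscribes on close. A minimal sketch of the same sequence written directly against pulsar-client-go, outside the msgstream abstraction; the broker URL, topic, and subscription name are placeholders:

package main

import (
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	// Placeholder broker URL.
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Attaching a consumer with the subscription name makes the subscription
	// addressable; this mirrors msgStream.AsConsumer in the hunk above.
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "example-physical-channel",
		SubscriptionName: "example-subname",
	})
	if err != nil {
		log.Fatal(err)
	}

	// Unsubscribe removes the subscription on the broker; Close alone only
	// detaches this consumer. The patched PulsarConsumer.Close (last hunk)
	// unsubscribes first and closes only on success.
	if err := consumer.Unsubscribe(); err != nil {
		log.Fatal(err)
	}
	consumer.Close()
}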
@@ -604,10 +604,7 @@ func (c *ChannelManager) cleanUpAndDelete(nodeID UniqueID, channelName string) e
         log.Warn("msgstream factory is not set, unable to clean up topics")
     } else {
         subName := buildSubName(chToCleanUp.CollectionID, nodeID)
-        err := c.unsubscribe(subName, channelName)
-        if err != nil {
-            log.Warn("failed to unsubscribe topic", zap.String("subcription", subName), zap.String("channel name", channelName), zap.Error(err))
-        }
+        c.unsubscribe(subName, channelName)
     }

     if !c.isMarkedDrop(channelName) {
@@ -159,11 +159,12 @@ func (ms *mqMsgStream) AsConsumerWithPosition(channels []string, subName string,
             ms.consumerChannels = append(ms.consumerChannels, channel)
             return nil
         }
-        err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200))
+        err := retry.Do(context.TODO(), fn, retry.Attempts(50), retry.Sleep(time.Millisecond*200), retry.MaxSleepTime(5*time.Second))
         if err != nil {
             errMsg := "Failed to create consumer " + channel + ", error = " + err.Error()
             panic(errMsg)
         }
+        log.Info("Successfully create consumer", zap.String("channel", channel), zap.String("subname", subName))
     }
 }
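
The hunk above raises the consumer-creation retry budget from 20 to 50 attempts and explicitly caps the per-attempt backoff at 5 seconds. A rough, self-contained sketch of an exponential-backoff-with-cap loop like the one retry.Do provides; the helper below and its signature are illustrative, not the Milvus internal retry API:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// do retries fn with exponential backoff: start at sleep, double each attempt,
// never exceed maxSleep, and stop after attempts tries or when ctx is done.
// Illustrative stand-in for retry.Do(ctx, fn, retry.Attempts(50),
// retry.Sleep(200*time.Millisecond), retry.MaxSleepTime(5*time.Second)).
func do(ctx context.Context, fn func() error, attempts int, sleep, maxSleep time.Duration) error {
	var lastErr error
	for i := 0; i < attempts; i++ {
		if lastErr = fn(); lastErr == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(sleep):
		}
		if sleep *= 2; sleep > maxSleep {
			sleep = maxSleep
		}
	}
	return fmt.Errorf("all %d attempts failed: %w", attempts, lastErr)
}

func main() {
	calls := 0
	err := do(context.Background(), func() error {
		calls++
		if calls < 3 {
			return errors.New("consumer not ready")
		}
		return nil
	}, 50, 200*time.Millisecond, 5*time.Second)
	fmt.Println(calls, err)
}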
@@ -198,6 +199,7 @@ func (ms *mqMsgStream) AsReader(channels []string, subName string) {
             errMsg := "Failed to create producer " + channel + ", error = " + err.Error()
             panic(errMsg)
         }
+        log.Info("Successfully create reader", zap.String("channel", channel), zap.String("subname", subName))
     }
 }
@@ -21,21 +21,19 @@ import (
     "context"
     "fmt"
     "sync"
     "time"

     "github.com/golang/protobuf/proto"
+    "go.uber.org/zap"

     etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
     "github.com/milvus-io/milvus/internal/log"
     "github.com/milvus-io/milvus/internal/msgstream"
     "github.com/milvus-io/milvus/internal/proto/querypb"
     "github.com/milvus-io/milvus/internal/util/funcutil"
-
-    "go.uber.org/zap"
 )

 const (
-    unsubscribeChannelInfoPrefix    = "queryCoord-unsubscribeChannelInfo"
-    unsubscribeChannelCheckInterval = time.Second
+    unsubscribeChannelInfoPrefix = "queryCoord-unsubscribeChannelInfo"
 )

 type channelUnsubscribeHandler struct {
@@ -133,11 +131,7 @@ func (csh *channelUnsubscribeHandler) handleChannelUnsubscribeLoop() {
             for _, collectionChannels := range channelInfo.CollectionChannels {
                 collectionID := collectionChannels.CollectionID
                 subName := funcutil.GenChannelSubName(Params.MsgChannelCfg.QueryNodeSubName, collectionID, nodeID)
-                err := unsubscribeChannels(csh.ctx, csh.factory, subName, collectionChannels.Channels)
-                if err != nil {
-                    log.Debug("unsubscribe channels failed", zap.Int64("nodeID", nodeID))
-                    panic(err)
-                }
+                unsubscribeChannels(csh.ctx, csh.factory, subName, collectionChannels.Channels)
             }

             channelInfoKey := fmt.Sprintf("%s/%d", unsubscribeChannelInfoPrefix, nodeID)
@@ -162,13 +156,13 @@ func (csh *channelUnsubscribeHandler) close() {
 }

 // unsubscribeChannels create consumer fist, and unsubscribe channel through msgStream.close()
-func unsubscribeChannels(ctx context.Context, factory msgstream.Factory, subName string, channels []string) error {
+func unsubscribeChannels(ctx context.Context, factory msgstream.Factory, subName string, channels []string) {
+    log.Info("unsubscribe channel", zap.String("subname", subName), zap.Any("channels", channels))
     msgStream, err := factory.NewMsgStream(ctx)
     if err != nil {
-        return err
+        log.Error("unsubscribe channels failed", zap.String("subname", subName), zap.Any("channels", channels))
+        panic(err)
     }

     msgStream.AsConsumer(channels, subName)
     msgStream.Close()
-    return nil
 }
@@ -965,9 +965,13 @@ func (m *MetaReplica) getWatchedChannelsByNodeID(nodeID int64) *querypb.Unsubscr
         }
     }
     segmentInfos := m.getSegmentInfosByNode(nodeID)
-    // get delta/search channel the node has watched
+    colIDs := make(map[UniqueID]bool)
+    // iterate through segments to find unique ids
     for _, segmentInfo := range segmentInfos {
-        collectionID := segmentInfo.CollectionID
+        colIDs[segmentInfo.CollectionID] = true
+    }
+    // get delta/search channel the node has watched
+    for collectionID := range colIDs {
         if _, ok := colID2DeltaChannels[collectionID]; !ok {
             deltaChanelInfos, err := m.getDeltaChannelsByCollectionID(collectionID)
             if err != nil {
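
The rewrite above first collects the distinct collection IDs referenced by a node's segments into a map used as a set, and only then resolves channels once per collection instead of once per segment. A tiny standalone illustration of that dedup step, with placeholder types:

package main

import "fmt"

type segment struct {
	CollectionID int64
}

func main() {
	segments := []segment{{1}, {2}, {1}, {3}, {2}}

	// Collect unique collection IDs, mirroring colIDs := make(map[UniqueID]bool).
	colIDs := make(map[int64]bool)
	for _, s := range segments {
		colIDs[s.CollectionID] = true
	}

	// Each collection is now processed exactly once.
	for id := range colIDs {
		fmt.Println("would fetch channels for collection", id)
	}
}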
@@ -995,6 +999,7 @@ func (m *MetaReplica) getWatchedChannelsByNodeID(nodeID int64) *querypb.Unsubscr
     for collectionID, channels := range colID2DeltaChannels {
         colID2Channels[collectionID] = append(colID2Channels[collectionID], channels...)
     }
+    // TODO, something is wrong here, because it's possible that the server loaded query channel but with not segment or dml chanel
     for collectionID, channel := range colID2QueryChannel {
         colID2Channels[collectionID] = append(colID2Channels[collectionID], channel)
     }
@@ -1010,7 +1015,6 @@ func (m *MetaReplica) getWatchedChannelsByNodeID(nodeID int64) *querypb.Unsubscr
             Channels: channels,
         })
     }
-
     return unsubscribeChannelInfo
 }
@@ -110,12 +110,25 @@ func (pc *PulsarConsumer) Ack(message Message) {
 // Close the consumer and stop the broker to push more messages
 func (pc *PulsarConsumer) Close() {
     pc.closeOnce.Do(func() {
-        defer pc.c.Close()
-        // Unsubscribe for the consumer
-        err := retry.Do(context.Background(), func() error {
-            //TODO need to check error retryable
-            return pc.c.Unsubscribe()
-        }, retry.MaxSleepTime(50*time.Millisecond), retry.Attempts(6))
+        fn := func() error {
+            err := pc.c.Unsubscribe()
+            if isPulsarError(err, pulsar.ConsumerNotFound) || isPulsarError(err, pulsar.SubscriptionNotFound) {
+                log.Warn("failed to find consumer, skip unsubcribe",
+                    zap.String("subscription", pc.Subscription()),
+                    zap.Error(err))
+                return nil
+            }
+            if err != nil {
+                log.Info("failed to unsubscribe ", zap.String("subscription", pc.Subscription()))
+                return err
+            }
+            // only close if unsubscribe successfully
+            pc.c.Close()
+            return nil
+        }
+
+        err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200), retry.MaxSleepTime(5*time.Second))
         if err != nil {
             log.Error("failed to unsubscribe", zap.String("subscription", pc.Subscription()), zap.Error(err))
             panic(err)
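
The final hunk reverses the old ordering: rather than deferring pc.c.Close() and unsubscribing best-effort, the consumer is closed only after Unsubscribe succeeds, the retry loop gets the same 200 ms / 5 s backoff as the msgstream change above, and ConsumerNotFound or SubscriptionNotFound are treated as "already gone" and skipped. The isPulsarError helper is not part of this diff; the sketch below shows one plausible shape for it, assuming pulsar-client-go's *pulsar.Error exposes a Result() code:

package main

import (
	"errors"
	"fmt"

	"github.com/apache/pulsar-client-go/pulsar"
)

// isPulsarError reports whether err is a pulsar client error carrying one of
// the given result codes. Sketch only; the real helper lives elsewhere in the
// Milvus tree and may differ.
func isPulsarError(err error, results ...pulsar.Result) bool {
	if err == nil {
		return false
	}
	var perr *pulsar.Error
	if !errors.As(err, &perr) {
		return false
	}
	for _, r := range results {
		if perr.Result() == r {
			return true
		}
	}
	return false
}

func main() {
	// A missing consumer or subscription means there is nothing left to
	// unsubscribe, so the retry closure above returns nil instead of retrying.
	err := errors.New("some transport error")
	fmt.Println(isPulsarError(err, pulsar.ConsumerNotFound, pulsar.SubscriptionNotFound)) // false
	fmt.Println(isPulsarError(nil, pulsar.ConsumerNotFound))                              // false
}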