Fix pulsar unsubscribe failure caused by consumer not found (#16839)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/16852/head
Xiaofan 2022-05-09 12:07:52 +08:00 committed by GitHub
parent b0c9f25352
commit 62658dcda6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 63 additions and 55 deletions

View File

@ -42,6 +42,7 @@ const (
// ChannelManager manages the allocation and the balance between channels and data nodes.
type ChannelManager struct {
ctx context.Context
mu sync.RWMutex
h Handler
store RWChannelStore
@ -89,6 +90,7 @@ func NewChannelManager(
options ...ChannelManagerOpt,
) (*ChannelManager, error) {
c := &ChannelManager{
ctx: context.TODO(),
h: h,
factory: NewChannelPolicyFactoryV1(kv),
store: NewChannelStore(kv),
@ -113,6 +115,7 @@ func NewChannelManager(
// Startup adjusts the channel store according to current cluster states.
func (c *ChannelManager) Startup(ctx context.Context, nodes []int64) error {
c.ctx = ctx
channels := c.store.GetNodesChannels()
// Retrieve the current old nodes.
oNodes := make([]int64, 0, len(channels))
@ -366,32 +369,12 @@ func (c *ChannelManager) unsubAttempt(ncInfo *NodeChannelInfo) {
nodeID := ncInfo.NodeID
for _, ch := range ncInfo.Channels {
subName := buildSubName(ch.CollectionID, nodeID)
err := c.unsubscribe(subName, ch.Name)
if err != nil {
log.Warn("failed to unsubscribe topic", zap.String("subscription name", subName), zap.String("channel name", ch.Name))
}
subName := fmt.Sprintf("%s-%d-%d", Params.CommonCfg.DataNodeSubName, nodeID, ch.CollectionID)
pchannelName := funcutil.ToPhysicalChannel(ch.Name)
msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName})
}
}
// buildSubName returns the subscription name used for the given collection and node.
// The result has the form "<DataNodeSubName>-<nodeID>-<collectionID>". Note that the
// node ID precedes the collection ID in the output, although the parameters are
// declared in the opposite order.
func buildSubName(collectionID int64, nodeID int64) string {
	prefix := Params.CommonCfg.DataNodeSubName
	return fmt.Sprintf("%s-%d-%d", prefix, nodeID, collectionID)
}
// unsubscribe attaches to the physical channel behind the given channel name as a
// consumer under subName and then closes the message stream again.
// It returns the error from creating the message stream, if any, and nil otherwise.
func (c *ChannelManager) unsubscribe(subName string, channel string) error {
	stream, createErr := c.msgstreamFactory.NewMsgStream(context.TODO())
	if createErr != nil {
		return createErr
	}
	// The stream consumes from the physical channel derived from the given channel name.
	physicalChannels := []string{funcutil.ToPhysicalChannel(channel)}
	stream.AsConsumer(physicalChannels, subName)
	stream.Close()
	return nil
}
// Watch tries to add the channel to cluster. Watch is a no op if the channel already exists.
func (c *ChannelManager) Watch(ch *channel) error {
c.mu.Lock()
@ -624,11 +607,9 @@ func (c *ChannelManager) cleanUpAndDelete(nodeID UniqueID, channelName string) e
if c.msgstreamFactory == nil {
log.Warn("msgstream factory is not set, unable to clean up topics")
} else {
subName := buildSubName(chToCleanUp.CollectionID, nodeID)
err := c.unsubscribe(subName, channelName)
if err != nil {
log.Warn("failed to unsubscribe topic", zap.String("subcription", subName), zap.String("channel name", channelName), zap.Error(err))
}
subName := fmt.Sprintf("%s-%d-%d", Params.CommonCfg.DataNodeSubName, nodeID, chToCleanUp.CollectionID)
pchannelName := funcutil.ToPhysicalChannel(channelName)
msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName})
}
if !c.isMarkedDrop(channelName) {

View File

@ -118,7 +118,7 @@ func (ms *mqMsgStream) AsProducer(channels []string) {
ms.producerChannels = append(ms.producerChannels, channel)
return nil
}
err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200))
err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200), retry.MaxSleepTime(5*time.Second))
if err != nil {
errMsg := "Failed to create producer " + channel + ", error = " + err.Error()
panic(errMsg)
@ -167,7 +167,7 @@ func (ms *mqMsgStream) AsConsumerWithPosition(channels []string, subName string,
ms.consumerChannels = append(ms.consumerChannels, channel)
return nil
}
err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200))
err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200), retry.MaxSleepTime(5*time.Second))
if err != nil {
errMsg := "Failed to create consumer " + channel + ", error = " + err.Error()
panic(errMsg)
@ -648,7 +648,7 @@ func (ms *MqTtMsgStream) AsConsumerWithPosition(channels []string, subName strin
ms.addConsumer(pc, channel)
return nil
}
err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200))
err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200), retry.MaxSleepTime(5*time.Second))
if err != nil {
errMsg := "Failed to create consumer " + channel + ", error = " + err.Error()
panic(errMsg)
@ -873,7 +873,8 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
if len(mp.MsgID) == 0 {
return fmt.Errorf("when msgID's length equal to 0, please use AsConsumer interface")
}
if err = retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200)); err != nil {
err = retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200), retry.MaxSleepTime(5*time.Second))
if err != nil {
return fmt.Errorf("failed to seek, error %s", err.Error())
}
ms.addConsumer(consumer, mp.ChannelName)

View File

@ -112,12 +112,18 @@ func (pc *Consumer) Ack(message mqwrapper.Message) {
// Close the consumer and stop the broker to push more messages
func (pc *Consumer) Close() {
pc.closeOnce.Do(func() {
defer pc.c.Close()
// Unsubscribe for the consumer
err := retry.Do(context.Background(), func() error {
//TODO need to check error retryable
return pc.c.Unsubscribe()
}, retry.MaxSleepTime(50*time.Millisecond), retry.Attempts(6))
fn := func() error {
err := pc.c.Unsubscribe()
if err != nil {
return err
}
// only close if unsubscribe successfully
pc.c.Close()
return nil
}
err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200), retry.MaxSleepTime(5*time.Second))
if err != nil {
log.Error("failed to unsubscribe", zap.String("subscription", pc.Subscription()), zap.Error(err))
panic(err)

View File

@ -0,0 +1,37 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package msgstream
import (
"context"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
// UnsubscribeChannels removes the subscription subName from the given channels.
// It creates a message stream from factory, attaches to the channels as a consumer
// under subName, and then unsubscribes by closing the stream.
// It panics if the message stream cannot be created.
// TODO use streamnative pulsarctl
func UnsubscribeChannels(ctx context.Context, factory Factory, subName string, channels []string) {
	log.Info("unsubscribe channel", zap.String("subname", subName), zap.Any("channels", channels))
	msgStream, err := factory.NewMsgStream(ctx)
	if err != nil {
		// Attach the underlying error so the cause is visible in the structured log,
		// not only in the panic output.
		log.Error("unsubscribe channels failed", zap.String("subname", subName), zap.Any("channels", channels), zap.Error(err))
		panic(err)
	}
	msgStream.AsConsumer(channels, subName)
	msgStream.Close()
}

View File

@ -134,13 +134,8 @@ func (csh *channelUnsubscribeHandler) handleChannelUnsubscribeLoop() {
for _, collectionChannels := range channelInfo.CollectionChannels {
collectionID := collectionChannels.CollectionID
subName := funcutil.GenChannelSubName(Params.CommonCfg.QueryNodeSubName, collectionID, nodeID)
err := unsubscribeChannels(csh.ctx, csh.factory, subName, collectionChannels.Channels)
if err != nil {
log.Error("unsubscribe channels failed", zap.Int64("nodeID", nodeID))
panic(err)
}
msgstream.UnsubscribeChannels(csh.ctx, csh.factory, subName, collectionChannels.Channels)
}
channelInfoKey := fmt.Sprintf("%s/%d", unsubscribeChannelInfoPrefix, nodeID)
err := csh.kvClient.Remove(channelInfoKey)
if err != nil {
@ -161,15 +156,3 @@ func (csh *channelUnsubscribeHandler) close() {
csh.cancel()
csh.wg.Wait()
}
// unsubscribeChannels removes the subscription subName from the given channels.
// It first creates a consumer on the channels and then unsubscribes by closing the
// message stream.
// It returns the error from factory.NewMsgStream, if any, and nil otherwise.
func unsubscribeChannels(ctx context.Context, factory msgstream.Factory, subName string, channels []string) error {
	stream, createErr := factory.NewMsgStream(ctx)
	if createErr != nil {
		return createErr
	}
	stream.AsConsumer(channels, subName)
	stream.Close()
	return nil
}