Add retry for seek function

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/4973/head^2
xige-16 2021-03-30 13:53:49 +08:00 committed by yefu.chen
parent 0be85b3a62
commit e823b56040
3 changed files with 36 additions and 29 deletions

View File

@ -3,7 +3,6 @@ package pulsar
import (
"github.com/apache/pulsar-client-go/pulsar"
"github.com/zilliztech/milvus-distributed/internal/msgstream/client"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type pulsarConsumer struct {
@ -20,10 +19,7 @@ func (pc *pulsarConsumer) Chan() <-chan client.ConsumerMessage {
}
func (pc *pulsarConsumer) Seek(id client.MessageID) error {
messageID, err := typeutil.StringToPulsarMsgID(string(id.Serialize()))
if err != nil {
return err
}
messageID := id.(*pulsarID).messageID
return pc.c.Seek(messageID)
}

View File

@ -1,8 +1,6 @@
package rocksmq
import (
"strconv"
"github.com/zilliztech/milvus-distributed/internal/msgstream/client"
"github.com/zilliztech/milvus-distributed/internal/util/rocksmq/client/rocksmq"
)
@ -21,10 +19,7 @@ func (rc *rmqConsumer) Chan() <-chan client.ConsumerMessage {
}
func (rc *rmqConsumer) Seek(id client.MessageID) error {
msgID, err := strconv.ParseInt(string(id.Serialize()), 10, 64)
if err != nil {
return err
}
msgID := id.(*rmqID).messageID
return rc.c.Seek(msgID)
}

View File

@ -676,26 +676,42 @@ func (ms *TtMsgStream) Seek(mp *internalpb.MsgPosition) error {
return errors.New("the channel should has been subscribed")
}
receiveChannel := make(chan client.ConsumerMessage, ms.bufSize)
consumer, err = ms.client.Subscribe(client.ConsumerOptions{
Topic: seekChannel,
SubscriptionName: subName,
SubscriptionInitialPosition: client.SubscriptionPositionEarliest,
Type: client.KeyShared,
MessageChannel: receiveChannel,
})
if err != nil {
return err
}
if consumer == nil {
return errors.New("Consumer is nil")
}
fn := func() error {
receiveChannel := make(chan client.ConsumerMessage, ms.bufSize)
consumer, err = ms.client.Subscribe(client.ConsumerOptions{
Topic: seekChannel,
SubscriptionName: subName,
SubscriptionInitialPosition: client.SubscriptionPositionEarliest,
Type: client.KeyShared,
MessageChannel: receiveChannel,
})
if err != nil {
return err
}
if consumer == nil {
err = errors.New("consumer is nil")
log.Debug("subscribe error", zap.String("error = ", err.Error()))
return err
}
seekMsgID, err := ms.client.BytesToMsgID(mp.MsgID)
if err != nil {
return err
seekMsgID, err := ms.client.BytesToMsgID(mp.MsgID)
if err != nil {
log.Debug("convert messageID error", zap.String("error = ", err.Error()))
return err
}
err = consumer.Seek(seekMsgID)
if err != nil {
log.Debug("seek error ", zap.String("error = ", err.Error()))
return err
}
return nil
}
err = util.Retry(20, time.Millisecond*200, fn)
if err != nil {
errMsg := "Failed to seek, error = " + err.Error()
panic(errMsg)
}
_ = consumer.Seek(seekMsgID)
ms.addConsumer(consumer, seekChannel)
//TODO: May cause problem