Fail fast when pulsar subscribe exlusive timeout (#14844)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/14865/head
congqixia 2022-01-05 16:19:20 +08:00 committed by GitHub
parent c64ad70d43
commit 921501dfac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 124 additions and 0 deletions

View File

@ -22,6 +22,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/retry"
"go.uber.org/zap"
)
@ -32,6 +33,24 @@ type pulsarClient struct {
var sc *pulsarClient
var once sync.Once
func isPulsarError(err error, result ...pulsar.Result) bool {
if len(result) == 0 {
return false
}
perr, ok := err.(*pulsar.Error)
if !ok {
return false
}
for _, r := range result {
if perr.Result() == r {
return true
}
}
return false
}
// GetPulsarClientInstance creates a pulsarClient object
// according to the parameter opts of type pulsar.ClientOptions
func GetPulsarClientInstance(opts pulsar.ClientOptions) (*pulsarClient, error) {
@ -91,6 +110,10 @@ func (pc *pulsarClient) Subscribe(options ConsumerOptions) (Consumer, error) {
MessageChannel: receiveChannel,
})
if err != nil {
// exclusive consumer already exist
if isPulsarError(err, pulsar.ConsumerBusy) {
return nil, retry.Unrecoverable(err)
}
return nil, err
}

View File

@ -20,16 +20,19 @@ import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"math/rand"
"testing"
"time"
"unsafe"
"go.uber.org/zap"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/stretchr/testify/assert"
)
@ -561,3 +564,101 @@ func TestPulsarClient_BytesToMsgID(t *testing.T) {
assert.Nil(t, res)
assert.NotNil(t, err)
}
type mPulsarError struct {
msg string
result pulsar.Result
}
func hackPulsarError(result pulsar.Result) *pulsar.Error {
pe := &pulsar.Error{}
// use unsafe to generate test case
/* #nosec G103 */
mpe := (*mPulsarError)(unsafe.Pointer(pe))
mpe.result = result
return pe
}
func TestIsPulsarError(t *testing.T) {
type testCase struct {
err error
results []pulsar.Result
expected bool
}
cases := []testCase{
{
err: errors.New(""),
results: []pulsar.Result{},
expected: false,
},
{
err: errors.New(""),
results: []pulsar.Result{pulsar.ConnectError},
expected: false,
},
{
err: hackPulsarError(pulsar.ConsumerBusy),
results: []pulsar.Result{pulsar.ConnectError},
expected: false,
},
{
err: hackPulsarError(pulsar.ConsumerBusy),
results: []pulsar.Result{pulsar.ConnectError, pulsar.ConsumerBusy},
expected: true,
},
}
for _, tc := range cases {
assert.Equal(t, tc.expected, isPulsarError(tc.err, tc.results...))
}
}
type mockPulsarClient struct{}
// CreateProducer Creates the producer instance
// This method will block until the producer is created successfully
func (c *mockPulsarClient) CreateProducer(_ pulsar.ProducerOptions) (pulsar.Producer, error) {
return nil, hackPulsarError(pulsar.ConnectError)
}
// Subscribe Creates a `Consumer` by subscribing to a topic.
//
// If the subscription does not exist, a new subscription will be created and all messages published after the
// creation will be retained until acknowledged, even if the consumer is not connected
func (c *mockPulsarClient) Subscribe(_ pulsar.ConsumerOptions) (pulsar.Consumer, error) {
return nil, hackPulsarError(pulsar.ConsumerBusy)
}
// CreateReader Creates a Reader instance.
// This method will block until the reader is created successfully.
func (c *mockPulsarClient) CreateReader(_ pulsar.ReaderOptions) (pulsar.Reader, error) {
return nil, hackPulsarError(pulsar.ConnectError)
}
// TopicPartitions Fetches the list of partitions for a given topic
//
// If the topic is partitioned, this will return a list of partition names.
// If the topic is not partitioned, the returned list will contain the topic
// name itself.
//
// This can be used to discover the partitions and create {@link Reader},
// {@link Consumer} or {@link Producer} instances directly on a particular partition.
func (c *mockPulsarClient) TopicPartitions(topic string) ([]string, error) {
return nil, hackPulsarError(pulsar.ConnectError)
}
// Close Closes the Client and free associated resources
func (c *mockPulsarClient) Close() {
}
func TestPulsarClient_SubscribeExclusiveFail(t *testing.T) {
t.Run("exclusive pulsar consumer failure", func(t *testing.T) {
pc := &pulsarClient{
client: &mockPulsarClient{},
}
_, err := pc.Subscribe(ConsumerOptions{})
assert.Error(t, err)
assert.True(t, retry.IsUncoverable(err))
})
}