fix: Use uber atomic instead sync/atomic which only supported after go v1.20 (#29377)

relate: https://github.com/milvus-io/milvus/issues/29376

---------

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
pull/29515/head
aoiasd 2023-12-26 16:06:49 +08:00 committed by GitHub
parent 2cbfcb7e0d
commit 9e6da45497
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 3 deletions

View File

@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
uatomic "go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -43,8 +44,8 @@ import (
)
var (
_ MsgStream = (*mqMsgStream)(nil)
streamCount atomic.Int64
_ MsgStream = (*mqMsgStream)(nil)
streamCounter uatomic.Int64
)
type mqMsgStream struct {
@ -102,7 +103,7 @@ func NewMqMsgStream(ctx context.Context,
}
ctxLog := log.Ctx(ctx)
stream.enableProduce.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool())
stream.configEvent = config.NewHandler("enable send tt msg "+fmt.Sprint(streamCount.Add(1)), func(event *config.Event) {
stream.configEvent = config.NewHandler("enable send tt msg "+fmt.Sprint(streamCounter.Inc()), func(event *config.Event) {
value, err := strconv.ParseBool(event.Value)
if err != nil {
ctxLog.Warn("Failed to parse bool value", zap.String("v", event.Value), zap.Error(err))

View File

@ -35,6 +35,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
pulsarwrapper "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/pulsar"
"github.com/milvus-io/milvus/pkg/util/funcutil"
@ -96,6 +97,20 @@ func consumer(ctx context.Context, mq MsgStream) *MsgPack {
}
}
func TestStream_ConfigEvent(t *testing.T) {
pulsarAddress := getPulsarAddress()
factory := ProtoUDFactory{}
pulsarClient, err := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress})
assert.NoError(t, err)
stream, err := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
assert.NoError(t, err)
stream.configEvent.OnEvent(&config.Event{Value: "false"})
stream.configEvent.OnEvent(&config.Event{Value: "????"})
assert.False(t, stream.isEnabledProduce())
stream.configEvent.OnEvent(&config.Event{Value: "true"})
assert.True(t, stream.isEnabledProduce())
}
func TestStream_PulsarMsgStream_Insert(t *testing.T) {
pulsarAddress := getPulsarAddress()
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)