fix: Seperate msgstream initctx with lifecycle ctx (#29308)

The close of msgstream should be controlled by Close() not ctx from
outside when init clients

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/29365/head
XuanYang-cn 2023-12-21 11:56:45 +08:00 committed by GitHub
parent fcba1c0d9e
commit 48f506b077
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 8 additions and 7 deletions

View File

@ -28,7 +28,7 @@ func (f *CommonFactory) NewMsgStream(ctx context.Context) (ms MsgStream, err err
if err != nil {
return nil, err
}
return NewMqMsgStream(ctx, f.ReceiveBufSize, f.MQBufSize, cli, f.DispatcherFactory.NewUnmarshalDispatcher())
return NewMqMsgStream(context.Background(), f.ReceiveBufSize, f.MQBufSize, cli, f.DispatcherFactory.NewUnmarshalDispatcher())
}
// NewTtMsgStream is used to generate a new TtMsgstream object
@ -38,7 +38,7 @@ func (f *CommonFactory) NewTtMsgStream(ctx context.Context) (ms MsgStream, err e
if err != nil {
return nil, err
}
return NewMqTtMsgStream(ctx, f.ReceiveBufSize, f.MQBufSize, cli, f.DispatcherFactory.NewUnmarshalDispatcher())
return NewMqTtMsgStream(context.Background(), f.ReceiveBufSize, f.MQBufSize, cli, f.DispatcherFactory.NewUnmarshalDispatcher())
}
// NewMsgStreamDisposer returns a function that can be used to dispose of a message stream.

View File

@ -100,7 +100,7 @@ func (f *PmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error) {
if err != nil {
return nil, err
}
return NewMqMsgStream(ctx, f.ReceiveBufSize, f.MQBufSize, pulsarClient, f.dispatcherFactory.NewUnmarshalDispatcher())
return NewMqMsgStream(context.Background(), f.ReceiveBufSize, f.MQBufSize, pulsarClient, f.dispatcherFactory.NewUnmarshalDispatcher())
}
// NewTtMsgStream is used to generate a new TtMsgstream object
@ -127,7 +127,8 @@ func (f *PmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error) {
if err != nil {
return nil, err
}
return NewMqTtMsgStream(ctx, f.ReceiveBufSize, f.MQBufSize, pulsarClient, f.dispatcherFactory.NewUnmarshalDispatcher())
return NewMqTtMsgStream(context.Background(), f.ReceiveBufSize, f.MQBufSize, pulsarClient, f.dispatcherFactory.NewUnmarshalDispatcher())
}
func (f *PmsFactory) getAuthentication() (pulsar.Authentication, error) {
@ -187,7 +188,7 @@ func (f *KmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error) {
if err != nil {
return nil, err
}
return NewMqMsgStream(ctx, f.ReceiveBufSize, f.MQBufSize, kafkaClient, f.dispatcherFactory.NewUnmarshalDispatcher())
return NewMqMsgStream(context.Background(), f.ReceiveBufSize, f.MQBufSize, kafkaClient, f.dispatcherFactory.NewUnmarshalDispatcher())
}
func (f *KmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error) {
@ -195,7 +196,7 @@ func (f *KmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error) {
if err != nil {
return nil, err
}
return NewMqTtMsgStream(ctx, f.ReceiveBufSize, f.MQBufSize, kafkaClient, f.dispatcherFactory.NewUnmarshalDispatcher())
return NewMqTtMsgStream(context.Background(), f.ReceiveBufSize, f.MQBufSize, kafkaClient, f.dispatcherFactory.NewUnmarshalDispatcher())
}
func (f *KmsFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, string) error {

View File

@ -85,6 +85,7 @@ func NewMqMsgStream(ctx context.Context,
stream := &mqMsgStream{
ctx: streamCtx,
streamCancel: streamCancel,
client: client,
producers: producers,
producerChannels: producerChannels,
@ -94,7 +95,6 @@ func NewMqMsgStream(ctx context.Context,
unmarshal: unmarshal,
bufSize: bufSize,
receiveBuf: receiveBuf,
streamCancel: streamCancel,
producerLock: &sync.RWMutex{},
consumerLock: &sync.Mutex{},
closeRWMutex: &sync.RWMutex{},