diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 1a866c1c95..5f17529e3a 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -232,7 +232,7 @@ queryCoord: queryNode: dataSync: flowGraph: - maxQueueLength: 1024 # Maximum length of task queue in flowgraph + maxQueueLength: 16 # Maximum length of task queue in flowgraph maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph stats: publishInterval: 1000 # Interval for querynode to report node information (milliseconds) @@ -369,7 +369,7 @@ dataCoord: dataNode: dataSync: flowGraph: - maxQueueLength: 1024 # Maximum length of task queue in flowgraph + maxQueueLength: 16 # Maximum length of task queue in flowgraph maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph maxParallelSyncTaskNum: 2 # Maximum number of sync tasks executed in parallel in each flush manager segment: diff --git a/internal/mq/msgstream/mq_factory.go b/internal/mq/msgstream/mq_factory.go index f0cbeb51a7..60f1dea3f0 100644 --- a/internal/mq/msgstream/mq_factory.go +++ b/internal/mq/msgstream/mq_factory.go @@ -5,11 +5,12 @@ import ( "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/rmq" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/paramtable" "go.uber.org/zap" ) // NewRocksmqFactory creates a new message stream factory based on rocksmq. -func NewRocksmqFactory(path string) msgstream.Factory { +func NewRocksmqFactory(path string, cfg *paramtable.ServiceParam) msgstream.Factory { if err := server.InitRocksMQ(path); err != nil { log.Fatal("fail to init rocksmq", zap.Error(err)) } @@ -18,7 +19,7 @@ func NewRocksmqFactory(path string) msgstream.Factory { return &msgstream.CommonFactory{ Newer: rmq.NewClientWithDefaultOptions, DispatcherFactory: msgstream.ProtoUDFactory{}, - ReceiveBufSize: 1024, - MQBufSize: 1024, + ReceiveBufSize: cfg.MQCfg.ReceiveBufSize.GetAsInt64(), + MQBufSize: cfg.MQCfg.MQBufSize.GetAsInt64(), } } diff --git a/internal/mq/msgstream/mq_factory_test.go b/internal/mq/msgstream/mq_factory_test.go index f0c42f7d3a..879becefbb 100644 --- a/internal/mq/msgstream/mq_factory_test.go +++ b/internal/mq/msgstream/mq_factory_test.go @@ -32,7 +32,7 @@ func TestRmsFactory(t *testing.T) { dir := t.TempDir() - rmsFactory := NewRocksmqFactory(dir) + rmsFactory := NewRocksmqFactory(dir, ¶mtable.Get().ServiceParam) ctx := context.Background() _, err := rmsFactory.NewMsgStream(ctx) diff --git a/internal/util/dependency/factory.go b/internal/util/dependency/factory.go index fb6274e71f..8ab8dc495e 100644 --- a/internal/util/dependency/factory.go +++ b/internal/util/dependency/factory.go @@ -38,7 +38,7 @@ type DefaultFactory struct { func NewDefaultFactory(standAlone bool) *DefaultFactory { return &DefaultFactory{ standAlone: standAlone, - msgStreamFactory: smsgstream.NewRocksmqFactory("/tmp/milvus/rocksmq/"), + msgStreamFactory: smsgstream.NewRocksmqFactory("/tmp/milvus/rocksmq/", ¶mtable.Get().ServiceParam), chunkManagerFactory: storage.NewChunkManagerFactory("local", storage.RootPath("/tmp/milvus")), } @@ -48,7 +48,7 @@ func NewDefaultFactory(standAlone bool) *DefaultFactory { func MockDefaultFactory(standAlone bool, params *paramtable.ComponentParam) *DefaultFactory { return &DefaultFactory{ standAlone: standAlone, - msgStreamFactory: smsgstream.NewRocksmqFactory("/tmp/milvus/rocksmq/"), + msgStreamFactory: smsgstream.NewRocksmqFactory("/tmp/milvus/rocksmq/", ¶mtable.Get().ServiceParam), chunkManagerFactory: storage.NewChunkManagerFactoryWithParam(params), } } @@ -86,11 +86,11 @@ func (f *DefaultFactory) initMQ(standalone bool, params *paramtable.ComponentPar case mqTypeNatsmq: f.msgStreamFactory = msgstream.NewNatsmqFactory() case mqTypeRocksmq: - f.msgStreamFactory = smsgstream.NewRocksmqFactory(params.RocksmqCfg.Path.GetValue()) + f.msgStreamFactory = smsgstream.NewRocksmqFactory(params.RocksmqCfg.Path.GetValue(), ¶ms.ServiceParam) case mqTypePulsar: - f.msgStreamFactory = msgstream.NewPmsFactory(¶ms.PulsarCfg) + f.msgStreamFactory = msgstream.NewPmsFactory(¶ms.ServiceParam) case mqTypeKafka: - f.msgStreamFactory = msgstream.NewKmsFactory(¶ms.KafkaCfg) + f.msgStreamFactory = msgstream.NewKmsFactory(¶ms.ServiceParam) } if f.msgStreamFactory == nil { return errors.New("failed to create MQ: check the milvus log for initialization failures") diff --git a/pkg/mq/msgdispatcher/mock_test.go b/pkg/mq/msgdispatcher/mock_test.go index 68cf5e3a39..f82f4eaa7a 100644 --- a/pkg/mq/msgdispatcher/mock_test.go +++ b/pkg/mq/msgdispatcher/mock_test.go @@ -47,7 +47,7 @@ func TestMain(m *testing.M) { } func newMockFactory() msgstream.Factory { - return msgstream.NewPmsFactory(&Params.PulsarCfg) + return msgstream.NewPmsFactory(&Params.ServiceParam) } func newMockProducer(factory msgstream.Factory, pchannel string) (msgstream.MsgStream, error) { diff --git a/pkg/mq/msgstream/mq_factory.go b/pkg/mq/msgstream/mq_factory.go index 68551ed43b..5dfe92523a 100644 --- a/pkg/mq/msgstream/mq_factory.go +++ b/pkg/mq/msgstream/mq_factory.go @@ -42,17 +42,18 @@ type PmsFactory struct { PulsarAddress string PulsarWebAddress string ReceiveBufSize int64 - PulsarBufSize int64 + MQBufSize int64 PulsarAuthPlugin string PulsarAuthParams string PulsarTenant string PulsarNameSpace string } -func NewPmsFactory(config *paramtable.PulsarConfig) *PmsFactory { +func NewPmsFactory(serviceParam *paramtable.ServiceParam) *PmsFactory { + config := &serviceParam.PulsarCfg return &PmsFactory{ - PulsarBufSize: 1024, - ReceiveBufSize: 1024, + MQBufSize: serviceParam.MQCfg.MQBufSize.GetAsInt64(), + ReceiveBufSize: serviceParam.MQCfg.ReceiveBufSize.GetAsInt64(), PulsarAddress: config.Address.GetValue(), PulsarWebAddress: config.WebAddress.GetValue(), PulsarAuthPlugin: config.AuthPlugin.GetValue(), @@ -77,7 +78,7 @@ func (f *PmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error) { if err != nil { return nil, err } - return NewMqMsgStream(ctx, f.ReceiveBufSize, f.PulsarBufSize, pulsarClient, f.dispatcherFactory.NewUnmarshalDispatcher()) + return NewMqMsgStream(ctx, f.ReceiveBufSize, f.MQBufSize, pulsarClient, f.dispatcherFactory.NewUnmarshalDispatcher()) } // NewTtMsgStream is used to generate a new TtMsgstream object @@ -95,7 +96,7 @@ func (f *PmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error) { if err != nil { return nil, err } - return NewMqTtMsgStream(ctx, f.ReceiveBufSize, f.PulsarBufSize, pulsarClient, f.dispatcherFactory.NewUnmarshalDispatcher()) + return NewMqTtMsgStream(ctx, f.ReceiveBufSize, f.MQBufSize, pulsarClient, f.dispatcherFactory.NewUnmarshalDispatcher()) } func (f *PmsFactory) getAuthentication() (pulsar.Authentication, error) { @@ -152,16 +153,17 @@ type KmsFactory struct { dispatcherFactory ProtoUDFactory config *paramtable.KafkaConfig ReceiveBufSize int64 + MQBufSize int64 } func (f *KmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error) { kafkaClient := kafkawrapper.NewKafkaClientInstanceWithConfig(f.config) - return NewMqMsgStream(ctx, f.ReceiveBufSize, -1, kafkaClient, f.dispatcherFactory.NewUnmarshalDispatcher()) + return NewMqMsgStream(ctx, f.ReceiveBufSize, f.MQBufSize, kafkaClient, f.dispatcherFactory.NewUnmarshalDispatcher()) } func (f *KmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error) { kafkaClient := kafkawrapper.NewKafkaClientInstanceWithConfig(f.config) - return NewMqTtMsgStream(ctx, f.ReceiveBufSize, -1, kafkaClient, f.dispatcherFactory.NewUnmarshalDispatcher()) + return NewMqTtMsgStream(ctx, f.ReceiveBufSize, f.MQBufSize, kafkaClient, f.dispatcherFactory.NewUnmarshalDispatcher()) } func (f *KmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error) { @@ -180,11 +182,12 @@ func (f *KmsFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, st } } -func NewKmsFactory(config *paramtable.KafkaConfig) Factory { +func NewKmsFactory(config *paramtable.ServiceParam) Factory { f := &KmsFactory{ dispatcherFactory: ProtoUDFactory{}, - ReceiveBufSize: 1024, - config: config, + ReceiveBufSize: config.MQCfg.ReceiveBufSize.GetAsInt64(), + MQBufSize: config.MQCfg.MQBufSize.GetAsInt64(), + config: &config.KafkaCfg, } return f } @@ -192,11 +195,12 @@ func NewKmsFactory(config *paramtable.KafkaConfig) Factory { // NewNatsmqFactory create a new nats-mq factory. func NewNatsmqFactory() Factory { paramtable.Init() - nmq.MustInitNatsMQ(nmq.ParseServerOption(paramtable.Get())) + paramtable := paramtable.Get() + nmq.MustInitNatsMQ(nmq.ParseServerOption(paramtable)) return &CommonFactory{ Newer: nmq.NewClientWithDefaultOptions, DispatcherFactory: ProtoUDFactory{}, - ReceiveBufSize: 1024, - MQBufSize: 1024, + ReceiveBufSize: paramtable.MQCfg.ReceiveBufSize.GetAsInt64(), + MQBufSize: paramtable.MQCfg.MQBufSize.GetAsInt64(), } } diff --git a/pkg/mq/msgstream/mq_factory_test.go b/pkg/mq/msgstream/mq_factory_test.go index 94e0184c42..095a5407a7 100644 --- a/pkg/mq/msgstream/mq_factory_test.go +++ b/pkg/mq/msgstream/mq_factory_test.go @@ -25,7 +25,7 @@ import ( func TestPmsFactory(t *testing.T) { Params.Init() - pmsFactory := NewPmsFactory(&Params.PulsarCfg) + pmsFactory := NewPmsFactory(&Params.ServiceParam) ctx := context.Background() _, err := pmsFactory.NewMsgStream(ctx) @@ -42,7 +42,7 @@ func TestPmsFactory(t *testing.T) { } func TestPmsFactoryWithAuth(t *testing.T) { - config := &Params.PulsarCfg + config := &Params.ServiceParam Params.Save(Params.PulsarCfg.AuthPlugin.Key, "token") Params.Save(Params.PulsarCfg.AuthParams.Key, "token:fake_token") defer func() { @@ -77,7 +77,7 @@ func TestPmsFactoryWithAuth(t *testing.T) { } func TestKafkaFactory(t *testing.T) { - kmsFactory := NewKmsFactory(&Params.KafkaCfg) + kmsFactory := NewKmsFactory(&Params.ServiceParam) ctx := context.Background() _, err := kmsFactory.NewMsgStream(ctx) diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go index 0ee3679ada..0ea3e8c959 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go @@ -166,7 +166,7 @@ func (kc *kafkaClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.C metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.TotalLabel).Inc() config := kc.newConsumerConfig(options.SubscriptionName, options.SubscriptionInitialPosition) - consumer, err := newKafkaConsumer(config, options.Topic, options.SubscriptionName, options.SubscriptionInitialPosition) + consumer, err := newKafkaConsumer(config, options.BufSize, options.Topic, options.SubscriptionName, options.SubscriptionInitialPosition) if err != nil { metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc() return nil, err diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go index a3e112584a..2b49792299 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go @@ -29,8 +29,8 @@ type Consumer struct { const timeout = 3000 -func newKafkaConsumer(config *kafka.ConfigMap, topic string, groupID string, position mqwrapper.SubscriptionInitialPosition) (*Consumer, error) { - msgChannel := make(chan mqwrapper.Message, 256) +func newKafkaConsumer(config *kafka.ConfigMap, bufSize int64, topic string, groupID string, position mqwrapper.SubscriptionInitialPosition) (*Consumer, error) { + msgChannel := make(chan mqwrapper.Message, bufSize) kc := &Consumer{ config: config, msgChannel: msgChannel, diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go index e8f75ecb55..5078c541e3 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go @@ -20,7 +20,7 @@ func TestKafkaConsumer_Subscription(t *testing.T) { topic := fmt.Sprintf("test-topicName-%d", rand.Int()) config := createConfig(groupID) - kc, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionUnknown) + kc, err := newKafkaConsumer(config, 16, topic, groupID, mqwrapper.SubscriptionPositionUnknown) assert.NoError(t, err) defer kc.Close() assert.Equal(t, kc.Subscription(), groupID) @@ -32,7 +32,7 @@ func TestKafkaConsumer_SeekExclusive(t *testing.T) { topic := fmt.Sprintf("test-topicName-%d", rand.Int()) config := createConfig(groupID) - consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionUnknown) + consumer, err := newKafkaConsumer(config, 16, topic, groupID, mqwrapper.SubscriptionPositionUnknown) assert.NoError(t, err) defer consumer.Close() @@ -58,7 +58,7 @@ func TestKafkaConsumer_SeekInclusive(t *testing.T) { topic := fmt.Sprintf("test-topicName-%d", rand.Int()) config := createConfig(groupID) - consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionUnknown) + consumer, err := newKafkaConsumer(config, 16, topic, groupID, mqwrapper.SubscriptionPositionUnknown) assert.NoError(t, err) defer consumer.Close() @@ -84,7 +84,7 @@ func TestKafkaConsumer_GetSeek(t *testing.T) { topic := fmt.Sprintf("test-topicName-%d", rand.Int()) config := createConfig(groupID) - consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionUnknown) + consumer, err := newKafkaConsumer(config, 16, topic, groupID, mqwrapper.SubscriptionPositionUnknown) assert.NoError(t, err) defer consumer.Close() @@ -101,7 +101,7 @@ func TestKafkaConsumer_ChanWithNoAssign(t *testing.T) { topic := fmt.Sprintf("test-topicName-%d", rand.Int()) config := createConfig(groupID) - consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionUnknown) + consumer, err := newKafkaConsumer(config, 16, topic, groupID, mqwrapper.SubscriptionPositionUnknown) assert.NoError(t, err) defer consumer.Close() @@ -138,7 +138,7 @@ func TestKafkaConsumer_SeekAfterChan(t *testing.T) { topic := fmt.Sprintf("test-topicName-%d", rand.Int()) config := createConfig(groupID) - consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionEarliest) + consumer, err := newKafkaConsumer(config, 16, topic, groupID, mqwrapper.SubscriptionPositionEarliest) assert.NoError(t, err) defer consumer.Close() @@ -159,7 +159,7 @@ func TestKafkaConsumer_GetLatestMsgID(t *testing.T) { topic := fmt.Sprintf("test-topicName-%d", rand.Int()) config := createConfig(groupID) - consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionUnknown) + consumer, err := newKafkaConsumer(config, 16, topic, groupID, mqwrapper.SubscriptionPositionUnknown) assert.NoError(t, err) defer consumer.Close() @@ -186,7 +186,7 @@ func TestKafkaConsumer_ConsumeFromLatest(t *testing.T) { testKafkaConsumerProduceData(t, topic, data1, data2) config := createConfig(groupID) - consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionLatest) + consumer, err := newKafkaConsumer(config, 16, topic, groupID, mqwrapper.SubscriptionPositionLatest) assert.NoError(t, err) defer consumer.Close() data1 = []int{444, 555} @@ -211,7 +211,7 @@ func TestKafkaConsumer_ConsumeFromEarliest(t *testing.T) { testKafkaConsumerProduceData(t, topic, data1, data2) config := createConfig(groupID) - consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionEarliest) + consumer, err := newKafkaConsumer(config, 16, topic, groupID, mqwrapper.SubscriptionPositionEarliest) assert.NoError(t, err) msg := <-consumer.Chan() assert.Equal(t, 111, BytesToInt(msg.Payload())) @@ -220,7 +220,7 @@ func TestKafkaConsumer_ConsumeFromEarliest(t *testing.T) { defer consumer.Close() config = createConfig(groupID) - consumer2, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionEarliest) + consumer2, err := newKafkaConsumer(config, 16, topic, groupID, mqwrapper.SubscriptionPositionEarliest) assert.NoError(t, err) msg = <-consumer2.Chan() assert.Equal(t, 111, BytesToInt(msg.Payload())) @@ -262,7 +262,7 @@ func TestKafkaConsumer_CheckPreTopicValid(t *testing.T) { topic := fmt.Sprintf("test-topicName-%d", rand.Int()) config := createConfig(groupID) - consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionEarliest) + consumer, err := newKafkaConsumer(config, 16, topic, groupID, mqwrapper.SubscriptionPositionEarliest) assert.NoError(t, err) defer consumer.Close() diff --git a/pkg/mq/msgstream/msgstream_util_test.go b/pkg/mq/msgstream/msgstream_util_test.go index 17d639b281..5bdeab4edf 100644 --- a/pkg/mq/msgstream/msgstream_util_test.go +++ b/pkg/mq/msgstream/msgstream_util_test.go @@ -24,7 +24,7 @@ import ( ) func TestPulsarMsgUtil(t *testing.T) { - pmsFactory := NewPmsFactory(&Params.PulsarCfg) + pmsFactory := NewPmsFactory(&Params.ServiceParam) ctx := context.Background() msgStream, err := pmsFactory.NewMsgStream(ctx) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 422ddeeac7..1766a53fc8 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1576,7 +1576,7 @@ func (p *queryNodeConfig) init(base *BaseTable) { p.FlowGraphMaxQueueLength = ParamItem{ Key: "queryNode.dataSync.flowGraph.maxQueueLength", Version: "2.0.0", - DefaultValue: "1024", + DefaultValue: "16", Doc: "Maximum length of task queue in flowgraph", Export: true, } @@ -2370,7 +2370,7 @@ func (p *dataNodeConfig) init(base *BaseTable) { p.FlowGraphMaxQueueLength = ParamItem{ Key: "dataNode.dataSync.flowGraph.maxQueueLength", Version: "2.0.0", - DefaultValue: "1024", + DefaultValue: "16", Doc: "Maximum length of task queue in flowgraph", Export: true, } diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 5c13b35d79..073981b5b6 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -289,7 +289,7 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 1000, interval) length := Params.FlowGraphMaxQueueLength.GetAsInt32() - assert.Equal(t, int32(1024), length) + assert.Equal(t, int32(16), length) maxParallelism := Params.FlowGraphMaxParallelism.GetAsInt32() assert.Equal(t, int32(1024), maxParallelism) diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index fed2d3c910..d99c805923 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -373,6 +373,9 @@ type MQConfig struct { EnablePursuitMode ParamItem `refreshable:"true"` PursuitLag ParamItem `refreshable:"true"` PursuitBufferSize ParamItem `refreshable:"true"` + + MQBufSize ParamItem `refreshable:"false"` + ReceiveBufSize ParamItem `refreshable:"false"` } // Init initializes the MQConfig object with a BaseTable. @@ -413,6 +416,23 @@ Valid values: [default, pulsar, kafka, rocksmq, natsmq]`, Export: true, } p.PursuitBufferSize.Init(base.mgr) + + p.MQBufSize = ParamItem{ + Key: "mq.mqBufSize", + Version: "2.3.0", + DefaultValue: "16", + Doc: `MQ client consumer buffer length`, + Export: true, + } + p.MQBufSize.Init(base.mgr) + + p.ReceiveBufSize = ParamItem{ + Key: "mq.receiveBufSize", + Version: "2.3.0", + DefaultValue: "16", + Doc: "MQ consumer chan buffer length", + } + p.ReceiveBufSize.Init(base.mgr) } // /////////////////////////////////////////////////////////////////////////////