diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index cd839a17a6..54dd4732d3 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -220,9 +220,19 @@ func (node *DataNode) Init() error { } Params.DataNodeCfg.Refresh() + m := map[string]interface{}{ + "PulsarAddress": Params.DataNodeCfg.PulsarAddress, + "ReceiveBufSize": 1024, + "PulsarBufSize": 1024, + } + + if err := node.msFactory.SetParams(m); err != nil { + log.Warn("DataNode Init msFactory SetParams failed, use default", + zap.Error(err)) + return err + } log.Debug("DataNode Init", - zap.String("MsgChannelSubName", Params.DataNodeCfg.MsgChannelSubName), - ) + zap.String("MsgChannelSubName", Params.DataNodeCfg.MsgChannelSubName)) return nil } diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 0c0ea5cd32..3f27d76665 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -137,18 +137,6 @@ func (dsService *dataSyncService) close() { // initNodes inits a TimetickedFlowGraph func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) error { dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx) - - m := map[string]interface{}{ - "PulsarAddress": Params.DataNodeCfg.PulsarAddress, - "ReceiveBufSize": 1024, - "PulsarBufSize": 1024, - } - - err := dsService.msFactory.SetParams(m) - if err != nil { - return err - } - // initialize flush manager for DataSync Service dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, dsService.blobKV, dsService.replica, flushNotifyFunc(dsService), dropVirtualChannelFunc(dsService)) @@ -216,6 +204,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro parallelConfig: newParallelConfig(), } + var err error var dmStreamNode Node dmStreamNode, err = newDmInputNode(dsService.ctx, vchanInfo.GetSeekPosition(), c) if err != nil { diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index 2059a575f9..ed3f677d59 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -96,7 +96,7 @@ func TestDataSyncService_newDataSyncService(te *testing.T) { ctx := context.Background() tests := []*testInfo{ - {false, false, &mockMsgStreamFactory{false, true}, + {true, false, &mockMsgStreamFactory{false, true}, 0, "by-dev-rootcoord-dml-test_v0", 0, 0, "", 0, 0, 0, "", 0,