From 4008ee6d600e4e0620e281e985b4d72c8d02c521 Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 27 Mar 2023 14:18:00 +0800 Subject: [PATCH] Remove datanode ddNode forward delta logic (#23005) Signed-off-by: Congqi Xia --- internal/datanode/data_sync_service.go | 1 - internal/datanode/flow_graph_dd_node.go | 111 +------------------ internal/datanode/flow_graph_dd_node_test.go | 72 +----------- 3 files changed, 6 insertions(+), 178 deletions(-) diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 3268c2fe26..5d0c420088 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -314,7 +314,6 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo, tick vchanInfo.GetDroppedSegmentIds(), flushedSegmentInfos, unflushedSegmentInfos, - dsService.msFactory, dsService.compactor) if err != nil { return err diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 075cefbf6d..fbdc4bf788 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -28,19 +28,14 @@ import ( "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/msgpb" - "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/util/commonpbutil" "github.com/milvus-io/milvus/internal/util/flowgraph" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/paramtable" - "github.com/milvus-io/milvus/internal/util/retry" - "github.com/milvus-io/milvus/internal/util/timerecord" - "github.com/milvus-io/milvus/internal/util/tsoutil" ) // make sure ddNode implements flowgraph.Node @@ -69,7 +64,6 @@ type ddNode struct { collectionID UniqueID vChannelName string - deltaMsgStream msgstream.MsgStream dropMode atomic.Value compactionExecutor *compactionExecutor @@ -150,7 +144,6 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { dropCollection: false, } - var forwardMsgs []msgstream.TsMsg for _, msg := range msMsg.TsMessages() { switch msg.Type() { case commonpb.MsgType_DropCollection: @@ -219,13 +212,6 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { for i := int64(0); i < dmsg.NumRows; i++ { dmsg.HashValues = append(dmsg.HashValues, uint32(0)) } - deltaVChannel, err := funcutil.ConvertChannelName(dmsg.ShardName, Params.CommonCfg.RootCoordDml.GetValue(), Params.CommonCfg.RootCoordDelta.GetValue()) - if err != nil { - log.Error("convert dmlVChannel to deltaVChannel failed", zap.String("vchannel", ddn.vChannelName), zap.Error(err)) - panic(err) - } - dmsg.ShardName = deltaVChannel - forwardMsgs = append(forwardMsgs, dmsg) if dmsg.CollectionID != ddn.collectionID { log.Warn("filter invalid DeleteMsg, collection mis-match", zap.Int64("Get collID", dmsg.CollectionID), @@ -244,16 +230,6 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { fgMsg.deleteMessages = append(fgMsg.deleteMessages, dmsg) } } - err := retry.Do(ddn.ctx, func() error { - return ddn.forwardDeleteMsg(forwardMsgs, msMsg.TimestampMin(), msMsg.TimestampMax()) - }, getFlowGraphRetryOpt()) - if err != nil { - err = fmt.Errorf("DDNode forward delete msg failed, vChannel = %s, err = %s", ddn.vChannelName, err) - log.Error(err.Error()) - if !common.IsIgnorableError(err) { - panic(err) - } - } fgMsg.startPositions = append(fgMsg.startPositions, msMsg.StartPositions()...) fgMsg.endPositions = append(fgMsg.endPositions, msMsg.EndPositions()...) @@ -302,97 +278,15 @@ func (ddn *ddNode) isDropped(segID UniqueID) bool { return false } -func (ddn *ddNode) forwardDeleteMsg(msgs []msgstream.TsMsg, minTs Timestamp, maxTs Timestamp) error { - tr := timerecord.NewTimeRecorder("forwardDeleteMsg") - - if len(msgs) != 0 { - var msgPack = msgstream.MsgPack{ - Msgs: msgs, - BeginTs: minTs, - EndTs: maxTs, - } - if err := ddn.deltaMsgStream.Produce(&msgPack); err != nil { - return err - } - } - if err := ddn.sendDeltaTimeTick(maxTs); err != nil { - return err - } - - metrics.DataNodeForwardDeleteMsgTimeTaken. - WithLabelValues(fmt.Sprint(paramtable.GetNodeID())). - Observe(float64(tr.ElapseSpan().Milliseconds())) - return nil -} - -func (ddn *ddNode) sendDeltaTimeTick(ts Timestamp) error { - msgPack := msgstream.MsgPack{} - baseMsg := msgstream.BaseMsg{ - BeginTimestamp: ts, - EndTimestamp: ts, - HashValues: []uint32{0}, - } - timeTickResult := msgpb.TimeTickMsg{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_TimeTick), - commonpbutil.WithMsgID(0), - commonpbutil.WithTimeStamp(ts), - commonpbutil.WithSourceID(paramtable.GetNodeID()), - ), - } - timeTickMsg := &msgstream.TimeTickMsg{ - BaseMsg: baseMsg, - TimeTickMsg: timeTickResult, - } - msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) - - if err := ddn.deltaMsgStream.Produce(&msgPack); err != nil { - return err - } - p, _ := tsoutil.ParseTS(ts) - log.RatedDebug(10.0, "DDNode sent delta timeTick", - zap.Any("collectionID", ddn.collectionID), - zap.Any("ts", ts), - zap.Any("ts_p", p), - zap.Any("channel", ddn.vChannelName), - ) - return nil -} - -func (ddn *ddNode) Close() { - if ddn.deltaMsgStream != nil { - ddn.deltaMsgStream.Close() - } -} +func (ddn *ddNode) Close() {} func newDDNode(ctx context.Context, collID UniqueID, vChannelName string, droppedSegmentIDs []UniqueID, - sealedSegments []*datapb.SegmentInfo, growingSegments []*datapb.SegmentInfo, - msFactory msgstream.Factory, compactor *compactionExecutor) (*ddNode, error) { + sealedSegments []*datapb.SegmentInfo, growingSegments []*datapb.SegmentInfo, compactor *compactionExecutor) (*ddNode, error) { baseNode := BaseNode{} baseNode.SetMaxQueueLength(Params.DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32()) baseNode.SetMaxParallelism(Params.DataNodeCfg.FlowGraphMaxParallelism.GetAsInt32()) - deltaStream, err := msFactory.NewMsgStream(ctx) - if err != nil { - return nil, err - } - pChannelName := funcutil.ToPhysicalChannel(vChannelName) - log.Info("ddNode convert vChannel to pChannel", - zap.String("vChannelName", vChannelName), - zap.String("pChannelName", pChannelName), - ) - - deltaChannelName, err := funcutil.ConvertChannelName(pChannelName, Params.CommonCfg.RootCoordDml.GetValue(), Params.CommonCfg.RootCoordDelta.GetValue()) - if err != nil { - return nil, err - } - deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc) - deltaStream.AsProducer([]string{deltaChannelName}) - metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc() - log.Info("datanode AsProducer", zap.String("DeltaChannelName", deltaChannelName)) - var deltaMsgStream msgstream.MsgStream = deltaStream - dd := &ddNode{ ctx: ctx, BaseNode: baseNode, @@ -401,7 +295,6 @@ func newDDNode(ctx context.Context, collID UniqueID, vChannelName string, droppe growingSegInfo: make(map[UniqueID]*datapb.SegmentInfo, len(growingSegments)), droppedSegmentIDs: droppedSegmentIDs, vChannelName: vChannelName, - deltaMsgStream: deltaMsgStream, compactionExecutor: compactor, } diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go index dc08f66a9c..96645aaa32 100644 --- a/internal/datanode/flow_graph_dd_node_test.go +++ b/internal/datanode/flow_graph_dd_node_test.go @@ -28,9 +28,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/msgpb" "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/flowgraph" - "github.com/milvus-io/milvus/internal/util/retry" ) const ( @@ -69,7 +67,6 @@ func TestFlowGraph_DDNode_newDDNode(t *testing.T) { for _, test := range tests { t.Run(test.description, func(t *testing.T) { - mockFactory := &mockMsgStreamFactory{true, true} ddNode, err := newDDNode( context.Background(), collectionID, @@ -77,7 +74,6 @@ func TestFlowGraph_DDNode_newDDNode(t *testing.T) { droppedSegIDs, test.inSealedSegs, test.inGrowingSegs, - mockFactory, newCompactionExecutor(), ) require.NoError(t, err) @@ -129,15 +125,9 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) { for _, test := range tests { t.Run(test.description, func(t *testing.T) { - factory := dependency.NewDefaultFactory(true) - deltaStream, err := factory.NewMsgStream(context.Background()) - assert.Nil(t, err) - deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc) - deltaStream.AsProducer([]string{"DataNode-test-delta-channel-0"}) ddn := ddNode{ ctx: context.Background(), collectionID: test.ddnCollID, - deltaMsgStream: deltaStream, vChannelName: "ddn_drop_msg", compactionExecutor: newCompactionExecutor(), } @@ -182,15 +172,9 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) { for _, test := range tests { t.Run(test.description, func(t *testing.T) { - factory := dependency.NewDefaultFactory(true) - deltaStream, err := factory.NewMsgStream(context.Background()) - assert.Nil(t, err) - deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc) - deltaStream.AsProducer([]string{"DataNode-test-delta-channel-0"}) ddn := ddNode{ ctx: context.Background(), collectionID: test.ddnCollID, - deltaMsgStream: deltaStream, vChannelName: "ddn_drop_msg", compactionExecutor: newCompactionExecutor(), } @@ -217,12 +201,6 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) { }) t.Run("Test DDNode Operate and filter insert msg", func(t *testing.T) { - factory := dependency.NewDefaultFactory(true) - deltaStream, err := factory.NewMsgStream(context.Background()) - require.Nil(t, err) - deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc) - deltaStream.AsProducer([]string{"DataNode-test-delta-channel-0"}) - var ( collectionID UniqueID = 1 ) @@ -231,7 +209,6 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) { ctx: context.Background(), collectionID: collectionID, droppedSegmentIDs: []UniqueID{100}, - deltaMsgStream: deltaStream, } tsMessages := []msgstream.TsMsg{getInsertMsg(100, 10000), getInsertMsg(200, 20000)} @@ -257,16 +234,9 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) { for _, test := range tests { t.Run(test.description, func(t *testing.T) { - factory := dependency.NewDefaultFactory(true) - deltaStream, err := factory.NewMsgStream(context.Background()) - assert.Nil(t, err) - deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc) - deltaStream.AsProducer([]string{"DataNode-test-delta-channel-0"}) - // Prepare ddNode states ddn := ddNode{ - ctx: context.Background(), - collectionID: test.ddnCollID, - deltaMsgStream: deltaStream, + ctx: context.Background(), + collectionID: test.ddnCollID, } // Prepare delete messages @@ -291,38 +261,6 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) { } }) - t.Run("Test forwardDeleteMsg failed", func(t *testing.T) { - factory := dependency.NewDefaultFactory(true) - deltaStream, err := factory.NewMsgStream(context.Background()) - assert.Nil(t, err) - deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc) - // Prepare ddNode states - ddn := ddNode{ - ctx: context.Background(), - collectionID: 1, - deltaMsgStream: deltaStream, - } - - // Prepare delete messages - var dMsg msgstream.TsMsg = &msgstream.DeleteMsg{ - BaseMsg: msgstream.BaseMsg{ - EndTimestamp: 2000, - HashValues: []uint32{0}, - }, - DeleteRequest: msgpb.DeleteRequest{ - Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_Delete}, - CollectionID: 1, - }, - } - tsMessages := []msgstream.TsMsg{dMsg} - var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, nil, nil) - - // Test - setFlowGraphRetryOpt(retry.Attempts(1)) - assert.Panics(t, func() { - ddn.Operate([]Msg{msgStreamMsg}) - }) - }) } func TestFlowGraph_DDNode_filterMessages(t *testing.T) { @@ -577,10 +515,8 @@ func TestFlowGraph_DDNode_isDropped(t *testing.T) { for _, seg := range test.indroppedSegment { dsIDs = append(dsIDs, seg.GetID()) } - factory := mockMsgStreamFactory{true, true} - deltaStream, err := factory.NewMsgStream(context.Background()) - assert.Nil(t, err) - ddn := &ddNode{droppedSegmentIDs: dsIDs, deltaMsgStream: deltaStream, vChannelName: ddNodeChannelName} + + ddn := &ddNode{droppedSegmentIDs: dsIDs, vChannelName: ddNodeChannelName} assert.Equal(t, test.expectedOut, ddn.isDropped(test.inSeg)) }) }