Remove datanode ddNode forward delta logic (#23005)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/23009/head
congqixia 2023-03-27 14:18:00 +08:00 committed by GitHub
parent 081572d31c
commit 4008ee6d60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 6 additions and 178 deletions

View File

@@ -314,7 +314,6 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo, tick
vchanInfo.GetDroppedSegmentIds(),
flushedSegmentInfos,
unflushedSegmentInfos,
dsService.msFactory,
dsService.compactor)
if err != nil {
return err

View File

@@ -28,19 +28,14 @@ import (
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/msgpb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/tsoutil"
)
// make sure ddNode implements flowgraph.Node
@@ -69,7 +64,6 @@ type ddNode struct {
collectionID UniqueID
vChannelName string
deltaMsgStream msgstream.MsgStream
dropMode atomic.Value
compactionExecutor *compactionExecutor
@@ -150,7 +144,6 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
dropCollection: false,
}
var forwardMsgs []msgstream.TsMsg
for _, msg := range msMsg.TsMessages() {
switch msg.Type() {
case commonpb.MsgType_DropCollection:
@@ -219,13 +212,6 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
for i := int64(0); i < dmsg.NumRows; i++ {
dmsg.HashValues = append(dmsg.HashValues, uint32(0))
}
deltaVChannel, err := funcutil.ConvertChannelName(dmsg.ShardName, Params.CommonCfg.RootCoordDml.GetValue(), Params.CommonCfg.RootCoordDelta.GetValue())
if err != nil {
log.Error("convert dmlVChannel to deltaVChannel failed", zap.String("vchannel", ddn.vChannelName), zap.Error(err))
panic(err)
}
dmsg.ShardName = deltaVChannel
forwardMsgs = append(forwardMsgs, dmsg)
if dmsg.CollectionID != ddn.collectionID {
log.Warn("filter invalid DeleteMsg, collection mis-match",
zap.Int64("Get collID", dmsg.CollectionID),
@@ -244,16 +230,6 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
fgMsg.deleteMessages = append(fgMsg.deleteMessages, dmsg)
}
}
err := retry.Do(ddn.ctx, func() error {
return ddn.forwardDeleteMsg(forwardMsgs, msMsg.TimestampMin(), msMsg.TimestampMax())
}, getFlowGraphRetryOpt())
if err != nil {
err = fmt.Errorf("DDNode forward delete msg failed, vChannel = %s, err = %s", ddn.vChannelName, err)
log.Error(err.Error())
if !common.IsIgnorableError(err) {
panic(err)
}
}
fgMsg.startPositions = append(fgMsg.startPositions, msMsg.StartPositions()...)
fgMsg.endPositions = append(fgMsg.endPositions, msMsg.EndPositions()...)
@@ -302,97 +278,15 @@ func (ddn *ddNode) isDropped(segID UniqueID) bool {
return false
}
func (ddn *ddNode) forwardDeleteMsg(msgs []msgstream.TsMsg, minTs Timestamp, maxTs Timestamp) error {
tr := timerecord.NewTimeRecorder("forwardDeleteMsg")
if len(msgs) != 0 {
var msgPack = msgstream.MsgPack{
Msgs: msgs,
BeginTs: minTs,
EndTs: maxTs,
}
if err := ddn.deltaMsgStream.Produce(&msgPack); err != nil {
return err
}
}
if err := ddn.sendDeltaTimeTick(maxTs); err != nil {
return err
}
metrics.DataNodeForwardDeleteMsgTimeTaken.
WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).
Observe(float64(tr.ElapseSpan().Milliseconds()))
return nil
}
func (ddn *ddNode) sendDeltaTimeTick(ts Timestamp) error {
msgPack := msgstream.MsgPack{}
baseMsg := msgstream.BaseMsg{
BeginTimestamp: ts,
EndTimestamp: ts,
HashValues: []uint32{0},
}
timeTickResult := msgpb.TimeTickMsg{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_TimeTick),
commonpbutil.WithMsgID(0),
commonpbutil.WithTimeStamp(ts),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
}
timeTickMsg := &msgstream.TimeTickMsg{
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
msgPack.Msgs = append(msgPack.Msgs, timeTickMsg)
if err := ddn.deltaMsgStream.Produce(&msgPack); err != nil {
return err
}
p, _ := tsoutil.ParseTS(ts)
log.RatedDebug(10.0, "DDNode sent delta timeTick",
zap.Any("collectionID", ddn.collectionID),
zap.Any("ts", ts),
zap.Any("ts_p", p),
zap.Any("channel", ddn.vChannelName),
)
return nil
}
func (ddn *ddNode) Close() {
if ddn.deltaMsgStream != nil {
ddn.deltaMsgStream.Close()
}
}
func (ddn *ddNode) Close() {}
func newDDNode(ctx context.Context, collID UniqueID, vChannelName string, droppedSegmentIDs []UniqueID,
sealedSegments []*datapb.SegmentInfo, growingSegments []*datapb.SegmentInfo,
msFactory msgstream.Factory, compactor *compactionExecutor) (*ddNode, error) {
sealedSegments []*datapb.SegmentInfo, growingSegments []*datapb.SegmentInfo, compactor *compactionExecutor) (*ddNode, error) {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(Params.DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32())
baseNode.SetMaxParallelism(Params.DataNodeCfg.FlowGraphMaxParallelism.GetAsInt32())
deltaStream, err := msFactory.NewMsgStream(ctx)
if err != nil {
return nil, err
}
pChannelName := funcutil.ToPhysicalChannel(vChannelName)
log.Info("ddNode convert vChannel to pChannel",
zap.String("vChannelName", vChannelName),
zap.String("pChannelName", pChannelName),
)
deltaChannelName, err := funcutil.ConvertChannelName(pChannelName, Params.CommonCfg.RootCoordDml.GetValue(), Params.CommonCfg.RootCoordDelta.GetValue())
if err != nil {
return nil, err
}
deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc)
deltaStream.AsProducer([]string{deltaChannelName})
metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc()
log.Info("datanode AsProducer", zap.String("DeltaChannelName", deltaChannelName))
var deltaMsgStream msgstream.MsgStream = deltaStream
dd := &ddNode{
ctx: ctx,
BaseNode: baseNode,
@@ -401,7 +295,6 @@ func newDDNode(ctx context.Context, collID UniqueID, vChannelName string, droppe
growingSegInfo: make(map[UniqueID]*datapb.SegmentInfo, len(growingSegments)),
droppedSegmentIDs: droppedSegmentIDs,
vChannelName: vChannelName,
deltaMsgStream: deltaMsgStream,
compactionExecutor: compactor,
}

View File

@@ -28,9 +28,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/msgpb"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/retry"
)
const (
@@ -69,7 +67,6 @@ func TestFlowGraph_DDNode_newDDNode(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
mockFactory := &mockMsgStreamFactory{true, true}
ddNode, err := newDDNode(
context.Background(),
collectionID,
@@ -77,7 +74,6 @@ func TestFlowGraph_DDNode_newDDNode(t *testing.T) {
droppedSegIDs,
test.inSealedSegs,
test.inGrowingSegs,
mockFactory,
newCompactionExecutor(),
)
require.NoError(t, err)
@@ -129,15 +125,9 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
factory := dependency.NewDefaultFactory(true)
deltaStream, err := factory.NewMsgStream(context.Background())
assert.Nil(t, err)
deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc)
deltaStream.AsProducer([]string{"DataNode-test-delta-channel-0"})
ddn := ddNode{
ctx: context.Background(),
collectionID: test.ddnCollID,
deltaMsgStream: deltaStream,
vChannelName: "ddn_drop_msg",
compactionExecutor: newCompactionExecutor(),
}
@@ -182,15 +172,9 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
factory := dependency.NewDefaultFactory(true)
deltaStream, err := factory.NewMsgStream(context.Background())
assert.Nil(t, err)
deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc)
deltaStream.AsProducer([]string{"DataNode-test-delta-channel-0"})
ddn := ddNode{
ctx: context.Background(),
collectionID: test.ddnCollID,
deltaMsgStream: deltaStream,
vChannelName: "ddn_drop_msg",
compactionExecutor: newCompactionExecutor(),
}
@@ -217,12 +201,6 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) {
})
t.Run("Test DDNode Operate and filter insert msg", func(t *testing.T) {
factory := dependency.NewDefaultFactory(true)
deltaStream, err := factory.NewMsgStream(context.Background())
require.Nil(t, err)
deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc)
deltaStream.AsProducer([]string{"DataNode-test-delta-channel-0"})
var (
collectionID UniqueID = 1
)
@@ -231,7 +209,6 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) {
ctx: context.Background(),
collectionID: collectionID,
droppedSegmentIDs: []UniqueID{100},
deltaMsgStream: deltaStream,
}
tsMessages := []msgstream.TsMsg{getInsertMsg(100, 10000), getInsertMsg(200, 20000)}
@@ -257,16 +234,9 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
factory := dependency.NewDefaultFactory(true)
deltaStream, err := factory.NewMsgStream(context.Background())
assert.Nil(t, err)
deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc)
deltaStream.AsProducer([]string{"DataNode-test-delta-channel-0"})
// Prepare ddNode states
ddn := ddNode{
ctx: context.Background(),
collectionID: test.ddnCollID,
deltaMsgStream: deltaStream,
}
// Prepare delete messages
@@ -291,38 +261,6 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) {
}
})
t.Run("Test forwardDeleteMsg failed", func(t *testing.T) {
factory := dependency.NewDefaultFactory(true)
deltaStream, err := factory.NewMsgStream(context.Background())
assert.Nil(t, err)
deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc)
// Prepare ddNode states
ddn := ddNode{
ctx: context.Background(),
collectionID: 1,
deltaMsgStream: deltaStream,
}
// Prepare delete messages
var dMsg msgstream.TsMsg = &msgstream.DeleteMsg{
BaseMsg: msgstream.BaseMsg{
EndTimestamp: 2000,
HashValues: []uint32{0},
},
DeleteRequest: msgpb.DeleteRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_Delete},
CollectionID: 1,
},
}
tsMessages := []msgstream.TsMsg{dMsg}
var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, nil, nil)
// Test
setFlowGraphRetryOpt(retry.Attempts(1))
assert.Panics(t, func() {
ddn.Operate([]Msg{msgStreamMsg})
})
})
}
func TestFlowGraph_DDNode_filterMessages(t *testing.T) {
@@ -577,10 +515,8 @@ func TestFlowGraph_DDNode_isDropped(t *testing.T) {
for _, seg := range test.indroppedSegment {
dsIDs = append(dsIDs, seg.GetID())
}
factory := mockMsgStreamFactory{true, true}
deltaStream, err := factory.NewMsgStream(context.Background())
assert.Nil(t, err)
ddn := &ddNode{droppedSegmentIDs: dsIDs, deltaMsgStream: deltaStream, vChannelName: ddNodeChannelName}
ddn := &ddNode{droppedSegmentIDs: dsIDs, vChannelName: ddNodeChannelName}
assert.Equal(t, test.expectedOut, ddn.isDropped(test.inSeg))
})
}