Verify target id for FlushSegments request in datanode (#20308)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/20323/head
congqixia 2022-11-03 22:47:35 +08:00 committed by GitHub
parent deb9963d0e
commit f557af1143
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 34 additions and 3 deletions

View File

@ -104,6 +104,7 @@ func (c *Cluster) Flush(ctx context.Context, nodeID int64, channel string,
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_Flush),
commonpbutil.WithSourceID(Params.DataCoordCfg.GetNodeID()),
commonpbutil.WithTargetID(nodeID),
),
CollectionID: ch.CollectionID,
SegmentIDs: lo.Map(segments, getSegmentID),

View File

@ -594,6 +594,18 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
return errStatus, nil
}
if req.GetBase().GetTargetID() != node.session.ServerID {
log.Warn("flush segment target id not matched",
zap.Int64("targetID", req.GetBase().GetTargetID()),
zap.Int64("serverID", node.session.ServerID),
)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
Reason: common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), node.session.ServerID),
}
return status, nil
}
log.Info("receiving FlushSegments request",
zap.Int64("collection ID", req.GetCollectionID()),
zap.Int64s("segments", req.GetSegmentIDs()),

View File

@ -227,7 +227,9 @@ func TestDataNode(t *testing.T) {
assert.Nil(t, err)
req := &datapb.FlushSegmentsRequest{
Base: &commonpb.MsgBase{},
Base: &commonpb.MsgBase{
TargetID: node1.session.ServerID,
},
DbID: 0,
CollectionID: 1,
SegmentIDs: []int64{0},
@ -288,7 +290,21 @@ func TestDataNode(t *testing.T) {
// failure call
req = &datapb.FlushSegmentsRequest{
Base: &commonpb.MsgBase{},
Base: &commonpb.MsgBase{
TargetID: -1,
},
DbID: 0,
CollectionID: 1,
SegmentIDs: []int64{1},
}
status, err = node1.FlushSegments(node1.ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_NodeIDNotMatch, status.ErrorCode)
req = &datapb.FlushSegmentsRequest{
Base: &commonpb.MsgBase{
TargetID: node1.session.ServerID,
},
DbID: 0,
CollectionID: 1,
SegmentIDs: []int64{1},
@ -299,7 +315,9 @@ func TestDataNode(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
req = &datapb.FlushSegmentsRequest{
Base: &commonpb.MsgBase{},
Base: &commonpb.MsgBase{
TargetID: node1.session.ServerID,
},
DbID: 0,
CollectionID: 1,
SegmentIDs: []int64{},