Fix SyncDistribution set Action (#19360)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/19362/head
congqixia 2022-09-22 13:18:50 +08:00 committed by GitHub
parent 24ec354701
commit c4761100a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 79 additions and 6 deletions

View File

@ -1233,12 +1233,10 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
case querypb.SyncType_Remove:
shardCluster.forceRemoveSegment(action.GetSegmentID())
case querypb.SyncType_Set:
shardCluster.updateSegment(shardSegmentInfo{
segmentID: action.GetSegmentID(),
partitionID: action.GetPartitionID(),
nodeID: action.GetNodeID(),
state: segmentStateLoaded,
})
shardCluster.SyncSegments([]*querypb.ReplicaSegmentsInfo{
{NodeId: action.GetNodeID(), PartitionId: action.GetPartitionID(), SegmentIds: []int64{action.GetSegmentID()}},
}, segmentStateLoaded)
default:
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,

View File

@ -763,3 +763,78 @@ func TestImpl_SyncReplicaSegments(t *testing.T) {
})
}
func TestSyncDistribution(t *testing.T) {
t.Run("QueryNode not healthy", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node, err := genSimpleQueryNode(ctx)
defer node.Stop()
assert.NoError(t, err)
node.UpdateStateCode(internalpb.StateCode_Abnormal)
resp, err := node.SyncDistribution(ctx, &querypb.SyncDistributionRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode())
})
t.Run("Sync non-exist channel", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node, err := genSimpleQueryNode(ctx)
defer node.Stop()
assert.NoError(t, err)
resp, err := node.SyncDistribution(ctx, &querypb.SyncDistributionRequest{
CollectionID: defaultCollectionID,
Channel: defaultDMLChannel,
Actions: []*querypb.SyncAction{
{
Type: querypb.SyncType_Set,
PartitionID: defaultPartitionID,
SegmentID: defaultSegmentID,
NodeID: 99,
},
},
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode())
})
t.Run("Normal sync segments", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node, err := genSimpleQueryNode(ctx)
defer node.Stop()
assert.NoError(t, err)
node.ShardClusterService.addShardCluster(defaultCollectionID, defaultReplicaID, defaultDMLChannel)
resp, err := node.SyncDistribution(ctx, &querypb.SyncDistributionRequest{
CollectionID: defaultCollectionID,
Channel: defaultDMLChannel,
Actions: []*querypb.SyncAction{
{
Type: querypb.SyncType_Set,
PartitionID: defaultPartitionID,
SegmentID: defaultSegmentID,
NodeID: 99,
},
},
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
cs, ok := node.ShardClusterService.getShardCluster(defaultDMLChannel)
require.True(t, ok)
segment, ok := cs.getSegment(defaultSegmentID)
require.True(t, ok)
assert.Equal(t, common.InvalidNodeID, segment.nodeID)
assert.Equal(t, defaultPartitionID, segment.partitionID)
assert.Equal(t, segmentStateLoaded, segment.state)
})
}