Validate the number of replicas for load collection/partitions request (#16697)

Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/16704/head
yah01 2022-04-28 10:54:00 +08:00 committed by GitHub
parent 0a953948af
commit 7017756601
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 49 additions and 1 deletions

View File

@ -184,6 +184,24 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
if collectionInfo, err := qc.meta.getCollectionInfoByID(collectionID); err == nil {
// if collection has been loaded by load collection request, return success
if collectionInfo.LoadType == querypb.LoadType_LoadCollection {
if collectionInfo.ReplicaNumber != req.ReplicaNumber {
msg := fmt.Sprintf("collection has already been loaded, and the number of replicas %v is not same as the request's %v. Should release first then reload with the new number of replicas",
collectionInfo.ReplicaNumber,
req.ReplicaNumber)
log.Warn(msg,
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID),
zap.Int32("collectionReplicaNumber", collectionInfo.ReplicaNumber),
zap.Int32("requestReplicaNumber", req.ReplicaNumber))
status.ErrorCode = commonpb.ErrorCode_IllegalArgument
status.Reason = msg
metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
return status, nil
}
log.Info("collection has already been loaded, return load success directly",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
@ -478,6 +496,24 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
}
if collectionInfo.LoadType == querypb.LoadType_LoadPartition {
if collectionInfo.ReplicaNumber != req.ReplicaNumber {
msg := fmt.Sprintf("partitions has already been loaded, and the number of replicas %v is not same as the request's %v. Should release first then reload with the new number of replicas",
collectionInfo.ReplicaNumber,
req.ReplicaNumber)
log.Warn(msg,
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID),
zap.Int32("collectionReplicaNumber", collectionInfo.ReplicaNumber),
zap.Int32("requestReplicaNumber", req.ReplicaNumber))
status.ErrorCode = commonpb.ErrorCode_IllegalArgument
status.Reason = msg
metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc()
return status, nil
}
for _, toLoadPartitionID := range partitionIDs {
needLoad := true
for _, loadedPartitionID := range collectionInfo.PartitionIDs {

View File

@ -937,6 +937,12 @@ func TestLoadCollectionWithReplicas(t *testing.T) {
assert.Equal(t, loadCollectionReq.CollectionID, replicas[i].CollectionID)
}
// Load the loaded collection with different replica number should fail
loadCollectionReq.ReplicaNumber = 2
status, err = queryCoord.LoadCollection(ctx, loadCollectionReq)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_IllegalArgument, status.ErrorCode)
status, err = queryCoord.ReleaseCollection(ctx, &querypb.ReleaseCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleaseCollection,
@ -954,7 +960,7 @@ func TestLoadCollectionWithReplicas(t *testing.T) {
assert.Nil(t, err)
}
func Test_LoadPartitionsWithReplicas(t *testing.T) {
func TestLoadPartitionsWithReplicas(t *testing.T) {
refreshParams()
ctx := context.Background()
queryCoord, err := startQueryCoord(ctx)
@ -994,6 +1000,12 @@ func Test_LoadPartitionsWithReplicas(t *testing.T) {
waitLoadPartitionDone(ctx, queryCoord,
loadPartitionsReq.CollectionID, loadPartitionsReq.PartitionIDs)
// Load the loaded partitions with different replica number should fail
loadPartitionsReq.ReplicaNumber = 2
status, err = queryCoord.LoadPartitions(ctx, loadPartitionsReq)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_IllegalArgument, status.ErrorCode)
status, err = queryCoord.ReleasePartitions(ctx, &querypb.ReleasePartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleaseCollection,