Add constraints to the combination of load and release (#13110)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/13470/head
xige-16 2021-12-15 22:11:09 +08:00 committed by GitHub
parent bcebb691c3
commit 42ac5f76ed
13 changed files with 1107 additions and 334 deletions
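In brief, the new constraints are: a collection loaded via LoadCollection rejects later LoadPartitions and ReleasePartitions calls until it is released; a collection loaded via LoadPartitions rejects LoadCollection and rejects loading partitions that are not already loaded; repeating an identical load returns success directly; and releasing every loaded partition is upgraded internally to a collection release. The sketch below illustrates the caller-visible contract only; it is not part of the diff and assumes the querycoord test helpers added in this PR (genCollectionSchema, defaultCollectionID, defaultPartitionID) and an already started QueryCoord.

// Illustrative sketch of the new load/release constraints (assumes this PR's querycoord test package).
func sketchLoadReleaseConstraints(ctx context.Context, qc *QueryCoord) {
    loadCol := &querypb.LoadCollectionRequest{
        Base:         &commonpb.MsgBase{MsgType: commonpb.MsgType_LoadCollection},
        CollectionID: defaultCollectionID,
        Schema:       genCollectionSchema(defaultCollectionID, false),
    }
    loadPar := &querypb.LoadPartitionsRequest{
        Base:         &commonpb.MsgBase{MsgType: commonpb.MsgType_LoadPartitions},
        CollectionID: defaultCollectionID,
        PartitionIDs: []UniqueID{defaultPartitionID},
        Schema:       genCollectionSchema(defaultCollectionID, false),
    }
    // Loading the whole collection succeeds, and repeating the same request is a no-op success.
    status, _ := qc.LoadCollection(ctx, loadCol)
    fmt.Println(status.ErrorCode) // ErrorCode_Success
    status, _ = qc.LoadCollection(ctx, loadCol)
    fmt.Println(status.ErrorCode) // ErrorCode_Success
    // LoadPartitions on a collection that was loaded via LoadCollection is now rejected;
    // the collection must be released first.
    status, _ = qc.LoadPartitions(ctx, loadPar)
    fmt.Println(status.ErrorCode) // ErrorCode_UnexpectedError
}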

@ -1545,7 +1545,7 @@ func TestProxy(t *testing.T) {
Type: milvuspb.ShowType_InMemory,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
// default partition
assert.Equal(t, 0, len(resp.PartitionNames))

@ -1,34 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querycoord
import (
"errors"
"fmt"
)
func errQueryNodeIsNotOnService(id UniqueID) error {
return fmt.Errorf("query node %d is not on service", id)
}
func msgQueryCoordIsUnhealthy(coordID UniqueID) string {
return fmt.Sprintf("QueryCoord %d is not ready", coordID)
}
func errQueryCoordIsUnhealthy(coordID UniqueID) error {
return errors.New(msgQueryCoordIsUnhealthy(coordID))
}

@ -1,49 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querycoord
import (
"testing"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
func TestErrQueryNodeIsNotOnService(t *testing.T) {
queryNodeIDList := []UniqueID{
1,
}
for _, id := range queryNodeIDList {
log.Info("TestErrQueryNodeIsNotOnService",
zap.Error(errQueryNodeIsNotOnService(id)))
}
}
func TestMsgQueryCoordIsUnhealthy(t *testing.T) {
nodeIDList := []UniqueID{1, 2, 3}
for _, nodeID := range nodeIDList {
log.Info("TestMsgQueryCoordIsUnhealthy", zap.String("msg", msgQueryCoordIsUnhealthy(nodeID)))
}
}
func TestErrQueryCoordIsUnhealthy(t *testing.T) {
nodeIDList := []UniqueID{1, 2, 3}
for _, nodeID := range nodeIDList {
log.Info("TestErrQueryCoordIsUnhealthy", zap.Error(errQueryCoordIsUnhealthy(nodeID)))
}
}

@ -89,16 +89,18 @@ func (qc *QueryCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin
// ShowCollections returns all the collections that have been loaded
func (qc *QueryCoord) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
dbID := req.DbID
log.Debug("show collection start", zap.Int64("dbID", dbID))
log.Debug("show collection start",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64s("collectionIDs", req.CollectionIDs),
zap.Int64("msgID", req.Base.MsgID))
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
if qc.stateCode.Load() != internalpb.StateCode_Healthy {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("query coordinator is not healthy")
err := errors.New("QueryCoord is not healthy")
status.Reason = err.Error()
log.Debug("show collection end with query coordinator not healthy")
log.Error("show collection failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
return &querypb.ShowCollectionsResponse{
Status: status,
}, nil
@ -115,7 +117,11 @@ func (qc *QueryCoord) ShowCollections(ctx context.Context, req *querypb.ShowColl
for _, id := range inMemoryCollectionIDs {
inMemoryPercentages = append(inMemoryPercentages, ID2collectionInfo[id].InMemoryPercentage)
}
log.Debug("show collection end", zap.Int64s("collections", inMemoryCollectionIDs), zap.Int64s("inMemoryPercentage", inMemoryPercentages))
log.Debug("show collection end",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64s("collections", inMemoryCollectionIDs),
zap.Int64s("inMemoryPercentage", inMemoryPercentages),
zap.Int64("msgID", req.Base.MsgID))
return &querypb.ShowCollectionsResponse{
Status: status,
CollectionIDs: inMemoryCollectionIDs,
@ -127,13 +133,22 @@ func (qc *QueryCoord) ShowCollections(ctx context.Context, req *querypb.ShowColl
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := fmt.Errorf("collection %d has not been loaded to memory or load failed", id)
status.Reason = err.Error()
log.Warn("show collection failed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", id),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
return &querypb.ShowCollectionsResponse{
Status: status,
}, nil
}
inMemoryPercentages = append(inMemoryPercentages, ID2collectionInfo[id].InMemoryPercentage)
}
log.Debug("show collection end", zap.Int64s("collections", req.CollectionIDs), zap.Int64s("inMemoryPercentage", inMemoryPercentages))
log.Debug("show collection end",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64s("collections", req.CollectionIDs),
zap.Int64("msgID", req.Base.MsgID),
zap.Int64s("inMemoryPercentage", inMemoryPercentages))
return &querypb.ShowCollectionsResponse{
Status: status,
CollectionIDs: req.CollectionIDs,
@ -145,20 +160,48 @@ func (qc *QueryCoord) ShowCollections(ctx context.Context, req *querypb.ShowColl
func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
collectionID := req.CollectionID
//schema := req.Schema
log.Debug("loadCollectionRequest received", zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID),
zap.Stringer("schema", req.Schema))
log.Debug("loadCollectionRequest received",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID))
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
if qc.stateCode.Load() != internalpb.StateCode_Healthy {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("query coordinator is not healthy")
err := errors.New("QueryCoord is not healthy")
status.Reason = err.Error()
log.Debug("load collection end with query coordinator not healthy")
log.Error("load collection failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
return status, nil
}
if collectionInfo, err := qc.meta.getCollectionInfoByID(collectionID); err == nil {
// if collection has been loaded by load collection request, return success
if collectionInfo.LoadType == querypb.LoadType_loadCollection {
log.Debug("collection has already been loaded, return load success directly",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID))
return status, nil
}
// if some partitions of the collection have been loaded by load partitions request, return error
// should release partitions first, then load collection again
if collectionInfo.LoadType == querypb.LoadType_LoadPartition {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err = fmt.Errorf("some partitions %v of collection %d has been loaded into QueryNode, please release partitions firstly",
collectionInfo.PartitionIDs, collectionID)
status.Reason = err.Error()
log.Warn("loadCollectionRequest failed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64s("loaded partitionIDs", collectionInfo.PartitionIDs),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
return status, nil
}
}
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_GrpcRequest)
loadCollectionTask := &loadCollectionTask{
baseTask: baseTask,
@ -171,6 +214,11 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
}
err := qc.scheduler.Enqueue(loadCollectionTask)
if err != nil {
log.Error("loadCollectionRequest failed to add execute task to scheduler",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
return status, nil
@ -178,14 +226,21 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
err = loadCollectionTask.waitToFinish()
if err != nil {
log.Error("load collection to query nodes failed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
return status, nil
}
log.Debug("loadCollectionRequest completed", zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID),
zap.Any("status", status))
log.Debug("loadCollectionRequest completed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID))
return status, nil
}
@ -193,22 +248,29 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
//dbID := req.DbID
collectionID := req.CollectionID
log.Debug("releaseCollectionRequest received", zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID))
log.Debug("releaseCollectionRequest received",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID))
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
if qc.stateCode.Load() != internalpb.StateCode_Healthy {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("query coordinator is not healthy")
err := errors.New("QueryCoord is not healthy")
status.Reason = err.Error()
log.Debug("release collection end with query coordinator not healthy")
log.Error("release collection failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
return status, nil
}
// if the collection has not been loaded into memory, return success for the release request
hasCollection := qc.meta.hasCollection(collectionID)
if !hasCollection {
log.Warn("release collection end, query coordinator don't have the log of", zap.Int64("collectionID", collectionID))
log.Debug("release collection end, the collection has not been loaded into QueryNode",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID))
return status, nil
}
@ -222,6 +284,11 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
}
err := qc.scheduler.Enqueue(releaseCollectionTask)
if err != nil {
log.Error("releaseCollectionRequest failed to add execute task to scheduler",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
return status, nil
@ -229,13 +296,20 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
err = releaseCollectionTask.waitToFinish()
if err != nil {
log.Error("release collection from query nodes failed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
return status, nil
}
log.Debug("releaseCollectionRequest completed", zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID))
log.Debug("releaseCollectionRequest completed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID))
//qc.MetaReplica.printMeta()
//qc.cluster.printMeta()
return status, nil
@ -244,15 +318,19 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
// ShowPartitions returns all the partitions that have been loaded
func (qc *QueryCoord) ShowPartitions(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
collectionID := req.CollectionID
log.Debug("show partitions start, ", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", req.PartitionIDs))
log.Debug("show partitions start",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", req.PartitionIDs),
zap.Int64("msgID", req.Base.MsgID))
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
if qc.stateCode.Load() != internalpb.StateCode_Healthy {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("query coordinator is not healthy")
err := errors.New("QueryCoord is not healthy")
status.Reason = err.Error()
log.Debug("show partition end with query coordinator not healthy")
log.Error("show partition failed", zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
return &querypb.ShowPartitionsResponse{
Status: status,
}, nil
@ -260,8 +338,13 @@ func (qc *QueryCoord) ShowPartitions(ctx context.Context, req *querypb.ShowParti
partitionStates, err := qc.meta.showPartitions(collectionID)
if err != nil {
err = fmt.Errorf("collection %d has not been loaded into QueryNode", collectionID)
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
log.Warn("show partitions failed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
return &querypb.ShowPartitionsResponse{
Status: status,
}, nil
@ -277,7 +360,12 @@ func (qc *QueryCoord) ShowPartitions(ctx context.Context, req *querypb.ShowParti
for _, id := range inMemoryPartitionIDs {
inMemoryPercentages = append(inMemoryPercentages, ID2PartitionState[id].InMemoryPercentage)
}
log.Debug("show partitions end", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", inMemoryPartitionIDs), zap.Int64s("inMemoryPercentage", inMemoryPercentages))
log.Debug("show partitions end",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID),
zap.Int64s("partitionIDs", inMemoryPartitionIDs),
zap.Int64s("inMemoryPercentage", inMemoryPercentages))
return &querypb.ShowPartitionsResponse{
Status: status,
PartitionIDs: inMemoryPartitionIDs,
@ -286,9 +374,15 @@ func (qc *QueryCoord) ShowPartitions(ctx context.Context, req *querypb.ShowParti
}
for _, id := range req.PartitionIDs {
if _, ok := ID2PartitionState[id]; !ok {
err = fmt.Errorf("partition %d of collection %d has not been loaded into QueryNode", id, collectionID)
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("partition has not been loaded to memory or load failed")
status.Reason = err.Error()
log.Warn("show partitions failed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", id),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
return &querypb.ShowPartitionsResponse{
Status: status,
}, nil
@ -296,7 +390,12 @@ func (qc *QueryCoord) ShowPartitions(ctx context.Context, req *querypb.ShowParti
inMemoryPercentages = append(inMemoryPercentages, ID2PartitionState[id].InMemoryPercentage)
}
log.Debug("show partitions end", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", req.PartitionIDs), zap.Int64s("inMemoryPercentage", inMemoryPercentages))
log.Debug("show partitions end",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", req.PartitionIDs),
zap.Int64("msgID", req.Base.MsgID),
zap.Int64s("inMemoryPercentage", inMemoryPercentages))
return &querypb.ShowPartitionsResponse{
Status: status,
@ -309,57 +408,84 @@ func (qc *QueryCoord) ShowPartitions(ctx context.Context, req *querypb.ShowParti
func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) {
collectionID := req.CollectionID
partitionIDs := req.PartitionIDs
log.Debug("loadPartitionRequest received", zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs))
log.Debug("loadPartitionRequest received",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("msgID", req.Base.MsgID))
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
if qc.stateCode.Load() != internalpb.StateCode_Healthy {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("query coordinator is not healthy")
err := errors.New("QueryCoord is not healthy")
status.Reason = err.Error()
log.Debug("load partition end with query coordinator not healthy")
log.Error("load partition failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
return status, nil
}
// if partitionIDs to load are empty, return error
if len(partitionIDs) == 0 {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("partitionIDs are empty")
status.Reason = err.Error()
log.Debug("loadPartitionRequest completed", zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", partitionIDs))
log.Warn("loadPartitionRequest failed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
return status, nil
}
hasCollection := qc.meta.hasCollection(collectionID)
if hasCollection {
partitionIDsToLoad := make([]UniqueID, 0)
loadType, _ := qc.meta.getLoadType(collectionID)
if loadType == querypb.LoadType_loadCollection {
for _, partitionID := range partitionIDs {
hasReleasePartition := qc.meta.hasReleasePartition(collectionID, partitionID)
if hasReleasePartition {
partitionIDsToLoad = append(partitionIDsToLoad, partitionID)
if collectionInfo, err := qc.meta.getCollectionInfoByID(collectionID); err == nil {
// if the collection has been loaded into memory by load collection request, return error
// should release collection first, then load partitions again
if collectionInfo.LoadType == querypb.LoadType_loadCollection {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err = fmt.Errorf("collection %d has been loaded into QueryNode, please release collection firstly", collectionID)
status.Reason = err.Error()
}
if collectionInfo.LoadType == querypb.LoadType_LoadPartition {
for _, toLoadPartitionID := range partitionIDs {
needLoad := true
for _, loadedPartitionID := range collectionInfo.PartitionIDs {
if toLoadPartitionID == loadedPartitionID {
needLoad = false
break
}
}
}
} else {
for _, partitionID := range partitionIDs {
hasPartition := qc.meta.hasPartition(collectionID, partitionID)
if !hasPartition {
partitionIDsToLoad = append(partitionIDsToLoad, partitionID)
if needLoad {
// if new partitions need to be loaded, return error
// should release partitions first, then load partitions again
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err = fmt.Errorf("some partitions %v of collection %d has been loaded into QueryNode, please release partitions firstly",
collectionInfo.PartitionIDs, collectionID)
status.Reason = err.Error()
}
}
}
if len(partitionIDsToLoad) == 0 {
log.Debug("loadPartitionRequest completed", zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", partitionIDs))
if status.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("loadPartitionRequest failed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
return status, nil
}
req.PartitionIDs = partitionIDsToLoad
log.Debug("loadPartitionRequest completed, all partitions to load have already been loaded into memory",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("msgID", req.Base.MsgID))
return status, nil
}
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_GrpcRequest)
@ -374,6 +500,12 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
}
err := qc.scheduler.Enqueue(loadPartitionTask)
if err != nil {
log.Error("loadPartitionRequest failed to add execute task to scheduler",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
return status, nil
@ -383,15 +515,21 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
if err != nil {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
log.Debug("loadPartitionRequest completed", zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", partitionIDs))
log.Error("loadPartitionRequest failed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
return status, nil
}
log.Debug("loadPartitionRequest completed", zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", partitionIDs))
log.Debug("loadPartitionRequest completed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("msgID", req.Base.MsgID))
return status, nil
}
@ -400,23 +538,20 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
//dbID := req.DbID
collectionID := req.CollectionID
partitionIDs := req.PartitionIDs
log.Debug("releasePartitionRequest received", zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", partitionIDs))
log.Debug("releasePartitionRequest received",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("msgID", req.Base.MsgID))
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
if qc.stateCode.Load() != internalpb.StateCode_Healthy {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("query coordinator is not healthy")
err := errors.New("QueryCoord is not healthy")
status.Reason = err.Error()
log.Debug("release partition end with query coordinator not healthy")
return status, nil
}
hasCollection := qc.meta.hasCollection(collectionID)
if !hasCollection {
log.Warn("release partitions end, query coordinator don't have the log of", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
log.Error("release partition failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
return status, nil
}
@ -424,47 +559,122 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("partitionIDs are empty")
status.Reason = err.Error()
log.Debug("releasePartitionsRequest completed", zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", partitionIDs))
log.Warn("releasePartitionsRequest failed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
return status, nil
}
toReleasedPartitions := make([]UniqueID, 0)
for _, id := range partitionIDs {
hasPartition := qc.meta.hasPartition(collectionID, id)
if hasPartition {
toReleasedPartitions = append(toReleasedPartitions, id)
releaseCollection := true
var toReleasedPartitions []UniqueID
if collectionInfo, err := qc.meta.getCollectionInfoByID(collectionID); err == nil {
// if collection has been loaded into memory by load collection request, return error
// releasing only a subset of the partitions after a load collection request is temporarily not supported, and will be supported soon
if collectionInfo.LoadType == querypb.LoadType_loadCollection {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("releasing some partitions after load collection is not supported")
status.Reason = err.Error()
log.Warn("releasePartitionsRequest failed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
return status, nil
}
for _, partitionID := range collectionInfo.PartitionIDs {
toRelease := false
for _, releasedPartitionID := range partitionIDs {
if partitionID == releasedPartitionID {
toRelease = true
toReleasedPartitions = append(toReleasedPartitions, releasedPartitionID)
}
}
if !toRelease {
releaseCollection = false
}
}
} else {
log.Debug("release partitions end, the collection has not been loaded into QueryNode",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64("msgID", req.Base.MsgID))
return status, nil
}
if len(toReleasedPartitions) == 0 {
log.Debug("release partitions end, the partitions has not been loaded into QueryNode",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("msgID", req.Base.MsgID))
return status, nil
}
var releaseTask task
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_GrpcRequest)
if releaseCollection {
// if all loaded partitions will be released from memory, then upgrade release partitions request to release collection request
log.Debug(fmt.Sprintf("all partitions of collection %d will released from QueryNode, so release the collection directly", collectionID),
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID))
msgBase := req.Base
msgBase.MsgType = commonpb.MsgType_ReleaseCollection
releaseCollectionRequest := &querypb.ReleaseCollectionRequest{
Base: msgBase,
CollectionID: req.CollectionID,
}
releaseTask = &releaseCollectionTask{
baseTask: baseTask,
ReleaseCollectionRequest: releaseCollectionRequest,
cluster: qc.cluster,
meta: qc.meta,
rootCoord: qc.rootCoordClient,
}
} else {
req.PartitionIDs = toReleasedPartitions
releaseTask = &releasePartitionTask{
baseTask: baseTask,
ReleasePartitionsRequest: req,
cluster: qc.cluster,
}
}
if len(toReleasedPartitions) == 0 {
log.Warn("release partitions end, query coordinator don't have the log of", zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs))
return status, nil
}
req.PartitionIDs = toReleasedPartitions
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_GrpcRequest)
releasePartitionTask := &releasePartitionTask{
baseTask: baseTask,
ReleasePartitionsRequest: req,
cluster: qc.cluster,
}
err := qc.scheduler.Enqueue(releasePartitionTask)
err := qc.scheduler.Enqueue(releaseTask)
if err != nil {
log.Error("releasePartitionRequest failed to add execute task to scheduler",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
return status, nil
}
err = releasePartitionTask.waitToFinish()
err = releaseTask.waitToFinish()
if err != nil {
log.Error("releasePartitionRequest failed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
return status, nil
}
log.Debug("releasePartitionRequest completed", zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs))
log.Debug("releasePartitionRequest completed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("msgID", req.Base.MsgID))
//qc.MetaReplica.printMeta()
//qc.cluster.printMeta()
return status, nil
@ -472,14 +682,18 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
// CreateQueryChannel assigns a unique query channel and result channel to the specified collection
func (qc *QueryCoord) CreateQueryChannel(ctx context.Context, req *querypb.CreateQueryChannelRequest) (*querypb.CreateQueryChannelResponse, error) {
log.Debug("createQueryChannelRequest received",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID))
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
if qc.stateCode.Load() != internalpb.StateCode_Healthy {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("query coordinator is not healthy")
err := errors.New("QueryCoord is not healthy")
status.Reason = err.Error()
log.Debug("createQueryChannel end with query coordinator not healthy")
log.Error("createQueryChannel failed", zap.String("role", typeutil.QueryCoordRole), zap.Error(err))
return &querypb.CreateQueryChannelResponse{
Status: status,
}, nil
@ -490,12 +704,20 @@ func (qc *QueryCoord) CreateQueryChannel(ctx context.Context, req *querypb.Creat
if err != nil {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
log.Debug("createQueryChannel end with error")
log.Error("createQueryChannel end with error",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Error(err))
return &querypb.CreateQueryChannelResponse{
Status: status,
}, nil
}
log.Debug("createQueryChannelRequest completed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.String("request channel", info.QueryChannel),
zap.String("result channel", info.QueryResultChannel))
return &querypb.CreateQueryChannelResponse{
Status: status,
QueryChannel: info.QueryChannel,
@ -505,14 +727,20 @@ func (qc *QueryCoord) CreateQueryChannel(ctx context.Context, req *querypb.Creat
// GetPartitionStates returns state of the partition, including notExist, notPresent, onDisk, partitionInMemory, inMemory, partitionInGPU, InGPU
func (qc *QueryCoord) GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error) {
log.Debug("getPartitionStatesRequest received",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", req.PartitionIDs),
zap.Int64("msgID", req.Base.MsgID))
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
if qc.stateCode.Load() != internalpb.StateCode_Healthy {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("query coordinator is not healthy")
err := errors.New("QueryCoord is not healthy")
status.Reason = err.Error()
log.Debug("getPartitionStates end with query coordinator not healthy")
log.Error("getPartitionStates failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
return &querypb.GetPartitionStatesResponse{
Status: status,
}, nil
@ -523,8 +751,15 @@ func (qc *QueryCoord) GetPartitionStates(ctx context.Context, req *querypb.GetPa
for _, partitionID := range partitionIDs {
res, err := qc.meta.getPartitionStatesByID(req.CollectionID, partitionID)
if err != nil {
err = fmt.Errorf("partition %d of collection %d has not been loaded into QueryNode", partitionID, req.CollectionID)
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
log.Warn("getPartitionStatesRequest failed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64("partitionID", partitionID),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
return &querypb.GetPartitionStatesResponse{
Status: status,
}, nil
@ -535,7 +770,11 @@ func (qc *QueryCoord) GetPartitionStates(ctx context.Context, req *querypb.GetPa
}
partitionStates = append(partitionStates, partitionState)
}
log.Debug("getPartitionStatesRequest completed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", req.PartitionIDs),
zap.Int64("msgID", req.Base.MsgID))
return &querypb.GetPartitionStatesResponse{
Status: status,
PartitionDescriptions: partitionStates,
@ -544,14 +783,20 @@ func (qc *QueryCoord) GetPartitionStates(ctx context.Context, req *querypb.GetPa
// GetSegmentInfo returns information of all the segments on queryNodes, and the information includes memSize, numRow, indexName, indexID ...
func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
log.Debug("getSegmentInfoRequest received",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("segmentIDs", req.SegmentIDs),
zap.Int64("msgID", req.Base.MsgID))
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
if qc.stateCode.Load() != internalpb.StateCode_Healthy {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("query coordinator is not healthy")
err := errors.New("QueryCoord is not healthy")
status.Reason = err.Error()
log.Debug("getSegmentInfo end with query coordinator not healthy")
log.Error("getSegmentInfo failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
return &querypb.GetSegmentInfoResponse{
Status: status,
}, nil
@ -566,6 +811,12 @@ func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmen
if err != nil {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
log.Error("getSegmentInfoRequest failed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("segmentIDs", req.SegmentIDs),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
return &querypb.GetSegmentInfoResponse{
Status: status,
}, nil
@ -574,7 +825,12 @@ func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmen
totalNumRows += info.NumRows
totalMemSize += info.MemSize
}
log.Debug("getSegmentInfo", zap.Int64("num rows", totalNumRows), zap.Int64("memory size", totalMemSize))
log.Debug("getSegmentInfoRequest completed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64("msgID", req.Base.MsgID),
zap.Int64("num rows", totalNumRows),
zap.Int64("memory size", totalMemSize))
return &querypb.GetSegmentInfoResponse{
Status: status,
Infos: segmentInfos,
@ -583,19 +839,21 @@ func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmen
// LoadBalance performs a load-balancing operation between query nodes
func (qc *QueryCoord) LoadBalance(ctx context.Context, req *querypb.LoadBalanceRequest) (*commonpb.Status, error) {
log.Debug("LoadBalanceRequest received",
log.Debug("loadBalanceRequest received",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID),
zap.Any("req", req),
)
zap.Int64s("source nodeIDs", req.SourceNodeIDs),
zap.Int64s("dst nodeIDs", req.DstNodeIDs),
zap.Int64s("balanced segments", req.SealedSegmentIDs),
zap.Int64("msgID", req.Base.MsgID))
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
if qc.stateCode.Load() != internalpb.StateCode_Healthy {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("query coordinator is not healthy")
err := errors.New("QueryCoord is not healthy")
status.Reason = err.Error()
log.Debug("LoadBalance failed", zap.Error(err))
log.Error("loadBalance failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
return status, nil
}
@ -611,6 +869,10 @@ func (qc *QueryCoord) LoadBalance(ctx context.Context, req *querypb.LoadBalanceR
}
err := qc.scheduler.Enqueue(loadBalanceTask)
if err != nil {
log.Error("loadBalanceRequest failed to add execute task to scheduler",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
return status, nil
@ -618,96 +880,99 @@ func (qc *QueryCoord) LoadBalance(ctx context.Context, req *querypb.LoadBalanceR
err = loadBalanceTask.waitToFinish()
if err != nil {
log.Error("loadBalanceRequest failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
return status, nil
}
log.Debug("LoadBalanceRequest completed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID),
zap.Any("req", req),
)
return status, nil
}
func (qc *QueryCoord) isHealthy() bool {
code := qc.stateCode.Load().(internalpb.StateCode)
return code == internalpb.StateCode_Healthy
log.Debug("loadBalanceRequest completed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64s("source nodeIDs", req.SourceNodeIDs),
zap.Int64s("dst nodeIDs", req.DstNodeIDs),
zap.Int64s("balanced segments", req.SealedSegmentIDs),
zap.Int64("msgID", req.Base.MsgID))
return status, nil
}
// GetMetrics returns all the queryCoord's metrics
func (qc *QueryCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
log.Debug("QueryCoord.GetMetrics",
zap.Int64("node_id", Params.QueryCoordID),
zap.String("req", req.Request))
log.Debug("getMetricsRequest received",
zap.String("role", typeutil.QueryCoordRole),
zap.String("req", req.Request),
zap.Int64("msgID", req.Base.MsgID))
if !qc.isHealthy() {
log.Warn("QueryCoord.GetMetrics failed",
zap.Int64("node_id", Params.QueryCoordID),
zap.String("req", req.Request),
zap.Error(errQueryCoordIsUnhealthy(Params.QueryCoordID)))
getMetricsResponse := &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, Params.QueryCoordID),
}
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgQueryCoordIsUnhealthy(Params.QueryCoordID),
},
Response: "",
}, nil
if qc.stateCode.Load() != internalpb.StateCode_Healthy {
err := errors.New("QueryCoord is not healthy")
getMetricsResponse.Status.Reason = err.Error()
log.Error("getMetrics failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
return getMetricsResponse, nil
}
metricType, err := metricsinfo.ParseMetricType(req.Request)
if err != nil {
log.Warn("QueryCoord.GetMetrics failed to parse metric type",
zap.Int64("node_id", Params.QueryCoordID),
zap.String("req", req.Request),
getMetricsResponse.Status.Reason = err.Error()
log.Error("getMetrics failed to parse metric type",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Response: "",
}, nil
return getMetricsResponse, nil
}
log.Debug("QueryCoord.GetMetrics",
log.Debug("getMetrics",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID),
zap.String("metric_type", metricType))
if metricType == metricsinfo.SystemInfoMetrics {
ret, err := qc.metricsCacheManager.GetSystemInfoMetrics()
if err == nil && ret != nil {
log.Debug("getMetrics completed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID))
return ret, nil
}
log.Debug("failed to get system info metrics from cache, recompute instead",
zap.Error(err))
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID))
metrics, err := getSystemInfoMetrics(ctx, req, qc)
if err != nil {
log.Error("getSystemInfoMetrics failed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
getMetricsResponse.Status.Reason = err.Error()
return getMetricsResponse, nil
}
log.Debug("QueryCoord.GetMetrics",
zap.Int64("node_id", Params.QueryCoordID),
// getting metrics succeeded, so set status.ErrorCode to success
getMetricsResponse.Response = metrics
qc.metricsCacheManager.UpdateSystemInfoMetrics(getMetricsResponse)
log.Debug("getMetrics completed",
zap.String("role", typeutil.QueryCoordRole),
zap.String("req", req.Request),
zap.String("metric_type", metricType),
zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
zap.Error(err))
qc.metricsCacheManager.UpdateSystemInfoMetrics(metrics)
return metrics, nil
zap.Int64("msgID", req.Base.MsgID))
getMetricsResponse.Status.ErrorCode = commonpb.ErrorCode_Success
return getMetricsResponse, nil
}
err = errors.New(metricsinfo.MsgUnimplementedMetric)
log.Debug("QueryCoord.GetMetrics failed",
zap.Int64("node_id", Params.QueryCoordID),
getMetricsResponse.Status.Reason = err.Error()
log.Error("getMetrics failed",
zap.String("role", typeutil.QueryCoordRole),
zap.String("req", req.Request),
zap.String("metric_type", metricType),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Response: "",
}, nil
return getMetricsResponse, nil
}
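
A caller-visible consequence of the ReleasePartitions logic above: when the released partitions cover everything that was loaded through LoadPartitions, the request is promoted to a releaseCollectionTask, so the collection disappears from QueryCoord's meta and a later ShowPartitions reports that the collection is not loaded. Below is a minimal sketch of that sequence, again assuming this PR's test helpers and not taken verbatim from the diff.

// Illustrative sketch: releasing the only loaded partition is upgraded to a collection release.
func sketchReleaseUpgrade(ctx context.Context, qc *QueryCoord) {
    relPar := &querypb.ReleasePartitionsRequest{
        Base:         &commonpb.MsgBase{MsgType: commonpb.MsgType_ReleasePartitions},
        CollectionID: defaultCollectionID,
        PartitionIDs: []UniqueID{defaultPartitionID}, // assume this is the only partition loaded earlier
    }
    status, _ := qc.ReleasePartitions(ctx, relPar)
    fmt.Println(status.ErrorCode) // ErrorCode_Success: internally runs a releaseCollectionTask
    // The collection is gone from QueryCoord's meta, so ShowPartitions now fails for it.
    resp, _ := qc.ShowPartitions(ctx, &querypb.ShowPartitionsRequest{
        Base:         &commonpb.MsgBase{MsgType: commonpb.MsgType_ShowPartitions},
        CollectionID: defaultCollectionID,
    })
    fmt.Println(resp.Status.ErrorCode) // ErrorCode_UnexpectedError: collection has not been loaded into QueryNode
}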

@ -35,6 +35,63 @@ import (
"github.com/milvus-io/milvus/internal/util/metricsinfo"
)
func waitLoadPartitionDone(ctx context.Context, queryCoord *QueryCoord, collectionID UniqueID, partitionIDs []UniqueID) error {
for {
showPartitionReq := &querypb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowPartitions,
},
CollectionID: collectionID,
PartitionIDs: partitionIDs,
}
res, err := queryCoord.ShowPartitions(ctx, showPartitionReq)
if err != nil || res.Status.ErrorCode != commonpb.ErrorCode_Success {
return errors.New("showPartitions failed")
}
loadDone := true
for _, percent := range res.InMemoryPercentages {
if percent < 100 {
loadDone = false
}
}
if loadDone {
break
}
}
return nil
}
func waitLoadCollectionDone(ctx context.Context, queryCoord *QueryCoord, collectionID UniqueID) error {
for {
showCollectionReq := &querypb.ShowCollectionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowCollections,
},
CollectionIDs: []UniqueID{collectionID},
}
res, err := queryCoord.ShowCollections(ctx, showCollectionReq)
if err != nil || res.Status.ErrorCode != commonpb.ErrorCode_Success {
return errors.New("showCollection failed")
}
loadDone := true
for _, percent := range res.InMemoryPercentages {
if percent < 100 {
loadDone = false
}
}
if loadDone {
break
}
}
return nil
}
func TestGrpcTask(t *testing.T) {
refreshParams()
ctx := context.Background()
@ -130,6 +187,41 @@ func TestGrpcTask(t *testing.T) {
assert.Nil(t, err)
})
t.Run("Test ReleaseEmptyPartitions", func(t *testing.T) {
status, err := queryCoord.ReleasePartitions(ctx, &querypb.ReleasePartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleasePartitions,
},
CollectionID: defaultCollectionID,
})
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
assert.Nil(t, err)
})
t.Run("Test ReleaseNotExistPartition", func(t *testing.T) {
status, err := queryCoord.ReleasePartitions(ctx, &querypb.ReleasePartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleasePartitions,
},
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{-1},
})
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
})
t.Run("Test ReleasePartition", func(t *testing.T) {
status, err := queryCoord.ReleasePartitions(ctx, &querypb.ReleasePartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleasePartitions,
},
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{defaultPartitionID},
})
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
})
t.Run("Test LoadCollection", func(t *testing.T) {
status, err := queryCoord.LoadCollection(ctx, &querypb.LoadCollectionRequest{
Base: &commonpb.MsgBase{
@ -142,18 +234,18 @@ func TestGrpcTask(t *testing.T) {
assert.Nil(t, err)
})
t.Run("Test LoadParAfterLoadCol", func(t *testing.T) {
status, err := queryCoord.LoadPartitions(ctx, &querypb.LoadPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadPartitions,
},
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{defaultPartitionID},
Schema: genCollectionSchema(defaultCollectionID, false),
})
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
})
//t.Run("Test LoadParAfterLoadCol", func(t *testing.T) {
// status, err := queryCoord.LoadPartitions(ctx, &querypb.LoadPartitionsRequest{
// Base: &commonpb.MsgBase{
// MsgType: commonpb.MsgType_LoadPartitions,
// },
// CollectionID: defaultCollectionID,
// PartitionIDs: []UniqueID{defaultPartitionID},
// Schema: genCollectionSchema(defaultCollectionID, false),
// })
// assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
// assert.Nil(t, err)
//})
t.Run("Test ShowCollections", func(t *testing.T) {
res, err := queryCoord.ShowCollections(ctx, &querypb.ShowCollectionsRequest{
@ -210,41 +302,6 @@ func TestGrpcTask(t *testing.T) {
assert.Nil(t, err)
})
t.Run("Test ReleaseEmptyPartitions", func(t *testing.T) {
status, err := queryCoord.ReleasePartitions(ctx, &querypb.ReleasePartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleasePartitions,
},
CollectionID: defaultCollectionID,
})
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
assert.Nil(t, err)
})
t.Run("Test ReleaseNotExistPartition", func(t *testing.T) {
status, err := queryCoord.ReleasePartitions(ctx, &querypb.ReleasePartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleasePartitions,
},
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{-1},
})
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
})
t.Run("Test ReleasePartition", func(t *testing.T) {
status, err := queryCoord.ReleasePartitions(ctx, &querypb.ReleasePartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleasePartitions,
},
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{defaultPartitionID},
})
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
})
t.Run("Test ReleaseNotExistCollection", func(t *testing.T) {
status, err := queryCoord.ReleaseCollection(ctx, &querypb.ReleaseCollectionRequest{
Base: &commonpb.MsgBase{
@ -403,6 +460,41 @@ func TestGrpcTaskEnqueueFail(t *testing.T) {
assert.Nil(t, err)
queryCoord.scheduler.taskIDAllocator = failedAllocator
t.Run("Test ReleaseCollection", func(t *testing.T) {
status, err := queryCoord.ReleaseCollection(ctx, &querypb.ReleaseCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleaseCollection,
},
CollectionID: defaultCollectionID,
})
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
assert.Nil(t, err)
})
queryCoord.scheduler.taskIDAllocator = taskIDAllocator
status, err = queryCoord.ReleaseCollection(ctx, &querypb.ReleaseCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleaseCollection,
},
CollectionID: defaultCollectionID,
})
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
status, err = queryCoord.LoadPartitions(ctx, &querypb.LoadPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadPartitions,
},
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{defaultPartitionID},
Schema: genCollectionSchema(defaultCollectionID, false),
})
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
queryCoord.scheduler.taskIDAllocator = failedAllocator
t.Run("Test ReleasePartition", func(t *testing.T) {
status, err := queryCoord.ReleasePartitions(ctx, &querypb.ReleasePartitionsRequest{
Base: &commonpb.MsgBase{
@ -415,17 +507,6 @@ func TestGrpcTaskEnqueueFail(t *testing.T) {
assert.Nil(t, err)
})
t.Run("Test ReleaseCollection", func(t *testing.T) {
status, err := queryCoord.ReleaseCollection(ctx, &querypb.ReleaseCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleaseCollection,
},
CollectionID: defaultCollectionID,
})
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
assert.Nil(t, err)
})
t.Run("Test LoadBalance", func(t *testing.T) {
status, err := queryCoord.LoadBalance(ctx, &querypb.LoadBalanceRequest{
Base: &commonpb.MsgBase{
@ -711,3 +792,516 @@ func TestQueryCoord_GetComponentStates(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
}
func Test_RepeatedLoadSameCollection(t *testing.T) {
refreshParams()
ctx := context.Background()
queryCoord, err := startQueryCoord(ctx)
assert.Nil(t, err)
node, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
waitQueryNodeOnline(queryCoord.cluster, node.queryNodeID)
loadCollectionReq := &querypb.LoadCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadCollection,
},
CollectionID: defaultCollectionID,
Schema: genCollectionSchema(defaultCollectionID, false),
}
//first load defaultCollectionID
status, err := queryCoord.LoadCollection(ctx, loadCollectionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
waitLoadCollectionDone(ctx, queryCoord, defaultCollectionID)
// second load defaultCollectionID
status, err = queryCoord.LoadCollection(ctx, loadCollectionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
node.stop()
queryCoord.Stop()
err = removeAllSession()
assert.Nil(t, err)
}
func Test_LoadCollectionAndLoadPartitions(t *testing.T) {
refreshParams()
ctx := context.Background()
queryCoord, err := startQueryCoord(ctx)
assert.Nil(t, err)
node, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
waitQueryNodeOnline(queryCoord.cluster, node.queryNodeID)
loadCollectionReq := &querypb.LoadCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadCollection,
},
CollectionID: defaultCollectionID,
Schema: genCollectionSchema(defaultCollectionID, false),
}
loadPartitionReq := &querypb.LoadPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadPartitions,
},
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{defaultPartitionID},
Schema: genCollectionSchema(defaultCollectionID, false),
}
//first load defaultCollectionID
status, err := queryCoord.LoadCollection(ctx, loadCollectionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
waitLoadCollectionDone(ctx, queryCoord, defaultCollectionID)
// second load defaultPartitionID
status, err = queryCoord.LoadPartitions(ctx, loadPartitionReq)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
assert.Nil(t, err)
node.stop()
queryCoord.Stop()
err = removeAllSession()
assert.Nil(t, err)
}
func Test_RepeatedLoadSamePartitions(t *testing.T) {
refreshParams()
ctx := context.Background()
queryCoord, err := startQueryCoord(ctx)
assert.Nil(t, err)
node, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
waitQueryNodeOnline(queryCoord.cluster, node.queryNodeID)
loadPartitionReq := &querypb.LoadPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadPartitions,
},
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{defaultPartitionID},
Schema: genCollectionSchema(defaultCollectionID, false),
}
//first load defaultPartitionID
status, err := queryCoord.LoadPartitions(ctx, loadPartitionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
waitLoadPartitionDone(ctx, queryCoord, defaultCollectionID, []UniqueID{defaultPartitionID})
// second load defaultPartitionID
status, err = queryCoord.LoadPartitions(ctx, loadPartitionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
node.stop()
queryCoord.Stop()
err = removeAllSession()
assert.Nil(t, err)
}
func Test_RepeatedLoadDifferentPartitions(t *testing.T) {
refreshParams()
ctx := context.Background()
queryCoord, err := startQueryCoord(ctx)
assert.Nil(t, err)
node, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
waitQueryNodeOnline(queryCoord.cluster, node.queryNodeID)
loadPartitionReq := &querypb.LoadPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadPartitions,
},
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{defaultPartitionID},
Schema: genCollectionSchema(defaultCollectionID, false),
}
//first load defaultPartitionID
status, err := queryCoord.LoadPartitions(ctx, loadPartitionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
waitLoadPartitionDone(ctx, queryCoord, defaultCollectionID, []UniqueID{defaultPartitionID})
// second load defaultPartitionID+1
loadPartitionReq.PartitionIDs = []UniqueID{defaultPartitionID + 1}
status, err = queryCoord.LoadPartitions(ctx, loadPartitionReq)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
assert.Nil(t, err)
node.stop()
queryCoord.Stop()
err = removeAllSession()
assert.Nil(t, err)
}
func Test_LoadPartitionsAndLoadCollection(t *testing.T) {
refreshParams()
ctx := context.Background()
queryCoord, err := startQueryCoord(ctx)
assert.Nil(t, err)
node, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
waitQueryNodeOnline(queryCoord.cluster, node.queryNodeID)
loadCollectionReq := &querypb.LoadCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadCollection,
},
CollectionID: defaultCollectionID,
Schema: genCollectionSchema(defaultCollectionID, false),
}
loadPartitionReq := &querypb.LoadPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadPartitions,
},
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{defaultPartitionID},
Schema: genCollectionSchema(defaultCollectionID, false),
}
//first load defaultPartitionID
status, err := queryCoord.LoadPartitions(ctx, loadPartitionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
waitLoadPartitionDone(ctx, queryCoord, defaultCollectionID, []UniqueID{defaultPartitionID})
// second load defaultCollectionID
status, err = queryCoord.LoadCollection(ctx, loadCollectionReq)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
assert.Nil(t, err)
node.stop()
queryCoord.Stop()
err = removeAllSession()
assert.Nil(t, err)
}
func Test_LoadAndReleaseCollection(t *testing.T) {
refreshParams()
ctx := context.Background()
queryCoord, err := startQueryCoord(ctx)
assert.Nil(t, err)
node, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
waitQueryNodeOnline(queryCoord.cluster, node.queryNodeID)
loadCollectionReq := &querypb.LoadCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadCollection,
},
CollectionID: defaultCollectionID,
Schema: genCollectionSchema(defaultCollectionID, false),
}
releaseCollectionReq := &querypb.ReleaseCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleaseCollection,
},
CollectionID: defaultCollectionID,
}
//first load defaultCollectionID
status, err := queryCoord.LoadCollection(ctx, loadCollectionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
waitLoadCollectionDone(ctx, queryCoord, defaultCollectionID)
// second release defaultCollectionID
status, err = queryCoord.ReleaseCollection(ctx, releaseCollectionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
node.stop()
queryCoord.Stop()
err = removeAllSession()
assert.Nil(t, err)
}
func Test_LoadAndReleasePartitions(t *testing.T) {
refreshParams()
ctx := context.Background()
queryCoord, err := startQueryCoord(ctx)
assert.Nil(t, err)
node, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
waitQueryNodeOnline(queryCoord.cluster, node.queryNodeID)
loadPartitionReq := &querypb.LoadPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadPartitions,
},
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{defaultPartitionID},
Schema: genCollectionSchema(defaultCollectionID, false),
}
releasePartitionReq := &querypb.ReleasePartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleasePartitions,
},
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{defaultPartitionID},
}
//first load defaultPartitionID
status, err := queryCoord.LoadPartitions(ctx, loadPartitionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
waitLoadPartitionDone(ctx, queryCoord, defaultCollectionID, []UniqueID{defaultPartitionID})
// second release defaultPartitionID
status, err = queryCoord.ReleasePartitions(ctx, releasePartitionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
node.stop()
queryCoord.Stop()
err = removeAllSession()
assert.Nil(t, err)
}
func Test_LoadCollectionAndReleasePartitions(t *testing.T) {
refreshParams()
ctx := context.Background()
queryCoord, err := startQueryCoord(ctx)
assert.Nil(t, err)
node, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
waitQueryNodeOnline(queryCoord.cluster, node.queryNodeID)
loadCollectionReq := &querypb.LoadCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadCollection,
},
CollectionID: defaultCollectionID,
Schema: genCollectionSchema(defaultCollectionID, false),
}
releasePartitionReq := &querypb.ReleasePartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleasePartitions,
},
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{defaultPartitionID},
}
//first load defaultCollectionID
status, err := queryCoord.LoadCollection(ctx, loadCollectionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
waitLoadCollectionDone(ctx, queryCoord, defaultCollectionID)
// second release defaultPartitionID
status, err = queryCoord.ReleasePartitions(ctx, releasePartitionReq)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
assert.Nil(t, err)
node.stop()
queryCoord.Stop()
err = removeAllSession()
assert.Nil(t, err)
}
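// Here releasing a single partition fails because the collection was loaded as
// a whole. A sketch of the reverse-direction guard is below;
// checkReleasePartitionsAllowed is hypothetical and mirrors the previous
// sketch: only collections loaded through LoadPartitions may have individual
// partitions released.
func checkReleasePartitionsAllowed(collectionLoaded bool, loadType querypb.LoadType) error {
	if collectionLoaded && loadType != querypb.LoadType_LoadPartition {
		// the whole collection is in memory, so dropping a single partition from
		// it is refused; ReleaseCollection has to be used instead
		return errors.New("collection has been loaded by LoadCollection, use ReleaseCollection instead of ReleasePartitions")
	}
	return nil
}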
func Test_LoadPartitionsAndReleaseCollection(t *testing.T) {
refreshParams()
ctx := context.Background()
queryCoord, err := startQueryCoord(ctx)
assert.Nil(t, err)
node, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
waitQueryNodeOnline(queryCoord.cluster, node.queryNodeID)
loadPartitionReq := &querypb.LoadPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadPartitions,
},
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{defaultPartitionID},
Schema: genCollectionSchema(defaultCollectionID, false),
}
releaseCollectionReq := &querypb.ReleaseCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleaseCollection,
},
CollectionID: defaultCollectionID,
}
//first load defaultPartitionID
status, err := queryCoord.LoadPartitions(ctx, loadPartitionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
waitLoadPartitionDone(ctx, queryCoord, defaultCollectionID, []UniqueID{defaultPartitionID})
// second release defaultCollectionID
status, err = queryCoord.ReleaseCollection(ctx, releaseCollectionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
node.stop()
queryCoord.Stop()
err = removeAllSession()
assert.Nil(t, err)
}
func Test_RepeatedReleaseCollection(t *testing.T) {
refreshParams()
ctx := context.Background()
queryCoord, err := startQueryCoord(ctx)
assert.Nil(t, err)
node, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
waitQueryNodeOnline(queryCoord.cluster, node.queryNodeID)
loadCollectionReq := &querypb.LoadCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadCollection,
},
CollectionID: defaultCollectionID,
Schema: genCollectionSchema(defaultCollectionID, false),
}
releaseCollectionReq := &querypb.ReleaseCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleaseCollection,
},
CollectionID: defaultCollectionID,
}
// load defaultCollectionID
status, err := queryCoord.LoadCollection(ctx, loadCollectionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
waitLoadCollectionDone(ctx, queryCoord, defaultCollectionID)
// first release defaultCollectionID
status, err = queryCoord.ReleaseCollection(ctx, releaseCollectionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
// second release defaultCollectionID
status, err = queryCoord.ReleaseCollection(ctx, releaseCollectionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
node.stop()
queryCoord.Stop()
err = removeAllSession()
assert.Nil(t, err)
}
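// Both release calls above return Success, which suggests ReleaseCollection is
// idempotent: releasing a collection that is no longer (or was never) loaded is
// treated as a no-op rather than an error. A sketch of that behaviour, with
// releaseCollectionIdempotent as a hypothetical wrapper around whatever
// actually tears the collection down:
func releaseCollectionIdempotent(collectionLoaded bool, doRelease func() error) error {
	if !collectionLoaded {
		// nothing is loaded, so there is nothing to release; report success
		return nil
	}
	return doRelease()
}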
func Test_RepeatedReleaseSamePartitions(t *testing.T) {
refreshParams()
ctx := context.Background()
queryCoord, err := startQueryCoord(ctx)
assert.Nil(t, err)
node, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
waitQueryNodeOnline(queryCoord.cluster, node.queryNodeID)
loadPartitionReq := &querypb.LoadPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadPartitions,
},
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{defaultPartitionID},
Schema: genCollectionSchema(defaultCollectionID, false),
}
releasePartitionReq := &querypb.ReleasePartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleasePartitions,
},
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{defaultPartitionID},
}
// load defaultPartitionID
status, err := queryCoord.LoadPartitions(ctx, loadPartitionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
waitLoadPartitionDone(ctx, queryCoord, defaultCollectionID, []UniqueID{defaultPartitionID})
// first release defaultPartitionID
status, err = queryCoord.ReleasePartitions(ctx, releasePartitionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
// second release defaultPartitionID
status, err = queryCoord.ReleasePartitions(ctx, releasePartitionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
node.stop()
queryCoord.Stop()
err = removeAllSession()
assert.Nil(t, err)
}
func Test_RepeatedReleaseDifferentPartitions(t *testing.T) {
refreshParams()
ctx := context.Background()
queryCoord, err := startQueryCoord(ctx)
assert.Nil(t, err)
node, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
waitQueryNodeOnline(queryCoord.cluster, node.queryNodeID)
loadPartitionReq := &querypb.LoadPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadPartitions,
},
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{defaultPartitionID, defaultPartitionID + 1},
Schema: genCollectionSchema(defaultCollectionID, false),
}
releasePartitionReq := &querypb.ReleasePartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleasePartitions,
},
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{defaultPartitionID},
}
// load defaultPartitionID and defaultPartitionID+1
status, err := queryCoord.LoadPartitions(ctx, loadPartitionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
waitLoadPartitionDone(ctx, queryCoord, defaultCollectionID, []UniqueID{defaultPartitionID, defaultPartitionID + 1})
// first release defaultPartitionID
status, err = queryCoord.ReleasePartitions(ctx, releasePartitionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
// second release defaultPartitionID+1
releasePartitionReq.PartitionIDs = []UniqueID{defaultPartitionID + 1}
status, err = queryCoord.ReleasePartitions(ctx, releasePartitionReq)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Nil(t, err)
node.stop()
queryCoord.Stop()
err = removeAllSession()
assert.Nil(t, err)
}
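// Taken together, the tests above pin down the load/release combinations that
// this commit enforces. The table below is an editorial recap of the expected
// results, not code from the commit:
//
//   already loaded by    | incoming request      | expected result
//   ---------------------+-----------------------+-----------------------------
//   LoadPartitions       | same partitions again | Success (no-op)
//   LoadPartitions       | different partitions  | UnexpectedError
//   LoadPartitions       | LoadCollection        | UnexpectedError
//   LoadPartitions       | ReleasePartitions     | Success
//   LoadPartitions       | ReleaseCollection     | Success
//   LoadCollection       | ReleasePartitions     | UnexpectedError
//   LoadCollection       | ReleaseCollection     | Success
//   (nothing loaded)     | repeated release      | Success (no-op)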

View File

@@ -36,8 +36,7 @@ import (
 func getSystemInfoMetrics(
 ctx context.Context,
 req *milvuspb.GetMetricsRequest,
-qc *QueryCoord,
-) (*milvuspb.GetMetricsResponse, error) {
+qc *QueryCoord) (string, error) {
 clusterTopology := metricsinfo.QueryClusterTopology{
 Self: metricsinfo.QueryCoordInfos{
@@ -128,22 +127,8 @@ func getSystemInfoMetrics(
 resp, err := metricsinfo.MarshalTopology(coordTopology)
 if err != nil {
-return &milvuspb.GetMetricsResponse{
-Status: &commonpb.Status{
-ErrorCode: commonpb.ErrorCode_UnexpectedError,
-Reason: err.Error(),
-},
-Response: "",
-ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, Params.QueryCoordID),
-}, nil
+return "", err
 }
-return &milvuspb.GetMetricsResponse{
-Status: &commonpb.Status{
-ErrorCode: commonpb.ErrorCode_Success,
-Reason: "",
-},
-Response: resp,
-ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, Params.QueryCoordID),
-}, nil
+return resp, nil
 }
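// With this change getSystemInfoMetrics only returns the marshalled topology
// string, so building the GetMetricsResponse (and turning an error into an
// UnexpectedError status) presumably moves to the caller. The sketch below
// illustrates the new contract; handleGetMetrics is a hypothetical wrapper for
// illustration, not the actual GetMetrics handler in this commit.
func handleGetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest, qc *QueryCoord) (*milvuspb.GetMetricsResponse, error) {
	metricsInfo, err := getSystemInfoMetrics(ctx, req, qc)
	if err != nil {
		// marshalling the topology failed; surface the error through the status
		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
	return &milvuspb.GetMetricsResponse{
		Status:        &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
		Response:      metricsInfo,
		ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, Params.QueryCoordID),
	}, nil
}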

View File

@@ -583,7 +583,7 @@ func (qn *queryNode) getComponentInfo(ctx context.Context) *internalpb.Component
 func (qn *queryNode) getMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
 if !qn.isOnline() {
-return nil, errQueryNodeIsNotOnService(qn.id)
+return nil, fmt.Errorf("getMetrics: queryNode %d is offline", qn.id)
 }
 return qn.client.GetMetrics(qn.ctx, in)

View File

@@ -728,6 +728,7 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error {
 if !lpt.meta.hasCollection(collectionID) {
 lpt.meta.addCollection(collectionID, lpt.Schema)
 lpt.meta.setLoadType(collectionID, querypb.LoadType_LoadPartition)
+lpt.addCol = true
 }
 for _, id := range partitionIDs {
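// The new addCol flag appears to record whether this particular task created
// the collection entry in meta. One plausible use is rollback: only a task that
// added the collection itself should remove it again on failure. The sketch
// below illustrates that idea; rollbackCollectionIfAdded and the removeCollection
// callback are assumptions for illustration, not the task's actual rollback code.
func rollbackCollectionIfAdded(addCol bool, collectionID UniqueID, removeCollection func(UniqueID) error) error {
	if !addCol {
		// the collection existed in meta before the task ran; leave it untouched
		return nil
	}
	// the task created the collection entry, so undoing the task removes it again
	return removeCollection(collectionID)
}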

View File

@@ -630,10 +630,11 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
 t.core.ExpireMetaCache(ctx, []string{t.Req.CollectionName}, ts)
 //notify query service to release partition
-if err = t.core.CallReleasePartitionService(t.core.ctx, ts, 0, collInfo.ID, []typeutil.UniqueID{partID}); err != nil {
-log.Error("Failed to CallReleaseCollectionService", zap.Error(err))
-return err
-}
+// TODO::xige-16, reOpen when queryCoord support release partitions after load collection
+//if err = t.core.CallReleasePartitionService(t.core.ctx, ts, 0, collInfo.ID, []typeutil.UniqueID{partID}); err != nil {
+// log.Error("Failed to CallReleaseCollectionService", zap.Error(err))
+// return err
+//}
 // Update DDOperation in etcd
 return t.core.setDdMsgSendFlag(true)

View File

@ -2922,6 +2922,7 @@ class TestLoadCollection:
"""
@pytest.mark.tags(CaseLabel.L0)
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118")
def test_load_collection_release_part_partitions(self, connect, collection):
"""
target: test release part partitions after load collection
@ -2942,6 +2943,7 @@ class TestLoadCollection:
assert len(res[0]) == default_top_k
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118")
def test_load_collection_release_all_partitions(self, connect, collection):
"""
target: test release all partitions after load collection

View File

@ -235,6 +235,7 @@ class TestPartitionParams(TestcaseBase):
assert not collection_w.has_partition(partition_name)[0]
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118")
def test_partition_release(self):
"""
target: verify release partition

View File

@ -476,6 +476,7 @@ class TestCollectionSearchInvalid(TestcaseBase):
"into memory" % collection_w.name})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118")
def test_search_release_partition(self):
"""
target: test the scenario which search the released collection
@ -544,6 +545,7 @@ class TestCollectionSearchInvalid(TestcaseBase):
"limit": default_limit})
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118")
def test_search_partition_deleted(self):
"""
target: test search deleted partition
@ -973,6 +975,7 @@ class TestCollectionSearch(TestcaseBase):
"_async": _async})
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118")
def test_search_before_after_delete(self, nq, dim, auto_id, _async):
"""
target: test search function before and after deletion
@ -1020,6 +1023,7 @@ class TestCollectionSearch(TestcaseBase):
"_async": _async})
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118")
def test_search_partition_after_release_one(self, nq, dim, auto_id, _async):
"""
target: test search function before and after release
@ -1067,6 +1071,7 @@ class TestCollectionSearch(TestcaseBase):
"_async": _async})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118")
def test_search_partition_after_release_all(self, nq, dim, auto_id, _async):
"""
target: test search function before and after release
@ -1142,6 +1147,7 @@ class TestCollectionSearch(TestcaseBase):
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.xfail(reason="issue 6997")
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118")
def test_search_partition_after_release_load(self, nb, nq, dim, auto_id, _async):
"""
target: search the pre-released collection after load

View File

@ -825,6 +825,7 @@ class TestUtilityBase(TestcaseBase):
assert res == exp_res
@pytest.mark.tag(CaseLabel.L2)
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118")
def test_loading_progress_with_release_partition(self):
"""
target: test loading progress after release part partitions