2021-11-17 11:41:41 +00:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
2021-04-19 05:47:10 +00:00
// with the License. You may obtain a copy of the License at
//
2021-11-17 11:41:41 +00:00
// http://www.apache.org/licenses/LICENSE-2.0
2021-04-19 05:47:10 +00:00
//
2021-11-17 11:41:41 +00:00
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-04-19 05:47:10 +00:00
2021-06-22 08:44:09 +00:00
package querycoord
2021-04-15 07:15:46 +00:00
import (
"context"
"errors"
2021-11-18 03:31:12 +00:00
"fmt"
2021-08-17 02:06:11 +00:00
2021-11-19 05:57:12 +00:00
"github.com/milvus-io/milvus/internal/common"
2021-04-15 07:15:46 +00:00
"go.uber.org/zap"
2021-04-22 06:45:57 +00:00
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/querypb"
2021-09-15 14:17:49 +00:00
"github.com/milvus-io/milvus/internal/util/metricsinfo"
2021-04-15 07:15:46 +00:00
)
2021-09-29 09:05:58 +00:00
// GetComponentStates return information about whether the coord is healthy
2021-06-22 08:44:09 +00:00
func ( qc * QueryCoord ) GetComponentStates ( ctx context . Context ) ( * internalpb . ComponentStates , error ) {
2021-11-19 05:57:12 +00:00
nodeID := common . NotRegisteredID
if qc . session != nil && qc . session . Registered ( ) {
nodeID = qc . session . ServerID
}
2021-04-15 07:15:46 +00:00
serviceComponentInfo := & internalpb . ComponentInfo {
2021-11-19 05:57:12 +00:00
// NodeID: Params.QueryCoordID, // will race with QueryCoord.Register()
NodeID : nodeID ,
2021-06-22 08:44:09 +00:00
StateCode : qc . stateCode . Load ( ) . ( internalpb . StateCode ) ,
2021-04-15 07:15:46 +00:00
}
2021-06-19 03:45:09 +00:00
//subComponentInfos, err := qs.cluster.GetComponentInfos(ctx)
//if err != nil {
// return &internalpb.ComponentStates{
// Status: &commonpb.Status{
// ErrorCode: commonpb.ErrorCode_UnexpectedError,
// Reason: err.Error(),
// },
// }, err
//}
2021-04-15 07:15:46 +00:00
return & internalpb . ComponentStates {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
} ,
2021-06-19 03:45:09 +00:00
State : serviceComponentInfo ,
//SubcomponentStates: subComponentInfos,
2021-04-15 07:15:46 +00:00
} , nil
}
2021-10-15 01:02:40 +00:00
// GetTimeTickChannel returns the time tick channel
// TimeTickChannel contains many time tick messages, which has been sent by query nodes
2021-06-22 08:44:09 +00:00
func ( qc * QueryCoord ) GetTimeTickChannel ( ctx context . Context ) ( * milvuspb . StringResponse , error ) {
2021-04-15 07:15:46 +00:00
return & milvuspb . StringResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
Reason : "" ,
} ,
Value : Params . TimeTickChannelName ,
} , nil
}
2021-10-20 01:14:37 +00:00
// GetStatisticsChannel return the statistics channel
// Statistics channel contains statistics infos of query nodes, such as segment infos, memory infos
2021-06-22 08:44:09 +00:00
func ( qc * QueryCoord ) GetStatisticsChannel ( ctx context . Context ) ( * milvuspb . StringResponse , error ) {
2021-04-15 07:15:46 +00:00
return & milvuspb . StringResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
Reason : "" ,
} ,
Value : Params . StatsChannelName ,
} , nil
}
2021-09-29 15:26:56 +00:00
// ShowCollections return all the collections that have been loaded
2021-06-22 08:44:09 +00:00
func ( qc * QueryCoord ) ShowCollections ( ctx context . Context , req * querypb . ShowCollectionsRequest ) ( * querypb . ShowCollectionsResponse , error ) {
2021-04-15 07:15:46 +00:00
dbID := req . DbID
2021-06-19 03:45:09 +00:00
log . Debug ( "show collection start" , zap . Int64 ( "dbID" , dbID ) )
2021-06-24 08:00:15 +00:00
status := & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
}
if qc . stateCode . Load ( ) != internalpb . StateCode_Healthy {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
err := errors . New ( "query coordinator is not healthy" )
status . Reason = err . Error ( )
log . Debug ( "show collection end with query coordinator not healthy" )
return & querypb . ShowCollectionsResponse {
Status : status ,
2021-12-01 14:15:37 +00:00
} , nil
2021-06-24 08:00:15 +00:00
}
2021-08-02 14:39:25 +00:00
collectionInfos := qc . meta . showCollections ( )
ID2collectionInfo := make ( map [ UniqueID ] * querypb . CollectionInfo )
inMemoryCollectionIDs := make ( [ ] UniqueID , 0 )
for _ , info := range collectionInfos {
ID2collectionInfo [ info . CollectionID ] = info
inMemoryCollectionIDs = append ( inMemoryCollectionIDs , info . CollectionID )
}
inMemoryPercentages := make ( [ ] int64 , 0 )
if len ( req . CollectionIDs ) == 0 {
for _ , id := range inMemoryCollectionIDs {
inMemoryPercentages = append ( inMemoryPercentages , ID2collectionInfo [ id ] . InMemoryPercentage )
}
log . Debug ( "show collection end" , zap . Int64s ( "collections" , inMemoryCollectionIDs ) , zap . Int64s ( "inMemoryPercentage" , inMemoryPercentages ) )
return & querypb . ShowCollectionsResponse {
Status : status ,
CollectionIDs : inMemoryCollectionIDs ,
InMemoryPercentages : inMemoryPercentages ,
} , nil
}
for _ , id := range req . CollectionIDs {
if _ , ok := ID2collectionInfo [ id ] ; ! ok {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
2021-11-18 03:31:12 +00:00
err := fmt . Errorf ( "collection %d has not been loaded to memory or load failed" , id )
2021-08-02 14:39:25 +00:00
status . Reason = err . Error ( )
return & querypb . ShowCollectionsResponse {
Status : status ,
2021-12-01 14:15:37 +00:00
} , nil
2021-08-02 14:39:25 +00:00
}
inMemoryPercentages = append ( inMemoryPercentages , ID2collectionInfo [ id ] . InMemoryPercentage )
}
log . Debug ( "show collection end" , zap . Int64s ( "collections" , req . CollectionIDs ) , zap . Int64s ( "inMemoryPercentage" , inMemoryPercentages ) )
2021-04-15 07:15:46 +00:00
return & querypb . ShowCollectionsResponse {
2021-08-02 14:39:25 +00:00
Status : status ,
CollectionIDs : req . CollectionIDs ,
InMemoryPercentages : inMemoryPercentages ,
2021-04-15 07:15:46 +00:00
} , nil
}
2021-09-30 03:12:03 +00:00
// LoadCollection loads all the sealed segments of this collection to queryNodes, and assigns watchDmChannelRequest to queryNodes
2021-06-22 08:44:09 +00:00
func ( qc * QueryCoord ) LoadCollection ( ctx context . Context , req * querypb . LoadCollectionRequest ) ( * commonpb . Status , error ) {
2021-04-15 07:15:46 +00:00
collectionID := req . CollectionID
2021-06-19 03:45:09 +00:00
//schema := req.Schema
2021-12-08 01:41:05 +00:00
log . Debug ( "loadCollectionRequest received" , zap . String ( "role" , Params . RoleName ) , zap . Int64 ( "msgID" , req . Base . MsgID ) , zap . Int64 ( "collectionID" , collectionID ) ,
2021-04-15 07:15:46 +00:00
zap . Stringer ( "schema" , req . Schema ) )
status := & commonpb . Status {
2021-06-23 09:44:12 +00:00
ErrorCode : commonpb . ErrorCode_Success ,
2021-04-15 07:15:46 +00:00
}
2021-06-24 08:00:15 +00:00
if qc . stateCode . Load ( ) != internalpb . StateCode_Healthy {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
err := errors . New ( "query coordinator is not healthy" )
status . Reason = err . Error ( )
log . Debug ( "load collection end with query coordinator not healthy" )
2021-12-01 14:15:37 +00:00
return status , nil
2021-06-24 08:00:15 +00:00
}
2021-04-15 07:15:46 +00:00
2021-10-11 01:54:37 +00:00
baseTask := newBaseTask ( qc . loopCtx , querypb . TriggerCondition_grpcRequest )
2021-10-18 13:34:47 +00:00
loadCollectionTask := & loadCollectionTask {
baseTask : baseTask ,
2021-04-15 07:15:46 +00:00
LoadCollectionRequest : req ,
2021-06-22 08:44:09 +00:00
rootCoord : qc . rootCoordClient ,
dataCoord : qc . dataCoordClient ,
2021-11-17 01:47:12 +00:00
indexCoord : qc . indexCoordClient ,
2021-06-22 08:44:09 +00:00
cluster : qc . cluster ,
meta : qc . meta ,
2021-04-15 07:15:46 +00:00
}
2021-10-11 01:54:37 +00:00
err := qc . scheduler . Enqueue ( loadCollectionTask )
if err != nil {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
status . Reason = err . Error ( )
2021-12-01 14:15:37 +00:00
return status , nil
2021-10-11 01:54:37 +00:00
}
2021-04-15 07:15:46 +00:00
2021-10-14 12:18:33 +00:00
err = loadCollectionTask . waitToFinish ( )
2021-06-21 11:20:31 +00:00
if err != nil {
2021-06-23 09:44:12 +00:00
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
2021-06-21 11:20:31 +00:00
status . Reason = err . Error ( )
2021-12-01 14:15:37 +00:00
return status , nil
2021-06-21 11:20:31 +00:00
}
2021-04-15 07:15:46 +00:00
2021-12-08 01:41:05 +00:00
log . Debug ( "loadCollectionRequest completed" , zap . String ( "role" , Params . RoleName ) , zap . Int64 ( "msgID" , req . Base . MsgID ) , zap . Int64 ( "collectionID" , collectionID ) , zap . Any ( "status" , status ) )
2021-04-15 07:15:46 +00:00
return status , nil
}
2021-10-01 02:03:52 +00:00
// ReleaseCollection clears all data related to this collecion on the querynode
2021-06-22 08:44:09 +00:00
func ( qc * QueryCoord ) ReleaseCollection ( ctx context . Context , req * querypb . ReleaseCollectionRequest ) ( * commonpb . Status , error ) {
2021-06-15 04:41:40 +00:00
//dbID := req.DbID
2021-04-15 07:15:46 +00:00
collectionID := req . CollectionID
2021-12-07 13:33:04 +00:00
log . Debug ( "releaseCollectionRequest received" , zap . String ( "role" , Params . RoleName ) , zap . Int64 ( "msgID" , req . Base . MsgID ) , zap . Int64 ( "collectionID" , collectionID ) )
2021-04-15 07:15:46 +00:00
status := & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
}
2021-06-24 08:00:15 +00:00
if qc . stateCode . Load ( ) != internalpb . StateCode_Healthy {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
err := errors . New ( "query coordinator is not healthy" )
status . Reason = err . Error ( )
log . Debug ( "release collection end with query coordinator not healthy" )
2021-12-01 14:15:37 +00:00
return status , nil
2021-06-24 08:00:15 +00:00
}
2021-06-22 08:44:09 +00:00
hasCollection := qc . meta . hasCollection ( collectionID )
2021-06-15 04:41:40 +00:00
if ! hasCollection {
2021-09-15 14:17:49 +00:00
log . Warn ( "release collection end, query coordinator don't have the log of" , zap . Int64 ( "collectionID" , collectionID ) )
2021-04-15 07:15:46 +00:00
return status , nil
}
2021-10-11 01:54:37 +00:00
baseTask := newBaseTask ( qc . loopCtx , querypb . TriggerCondition_grpcRequest )
2021-10-18 13:34:47 +00:00
releaseCollectionTask := & releaseCollectionTask {
baseTask : baseTask ,
2021-04-15 07:15:46 +00:00
ReleaseCollectionRequest : req ,
2021-06-22 08:44:09 +00:00
cluster : qc . cluster ,
2021-07-02 02:40:13 +00:00
meta : qc . meta ,
rootCoord : qc . rootCoordClient ,
2021-04-15 07:15:46 +00:00
}
2021-10-11 01:54:37 +00:00
err := qc . scheduler . Enqueue ( releaseCollectionTask )
if err != nil {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
status . Reason = err . Error ( )
2021-12-01 14:15:37 +00:00
return status , nil
2021-10-11 01:54:37 +00:00
}
2021-06-15 04:41:40 +00:00
2021-10-14 12:18:33 +00:00
err = releaseCollectionTask . waitToFinish ( )
2021-04-15 07:15:46 +00:00
if err != nil {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
status . Reason = err . Error ( )
2021-12-01 14:15:37 +00:00
return status , nil
2021-04-15 07:15:46 +00:00
}
2021-12-07 13:33:04 +00:00
log . Debug ( "releaseCollectionRequest completed" , zap . String ( "role" , Params . RoleName ) , zap . Int64 ( "msgID" , req . Base . MsgID ) , zap . Int64 ( "collectionID" , collectionID ) )
2021-08-02 14:39:25 +00:00
//qc.MetaReplica.printMeta()
2021-07-14 06:15:55 +00:00
//qc.cluster.printMeta()
2021-04-15 07:15:46 +00:00
return status , nil
}
2021-09-29 15:30:45 +00:00
// ShowPartitions return all the partitions that have been loaded
2021-06-22 08:44:09 +00:00
func ( qc * QueryCoord ) ShowPartitions ( ctx context . Context , req * querypb . ShowPartitionsRequest ) ( * querypb . ShowPartitionsResponse , error ) {
2021-04-15 07:15:46 +00:00
collectionID := req . CollectionID
2021-08-02 14:39:25 +00:00
log . Debug ( "show partitions start, " , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64s ( "partitionIDs" , req . PartitionIDs ) )
2021-06-24 08:00:15 +00:00
status := & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
}
if qc . stateCode . Load ( ) != internalpb . StateCode_Healthy {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
err := errors . New ( "query coordinator is not healthy" )
status . Reason = err . Error ( )
log . Debug ( "show partition end with query coordinator not healthy" )
return & querypb . ShowPartitionsResponse {
Status : status ,
2021-12-01 14:15:37 +00:00
} , nil
2021-06-24 08:00:15 +00:00
}
2021-08-02 14:39:25 +00:00
partitionStates , err := qc . meta . showPartitions ( collectionID )
2021-04-15 07:15:46 +00:00
if err != nil {
2021-06-24 08:00:15 +00:00
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
status . Reason = err . Error ( )
2021-04-15 07:15:46 +00:00
return & querypb . ShowPartitionsResponse {
2021-06-24 08:00:15 +00:00
Status : status ,
2021-12-01 14:15:37 +00:00
} , nil
2021-04-15 07:15:46 +00:00
}
2021-08-02 14:39:25 +00:00
ID2PartitionState := make ( map [ UniqueID ] * querypb . PartitionStates )
inMemoryPartitionIDs := make ( [ ] UniqueID , 0 )
for _ , state := range partitionStates {
ID2PartitionState [ state . PartitionID ] = state
inMemoryPartitionIDs = append ( inMemoryPartitionIDs , state . PartitionID )
}
inMemoryPercentages := make ( [ ] int64 , 0 )
if len ( req . PartitionIDs ) == 0 {
for _ , id := range inMemoryPartitionIDs {
inMemoryPercentages = append ( inMemoryPercentages , ID2PartitionState [ id ] . InMemoryPercentage )
}
log . Debug ( "show partitions end" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64s ( "partitionIDs" , inMemoryPartitionIDs ) , zap . Int64s ( "inMemoryPercentage" , inMemoryPercentages ) )
return & querypb . ShowPartitionsResponse {
Status : status ,
PartitionIDs : inMemoryPartitionIDs ,
InMemoryPercentages : inMemoryPercentages ,
} , nil
}
for _ , id := range req . PartitionIDs {
if _ , ok := ID2PartitionState [ id ] ; ! ok {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
2021-08-03 14:03:25 +00:00
err := errors . New ( "partition has not been loaded to memory or load failed" )
2021-08-02 14:39:25 +00:00
status . Reason = err . Error ( )
return & querypb . ShowPartitionsResponse {
Status : status ,
2021-12-01 14:15:37 +00:00
} , nil
2021-08-02 14:39:25 +00:00
}
inMemoryPercentages = append ( inMemoryPercentages , ID2PartitionState [ id ] . InMemoryPercentage )
}
2021-06-15 04:41:40 +00:00
2021-08-02 14:39:25 +00:00
log . Debug ( "show partitions end" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64s ( "partitionIDs" , req . PartitionIDs ) , zap . Int64s ( "inMemoryPercentage" , inMemoryPercentages ) )
2021-06-19 03:45:09 +00:00
2021-04-15 07:15:46 +00:00
return & querypb . ShowPartitionsResponse {
2021-08-02 14:39:25 +00:00
Status : status ,
PartitionIDs : req . PartitionIDs ,
InMemoryPercentages : inMemoryPercentages ,
2021-04-15 07:15:46 +00:00
} , nil
}
2021-10-15 01:04:32 +00:00
// LoadPartitions loads all the sealed segments of this partition to queryNodes, and assigns watchDmChannelRequest to queryNodes
2021-06-22 08:44:09 +00:00
func ( qc * QueryCoord ) LoadPartitions ( ctx context . Context , req * querypb . LoadPartitionsRequest ) ( * commonpb . Status , error ) {
2021-04-15 07:15:46 +00:00
collectionID := req . CollectionID
partitionIDs := req . PartitionIDs
2021-12-09 01:25:16 +00:00
log . Debug ( "loadPartitionRequest received" , zap . String ( "role" , Params . RoleName ) , zap . Int64 ( "msgID" , req . Base . MsgID ) , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64s ( "partitionIDs" , partitionIDs ) )
2021-04-15 07:15:46 +00:00
status := & commonpb . Status {
2021-06-23 09:44:12 +00:00
ErrorCode : commonpb . ErrorCode_Success ,
2021-06-19 03:45:09 +00:00
}
2021-06-24 08:00:15 +00:00
if qc . stateCode . Load ( ) != internalpb . StateCode_Healthy {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
err := errors . New ( "query coordinator is not healthy" )
status . Reason = err . Error ( )
log . Debug ( "load partition end with query coordinator not healthy" )
2021-12-01 14:15:37 +00:00
return status , nil
2021-06-24 08:00:15 +00:00
}
2021-04-15 07:15:46 +00:00
if len ( partitionIDs ) == 0 {
2021-06-23 09:44:12 +00:00
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
2021-04-15 07:15:46 +00:00
err := errors . New ( "partitionIDs are empty" )
status . Reason = err . Error ( )
2021-12-09 01:25:16 +00:00
log . Debug ( "loadPartitionRequest completed" , zap . String ( "role" , Params . RoleName ) , zap . Int64 ( "msgID" , req . Base . MsgID ) , zap . Int64 ( "collectionID" , req . CollectionID ) , zap . Int64s ( "partitionIDs" , partitionIDs ) )
2021-12-01 14:15:37 +00:00
return status , nil
2021-04-15 07:15:46 +00:00
}
2021-06-23 09:44:12 +00:00
hasCollection := qc . meta . hasCollection ( collectionID )
if hasCollection {
partitionIDsToLoad := make ( [ ] UniqueID , 0 )
2021-08-02 14:39:25 +00:00
loadType , _ := qc . meta . getLoadType ( collectionID )
if loadType == querypb . LoadType_loadCollection {
2021-06-23 09:44:12 +00:00
for _ , partitionID := range partitionIDs {
hasReleasePartition := qc . meta . hasReleasePartition ( collectionID , partitionID )
if hasReleasePartition {
partitionIDsToLoad = append ( partitionIDsToLoad , partitionID )
}
}
} else {
for _ , partitionID := range partitionIDs {
hasPartition := qc . meta . hasPartition ( collectionID , partitionID )
if ! hasPartition {
partitionIDsToLoad = append ( partitionIDsToLoad , partitionID )
}
}
2021-06-19 03:45:09 +00:00
}
2021-06-23 09:44:12 +00:00
if len ( partitionIDsToLoad ) == 0 {
2021-12-09 01:25:16 +00:00
log . Debug ( "loadPartitionRequest completed" , zap . String ( "role" , Params . RoleName ) , zap . Int64 ( "msgID" , req . Base . MsgID ) , zap . Int64 ( "collectionID" , req . CollectionID ) , zap . Int64s ( "partitionIDs" , partitionIDs ) )
2021-06-23 09:44:12 +00:00
return status , nil
2021-06-21 11:20:31 +00:00
}
2021-06-23 09:44:12 +00:00
req . PartitionIDs = partitionIDsToLoad
}
2021-10-11 01:54:37 +00:00
baseTask := newBaseTask ( qc . loopCtx , querypb . TriggerCondition_grpcRequest )
2021-10-18 13:34:47 +00:00
loadPartitionTask := & loadPartitionTask {
baseTask : baseTask ,
2021-06-23 09:44:12 +00:00
LoadPartitionsRequest : req ,
2021-11-17 01:47:12 +00:00
rootCoord : qc . rootCoordClient ,
2021-06-23 09:44:12 +00:00
dataCoord : qc . dataCoordClient ,
2021-11-17 01:47:12 +00:00
indexCoord : qc . indexCoordClient ,
2021-06-23 09:44:12 +00:00
cluster : qc . cluster ,
meta : qc . meta ,
}
2021-10-11 01:54:37 +00:00
err := qc . scheduler . Enqueue ( loadPartitionTask )
if err != nil {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
status . Reason = err . Error ( )
2021-12-01 14:15:37 +00:00
return status , nil
2021-10-11 01:54:37 +00:00
}
2021-06-23 09:44:12 +00:00
2021-10-14 12:18:33 +00:00
err = loadPartitionTask . waitToFinish ( )
2021-06-23 09:44:12 +00:00
if err != nil {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
status . Reason = err . Error ( )
2021-12-09 01:25:16 +00:00
log . Debug ( "loadPartitionRequest completed" , zap . String ( "role" , Params . RoleName ) , zap . Int64 ( "msgID" , req . Base . MsgID ) , zap . Int64 ( "collectionID" , req . CollectionID ) , zap . Int64s ( "partitionIDs" , partitionIDs ) )
2021-12-01 14:15:37 +00:00
return status , nil
2021-06-19 03:45:09 +00:00
}
2021-12-09 01:25:16 +00:00
log . Debug ( "loadPartitionRequest completed" , zap . String ( "role" , Params . RoleName ) , zap . Int64 ( "msgID" , req . Base . MsgID ) , zap . Int64 ( "collectionID" , req . CollectionID ) , zap . Int64s ( "partitionIDs" , partitionIDs ) )
2021-06-15 04:41:40 +00:00
return status , nil
2021-04-15 07:15:46 +00:00
}
2021-10-15 12:21:12 +00:00
// ReleasePartitions clears all data related to this partition on the querynode
2021-06-22 08:44:09 +00:00
func ( qc * QueryCoord ) ReleasePartitions ( ctx context . Context , req * querypb . ReleasePartitionsRequest ) ( * commonpb . Status , error ) {
2021-06-15 04:41:40 +00:00
//dbID := req.DbID
2021-04-15 07:15:46 +00:00
collectionID := req . CollectionID
partitionIDs := req . PartitionIDs
2021-12-09 01:27:06 +00:00
log . Debug ( "releasePartitionRequest received" , zap . String ( "role" , Params . RoleName ) , zap . Int64 ( "msgID" , req . Base . MsgID ) , zap . Int64 ( "collectionID" , req . CollectionID ) , zap . Int64s ( "partitionIDs" , partitionIDs ) )
2021-04-15 07:15:46 +00:00
status := & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
}
2021-06-24 08:00:15 +00:00
if qc . stateCode . Load ( ) != internalpb . StateCode_Healthy {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
err := errors . New ( "query coordinator is not healthy" )
status . Reason = err . Error ( )
log . Debug ( "release partition end with query coordinator not healthy" )
2021-12-01 14:15:37 +00:00
return status , nil
2021-06-24 08:00:15 +00:00
}
2021-06-23 09:44:12 +00:00
hasCollection := qc . meta . hasCollection ( collectionID )
if ! hasCollection {
2021-12-09 01:27:06 +00:00
log . Warn ( "release partitions end, query coordinator don't have the log of" , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64s ( "partitionIDs" , partitionIDs ) )
2021-06-23 09:44:12 +00:00
return status , nil
2021-04-15 07:15:46 +00:00
}
2021-06-23 09:44:12 +00:00
if len ( partitionIDs ) == 0 {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
err := errors . New ( "partitionIDs are empty" )
status . Reason = err . Error ( )
2021-12-09 01:27:06 +00:00
log . Debug ( "releasePartitionsRequest completed" , zap . String ( "role" , Params . RoleName ) , zap . Int64 ( "msgID" , req . Base . MsgID ) , zap . Int64 ( "collectionID" , req . CollectionID ) , zap . Int64s ( "partitionIDs" , partitionIDs ) )
2021-12-01 14:15:37 +00:00
return status , nil
2021-06-23 09:44:12 +00:00
}
2021-04-15 07:15:46 +00:00
2021-09-10 11:14:01 +00:00
toReleasedPartitions := make ( [ ] UniqueID , 0 )
for _ , id := range partitionIDs {
hasPartition := qc . meta . hasPartition ( collectionID , id )
if hasPartition {
toReleasedPartitions = append ( toReleasedPartitions , id )
}
}
if len ( toReleasedPartitions ) == 0 {
2021-12-09 01:27:06 +00:00
log . Warn ( "release partitions end, query coordinator don't have the log of" , zap . Int64 ( "collectionID" , req . CollectionID ) , zap . Int64s ( "partitionIDs" , partitionIDs ) )
2021-09-10 11:14:01 +00:00
return status , nil
}
req . PartitionIDs = toReleasedPartitions
2021-10-11 01:54:37 +00:00
baseTask := newBaseTask ( qc . loopCtx , querypb . TriggerCondition_grpcRequest )
2021-10-18 13:34:47 +00:00
releasePartitionTask := & releasePartitionTask {
baseTask : baseTask ,
2021-06-23 09:44:12 +00:00
ReleasePartitionsRequest : req ,
cluster : qc . cluster ,
}
2021-10-11 01:54:37 +00:00
err := qc . scheduler . Enqueue ( releasePartitionTask )
if err != nil {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
status . Reason = err . Error ( )
2021-12-01 14:15:37 +00:00
return status , nil
2021-10-11 01:54:37 +00:00
}
2021-06-23 09:44:12 +00:00
2021-10-14 12:18:33 +00:00
err = releasePartitionTask . waitToFinish ( )
2021-06-23 09:44:12 +00:00
if err != nil {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
status . Reason = err . Error ( )
2021-12-01 14:15:37 +00:00
return status , nil
2021-04-15 07:15:46 +00:00
}
2021-12-09 01:27:06 +00:00
log . Debug ( "releasePartitionRequest completed" , zap . String ( "role" , Params . RoleName ) , zap . Int64 ( "msgID" , req . Base . MsgID ) , zap . Int64 ( "collectionID" , collectionID ) , zap . Int64s ( "partitionIDs" , partitionIDs ) )
2021-08-02 14:39:25 +00:00
//qc.MetaReplica.printMeta()
2021-07-14 06:15:55 +00:00
//qc.cluster.printMeta()
2021-04-15 07:15:46 +00:00
return status , nil
}
2021-10-01 04:11:47 +00:00
// CreateQueryChannel assigns unique querychannel and resultchannel to the specified collecion
2021-06-22 08:44:09 +00:00
func ( qc * QueryCoord ) CreateQueryChannel ( ctx context . Context , req * querypb . CreateQueryChannelRequest ) ( * querypb . CreateQueryChannelResponse , error ) {
2021-06-24 08:00:15 +00:00
status := & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
}
if qc . stateCode . Load ( ) != internalpb . StateCode_Healthy {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
err := errors . New ( "query coordinator is not healthy" )
status . Reason = err . Error ( )
log . Debug ( "createQueryChannel end with query coordinator not healthy" )
return & querypb . CreateQueryChannelResponse {
Status : status ,
2021-12-01 14:15:37 +00:00
} , nil
2021-06-24 08:00:15 +00:00
}
2021-06-15 04:41:40 +00:00
collectionID := req . CollectionID
2021-10-22 11:07:15 +00:00
info , err := qc . meta . getQueryChannelInfoByID ( collectionID )
2021-09-29 01:56:04 +00:00
if err != nil {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
status . Reason = err . Error ( )
log . Debug ( "createQueryChannel end with error" )
return & querypb . CreateQueryChannelResponse {
Status : status ,
2021-12-01 14:15:37 +00:00
} , nil
2021-09-29 01:56:04 +00:00
}
2021-06-15 04:41:40 +00:00
return & querypb . CreateQueryChannelResponse {
2021-06-24 08:00:15 +00:00
Status : status ,
2021-10-22 11:07:15 +00:00
RequestChannel : info . QueryChannelID ,
ResultChannel : info . QueryResultChannelID ,
2021-06-15 04:41:40 +00:00
} , nil
}
2021-10-01 04:19:41 +00:00
// GetPartitionStates returns state of the partition, including notExist, notPresent, onDisk, partitionInMemory, inMemory, partitionInGPU, InGPU
2021-06-22 08:44:09 +00:00
func ( qc * QueryCoord ) GetPartitionStates ( ctx context . Context , req * querypb . GetPartitionStatesRequest ) ( * querypb . GetPartitionStatesResponse , error ) {
2021-06-24 08:00:15 +00:00
status := & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
}
if qc . stateCode . Load ( ) != internalpb . StateCode_Healthy {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
err := errors . New ( "query coordinator is not healthy" )
status . Reason = err . Error ( )
log . Debug ( "getPartitionStates end with query coordinator not healthy" )
return & querypb . GetPartitionStatesResponse {
Status : status ,
2021-12-01 14:15:37 +00:00
} , nil
2021-06-24 08:00:15 +00:00
}
2021-06-15 04:41:40 +00:00
partitionIDs := req . PartitionIDs
partitionStates := make ( [ ] * querypb . PartitionStates , 0 )
for _ , partitionID := range partitionIDs {
2021-08-02 14:39:25 +00:00
res , err := qc . meta . getPartitionStatesByID ( req . CollectionID , partitionID )
2021-04-15 07:15:46 +00:00
if err != nil {
2021-06-24 08:00:15 +00:00
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
status . Reason = err . Error ( )
2021-06-15 04:41:40 +00:00
return & querypb . GetPartitionStatesResponse {
2021-06-24 08:00:15 +00:00
Status : status ,
2021-12-01 14:15:37 +00:00
} , nil
2021-04-15 07:15:46 +00:00
}
2021-06-15 04:41:40 +00:00
partitionState := & querypb . PartitionStates {
PartitionID : partitionID ,
2021-08-02 14:39:25 +00:00
State : res . State ,
2021-06-15 04:41:40 +00:00
}
partitionStates = append ( partitionStates , partitionState )
2021-04-15 07:15:46 +00:00
}
2021-06-15 04:41:40 +00:00
return & querypb . GetPartitionStatesResponse {
2021-06-24 08:00:15 +00:00
Status : status ,
2021-06-15 04:41:40 +00:00
PartitionDescriptions : partitionStates ,
2021-04-15 07:15:46 +00:00
} , nil
}
2021-10-02 03:15:55 +00:00
// GetSegmentInfo returns information of all the segments on queryNodes, and the information includes memSize, numRow, indexName, indexID ...
2021-06-22 08:44:09 +00:00
func ( qc * QueryCoord ) GetSegmentInfo ( ctx context . Context , req * querypb . GetSegmentInfoRequest ) ( * querypb . GetSegmentInfoResponse , error ) {
2021-06-24 08:00:15 +00:00
status := & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
}
if qc . stateCode . Load ( ) != internalpb . StateCode_Healthy {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
err := errors . New ( "query coordinator is not healthy" )
status . Reason = err . Error ( )
log . Debug ( "getSegmentInfo end with query coordinator not healthy" )
return & querypb . GetSegmentInfoResponse {
Status : status ,
2021-12-01 14:15:37 +00:00
} , nil
2021-06-24 08:00:15 +00:00
}
2021-06-15 04:41:40 +00:00
totalMemSize := int64 ( 0 )
totalNumRows := int64 ( 0 )
2021-08-02 14:39:25 +00:00
//TODO::get segment infos from MetaReplica
2021-06-15 04:41:40 +00:00
//segmentIDs := req.SegmentIDs
2021-08-02 14:39:25 +00:00
//segmentInfos, err := qs.MetaReplica.getSegmentInfos(segmentIDs)
2021-06-22 08:44:09 +00:00
segmentInfos , err := qc . cluster . getSegmentInfo ( ctx , req )
2021-04-15 07:15:46 +00:00
if err != nil {
2021-06-24 08:00:15 +00:00
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
status . Reason = err . Error ( )
2021-06-15 04:41:40 +00:00
return & querypb . GetSegmentInfoResponse {
2021-06-24 08:00:15 +00:00
Status : status ,
2021-12-01 14:15:37 +00:00
} , nil
2021-04-15 07:15:46 +00:00
}
2021-06-15 04:41:40 +00:00
for _ , info := range segmentInfos {
totalNumRows += info . NumRows
totalMemSize += info . MemSize
2021-04-15 07:15:46 +00:00
}
2021-06-15 04:41:40 +00:00
log . Debug ( "getSegmentInfo" , zap . Int64 ( "num rows" , totalNumRows ) , zap . Int64 ( "memory size" , totalMemSize ) )
2021-04-15 07:15:46 +00:00
return & querypb . GetSegmentInfoResponse {
2021-06-24 08:00:15 +00:00
Status : status ,
Infos : segmentInfos ,
2021-04-15 07:15:46 +00:00
} , nil
}
2021-08-17 02:06:11 +00:00
2021-11-06 08:54:59 +00:00
// LoadBalance would do a load balancing operation between query nodes
func ( qc * QueryCoord ) LoadBalance ( ctx context . Context , req * querypb . LoadBalanceRequest ) ( * commonpb . Status , error ) {
log . Debug ( "LoadBalanceRequest received" ,
zap . String ( "role" , Params . RoleName ) ,
zap . Int64 ( "msgID" , req . Base . MsgID ) ,
zap . Any ( "req" , req ) ,
)
status := & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
}
if qc . stateCode . Load ( ) != internalpb . StateCode_Healthy {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
err := errors . New ( "query coordinator is not healthy" )
status . Reason = err . Error ( )
log . Debug ( "LoadBalance failed" , zap . Error ( err ) )
return status , nil
}
2021-11-08 03:15:04 +00:00
baseTask := newBaseTask ( qc . loopCtx , querypb . TriggerCondition_loadBalance )
2021-11-06 08:54:59 +00:00
loadBalanceTask := & loadBalanceTask {
baseTask : baseTask ,
LoadBalanceRequest : req ,
rootCoord : qc . rootCoordClient ,
dataCoord : qc . dataCoordClient ,
2021-11-17 01:47:12 +00:00
indexCoord : qc . indexCoordClient ,
2021-11-06 08:54:59 +00:00
cluster : qc . cluster ,
meta : qc . meta ,
}
err := qc . scheduler . Enqueue ( loadBalanceTask )
if err != nil {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
status . Reason = err . Error ( )
return status , nil
}
err = loadBalanceTask . waitToFinish ( )
if err != nil {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
status . Reason = err . Error ( )
2021-12-01 14:15:37 +00:00
return status , nil
2021-11-06 08:54:59 +00:00
}
log . Debug ( "LoadBalanceRequest completed" ,
zap . String ( "role" , Params . RoleName ) ,
zap . Int64 ( "msgID" , req . Base . MsgID ) ,
zap . Any ( "req" , req ) ,
)
return status , nil
}
2021-08-17 02:06:11 +00:00
// isHealthy reports whether the state code currently held in qc.stateCode is
// internalpb.StateCode_Healthy.
//
// The comma-ok assertion makes the check return false, rather than panic, when
// no internalpb.StateCode has been stored yet. This matches the other handlers
// in this file, which compare the loaded value directly and therefore also
// treat an unset state as "not healthy".
func (qc *QueryCoord) isHealthy() bool {
	code, ok := qc.stateCode.Load().(internalpb.StateCode)
	return ok && code == internalpb.StateCode_Healthy
}
2021-10-02 01:42:23 +00:00
// GetMetrics returns all the queryCoord's metrics
2021-08-17 02:06:11 +00:00
func ( qc * QueryCoord ) GetMetrics ( ctx context . Context , req * milvuspb . GetMetricsRequest ) ( * milvuspb . GetMetricsResponse , error ) {
log . Debug ( "QueryCoord.GetMetrics" ,
zap . Int64 ( "node_id" , Params . QueryCoordID ) ,
zap . String ( "req" , req . Request ) )
if ! qc . isHealthy ( ) {
log . Warn ( "QueryCoord.GetMetrics failed" ,
zap . Int64 ( "node_id" , Params . QueryCoordID ) ,
zap . String ( "req" , req . Request ) ,
zap . Error ( errQueryCoordIsUnhealthy ( Params . QueryCoordID ) ) )
return & milvuspb . GetMetricsResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
Reason : msgQueryCoordIsUnhealthy ( Params . QueryCoordID ) ,
} ,
Response : "" ,
} , nil
}
metricType , err := metricsinfo . ParseMetricType ( req . Request )
if err != nil {
log . Warn ( "QueryCoord.GetMetrics failed to parse metric type" ,
zap . Int64 ( "node_id" , Params . QueryCoordID ) ,
zap . String ( "req" , req . Request ) ,
zap . Error ( err ) )
return & milvuspb . GetMetricsResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
Reason : err . Error ( ) ,
} ,
Response : "" ,
2021-12-01 14:15:37 +00:00
} , nil
2021-08-17 02:06:11 +00:00
}
log . Debug ( "QueryCoord.GetMetrics" ,
zap . String ( "metric_type" , metricType ) )
if metricType == metricsinfo . SystemInfoMetrics {
2021-09-03 09:15:26 +00:00
ret , err := qc . metricsCacheManager . GetSystemInfoMetrics ( )
if err == nil && ret != nil {
return ret , nil
}
log . Debug ( "failed to get system info metrics from cache, recompute instead" ,
zap . Error ( err ) )
2021-08-17 02:06:11 +00:00
metrics , err := getSystemInfoMetrics ( ctx , req , qc )
log . Debug ( "QueryCoord.GetMetrics" ,
zap . Int64 ( "node_id" , Params . QueryCoordID ) ,
zap . String ( "req" , req . Request ) ,
zap . String ( "metric_type" , metricType ) ,
zap . Any ( "metrics" , metrics ) , // TODO(dragondriver): necessary? may be very large
zap . Error ( err ) )
2021-09-03 09:15:26 +00:00
qc . metricsCacheManager . UpdateSystemInfoMetrics ( metrics )
2021-12-01 14:15:37 +00:00
return metrics , nil
2021-08-17 02:06:11 +00:00
}
2021-09-10 11:14:01 +00:00
err = errors . New ( metricsinfo . MsgUnimplementedMetric )
log . Debug ( "QueryCoord.GetMetrics failed" ,
2021-08-17 02:06:11 +00:00
zap . Int64 ( "node_id" , Params . QueryCoordID ) ,
zap . String ( "req" , req . Request ) ,
2021-09-10 11:14:01 +00:00
zap . String ( "metric_type" , metricType ) ,
zap . Error ( err ) )
2021-08-17 02:06:11 +00:00
return & milvuspb . GetMetricsResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-09-10 11:14:01 +00:00
Reason : err . Error ( ) ,
2021-08-17 02:06:11 +00:00
} ,
Response : "" ,
2021-12-01 14:15:37 +00:00
} , nil
2021-08-17 02:06:11 +00:00
}