2022-07-18 01:58:28 +00:00
package proxy
import (
"context"
"errors"
"fmt"
"strconv"
2022-10-16 12:49:27 +00:00
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
2022-07-18 01:58:28 +00:00
"github.com/milvus-io/milvus/internal/proto/datapb"
2022-10-16 12:49:27 +00:00
"github.com/milvus-io/milvus-proto/go-api/commonpb"
2022-07-18 01:58:28 +00:00
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types"
2022-10-19 02:01:26 +00:00
"github.com/milvus-io/milvus/internal/util/commonpbutil"
2022-07-18 01:58:28 +00:00
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"go.uber.org/zap"
)
2022-10-08 07:38:58 +00:00
const (
GetCollectionStatisticsTaskName = "GetCollectionStatisticsTask"
GetPartitionStatisticsTaskName = "GetPartitionStatisticsTask"
)
2022-07-18 01:58:28 +00:00
type getStatisticsTask struct {
request * milvuspb . GetStatisticsRequest
result * milvuspb . GetStatisticsResponse
Condition
collectionName string
partitionNames [ ] string
// partition ids that are loaded into query node, require get statistics from QueryNode
loadedPartitionIDs [ ] UniqueID
// partition ids that are not loaded into query node, require get statistics from DataCoord
unloadedPartitionIDs [ ] UniqueID
ctx context . Context
dc types . DataCoord
tr * timerecord . TimeRecorder
toReduceResults [ ] * internalpb . GetStatisticsResponse
fromDataCoord bool
fromQueryNode bool
// if query from shard
* internalpb . GetStatisticsRequest
qc types . QueryCoord
resultBuf chan * internalpb . GetStatisticsResponse
statisticShardPolicy pickShardPolicy
shardMgr * shardClientMgr
}
func ( g * getStatisticsTask ) TraceCtx ( ) context . Context {
return g . ctx
}
func ( g * getStatisticsTask ) ID ( ) UniqueID {
return g . Base . MsgID
}
func ( g * getStatisticsTask ) SetID ( uid UniqueID ) {
g . Base . MsgID = uid
}
func ( g * getStatisticsTask ) Name ( ) string {
return GetPartitionStatisticsTaskName
}
func ( g * getStatisticsTask ) Type ( ) commonpb . MsgType {
return g . Base . MsgType
}
func ( g * getStatisticsTask ) BeginTs ( ) Timestamp {
return g . Base . Timestamp
}
func ( g * getStatisticsTask ) EndTs ( ) Timestamp {
return g . Base . Timestamp
}
func ( g * getStatisticsTask ) SetTs ( ts Timestamp ) {
g . Base . Timestamp = ts
}
func ( g * getStatisticsTask ) OnEnqueue ( ) error {
g . GetStatisticsRequest = & internalpb . GetStatisticsRequest {
2022-10-19 02:01:26 +00:00
Base : commonpbutil . NewMsgBase ( ) ,
2022-07-18 01:58:28 +00:00
}
return nil
}
func ( g * getStatisticsTask ) PreExecute ( ctx context . Context ) error {
g . DbID = 0
g . collectionName = g . request . GetCollectionName ( )
g . partitionNames = g . request . GetPartitionNames ( )
// g.TravelTimestamp = g.request.GetTravelTimestamp()
g . GuaranteeTimestamp = g . request . GetGuaranteeTimestamp ( )
sp , ctx := trace . StartSpanFromContextWithOperationName ( g . TraceCtx ( ) , "Proxy-GetStatistics-PreExecute" )
defer sp . Finish ( )
if g . statisticShardPolicy == nil {
g . statisticShardPolicy = mergeRoundRobinPolicy
}
// TODO: Maybe we should create a new MsgType: GetStatistics?
g . Base . MsgType = commonpb . MsgType_GetPartitionStatistics
g . Base . SourceID = Params . ProxyCfg . GetNodeID ( )
collID , err := globalMetaCache . GetCollectionID ( ctx , g . collectionName )
if err != nil { // err is not nil if collection not exists
return err
}
partIDs , err := getPartitionIDs ( ctx , g . collectionName , g . partitionNames )
if err != nil { // err is not nil if partition not exists
return err
}
g . GetStatisticsRequest . DbID = 0 // todo
g . GetStatisticsRequest . CollectionID = collID
if g . TravelTimestamp == 0 {
g . TravelTimestamp = g . BeginTs ( )
}
err = validateTravelTimestamp ( g . TravelTimestamp , g . BeginTs ( ) )
if err != nil {
return err
}
g . GuaranteeTimestamp = parseGuaranteeTs ( g . GuaranteeTimestamp , g . BeginTs ( ) )
deadline , ok := g . TraceCtx ( ) . Deadline ( )
if ok {
g . TimeoutTimestamp = tsoutil . ComposeTSByTime ( deadline , 0 )
}
// check if collection/partitions are loaded into query node
loaded , unloaded , err := checkFullLoaded ( ctx , g . qc , g . collectionName , partIDs )
if err != nil {
g . fromDataCoord = true
g . unloadedPartitionIDs = partIDs
log . Debug ( "checkFullLoaded failed, try get statistics from DataCoord" , zap . Int64 ( "msgID" , g . ID ( ) ) , zap . Error ( err ) )
return nil
}
if len ( unloaded ) > 0 {
g . fromDataCoord = true
g . unloadedPartitionIDs = unloaded
log . Debug ( "some partitions has not been loaded, try get statistics from DataCoord" , zap . Int64 ( "msgID" , g . ID ( ) ) , zap . String ( "collection" , g . collectionName ) , zap . Int64s ( "unloaded partitions" , unloaded ) , zap . Error ( err ) )
}
if len ( loaded ) > 0 {
g . fromQueryNode = true
g . loadedPartitionIDs = loaded
log . Debug ( "some partitions has been loaded, try get statistics from QueryNode" , zap . Int64 ( "msgID" , g . ID ( ) ) , zap . String ( "collection" , g . collectionName ) , zap . Int64s ( "loaded partitions" , loaded ) , zap . Error ( err ) )
}
return nil
}
func ( g * getStatisticsTask ) Execute ( ctx context . Context ) error {
sp , ctx := trace . StartSpanFromContextWithOperationName ( g . TraceCtx ( ) , "Proxy-GetStatistics-Execute" )
defer sp . Finish ( )
if g . fromQueryNode {
// if request get statistics of collection which is full loaded into query node
// then we need not pass partition ids params
if len ( g . request . GetPartitionNames ( ) ) == 0 && len ( g . unloadedPartitionIDs ) == 0 {
g . loadedPartitionIDs = [ ] UniqueID { }
}
err := g . getStatisticsFromQueryNode ( ctx )
if err != nil {
return err
}
log . Debug ( "get collection statistics from QueryNode execute done" , zap . Int64 ( "msgID" , g . ID ( ) ) )
}
if g . fromDataCoord {
err := g . getStatisticsFromDataCoord ( ctx )
if err != nil {
return err
}
log . Debug ( "get collection statistics from DataCoord execute done" , zap . Int64 ( "msgID" , g . ID ( ) ) )
}
return nil
}
func ( g * getStatisticsTask ) PostExecute ( ctx context . Context ) error {
sp , _ := trace . StartSpanFromContextWithOperationName ( g . TraceCtx ( ) , "Proxy-GetStatistic-PostExecute" )
defer sp . Finish ( )
tr := timerecord . NewTimeRecorder ( "getStatisticTask PostExecute" )
defer func ( ) {
tr . Elapse ( "done" )
} ( )
if g . fromQueryNode {
select {
case <- g . TraceCtx ( ) . Done ( ) :
log . Debug ( "wait to finish timeout!" , zap . Int64 ( "msgID" , g . ID ( ) ) )
return nil
default :
log . Debug ( "all get statistics are finished or canceled" , zap . Int64 ( "msgID" , g . ID ( ) ) )
close ( g . resultBuf )
for res := range g . resultBuf {
g . toReduceResults = append ( g . toReduceResults , res )
log . Debug ( "proxy receives one get statistic response" , zap . Int64 ( "sourceID" , res . GetBase ( ) . GetSourceID ( ) ) , zap . Int64 ( "msgID" , g . ID ( ) ) )
}
}
}
validResults , err := decodeGetStatisticsResults ( g . toReduceResults )
if err != nil {
return err
}
result , err := reduceStatisticResponse ( validResults )
if err != nil {
return err
}
g . result = & milvuspb . GetStatisticsResponse {
Status : & commonpb . Status { ErrorCode : commonpb . ErrorCode_Success } ,
Stats : result ,
}
log . Info ( "get statistics post execute done" , zap . Int64 ( "msgID" , g . ID ( ) ) , zap . Any ( "result" , result ) )
return nil
}
func ( g * getStatisticsTask ) getStatisticsFromDataCoord ( ctx context . Context ) error {
collID := g . CollectionID
partIDs := g . unloadedPartitionIDs
req := & datapb . GetPartitionStatisticsRequest {
2022-10-19 02:01:26 +00:00
Base : commonpbutil . NewMsgBaseCopy (
g . Base ,
commonpbutil . WithMsgType ( commonpb . MsgType_GetPartitionStatistics ) ,
) ,
2022-07-18 01:58:28 +00:00
CollectionID : collID ,
PartitionIDs : partIDs ,
}
result , err := g . dc . GetPartitionStatistics ( ctx , req )
if err != nil {
return err
}
if result . Status . ErrorCode != commonpb . ErrorCode_Success {
return errors . New ( result . Status . Reason )
}
g . toReduceResults = append ( g . toReduceResults , & internalpb . GetStatisticsResponse {
Status : & commonpb . Status { ErrorCode : commonpb . ErrorCode_Success } ,
Stats : result . Stats ,
} )
return nil
}
func ( g * getStatisticsTask ) getStatisticsFromQueryNode ( ctx context . Context ) error {
g . GetStatisticsRequest . PartitionIDs = g . loadedPartitionIDs
executeGetStatistics := func ( withCache bool ) error {
shard2Leaders , err := globalMetaCache . GetShards ( ctx , withCache , g . collectionName )
if err != nil {
return err
}
g . resultBuf = make ( chan * internalpb . GetStatisticsResponse , len ( shard2Leaders ) )
if err := g . statisticShardPolicy ( ctx , g . shardMgr , g . getStatisticsShard , shard2Leaders ) ; err != nil {
log . Warn ( "failed to get statistics" , zap . Int64 ( "msgID" , g . ID ( ) ) , zap . Error ( err ) , zap . String ( "Shards" , fmt . Sprintf ( "%v" , shard2Leaders ) ) )
return err
}
return nil
}
err := executeGetStatistics ( WithCache )
if errors . Is ( err , errInvalidShardLeaders ) || funcutil . IsGrpcErr ( err ) || errors . Is ( err , grpcclient . ErrConnect ) {
log . Warn ( "first get statistics failed, updating shard leader caches and retry" ,
zap . Int64 ( "msgID" , g . ID ( ) ) , zap . Error ( err ) )
err = executeGetStatistics ( WithoutCache )
}
if err != nil {
return fmt . Errorf ( "fail to get statistics on all shard leaders, err=%w" , err )
}
return nil
}
func ( g * getStatisticsTask ) getStatisticsShard ( ctx context . Context , nodeID int64 , qn types . QueryNode , channelIDs [ ] string ) error {
req := & querypb . GetStatisticsRequest {
Req : g . GetStatisticsRequest ,
DmlChannels : channelIDs ,
Scope : querypb . DataScope_All ,
}
result , err := qn . GetStatistics ( ctx , req )
if err != nil {
log . Warn ( "QueryNode statistic return error" , zap . Int64 ( "msgID" , g . ID ( ) ) ,
zap . Int64 ( "nodeID" , nodeID ) , zap . Strings ( "channels" , channelIDs ) , zap . Error ( err ) )
return err
}
if result . GetStatus ( ) . GetErrorCode ( ) == commonpb . ErrorCode_NotShardLeader {
log . Warn ( "QueryNode is not shardLeader" , zap . Int64 ( "msgID" , g . ID ( ) ) ,
zap . Int64 ( "nodeID" , nodeID ) , zap . Strings ( "channels" , channelIDs ) )
return errInvalidShardLeaders
}
if result . GetStatus ( ) . GetErrorCode ( ) != commonpb . ErrorCode_Success {
log . Warn ( "QueryNode statistic result error" , zap . Int64 ( "msgID" , g . ID ( ) ) ,
zap . Int64 ( "nodeID" , nodeID ) , zap . String ( "reason" , result . GetStatus ( ) . GetReason ( ) ) )
return fmt . Errorf ( "fail to get statistic, QueryNode ID=%d, reason=%s" , nodeID , result . GetStatus ( ) . GetReason ( ) )
}
g . resultBuf <- result
return nil
}
// checkFullLoaded check if collection / partition was fully loaded into QueryNode
// return loaded partitions, unloaded partitions and error
func checkFullLoaded ( ctx context . Context , qc types . QueryCoord , collectionName string , searchPartitionIDs [ ] UniqueID ) ( [ ] UniqueID , [ ] UniqueID , error ) {
var loadedPartitionIDs [ ] UniqueID
var unloadPartitionIDs [ ] UniqueID
// TODO: Consider to check if partition loaded from cache to save rpc.
info , err := globalMetaCache . GetCollectionInfo ( ctx , collectionName )
if err != nil {
return nil , nil , fmt . Errorf ( "GetCollectionInfo failed, collection = %s, err = %s" , collectionName , err )
}
// If request to search partitions
if len ( searchPartitionIDs ) > 0 {
resp , err := qc . ShowPartitions ( ctx , & querypb . ShowPartitionsRequest {
2022-10-19 02:01:26 +00:00
Base : commonpbutil . NewMsgBase (
commonpbutil . WithMsgType ( commonpb . MsgType_ShowPartitions ) ,
commonpbutil . WithSourceID ( Params . ProxyCfg . GetNodeID ( ) ) ,
) ,
2022-07-18 01:58:28 +00:00
CollectionID : info . collID ,
PartitionIDs : searchPartitionIDs ,
} )
if err != nil {
return nil , nil , fmt . Errorf ( "showPartitions failed, collection = %s, partitionIDs = %v, err = %s" , collectionName , searchPartitionIDs , err )
}
if resp . Status . ErrorCode != commonpb . ErrorCode_Success {
return nil , nil , fmt . Errorf ( "showPartitions failed, collection = %s, partitionIDs = %v, reason = %s" , collectionName , searchPartitionIDs , resp . GetStatus ( ) . GetReason ( ) )
}
for i , percentage := range resp . GetInMemoryPercentages ( ) {
if percentage >= 100 {
loadedPartitionIDs = append ( loadedPartitionIDs , resp . GetPartitionIDs ( ) [ i ] )
}
unloadPartitionIDs = append ( unloadPartitionIDs , resp . GetPartitionIDs ( ) [ i ] )
}
return loadedPartitionIDs , unloadPartitionIDs , nil
}
// If request to search collection
resp , err := qc . ShowPartitions ( ctx , & querypb . ShowPartitionsRequest {
2022-10-19 02:01:26 +00:00
Base : commonpbutil . NewMsgBase (
commonpbutil . WithMsgType ( commonpb . MsgType_ShowPartitions ) ,
commonpbutil . WithSourceID ( Params . ProxyCfg . GetNodeID ( ) ) ,
) ,
2022-07-18 01:58:28 +00:00
CollectionID : info . collID ,
} )
if err != nil {
return nil , nil , fmt . Errorf ( "showPartitions failed, collection = %s, partitionIDs = %v, err = %s" , collectionName , searchPartitionIDs , err )
}
if resp . Status . ErrorCode != commonpb . ErrorCode_Success {
return nil , nil , fmt . Errorf ( "showPartitions failed, collection = %s, partitionIDs = %v, reason = %s" , collectionName , searchPartitionIDs , resp . GetStatus ( ) . GetReason ( ) )
}
loadedMap := make ( map [ UniqueID ] bool )
for i , percentage := range resp . GetInMemoryPercentages ( ) {
if percentage >= 100 {
loadedMap [ resp . GetPartitionIDs ( ) [ i ] ] = true
loadedPartitionIDs = append ( loadedPartitionIDs , resp . GetPartitionIDs ( ) [ i ] )
}
}
for _ , partInfo := range info . partInfo {
if _ , ok := loadedMap [ partInfo . partitionID ] ; ! ok {
unloadPartitionIDs = append ( unloadPartitionIDs , partInfo . partitionID )
}
}
return loadedPartitionIDs , unloadPartitionIDs , nil
}
func decodeGetStatisticsResults ( results [ ] * internalpb . GetStatisticsResponse ) ( [ ] map [ string ] string , error ) {
ret := make ( [ ] map [ string ] string , len ( results ) )
for i , result := range results {
if result . GetStatus ( ) . GetErrorCode ( ) != commonpb . ErrorCode_Success {
return nil , fmt . Errorf ( "fail to decode result, reason=%s" , result . GetStatus ( ) . GetReason ( ) )
}
ret [ i ] = funcutil . KeyValuePair2Map ( result . GetStats ( ) )
}
return ret , nil
}
func reduceStatisticResponse ( results [ ] map [ string ] string ) ( [ ] * commonpb . KeyValuePair , error ) {
mergedResults := map [ string ] interface { } {
"row_count" : int64 ( 0 ) ,
}
fieldMethod := map [ string ] func ( string ) error {
"row_count" : func ( str string ) error {
count , err := strconv . ParseInt ( str , 10 , 64 )
if err != nil {
return err
}
mergedResults [ "row_count" ] = mergedResults [ "row_count" ] . ( int64 ) + count
return nil
} ,
}
err := funcutil . MapReduce ( results , fieldMethod )
stringMap := make ( map [ string ] string )
for k , v := range mergedResults {
stringMap [ k ] = fmt . Sprint ( v )
}
return funcutil . Map2KeyValuePair ( stringMap ) , err
}
// implement Task
// try to compatible with old API (getCollectionStatistics & getPartitionStatistics)
//type getPartitionStatisticsTask struct {
// getStatisticsTask
// request *milvuspb.GetPartitionStatisticsRequest
// result *milvuspb.GetPartitionStatisticsResponse
//}
//
//func (g *getPartitionStatisticsTask) PreExecute(ctx context.Context) error {
// g.getStatisticsTask.DbID = 0
// g.getStatisticsTask.collectionName = g.request.GetCollectionName()
// g.getStatisticsTask.partitionNames = []string{g.request.GetPartitionName()}
// // g.TravelTimestamp = g.request.GetTravelTimestamp()
// // g.GuaranteeTimestamp = g.request.GetGuaranteeTimestamp()
// return g.getStatisticsTask.PreExecute(ctx)
//}
//
//func (g *getPartitionStatisticsTask) Execute(ctx context.Context) error {
// if g.fromQueryNode {
// err := g.getStatisticsTask.Execute(ctx)
// if err != nil {
// return err
// }
// log.Debug("get partition statistics from QueryNode execute done", zap.Int64("msgID", g.ID()))
// }
// if g.fromDataCoord {
// collID := g.CollectionID
// partIDs := g.unloadedPartitionIDs
//
// req := &datapb.GetPartitionStatisticsRequest{
// Base: &commonpb.MsgBase{
// MsgType: commonpb.MsgType_GetPartitionStatistics,
// MsgID: g.Base.MsgID,
// Timestamp: g.Base.Timestamp,
// SourceID: g.Base.SourceID,
// },
// CollectionID: collID,
// PartitionIDs: partIDs,
// }
//
// result, err := g.dc.GetPartitionStatistics(ctx, req)
// if err != nil {
// return err
// }
// if result.Status.ErrorCode != commonpb.ErrorCode_Success {
// return errors.New(result.Status.Reason)
// }
// g.toReduceResults = append(g.toReduceResults, &internalpb.GetStatisticsResponse{
// Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
// Stats: result.Stats,
// })
// log.Debug("get partition statistics from DataCoord execute done", zap.Int64("msgID", g.ID()))
// return nil
// }
// return nil
//}
//
//func (g *getPartitionStatisticsTask) PostExecute(ctx context.Context) error {
// err := g.getStatisticsTask.PostExecute(ctx)
// if err != nil {
// return err
// }
// g.result = &milvuspb.GetPartitionStatisticsResponse{
// Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
// Stats: g.innerResult,
// }
// return nil
//}
//
//type getCollectionStatisticsTask struct {
// getStatisticsTask
// request *milvuspb.GetCollectionStatisticsRequest
// result *milvuspb.GetCollectionStatisticsResponse
//}
//
//func (g *getCollectionStatisticsTask) PreExecute(ctx context.Context) error {
// g.getStatisticsTask.DbID = 0
// g.getStatisticsTask.collectionName = g.request.GetCollectionName()
// g.getStatisticsTask.partitionNames = []string{}
// // g.TravelTimestamp = g.request.GetTravelTimestamp()
// // g.GuaranteeTimestamp = g.request.GetGuaranteeTimestamp()
// return g.getStatisticsTask.PreExecute(ctx)
//}
//
//func (g *getCollectionStatisticsTask) Execute(ctx context.Context) error {
// if g.fromQueryNode {
// // if you get entire collection, we need to pass partition ids param.
// if len(g.unloadedPartitionIDs) == 0 {
// g.GetStatisticsRequest.PartitionIDs = nil
// }
// err := g.getStatisticsTask.Execute(ctx)
// if err != nil {
// return err
// }
// log.Debug("get collection statistics from QueryNode execute done", zap.Int64("msgID", g.ID()))
// }
// if g.fromDataCoord {
// collID := g.CollectionID
// partIDs := g.unloadedPartitionIDs
//
// // all collection has not been loaded, get statistics from datacoord
// if len(g.GetStatisticsRequest.PartitionIDs) == 0 {
// req := &datapb.GetCollectionStatisticsRequest{
// Base: &commonpb.MsgBase{
// MsgType: commonpb.MsgType_GetCollectionStatistics,
// MsgID: g.Base.MsgID,
// Timestamp: g.Base.Timestamp,
// SourceID: g.Base.SourceID,
// },
// CollectionID: collID,
// }
//
// result, err := g.dc.GetCollectionStatistics(ctx, req)
// if err != nil {
// return err
// }
// if result.Status.ErrorCode != commonpb.ErrorCode_Success {
// return errors.New(result.Status.Reason)
// }
// g.toReduceResults = append(g.toReduceResults, &internalpb.GetStatisticsResponse{
// Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
// Stats: result.Stats,
// })
// } else { // some partitions have been loaded, get some partition statistics from datacoord
// req := &datapb.GetPartitionStatisticsRequest{
// Base: &commonpb.MsgBase{
// MsgType: commonpb.MsgType_GetPartitionStatistics,
// MsgID: g.Base.MsgID,
// Timestamp: g.Base.Timestamp,
// SourceID: g.Base.SourceID,
// },
// CollectionID: collID,
// PartitionIDs: partIDs,
// }
//
// result, err := g.dc.GetPartitionStatistics(ctx, req)
// if err != nil {
// return err
// }
// if result.Status.ErrorCode != commonpb.ErrorCode_Success {
// return errors.New(result.Status.Reason)
// }
// g.toReduceResults = append(g.toReduceResults, &internalpb.GetStatisticsResponse{
// Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
// Stats: result.Stats,
// })
// }
// log.Debug("get collection statistics from DataCoord execute done", zap.Int64("msgID", g.ID()))
// return nil
// }
// return nil
//}
//
//func (g *getCollectionStatisticsTask) PostExecute(ctx context.Context) error {
// err := g.getStatisticsTask.PostExecute(ctx)
// if err != nil {
// return err
// }
// g.result = &milvuspb.GetCollectionStatisticsResponse{
// Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
// Stats: g.innerResult,
// }
// return nil
//}
// old version of get statistics
// please remove it after getStatisticsTask below is stable
type getCollectionStatisticsTask struct {
Condition
* milvuspb . GetCollectionStatisticsRequest
ctx context . Context
dataCoord types . DataCoord
result * milvuspb . GetCollectionStatisticsResponse
collectionID UniqueID
}
func ( g * getCollectionStatisticsTask ) TraceCtx ( ) context . Context {
return g . ctx
}
func ( g * getCollectionStatisticsTask ) ID ( ) UniqueID {
return g . Base . MsgID
}
func ( g * getCollectionStatisticsTask ) SetID ( uid UniqueID ) {
g . Base . MsgID = uid
}
func ( g * getCollectionStatisticsTask ) Name ( ) string {
return GetCollectionStatisticsTaskName
}
func ( g * getCollectionStatisticsTask ) Type ( ) commonpb . MsgType {
return g . Base . MsgType
}
func ( g * getCollectionStatisticsTask ) BeginTs ( ) Timestamp {
return g . Base . Timestamp
}
func ( g * getCollectionStatisticsTask ) EndTs ( ) Timestamp {
return g . Base . Timestamp
}
func ( g * getCollectionStatisticsTask ) SetTs ( ts Timestamp ) {
g . Base . Timestamp = ts
}
func ( g * getCollectionStatisticsTask ) OnEnqueue ( ) error {
2022-10-19 02:01:26 +00:00
g . Base = commonpbutil . NewMsgBase ( )
2022-07-18 01:58:28 +00:00
return nil
}
func ( g * getCollectionStatisticsTask ) PreExecute ( ctx context . Context ) error {
g . Base . MsgType = commonpb . MsgType_GetCollectionStatistics
g . Base . SourceID = Params . ProxyCfg . GetNodeID ( )
return nil
}
func ( g * getCollectionStatisticsTask ) Execute ( ctx context . Context ) error {
collID , err := globalMetaCache . GetCollectionID ( ctx , g . CollectionName )
if err != nil {
return err
}
g . collectionID = collID
req := & datapb . GetCollectionStatisticsRequest {
2022-10-19 02:01:26 +00:00
Base : commonpbutil . NewMsgBaseCopy (
g . Base ,
commonpbutil . WithMsgType ( commonpb . MsgType_GetCollectionStatistics ) ,
) ,
2022-07-18 01:58:28 +00:00
CollectionID : collID ,
}
2022-09-16 03:12:47 +00:00
result , err := g . dataCoord . GetCollectionStatistics ( ctx , req )
if err != nil {
return err
2022-07-18 01:58:28 +00:00
}
if result . Status . ErrorCode != commonpb . ErrorCode_Success {
return errors . New ( result . Status . Reason )
}
g . result = & milvuspb . GetCollectionStatisticsResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
Reason : "" ,
} ,
Stats : result . Stats ,
}
return nil
}
func ( g * getCollectionStatisticsTask ) PostExecute ( ctx context . Context ) error {
return nil
}
type getPartitionStatisticsTask struct {
Condition
* milvuspb . GetPartitionStatisticsRequest
ctx context . Context
dataCoord types . DataCoord
result * milvuspb . GetPartitionStatisticsResponse
collectionID UniqueID
}
func ( g * getPartitionStatisticsTask ) TraceCtx ( ) context . Context {
return g . ctx
}
func ( g * getPartitionStatisticsTask ) ID ( ) UniqueID {
return g . Base . MsgID
}
func ( g * getPartitionStatisticsTask ) SetID ( uid UniqueID ) {
g . Base . MsgID = uid
}
func ( g * getPartitionStatisticsTask ) Name ( ) string {
return GetPartitionStatisticsTaskName
}
func ( g * getPartitionStatisticsTask ) Type ( ) commonpb . MsgType {
return g . Base . MsgType
}
func ( g * getPartitionStatisticsTask ) BeginTs ( ) Timestamp {
return g . Base . Timestamp
}
func ( g * getPartitionStatisticsTask ) EndTs ( ) Timestamp {
return g . Base . Timestamp
}
func ( g * getPartitionStatisticsTask ) SetTs ( ts Timestamp ) {
g . Base . Timestamp = ts
}
func ( g * getPartitionStatisticsTask ) OnEnqueue ( ) error {
2022-10-19 02:01:26 +00:00
g . Base = commonpbutil . NewMsgBase ( )
2022-07-18 01:58:28 +00:00
return nil
}
func ( g * getPartitionStatisticsTask ) PreExecute ( ctx context . Context ) error {
g . Base . MsgType = commonpb . MsgType_GetPartitionStatistics
g . Base . SourceID = Params . ProxyCfg . GetNodeID ( )
return nil
}
func ( g * getPartitionStatisticsTask ) Execute ( ctx context . Context ) error {
collID , err := globalMetaCache . GetCollectionID ( ctx , g . CollectionName )
if err != nil {
return err
}
g . collectionID = collID
partitionID , err := globalMetaCache . GetPartitionID ( ctx , g . CollectionName , g . PartitionName )
if err != nil {
return err
}
req := & datapb . GetPartitionStatisticsRequest {
2022-10-19 02:01:26 +00:00
Base : commonpbutil . NewMsgBaseCopy (
g . Base ,
commonpbutil . WithMsgType ( commonpb . MsgType_GetCollectionStatistics ) ,
) ,
2022-07-18 01:58:28 +00:00
CollectionID : collID ,
PartitionIDs : [ ] int64 { partitionID } ,
}
result , _ := g . dataCoord . GetPartitionStatistics ( ctx , req )
if result == nil {
return errors . New ( "get partition statistics resp is nil" )
}
if result . Status . ErrorCode != commonpb . ErrorCode_Success {
return errors . New ( result . Status . Reason )
}
g . result = & milvuspb . GetPartitionStatisticsResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
Reason : "" ,
} ,
Stats : result . Stats ,
}
return nil
}
func ( g * getPartitionStatisticsTask ) PostExecute ( ctx context . Context ) error {
return nil
}