return error before queryCoord ready (#6067)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/6071/head
xige-16 2021-06-24 16:00:15 +08:00 committed by GitHub
parent 133814be8e
commit 710e2ca185
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 109 additions and 28 deletions

View File

@ -72,12 +72,22 @@ func (qc *QueryCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin
func (qc *QueryCoord) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
dbID := req.DbID
log.Debug("show collection start", zap.Int64("dbID", dbID))
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
if qc.stateCode.Load() != internalpb.StateCode_Healthy {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("query coordinator is not healthy")
status.Reason = err.Error()
log.Debug("show collection end with query coordinator not healthy")
return &querypb.ShowCollectionsResponse{
Status: status,
}, err
}
collectionIDs := qc.meta.showCollections()
log.Debug("show collection end", zap.Int64s("collections", collectionIDs))
return &querypb.ShowCollectionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Status: status,
CollectionIDs: collectionIDs,
}, nil
}
@ -90,6 +100,13 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
if qc.stateCode.Load() != internalpb.StateCode_Healthy {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("query coordinator is not healthy")
status.Reason = err.Error()
log.Debug("load collection end with query coordinator not healthy")
return status, err
}
hasCollection := qc.meta.hasCollection(collectionID)
if hasCollection {
@ -132,6 +149,14 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
if qc.stateCode.Load() != internalpb.StateCode_Healthy {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("query coordinator is not healthy")
status.Reason = err.Error()
log.Debug("release collection end with query coordinator not healthy")
return status, err
}
hasCollection := qc.meta.hasCollection(collectionID)
if !hasCollection {
log.Warn("release collection end, query coordinator don't have the log of", zap.String("collectionID", fmt.Sprintln(collectionID)))
@ -165,22 +190,32 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
func (qc *QueryCoord) ShowPartitions(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
collectionID := req.CollectionID
log.Debug("show partitions start, ", zap.Int64("collectionID", collectionID))
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
if qc.stateCode.Load() != internalpb.StateCode_Healthy {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("query coordinator is not healthy")
status.Reason = err.Error()
log.Debug("show partition end with query coordinator not healthy")
return &querypb.ShowPartitionsResponse{
Status: status,
}, err
}
partitionIDs, err := qc.meta.showPartitions(collectionID)
if err != nil {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
return &querypb.ShowPartitionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Status: status,
}, err
}
log.Debug("show partitions end", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
return &querypb.ShowPartitionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Status: status,
PartitionIDs: partitionIDs,
}, nil
}
@ -192,6 +227,13 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
if qc.stateCode.Load() != internalpb.StateCode_Healthy {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("query coordinator is not healthy")
status.Reason = err.Error()
log.Debug("load partition end with query coordinator not healthy")
return status, err
}
if len(partitionIDs) == 0 {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
@ -261,6 +303,14 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
if qc.stateCode.Load() != internalpb.StateCode_Healthy {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("query coordinator is not healthy")
status.Reason = err.Error()
log.Debug("release partition end with query coordinator not healthy")
return status, err
}
hasCollection := qc.meta.hasCollection(collectionID)
if !hasCollection {
log.Warn("release partitions end, query coordinator don't have the log of", zap.String("collectionID", fmt.Sprintln(collectionID)))
@ -299,29 +349,52 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
}
func (qc *QueryCoord) CreateQueryChannel(ctx context.Context, req *querypb.CreateQueryChannelRequest) (*querypb.CreateQueryChannelResponse, error) {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
if qc.stateCode.Load() != internalpb.StateCode_Healthy {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("query coordinator is not healthy")
status.Reason = err.Error()
log.Debug("createQueryChannel end with query coordinator not healthy")
return &querypb.CreateQueryChannelResponse{
Status: status,
}, err
}
collectionID := req.CollectionID
queryChannel, queryResultChannel := qc.meta.GetQueryChannel(collectionID)
return &querypb.CreateQueryChannelResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Status: status,
RequestChannel: queryChannel,
ResultChannel: queryResultChannel,
}, nil
}
func (qc *QueryCoord) GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error) {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
if qc.stateCode.Load() != internalpb.StateCode_Healthy {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("query coordinator is not healthy")
status.Reason = err.Error()
log.Debug("getPartitionStates end with query coordinator not healthy")
return &querypb.GetPartitionStatesResponse{
Status: status,
}, err
}
partitionIDs := req.PartitionIDs
partitionStates := make([]*querypb.PartitionStates, 0)
for _, partitionID := range partitionIDs {
state, err := qc.meta.getPartitionStateByID(partitionID)
if err != nil {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
return &querypb.GetPartitionStatesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Status: status,
}, err
}
partitionState := &querypb.PartitionStates{
@ -332,14 +405,25 @@ func (qc *QueryCoord) GetPartitionStates(ctx context.Context, req *querypb.GetPa
}
return &querypb.GetPartitionStatesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Status: status,
PartitionDescriptions: partitionStates,
}, nil
}
func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
if qc.stateCode.Load() != internalpb.StateCode_Healthy {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("query coordinator is not healthy")
status.Reason = err.Error()
log.Debug("getSegmentInfo end with query coordinator not healthy")
return &querypb.GetSegmentInfoResponse{
Status: status,
}, err
}
totalMemSize := int64(0)
totalNumRows := int64(0)
//TODO::get segment infos from meta
@ -347,11 +431,10 @@ func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmen
//segmentInfos, err := qs.meta.getSegmentInfos(segmentIDs)
segmentInfos, err := qc.cluster.getSegmentInfo(ctx, req)
if err != nil {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
return &querypb.GetSegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Status: status,
}, err
}
for _, info := range segmentInfos {
@ -360,9 +443,7 @@ func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmen
}
log.Debug("getSegmentInfo", zap.Int64("num rows", totalNumRows), zap.Int64("memory size", totalMemSize))
return &querypb.GetSegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Infos: segmentInfos,
Status: status,
Infos: segmentInfos,
}, nil
}