Refactor data coordinator (#6008)

Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/6013/head^2
sunby 2021-06-23 12:10:12 +08:00 committed by GitHub
parent a493151f70
commit fe8432016d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 17 additions and 12 deletions

View File

@ -11,5 +11,5 @@ approvers:
- scsven
labels:
- component/dataservice
- component/datacoord

View File

@ -40,8 +40,8 @@ func (alloc *rootCoordAllocator) allocTimestamp() (Timestamp, error) {
resp, err := alloc.rootCoordClient.AllocTimestamp(alloc.ctx, &rootcoordpb.AllocTimestampRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_RequestTSO,
MsgID: -1, // todo add msg id
Timestamp: 0, // todo
MsgID: 0,
Timestamp: 0,
SourceID: Params.NodeID,
},
Count: 1,
@ -56,8 +56,8 @@ func (alloc *rootCoordAllocator) allocID() (UniqueID, error) {
resp, err := alloc.rootCoordClient.AllocID(alloc.ctx, &rootcoordpb.AllocIDRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_RequestID,
MsgID: -1, // todo add msg id
Timestamp: 0, // todo
MsgID: 0,
Timestamp: 0,
SourceID: Params.NodeID,
},
Count: 1,

View File

@ -78,7 +78,9 @@ func defaultAssignPolicy() channelAssignPolicy {
return newBalancedAssignPolicy()
}
func newCluster(ctx context.Context, dataManager *clusterNodeManager, sessionManager sessionManager, posProvider positionProvider, opts ...clusterOption) *cluster {
func newCluster(ctx context.Context, dataManager *clusterNodeManager,
sessionManager sessionManager, posProvider positionProvider,
opts ...clusterOption) *cluster {
c := &cluster{
ctx: ctx,
sessionManager: sessionManager,

View File

@ -512,8 +512,8 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i
presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowPartitions,
MsgID: -1, // todo
Timestamp: 0, // todo
MsgID: 0,
Timestamp: 0,
SourceID: Params.NodeID,
},
DbName: "",
@ -556,8 +556,8 @@ func composeSegmentFlushMsgPack(segmentID UniqueID) msgstream.MsgPack {
completeFlushMsg := internalpb.SegmentFlushCompletedMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentFlushDone,
MsgID: 0, // TODO
Timestamp: 0, // TODO
MsgID: 0,
Timestamp: 0,
SourceID: Params.NodeID,
},
SegmentID: segmentID,

View File

@ -20,12 +20,15 @@ type Response interface {
GetStatus() *commonpb.Status
}
var errNilResponse = errors.New("response is nil")
var errUnknownResponseType = errors.New("unknown response type")
func VerifyResponse(response interface{}, err error) error {
if err != nil {
return err
}
if response == nil {
return errors.New("response is nil")
return errNilResponse
}
switch resp := response.(type) {
case Response:
@ -37,7 +40,7 @@ func VerifyResponse(response interface{}, err error) error {
return errors.New(resp.Reason)
}
default:
return errors.New("unknown response type")
return errUnknownResponseType
}
return nil
}