Modify proto to add GetReplicas() interface (#16328)

Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/16326/head
yah01 2022-04-01 16:15:29 +08:00 committed by GitHub
parent fd589baca7
commit c005f07ccc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 632 additions and 377 deletions

View File

@ -360,6 +360,10 @@ func (m *MockQueryCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetric
return nil, nil
}
func (m *MockQueryCoord) GetReplicas(ctx context.Context, req *querypb.GetReplicasRequest) (*querypb.GetReplicasResponse, error) {
return nil, nil
}
func (m *MockQueryCoord) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeadersRequest) (*querypb.GetShardLeadersResponse, error) {
return nil, nil
}

View File

@ -301,6 +301,20 @@ func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
return ret.(*milvuspb.GetMetricsResponse), err
}
// GetReplicas gets the replicas of a certain collection.
func (c *Client) GetReplicas(ctx context.Context, req *querypb.GetReplicasRequest) (*querypb.GetReplicasResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return client.(querypb.QueryCoordClient).GetReplicas(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*querypb.GetReplicasResponse), err
}
// GetShardLeaders gets the shard leaders of a certain collection.
func (c *Client) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeadersRequest) (*querypb.GetShardLeadersResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {

View File

@ -108,8 +108,11 @@ func Test_NewClient(t *testing.T) {
r16, err := client.LoadBalance(ctx, nil)
retCheck(retNotNil, r16, err)
r17, err := client.GetShardLeaders(ctx, nil)
r17, err := client.GetReplicas(ctx, nil)
retCheck(retNotNil, r17, err)
r18, err := client.GetShardLeaders(ctx, nil)
retCheck(retNotNil, r18, err)
}
client.grpcClient = &mock.ClientBase{

View File

@ -390,6 +390,11 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
return s.queryCoord.GetMetrics(ctx, req)
}
// GetReplicas returns the shard leaders of a certain collection.
func (s *Server) GetReplicas(ctx context.Context, req *querypb.GetReplicasRequest) (*querypb.GetReplicasResponse, error) {
return s.queryCoord.GetReplicas(ctx, req)
}
// GetShardLeaders returns the shard leaders of a certain collection.
func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeadersRequest) (*querypb.GetShardLeadersResponse, error) {
return s.queryCoord.GetShardLeaders(ctx, req)

View File

@ -48,6 +48,7 @@ type MockQueryCoord struct {
channelResp *querypb.CreateQueryChannelResponse
infoResp *querypb.GetSegmentInfoResponse
metricResp *milvuspb.GetMetricsResponse
replicasResp *querypb.GetReplicasResponse
shardLeadersResp *querypb.GetShardLeadersResponse
}
@ -143,6 +144,10 @@ func (m *MockQueryCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetric
return m.metricResp, m.err
}
func (m *MockQueryCoord) GetReplicas(ctx context.Context, req *querypb.GetReplicasRequest) (*querypb.GetReplicasResponse, error) {
return m.replicasResp, m.err
}
func (m *MockQueryCoord) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeadersRequest) (*querypb.GetShardLeadersResponse, error) {
return m.shardLeadersResp, m.err
}

View File

@ -32,6 +32,7 @@ service QueryCoord {
rpc GetMetrics(milvus.GetMetricsRequest) returns (milvus.GetMetricsResponse) {}
// https://wiki.lfaidata.foundation/display/MIL/MEP+23+--+Multiple+memory+replication+design
rpc GetReplicas(GetReplicasRequest) returns (GetReplicasResponse) {}
rpc GetShardLeaders(GetShardLeadersRequest) returns (GetShardLeadersResponse) {}
}
@ -150,20 +151,31 @@ message GetSegmentInfoResponse {
repeated SegmentInfo infos = 2;
}
message GetReplicasRequest {
common.MsgBase base = 1;
int64 collectionID = 2;
bool with_shard_nodes = 3;
}
message GetReplicasResponse {
common.Status status = 1;
repeated ReplicaInfo replicas = 2;
}
message GetShardLeadersRequest {
common.MsgBase base = 1;
int64 collectionID = 2;
}
message GetShardLeadersResponse {
common.Status status= 1 ;
repeated ShardLeaderList list = 2;
common.Status status = 1;
repeated ShardLeadersList shards = 2;
}
message ShardLeaderList {
message ShardLeadersList { // All leaders of all replicas of one shard
string channel_name = 1;
int64 nodeID = 2;
string address = 3; // 127.0.0.1:9000
repeated int64 node_ids = 2;
repeated string node_addrs = 3;
}
//-----------------query node grpc request and response proto----------------
@ -354,6 +366,24 @@ message CollectionInfo {
repeated int64 released_partitionIDs = 6;
int64 inMemory_percentage = 7;
repeated int64 replica_ids = 8;
int32 replica_number = 9;
}
message ReplicaInfo { // ReplicaGroup
int64 replicaID = 1;
int64 collectionID = 2;
repeated int64 partition_ids = 3; // empty indicates to load collection
repeated ShardReplica shard_replicas = 4;
repeated int64 node_ids = 5; // include leaders
}
message ShardReplica {
int64 leaderID = 1;
string leader_addr = 2; // IP:port
string dm_channel_name = 3;
// optional, DO NOT save it in meta, set it only for GetReplicas()
// if with_shard_nodes is true
repeated int64 node_ids = 4;
}
message UnsubscribeChannels {
@ -379,16 +409,3 @@ message SealedSegmentsChangeInfo {
repeated SegmentChangeInfo infos = 2;
}
message ReplicaInfo { // ReplicaGroup
int64 replica_id = 1;
int64 collection_id = 2;
repeated int64 partition_ids = 3; // empty indicates to load collection
repeated ShardReplica shard_replicas = 4;
repeated int64 node_ids = 5; // include leaders
}
message ShardReplica {
int64 leader = 1;
string dm_channel_name = 2;
}

File diff suppressed because it is too large Load Diff

View File

@ -332,6 +332,24 @@ func (coord *QueryCoordMock) GetMetrics(ctx context.Context, req *milvuspb.GetMe
}, nil
}
func (coord *QueryCoordMock) GetReplicas(ctx context.Context, req *querypb.GetReplicasRequest) (*querypb.GetReplicasResponse, error) {
if !coord.healthy() {
return &querypb.GetReplicasResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "unhealthy",
},
}, nil
}
return &querypb.GetReplicasResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "not implemented",
},
}, nil
}
func (coord *QueryCoordMock) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeadersRequest) (*querypb.GetShardLeadersResponse, error) {
if !coord.healthy() {
return &querypb.GetShardLeadersResponse{

View File

@ -1016,6 +1016,17 @@ func (qc *QueryCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
return getMetricsResponse, nil
}
// GetReplicas gets replicas of a certain collection
func (qc *QueryCoord) GetReplicas(ctx context.Context, req *querypb.GetReplicasRequest) (*querypb.GetReplicasResponse, error) {
// TODO: to impl
return &querypb.GetReplicasResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "Not implemented",
},
}, nil
}
// GetShardLeaders gets shard leaders of a certain collection
func (qc *QueryCoord) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeadersRequest) (*querypb.GetShardLeadersResponse, error) {
// TODO: to impl

View File

@ -303,7 +303,7 @@ func (m *MetaReplica) addCollection(collectionID UniqueID, loadType querypb.Load
PartitionStates: partitionStates,
LoadType: loadType,
Schema: schema,
// ReplicaIds: replicas,
// ReplicaIDs: replicas,
}
err := saveGlobalCollectionInfo(collectionID, newCollection, m.client)
if err != nil {
@ -1032,8 +1032,8 @@ func (m *MetaReplica) generateReplica(collectionID int64, partitionIds []int64)
}
return &querypb.ReplicaInfo{
ReplicaId: id,
CollectionId: collectionID,
ReplicaID: id,
CollectionID: collectionID,
PartitionIds: partitionIds,
ShardReplicas: make([]*querypb.ShardReplica, 0),
NodeIds: make([]int64, 0),
@ -1153,7 +1153,7 @@ func saveReplicaInfo(info *querypb.ReplicaInfo, kv kv.MetaKv) error {
return err
}
key := fmt.Sprintf("%s/%d", ReplicaMetaPrefix, info.ReplicaId)
key := fmt.Sprintf("%s/%d", ReplicaMetaPrefix, info.ReplicaID)
return kv.Save(key, string(infoBytes))
}

View File

@ -55,7 +55,7 @@ func (rep *ReplicaInfos) Insert(info *querypb.ReplicaInfo) {
rep.globalGuard.Lock()
defer rep.globalGuard.Unlock()
old, ok := rep.replicas[info.ReplicaId]
old, ok := rep.replicas[info.ReplicaID]
// This updates ReplicaInfo, not inserts a new one
// No need to update nodeIndex
if ok {
@ -63,7 +63,7 @@ func (rep *ReplicaInfos) Insert(info *querypb.ReplicaInfo) {
return
}
rep.replicas[info.ReplicaId] = info
rep.replicas[info.ReplicaID] = info
for _, nodeID := range info.NodeIds {
replicas, ok := rep.nodeIndex[nodeID]

View File

@ -1169,6 +1169,7 @@ type QueryCoord interface {
GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error)
GetReplicas(ctx context.Context, req *querypb.GetReplicasRequest) (*querypb.GetReplicasResponse, error)
GetShardLeaders(ctx context.Context, req *querypb.GetShardLeadersRequest) (*querypb.GetShardLeadersResponse, error)
}

View File

@ -90,6 +90,10 @@ func (m *QueryCoordClient) GetMetrics(ctx context.Context, in *milvuspb.GetMetri
return &milvuspb.GetMetricsResponse{}, m.Err
}
func (m *QueryCoordClient) GetReplicas(ctx context.Context, in *querypb.GetReplicasRequest, opts ...grpc.CallOption) (*querypb.GetReplicasResponse, error) {
return &querypb.GetReplicasResponse{}, m.Err
}
func (m *QueryCoordClient) GetShardLeaders(ctx context.Context, in *querypb.GetShardLeadersRequest, opts ...grpc.CallOption) (*querypb.GetShardLeadersResponse, error) {
return &querypb.GetShardLeadersResponse{}, m.Err
}