Add replica interfaces for Meta and Cluster (#16190)

Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/16194/head
yah01 2022-03-25 14:03:25 +08:00 committed by GitHub
parent f4ebd3a9ce
commit 227889b0d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 66 additions and 0 deletions

View File

@ -75,6 +75,9 @@ type Cluster interface {
allocateSegmentsToQueryNode(ctx context.Context, reqs []*querypb.LoadSegmentsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64) error
allocateChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChannelsRequest, wait bool, excludeNodeIDs []int64) error
assignNodesToReplicas(ctx context.Context, reqs []*querypb.LoadSegmentsRequest, replicaIds []int64) error
assignSegmentsToReplica(ctx context.Context, reqs []*querypb.LoadSegmentsRequest, replicaID int64, wait bool) error
getSessionVersion() int64
getMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) []queryNodeGetMetricsResponse
@ -700,3 +703,13 @@ func (c *queryNodeCluster) allocateSegmentsToQueryNode(ctx context.Context, reqs
func (c *queryNodeCluster) allocateChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChannelsRequest, wait bool, excludeNodeIDs []int64) error {
return c.channelAllocator(ctx, reqs, c, c.clusterMeta, wait, excludeNodeIDs)
}
func (c *queryNodeCluster) assignNodesToReplicas(ctx context.Context, reqs []*querypb.LoadSegmentsRequest, replicaIds []int64) error {
// todo(yah01)
return nil
}
func (c *queryNodeCluster) assignSegmentsToReplica(ctx context.Context, reqs []*querypb.LoadSegmentsRequest, replicaID int64, wait bool) error {
// todo(yah01)
return nil
}

View File

@ -90,6 +90,11 @@ type Meta interface {
sendSealedSegmentChangeInfos(collectionID UniqueID, queryChannel string, changeInfos *querypb.SealedSegmentsChangeInfo) (*internalpb.MsgPosition, error)
getWatchedChannelsByNodeID(nodeID int64) *querypb.UnsubscribeChannelInfo
addReplica(replicaNum int32, collectionID int64, partitionIds []int64) error
setReplicaInfo(replicaID int64, info *querypb.ReplicaInfo) error
getReplicaByID(replicaID int64) (*querypb.ReplicaInfo, error)
getReplicasByNodeID(nodeID int64, collectionID int64) (*querypb.ReplicaInfo, error)
}
// MetaReplica records the current load information on all querynodes
@ -114,6 +119,7 @@ type MetaReplica struct {
segmentsInfo *segmentsInfo
//partitionStates map[UniqueID]*querypb.PartitionStates
// replicas *ReplicaInfos
}
func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAllocator func() (UniqueID, error)) (Meta, error) {
@ -138,6 +144,7 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAll
queryStreams: queryMsgStream,
segmentsInfo: newSegmentsInfo(kv),
// replicas: NewReplicaInfos(),
}
err := m.reloadFromKV()
@ -1014,6 +1021,52 @@ func (m *MetaReplica) getWatchedChannelsByNodeID(nodeID int64) *querypb.Unsubscr
return unsubscribeChannelInfo
}
func (m *MetaReplica) addReplica(replicaNum int32, collectionID int64, partitionIds []int64) error {
// collection, err := meta.getCollectionInfoByID(collectionId)
// if err != nil {
// return err
// }
// replicas := make([]*querypb.ReplicaInfo, 0, replicaNum)
// for i := 0; i < int(replicaNum); i++ {
// replicaId, err := meta.idAllocator()
// if err != nil {
// return err
// }
// replicas = append(replicas, &querypb.ReplicaInfo{
// ReplicaId: replicaId,
// CollectionId: collectionId,
// PartitionIds: partitionIds,
// ShardReplicas: make([]*querypb.ShardReplica, 0),
// })
// }
// for _, replica := range replicas {
// meta.replicas.Insert(replica.ReplicaId, replica)
// }
return nil
// todo(yah01)
}
func (m *MetaReplica) setReplicaInfo(replicaID int64, info *querypb.ReplicaInfo) error {
return nil
// todo(yah01)
}
func (m *MetaReplica) getReplicaByID(replicaID int64) (*querypb.ReplicaInfo, error) {
return nil, nil
// todo(yah01)
// return meta.replicas.Get(replicaID)
}
func (m *MetaReplica) getReplicasByNodeID(nodeID int64, collectionID int64) (*querypb.ReplicaInfo, error) {
return nil, nil
// todo(yah01)
}
//func (m *MetaReplica) printMeta() {
// m.RLock()
// defer m.RUnlock()