diff --git a/internal/querycoord/cluster.go b/internal/querycoord/cluster.go index 44e4c214d6..7d65d67973 100644 --- a/internal/querycoord/cluster.go +++ b/internal/querycoord/cluster.go @@ -75,6 +75,9 @@ type Cluster interface { allocateSegmentsToQueryNode(ctx context.Context, reqs []*querypb.LoadSegmentsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64) error allocateChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChannelsRequest, wait bool, excludeNodeIDs []int64) error + assignNodesToReplicas(ctx context.Context, reqs []*querypb.LoadSegmentsRequest, replicaIds []int64) error + assignSegmentsToReplica(ctx context.Context, reqs []*querypb.LoadSegmentsRequest, replicaID int64, wait bool) error + getSessionVersion() int64 getMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) []queryNodeGetMetricsResponse @@ -700,3 +703,13 @@ func (c *queryNodeCluster) allocateSegmentsToQueryNode(ctx context.Context, reqs func (c *queryNodeCluster) allocateChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChannelsRequest, wait bool, excludeNodeIDs []int64) error { return c.channelAllocator(ctx, reqs, c, c.clusterMeta, wait, excludeNodeIDs) } + +func (c *queryNodeCluster) assignNodesToReplicas(ctx context.Context, reqs []*querypb.LoadSegmentsRequest, replicaIds []int64) error { + // todo(yah01) + return nil +} + +func (c *queryNodeCluster) assignSegmentsToReplica(ctx context.Context, reqs []*querypb.LoadSegmentsRequest, replicaID int64, wait bool) error { + // todo(yah01) + return nil +} diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go index c6a16ae0a3..94fe5a1b20 100644 --- a/internal/querycoord/meta.go +++ b/internal/querycoord/meta.go @@ -90,6 +90,11 @@ type Meta interface { sendSealedSegmentChangeInfos(collectionID UniqueID, queryChannel string, changeInfos *querypb.SealedSegmentsChangeInfo) (*internalpb.MsgPosition, error) getWatchedChannelsByNodeID(nodeID int64) *querypb.UnsubscribeChannelInfo + + addReplica(replicaNum int32, collectionID int64, partitionIds []int64) error + setReplicaInfo(replicaID int64, info *querypb.ReplicaInfo) error + getReplicaByID(replicaID int64) (*querypb.ReplicaInfo, error) + getReplicasByNodeID(nodeID int64, collectionID int64) (*querypb.ReplicaInfo, error) } // MetaReplica records the current load information on all querynodes @@ -114,6 +119,7 @@ type MetaReplica struct { segmentsInfo *segmentsInfo //partitionStates map[UniqueID]*querypb.PartitionStates + // replicas *ReplicaInfos } func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAllocator func() (UniqueID, error)) (Meta, error) { @@ -138,6 +144,7 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAll queryStreams: queryMsgStream, segmentsInfo: newSegmentsInfo(kv), + // replicas: NewReplicaInfos(), } err := m.reloadFromKV() @@ -1014,6 +1021,52 @@ func (m *MetaReplica) getWatchedChannelsByNodeID(nodeID int64) *querypb.Unsubscr return unsubscribeChannelInfo } +func (m *MetaReplica) addReplica(replicaNum int32, collectionID int64, partitionIds []int64) error { + // collection, err := meta.getCollectionInfoByID(collectionId) + // if err != nil { + // return err + // } + + // replicas := make([]*querypb.ReplicaInfo, 0, replicaNum) + + // for i := 0; i < int(replicaNum); i++ { + // replicaId, err := meta.idAllocator() + // if err != nil { + // return err + // } + + // replicas = append(replicas, &querypb.ReplicaInfo{ + // ReplicaId: replicaId, + // CollectionId: collectionId, + // PartitionIds: partitionIds, + // ShardReplicas: make([]*querypb.ShardReplica, 0), + // }) + // } + + // for _, replica := range replicas { + // meta.replicas.Insert(replica.ReplicaId, replica) + // } + + return nil + // todo(yah01) +} + +func (m *MetaReplica) setReplicaInfo(replicaID int64, info *querypb.ReplicaInfo) error { + return nil + // todo(yah01) +} + +func (m *MetaReplica) getReplicaByID(replicaID int64) (*querypb.ReplicaInfo, error) { + return nil, nil + // todo(yah01) + // return meta.replicas.Get(replicaID) +} + +func (m *MetaReplica) getReplicasByNodeID(nodeID int64, collectionID int64) (*querypb.ReplicaInfo, error) { + return nil, nil + // todo(yah01) +} + //func (m *MetaReplica) printMeta() { // m.RLock() // defer m.RUnlock()