2022-10-11 03:39:22 +00:00
|
|
|
// Licensed to the LF AI & Data foundation under one
|
|
|
|
// or more contributor license agreements. See the NOTICE file
|
|
|
|
// distributed with this work for additional information
|
|
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
|
|
// to you under the Apache License, Version 2.0 (the
|
|
|
|
// "License"); you may not use this file except in compliance
|
|
|
|
// with the License. You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
2022-09-15 10:48:32 +00:00
|
|
|
package utils
|
|
|
|
|
|
|
|
import (
|
2024-08-20 02:30:55 +00:00
|
|
|
"strings"
|
|
|
|
|
2023-02-26 03:31:49 +00:00
|
|
|
"github.com/cockroachdb/errors"
|
2023-04-06 11:14:32 +00:00
|
|
|
"github.com/samber/lo"
|
|
|
|
"go.uber.org/zap"
|
2023-02-26 03:31:49 +00:00
|
|
|
|
2022-09-15 10:48:32 +00:00
|
|
|
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
2023-04-06 11:14:32 +00:00
|
|
|
"github.com/milvus-io/milvus/pkg/log"
|
2023-07-17 06:59:34 +00:00
|
|
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
2024-04-04 20:57:16 +00:00
|
|
|
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
2023-01-30 02:19:48 +00:00
|
|
|
)
|
|
|
|
|
2023-03-20 06:55:57 +00:00
|
|
|
func GetPartitions(collectionMgr *meta.CollectionManager, collectionID int64) ([]int64, error) {
|
2022-09-15 10:48:32 +00:00
|
|
|
collection := collectionMgr.GetCollection(collectionID)
|
|
|
|
if collection != nil {
|
2023-03-20 06:55:57 +00:00
|
|
|
partitions := collectionMgr.GetPartitionsByCollection(collectionID)
|
|
|
|
if partitions != nil {
|
|
|
|
return lo.Map(partitions, func(partition *meta.Partition, i int) int64 {
|
|
|
|
return partition.PartitionID
|
|
|
|
}), nil
|
|
|
|
}
|
2022-09-15 10:48:32 +00:00
|
|
|
}
|
|
|
|
|
2024-04-02 12:35:14 +00:00
|
|
|
return nil, merr.WrapErrCollectionNotLoaded(collectionID)
|
2022-09-15 10:48:32 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// GroupNodesByReplica groups nodes by replica,
|
|
|
|
// returns ReplicaID -> NodeIDs
|
|
|
|
func GroupNodesByReplica(replicaMgr *meta.ReplicaManager, collectionID int64, nodes []int64) map[int64][]int64 {
|
|
|
|
ret := make(map[int64][]int64)
|
|
|
|
replicas := replicaMgr.GetByCollection(collectionID)
|
|
|
|
for _, replica := range replicas {
|
|
|
|
for _, node := range nodes {
|
2023-01-30 02:19:48 +00:00
|
|
|
if replica.Contains(node) {
|
2024-04-04 20:57:16 +00:00
|
|
|
ret[replica.GetID()] = append(ret[replica.GetID()], node)
|
2022-09-15 10:48:32 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return ret
|
|
|
|
}
|
|
|
|
|
|
|
|
// GroupPartitionsByCollection groups partitions by collection,
|
|
|
|
// returns CollectionID -> Partitions
|
|
|
|
func GroupPartitionsByCollection(partitions []*meta.Partition) map[int64][]*meta.Partition {
|
|
|
|
ret := make(map[int64][]*meta.Partition, 0)
|
|
|
|
for _, partition := range partitions {
|
|
|
|
collection := partition.GetCollectionID()
|
|
|
|
ret[collection] = append(ret[collection], partition)
|
|
|
|
}
|
|
|
|
return ret
|
|
|
|
}
|
|
|
|
|
|
|
|
// GroupSegmentsByReplica groups segments by replica,
|
|
|
|
// returns ReplicaID -> Segments
|
|
|
|
func GroupSegmentsByReplica(replicaMgr *meta.ReplicaManager, collectionID int64, segments []*meta.Segment) map[int64][]*meta.Segment {
|
|
|
|
ret := make(map[int64][]*meta.Segment)
|
|
|
|
replicas := replicaMgr.GetByCollection(collectionID)
|
|
|
|
for _, replica := range replicas {
|
|
|
|
for _, segment := range segments {
|
2023-01-30 02:19:48 +00:00
|
|
|
if replica.Contains(segment.Node) {
|
2024-04-04 20:57:16 +00:00
|
|
|
ret[replica.GetID()] = append(ret[replica.GetID()], segment)
|
2022-09-15 10:48:32 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return ret
|
|
|
|
}
|
|
|
|
|
2024-04-04 20:57:16 +00:00
|
|
|
// RecoverReplicaOfCollection recovers all replica of collection with latest resource group.
|
|
|
|
func RecoverReplicaOfCollection(m *meta.Meta, collectionID typeutil.UniqueID) {
|
|
|
|
logger := log.With(zap.Int64("collectionID", collectionID))
|
|
|
|
rgNames := m.ReplicaManager.GetResourceGroupByCollection(collectionID)
|
|
|
|
if rgNames.Len() == 0 {
|
|
|
|
logger.Error("no resource group found for collection", zap.Int64("collectionID", collectionID))
|
2023-03-08 10:57:51 +00:00
|
|
|
return
|
|
|
|
}
|
2024-04-04 20:57:16 +00:00
|
|
|
rgs, err := m.ResourceManager.GetNodesOfMultiRG(rgNames.Collect())
|
2023-03-08 10:57:51 +00:00
|
|
|
if err != nil {
|
2024-04-04 20:57:16 +00:00
|
|
|
logger.Error("unreachable code as expected, fail to get resource group for replica", zap.Error(err))
|
2023-03-08 10:57:51 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2024-04-04 20:57:16 +00:00
|
|
|
if err := m.ReplicaManager.RecoverNodesInCollection(collectionID, rgs); err != nil {
|
|
|
|
logger.Warn("fail to set available nodes in replica", zap.Error(err))
|
2022-09-15 10:48:32 +00:00
|
|
|
}
|
2023-01-30 02:19:48 +00:00
|
|
|
}
|
|
|
|
|
2024-04-04 20:57:16 +00:00
|
|
|
// RecoverAllCollectionrecovers all replica of all collection in resource group.
|
|
|
|
func RecoverAllCollection(m *meta.Meta) {
|
|
|
|
for _, collection := range m.CollectionManager.GetAll() {
|
|
|
|
RecoverReplicaOfCollection(m, collection)
|
2023-01-30 02:19:48 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-04-04 20:57:16 +00:00
|
|
|
func checkResourceGroup(m *meta.Meta, resourceGroups []string, replicaNumber int32) (map[string]int, error) {
|
|
|
|
if len(resourceGroups) != 0 && len(resourceGroups) != 1 && len(resourceGroups) != int(replicaNumber) {
|
2024-08-20 02:30:55 +00:00
|
|
|
return nil, errors.Errorf(
|
|
|
|
"replica=[%d] resource group=[%s], resource group num can only be 0, 1 or same as replica number", replicaNumber, strings.Join(resourceGroups, ","))
|
2023-01-30 02:19:48 +00:00
|
|
|
}
|
|
|
|
|
2024-04-04 20:57:16 +00:00
|
|
|
replicaNumInRG := make(map[string]int)
|
2023-01-30 02:19:48 +00:00
|
|
|
if len(resourceGroups) == 0 {
|
2024-04-04 20:57:16 +00:00
|
|
|
// All replicas should be spawned in default resource group.
|
|
|
|
replicaNumInRG[meta.DefaultResourceGroupName] = int(replicaNumber)
|
|
|
|
} else if len(resourceGroups) == 1 {
|
|
|
|
// All replicas should be spawned in the given resource group.
|
|
|
|
replicaNumInRG[resourceGroups[0]] = int(replicaNumber)
|
|
|
|
} else {
|
|
|
|
// replicas should be spawned in different resource groups one by one.
|
|
|
|
for _, rgName := range resourceGroups {
|
|
|
|
replicaNumInRG[rgName] += 1
|
|
|
|
}
|
2023-01-30 02:19:48 +00:00
|
|
|
}
|
|
|
|
|
2024-04-04 20:57:16 +00:00
|
|
|
// TODO: !!!Warning, ResourceManager and ReplicaManager doesn't protected with each other in concurrent operation.
|
|
|
|
// 1. replica1 got rg1's node snapshot but doesn't spawn finished.
|
|
|
|
// 2. rg1 is removed.
|
|
|
|
// 3. replica1 spawn finished, but cannot find related resource group.
|
|
|
|
for rgName, num := range replicaNumInRG {
|
|
|
|
if !m.ContainResourceGroup(rgName) {
|
2024-08-20 02:30:55 +00:00
|
|
|
return nil, merr.WrapErrResourceGroupNotFound(rgName)
|
2023-01-30 02:19:48 +00:00
|
|
|
}
|
2024-04-04 20:57:16 +00:00
|
|
|
nodes, err := m.ResourceManager.GetNodes(rgName)
|
2023-01-30 02:19:48 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2024-04-04 20:57:16 +00:00
|
|
|
if num > len(nodes) {
|
2024-08-20 02:30:55 +00:00
|
|
|
err := merr.WrapErrResourceGroupNodeNotEnough(rgName, len(nodes), num)
|
|
|
|
log.Warn("failed to check resource group", zap.Error(err))
|
|
|
|
return nil, err
|
2023-01-30 02:19:48 +00:00
|
|
|
}
|
|
|
|
}
|
2024-04-04 20:57:16 +00:00
|
|
|
return replicaNumInRG, nil
|
|
|
|
}
|
2023-01-30 02:19:48 +00:00
|
|
|
|
2024-04-04 20:57:16 +00:00
|
|
|
// SpawnReplicasWithRG spawns replicas in rgs one by one for given collection.
|
2024-05-10 09:27:31 +00:00
|
|
|
func SpawnReplicasWithRG(m *meta.Meta, collection int64, resourceGroups []string, replicaNumber int32, channels []string) ([]*meta.Replica, error) {
|
2024-04-04 20:57:16 +00:00
|
|
|
replicaNumInRG, err := checkResourceGroup(m, resourceGroups, replicaNumber)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Spawn it in replica manager.
|
2024-05-10 09:27:31 +00:00
|
|
|
replicas, err := m.ReplicaManager.Spawn(collection, replicaNumInRG, channels)
|
2024-04-04 20:57:16 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
// Active recover it.
|
|
|
|
RecoverReplicaOfCollection(m, collection)
|
|
|
|
return replicas, nil
|
2022-09-15 10:48:32 +00:00
|
|
|
}
|