enhance: replica manager enhancement (#31496)

issue: #30647 

- ReplicaManager now manages read-only nodes and always persists the node
distribution of each replica.

- All segment/channel checkers now use ReplicaManager, rather than
ResourceManager, to get read-only and read-write nodes (see the sketch after
this list).

- ReplicaManager now guarantees that a query node is assigned to at most one
replica per collection (replicas of the same collection never hold the same
query node at the same time).

- ReplicaManager now guarantees a fair node-count assignment policy when
multiple replicas of a collection are assigned to one resource group.

- Move some parameter checks into ReplicaManager to avoid data races.

- Allow transferring a replica to a resource group that already loads a
replica of the same collection.

- Allow transferring nodes between resource groups that load replicas of the
same collection.
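
For illustration only, a minimal sketch of how checkers and balancers are expected to consume the new Replica API after this change (the helper name splitReplicaNodes is made up; it assumes the internal meta and session packages of this repo):

// hypothetical helper mirroring the balancer/checker changes in this PR:
// RO nodes and stopping RW nodes are drained, remaining RW nodes take new load.
func splitReplicaNodes(replica *meta.Replica, nodeMgr *session.NodeManager) (online, offline []int64) {
	offline = append(offline, replica.GetRONodes()...) // read-only: no new channels/segments
	for _, nid := range replica.GetNodes() {           // read-write nodes
		isStopping, err := nodeMgr.IsStoppingNode(nid)
		if err != nil {
			continue // unknown node, skip it
		}
		if isStopping {
			offline = append(offline, nid)
		} else {
			online = append(online, nid)
		}
	}
	return online, offline
}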

---------

Signed-off-by: chyezh <chyezh@outlook.com>
chyezh 2024-04-05 04:57:16 +08:00 committed by GitHub
parent d6d3b01a04
commit a2502bde75
38 changed files with 1962 additions and 638 deletions

View File

@ -153,7 +153,7 @@ type DataCoordCatalog interface {
type QueryCoordCatalog interface {
SaveCollection(collection *querypb.CollectionLoadInfo, partitions ...*querypb.PartitionLoadInfo) error
SavePartition(info ...*querypb.PartitionLoadInfo) error
SaveReplica(replica *querypb.Replica) error
SaveReplica(replicas ...*querypb.Replica) error
GetCollections() ([]*querypb.CollectionLoadInfo, error)
GetPartitions() (map[int64][]*querypb.PartitionLoadInfo, error)
GetReplicas() ([]*querypb.Replica, error)

View File

@ -70,13 +70,17 @@ func (s Catalog) SavePartition(info ...*querypb.PartitionLoadInfo) error {
return nil
}
func (s Catalog) SaveReplica(replica *querypb.Replica) error {
key := encodeReplicaKey(replica.GetCollectionID(), replica.GetID())
value, err := proto.Marshal(replica)
if err != nil {
return err
func (s Catalog) SaveReplica(replicas ...*querypb.Replica) error {
kvs := make(map[string]string)
for _, replica := range replicas {
key := encodeReplicaKey(replica.GetCollectionID(), replica.GetID())
value, err := proto.Marshal(replica)
if err != nil {
return err
}
kvs[key] = string(value)
}
return s.cli.Save(key, string(value))
return s.cli.MultiSave(kvs)
}
func (s Catalog) SaveResourceGroup(rgs ...*querypb.ResourceGroup) error {
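
For context, a caller-side sketch of the variadic catalog API above (illustrative, not part of this diff; catalog and replicas are assumed to be built by the caller). Multiple replicas now go through a single MultiSave call instead of one Save per replica:

// replicas is a []*querypb.Replica assembled by the caller (illustrative).
if err := catalog.SaveReplica(replicas...); err != nil {
	return err // persisting the batch failed
}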

View File

@ -720,13 +720,19 @@ func (_c *QueryCoordCatalog_SavePartition_Call) RunAndReturn(run func(...*queryp
return _c
}
// SaveReplica provides a mock function with given fields: replica
func (_m *QueryCoordCatalog) SaveReplica(replica *querypb.Replica) error {
ret := _m.Called(replica)
// SaveReplica provides a mock function with given fields: replicas
func (_m *QueryCoordCatalog) SaveReplica(replicas ...*querypb.Replica) error {
_va := make([]interface{}, len(replicas))
for _i := range replicas {
_va[_i] = replicas[_i]
}
var _ca []interface{}
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 error
if rf, ok := ret.Get(0).(func(*querypb.Replica) error); ok {
r0 = rf(replica)
if rf, ok := ret.Get(0).(func(...*querypb.Replica) error); ok {
r0 = rf(replicas...)
} else {
r0 = ret.Error(0)
}
@ -740,14 +746,21 @@ type QueryCoordCatalog_SaveReplica_Call struct {
}
// SaveReplica is a helper method to define mock.On call
// - replica *querypb.Replica
func (_e *QueryCoordCatalog_Expecter) SaveReplica(replica interface{}) *QueryCoordCatalog_SaveReplica_Call {
return &QueryCoordCatalog_SaveReplica_Call{Call: _e.mock.On("SaveReplica", replica)}
// - replicas ...*querypb.Replica
func (_e *QueryCoordCatalog_Expecter) SaveReplica(replicas ...interface{}) *QueryCoordCatalog_SaveReplica_Call {
return &QueryCoordCatalog_SaveReplica_Call{Call: _e.mock.On("SaveReplica",
append([]interface{}{}, replicas...)...)}
}
func (_c *QueryCoordCatalog_SaveReplica_Call) Run(run func(replica *querypb.Replica)) *QueryCoordCatalog_SaveReplica_Call {
func (_c *QueryCoordCatalog_SaveReplica_Call) Run(run func(replicas ...*querypb.Replica)) *QueryCoordCatalog_SaveReplica_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*querypb.Replica))
variadicArgs := make([]*querypb.Replica, len(args)-0)
for i, a := range args[0:] {
if a != nil {
variadicArgs[i] = a.(*querypb.Replica)
}
}
run(variadicArgs...)
})
return _c
}
@ -757,7 +770,7 @@ func (_c *QueryCoordCatalog_SaveReplica_Call) Return(_a0 error) *QueryCoordCatal
return _c
}
func (_c *QueryCoordCatalog_SaveReplica_Call) RunAndReturn(run func(*querypb.Replica) error) *QueryCoordCatalog_SaveReplica_Call {
func (_c *QueryCoordCatalog_SaveReplica_Call) RunAndReturn(run func(...*querypb.Replica) error) *QueryCoordCatalog_SaveReplica_Call {
_c.Call.Return(run)
return _c
}

View File

@ -667,8 +667,10 @@ message PartitionLoadInfo {
message Replica {
int64 ID = 1;
int64 collectionID = 2;
repeated int64 nodes = 3;
repeated int64 nodes = 3; // all read-write nodes; mutually exclusive with ro_nodes.
string resource_group = 4;
repeated int64 ro_nodes = 5; // nodes still in use by this replica but that should not take new assignments;
// no new channel or segment can be loaded on them anymore.
}
enum SyncType {

View File

@ -458,34 +458,36 @@ type MultiTargetBalancer struct {
func (b *MultiTargetBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) {
log := log.With(
zap.Int64("collection", replica.CollectionID),
zap.Int64("replica id", replica.Replica.GetID()),
zap.String("replica group", replica.Replica.GetResourceGroup()),
zap.Int64("collection", replica.GetCollectionID()),
zap.Int64("replica id", replica.GetID()),
zap.String("replica group", replica.GetResourceGroup()),
)
nodes := replica.GetNodes()
if len(nodes) == 0 {
if replica.NodesCount() == 0 {
return nil, nil
}
outboundNodes := b.meta.ResourceManager.CheckOutboundNodes(replica)
onlineNodes := make([]int64, 0)
offlineNodes := make([]int64, 0)
for _, nid := range nodes {
// read-only nodes are treated as offline in the current replica.
if replica.RONodesCount() > 0 {
// if node is stop or transfer to other rg
log.RatedInfo(10, "meet read only node, try to move out all segment/channel", zap.Int64s("node", replica.GetRONodes()))
offlineNodes = append(offlineNodes, replica.GetRONodes()...)
}
for _, nid := range replica.GetNodes() {
if isStopping, err := b.nodeManager.IsStoppingNode(nid); err != nil {
log.Info("not existed node", zap.Int64("nid", nid), zap.Error(err))
continue
} else if isStopping {
offlineNodes = append(offlineNodes, nid)
} else if outboundNodes.Contain(nid) {
// if node is stop or transfer to other rg
log.RatedInfo(10, "meet outbound node, try to move out all segment/channel", zap.Int64("node", nid))
offlineNodes = append(offlineNodes, nid)
} else {
onlineNodes = append(onlineNodes, nid)
}
}
if len(nodes) == len(offlineNodes) || len(onlineNodes) == 0 {
if len(onlineNodes) == 0 {
// no available nodes to balance
return nil, nil
}
@ -524,8 +526,8 @@ func (b *MultiTargetBalancer) genSegmentPlan(replica *meta.Replica) []SegmentAss
// get segments distribution on replica level and global level
nodeSegments := make(map[int64][]*meta.Segment)
globalNodeSegments := make(map[int64][]*meta.Segment)
for _, node := range replica.Nodes {
dist := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.CollectionID), meta.WithNodeID(node))
for _, node := range replica.GetNodes() {
dist := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithNodeID(node))
segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&

View File

@ -162,33 +162,34 @@ func (b *RowCountBasedBalancer) BalanceReplica(replica *meta.Replica) ([]Segment
log := log.Ctx(context.TODO()).WithRateGroup("qcv2.RowCountBasedBalancer", 1, 60).With(
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetCollectionID()),
zap.String("resourceGroup", replica.Replica.GetResourceGroup()),
zap.String("resourceGroup", replica.GetResourceGroup()),
)
nodes := replica.GetNodes()
if len(nodes) < 2 {
if replica.NodesCount() == 0 {
return nil, nil
}
outboundNodes := b.meta.ResourceManager.CheckOutboundNodes(replica)
onlineNodes := make([]int64, 0)
offlineNodes := make([]int64, 0)
for _, nid := range nodes {
// read-only nodes are treated as offline in the current replica.
if replica.RONodesCount() > 0 {
// if node is stop or transfer to other rg
log.RatedInfo(10, "meet read only node, try to move out all segment/channel", zap.Int64s("node", replica.GetRONodes()))
offlineNodes = append(offlineNodes, replica.GetRONodes()...)
}
for _, nid := range replica.GetNodes() {
if isStopping, err := b.nodeManager.IsStoppingNode(nid); err != nil {
log.Info("not existed node", zap.Int64("nid", nid), zap.Error(err))
continue
} else if isStopping {
offlineNodes = append(offlineNodes, nid)
} else if outboundNodes.Contain(nid) {
// if node is stop or transfer to other rg
log.RatedInfo(10, "meet outbound node, try to move out all segment/channel", zap.Int64("node", nid))
offlineNodes = append(offlineNodes, nid)
} else {
onlineNodes = append(onlineNodes, nid)
}
}
if len(nodes) == len(offlineNodes) || len(onlineNodes) == 0 {
if len(onlineNodes) == 0 {
// no available nodes to balance
return nil, nil
}
@ -231,7 +232,7 @@ func (b *RowCountBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, on
b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
segment.GetLevel() != datapb.SegmentLevel_L0
})
plans := b.AssignSegment(replica.CollectionID, segments, onlineNodes, false)
plans := b.AssignSegment(replica.GetCollectionID(), segments, onlineNodes, false)
for i := range plans {
plans[i].From = nodeID
plans[i].Replica = replica
@ -299,7 +300,7 @@ func (b *RowCountBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNode
return nil
}
segmentPlans := b.AssignSegment(replica.CollectionID, segmentsToMove, nodesWithLessRow, false)
segmentPlans := b.AssignSegment(replica.GetCollectionID(), segmentsToMove, nodesWithLessRow, false)
for i := range segmentPlans {
segmentPlans[i].From = segmentPlans[i].Segment.Node
segmentPlans[i].Replica = replica

View File

@ -835,6 +835,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() {
suite.NoError(err)
err = balancer.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
suite.NoError(err)
utils.RecoverAllCollection(balancer.meta)
segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, 1)
assertChannelAssignPlanElementMatch(&suite.Suite, c.expectChannelPlans, channelPlans)
assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans, segmentPlans)

View File

@ -187,34 +187,36 @@ func (b *ScoreBasedBalancer) calculateSegmentScore(s *meta.Segment) int {
func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) {
log := log.With(
zap.Int64("collection", replica.CollectionID),
zap.Int64("replica id", replica.Replica.GetID()),
zap.String("replica group", replica.Replica.GetResourceGroup()),
zap.Int64("collection", replica.GetCollectionID()),
zap.Int64("replica id", replica.GetID()),
zap.String("replica group", replica.GetResourceGroup()),
)
nodes := replica.GetNodes()
if len(nodes) == 0 {
if replica.NodesCount() == 0 {
return nil, nil
}
outboundNodes := b.meta.ResourceManager.CheckOutboundNodes(replica)
onlineNodes := make([]int64, 0)
offlineNodes := make([]int64, 0)
for _, nid := range nodes {
// read-only nodes are treated as offline in the current replica.
if replica.RONodesCount() > 0 {
// if node is stop or transfer to other rg
log.RatedInfo(10, "meet read only node, try to move out all segment/channel", zap.Int64s("node", replica.GetRONodes()))
offlineNodes = append(offlineNodes, replica.GetRONodes()...)
}
for _, nid := range replica.GetNodes() {
if isStopping, err := b.nodeManager.IsStoppingNode(nid); err != nil {
log.Info("not existed node", zap.Int64("nid", nid), zap.Error(err))
continue
} else if isStopping {
offlineNodes = append(offlineNodes, nid)
} else if outboundNodes.Contain(nid) {
// if node is stop or transfer to other rg
log.RatedInfo(10, "meet outbound node, try to move out all segment/channel", zap.Int64("node", nid))
offlineNodes = append(offlineNodes, nid)
} else {
onlineNodes = append(onlineNodes, nid)
}
}
if len(nodes) == len(offlineNodes) || len(onlineNodes) == 0 {
if len(onlineNodes) == 0 {
// no available nodes to balance
return nil, nil
}
@ -258,7 +260,7 @@ func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlin
b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
segment.GetLevel() != datapb.SegmentLevel_L0
})
plans := b.AssignSegment(replica.CollectionID, segments, onlineNodes, false)
plans := b.AssignSegment(replica.GetCollectionID(), segments, onlineNodes, false)
for i := range plans {
plans[i].From = nodeID
plans[i].Replica = replica
@ -283,7 +285,7 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [
})
segmentDist[node] = segments
rowCount := b.calculateScore(replica.CollectionID, node)
rowCount := b.calculateScore(replica.GetCollectionID(), node)
totalScore += rowCount
nodeScore[node] = rowCount
}
@ -322,7 +324,7 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [
return nil
}
segmentPlans := b.AssignSegment(replica.CollectionID, segmentsToMove, onlineNodes, false)
segmentPlans := b.AssignSegment(replica.GetCollectionID(), segmentsToMove, onlineNodes, false)
for i := range segmentPlans {
segmentPlans[i].From = segmentPlans[i].Segment.Node
segmentPlans[i].Replica = replica

View File

@ -703,6 +703,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() {
for i := range c.outBoundNodes {
suite.balancer.meta.ResourceManager.UnassignNode(meta.DefaultResourceGroupName, c.outBoundNodes[i])
}
utils.RecoverAllCollection(balancer.meta)
// 4. balance and verify result
segmentPlans, channelPlans := suite.getCollectionBalancePlans(suite.balancer, c.collectionID)

View File

@ -141,7 +141,7 @@ func PrintCurrentReplicaDist(replica *meta.Replica,
stoppingNodesSegments map[int64][]*meta.Segment, nodeSegments map[int64][]*meta.Segment,
channelManager *meta.ChannelDistManager, segmentDistMgr *meta.SegmentDistManager,
) {
distInfo := fmt.Sprintf("%s {collectionID:%d, replicaID:%d, ", DistInfoPrefix, replica.CollectionID, replica.GetID())
distInfo := fmt.Sprintf("%s {collectionID:%d, replicaID:%d, ", DistInfoPrefix, replica.GetCollectionID(), replica.GetID())
// 1. print stopping nodes segment distribution
distInfo += "[stoppingNodesSegmentDist:"
for stoppingNodeID, stoppedSegments := range stoppingNodesSegments {

View File

@ -20,7 +20,6 @@ import (
"context"
"time"
"github.com/samber/lo"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
@ -103,16 +102,16 @@ func (c *ChannelChecker) checkReplica(ctx context.Context, replica *meta.Replica
ret := make([]task.Task, 0)
lacks, redundancies := c.getDmChannelDiff(replica.GetCollectionID(), replica.GetID())
tasks := c.createChannelLoadTask(c.getTraceCtx(ctx, replica.CollectionID), lacks, replica)
tasks := c.createChannelLoadTask(c.getTraceCtx(ctx, replica.GetCollectionID()), lacks, replica)
task.SetReason("lacks of channel", tasks...)
ret = append(ret, tasks...)
tasks = c.createChannelReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), redundancies, replica)
tasks = c.createChannelReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica)
task.SetReason("collection released", tasks...)
ret = append(ret, tasks...)
repeated := c.findRepeatedChannels(ctx, replica.GetID())
tasks = c.createChannelReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), repeated, replica)
tasks = c.createChannelReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), repeated, replica)
task.SetReason("redundancies of channel", tasks...)
ret = append(ret, tasks...)
@ -218,11 +217,7 @@ func (c *ChannelChecker) findRepeatedChannels(ctx context.Context, replicaID int
}
func (c *ChannelChecker) createChannelLoadTask(ctx context.Context, channels []*meta.DmChannel, replica *meta.Replica) []task.Task {
outboundNodes := c.meta.ResourceManager.CheckOutboundNodes(replica)
availableNodes := lo.Filter(replica.Replica.GetNodes(), func(node int64, _ int) bool {
return !outboundNodes.Contain(node)
})
plans := c.balancer.AssignChannel(channels, availableNodes, false)
plans := c.balancer.AssignChannel(channels, replica.GetNodes(), false)
for i := range plans {
plans[i].Replica = replica
}

View File

@ -110,25 +110,25 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica
// compare with targets to find the lack and redundancy of segments
lacks, redundancies := c.getSealedSegmentDiff(replica.GetCollectionID(), replica.GetID())
// loadCtx := trace.ContextWithSpan(context.Background(), c.meta.GetCollection(replica.CollectionID).LoadSpan)
tasks := c.createSegmentLoadTasks(c.getTraceCtx(ctx, replica.CollectionID), lacks, replica)
tasks := c.createSegmentLoadTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), lacks, replica)
task.SetReason("lacks of segment", tasks...)
ret = append(ret, tasks...)
redundancies = c.filterSegmentInUse(replica, redundancies)
tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), redundancies, replica, querypb.DataScope_Historical)
tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Historical)
task.SetReason("segment not exists in target", tasks...)
ret = append(ret, tasks...)
// compare inner dists to find repeated loaded segments
redundancies = c.findRepeatedSealedSegments(replica.GetID())
redundancies = c.filterExistedOnLeader(replica, redundancies)
tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), redundancies, replica, querypb.DataScope_Historical)
tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Historical)
task.SetReason("redundancies of segment", tasks...)
ret = append(ret, tasks...)
// compare with target to find the lack and redundancy of segments
_, redundancies = c.getGrowingSegmentDiff(replica.GetCollectionID(), replica.GetID())
tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), redundancies, replica, querypb.DataScope_Streaming)
tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Streaming)
task.SetReason("streaming segment not exists in target", tasks...)
ret = append(ret, tasks...)
@ -147,7 +147,7 @@ func (c *SegmentChecker) getGrowingSegmentDiff(collectionID int64,
log := log.Ctx(context.TODO()).WithRateGroup("qcv2.SegmentChecker", 1, 60).With(
zap.Int64("collectionID", collectionID),
zap.Int64("replicaID", replica.ID))
zap.Int64("replicaID", replica.GetID()))
leaders := c.dist.ChannelDistManager.GetShardLeadersByReplica(replica)
for channelName, node := range leaders {
@ -362,14 +362,13 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []
return nil
}
// filter out stopping nodes and outbound nodes
outboundNodes := c.meta.ResourceManager.CheckOutboundNodes(replica)
availableNodes := lo.Filter(replica.Replica.GetNodes(), func(node int64, _ int) bool {
// filter out stopping nodes.
availableNodes := lo.Filter(replica.GetNodes(), func(node int64, _ int) bool {
stop, err := c.nodeMgr.IsStoppingNode(node)
if err != nil {
return false
}
return !outboundNodes.Contain(node) && !stop
return !stop
})
if len(availableNodes) == 0 {
@ -399,7 +398,7 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []
SegmentInfo: s,
}
})
shardPlans := c.balancer.AssignSegment(replica.CollectionID, segmentInfos, availableNodes, false)
shardPlans := c.balancer.AssignSegment(replica.GetCollectionID(), segmentInfos, availableNodes, false)
for i := range shardPlans {
shardPlans[i].Replica = replica
}

View File

@ -109,7 +109,7 @@ func (s *Server) balanceSegments(ctx context.Context,
tasks := make([]task.Task, 0, len(plans))
for _, plan := range plans {
log.Info("manually balance segment...",
zap.Int64("replica", plan.Replica.ID),
zap.Int64("replica", plan.Replica.GetID()),
zap.String("channel", plan.Segment.InsertChannel),
zap.Int64("from", plan.From),
zap.Int64("to", plan.To),
@ -133,7 +133,7 @@ func (s *Server) balanceSegments(ctx context.Context,
)
if err != nil {
log.Warn("create segment task for balance failed",
zap.Int64("replica", plan.Replica.ID),
zap.Int64("replica", plan.Replica.GetID()),
zap.String("channel", plan.Segment.InsertChannel),
zap.Int64("from", plan.From),
zap.Int64("to", plan.To),
@ -186,7 +186,7 @@ func (s *Server) balanceChannels(ctx context.Context,
tasks := make([]task.Task, 0, len(plans))
for _, plan := range plans {
log.Info("manually balance channel...",
zap.Int64("replica", plan.Replica.ID),
zap.Int64("replica", plan.Replica.GetID()),
zap.String("channel", plan.Channel.GetChannelName()),
zap.Int64("from", plan.From),
zap.Int64("to", plan.To),
@ -209,7 +209,7 @@ func (s *Server) balanceChannels(ctx context.Context,
)
if err != nil {
log.Warn("create channel task for balance failed",
zap.Int64("replica", plan.Replica.ID),
zap.Int64("replica", plan.Replica.GetID()),
zap.String("channel", plan.Channel.GetChannelName()),
zap.Int64("from", plan.From),
zap.Int64("to", plan.To),

View File

@ -149,6 +149,8 @@ func (job *LoadCollectionJob) Execute() error {
// 2. create replica if not exist
replicas := job.meta.ReplicaManager.GetByCollection(req.GetCollectionID())
if len(replicas) == 0 {
// The LoadCollection API is a bit awkward here; we should take map[resourceGroupName]replicaNumber as input, to keep consistency with the `TransferReplica` API.
// Then we could adjust the replica number in each resource group independently.
replicas, err = utils.SpawnReplicasWithRG(job.meta, req.GetCollectionID(), req.GetResourceGroups(), req.GetReplicaNumber())
if err != nil {
msg := "failed to spawn replica for collection"

View File

@ -1195,7 +1195,7 @@ func (suite *JobSuite) TestLoadCreateReplicaFailed() {
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, ErrFailedAllocateID)
suite.ErrorIs(err, meta.ErrNodeNotEnough)
}
}

View File

@ -0,0 +1,169 @@
package meta
import (
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// NilReplica is used to represent a nil replica.
var NilReplica = newReplica(&querypb.Replica{
ID: -1,
})
// Replica is an immutable type for manipulating replica meta info for the replica manager.
// It uses a copy-on-write strategy to keep the replica manager consistent,
// so only read-only operations are allowed on this type.
type Replica struct {
replicaPB *querypb.Replica
rwNodes typeutil.UniqueSet // a helper field for manipulating the replica's Nodes slice field.
// always kept consistent with replicaPB.Nodes.
// mutually exclusive with roNodes.
roNodes typeutil.UniqueSet // a helper field for manipulating the replica's RoNodes slice field.
// always kept consistent with replicaPB.RoNodes.
// nodes still used by the replica that cannot take any more channels or segments.
// includes nodes being rebalanced or nodes that have left the resource group.
}
// Deprecated: may break the consistency of ReplicaManager, use `Spawn` of `ReplicaManager` or `newReplica` instead.
func NewReplica(replica *querypb.Replica, nodes ...typeutil.UniqueSet) *Replica {
r := proto.Clone(replica).(*querypb.Replica)
// TODO: nodes is a bad parameter that breaks consistency; it should be removed in the future.
// kept for old unit tests.
if len(nodes) > 0 && len(replica.Nodes) == 0 && nodes[0].Len() > 0 {
r.Nodes = nodes[0].Collect()
}
return newReplica(r)
}
// newReplica creates a new replica from pb.
func newReplica(replica *querypb.Replica) *Replica {
return &Replica{
replicaPB: proto.Clone(replica).(*querypb.Replica),
rwNodes: typeutil.NewUniqueSet(replica.Nodes...),
roNodes: typeutil.NewUniqueSet(replica.RoNodes...),
}
}
// GetID returns the id of the replica.
func (replica *Replica) GetID() typeutil.UniqueID {
return replica.replicaPB.GetID()
}
// GetCollectionID returns the collection id of the replica.
func (replica *Replica) GetCollectionID() typeutil.UniqueID {
return replica.replicaPB.GetCollectionID()
}
// GetResourceGroup returns the resource group name of the replica.
func (replica *Replica) GetResourceGroup() string {
return replica.replicaPB.GetResourceGroup()
}
// GetNodes returns the rw nodes of the replica.
// readonly, don't modify the returned slice.
func (replica *Replica) GetNodes() []int64 {
return replica.replicaPB.GetNodes()
}
// GetRONodes returns the ro nodes of the replica.
// readonly, don't modify the returned slice.
func (replica *Replica) GetRONodes() []int64 {
return replica.replicaPB.GetRoNodes()
}
// RangeOverRWNodes iterates over the read and write nodes of the replica.
func (replica *Replica) RangeOverRWNodes(f func(node int64) bool) {
replica.rwNodes.Range(f)
}
// RangeOverRONodes iterates over the ro nodes of the replica.
func (replica *Replica) RangeOverRONodes(f func(node int64) bool) {
replica.roNodes.Range(f)
}
// RWNodesCount returns the count of rw nodes of the replica.
func (replica *Replica) RWNodesCount() int {
return replica.rwNodes.Len()
}
// RONodesCount returns the count of ro nodes of the replica.
func (replica *Replica) RONodesCount() int {
return replica.roNodes.Len()
}
// NodesCount returns the count of rw nodes and ro nodes of the replica.
func (replica *Replica) NodesCount() int {
return replica.rwNodes.Len() + replica.roNodes.Len()
}
// Contains checks if the node is in rw nodes of the replica.
func (replica *Replica) Contains(node int64) bool {
return replica.rwNodes.Contain(node)
}
// ContainRONode checks if the node is in ro nodes of the replica.
func (replica *Replica) ContainRONode(node int64) bool {
return replica.roNodes.Contain(node)
}
// Deprecated: Warning, breaks the consistency of ReplicaManager; use `SetAvailableNodesInSameCollectionAndRG` in ReplicaManager instead.
// TODO: removed in future, only for old unittest now.
func (replica *Replica) AddRWNode(nodes ...int64) {
replica.roNodes.Remove(nodes...)
replica.replicaPB.RoNodes = replica.roNodes.Collect()
replica.rwNodes.Insert(nodes...)
replica.replicaPB.Nodes = replica.rwNodes.Collect()
}
// copyForWrite returns a mutable replica for write operations.
func (replica *Replica) copyForWrite() *mutableReplica {
return &mutableReplica{
&Replica{
replicaPB: proto.Clone(replica.replicaPB).(*querypb.Replica),
rwNodes: typeutil.NewUniqueSet(replica.replicaPB.Nodes...),
roNodes: typeutil.NewUniqueSet(replica.replicaPB.RoNodes...),
},
}
}
// mutableReplica is a mutable type (COW) for manipulating replica meta info for replica manager.
type mutableReplica struct {
*Replica
}
// SetResourceGroup sets the resource group name of the replica.
func (replica *mutableReplica) SetResourceGroup(resourceGroup string) {
replica.replicaPB.ResourceGroup = resourceGroup
}
// AddRWNode adds the node to rw nodes of the replica.
func (replica *mutableReplica) AddRWNode(nodes ...int64) {
replica.Replica.AddRWNode(nodes...)
}
// AddRONode moves the node from rw nodes to ro nodes of the replica.
// only used in replica manager.
func (replica *mutableReplica) AddRONode(nodes ...int64) {
replica.rwNodes.Remove(nodes...)
replica.replicaPB.Nodes = replica.rwNodes.Collect()
replica.roNodes.Insert(nodes...)
replica.replicaPB.RoNodes = replica.roNodes.Collect()
}
// RemoveNode removes the node from rw nodes and ro nodes of the replica.
// only used in replica manager.
func (replica *mutableReplica) RemoveNode(nodes ...int64) {
replica.roNodes.Remove(nodes...)
replica.replicaPB.RoNodes = replica.roNodes.Collect()
replica.rwNodes.Remove(nodes...)
replica.replicaPB.Nodes = replica.rwNodes.Collect()
}
// IntoReplica returns the immutable replica. After calling this method, the mutable replica should not be used again.
func (replica *mutableReplica) IntoReplica() *Replica {
r := replica.Replica
replica.Replica = nil
return r
}
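
To make the copy-on-write flow of this new Replica type concrete, a minimal package-internal sketch (node IDs are made up; ReplicaManager is the intended caller, since these methods are unexported):

mutable := replica.copyForWrite() // clones the protobuf and the node sets
mutable.AddRONode(7)              // rw -> ro: node 7 keeps its data but takes no new load
mutable.RemoveNode(8)             // ro/rw -> unused: drop node 8 from the replica
updated := mutable.IntoReplica()  // freeze back into an immutable *Replica
// ReplicaManager.put(updated) would then persist it via the catalog and swap it in memory.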

View File

@ -20,7 +20,7 @@ import (
"fmt"
"sync"
"github.com/golang/protobuf/proto"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/metastore"
@ -30,89 +30,21 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// NilReplica is used to represent a nil replica.
var NilReplica = NewReplica(&querypb.Replica{
ID: -1,
}, typeutil.NewUniqueSet())
type Replica struct {
*querypb.Replica
nodes typeutil.UniqueSet // a helper field for manipulating replica's Nodes slice field
rwmutex sync.RWMutex
}
func NewReplica(replica *querypb.Replica, nodes typeutil.UniqueSet) *Replica {
return &Replica{
Replica: replica,
nodes: nodes,
}
}
func (replica *Replica) AddNode(nodes ...int64) {
replica.rwmutex.Lock()
defer replica.rwmutex.Unlock()
replica.nodes.Insert(nodes...)
replica.Replica.Nodes = replica.nodes.Collect()
}
func (replica *Replica) GetNodes() []int64 {
replica.rwmutex.RLock()
defer replica.rwmutex.RUnlock()
if replica.nodes != nil {
return replica.nodes.Collect()
}
return nil
}
func (replica *Replica) Len() int {
replica.rwmutex.RLock()
defer replica.rwmutex.RUnlock()
if replica.nodes != nil {
return replica.nodes.Len()
}
return 0
}
func (replica *Replica) Contains(node int64) bool {
replica.rwmutex.RLock()
defer replica.rwmutex.RUnlock()
if replica.nodes != nil {
return replica.nodes.Contain(node)
}
return false
}
func (replica *Replica) RemoveNode(nodes ...int64) {
replica.rwmutex.Lock()
defer replica.rwmutex.Unlock()
replica.nodes.Remove(nodes...)
replica.Replica.Nodes = replica.nodes.Collect()
}
func (replica *Replica) Clone() *Replica {
replica.rwmutex.RLock()
defer replica.rwmutex.RUnlock()
return &Replica{
Replica: proto.Clone(replica.Replica).(*querypb.Replica),
nodes: typeutil.NewUniqueSet(replica.Replica.Nodes...),
}
}
type ReplicaManager struct {
rwmutex sync.RWMutex
idAllocator func() (int64, error)
replicas map[typeutil.UniqueID]*Replica
catalog metastore.QueryCoordCatalog
idAllocator func() (int64, error)
replicas map[typeutil.UniqueID]*Replica
collIDToReplicaIDs map[typeutil.UniqueID]typeutil.UniqueSet
catalog metastore.QueryCoordCatalog
}
func NewReplicaManager(idAllocator func() (int64, error), catalog metastore.QueryCoordCatalog) *ReplicaManager {
return &ReplicaManager{
idAllocator: idAllocator,
replicas: make(map[int64]*Replica),
catalog: catalog,
idAllocator: idAllocator,
replicas: make(map[int64]*Replica),
collIDToReplicaIDs: make(map[int64]typeutil.UniqueSet),
catalog: catalog,
}
}
@ -130,10 +62,7 @@ func (m *ReplicaManager) Recover(collections []int64) error {
}
if collectionSet.Contain(replica.GetCollectionID()) {
m.replicas[replica.GetID()] = &Replica{
Replica: replica,
nodes: typeutil.NewUniqueSet(replica.GetNodes()...),
}
m.putReplicaInMemory(newReplica(replica))
log.Info("recover replica",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
@ -154,6 +83,8 @@ func (m *ReplicaManager) Recover(collections []int64) error {
return nil
}
// Get returns the replica by id.
// Replica should be read-only, do not modify it.
func (m *ReplicaManager) Get(id typeutil.UniqueID) *Replica {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
@ -161,22 +92,36 @@ func (m *ReplicaManager) Get(id typeutil.UniqueID) *Replica {
return m.replicas[id]
}
// Spawn spawns replicas of the given number, for given collection,
// this doesn't store these replicas and assign nodes to them.
func (m *ReplicaManager) Spawn(collection int64, replicaNumber int32, rgName string) ([]*Replica, error) {
var (
replicas = make([]*Replica, replicaNumber)
err error
)
for i := range replicas {
replicas[i], err = m.spawn(collection, rgName)
if err != nil {
return nil, err
// Spawn spawns the given number of replicas in each resource group for the given collection and stores them in the ReplicaManager.
func (m *ReplicaManager) Spawn(collection int64, replicaNumInRG map[string]int) ([]*Replica, error) {
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
if m.collIDToReplicaIDs[collection] != nil {
return nil, fmt.Errorf("replicas of collection %d is already spawned", collection)
}
replicas := make([]*Replica, 0)
for rgName, replicaNum := range replicaNumInRG {
for ; replicaNum > 0; replicaNum-- {
id, err := m.idAllocator()
if err != nil {
return nil, err
}
replicas = append(replicas, newReplica(&querypb.Replica{
ID: id,
CollectionID: collection,
ResourceGroup: rgName,
}))
}
}
return replicas, err
if err := m.put(replicas...); err != nil {
return nil, err
}
return replicas, nil
}
// Deprecated: Warning, breaks the consistency of ReplicaManager;
// never use it in non-test code, use Spawn instead.
func (m *ReplicaManager) Put(replicas ...*Replica) error {
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
@ -184,30 +129,81 @@ func (m *ReplicaManager) Put(replicas ...*Replica) error {
return m.put(replicas...)
}
func (m *ReplicaManager) spawn(collectionID typeutil.UniqueID, rgName string) (*Replica, error) {
id, err := m.idAllocator()
if err != nil {
return nil, err
func (m *ReplicaManager) put(replicas ...*Replica) error {
if len(replicas) == 0 {
return nil
}
return &Replica{
Replica: &querypb.Replica{
ID: id,
CollectionID: collectionID,
ResourceGroup: rgName,
},
nodes: make(typeutil.UniqueSet),
}, nil
// Persist replicas into KV.
replicaPBs := make([]*querypb.Replica, 0, len(replicas))
for _, replica := range replicas {
replicaPBs = append(replicaPBs, replica.replicaPB)
}
if err := m.catalog.SaveReplica(replicaPBs...); err != nil {
return err
}
m.putReplicaInMemory(replicas...)
return nil
}
func (m *ReplicaManager) put(replicas ...*Replica) error {
// putReplicaInMemory puts replicas into in-memory map and collIDToReplicaIDs.
func (m *ReplicaManager) putReplicaInMemory(replicas ...*Replica) {
for _, replica := range replicas {
err := m.catalog.SaveReplica(replica.Replica)
if err != nil {
return err
// update in-memory replicas.
m.replicas[replica.GetID()] = replica
// update collIDToReplicaIDs.
if m.collIDToReplicaIDs[replica.GetCollectionID()] == nil {
m.collIDToReplicaIDs[replica.GetCollectionID()] = typeutil.NewUniqueSet()
}
m.replicas[replica.ID] = replica
m.collIDToReplicaIDs[replica.GetCollectionID()].Insert(replica.GetID())
}
return nil
}
// TransferReplica transfers N replicas from srcRGName to dstRGName.
func (m *ReplicaManager) TransferReplica(collectionID typeutil.UniqueID, srcRGName string, dstRGName string, replicaNum int) error {
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
// Check if the replicas can be transferred.
srcReplicas, err := m.getSrcReplicasAndCheckIfTransferable(collectionID, srcRGName, replicaNum)
if err != nil {
return err
}
// Transfer N replicas from srcRGName to dstRGName.
// Node changes will be applied by the replica_observer in the background.
replicas := make([]*Replica, 0, replicaNum)
for i := 0; i < replicaNum; i++ {
mutableReplica := srcReplicas[i].copyForWrite()
mutableReplica.SetResourceGroup(dstRGName)
replicas = append(replicas, mutableReplica.IntoReplica())
}
return m.put(replicas...)
}
// getSrcReplicasAndCheckIfTransferable checks if the collection can be transferred from srcRGName to dstRGName.
func (m *ReplicaManager) getSrcReplicasAndCheckIfTransferable(collectionID typeutil.UniqueID, srcRGName string, replicaNum int) ([]*Replica, error) {
// Check if collection is loaded.
if m.collIDToReplicaIDs[collectionID] == nil {
return nil, merr.WrapErrParameterInvalid(
"Collection not loaded",
fmt.Sprintf("collectionID %d", collectionID),
)
}
// Check whether there are enough replicas in srcRGName.
srcReplicas := m.getByCollectionAndRG(collectionID, srcRGName)
if len(srcReplicas) < replicaNum {
err := merr.WrapErrParameterInvalid(
"NumReplica not greater than the number of replica in source resource group", fmt.Sprintf("only found [%d] replicas of collection [%d] in source resource group [%s], but %d require",
len(srcReplicas),
collectionID,
srcRGName,
replicaNum))
return nil, err
}
return srcReplicas, nil
}
// RemoveCollection removes replicas of given collection,
@ -220,11 +216,11 @@ func (m *ReplicaManager) RemoveCollection(collectionID typeutil.UniqueID) error
if err != nil {
return err
}
for id, replica := range m.replicas {
if replica.CollectionID == collectionID {
delete(m.replicas, id)
}
// Remove all replicas of the collection and drop the collection from collIDToReplicaIDs.
for replicaID := range m.collIDToReplicaIDs[collectionID] {
delete(m.replicas, replicaID)
}
delete(m.collIDToReplicaIDs, collectionID)
return nil
}
@ -233,12 +229,11 @@ func (m *ReplicaManager) GetByCollection(collectionID typeutil.UniqueID) []*Repl
defer m.rwmutex.RUnlock()
replicas := make([]*Replica, 0)
for _, replica := range m.replicas {
if replica.CollectionID == collectionID {
replicas = append(replicas, replica)
if m.collIDToReplicaIDs[collectionID] != nil {
for replicaID := range m.collIDToReplicaIDs[collectionID] {
replicas = append(replicas, m.replicas[replicaID])
}
}
return replicas
}
@ -247,7 +242,7 @@ func (m *ReplicaManager) GetByCollectionAndNode(collectionID, nodeID typeutil.Un
defer m.rwmutex.RUnlock()
for _, replica := range m.replicas {
if replica.CollectionID == collectionID && replica.nodes.Contain(nodeID) {
if replica.GetCollectionID() == collectionID && replica.Contains(nodeID) {
return replica
}
}
@ -261,7 +256,7 @@ func (m *ReplicaManager) GetByNode(nodeID typeutil.UniqueID) []*Replica {
replicas := make([]*Replica, 0)
for _, replica := range m.replicas {
if replica.nodes.Contain(nodeID) {
if replica.rwNodes.Contain(nodeID) {
replicas = append(replicas, replica)
}
}
@ -269,17 +264,19 @@ func (m *ReplicaManager) GetByNode(nodeID typeutil.UniqueID) []*Replica {
return replicas
}
func (m *ReplicaManager) GetByCollectionAndRG(collectionID int64, rgName string) []*Replica {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
ret := make([]*Replica, 0)
for _, replica := range m.replicas {
if replica.GetCollectionID() == collectionID && replica.GetResourceGroup() == rgName {
ret = append(ret, replica)
}
func (m *ReplicaManager) getByCollectionAndRG(collectionID int64, rgName string) []*Replica {
replicaIDs, ok := m.collIDToReplicaIDs[collectionID]
if !ok {
return make([]*Replica, 0)
}
ret := make([]*Replica, 0)
replicaIDs.Range(func(replicaID typeutil.UniqueID) bool {
if m.replicas[replicaID].GetResourceGroup() == rgName {
ret = append(ret, m.replicas[replicaID])
}
return true
})
return ret
}
@ -297,20 +294,93 @@ func (m *ReplicaManager) GetByResourceGroup(rgName string) []*Replica {
return ret
}
func (m *ReplicaManager) AddNode(replicaID typeutil.UniqueID, nodes ...typeutil.UniqueID) error {
// RecoverNodesInCollection recovers all nodes of a collection against the latest resource groups.
// It promises that a node is assigned to at most one replica of the same collection at a time.
// 1. Move rw nodes to ro nodes if they are no longer in the related resource group.
// 2. Add new incoming nodes into the replica if they are not in use by other replicas of the same collection.
// 3. Replicas in the same resource group share the nodes of the resource group fairly.
func (m *ReplicaManager) RecoverNodesInCollection(collectionID typeutil.UniqueID, rgs map[string]typeutil.UniqueSet) error {
if err := m.validateResourceGroups(rgs); err != nil {
return err
}
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
replica, ok := m.replicas[replicaID]
if !ok {
return merr.WrapErrReplicaNotFound(replicaID)
// create a helper to do the recovery.
helper, err := m.getCollectionAssignmentHelper(collectionID, rgs)
if err != nil {
return err
}
replica = replica.Clone()
replica.AddNode(nodes...)
return m.put(replica)
modifiedReplicas := make([]*Replica, 0)
// recover nodes by resource group.
helper.RangeOverResourceGroup(func(replicaHelper *replicasInSameRGAssignmentHelper) {
replicaHelper.RangeOverReplicas(func(assignment *replicaAssignmentInfo) {
roNodes := assignment.GetNewRONodes()
recoverableNodes, incomingNodeCount := assignment.GetRecoverNodesAndIncomingNodeCount()
// There may not be enough incoming nodes for the current replica:
// even after filtering out the nodes used by other replicas of the same collection in other resource groups,
// the current replica's expected nodes may still be used by other replicas of the same collection in the same resource group.
incomingNode := replicaHelper.AllocateIncomingNodes(incomingNodeCount)
if len(roNodes) == 0 && len(recoverableNodes) == 0 && len(incomingNode) == 0 {
// nothing to do.
return
}
mutableReplica := m.replicas[assignment.GetReplicaID()].copyForWrite()
mutableReplica.AddRONode(roNodes...) // rw -> ro
mutableReplica.AddRWNode(recoverableNodes...) // ro -> rw
mutableReplica.AddRWNode(incomingNode...) // unused -> rw
log.Info(
"new replica recovery found",
zap.Int64s("newRONodes", roNodes),
zap.Int64s("roToRWNodes", recoverableNodes),
zap.Int64s("newIncomingNodes", incomingNode))
modifiedReplicas = append(modifiedReplicas, mutableReplica.IntoReplica())
})
})
return m.put(modifiedReplicas...)
}
// validateResourceGroups checks if the resource groups are valid.
func (m *ReplicaManager) validateResourceGroups(rgs map[string]typeutil.UniqueSet) error {
// make sure that the node sets of the resource groups are mutually exclusive.
node := typeutil.NewUniqueSet()
for _, rg := range rgs {
for id := range rg {
if node.Contain(id) {
return errors.New("node in resource group is not mutual exclusive")
}
node.Insert(id)
}
}
return nil
}
// getCollectionAssignmentHelper checks if the collection is recoverable and groups its replicas by resource group.
func (m *ReplicaManager) getCollectionAssignmentHelper(collectionID typeutil.UniqueID, rgs map[string]typeutil.UniqueSet) (*collectionAssignmentHelper, error) {
// check if the collection exists.
replicaIDs, ok := m.collIDToReplicaIDs[collectionID]
if !ok {
return nil, errors.Errorf("collection %d not loaded", collectionID)
}
rgToReplicas := make(map[string][]*Replica)
for replicaID := range replicaIDs {
replica := m.replicas[replicaID]
rgName := replica.GetResourceGroup()
if _, ok := rgs[rgName]; !ok {
return nil, errors.Errorf("lost resource group info, collectionID: %d, replicaID: %d, resourceGroup: %s", collectionID, replicaID, rgName)
}
if _, ok := rgToReplicas[rgName]; !ok {
rgToReplicas[rgName] = make([]*Replica, 0)
}
rgToReplicas[rgName] = append(rgToReplicas[rgName], replica)
}
return newCollectionAssignmentHelper(collectionID, rgToReplicas, rgs), nil
}
// RemoveNode removes the given nodes from the specified replica.
func (m *ReplicaManager) RemoveNode(replicaID typeutil.UniqueID, nodes ...typeutil.UniqueID) error {
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
@ -320,9 +390,9 @@ func (m *ReplicaManager) RemoveNode(replicaID typeutil.UniqueID, nodes ...typeut
return merr.WrapErrReplicaNotFound(replicaID)
}
replica = replica.Clone()
replica.RemoveNode(nodes...)
return m.put(replica)
mutableReplica := replica.copyForWrite()
mutableReplica.RemoveNode(nodes...) // ro -> unused
return m.put(mutableReplica.IntoReplica())
}
func (m *ReplicaManager) GetResourceGroupByCollection(collection typeutil.UniqueID) typeutil.Set[string] {
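
An illustrative sketch (collection ID, resource-group names, and node IDs are hypothetical) of how the reworked ReplicaManager entry points fit together: Spawn takes per-resource-group replica counts and persists the replicas immediately, and RecoverNodesInCollection is later driven with the current resource-group node sets to compute rw/ro transitions:

// spawn two replicas in rg1 and one in rg2 for collection 100 (illustrative values);
// they are persisted via the catalog before Spawn returns.
replicas, err := m.Spawn(100, map[string]int{"rg1": 2, "rg2": 1})
if err != nil {
	return err
}
_ = replicas

// later (e.g. from a background observer), reconcile node assignment with the
// latest resource-group membership; rw/ro transitions are computed internally.
err = m.RecoverNodesInCollection(100, map[string]typeutil.UniqueSet{
	"rg1": typeutil.NewUniqueSet(1, 2, 3),
	"rg2": typeutil.NewUniqueSet(4, 5),
})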

View File

@ -0,0 +1,272 @@
package meta
import (
"sort"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// collectionAssignmentHelper is a helper to manage the replica assignment within the same collection.
type collectionAssignmentHelper struct {
collectionID typeutil.UniqueID
resourceGroupToReplicas map[string]*replicasInSameRGAssignmentHelper
}
// newCollectionAssignmentHelper creates a new collectionAssignmentHelper.
func newCollectionAssignmentHelper(
collectionID typeutil.UniqueID,
rgToReplicas map[string][]*Replica,
rgs map[string]typeutil.UniqueSet,
) *collectionAssignmentHelper {
resourceGroupToReplicas := make(map[string]*replicasInSameRGAssignmentHelper)
for rgName, replicas := range rgToReplicas {
resourceGroupToReplicas[rgName] = newReplicaAssignmentHelper(rgName, replicas, rgs[rgName])
}
helper := &collectionAssignmentHelper{
collectionID: collectionID,
resourceGroupToReplicas: resourceGroupToReplicas,
}
helper.updateIncomingNodesAndExpectedNode()
return helper
}
// updateIncomingNodesAndExpectedNode updates the incoming nodes for all resource groups.
// An incoming node is a node that is not used by the current collection but is in the resource group.
func (h *collectionAssignmentHelper) updateIncomingNodesAndExpectedNode() {
// incoming nodes must be compared against all nodes of the replicas in the same collection, even those not in the same resource group.
for _, helper := range h.resourceGroupToReplicas {
// some nodes in the current resource group may still hold data of other replicas of the same collection from other resource groups.
// those nodes cannot be used right now.
newIncomingNodes := helper.nodesInRG.Clone()
currentUsedNodeCount := newIncomingNodes.Len()
h.RangeOverReplicas(func(rgName string, assignment *replicaAssignmentInfo) {
assignment.RangeOverAllNodes(func(nodeID int64) {
if newIncomingNodes.Contain(nodeID) {
newIncomingNodes.Remove(nodeID)
if rgName != helper.rgName {
// The node is still used by another replica of the same collection in another resource group and cannot be used right now.
// Filter it out of the expected node count calculation to avoid starving some replica in the same resource group.
currentUsedNodeCount--
}
}
})
})
helper.incomingNodes = newIncomingNodes
helper.updateExpectedNodeCountForReplicas(currentUsedNodeCount)
}
}
// RangeOverResourceGroup iterates over the resource groups.
func (h *collectionAssignmentHelper) RangeOverResourceGroup(f func(helper *replicasInSameRGAssignmentHelper)) {
for _, helper := range h.resourceGroupToReplicas {
f(helper)
}
}
// RangeOverReplicas iterates over the replicas.
func (h *collectionAssignmentHelper) RangeOverReplicas(f func(rgName string, assignment *replicaAssignmentInfo)) {
for _, helper := range h.resourceGroupToReplicas {
for _, assignment := range helper.replicas {
f(helper.rgName, assignment)
}
}
}
// newReplicaAssignmentHelper creates a new replicaAssignmentHelper.
func newReplicaAssignmentHelper(rgName string, replicas []*Replica, nodeInRG typeutil.UniqueSet) *replicasInSameRGAssignmentHelper {
assignmentInfos := make([]*replicaAssignmentInfo, 0, len(replicas))
for _, replica := range replicas {
assignmentInfos = append(assignmentInfos, newReplicaAssignmentInfo(replica, nodeInRG))
}
h := &replicasInSameRGAssignmentHelper{
rgName: rgName,
nodesInRG: nodeInRG,
replicas: assignmentInfos,
}
return h
}
// replicasInSameRGAssignmentHelper is a helper to manage the replica assignment within the same resource group.
type replicasInSameRGAssignmentHelper struct {
rgName string
nodesInRG typeutil.UniqueSet
incomingNodes typeutil.UniqueSet // nodes in the resource group that are not used by the current replicas.
replicas []*replicaAssignmentInfo
}
func (h *replicasInSameRGAssignmentHelper) AllocateIncomingNodes(n int) []int64 {
nodeIDs := make([]int64, 0, n)
h.incomingNodes.Range(func(nodeID int64) bool {
if n > 0 {
nodeIDs = append(nodeIDs, nodeID)
n--
} else {
return false
}
return true
})
h.incomingNodes.Remove(nodeIDs...)
return nodeIDs
}
// RangeOverReplicas iterates over the replicas.
func (h *replicasInSameRGAssignmentHelper) RangeOverReplicas(f func(*replicaAssignmentInfo)) {
for _, info := range h.replicas {
f(info)
}
}
// updateExpectedNodeCountForReplicas updates the expected node count for all replicas in the same resource group.
func (h *replicasInSameRGAssignmentHelper) updateExpectedNodeCountForReplicas(currentUsageNodesCount int) {
minimumNodeCount := currentUsageNodesCount / len(h.replicas)
maximumNodeCount := minimumNodeCount
remainder := currentUsageNodesCount % len(h.replicas)
if remainder > 0 {
maximumNodeCount += 1
}
// rule:
// 1. make minimumNodeCount <= expectedNodeCount <= maximumNodeCount
// 2. expectedNodeCount should be as close to len(assignedNodes) as possible for each replica, to avoid unnecessary node transfer.
sorter := make(replicaAssignmentInfoSorter, 0, len(h.replicas))
for _, info := range h.replicas {
sorter = append(sorter, info)
}
sort.Sort(sort.Reverse(replicaAssignmentInfoSortByAvailableAndRecoverable{sorter}))
for _, info := range sorter {
if remainder > 0 {
info.expectedNodeCount = maximumNodeCount
remainder--
} else {
info.expectedNodeCount = minimumNodeCount
}
}
}
// newReplicaAssignmentInfo creates a new replicaAssignmentInfo.
func newReplicaAssignmentInfo(replica *Replica, nodeInRG typeutil.UniqueSet) *replicaAssignmentInfo {
// the nodes in a replica can be split into 3 parts.
rwNodes := make(typeutil.UniqueSet, replica.RWNodesCount())
newRONodes := make(typeutil.UniqueSet, replica.RONodesCount())
unrecoverableRONodes := make(typeutil.UniqueSet, replica.RONodesCount())
recoverableRONodes := make(typeutil.UniqueSet, replica.RONodesCount())
replica.RangeOverRWNodes(func(nodeID int64) bool {
if nodeInRG.Contain(nodeID) {
rwNodes.Insert(nodeID)
} else {
newRONodes.Insert(nodeID)
}
return true
})
replica.RangeOverRONodes(func(nodeID int64) bool {
if nodeInRG.Contain(nodeID) {
recoverableRONodes.Insert(nodeID)
} else {
unrecoverableRONodes.Insert(nodeID)
}
return true
})
return &replicaAssignmentInfo{
replicaID: replica.GetID(),
expectedNodeCount: 0,
rwNodes: rwNodes,
newRONodes: newRONodes,
recoverableRONodes: recoverableRONodes,
unrecoverableRONodes: unrecoverableRONodes,
}
}
type replicaAssignmentInfo struct {
replicaID typeutil.UniqueID
expectedNodeCount int // expected node count for each replica.
rwNodes typeutil.UniqueSet // rw nodes used by the current replica. (rw -> rw)
newRONodes typeutil.UniqueSet // new ro nodes for this replica. (rw -> ro)
recoverableRONodes typeutil.UniqueSet // recoverable ro nodes for this replica (an ro node can be put back to rw if it's in the current resource group). (may ro -> rw)
unrecoverableRONodes typeutil.UniqueSet // unrecoverable ro nodes for this replica (an ro node can't be put back to rw if it's not in the current resource group). (ro -> ro)
}
// GetReplicaID returns the replica id of this replica.
func (s *replicaAssignmentInfo) GetReplicaID() typeutil.UniqueID {
return s.replicaID
}
// GetNewRONodes returns the new ro nodes for this replica.
func (s *replicaAssignmentInfo) GetNewRONodes() []int64 {
newRONodes := make([]int64, 0, s.newRONodes.Len())
// nodes not in the current resource group must be set to ro.
for nodeID := range s.newRONodes {
newRONodes = append(newRONodes, nodeID)
}
// if too many nodes are occupied by the current replica, set some of them to ro.
if s.rwNodes.Len() > s.expectedNodeCount {
cnt := s.rwNodes.Len() - s.expectedNodeCount
s.rwNodes.Range(func(node int64) bool {
if cnt > 0 {
newRONodes = append(newRONodes, node)
cnt--
} else {
return false
}
return true
})
}
return newRONodes
}
// GetRecoverNodesAndIncomingNodeCount returns the recoverable ro nodes and incoming node count for these replica.
func (s *replicaAssignmentInfo) GetRecoverNodesAndIncomingNodeCount() (recoverNodes []int64, incomingNodeCount int) {
recoverNodes = make([]int64, 0, s.recoverableRONodes.Len())
incomingNodeCount = 0
if s.rwNodes.Len() < s.expectedNodeCount {
incomingNodeCount = s.expectedNodeCount - s.rwNodes.Len()
s.recoverableRONodes.Range(func(node int64) bool {
if incomingNodeCount > 0 {
recoverNodes = append(recoverNodes, node)
incomingNodeCount--
} else {
return false
}
return true
})
}
return recoverNodes, incomingNodeCount
}
// RangeOverAllNodes iterates over all nodes in the replica.
func (s *replicaAssignmentInfo) RangeOverAllNodes(f func(nodeID int64)) {
ff := func(nodeID int64) bool {
f(nodeID)
return true
}
s.rwNodes.Range(ff)
s.newRONodes.Range(ff)
s.recoverableRONodes.Range(ff)
s.unrecoverableRONodes.Range(ff)
}
type replicaAssignmentInfoSorter []*replicaAssignmentInfo
func (s replicaAssignmentInfoSorter) Len() int {
return len(s)
}
func (s replicaAssignmentInfoSorter) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
type replicaAssignmentInfoSortByAvailableAndRecoverable struct {
replicaAssignmentInfoSorter
}
func (s replicaAssignmentInfoSortByAvailableAndRecoverable) Less(i, j int) bool {
left := s.replicaAssignmentInfoSorter[i].rwNodes.Len() + s.replicaAssignmentInfoSorter[i].recoverableRONodes.Len()
right := s.replicaAssignmentInfoSorter[j].rwNodes.Len() + s.replicaAssignmentInfoSorter[j].recoverableRONodes.Len()
// Sort by replica id as a tiebreaker to keep the result stable;
// otherwise an unstable assignment may cause unnecessary node transfer.
return left < right || (left == right && s.replicaAssignmentInfoSorter[i].replicaID < s.replicaAssignmentInfoSorter[j].replicaID)
}
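
As a worked illustration of the fair-count rule implemented above (numbers are made up): with 7 usable nodes shared by 3 replicas in one resource group, minimumNodeCount is 7/3 = 2 and the remainder of 1 goes to the replica that already holds the most rw + recoverable nodes, so the expected node counts become 3, 2, and 2 with minimal node movement. A tiny sketch of that arithmetic:

// illustrative only: mirrors the min/max/remainder split used by updateExpectedNodeCountForReplicas.
func expectedCounts(usableNodes, replicaCount int) (minCount, maxCount, remainder int) {
	minCount = usableNodes / replicaCount
	maxCount = minCount
	remainder = usableNodes % replicaCount
	if remainder > 0 {
		maxCount = minCount + 1
	}
	// replicas sorted by current size (descending) take maxCount until the remainder runs out.
	return minCount, maxCount, remainder
}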

View File

@ -0,0 +1,490 @@
package meta
import (
"testing"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type expectedReplicaPlan struct {
newRONodes int
recoverNodes int
incomingNodeCount int
expectedNodeCount int
}
type testCase struct {
collectionID typeutil.UniqueID // collection id
rgToReplicas map[string][]*Replica // from resource group to replicas
rgs map[string]typeutil.UniqueSet // from resource group to nodes
expectedPlan map[typeutil.UniqueID]expectedReplicaPlan // from replica id to expected plan
expectedNewIncomingNodes map[string]typeutil.UniqueSet // from resource group to incoming nodes
}
type CollectionAssignmentHelperSuite struct {
suite.Suite
}
func (s *CollectionAssignmentHelperSuite) TestNoModificationCase() {
s.runCase(testCase{
collectionID: 1,
rgToReplicas: map[string][]*Replica{
"rg1": {
newReplica(&querypb.Replica{
ID: 1,
CollectionID: 1,
Nodes: []int64{1, 2, 3, 4},
RoNodes: []int64{},
}),
},
"rg2": {
newReplica(&querypb.Replica{
ID: 2,
CollectionID: 1,
Nodes: []int64{5, 6},
RoNodes: []int64{},
}),
newReplica(&querypb.Replica{
ID: 3,
CollectionID: 1,
Nodes: []int64{7, 8},
RoNodes: []int64{},
}),
},
},
rgs: map[string]typeutil.UniqueSet{
"rg1": typeutil.NewUniqueSet(1, 2, 3, 4),
"rg2": typeutil.NewUniqueSet(5, 6, 7, 8),
},
expectedPlan: map[typeutil.UniqueID]expectedReplicaPlan{
1: {
newRONodes: 0,
recoverNodes: 0,
incomingNodeCount: 0,
expectedNodeCount: 4,
},
2: {
newRONodes: 0,
recoverNodes: 0,
incomingNodeCount: 0,
expectedNodeCount: 2,
},
3: {
newRONodes: 0,
recoverNodes: 0,
incomingNodeCount: 0,
expectedNodeCount: 2,
},
},
expectedNewIncomingNodes: map[string]typeutil.UniqueSet{
"rg1": typeutil.NewUniqueSet(),
"rg2": typeutil.NewUniqueSet(),
},
})
s.runCase(testCase{
collectionID: 1,
rgToReplicas: map[string][]*Replica{
"rg1": {
newReplica(&querypb.Replica{
ID: 1,
CollectionID: 1,
Nodes: []int64{1, 2, 3, 4},
RoNodes: []int64{},
}),
},
"rg2": {
newReplica(&querypb.Replica{
ID: 2,
CollectionID: 1,
Nodes: []int64{5},
RoNodes: []int64{},
}),
newReplica(&querypb.Replica{
ID: 3,
CollectionID: 1,
Nodes: []int64{6, 7},
RoNodes: []int64{},
}),
},
},
rgs: map[string]typeutil.UniqueSet{
"rg1": typeutil.NewUniqueSet(1, 2, 3, 4),
"rg2": typeutil.NewUniqueSet(5, 6, 7),
},
expectedPlan: map[typeutil.UniqueID]expectedReplicaPlan{
1: {
newRONodes: 0,
recoverNodes: 0,
incomingNodeCount: 0,
expectedNodeCount: 4,
},
2: {
newRONodes: 0,
recoverNodes: 0,
incomingNodeCount: 0,
expectedNodeCount: 1,
},
3: {
newRONodes: 0,
recoverNodes: 0,
incomingNodeCount: 0,
expectedNodeCount: 2,
},
},
expectedNewIncomingNodes: map[string]typeutil.UniqueSet{
"rg1": typeutil.NewUniqueSet(),
"rg2": typeutil.NewUniqueSet(),
},
})
}
func (s *CollectionAssignmentHelperSuite) TestRO() {
s.runCase(testCase{
collectionID: 1,
rgToReplicas: map[string][]*Replica{
"rg1": {
newReplica(&querypb.Replica{
ID: 1,
CollectionID: 1,
Nodes: []int64{1, 2, 3, 4, 5},
RoNodes: []int64{},
}),
},
"rg2": {
newReplica(&querypb.Replica{
ID: 2,
CollectionID: 1,
Nodes: []int64{6},
RoNodes: []int64{},
}),
newReplica(&querypb.Replica{
ID: 3,
CollectionID: 1,
Nodes: []int64{7, 8},
RoNodes: []int64{},
}),
},
},
rgs: map[string]typeutil.UniqueSet{
"rg1": typeutil.NewUniqueSet(1, 2, 3, 4),
"rg2": typeutil.NewUniqueSet(5, 6, 7, 8),
},
expectedPlan: map[typeutil.UniqueID]expectedReplicaPlan{
1: {
newRONodes: 1,
recoverNodes: 0,
incomingNodeCount: 0,
expectedNodeCount: 4,
},
2: {
newRONodes: 0,
recoverNodes: 0,
incomingNodeCount: 0,
expectedNodeCount: 1,
},
3: {
newRONodes: 0,
recoverNodes: 0,
incomingNodeCount: 0,
expectedNodeCount: 2,
},
},
expectedNewIncomingNodes: map[string]typeutil.UniqueSet{
"rg1": typeutil.NewUniqueSet(),
"rg2": typeutil.NewUniqueSet(), // 5 is still used rg1 of replica 1.
},
})
s.runCase(testCase{
collectionID: 1,
rgToReplicas: map[string][]*Replica{
"rg1": {
newReplica(&querypb.Replica{
ID: 1,
CollectionID: 1,
Nodes: []int64{1, 2, 3, 4, 5},
RoNodes: []int64{},
}),
},
"rg2": {
newReplica(&querypb.Replica{
ID: 2,
CollectionID: 1,
Nodes: []int64{6},
RoNodes: []int64{},
}),
newReplica(&querypb.Replica{
ID: 3,
CollectionID: 1,
Nodes: []int64{7, 8},
RoNodes: []int64{},
}),
},
},
rgs: map[string]typeutil.UniqueSet{
"rg1": typeutil.NewUniqueSet(1, 2, 3, 4),
"rg2": typeutil.NewUniqueSet(5, 7, 8),
},
expectedPlan: map[typeutil.UniqueID]expectedReplicaPlan{
1: {
newRONodes: 1,
recoverNodes: 0,
incomingNodeCount: 0,
expectedNodeCount: 4,
},
2: {
newRONodes: 1,
recoverNodes: 0,
incomingNodeCount: 1,
expectedNodeCount: 1,
},
3: {
newRONodes: 1,
recoverNodes: 0,
incomingNodeCount: 0,
expectedNodeCount: 1,
},
},
expectedNewIncomingNodes: map[string]typeutil.UniqueSet{
"rg1": typeutil.NewUniqueSet(),
"rg2": typeutil.NewUniqueSet(), // 5 is still used rg1 of replica 1.
},
})
}
func (s *CollectionAssignmentHelperSuite) TestIncomingNode() {
s.runCase(testCase{
collectionID: 1,
rgToReplicas: map[string][]*Replica{
"rg1": {
newReplica(&querypb.Replica{
ID: 1,
CollectionID: 1,
Nodes: []int64{1, 2},
RoNodes: []int64{5},
}),
},
"rg2": {
newReplica(&querypb.Replica{
ID: 2,
CollectionID: 1,
Nodes: []int64{6},
RoNodes: []int64{},
}),
newReplica(&querypb.Replica{
ID: 3,
CollectionID: 1,
Nodes: []int64{7},
RoNodes: []int64{},
}),
},
},
rgs: map[string]typeutil.UniqueSet{
"rg1": typeutil.NewUniqueSet(1, 2, 3, 4),
"rg2": typeutil.NewUniqueSet(5, 6, 7, 8),
},
expectedPlan: map[typeutil.UniqueID]expectedReplicaPlan{
1: {
newRONodes: 0,
recoverNodes: 0,
incomingNodeCount: 2,
expectedNodeCount: 4,
},
2: {
newRONodes: 0,
recoverNodes: 0,
incomingNodeCount: 0,
expectedNodeCount: 1,
},
3: {
newRONodes: 0,
recoverNodes: 0,
incomingNodeCount: 1,
expectedNodeCount: 2,
},
},
expectedNewIncomingNodes: map[string]typeutil.UniqueSet{
"rg1": typeutil.NewUniqueSet(3, 4),
"rg2": typeutil.NewUniqueSet(8),
},
})
}
func (s *CollectionAssignmentHelperSuite) TestRecoverNode() {
s.runCase(testCase{
collectionID: 1,
rgToReplicas: map[string][]*Replica{
"rg1": {
newReplica(&querypb.Replica{
ID: 1,
CollectionID: 1,
Nodes: []int64{1, 2},
RoNodes: []int64{3},
}),
},
"rg2": {
newReplica(&querypb.Replica{
ID: 2,
CollectionID: 1,
Nodes: []int64{6},
RoNodes: []int64{7},
}),
newReplica(&querypb.Replica{
ID: 3,
CollectionID: 1,
Nodes: []int64{8},
RoNodes: []int64{},
}),
},
},
rgs: map[string]typeutil.UniqueSet{
"rg1": typeutil.NewUniqueSet(1, 2, 3, 4),
"rg2": typeutil.NewUniqueSet(5, 6, 7, 8),
},
expectedPlan: map[typeutil.UniqueID]expectedReplicaPlan{
1: {
newRONodes: 0,
recoverNodes: 1,
incomingNodeCount: 1,
expectedNodeCount: 4,
},
2: {
newRONodes: 0,
recoverNodes: 1,
incomingNodeCount: 0,
expectedNodeCount: 2,
},
3: {
newRONodes: 0,
recoverNodes: 0,
incomingNodeCount: 1,
expectedNodeCount: 2,
},
},
expectedNewIncomingNodes: map[string]typeutil.UniqueSet{
"rg1": typeutil.NewUniqueSet(4),
"rg2": typeutil.NewUniqueSet(5),
},
})
}
func (s *CollectionAssignmentHelperSuite) TestMixRecoverNode() {
s.runCase(testCase{
collectionID: 1,
rgToReplicas: map[string][]*Replica{
"rg1": {
newReplica(&querypb.Replica{
ID: 1,
CollectionID: 1,
Nodes: []int64{1, 2},
RoNodes: []int64{3},
}),
},
"rg2": {
newReplica(&querypb.Replica{
ID: 2,
CollectionID: 1,
Nodes: []int64{6},
RoNodes: []int64{7},
}),
newReplica(&querypb.Replica{
ID: 3,
CollectionID: 1,
Nodes: []int64{8},
RoNodes: []int64{},
}),
},
"rg3": {
newReplica(&querypb.Replica{
ID: 4,
CollectionID: 1,
Nodes: []int64{9},
RoNodes: []int64{},
}),
newReplica(&querypb.Replica{
ID: 5,
CollectionID: 1,
Nodes: []int64{10},
RoNodes: []int64{},
}),
},
},
rgs: map[string]typeutil.UniqueSet{
"rg1": typeutil.NewUniqueSet(1, 2, 3, 4),
"rg2": typeutil.NewUniqueSet(5, 6, 7),
"rg3": typeutil.NewUniqueSet(8, 9, 10),
},
expectedPlan: map[typeutil.UniqueID]expectedReplicaPlan{
1: {
newRONodes: 0,
recoverNodes: 1,
incomingNodeCount: 1,
expectedNodeCount: 4,
},
2: {
newRONodes: 0,
recoverNodes: 1,
incomingNodeCount: 0,
expectedNodeCount: 2,
},
3: {
newRONodes: 1,
recoverNodes: 0,
incomingNodeCount: 1,
expectedNodeCount: 1,
},
4: {
newRONodes: 0,
recoverNodes: 0,
incomingNodeCount: 0,
expectedNodeCount: 1,
},
5: {
newRONodes: 0,
recoverNodes: 0,
incomingNodeCount: 0,
expectedNodeCount: 1,
},
},
expectedNewIncomingNodes: map[string]typeutil.UniqueSet{
"rg1": typeutil.NewUniqueSet(4),
"rg2": typeutil.NewUniqueSet(5),
"rg3": typeutil.NewUniqueSet(),
},
})
}
func (s *CollectionAssignmentHelperSuite) runCase(c testCase) {
cHelper := newCollectionAssignmentHelper(c.collectionID, c.rgToReplicas, c.rgs)
cHelper.RangeOverResourceGroup(func(rHelper *replicasInSameRGAssignmentHelper) {
s.ElementsMatch(c.expectedNewIncomingNodes[rHelper.rgName].Collect(), rHelper.incomingNodes.Collect())
rHelper.RangeOverReplicas(func(assignment *replicaAssignmentInfo) {
roNodes := assignment.GetNewRONodes()
recoverNodes, incomingNodes := assignment.GetRecoverNodesAndIncomingNodeCount()
plan := c.expectedPlan[assignment.GetReplicaID()]
s.Equal(
plan.newRONodes,
len(roNodes),
)
s.Equal(
plan.incomingNodeCount,
incomingNodes,
)
s.Equal(
plan.recoverNodes,
len(recoverNodes),
)
s.Equal(
plan.expectedNodeCount,
assignment.expectedNodeCount,
)
})
})
}
func TestCollectionAssignmentHelper(t *testing.T) {
suite.Run(t, new(CollectionAssignmentHelperSuite))
}

View File

@ -20,6 +20,7 @@ import (
"testing"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
@ -27,29 +28,59 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/querypb"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type collectionLoadConfig struct {
spawnConfig map[string]int
}
func (c *collectionLoadConfig) getTotalSpawn() int {
totalSpawn := 0
for _, spawnNum := range c.spawnConfig {
totalSpawn += spawnNum
}
return totalSpawn
}
// Old replica manager test suite.
type ReplicaManagerSuite struct {
suite.Suite
nodes []int64
collections []int64
replicaNumbers []int32
idAllocator func() (int64, error)
kv kv.MetaKv
catalog metastore.QueryCoordCatalog
mgr *ReplicaManager
rgs map[string]typeutil.UniqueSet
collections map[int64]collectionLoadConfig
idAllocator func() (int64, error)
kv kv.MetaKv
catalog metastore.QueryCoordCatalog
mgr *ReplicaManager
}
func (suite *ReplicaManagerSuite) SetupSuite() {
paramtable.Init()
suite.nodes = []int64{1, 2, 3}
suite.collections = []int64{100, 101, 102}
suite.replicaNumbers = []int32{1, 2, 3}
suite.rgs = map[string]typeutil.UniqueSet{
"RG1": typeutil.NewUniqueSet(1),
"RG2": typeutil.NewUniqueSet(2, 3),
"RG3": typeutil.NewUniqueSet(4, 5, 6),
}
suite.collections = map[int64]collectionLoadConfig{
100: {
spawnConfig: map[string]int{"RG1": 1},
},
101: {
spawnConfig: map[string]int{"RG2": 2},
},
102: {
spawnConfig: map[string]int{"RG3": 2},
},
103: {
spawnConfig: map[string]int{"RG1": 1, "RG2": 1, "RG3": 1},
},
}
}
func (suite *ReplicaManagerSuite) SetupTest() {
@ -69,7 +100,7 @@ func (suite *ReplicaManagerSuite) SetupTest() {
suite.idAllocator = RandomIncrementIDAllocator()
suite.mgr = NewReplicaManager(suite.idAllocator, suite.catalog)
suite.spawnAndPutAll()
suite.spawnAll()
}
func (suite *ReplicaManagerSuite) TearDownTest() {
@ -79,39 +110,39 @@ func (suite *ReplicaManagerSuite) TearDownTest() {
func (suite *ReplicaManagerSuite) TestSpawn() {
mgr := suite.mgr
for i, collection := range suite.collections {
replicas, err := mgr.Spawn(collection, suite.replicaNumbers[i], DefaultResourceGroupName)
suite.NoError(err)
suite.Len(replicas, int(suite.replicaNumbers[i]))
}
mgr.idAllocator = ErrorIDAllocator()
for i, collection := range suite.collections {
_, err := mgr.Spawn(collection, suite.replicaNumbers[i], DefaultResourceGroupName)
suite.Error(err)
}
_, err := mgr.Spawn(1, map[string]int{DefaultResourceGroupName: 1})
suite.Error(err)
replicas := mgr.GetByCollection(1)
suite.Len(replicas, 0)
}
func (suite *ReplicaManagerSuite) TestGet() {
mgr := suite.mgr
for i, collection := range suite.collections {
replicas := mgr.GetByCollection(collection)
for collectionID, collectionCfg := range suite.collections {
replicas := mgr.GetByCollection(collectionID)
replicaNodes := make(map[int64][]int64)
nodes := make([]int64, 0)
for _, replica := range replicas {
suite.Equal(collection, replica.GetCollectionID())
suite.Equal(collectionID, replica.GetCollectionID())
suite.Equal(replica, mgr.Get(replica.GetID()))
suite.Equal(len(replica.Replica.GetNodes()), replica.Len())
suite.Equal(replica.Replica.GetNodes(), replica.GetNodes())
replicaNodes[replica.GetID()] = replica.Replica.GetNodes()
nodes = append(nodes, replica.Replica.Nodes...)
suite.Equal(len(replica.replicaPB.GetNodes()), replica.RWNodesCount())
suite.Equal(replica.replicaPB.GetNodes(), replica.GetNodes())
replicaNodes[replica.GetID()] = replica.GetNodes()
nodes = append(nodes, replica.GetNodes()...)
}
suite.Len(nodes, int(suite.replicaNumbers[i]))
expectedNodes := make([]int64, 0)
for rg := range collectionCfg.spawnConfig {
expectedNodes = append(expectedNodes, suite.rgs[rg].Collect()...)
}
suite.ElementsMatch(nodes, expectedNodes)
for replicaID, nodes := range replicaNodes {
for _, node := range nodes {
replica := mgr.GetByCollectionAndNode(collection, node)
replica := mgr.GetByCollectionAndNode(collectionID, node)
suite.Equal(replicaID, replica.GetID())
}
}
@ -122,14 +153,18 @@ func (suite *ReplicaManagerSuite) TestGetByNode() {
mgr := suite.mgr
randomNodeID := int64(11111)
testReplica1, err := mgr.spawn(3002, DefaultResourceGroupName)
suite.NoError(err)
testReplica1.AddNode(randomNodeID)
testReplica2, err := mgr.spawn(3002, DefaultResourceGroupName)
suite.NoError(err)
testReplica2.AddNode(randomNodeID)
testReplica1 := newReplica(&querypb.Replica{
CollectionID: 3002,
ID: 10086,
Nodes: []int64{randomNodeID},
ResourceGroup: DefaultResourceGroupName,
})
testReplica2 := newReplica(&querypb.Replica{
CollectionID: 3002,
ID: 10087,
Nodes: []int64{randomNodeID},
ResourceGroup: DefaultResourceGroupName,
})
mgr.Put(testReplica1, testReplica2)
replicas := mgr.GetByNode(randomNodeID)
@ -141,7 +176,7 @@ func (suite *ReplicaManagerSuite) TestRecover() {
// Clear data in memory, and then recover from meta store
suite.clearMemory()
mgr.Recover(suite.collections)
mgr.Recover(lo.Keys(suite.collections))
suite.TestGet()
// Test recover from 2.1 meta store
@ -155,13 +190,13 @@ func (suite *ReplicaManagerSuite) TestRecover() {
suite.kv.Save(querycoord.ReplicaMetaPrefixV1+"/2100", string(value))
suite.clearMemory()
mgr.Recover(append(suite.collections, 1000))
mgr.Recover(append(lo.Keys(suite.collections), 1000))
replica := mgr.Get(2100)
suite.NotNil(replica)
suite.EqualValues(1000, replica.CollectionID)
suite.EqualValues([]int64{1, 2, 3}, replica.Replica.Nodes)
suite.Len(replica.GetNodes(), len(replica.Replica.GetNodes()))
for _, node := range replica.Replica.GetNodes() {
suite.EqualValues(1000, replica.GetCollectionID())
suite.EqualValues([]int64{1, 2, 3}, replica.GetNodes())
suite.Len(replica.GetNodes(), len(replica.GetNodes()))
for _, node := range replica.GetNodes() {
suite.True(replica.Contains(node))
}
}
@ -169,7 +204,7 @@ func (suite *ReplicaManagerSuite) TestRecover() {
func (suite *ReplicaManagerSuite) TestRemove() {
mgr := suite.mgr
for _, collection := range suite.collections {
for collection := range suite.collections {
err := mgr.RemoveCollection(collection)
suite.NoError(err)
@ -178,8 +213,8 @@ func (suite *ReplicaManagerSuite) TestRemove() {
}
// Check whether the replicas are also removed from meta store
mgr.Recover(suite.collections)
for _, collection := range suite.collections {
mgr.Recover(lo.Keys(suite.collections))
for collection := range suite.collections {
replicas := mgr.GetByCollection(collection)
suite.Empty(replicas)
}
@ -188,69 +223,72 @@ func (suite *ReplicaManagerSuite) TestRemove() {
func (suite *ReplicaManagerSuite) TestNodeManipulate() {
mgr := suite.mgr
firstNode := suite.nodes[0]
newNode := suite.nodes[len(suite.nodes)-1] + 1
// Add a new node for the replica with node 1 of all collections,
// then remove the node 1
for _, collection := range suite.collections {
replica := mgr.GetByCollectionAndNode(collection, firstNode)
err := mgr.AddNode(replica.GetID(), newNode)
suite.NoError(err)
// add node into rg.
rgs := map[string]typeutil.UniqueSet{
"RG1": typeutil.NewUniqueSet(1, 7),
"RG2": typeutil.NewUniqueSet(2, 3, 8),
"RG3": typeutil.NewUniqueSet(4, 5, 6, 9),
}
replica = mgr.GetByCollectionAndNode(collection, newNode)
suite.Contains(replica.GetNodes(), newNode)
suite.Contains(replica.Replica.GetNodes(), newNode)
err = mgr.RemoveNode(replica.GetID(), firstNode)
suite.NoError(err)
replica = mgr.GetByCollectionAndNode(collection, firstNode)
suite.Nil(replica)
// Add node into rg.
for collectionID, cfg := range suite.collections {
rgsOfCollection := make(map[string]typeutil.UniqueSet)
for rg := range cfg.spawnConfig {
rgsOfCollection[rg] = rgs[rg]
}
mgr.RecoverNodesInCollection(collectionID, rgsOfCollection)
for rg := range cfg.spawnConfig {
for _, node := range rgs[rg].Collect() {
replica := mgr.GetByCollectionAndNode(collectionID, node)
suite.Contains(replica.GetNodes(), node)
}
}
}
// Check these modifications are applied to meta store
suite.clearMemory()
mgr.Recover(suite.collections)
for _, collection := range suite.collections {
replica := mgr.GetByCollectionAndNode(collection, firstNode)
suite.Nil(replica)
replica = mgr.GetByCollectionAndNode(collection, newNode)
suite.Contains(replica.GetNodes(), newNode)
suite.Contains(replica.Replica.GetNodes(), newNode)
mgr.Recover(lo.Keys(suite.collections))
for collectionID, cfg := range suite.collections {
for rg := range cfg.spawnConfig {
for _, node := range rgs[rg].Collect() {
replica := mgr.GetByCollectionAndNode(collectionID, node)
suite.Contains(replica.GetNodes(), node)
}
}
}
}
func (suite *ReplicaManagerSuite) spawnAndPutAll() {
func (suite *ReplicaManagerSuite) spawnAll() {
mgr := suite.mgr
for i, collection := range suite.collections {
replicas, err := mgr.Spawn(collection, suite.replicaNumbers[i], DefaultResourceGroupName)
for id, cfg := range suite.collections {
replicas, err := mgr.Spawn(id, cfg.spawnConfig)
suite.NoError(err)
suite.Len(replicas, int(suite.replicaNumbers[i]))
for j, replica := range replicas {
replica.AddNode(suite.nodes[j])
totalSpawn := 0
rgsOfCollection := make(map[string]typeutil.UniqueSet)
for rg, spawnNum := range cfg.spawnConfig {
totalSpawn += spawnNum
rgsOfCollection[rg] = suite.rgs[rg]
}
err = mgr.Put(replicas...)
suite.NoError(err)
mgr.RecoverNodesInCollection(id, rgsOfCollection)
suite.Len(replicas, totalSpawn)
}
}
func (suite *ReplicaManagerSuite) TestResourceGroup() {
mgr := NewReplicaManager(suite.idAllocator, suite.catalog)
replica1, err := mgr.spawn(int64(1000), DefaultResourceGroupName)
replica1.AddNode(1)
replicas1, err := mgr.Spawn(int64(1000), map[string]int{DefaultResourceGroupName: 1})
suite.NoError(err)
mgr.Put(replica1)
suite.NotNil(replicas1)
suite.Len(replicas1, 1)
replica2, err := mgr.spawn(int64(2000), DefaultResourceGroupName)
replica2.AddNode(1)
replica2, err := mgr.Spawn(int64(2000), map[string]int{DefaultResourceGroupName: 1})
suite.NoError(err)
mgr.Put(replica2)
suite.NotNil(replica2)
suite.Len(replica2, 1)
replicas := mgr.GetByResourceGroup(DefaultResourceGroupName)
suite.Len(replicas, 2)
replicas = mgr.GetByCollectionAndRG(int64(1000), DefaultResourceGroupName)
suite.Len(replicas, 1)
rgNames := mgr.GetResourceGroupByCollection(int64(1000))
suite.Len(rgNames, 1)
suite.True(rgNames.Contain(DefaultResourceGroupName))
@ -260,6 +298,175 @@ func (suite *ReplicaManagerSuite) clearMemory() {
suite.mgr.replicas = make(map[int64]*Replica)
}
type ReplicaManagerV2Suite struct {
suite.Suite
rgs map[string]typeutil.UniqueSet
collections map[int64]collectionLoadConfig
kv kv.MetaKv
catalog metastore.QueryCoordCatalog
mgr *ReplicaManager
}
func (suite *ReplicaManagerV2Suite) SetupSuite() {
paramtable.Init()
suite.rgs = map[string]typeutil.UniqueSet{
"RG1": typeutil.NewUniqueSet(1),
"RG2": typeutil.NewUniqueSet(2, 3),
"RG3": typeutil.NewUniqueSet(4, 5, 6),
"RG4": typeutil.NewUniqueSet(7, 8, 9, 10),
"RG5": typeutil.NewUniqueSet(11, 12, 13, 14, 15),
}
suite.collections = map[int64]collectionLoadConfig{
// 1000: {
// spawnConfig: map[string]int{"RG1": 1},
// },
// 1001: {
// spawnConfig: map[string]int{"RG2": 2},
// },
// 1002: {
// spawnConfig: map[string]int{"RG3": 2},
// },
// 1003: {
// spawnConfig: map[string]int{"RG1": 1, "RG2": 1, "RG3": 1},
// },
// 1004: {
// spawnConfig: map[string]int{"RG4": 2, "RG5": 3},
// },
1005: {
spawnConfig: map[string]int{"RG4": 3, "RG5": 2},
},
}
var err error
config := GenerateEtcdConfig()
cli, err := etcd.GetEtcdClient(
config.UseEmbedEtcd.GetAsBool(),
config.EtcdUseSSL.GetAsBool(),
config.Endpoints.GetAsStrings(),
config.EtcdTLSCert.GetValue(),
config.EtcdTLSKey.GetValue(),
config.EtcdTLSCACert.GetValue(),
config.EtcdTLSMinVersion.GetValue())
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
suite.catalog = querycoord.NewCatalog(suite.kv)
idAllocator := RandomIncrementIDAllocator()
suite.mgr = NewReplicaManager(idAllocator, suite.catalog)
}
func (suite *ReplicaManagerV2Suite) TearDownSuite() {
suite.kv.Close()
}
func (suite *ReplicaManagerV2Suite) TestSpawn() {
mgr := suite.mgr
for id, cfg := range suite.collections {
replicas, err := mgr.Spawn(id, cfg.spawnConfig)
suite.NoError(err)
rgsOfCollection := make(map[string]typeutil.UniqueSet)
for rg := range cfg.spawnConfig {
rgsOfCollection[rg] = suite.rgs[rg]
}
mgr.RecoverNodesInCollection(id, rgsOfCollection)
for rg := range cfg.spawnConfig {
for _, node := range suite.rgs[rg].Collect() {
replica := mgr.GetByCollectionAndNode(id, node)
suite.Contains(replica.GetNodes(), node)
}
}
suite.Len(replicas, cfg.getTotalSpawn())
replicas = mgr.GetByCollection(id)
suite.Len(replicas, cfg.getTotalSpawn())
}
suite.testIfBalanced()
}
func (suite *ReplicaManagerV2Suite) testIfBalanced() {
// check that each rg's nodes are fully used and spread evenly (difference <= 1) across its replicas
for id := range suite.collections {
replicas := suite.mgr.GetByCollection(id)
rgToReplica := make(map[string][]*Replica, 0)
for _, r := range replicas {
rgToReplica[r.GetResourceGroup()] = append(rgToReplica[r.GetResourceGroup()], r)
}
for _, replicas := range rgToReplica {
maximumNodes := -1
minimumNodes := -1
nodes := make([]int64, 0)
for _, r := range replicas {
availableNodes := suite.rgs[r.GetResourceGroup()]
if maximumNodes == -1 || r.RWNodesCount() > maximumNodes {
maximumNodes = r.RWNodesCount()
}
if minimumNodes == -1 || r.RWNodesCount() < minimumNodes {
minimumNodes = r.RWNodesCount()
}
nodes = append(nodes, r.GetNodes()...)
r.RangeOverRONodes(func(node int64) bool {
if availableNodes.Contain(node) {
nodes = append(nodes, node)
}
return true
})
}
suite.ElementsMatch(nodes, suite.rgs[replicas[0].GetResourceGroup()].Collect())
suite.True(maximumNodes-minimumNodes <= 1)
}
}
}
func (suite *ReplicaManagerV2Suite) TestTransferReplica() {
suite.mgr.TransferReplica(1005, "RG4", "RG5", 1)
suite.recoverReplica(2, true)
suite.testIfBalanced()
}
func (suite *ReplicaManagerV2Suite) TestTransferReplicaAndAddNode() {
suite.mgr.TransferReplica(1005, "RG4", "RG5", 1)
suite.recoverReplica(1, false)
suite.rgs["RG5"].Insert(16, 17, 18)
suite.recoverReplica(2, true)
suite.testIfBalanced()
}
func (suite *ReplicaManagerV2Suite) TestTransferNode() {
suite.rgs["RG4"].Remove(7)
suite.rgs["RG5"].Insert(7)
suite.recoverReplica(2, true)
suite.testIfBalanced()
}
func (suite *ReplicaManagerV2Suite) recoverReplica(k int, clearOutbound bool) {
// recovering the replicas needs at least two passes:
// transferring a node between replicas first marks it read-only (outbound), and only then can it be assigned as incoming elsewhere.
for i := 0; i < k; i++ {
// do a recover
for id, cfg := range suite.collections {
rgsOfCollection := make(map[string]typeutil.UniqueSet)
for rg := range cfg.spawnConfig {
rgsOfCollection[rg] = suite.rgs[rg]
}
suite.mgr.RecoverNodesInCollection(id, rgsOfCollection)
}
// clear all outbound nodes
if clearOutbound {
for id := range suite.collections {
replicas := suite.mgr.GetByCollection(id)
for _, r := range replicas {
outboundNodes := r.GetRONodes()
suite.mgr.RemoveNode(r.GetID(), outboundNodes...)
}
}
}
}
}
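The two recovery passes above are deliberate: a node moved between resource groups is first demoted to read-only inside the replica that still holds it, and only after those RO nodes are explicitly removed can the next pass hand them to a replica in the target group. A minimal sketch of that flow, assuming local variables mgr (*ReplicaManager) and rgs (the suite's resource-group node sets) and the collection ID 1005 used in these tests:

// Move one replica of collection 1005 from RG4 to RG5.
mgr.TransferReplica(1005, "RG4", "RG5", 1)
rgsOfCollection := map[string]typeutil.UniqueSet{"RG4": rgs["RG4"], "RG5": rgs["RG5"]}
// Pass 1: nodes that left their replica's resource group are demoted to read-only.
mgr.RecoverNodesInCollection(1005, rgsOfCollection)
// Once the RO nodes no longer serve segments/channels, drop them from their replicas.
for _, r := range mgr.GetByCollection(1005) {
	mgr.RemoveNode(r.GetID(), r.GetRONodes()...)
}
// Pass 2: the freed nodes become assignable to the replicas of the target resource group.
mgr.RecoverNodesInCollection(1005, rgsOfCollection)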
func TestReplicaManager(t *testing.T) {
suite.Run(t, new(ReplicaManagerSuite))
suite.Run(t, new(ReplicaManagerV2Suite))
}

View File

@ -0,0 +1,175 @@
package meta
import (
"testing"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/proto/querypb"
)
type ReplicaSuite struct {
suite.Suite
replicaPB *querypb.Replica
}
func (suite *ReplicaSuite) SetupSuite() {
suite.replicaPB = &querypb.Replica{
ID: 1,
CollectionID: 2,
Nodes: []int64{1, 2, 3},
ResourceGroup: DefaultResourceGroupName,
RoNodes: []int64{4},
}
}
func (suite *ReplicaSuite) TestReadOperations() {
r := newReplica(suite.replicaPB)
suite.testRead(r)
// reads should stay the same on the copy produced by copyForWrite.
mutableReplica := r.copyForWrite()
suite.testRead(mutableReplica.IntoReplica())
}
func (suite *ReplicaSuite) TestClone() {
r := newReplica(suite.replicaPB)
r2 := r.copyForWrite()
suite.testRead(r)
// after applying write operations on the copy, the original should not be affected.
r2.AddRWNode(5, 6)
r2.AddRONode(1, 2)
r2.RemoveNode(3)
suite.testRead(r)
}
func (suite *ReplicaSuite) TestRange() {
count := 0
r := newReplica(suite.replicaPB)
r.RangeOverRWNodes(func(nodeID int64) bool {
count++
return true
})
suite.Equal(3, count)
count = 0
r.RangeOverRONodes(func(nodeID int64) bool {
count++
return true
})
suite.Equal(1, count)
count = 0
r.RangeOverRWNodes(func(nodeID int64) bool {
count++
return false
})
suite.Equal(1, count)
mr := r.copyForWrite()
mr.AddRONode(1)
count = 0
mr.RangeOverRWNodes(func(nodeID int64) bool {
count++
return false
})
suite.Equal(1, count)
}
func (suite *ReplicaSuite) TestWriteOperation() {
r := newReplica(suite.replicaPB)
mr := r.copyForWrite()
// test adding RW (available) nodes.
suite.False(mr.Contains(5))
suite.False(mr.Contains(6))
mr.AddRWNode(5, 6)
suite.Equal(3, r.RWNodesCount())
suite.Equal(1, r.RONodesCount())
suite.Equal(4, r.NodesCount())
suite.Equal(5, mr.RWNodesCount())
suite.Equal(1, mr.RONodesCount())
suite.Equal(6, mr.NodesCount())
suite.True(mr.Contains(5))
suite.True(mr.Contains(5))
suite.True(mr.Contains(6))
// test promoting a previously RO node (4) back to RW, along with a new node (7).
suite.False(mr.Contains(4))
suite.False(mr.Contains(7))
mr.AddRWNode(4, 7)
suite.Equal(3, r.RWNodesCount())
suite.Equal(1, r.RONodesCount())
suite.Equal(4, r.NodesCount())
suite.Equal(7, mr.RWNodesCount())
suite.Equal(0, mr.RONodesCount())
suite.Equal(7, mr.NodesCount())
suite.True(mr.Contains(4))
suite.True(mr.Contains(7))
// test demoting RW nodes to RO.
mr.AddRONode(4, 7)
suite.Equal(3, r.RWNodesCount())
suite.Equal(1, r.RONodesCount())
suite.Equal(4, r.NodesCount())
suite.Equal(5, mr.RWNodesCount())
suite.Equal(2, mr.RONodesCount())
suite.Equal(7, mr.NodesCount())
suite.False(mr.Contains(4))
suite.False(mr.Contains(7))
// test remove node.
mr.RemoveNode(4, 5, 7, 8)
suite.Equal(3, r.RWNodesCount())
suite.Equal(1, r.RONodesCount())
suite.Equal(4, r.NodesCount())
suite.Equal(4, mr.RWNodesCount())
suite.Equal(0, mr.RONodesCount())
suite.Equal(4, mr.NodesCount())
suite.False(mr.Contains(4))
suite.False(mr.Contains(5))
suite.False(mr.Contains(7))
// test set resource group.
mr.SetResourceGroup("rg1")
suite.Equal(r.GetResourceGroup(), DefaultResourceGroupName)
suite.Equal("rg1", mr.GetResourceGroup())
// should panic after IntoReplica.
mr.IntoReplica()
suite.Panics(func() {
mr.SetResourceGroup("newResourceGroup")
})
}
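For context, the copy-on-write flow these assertions exercise looks roughly like the sketch below. The mgr variable is hypothetical; the persistence call is an assumption modeled on ReplicaManager.Put as used elsewhere in this change.

r := newReplica(&querypb.Replica{ID: 1, CollectionID: 2, ResourceGroup: DefaultResourceGroupName})
mr := r.copyForWrite()      // mutable view; the original immutable Replica is untouched
mr.AddRWNode(5, 6)          // stage writes on the copy
mr.AddRONode(5)             // demote one of them to read-only
updated := mr.IntoReplica() // freeze the copy; any later write on mr panics
_ = mgr.Put(updated)        // hypothetical: persist the frozen replica via a ReplicaManager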
func (suite *ReplicaSuite) testRead(r *Replica) {
// Test GetID()
suite.Equal(suite.replicaPB.GetID(), r.GetID())
// Test GetCollectionID()
suite.Equal(suite.replicaPB.GetCollectionID(), r.GetCollectionID())
// Test GetResourceGroup()
suite.Equal(suite.replicaPB.GetResourceGroup(), r.GetResourceGroup())
// Test GetNodes()
suite.ElementsMatch(suite.replicaPB.GetNodes(), r.GetNodes())
// Test GetRONodes()
suite.ElementsMatch(suite.replicaPB.GetRoNodes(), r.GetRONodes())
// Test RWNodesCount()
suite.Equal(len(suite.replicaPB.GetNodes()), r.RWNodesCount())
// Test Contains()
suite.True(r.Contains(1))
suite.False(r.Contains(4))
// Test ContainRONode()
suite.True(r.ContainRONode(4))
}
func TestReplica(t *testing.T) {
suite.Run(t, new(ReplicaSuite))
}

View File

@ -311,6 +311,21 @@ func (rm *ResourceManager) unassignNode(rgName string, node int64) error {
return nil
}
// GetNodesOfMultiRG returns the nodes of multiple resource groups; it can be used to get a consistent view of the nodes across those resource groups.
func (rm *ResourceManager) GetNodesOfMultiRG(rgName []string) (map[string]typeutil.UniqueSet, error) {
rm.rwmutex.RLock()
defer rm.rwmutex.RUnlock()
ret := make(map[string]typeutil.UniqueSet)
for _, name := range rgName {
if rm.groups[name] == nil {
return nil, merr.WrapErrResourceGroupNotFound(name)
}
rm.checkRGNodeStatus(name)
ret[name] = typeutil.NewUniqueSet(rm.groups[name].GetNodes()...)
}
return ret, nil
}
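A short sketch of the intended consumer, modeled on RecoverReplicaOfCollection later in this change; m (*meta.Meta) and collectionID are assumed to be in scope:

rgNames := m.ReplicaManager.GetResourceGroupByCollection(collectionID)
rgs, err := m.ResourceManager.GetNodesOfMultiRG(rgNames.Collect())
if err != nil {
	return err // one of the listed resource groups no longer exists
}
// every replica of the collection is recovered against the same consistent node view
return m.ReplicaManager.RecoverNodesInCollection(collectionID, rgs)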
func (rm *ResourceManager) GetNodes(rgName string) ([]int64, error) {
rm.rwmutex.RLock()
defer rm.rwmutex.RUnlock()
@ -323,26 +338,6 @@ func (rm *ResourceManager) GetNodes(rgName string) ([]int64, error) {
return rm.groups[rgName].GetNodes(), nil
}
// return all outbound node
func (rm *ResourceManager) CheckOutboundNodes(replica *Replica) typeutil.UniqueSet {
rm.rwmutex.RLock()
defer rm.rwmutex.RUnlock()
if rm.groups[replica.GetResourceGroup()] == nil {
return typeutil.NewUniqueSet()
}
rg := rm.groups[replica.GetResourceGroup()]
ret := typeutil.NewUniqueSet()
for _, node := range replica.GetNodes() {
if !rg.containsNode(node) {
ret.Insert(node)
}
}
return ret
}
// return the number of outgoing nodes in each resource group for this replica
func (rm *ResourceManager) GetOutgoingNodeNumByReplica(replica *Replica) map[string]int32 {
rm.rwmutex.RLock()
@ -354,15 +349,15 @@ func (rm *ResourceManager) GetOutgoingNodeNumByReplica(replica *Replica) map[str
rg := rm.groups[replica.GetResourceGroup()]
ret := make(map[string]int32)
for _, node := range replica.GetNodes() {
replica.RangeOverRONodes(func(node int64) bool {
if !rg.containsNode(node) {
rgName, err := rm.findResourceGroupByNode(node)
if err == nil {
ret[rgName]++
}
}
}
return true
})
return ret
}

View File

@ -312,20 +312,6 @@ func (suite *ResourceManagerSuite) TestCheckOutboundNodes() {
suite.manager.AssignNode("rg", 1)
suite.manager.AssignNode("rg", 2)
suite.manager.AssignNode("rg", 3)
replica := NewReplica(
&querypb.Replica{
ID: 1,
CollectionID: 1,
Nodes: []int64{1, 2, 3, 4},
ResourceGroup: "rg",
},
typeutil.NewUniqueSet(1, 2, 3, 4),
)
outboundNodes := suite.manager.CheckOutboundNodes(replica)
suite.Len(outboundNodes, 1)
suite.True(outboundNodes.Contain(4))
}
func (suite *ResourceManagerSuite) TestCheckResourceGroup() {
@ -391,9 +377,10 @@ func (suite *ResourceManagerSuite) TestGetOutboundNode() {
ID: 1,
CollectionID: 100,
ResourceGroup: "rg",
Nodes: []int64{1, 2, 3},
Nodes: []int64{1, 2},
RoNodes: []int64{3},
},
typeutil.NewUniqueSet(1, 2, 3),
typeutil.NewUniqueSet(1, 2),
)
outgoingNodes := suite.manager.GetOutgoingNodeNumByReplica(replica)

View File

@ -23,7 +23,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type SegmentDistManagerSuite struct {
@ -125,26 +124,20 @@ func (suite *SegmentDistManagerSuite) TestGetBy() {
suite.Len(segments, 0)
// Test GetBy With Wrong Replica
replica := &Replica{
Replica: &querypb.Replica{
ID: 1,
CollectionID: suite.collection + 1,
Nodes: []int64{suite.nodes[0]},
},
nodes: typeutil.NewUniqueSet(suite.nodes[0]),
}
replica := newReplica(&querypb.Replica{
ID: 1,
CollectionID: suite.collection + 1,
Nodes: []int64{suite.nodes[0]},
})
segments = dist.GetByFilter(WithReplica(replica))
suite.Len(segments, 0)
// Test GetBy With Correct Replica
replica = &Replica{
Replica: &querypb.Replica{
ID: 1,
CollectionID: suite.collection,
Nodes: []int64{suite.nodes[0]},
},
nodes: typeutil.NewUniqueSet(suite.nodes[0]),
}
replica = newReplica(&querypb.Replica{
ID: 1,
CollectionID: suite.collection,
Nodes: []int64{suite.nodes[0]},
})
segments = dist.GetByFilter(WithReplica(replica))
suite.Len(segments, 2)
}

View File

@ -390,10 +390,10 @@ func (suite *CollectionObserverSuite) loadAll() {
func (suite *CollectionObserverSuite) load(collection int64) {
// Mock meta data
replicas, err := suite.meta.ReplicaManager.Spawn(collection, suite.replicaNumber[collection], meta.DefaultResourceGroupName)
replicas, err := suite.meta.ReplicaManager.Spawn(collection, map[string]int{meta.DefaultResourceGroupName: int(suite.replicaNumber[collection])})
suite.NoError(err)
for _, replica := range replicas {
replica.AddNode(suite.nodes...)
replica.AddRWNode(suite.nodes...)
}
err = suite.meta.ReplicaManager.Put(replicas...)
suite.NoError(err)

View File

@ -29,7 +29,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
)
// check replica, find outbound nodes and remove it from replica if all segment/channel has been moved
// check replicas, find read-only nodes and remove them from the replica once all their segments/channels have been moved away
type ReplicaObserver struct {
cancel context.CancelFunc
wg sync.WaitGroup
@ -85,46 +85,45 @@ func (ob *ReplicaObserver) checkNodesInReplica() {
log := log.Ctx(context.Background()).WithRateGroup("qcv2.replicaObserver", 1, 60)
collections := ob.meta.GetAll()
for _, collectionID := range collections {
removedNodes := make([]int64, 0)
// remove nodes from the replica that have been transferred to another rg
utils.RecoverReplicaOfCollection(ob.meta, collectionID)
}
// check all ro nodes, and remove them from the replica once all their segments/channels have been moved away
for _, collectionID := range collections {
replicas := ob.meta.ReplicaManager.GetByCollection(collectionID)
for _, replica := range replicas {
outboundNodes := ob.meta.ResourceManager.CheckOutboundNodes(replica)
if len(outboundNodes) > 0 {
log.RatedInfo(10, "found outbound nodes in replica",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.Int64s("allOutboundNodes", outboundNodes.Collect()),
)
for node := range outboundNodes {
segments := ob.distMgr.SegmentDistManager.GetByFilter(meta.WithCollectionID(collectionID), meta.WithNodeID(node))
channels := ob.distMgr.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(node))
if len(channels) == 0 && len(segments) == 0 {
replica.RemoveNode(node)
removedNodes = append(removedNodes, node)
log.Info("all segment/channel has been removed from outbound node, remove it from replica",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.Int64("removedNodes", node),
zap.Int64s("availableNodes", replica.GetNodes()),
)
}
}
}
}
// assign removed nodes to other replicas in current rg
for _, node := range removedNodes {
rg, err := ob.meta.ResourceManager.FindResourceGroupByNode(node)
if err != nil {
// unreachable logic path
log.Warn("found node which does not belong to any resource group", zap.Int64("nodeID", node))
roNodes := replica.GetRONodes()
if len(roNodes) == 0 {
continue
}
replicas := ob.meta.ReplicaManager.GetByCollectionAndRG(collectionID, rg)
utils.AddNodesToReplicas(ob.meta, replicas, node)
log.RatedInfo(10, "found ro nodes in replica",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.Int64s("RONodes", roNodes),
)
removeNodes := make([]int64, 0, len(roNodes))
for _, node := range roNodes {
channels := ob.distMgr.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(node))
segments := ob.distMgr.SegmentDistManager.GetByFilter(meta.WithCollectionID(collectionID), meta.WithNodeID(node))
if len(channels) == 0 && len(segments) == 0 {
removeNodes = append(removeNodes, node)
}
}
if len(removeNodes) == 0 {
continue
}
logger := log.With(
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.Int64s("removedNodes", removeNodes),
zap.Int64s("roNodes", roNodes),
zap.Int64s("availableNodes", replica.GetNodes()),
)
if err := ob.meta.ReplicaManager.RemoveNode(replica.GetID(), removeNodes...); err != nil {
logger.Warn("fail to remove node from replica", zap.Error(err))
continue
}
logger.Info("all segment/channel has been removed from ro node, try to remove it from replica")
}
}
}

View File

@ -147,7 +147,7 @@ func (suite *ReplicaObserverSuite) TestCheckNodesInReplica() {
suite.Eventually(func() bool {
replica0 := suite.meta.ReplicaManager.Get(10000)
replica1 := suite.meta.ReplicaManager.Get(10001)
return suite.Contains(replica0.GetNodes(), int64(3)) && suite.NotContains(replica1.GetNodes(), int64(3)) && suite.Len(replica1.GetNodes(), 1)
return (replica0.Contains(3) || replica0.ContainRONode(3)) && suite.NotContains(replica1.GetNodes(), int64(3)) && suite.Len(replica1.GetNodes(), 1)
}, 6*time.Second, 2*time.Second)
suite.distMgr.ChannelDistManager.Update(3)
@ -156,7 +156,7 @@ func (suite *ReplicaObserverSuite) TestCheckNodesInReplica() {
suite.Eventually(func() bool {
replica0 := suite.meta.ReplicaManager.Get(10000)
replica1 := suite.meta.ReplicaManager.Get(10001)
return suite.NotContains(replica0.GetNodes(), int64(3)) && suite.Contains(replica1.GetNodes(), int64(3)) && suite.Len(replica1.GetNodes(), 2)
return (!replica0.Contains(3) && !replica0.ContainRONode(3)) && suite.Contains(replica1.GetNodes(), int64(3)) && suite.Len(replica1.GetNodes(), 2)
}, 6*time.Second, 2*time.Second)
}

View File

@ -105,9 +105,10 @@ func (ob *ResourceObserver) checkResourceGroup() {
zap.Error(err),
)
}
utils.AddNodesToCollectionsInRG(ob.meta, rgName, nodes...)
}
}
}
if enableRGAutoRecover {
utils.RecoverAllCollection(ob.meta)
}
}

View File

@ -91,7 +91,7 @@ func (suite *ResourceObserverSuite) SetupTest() {
func (suite *ResourceObserverSuite) TestCheckNodesInReplica() {
suite.store.EXPECT().SaveCollection(mock.Anything).Return(nil)
suite.store.EXPECT().SaveReplica(mock.Anything).Return(nil)
suite.store.EXPECT().SaveReplica(mock.Anything, mock.Anything).Return(nil)
suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2))
suite.meta.ReplicaManager.Put(meta.NewReplica(
&querypb.Replica{
@ -176,7 +176,7 @@ func (suite *ResourceObserverSuite) TestRecoverResourceGroupFailed() {
func (suite *ResourceObserverSuite) TestRecoverReplicaFailed() {
suite.store.EXPECT().SaveCollection(mock.Anything).Return(nil)
suite.store.EXPECT().SaveReplica(mock.Anything).Return(nil).Times(2)
suite.store.EXPECT().SaveReplica(mock.Anything, mock.Anything).Return(nil).Times(2)
suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2))
suite.meta.ReplicaManager.Put(meta.NewReplica(
&querypb.Replica{
@ -199,7 +199,7 @@ func (suite *ResourceObserverSuite) TestRecoverReplicaFailed() {
typeutil.NewUniqueSet(),
))
suite.store.EXPECT().SaveReplica(mock.Anything).Return(errors.New("store error"))
suite.store.EXPECT().SaveReplica(mock.Anything, mock.Anything).Return(errors.New("store error"))
suite.meta.ResourceManager.AddResourceGroup("rg")
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(100),

View File

@ -92,9 +92,9 @@ func (suite *TargetObserverSuite) SetupTest() {
suite.NoError(err)
err = suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collectionID, suite.partitionID))
suite.NoError(err)
replicas, err := suite.meta.ReplicaManager.Spawn(suite.collectionID, 1, meta.DefaultResourceGroupName)
replicas, err := suite.meta.ReplicaManager.Spawn(suite.collectionID, map[string]int{meta.DefaultResourceGroupName: 1})
suite.NoError(err)
replicas[0].AddNode(2)
replicas[0].AddRWNode(2)
err = suite.meta.ReplicaManager.Put(replicas...)
suite.NoError(err)
@ -276,9 +276,9 @@ func (suite *TargetObserverCheckSuite) SetupTest() {
suite.NoError(err)
err = suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collectionID, suite.partitionID))
suite.NoError(err)
replicas, err := suite.meta.ReplicaManager.Spawn(suite.collectionID, 1, meta.DefaultResourceGroupName)
replicas, err := suite.meta.ReplicaManager.Spawn(suite.collectionID, map[string]int{meta.DefaultResourceGroupName: 1})
suite.NoError(err)
replicas[0].AddNode(2)
replicas[0].AddRWNode(2)
err = suite.meta.ReplicaManager.Put(replicas...)
suite.NoError(err)
}

View File

@ -276,9 +276,7 @@ func (s *Server) TransferSegment(ctx context.Context, req *querypb.TransferSegme
// when no dst node is specified, default to using all other nodes in the same replica
dstNodeSet := typeutil.NewUniqueSet()
if req.GetToAllNodes() {
outboundNodes := s.meta.ResourceManager.CheckOutboundNodes(replica)
availableNodes := lo.Filter(replica.Replica.GetNodes(), func(node int64, _ int) bool { return !outboundNodes.Contain(node) })
dstNodeSet.Insert(availableNodes...)
dstNodeSet.Insert(replica.GetNodes()...)
} else {
// check whether dstNode is healthy
if err := s.isStoppingNode(req.GetTargetNodeID()); err != nil {
@ -350,9 +348,7 @@ func (s *Server) TransferChannel(ctx context.Context, req *querypb.TransferChann
// when no dst node is specified, default to using all other nodes in the same replica
dstNodeSet := typeutil.NewUniqueSet()
if req.GetToAllNodes() {
outboundNodes := s.meta.ResourceManager.CheckOutboundNodes(replica)
availableNodes := lo.Filter(replica.Replica.GetNodes(), func(node int64, _ int) bool { return !outboundNodes.Contain(node) })
dstNodeSet.Insert(availableNodes...)
dstNodeSet.Insert(replica.GetNodes()...)
} else {
// check whether dstNode is healthy
if err := s.isStoppingNode(req.GetTargetNodeID()); err != nil {

View File

@ -722,7 +722,7 @@ func (s *Server) handleNodeUp(node int64) {
zap.String("resourceGroup", rgName),
)
utils.AddNodesToCollectionsInRG(s.meta, meta.DefaultResourceGroupName, node)
utils.RecoverAllCollection(s.meta)
}
func (s *Server) handleNodeDown(node int64) {
@ -776,7 +776,6 @@ func (s *Server) checkReplicas() {
log := log.With(zap.Int64("collectionID", collection))
replicas := s.meta.ReplicaManager.GetByCollection(collection)
for _, replica := range replicas {
replica := replica.Clone()
toRemove := make([]int64, 0)
for _, node := range replica.GetNodes() {
if s.nodeMgr.Get(node) == nil {
@ -790,9 +789,7 @@ func (s *Server) checkReplicas() {
zap.Int64s("offlineNodes", toRemove),
)
log.Info("some nodes are offline, remove them from replica", zap.Any("toRemove", toRemove))
replica.RemoveNode(toRemove...)
err := s.meta.ReplicaManager.Put(replica)
if err != nil {
if err := s.meta.ReplicaManager.RemoveNode(replica.GetID(), toRemove...); err != nil {
log.Warn("failed to remove offline nodes from replica")
}
}

View File

@ -686,9 +686,7 @@ func (s *Server) LoadBalance(ctx context.Context, req *querypb.LoadBalanceReques
// when no dst node is specified, default to using all other nodes in the same replica
dstNodeSet := typeutil.NewUniqueSet()
if len(req.GetDstNodeIDs()) == 0 {
outboundNodes := s.meta.ResourceManager.CheckOutboundNodes(replica)
availableNodes := lo.Filter(replica.Replica.GetNodes(), func(node int64, _ int) bool { return !outboundNodes.Contain(node) })
dstNodeSet.Insert(availableNodes...)
dstNodeSet.Insert(replica.GetNodes()...)
} else {
for _, dstNode := range req.GetDstNodeIDs() {
if !replica.Contains(dstNode) {
@ -1086,30 +1084,14 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq
return merr.Status(err), nil
}
replicasInSource := s.meta.ReplicaManager.GetByResourceGroup(req.GetSourceResourceGroup())
replicasInTarget := s.meta.ReplicaManager.GetByResourceGroup(req.GetTargetResourceGroup())
loadSameCollection := false
sameCollectionID := int64(0)
for _, r1 := range replicasInSource {
for _, r2 := range replicasInTarget {
if r1.GetCollectionID() == r2.GetCollectionID() {
loadSameCollection = true
sameCollectionID = r1.GetCollectionID()
}
}
}
if loadSameCollection {
err := merr.WrapErrParameterInvalid("resource groups load not the same collection", fmt.Sprintf("collection %d loaded for both", sameCollectionID))
return merr.Status(err), nil
}
nodes, err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumNode()))
// Move node from source resource group to target resource group.
_, err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumNode()))
if err != nil {
log.Warn("failed to transfer node", zap.Error(err))
return merr.Status(err), nil
}
utils.AddNodesToCollectionsInRG(s.meta, req.GetTargetResourceGroup(), nodes...)
// Recover all replicas in the source and target resource groups.
utils.RecoverAllCollection(s.meta)
return merr.Success(), nil
}
@ -1127,6 +1109,7 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli
return merr.Status(err), nil
}
// TODO: !!!WARNING, replica manager and resource manager are not protected from each other by a lock.
if ok := s.meta.ResourceManager.ContainResourceGroup(req.GetSourceResourceGroup()); !ok {
err := merr.WrapErrResourceGroupNotFound(req.GetSourceResourceGroup())
return merr.Status(errors.Wrap(err,
@ -1144,50 +1127,9 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli
return merr.Status(err), nil
}
replicas := s.meta.ReplicaManager.GetByCollectionAndRG(req.GetCollectionID(), req.GetSourceResourceGroup())
if len(replicas) < int(req.GetNumReplica()) {
err := merr.WrapErrParameterInvalid("NumReplica not greater than the number of replica in source resource group", fmt.Sprintf("only found [%d] replicas in source resource group[%s]",
len(replicas), req.GetSourceResourceGroup()))
return merr.Status(err), nil
}
replicas = s.meta.ReplicaManager.GetByCollectionAndRG(req.GetCollectionID(), req.GetTargetResourceGroup())
if len(replicas) > 0 {
err := merr.WrapErrParameterInvalid("no same collection in target resource group", fmt.Sprintf("found [%d] replicas of same collection in target resource group[%s], dynamically increase replica num is unsupported",
len(replicas), req.GetTargetResourceGroup()))
return merr.Status(err), nil
}
replicas = s.meta.ReplicaManager.GetByCollection(req.GetCollectionID())
if (req.GetSourceResourceGroup() == meta.DefaultResourceGroupName || req.GetTargetResourceGroup() == meta.DefaultResourceGroupName) &&
len(replicas) != int(req.GetNumReplica()) {
err := merr.WrapErrParameterInvalid("tranfer all replicas from/to default resource group",
fmt.Sprintf("try to transfer %d replicas from/to but %d replicas exist", req.GetNumReplica(), len(replicas)))
return merr.Status(err), nil
}
err := s.transferReplica(req.GetTargetResourceGroup(), replicas[:req.GetNumReplica()])
if err != nil {
return merr.Status(err), nil
}
return merr.Success(), nil
}
func (s *Server) transferReplica(targetRG string, replicas []*meta.Replica) error {
ret := make([]*meta.Replica, 0)
for _, replica := range replicas {
newReplica := replica.Clone()
newReplica.ResourceGroup = targetRG
ret = append(ret, newReplica)
}
err := utils.AssignNodesToReplicas(s.meta, targetRG, ret...)
if err != nil {
return err
}
return s.meta.ReplicaManager.Put(ret...)
// Apply change into replica manager.
err := s.meta.TransferReplica(req.GetCollectionID(), req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumReplica()))
return merr.Status(err), nil
}
func (s *Server) ListResourceGroups(ctx context.Context, req *milvuspb.ListResourceGroupsRequest) (*milvuspb.ListResourceGroupsResponse, error) {

View File

@ -541,13 +541,6 @@ func (suite *ServiceSuite) TestTransferNode() {
},
typeutil.NewUniqueSet(),
))
resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
SourceResourceGroup: "rg1",
TargetResourceGroup: "rg2",
NumNode: 1,
})
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
// test transfer node meet non-exist source rg
resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
@ -745,7 +738,8 @@ func (suite *ServiceSuite) TestTransferReplica() {
NumReplica: 1,
})
suite.NoError(err)
suite.Contains(resp.Reason, "dynamically increase replica num is unsupported")
// we now support dynamically increasing the replica num in a resource group.
suite.Equal(resp.ErrorCode, commonpb.ErrorCode_Success)
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
SourceResourceGroup: meta.DefaultResourceGroupName,
@ -754,14 +748,24 @@ func (suite *ServiceSuite) TestTransferReplica() {
NumReplica: 1,
})
suite.NoError(err)
suite.ErrorIs(merr.Error(resp), merr.ErrParameterInvalid)
// we now support transferring a replica to a resource group that already loads the same collection.
suite.Equal(resp.ErrorCode, commonpb.ErrorCode_Success)
replicaNum := len(suite.server.meta.ReplicaManager.GetByCollection(1))
suite.Equal(3, replicaNum)
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
SourceResourceGroup: meta.DefaultResourceGroupName,
TargetResourceGroup: "rg3",
CollectionID: 1,
NumReplica: int64(replicaNum),
NumReplica: 2,
})
suite.NoError(err)
suite.Equal(resp.ErrorCode, commonpb.ErrorCode_Success)
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
SourceResourceGroup: "rg1",
TargetResourceGroup: "rg3",
CollectionID: 1,
NumReplica: 1,
})
suite.NoError(err)
suite.Equal(resp.ErrorCode, commonpb.ErrorCode_Success)
@ -1232,8 +1236,8 @@ func (suite *ServiceSuite) TestLoadBalanceWithEmptySegmentList() {
// update two collection's dist
for _, collection := range suite.collections {
replicas := suite.meta.ReplicaManager.GetByCollection(collection)
replicas[0].AddNode(srcNode)
replicas[0].AddNode(dstNode)
replicas[0].AddRWNode(srcNode)
replicas[0].AddRWNode(dstNode)
suite.updateCollectionStatus(collection, querypb.LoadStatus_Loaded)
for partition, segments := range suite.segments[collection] {
@ -1258,8 +1262,8 @@ func (suite *ServiceSuite) TestLoadBalanceWithEmptySegmentList() {
defer func() {
for _, collection := range suite.collections {
replicas := suite.meta.ReplicaManager.GetByCollection(collection)
replicas[0].RemoveNode(srcNode)
replicas[0].RemoveNode(dstNode)
suite.meta.ReplicaManager.RemoveNode(replicas[0].GetID(), srcNode)
suite.meta.ReplicaManager.RemoveNode(replicas[0].GetID(), dstNode)
}
suite.nodeMgr.Remove(1001)
suite.nodeMgr.Remove(1002)
@ -1377,7 +1381,7 @@ func (suite *ServiceSuite) TestLoadBalanceFailed() {
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
suite.Contains(resp.Reason, "mock error")
suite.meta.ReplicaManager.AddNode(replicas[0].ID, 10)
suite.meta.ReplicaManager.RecoverNodesInCollection(collection, map[string]typeutil.UniqueSet{meta.DefaultResourceGroupName: typeutil.NewUniqueSet(10)})
req.SourceNodeIDs = []int64{10}
resp, err = server.LoadBalance(ctx, req)
suite.NoError(err)
@ -1399,7 +1403,7 @@ func (suite *ServiceSuite) TestLoadBalanceFailed() {
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
suite.nodeMgr.Remove(10)
suite.meta.ReplicaManager.RemoveNode(replicas[0].ID, 10)
suite.meta.ReplicaManager.RemoveNode(replicas[0].GetID(), 10)
}
}
@ -1697,8 +1701,8 @@ func (suite *ServiceSuite) TestHandleNodeUp() {
}))
server.handleNodeUp(111)
nodes := suite.server.meta.ReplicaManager.Get(1).GetNodes()
suite.Len(nodes, 1)
suite.Equal(int64(111), nodes[0])
nodesInRG, _ := suite.server.meta.ResourceManager.GetNodes(meta.DefaultResourceGroupName)
suite.ElementsMatch(nodes, nodesInRG)
log.Info("handleNodeUp")
// when more rgs exist, a new node shouldn't be assigned to a replica in the default rg in handleNodeUp
@ -1710,9 +1714,8 @@ func (suite *ServiceSuite) TestHandleNodeUp() {
}))
server.handleNodeUp(222)
nodes = suite.server.meta.ReplicaManager.Get(1).GetNodes()
suite.Len(nodes, 2)
suite.Contains(nodes, int64(111))
suite.Contains(nodes, int64(222))
nodesInRG, _ = suite.server.meta.ResourceManager.GetNodes(meta.DefaultResourceGroupName)
suite.ElementsMatch(nodes, nodesInRG)
}
func (suite *ServiceSuite) loadAll() {

View File

@ -17,9 +17,6 @@
package utils
import (
"math/rand"
"sort"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/zap"
@ -28,6 +25,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var (
@ -72,7 +70,7 @@ func GroupNodesByReplica(replicaMgr *meta.ReplicaManager, collectionID int64, no
for _, replica := range replicas {
for _, node := range nodes {
if replica.Contains(node) {
ret[replica.ID] = append(ret[replica.ID], node)
ret[replica.GetID()] = append(ret[replica.GetID()], node)
}
}
}
@ -98,142 +96,91 @@ func GroupSegmentsByReplica(replicaMgr *meta.ReplicaManager, collectionID int64,
for _, replica := range replicas {
for _, segment := range segments {
if replica.Contains(segment.Node) {
ret[replica.ID] = append(ret[replica.ID], segment)
ret[replica.GetID()] = append(ret[replica.GetID()], segment)
}
}
}
return ret
}
// AssignNodesToReplicas assigns nodes to the given replicas,
// all given replicas must be the same collection,
// the given replicas have to be not in ReplicaManager
func AssignNodesToReplicas(m *meta.Meta, rgName string, replicas ...*meta.Replica) error {
replicaIDs := lo.Map(replicas, func(r *meta.Replica, _ int) int64 { return r.GetID() })
log := log.With(zap.Int64("collectionID", replicas[0].GetCollectionID()),
zap.Int64s("replicas", replicaIDs),
zap.String("rgName", rgName),
)
if len(replicaIDs) == 0 {
return nil
}
nodeGroup, err := m.ResourceManager.GetNodes(rgName)
if err != nil {
log.Warn("failed to get nodes", zap.Error(err))
return err
}
if len(nodeGroup) < len(replicaIDs) {
log.Warn(meta.ErrNodeNotEnough.Error(), zap.Error(meta.ErrNodeNotEnough))
return meta.ErrNodeNotEnough
}
rand.Shuffle(len(nodeGroup), func(i, j int) {
nodeGroup[i], nodeGroup[j] = nodeGroup[j], nodeGroup[i]
})
log.Info("assign nodes to replicas",
zap.Int64s("nodes", nodeGroup),
)
for i, node := range nodeGroup {
replicas[i%len(replicas)].AddNode(node)
}
return nil
}
// add nodes to all collections in rgName
// for each collection, add node to replica with least number of nodes
func AddNodesToCollectionsInRG(m *meta.Meta, rgName string, nodes ...int64) {
for _, node := range nodes {
for _, collection := range m.CollectionManager.GetAll() {
replica := m.ReplicaManager.GetByCollectionAndNode(collection, node)
if replica == nil {
replicas := m.ReplicaManager.GetByCollectionAndRG(collection, rgName)
AddNodesToReplicas(m, replicas, node)
}
}
}
}
func AddNodesToReplicas(m *meta.Meta, replicas []*meta.Replica, node int64) {
if len(replicas) == 0 {
// RecoverReplicaOfCollection recovers all replicas of a collection against the latest resource group node lists.
func RecoverReplicaOfCollection(m *meta.Meta, collectionID typeutil.UniqueID) {
logger := log.With(zap.Int64("collectionID", collectionID))
rgNames := m.ReplicaManager.GetResourceGroupByCollection(collectionID)
if rgNames.Len() == 0 {
logger.Error("no resource group found for collection", zap.Int64("collectionID", collectionID))
return
}
sort.Slice(replicas, func(i, j int) bool {
return replicas[i].Len() < replicas[j].Len()
})
replica := replicas[0]
// TODO(yah01): this may fail, need a component to check whether a node is assigned
err := m.ReplicaManager.AddNode(replica.GetID(), node)
rgs, err := m.ResourceManager.GetNodesOfMultiRG(rgNames.Collect())
if err != nil {
log.Warn("failed to assign node to replicas",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.Int64("nodeId", node),
zap.Error(err),
)
logger.Error("unreachable code as expected, fail to get resource group for replica", zap.Error(err))
return
}
log.Info("assign node to replica",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.Int64("nodeID", node),
)
if err := m.ReplicaManager.RecoverNodesInCollection(collectionID, rgs); err != nil {
logger.Warn("fail to set available nodes in replica", zap.Error(err))
}
}
// SpawnReplicas spawns replicas for given collection, assign nodes to them, and save them
func SpawnAllReplicasInRG(m *meta.Meta, collection int64, replicaNumber int32, rgName string) ([]*meta.Replica, error) {
replicas, err := m.ReplicaManager.Spawn(collection, replicaNumber, rgName)
if err != nil {
return nil, err
// RecoverAllCollection recovers all replicas of all collections in the resource groups.
func RecoverAllCollection(m *meta.Meta) {
for _, collection := range m.CollectionManager.GetAll() {
RecoverReplicaOfCollection(m, collection)
}
err = AssignNodesToReplicas(m, rgName, replicas...)
if err != nil {
return nil, err
}
return replicas, m.ReplicaManager.Put(replicas...)
}
func checkResourceGroup(collectionID int64, replicaNumber int32, resourceGroups []string) error {
func checkResourceGroup(m *meta.Meta, resourceGroups []string, replicaNumber int32) (map[string]int, error) {
if len(resourceGroups) != 0 && len(resourceGroups) != 1 && len(resourceGroups) != int(replicaNumber) {
return ErrUseWrongNumRG
return nil, ErrUseWrongNumRG
}
return nil
replicaNumInRG := make(map[string]int)
if len(resourceGroups) == 0 {
// All replicas should be spawned in default resource group.
replicaNumInRG[meta.DefaultResourceGroupName] = int(replicaNumber)
} else if len(resourceGroups) == 1 {
// All replicas should be spawned in the given resource group.
replicaNumInRG[resourceGroups[0]] = int(replicaNumber)
} else {
// otherwise, spawn replicas across the given resource groups, one per listed entry.
for _, rgName := range resourceGroups {
replicaNumInRG[rgName] += 1
}
}
// TODO: !!!Warning, ResourceManager and ReplicaManager are not protected from each other in concurrent operations.
// 1. replica1 gets rg1's node snapshot but has not finished spawning.
// 2. rg1 is removed.
// 3. replica1 finishes spawning, but the related resource group can no longer be found.
for rgName, num := range replicaNumInRG {
if !m.ContainResourceGroup(rgName) {
return nil, ErrGetNodesFromRG
}
nodes, err := m.ResourceManager.GetNodes(rgName)
if err != nil {
return nil, err
}
if num > len(nodes) {
log.Warn("node not enough", zap.Error(meta.ErrNodeNotEnough), zap.Int("replicaNum", num), zap.Int("nodeNum", len(nodes)), zap.String("rgName", rgName))
return nil, meta.ErrNodeNotEnough
}
}
return replicaNumInRG, nil
}
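To make the branching above concrete, a few hedged examples of how a request maps to replicaNumInRG (counts only; the node-capacity check still applies afterwards):

// checkResourceGroup(m, nil, 3)                           -> {DefaultResourceGroupName: 3}
// checkResourceGroup(m, []string{"rg1"}, 3)               -> {"rg1": 3}
// checkResourceGroup(m, []string{"rg1", "rg2", "rg2"}, 3) -> {"rg1": 1, "rg2": 2}
// two resource groups for three replicas                  -> ErrUseWrongNumRG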
// SpawnReplicasWithRG spawns replicas in rgs one by one for given collection.
func SpawnReplicasWithRG(m *meta.Meta, collection int64, resourceGroups []string, replicaNumber int32) ([]*meta.Replica, error) {
if err := checkResourceGroup(collection, replicaNumber, resourceGroups); err != nil {
replicaNumInRG, err := checkResourceGroup(m, resourceGroups, replicaNumber)
if err != nil {
return nil, err
}
if len(resourceGroups) == 0 {
return SpawnAllReplicasInRG(m, collection, replicaNumber, meta.DefaultResourceGroupName)
// Spawn it in replica manager.
replicas, err := m.ReplicaManager.Spawn(collection, replicaNumInRG)
if err != nil {
return nil, err
}
if len(resourceGroups) == 1 {
return SpawnAllReplicasInRG(m, collection, replicaNumber, resourceGroups[0])
}
replicaSet := make([]*meta.Replica, 0)
for _, rgName := range resourceGroups {
if !m.ResourceManager.ContainResourceGroup(rgName) {
return nil, merr.WrapErrResourceGroupNotFound(rgName)
}
replicas, err := m.ReplicaManager.Spawn(collection, 1, rgName)
if err != nil {
return nil, err
}
err = AssignNodesToReplicas(m, rgName, replicas...)
if err != nil {
return nil, err
}
replicaSet = append(replicaSet, replicas...)
}
return replicaSet, m.ReplicaManager.Put(replicaSet...)
// Actively recover the newly spawned replicas.
RecoverReplicaOfCollection(m, collection)
return replicas, nil
}

View File

@ -93,14 +93,14 @@ func TestSpawnReplicasWithRG(t *testing.T) {
{
name: "test 3 replica on 2 rg",
args: args{m, 1000, []string{"rg1", "rg2"}, 3},
args: args{m, 1001, []string{"rg1", "rg2"}, 3},
wantReplicaNum: 0,
wantErr: true,
},
{
name: "test 3 replica on 3 rg",
args: args{m, 1000, []string{"rg1", "rg2", "rg3"}, 3},
args: args{m, 1002, []string{"rg1", "rg2", "rg3"}, 3},
wantReplicaNum: 3,
wantErr: false,
},
@ -127,6 +127,7 @@ func TestAddNodesToCollectionsInRGFailed(t *testing.T) {
store.EXPECT().SaveCollection(mock.Anything).Return(nil)
store.EXPECT().SaveReplica(mock.Anything).Return(nil).Times(4)
store.EXPECT().SaveResourceGroup(mock.Anything).Return(nil)
store.EXPECT().SaveResourceGroup(mock.Anything, mock.Anything).Return(nil)
nodeMgr := session.NewNodeManager()
m := meta.NewMeta(RandomIncrementIDAllocator(), store, nodeMgr)
m.ResourceManager.AddResourceGroup("rg")
@ -174,7 +175,7 @@ func TestAddNodesToCollectionsInRGFailed(t *testing.T) {
storeErr := errors.New("store error")
store.EXPECT().SaveReplica(mock.Anything).Return(storeErr)
AddNodesToCollectionsInRG(m, "rg", []int64{1, 2, 3, 4}...)
RecoverAllCollection(m)
assert.Len(t, m.ReplicaManager.Get(1).GetNodes(), 0)
assert.Len(t, m.ReplicaManager.Get(2).GetNodes(), 0)
@ -188,7 +189,9 @@ func TestAddNodesToCollectionsInRG(t *testing.T) {
store := mocks.NewQueryCoordCatalog(t)
store.EXPECT().SaveCollection(mock.Anything).Return(nil)
store.EXPECT().SaveReplica(mock.Anything).Return(nil)
store.EXPECT().SaveReplica(mock.Anything, mock.Anything).Return(nil)
store.EXPECT().SaveResourceGroup(mock.Anything).Return(nil)
store.EXPECT().SaveResourceGroup(mock.Anything, mock.Anything).Return(nil)
nodeMgr := session.NewNodeManager()
m := meta.NewMeta(RandomIncrementIDAllocator(), store, nodeMgr)
m.ResourceManager.AddResourceGroup("rg")
@ -233,8 +236,33 @@ func TestAddNodesToCollectionsInRG(t *testing.T) {
},
typeutil.NewUniqueSet(),
))
AddNodesToCollectionsInRG(m, "rg", []int64{1, 2, 3, 4}...)
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
}))
_, err := m.ResourceManager.HandleNodeUp(1)
assert.NoError(t, err)
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "localhost",
}))
_, err = m.ResourceManager.HandleNodeUp(2)
assert.NoError(t, err)
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
Address: "localhost",
}))
_, err = m.ResourceManager.HandleNodeUp(3)
assert.NoError(t, err)
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 4,
Address: "localhost",
}))
_, err = m.ResourceManager.HandleNodeUp(4)
assert.NoError(t, err)
_, err = m.ResourceManager.TransferNode(meta.DefaultResourceGroupName, "rg", 4)
assert.NoError(t, err)
RecoverAllCollection(m)
assert.Len(t, m.ReplicaManager.Get(1).GetNodes(), 2)
assert.Len(t, m.ReplicaManager.Get(2).GetNodes(), 2)

View File

@ -109,6 +109,24 @@ func (set Set[T]) Len() int {
return len(set)
}
// Range iterates over elements in the set
func (set Set[T]) Range(f func(element T) bool) {
for elem := range set {
if !f(elem) {
break
}
}
}
// Clone returns a new set with the same elements
func (set Set[T]) Clone() Set[T] {
ret := make(Set[T], set.Len())
for elem := range set {
ret.Insert(elem)
}
return ret
}
type ConcurrentSet[T comparable] struct {
inner sync.Map
}

View File

@ -35,6 +35,19 @@ func TestUniqueSet(t *testing.T) {
assert.False(t, set.Contain(7))
assert.True(t, set.Contain(9))
assert.False(t, set.Contain(5, 7, 9))
count := 0
set.Range(func(element UniqueID) bool {
count++
return true
})
assert.Equal(t, set.Len(), count)
count = 0
set.Range(func(element UniqueID) bool {
count++
return false
})
assert.Equal(t, 1, count)
}
func TestUniqueSetClear(t *testing.T) {