fix: Make `ResourceGroup.nodes` concurrent safe (#32159)

See also #32158

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/32118/head
congqixia 2024-04-11 17:53:18 +08:00 committed by GitHub
parent 0bfe130991
commit b9a487608a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 10 additions and 9 deletions

View File

@ -55,13 +55,13 @@ var DefaultResourceGroupName = "__default_resource_group"
var DefaultResourceGroupCapacity = 1000000
type ResourceGroup struct {
nodes typeutil.UniqueSet
nodes *typeutil.ConcurrentSet[int64] // typeutil.UniqueSet
capacity int
}
func NewResourceGroup(capacity int) *ResourceGroup {
rg := &ResourceGroup{
nodes: typeutil.NewUniqueSet(),
nodes: typeutil.NewConcurrentSet[int64](),
capacity: capacity,
}
@ -95,7 +95,7 @@ func (rg *ResourceGroup) unassignNode(id int64, deltaCapacity int) error {
}
func (rg *ResourceGroup) LackOfNodes() int {
return rg.capacity - len(rg.nodes)
return rg.capacity - len(rg.nodes.Collect())
}
func (rg *ResourceGroup) containsNode(id int64) bool {
@ -271,11 +271,12 @@ func (rm *ResourceManager) unassignNode(rgName string, node int64) error {
}
newNodes := make([]int64, 0)
for nid := range rm.groups[rgName].nodes {
if nid != node {
newNodes = append(newNodes, nid)
rm.groups[rgName].nodes.Range(func(nodeID int64) bool {
if nodeID != node {
newNodes = append(newNodes, nodeID)
}
}
return true
})
deltaCapacity := -1
if rgName == DefaultResourceGroupName {
@ -509,7 +510,7 @@ func (rm *ResourceManager) TransferNode(from string, to string, numNode int) ([]
rm.checkRGNodeStatus(from)
rm.checkRGNodeStatus(to)
if len(rm.groups[from].nodes) < numNode {
if len(rm.groups[from].nodes.Collect()) < numNode {
return nil, ErrNodeNotEnough
}

View File

@ -509,7 +509,7 @@ func (suite *ResourceManagerSuite) TestStoreFailed() {
suite.ErrorIs(err, storeErr)
manager.groups["rg"] = &ResourceGroup{
nodes: typeutil.NewUniqueSet(),
nodes: typeutil.NewConcurrentSet[int64](),
capacity: 0,
}