enhance: [10kcp] Refine Replica manager colle2Replicas secondary index (#37907)

Related to #37630

This PR add a new util coll2Replicas secondary index to reduce map
access & iteration while get replicas by collection

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/38249/head
congqixia 2024-12-05 11:57:29 +08:00 committed by GitHub
parent d75fb5b3f8
commit c4df6b5910
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 60 additions and 41 deletions

View File

@ -35,18 +35,43 @@ import (
type ReplicaManager struct {
rwmutex sync.RWMutex
idAllocator func() (int64, error)
replicas map[typeutil.UniqueID]*Replica
collIDToReplicaIDs map[typeutil.UniqueID]typeutil.UniqueSet
catalog metastore.QueryCoordCatalog
idAllocator func() (int64, error)
replicas map[typeutil.UniqueID]*Replica
coll2Replicas map[typeutil.UniqueID]*collectionReplicas // typeutil.UniqueSet
catalog metastore.QueryCoordCatalog
}
// collectionReplicas maintains collection secondary index mapping
type collectionReplicas struct {
id2replicas map[typeutil.UniqueID]*Replica
replicas []*Replica
}
func (crs *collectionReplicas) removeReplicas(replicaIDs ...int64) (empty bool) {
for _, replicaID := range replicaIDs {
delete(crs.id2replicas, replicaID)
}
crs.replicas = lo.Values(crs.id2replicas)
return len(crs.replicas) == 0
}
func (crs *collectionReplicas) putReplica(replica *Replica) {
crs.id2replicas[replica.GetID()] = replica
crs.replicas = lo.Values(crs.id2replicas)
}
func newCollectionReplicas() *collectionReplicas {
return &collectionReplicas{
id2replicas: make(map[typeutil.UniqueID]*Replica),
}
}
func NewReplicaManager(idAllocator func() (int64, error), catalog metastore.QueryCoordCatalog) *ReplicaManager {
return &ReplicaManager{
idAllocator: idAllocator,
replicas: make(map[int64]*Replica),
collIDToReplicaIDs: make(map[int64]typeutil.UniqueSet),
catalog: catalog,
idAllocator: idAllocator,
replicas: make(map[int64]*Replica),
coll2Replicas: make(map[int64]*collectionReplicas),
catalog: catalog,
}
}
@ -163,10 +188,10 @@ func (m *ReplicaManager) putReplicaInMemory(replicas ...*Replica) {
m.replicas[replica.GetID()] = replica
// update collIDToReplicaIDs.
if m.collIDToReplicaIDs[replica.GetCollectionID()] == nil {
m.collIDToReplicaIDs[replica.GetCollectionID()] = typeutil.NewUniqueSet()
if m.coll2Replicas[replica.GetCollectionID()] == nil {
m.coll2Replicas[replica.GetCollectionID()] = newCollectionReplicas()
}
m.collIDToReplicaIDs[replica.GetCollectionID()].Insert(replica.GetID())
m.coll2Replicas[replica.GetCollectionID()].putReplica(replica)
}
}
@ -217,7 +242,7 @@ func (m *ReplicaManager) MoveReplica(dstRGName string, toMove []*Replica) error
// getSrcReplicasAndCheckIfTransferable checks if the collection can be transfer from srcRGName to dstRGName.
func (m *ReplicaManager) getSrcReplicasAndCheckIfTransferable(collectionID typeutil.UniqueID, srcRGName string, replicaNum int) ([]*Replica, error) {
// Check if collection is loaded.
if m.collIDToReplicaIDs[collectionID] == nil {
if m.coll2Replicas[collectionID] == nil {
return nil, merr.WrapErrParameterInvalid(
"Collection not loaded",
fmt.Sprintf("collectionID %d", collectionID),
@ -248,11 +273,14 @@ func (m *ReplicaManager) RemoveCollection(collectionID typeutil.UniqueID) error
if err != nil {
return err
}
// Remove all replica of collection and remove collection from collIDToReplicaIDs.
for replicaID := range m.collIDToReplicaIDs[collectionID] {
delete(m.replicas, replicaID)
if collReplicas, ok := m.coll2Replicas[collectionID]; ok {
// Remove all replica of collection and remove collection from collIDToReplicaIDs.
for _, replica := range collReplicas.replicas {
delete(m.replicas, replica.GetID())
}
delete(m.coll2Replicas, collectionID)
}
delete(m.collIDToReplicaIDs, collectionID)
return nil
}
@ -275,9 +303,8 @@ func (m *ReplicaManager) removeReplicas(collectionID typeutil.UniqueID, replicas
delete(m.replicas, replica)
}
m.collIDToReplicaIDs[collectionID].Remove(replicas...)
if m.collIDToReplicaIDs[collectionID].Len() == 0 {
delete(m.collIDToReplicaIDs, collectionID)
if m.coll2Replicas[collectionID].removeReplicas(replicas...) {
delete(m.coll2Replicas, collectionID)
}
return nil
@ -290,22 +317,20 @@ func (m *ReplicaManager) GetByCollection(collectionID typeutil.UniqueID) []*Repl
}
func (m *ReplicaManager) getByCollection(collectionID typeutil.UniqueID) []*Replica {
replicas := make([]*Replica, 0)
if m.collIDToReplicaIDs[collectionID] != nil {
for replicaID := range m.collIDToReplicaIDs[collectionID] {
replicas = append(replicas, m.replicas[replicaID])
}
collReplicas, ok := m.coll2Replicas[collectionID]
if !ok {
return nil
}
return replicas
return collReplicas.replicas
}
func (m *ReplicaManager) GetByCollectionAndNode(collectionID, nodeID typeutil.UniqueID) *Replica {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
if m.collIDToReplicaIDs[collectionID] != nil {
for replicaID := range m.collIDToReplicaIDs[collectionID] {
replica := m.replicas[replicaID]
if m.coll2Replicas[collectionID] != nil {
for _, replica := range m.coll2Replicas[collectionID].replicas {
if replica.Contains(nodeID) {
return replica
}
@ -330,19 +355,14 @@ func (m *ReplicaManager) GetByNode(nodeID typeutil.UniqueID) []*Replica {
}
func (m *ReplicaManager) getByCollectionAndRG(collectionID int64, rgName string) []*Replica {
replicaIDs, ok := m.collIDToReplicaIDs[collectionID]
collReplicas, ok := m.coll2Replicas[collectionID]
if !ok {
return make([]*Replica, 0)
return nil
}
ret := make([]*Replica, 0)
replicaIDs.Range(func(replicaID typeutil.UniqueID) bool {
if m.replicas[replicaID].GetResourceGroup() == rgName {
ret = append(ret, m.replicas[replicaID])
}
return true
return lo.Filter(collReplicas.replicas, func(replica *Replica, _ int) bool {
return replica.GetResourceGroup() == rgName
})
return ret
}
func (m *ReplicaManager) GetByResourceGroup(rgName string) []*Replica {
@ -426,17 +446,16 @@ func (m *ReplicaManager) validateResourceGroups(rgs map[string]typeutil.UniqueSe
// getCollectionAssignmentHelper checks if the collection is recoverable and group replicas by resource group.
func (m *ReplicaManager) getCollectionAssignmentHelper(collectionID typeutil.UniqueID, rgs map[string]typeutil.UniqueSet) (*collectionAssignmentHelper, error) {
// check if the collection is exist.
replicaIDs, ok := m.collIDToReplicaIDs[collectionID]
collReplicas, ok := m.coll2Replicas[collectionID]
if !ok {
return nil, errors.Errorf("collection %d not loaded", collectionID)
}
rgToReplicas := make(map[string][]*Replica)
for replicaID := range replicaIDs {
replica := m.replicas[replicaID]
for _, replica := range collReplicas.replicas {
rgName := replica.GetResourceGroup()
if _, ok := rgs[rgName]; !ok {
return nil, errors.Errorf("lost resource group info, collectionID: %d, replicaID: %d, resourceGroup: %s", collectionID, replicaID, rgName)
return nil, errors.Errorf("lost resource group info, collectionID: %d, replicaID: %d, resourceGroup: %s", collectionID, replica.GetID(), rgName)
}
if _, ok := rgToReplicas[rgName]; !ok {
rgToReplicas[rgName] = make([]*Replica, 0)