enhance: speed up GetByCollection/AndNode (2.4 patch) (#32234)

Related to https://github.com/milvus-io/milvus/issues/32165

Avoid iterating through all replicas/collections if possible. Iteration
is expensive when there are large number of replicas/collections.

Signed-off-by: yiwangdr <yiwangdr@gmail.com>
pull/32290/head
yiwangdr 2024-04-15 04:53:21 -07:00 committed by GitHub
parent f6377a5018
commit 018a784989
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 47 additions and 28 deletions

View File

@ -103,16 +103,18 @@ func (replica *Replica) Clone() *Replica {
type ReplicaManager struct {
rwmutex sync.RWMutex
idAllocator func() (int64, error)
replicas map[typeutil.UniqueID]*Replica
catalog metastore.QueryCoordCatalog
idAllocator func() (int64, error)
replicas map[typeutil.UniqueID]*Replica
collIDToReplicaIDs map[typeutil.UniqueID]typeutil.UniqueSet
catalog metastore.QueryCoordCatalog
}
func NewReplicaManager(idAllocator func() (int64, error), catalog metastore.QueryCoordCatalog) *ReplicaManager {
return &ReplicaManager{
idAllocator: idAllocator,
replicas: make(map[int64]*Replica),
catalog: catalog,
idAllocator: idAllocator,
replicas: make(map[int64]*Replica),
collIDToReplicaIDs: make(map[int64]typeutil.UniqueSet),
catalog: catalog,
}
}
@ -130,10 +132,10 @@ func (m *ReplicaManager) Recover(collections []int64) error {
}
if collectionSet.Contain(replica.GetCollectionID()) {
m.replicas[replica.GetID()] = &Replica{
m.putReplicaInMemory(&Replica{
Replica: replica,
nodes: typeutil.NewUniqueSet(replica.GetNodes()...),
}
})
log.Info("recover replica",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
@ -205,11 +207,25 @@ func (m *ReplicaManager) put(replicas ...*Replica) error {
if err != nil {
return err
}
m.replicas[replica.ID] = replica
m.putReplicaInMemory(replicas...)
}
return nil
}
// putReplicaInMemory puts replicas into in-memory map and collIDToReplicaIDs.
func (m *ReplicaManager) putReplicaInMemory(replicas ...*Replica) {
for _, replica := range replicas {
// update in-memory replicas.
m.replicas[replica.GetID()] = replica
// update collIDToReplicaIDs.
if m.collIDToReplicaIDs[replica.GetCollectionID()] == nil {
m.collIDToReplicaIDs[replica.GetCollectionID()] = typeutil.NewUniqueSet()
}
m.collIDToReplicaIDs[replica.GetCollectionID()].Insert(replica.GetID())
}
}
// RemoveCollection removes replicas of given collection,
// returns error if failed to remove replica from KV
func (m *ReplicaManager) RemoveCollection(collectionID typeutil.UniqueID) error {
@ -220,11 +236,11 @@ func (m *ReplicaManager) RemoveCollection(collectionID typeutil.UniqueID) error
if err != nil {
return err
}
for id, replica := range m.replicas {
if replica.CollectionID == collectionID {
delete(m.replicas, id)
}
// Remove all replica of collection and remove collection from collIDToReplicaIDs.
for replicaID := range m.collIDToReplicaIDs[collectionID] {
delete(m.replicas, replicaID)
}
delete(m.collIDToReplicaIDs, collectionID)
return nil
}
@ -233,9 +249,9 @@ func (m *ReplicaManager) GetByCollection(collectionID typeutil.UniqueID) []*Repl
defer m.rwmutex.RUnlock()
replicas := make([]*Replica, 0)
for _, replica := range m.replicas {
if replica.CollectionID == collectionID {
replicas = append(replicas, replica)
if m.collIDToReplicaIDs[collectionID] != nil {
for replicaID := range m.collIDToReplicaIDs[collectionID] {
replicas = append(replicas, m.replicas[replicaID])
}
}
@ -246,9 +262,12 @@ func (m *ReplicaManager) GetByCollectionAndNode(collectionID, nodeID typeutil.Un
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
for _, replica := range m.replicas {
if replica.CollectionID == collectionID && replica.nodes.Contain(nodeID) {
return replica
if m.collIDToReplicaIDs[collectionID] != nil {
for replicaID := range m.collIDToReplicaIDs[collectionID] {
replica := m.replicas[replicaID]
if replica.Contains(nodeID) {
return replica
}
}
}
@ -274,12 +293,13 @@ func (m *ReplicaManager) GetByCollectionAndRG(collectionID int64, rgName string)
defer m.rwmutex.RUnlock()
ret := make([]*Replica, 0)
for _, replica := range m.replicas {
if replica.GetCollectionID() == collectionID && replica.GetResourceGroup() == rgName {
ret = append(ret, replica)
if m.collIDToReplicaIDs[collectionID] != nil {
for replicaID := range m.collIDToReplicaIDs[collectionID] {
if m.replicas[replicaID].GetResourceGroup() == rgName {
ret = append(ret, m.replicas[replicaID])
}
}
}
return ret
}
@ -325,16 +345,15 @@ func (m *ReplicaManager) RemoveNode(replicaID typeutil.UniqueID, nodes ...typeut
return m.put(replica)
}
func (m *ReplicaManager) GetResourceGroupByCollection(collection typeutil.UniqueID) typeutil.Set[string] {
func (m *ReplicaManager) GetResourceGroupByCollection(collectionID typeutil.UniqueID) typeutil.Set[string] {
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
ret := typeutil.NewSet[string]()
for _, r := range m.replicas {
if r.GetCollectionID() == collection {
ret.Insert(r.GetResourceGroup())
if m.collIDToReplicaIDs[collectionID] != nil {
for replicaID := range m.collIDToReplicaIDs[collectionID] {
ret.Insert(m.replicas[replicaID].GetResourceGroup())
}
}
return ret
}