mirror of https://github.com/milvus-io/milvus.git
Clear stale replicas (#20456)
Signed-off-by: yah01 <yang.cen@zilliz.com> Signed-off-by: yah01 <yang.cen@zilliz.com>pull/20488/head
parent
174310a14e
commit
c71c6378ff
|
@ -180,8 +180,14 @@ func (job *LoadCollectionJob) Execute() error {
|
|||
zap.Int64("collectionID", req.GetCollectionID()),
|
||||
)
|
||||
|
||||
// Clear stale replicas
|
||||
err := job.meta.ReplicaManager.RemoveCollection(req.GetCollectionID())
|
||||
if err != nil {
|
||||
log.Warn("failed to clear stale replicas", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// Create replicas
|
||||
// TODO(yah01): store replicas and collection atomically
|
||||
replicas, err := utils.SpawnReplicas(job.meta.ReplicaManager,
|
||||
job.nodeMgr,
|
||||
req.GetCollectionID(),
|
||||
|
@ -381,8 +387,14 @@ func (job *LoadPartitionJob) Execute() error {
|
|||
zap.Int64s("partitionIDs", req.GetPartitionIDs()),
|
||||
)
|
||||
|
||||
// Clear stale replicas
|
||||
err := job.meta.ReplicaManager.RemoveCollection(req.GetCollectionID())
|
||||
if err != nil {
|
||||
log.Warn("failed to clear stale replicas", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// Create replicas
|
||||
// TODO(yah01): store replicas and collection atomically
|
||||
replicas, err := utils.SpawnReplicas(job.meta.ReplicaManager,
|
||||
job.nodeMgr,
|
||||
req.GetCollectionID(),
|
||||
|
|
|
@ -21,8 +21,11 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
. "github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type Replica struct {
|
||||
|
@ -64,15 +67,34 @@ func NewReplicaManager(idAllocator func() (int64, error), store Store) *ReplicaM
|
|||
}
|
||||
|
||||
// Recover recovers the replicas for given collections from meta store
|
||||
func (m *ReplicaManager) Recover() error {
|
||||
func (m *ReplicaManager) Recover(collections []int64) error {
|
||||
replicas, err := m.store.GetReplicas()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to recover replicas, err=%w", err)
|
||||
}
|
||||
|
||||
collectionSet := typeutil.NewUniqueSet(collections...)
|
||||
for _, replica := range replicas {
|
||||
m.replicas[replica.GetID()] = &Replica{
|
||||
Replica: replica,
|
||||
Nodes: NewUniqueSet(replica.GetNodes()...),
|
||||
if collectionSet.Contain(replica.GetCollectionID()) {
|
||||
m.replicas[replica.GetID()] = &Replica{
|
||||
Replica: replica,
|
||||
Nodes: NewUniqueSet(replica.GetNodes()...),
|
||||
}
|
||||
log.Info("recover replica",
|
||||
zap.Int64("collectionID", replica.GetCollectionID()),
|
||||
zap.Int64("replicaID", replica.GetID()),
|
||||
zap.Int64s("nodes", replica.GetNodes()),
|
||||
)
|
||||
} else {
|
||||
err := m.store.ReleaseReplica(replica.GetCollectionID(), replica.GetID())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Info("clear stale replica",
|
||||
zap.Int64("collectionID", replica.GetCollectionID()),
|
||||
zap.Int64("replicaID", replica.GetID()),
|
||||
zap.Int64s("nodes", replica.GetNodes()),
|
||||
)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -111,7 +111,7 @@ func (suite *ReplicaManagerSuite) TestRecover() {
|
|||
|
||||
// Clear data in memory, and then recover from meta store
|
||||
suite.clearMemory()
|
||||
mgr.Recover()
|
||||
mgr.Recover(suite.collections)
|
||||
suite.TestGet()
|
||||
|
||||
// Test recover from 2.1 meta store
|
||||
|
@ -125,7 +125,7 @@ func (suite *ReplicaManagerSuite) TestRecover() {
|
|||
suite.kv.Save(ReplicaMetaPrefixV1+"/2100", string(value))
|
||||
|
||||
suite.clearMemory()
|
||||
mgr.Recover()
|
||||
mgr.Recover(append(suite.collections, 1000))
|
||||
replica := mgr.Get(2100)
|
||||
suite.NotNil(replica)
|
||||
suite.EqualValues(1000, replica.CollectionID)
|
||||
|
@ -148,7 +148,7 @@ func (suite *ReplicaManagerSuite) TestRemove() {
|
|||
}
|
||||
|
||||
// Check whether the replicas are also removed from meta store
|
||||
mgr.Recover()
|
||||
mgr.Recover(suite.collections)
|
||||
for _, collection := range suite.collections {
|
||||
replicas := mgr.GetByCollection(collection)
|
||||
suite.Empty(replicas)
|
||||
|
@ -179,7 +179,7 @@ func (suite *ReplicaManagerSuite) TestNodeManipulate() {
|
|||
|
||||
// Check these modifications are applied to meta store
|
||||
suite.clearMemory()
|
||||
mgr.Recover()
|
||||
mgr.Recover(suite.collections)
|
||||
for _, collection := range suite.collections {
|
||||
replica := mgr.GetByCollectionAndNode(collection, firstNode)
|
||||
suite.Nil(replica)
|
||||
|
|
|
@ -256,7 +256,7 @@ func (s *Server) initMeta() error {
|
|||
}
|
||||
metrics.QueryCoordNumCollections.WithLabelValues().Set(float64(len(s.meta.GetAll())))
|
||||
|
||||
err = s.meta.ReplicaManager.Recover()
|
||||
err = s.meta.ReplicaManager.Recover(s.meta.CollectionManager.GetAll())
|
||||
if err != nil {
|
||||
log.Error("failed to recover replicas")
|
||||
return err
|
||||
|
|
Loading…
Reference in New Issue