mirror of https://github.com/milvus-io/milvus.git
enhance: Remove rpc during querycoord start (#28396)
issue: #28332 During querycoord's recovery, it tries to call `DescribeCollection` and `ShowPartitions` on rootcoord to check whether a collection or partition has been released in rootcoord. But if rootcoord is not ready yet, the RPC fails and querycoord panics. To fix this, we remove the RPC calls made during querycoord's start. Signed-off-by: Wei Liu <wei.liu@zilliz.com> pull/28502/head
parent
82915a9630
commit
7895ac96b5
|
@ -23,7 +23,6 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/zap"
|
||||
|
@ -129,18 +128,6 @@ func (m *CollectionManager) Recover(broker Broker) error {
|
|||
ctxLog.Info("recover collections and partitions from kv store")
|
||||
|
||||
for _, collection := range collections {
|
||||
// Dropped collection should be deprecated
|
||||
_, err = broker.DescribeCollection(ctx, collection.GetCollectionID())
|
||||
if errors.Is(err, merr.ErrCollectionNotFound) {
|
||||
ctxLog.Info("skip dropped collection during recovery", zap.Int64("collection", collection.GetCollectionID()))
|
||||
m.catalog.ReleaseCollection(collection.GetCollectionID())
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
ctxLog.Warn("failed to get collection schema", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
if collection.GetReplicaNumber() <= 0 {
|
||||
ctxLog.Info("skip recovery and release collection due to invalid replica number",
|
||||
zap.Int64("collectionID", collection.GetCollectionID()),
|
||||
|
@ -169,31 +156,6 @@ func (m *CollectionManager) Recover(broker Broker) error {
|
|||
}
|
||||
|
||||
for collection, partitions := range partitions {
|
||||
existPartitions, err := broker.GetPartitions(ctx, collection)
|
||||
if errors.Is(err, merr.ErrCollectionNotFound) {
|
||||
ctxLog.Info("skip dropped collection during recovery", zap.Int64("collection", collection))
|
||||
m.catalog.ReleaseCollection(collection)
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
ctxLog.Warn("failed to get partitions", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
omitPartitions := make([]int64, 0)
|
||||
partitions = lo.Filter(partitions, func(partition *querypb.PartitionLoadInfo, _ int) bool {
|
||||
if !lo.Contains(existPartitions, partition.GetPartitionID()) {
|
||||
omitPartitions = append(omitPartitions, partition.GetPartitionID())
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
if len(omitPartitions) > 0 {
|
||||
ctxLog.Info("skip dropped partitions during recovery",
|
||||
zap.Int64("collection", collection),
|
||||
zap.Int64s("partitions", omitPartitions))
|
||||
m.catalog.ReleasePartition(collection, omitPartitions...)
|
||||
}
|
||||
|
||||
for _, partition := range partitions {
|
||||
// Partitions not loaded done should be deprecated
|
||||
if partition.GetStatus() != querypb.LoadStatus_Loaded {
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -35,7 +34,6 @@ import (
|
|||
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/etcd"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
)
|
||||
|
||||
|
@ -178,13 +176,6 @@ func (suite *CollectionManagerSuite) TestGet() {
|
|||
func (suite *CollectionManagerSuite) TestUpdate() {
|
||||
mgr := suite.mgr
|
||||
|
||||
suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil)
|
||||
for _, collection := range suite.collections {
|
||||
if len(suite.partitions[collection]) > 0 {
|
||||
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil)
|
||||
}
|
||||
}
|
||||
|
||||
collections := mgr.GetAllCollections()
|
||||
partitions := mgr.GetAllPartitions()
|
||||
for _, collection := range collections {
|
||||
|
@ -251,11 +242,6 @@ func (suite *CollectionManagerSuite) TestGetFieldIndex() {
|
|||
func (suite *CollectionManagerSuite) TestRemove() {
|
||||
mgr := suite.mgr
|
||||
|
||||
suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil)
|
||||
for _, collection := range suite.collections {
|
||||
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil).Maybe()
|
||||
}
|
||||
|
||||
// Remove collections/partitions
|
||||
for i, collectionID := range suite.collections {
|
||||
if suite.loadTypes[i] == querypb.LoadType_LoadCollection {
|
||||
|
@ -320,13 +306,6 @@ func (suite *CollectionManagerSuite) TestRemove() {
|
|||
func (suite *CollectionManagerSuite) TestRecover_normal() {
|
||||
mgr := suite.mgr
|
||||
|
||||
// recover successfully
|
||||
for _, collection := range suite.collections {
|
||||
suite.broker.EXPECT().DescribeCollection(mock.Anything, collection).Return(nil, nil)
|
||||
if len(suite.partitions[collection]) > 0 {
|
||||
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil)
|
||||
}
|
||||
}
|
||||
suite.clearMemory()
|
||||
err := mgr.Recover(suite.broker)
|
||||
suite.NoError(err)
|
||||
|
@ -342,7 +321,6 @@ func (suite *CollectionManagerSuite) TestRecover_normal() {
|
|||
func (suite *CollectionManagerSuite) TestRecoverLoadingCollection() {
|
||||
mgr := suite.mgr
|
||||
suite.releaseAll()
|
||||
suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil)
|
||||
// test put collection with partitions
|
||||
for i, collection := range suite.collections {
|
||||
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil).Maybe()
|
||||
|
@ -424,64 +402,6 @@ func (suite *CollectionManagerSuite) TestRecoverLoadingCollection() {
|
|||
}
|
||||
}
|
||||
|
||||
func (suite *CollectionManagerSuite) TestRecover_with_dropped() {
|
||||
mgr := suite.mgr
|
||||
|
||||
droppedCollection := int64(101)
|
||||
droppedPartition := int64(13)
|
||||
|
||||
for _, collection := range suite.collections {
|
||||
if collection == droppedCollection {
|
||||
suite.broker.EXPECT().DescribeCollection(mock.Anything, collection).Return(nil, merr.ErrCollectionNotFound)
|
||||
} else {
|
||||
suite.broker.EXPECT().DescribeCollection(mock.Anything, collection).Return(nil, nil)
|
||||
}
|
||||
if len(suite.partitions[collection]) != 0 {
|
||||
if collection == droppedCollection {
|
||||
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(nil, merr.ErrCollectionNotFound)
|
||||
} else {
|
||||
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).
|
||||
Return(lo.Filter(suite.partitions[collection], func(partition int64, _ int) bool {
|
||||
return partition != droppedPartition
|
||||
}), nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
suite.clearMemory()
|
||||
err := mgr.Recover(suite.broker)
|
||||
suite.NoError(err)
|
||||
for _, collection := range suite.collections {
|
||||
exist := collection != droppedCollection
|
||||
suite.Equal(exist, mgr.Exist(collection))
|
||||
if !exist {
|
||||
continue
|
||||
}
|
||||
for _, partitionID := range suite.partitions[collection] {
|
||||
partition := mgr.GetPartition(partitionID)
|
||||
exist = partitionID != droppedPartition
|
||||
suite.Equal(exist, partition != nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *CollectionManagerSuite) TestRecover_Failed() {
|
||||
mockErr1 := fmt.Errorf("mock.DescribeCollection err")
|
||||
suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, mockErr1)
|
||||
suite.clearMemory()
|
||||
err := suite.mgr.Recover(suite.broker)
|
||||
suite.Error(err)
|
||||
suite.ErrorIs(err, mockErr1)
|
||||
|
||||
mockErr2 := fmt.Errorf("mock GetPartitions err")
|
||||
suite.broker.ExpectedCalls = suite.broker.ExpectedCalls[:0]
|
||||
suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil)
|
||||
suite.broker.EXPECT().GetPartitions(mock.Anything, mock.Anything).Return(nil, mockErr2)
|
||||
suite.clearMemory()
|
||||
err = suite.mgr.Recover(suite.broker)
|
||||
suite.Error(err)
|
||||
suite.ErrorIs(err, mockErr2)
|
||||
}
|
||||
|
||||
func (suite *CollectionManagerSuite) TestUpdateLoadPercentage() {
|
||||
mgr := suite.mgr
|
||||
mgr.PutCollection(&Collection{
|
||||
|
@ -539,13 +459,6 @@ func (suite *CollectionManagerSuite) TestUpgradeRecover() {
|
|||
suite.releaseAll()
|
||||
mgr := suite.mgr
|
||||
|
||||
suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil)
|
||||
for _, collection := range suite.collections {
|
||||
if len(suite.partitions[collection]) > 0 {
|
||||
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil)
|
||||
}
|
||||
}
|
||||
|
||||
// put old version of collections and partitions
|
||||
for i, collection := range suite.collections {
|
||||
status := querypb.LoadStatus_Loaded
|
||||
|
|
|
@ -25,11 +25,9 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/params"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||
|
@ -148,11 +146,6 @@ func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collecti
|
|||
return nil, nil, err
|
||||
}
|
||||
|
||||
path := params.Params.MinioCfg.RootPath.GetValue()
|
||||
// refill log ID with log path
|
||||
for _, segmentInfo := range recoveryInfo.Segments {
|
||||
datacoord.DecompressBinLog(path, segmentInfo)
|
||||
}
|
||||
return recoveryInfo.Channels, recoveryInfo.Segments, nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue