enhance: Remove rpc during querycoord start (#28396) (#28604)

issue: #28332
pr: #28396

during querycoord's recover, it try to call `DescribeCollection` and
`ShowPartitions` to root coord, to checker whether collection or
partition has been released in rootcoord. but if rootcoord isn't not
ready yet, the rpc will fail, the querycoord panic.

to fix this, we remove rpc call during querycoord's start

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/28621/head
wei liu 2023-11-21 18:08:29 +08:00 committed by GitHub
parent d724b07037
commit c7ec882033
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 0 additions and 132 deletions

View File

@ -23,7 +23,6 @@ import (
"sync"
"time"
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"go.uber.org/zap"
@ -129,18 +128,6 @@ func (m *CollectionManager) Recover(broker Broker) error {
ctxLog.Info("recover collections and partitions from kv store")
for _, collection := range collections {
// Dropped collection should be deprecated
_, err = broker.GetCollectionSchema(ctx, collection.GetCollectionID())
if errors.Is(err, merr.ErrCollectionNotFound) {
ctxLog.Info("skip dropped collection during recovery", zap.Int64("collection", collection.GetCollectionID()))
m.catalog.ReleaseCollection(collection.GetCollectionID())
continue
}
if err != nil {
ctxLog.Warn("failed to get collection schema", zap.Error(err))
return err
}
if collection.GetReplicaNumber() <= 0 {
ctxLog.Info("skip recovery and release collection due to invalid replica number",
zap.Int64("collectionID", collection.GetCollectionID()),
@ -169,31 +156,6 @@ func (m *CollectionManager) Recover(broker Broker) error {
}
for collection, partitions := range partitions {
existPartitions, err := broker.GetPartitions(ctx, collection)
if errors.Is(err, merr.ErrCollectionNotFound) {
ctxLog.Info("skip dropped collection during recovery", zap.Int64("collection", collection))
m.catalog.ReleaseCollection(collection)
continue
}
if err != nil {
ctxLog.Warn("failed to get partitions", zap.Error(err))
return err
}
omitPartitions := make([]int64, 0)
partitions = lo.Filter(partitions, func(partition *querypb.PartitionLoadInfo, _ int) bool {
if !lo.Contains(existPartitions, partition.GetPartitionID()) {
omitPartitions = append(omitPartitions, partition.GetPartitionID())
return false
}
return true
})
if len(omitPartitions) > 0 {
ctxLog.Info("skip dropped partitions during recovery",
zap.Int64("collection", collection),
zap.Int64s("partitions", omitPartitions))
m.catalog.ReleasePartition(collection, omitPartitions...)
}
for _, partition := range partitions {
// Partitions not loaded done should be deprecated
if partition.GetStatus() != querypb.LoadStatus_Loaded {

View File

@ -17,7 +17,6 @@
package meta
import (
"fmt"
"sort"
"testing"
"time"
@ -35,7 +34,6 @@ import (
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -178,13 +176,6 @@ func (suite *CollectionManagerSuite) TestGet() {
func (suite *CollectionManagerSuite) TestUpdate() {
mgr := suite.mgr
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(nil, nil)
for _, collection := range suite.collections {
if len(suite.partitions[collection]) > 0 {
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil)
}
}
collections := mgr.GetAllCollections()
partitions := mgr.GetAllPartitions()
for _, collection := range collections {
@ -251,11 +242,6 @@ func (suite *CollectionManagerSuite) TestGetFieldIndex() {
func (suite *CollectionManagerSuite) TestRemove() {
mgr := suite.mgr
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(nil, nil)
for _, collection := range suite.collections {
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil).Maybe()
}
// Remove collections/partitions
for i, collectionID := range suite.collections {
if suite.loadTypes[i] == querypb.LoadType_LoadCollection {
@ -320,13 +306,6 @@ func (suite *CollectionManagerSuite) TestRemove() {
func (suite *CollectionManagerSuite) TestRecover_normal() {
mgr := suite.mgr
// recover successfully
for _, collection := range suite.collections {
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, collection).Return(nil, nil)
if len(suite.partitions[collection]) > 0 {
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil)
}
}
suite.clearMemory()
err := mgr.Recover(suite.broker)
suite.NoError(err)
@ -342,7 +321,6 @@ func (suite *CollectionManagerSuite) TestRecover_normal() {
func (suite *CollectionManagerSuite) TestRecoverLoadingCollection() {
mgr := suite.mgr
suite.releaseAll()
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(nil, nil)
// test put collection with partitions
for i, collection := range suite.collections {
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil).Maybe()
@ -424,64 +402,6 @@ func (suite *CollectionManagerSuite) TestRecoverLoadingCollection() {
}
}
func (suite *CollectionManagerSuite) TestRecover_with_dropped() {
mgr := suite.mgr
droppedCollection := int64(101)
droppedPartition := int64(13)
for _, collection := range suite.collections {
if collection == droppedCollection {
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, collection).Return(nil, merr.ErrCollectionNotFound)
} else {
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, collection).Return(nil, nil)
}
if len(suite.partitions[collection]) != 0 {
if collection == droppedCollection {
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(nil, merr.ErrCollectionNotFound)
} else {
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).
Return(lo.Filter(suite.partitions[collection], func(partition int64, _ int) bool {
return partition != droppedPartition
}), nil)
}
}
}
suite.clearMemory()
err := mgr.Recover(suite.broker)
suite.NoError(err)
for _, collection := range suite.collections {
exist := collection != droppedCollection
suite.Equal(exist, mgr.Exist(collection))
if !exist {
continue
}
for _, partitionID := range suite.partitions[collection] {
partition := mgr.GetPartition(partitionID)
exist = partitionID != droppedPartition
suite.Equal(exist, partition != nil)
}
}
}
func (suite *CollectionManagerSuite) TestRecover_Failed() {
mockErr1 := fmt.Errorf("mock GetCollectionSchema err")
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(nil, mockErr1)
suite.clearMemory()
err := suite.mgr.Recover(suite.broker)
suite.Error(err)
suite.ErrorIs(err, mockErr1)
mockErr2 := fmt.Errorf("mock GetPartitions err")
suite.broker.ExpectedCalls = suite.broker.ExpectedCalls[:0]
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(nil, nil)
suite.broker.EXPECT().GetPartitions(mock.Anything, mock.Anything).Return(nil, mockErr2)
suite.clearMemory()
err = suite.mgr.Recover(suite.broker)
suite.Error(err)
suite.ErrorIs(err, mockErr2)
}
func (suite *CollectionManagerSuite) TestUpdateLoadPercentage() {
mgr := suite.mgr
mgr.PutCollection(&Collection{
@ -539,13 +459,6 @@ func (suite *CollectionManagerSuite) TestUpgradeRecover() {
suite.releaseAll()
mgr := suite.mgr
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(nil, nil)
for _, collection := range suite.collections {
if len(suite.partitions[collection]) > 0 {
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil)
}
}
// put old version of collections and partitions
for i, collection := range suite.collections {
status := querypb.LoadStatus_Loaded

View File

@ -26,11 +26,9 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
@ -149,11 +147,6 @@ func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collecti
return nil, nil, err
}
path := params.Params.MinioCfg.RootPath.GetValue()
// refill log ID with log path
for _, segmentInfo := range recoveryInfo.Segments {
datacoord.DecompressBinLog(path, segmentInfo)
}
return recoveryInfo.Channels, recoveryInfo.Segments, nil
}