mirror of https://github.com/milvus-io/milvus.git
fix: Delegator stuck at unserviceable status (#37694)
issue: #37679
PR #36549 introduced a logic error that updates the current target when only part of the channels are ready. This PR fixes the logic error and lets the dist handler keep pulling distribution from the querynode until all delegators become serviceable.
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/37697/head
parent b9357e4716
commit a1b6be1253
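To illustrate the logic error the description refers to: the old check compared the number of ready delegators accumulated across all channels against replicaNum, so a collection could pass the check while one of its channels still had no ready delegator. The sketch below is a standalone illustration with made-up channel names and counts, not the actual milvus code; it only contrasts the old aggregate check with the fixed per-channel check.

// Illustrative sketch only (hypothetical data, not milvus code): contrasts the
// old aggregate readiness check with the fixed per-channel check.
package main

import "fmt"

func main() {
    replicaNum := 2
    // channels in the order they are iterated; "dml_1" still has an unserviceable delegator
    channels := []string{"dml_0", "dml_1"}
    readyLeaders := map[string]int{"dml_0": 2, "dml_1": 1}

    // old logic: accumulate ready leaders across channels and compare the running
    // total with replicaNum; a fully ready channel can mask one that is not ready yet
    total, oldOK := 0, true
    for _, ch := range channels {
        total += readyLeaders[ch]
        if int32(total) < int32(replicaNum) {
            oldOK = false
            break
        }
    }

    // fixed logic: every channel must reach replicaNum ready delegators on its own
    // before the current target may be updated
    newOK := true
    for _, ch := range channels {
        if int32(readyLeaders[ch]) < int32(replicaNum) {
            newOK = false
            break
        }
    }

    fmt.Println("old check passes:", oldOK) // true, although dml_1 is not ready
    fmt.Println("new check passes:", newOK) // false, the target update is deferred
}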
@@ -38,6 +38,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/commonpbutil"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )

 type TriggerUpdateTargetVersion = func(collectionID int64)

@@ -195,6 +196,8 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons
 	channels := lo.SliceToMap(resp.GetChannels(), func(channel *querypb.ChannelVersionInfo) (string, *querypb.ChannelVersionInfo) {
 		return channel.GetChannel(), channel
 	})

+	collectionsToSync := typeutil.NewUniqueSet()
 	for _, lview := range resp.GetLeaderViews() {
 		segments := make(map[int64]*meta.Segment)

@@ -247,9 +250,10 @@
 			err := merr.WrapErrServiceInternal(fmt.Sprintf("target version mismatch, collection: %d, channel: %s, current target version: %v, leader version: %v",
 				lview.GetCollection(), lview.GetChannel(), currentTargetVersion, lview.TargetVersion))

-			// segment and channel already loaded, trigger target observer to check target version
-			dh.syncTargetVersionFn(lview.GetCollection())
 			view.UnServiceableError = err
+			// make dist handler pull next distribution until all delegator is serviceable
+			dh.lastUpdateTs = 0
+			collectionsToSync.Insert(lview.Collection)
 			log.Info("leader is not available due to target version not ready",
 				zap.Int64("collectionID", view.CollectionID),
 				zap.Int64("nodeID", view.ID),

@@ -259,6 +263,12 @@
 	}

 	dh.dist.LeaderViewManager.Update(resp.GetNodeID(), updates...)

+	// segment and channel already loaded, trigger target observer to update
+	collectionsToSync.Range(func(collection int64) bool {
+		dh.syncTargetVersionFn(collection)
+		return true
+	})
 }

 func (dh *distHandler) getDistribution(ctx context.Context) (*querypb.GetDataDistributionResponse, error) {
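Two patterns in the dist handler hunks above are worth spelling out: setting dh.lastUpdateTs = 0 means the periodic puller will not skip the next round while any delegator is still unserviceable, and collecting collection IDs into a unique set defers the target-version sync so it fires once per collection instead of once per unserviceable leader view. Below is a minimal, hypothetical sketch of that pattern (simplified names, a plain map instead of typeutil.UniqueSet), not the actual distHandler code.

// Hypothetical sketch, not the actual distHandler implementation: shows how
// resetting a cached timestamp forces the next periodic pull, and how a set
// deduplicates per-collection sync triggers.
package main

import (
    "fmt"
    "time"
)

type handler struct {
    lastUpdateTs int64              // unix nano of the last accepted distribution
    interval     time.Duration      // minimum gap between full pulls
    toSync       map[int64]struct{} // collections whose target version must be re-synced
    syncFn       func(collectionID int64)
}

// shouldPull skips the pull when the last update is recent; a reset
// lastUpdateTs (0) always allows the next round.
func (h *handler) shouldPull(now time.Time) bool {
    return now.UnixNano()-h.lastUpdateTs >= h.interval.Nanoseconds()
}

// markUnserviceable records an unserviceable delegator: keep pulling until it
// recovers, and remember its collection for a single sync afterwards.
func (h *handler) markUnserviceable(collectionID int64) {
    h.lastUpdateTs = 0
    h.toSync[collectionID] = struct{}{}
}

// syncCollected triggers the target-version sync once per collection.
func (h *handler) syncCollected() {
    for id := range h.toSync {
        h.syncFn(id)
    }
    h.toSync = map[int64]struct{}{}
}

func main() {
    h := &handler{
        lastUpdateTs: time.Now().UnixNano(),
        interval:     10 * time.Second,
        toSync:       map[int64]struct{}{},
        syncFn:       func(id int64) { fmt.Println("sync target version for collection", id) },
    }
    fmt.Println("pull while fresh:", h.shouldPull(time.Now())) // false: last update is recent
    h.markUnserviceable(100)
    h.markUnserviceable(100) // duplicate view, still one sync
    fmt.Println("pull after reset:", h.shouldPull(time.Now())) // true: timestamp was reset
    h.syncCollected()
}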
@@ -386,16 +386,16 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
 		channelReadyLeaders := lo.Filter(ob.distMgr.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel)), func(leader *meta.LeaderView, _ int) bool {
 			return utils.CheckDelegatorDataReady(ob.nodeMgr, ob.targetMgr, leader, meta.NextTarget) == nil
 		})
-		collectionReadyLeaders = append(collectionReadyLeaders, channelReadyLeaders...)

 		// to avoid stuck here in dynamic increase replica case, we just check available delegator number
-		if int32(len(collectionReadyLeaders)) < replicaNum {
+		if int32(len(channelReadyLeaders)) < replicaNum {
 			log.RatedInfo(10, "channel not ready",
 				zap.Int("readyReplicaNum", len(channelReadyLeaders)),
 				zap.String("channelName", channel),
 			)
 			return false
 		}
+		collectionReadyLeaders = append(collectionReadyLeaders, channelReadyLeaders...)
 	}

 	var collectionInfo *milvuspb.DescribeCollectionResponse
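The hunk above builds channelReadyLeaders with lo.Filter from github.com/samber/lo and a readiness predicate, then applies the per-channel count check before appending into collectionReadyLeaders. A standalone example of the same filtering pattern follows; the leaderView type and the readiness rule are illustrative stand-ins, not the milvus LeaderView or CheckDelegatorDataReady.

// Standalone example of the samber/lo filtering pattern used in the hunk
// above; the leader type and readiness rule here are illustrative only.
package main

import (
    "fmt"

    "github.com/samber/lo"
)

type leaderView struct {
    NodeID  int64
    Channel string
    Ready   bool
}

func main() {
    views := []leaderView{
        {NodeID: 1, Channel: "dml_0", Ready: true},
        {NodeID: 2, Channel: "dml_0", Ready: false},
        {NodeID: 3, Channel: "dml_0", Ready: true},
    }

    // Keep only the delegators whose data is ready, standing in for
    // CheckDelegatorDataReady(...) == nil in the real code.
    ready := lo.Filter(views, func(v leaderView, _ int) bool {
        return v.Ready
    })

    replicaNum := 2
    // The fixed check: this channel alone must provide replicaNum ready leaders.
    fmt.Println(len(ready) >= replicaNum) // true
}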
@@ -147,7 +147,6 @@ func (s *TargetTestSuit) TestQueryCoordRestart() {
 	s.initCollection(name, 1, 2, 2, 2000)

 	ctx := context.Background()

 	info, err := s.Cluster.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
 		Base:           commonpbutil.NewMsgBase(),
 		CollectionName: name,

@@ -156,6 +155,16 @@
 	s.True(merr.Ok(info.GetStatus()))
 	collectionID := info.GetCollectionID()

+	// wait until all shards are ready
+	// cause showCollections won't just wait all collection becomes loaded, proxy will use retry to block until all shard are ready
+	s.Eventually(func() bool {
+		resp, err := s.Cluster.QueryCoord.GetShardLeaders(ctx, &querypb.GetShardLeadersRequest{
+			Base:         commonpbutil.NewMsgBase(),
+			CollectionID: collectionID,
+		})
+		return err == nil && merr.Ok(resp.GetStatus()) && len(resp.Shards) == 2
+	}, 60*time.Second, 1*time.Second)
+
 	// trigger old coord stop
 	s.Cluster.StopQueryCoord()
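The added test code above waits with s.Eventually until GetShardLeaders reports both shards before stopping the old QueryCoord. For readers unfamiliar with that helper, the snippet below shows the polling semantics of testify's Eventually (the condition is re-evaluated every tick until it returns true or the overall timeout expires); the condition here is a dummy stand-in, not the actual shard-leader check.

// Minimal illustration of the testify Eventually polling pattern used in the
// test above (github.com/stretchr/testify); the condition is a stand-in for
// the GetShardLeaders readiness check.
package main

import (
    "fmt"
    "time"

    "github.com/stretchr/testify/assert"
)

// fakeT satisfies assert.TestingT so Eventually can be demonstrated outside a test.
type fakeT struct{}

func (fakeT) Errorf(format string, args ...interface{}) { fmt.Printf(format+"\n", args...) }

func main() {
    start := time.Now()
    // The condition is polled every 200ms for up to 5s; here it becomes true after ~2s.
    ok := assert.Eventually(fakeT{}, func() bool {
        return time.Since(start) > 2*time.Second // stand-in for "all shards ready"
    }, 5*time.Second, 200*time.Millisecond)
    fmt.Println("condition met:", ok)
}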