diff --git a/internal/proxy/lb_policy.go b/internal/proxy/lb_policy.go
index 7a5394863f..1e130baa03 100644
--- a/internal/proxy/lb_policy.go
+++ b/internal/proxy/lb_policy.go
@@ -204,11 +204,16 @@ func (lb *LBPolicyImpl) Execute(ctx context.Context, workload CollectionWorkLoad
 		return err
 	}
 
+	// let every request retry at least twice, so it can retry after the shard leader cache is updated
+	retryTimes := Params.ProxyCfg.RetryTimesOnReplica.GetAsInt()
 	wg, ctx := errgroup.WithContext(ctx)
 	for channel, nodes := range dml2leaders {
 		channel := channel
 		nodes := lo.Map(nodes, func(node nodeInfo, _ int) int64 { return node.nodeID })
-		retryOnReplica := Params.ProxyCfg.RetryTimesOnReplica.GetAsInt()
+		channelRetryTimes := retryTimes
+		if len(nodes) > 0 {
+			channelRetryTimes *= len(nodes)
+		}
 		wg.Go(func() error {
 			return lb.ExecuteWithRetry(ctx, ChannelWorkload{
 				db:             workload.db,
@@ -218,7 +223,7 @@
 				shardLeaders:   nodes,
 				nq:             workload.nq,
 				exec:           workload.exec,
-				retryTimes:     uint(len(nodes) * retryOnReplica),
+				retryTimes:     uint(channelRetryTimes),
 			})
 		})
 	}
diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go
index fb87d87e51..36ed8e1510 100644
--- a/internal/querycoordv2/services.go
+++ b/internal/querycoordv2/services.go
@@ -942,6 +942,16 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
 		}
 	}
 
+	// a node may have gone down during GetShardLeaders; report the channel as unavailable instead of returning an empty leader list
+	if len(ids) == 0 {
+		msg := fmt.Sprintf("channel %s is not available in any replica", channel.GetChannelName())
+		log.Warn(msg, zap.Error(channelErr))
+		resp.Status = merr.Status(
+			errors.Wrap(merr.WrapErrChannelNotAvailable(channel.GetChannelName()), channelErr.Error()))
+		resp.Shards = nil
+		return resp, nil
+	}
+
 	resp.Shards = append(resp.Shards, &querypb.ShardLeadersList{
 		ChannelName: channel.GetChannelName(),
 		NodeIds:     ids,
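
Before the lb_policy.go change, each channel's retry budget was computed as len(nodes) * retryOnReplica, which collapses to zero when the cached shard-leader list is empty, so such a request could never retry even after the shard-leader cache was refreshed. The new rule, reduced to a self-contained sketch (the function name computeChannelRetryTimes and the literal values are illustrative, not code from the patch):

package main

import "fmt"

// computeChannelRetryTimes mirrors the budget rule from the first hunk:
// the configured base retry count is scaled by the number of known shard
// leaders, but never drops below the base, so a channel whose cached
// leader list is empty still gets retries after the cache is refreshed.
func computeChannelRetryTimes(base int, leaders []int64) uint {
	retry := base
	if len(leaders) > 0 {
		retry *= len(leaders)
	}
	return uint(retry)
}

func main() {
	base := 2 // stand-in for Params.ProxyCfg.RetryTimesOnReplica

	// Three known leaders: one base budget per replica.
	fmt.Println(computeChannelRetryTimes(base, []int64{1, 2, 3})) // 6

	// Empty leader cache: the old formula yielded 0 retries; the base now survives.
	fmt.Println(computeChannelRetryTimes(base, nil)) // 2
}

Running it prints 6 and 2: a channel with three known leaders gets one base budget per replica, while a channel with no known leaders keeps the configured base instead of zero.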
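
The querycoordv2 hunk makes GetShardLeaders fail loudly: when no replica has a serviceable leader for a channel, the response carries a wrapped "channel not available" error instead of an empty leader list. A self-contained sketch of that guard, using the standard library's fmt.Errorf with %w in place of the merr and errors helpers from the patch:

package main

import (
	"errors"
	"fmt"
)

// errChannelNotAvailable stands in for merr.ErrChannelNotAvailable.
var errChannelNotAvailable = errors.New("channel not available")

// availableLeaders mirrors the guard from the second hunk: when every
// replica's leader for a channel is unserviceable, surface the underlying
// cause instead of silently returning an empty list, so the caller can
// invalidate its shard-leader cache and retry.
func availableLeaders(channel string, ids []int64, channelErr error) ([]int64, error) {
	if len(ids) == 0 {
		return nil, fmt.Errorf("channel %s is not available in any replica: %w: %v",
			channel, errChannelNotAvailable, channelErr)
	}
	return ids, nil
}

func main() {
	// Healthy channel: leader IDs pass through unchanged.
	ids, err := availableLeaders("dml_0", []int64{1, 3}, nil)
	fmt.Println(ids, err) // [1 3] <nil>

	// All leaders gone mid-call: an explicit, matchable error replaces the empty list.
	_, err = availableLeaders("dml_1", nil, errors.New("node 2 is offline"))
	fmt.Println(errors.Is(err, errChannelNotAvailable)) // true
	fmt.Println(err)
}

Here errors.Is(err, errChannelNotAvailable) reports true and the message still carries the per-replica cause, analogous to how merr.Status(errors.Wrap(...)) preserves channelErr in the real response.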