mirror of https://github.com/milvus-io/milvus.git
fix: fix GetShardLeaders return empty node list (#32685)
issue: #32449 To avoid GetShardLeaders returning an empty node list, this PR adds a node list check on both the client side and the server side. Signed-off-by: Wei Liu <wei.liu@zilliz.com> pull/32697/head
parent
ef4c875d4c
commit
d900e68440
|
@ -204,11 +204,16 @@ func (lb *LBPolicyImpl) Execute(ctx context.Context, workload CollectionWorkLoad
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// let every request retry at least twice, so that it can retry after the shard leader cache is updated
|
||||||
|
retryTimes := Params.ProxyCfg.RetryTimesOnReplica.GetAsInt()
|
||||||
wg, ctx := errgroup.WithContext(ctx)
|
wg, ctx := errgroup.WithContext(ctx)
|
||||||
for channel, nodes := range dml2leaders {
|
for channel, nodes := range dml2leaders {
|
||||||
channel := channel
|
channel := channel
|
||||||
nodes := lo.Map(nodes, func(node nodeInfo, _ int) int64 { return node.nodeID })
|
nodes := lo.Map(nodes, func(node nodeInfo, _ int) int64 { return node.nodeID })
|
||||||
retryOnReplica := Params.ProxyCfg.RetryTimesOnReplica.GetAsInt()
|
channelRetryTimes := retryTimes
|
||||||
|
if len(nodes) > 0 {
|
||||||
|
channelRetryTimes *= len(nodes)
|
||||||
|
}
|
||||||
wg.Go(func() error {
|
wg.Go(func() error {
|
||||||
return lb.ExecuteWithRetry(ctx, ChannelWorkload{
|
return lb.ExecuteWithRetry(ctx, ChannelWorkload{
|
||||||
db: workload.db,
|
db: workload.db,
|
||||||
|
@ -218,7 +223,7 @@ func (lb *LBPolicyImpl) Execute(ctx context.Context, workload CollectionWorkLoad
|
||||||
shardLeaders: nodes,
|
shardLeaders: nodes,
|
||||||
nq: workload.nq,
|
nq: workload.nq,
|
||||||
exec: workload.exec,
|
exec: workload.exec,
|
||||||
retryTimes: uint(len(nodes) * retryOnReplica),
|
retryTimes: uint(channelRetryTimes),
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -942,6 +942,16 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// to avoid node down during GetShardLeaders
|
||||||
|
if len(ids) == 0 {
|
||||||
|
msg := fmt.Sprintf("channel %s is not available in any replica", channel.GetChannelName())
|
||||||
|
log.Warn(msg, zap.Error(channelErr))
|
||||||
|
resp.Status = merr.Status(
|
||||||
|
errors.Wrap(merr.WrapErrChannelNotAvailable(channel.GetChannelName()), channelErr.Error()))
|
||||||
|
resp.Shards = nil
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
resp.Shards = append(resp.Shards, &querypb.ShardLeadersList{
|
resp.Shards = append(resp.Shards, &querypb.ShardLeadersList{
|
||||||
ChannelName: channel.GetChannelName(),
|
ChannelName: channel.GetChannelName(),
|
||||||
NodeIds: ids,
|
NodeIds: ids,
|
||||||
|
|
Loading…
Reference in New Issue