mirror of https://github.com/milvus-io/milvus.git
Add some log when node being reachable or unreachable (#25572)
Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>pull/25694/head
parent
a8f73051e0
commit
77a9553c3f
|
@ -128,6 +128,7 @@ func (lb *LBPolicyImpl) selectNode(ctx context.Context, workload ChannelWorkload
|
|||
targetNode, err = lb.balancer.SelectNode(ctx, availableNodes, workload.nq)
|
||||
if err != nil {
|
||||
log.Warn("failed to select shard",
|
||||
zap.Int64s("availableNodes", availableNodes),
|
||||
zap.Error(err))
|
||||
return -1, err
|
||||
}
|
||||
|
|
|
@ -194,34 +194,38 @@ func (b *LookAsideBalancer) checkQueryNodeHealthLoop(ctx context.Context) {
|
|||
ctx, cancel := context.WithTimeout(context.Background(), checkInterval)
|
||||
defer cancel()
|
||||
|
||||
checkHealthFailed := func(err error) bool {
|
||||
log.RatedWarn(5, "query node check health failed, add it to unreachable nodes list",
|
||||
zap.Int64("nodeID", node),
|
||||
zap.Error(err))
|
||||
b.unreachableQueryNodes.Insert(node)
|
||||
return true
|
||||
setUnreachable := func() bool {
|
||||
return b.unreachableQueryNodes.Insert(node)
|
||||
}
|
||||
|
||||
qn, err := b.clientMgr.GetClient(ctx, node)
|
||||
if err != nil {
|
||||
checkHealthFailed(err)
|
||||
if setUnreachable() {
|
||||
log.Warn("get client failed, set node unreachable", zap.Int64("node", node), zap.Error(err))
|
||||
}
|
||||
return struct{}{}, nil
|
||||
}
|
||||
|
||||
resp, err := qn.GetComponentStates(ctx)
|
||||
if err != nil {
|
||||
checkHealthFailed(err)
|
||||
if setUnreachable() {
|
||||
log.Warn("get component status failed,set node unreachable", zap.Int64("node", node), zap.Error(err))
|
||||
}
|
||||
return struct{}{}, nil
|
||||
}
|
||||
|
||||
if resp.GetState().GetStateCode() != commonpb.StateCode_Healthy {
|
||||
checkHealthFailed(merr.WrapErrNodeOffline(node))
|
||||
if setUnreachable() {
|
||||
log.Warn("component status unhealthy,set node unreachable", zap.Int64("node", node), zap.Error(err))
|
||||
}
|
||||
return struct{}{}, nil
|
||||
}
|
||||
|
||||
// check health successfully, update check health ts
|
||||
b.metricsUpdateTs.Insert(node, time.Now().Local().UnixMilli())
|
||||
b.unreachableQueryNodes.Remove(node)
|
||||
if b.unreachableQueryNodes.TryRemove(node) {
|
||||
log.Info("component recuperated, set node reachable", zap.Int64("node", node), zap.Error(err))
|
||||
}
|
||||
|
||||
return struct{}{}, nil
|
||||
}))
|
||||
|
|
|
@ -149,6 +149,13 @@ func (set *ConcurrentSet[T]) Remove(elements ...T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Try remove element from set,
|
||||
// return false if not exist
|
||||
func (set *ConcurrentSet[T]) TryRemove(element T) bool {
|
||||
_, exist := set.inner.LoadAndDelete(element)
|
||||
return exist
|
||||
}
|
||||
|
||||
// Get all elements in the set
|
||||
func (set *ConcurrentSet[T]) Collect() []T {
|
||||
elements := make([]T, 0)
|
||||
|
|
Loading…
Reference in New Issue