mirror of https://github.com/milvus-io/milvus.git
Add ratedgroup for some info/warning log (#23095)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
branch: pull/23165/head
parent: bd5fab1e53
commit: 127867b873
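This change wraps a handful of hot-path info/warning logs in rate groups so they cannot flood the output. Every hunk below follows the same call shape from the Milvus log package, log.Ctx(ctx).WithRateGroup(group, n, m).RatedInfo(cost, msg, fields...); judging only by the values used here (pairs like 1, 60 and costs like 5 or 10), the group appears to act as a credit bucket that refills over time, with rated calls dropped once the bucket is empty. The snippet below is a minimal, hypothetical sketch of that behaviour built on go.uber.org/zap and golang.org/x/time/rate; the ratedLogger type and its RatedInfo method are made up for illustration and are not the Milvus API.

// Hypothetical sketch only: a tiny rate-limited logger built on zap and
// golang.org/x/time/rate to illustrate what a "rate group" is assumed to do.
// It is NOT the Milvus log package; ratedLogger/RatedInfo are made-up names.
package main

import (
	"time"

	"go.uber.org/zap"
	"golang.org/x/time/rate"
)

// ratedLogger drops entries once its token bucket runs dry.
type ratedLogger struct {
	base    *zap.Logger
	limiter *rate.Limiter
}

// newRatedLogger refills perSecond tokens every second and holds at most burst
// tokens, mirroring the (1, 60) style arguments seen in the WithRateGroup calls below.
func newRatedLogger(base *zap.Logger, perSecond float64, burst int) *ratedLogger {
	return &ratedLogger{base: base, limiter: rate.NewLimiter(rate.Limit(perSecond), burst)}
}

// RatedInfo logs only while the bucket still has a token; excess calls are silently dropped.
func (l *ratedLogger) RatedInfo(msg string, fields ...zap.Field) {
	if l.limiter.Allow() {
		l.base.Info(msg, fields...)
	}
}

func main() {
	logger, _ := zap.NewDevelopment()
	rl := newRatedLogger(logger, 1, 5) // ~1 line/s after an initial burst of 5

	// A tight retry loop like "no IndexNode available" now emits a few lines, not hundreds.
	for i := 0; i < 200; i++ {
		rl.RatedInfo("index builder peek client error, there is no available", zap.Int("attempt", i))
		time.Sleep(5 * time.Millisecond)
	}
}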
@@ -247,7 +247,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
 	// if all IndexNodes are executing task, wait for one of them to finish the task.
 	nodeID, client := ib.nodeManager.PeekClient(meta)
 	if client == nil {
-		log.Ctx(ib.ctx).RatedInfo(5, "index builder peek client error, there is no available")
+		log.Ctx(ib.ctx).WithRateGroup("dc.indexBuilder", 1, 60).RatedInfo(5, "index builder peek client error, there is no available")
 		return false
 	}
 	// update version and set nodeID
@@ -632,6 +632,7 @@ func (s *Server) handleDataNodeTimetickMsgstream(ctx context.Context, ttMsgStrea
 }
 
 func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.DataNodeTtMsg) error {
+	log := log.Ctx(ctx).WithRateGroup("dc.handleTimetick", 1, 60)
 	ch := ttMsg.GetChannelName()
 	ts := ttMsg.GetTimestamp()
 	physical, _ := tsoutil.ParseTS(ts)
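Most of the remaining hunks are wired up the same way as the handleTimetickMessage hunk above: a function-local variable named log shadows the package-level logger, so every later log call in that handler goes through the rate-grouped logger without touching the individual call sites. A minimal sketch of the shadowing idea, using zap's global logger instead of the Milvus log package (the withRateGroup helper below is a made-up stand-in that only tags entries and does not actually rate-limit; the message and channel name are illustrative):

// Sketch of the local-shadowing pattern; withRateGroup is a hypothetical stand-in.
package main

import "go.uber.org/zap"

// withRateGroup only tags entries with the group name here; in Milvus the real
// wrapper also rate-limits the Rated* calls made through it.
func withRateGroup(l *zap.Logger, group string) *zap.Logger {
	return l.With(zap.String("rateGroup", group))
}

func handleTimetick(channel string) {
	// Shadow the global logger for the rest of this function only.
	log := withRateGroup(zap.L(), "dc.handleTimetick")
	log.Info("received timetick", zap.String("channelName", channel))
}

func main() {
	logger, _ := zap.NewDevelopment()
	zap.ReplaceGlobals(logger)
	handleTimetick("dml-channel-0")
}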
@@ -1139,6 +1139,7 @@ func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsReq
 
 // GetFlushState gets the flush state of multiple segments
 func (s *Server) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
+	log := log.Ctx(ctx).WithRateGroup("dc.GetFlushState", 1, 60)
 	resp := &milvuspb.GetFlushStateResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}}
 	if s.isClosed() {
 		log.Warn("DataCoord receive GetFlushState request, server closed",
@@ -1159,7 +1160,7 @@ func (s *Server) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateR
 	}
 
 	if len(unflushed) != 0 {
-		log.RatedInfo(10, "DataCoord receive GetFlushState request, Flushed is false", zap.Int64s("segmentIDs", unflushed), zap.Int("len", len(unflushed)))
+		log.RatedInfo(10, "DataCoord receive GetFlushState request, Flushed is false", zap.Int64s("unflushed", unflushed), zap.Int("len", len(unflushed)))
 		resp.Flushed = false
 	} else {
 		log.Info("DataCoord receive GetFlushState request, Flushed is true", zap.Int64s("segmentIDs", req.GetSegmentIDs()), zap.Int("len", len(req.GetSegmentIDs())))
@@ -117,9 +117,12 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
 }
 
 func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest) (*indexpb.QueryJobsResponse, error) {
+	log := log.Ctx(ctx).With(
+		zap.String("ClusterID", req.GetClusterID()),
+	).WithRateGroup("in.queryJobs", 1, 60)
 	if !i.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
 		stateCode := i.lifetime.GetState()
-		log.Ctx(ctx).Warn("index node not ready", zap.String("state", stateCode.String()), zap.String("ClusterID", req.ClusterID))
+		log.Warn("index node not ready", zap.String("state", stateCode.String()))
 		return &indexpb.QueryJobsResponse{
 			Status: &commonpb.Status{
 				ErrorCode: commonpb.ErrorCode_UnexpectedError,
@@ -159,7 +162,7 @@ func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest
 		ret.IndexInfos[i].IndexFileKeys = info.fileKeys
 		ret.IndexInfos[i].SerializedSize = info.serializedSize
 		ret.IndexInfos[i].FailReason = info.failReason
-		log.RatedDebug(5, "querying index build task", zap.String("ClusterID", req.ClusterID),
+		log.RatedDebug(5, "querying index build task",
 			zap.Int64("IndexBuildID", buildID), zap.String("state", info.state.String()),
 			zap.String("fail reason", info.failReason))
 	}
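In the QueryJobs hunks above, the ClusterID field is bound once when the rate-grouped logger is built, which is why the individual Warn and RatedDebug calls no longer repeat zap.String("ClusterID", ...). A zap-only sketch of that field-binding step (rate limiting omitted; the handler shape and values are made up):

// Field binding with zap only; values and the handler shape are illustrative.
package main

import "go.uber.org/zap"

func queryJobs(clusterID string, buildIDs []int64) {
	logger, _ := zap.NewDevelopment()
	// Bind the per-request field once; every call through reqLog inherits it.
	reqLog := logger.With(zap.String("ClusterID", clusterID))

	reqLog.Warn("index node not ready", zap.String("state", "Abnormal"))
	for _, id := range buildIDs {
		reqLog.Debug("querying index build task", zap.Int64("IndexBuildID", id))
	}
}

func main() {
	queryJobs("by-dev", []int64{1, 2, 3})
}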
@@ -17,6 +17,7 @@
 package balance
 
 import (
+	"context"
 	"sort"
 
 	"github.com/samber/lo"
@@ -105,6 +106,7 @@ func (b *RowCountBasedBalancer) Balance() ([]SegmentAssignPlan, []ChannelAssignP
 }
 
 func (b *RowCountBasedBalancer) balanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) {
+	log := log.Ctx(context.Background()).WithRateGroup("qcv2.rowCountBalancer", 1.0, 60.0)
 	nodes := replica.GetNodes()
 	if len(nodes) == 0 {
 		return nil, nil
@@ -56,7 +56,7 @@ type distHandler struct {
 
 func (dh *distHandler) start(ctx context.Context) {
 	defer dh.wg.Done()
-	log := log.Ctx(ctx).With(zap.Int64("nodeID", dh.nodeID)).WithRateGroup("qnv2.distHandler", 1, 60)
+	log := log.Ctx(ctx).With(zap.Int64("nodeID", dh.nodeID)).WithRateGroup("qcv2.distHandler", 1, 60)
 	log.Info("start dist handler")
 	ticker := time.NewTicker(Params.QueryCoordCfg.DistPullInterval.GetAsDuration(time.Millisecond))
 	defer ticker.Stop()
@@ -81,6 +81,7 @@ func (ob *ReplicaObserver) schedule(ctx context.Context) {
 }
 
 func (ob *ReplicaObserver) checkNodesInReplica() {
+	log := log.Ctx(context.Background()).WithRateGroup("qcv2.replicaObserver", 1, 60)
 	collections := ob.meta.GetAll()
 	for _, collectionID := range collections {
 		removedNodes := make([]int64, 0)
@@ -30,6 +30,7 @@
 package tso
 
 import (
+	"context"
 	"sync/atomic"
 	"time"
 	"unsafe"
@@ -174,7 +175,7 @@ func (t *timestampOracle) UpdateTimestamp() error {
 
 	jetLag := typeutil.SubTimeByWallClock(now, prev.physical)
 	if jetLag > 3*UpdateTimestampStep {
-		log.RatedWarn(60.0, "clock offset is huge, check network latency and clock skew", zap.Duration("jet-lag", jetLag),
+		log.Ctx(context.TODO()).WithRateGroup("tso", 1, 60).RatedWarn(60.0, "clock offset is huge, check network latency and clock skew", zap.Duration("jet-lag", jetLag),
 			zap.Time("prev-physical", prev.physical), zap.Time("now", now))
 	}
 
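The TSO hunk above is a good place to see the cost argument at work: RatedWarn(60.0, ...) against a group configured as ("tso", 1, 60) reads like a token bucket that refills at one credit per second and holds at most 60, so a 60-credit warning can fire at most about once per minute. That reading of the arguments is only an assumption based on the numbers in this diff; the sketch below just walks through the token-bucket arithmetic with golang.org/x/time/rate rather than the Milvus rate-limit utility.

// Token-bucket arithmetic only; this uses golang.org/x/time/rate, not the Milvus limiter.
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Refill 1 credit per second, cap at 60 credits (the assumed meaning of (1, 60)).
	limiter := rate.NewLimiter(1, 60)

	now := time.Now()
	fmt.Println(limiter.AllowN(now, 60))                  // true: a full bucket covers a 60-credit warning
	fmt.Println(limiter.AllowN(now, 60))                  // false: the bucket is empty, the warning is dropped
	fmt.Println(limiter.AllowN(now.Add(time.Minute), 60)) // true: ~60s later the bucket has refilled
}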