From 23bffe399d14a6b17e14a52713a682b6a0ea4a27 Mon Sep 17 00:00:00 2001 From: sunby Date: Wed, 28 Jul 2021 11:43:22 +0800 Subject: [PATCH] Enhance log print in data coordinator (#6681) issue: #6680 Signed-off-by: sunby --- internal/datacoord/cluster.go | 25 +++++++++++++++++-------- internal/datacoord/grpc_services.go | 28 ++++++++++++++-------------- internal/datacoord/server.go | 23 +++++++++++------------ 3 files changed, 42 insertions(+), 34 deletions(-) diff --git a/internal/datacoord/cluster.go b/internal/datacoord/cluster.go index 6d81601c34..ce7af59f90 100644 --- a/internal/datacoord/cluster.go +++ b/internal/datacoord/cluster.go @@ -238,7 +238,7 @@ func (c *Cluster) handleEvent(node *NodeInfo) { resp, err := cli.WatchDmChannels(tCtx, req) cancel() if err = VerifyResponse(resp, err); err != nil { - log.Warn("Failed to watch dm channels", zap.String("addr", node.Info.GetAddress())) + log.Warn("failed to watch dm channels", zap.String("addr", node.Info.GetAddress())) } c.mu.Lock() c.nodes.SetWatched(node.Info.GetVersion(), parseChannelsFromReq(req)) @@ -355,9 +355,13 @@ func (c *Cluster) handleRegister(n *NodeInfo) { c.mu.Lock() cNodes := c.nodes.GetNodes() var nodes []*NodeInfo - log.Debug("before register policy applied", zap.Any("n.Channels", n.Info.GetChannels()), zap.Any("buffer", c.chanBuffer)) + log.Debug("channels info before register policy applied", + zap.Any("n.Channels", n.Info.GetChannels()), + zap.Any("buffer", c.chanBuffer)) nodes, c.chanBuffer = c.registerPolicy(cNodes, n, c.chanBuffer) - log.Debug("after register policy applied", zap.Any("ret", nodes), zap.Any("buffer", c.chanBuffer)) + log.Debug("delta changes after register policy applied", + zap.Any("nodes", nodes), + zap.Any("buffer", c.chanBuffer)) go c.handleEvent(n) c.txnSaveNodesAndBuffer(nodes, c.chanBuffer) for _, node := range nodes { @@ -383,7 +387,7 @@ func (c *Cluster) handleUnRegister(n *NodeInfo) { c.nodes.DeleteNode(n.Info.GetVersion()) cNodes := c.nodes.GetNodes() - log.Debug("before unregister policy applied", zap.Any("node.Channels", node.Info.GetChannels()), zap.Any("buffer", c.chanBuffer), zap.Any("nodes", cNodes)) + log.Debug("channels info before unregister policy applied", zap.Any("node.Channels", node.Info.GetChannels()), zap.Any("buffer", c.chanBuffer), zap.Any("nodes", cNodes)) var rets []*NodeInfo if len(cNodes) == 0 { for _, chStat := range node.Info.GetChannels() { @@ -393,7 +397,7 @@ func (c *Cluster) handleUnRegister(n *NodeInfo) { } else { rets = c.unregisterPolicy(cNodes, node) } - log.Debug("after unregister policy", zap.Any("rets", rets)) + log.Debug("delta changes after unregister policy", zap.Any("nodes", rets), zap.Any("buffer", c.chanBuffer)) c.txnSaveNodesAndBuffer(rets, c.chanBuffer) for _, node := range rets { c.nodes.SetNode(node.Info.GetVersion(), node) @@ -488,8 +492,10 @@ func (c *Cluster) watch(n *NodeInfo) { if len(uncompletes) == 0 { return // all set, just return } - log.Debug("plan to watch channel", zap.String("node", n.Info.GetAddress()), - zap.Int64("version", n.Info.GetVersion()), zap.Strings("channels", channelNames)) + log.Debug("plan to watch channel", + zap.String("node", n.Info.GetAddress()), + zap.Int64("version", n.Info.GetVersion()), + zap.Strings("channels", channelNames)) vchanInfos, err := c.posProvider.GetVChanPositions(uncompletes, true) if err != nil { @@ -507,7 +513,10 @@ func (c *Cluster) watch(n *NodeInfo) { Req: req, } ch := n.GetEventChannel() - log.Debug("put watch event to node channel", zap.Any("e", e), zap.Any("n", n.Info)) + log.Debug("put watch event to node channel", + zap.Any("event", e), + zap.Any("node.version", n.Info.GetVersion()), + zap.String("node.address", n.Info.GetAddress())) ch <- e } diff --git a/internal/datacoord/grpc_services.go b/internal/datacoord/grpc_services.go index 3183da6e3d..ef2801655c 100644 --- a/internal/datacoord/grpc_services.go +++ b/internal/datacoord/grpc_services.go @@ -41,7 +41,7 @@ func (s *Server) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp } func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) { - log.Debug("Receive flush request", zap.Int64("dbID", req.GetDbID()), zap.Int64("collectionID", req.GetCollectionID())) + log.Debug("receive flush request", zap.Int64("dbID", req.GetDbID()), zap.Int64("collectionID", req.GetCollectionID())) resp := &datapb.FlushResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -57,7 +57,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F } sealedSegments, err := s.segmentManager.SealAllSegments(ctx, req.CollectionID) if err != nil { - resp.Status.Reason = fmt.Sprintf("Failed to flush %d, %s", req.CollectionID, err) + resp.Status.Reason = fmt.Sprintf("failed to flush %d, %s", req.CollectionID, err) return resp, nil } log.Debug("flush response with segments", zap.Any("segments", sealedSegments)) @@ -81,7 +81,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI assigns := make([]*datapb.SegmentIDAssignment, 0, len(req.SegmentIDRequests)) for _, r := range req.SegmentIDRequests { - log.Debug("Handle assign segment request", + log.Debug("handle assign segment request", zap.Int64("collectionID", r.GetCollectionID()), zap.Int64("partitionID", r.GetPartitionID()), zap.String("channelName", r.GetChannelName()), @@ -150,7 +150,7 @@ func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentSta segmentInfo := s.meta.GetSegment(segmentID) if segmentInfo == nil { state.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError - state.Status.Reason = fmt.Sprintf("Failed to get segment %d", segmentID) + state.Status.Reason = fmt.Sprintf("failed to get segment %d", segmentID) } else { state.Status.ErrorCode = commonpb.ErrorCode_Success state.State = segmentInfo.GetState() @@ -256,7 +256,7 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR for _, id := range req.SegmentIDs { info := s.meta.GetSegment(id) if info == nil { - resp.Status.Reason = fmt.Sprintf("Failed to get segment %d", id) + resp.Status.Reason = fmt.Sprintf("failed to get segment %d", id) return resp, nil } infos = append(infos, info.SegmentInfo) @@ -274,14 +274,14 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath resp.Reason = serverNotServingErrMsg return resp, nil } - log.Debug("Receive SaveBinlogPaths request", + log.Debug("receive SaveBinlogPaths request", zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("segmentID", req.GetSegmentID()), zap.Any("checkpoints", req.GetCheckPoints())) binlogs, err := s.prepareBinlog(req) if err != nil { - log.Error("Prepare binlog meta failed", zap.Error(err)) + log.Error("prepare binlog meta failed", zap.Error(err)) resp.Reason = err.Error() return resp, nil } @@ -290,13 +290,13 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath err = s.meta.SaveBinlogAndCheckPoints(req.GetSegmentID(), req.GetFlushed(), binlogs, req.GetCheckPoints(), req.GetStartPositions()) if err != nil { - log.Error("Save binlog and checkpoints failed", + log.Error("save binlog and checkpoints failed", zap.Int64("segmentID", req.GetSegmentID()), zap.Error(err)) resp.Reason = err.Error() return resp, nil } - log.Debug("Flush segment with meta", zap.Int64("id", req.SegmentID), + log.Debug("flush segment with meta", zap.Int64("id", req.SegmentID), zap.Any("meta", binlogs)) if req.Flushed { @@ -335,7 +335,7 @@ func (s *Server) GetComponentStates(ctx context.Context) (*internalpb.ComponentS func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) { collectionID := req.GetCollectionID() partitionID := req.GetPartitionID() - log.Info("Receive get recovery info request", + log.Info("receive get recovery info request", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID)) resp := &datapb.GetRecoveryInfoResponse{ @@ -352,7 +352,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf for _, id := range segmentIDs { segment := s.meta.GetSegment(id) if segment == nil { - errMsg := fmt.Sprintf("Failed to get segment %d", id) + errMsg := fmt.Sprintf("failed to get segment %d", id) log.Error(errMsg) resp.Status.Reason = errMsg return resp, nil @@ -363,7 +363,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf meta, err := s.getSegmentBinlogMeta(id) if err != nil { - log.Error("Get segment binlog meta failed", zap.Int64("segmentID", id)) + log.Error("get segment binlog meta failed", zap.Int64("segmentID", id)) resp.Status.Reason = err.Error() return resp, nil } @@ -398,7 +398,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf CollectionID: collectionID, }) if err = VerifyResponse(dresp, err); err != nil { - log.Error("Get collection info from master failed", + log.Error("get collection info from master failed", zap.Int64("collectionID", collectionID), zap.Error(err)) @@ -417,7 +417,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf channelInfos, err := s.GetVChanPositions(vchans, false) if err != nil { - log.Error("Get channel positions failed", + log.Error("get channel positions failed", zap.Strings("channels", channels), zap.Error(err)) resp.Status.Reason = err.Error() diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 95736bf7f7..cdbd066016 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -11,7 +11,6 @@ package datacoord import ( "context" - "fmt" "math/rand" "sync" "sync/atomic" @@ -176,7 +175,7 @@ func (s *Server) Start() error { s.startServerLoop() atomic.StoreInt64(&s.isServing, ServerStateHealthy) - log.Debug("DataCoordinator startup success") + log.Debug("dataCoordinator startup success") return nil } @@ -189,7 +188,7 @@ func (s *Server) initCluster() error { func (s *Server) initServiceDiscovery() error { sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole) if err != nil { - log.Debug("DataCoord initMeta failed", zap.Error(err)) + log.Debug("dataCoord initMeta failed", zap.Error(err)) return err } log.Debug("registered sessions", zap.Any("sessions", sessions)) @@ -267,7 +266,7 @@ func (s *Server) startStatsChannel(ctx context.Context) { defer s.serverLoopWg.Done() statsStream, _ := s.msFactory.NewMsgStream(ctx) statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName) - log.Debug("DataCoord stats stream", + log.Debug("dataCoord create stats channel consumer", zap.String("channelName", Params.StatisticsChannelName), zap.String("descriptionName", Params.DataCoordSubscriptionName)) statsStream.Start() @@ -290,7 +289,6 @@ func (s *Server) startStatsChannel(ctx context.Context) { zap.Stringer("msgType", msg.Type())) continue } - log.Debug("Receive DataNode segment statistics update") ssMsg := msg.(*msgstream.SegmentStatisticsMsg) for _, stat := range ssMsg.SegStats { s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows()) @@ -309,8 +307,9 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) { } ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, Params.DataCoordSubscriptionName) - log.Debug(fmt.Sprintf("DataCoord AsConsumer:%s:%s", - Params.TimeTickChannelName, Params.DataCoordSubscriptionName)) + log.Debug("dataCoord create time tick channel consumer", + zap.String("timeTickChannelName", Params.TimeTickChannelName), + zap.String("subscriptionName", Params.DataCoordSubscriptionName)) ttMsgStream.Start() defer ttMsgStream.Close() for { @@ -327,7 +326,7 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) { } for _, msg := range msgPack.Msgs { if msg.Type() != commonpb.MsgType_DataNodeTt { - log.Warn("Receive unexpected msg type from tt channel", + log.Warn("receive unexpected msg type from tt channel", zap.Stringer("msgType", msg.Type())) continue } @@ -344,7 +343,7 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) { if len(segments) == 0 { continue } - log.Debug("Flush segments", zap.Int64s("segmentIDs", segments)) + log.Debug("flush segments", zap.Int64s("segmentIDs", segments)) segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments)) for _, id := range segments { sInfo := s.meta.GetSegment(id) @@ -380,12 +379,12 @@ func (s *Server) startWatchService(ctx context.Context) { node := NewNodeInfo(ctx, info) switch event.EventType { case sessionutil.SessionAddEvent: - log.Info("Received datanode register", + log.Info("received datanode register", zap.String("address", info.Address), zap.Int64("serverID", info.Version)) s.cluster.Register(node) case sessionutil.SessionDelEvent: - log.Info("Received datanode unregister", + log.Info("received datanode unregister", zap.String("address", info.Address), zap.Int64("serverID", info.Version)) s.cluster.UnRegister(node) @@ -486,7 +485,7 @@ func (s *Server) Stop() error { if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) { return nil } - log.Debug("DataCoord server shutdown") + log.Debug("dataCoord server shutdown") s.cluster.Close() s.stopServerLoop() return nil