mirror of https://github.com/milvus-io/milvus.git
Enhance log print in data coordinator (#6681)
issue: #6680

Signed-off-by: sunby <bingyi.sun@zilliz.com>

pull/6775/head
parent 964af814ab
commit 23bffe399d
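The diff below rewrites data coordinator log calls to use lower-case, fixed messages with one structured zap field per line. For reference, here is a minimal standalone sketch of that style using go.uber.org/zap directly; the Milvus log package wraps a zap logger, so the field constructors look the same, but the values below are illustrative only and are not taken from the code:

package main

import (
	"go.uber.org/zap"
)

func main() {
	// A development logger prints Debug-level messages to stderr.
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	// Illustrative values; in the patch these come from NodeInfo and the
	// cluster's channel buffer.
	address := "localhost:21124"
	version := int64(3)
	channels := []string{"dml_0", "dml_1"}

	// Lower-case, fixed message with one typed field per line -- the style
	// this commit converts the data coordinator logs to.
	logger.Debug("plan to watch channel",
		zap.String("node", address),
		zap.Int64("version", version),
		zap.Strings("channels", channels))
}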
@@ -238,7 +238,7 @@ func (c *Cluster) handleEvent(node *NodeInfo) {
 			resp, err := cli.WatchDmChannels(tCtx, req)
 			cancel()
 			if err = VerifyResponse(resp, err); err != nil {
-				log.Warn("Failed to watch dm channels", zap.String("addr", node.Info.GetAddress()))
+				log.Warn("failed to watch dm channels", zap.String("addr", node.Info.GetAddress()))
 			}
 			c.mu.Lock()
 			c.nodes.SetWatched(node.Info.GetVersion(), parseChannelsFromReq(req))
@@ -355,9 +355,13 @@ func (c *Cluster) handleRegister(n *NodeInfo) {
 	c.mu.Lock()
 	cNodes := c.nodes.GetNodes()
 	var nodes []*NodeInfo
-	log.Debug("before register policy applied", zap.Any("n.Channels", n.Info.GetChannels()), zap.Any("buffer", c.chanBuffer))
+	log.Debug("channels info before register policy applied",
+		zap.Any("n.Channels", n.Info.GetChannels()),
+		zap.Any("buffer", c.chanBuffer))
 	nodes, c.chanBuffer = c.registerPolicy(cNodes, n, c.chanBuffer)
-	log.Debug("after register policy applied", zap.Any("ret", nodes), zap.Any("buffer", c.chanBuffer))
+	log.Debug("delta changes after register policy applied",
+		zap.Any("nodes", nodes),
+		zap.Any("buffer", c.chanBuffer))
 	go c.handleEvent(n)
 	c.txnSaveNodesAndBuffer(nodes, c.chanBuffer)
 	for _, node := range nodes {
@@ -383,7 +387,7 @@ func (c *Cluster) handleUnRegister(n *NodeInfo) {
 	c.nodes.DeleteNode(n.Info.GetVersion())
 
 	cNodes := c.nodes.GetNodes()
-	log.Debug("before unregister policy applied", zap.Any("node.Channels", node.Info.GetChannels()), zap.Any("buffer", c.chanBuffer), zap.Any("nodes", cNodes))
+	log.Debug("channels info before unregister policy applied", zap.Any("node.Channels", node.Info.GetChannels()), zap.Any("buffer", c.chanBuffer), zap.Any("nodes", cNodes))
 	var rets []*NodeInfo
 	if len(cNodes) == 0 {
 		for _, chStat := range node.Info.GetChannels() {
@@ -393,7 +397,7 @@ func (c *Cluster) handleUnRegister(n *NodeInfo) {
 	} else {
 		rets = c.unregisterPolicy(cNodes, node)
 	}
-	log.Debug("after unregister policy", zap.Any("rets", rets))
+	log.Debug("delta changes after unregister policy", zap.Any("nodes", rets), zap.Any("buffer", c.chanBuffer))
 	c.txnSaveNodesAndBuffer(rets, c.chanBuffer)
 	for _, node := range rets {
 		c.nodes.SetNode(node.Info.GetVersion(), node)
@@ -488,8 +492,10 @@ func (c *Cluster) watch(n *NodeInfo) {
 	if len(uncompletes) == 0 {
 		return // all set, just return
 	}
-	log.Debug("plan to watch channel", zap.String("node", n.Info.GetAddress()),
-		zap.Int64("version", n.Info.GetVersion()), zap.Strings("channels", channelNames))
+	log.Debug("plan to watch channel",
+		zap.String("node", n.Info.GetAddress()),
+		zap.Int64("version", n.Info.GetVersion()),
+		zap.Strings("channels", channelNames))
 
 	vchanInfos, err := c.posProvider.GetVChanPositions(uncompletes, true)
 	if err != nil {
@@ -507,7 +513,10 @@ func (c *Cluster) watch(n *NodeInfo) {
 		Req: req,
 	}
 	ch := n.GetEventChannel()
-	log.Debug("put watch event to node channel", zap.Any("e", e), zap.Any("n", n.Info))
+	log.Debug("put watch event to node channel",
+		zap.Any("event", e),
+		zap.Any("node.version", n.Info.GetVersion()),
+		zap.String("node.address", n.Info.GetAddress()))
 	ch <- e
 }
 
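The last hunk above also replaces a dump of the whole NodeInfo via zap.Any with named fields for the version and address. A minimal sketch of the difference, using hypothetical stand-in types rather than the real datacoord ones:

package main

import "go.uber.org/zap"

// Hypothetical stand-ins for the event and node info passed around in the cluster code.
type event struct{ Type int }

type nodeInfo struct {
	Version int64
	Address string
}

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	e := &event{Type: 1}
	n := &nodeInfo{Version: 3, Address: "localhost:21124"}

	// Before: zap.Any serializes the whole value through reflection, which is
	// noisy in the output and hides the fields readers actually care about.
	logger.Debug("put watch event to node channel", zap.Any("e", e), zap.Any("n", n))

	// After: name the identifying fields explicitly.
	logger.Debug("put watch event to node channel",
		zap.Any("event", e),
		zap.Any("node.version", n.Version),
		zap.String("node.address", n.Address))
}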
@@ -41,7 +41,7 @@ func (s *Server) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp
 }
 
 func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) {
-	log.Debug("Receive flush request", zap.Int64("dbID", req.GetDbID()), zap.Int64("collectionID", req.GetCollectionID()))
+	log.Debug("receive flush request", zap.Int64("dbID", req.GetDbID()), zap.Int64("collectionID", req.GetCollectionID()))
 	resp := &datapb.FlushResponse{
 		Status: &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_UnexpectedError,
@@ -57,7 +57,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
 	}
 	sealedSegments, err := s.segmentManager.SealAllSegments(ctx, req.CollectionID)
 	if err != nil {
-		resp.Status.Reason = fmt.Sprintf("Failed to flush %d, %s", req.CollectionID, err)
+		resp.Status.Reason = fmt.Sprintf("failed to flush %d, %s", req.CollectionID, err)
 		return resp, nil
 	}
 	log.Debug("flush response with segments", zap.Any("segments", sealedSegments))
@@ -81,7 +81,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
 	assigns := make([]*datapb.SegmentIDAssignment, 0, len(req.SegmentIDRequests))
 
 	for _, r := range req.SegmentIDRequests {
-		log.Debug("Handle assign segment request",
+		log.Debug("handle assign segment request",
 			zap.Int64("collectionID", r.GetCollectionID()),
 			zap.Int64("partitionID", r.GetPartitionID()),
 			zap.String("channelName", r.GetChannelName()),
@@ -150,7 +150,7 @@ func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentSta
 		segmentInfo := s.meta.GetSegment(segmentID)
 		if segmentInfo == nil {
 			state.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
-			state.Status.Reason = fmt.Sprintf("Failed to get segment %d", segmentID)
+			state.Status.Reason = fmt.Sprintf("failed to get segment %d", segmentID)
 		} else {
 			state.Status.ErrorCode = commonpb.ErrorCode_Success
 			state.State = segmentInfo.GetState()
@@ -256,7 +256,7 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
 	for _, id := range req.SegmentIDs {
 		info := s.meta.GetSegment(id)
 		if info == nil {
-			resp.Status.Reason = fmt.Sprintf("Failed to get segment %d", id)
+			resp.Status.Reason = fmt.Sprintf("failed to get segment %d", id)
 			return resp, nil
 		}
 		infos = append(infos, info.SegmentInfo)
@@ -274,14 +274,14 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
 		resp.Reason = serverNotServingErrMsg
 		return resp, nil
 	}
-	log.Debug("Receive SaveBinlogPaths request",
+	log.Debug("receive SaveBinlogPaths request",
 		zap.Int64("collectionID", req.GetCollectionID()),
 		zap.Int64("segmentID", req.GetSegmentID()),
 		zap.Any("checkpoints", req.GetCheckPoints()))
 
 	binlogs, err := s.prepareBinlog(req)
 	if err != nil {
-		log.Error("Prepare binlog meta failed", zap.Error(err))
+		log.Error("prepare binlog meta failed", zap.Error(err))
 		resp.Reason = err.Error()
 		return resp, nil
 	}
@@ -290,13 +290,13 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
 	err = s.meta.SaveBinlogAndCheckPoints(req.GetSegmentID(), req.GetFlushed(),
 		binlogs, req.GetCheckPoints(), req.GetStartPositions())
 	if err != nil {
-		log.Error("Save binlog and checkpoints failed",
+		log.Error("save binlog and checkpoints failed",
 			zap.Int64("segmentID", req.GetSegmentID()),
 			zap.Error(err))
 		resp.Reason = err.Error()
 		return resp, nil
 	}
-	log.Debug("Flush segment with meta", zap.Int64("id", req.SegmentID),
+	log.Debug("flush segment with meta", zap.Int64("id", req.SegmentID),
 		zap.Any("meta", binlogs))
 
 	if req.Flushed {
@@ -335,7 +335,7 @@ func (s *Server) GetComponentStates(ctx context.Context) (*internalpb.ComponentS
 func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) {
 	collectionID := req.GetCollectionID()
 	partitionID := req.GetPartitionID()
-	log.Info("Receive get recovery info request",
+	log.Info("receive get recovery info request",
 		zap.Int64("collectionID", collectionID),
 		zap.Int64("partitionID", partitionID))
 	resp := &datapb.GetRecoveryInfoResponse{
@@ -352,7 +352,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
 	for _, id := range segmentIDs {
 		segment := s.meta.GetSegment(id)
 		if segment == nil {
-			errMsg := fmt.Sprintf("Failed to get segment %d", id)
+			errMsg := fmt.Sprintf("failed to get segment %d", id)
 			log.Error(errMsg)
 			resp.Status.Reason = errMsg
 			return resp, nil
@@ -363,7 +363,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
 
 		meta, err := s.getSegmentBinlogMeta(id)
 		if err != nil {
-			log.Error("Get segment binlog meta failed", zap.Int64("segmentID", id))
+			log.Error("get segment binlog meta failed", zap.Int64("segmentID", id))
 			resp.Status.Reason = err.Error()
 			return resp, nil
 		}
@@ -398,7 +398,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
 		CollectionID: collectionID,
 	})
 	if err = VerifyResponse(dresp, err); err != nil {
-		log.Error("Get collection info from master failed",
+		log.Error("get collection info from master failed",
 			zap.Int64("collectionID", collectionID),
 			zap.Error(err))
 
@@ -417,7 +417,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
 
 	channelInfos, err := s.GetVChanPositions(vchans, false)
 	if err != nil {
-		log.Error("Get channel positions failed",
+		log.Error("get channel positions failed",
 			zap.Strings("channels", channels),
 			zap.Error(err))
 		resp.Status.Reason = err.Error()
@@ -11,7 +11,6 @@ package datacoord
 
 import (
 	"context"
-	"fmt"
 	"math/rand"
 	"sync"
 	"sync/atomic"
@@ -176,7 +175,7 @@ func (s *Server) Start() error {
 	s.startServerLoop()
 
 	atomic.StoreInt64(&s.isServing, ServerStateHealthy)
-	log.Debug("DataCoordinator startup success")
+	log.Debug("dataCoordinator startup success")
 	return nil
 }
 
@@ -189,7 +188,7 @@ func (s *Server) initCluster() error {
 func (s *Server) initServiceDiscovery() error {
 	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
 	if err != nil {
-		log.Debug("DataCoord initMeta failed", zap.Error(err))
+		log.Debug("dataCoord initMeta failed", zap.Error(err))
 		return err
 	}
 	log.Debug("registered sessions", zap.Any("sessions", sessions))
@@ -267,7 +266,7 @@ func (s *Server) startStatsChannel(ctx context.Context) {
 	defer s.serverLoopWg.Done()
 	statsStream, _ := s.msFactory.NewMsgStream(ctx)
 	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName)
-	log.Debug("DataCoord stats stream",
+	log.Debug("dataCoord create stats channel consumer",
 		zap.String("channelName", Params.StatisticsChannelName),
 		zap.String("descriptionName", Params.DataCoordSubscriptionName))
 	statsStream.Start()
@@ -290,7 +289,6 @@ func (s *Server) startStatsChannel(ctx context.Context) {
 					zap.Stringer("msgType", msg.Type()))
 				continue
 			}
-			log.Debug("Receive DataNode segment statistics update")
 			ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
 			for _, stat := range ssMsg.SegStats {
 				s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
@@ -309,8 +307,9 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
 	}
 	ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName},
 		Params.DataCoordSubscriptionName)
-	log.Debug(fmt.Sprintf("DataCoord AsConsumer:%s:%s",
-		Params.TimeTickChannelName, Params.DataCoordSubscriptionName))
+	log.Debug("dataCoord create time tick channel consumer",
+		zap.String("timeTickChannelName", Params.TimeTickChannelName),
+		zap.String("subscriptionName", Params.DataCoordSubscriptionName))
 	ttMsgStream.Start()
 	defer ttMsgStream.Close()
 	for {
@@ -327,7 +326,7 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
 		}
 		for _, msg := range msgPack.Msgs {
 			if msg.Type() != commonpb.MsgType_DataNodeTt {
-				log.Warn("Receive unexpected msg type from tt channel",
+				log.Warn("receive unexpected msg type from tt channel",
 					zap.Stringer("msgType", msg.Type()))
 				continue
 			}
@@ -344,7 +343,7 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
 			if len(segments) == 0 {
 				continue
 			}
			log.Debug("Flush segments", zap.Int64s("segmentIDs", segments))
-			log.Debug("Flush segments", zap.Int64s("segmentIDs", segments))
+			log.Debug("flush segments", zap.Int64s("segmentIDs", segments))
 			segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments))
 			for _, id := range segments {
 				sInfo := s.meta.GetSegment(id)
@@ -380,12 +379,12 @@ func (s *Server) startWatchService(ctx context.Context) {
 			node := NewNodeInfo(ctx, info)
 			switch event.EventType {
 			case sessionutil.SessionAddEvent:
-				log.Info("Received datanode register",
+				log.Info("received datanode register",
 					zap.String("address", info.Address),
 					zap.Int64("serverID", info.Version))
 				s.cluster.Register(node)
 			case sessionutil.SessionDelEvent:
-				log.Info("Received datanode unregister",
+				log.Info("received datanode unregister",
 					zap.String("address", info.Address),
 					zap.Int64("serverID", info.Version))
 				s.cluster.UnRegister(node)
@@ -486,7 +485,7 @@ func (s *Server) Stop() error {
 	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
 		return nil
 	}
-	log.Debug("DataCoord server shutdown")
+	log.Debug("dataCoord server shutdown")
 	s.cluster.Close()
 	s.stopServerLoop()
 	return nil
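The startDataNodeTtLoop hunk is the one that drops the fmt.Sprintf-composed message in favor of a fixed message plus structured fields, which is why the "fmt" import could be removed in the first hunk of this file. A minimal before/after sketch, with placeholder values standing in for the Params fields:

package main

import (
	"fmt"

	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	// Placeholder values standing in for Params.TimeTickChannelName and
	// Params.DataCoordSubscriptionName.
	timeTickChannel := "datacoord-timetick"
	subscription := "datacoord-sub"

	// Before: the variable parts are baked into the message string, so log
	// collectors cannot filter or index on them.
	logger.Debug(fmt.Sprintf("DataCoord AsConsumer:%s:%s", timeTickChannel, subscription))

	// After: a constant message plus typed fields.
	logger.Debug("dataCoord create time tick channel consumer",
		zap.String("timeTickChannelName", timeTickChannel),
		zap.String("subscriptionName", subscription))
}

Keeping the message constant and moving the variable parts into fields leaves the log line grep-able while the channel and subscription names remain queryable as structured data.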