From fc42ee5bb3e1ccf97093f253b5a970d3c9f63d18 Mon Sep 17 00:00:00 2001 From: Ten Thousand Leaves <69466447+soothing-rain@users.noreply.github.com> Date: Fri, 19 Aug 2022 19:50:50 +0800 Subject: [PATCH] Fix # of rows of recovering segment (#18736) Signed-off-by: Yuchen Gao Signed-off-by: Yuchen Gao --- internal/datacoord/segment_info.go | 4 ++-- internal/datacoord/server.go | 14 ++++++++++++-- internal/datacoord/session_manager.go | 12 ++++++++++++ 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index 0b9f516dd1..d3d7189534 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -42,13 +42,13 @@ type SegmentInfo struct { } // NewSegmentInfo create `SegmentInfo` wrapper from `datapb.SegmentInfo` -// assign current rows to 0 and pre-allocate `allocations` slice +// assign current rows to last checkpoint and pre-allocate `allocations` slice // Note that the allocation information is not preserved, // the worst case scenario is to have a segment with twice size we expects func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo { return &SegmentInfo{ SegmentInfo: info, - currRows: 0, + currRows: info.GetNumOfRows(), allocations: make([]*Allocation, 0, 16), lastFlushTime: time.Now().Add(-1 * flushInterval), } diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 967b901abf..a3f402746b 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -308,7 +308,7 @@ func (s *Server) Start() error { // data from all DataNode. // This will prevent DataCoord from missing out any important segment stats // data while offline. - log.Info("DataNode (re)starts successfully and re-collecting segment stats from DataNodes") + log.Info("DataCoord (re)starts successfully and re-collecting segment stats from DataNodes") s.reCollectSegmentStats(s.ctx) return nil @@ -621,6 +621,16 @@ func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.Dat func (s *Server) updateSegmentStatistics(stats []*datapb.SegmentStats) { for _, stat := range stats { + // Log if # of rows is updated. + if s.meta.GetAllSegment(stat.GetSegmentID()) != nil && + s.meta.GetAllSegment(stat.GetSegmentID()).GetNumOfRows() != stat.GetNumRows() { + log.Debug("Updating segment number of rows", + zap.Int64("segment ID", stat.GetSegmentID()), + zap.Int64("old value", s.meta.GetAllSegment(stat.GetSegmentID()).GetNumOfRows()), + zap.Int64("new value", stat.GetNumRows()), + zap.Any("seg info", s.meta.GetSegment(stat.GetSegmentID())), + ) + } s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows()) } } @@ -958,7 +968,7 @@ func (s *Server) reCollectSegmentStats(ctx context.Context) { log.Error("null channel manager found, which should NOT happen in non-testing environment") return } - nodes := s.channelManager.store.GetNodes() + nodes := s.sessionManager.getLiveNodeIDs() log.Info("re-collecting segment stats from DataNodes", zap.Int64s("DataNode IDs", nodes)) for _, node := range nodes { diff --git a/internal/datacoord/session_manager.go b/internal/datacoord/session_manager.go index 74299b84bd..86c53c8719 100644 --- a/internal/datacoord/session_manager.go +++ b/internal/datacoord/session_manager.go @@ -95,6 +95,18 @@ func (c *SessionManager) DeleteSession(node *NodeInfo) { } } +// getLiveNodeIDs returns IDs of all live DataNodes. +func (c *SessionManager) getLiveNodeIDs() []int64 { + c.sessions.RLock() + defer c.sessions.RUnlock() + + ret := make([]int64, 0, len(c.sessions.data)) + for id := range c.sessions.data { + ret = append(ret, id) + } + return ret +} + // GetSessions gets all node sessions func (c *SessionManager) GetSessions() []*Session { c.sessions.RLock()