mirror of https://github.com/milvus-io/milvus.git
Fix # of rows of recovering segment (#18736)
Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com> Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>pull/18741/head
parent
516fd928f9
commit
fc42ee5bb3
|
@ -42,13 +42,13 @@ type SegmentInfo struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSegmentInfo create `SegmentInfo` wrapper from `datapb.SegmentInfo`
|
// NewSegmentInfo create `SegmentInfo` wrapper from `datapb.SegmentInfo`
|
||||||
// assign current rows to 0 and pre-allocate `allocations` slice
|
// assign current rows to last checkpoint and pre-allocate `allocations` slice
|
||||||
// Note that the allocation information is not preserved,
|
// Note that the allocation information is not preserved,
|
||||||
// the worst case scenario is to have a segment with twice size we expects
|
// the worst case scenario is to have a segment with twice size we expects
|
||||||
func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
|
func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
|
||||||
return &SegmentInfo{
|
return &SegmentInfo{
|
||||||
SegmentInfo: info,
|
SegmentInfo: info,
|
||||||
currRows: 0,
|
currRows: info.GetNumOfRows(),
|
||||||
allocations: make([]*Allocation, 0, 16),
|
allocations: make([]*Allocation, 0, 16),
|
||||||
lastFlushTime: time.Now().Add(-1 * flushInterval),
|
lastFlushTime: time.Now().Add(-1 * flushInterval),
|
||||||
}
|
}
|
||||||
|
|
|
@ -308,7 +308,7 @@ func (s *Server) Start() error {
|
||||||
// data from all DataNode.
|
// data from all DataNode.
|
||||||
// This will prevent DataCoord from missing out any important segment stats
|
// This will prevent DataCoord from missing out any important segment stats
|
||||||
// data while offline.
|
// data while offline.
|
||||||
log.Info("DataNode (re)starts successfully and re-collecting segment stats from DataNodes")
|
log.Info("DataCoord (re)starts successfully and re-collecting segment stats from DataNodes")
|
||||||
s.reCollectSegmentStats(s.ctx)
|
s.reCollectSegmentStats(s.ctx)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -621,6 +621,16 @@ func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.Dat
|
||||||
|
|
||||||
func (s *Server) updateSegmentStatistics(stats []*datapb.SegmentStats) {
|
func (s *Server) updateSegmentStatistics(stats []*datapb.SegmentStats) {
|
||||||
for _, stat := range stats {
|
for _, stat := range stats {
|
||||||
|
// Log if # of rows is updated.
|
||||||
|
if s.meta.GetAllSegment(stat.GetSegmentID()) != nil &&
|
||||||
|
s.meta.GetAllSegment(stat.GetSegmentID()).GetNumOfRows() != stat.GetNumRows() {
|
||||||
|
log.Debug("Updating segment number of rows",
|
||||||
|
zap.Int64("segment ID", stat.GetSegmentID()),
|
||||||
|
zap.Int64("old value", s.meta.GetAllSegment(stat.GetSegmentID()).GetNumOfRows()),
|
||||||
|
zap.Int64("new value", stat.GetNumRows()),
|
||||||
|
zap.Any("seg info", s.meta.GetSegment(stat.GetSegmentID())),
|
||||||
|
)
|
||||||
|
}
|
||||||
s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
|
s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -958,7 +968,7 @@ func (s *Server) reCollectSegmentStats(ctx context.Context) {
|
||||||
log.Error("null channel manager found, which should NOT happen in non-testing environment")
|
log.Error("null channel manager found, which should NOT happen in non-testing environment")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
nodes := s.channelManager.store.GetNodes()
|
nodes := s.sessionManager.getLiveNodeIDs()
|
||||||
log.Info("re-collecting segment stats from DataNodes",
|
log.Info("re-collecting segment stats from DataNodes",
|
||||||
zap.Int64s("DataNode IDs", nodes))
|
zap.Int64s("DataNode IDs", nodes))
|
||||||
for _, node := range nodes {
|
for _, node := range nodes {
|
||||||
|
|
|
@ -95,6 +95,18 @@ func (c *SessionManager) DeleteSession(node *NodeInfo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getLiveNodeIDs returns IDs of all live DataNodes.
|
||||||
|
func (c *SessionManager) getLiveNodeIDs() []int64 {
|
||||||
|
c.sessions.RLock()
|
||||||
|
defer c.sessions.RUnlock()
|
||||||
|
|
||||||
|
ret := make([]int64, 0, len(c.sessions.data))
|
||||||
|
for id := range c.sessions.data {
|
||||||
|
ret = append(ret, id)
|
||||||
|
}
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
// GetSessions gets all node sessions
|
// GetSessions gets all node sessions
|
||||||
func (c *SessionManager) GetSessions() []*Session {
|
func (c *SessionManager) GetSessions() []*Session {
|
||||||
c.sessions.RLock()
|
c.sessions.RLock()
|
||||||
|
|
Loading…
Reference in New Issue