enhance: Reduce the memory usage of the timeTickSender (#30968) (#30991)

In the timeTickSender cache, retain only the latest stats for each segment
instead of storing the stats reported at every time tick.

issue: https://github.com/milvus-io/milvus/issues/30967

pr: https://github.com/milvus-io/milvus/pull/30968

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/31024/head^2
yihao.dai 2024-03-05 10:59:01 +08:00 committed by GitHub
parent 81b197267a
commit a5350f64a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 79 additions and 83 deletions

View File

@ -21,6 +21,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/samber/lo"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -32,8 +33,8 @@ import (
) )
// timeTickSender is to merge channel states updated by flow graph node and send to datacoord periodically // timeTickSender is to merge channel states updated by flow graph node and send to datacoord periodically
// timeTickSender hold a SegmentStats time sequence cache for each channel, // timeTickSender hold segmentStats cache for each channel,
// after send succeeds will clean the cache earlier than the sended timestamp // after send succeeds will clean the cache earlier than last sent timestamp
type timeTickSender struct { type timeTickSender struct {
nodeID int64 nodeID int64
broker broker.Broker broker broker.Broker
@ -43,21 +44,27 @@ type timeTickSender struct {
options []retry.Option options []retry.Option
mu sync.Mutex mu sync.RWMutex
channelStatesCaches map[string]*segmentStatesSequence // string -> *segmentStatesSequence statsCache map[string]*channelStats // channel -> channelStats
}
type channelStats struct {
segStats map[int64]*segmentStats // segmentID -> segmentStats
lastTs uint64
} }
// data struct only used in timeTickSender // data struct only used in timeTickSender
type segmentStatesSequence struct { type segmentStats struct {
data map[uint64][]*commonpb.SegmentStats // ts -> segmentStats *commonpb.SegmentStats
ts uint64
} }
func newTimeTickSender(broker broker.Broker, nodeID int64, opts ...retry.Option) *timeTickSender { func newTimeTickSender(broker broker.Broker, nodeID int64, opts ...retry.Option) *timeTickSender {
return &timeTickSender{ return &timeTickSender{
nodeID: nodeID, nodeID: nodeID,
broker: broker, broker: broker,
channelStatesCaches: make(map[string]*segmentStatesSequence, 0), statsCache: make(map[string]*channelStats),
options: opts, options: opts,
} }
} }
@ -92,91 +99,74 @@ func (m *timeTickSender) work(ctx context.Context) {
} }
} }
func (m *timeTickSender) update(channelName string, timestamp uint64, segmentStats []*commonpb.SegmentStats) { func (m *timeTickSender) update(channelName string, timestamp uint64, segStats []*commonpb.SegmentStats) {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
channelStates, ok := m.channelStatesCaches[channelName] _, ok := m.statsCache[channelName]
if !ok { if !ok {
channelStates = &segmentStatesSequence{ m.statsCache[channelName] = &channelStats{
data: make(map[uint64][]*commonpb.SegmentStats, 0), segStats: make(map[int64]*segmentStats),
} }
} }
channelStates.data[timestamp] = segmentStats for _, stats := range segStats {
m.channelStatesCaches[channelName] = channelStates segmentID := stats.GetSegmentID()
m.statsCache[channelName].segStats[segmentID] = &segmentStats{
SegmentStats: stats,
ts: timestamp,
}
}
m.statsCache[channelName].lastTs = timestamp
} }
func (m *timeTickSender) mergeDatanodeTtMsg() ([]*msgpb.DataNodeTtMsg, map[string]uint64) { func (m *timeTickSender) assembleDatanodeTtMsg() ([]*msgpb.DataNodeTtMsg, map[string]uint64) {
m.mu.Lock() m.mu.RLock()
defer m.mu.Unlock() defer m.mu.RUnlock()
var msgs []*msgpb.DataNodeTtMsg var msgs []*msgpb.DataNodeTtMsg
sendedLastTss := make(map[string]uint64, 0) lastSentTss := make(map[string]uint64, 0)
for channelName, channelSegmentStates := range m.channelStatesCaches { for channelName, chanStats := range m.statsCache {
var lastTs uint64 toSendSegmentStats := lo.Map(lo.Values(chanStats.segStats), func(stats *segmentStats, _ int) *commonpb.SegmentStats {
segNumRows := make(map[int64]int64, 0) return stats.SegmentStats
for ts, segmentStates := range channelSegmentStates.data { })
if ts > lastTs {
lastTs = ts
}
// merge the same segments into one
for _, segmentStat := range segmentStates {
if v, ok := segNumRows[segmentStat.GetSegmentID()]; ok {
// numRows is supposed to keep growing
if segmentStat.GetNumRows() > v {
segNumRows[segmentStat.GetSegmentID()] = segmentStat.GetNumRows()
}
} else {
segNumRows[segmentStat.GetSegmentID()] = segmentStat.GetNumRows()
}
}
}
toSendSegmentStats := make([]*commonpb.SegmentStats, 0)
for id, numRows := range segNumRows {
toSendSegmentStats = append(toSendSegmentStats, &commonpb.SegmentStats{
SegmentID: id,
NumRows: numRows,
})
}
msgs = append(msgs, &msgpb.DataNodeTtMsg{ msgs = append(msgs, &msgpb.DataNodeTtMsg{
Base: commonpbutil.NewMsgBase( Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_DataNodeTt), commonpbutil.WithMsgType(commonpb.MsgType_DataNodeTt),
commonpbutil.WithSourceID(m.nodeID), commonpbutil.WithSourceID(m.nodeID),
), ),
ChannelName: channelName, ChannelName: channelName,
Timestamp: lastTs, Timestamp: chanStats.lastTs,
SegmentsStats: toSendSegmentStats, SegmentsStats: toSendSegmentStats,
}) })
sendedLastTss[channelName] = lastTs lastSentTss[channelName] = chanStats.lastTs
} }
return msgs, sendedLastTss return msgs, lastSentTss
} }
func (m *timeTickSender) cleanStatesCache(sendedLastTss map[string]uint64) { func (m *timeTickSender) cleanStatesCache(lastSentTss map[string]uint64) {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
sizeBeforeClean := len(m.channelStatesCaches) sizeBeforeClean := len(m.statsCache)
log := log.With(zap.Any("sendedLastTss", sendedLastTss), zap.Int("sizeBeforeClean", sizeBeforeClean)) log := log.With(zap.Any("lastSentTss", lastSentTss), zap.Int("sizeBeforeClean", sizeBeforeClean))
for channelName, sendedLastTs := range sendedLastTss { for channelName, lastSentTs := range lastSentTss {
channelCache, ok := m.channelStatesCaches[channelName] _, ok := m.statsCache[channelName]
if ok { if ok {
for ts := range channelCache.data { for segmentID, stats := range m.statsCache[channelName].segStats {
if ts <= sendedLastTs { if stats.ts <= lastSentTs {
delete(channelCache.data, ts) delete(m.statsCache[channelName].segStats, segmentID)
} }
} }
m.channelStatesCaches[channelName] = channelCache
} }
if len(channelCache.data) == 0 { if len(m.statsCache[channelName].segStats) == 0 {
delete(m.channelStatesCaches, channelName) delete(m.statsCache, channelName)
} }
} }
log.RatedDebug(30, "timeTickSender channelStatesCaches", zap.Int("sizeAfterClean", len(m.channelStatesCaches))) log.RatedDebug(30, "timeTickSender stats", zap.Int("sizeAfterClean", len(m.statsCache)))
} }
func (m *timeTickSender) sendReport(ctx context.Context) error { func (m *timeTickSender) sendReport(ctx context.Context) error {
toSendMsgs, sendLastTss := m.mergeDatanodeTtMsg() toSendMsgs, sendLastTss := m.assembleDatanodeTtMsg()
log.RatedDebug(30, "timeTickSender send datanode timetick message", zap.Any("toSendMsgs", toSendMsgs), zap.Any("sendLastTss", sendLastTss)) log.RatedDebug(30, "timeTickSender send datanode timetick message", zap.Any("toSendMsgs", toSendMsgs), zap.Any("sendLastTss", sendLastTss))
err := retry.Do(ctx, func() error { err := retry.Do(ctx, func() error {
return m.broker.ReportTimeTick(ctx, toSendMsgs) return m.broker.ReportTimeTick(ctx, toSendMsgs)

View File

@ -55,11 +55,14 @@ func TestTimetickManagerNormal(t *testing.T) {
// update first time // update first time
manager.update(channelName1, ts, segmentStats) manager.update(channelName1, ts, segmentStats)
channel1SegmentStates, channelSegmentStatesExist := manager.channelStatesCaches[channelName1] chanStats, exist := manager.statsCache[channelName1]
assert.Equal(t, true, channelSegmentStatesExist) assert.Equal(t, true, exist)
segmentState1, segmentState1Exist := channel1SegmentStates.data[ts] assert.Equal(t, 1, len(chanStats.segStats))
assert.Equal(t, segmentStats[0], segmentState1[0]) seg1, exist := manager.statsCache[channelName1].segStats[segmentID1]
assert.Equal(t, true, segmentState1Exist) assert.Equal(t, true, exist)
assert.Equal(t, segmentID1, seg1.GetSegmentID())
assert.Equal(t, int64(100), seg1.GetNumRows())
assert.Equal(t, ts, seg1.ts)
// update second time // update second time
segmentStats2 := []*commonpb.SegmentStats{ segmentStats2 := []*commonpb.SegmentStats{
@ -75,12 +78,19 @@ func TestTimetickManagerNormal(t *testing.T) {
ts2 := ts + 100 ts2 := ts + 100
manager.update(channelName1, ts2, segmentStats2) manager.update(channelName1, ts2, segmentStats2)
channelSegmentStates, channelSegmentStatesExist := manager.channelStatesCaches[channelName1] chanStats, exist = manager.statsCache[channelName1]
assert.Equal(t, true, channelSegmentStatesExist) assert.Equal(t, true, exist)
assert.Equal(t, 2, len(chanStats.segStats))
segmentStates, segmentStatesExist := channelSegmentStates.data[ts2] seg1, exist = manager.statsCache[channelName1].segStats[segmentID1]
assert.Equal(t, true, segmentStatesExist) assert.Equal(t, true, exist)
assert.Equal(t, 2, len(segmentStates)) assert.Equal(t, segmentID1, seg1.GetSegmentID())
assert.Equal(t, int64(10000), seg1.GetNumRows())
assert.Equal(t, ts2, seg1.ts)
seg2, exist := manager.statsCache[channelName1].segStats[segmentID2]
assert.Equal(t, true, exist)
assert.Equal(t, segmentID2, seg2.GetSegmentID())
assert.Equal(t, int64(33333), seg2.GetNumRows())
assert.Equal(t, ts2, seg2.ts)
var segmentID3 int64 = 28259 var segmentID3 int64 = 28259
var segmentID4 int64 = 28260 var segmentID4 int64 = 28260
@ -101,11 +111,10 @@ func TestTimetickManagerNormal(t *testing.T) {
err := manager.sendReport(ctx) err := manager.sendReport(ctx)
assert.NoError(t, err) assert.NoError(t, err)
_, channelExistAfterSubmit := manager.channelStatesCaches[channelName1] _, exist = manager.statsCache[channelName1]
assert.Equal(t, false, channelExistAfterSubmit) assert.Equal(t, false, exist)
_, exist = manager.statsCache[channelName2]
_, channelSegmentStatesExistAfterSubmit := manager.channelStatesCaches[channelName1] assert.Equal(t, false, exist)
assert.Equal(t, false, channelSegmentStatesExistAfterSubmit)
var segmentID5 int64 = 28261 var segmentID5 int64 = 28261
var segmentID6 int64 = 28262 var segmentID6 int64 = 28262
@ -126,11 +135,8 @@ func TestTimetickManagerNormal(t *testing.T) {
err = manager.sendReport(ctx) err = manager.sendReport(ctx)
assert.NoError(t, err) assert.NoError(t, err)
_, channelExistAfterSubmit2 := manager.channelStatesCaches[channelName1] _, exist = manager.statsCache[channelName3]
assert.Equal(t, false, channelExistAfterSubmit2) assert.Equal(t, false, exist)
_, channelSegmentStatesExistAfterSubmit2 := manager.channelStatesCaches[channelName1]
assert.Equal(t, false, channelSegmentStatesExistAfterSubmit2)
} }
func TestTimetickManagerSendErr(t *testing.T) { func TestTimetickManagerSendErr(t *testing.T) {