diff --git a/internal/datanode/timetick_sender.go b/internal/datanode/timetick_sender.go index a73406e54c..145e60aec8 100644 --- a/internal/datanode/timetick_sender.go +++ b/internal/datanode/timetick_sender.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -32,8 +33,8 @@ import ( ) // timeTickSender is to merge channel states updated by flow graph node and send to datacoord periodically -// timeTickSender hold a SegmentStats time sequence cache for each channel, -// after send succeeds will clean the cache earlier than the sended timestamp +// timeTickSender hold segmentStats cache for each channel, +// after send succeeds will clean the cache earlier than last sent timestamp type timeTickSender struct { nodeID int64 broker broker.Broker @@ -43,21 +44,27 @@ type timeTickSender struct { options []retry.Option - mu sync.Mutex - channelStatesCaches map[string]*segmentStatesSequence // string -> *segmentStatesSequence + mu sync.RWMutex + statsCache map[string]*channelStats // channel -> channelStats +} + +type channelStats struct { + segStats map[int64]*segmentStats // segmentID -> segmentStats + lastTs uint64 } // data struct only used in timeTickSender -type segmentStatesSequence struct { - data map[uint64][]*commonpb.SegmentStats // ts -> segmentStats +type segmentStats struct { + *commonpb.SegmentStats + ts uint64 } func newTimeTickSender(broker broker.Broker, nodeID int64, opts ...retry.Option) *timeTickSender { return &timeTickSender{ - nodeID: nodeID, - broker: broker, - channelStatesCaches: make(map[string]*segmentStatesSequence, 0), - options: opts, + nodeID: nodeID, + broker: broker, + statsCache: make(map[string]*channelStats), + options: opts, } } @@ -92,91 +99,74 @@ func (m *timeTickSender) work(ctx context.Context) { } } -func (m *timeTickSender) update(channelName string, timestamp uint64, segmentStats []*commonpb.SegmentStats) { +func (m *timeTickSender) update(channelName string, timestamp uint64, segStats []*commonpb.SegmentStats) { m.mu.Lock() defer m.mu.Unlock() - channelStates, ok := m.channelStatesCaches[channelName] + _, ok := m.statsCache[channelName] if !ok { - channelStates = &segmentStatesSequence{ - data: make(map[uint64][]*commonpb.SegmentStats, 0), + m.statsCache[channelName] = &channelStats{ + segStats: make(map[int64]*segmentStats), } } - channelStates.data[timestamp] = segmentStats - m.channelStatesCaches[channelName] = channelStates + for _, stats := range segStats { + segmentID := stats.GetSegmentID() + m.statsCache[channelName].segStats[segmentID] = &segmentStats{ + SegmentStats: stats, + ts: timestamp, + } + } + m.statsCache[channelName].lastTs = timestamp } -func (m *timeTickSender) mergeDatanodeTtMsg() ([]*msgpb.DataNodeTtMsg, map[string]uint64) { - m.mu.Lock() - defer m.mu.Unlock() +func (m *timeTickSender) assembleDatanodeTtMsg() ([]*msgpb.DataNodeTtMsg, map[string]uint64) { + m.mu.RLock() + defer m.mu.RUnlock() var msgs []*msgpb.DataNodeTtMsg - sendedLastTss := make(map[string]uint64, 0) + lastSentTss := make(map[string]uint64, 0) - for channelName, channelSegmentStates := range m.channelStatesCaches { - var lastTs uint64 - segNumRows := make(map[int64]int64, 0) - for ts, segmentStates := range channelSegmentStates.data { - if ts > lastTs { - lastTs = ts - } - // merge the same segments into one - for _, segmentStat := range segmentStates { - if v, ok := segNumRows[segmentStat.GetSegmentID()]; ok { - // numRows is supposed to keep growing - if segmentStat.GetNumRows() > v { - segNumRows[segmentStat.GetSegmentID()] = segmentStat.GetNumRows() - } - } else { - segNumRows[segmentStat.GetSegmentID()] = segmentStat.GetNumRows() - } - } - } - toSendSegmentStats := make([]*commonpb.SegmentStats, 0) - for id, numRows := range segNumRows { - toSendSegmentStats = append(toSendSegmentStats, &commonpb.SegmentStats{ - SegmentID: id, - NumRows: numRows, - }) - } + for channelName, chanStats := range m.statsCache { + toSendSegmentStats := lo.Map(lo.Values(chanStats.segStats), func(stats *segmentStats, _ int) *commonpb.SegmentStats { + return stats.SegmentStats + }) msgs = append(msgs, &msgpb.DataNodeTtMsg{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_DataNodeTt), commonpbutil.WithSourceID(m.nodeID), ), ChannelName: channelName, - Timestamp: lastTs, + Timestamp: chanStats.lastTs, SegmentsStats: toSendSegmentStats, }) - sendedLastTss[channelName] = lastTs + lastSentTss[channelName] = chanStats.lastTs } - return msgs, sendedLastTss + return msgs, lastSentTss } -func (m *timeTickSender) cleanStatesCache(sendedLastTss map[string]uint64) { +func (m *timeTickSender) cleanStatesCache(lastSentTss map[string]uint64) { m.mu.Lock() defer m.mu.Unlock() - sizeBeforeClean := len(m.channelStatesCaches) - log := log.With(zap.Any("sendedLastTss", sendedLastTss), zap.Int("sizeBeforeClean", sizeBeforeClean)) - for channelName, sendedLastTs := range sendedLastTss { - channelCache, ok := m.channelStatesCaches[channelName] + sizeBeforeClean := len(m.statsCache) + log := log.With(zap.Any("lastSentTss", lastSentTss), zap.Int("sizeBeforeClean", sizeBeforeClean)) + for channelName, lastSentTs := range lastSentTss { + _, ok := m.statsCache[channelName] if ok { - for ts := range channelCache.data { - if ts <= sendedLastTs { - delete(channelCache.data, ts) + for segmentID, stats := range m.statsCache[channelName].segStats { + if stats.ts <= lastSentTs { + delete(m.statsCache[channelName].segStats, segmentID) } } - m.channelStatesCaches[channelName] = channelCache } - if len(channelCache.data) == 0 { - delete(m.channelStatesCaches, channelName) + if len(m.statsCache[channelName].segStats) == 0 { + delete(m.statsCache, channelName) } } - log.RatedDebug(30, "timeTickSender channelStatesCaches", zap.Int("sizeAfterClean", len(m.channelStatesCaches))) + log.RatedDebug(30, "timeTickSender stats", zap.Int("sizeAfterClean", len(m.statsCache))) } func (m *timeTickSender) sendReport(ctx context.Context) error { - toSendMsgs, sendLastTss := m.mergeDatanodeTtMsg() + toSendMsgs, sendLastTss := m.assembleDatanodeTtMsg() log.RatedDebug(30, "timeTickSender send datanode timetick message", zap.Any("toSendMsgs", toSendMsgs), zap.Any("sendLastTss", sendLastTss)) err := retry.Do(ctx, func() error { return m.broker.ReportTimeTick(ctx, toSendMsgs) diff --git a/internal/datanode/timetick_sender_test.go b/internal/datanode/timetick_sender_test.go index 445e8a0a9c..61e7245a27 100644 --- a/internal/datanode/timetick_sender_test.go +++ b/internal/datanode/timetick_sender_test.go @@ -55,11 +55,14 @@ func TestTimetickManagerNormal(t *testing.T) { // update first time manager.update(channelName1, ts, segmentStats) - channel1SegmentStates, channelSegmentStatesExist := manager.channelStatesCaches[channelName1] - assert.Equal(t, true, channelSegmentStatesExist) - segmentState1, segmentState1Exist := channel1SegmentStates.data[ts] - assert.Equal(t, segmentStats[0], segmentState1[0]) - assert.Equal(t, true, segmentState1Exist) + chanStats, exist := manager.statsCache[channelName1] + assert.Equal(t, true, exist) + assert.Equal(t, 1, len(chanStats.segStats)) + seg1, exist := manager.statsCache[channelName1].segStats[segmentID1] + assert.Equal(t, true, exist) + assert.Equal(t, segmentID1, seg1.GetSegmentID()) + assert.Equal(t, int64(100), seg1.GetNumRows()) + assert.Equal(t, ts, seg1.ts) // update second time segmentStats2 := []*commonpb.SegmentStats{ @@ -75,12 +78,19 @@ func TestTimetickManagerNormal(t *testing.T) { ts2 := ts + 100 manager.update(channelName1, ts2, segmentStats2) - channelSegmentStates, channelSegmentStatesExist := manager.channelStatesCaches[channelName1] - assert.Equal(t, true, channelSegmentStatesExist) - - segmentStates, segmentStatesExist := channelSegmentStates.data[ts2] - assert.Equal(t, true, segmentStatesExist) - assert.Equal(t, 2, len(segmentStates)) + chanStats, exist = manager.statsCache[channelName1] + assert.Equal(t, true, exist) + assert.Equal(t, 2, len(chanStats.segStats)) + seg1, exist = manager.statsCache[channelName1].segStats[segmentID1] + assert.Equal(t, true, exist) + assert.Equal(t, segmentID1, seg1.GetSegmentID()) + assert.Equal(t, int64(10000), seg1.GetNumRows()) + assert.Equal(t, ts2, seg1.ts) + seg2, exist := manager.statsCache[channelName1].segStats[segmentID2] + assert.Equal(t, true, exist) + assert.Equal(t, segmentID2, seg2.GetSegmentID()) + assert.Equal(t, int64(33333), seg2.GetNumRows()) + assert.Equal(t, ts2, seg2.ts) var segmentID3 int64 = 28259 var segmentID4 int64 = 28260 @@ -101,11 +111,10 @@ func TestTimetickManagerNormal(t *testing.T) { err := manager.sendReport(ctx) assert.NoError(t, err) - _, channelExistAfterSubmit := manager.channelStatesCaches[channelName1] - assert.Equal(t, false, channelExistAfterSubmit) - - _, channelSegmentStatesExistAfterSubmit := manager.channelStatesCaches[channelName1] - assert.Equal(t, false, channelSegmentStatesExistAfterSubmit) + _, exist = manager.statsCache[channelName1] + assert.Equal(t, false, exist) + _, exist = manager.statsCache[channelName2] + assert.Equal(t, false, exist) var segmentID5 int64 = 28261 var segmentID6 int64 = 28262 @@ -126,11 +135,8 @@ func TestTimetickManagerNormal(t *testing.T) { err = manager.sendReport(ctx) assert.NoError(t, err) - _, channelExistAfterSubmit2 := manager.channelStatesCaches[channelName1] - assert.Equal(t, false, channelExistAfterSubmit2) - - _, channelSegmentStatesExistAfterSubmit2 := manager.channelStatesCaches[channelName1] - assert.Equal(t, false, channelSegmentStatesExistAfterSubmit2) + _, exist = manager.statsCache[channelName3] + assert.Equal(t, false, exist) } func TestTimetickManagerSendErr(t *testing.T) {