mirror of https://github.com/milvus-io/milvus.git
fix datacoord consume datanode tt metrics (#25524)
Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/25260/head
parent
dbfade77c5
commit
012fd1152a
|
@ -1417,7 +1417,7 @@ func (s *Server) ReportDataNodeTtMsgs(ctx context.Context, req *datapb.ReportDat
|
|||
}
|
||||
|
||||
for _, ttMsg := range req.GetMsgs() {
|
||||
sub := tsoutil.SubByNow(req.GetBase().GetTimestamp())
|
||||
sub := tsoutil.SubByNow(ttMsg.GetTimestamp())
|
||||
metrics.DataCoordConsumeDataNodeTimeTickLag.
|
||||
WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), ttMsg.GetChannelName()).
|
||||
Set(float64(sub))
|
||||
|
|
|
@ -31,6 +31,7 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/retry"
|
||||
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
||||
)
|
||||
|
||||
// timeTickSender is to merge channel states updated by flow graph node and send to datacoord periodically
|
||||
|
@ -65,8 +66,8 @@ func (m *timeTickSender) start(ctx context.Context) {
|
|||
case <-ctx.Done():
|
||||
log.Info("timeTickSender context done")
|
||||
return
|
||||
case t := <-ticker.C:
|
||||
m.sendReport(ctx, uint64(t.UnixMilli()))
|
||||
case <-ticker.C:
|
||||
m.sendReport(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -154,10 +155,11 @@ func (m *timeTickSender) cleanStatesCache(sendedLastTss map[string]uint64) {
|
|||
log.Debug("timeTickSender channelStatesCaches", zap.Int("sizeAfterClean", len(m.channelStatesCaches)))
|
||||
}
|
||||
|
||||
func (m *timeTickSender) sendReport(ctx context.Context, submitTs Timestamp) error {
|
||||
func (m *timeTickSender) sendReport(ctx context.Context) error {
|
||||
toSendMsgs, sendLastTss := m.mergeDatanodeTtMsg()
|
||||
log.Debug("timeTickSender send datanode timetick message", zap.Any("toSendMsgs", toSendMsgs), zap.Any("sendLastTss", sendLastTss))
|
||||
err := retry.Do(ctx, func() error {
|
||||
submitTs := tsoutil.ComposeTSByTime(time.Now(), 0)
|
||||
statusResp, err := m.dataCoord.ReportDataNodeTtMsgs(ctx, &datapb.ReportDataNodeTtMsgsRequest{
|
||||
Base: commonpbutil.NewMsgBase(
|
||||
commonpbutil.WithMsgType(commonpb.MsgType_DataNodeTt),
|
||||
|
|
|
@ -90,7 +90,7 @@ func TestTimetickManagerNormal(t *testing.T) {
|
|||
}
|
||||
manager.update(channelName2, ts3, segmentStats3)
|
||||
|
||||
err := manager.sendReport(ctx, 100)
|
||||
err := manager.sendReport(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, channelExistAfterSubmit := manager.channelStatesCaches[channelName1]
|
||||
|
@ -115,7 +115,7 @@ func TestTimetickManagerNormal(t *testing.T) {
|
|||
}
|
||||
manager.update(channelName3, ts4, segmentStats4)
|
||||
|
||||
err = manager.sendReport(ctx, 100)
|
||||
err = manager.sendReport(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, channelExistAfterSubmit2 := manager.channelStatesCaches[channelName1]
|
||||
|
@ -140,7 +140,7 @@ func TestTimetickManagerSendErr(t *testing.T) {
|
|||
}
|
||||
// update first time
|
||||
manager.update(channelName1, ts, segmentStats)
|
||||
err := manager.sendReport(ctx, 100)
|
||||
err := manager.sendReport(ctx)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
|
@ -159,7 +159,7 @@ func TestTimetickManagerSendNotSuccess(t *testing.T) {
|
|||
}
|
||||
// update first time
|
||||
manager.update(channelName1, ts, segmentStats)
|
||||
err := manager.sendReport(ctx, 100)
|
||||
err := manager.sendReport(ctx)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue