mirror of https://github.com/milvus-io/milvus.git
Refine datanode Timetick Sender (#28393)
- Use explicit lifetime control methods: `Start` and `Stop` - Allow control retry option - Make sure tt sender worker exit after `Stop` return Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/28402/head
parent
371875f650
commit
b1eb1ea506
|
@ -355,8 +355,9 @@ func (node *DataNode) Start() error {
|
|||
go node.compactionExecutor.start(node.ctx)
|
||||
|
||||
if Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() {
|
||||
node.timeTickSender = newTimeTickSender(node.broker, node.session.ServerID)
|
||||
go node.timeTickSender.start(node.ctx)
|
||||
node.timeTickSender = newTimeTickSender(node.broker, node.session.ServerID,
|
||||
retry.Attempts(20), retry.Sleep(time.Millisecond*100))
|
||||
node.timeTickSender.start()
|
||||
}
|
||||
|
||||
node.stopWaiter.Add(1)
|
||||
|
@ -420,6 +421,10 @@ func (node *DataNode) Stop() error {
|
|||
node.session.Stop()
|
||||
}
|
||||
|
||||
if node.timeTickSender != nil {
|
||||
node.timeTickSender.Stop()
|
||||
}
|
||||
|
||||
node.stopWaiter.Wait()
|
||||
})
|
||||
return nil
|
||||
|
|
|
@ -71,6 +71,7 @@ func TestWatchChannel(t *testing.T) {
|
|||
|
||||
node.broker = broker
|
||||
|
||||
node.timeTickSender.Stop()
|
||||
node.timeTickSender = newTimeTickSender(node.broker, 0)
|
||||
|
||||
t.Run("test watch channel", func(t *testing.T) {
|
||||
|
|
|
@ -38,6 +38,11 @@ type timeTickSender struct {
|
|||
nodeID int64
|
||||
broker broker.Broker
|
||||
|
||||
wg sync.WaitGroup
|
||||
cancelFunc context.CancelFunc
|
||||
|
||||
options []retry.Option
|
||||
|
||||
mu sync.Mutex
|
||||
channelStatesCaches map[string]*segmentStatesSequence // string -> *segmentStatesSequence
|
||||
}
|
||||
|
@ -47,15 +52,33 @@ type segmentStatesSequence struct {
|
|||
data map[uint64][]*commonpb.SegmentStats // ts -> segmentStats
|
||||
}
|
||||
|
||||
func newTimeTickSender(broker broker.Broker, nodeID int64) *timeTickSender {
|
||||
func newTimeTickSender(broker broker.Broker, nodeID int64, opts ...retry.Option) *timeTickSender {
|
||||
return &timeTickSender{
|
||||
nodeID: nodeID,
|
||||
broker: broker,
|
||||
channelStatesCaches: make(map[string]*segmentStatesSequence, 0),
|
||||
options: opts,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *timeTickSender) start(ctx context.Context) {
|
||||
func (m *timeTickSender) start() {
|
||||
m.wg.Add(1)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
m.cancelFunc = cancel
|
||||
go func() {
|
||||
defer m.wg.Done()
|
||||
m.work(ctx)
|
||||
}()
|
||||
}
|
||||
|
||||
func (m *timeTickSender) Stop() {
|
||||
if m.cancelFunc != nil {
|
||||
m.cancelFunc()
|
||||
m.wg.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
func (m *timeTickSender) work(ctx context.Context) {
|
||||
ticker := time.NewTicker(Params.DataNodeCfg.DataNodeTimeTickInterval.GetAsDuration(time.Millisecond))
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
|
@ -157,7 +180,7 @@ func (m *timeTickSender) sendReport(ctx context.Context) error {
|
|||
log.RatedDebug(30, "timeTickSender send datanode timetick message", zap.Any("toSendMsgs", toSendMsgs), zap.Any("sendLastTss", sendLastTss))
|
||||
err := retry.Do(ctx, func() error {
|
||||
return m.broker.ReportTimeTick(ctx, toSendMsgs)
|
||||
}, retry.Attempts(20), retry.Sleep(time.Millisecond*100))
|
||||
}, m.options...)
|
||||
if err != nil {
|
||||
log.Error("ReportDataNodeTtMsgs fail after retry", zap.Error(err))
|
||||
return err
|
||||
|
|
|
@ -31,6 +31,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/datanode/broker"
|
||||
"github.com/milvus-io/milvus/internal/mocks"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/retry"
|
||||
)
|
||||
|
||||
func TestTimetickManagerNormal(t *testing.T) {
|
||||
|
@ -138,7 +139,7 @@ func TestTimetickManagerSendErr(t *testing.T) {
|
|||
broker := broker.NewMockBroker(t)
|
||||
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(errors.New("mock")).Maybe()
|
||||
|
||||
manager := newTimeTickSender(broker, 0)
|
||||
manager := newTimeTickSender(broker, 0, retry.Attempts(1))
|
||||
|
||||
channelName1 := "channel1"
|
||||
ts := uint64(time.Now().Unix())
|
||||
|
@ -156,8 +157,6 @@ func TestTimetickManagerSendErr(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestTimetickManagerSendReport(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
mockDataCoord := mocks.NewMockDataCoordClient(t)
|
||||
|
||||
called := atomic.NewBool(false)
|
||||
|
@ -170,9 +169,11 @@ func TestTimetickManagerSendReport(t *testing.T) {
|
|||
Return(nil)
|
||||
mockDataCoord.EXPECT().ReportDataNodeTtMsgs(mock.Anything, mock.Anything).Return(merr.Status(nil), nil).Maybe()
|
||||
manager := newTimeTickSender(broker, 0)
|
||||
go manager.start(ctx)
|
||||
manager.start()
|
||||
|
||||
assert.Eventually(t, func() bool {
|
||||
return called.Load()
|
||||
}, 2*time.Second, 500*time.Millisecond)
|
||||
|
||||
manager.Stop()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue