Use single instance for mergedTimeTickerSender (#27730)

Signed-off-by: lixinguo <xinguo.li@zilliz.com>
Co-authored-by: lixinguo <xinguo.li@zilliz.com>
pull/28379/head
smellthemoon 2023-11-13 10:18:17 +08:00 committed by GitHub
parent 4d30405a6e
commit 5365748338
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 7 deletions

View File

@ -743,7 +743,7 @@ func newInsertBufferNode(
wTtMsgStream := wTt
wTtMsgStream.EnableProduce(true)
mt := newMergedTimeTickerSender(func(ts Timestamp, segmentIDs []int64) error {
mt := getOrCreateMergedTimeTickerSender(func(ts Timestamp, segmentIDs []int64) error {
stats := make([]*commonpb.SegmentStats, 0, len(segmentIDs))
for _, sid := range segmentIDs {
stat, err := config.channel.getSegmentStatisticsUpdates(sid)

View File

@ -46,19 +46,29 @@ type mergedTimeTickerSender struct {
closeOnce sync.Once
}
func newMergedTimeTickerSender(send sendTimeTick) *mergedTimeTickerSender {
mt := &mergedTimeTickerSender{
var (
uniqueMergedTimeTickerSender *mergedTimeTickerSender
getUniqueMergedTimeTickerSender sync.Once
)
func newUniqueMergedTimeTickerSender(send sendTimeTick) *mergedTimeTickerSender {
return &mergedTimeTickerSender{
ts: 0, // 0 for not tt send
segmentIDs: make(map[int64]struct{}),
cond: sync.NewCond(&sync.Mutex{}),
send: send,
closeCh: make(chan struct{}),
}
mt.wg.Add(2)
go mt.tick()
go mt.work()
}
return mt
func getOrCreateMergedTimeTickerSender(send sendTimeTick) *mergedTimeTickerSender {
getUniqueMergedTimeTickerSender.Do(func() {
uniqueMergedTimeTickerSender = newUniqueMergedTimeTickerSender(send)
})
uniqueMergedTimeTickerSender.wg.Add(2)
go uniqueMergedTimeTickerSender.tick()
go uniqueMergedTimeTickerSender.work()
return uniqueMergedTimeTickerSender
}
func (mt *mergedTimeTickerSender) bufferTs(ts Timestamp, segmentIDs []int64) {