enhance: [2.4] avoid to create many timer object in the target (#36573)

/kind improvement
- pr: #36570

Signed-off-by: SimFG <bang.fu@zilliz.com>
pull/36616/head
SimFG 2024-09-29 19:27:16 +08:00 committed by GitHub
parent 9a07c1bca9
commit 58a763c529
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 40 additions and 20 deletions

View File

@ -73,16 +73,13 @@ func TestDispatcher(t *testing.T) {
"mock_subName_0", common.SubscriptionPositionEarliest, nil, nil)
assert.NoError(t, err)
output := make(chan *msgstream.MsgPack, 1024)
d.AddTarget(&target{
vchannel: "mock_vchannel_0",
pos: nil,
ch: output,
})
d.AddTarget(&target{
vchannel: "mock_vchannel_1",
pos: nil,
ch: nil,
})
getTarget := func(vchannel string, pos *Pos, ch chan *msgstream.MsgPack) *target {
target := newTarget(vchannel, pos)
target.ch = ch
return target
}
d.AddTarget(getTarget("mock_vchannel_0", nil, output))
d.AddTarget(getTarget("mock_vchannel_1", nil, nil))
num := d.TargetNum()
assert.Equal(t, 2, num)
@ -106,11 +103,8 @@ func TestDispatcher(t *testing.T) {
t.Run("test concurrent send and close", func(t *testing.T) {
for i := 0; i < 100; i++ {
output := make(chan *msgstream.MsgPack, 1024)
target := &target{
vchannel: "mock_vchannel_0",
pos: nil,
ch: output,
}
target := newTarget("mock_vchannel_0", nil)
target.ch = output
assert.Equal(t, cap(output), cap(target.ch))
wg := &sync.WaitGroup{}
for j := 0; j < 100; j++ {

View File

@ -21,6 +21,10 @@ import (
"sync"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -32,23 +36,33 @@ type target struct {
closeMu sync.Mutex
closeOnce sync.Once
closed bool
maxLag time.Duration
timer *time.Timer
cancelCh lifetime.SafeChan
}
func newTarget(vchannel string, pos *Pos) *target {
maxTolerantLag := paramtable.Get().MQCfg.MaxTolerantLag.GetAsDuration(time.Second)
t := &target{
vchannel: vchannel,
ch: make(chan *MsgPack, paramtable.Get().MQCfg.TargetBufSize.GetAsInt()),
pos: pos,
cancelCh: lifetime.NewSafeChan(),
maxLag: maxTolerantLag,
timer: time.NewTimer(maxTolerantLag),
}
t.closed = false
return t
}
func (t *target) close() {
t.cancelCh.Close()
t.closeMu.Lock()
defer t.closeMu.Unlock()
t.closeOnce.Do(func() {
t.closed = true
t.timer.Stop()
close(t.ch)
})
}
@ -59,10 +73,13 @@ func (t *target) send(pack *MsgPack) error {
if t.closed {
return nil
}
maxTolerantLag := paramtable.Get().MQCfg.MaxTolerantLag.GetAsDuration(time.Second)
t.timer.Reset(t.maxLag)
select {
case <-time.After(maxTolerantLag):
return fmt.Errorf("send target timeout, vchannel=%s, timeout=%s", t.vchannel, maxTolerantLag)
case <-t.cancelCh.CloseCh():
log.Info("target closed", zap.String("vchannel", t.vchannel))
return nil
case <-t.timer.C:
return fmt.Errorf("send target timeout, vchannel=%s, timeout=%s", t.vchannel, t.maxLag)
case t.ch <- pack:
return nil
}

View File

@ -98,7 +98,7 @@ func SkipRemote(skip bool) Option {
}
}
func skipEnv(skip bool) Option {
func SkipEnv(skip bool) Option {
return func(bt *baseTableConfig) {
bt.skipEnv = skip
}
@ -107,7 +107,7 @@ func skipEnv(skip bool) Option {
// NewBaseTableFromYamlOnly only used in migration tool.
// Maybe we shouldn't limit the configDir internally.
func NewBaseTableFromYamlOnly(yaml string) *BaseTable {
return NewBaseTable(Files([]string{yaml}), SkipRemote(true), skipEnv(true))
return NewBaseTable(Files([]string{yaml}), SkipRemote(true), SkipEnv(true))
}
func NewBaseTable(opts ...Option) *BaseTable {

View File

@ -121,3 +121,12 @@ func (m *ConcurrentMap[K, V]) Values() []V {
})
return ret
}
func (m *ConcurrentMap[K, V]) Keys() []K {
ret := make([]K, m.Len())
m.inner.Range(func(key, value any) bool {
ret = append(ret, key.(K))
return true
})
return ret
}