mirror of https://github.com/milvus-io/milvus.git
enhance: avoid to create many timer object in the target (#36570)
/kind improvement Signed-off-by: SimFG <bang.fu@zilliz.com>pull/36602/head
parent
a47abb2f2b
commit
9c1772f659
|
@ -28,7 +28,6 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus/pkg/mq/common"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/pkg/util/lifetime"
|
||||
)
|
||||
|
||||
func TestDispatcher(t *testing.T) {
|
||||
|
@ -71,18 +70,15 @@ func TestDispatcher(t *testing.T) {
|
|||
d, err := NewDispatcher(ctx, newMockFactory(), true, "mock_pchannel_0", nil, "mock_subName_0", common.SubscriptionPositionEarliest, nil, nil, false)
|
||||
assert.NoError(t, err)
|
||||
output := make(chan *msgstream.MsgPack, 1024)
|
||||
d.AddTarget(&target{
|
||||
vchannel: "mock_vchannel_0",
|
||||
pos: nil,
|
||||
ch: output,
|
||||
cancelCh: lifetime.NewSafeChan(),
|
||||
})
|
||||
d.AddTarget(&target{
|
||||
vchannel: "mock_vchannel_1",
|
||||
pos: nil,
|
||||
ch: nil,
|
||||
cancelCh: lifetime.NewSafeChan(),
|
||||
})
|
||||
|
||||
getTarget := func(vchannel string, pos *Pos, ch chan *msgstream.MsgPack) *target {
|
||||
target := newTarget(vchannel, pos)
|
||||
target.ch = ch
|
||||
return target
|
||||
}
|
||||
|
||||
d.AddTarget(getTarget("mock_vchannel_0", nil, output))
|
||||
d.AddTarget(getTarget("mock_vchannel_1", nil, nil))
|
||||
num := d.TargetNum()
|
||||
assert.Equal(t, 2, num)
|
||||
|
||||
|
@ -106,12 +102,8 @@ func TestDispatcher(t *testing.T) {
|
|||
t.Run("test concurrent send and close", func(t *testing.T) {
|
||||
for i := 0; i < 100; i++ {
|
||||
output := make(chan *msgstream.MsgPack, 1024)
|
||||
target := &target{
|
||||
vchannel: "mock_vchannel_0",
|
||||
pos: nil,
|
||||
ch: output,
|
||||
cancelCh: lifetime.NewSafeChan(),
|
||||
}
|
||||
target := newTarget("mock_vchannel_0", nil)
|
||||
target.ch = output
|
||||
assert.Equal(t, cap(output), cap(target.ch))
|
||||
wg := &sync.WaitGroup{}
|
||||
for j := 0; j < 100; j++ {
|
||||
|
|
|
@ -36,16 +36,21 @@ type target struct {
|
|||
closeMu sync.Mutex
|
||||
closeOnce sync.Once
|
||||
closed bool
|
||||
maxLag time.Duration
|
||||
timer *time.Timer
|
||||
|
||||
cancelCh lifetime.SafeChan
|
||||
}
|
||||
|
||||
func newTarget(vchannel string, pos *Pos) *target {
|
||||
maxTolerantLag := paramtable.Get().MQCfg.MaxTolerantLag.GetAsDuration(time.Second)
|
||||
t := &target{
|
||||
vchannel: vchannel,
|
||||
ch: make(chan *MsgPack, paramtable.Get().MQCfg.TargetBufSize.GetAsInt()),
|
||||
pos: pos,
|
||||
cancelCh: lifetime.NewSafeChan(),
|
||||
maxLag: maxTolerantLag,
|
||||
timer: time.NewTimer(maxTolerantLag),
|
||||
}
|
||||
t.closed = false
|
||||
return t
|
||||
|
@ -57,6 +62,7 @@ func (t *target) close() {
|
|||
defer t.closeMu.Unlock()
|
||||
t.closeOnce.Do(func() {
|
||||
t.closed = true
|
||||
t.timer.Stop()
|
||||
close(t.ch)
|
||||
})
|
||||
}
|
||||
|
@ -67,13 +73,13 @@ func (t *target) send(pack *MsgPack) error {
|
|||
if t.closed {
|
||||
return nil
|
||||
}
|
||||
maxTolerantLag := paramtable.Get().MQCfg.MaxTolerantLag.GetAsDuration(time.Second)
|
||||
t.timer.Reset(t.maxLag)
|
||||
select {
|
||||
case <-t.cancelCh.CloseCh():
|
||||
log.Info("target closed", zap.String("vchannel", t.vchannel))
|
||||
return nil
|
||||
case <-time.After(maxTolerantLag):
|
||||
return fmt.Errorf("send target timeout, vchannel=%s, timeout=%s", t.vchannel, maxTolerantLag)
|
||||
case <-t.timer.C:
|
||||
return fmt.Errorf("send target timeout, vchannel=%s, timeout=%s", t.vchannel, t.maxLag)
|
||||
case t.ch <- pack:
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -103,7 +103,7 @@ func SkipRemote(skip bool) Option {
|
|||
}
|
||||
}
|
||||
|
||||
func skipEnv(skip bool) Option {
|
||||
func SkipEnv(skip bool) Option {
|
||||
return func(bt *baseTableConfig) {
|
||||
bt.skipEnv = skip
|
||||
}
|
||||
|
@ -112,7 +112,7 @@ func skipEnv(skip bool) Option {
|
|||
// NewBaseTableFromYamlOnly only used in migration tool.
|
||||
// Maybe we shouldn't limit the configDir internally.
|
||||
func NewBaseTableFromYamlOnly(yaml string) *BaseTable {
|
||||
return NewBaseTable(Files([]string{yaml}), SkipRemote(true), skipEnv(true))
|
||||
return NewBaseTable(Files([]string{yaml}), SkipRemote(true), SkipEnv(true))
|
||||
}
|
||||
|
||||
func NewBaseTable(opts ...Option) *BaseTable {
|
||||
|
|
|
@ -121,3 +121,12 @@ func (m *ConcurrentMap[K, V]) Values() []V {
|
|||
})
|
||||
return ret
|
||||
}
|
||||
|
||||
func (m *ConcurrentMap[K, V]) Keys() []K {
|
||||
ret := make([]K, m.Len())
|
||||
m.inner.Range(func(key, value any) bool {
|
||||
ret = append(ret, key.(K))
|
||||
return true
|
||||
})
|
||||
return ret
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue