Fix deadlock in mergedTimeTickSender (#12697)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/12706/head
congqixia 2021-12-03 16:39:33 +08:00 committed by GitHub
parent b832071e41
commit ffd05eb140
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 46 additions and 6 deletions

View File

@ -61,6 +61,8 @@ func (mt *mergedTimeTickerSender) bufferTs(ts Timestamp) {
defer mt.lastMut.RUnlock()
if !mt.lastSent.IsZero() && time.Since(mt.lastSent) > time.Millisecond*100 {
mt.cond.L.Lock()
defer mt.cond.L.Unlock()
mt.cond.Signal()
}
}
@ -72,24 +74,33 @@ func (mt *mergedTimeTickerSender) tick() {
for {
select {
case <-t:
mt.cond.L.Lock()
mt.cond.Signal() // allow worker to check every 0.1s
mt.cond.L.Unlock()
case <-mt.closeCh:
return
}
}
}
func (mt *mergedTimeTickerSender) isClosed() bool {
select {
case <-mt.closeCh:
return true
default:
return false
}
}
func (mt *mergedTimeTickerSender) work() {
defer mt.wg.Done()
ts, lastTs := uint64(0), uint64(0)
for {
select {
case <-mt.closeCh:
return
default:
}
mt.cond.L.Lock()
if mt.isClosed() {
mt.cond.L.Unlock()
return
}
mt.cond.Wait()
ts = mt.ts.Load()
mt.cond.L.Unlock()
@ -105,8 +116,10 @@ func (mt *mergedTimeTickerSender) work() {
func (mt *mergedTimeTickerSender) close() {
mt.closeOnce.Do(func() {
mt.cond.L.Lock()
close(mt.closeCh)
mt.cond.Broadcast()
mt.cond.L.Unlock()
mt.wg.Wait()
})
}

View File

@ -29,3 +29,30 @@ func TestMergedTimeTicker(t *testing.T) {
assert.Less(t, len(ticks), 20)
mut.Unlock()
}
func TestMergedTimeTicker_close10000(t *testing.T) {
var wg sync.WaitGroup
batchSize := 10000
wg.Add(batchSize)
for i := 0; i < batchSize; i++ {
mt := newMergedTimeTickerSender(func(ts Timestamp) error {
return nil
})
go func(mt *mergedTimeTickerSender) {
defer wg.Done()
time.Sleep(10 * time.Millisecond)
mt.close()
}(mt)
}
tm := time.NewTimer(time.Millisecond * 20)
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-tm.C:
t.FailNow()
case <-done:
}
}