Fix io pool goroutine leakage (#19892)

Signed-off-by: longjiquan <jiquan.long@zilliz.com>

Signed-off-by: longjiquan <jiquan.long@zilliz.com>
pull/19903/head
Jiquan Long 2022-10-19 18:23:27 +08:00 committed by GitHub
parent b79687687d
commit 579c50fa1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 63 additions and 14 deletions

View File

@ -54,9 +54,6 @@ type dataSyncService struct {
flushManager flushManager // flush manager handles flush process
chunkManager storage.ChunkManager
compactor *compactionExecutor // reference to compaction executor
// concurrent add segments, reduce time to load delta log from oss
ioPool *concurrency.Pool
}
func newDataSyncService(ctx context.Context,
@ -77,14 +74,6 @@ func newDataSyncService(ctx context.Context,
return nil, errors.New("Nil input")
}
// Initialize io cocurrency pool
log.Info("initialize io concurrency pool", zap.String("vchannel", vchan.GetChannelName()), zap.Int("ioConcurrency", Params.DataNodeCfg.IOConcurrency))
ioPool, err := concurrency.NewPool(Params.DataNodeCfg.IOConcurrency)
if err != nil {
log.Error("failed to create goroutine pool for dataSyncService", zap.Error(err))
return nil, err
}
ctx1, cancel := context.WithCancel(ctx)
service := &dataSyncService{
@ -103,7 +92,6 @@ func newDataSyncService(ctx context.Context,
flushingSegCache: flushingSegCache,
chunkManager: chunkManager,
compactor: compactor,
ioPool: ioPool,
}
if err := service.initNodes(vchan); err != nil {
@ -204,7 +192,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
// avoid closure capture iteration variable
segment := us
future := dsService.ioPool.Submit(func() (interface{}, error) {
future := getOrCreateIOPool().Submit(func() (interface{}, error) {
if err := dsService.channel.addSegment(addSegmentReq{
segType: datapb.SegmentType_Normal,
segID: segment.GetID(),
@ -239,7 +227,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
)
// avoid closure capture iteration variable
segment := fs
future := dsService.ioPool.Submit(func() (interface{}, error) {
future := getOrCreateIOPool().Submit(func() (interface{}, error) {
if err := dsService.channel.addSegment(addSegmentReq{
segType: datapb.SegmentType_Flushed,
segID: segment.GetID(),

View File

@ -0,0 +1,24 @@
package datanode
import (
"sync"
"github.com/milvus-io/milvus/internal/util/concurrency"
)
var ioPool *concurrency.Pool
var ioPoolInitOnce sync.Once
func initIOPool() {
capacity := Params.DataNodeCfg.IOConcurrency
if capacity > 32 {
capacity = 32
}
// error only happens with negative expiry duration or with negative pre-alloc size.
ioPool, _ = concurrency.NewPool(capacity)
}
func getOrCreateIOPool() *concurrency.Pool {
ioPoolInitOnce.Do(initIOPool)
return ioPool
}

View File

@ -0,0 +1,37 @@
package datanode
import (
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/util/concurrency"
)
func Test_getOrCreateIOPool(t *testing.T) {
Params.InitOnce()
ioConcurrency := Params.DataNodeCfg.IOConcurrency
Params.DataNodeCfg.IOConcurrency = 64
defer func() { Params.DataNodeCfg.IOConcurrency = ioConcurrency }()
nP := 10
nTask := 10
wg := sync.WaitGroup{}
for i := 0; i < nP; i++ {
wg.Add(1)
go func() {
defer wg.Done()
p := getOrCreateIOPool()
futures := make([]*concurrency.Future, 0, nTask)
for j := 0; j < nTask; j++ {
future := p.Submit(func() (interface{}, error) {
return nil, nil
})
futures = append(futures, future)
}
err := concurrency.AwaitAll(futures...)
assert.NoError(t, err)
}()
}
wg.Wait()
}