Add IoPoolSize config (#23360)

Signed-off-by: yah01 <yang.cen@zilliz.com>
yah01 2023-04-13 14:50:28 +08:00 committed by GitHub
parent 178559a875
commit dc0eb433ae
5 changed files with 61 additions and 38 deletions

@@ -70,10 +70,42 @@ type Loader interface {
LoadBloomFilterSet(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) ([]*pkoracle.BloomFilterSet, error)
}
func NewLoader(
manager CollectionManager,
cm storage.ChunkManager,
) *segmentLoader {
cpuNum := runtime.GOMAXPROCS(0)
ioPoolSize := cpuNum * 8
// make sure small machines can still load fast
if ioPoolSize < 32 {
ioPoolSize = 32
}
// limit the concurrency on large machines
if ioPoolSize > 256 {
ioPoolSize = 256
}
if configPoolSize := paramtable.Get().QueryNodeCfg.IoPoolSize.GetAsInt(); configPoolSize > 0 {
ioPoolSize = configPoolSize
}
ioPool := conc.NewPool(ioPoolSize, ants.WithPreAlloc(true))
log.Info("SegmentLoader created", zap.Int("ioPoolSize", ioPoolSize))
loader := &segmentLoader{
manager: manager,
cm: cm,
ioPool: ioPool,
}
return loader
}
// segmentLoader is only responsible for loading the field data from binlog
type segmentLoader struct {
manager CollectionManager
cm storage.ChunkManager // minio cm
cm storage.ChunkManager
ioPool *conc.Pool
}
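
Read as a rule, the new NewLoader sizes the IO pool at 8 workers per CPU, clamps the result to the range [32, 256], and lets any positive queryNode.ioPoolSize override that heuristic. A minimal standalone sketch of the rule follows; the helper name and the example values are illustrative only, while the constants and the override behavior come from the diff above.

// pickIOPoolSize is a hypothetical helper mirroring the sizing logic in NewLoader.
func pickIOPoolSize(cpuNum, configured int) int {
	if configured > 0 {
		return configured // an explicit queryNode.ioPoolSize wins
	}
	size := cpuNum * 8
	if size < 32 {
		size = 32 // floor for small machines
	}
	if size > 256 {
		size = 256 // cap for large machines
	}
	return size
}

// Examples: pickIOPoolSize(2, 0) == 32, pickIOPoolSize(16, 0) == 128,
// pickIOPoolSize(64, 0) == 256, pickIOPoolSize(64, 64) == 64.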
@@ -124,10 +156,19 @@ func (loader *segmentLoader) Load(ctx context.Context,
}
}
if err != nil {
log.Error("load failed, OOM if loaded", zap.Error(err))
log.Warn("load failed, OOM if loaded", zap.Error(err))
return nil, err
}
logNum := 0
for _, field := range infos[0].GetBinlogPaths() {
logNum += len(field.GetBinlogs())
}
if logNum > 0 {
// cap the concurrency so that concurrencyLevel*logNum binlog tasks cannot exhaust the free IO workers
concurrencyLevel = funcutil.Min(concurrencyLevel, loader.ioPool.Free()/logNum)
}
newSegments := make(map[UniqueID]*LocalSegment, len(infos))
clearAll := func() {
for _, s := range newSegments {
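
The concurrency cap added to Load keeps a single call from exhausting the shared IO pool: each concurrently loaded segment submits roughly logNum binlog-pull tasks, so the segment-level concurrency is bounded by the pool's free workers divided by logNum. Below is a hedged sketch of that arithmetic; the helper name and the example numbers are illustrative, only the min(concurrencyLevel, Free()/logNum) rule is taken from the diff.

// capConcurrency is a hypothetical helper showing the throttle used in Load.
func capConcurrency(requested, freeWorkers, logNum int) int {
	if logNum <= 0 {
		return requested
	}
	if limit := freeWorkers / logNum; limit < requested {
		return limit
	}
	return requested
}

// Example: 8 segments requested, 128 free IO workers, 40 binlogs per segment
// gives 128/40 = 3, so only 3 segments are loaded concurrently.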
@@ -868,31 +909,3 @@ func getFieldSizeFromFieldBinlog(fieldBinlog *datapb.FieldBinlog) int64 {
return fieldSize
}
func NewLoader(
manager CollectionManager,
cm storage.ChunkManager,
pool *conc.Pool) *segmentLoader {
cpuNum := runtime.GOMAXPROCS(0)
ioPoolSize := cpuNum * 8
// make sure small machines could load faster
if ioPoolSize < 32 {
ioPoolSize = 32
}
// limit the number of concurrency
if ioPoolSize > 256 {
ioPoolSize = 256
}
ioPool := conc.NewPool(ioPoolSize, ants.WithPreAlloc(true))
log.Info("SegmentLoader created", zap.Int("io-pool-size", ioPoolSize))
loader := &segmentLoader{
manager: manager,
cm: cm,
ioPool: ioPool,
}
return loader
}

@@ -27,7 +27,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@@ -38,7 +37,6 @@ type SegmentLoaderSuite struct {
// Dependencies
collectionManager CollectionManager
chunkManager storage.ChunkManager
pool *conc.Pool
// Data
collectionID int64
@@ -60,9 +58,8 @@ func (suite *SegmentLoaderSuite) SetupTest() {
suite.collectionManager = NewCollectionManager()
suite.chunkManager = storage.NewLocalChunkManager(storage.RootPath(
fmt.Sprintf("/tmp/milvus-ut/%d", rand.Int63())))
suite.pool = conc.NewPool(10)
suite.loader = NewLoader(suite.collectionManager, suite.chunkManager, suite.pool)
suite.loader = NewLoader(suite.collectionManager, suite.chunkManager)
// Data
schema := GenTestCollectionSchema("test", schemapb.DataType_Int64)

@@ -118,8 +118,6 @@ type QueryNode struct {
vectorStorage storage.ChunkManager
etcdKV *etcdkv.EtcdKV
// Pool for load segments
loadPool *conc.Pool
// Pool for search/query
taskPool *conc.Pool
@@ -269,7 +267,6 @@ func (node *QueryNode) Init() error {
node.etcdKV = etcdkv.NewEtcdKV(node.etcdCli, paramtable.Get().EtcdCfg.MetaRootPath.GetValue())
log.Info("queryNode try to connect etcd success", zap.String("MetaRootPath", paramtable.Get().EtcdCfg.MetaRootPath.GetValue()))
node.loadPool = conc.NewDefaultPool()
node.taskPool = conc.NewDefaultPool()
node.scheduler = tasks.NewScheduler()
@@ -301,7 +298,7 @@ func (node *QueryNode) Init() error {
node.delegators = typeutil.NewConcurrentMap[string, delegator.ShardDelegator]()
node.subscribingChannels = typeutil.NewConcurrentSet[string]()
node.manager = segments.NewManager()
node.loader = segments.NewLoader(node.manager.Collection, node.vectorStorage, node.loadPool)
node.loader = segments.NewLoader(node.manager.Collection, node.vectorStorage)
node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.QueryNodeRole, paramtable.GetNodeID())
// init pipeline manager
node.pipelineManager = pipeline.NewManager(node.manager, node.tSafeManager, node.dispClient, node.delegators)

@@ -80,6 +80,11 @@ func (pool *Pool) Running() int {
return pool.inner.Running()
}
// Free returns the number of free workers
func (pool *Pool) Free() int {
return pool.inner.Free()
}
func (pool *Pool) Release() {
pool.inner.Release()
}
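
Free() is the accessor the loader relies on for the cap above; it simply forwards to the underlying ants pool's count of idle workers. The sketch below shows one way that value can gate submissions. It is illustrative only: Submit and Await are assumed to be the existing conc.Pool task API (they are not part of this diff), imports of the conc and ants packages are assumed, and the pool size and task body are placeholders.

func demoFreeWorkers() {
	// A tiny pool so the numbers are easy to follow.
	pool := conc.NewPool(4, ants.WithPreAlloc(true))
	defer pool.Release()

	if pool.Free() > 0 { // only submit while an idle worker is available
		future := pool.Submit(func() (any, error) {
			return nil, nil // a real task would pull one binlog file here
		})
		_, _ = future.Await()
	}
}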

@@ -1402,6 +1402,9 @@ type queryNodeConfig struct {
// delete buffer
MaxSegmentDeleteBuffer ParamItem `refreshable:"false"`
// loader
IoPoolSize ParamItem `refreshable:"false"`
}
func (p *queryNodeConfig) init(base *BaseTable) {
@@ -1708,6 +1711,14 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
DefaultValue: "10000000",
}
p.MaxSegmentDeleteBuffer.Init(base.mgr)
p.IoPoolSize = ParamItem{
Key: "queryNode.ioPoolSize",
Version: "2.3.0",
DefaultValue: "0",
Doc: "Control how many goroutines the loader uses to pull files; if the given value is non-positive, it will be set to CpuNum * 8, with a minimum of 32 and a maximum of 256",
}
p.IoPoolSize.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////
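
Finally, the new parameter is resolved once at loader construction (it is declared with refreshable:"false"), through the same accessor NewLoader uses. A minimal sketch of that lookup, assuming paramtable has already been initialized by the node; the helper name is illustrative:

// resolveIOPoolSize is a hypothetical helper: a positive queryNode.ioPoolSize
// (the default is "0") overrides whatever heuristic default the caller computed.
func resolveIOPoolSize(heuristicDefault int) int {
	if v := paramtable.Get().QueryNodeCfg.IoPoolSize.GetAsInt(); v > 0 {
		return v
	}
	return heuristicDefault
}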