enhance: improve load speed (#28518)

This check rejects a load request whenever the pool runs out of workers, even though small segments would finish loading soon; the rejected segments are only retried after the next check interval, which makes collection loading slow.

Instead, block the request in the Go worker pool until a worker becomes free.

---------

Signed-off-by: yah01 <yah2er0ne@outlook.com>
pull/28498/head
yah01 2023-11-17 19:56:21 +08:00 committed by GitHub
parent a3cd0bc9c3
commit d2f53aefa5
3 changed files with 29 additions and 30 deletions
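
To make the intent concrete, here is a minimal, self-contained Go sketch of the queueing behavior this change relies on. It deliberately uses a plain channel-bounded pool rather than the conc.Pool shown in the diffs below, and every name in it (blockingPool, Submit, Wait) is illustrative only: the point is that Submit blocks while all workers are busy, so small segments queue and start as soon as a worker frees up instead of being rejected and retried after the next check interval.

// Illustrative sketch only (not the conc.Pool API from this commit): a
// channel-bounded pool whose Submit blocks until a worker slot is free,
// so callers queue up instead of being rejected with a limit error.
package main

import (
	"fmt"
	"sync"
	"time"
)

type blockingPool struct {
	slots chan struct{} // one token per worker
	wg    sync.WaitGroup
}

func newBlockingPool(workers int) *blockingPool {
	return &blockingPool{slots: make(chan struct{}, workers)}
}

// Submit blocks while all workers are busy, then runs task in a goroutine.
func (p *blockingPool) Submit(task func()) {
	p.slots <- struct{}{} // blocks here instead of returning "request limit exceeded"
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		defer func() { <-p.slots }()
		task()
	}()
}

func (p *blockingPool) Wait() { p.wg.Wait() }

func main() {
	pool := newBlockingPool(2) // pretend we have 2 load workers
	for i := 0; i < 5; i++ {
		seg := i
		pool.Submit(func() {
			time.Sleep(100 * time.Millisecond) // simulate loading one small segment
			fmt.Printf("segment %d loaded\n", seg)
		})
	}
	pool.Wait()
}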


@@ -33,10 +33,12 @@ var (
 	// and other operations (insert/delete/statistics/etc.)
 	// since in concurrent situation, there operation may block each other in high payload

-	sqp     atomic.Pointer[conc.Pool[any]]
-	sqOnce  sync.Once
-	dp      atomic.Pointer[conc.Pool[any]]
-	dynOnce sync.Once
+	sqp      atomic.Pointer[conc.Pool[any]]
+	sqOnce   sync.Once
+	dp       atomic.Pointer[conc.Pool[any]]
+	dynOnce  sync.Once
+	loadPool atomic.Pointer[conc.Pool[any]]
+	loadOnce sync.Once
 )

 // initSQPool initialize
@@ -67,6 +69,19 @@ func initDynamicPool() {
 	})
 }

+func initLoadPool() {
+	loadOnce.Do(func() {
+		pool := conc.NewPool[any](
+			hardware.GetCPUNum()*paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt(),
+			conc.WithPreAlloc(false),
+			conc.WithDisablePurge(false),
+			conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal
+		)
+
+		loadPool.Store(pool)
+	})
+}
+
 // GetSQPool returns the singleton pool instance for search/query operations.
 func GetSQPool() *conc.Pool[any] {
 	initSQPool()
@@ -78,3 +93,8 @@ func GetDynamicPool() *conc.Pool[any] {
 	initDynamicPool()
 	return dp.Load()
 }
+
+func GetLoadPool() *conc.Pool[any] {
+	initLoadPool()
+	return loadPool.Load()
+}


@@ -652,7 +652,7 @@ func (s *LocalSegment) LoadMultiFieldData(rowCount int64, fields []*datapb.Field
 	}

 	var status C.CStatus
-	GetDynamicPool().Submit(func() (any, error) {
+	GetLoadPool().Submit(func() (any, error) {
 		status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
 		return nil, nil
 	}).Await()
@@ -705,7 +705,7 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap
 	loadFieldDataInfo.appendMMapDirPath(paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue())

 	var status C.CStatus
-	GetDynamicPool().Submit(func() (any, error) {
+	GetLoadPool().Submit(func() (any, error) {
 		log.Info("submitted loadFieldData task to dy pool")
 		status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
 		return nil, nil
@@ -757,7 +757,7 @@ func (s *LocalSegment) AddFieldDataInfo(rowCount int64, fields []*datapb.FieldBi
 	}

 	var status C.CStatus
-	GetDynamicPool().Submit(func() (any, error) {
+	GetLoadPool().Submit(func() (any, error) {
 		status = C.AddFieldDataInfoForSealed(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
 		return nil, nil
 	}).Await()
@@ -886,7 +886,7 @@ func (s *LocalSegment) LoadIndexInfo(indexInfo *querypb.FieldIndexInfo, info *Lo
 	}

 	var status C.CStatus
-	GetDynamicPool().Submit(func() (any, error) {
+	GetLoadPool().Submit(func() (any, error) {
 		status = C.UpdateSealedSegmentIndex(s.ptr, info.cLoadIndexInfo)
 		return nil, nil
 	}).Await()


@@ -70,19 +70,16 @@ type Loader interface {
 type LoadResource struct {
 	MemorySize uint64
 	DiskSize   uint64
-	WorkNum    int
 }

 func (r *LoadResource) Add(resource LoadResource) {
 	r.MemorySize += resource.MemorySize
 	r.DiskSize += resource.DiskSize
-	r.WorkNum += resource.WorkNum
 }

 func (r *LoadResource) Sub(resource LoadResource) {
 	r.MemorySize -= resource.MemorySize
 	r.DiskSize -= resource.DiskSize
-	r.WorkNum -= resource.WorkNum
 }

 func NewLoader(
@@ -371,29 +368,13 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
 	}
 	diskCap := paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsUint64()
-	poolCap := hardware.GetCPUNum() * paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsInt()
-	if poolCap > 256 {
-		poolCap = 256
-	}
-	if loader.committedResource.WorkNum >= poolCap {
-		return resource, 0, merr.WrapErrServiceRequestLimitExceeded(int32(poolCap))
-	} else if loader.committedResource.MemorySize+memoryUsage >= totalMemory {
+	if loader.committedResource.MemorySize+memoryUsage >= totalMemory {
 		return resource, 0, merr.WrapErrServiceMemoryLimitExceeded(float32(loader.committedResource.MemorySize+memoryUsage), float32(totalMemory))
 	} else if loader.committedResource.DiskSize+uint64(diskUsage) >= diskCap {
 		return resource, 0, merr.WrapErrServiceDiskLimitExceeded(float32(loader.committedResource.DiskSize+uint64(diskUsage)), float32(diskCap))
 	}
 	concurrencyLevel := funcutil.Min(hardware.GetCPUNum(), len(infos))
-	for _, info := range infos {
-		for _, field := range info.GetBinlogPaths() {
-			resource.WorkNum += len(field.GetBinlogs())
-		}
-		for _, index := range info.GetIndexInfos() {
-			resource.WorkNum += len(index.IndexFilePaths)
-		}
-	}
 	for ; concurrencyLevel > 1; concurrencyLevel /= 2 {
 		_, _, err := loader.checkSegmentSize(ctx, infos, concurrencyLevel)
 		if err == nil {
@@ -415,8 +396,6 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
 	}
 	loader.committedResource.Add(resource)
 	log.Info("request resource for loading segments (unit in MiB)",
-		zap.Int("workerNum", resource.WorkNum),
-		zap.Int("committedWorkerNum", loader.committedResource.WorkNum),
 		zap.Float64("memory", toMB(resource.MemorySize)),
 		zap.Float64("committedMemory", toMB(loader.committedResource.MemorySize)),
 		zap.Float64("disk", toMB(resource.DiskSize)),