enhance: improve load speed (#28518) (#29070)

The check in requestResource rejects a load request once the pool runs out of workers, even though a small segment could be loaded quickly; the rejected segments are only retried after the next check interval, which makes loading a collection slow.

Instead of rejecting such requests, block them on the Go pool.

pr: #28518

Signed-off-by: yah01 <yah2er0ne@outlook.com>
pull/29089/head
yah01 2023-12-08 15:58:37 +08:00 committed by GitHub
parent 0a996f777e
commit 125bd39ade
3 changed files with 29 additions and 30 deletions
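For context, the difference between the two behaviors can be sketched in plain Go with a semaphore-style worker pool. The sketch below is illustrative only (pool, TrySubmit and Submit are made-up names, not the conc.Pool API used in the diff): TrySubmit fails fast the way the removed worker-count check did, while Submit blocks until a worker is free, which is what routing the work through the load pool achieves.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// pool is a toy worker pool backed by a buffered channel used as a semaphore.
type pool struct {
	slots chan struct{}
	wg    sync.WaitGroup
}

func newPool(workers int) *pool {
	return &pool{slots: make(chan struct{}, workers)}
}

// TrySubmit mirrors the old behavior: fail immediately when every worker is
// busy, so the caller has to retry after a check interval.
func (p *pool) TrySubmit(task func()) error {
	select {
	case p.slots <- struct{}{}:
	default:
		return errors.New("request limit exceeded")
	}
	p.run(task)
	return nil
}

// Submit mirrors the new behavior: block until a worker frees up, so a small
// task is picked up as soon as capacity exists instead of waiting for a retry.
func (p *pool) Submit(task func()) {
	p.slots <- struct{}{}
	p.run(task)
}

func (p *pool) run(task func()) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		defer func() { <-p.slots }()
		task()
	}()
}

func main() {
	p := newPool(2)
	for i := 0; i < 4; i++ {
		i := i
		p.Submit(func() { fmt.Println("loaded segment", i) })
	}
	p.wg.Wait()
}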


@@ -33,10 +33,12 @@ var (
 	// and other operations (insert/delete/statistics/etc.)
 	// since in concurrent situation, there operation may block each other in high payload
-	sqp     atomic.Pointer[conc.Pool[any]]
-	sqOnce  sync.Once
-	dp      atomic.Pointer[conc.Pool[any]]
-	dynOnce sync.Once
+	sqp      atomic.Pointer[conc.Pool[any]]
+	sqOnce   sync.Once
+	dp       atomic.Pointer[conc.Pool[any]]
+	dynOnce  sync.Once
+	loadPool atomic.Pointer[conc.Pool[any]]
+	loadOnce sync.Once
 )
 // initSQPool initialize
@@ -67,6 +69,19 @@ func initDynamicPool() {
 	})
 }
+func initLoadPool() {
+	loadOnce.Do(func() {
+		pool := conc.NewPool[any](
+			hardware.GetCPUNum()*paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt(),
+			conc.WithPreAlloc(false),
+			conc.WithDisablePurge(false),
+			conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal
+		)
+		loadPool.Store(pool)
+	})
+}
 // GetSQPool returns the singleton pool instance for search/query operations.
 func GetSQPool() *conc.Pool[any] {
 	initSQPool()
@@ -78,3 +93,8 @@ func GetDynamicPool() *conc.Pool[any] {
 	initDynamicPool()
 	return dp.Load()
 }
+func GetLoadPool() *conc.Pool[any] {
+	initLoadPool()
+	return loadPool.Load()
+}
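A caller-side sketch of the new pool, assuming Submit returns a *conc.Future[any] whose Await blocks until the task completes and yields its error (matching the Submit/Await usage in the LocalSegment methods below); loadSegmentsOnPool and loadOneSegment are hypothetical helpers, not functions from this change:

// Hypothetical caller: fan one load task per segment onto the shared load
// pool and wait for all of them to finish.
func loadSegmentsOnPool(segments []*LocalSegment, loadOneSegment func(*LocalSegment) error) error {
	futures := make([]*conc.Future[any], 0, len(segments))
	for _, seg := range segments {
		seg := seg
		futures = append(futures, GetLoadPool().Submit(func() (any, error) {
			// When every worker is busy the task waits for a free worker
			// instead of the request being rejected with a limit error.
			return nil, loadOneSegment(seg)
		}))
	}
	for _, f := range futures {
		if _, err := f.Await(); err != nil {
			return err
		}
	}
	return nil
}

Sizing the pool at hardware.GetCPUNum() times the MiddlePriorityThreadCoreCoefficient keeps the number of concurrently running load tasks proportional to the hardware, while extra tasks simply queue on the pool.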


@@ -620,7 +620,7 @@ func (s *LocalSegment) LoadMultiFieldData(rowCount int64, fields []*datapb.Field
 	}
 	var status C.CStatus
-	GetDynamicPool().Submit(func() (any, error) {
+	GetLoadPool().Submit(func() (any, error) {
 		status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
 		return nil, nil
 	}).Await()
@@ -672,7 +672,7 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap
 	loadFieldDataInfo.appendMMapDirPath(paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue())
 	var status C.CStatus
-	GetDynamicPool().Submit(func() (any, error) {
+	GetLoadPool().Submit(func() (any, error) {
 		log.Info("submitted loadFieldData task to dy pool")
 		status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
 		return nil, nil
@@ -723,7 +723,7 @@ func (s *LocalSegment) AddFieldDataInfo(rowCount int64, fields []*datapb.FieldBi
 	}
 	var status C.CStatus
-	GetDynamicPool().Submit(func() (any, error) {
+	GetLoadPool().Submit(func() (any, error) {
 		status = C.AddFieldDataInfoForSealed(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
 		return nil, nil
 	}).Await()
@@ -851,7 +851,7 @@ func (s *LocalSegment) LoadIndexInfo(indexInfo *querypb.FieldIndexInfo, info *Lo
 	}
 	var status C.CStatus
-	GetDynamicPool().Submit(func() (any, error) {
+	GetLoadPool().Submit(func() (any, error) {
 		status = C.UpdateSealedSegmentIndex(s.ptr, info.cLoadIndexInfo)
 		return nil, nil
 	}).Await()
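All four call sites above follow the same shape: capture the C status in a closure, run the cgo call on a load-pool worker (whose OS thread is locked by the pool's runtime.LockOSThread pre-handler), and Await to block the calling goroutine until it returns. A minimal cgo-free sketch of that pattern, with doLoad standing in for the C call and fmt assumed to be imported:

// Hypothetical sketch of the submit-and-await pattern used in the diff above.
func loadFieldDataOnPool(doLoad func() int32) error {
	var status int32
	GetLoadPool().Submit(func() (any, error) {
		// Runs on a pool worker; the pool's runtime.LockOSThread pre-handler
		// keeps the cgo call on a stable OS thread.
		status = doLoad()
		return nil, nil
	}).Await() // block the caller until the worker has finished
	if status != 0 {
		return fmt.Errorf("load field data failed with status %d", status)
	}
	return nil
}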


@@ -83,19 +83,16 @@ type Loader interface {
 type LoadResource struct {
 	MemorySize uint64
 	DiskSize   uint64
-	WorkNum    int
 }
 func (r *LoadResource) Add(resource LoadResource) {
 	r.MemorySize += resource.MemorySize
 	r.DiskSize += resource.DiskSize
-	r.WorkNum += resource.WorkNum
 }
 func (r *LoadResource) Sub(resource LoadResource) {
 	r.MemorySize -= resource.MemorySize
 	r.DiskSize -= resource.DiskSize
-	r.WorkNum -= resource.WorkNum
 }
 func NewLoader(
@@ -365,29 +362,13 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
 	}
 	diskCap := paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsUint64()
-	poolCap := hardware.GetCPUNum() * paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsInt()
-	if poolCap > 256 {
-		poolCap = 256
-	}
-	if loader.committedResource.WorkNum >= poolCap {
-		return resource, 0, merr.WrapErrServiceRequestLimitExceeded(int32(poolCap))
-	} else if loader.committedResource.MemorySize+memoryUsage >= totalMemory {
+	if loader.committedResource.MemorySize+memoryUsage >= totalMemory {
 		return resource, 0, merr.WrapErrServiceMemoryLimitExceeded(float32(loader.committedResource.MemorySize+memoryUsage), float32(totalMemory))
 	} else if loader.committedResource.DiskSize+uint64(diskUsage) >= diskCap {
 		return resource, 0, merr.WrapErrServiceDiskLimitExceeded(float32(loader.committedResource.DiskSize+uint64(diskUsage)), float32(diskCap))
 	}
 	concurrencyLevel := funcutil.Min(hardware.GetCPUNum(), len(infos))
-	for _, info := range infos {
-		for _, field := range info.GetBinlogPaths() {
-			resource.WorkNum += len(field.GetBinlogs())
-		}
-		for _, index := range info.GetIndexInfos() {
-			resource.WorkNum += len(index.IndexFilePaths)
-		}
-	}
 	for ; concurrencyLevel > 1; concurrencyLevel /= 2 {
 		_, _, err := loader.checkSegmentSize(ctx, infos, concurrencyLevel)
 		if err == nil {
@@ -409,8 +390,6 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
 	}
 	loader.committedResource.Add(resource)
 	log.Info("request resource for loading segments (unit in MiB)",
-		zap.Int("workerNum", resource.WorkNum),
-		zap.Int("committedWorkerNum", loader.committedResource.WorkNum),
 		zap.Float64("memory", toMB(resource.MemorySize)),
 		zap.Float64("committedMemory", toMB(loader.committedResource.MemorySize)),
 		zap.Float64("disk", toMB(resource.DiskSize)),