Use different cgo pool for SQ and other operations (#26021)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/26046/head
congqixia 2023-08-01 09:19:05 +08:00 committed by GitHub
parent 35f3e263cd
commit 8b5e276361
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 61 additions and 22 deletions

View File

@ -27,13 +27,19 @@ import (
)
var (
p atomic.Pointer[conc.Pool[any]]
initOnce sync.Once
// Use separate pool for search/query
// and other operations (insert/delete/statistics/etc.)
// since in concurrent situation, there operation may block each other in high payload
sqp atomic.Pointer[conc.Pool[any]]
sqOnce sync.Once
dp atomic.Pointer[conc.Pool[any]]
dynOnce sync.Once
)
// InitPool initialize
func InitPool() {
initOnce.Do(func() {
// initSQPool initialize
func initSQPool() {
sqOnce.Do(func() {
pt := paramtable.Get()
pool := conc.NewPool[any](
int(math.Ceil(pt.QueryNodeCfg.MaxReadConcurrency.GetAsFloat()*pt.QueryNodeCfg.CGOPoolSizeRatio.GetAsFloat())),
@ -42,12 +48,31 @@ func InitPool() {
)
conc.WarmupPool(pool, runtime.LockOSThread)
p.Store(pool)
sqp.Store(pool)
})
}
// GetPool returns the singleton pool instance.
func GetPool() *conc.Pool[any] {
InitPool()
return p.Load()
func initDynamicPool() {
dynOnce.Do(func() {
pool := conc.NewPool[any](
runtime.GOMAXPROCS(0),
conc.WithPreAlloc(false),
conc.WithDisablePurge(false),
conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal
)
dp.Store(pool)
})
}
// GetSQPool returns the singleton pool instance for search/query operations.
func GetSQPool() *conc.Pool[any] {
initSQPool()
return sqp.Load()
}
// GetDynamicPool returns the singleton pool for dynamic cgo operations.
func GetDynamicPool() *conc.Pool[any] {
initDynamicPool()
return dp.Load()
}

View File

@ -237,7 +237,7 @@ func (s *LocalSegment) InsertCount() int64 {
return 0
}
var rowCount C.int64_t
GetPool().Submit(func() (any, error) {
GetDynamicPool().Submit(func() (any, error) {
rowCount = C.GetRowCount(s.ptr)
return nil, nil
}).Await()
@ -253,7 +253,7 @@ func (s *LocalSegment) RowNum() int64 {
return 0
}
var rowCount C.int64_t
GetPool().Submit(func() (any, error) {
GetDynamicPool().Submit(func() (any, error) {
rowCount = C.GetRealCount(s.ptr)
return nil, nil
}).Await()
@ -269,7 +269,7 @@ func (s *LocalSegment) MemSize() int64 {
return 0
}
var memoryUsageInBytes C.int64_t
GetPool().Submit(func() (any, error) {
GetDynamicPool().Submit(func() (any, error) {
memoryUsageInBytes = C.GetMemoryUsageInBytes(s.ptr)
return nil, nil
}).Await()
@ -384,7 +384,7 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *SearchRequest) (*S
var searchResult SearchResult
var status C.CStatus
GetPool().Submit(func() (any, error) {
GetSQPool().Submit(func() (any, error) {
tr := timerecord.NewTimeRecorder("cgoSearch")
status = C.Search(s.ptr,
searchReq.plan.cSearchPlan,
@ -431,7 +431,7 @@ func (s *LocalSegment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segco
var retrieveResult RetrieveResult
var status C.CStatus
GetPool().Submit(func() (any, error) {
GetSQPool().Submit(func() (any, error) {
ts := C.uint64_t(plan.Timestamp)
tr := timerecord.NewTimeRecorder("cgoRetrieve")
status = C.Retrieve(s.ptr,
@ -515,7 +515,7 @@ func (s *LocalSegment) preInsert(numOfRecords int) (int64, error) {
cOffset := (*C.int64_t)(&offset)
var status C.CStatus
GetPool().Submit(func() (any, error) {
GetDynamicPool().Submit(func() (any, error) {
status = C.PreInsert(s.ptr, C.int64_t(int64(numOfRecords)), cOffset)
return nil, nil
}).Await()
@ -555,7 +555,7 @@ func (s *LocalSegment) Insert(rowIDs []int64, timestamps []typeutil.Timestamp, r
var status C.CStatus
GetPool().Submit(func() (any, error) {
GetDynamicPool().Submit(func() (any, error) {
status = C.Insert(s.ptr,
cOffset,
cNumOfRows,
@ -632,7 +632,7 @@ func (s *LocalSegment) Delete(primaryKeys []storage.PrimaryKey, timestamps []typ
return fmt.Errorf("failed to marshal ids: %s", err)
}
var status C.CStatus
GetPool().Submit(func() (any, error) {
GetDynamicPool().Submit(func() (any, error) {
status = C.Delete(s.ptr,
cOffset,
cSize,
@ -691,7 +691,7 @@ func (s *LocalSegment) LoadMultiFieldData(rowCount int64, fields []*datapb.Field
}
var status C.CStatus
GetPool().Submit(func() (any, error) {
GetDynamicPool().Submit(func() (any, error) {
status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
return nil, nil
}).Await()
@ -740,7 +740,7 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap
loadFieldDataInfo.appendMMapDirPath(paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue())
var status C.CStatus
GetPool().Submit(func() (any, error) {
GetDynamicPool().Submit(func() (any, error) {
status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
return nil, nil
}).Await()
@ -816,7 +816,7 @@ func (s *LocalSegment) LoadDeltaData(deltaData *storage.DeleteData) error {
LoadDeletedRecord(CSegmentInterface c_segment, CLoadDeletedRecordInfo deleted_record_info)
*/
var status C.CStatus
GetPool().Submit(func() (any, error) {
GetDynamicPool().Submit(func() (any, error) {
status = C.LoadDeletedRecord(s.ptr, loadInfo)
return nil, nil
}).Await()
@ -870,7 +870,7 @@ func (s *LocalSegment) LoadIndexInfo(indexInfo *querypb.FieldIndexInfo, info *Lo
}
var status C.CStatus
GetPool().Submit(func() (any, error) {
GetDynamicPool().Submit(func() (any, error) {
status = C.UpdateSealedSegmentIndex(s.ptr, info.cLoadIndexInfo)
return nil, nil
}).Await()

View File

@ -37,6 +37,9 @@ type poolOption struct {
concealPanic bool
// panicHandler when task panics
panicHandler func(any)
// preHandler function executed before actual method executed
preHandler func()
}
func (opt *poolOption) antsOptions() []ants.Option {
@ -104,3 +107,9 @@ func WithConcealPanic(v bool) PoolOption {
opt.concealPanic = v
}
}
func WithPreHandler(fn func()) PoolOption {
return func(opt *poolOption) {
opt.preHandler = fn
}
}

View File

@ -47,6 +47,7 @@ func NewPool[T any](cap int, opts ...PoolOption) *Pool[T] {
return &Pool[T]{
inner: pool,
opt: opt,
}
}
@ -70,6 +71,10 @@ func (pool *Pool[T]) Submit(method func() (T, error)) *Future[T] {
panic(x) // throw panic out
}
}()
// execute pre handler
if pool.opt.preHandler != nil {
pool.opt.preHandler()
}
res, err := method()
if err != nil {
future.err = err