enhance: Mark cgo thread with tag name (#38000)

Related to #37999

This PR add `SetThreadName` API for marking cgo thread and utilize it
when initializing cgo worker.

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/37872/head
congqixia 2024-11-26 11:22:35 +08:00 committed by GitHub
parent 6b23e668b0
commit cb6542339e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 49 additions and 7 deletions

View File

@ -9,6 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "pthread.h"
#include "config/ConfigKnowhere.h"
#include "fmt/core.h"
#include "log/Log.h"
@ -105,4 +106,13 @@ GetMinimalIndexVersion() {
return milvus::config::GetMinimalIndexVersion();
}
extern "C" void
SetThreadName(const char* name) {
#ifdef __linux__
pthread_setname_np(pthread_self(), name);
#elif __APPLE__
pthread_setname_np(name);
#endif
}
} // namespace milvus::segcore

View File

@ -56,6 +56,9 @@ GetCurrentIndexVersion();
int32_t
GetMinimalIndexVersion();
void
SetThreadName(const char*);
#ifdef __cplusplus
}
#endif

View File

@ -16,6 +16,16 @@
package segments
/*
#cgo pkg-config: milvus_core
#include <stdlib.h>
#include <stdint.h>
#include "common/init_c.h"
#include "segcore/segcore_init_c.h"
*/
import "C"
import (
"context"
"math"
@ -51,6 +61,12 @@ var (
bfPool atomic.Pointer[conc.Pool[any]]
bfApplyOnce sync.Once
// intentionally leaked CGO tag names
cgoTagSQ = C.CString("CGO_SQ")
cgoTagLoad = C.CString("CGO_LOAD")
cgoTagDynamic = C.CString("CGO_DYN")
cgoTagWarmup = C.CString("CGO_WARMUP")
)
// initSQPool initialize
@ -63,7 +79,10 @@ func initSQPool() {
conc.WithPreAlloc(false), // pre alloc must be false to resize pool dynamically, use warmup to alloc worker here
conc.WithDisablePurge(true),
)
conc.WarmupPool(pool, runtime.LockOSThread)
conc.WarmupPool(pool, func() {
runtime.LockOSThread()
C.SetThreadName(cgoTagSQ)
})
sqp.Store(pool)
pt.Watch(pt.QueryNodeCfg.MaxReadConcurrency.Key, config.NewHandler("qn.sqpool.maxconc", ResizeSQPool))
@ -74,15 +93,19 @@ func initSQPool() {
func initDynamicPool() {
dynOnce.Do(func() {
size := hardware.GetCPUNum()
pool := conc.NewPool[any](
hardware.GetCPUNum(),
size,
conc.WithPreAlloc(false),
conc.WithDisablePurge(false),
conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal
conc.WithPreHandler(func() {
runtime.LockOSThread()
C.SetThreadName(cgoTagDynamic)
}), // lock os thread for cgo thread disposal
)
dp.Store(pool)
log.Info("init dynamicPool done", zap.Int("size", hardware.GetCPUNum()))
log.Info("init dynamicPool done", zap.Int("size", size))
})
}
@ -94,7 +117,10 @@ func initLoadPool() {
poolSize,
conc.WithPreAlloc(false),
conc.WithDisablePurge(false),
conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal
conc.WithPreHandler(func() {
runtime.LockOSThread()
C.SetThreadName(cgoTagLoad)
}), // lock os thread for cgo thread disposal
)
loadPool.Store(pool)
@ -112,7 +138,10 @@ func initWarmupPool() {
poolSize,
conc.WithPreAlloc(false),
conc.WithDisablePurge(false),
conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal
conc.WithPreHandler(func() {
runtime.LockOSThread()
C.SetThreadName(cgoTagWarmup)
}), // lock os thread for cgo thread disposal
conc.WithNonBlocking(true), // make warming up non blocking
)