mirror of https://github.com/milvus-io/milvus.git
fix: use seperate warmup pool and disable warmup by default (#33348)
1. use a small warmup pool to reduce the impact of warmup 2. change the warmup pool to nonblocking mode 3. disable warmup by default 4. remove the maximum size limit of 16 for the load pool issue: https://github.com/milvus-io/milvus/issues/32772 --------- Signed-off-by: bigsheeper <yihao.dai@zilliz.com> Co-authored-by: xiaofanluan <xiaofan.luan@zilliz.com>pull/33130/head
parent
1b67cecd65
commit
760223f80a
|
@ -335,7 +335,7 @@ queryNode:
|
|||
# chunk cache during the load process. This approach has the potential to substantially reduce query/search latency
|
||||
# for a specific duration post-load, albeit accompanied by a concurrent increase in disk usage;
|
||||
# 2. If set to "off," original vector data will only be loaded into the chunk cache during search/query.
|
||||
warmup: async
|
||||
warmup: off
|
||||
mmap:
|
||||
mmapEnabled: false # Enable mmap for loading data
|
||||
lazyload:
|
||||
|
|
|
@ -37,12 +37,14 @@ var (
|
|||
// and other operations (insert/delete/statistics/etc.)
|
||||
// since in concurrent situation, there operation may block each other in high payload
|
||||
|
||||
sqp atomic.Pointer[conc.Pool[any]]
|
||||
sqOnce sync.Once
|
||||
dp atomic.Pointer[conc.Pool[any]]
|
||||
dynOnce sync.Once
|
||||
loadPool atomic.Pointer[conc.Pool[any]]
|
||||
loadOnce sync.Once
|
||||
sqp atomic.Pointer[conc.Pool[any]]
|
||||
sqOnce sync.Once
|
||||
dp atomic.Pointer[conc.Pool[any]]
|
||||
dynOnce sync.Once
|
||||
loadPool atomic.Pointer[conc.Pool[any]]
|
||||
loadOnce sync.Once
|
||||
warmupPool atomic.Pointer[conc.Pool[any]]
|
||||
warmupOnce sync.Once
|
||||
)
|
||||
|
||||
// initSQPool initialize
|
||||
|
@ -80,9 +82,6 @@ func initLoadPool() {
|
|||
loadOnce.Do(func() {
|
||||
pt := paramtable.Get()
|
||||
poolSize := hardware.GetCPUNum() * pt.CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt()
|
||||
if poolSize > 16 {
|
||||
poolSize = 16
|
||||
}
|
||||
pool := conc.NewPool[any](
|
||||
poolSize,
|
||||
conc.WithPreAlloc(false),
|
||||
|
@ -96,6 +95,23 @@ func initLoadPool() {
|
|||
})
|
||||
}
|
||||
|
||||
func initWarmupPool() {
|
||||
warmupOnce.Do(func() {
|
||||
pt := paramtable.Get()
|
||||
poolSize := hardware.GetCPUNum() * pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt()
|
||||
pool := conc.NewPool[any](
|
||||
poolSize,
|
||||
conc.WithPreAlloc(false),
|
||||
conc.WithDisablePurge(false),
|
||||
conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal
|
||||
conc.WithNonBlocking(true), // make warming up non blocking
|
||||
)
|
||||
|
||||
warmupPool.Store(pool)
|
||||
pt.Watch(pt.CommonCfg.LowPriorityThreadCoreCoefficient.Key, config.NewHandler("qn.warmpool.lowpriority", ResizeWarmupPool))
|
||||
})
|
||||
}
|
||||
|
||||
// GetSQPool returns the singleton pool instance for search/query operations.
|
||||
func GetSQPool() *conc.Pool[any] {
|
||||
initSQPool()
|
||||
|
@ -113,6 +129,11 @@ func GetLoadPool() *conc.Pool[any] {
|
|||
return loadPool.Load()
|
||||
}
|
||||
|
||||
func GetWarmupPool() *conc.Pool[any] {
|
||||
initWarmupPool()
|
||||
return warmupPool.Load()
|
||||
}
|
||||
|
||||
func ResizeSQPool(evt *config.Event) {
|
||||
if evt.HasUpdated {
|
||||
pt := paramtable.Get()
|
||||
|
@ -131,6 +152,14 @@ func ResizeLoadPool(evt *config.Event) {
|
|||
}
|
||||
}
|
||||
|
||||
func ResizeWarmupPool(evt *config.Event) {
|
||||
if evt.HasUpdated {
|
||||
pt := paramtable.Get()
|
||||
newSize := hardware.GetCPUNum() * pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt()
|
||||
resizePool(GetWarmupPool(), newSize, "WarmupPool")
|
||||
}
|
||||
}
|
||||
|
||||
func resizePool(pool *conc.Pool[any], newSize int, tag string) {
|
||||
log := log.Ctx(context.Background()).
|
||||
With(
|
||||
|
|
|
@ -82,6 +82,27 @@ func TestResizePools(t *testing.T) {
|
|||
assert.Equal(t, expectedCap, GetLoadPool().Cap())
|
||||
})
|
||||
|
||||
t.Run("WarmupPool", func(t *testing.T) {
|
||||
expectedCap := hardware.GetCPUNum() * pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt()
|
||||
|
||||
ResizeWarmupPool(&config.Event{
|
||||
HasUpdated: true,
|
||||
})
|
||||
assert.Equal(t, expectedCap, GetWarmupPool().Cap())
|
||||
|
||||
pt.Save(pt.CommonCfg.LowPriorityThreadCoreCoefficient.Key, strconv.FormatFloat(pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat()*2, 'f', 10, 64))
|
||||
ResizeWarmupPool(&config.Event{
|
||||
HasUpdated: true,
|
||||
})
|
||||
assert.Equal(t, expectedCap, GetWarmupPool().Cap())
|
||||
|
||||
pt.Save(pt.CommonCfg.LowPriorityThreadCoreCoefficient.Key, "0")
|
||||
ResizeWarmupPool(&config.Event{
|
||||
HasUpdated: true,
|
||||
})
|
||||
assert.Equal(t, expectedCap, GetWarmupPool().Cap())
|
||||
})
|
||||
|
||||
t.Run("error_pool", func(*testing.T) {
|
||||
pool := conc.NewDefaultPool[any]()
|
||||
c := pool.Cap()
|
||||
|
|
|
@ -1386,7 +1386,7 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64) {
|
|||
warmingUp := strings.ToLower(paramtable.Get().QueryNodeCfg.ChunkCacheWarmingUp.GetValue())
|
||||
switch warmingUp {
|
||||
case "sync":
|
||||
GetLoadPool().Submit(func() (any, error) {
|
||||
GetWarmupPool().Submit(func() (any, error) {
|
||||
cFieldID := C.int64_t(fieldID)
|
||||
status = C.WarmupChunkCache(s.ptr, cFieldID)
|
||||
if err := HandleCStatus(ctx, &status, "warming up chunk cache failed"); err != nil {
|
||||
|
@ -1397,7 +1397,7 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64) {
|
|||
return nil, nil
|
||||
}).Await()
|
||||
case "async":
|
||||
GetLoadPool().Submit(func() (any, error) {
|
||||
GetWarmupPool().Submit(func() (any, error) {
|
||||
if !s.ptrLock.RLockIf(state.IsNotReleased) {
|
||||
return nil, nil
|
||||
}
|
||||
|
|
|
@ -2357,7 +2357,7 @@ func (p *queryNodeConfig) init(base *BaseTable) {
|
|||
p.ChunkCacheWarmingUp = ParamItem{
|
||||
Key: "queryNode.cache.warmup",
|
||||
Version: "2.3.6",
|
||||
DefaultValue: "async",
|
||||
DefaultValue: "off",
|
||||
Doc: `options: async, sync, off.
|
||||
Specifies the necessity for warming up the chunk cache.
|
||||
1. If set to "sync" or "async," the original vector data will be synchronously/asynchronously loaded into the
|
||||
|
|
|
@ -339,7 +339,7 @@ func TestComponentParam(t *testing.T) {
|
|||
|
||||
// chunk cache
|
||||
assert.Equal(t, "willneed", Params.ReadAheadPolicy.GetValue())
|
||||
assert.Equal(t, "async", Params.ChunkCacheWarmingUp.GetValue())
|
||||
assert.Equal(t, "false", Params.ChunkCacheWarmingUp.GetValue())
|
||||
|
||||
// test small indexNlist/NProbe default
|
||||
params.Remove("queryNode.segcore.smallIndex.nlist")
|
||||
|
|
Loading…
Reference in New Issue