mirror of https://github.com/milvus-io/milvus.git
parent bbcaf7a703
commit d10a82dba4

@@ -115,6 +115,8 @@ linters-settings:
       - 'Reason:\s+\w+\.Error\(\)'
       - 'errors.New\((.+)\.GetReason\(\)\)'
       - 'commonpb\.Status\{[\s\n]*ErrorCode:[\s\n]*.+[\s\S\n]*?\}'
+      - "runtime.NumCPU"
+      - "runtime.GOMAXPROCS(0)"
       #- 'fmt\.Print.*' WIP

 issues:

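The two new forbid patterns push every caller toward the hardware wrapper introduced below. A minimal sketch of the substitution the linter now enforces, assuming the milvus pkg module is importable (this program is illustrative, not part of the commit):

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/hardware"
)

func main() {
	// runtime.NumCPU() and runtime.GOMAXPROCS(0) are now rejected by the
	// linter outside the hardware package; call the wrapper instead.
	fmt.Println("usable cores:", hardware.GetCPUNum())
}
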
@@ -458,7 +458,7 @@ func setResult(result *datapb.CompactionResult) compactionTaskOpt {
 func calculateParallel() int {
 	// TODO after node memory management enabled, use this config as hard limit
 	return Params.DataCoordCfg.CompactionWorkerParalleTasks.GetAsInt()
-	//cores := runtime.NumCPU()
+	//cores := hardware.GetCPUNum()
 	//if cores < 16 {
 	//return 4
 	//}

@@ -1,10 +1,10 @@
 package datanode

 import (
-	"runtime"
 	"sync"

 	"github.com/milvus-io/milvus/pkg/util/conc"
+	"github.com/milvus-io/milvus/pkg/util/hardware"
 )

 var (

@@ -34,7 +34,7 @@ func getOrCreateIOPool() *conc.Pool[any] {
 func initStatsPool() {
 	poolSize := Params.DataNodeCfg.ChannelWorkPoolSize.GetAsInt()
 	if poolSize <= 0 {
-		poolSize = runtime.GOMAXPROCS(0)
+		poolSize = hardware.GetCPUNum()
 	}
 	statsPool = conc.NewPool[any](poolSize, conc.WithPreAlloc(false), conc.WithNonBlocking(false))
 }

@@ -46,8 +46,8 @@ func getOrCreateStatsPool() *conc.Pool[any] {

 func initMultiReadPool() {
 	capacity := Params.DataNodeCfg.FileReadConcurrency.GetAsInt()
-	if capacity > runtime.GOMAXPROCS(0) {
-		capacity = runtime.GOMAXPROCS(0)
+	if capacity > hardware.GetCPUNum() {
+		capacity = hardware.GetCPUNum()
 	}
 	// error only happens with negative expiry duration or with negative pre-alloc size.
 	ioPool = conc.NewPool[any](capacity)

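The two datanode hunks share one sizing rule: a configured value is honored, but a non-positive config falls back to the usable core count and anything above it is capped. A standalone stdlib sketch of that rule (usableCPUs stands in for hardware.GetCPUNum):

package main

import (
	"fmt"
	"runtime"
)

// usableCPUs mirrors the role hardware.GetCPUNum plays in the diff: the
// current GOMAXPROCS setting, which automaxprocs aligns with any container
// CPU quota.
func usableCPUs() int {
	return runtime.GOMAXPROCS(0)
}

// clampConcurrency applies both datanode rules: non-positive config falls
// back to the core count, and config above the core count is capped.
func clampConcurrency(configured int) int {
	n := usableCPUs()
	if configured <= 0 || configured > n {
		return n
	}
	return configured
}

func main() {
	fmt.Println(clampConcurrency(0))   // core count (fallback)
	fmt.Println(clampConcurrency(2))   // 2
	fmt.Println(clampConcurrency(999)) // core count (cap)
}
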
@@ -20,7 +20,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"runtime"
 	"runtime/debug"
 	"strconv"
 	"strings"

@@ -38,6 +37,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/util/funcutil"
+	"github.com/milvus-io/milvus/pkg/util/hardware"
 	"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
 	"github.com/milvus-io/milvus/pkg/util/indexparams"
 	"github.com/milvus-io/milvus/pkg/util/merr"

@@ -221,10 +221,10 @@ func (it *indexBuildTask) LoadData(ctx context.Context) error {
 		blobs[idx] = blob
 		return nil
 	}
-	// Use runtime.GOMAXPROCS(0) instead of runtime.NumCPU()
+	// Use hardware.GetCPUNum() instead of runtime.NumCPU()
 	// to respect CPU quota of container/pod
 	// gomaxproc will be set by `automaxproc`, passing 0 will just retrieve the value
-	err := funcutil.ProcessFuncParallel(len(toLoadDataPaths), runtime.GOMAXPROCS(0), loadKey, "loadKey")
+	err := funcutil.ProcessFuncParallel(len(toLoadDataPaths), hardware.GetCPUNum(), loadKey, "loadKey")
 	if err != nil {
 		log.Ctx(ctx).Warn("loadKey failed", zap.Error(err))
 		return err

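funcutil.ProcessFuncParallel appears in several hunks. Judging from its call sites here and in the tests below, it runs an indexed function across a bounded number of workers and returns an error if any call fails. A hedged stdlib re-creation of that contract (the real helper lives in pkg/util/funcutil; this only imitates the observed behavior):

package main

import (
	"fmt"
	"sync"
)

// processFuncParallel is an illustrative stand-in for
// funcutil.ProcessFuncParallel(total, maxParallel, f, name): run
// f(0..total-1) with at most maxParallel goroutines, reporting one of the
// errors if any call fails.
func processFuncParallel(total, maxParallel int, f func(idx int) error, name string) error {
	sem := make(chan struct{}, maxParallel) // bounds in-flight goroutines
	var wg sync.WaitGroup
	var mu sync.Mutex
	var firstErr error
	for i := 0; i < total; i++ {
		wg.Add(1)
		sem <- struct{}{}
		go func(idx int) {
			defer wg.Done()
			defer func() { <-sem }()
			if err := f(idx); err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = fmt.Errorf("%s: %w", name, err)
				}
				mu.Unlock()
			}
		}(i)
	}
	wg.Wait()
	return firstErr
}

func main() {
	err := processFuncParallel(8, 4, func(idx int) error {
		fmt.Println("processing", idx)
		return nil
	}, "demo")
	fmt.Println("err:", err)
}
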
@@ -14,7 +14,6 @@ package server
 import (
 	"fmt"
 	"path"
-	"runtime"
 	"strconv"
 	"strings"
 	"sync"

@@ -154,7 +153,7 @@ func parseCompressionType(params *paramtable.ComponentParam) ([]gorocksdb.Compre
 func NewRocksMQ(name string, idAllocator allocator.Interface) (*rocksmq, error) {
 	params := paramtable.Get()
 	// TODO we should use same rocksdb instance with different cfs
-	maxProcs := runtime.GOMAXPROCS(0)
+	maxProcs := hardware.GetCPUNum()
 	parallelism := 1
 	if maxProcs > 32 {
 		parallelism = 4

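The rocksmq change only swaps the source of the core count; the heuristic itself (background parallelism of 1, bumped to 4 on machines with more than 32 usable cores) is unchanged. Isolated as a pure function under that assumption:

package main

import (
	"fmt"
	"runtime"
)

// rocksdbParallelism mirrors the heuristic in NewRocksMQ: default to 1
// background worker, bump to 4 only on large (>32 usable cores) machines.
func rocksdbParallelism(maxProcs int) int {
	parallelism := 1
	if maxProcs > 32 {
		parallelism = 4
	}
	return parallelism
}

func main() {
	fmt.Println(rocksdbParallelism(runtime.GOMAXPROCS(0)))
	fmt.Println(rocksdbParallelism(64)) // -> 4
}
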
@@ -19,7 +19,6 @@ package task
 import (
 	"context"
 	"fmt"
-	"runtime"
 	"sync"
 	"time"


@@ -36,6 +35,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/util/funcutil"
+	"github.com/milvus-io/milvus/pkg/util/hardware"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	. "github.com/milvus-io/milvus/pkg/util/typeutil"
 )

@@ -552,7 +552,7 @@ func (scheduler *taskScheduler) schedule(node int64) {
 	// The scheduler doesn't limit the number of tasks,
 	// to commit tasks to executors as soon as possible, to reach higher merge possibility
 	failCount := atomic.NewInt32(0)
-	funcutil.ProcessFuncParallel(len(toProcess), runtime.GOMAXPROCS(0), func(idx int) error {
+	funcutil.ProcessFuncParallel(len(toProcess), hardware.GetCPUNum(), func(idx int) error {
 		if !scheduler.process(toProcess[idx]) {
 			failCount.Inc()
 		}

@@ -24,6 +24,7 @@ import (
 	"go.uber.org/atomic"

 	"github.com/milvus-io/milvus/pkg/util/conc"
+	"github.com/milvus-io/milvus/pkg/util/hardware"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 )


@@ -56,7 +57,7 @@ func initSQPool() {
 func initDynamicPool() {
 	dynOnce.Do(func() {
 		pool := conc.NewPool[any](
-			runtime.GOMAXPROCS(0),
+			hardware.GetCPUNum(),
 			conc.WithPreAlloc(false),
 			conc.WithDisablePurge(false),
 			conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal

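The dynamic pool is sized by core count and pins each worker to an OS thread for cgo. A sketch of the same call shape, assuming the milvus pkg module is importable; conc.NewPool and the three options are used exactly as in the hunk above, while everything around them is illustrative:

package main

import (
	"fmt"
	"runtime"

	"github.com/milvus-io/milvus/pkg/util/conc"
	"github.com/milvus-io/milvus/pkg/util/hardware"
)

func main() {
	// Same option shape as initDynamicPool: CPU-count capacity, no
	// pre-allocation, purging enabled, and each worker pinned to an OS
	// thread so cgo calls keep a stable thread underneath them.
	pool := conc.NewPool[any](
		hardware.GetCPUNum(),
		conc.WithPreAlloc(false),
		conc.WithDisablePurge(false),
		conc.WithPreHandler(runtime.LockOSThread),
	)
	_ = pool
	fmt.Println("pool sized to", hardware.GetCPUNum(), "workers")
}
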
@@ -31,7 +31,6 @@ import (
 	"context"
 	"fmt"
 	"path"
-	"runtime"
 	"runtime/debug"
 	"strconv"
 	"sync"

@@ -103,7 +102,7 @@ func NewLoader(
 	manager *Manager,
 	cm storage.ChunkManager,
 ) *segmentLoader {
-	cpuNum := runtime.GOMAXPROCS(0)
+	cpuNum := hardware.GetCPUNum()
 	ioPoolSize := cpuNum * 8
 	// make sure small machines could load faster
 	if ioPoolSize < 32 {

@@ -366,7 +365,7 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
 	}
 	diskCap := paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsUint64()

-	poolCap := runtime.NumCPU() * paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsInt()
+	poolCap := hardware.GetCPUNum() * paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsInt()
 	if poolCap > 256 {
 		poolCap = 256
 	}

@@ -378,7 +377,7 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
 		return resource, 0, merr.WrapErrServiceDiskLimitExceeded(float32(loader.committedResource.DiskSize+uint64(diskUsage)), float32(diskCap))
 	}

-	concurrencyLevel := funcutil.Min(runtime.GOMAXPROCS(0), len(infos))
+	concurrencyLevel := funcutil.Min(hardware.GetCPUNum(), len(infos))

 	for _, info := range infos {
 		for _, field := range info.GetBinlogPaths() {

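The segment loader sizes its high-priority pool as usable cores times a configured coefficient, hard-capped at 256, and never fans out wider than the number of segments to load. A minimal sketch of that arithmetic:

package main

import "fmt"

// highPriorityPoolCap mirrors the loader's sizing rule after the change:
// usable cores times the configured coefficient, hard-capped at 256.
func highPriorityPoolCap(cpuNum, coefficient int) int {
	poolCap := cpuNum * coefficient
	if poolCap > 256 {
		poolCap = 256
	}
	return poolCap
}

// concurrencyLevel mirrors funcutil.Min(hardware.GetCPUNum(), len(infos)):
// never spawn more workers than there are segments.
func concurrencyLevel(cpuNum, segments int) int {
	if segments < cpuNum {
		return segments
	}
	return cpuNum
}

func main() {
	fmt.Println(highPriorityPoolCap(8, 10))  // 80
	fmt.Println(highPriorityPoolCap(64, 10)) // capped at 256
	fmt.Println(concurrencyLevel(8, 3))      // 3
}
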
@@ -3,8 +3,9 @@ package cgoconverter
 import (
 	"bytes"
 	"fmt"
-	"runtime"
 	"testing"
+
+	"github.com/milvus-io/milvus/pkg/util/hardware"
 )

 func TestBytesConverter(t *testing.T) {

@@ -29,7 +30,7 @@ func TestBytesConverter(t *testing.T) {
 }

 func TestConcurrentBytesConverter(t *testing.T) {
-	concurrency := runtime.GOMAXPROCS(0)
+	concurrency := hardware.GetCPUNum()
 	if concurrency <= 1 {
 		concurrency = 4
 	}

@@ -4,13 +4,14 @@ import (
 	"bytes"
 	"fmt"
 	"io"
-	"runtime"
 	"strings"
 	"sync"
 	"testing"

 	"github.com/klauspost/compress/zstd"
 	"github.com/stretchr/testify/assert"
+
+	"github.com/milvus-io/milvus/pkg/util/hardware"
 )

 func TestZstdCompress(t *testing.T) {

@@ -124,7 +125,7 @@ func TestGlobalMethods(t *testing.T) {
 func TestCurrencyGlobalMethods(t *testing.T) {
 	prefix := "Test Currency Global Methods"

-	currency := runtime.GOMAXPROCS(0) * 2
+	currency := hardware.GetCPUNum() * 2
 	if currency < 6 {
 		currency = 6
 	}

@@ -18,12 +18,12 @@ package conc

 import (
 	"fmt"
-	"runtime"
 	"sync"

 	ants "github.com/panjf2000/ants/v2"

 	"github.com/milvus-io/milvus/pkg/util/generic"
+	"github.com/milvus-io/milvus/pkg/util/hardware"
 )

 // A goroutine pool

@@ -55,7 +55,7 @@ func NewPool[T any](cap int, opts ...PoolOption) *Pool[T] {
 // NewDefaultPool returns a pool with cap of the number of logical CPU,
 // and pre-alloced goroutines.
 func NewDefaultPool[T any]() *Pool[T] {
-	return NewPool[T](runtime.GOMAXPROCS(0), WithPreAlloc(true))
+	return NewPool[T](hardware.GetCPUNum(), WithPreAlloc(true))
 }

 // Submit a task into the pool,

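With this hunk, every default conc pool in the codebase inherits the quota-aware size. A usage sketch, assuming the milvus pkg module is importable; only NewDefaultPool, NewPool, and hardware.GetCPUNum are taken from the diff:

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/conc"
	"github.com/milvus-io/milvus/pkg/util/hardware"
)

func main() {
	// After the change these two pools have the same capacity; the default
	// pool additionally pre-allocates its goroutines.
	defaultPool := conc.NewDefaultPool[int]()
	customPool := conc.NewPool[int](hardware.GetCPUNum())
	_, _ = defaultPool, customPool
	fmt.Println("both pools sized to", hardware.GetCPUNum())
}
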
@@ -18,13 +18,14 @@ package funcutil

 import (
 	"math/rand"
-	"runtime"
 	"strconv"
 	"strings"
 	"testing"

 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/assert"
+
+	"github.com/milvus-io/milvus/pkg/util/hardware"
 )

 func dummyFunc() {

@@ -65,7 +66,7 @@ func Test_ProcessFuncParallel(t *testing.T) {
 	assert.Equal(t, err, nil, "process function parallel must be right")
 	assert.Equal(t, s, expectedS, "process function parallel must be right")

-	err = ProcessFuncParallel(total, runtime.NumCPU(), naiveF, "naiveF") // Parallel by CPU
+	err = ProcessFuncParallel(total, hardware.GetCPUNum(), naiveF, "naiveF") // Parallel by CPU
 	assert.Equal(t, err, nil, "process function parallel must be right")
 	assert.Equal(t, s, expectedS, "process function parallel must be right")


@@ -82,7 +83,7 @@ func Test_ProcessFuncParallel(t *testing.T) {
 	err = ProcessFuncParallel(total, total, oddErrorF, "oddErrorF") // Totally Parallel
 	assert.NotEqual(t, err, nil, "process function parallel must be right")

-	err = ProcessFuncParallel(total, runtime.NumCPU(), oddErrorF, "oddErrorF") // Parallel by CPU
+	err = ProcessFuncParallel(total, hardware.GetCPUNum(), oddErrorF, "oddErrorF") // Parallel by CPU
 	assert.NotEqual(t, err, nil, "process function parallel must be right")

 	evenErrorF := func(idx int) error {

@@ -98,7 +99,7 @@ func Test_ProcessFuncParallel(t *testing.T) {
 	err = ProcessFuncParallel(total, total, evenErrorF, "evenErrorF") // Totally Parallel
 	assert.NotEqual(t, err, nil, "process function parallel must be right")

-	err = ProcessFuncParallel(total, runtime.NumCPU(), evenErrorF, "evenErrorF") // Parallel by CPU
+	err = ProcessFuncParallel(total, hardware.GetCPUNum(), evenErrorF, "evenErrorF") // Parallel by CPU
 	assert.NotEqual(t, err, nil, "process function parallel must be right")

 	// rand.Int() may be always an even number

@@ -115,6 +116,6 @@ func Test_ProcessFuncParallel(t *testing.T) {
 	err = ProcessFuncParallel(total, total, randomErrorF, "randomErrorF") // Totally Parallel
 	assert.Error(t, err)

-	err = ProcessFuncParallel(total, runtime.NumCPU(), randomErrorF, "randomErrorF") // Parallel by CPU
+	err = ProcessFuncParallel(total, hardware.GetCPUNum(), randomErrorF, "randomErrorF") // Parallel by CPU
 	assert.Error(t, err)
 }

@@ -45,8 +45,10 @@ func InitMaxprocs(serverType string, flags *flag.FlagSet) {

 // GetCPUNum returns the count of cpu core.
 func GetCPUNum() int {
+	//nolint
 	cur := runtime.GOMAXPROCS(0)
 	if cur <= 0 {
+		//nolint
 		cur = runtime.NumCPU()
 	}
 	return cur

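This hunk only adds //nolint markers so the hardware package itself may keep calling the now-forbidden functions. The wrapper's value shows up in a container with a CPU quota: runtime.NumCPU reports host cores, while runtime.GOMAXPROCS(0) reads the current setting, which the earlier diff comment says is set by `automaxproc`. A stdlib-only demo of the same logic:

package main

import (
	"fmt"
	"runtime"
)

// getCPUNum reproduces the wrapper from the hunk above: prefer the current
// GOMAXPROCS value (quota-aware once automaxprocs has run), and fall back
// defensively to the raw host core count.
func getCPUNum() int {
	cur := runtime.GOMAXPROCS(0)
	if cur <= 0 {
		cur = runtime.NumCPU()
	}
	return cur
}

func main() {
	fmt.Println("host cores:    ", runtime.NumCPU())
	fmt.Println("usable (quota):", getCPUNum())
	// In an 8-core container on a 64-core host, the first line prints 64
	// and the second prints 8; sizing pools by NumCPU would oversubscribe
	// the quota eightfold.
}
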
@@ -14,7 +14,6 @@ package paramtable
 import (
 	"fmt"
 	"os"
-	"runtime"
 	"strconv"
 	"strings"
 	"sync"

@@ -25,6 +24,7 @@ import (

 	"github.com/milvus-io/milvus/pkg/config"
 	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/hardware"
 	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 )


@@ -1643,7 +1643,7 @@ func (p *queryNodeConfig) init(base *BaseTable) {
 			} else if factor > 32 {
 				factor = 32
 			}
-			knowhereThreadPoolSize := uint32(runtime.GOMAXPROCS(0)) * uint32(factor)
+			knowhereThreadPoolSize := uint32(hardware.GetCPUNum()) * uint32(factor)
 			return strconv.FormatUint(uint64(knowhereThreadPoolSize), 10)
 		},
 		Doc: "The number of threads in knowhere's thread pool. If disk is enabled, the pool size will multiply with knowhereThreadPoolNumRatio([1, 32]).",

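The knowhere pool size is usable cores times a ratio clamped to [1, 32]; the lower clamp is not visible in this hunk and is assumed here from the [1, 32] range in the Doc string. A sketch of the formula:

package main

import (
	"fmt"
	"runtime"
)

// knowhereThreadPoolSize mirrors the formatter above: usable cores times a
// factor clamped to [1, 32] (the lower clamp is an assumption; only the
// upper clamp appears in the hunk).
func knowhereThreadPoolSize(cpuNum int, factor uint32) uint32 {
	if factor < 1 {
		factor = 1
	} else if factor > 32 {
		factor = 32
	}
	return uint32(cpuNum) * factor
}

func main() {
	fmt.Println(knowhereThreadPoolSize(runtime.GOMAXPROCS(0), 4))
	fmt.Println(knowhereThreadPoolSize(8, 100)) // factor capped at 32 -> 256
}
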
@@ -1780,7 +1780,7 @@ func (p *queryNodeConfig) init(base *BaseTable) {
 		DefaultValue: "1.0",
 		Formatter: func(v string) string {
 			ratio := getAsFloat(v)
-			cpuNum := int64(runtime.GOMAXPROCS(0))
+			cpuNum := int64(hardware.GetCPUNum())
 			concurrency := int64(float64(cpuNum) * ratio)
 			if concurrency < 1 {
 				return "1" // MaxReadConcurrency must >= 1

@@ -1790,9 +1790,9 @@ func (p *queryNodeConfig) init(base *BaseTable) {
 			return strconv.FormatInt(concurrency, 10)
 		},
 		Doc: `maxReadConcurrentRatio is the concurrency ratio of read task (search task and query task).
-Max read concurrency would be the value of ` + "runtime.NumCPU * maxReadConcurrentRatio" + `.
-It defaults to 2.0, which means max read concurrency would be the value of runtime.NumCPU * 2.
-Max read concurrency must greater than or equal to 1, and less than or equal to runtime.NumCPU * 100.
+Max read concurrency would be the value of ` + "hardware.GetCPUNum * maxReadConcurrentRatio" + `.
+It defaults to 2.0, which means max read concurrency would be the value of hardware.GetCPUNum * 2.
+Max read concurrency must greater than or equal to 1, and less than or equal to hardware.GetCPUNum * 100.
 (0, 100]`,
 		Export: true,
 	}

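Taken together, the two paramtable hunks compute max read concurrency as cpuNum * ratio, floored at 1; the upper bound of cpuNum * 100 stated in the Doc string is enforced elsewhere. A sketch of the Formatter's arithmetic:

package main

import (
	"fmt"
	"strconv"
)

// formatMaxReadConcurrency mirrors the Formatter above: concurrency is
// cpuNum * ratio, floored at 1. The upper bound (cpuNum * 100, per the
// Doc string) lives in code outside this hunk.
func formatMaxReadConcurrency(cpuNum int64, ratio float64) string {
	concurrency := int64(float64(cpuNum) * ratio)
	if concurrency < 1 {
		return "1" // MaxReadConcurrency must >= 1
	}
	return strconv.FormatInt(concurrency, 10)
}

func main() {
	fmt.Println(formatMaxReadConcurrency(8, 1.0)) // "8"
	fmt.Println(formatMaxReadConcurrency(8, 0.1)) // floored to "1"
}
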
@@ -12,13 +12,13 @@
 package paramtable

 import (
-	"runtime"
 	"testing"
 	"time"

 	"github.com/stretchr/testify/assert"

 	"github.com/milvus-io/milvus/pkg/config"
+	"github.com/milvus-io/milvus/pkg/util/hardware"
 )

 func shouldPanic(t *testing.T, name string, f func()) {

@@ -307,7 +307,7 @@ func TestComponentParam(t *testing.T) {
 		assert.Equal(t, int32(10240), Params.MaxReceiveChanSize.GetAsInt32())
 		assert.Equal(t, int32(10240), Params.MaxUnsolvedQueueSize.GetAsInt32())
 		assert.Equal(t, 10.0, Params.CPURatio.GetAsFloat())
-		assert.Equal(t, uint32(runtime.GOMAXPROCS(0)), Params.KnowhereThreadPoolSize.GetAsUint32())
+		assert.Equal(t, uint32(hardware.GetCPUNum()), Params.KnowhereThreadPoolSize.GetAsUint32())

 		// chunk cache
 		assert.Equal(t, "willneed", Params.ReadAheadPolicy.GetValue())