mirror of https://github.com/milvus-io/milvus.git
Refine Pool to improve the ease of use (#22412)
Signed-off-by: yah01 <yang.cen@zilliz.com>pull/22436/head
parent
416866e42f
commit
167581c08a
|
@ -1,11 +1,11 @@
|
|||
package datanode
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@ func initIOPool() {
|
|||
capacity = 32
|
||||
}
|
||||
// error only happens with negative expiry duration or with negative pre-alloc size.
|
||||
ioPool, _ = concurrency.NewPool(capacity)
|
||||
ioPool = concurrency.NewPool(capacity)
|
||||
}
|
||||
|
||||
func getOrCreateIOPool() *concurrency.Pool {
|
||||
|
|
|
@ -20,7 +20,6 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"math/rand"
|
||||
"runtime"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
|
@ -35,7 +34,6 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/util/etcd"
|
||||
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
@ -118,7 +116,7 @@ func TestImpl_WatchDmChannels(t *testing.T) {
|
|||
defer func() {
|
||||
node.taskPool = originPool
|
||||
}()
|
||||
node.taskPool, _ = concurrency.NewPool(runtime.GOMAXPROCS(0), ants.WithPreAlloc(true))
|
||||
node.taskPool = concurrency.NewDefaultPool()
|
||||
node.taskPool.Release()
|
||||
status, err = node.WatchDmChannels(ctx, req)
|
||||
assert.NoError(t, err)
|
||||
|
|
|
@ -1696,11 +1696,7 @@ func genSimpleQueryNodeWithMQFactory(ctx context.Context, fac dependency.Factory
|
|||
node.etcdCli = etcdCli
|
||||
node.initSession()
|
||||
|
||||
node.taskPool, err = concurrency.NewPool(2, ants.WithPreAlloc(true))
|
||||
if err != nil {
|
||||
log.Error("QueryNode init channel pool failed", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
node.taskPool = concurrency.NewPool(2, ants.WithPreAlloc(true))
|
||||
etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath.GetValue())
|
||||
node.etcdKV = etcdKV
|
||||
|
||||
|
|
|
@ -56,7 +56,6 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"github.com/samber/lo"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.uber.org/zap"
|
||||
|
@ -272,17 +271,8 @@ func (node *QueryNode) Init() error {
|
|||
node.etcdKV = etcdkv.NewEtcdKV(node.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue())
|
||||
log.Info("queryNode try to connect etcd success", zap.Any("MetaRootPath", Params.EtcdCfg.MetaRootPath))
|
||||
|
||||
cpuNum := runtime.GOMAXPROCS(0)
|
||||
|
||||
node.taskPool, err = concurrency.NewPool(cpuNum, ants.WithPreAlloc(true))
|
||||
if err != nil {
|
||||
log.Error("QueryNode init channel pool failed", zap.Error(err))
|
||||
initError = err
|
||||
return
|
||||
}
|
||||
|
||||
node.taskPool = concurrency.NewDefaultPool()
|
||||
node.metaReplica = newCollectionReplica()
|
||||
|
||||
node.loader = newSegmentLoader(
|
||||
node.metaReplica,
|
||||
node.etcdKV,
|
||||
|
|
|
@ -985,17 +985,8 @@ func newSegmentLoader(
|
|||
if ioPoolSize > 256 {
|
||||
ioPoolSize = 256
|
||||
}
|
||||
ioPool, err := concurrency.NewPool(ioPoolSize, ants.WithPreAlloc(true))
|
||||
if err != nil {
|
||||
log.Error("failed to create goroutine pool for segment loader",
|
||||
zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
cpuPool, err := concurrency.NewPool(cpuNum, ants.WithPreAlloc(true))
|
||||
if err != nil {
|
||||
log.Error("failed to create cpu goroutine pool for segment loader", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
ioPool := concurrency.NewPool(ioPoolSize, ants.WithPreAlloc(true))
|
||||
cpuPool := concurrency.NewPool(cpuNum, ants.WithPreAlloc(true))
|
||||
|
||||
log.Info("SegmentLoader created",
|
||||
zap.Int("ioPoolSize", ioPoolSize),
|
||||
|
|
|
@ -16,24 +16,35 @@
|
|||
|
||||
package concurrency
|
||||
|
||||
import "github.com/panjf2000/ants/v2"
|
||||
import (
|
||||
"runtime"
|
||||
|
||||
"github.com/panjf2000/ants/v2"
|
||||
)
|
||||
|
||||
// A goroutine pool
|
||||
type Pool struct {
|
||||
inner *ants.Pool
|
||||
}
|
||||
|
||||
// Return error if provides invalid parameters
|
||||
// cap: the number of workers
|
||||
func NewPool(cap int, opts ...ants.Option) (*Pool, error) {
|
||||
// NewPool returns a goroutine pool.
|
||||
// cap: the number of workers.
|
||||
// This panic if provide any invalid option.
|
||||
func NewPool(cap int, opts ...ants.Option) *Pool {
|
||||
pool, err := ants.NewPool(cap, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return &Pool{
|
||||
inner: pool,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// NewDefaultPool returns a pool with cap of the number of logical CPU,
|
||||
// and pre-alloced goroutines.
|
||||
func NewDefaultPool() *Pool {
|
||||
return NewPool(runtime.GOMAXPROCS(0), ants.WithPreAlloc(true))
|
||||
}
|
||||
|
||||
// Submit a task into the pool,
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package concurrency
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -25,8 +24,7 @@ import (
|
|||
)
|
||||
|
||||
func TestPool(t *testing.T) {
|
||||
pool, err := NewPool(runtime.NumCPU())
|
||||
assert.NoError(t, err)
|
||||
pool := NewDefaultPool()
|
||||
|
||||
taskNum := pool.Cap() * 2
|
||||
futures := make([]*Future[any], 0, taskNum)
|
||||
|
|
Loading…
Reference in New Issue