mirror of https://github.com/milvus-io/milvus.git
Use conc.PoolOption instead of ants.Option (#24585)
- Add conc.PoolOption to setup conc.Pool - Change panic default behavior - Make future has error when job panicks Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/24610/head
parent
3022e37298
commit
31880ab427
|
@ -26,7 +26,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
|
@ -144,7 +143,7 @@ var _ Channel = &ChannelMeta{}
|
|||
func newChannel(channelName string, collID UniqueID, schema *schemapb.CollectionSchema, rc types.RootCoord, cm storage.ChunkManager) *ChannelMeta {
|
||||
metaService := newMetaService(rc, collID)
|
||||
|
||||
pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0), ants.WithPreAlloc(false), ants.WithNonblocking(false))
|
||||
pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0), conc.WithPreAlloc(false), conc.WithNonBlocking(false))
|
||||
|
||||
channel := ChannelMeta{
|
||||
collectionID: collID,
|
||||
|
|
|
@ -22,7 +22,6 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
ants "github.com/panjf2000/ants/v2"
|
||||
"go.uber.org/atomic"
|
||||
)
|
||||
|
||||
|
@ -36,8 +35,8 @@ func InitPool() {
|
|||
initOnce.Do(func() {
|
||||
pool := conc.NewPool[any](
|
||||
paramtable.Get().QueryNodeCfg.MaxReadConcurrency.GetAsInt(),
|
||||
ants.WithPreAlloc(true),
|
||||
ants.WithDisablePurge(true),
|
||||
conc.WithPreAlloc(true),
|
||||
conc.WithDisablePurge(true),
|
||||
)
|
||||
conc.WarmupPool(pool, runtime.LockOSThread)
|
||||
|
||||
|
|
|
@ -27,7 +27,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
ants "github.com/panjf2000/ants/v2"
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
@ -91,7 +90,7 @@ func NewLoader(
|
|||
ioPoolSize = configPoolSize
|
||||
}
|
||||
|
||||
ioPool := conc.NewPool[*storage.Blob](ioPoolSize, ants.WithPreAlloc(true))
|
||||
ioPool := conc.NewPool[*storage.Blob](ioPoolSize, conc.WithPreAlloc(true))
|
||||
|
||||
log.Info("SegmentLoader created", zap.Int("ioPoolSize", ioPoolSize))
|
||||
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -38,7 +37,7 @@ func NewScheduler() *Scheduler {
|
|||
mergedSearchTasks: make(chan *SearchTask),
|
||||
// queryProcessQueue: make(chan),
|
||||
|
||||
pool: conc.NewPool[any](maxReadConcurrency, ants.WithPreAlloc(true)),
|
||||
pool: conc.NewPool[any](maxReadConcurrency, conc.WithPreAlloc(true)),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ require (
|
|||
github.com/klauspost/compress v1.14.4
|
||||
github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76
|
||||
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230529034923-4579ee9d5723
|
||||
github.com/panjf2000/ants/v2 v2.4.8
|
||||
github.com/panjf2000/ants/v2 v2.7.2
|
||||
github.com/prometheus/client_golang v1.11.1
|
||||
github.com/samber/lo v1.27.0
|
||||
github.com/shirou/gopsutil/v3 v3.22.9
|
||||
|
@ -29,7 +29,7 @@ require (
|
|||
go.uber.org/zap v1.17.0
|
||||
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4
|
||||
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17
|
||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
|
||||
golang.org/x/sync v0.1.0
|
||||
google.golang.org/grpc v1.52.3
|
||||
google.golang.org/protobuf v1.28.1
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0
|
||||
|
|
|
@ -519,8 +519,8 @@ github.com/opencontainers/runtime-spec v1.0.2 h1:UfAcuLBJB9Coz72x1hgl8O5RVzTdNia
|
|||
github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
|
||||
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
|
||||
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
|
||||
github.com/panjf2000/ants/v2 v2.4.8 h1:JgTbolX6K6RreZ4+bfctI0Ifs+3mrE5BIHudQxUDQ9k=
|
||||
github.com/panjf2000/ants/v2 v2.4.8/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
|
||||
github.com/panjf2000/ants/v2 v2.7.2 h1:2NUt9BaZFO5kQzrieOmK/wdb/tQ/K+QHaxN8sOgD63U=
|
||||
github.com/panjf2000/ants/v2 v2.7.2/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8=
|
||||
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
|
||||
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
|
||||
github.com/pelletier/go-toml v1.9.3 h1:zeC5b1GviRUyKYd6OJPvBU/mcVDVoL1OhT17FCt5dSQ=
|
||||
|
@ -881,8 +881,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
|
|||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
|
||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
|
||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package conc
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type poolOption struct {
|
||||
// pre-allocs workers
|
||||
preAlloc bool
|
||||
// block or not when pool is full
|
||||
nonBlocking bool
|
||||
// duration to cleanup worker goroutine
|
||||
expiryDuration time.Duration
|
||||
// disable purge worker
|
||||
disablePurge bool
|
||||
// whether conceal panic when job has panic
|
||||
concealPanic bool
|
||||
// panicHandler when task panics
|
||||
panicHandler func(any)
|
||||
}
|
||||
|
||||
func (opt *poolOption) antsOptions() []ants.Option {
|
||||
var result []ants.Option
|
||||
result = append(result, ants.WithPreAlloc(opt.preAlloc))
|
||||
result = append(result, ants.WithNonblocking(opt.nonBlocking))
|
||||
result = append(result, ants.WithDisablePurge(opt.disablePurge))
|
||||
// ants recovers panic by default
|
||||
// however the error is not returned
|
||||
result = append(result, ants.WithPanicHandler(func(v any) {
|
||||
log.Error("Conc pool panicked", zap.Any("panic", v))
|
||||
if !opt.concealPanic {
|
||||
panic(v)
|
||||
}
|
||||
}))
|
||||
if opt.panicHandler != nil {
|
||||
result = append(result, ants.WithPanicHandler(opt.panicHandler))
|
||||
}
|
||||
if opt.expiryDuration > 0 {
|
||||
result = append(result, ants.WithExpiryDuration(opt.expiryDuration))
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// PoolOption options function to setup pool.
|
||||
type PoolOption func(opt *poolOption)
|
||||
|
||||
func defaultPoolOption() *poolOption {
|
||||
return &poolOption{
|
||||
preAlloc: false,
|
||||
nonBlocking: false,
|
||||
expiryDuration: 0,
|
||||
disablePurge: false,
|
||||
concealPanic: false,
|
||||
}
|
||||
}
|
||||
|
||||
func WithPreAlloc(v bool) PoolOption {
|
||||
return func(opt *poolOption) {
|
||||
opt.preAlloc = v
|
||||
}
|
||||
}
|
||||
|
||||
func WithNonBlocking(v bool) PoolOption {
|
||||
return func(opt *poolOption) {
|
||||
opt.nonBlocking = v
|
||||
}
|
||||
}
|
||||
|
||||
func WithDisablePurge(v bool) PoolOption {
|
||||
return func(opt *poolOption) {
|
||||
opt.disablePurge = v
|
||||
}
|
||||
}
|
||||
|
||||
func WithExpiryDuration(d time.Duration) PoolOption {
|
||||
return func(opt *poolOption) {
|
||||
opt.expiryDuration = d
|
||||
}
|
||||
}
|
||||
|
||||
func WithConcealPanic(v bool) PoolOption {
|
||||
return func(opt *poolOption) {
|
||||
opt.concealPanic = v
|
||||
}
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package conc
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestPoolOption(t *testing.T) {
|
||||
opt := &poolOption{}
|
||||
|
||||
o := WithPreAlloc(true)
|
||||
o(opt)
|
||||
assert.True(t, opt.preAlloc)
|
||||
|
||||
o = WithNonBlocking(true)
|
||||
o(opt)
|
||||
assert.True(t, opt.nonBlocking)
|
||||
|
||||
o = WithDisablePurge(true)
|
||||
o(opt)
|
||||
assert.True(t, opt.disablePurge)
|
||||
|
||||
o = WithExpiryDuration(time.Second)
|
||||
o(opt)
|
||||
assert.Equal(t, time.Second, opt.expiryDuration)
|
||||
|
||||
o = WithConcealPanic(true)
|
||||
o(opt)
|
||||
assert.True(t, opt.concealPanic)
|
||||
}
|
|
@ -17,6 +17,7 @@
|
|||
package conc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"runtime"
|
||||
"sync"
|
||||
|
||||
|
@ -27,13 +28,19 @@ import (
|
|||
// A goroutine pool
|
||||
type Pool[T any] struct {
|
||||
inner *ants.Pool
|
||||
opt *poolOption
|
||||
}
|
||||
|
||||
// NewPool returns a goroutine pool.
|
||||
// cap: the number of workers.
|
||||
// This panic if provide any invalid option.
|
||||
func NewPool[T any](cap int, opts ...ants.Option) *Pool[T] {
|
||||
pool, err := ants.NewPool(cap, opts...)
|
||||
func NewPool[T any](cap int, opts ...PoolOption) *Pool[T] {
|
||||
opt := defaultPoolOption()
|
||||
for _, o := range opts {
|
||||
o(opt)
|
||||
}
|
||||
|
||||
pool, err := ants.NewPool(cap, opt.antsOptions()...)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -46,7 +53,7 @@ func NewPool[T any](cap int, opts ...ants.Option) *Pool[T] {
|
|||
// NewDefaultPool returns a pool with cap of the number of logical CPU,
|
||||
// and pre-alloced goroutines.
|
||||
func NewDefaultPool[T any]() *Pool[T] {
|
||||
return NewPool[T](runtime.GOMAXPROCS(0), ants.WithPreAlloc(true))
|
||||
return NewPool[T](runtime.GOMAXPROCS(0), WithPreAlloc(true))
|
||||
}
|
||||
|
||||
// Submit a task into the pool,
|
||||
|
@ -57,6 +64,12 @@ func (pool *Pool[T]) Submit(method func() (T, error)) *Future[T] {
|
|||
future := newFuture[T]()
|
||||
err := pool.inner.Submit(func() {
|
||||
defer close(future.ch)
|
||||
defer func() {
|
||||
if x := recover(); x != nil {
|
||||
future.err = fmt.Errorf("panicked with error: %v", x)
|
||||
panic(x) // throw panic out
|
||||
}
|
||||
}()
|
||||
res, err := method()
|
||||
if err != nil {
|
||||
future.err = err
|
||||
|
|
|
@ -54,3 +54,15 @@ func TestPool(t *testing.T) {
|
|||
assert.Equal(t, err, errDup)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPoolWithPanic(t *testing.T) {
|
||||
pool := NewPool[any](1, WithConcealPanic(true))
|
||||
|
||||
future := pool.Submit(func() (any, error) {
|
||||
panic("mocked panic")
|
||||
})
|
||||
|
||||
// make sure error returned when conceal panic
|
||||
_, err := future.Await()
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue