Rename concurrency package to conc (#22453)

Signed-off-by: yah01 <yang.cen@zilliz.com>
yah01 2023-02-28 14:19:47 +08:00 committed by GitHub
parent 608615e5bd
commit 60cd548bf5
11 changed files with 32 additions and 32 deletions


@@ -35,7 +35,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/concurrency"
"github.com/milvus-io/milvus/internal/util/conc"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
@@ -203,7 +203,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo, tick
return err
}
-futures := make([]*concurrency.Future[any], 0, len(unflushedSegmentInfos)+len(flushedSegmentInfos))
+futures := make([]*conc.Future[any], 0, len(unflushedSegmentInfos)+len(flushedSegmentInfos))
for _, us := range unflushedSegmentInfos {
if us.CollectionID != dsService.collectionID ||
@@ -283,7 +283,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo, tick
tickler.watch()
defer tickler.stop()
-err = concurrency.AwaitAll(futures...)
+err = conc.AwaitAll(futures...)
if err != nil {
return err
}
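
For readers skimming the rename: the call pattern itself is unchanged, only the import path and identifier move from concurrency to conc. Below is a minimal, self-contained sketch of the fan-out/await pattern that initNodes relies on, built only from the API visible in this diff (conc.NewPool, Pool.Submit, conc.AwaitAll, Pool.Release); loadSegment and segmentIDs are hypothetical placeholders, not code from the commit.

```go
// Hedged sketch of the fan-out/await pattern used by initNodes above.
// Only APIs visible in this diff are used; loadSegment and segmentIDs
// are illustrative placeholders.
package main

import (
	"fmt"

	"github.com/milvus-io/milvus/internal/util/conc"
)

func loadSegment(id int64) error {
	fmt.Println("loading segment", id)
	return nil
}

func main() {
	pool := conc.NewPool(4) // small worker pool for the demo
	defer pool.Release()

	segmentIDs := []int64{1, 2, 3}
	futures := make([]*conc.Future[any], 0, len(segmentIDs))
	for _, id := range segmentIDs {
		id := id // capture the loop variable for the closure
		futures = append(futures, pool.Submit(func() (interface{}, error) {
			return nil, loadSegment(id)
		}))
	}
	// AwaitAll blocks until every submitted task has finished and
	// reports an error if any of them failed.
	if err := conc.AwaitAll(futures...); err != nil {
		fmt.Println("load failed:", err)
	}
}
```

Each Submit returns a *conc.Future[any] and AwaitAll is the single synchronization point, which is why the rename touches both the futures slice type and the AwaitAll call in the hunk above.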


@@ -3,10 +3,10 @@ package datanode
import (
"sync"
"github.com/milvus-io/milvus/internal/util/concurrency"
"github.com/milvus-io/milvus/internal/util/conc"
)
-var ioPool *concurrency.Pool
+var ioPool *conc.Pool
var ioPoolInitOnce sync.Once
func initIOPool() {
@@ -15,10 +15,10 @@ func initIOPool() {
capacity = 32
}
// error only happens with negative expiry duration or with negative pre-alloc size.
-ioPool = concurrency.NewPool(capacity)
+ioPool = conc.NewPool(capacity)
}
-func getOrCreateIOPool() *concurrency.Pool {
+func getOrCreateIOPool() *conc.Pool {
ioPoolInitOnce.Do(initIOPool)
return ioPool
}
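
Only the package name changes in this file; the lazy, process-wide I/O pool pattern (sync.Once plus a shared *conc.Pool) stays as-is. As a hedged illustration of how such a pool is consumed, here is a hypothetical caller that is not part of the diff, assumed to live in the same datanode package:

```go
package datanode

import (
	"github.com/milvus-io/milvus/internal/util/conc"
)

// readBinlogAsync is a hypothetical caller of getOrCreateIOPool; the name and
// body are illustrative only. sync.Once above guarantees that every caller
// shares the same conc.Pool instance.
func readBinlogAsync(path string) *conc.Future[any] {
	pool := getOrCreateIOPool() // first call builds the pool, later calls reuse it
	return pool.Submit(func() (interface{}, error) {
		// a blocking object-storage read of path would go here
		return nil, nil
	})
}
```

Returning the future instead of blocking inside the helper lets callers batch several reads and wait on them together with conc.AwaitAll, as the test in the next file does.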


@@ -6,7 +6,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/util/concurrency"
"github.com/milvus-io/milvus/internal/util/conc"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
@@ -23,14 +23,14 @@ func Test_getOrCreateIOPool(t *testing.T) {
go func() {
defer wg.Done()
p := getOrCreateIOPool()
-futures := make([]*concurrency.Future[any], 0, nTask)
+futures := make([]*conc.Future[any], 0, nTask)
for j := 0; j < nTask; j++ {
future := p.Submit(func() (interface{}, error) {
return nil, nil
})
futures = append(futures, future)
}
-err := concurrency.AwaitAll(futures...)
+err := conc.AwaitAll(futures...)
assert.NoError(t, err)
}()
}


@@ -30,7 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/concurrency"
"github.com/milvus-io/milvus/internal/util/conc"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/sessionutil"
@@ -116,7 +116,7 @@ func TestImpl_WatchDmChannels(t *testing.T) {
defer func() {
node.taskPool = originPool
}()
-node.taskPool = concurrency.NewDefaultPool()
+node.taskPool = conc.NewDefaultPool()
node.taskPool.Release()
status, err = node.WatchDmChannels(ctx, req)
assert.NoError(t, err)


@@ -46,7 +46,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/concurrency"
"github.com/milvus-io/milvus/internal/util/conc"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/funcutil"
@@ -1696,7 +1696,7 @@ func genSimpleQueryNodeWithMQFactory(ctx context.Context, fac dependency.Factory
node.etcdCli = etcdCli
node.initSession()
-node.taskPool = concurrency.NewPool(2, ants.WithPreAlloc(true))
+node.taskPool = conc.NewPool(2, ants.WithPreAlloc(true))
etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath.GetValue())
node.etcdKV = etcdKV


@@ -46,7 +46,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/concurrency"
"github.com/milvus-io/milvus/internal/util/conc"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/gc"
"github.com/milvus-io/milvus/internal/util/hardware"
@@ -124,7 +124,7 @@ type QueryNode struct {
queryShardService *queryShardService
// pool for load/release channel
-taskPool *concurrency.Pool
+taskPool *conc.Pool
IsStandAlone bool
}
@@ -271,7 +271,7 @@ func (node *QueryNode) Init() error {
node.etcdKV = etcdkv.NewEtcdKV(node.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue())
log.Info("queryNode try to connect etcd success", zap.Any("MetaRootPath", Params.EtcdCfg.MetaRootPath))
-node.taskPool = concurrency.NewDefaultPool()
+node.taskPool = conc.NewDefaultPool()
node.metaReplica = newCollectionReplica()
node.loader = newSegmentLoader(
node.metaReplica,


@@ -45,7 +45,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/concurrency"
"github.com/milvus-io/milvus/internal/util/conc"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/hardware"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
@@ -73,8 +73,8 @@ type segmentLoader struct {
cm storage.ChunkManager // minio cm
etcdKV *etcdkv.EtcdKV
-ioPool *concurrency.Pool
-cpuPool *concurrency.Pool
+ioPool *conc.Pool
+cpuPool *conc.Pool
factory msgstream.Factory
}
@@ -319,7 +319,7 @@ func (loader *segmentLoader) loadGrowingSegmentFields(ctx context.Context, segme
iCodec := storage.InsertCodec{}
// change all field bin log loading into concurrent
-loadFutures := make([]*concurrency.Future[any], 0, len(fieldBinlogs))
+loadFutures := make([]*conc.Future[any], 0, len(fieldBinlogs))
for _, fieldBinlog := range fieldBinlogs {
futures := loader.loadFieldBinlogsAsync(ctx, fieldBinlog)
loadFutures = append(loadFutures, futures...)
@@ -402,7 +402,7 @@ func (loader *segmentLoader) loadSealedField(ctx context.Context, segment *Segme
// acquire a CPU worker before load field binlogs
futures := loader.loadFieldBinlogsAsync(ctx, field)
-err := concurrency.AwaitAll(futures...)
+err := conc.AwaitAll(futures...)
if err != nil {
return err
}
@@ -427,8 +427,8 @@ func (loader *segmentLoader) loadSealedField(ctx context.Context, segment *Segme
}
// Load binlogs concurrently into memory from KV storage asyncly
-func (loader *segmentLoader) loadFieldBinlogsAsync(ctx context.Context, field *datapb.FieldBinlog) []*concurrency.Future[any] {
-futures := make([]*concurrency.Future[any], 0, len(field.Binlogs))
+func (loader *segmentLoader) loadFieldBinlogsAsync(ctx context.Context, field *datapb.FieldBinlog) []*conc.Future[any] {
+futures := make([]*conc.Future[any], 0, len(field.Binlogs))
for i := range field.Binlogs {
path := field.Binlogs[i].GetLogPath()
future := loader.ioPool.Submit(func() (interface{}, error) {
@@ -473,7 +473,7 @@ func (loader *segmentLoader) loadFieldIndexData(ctx context.Context, segment *Se
log := log.With(zap.Int64("segment", segment.ID()))
indexBuffer := make([][]byte, 0, len(indexInfo.IndexFilePaths))
filteredPaths := make([]string, 0, len(indexInfo.IndexFilePaths))
-futures := make([]*concurrency.Future[any], 0, len(indexInfo.IndexFilePaths))
+futures := make([]*conc.Future[any], 0, len(indexInfo.IndexFilePaths))
indexCodec := storage.NewIndexFileBinlogCodec()
// TODO, remove the load index info froam
@@ -552,7 +552,7 @@ func (loader *segmentLoader) loadFieldIndexData(ctx context.Context, segment *Se
futures = append(futures, indexFuture)
}
-err = concurrency.AwaitAll(futures...)
+err = conc.AwaitAll(futures...)
if err != nil {
return err
}
@@ -985,8 +985,8 @@ func newSegmentLoader(
if ioPoolSize > 256 {
ioPoolSize = 256
}
-ioPool := concurrency.NewPool(ioPoolSize, ants.WithPreAlloc(true))
-cpuPool := concurrency.NewPool(cpuNum, ants.WithPreAlloc(true))
+ioPool := conc.NewPool(ioPoolSize, ants.WithPreAlloc(true))
+cpuPool := conc.NewPool(cpuNum, ants.WithPreAlloc(true))
log.Info("SegmentLoader created",
zap.Int("ioPoolSize", ioPoolSize),


@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package concurrency
+package conc
type future interface {
wait()


@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package concurrency
+package conc
import (
"testing"


@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package concurrency
+package conc
import (
"runtime"


@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package concurrency
+package conc
import (
"testing"